You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2016/08/15 15:04:32 UTC
[1/3] incubator-gearpump git commit: [GEARPUMP-188] use
java.time.Instant for Task start time
Repository: incubator-gearpump
Updated Branches:
refs/heads/master 6d919ec97 -> 23daf0cf9
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/Task.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/Task.java b/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/Task.java
index 2efce45..89018a1 100644
--- a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/Task.java
+++ b/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/Task.java
@@ -21,9 +21,10 @@ package org.apache.gearpump.streaming.javaapi;
import akka.actor.ActorRef;
import org.apache.gearpump.Message;
import org.apache.gearpump.cluster.UserConfig;
-import org.apache.gearpump.streaming.task.StartTime;
import org.apache.gearpump.streaming.task.TaskContext;
+import java.time.Instant;
+
/**
* Java version of Task.
*
@@ -45,7 +46,7 @@ public class Task extends org.apache.gearpump.streaming.task.Task {
}
@Override
- public void onStart(StartTime startTime) {
+ public void onStart(Instant startTime) {
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/StreamApp.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/StreamApp.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/StreamApp.scala
index 5027500..b6c087e 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/StreamApp.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/StreamApp.scala
@@ -18,6 +18,8 @@
package org.apache.gearpump.streaming.dsl
+import java.time.Instant
+
import akka.actor.ActorSystem
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.cluster.client.ClientContext
@@ -27,7 +29,7 @@ import org.apache.gearpump.streaming.dsl.plan.Planner
import org.apache.gearpump.streaming.source.DataSource
import org.apache.gearpump.streaming.task.{Task, TaskContext}
import org.apache.gearpump.util.Graph
-import org.apache.gearpump.{Message, TimeStamp}
+import org.apache.gearpump.Message
import scala.language.implicitConversions
@@ -69,36 +71,36 @@ object StreamApp {
}
implicit def streamAppToApplication(streamApp: StreamApp): StreamApplication = {
- streamApp.plan
+ streamApp.plan()
}
implicit class Source(app: StreamApp) extends java.io.Serializable {
- def source[T](dataSource: DataSource, parallism: Int): Stream[T] = {
- source(dataSource, parallism, UserConfig.empty)
+ def source[T](dataSource: DataSource, parallelism: Int): Stream[T] = {
+ source(dataSource, parallelism, UserConfig.empty)
}
- def source[T](dataSource: DataSource, parallism: Int, description: String): Stream[T] = {
- source(dataSource, parallism, UserConfig.empty, description)
+ def source[T](dataSource: DataSource, parallelism: Int, description: String): Stream[T] = {
+ source(dataSource, parallelism, UserConfig.empty, description)
}
- def source[T](dataSource: DataSource, parallism: Int, conf: UserConfig): Stream[T] = {
- source(dataSource, parallism, conf, description = null)
+ def source[T](dataSource: DataSource, parallelism: Int, conf: UserConfig): Stream[T] = {
+ source(dataSource, parallelism, conf, description = null)
}
- def source[T](dataSource: DataSource, parallism: Int, conf: UserConfig, description: String)
+ def source[T](dataSource: DataSource, parallelism: Int, conf: UserConfig, description: String)
: Stream[T] = {
- implicit val sourceOp = DataSourceOp(dataSource, parallism, conf, description)
+ implicit val sourceOp = DataSourceOp(dataSource, parallelism, conf, description)
app.graph.addVertex(sourceOp)
new Stream[T](app.graph, sourceOp)
}
- def source[T](seq: Seq[T], parallism: Int, description: String): Stream[T] = {
- this.source(new CollectionDataSource[T](seq), parallism, UserConfig.empty, description)
+ def source[T](seq: Seq[T], parallelism: Int, description: String): Stream[T] = {
+ this.source(new CollectionDataSource[T](seq), parallelism, UserConfig.empty, description)
}
- def source[T](source: Class[_ <: Task], parallism: Int, conf: UserConfig, description: String)
+ def source[T](source: Class[_ <: Task], parallelism: Int, conf: UserConfig, description: String)
: Stream[T] = {
- val sourceOp = ProcessorOp(source, parallism, conf, Option(description).getOrElse("source"))
+ val sourceOp = ProcessorOp(source, parallelism, conf, Option(description).getOrElse("source"))
app.graph.addVertex(sourceOp)
new Stream[T](app.graph, sourceOp)
}
@@ -119,5 +121,5 @@ class CollectionDataSource[T](seq: Seq[T]) extends DataSource {
override def close(): Unit = {}
- override def open(context: TaskContext, startTime: TimeStamp): Unit = {}
+ override def open(context: TaskContext, startTime: Instant): Unit = {}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslator.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslator.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslator.scala
index 56d31db..6bd0da2 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslator.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslator.scala
@@ -18,11 +18,11 @@
package org.apache.gearpump.streaming.dsl.plan
-import scala.collection.TraversableOnce
+import java.time.Instant
+import scala.collection.TraversableOnce
import akka.actor.ActorSystem
import org.slf4j.Logger
-
import org.apache.gearpump._
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.Constants._
@@ -32,7 +32,7 @@ import org.apache.gearpump.streaming.dsl.op._
import org.apache.gearpump.streaming.dsl.plan.OpTranslator._
import org.apache.gearpump.streaming.sink.DataSink
import org.apache.gearpump.streaming.source.DataSource
-import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext}
+import org.apache.gearpump.streaming.task.{Task, TaskContext}
import org.apache.gearpump.util.LogUtil
/**
@@ -116,7 +116,7 @@ object OpTranslator {
class DummyInputFunction[T] extends SingleInputFunction[T, T] {
override def andThen[OUTER](other: SingleInputFunction[T, OUTER])
- : SingleInputFunction[T, OUTER] = {
+ : SingleInputFunction[T, OUTER] = {
other
}
@@ -131,13 +131,13 @@ object OpTranslator {
extends SingleInputFunction[IN, OUT] {
override def process(value: IN): TraversableOnce[OUT] = {
- first.process(value).flatMap(second.process(_))
+ first.process(value).flatMap(second.process)
}
override def description: String = {
Option(first.description).flatMap { description =>
Option(second.description).map(description + "." + _)
- }.getOrElse(null)
+ }.orNull
}
}
@@ -182,9 +182,6 @@ object OpTranslator {
private var groups = Map.empty[GROUP, SingleInputFunction[IN, OUT]]
- override def onStart(startTime: StartTime): Unit = {
- }
-
override def onNext(msg: Message): Unit = {
val time = msg.timestamp
@@ -216,8 +213,8 @@ object OpTranslator {
taskContext, userConf)
}
- override def onStart(startTime: StartTime): Unit = {
- source.open(taskContext, startTime.startTime)
+ override def onStart(startTime: Instant): Unit = {
+ source.open(taskContext, startTime)
self ! Message("start", System.currentTimeMillis())
}
@@ -256,9 +253,6 @@ object OpTranslator {
GEARPUMP_STREAMING_OPERATOR)(taskContext.system), taskContext, userConf)
}
- override def onStart(startTime: StartTime): Unit = {
- }
-
override def onNext(msg: Message): Unit = {
val time = msg.timestamp
@@ -281,7 +275,7 @@ object OpTranslator {
taskContext, userConf)
}
- override def onStart(startTime: StartTime): Unit = {
+ override def onStart(startTime: Instant): Unit = {
dataSink.open(taskContext)
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/streaming/src/main/scala/org/apache/gearpump/streaming/sink/DataSinkTask.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/sink/DataSinkTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/sink/DataSinkTask.scala
index f8bc0ab..0db44f2 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/sink/DataSinkTask.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/sink/DataSinkTask.scala
@@ -18,9 +18,11 @@
package org.apache.gearpump.streaming.sink
+import java.time.Instant
+
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext}
+import org.apache.gearpump.streaming.task.{Task, TaskContext}
object DataSinkTask {
val DATA_SINK = "data_sink"
@@ -32,11 +34,12 @@ object DataSinkTask {
class DataSinkTask private[sink](context: TaskContext, conf: UserConfig, sink: DataSink)
extends Task(context, conf) {
+
def this(context: TaskContext, conf: UserConfig) = {
this(context, conf, conf.getValue[DataSink](DataSinkTask.DATA_SINK)(context.system).get)
}
- override def onStart(startTime: StartTime): Unit = {
+ override def onStart(startTime: Instant): Unit = {
LOG.info("opening data sink...")
sink.open(context)
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSource.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSource.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSource.scala
index 0fb6db4..f55d102 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSource.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSource.scala
@@ -18,11 +18,11 @@
package org.apache.gearpump.streaming.source
+import java.time.Instant
+
import org.apache.gearpump.streaming.task.TaskContext
import org.apache.gearpump.Message
-import scala.util.Random
-
/**
* Interface to implement custom source where data is read into the system.
* a DataSource could be a message queue like kafka or simply data generation source.
@@ -52,7 +52,7 @@ trait DataSource extends java.io.Serializable {
* @param context is the task context at runtime
* @param startTime is the start time of system
*/
- def open(context: TaskContext, startTime: Long): Unit
+ def open(context: TaskContext, startTime: Instant): Unit
/**
* Reads next message from data source and
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala
index f845628..468ae3b 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala
@@ -18,9 +18,11 @@
package org.apache.gearpump.streaming.source
+import java.time.Instant
+
import org.apache.gearpump._
import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext}
+import org.apache.gearpump.streaming.task.{Task, TaskContext}
object DataSourceTask {
val DATA_SOURCE = "data_source"
@@ -45,10 +47,8 @@ class DataSourceTask private[source](context: TaskContext, conf: UserConfig, sou
this(context, conf, conf.getValue[DataSource](DataSourceTask.DATA_SOURCE)(context.system).get)
}
private val batchSize = conf.getInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE).getOrElse(1000)
- private var startTime = 0L
- override def onStart(newStartTime: StartTime): Unit = {
- startTime = newStartTime.startTime
+ override def onStart(startTime: Instant): Unit = {
LOG.info(s"opening data source at $startTime")
source.open(context, startTime)
self ! Message("start", System.currentTimeMillis())
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentTask.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentTask.scala
index c7b503e..aceff4a 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentTask.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentTask.scala
@@ -18,12 +18,13 @@
package org.apache.gearpump.streaming.state.api
+import java.time.Instant
import java.util.concurrent.TimeUnit
import scala.concurrent.duration.FiniteDuration
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.state.impl.{CheckpointManager, PersistentStateConfig}
-import org.apache.gearpump.streaming.task.{UpdateCheckpointClock, StartTime, Task, TaskContext}
+import org.apache.gearpump.streaming.task.{UpdateCheckpointClock, Task, TaskContext}
import org.apache.gearpump.streaming.transaction.api.CheckpointStoreFactory
import org.apache.gearpump.util.LogUtil
import org.apache.gearpump.{Message, TimeStamp}
@@ -70,8 +71,8 @@ abstract class PersistentTask[T](taskContext: TaskContext, conf: UserConfig)
/** Persistent state that will be stored (by checkpointing) automatically to storage like HDFS */
val state = persistentState
- final override def onStart(startTime: StartTime): Unit = {
- val timestamp = startTime.startTime
+ final override def onStart(startTime: Instant): Unit = {
+ val timestamp = startTime.toEpochMilli
checkpointManager
.recover(timestamp)
.foreach(state.recover(timestamp, _))
@@ -101,6 +102,7 @@ abstract class PersistentTask[T](taskContext: TaskContext, conf: UserConfig)
}
}
+
final override def onStop(): Unit = {
checkpointManager.close()
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/streaming/src/main/scala/org/apache/gearpump/streaming/task/StartTime.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/StartTime.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/StartTime.scala
deleted file mode 100644
index fb097d3..0000000
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/StartTime.scala
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.task
-
-import org.apache.gearpump.TimeStamp
-
-/** Start time of streaming application. All message older than start time will be dropped */
-case class StartTime(startTime: TimeStamp = 0)
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/streaming/src/main/scala/org/apache/gearpump/streaming/task/Task.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/Task.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/Task.scala
index 9c76a40..c94dec4 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/Task.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/Task.scala
@@ -18,6 +18,8 @@
package org.apache.gearpump.streaming.task
+import java.time.Instant
+
import scala.concurrent.duration.FiniteDuration
import akka.actor.Actor.Receive
@@ -133,7 +135,7 @@ trait TaskInterface {
* replay the data source, or from when a processor task should recover its
* checkpoint data in to in-memory state.
*/
- def onStart(startTime: StartTime): Unit
+ def onStart(startTime: Instant): Unit
/**
* Method called for each message received.
@@ -176,7 +178,7 @@ abstract class Task(taskContext: TaskContext, userConf: UserConfig) extends Task
*/
protected def sender: ActorRef = taskContext.sender
- def onStart(startTime: StartTime): Unit = {}
+ def onStart(startTime: Instant): Unit = {}
def onNext(msg: Message): Unit = {}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
index d12aac1..30a24fa 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
@@ -18,12 +18,12 @@
package org.apache.gearpump.streaming.task
+import java.time.Instant
import java.util
import java.util.concurrent.TimeUnit
import akka.actor._
import org.slf4j.Logger
-
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.gs.collections.impl.map.mutable.primitive.IntShortHashMap
import org.apache.gearpump.metrics.Metrics
@@ -101,7 +101,7 @@ class TaskActor(
task.setTaskActor(this)
- def onStart(startTime: StartTime): Unit = {
+ def onStart(startTime: Instant): Unit = {
task.onStart(startTime)
}
@@ -111,6 +111,7 @@ class TaskActor(
def onStop(): Unit = task.onStop()
+
/**
* output to a downstream by specifying a arrayIndex
* @param arrayIndex this is not same as ProcessorId
@@ -193,7 +194,7 @@ class TaskActor(
// Put this as the last step so that the subscription is already initialized.
// Message sending in current Task before onStart will not be delivered to
// target
- onStart(new StartTime(upstreamMinClock))
+ onStart(Instant.ofEpochMilli(upstreamMinClock))
appMaster ! GetUpstreamMinClock(taskId)
context.become(handleMessages(sender))
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskWrapper.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskWrapper.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskWrapper.scala
index cd33f7e..31c991e 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskWrapper.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskWrapper.scala
@@ -18,15 +18,15 @@
package org.apache.gearpump.streaming.task
-import scala.concurrent.duration.FiniteDuration
+import java.time.Instant
+import scala.concurrent.duration.FiniteDuration
import akka.actor.Actor._
import akka.actor.{ActorRef, ActorSystem, Cancellable, Props}
import org.slf4j.Logger
-
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.util.LogUtil
-import org.apache.gearpump.{Message, TimeStamp}
+import org.apache.gearpump.{TimeStamp, Message}
/**
* This provides TaskContext for user defined tasks
@@ -41,7 +41,7 @@ class TaskWrapper(
private val LOG = LogUtil.getLogger(taskClass, task = taskId)
- private var actor: TaskActor = null
+ private var actor: TaskActor = _
private var task: Option[Task] = None
@@ -87,8 +87,8 @@ class TaskWrapper(
override def actorOf(props: Props, name: String): ActorRef = actor.context.actorOf(props, name)
- override def onStart(startTime: StartTime): Unit = {
- if (None != task) {
+ override def onStart(startTime: Instant): Unit = {
+ if (task.isDefined) {
LOG.error(s"Task.onStart should not be called multiple times... ${task.getClass}")
}
val constructor = taskClass.getConstructor(classOf[TaskContext], classOf[UserConfig])
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala
index c9f1b89..647ad0a 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala
@@ -38,7 +38,7 @@ import org.apache.gearpump.partitioner.HashPartitioner
import org.apache.gearpump.streaming.AppMasterToExecutor.StopTask
import org.apache.gearpump.streaming.ExecutorToAppMaster.{MessageLoss, UnRegisterTask}
import org.apache.gearpump.streaming.appmaster.AppMaster.{TaskActorRef, LookupTaskActorRef}
-import org.apache.gearpump.streaming.task.{StartTime, TaskContext, _}
+import org.apache.gearpump.streaming.task.{TaskContext, _}
import org.apache.gearpump.streaming.{Constants, DAG, Processor, StreamApplication}
import org.apache.gearpump.util.ActorSystemBooter.RegisterActorSystem
import org.apache.gearpump.util.{ActorUtil, Graph}
@@ -300,17 +300,9 @@ object AppMasterSpec {
}
class TaskA(taskContext: TaskContext, userConf: UserConfig) extends Task(taskContext, userConf) {
-
- override def onStart(startTime: StartTime): Unit = {
- }
-
override def onNext(msg: Message): Unit = {}
}
class TaskB(taskContext: TaskContext, userConf: UserConfig) extends Task(taskContext, userConf) {
-
- override def onStart(startTime: StartTime): Unit = {
- }
-
override def onNext(msg: Message): Unit = {}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala
index 0dd3e5b..bb495a7 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala
@@ -36,7 +36,7 @@ import org.apache.gearpump.streaming.appmaster.JarScheduler.ResourceRequestDetai
import org.apache.gearpump.streaming.appmaster.TaskManager.ApplicationReady
import org.apache.gearpump.streaming.appmaster.TaskManagerSpec.{Env, Task1, Task2}
import org.apache.gearpump.streaming.executor.Executor.RestartTasks
-import org.apache.gearpump.streaming.task.{StartTime, TaskContext, _}
+import org.apache.gearpump.streaming.task.{TaskContext, _}
import org.apache.gearpump.streaming.{DAG, LifeTime, ProcessorDescription, ProcessorId}
import org.apache.gearpump.transport.HostPort
import org.apache.gearpump.util.Graph
@@ -270,13 +270,11 @@ object TaskManagerSpec {
class Task1(taskContext: TaskContext, userConf: UserConfig)
extends Task(taskContext, userConf) {
- override def onStart(startTime: StartTime): Unit = {}
override def onNext(msg: Message): Unit = {}
}
class Task2(taskContext: TaskContext, userConf: UserConfig)
extends Task(taskContext, userConf) {
- override def onStart(startTime: StartTime): Unit = {}
override def onNext(msg: Message): Unit = {}
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskSchedulerSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskSchedulerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskSchedulerSpec.scala
index 4a532dd..864aa93 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskSchedulerSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskSchedulerSpec.scala
@@ -18,14 +18,13 @@
package org.apache.gearpump.streaming.appmaster
import com.typesafe.config.ConfigFactory
-import org.apache.gearpump.Message
import org.apache.gearpump.cluster.scheduler.{Relaxation, Resource, ResourceRequest}
import org.apache.gearpump.cluster.worker.WorkerId
import org.apache.gearpump.cluster.{TestUtil, UserConfig}
import org.apache.gearpump.partitioner.{HashPartitioner, Partitioner}
import org.apache.gearpump.streaming.appmaster.TaskLocator.Localities
import org.apache.gearpump.streaming.appmaster.TaskSchedulerSpec.{TestTask1, TestTask2}
-import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext, TaskId}
+import org.apache.gearpump.streaming.task.{Task, TaskContext, TaskId}
import org.apache.gearpump.streaming.{DAG, ProcessorDescription}
import org.apache.gearpump.util.Graph
import org.apache.gearpump.util.Graph._
@@ -108,8 +107,8 @@ class TaskSchedulerSpec extends WordSpec with Matchers {
taskScheduler.setDAG(dag)
val tasks = taskScheduler.schedule(WorkerId(1, 0L), executorId = 0, Resource(4))
- assert(tasks.filter(_.processorId == 0).length == 2)
- assert(tasks.filter(_.processorId == 1).length == 2)
+ assert(tasks.count(_.processorId == 0) == 2)
+ assert(tasks.count(_.processorId == 1) == 2)
}
}
}
@@ -117,13 +116,9 @@ class TaskSchedulerSpec extends WordSpec with Matchers {
object TaskSchedulerSpec {
class TestTask1(taskContext: TaskContext, userConf: UserConfig)
extends Task(taskContext, userConf) {
- override def onStart(startTime: StartTime): Unit = Unit
- override def onNext(msg: Message): Unit = Unit
}
class TestTask2(taskContext: TaskContext, userConf: UserConfig)
extends Task(taskContext, userConf) {
- override def onStart(startTime: StartTime): Unit = Unit
- override def onNext(msg: Message): Unit = Unit
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala
index 6bdd8aa..82979e0 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala
@@ -26,7 +26,7 @@ import org.apache.gearpump.partitioner.{CoLocationPartitioner, HashPartitioner}
import org.apache.gearpump.streaming.dsl.StreamSpec.Join
import org.apache.gearpump.streaming.dsl.partitioner.GroupByPartitioner
import org.apache.gearpump.streaming.dsl.plan.OpTranslator._
-import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext}
+import org.apache.gearpump.streaming.task.{Task, TaskContext}
import org.apache.gearpump.util.Graph
import org.apache.gearpump.util.Graph._
import org.mockito.Mockito.when
@@ -39,6 +39,7 @@ import scala.util.{Either, Left, Right}
class StreamSpec extends FlatSpec with Matchers with BeforeAndAfterAll with MockitoSugar {
+
implicit var system: ActorSystem = null
override def beforeAll(): Unit = {
@@ -108,7 +109,6 @@ object StreamSpec {
class Join(taskContext: TaskContext, userConf: UserConfig) extends Task(taskContext, userConf) {
var query: String = null
- override def onStart(startTime: StartTime): Unit = {}
override def onNext(msg: Message): Unit = {
msg.msg match {
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslatorSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslatorSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslatorSpec.scala
index ecc5352..144df0f 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslatorSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslatorSpec.scala
@@ -18,6 +18,8 @@
package org.apache.gearpump.streaming.dsl.plan
+import java.time.Instant
+
import scala.concurrent.Await
import scala.concurrent.duration.Duration
@@ -33,10 +35,10 @@ import org.apache.gearpump.streaming.Constants._
import org.apache.gearpump.streaming.MockUtil
import org.apache.gearpump.streaming.dsl.CollectionDataSource
import org.apache.gearpump.streaming.dsl.plan.OpTranslator._
-import org.apache.gearpump.streaming.task.StartTime
class OpTranslatorSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
+
"andThen" should "chain multiple single input function" in {
val dummy = new DummyInputFunction[String]
val split = new FlatMapFunction[String, String](line => line.split("\\s"), "split")
@@ -74,7 +76,7 @@ class OpTranslatorSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
// Source with no transformer
val source = new SourceTask[String, String](new CollectionDataSource[String](data), None,
taskContext, conf)
- source.onStart(StartTime(0))
+ source.onStart(Instant.EPOCH)
source.onNext(Message("next"))
verify(taskContext, times(1)).output(anyObject())
@@ -83,7 +85,7 @@ class OpTranslatorSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
val double = new FlatMapFunction[String, String](word => List(word, word), "double")
val another = new SourceTask(new CollectionDataSource[String](data), Some(double),
anotherTaskContext, conf)
- another.onStart(StartTime(0))
+ another.onStart(Instant.EPOCH)
another.onNext(Message("next"))
verify(anotherTaskContext, times(2)).output(anyObject())
}
@@ -106,7 +108,7 @@ class OpTranslatorSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
val taskContext = MockUtil.mockTaskContext
val task = new GroupByTask[String, String, String](input => input, taskContext, config)
- task.onStart(StartTime(0))
+ task.onStart(Instant.EPOCH)
val peopleCaptor = ArgumentCaptor.forClass(classOf[Message])
@@ -130,7 +132,7 @@ class OpTranslatorSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
val conf = UserConfig.empty
val double = new FlatMapFunction[String, String](word => List(word, word), "double")
val task = new TransformTask[String, String](Some(double), taskContext, conf)
- task.onStart(StartTime(0))
+ task.onStart(Instant.EPOCH)
val data = "1 2 2 3 3 3".split("\\s+")
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/streaming/src/test/scala/org/apache/gearpump/streaming/sink/DataSinkTaskSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/sink/DataSinkTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/sink/DataSinkTaskSpec.scala
index 7a2c2d1..55e59e0 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/sink/DataSinkTaskSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/sink/DataSinkTaskSpec.scala
@@ -18,10 +18,11 @@
package org.apache.gearpump.streaming.sink
+import java.time.Instant
+
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.MockUtil
-import org.apache.gearpump.streaming.task.StartTime
import org.mockito.Mockito._
import org.scalacheck.Gen
import org.scalatest.mock.MockitoSugar
@@ -30,13 +31,14 @@ import org.scalatest.{PropSpec, Matchers}
class DataSinkTaskSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
+
property("DataSinkTask.onStart should call DataSink.open" ) {
- forAll(Gen.chooseNum[Long](0L, 1000L)) { (startTime: Long) =>
+ forAll(Gen.chooseNum[Long](0L, 1000L).map(Instant.ofEpochMilli)) { (startTime: Instant) =>
val taskContext = MockUtil.mockTaskContext
val config = UserConfig.empty
val dataSink = mock[DataSink]
val sinkTask = new DataSinkTask(taskContext, config, dataSink)
- sinkTask.onStart(StartTime(startTime))
+ sinkTask.onStart(startTime)
verify(dataSink).open(taskContext)
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala
index d4d580f..ae9bf37 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala
@@ -18,10 +18,11 @@
package org.apache.gearpump.streaming.source
+import java.time.Instant
+
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.MockUtil
-import org.apache.gearpump.streaming.task.{TaskContext, StartTime}
import org.mockito.Mockito._
import org.scalacheck.Gen
import org.scalatest.mock.MockitoSugar
@@ -31,7 +32,7 @@ import org.scalatest.prop.PropertyChecks
class DataSourceTaskSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
property("DataSourceTask.onStart should call DataSource.open") {
- forAll(Gen.chooseNum[Long](0L, 1000L)) { (startTime: Long) =>
+ forAll(Gen.chooseNum[Long](0L, 1000L).map(Instant.ofEpochMilli)) { (startTime: Instant) =>
val taskContext = MockUtil.mockTaskContext
implicit val system = MockUtil.system
val dataSource = mock[DataSource]
@@ -40,7 +41,7 @@ class DataSourceTaskSpec extends PropSpec with PropertyChecks with Matchers with
val sourceTask = new DataSourceTask(taskContext, config, dataSource)
- sourceTask.onStart(StartTime(startTime))
+ sourceTask.onStart(startTime)
verify(dataSource).open(taskContext, startTime)
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala
index 4afee8b..258a5ff 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala
@@ -31,6 +31,7 @@ import org.apache.gearpump.streaming.task.SubscriptionSpec.NextTask
import org.apache.gearpump.streaming.{LifeTime, ProcessorDescription}
class SubscriptionSpec extends FlatSpec with Matchers with MockitoSugar {
+
val appId = 0
val executorId = 0
val taskId = TaskId(0, 0)
@@ -132,11 +133,5 @@ class SubscriptionSpec extends FlatSpec with Matchers with MockitoSugar {
object SubscriptionSpec {
class NextTask(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
-
- override def onStart(startTime: StartTime): Unit = {
- }
-
- override def onNext(msg: Message): Unit = {
- }
}
}
\ No newline at end of file
[3/3] incubator-gearpump git commit: [GEARPUMP-188] use
java.time.Instant for Task start time
Posted by ma...@apache.org.
[GEARPUMP-188] use java.time.Instant for Task start time
Author: manuzhang <ow...@gmail.com>
Closes #74 from manuzhang/replace_timestamp_with_instant.
Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/23daf0cf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/23daf0cf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/23daf0cf
Branch: refs/heads/master
Commit: 23daf0cf9c1db3fabc1b679993fcf1d6edb43e7d
Parents: 6d919ec
Author: manuzhang <ow...@gmail.com>
Authored: Mon Aug 15 23:04:11 2016 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Mon Aug 15 23:04:11 2016 +0800
----------------------------------------------------------------------
.../pagerank/PageRankController.scala | 4 +-
.../streaming/examples/complexdag/Node.scala | 6 +-
.../streaming/examples/complexdag/Sink.scala | 6 +-
.../streaming/examples/complexdag/Source.scala | 6 +-
.../examples/fsio/SeqFileStreamProcessor.scala | 8 +-
.../examples/fsio/SeqFileStreamProducer.scala | 9 +-
.../fsio/SeqFileStreamProcessorSpec.scala | 5 +-
.../fsio/SeqFileStreamProducerSpec.scala | 5 +-
.../examples/kafka/wordcount/Split.scala | 7 +-
.../examples/kafka/wordcount/Sum.scala | 7 +-
.../examples/kafka/wordcount/SumSpec.scala | 5 +-
.../examples/sol/SOLStreamProcessor.scala | 5 +-
.../examples/sol/SOLStreamProducer.scala | 5 +-
.../examples/sol/SOLStreamProcessorSpec.scala | 5 +-
.../examples/sol/SOLStreamProducerSpec.scala | 5 +-
.../processor/NumberGeneratorProcessor.scala | 8 +-
.../state/processor/CountProcessorSpec.scala | 6 +-
.../NumberGeneratorProcessorSpec.scala | 5 +-
.../processor/WindowAverageProcessorSpec.scala | 6 +-
examples/streaming/stockcrawler/README.md | 19 --
.../src/main/resources/geardefault.conf | 9 -
.../src/main/resources/stock/css/body.png | Bin 201 -> 0 bytes
.../src/main/resources/stock/css/custom.css | 115 -----------
.../src/main/resources/stock/css/foot.png | Bin 2250 -> 0 bytes
.../src/main/resources/stock/css/header.png | Bin 17350 -> 0 bytes
.../src/main/resources/stock/js/stock.js | 157 ---------------
.../src/main/resources/stock/stock.html | 87 --------
.../streaming/examples/stock/Analyzer.scala | 170 ----------------
.../streaming/examples/stock/Crawler.scala | 60 ------
.../streaming/examples/stock/Data.scala | 61 ------
.../streaming/examples/stock/QueryServer.scala | 134 -------------
.../streaming/examples/stock/StockMarket.scala | 155 ---------------
.../streaming/examples/stock/main/Stock.scala | 86 --------
examples/streaming/transport/README.md | 3 -
.../src/main/resources/geardefault.conf | 12 --
.../src/main/resources/transport/css/body.png | Bin 201 -> 0 bytes
.../src/main/resources/transport/css/custom.css | 115 -----------
.../src/main/resources/transport/css/foot.png | Bin 2250 -> 0 bytes
.../src/main/resources/transport/css/header.png | Bin 17350 -> 0 bytes
.../main/resources/transport/js/transport.js | 180 -----------------
.../main/resources/transport/svg/beijing.svg | 199 -------------------
.../src/main/resources/transport/transport.html | 88 --------
.../streaming/examples/transport/Data.scala | 32 ---
.../examples/transport/DataSource.scala | 56 ------
.../examples/transport/QueryServer.scala | 154 --------------
.../examples/transport/Transport.scala | 69 -------
.../examples/transport/VelocityInspector.scala | 123 ------------
.../examples/transport/generator/MockCity.scala | 88 --------
.../generator/PassRecordGenerator.scala | 69 -------
.../examples/transport/DataSourceSpec.scala | 45 -----
.../examples/transport/TransportSpec.scala | 69 -------
.../transport/generator/MockCitySpec.scala | 31 ---
.../generator/PassRecordGeneratorSpec.scala | 34 ----
.../streaming/examples/wordcountjava/Split.java | 5 +-
.../streaming/examples/wordcountjava/Sum.java | 4 +-
.../streaming/examples/wordcount/Split.scala | 5 +-
.../streaming/examples/wordcount/Sum.scala | 5 +-
.../examples/wordcount/SplitSpec.scala | 7 +-
.../streaming/examples/wordcount/SumSpec.scala | 5 +-
.../storm/processor/StormProcessor.scala | 3 +-
.../storm/producer/StormProducer.scala | 3 +-
.../storm/topology/GearpumpStormComponent.scala | 9 +-
.../storm/processor/StormProcessorSpec.scala | 5 +-
.../storm/producer/StormProducerSpec.scala | 5 +-
.../topology/GearpumpStormComponentSpec.scala | 9 +-
.../topology/GearpumpStormTopologySpec.scala | 1 -
.../kafka/lib/source/AbstractKafkaSource.scala | 5 +-
.../streaming/kafka/KafkaSourceSpec.scala | 10 +-
.../apache/gearpump/streaming/javaapi/Task.java | 5 +-
.../gearpump/streaming/dsl/StreamApp.scala | 32 +--
.../streaming/dsl/plan/OpTranslator.scala | 24 +--
.../gearpump/streaming/sink/DataSinkTask.scala | 7 +-
.../gearpump/streaming/source/DataSource.scala | 6 +-
.../streaming/source/DataSourceTask.scala | 8 +-
.../streaming/state/api/PersistentTask.scala | 8 +-
.../gearpump/streaming/task/StartTime.scala | 24 ---
.../apache/gearpump/streaming/task/Task.scala | 6 +-
.../gearpump/streaming/task/TaskActor.scala | 7 +-
.../gearpump/streaming/task/TaskWrapper.scala | 12 +-
.../streaming/appmaster/AppMasterSpec.scala | 10 +-
.../streaming/appmaster/TaskManagerSpec.scala | 4 +-
.../streaming/appmaster/TaskSchedulerSpec.scala | 11 +-
.../gearpump/streaming/dsl/StreamSpec.scala | 4 +-
.../streaming/dsl/plan/OpTranslatorSpec.scala | 12 +-
.../streaming/sink/DataSinkTaskSpec.scala | 8 +-
.../streaming/source/DataSourceTaskSpec.scala | 7 +-
.../streaming/task/SubscriptionSpec.scala | 7 +-
87 files changed, 199 insertions(+), 2617 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/pagerank/src/main/scala/org/apache/gearpump/experiments/pagerank/PageRankController.scala
----------------------------------------------------------------------
diff --git a/examples/pagerank/src/main/scala/org/apache/gearpump/experiments/pagerank/PageRankController.scala b/examples/pagerank/src/main/scala/org/apache/gearpump/experiments/pagerank/PageRankController.scala
index d461876..aa250da 100644
--- a/examples/pagerank/src/main/scala/org/apache/gearpump/experiments/pagerank/PageRankController.scala
+++ b/examples/pagerank/src/main/scala/org/apache/gearpump/experiments/pagerank/PageRankController.scala
@@ -17,6 +17,8 @@
*/
package org.apache.gearpump.experiments.pagerank
+import java.time.Instant
+
import akka.actor.Actor.Receive
import org.apache.gearpump.cluster.UserConfig
@@ -39,7 +41,7 @@ class PageRankController(taskContext: TaskContext, conf: UserConfig)
var weights = Map.empty[TaskId, Double]
var deltas = Map.empty[TaskId, Double]
- override def onStart(startTime: StartTime): Unit = {
+ override def onStart(startTime: Instant): Unit = {
output(Tick(tick), tasks: _*)
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Node.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Node.scala b/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Node.scala
index 8d163f9..ddd4d1a 100644
--- a/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Node.scala
+++ b/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Node.scala
@@ -18,14 +18,16 @@
package org.apache.gearpump.streaming.examples.complexdag
+import java.time.Instant
+
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext}
+import org.apache.gearpump.streaming.task.{Task, TaskContext}
class Node(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
import taskContext.output
- override def onStart(startTime: StartTime): Unit = {}
+ override def onStart(startTime: Instant): Unit = {}
override def onNext(msg: Message): Unit = {
val list = msg.msg.asInstanceOf[Vector[String]]
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Sink.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Sink.scala b/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Sink.scala
index 8dfa565..e9b00a0 100644
--- a/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Sink.scala
+++ b/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Sink.scala
@@ -18,9 +18,11 @@
package org.apache.gearpump.streaming.examples.complexdag
+import java.time.Instant
+
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext}
+import org.apache.gearpump.streaming.task.{Task, TaskContext}
import scala.collection.mutable
@@ -28,7 +30,7 @@ class Sink(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext,
var list = mutable.MutableList[String]()
- override def onStart(startTime: StartTime): Unit = {
+ override def onStart(startTime: Instant): Unit = {
list += getClass.getCanonicalName
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Source.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Source.scala b/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Source.scala
index 0359519..7abb3fc 100644
--- a/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Source.scala
+++ b/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Source.scala
@@ -18,14 +18,16 @@
package org.apache.gearpump.streaming.examples.complexdag
+import java.time.Instant
+
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext}
+import org.apache.gearpump.streaming.task.{Task, TaskContext}
class Source(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
import taskContext.output
- override def onStart(startTime: StartTime): Unit = {
+ override def onStart(startTime: Instant): Unit = {
self ! Message("start")
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProcessor.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProcessor.scala b/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProcessor.scala
index 2e4a556..561346e 100644
--- a/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProcessor.scala
+++ b/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProcessor.scala
@@ -18,19 +18,19 @@
package org.apache.gearpump.streaming.examples.fsio
import java.io.File
+import java.time.Instant
import java.util.concurrent.TimeUnit
-import scala.concurrent.duration.FiniteDuration
+import scala.concurrent.duration.FiniteDuration
import akka.actor.Cancellable
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.SequenceFile._
import org.apache.hadoop.io.{SequenceFile, Text}
-
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.examples.fsio.HadoopConfig._
import org.apache.gearpump.streaming.examples.fsio.SeqFileStreamProcessor._
-import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext}
+import org.apache.gearpump.streaming.task.{Task, TaskContext}
class SeqFileStreamProcessor(taskContext: TaskContext, config: UserConfig)
extends Task(taskContext, config) {
@@ -49,7 +49,7 @@ class SeqFileStreamProcessor(taskContext: TaskContext, config: UserConfig)
private var snapShotTime: Long = 0
private var scheduler: Cancellable = null
- override def onStart(startTime: StartTime): Unit = {
+ override def onStart(startTime: Instant): Unit = {
val fs = FileSystem.get(hadoopConf)
fs.deleteOnExit(outputPath)
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProducer.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProducer.scala b/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProducer.scala
index 02d2434..4106a2c 100644
--- a/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProducer.scala
+++ b/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProducer.scala
@@ -17,15 +17,16 @@
*/
package org.apache.gearpump.streaming.examples.fsio
+import java.time.Instant
+
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.SequenceFile._
import org.apache.hadoop.io.{SequenceFile, Text}
-
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.examples.fsio.HadoopConfig._
import org.apache.gearpump.streaming.examples.fsio.SeqFileStreamProducer._
-import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext}
+import org.apache.gearpump.streaming.task.{Task, TaskContext}
class SeqFileStreamProducer(taskContext: TaskContext, config: UserConfig)
extends Task(taskContext, config) {
@@ -34,12 +35,12 @@ class SeqFileStreamProducer(taskContext: TaskContext, config: UserConfig)
val value = new Text()
val key = new Text()
- var reader: SequenceFile.Reader = null
+ var reader: SequenceFile.Reader = _
val hadoopConf = config.hadoopConf
val fs = FileSystem.get(hadoopConf)
val inputPath = new Path(config.getString(INPUT_PATH).get)
- override def onStart(startTime: StartTime): Unit = {
+ override def onStart(startTime: Instant): Unit = {
reader = new SequenceFile.Reader(hadoopConf, Reader.file(inputPath))
self ! Start
LOG.info("sequence file spout initiated")
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/fsio/src/test/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProcessorSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/fsio/src/test/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProcessorSpec.scala b/examples/streaming/fsio/src/test/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProcessorSpec.scala
index 7831b14..2edb87f 100644
--- a/examples/streaming/fsio/src/test/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProcessorSpec.scala
+++ b/examples/streaming/fsio/src/test/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProcessorSpec.scala
@@ -18,6 +18,7 @@
package org.apache.gearpump.streaming.examples.fsio
import java.io.File
+import java.time.Instant
import scala.collection.mutable.ArrayBuffer
import akka.actor.ActorSystem
@@ -33,7 +34,7 @@ import org.scalatest.{BeforeAndAfter, Matchers, PropSpec}
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.{TestUtil, UserConfig}
-import org.apache.gearpump.streaming.task.{StartTime, TaskId}
+import org.apache.gearpump.streaming.task.TaskId
import org.apache.gearpump.streaming.{MockUtil, Processor}
class SeqFileStreamProcessorSpec
extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter {
@@ -67,7 +68,7 @@ class SeqFileStreamProcessorSpec
when(context.taskId).thenReturn(taskId)
val processor = new SeqFileStreamProcessor(context, conf)
- processor.onStart(StartTime(0))
+ processor.onStart(Instant.EPOCH)
forAll(kvGenerator) { kv =>
val (key, value) = kv
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/fsio/src/test/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProducerSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/fsio/src/test/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProducerSpec.scala b/examples/streaming/fsio/src/test/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProducerSpec.scala
index ad27e63..a03e68d 100644
--- a/examples/streaming/fsio/src/test/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProducerSpec.scala
+++ b/examples/streaming/fsio/src/test/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProducerSpec.scala
@@ -17,6 +17,8 @@
*/
package org.apache.gearpump.streaming.examples.fsio
+import java.time.Instant
+
import scala.collection.mutable.ArrayBuffer
import org.apache.hadoop.conf.Configuration
@@ -32,7 +34,6 @@ import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.MockUtil
import org.apache.gearpump.streaming.MockUtil._
-import org.apache.gearpump.streaming.task.StartTime
class SeqFileStreamProducerSpec
extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter {
@@ -73,7 +74,7 @@ class SeqFileStreamProducerSpec
val context = MockUtil.mockTaskContext
val producer = new SeqFileStreamProducer(context, conf)
- producer.onStart(StartTime(0))
+ producer.onStart(Instant.EPOCH)
producer.onNext(Message("start"))
val expected = kvPairs.map(kv => kv._1 + "++" + kv._2).toSet
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/Split.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/Split.scala b/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/Split.scala
index a95f596..b78e788 100644
--- a/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/Split.scala
+++ b/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/Split.scala
@@ -18,16 +18,17 @@
package org.apache.gearpump.streaming.examples.kafka.wordcount
-import com.twitter.bijection.Injection
+import java.time.Instant
+import com.twitter.bijection.Injection
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext}
+import org.apache.gearpump.streaming.task.{Task, TaskContext}
class Split(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
import taskContext.output
- override def onStart(startTime: StartTime): Unit = {
+ override def onStart(startTime: Instant): Unit = {
}
override def onNext(msg: Message): Unit = {
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/Sum.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/Sum.scala b/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/Sum.scala
index 9930b92..58bb884 100644
--- a/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/Sum.scala
+++ b/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/Sum.scala
@@ -18,18 +18,19 @@
package org.apache.gearpump.streaming.examples.kafka.wordcount
-import com.twitter.bijection.Injection
+import java.time.Instant
+import com.twitter.bijection.Injection
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext}
+import org.apache.gearpump.streaming.task.{Task, TaskContext}
class Sum(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
import taskContext.output
private[wordcount] var wordcount = Map.empty[String, Long]
- override def onStart(startTime: StartTime): Unit = {}
+ override def onStart(startTime: Instant): Unit = {}
override def onNext(message: Message): Unit = {
val word = message.msg.asInstanceOf[String]
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/kafka/src/test/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/SumSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/kafka/src/test/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/SumSpec.scala b/examples/streaming/kafka/src/test/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/SumSpec.scala
index 3538ece..e37118a 100644
--- a/examples/streaming/kafka/src/test/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/SumSpec.scala
+++ b/examples/streaming/kafka/src/test/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/SumSpec.scala
@@ -17,6 +17,8 @@
*/
package org.apache.gearpump.streaming.examples.kafka.wordcount
+import java.time.Instant
+
import scala.collection.mutable
import org.mockito.Matchers._
@@ -27,7 +29,6 @@ import org.scalatest.{FlatSpec, Matchers}
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.MockUtil
-import org.apache.gearpump.streaming.task.StartTime
class SumSpec extends FlatSpec with Matchers {
@@ -39,7 +40,7 @@ class SumSpec extends FlatSpec with Matchers {
val taskContext = MockUtil.mockTaskContext
val sum = new Sum(taskContext, UserConfig.empty)
- sum.onStart(StartTime(0))
+ sum.onStart(Instant.EPOCH)
val str = "once two two three three three"
var totalWordCount = 0
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProcessor.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProcessor.scala b/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProcessor.scala
index 796b0d2..a16cf4c 100644
--- a/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProcessor.scala
+++ b/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProcessor.scala
@@ -18,6 +18,7 @@
package org.apache.gearpump.streaming.examples.sol
+import java.time.Instant
import java.util.concurrent.TimeUnit
import scala.concurrent.duration.FiniteDuration
@@ -25,7 +26,7 @@ import akka.actor.Cancellable
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext}
+import org.apache.gearpump.streaming.task.{Task, TaskContext}
class SOLStreamProcessor(taskContext: TaskContext, conf: UserConfig)
extends Task(taskContext, conf) {
@@ -38,7 +39,7 @@ class SOLStreamProcessor(taskContext: TaskContext, conf: UserConfig)
private var snapShotWordCount: Long = 0
private var snapShotTime: Long = 0
- override def onStart(startTime: StartTime): Unit = {
+ override def onStart(startTime: Instant): Unit = {
scheduler = taskContext.schedule(new FiniteDuration(5, TimeUnit.SECONDS),
new FiniteDuration(5, TimeUnit.SECONDS))(reportWordCount())
snapShotTime = System.currentTimeMillis()
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProducer.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProducer.scala b/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProducer.scala
index 84ed038..c1b11e5 100644
--- a/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProducer.scala
+++ b/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProducer.scala
@@ -18,12 +18,13 @@
package org.apache.gearpump.streaming.examples.sol
+import java.time.Instant
import java.util.Random
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.examples.sol.SOLStreamProducer._
-import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext}
+import org.apache.gearpump.streaming.task.{Task, TaskContext}
class SOLStreamProducer(taskContext: TaskContext, conf: UserConfig)
extends Task(taskContext, conf) {
@@ -36,7 +37,7 @@ class SOLStreamProducer(taskContext: TaskContext, conf: UserConfig)
private var rand: Random = null
private var messageCount: Long = 0
- override def onStart(startTime: StartTime): Unit = {
+ override def onStart(startTime: Instant): Unit = {
prepareRandomMessage
self ! Start
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/sol/src/test/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProcessorSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/sol/src/test/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProcessorSpec.scala b/examples/streaming/sol/src/test/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProcessorSpec.scala
index a6cc966..e3344bf 100644
--- a/examples/streaming/sol/src/test/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProcessorSpec.scala
+++ b/examples/streaming/sol/src/test/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProcessorSpec.scala
@@ -17,6 +17,8 @@
*/
package org.apache.gearpump.streaming.examples.sol
+import java.time.Instant
+
import org.mockito.Mockito._
import org.scalacheck.Gen
import org.scalatest.{FlatSpec, Matchers}
@@ -24,7 +26,6 @@ import org.scalatest.{FlatSpec, Matchers}
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.MockUtil
-import org.apache.gearpump.streaming.task.StartTime
class SOLStreamProcessorSpec extends FlatSpec with Matchers {
@@ -33,7 +34,7 @@ class SOLStreamProcessorSpec extends FlatSpec with Matchers {
val context = MockUtil.mockTaskContext
val sol = new SOLStreamProcessor(context, UserConfig.empty)
- sol.onStart(StartTime(0))
+ sol.onStart(Instant.EPOCH)
val msg = Message("msg")
sol.onNext(msg)
verify(context, times(1)).output(msg)
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/sol/src/test/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProducerSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/sol/src/test/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProducerSpec.scala b/examples/streaming/sol/src/test/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProducerSpec.scala
index 2316de8..dc21171 100644
--- a/examples/streaming/sol/src/test/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProducerSpec.scala
+++ b/examples/streaming/sol/src/test/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProducerSpec.scala
@@ -17,6 +17,8 @@
*/
package org.apache.gearpump.streaming.examples.sol
+import java.time.Instant
+
import org.mockito.Matchers._
import org.mockito.Mockito._
import org.scalatest.{Matchers, WordSpec}
@@ -24,7 +26,6 @@ import org.scalatest.{Matchers, WordSpec}
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.MockUtil
-import org.apache.gearpump.streaming.task.StartTime
class SOLStreamProducerSpec extends WordSpec with Matchers {
@@ -35,7 +36,7 @@ class SOLStreamProducerSpec extends WordSpec with Matchers {
val context = MockUtil.mockTaskContext
val producer = new SOLStreamProducer(context, conf)
- producer.onStart(StartTime(0))
+ producer.onStart(Instant.EPOCH)
producer.onNext(Message("msg"))
verify(context).output(any[Message])
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/NumberGeneratorProcessor.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/NumberGeneratorProcessor.scala b/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/NumberGeneratorProcessor.scala
index 0e85f32..134afba 100644
--- a/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/NumberGeneratorProcessor.scala
+++ b/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/NumberGeneratorProcessor.scala
@@ -18,17 +18,19 @@
package org.apache.gearpump.streaming.examples.state.processor
+import java.time.Instant
+
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext}
+import org.apache.gearpump.streaming.task.{Task, TaskContext}
class NumberGeneratorProcessor(taskContext: TaskContext, conf: UserConfig)
extends Task(taskContext, conf) {
import taskContext.output
private var num = 0L
- override def onStart(startTime: StartTime): Unit = {
- num = startTime.startTime
+ override def onStart(startTime: Instant): Unit = {
+ num = startTime.toEpochMilli
self ! Message("start")
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessorSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessorSpec.scala b/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessorSpec.scala
index 6048034..b95d164 100644
--- a/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessorSpec.scala
+++ b/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessorSpec.scala
@@ -18,6 +18,8 @@
package org.apache.gearpump.streaming.examples.state.processor
+import java.time.Instant
+
import scala.concurrent.Await
import scala.concurrent.duration._
@@ -33,7 +35,7 @@ import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.MockUtil
import org.apache.gearpump.streaming.state.api.PersistentTask
import org.apache.gearpump.streaming.state.impl.{InMemoryCheckpointStoreFactory, PersistentStateConfig}
-import org.apache.gearpump.streaming.task.{UpdateCheckpointClock, StartTime}
+import org.apache.gearpump.streaming.task.UpdateCheckpointClock
import org.apache.gearpump.streaming.transaction.api.CheckpointStoreFactory
class CountProcessorSpec extends PropSpec with PropertyChecks with Matchers {
@@ -59,7 +61,7 @@ class CountProcessorSpec extends PropSpec with PropertyChecks with Matchers {
val appMaster = TestProbe()(system)
when(taskContext.appMaster).thenReturn(appMaster.ref)
- count.onStart(StartTime(0L))
+ count.onStart(Instant.EPOCH)
appMaster.expectMsg(UpdateCheckpointClock(taskContext.taskId, 0L))
for (i <- 0L to num) {
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/NumberGeneratorProcessorSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/NumberGeneratorProcessorSpec.scala b/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/NumberGeneratorProcessorSpec.scala
index 2268994..d3f645c 100644
--- a/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/NumberGeneratorProcessorSpec.scala
+++ b/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/NumberGeneratorProcessorSpec.scala
@@ -18,6 +18,8 @@
package org.apache.gearpump.streaming.examples.state.processor
+import java.time.Instant
+
import scala.concurrent.Await
import scala.concurrent.duration.Duration
@@ -30,7 +32,6 @@ import org.scalatest.{Matchers, WordSpec}
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.MockUtil
-import org.apache.gearpump.streaming.task.StartTime
class NumberGeneratorProcessorSpec extends WordSpec with Matchers {
"NumberGeneratorProcessor" should {
@@ -47,7 +48,7 @@ class NumberGeneratorProcessorSpec extends WordSpec with Matchers {
val conf = UserConfig.empty
val genNum = new NumberGeneratorProcessor(taskContext, conf)
- genNum.onStart(StartTime(0))
+ genNum.onStart(Instant.EPOCH)
mockTaskActor.expectMsgType[Message]
genNum.onNext(Message("next"))
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/WindowAverageProcessorSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/WindowAverageProcessorSpec.scala b/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/WindowAverageProcessorSpec.scala
index 0963429..255f869 100644
--- a/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/WindowAverageProcessorSpec.scala
+++ b/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/WindowAverageProcessorSpec.scala
@@ -18,6 +18,8 @@
package org.apache.gearpump.streaming.examples.state.processor
+import java.time.Instant
+
import scala.concurrent.Await
import scala.concurrent.duration._
@@ -34,7 +36,7 @@ import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.MockUtil
import org.apache.gearpump.streaming.state.api.PersistentTask
import org.apache.gearpump.streaming.state.impl.{InMemoryCheckpointStoreFactory, PersistentStateConfig, WindowConfig}
-import org.apache.gearpump.streaming.task.{UpdateCheckpointClock, StartTime}
+import org.apache.gearpump.streaming.task.UpdateCheckpointClock
import org.apache.gearpump.streaming.transaction.api.CheckpointStoreFactory
class WindowAverageProcessorSpec extends PropSpec with PropertyChecks with Matchers {
@@ -61,7 +63,7 @@ class WindowAverageProcessorSpec extends PropSpec with PropertyChecks with Match
val appMaster = TestProbe()(system)
when(taskContext.appMaster).thenReturn(appMaster.ref)
- windowAverage.onStart(StartTime(0L))
+ windowAverage.onStart(Instant.EPOCH)
appMaster.expectMsg(UpdateCheckpointClock(taskContext.taskId, 0L))
for (i <- 0L until num) {
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/stockcrawler/README.md
----------------------------------------------------------------------
diff --git a/examples/streaming/stockcrawler/README.md b/examples/streaming/stockcrawler/README.md
deleted file mode 100644
index b51590f..0000000
--- a/examples/streaming/stockcrawler/README.md
+++ /dev/null
@@ -1,19 +0,0 @@
-How to use
-===================
-1. Start local cluster,
- ```
- bin/local
- ```
-2. Submit the stock crawler
- ```
- bin\gear app -jar examples\gearpump-examples-assembly-0.3.2-SNAPSHOT.jar org.apache.gearpump.streaming.examples.stock.main.Stock
- ```
-
- If you are behind a proxy, you need to set the proxy address
- ```
- bin\gear app -jar examples\gearpump-examples-assembly-0.3.2-SNAPSHOT.jar org.apache.gearpump.streaming.examples.stock.main.Stock -proxy host:port
- ```
-
-3. Check the UI
- http://127.0.0.1:8080
-
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/stockcrawler/src/main/resources/geardefault.conf
----------------------------------------------------------------------
diff --git a/examples/streaming/stockcrawler/src/main/resources/geardefault.conf b/examples/streaming/stockcrawler/src/main/resources/geardefault.conf
deleted file mode 100644
index acee3bd..0000000
--- a/examples/streaming/stockcrawler/src/main/resources/geardefault.conf
+++ /dev/null
@@ -1,9 +0,0 @@
-gearpump {
- serializers {
- "org.apache.gearpump.streaming.examples.stock.StockPrice" = ""
- }
-}
-
-spray.can {
- server.parsing.max-content-length = "10M"
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/stockcrawler/src/main/resources/stock/css/body.png
----------------------------------------------------------------------
diff --git a/examples/streaming/stockcrawler/src/main/resources/stock/css/body.png b/examples/streaming/stockcrawler/src/main/resources/stock/css/body.png
deleted file mode 100644
index b5c536c..0000000
Binary files a/examples/streaming/stockcrawler/src/main/resources/stock/css/body.png and /dev/null differ
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/stockcrawler/src/main/resources/stock/css/custom.css
----------------------------------------------------------------------
diff --git a/examples/streaming/stockcrawler/src/main/resources/stock/css/custom.css b/examples/streaming/stockcrawler/src/main/resources/stock/css/custom.css
deleted file mode 100644
index 182d722..0000000
--- a/examples/streaming/stockcrawler/src/main/resources/stock/css/custom.css
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-.ui-datepicker {
- font-size: 11px;
-}
-
-.sidebar-label {
- font-size: 15px;
- font-family: calibri, Arial, Helvetica, sans-serif;
-}
-
-.help {
- font-size: 12px;
- font-family: calibri, Arial, Helvetica, sans-serif;
-}
-
-div.splitter {
- margin: 12px 0px 7px 0px;
- clear: both;
- border-top: 1px solid #EBEBEB;
-}
-
-input.sidebar {
- width: 165px
-}
-
-select.sidebar {
- width: 198px
-}
-
-table.dataintable {
- font-family: calibri, Arial, Helvetica, sans-serif;
- font-size: 15px;
- margin-top: 10px;
- border-collapse: collapse;
- border: 1px solid #888;
-}
-
-table.dataintable th {
- vertical-align: baseline;
- padding: 5px 15px 5px 5px;
- background-color: #EEE;
- border: 1px solid #888;
- text-align: left;
-}
-
-table.dataintable td {
- vertical-align: text-top;
- padding: 5px 15px 5px 5px;
- background-color: #FFFFFF;
- border: 1px solid #AAA;
-}
-
-#search {
- width: 100px;
- height: 25px;
- position: relative;
- left: 0px;
- top: 5px;
-}
-
-#mytable {
- width: 100%;
- height: 300;
- float: left;
-}
-
-#mychart {
- height: 250px;
- width: 100%;
-}
-
-#Menu {
- height: 100%;
- width: 245px;
- float: left;
-}
-
-#header {
- height: 115px;
- background-image: url(header.png);
-}
-
-#body {
- height: 100%;
- width: 100%;
- background-image: url(body.png);
- background-size: 100% 100%;
-}
-
-#footer {
- color: white;
- height: 70px;
- line-height: 70px;
- text-align: middle;
- clear: both;
- text-align: center;
- background-image: url(foot.png);
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/stockcrawler/src/main/resources/stock/css/foot.png
----------------------------------------------------------------------
diff --git a/examples/streaming/stockcrawler/src/main/resources/stock/css/foot.png b/examples/streaming/stockcrawler/src/main/resources/stock/css/foot.png
deleted file mode 100644
index 5db91b5..0000000
Binary files a/examples/streaming/stockcrawler/src/main/resources/stock/css/foot.png and /dev/null differ
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/stockcrawler/src/main/resources/stock/css/header.png
----------------------------------------------------------------------
diff --git a/examples/streaming/stockcrawler/src/main/resources/stock/css/header.png b/examples/streaming/stockcrawler/src/main/resources/stock/css/header.png
deleted file mode 100644
index 9284e44..0000000
Binary files a/examples/streaming/stockcrawler/src/main/resources/stock/css/header.png and /dev/null differ
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/stockcrawler/src/main/resources/stock/js/stock.js
----------------------------------------------------------------------
diff --git a/examples/streaming/stockcrawler/src/main/resources/stock/js/stock.js b/examples/streaming/stockcrawler/src/main/resources/stock/js/stock.js
deleted file mode 100644
index 97f1e07..0000000
--- a/examples/streaming/stockcrawler/src/main/resources/stock/js/stock.js
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-function initChart(chartid, tableid, stockId) {
- require.config({
- paths: {
- echarts: 'http://echarts.baidu.com/build/dist'
- }
- });
-
- require(
- [
- 'echarts',
- 'echarts/chart/line'
- ],
-
- function (ec) {
- // \u57fa\u4e8e\u51c6\u5907\u597d\u7684dom\uff0c\u521d\u59cb\u5316echarts\u56fe\u8868
- var myChart = ec.init(document.getElementById(chartid));
- var dataPoints = 100;
- var timeTicket;
- clearInterval(timeTicket);
- timeTicket = setInterval(function () {
- $.getJSON("report/" + stockId, function (json) {
- STOCK_NAME = json.name
-
- var maxDrawnDown = json.currentMax[0].max.price - json.currentMax[0].min.price;
- var time = new Date(json.currentMax[0].current.timestamp).toLocaleTimeString().replace(/^\D*/, '');
- // \u52a8\u6001\u6570\u636e\u63a5\u53e3 addData
- myChart.addData([
- [
- 0, // \u7cfb\u5217\u7d22\u5f15
- maxDrawnDown.toFixed(2), // \u65b0\u589e\u6570\u636e
- false, // \u65b0\u589e\u6570\u636e\u662f\u5426\u4ece\u961f\u5217\u5934\u90e8\u63d2\u5165
- false, // \u662f\u5426\u589e\u52a0\u961f\u5217\u957f\u5ea6\uff0cfalse\u5219\u81ea\u5b9a\u5220\u9664\u539f\u6709\u6570\u636e\uff0c\u961f\u5934\u63d2\u5165\u5220\u961f\u5c3e\uff0c\u961f\u5c3e\u63d2\u5165\u5220\u961f\u5934
- time
- ],
- [
- 1, // \u7cfb\u5217\u7d22\u5f15
- json.currentMax[0].current.price.toFixed(2), // \u65b0\u589e\u6570\u636e
- false, // \u65b0\u589e\u6570\u636e\u662f\u5426\u4ece\u961f\u5217\u5934\u90e8\u63d2\u5165
- false, // \u662f\u5426\u589e\u52a0\u961f\u5217\u957f\u5ea6\uff0cfalse\u5219\u81ea\u5b9a\u5220\u9664\u539f\u6709\u6570\u636e\uff0c\u961f\u5934\u63d2\u5165\u5220\u961f\u5c3e\uff0c\u961f\u5c3e\u63d2\u5165\u5220\u961f\u5934
- time
- ]
- ]);
- document.getElementById(chartid).style.display = "block"
- document.getElementById(tableid).innerHTML = "<pre>" + JSON.stringify(json, null, 2) + "</pre>"
- });
- }, 2000);
-
- var subtext_ = "Draw Down"
-
- var option = {
- title: {
- text: 'Stock Analysis',
- subtext: "Max " + subtext_
- },
- tooltip: {
- trigger: 'axis'
- },
- legend: {
- data: ["Current Price", "Current Draw Down"]
- },
- toolbox: {
- show: false,
- feature: {
- mark: {show: true},
- dataView: {show: true, readOnly: false},
- magicType: {show: true, type: ['line', 'bar']},
- restore: {show: true},
- saveAsImage: {show: true}
- }
- },
- dataZoom: {
- show: false,
- start: 0,
- end: 100
- },
- xAxis: [
- {
- type: 'category',
- boundaryGap: true,
- data: (function () {
- var now = new Date();
- var res = [];
- var len = dataPoints;
- while (len--) {
- res.unshift(now.toLocaleTimeString().replace(/^\D*/, ''));
- now = new Date(now - 2000);
- }
- return res;
- })()
- }
- ],
- yAxis: [
- {
- type: 'value',
- scale: true,
- name: subtext_ + ' \u4ef7\u683c/\u5143',
- boundaryGap: [0, 0.3]
- },
- {
- type: 'value',
- scale: true,
- name: 'Current \u4ef7\u683c/\u5143',
- boundaryGap: [0, 0.1]
- }
- ],
- series: [
- {
- name: "Current Draw Down",
- type: 'line',
- data: (function () {
- var res = [];
- var len = dataPoints;
- while (len--) {
- res.push(0);
- }
- return res;
- })()
- },
- {
- name: "Current Price",
- type: 'line',
- yAxisIndex: 1,
- data: (function () {
- var res = [];
- var len = dataPoints;
- while (len--) {
- res.push(0);
- }
- return res;
- })()
- }
- ]
- };
-
- // \u4e3aecharts\u5bf9\u8c61\u52a0\u8f7d\u6570\u636e
- myChart.setOption(option);
- }
- );
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/stockcrawler/src/main/resources/stock/stock.html
----------------------------------------------------------------------
diff --git a/examples/streaming/stockcrawler/src/main/resources/stock/stock.html b/examples/streaming/stockcrawler/src/main/resources/stock/stock.html
deleted file mode 100644
index 9682a53..0000000
--- a/examples/streaming/stockcrawler/src/main/resources/stock/stock.html
+++ /dev/null
@@ -1,87 +0,0 @@
-<!DOCTYPE html>
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one
- ~ or more contributor license agreements. See the NOTICE file
- ~ distributed with this work for additional information
- ~ regarding copyright ownership. The ASF licenses this file
- ~ to you under the Apache License, Version 2.0 (the
- ~ "License"); you may not use this file except in compliance
- ~ with the License. You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing, software
- ~ distributed under the License is distributed on an "AS IS" BASIS,
- ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ~ See the License for the specific language governing permissions and
- ~ limitations under the License.
- -->
-
-<html>
-
-<head>
- <meta charset="utf-8">
- <link rel=stylesheet type=text/css href="css/custom.css">
- <script src="http://echarts.baidu.com/build/dist/echarts.js"></script>
- <script src="http://libs.baidu.com/jquery/2.0.0/jquery.min.js"></script>
- <script src="js/stock.js"></script>
- <script type="text/javascript">
- function search_onclick() {
- var stockId = document.getElementById('stockId').value
- initChart("mychart", "mytable", stockId)
- }
- </script>
-</head>
-
-<body style="background-color:#F2F2F2">
-<div id="container" style="width:882px; height:450px;margin-left:auto;margin-right:auto;">
- <div style="height:0px"></div>
- <div id="header">
- <div
- style="font-weight:600;position:relative;left:50px;top:50px;font-family: calibri, Arial, Helvetica, sans-serif;font-size:29px;color:white">
- Big Data Stock Analysis Demo
- </div>
- </div>
- <div id="body">
- <div id="Menu">
- <div style="position:relative;margin-left:30px; margin-right:20px;margin-top:20px;">
- <!-- form to post to accompany to get accompanying cars -->
-
- <table style="width:100%">
- <tr>
- <td class="sidebar-label">Stock Id:</td>
- </tr>
- <tr>
- <td class="sidebar-label">Example: sh600019, sz000002</td>
- </tr>
- <tr>
- <td style="vertical-align:top;">
- <input id="stockId" class="sidebar" type="text" name="stockId"/>
- </td>
- </tr>
- </table>
- <div class="splitter"></div>
- <div>
- <button id="search" onclick="search_onclick()">Search</button>
- </div>
- </div>
- </div>
- <div id="content"
- style="height:100%;width:585px;float:left;position:relative;left:20px;overflow:scroll;">
- <div
- style="height:50px;position:relative;top:15px;vertical-align:middle;font-weight:300;font-family: calibri, Arial, Helvetica, sans-serif;font-size:22px;color:black">
- Analysis Result:
- </div>
- <div style="height:7px;background-color:#92BDF2;"></div>
-
- <div id="mychart"></div>
-
- <div id="mytable"></div>
- </div>
- </div>
- <div id="footer">
- Big Data Team @ Intel
- </div>
-</div>
-</body>
-</html>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/Analyzer.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/Analyzer.scala b/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/Analyzer.scala
deleted file mode 100644
index f6fdff2..0000000
--- a/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/Analyzer.scala
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.examples.stock
-
-import scala.collection.immutable
-
-import akka.actor.Actor.Receive
-import org.joda.time.DateTime
-import org.joda.time.format.{DateTimeFormat, DateTimeFormatter}
-
-import org.apache.gearpump.Message
-import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.examples.stock.Analyzer.HistoricalStates
-import org.apache.gearpump.streaming.examples.stock.Price._
-import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext}
-import org.apache.gearpump.util.LogUtil
-
-/**
- * Dradown analyzer
- * Definition: http://en.wikipedia.org/wiki/Drawdown_(economics)
- */
-class Analyzer(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
-
- val dateFormatter = DateTimeFormat forPattern "dd/MM/yyyy"
-
- private var stocksToReport = immutable.Set.empty[String]
- private var stockInfos = new immutable.HashMap[String, StockPrice]
-
- private var currentDownwardsStates = new immutable.HashMap[String, StockPriceState]
- private val historicalStates = new HistoricalStates()
- private var latestTimeStamp: Long = 0L
-
- override def onStart(startTime: StartTime): Unit = {
- LOG.info("analyzer is started")
- }
-
- override def onNext(msg: Message): Unit = {
- msg.msg match {
- case stock: StockPrice =>
- latestTimeStamp = stock.timestamp
- checkDate(stock)
- stockInfos += stock.stockId -> stock
- val downwardsState = updateCurrentStates(stock)
- val maxDrawdown = historicalStates.updatePresentMaximal(downwardsState)
- }
- }
-
- override def receiveUnManagedMessage: Receive = {
- case get@GetReport(stockId, date) =>
- var currentMax = currentDownwardsStates.get(stockId)
-
- val dateTime = Option(date) match {
- case Some(date) =>
- currentMax = None
- parseDate(dateFormatter, date)
- case None =>
- new DateTime(latestTimeStamp).withTimeAtStartOfDay
- }
-
- val historyMax = Option(dateTime).flatMap(handleHistoricalQuery(stockId, _))
- val name = stockInfos.get(stockId).map(_.name).getOrElse("")
- sender ! Report(stockId, name, dateTime.toString, historyMax, currentMax)
- }
-
- private def updateCurrentStates(stock: StockPrice) = {
- var downwardsState: StockPriceState = null
- if (currentDownwardsStates.contains(stock.stockId)) {
- downwardsState = generateNewState(stock, currentDownwardsStates.get(stock.stockId).get)
- } else {
- downwardsState = StockPriceState(stock.stockId, stock, stock, stock)
- }
- currentDownwardsStates += stock.stockId -> downwardsState
- downwardsState
- }
-
- // Update the stock's latest state.
- private def generateNewState(currentPrice: Price, oldState: StockPriceState): StockPriceState = {
- if (currentPrice.price > oldState.max.price) {
- StockPriceState(oldState.stockID, currentPrice, currentPrice, currentPrice)
- } else {
- val newState = StockPriceState(oldState.stockID, oldState.max,
- Price.min(currentPrice, oldState.min), currentPrice)
- newState
- }
- }
-
- private def checkDate(stock: StockPrice) = {
- if (currentDownwardsStates.contains(stock.stockId)) {
- val now = new DateTime(stock.timestamp)
- val lastTime = new DateTime(currentDownwardsStates.get(stock.stockId).get.current.timestamp)
- // New day
- if (now.getDayOfYear > lastTime.getDayOfYear || now.getYear > lastTime.getYear) {
- currentDownwardsStates -= stock.stockId
- }
- }
- }
-
- private def parseDate(format: DateTimeFormatter, input: String): DateTime = {
- format.parseDateTime(input)
- }
-
- private def handleHistoricalQuery(stockId: String, date: DateTime) = {
- val maximal = historicalStates.getHistoricalMaximal(stockId, date)
- maximal
- }
-}
-
-object Analyzer {
-
- class HistoricalStates {
- val LOG = LogUtil.getLogger(getClass)
- val dateFormatter = DateTimeFormat forPattern "dd/MM/yyyy"
- private var historicalMaxRaise = new immutable.HashMap[(String, DateTime), StockPriceState]
- private var historicalMaxDrawdown = new immutable.HashMap[(String, DateTime), StockPriceState]
-
- def updatePresentMaximal(newState: StockPriceState): Option[StockPriceState] = {
- val date = Analyzer.getDateFromTimeStamp(newState.current.timestamp)
- var newMaximalState: Option[StockPriceState] = null
- if (newState.max.price < Float.MinPositiveValue) {
- newMaximalState = generateNewMaximal(newState, date, historicalMaxRaise)
- if (newMaximalState.nonEmpty) {
- historicalMaxRaise += (newState.stockID, date) -> newMaximalState.get
- }
- } else {
- newMaximalState = generateNewMaximal(newState, date, historicalMaxDrawdown)
- if (newMaximalState.nonEmpty) {
- historicalMaxDrawdown += (newState.stockID, date) -> newMaximalState.get
- }
- }
- newMaximalState
- }
-
- def getHistoricalMaximal(stockId: String, date: DateTime): Option[StockPriceState] = {
- historicalMaxDrawdown.get((stockId, date))
- }
-
- private def generateNewMaximal(
- state: StockPriceState,
- date: DateTime,
- map: immutable.HashMap[(String, DateTime), StockPriceState])
- : Option[StockPriceState] = {
- val maximal = map.get((state.stockID, date))
- if (maximal.nonEmpty && maximal.get.drawDown > state.drawDown) {
- None
- } else {
- Some(state)
- }
- }
- }
-
- def getDateFromTimeStamp(timestamp: Long): DateTime = {
- new DateTime(timestamp).withTimeAtStartOfDay()
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/Crawler.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/Crawler.scala b/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/Crawler.scala
deleted file mode 100644
index bb444dd..0000000
--- a/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/Crawler.scala
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.examples.stock
-
-import scala.concurrent.duration._
-
-import org.apache.gearpump.Message
-import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext}
-
-class Crawler(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
-
- import taskContext._
-
- val FetchStockPrice = Message("FetchStockPrice")
-
- lazy val stocks = {
- val stockIds = conf.getValue[Array[String]]("StockId").get
- val size = if (stockIds.length % parallelism > 0) {
- stockIds.length / parallelism + 1
- } else {
- stockIds.length / parallelism
- }
-
- val start = taskId.index * size
- val end = (taskId.index + 1) * size
- stockIds.slice(start, end)
- }
-
- scheduleOnce(1.seconds)(self ! FetchStockPrice)
-
- val stockMarket = conf.getValue[StockMarket](classOf[StockMarket].getName).get
-
- override def onStart(startTime: StartTime): Unit = {
- // Nothing
- }
-
- override def onNext(msg: Message): Unit = {
- stockMarket.getPrice(stocks).foreach { price =>
- output(new Message(price, price.timestamp))
- }
- scheduleOnce(5.seconds)(self ! FetchStockPrice)
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/Data.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/Data.scala b/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/Data.scala
deleted file mode 100644
index 94a85ff..0000000
--- a/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/Data.scala
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.examples.stock
-
-// scalastyle:off equals.hash.code case class has equals defined
-case class StockPrice(
- stockId: String, name: String, price: String, delta: String, pecent: String, volume: String,
- money: String, timestamp: Long) {
- override def hashCode: Int = stockId.hashCode
-}
-// scalastyle:on equals.hash.code case class has equals defined
-
-case class Price(price: Float, timestamp: Long)
-
-object Price {
-
- import scala.language.implicitConversions
-
- implicit def StockPriceToPrice(stock: StockPrice): Price = {
- Price(stock.price.toFloat, stock.timestamp)
- }
-
- def min(first: Price, second: Price): Price = {
- if (first.price < second.price) {
- first
- } else {
- second
- }
- }
-}
-
-case class StockPriceState(stockID: String, max: Price, min: Price, current: Price) {
-
- def drawDownPeriod: Long = min.timestamp - max.timestamp
-
- def recoveryPeriod: Long = current.timestamp - min.timestamp
-
- def drawDown: Float = max.price - min.price
-}
-
-case class GetReport(stockId: String, date: String)
-
-case class Report(
- stockId: String, name: String, date: String, historyMax: Option[StockPriceState],
- currentMax: Option[StockPriceState])
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/QueryServer.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/QueryServer.scala b/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/QueryServer.scala
deleted file mode 100644
index 01ccb3e..0000000
--- a/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/QueryServer.scala
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.examples.stock
-
-import java.util.concurrent.TimeUnit
-import scala.concurrent.{ExecutionContext, Future}
-import scala.util.{Failure, Success}
-
-import akka.actor.Actor._
-import akka.actor.{Actor, ActorRefFactory, Props}
-import akka.io.IO
-import akka.pattern.ask
-import spray.can.Http
-import spray.http.StatusCodes
-import spray.json._
-import spray.routing.{HttpService, Route}
-import upickle.default.write
-
-import org.apache.gearpump.Message
-import org.apache.gearpump.cluster.MasterToAppMaster.AppMasterDataDetailRequest
-import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.ProcessorId
-import org.apache.gearpump.streaming.appmaster.AppMaster.{LookupTaskActorRef, TaskActorRef}
-import org.apache.gearpump.streaming.appmaster.{ProcessorSummary, StreamAppMasterSummary}
-import org.apache.gearpump.streaming.examples.stock.QueryServer.WebServer
-import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext, TaskId}
-
-class QueryServer(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
- import scala.concurrent.ExecutionContext.Implicits.global
-
- import taskContext.{appId, appMaster}
-
- var analyzer: (ProcessorId, ProcessorSummary) = null
- implicit val timeOut = akka.util.Timeout(3, TimeUnit.SECONDS)
-
- override def onStart(startTime: StartTime): Unit = {
- appMaster ! AppMasterDataDetailRequest(appId)
- taskContext.actorOf(Props(new WebServer))
- }
-
- override def onNext(msg: Message): Unit = {
- // Skip
- }
-
- override def receiveUnManagedMessage: Receive = messageHandler
-
- def messageHandler: Receive = {
- case detail: StreamAppMasterSummary =>
- analyzer = detail.processors.find { kv =>
- val (processorId, processor) = kv
- processor.taskClass == classOf[Analyzer].getName
- }.get
- case getReport@GetReport(stockId, date) =>
- val parallism = analyzer._2.parallelism
- val processorId = analyzer._1
- val analyzerTaskId = TaskId(processorId, (stockId.hashCode & Integer.MAX_VALUE) % parallism)
- val requester = sender
- import scala.concurrent.Future
- (appMaster ? LookupTaskActorRef(analyzerTaskId))
- .asInstanceOf[Future[TaskActorRef]].flatMap { task =>
-
- (task.task ? getReport).asInstanceOf[Future[Report]]
- }.map { report =>
- LOG.info(s"reporting $report")
- requester ! report
- }
- case _ =>
- // Ignore
- }
-}
-
-object QueryServer {
- class WebServer extends Actor with HttpService {
-
- import context.dispatcher
- implicit val timeOut = akka.util.Timeout(3, TimeUnit.SECONDS)
- def actorRefFactory: ActorRefFactory = context
- implicit val system = context.system
-
- IO(Http) ! Http.Bind(self, interface = "localhost", port = 8080)
-
- override def receive: Receive = runRoute(webServer ~ staticRoute)
-
- def webServer: Route = {
- path("report" / Segment) { stockId =>
- get {
- onComplete((context.parent ? GetReport(stockId, null)).asInstanceOf[Future[Report]]) {
- case Success(report: Report) =>
- val json = write(report)
- complete(pretty(json))
- case Failure(ex) => complete(StatusCodes.InternalServerError,
- s"An error occurred: ${ex.getMessage}")
- }
- }
- }
- }
-
- val staticRoute = {
- pathEndOrSingleSlash {
- getFromResource("stock/stock.html")
- } ~
- pathPrefix("css") {
- get {
- getFromResourceDirectory("stock/css")
- }
- } ~
- pathPrefix("js") {
- get {
- getFromResourceDirectory("stock/js")
- }
- }
- }
-
- private def pretty(json: String): String = {
- json.parseJson.prettyPrint
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/StockMarket.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/StockMarket.scala b/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/StockMarket.scala
deleted file mode 100644
index 24e050b..0000000
--- a/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/StockMarket.scala
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.examples.stock
-
-import java.nio.charset.Charset
-import scala.io.Codec
-
-import org.apache.commons.httpclient.methods.GetMethod
-import org.apache.commons.httpclient.{HttpClient, MultiThreadedHttpConnectionManager}
-import org.htmlcleaner.{HtmlCleaner, TagNode}
-import org.joda.time.{DateTime, DateTimeZone}
-
-import org.apache.gearpump.streaming.examples.stock.StockMarket.ServiceHour
-import org.apache.gearpump.transport.HostPort
-import org.apache.gearpump.util.LogUtil
-
-class StockMarket(service: ServiceHour, proxy: HostPort = null) extends Serializable {
-
- private def LOG = LogUtil.getLogger(getClass)
-
- @transient
- private var connectionManager: MultiThreadedHttpConnectionManager = null
-
- private val eastMoneyStockPage = "http://quote.eastmoney.com/stocklist.html"
-
- private val stockPriceParser =
- """^var\shq_str_s_([a-z0-9A-Z]+)="([^,]+),([^,]+),([^,]+),([^,]+),([^,]+),([^,]+)";$""".r
-
- def shutdown(): Unit = {
- Option(connectionManager).map(_.shutdown())
- }
-
- @transient
- private var _client: HttpClient = null
-
- private def client: HttpClient = {
- _client = Option(_client).getOrElse {
- val connectionManager = new MultiThreadedHttpConnectionManager()
- val client = new HttpClient(connectionManager)
- Option(proxy).map(host => client.getHostConfiguration().setProxy(host.host, host.port))
- client
- }
- _client
- }
-
- def getPrice(stocks: Array[String]): Array[StockPrice] = {
-
- LOG.info(s"getPrice 1")
-
- val query = "http://hq.sinajs.cn/list=" + stocks.map("s_" + _).mkString(",")
- if (service.inService) {
-
- LOG.info(s"getPrice 2")
-
- val get = new GetMethod(query)
- client.executeMethod(get)
- val current = System.currentTimeMillis()
-
- val output = scala.io.Source.fromInputStream(get.getResponseBodyAsStream)(
- new Codec(Charset forName "GBK")).getLines().flatMap { line =>
- line match {
- case stockPriceParser(stockId, name, price, delta, pecent, volume, money) =>
- Some(StockPrice(stockId, name, price, delta, pecent, volume, money, current))
- case _ =>
- None
- }
- }.toArray
-
- LOG.info(s"getPrice 3 ${output.length}")
-
- output
- } else {
- Array.empty[StockPrice]
- }
- }
-
- private val urlPattern = """^.*/([a-zA-Z0-9]+)\.html$""".r
-
- def getStockIdList: Array[String] = {
- val cleaner = new HtmlCleaner
- val props = cleaner.getProperties
-
- val get = new GetMethod(eastMoneyStockPage)
- client.executeMethod(get)
-
- val root = cleaner.clean(get.getResponseBodyAsStream)
-
- val stockUrls = root.evaluateXPath("//div[@id='quotesearch']//li//a[@href]")
-
- val elements = root.getElementsByName("a", true)
-
- val hrefs = (0 until stockUrls.length)
- .map(stockUrls(_).asInstanceOf[TagNode].getAttributeByName("href"))
- .map { url =>
- url match {
- case urlPattern(code) => code
- case _ => null
- }
- }.toArray
- hrefs
- }
-}
-
-object StockMarket {
-
- class ServiceHour(all: Boolean) extends Serializable {
-
- /**
- * Morning openning: 9:30 am - 11:30 am
- */
- val morningStart = GMT8(new DateTime(0, 1, 1, 9, 30)).getMillis
- val morningEnd = GMT8(new DateTime(0, 1, 1, 11, 30)).getMillis
-
- /**
- * After noon openning: 13:00 pm - 15:00 pm
- */
- val afternoonStart = GMT8(new DateTime(0, 1, 1, 13, 0)).getMillis
- val afternoonEnd = GMT8(new DateTime(0, 1, 1, 15, 0)).getMillis
-
- def inService: Boolean = {
-
- if (all) {
- true
- } else {
- val now = GMT8(DateTime.now()).withDate(0, 1, 1).getMillis
- if (now >= morningStart && now <= morningEnd ||
- now >= afternoonStart && now <= afternoonEnd) {
- true
- } else {
- false
- }
- }
- }
-
- private def GMT8(time: DateTime): DateTime = {
- time.withZone(DateTimeZone.UTC).plusHours(8)
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/main/Stock.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/main/Stock.scala b/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/main/Stock.scala
deleted file mode 100644
index 6d17c20..0000000
--- a/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/main/Stock.scala
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.examples.stock.main
-
-import akka.actor.ActorSystem
-import org.slf4j.Logger
-
-import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.cluster.client.ClientContext
-import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult}
-import org.apache.gearpump.partitioner.HashPartitioner
-import org.apache.gearpump.streaming.examples.stock.StockMarket.ServiceHour
-import org.apache.gearpump.streaming.examples.stock.{Analyzer, Crawler, QueryServer, StockMarket}
-import org.apache.gearpump.streaming.{Processor, StreamApplication}
-import org.apache.gearpump.transport.HostPort
-import org.apache.gearpump.util.Graph.Node
-import org.apache.gearpump.util.{AkkaApp, Graph, LogUtil}
-
-/** Tracks the China's stock market index change */
-object Stock extends AkkaApp with ArgumentsParser {
-
- private val LOG: Logger = LogUtil.getLogger(getClass)
-
- override val options: Array[(String, CLIOption[Any])] = Array(
- "crawler" -> CLIOption[Int]("<how many fetcher to get data from remote>",
- required = false, defaultValue = Some(10)),
- "analyzer" -> CLIOption[Int]("<parallism of analyzer>",
- required = false, defaultValue = Some(1)),
- "proxy" -> CLIOption[String]("proxy setting host:port, for example: 127.0.0.1:8443",
- required = false, defaultValue = Some("")))
-
- def crawler(config: ParseResult)(implicit system: ActorSystem): StreamApplication = {
- val crawler = Processor[Crawler](config.getInt("crawler"))
- val analyzer = Processor[Analyzer](config.getInt("analyzer"))
- val queryServer = Processor[QueryServer](1)
-
- val proxySetting = config.getString("proxy")
- val proxy = if (proxySetting.isEmpty) {
- null
- } else HostPort(proxySetting)
- val stockMarket = new StockMarket(new ServiceHour(true), proxy)
- val stocks = stockMarket.getStockIdList
-
- // scalastyle:off println
- Console.println(s"Successfully fetched stock id for ${stocks.length} stocks")
- // scalastyle:on println
-
- val userConfig = UserConfig.empty.withValue("StockId", stocks)
- .withValue[StockMarket](classOf[StockMarket].getName, stockMarket)
- val partitioner = new HashPartitioner
-
- val p1 = crawler ~ partitioner ~> analyzer
- val p2 = Node(queryServer)
- val graph = Graph(p1, p2)
- val app = StreamApplication("stock_direct_analyzer", graph, userConfig
- )
- app
- }
-
- override def main(akkaConf: Config, args: Array[String]): Unit = {
- val config = parse(args)
- val context = ClientContext(akkaConf)
-
- implicit val system = context.system
-
- val app = crawler(config)
- val appId = context.submit(app)
- context.close()
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/transport/README.md
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/README.md b/examples/streaming/transport/README.md
deleted file mode 100644
index fc9bdfe..0000000
--- a/examples/streaming/transport/README.md
+++ /dev/null
@@ -1,3 +0,0 @@
-What is this?
-=============
-A smart transportation example which simulate a city with millions of cars.
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/transport/src/main/resources/geardefault.conf
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/main/resources/geardefault.conf b/examples/streaming/transport/src/main/resources/geardefault.conf
deleted file mode 100644
index 0c8f421..0000000
--- a/examples/streaming/transport/src/main/resources/geardefault.conf
+++ /dev/null
@@ -1,12 +0,0 @@
-gearpump {
-
- serializers {
- ## Follow this format when adding new serializer for new message types
- ## "org.apache.gearpump.Message" = "org.apache.gearpump.streaming.MessageSerializer"
- "org.apache.gearpump.streaming.examples.transport.PassRecord" = ""
- }
-}
-
-spray.can {
- server.parsing.max-content-length = "10M"
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/transport/src/main/resources/transport/css/body.png
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/main/resources/transport/css/body.png b/examples/streaming/transport/src/main/resources/transport/css/body.png
deleted file mode 100644
index b5c536c..0000000
Binary files a/examples/streaming/transport/src/main/resources/transport/css/body.png and /dev/null differ
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/transport/src/main/resources/transport/css/custom.css
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/main/resources/transport/css/custom.css b/examples/streaming/transport/src/main/resources/transport/css/custom.css
deleted file mode 100644
index f324b6a..0000000
--- a/examples/streaming/transport/src/main/resources/transport/css/custom.css
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-.ui-datepicker {
- font-size: 11px;
-}
-
-.sidebar-label {
- font-size: 15px;
- font-family: calibri, Arial, Helvetica, sans-serif;
-}
-
-.help {
- font-size: 12px;
- font-family: calibri, Arial, Helvetica, sans-serif;
-}
-
-div.splitter {
- margin: 12px 0px 7px 0px;
- clear: both;
- border-top: 1px solid #EBEBEB;
-}
-
-input.sidebar {
- width: 165px
-}
-
-select.sidebar {
- width: 198px
-}
-
-table.dataintable {
- font-family: calibri, Arial, Helvetica, sans-serif;
- font-size: 15px;
- margin-top: 10px;
- border-collapse: collapse;
- border: 1px solid #888;
-}
-
-table.dataintable th {
- vertical-align: baseline;
- padding: 5px 15px 5px 5px;
- background-color: #EEE;
- border: 1px solid #888;
- text-align: left;
-}
-
-table.dataintable td {
- vertical-align: text-top;
- padding: 5px 15px 5px 5px;
- background-color: #FFFFFF;
- border: 1px solid #AAA;
-}
-
-#search {
- width: 100px;
- height: 25px;
- position: relative;
- left: 0px;
- top: 5px;
-}
-
-#mytable {
- width: 100%;
- height: 300;
- float: left;
-}
-
-#mychart {
- height: 400px;
- width: 100%;
-}
-
-#Menu {
- height: 100%;
- width: 245px;
- float: left;
-}
-
-#header {
- height: 115px;
- background-image: url(header.png);
-}
-
-#body {
- height: 100%;
- width: 100%;
- background-image: url(body.png);
- background-size: 100% 100%;
-}
-
-#footer {
- color: white;
- height: 70px;
- line-height: 70px;
- text-align: middle;
- clear: both;
- text-align: center;
- background-image: url(foot.png);
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/transport/src/main/resources/transport/css/foot.png
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/main/resources/transport/css/foot.png b/examples/streaming/transport/src/main/resources/transport/css/foot.png
deleted file mode 100644
index 5db91b5..0000000
Binary files a/examples/streaming/transport/src/main/resources/transport/css/foot.png and /dev/null differ
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/transport/src/main/resources/transport/css/header.png
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/main/resources/transport/css/header.png b/examples/streaming/transport/src/main/resources/transport/css/header.png
deleted file mode 100644
index 9284e44..0000000
Binary files a/examples/streaming/transport/src/main/resources/transport/css/header.png and /dev/null differ
[2/3] incubator-gearpump git commit: [GEARPUMP-188] use
java.time.Instant for Task start time
Posted by ma...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/transport/src/main/resources/transport/js/transport.js
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/main/resources/transport/js/transport.js b/examples/streaming/transport/src/main/resources/transport/js/transport.js
deleted file mode 100644
index eef0fe9..0000000
--- a/examples/streaming/transport/src/main/resources/transport/js/transport.js
+++ /dev/null
@@ -1,180 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-var myChart = echarts.init(document.getElementById("mychart"))
-
-echarts.util.mapData.params.params.football = {
- getGeoJson: function (callback) {
- $.ajax({
- url: "../svg/beijing.svg",
- dataType: 'xml',
- success: function (xml) {
- callback(xml)
- }
- });
- }
-}
-
-function updateRecords(tableId) {
- $.getJSON("records", function (json) {
- var tableStr = "<table class=\"dataintable\" style=\"margin-left: 5px;\">";
- tableStr += "<tr><th>Over Speed Vehicle ID</th><th>Speed</th><th>Location</th><th>Time</th></tr>";
- var records = json.records;
- for (var i = 0; i < Math.min(records.length, 20); i++) {
- var record = records[i];
- var vehicleId = record.vehicleId;
- var location = record.locationId.split("_");
- var speed = record.speed;
- var row = location[1];
- var column = location[2];
- var time = new Date(Number(record.timestamp)).toLocaleTimeString().replace(/^\D*/, '');
- tableStr += "<tr><td>" + vehicleId + "</td>";
- tableStr += "<td>" + speed + "km/h </td>"
- tableStr += "<td>(" + row + ", " + column + ")</td>";
- tableStr += "<td>" + time + "</td></tr>";
- }
- if (records.length < 20) {
- for (var i = records.length; i < 20; i++) {
- tableStr += "<tr><td></td>";
- tableStr += "<td> </td>"
- tableStr += "<td> </td>";
- tableStr += "<td> </td></tr>";
- }
- }
- tableStr += "</table>"
- document.getElementById(tableId).innerHTML = tableStr;
- }
- )
-}
-
-function initChart(chartid, vehicleId) {
- // \u57fa\u4e8e\u51c6\u5907\u597d\u7684dom\uff0c\u521d\u59cb\u5316echarts\u56fe\u8868
- $.getJSON("trace/" + vehicleId, function (json) {
- // \u4e3aecharts\u5bf9\u8c61\u52a0\u8f7d\u6570\u636e
- var records = json.records;
- var timeLine = new Array(records.length);
- var markPoints = new Array(records.length);
- var options_ = new Array(records.length - 2);
- for (var i = 0; i < records.length; i++) {
- var record = records[i];
- var vehicleId = record.vehicleId;
- var location = record.locationId.split("_");
- var row = location[1];
- var column = location[2];
- var time = new Date(Number(record.timeStamp)).toLocaleTimeString().replace(/^\D*/, '');
- timeLine[i] = time;
- var currentPonit = {name: "", value: i, geoCoord: [row * 90, column * 90]};
- markPoints[i] = currentPonit;
- }
- options_[0] =
- {
- title: {
- text: 'Vehicle trace'
- },
- tooltip: {
- trigger: 'item'
- },
- toolbox: {
- show: false,
- feature: {
- mark: {show: true},
- dataView: {show: true, readOnly: false},
- magicType: {show: true, type: ['line', 'bar']},
- restore: {show: true},
- saveAsImage: {show: true}
- }
- },
- series: [
- {
- name: 'Vehicle trace',
- type: 'map',
- mapType: 'football',
- mapLocation: {
- y: 30,
- height: 430
- },
- itemStyle: {
- normal: {label: {show: false}},
- emphasis: {label: {show: false}}
- },
- data: [
- {name: 'City', hoverable: false, itemStyle: {normal: {label: {show: false}}}}
- ],
- markPoint: {
- symbol: 'circle',
- symbolSize: 8,
- itemStyle: {
- normal: {
- borderWidth: 1,
- color: 'blue',
- lineStyle: {
- type: 'solid'
- }
- }
- },
- data: [markPoints[0]]
- },
- markLine: {
- smooth: true,
- effect: {
- show: true,
- scaleSize: 1.5,
- period: 1.5,
- color: '#fff'
- },
- itemStyle: {
- normal: {
- borderWidth: 2,
- color: 'red',
- lineStyle: {
- type: 'solid'
- }
- }
- },
- data: []
- }
- }
- ]
- }
- for (var i = 1; i < markPoints.length; i++) {
- options_[i] =
- {
- series: [
- {
- markPoint: {
- data: [markPoints[i]]
- },
- markLine: {
- data: []
- }
- }
- ]
- }
- }
- var option = {
- timeline: {
- type: 'number',
- playInterval: 500,
- autoPlay: true,
- data: timeLine
- },
- options: options_
- };
- myChart.setOption(option);
- });
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/transport/src/main/resources/transport/svg/beijing.svg
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/main/resources/transport/svg/beijing.svg b/examples/streaming/transport/src/main/resources/transport/svg/beijing.svg
deleted file mode 100644
index 5342c24..0000000
--- a/examples/streaming/transport/src/main/resources/transport/svg/beijing.svg
+++ /dev/null
@@ -1,199 +0,0 @@
-<?xml version="1.0" encoding="utf-8"?>
-
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-<!-- Generator: Adobe Illustrator 14.0.0, SVG Export Plug-In . SVG Version: 6.00 Build 43363) -->
-<!DOCTYPE svg PUBLIC "-//W3C//DTD SVG 1.1//EN" "http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd">
-<svg version="1.1" xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink" x="0px" y="0px" width="1103px"
- height="1115px" viewBox="0 0 200 202" enable-background="new 0 0 1103 1115" xml:space="preserve">
-<g id="\u80cc\u666f">
-
- <polygon fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="10" stroke-linecap="round" stroke-linejoin="round" points="
- 719,377 533,377 487,398 491,563 469,571 469,682 504,682 546,676 661,667 672,672 710,672 740,676 751,645 746,567 725,563
- 719,510 "/>
-
- <path fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="10" stroke-linecap="round" stroke-linejoin="round" d="
- M359,533V402l-7-76l36-18l333-10l76,65v315l-38,41h-49l-37,6H534l-81,31h-37c-33.5-0.5-56.5-54.5-57-70V533z"/>
-
- <path fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="10" stroke-linecap="round" stroke-linejoin="round" d="
- M252,571v44l11,39l16,41v71l7.5,27.5L290,811l11,13l16,1l447-4l48-10l51-45l7-25l8-70V370c1.5-12.5-4.5-24.5-11-32l-103-96l-25-15
- l-21-6l-392,17l-19,8l-17,24l-35,28l-4,10l4,38l-3,40v47l3,37l-3,40V571z"/>
-
- <path fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="10" stroke-linecap="round" stroke-linejoin="round" d="
- M259,133l63-8l52-13l57-19h49l241,6l69,26h51c18.5,2.5,27.5,6.5,37,16l82,121l16,28l9,20l11,37l11,21l25,37l7,122l11,80v24v73l6,30
- l-6,17L944,857l-93,45l-46,62l-17,10l-49,1c-17.5,1.5-37.5,20.5-43,35l-19,52l-18,12l-84,27c-5.5,1.5-21.5,6.5-27-13l-6-37
- c-2.5-8.5-3.5-22.5-27-23l-151-18c-15.5-1.5-30.5,5.5-50,18l-23,12c-17.5,8.5-41.5-2.5-49-30l-15-60v-35c0,0-49.5-86.5-55-90
- s-35-9-35-9l-38-65l-5-23l-33-60l-6-22l13-46v-89l2-43l-2-63l10-37l19-111l-3-27c0.5-8.5,5.5-30.5,24-36l29-6l25-15h46l17-10l16-22
- L259,133z"/>
-
- <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="10" stroke-linecap="round" stroke-linejoin="round" points="
- 1112,523 990,523 967,530 878,530 "/>
-
- <line fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="10" stroke-linecap="round" stroke-linejoin="round" x1="1030" y1="405" x2="1112" y2="406"/>
-
- <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="10" stroke-linecap="round" stroke-linejoin="round" points="
- 1112,48 962,122 777.5,343.5 "/>
-
- <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="10" stroke-linecap="round" stroke-linejoin="round" points="
- 856,-7 856,10 848,31 797,81 788,93 763,186 706,298 "/>
-
- <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="10" stroke-linecap="round" stroke-linejoin="round" points="
- 559.5,303.5 565,270 547,207 422,-7 "/>
-
- <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="10" stroke-linecap="round" stroke-linejoin="round" points="
- -10,1003 13,995 25,988 42,931 42,912 47,898 70,870 74,853 66,789 74,779 110,767 143,742 152,717 280,647 331,633 359.5,617.5
- "/>
-
- <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="10" stroke-linecap="round" stroke-linejoin="round" points="
- 456,1115 449,1049 449,1022 467,986 466.5,977.5 455.5,868.5 460,842 460.5,754.5 "/>
-
- <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="10" stroke-linecap="round" stroke-linejoin="round" points="
- 1103,1115 1066,1044 1042,1011 1015,982 984,927 952,888 901,849 778,703 "/>
-
- <line fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="10" stroke-linecap="round" stroke-linejoin="round" x1="1036.5" y1="768.5" x2="1112" y2="810"/>
-
- <line fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="10" stroke-linecap="round" stroke-linejoin="round" x1="878" y1="671" x2="1112" y2="682"/>
-
- <line fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" x1="1019" y1="-7" x2="955" y2="129"/>
-
- <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
- -10,533 540,536 671,531 878,530 "/>
-
- <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
- 0,468 132,468 168,457 175,457 201,470 212,470 360,470 422,474 458,474 494,472 539,472 "/>
-
- <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
- 539,477 590,477 595,474 623,474 630,470 732,470 753,476 763,477 825,502 843,503 929,502 984.5,496.5 1037,500 1091,500
- 1112,503 "/>
-
- <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
- 494,565 533.5,564.5 556,562 616,562 626,559 670,559 680,562 725,562 "/>
-
- <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
- 1030,405 971,405 885,435 683,435 589.5,435.5 539,439 523,439 372,439 254,439 "/>
-
- <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
- 360,618 389,602 403,600 469,602 591.5,601.5 625.5,591.5 652,589 699,588 987,589 "/>
-
- <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
- 716,377 748,377 778,344 "/>
-
- <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
- -10,596 6,603 47,603 63,588 91,572 394,572 472,571 "/>
-
- <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
- 725,544 689,544 674,559 "/>
-
- <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
- 539,406 474.5,410.5 458,415 424.5,417.5 400.5,414.5 378,413 305.5,379.5 283.5,385.5 254,384 "/>
-
- <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
- 878,530 864,541 725,549 "/>
-
- <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
- 213,688 231,694 278,694 397,682 467,682 "/>
-
- <line fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" x1="469" y1="682" x2="460.5" y2="754.5"/>
-
- <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
- 308,755 313,638 313,496 316,487 315.5,420.5 313,399 313,344 282,280 "/>
-
- <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
- 352,327 322,338 299,344 254,344 "/>
-
- <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
- 162,-7 206,13 240,48 287,120 294,175 312,193 312,206 324,219.5 330,280 346,298 352,327 "/>
-
- <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
- 332.5,-9 357,41 364,70 369,109 379,124 381,155 379,159 373,159 370,168 375,265 391,322 401,414 401,474 "/>
-
- <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
- 477,93 482,229 486,361 489,372 490,399 "/>
-
- <line fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" x1="560" y1="304" x2="556" y2="377"/>
-
- <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
- 537,377 542,536 542,694 539,703 535,716 535,727 530,825 "/>
-
- <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
- 613,596 615,671 624,924 666,921 666,930 670,935 678,991 755,1061 766,1115 "/>
-
- <line fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" x1="600" y1="227" x2="604" y2="377"/>
-
- <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
- 642,434 640,377 638,302 638,227 642,99 653,10 653,0 "/>
-
- <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
- 1037,769 951,709 951,677 870,622 678,623 "/>
-
- <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
- 653,11 670,58 668,85 668,110 664,129 664,146 668,176 667,199 667,227 672,369 664,369 664,377 672,609 678,622 680,725 "/>
-
- <line fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" x1="878" y1="671" x2="745" y2="671"/>
-
- <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
- 1112,470 973,472 965,474 770,474 763,478 "/>
-
- <line fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" x1="845" y1="434" x2="842" y2="696"/>
-
- <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
- 763,530 763,383 751,372 "/>
-
- <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
- 812.5,125 816,205 861,243 "/>
-
- <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
- 667,172 619,174 578,175 494,178 434,180 "/>
-
- <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
- 720,271 709,269 392,272 340,275 329,279 282,283 "/>
-
- <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
- 826,797 837,810 840.5,831.5 885.5,884.5 967.16,1014.78 "/>
-
- <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
- 888,1003 1025,917 1050,905 1064.5,888.5 "/>
-
- <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
- 1071,1051 1051,1065 1024,1081 987,1065 976,1051 961,1040 898,1017 861,959 851,935 837,923 813,888 763,838 757,825 751,769
- 749,737 747,682 737,677 "/>
-
- <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
- 1009,988 967.28,1015.13 933.5,1117.5 "/>
-
- <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
- 0,984 9,974 15,957 15,950 15,939 26,909 34,901 45,897 60,878 58,867 63,856 64,844 61,825 64,818 61,776 104,769 "/>
-
- <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
- 917,866 845,900 797,919 781,915 742,923 725,923 667,930 "/>
-
- <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#AAAAAA" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
- -3,1076 93,1076 148,1056 248,1055 270,1047 333,1009 458,909 "/>
-
- <polyline fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#14A97E" stroke-width="10" stroke-linecap="round" stroke-linejoin="round" points="
- 239,129 241,154 251,172 270,176 292,176 295,187 312,193 346,192 352,184 371,184 374,237 377,272 391,327 401,414 422,418
- 449,416 472,411 530,406 539,414 542,677 560,703 535,725 530,810 "/>
-</g>
-<g id="\u5c42_1" display="none">
-
- <polyline display="inline" fill-rule="evenodd" clip-rule="evenodd" fill="none" stroke="#0096C0" stroke-width="5" stroke-linecap="round" stroke-linejoin="round" points="
- 797,623.5 797,362.5 778,345.5 792,328.5 725,273.5 706,270.5 386.5,273 340,277.5 333,279.5 299,282.5 "/>
-</g>
-</svg>
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/transport/src/main/resources/transport/transport.html
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/main/resources/transport/transport.html b/examples/streaming/transport/src/main/resources/transport/transport.html
deleted file mode 100644
index baee931..0000000
--- a/examples/streaming/transport/src/main/resources/transport/transport.html
+++ /dev/null
@@ -1,88 +0,0 @@
-<!DOCTYPE html>
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one
- ~ or more contributor license agreements. See the NOTICE file
- ~ distributed with this work for additional information
- ~ regarding copyright ownership. The ASF licenses this file
- ~ to you under the Apache License, Version 2.0 (the
- ~ "License"); you may not use this file except in compliance
- ~ with the License. You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing, software
- ~ distributed under the License is distributed on an "AS IS" BASIS,
- ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ~ See the License for the specific language governing permissions and
- ~ limitations under the License.
- -->
-
-<html>
-
-<head>
- <meta charset="utf-8">
- <link rel=stylesheet type=text/css href="css/custom.css">
- <script src="http://echarts.baidu.com/build/source/echarts-all.js"></script>
- <script src="http://libs.baidu.com/jquery/2.0.0/jquery.min.js"></script>
-</head>
-
-<body style="background-color:#F2F2F2">
-<div id="container" style="width:882px; height:450px;margin-left:auto;margin-right:auto;">
- <div style="height:0px"></div>
- <div id="header">
- <div
- style="font-weight:600;position:relative;left:50px;top:50px;font-family: calibri, Arial, Helvetica, sans-serif;font-size:29px;color:white">
- Big Data Transport Monitoring Demo
- </div>
- </div>
- <div id="body">
- <div id="Menu">
- <div style="position:relative;margin-left:30px; margin-right:20px;margin-top:20px;">
- <!-- form to post to accompany to get accompanying cars -->
-
- <table style="width:100%">
- <tr>
- <td class="sidebar-label">Vehicle Id:</td>
- </tr>
- <tr>
- <td class="sidebar-label"></td>
- </tr>
- <tr>
- <td style="vertical-align:top;">
- <input id="vehicleId" class="sidebar" type="text" name="vehicleId"/>
- </td>
- </tr>
- </table>
- <div class="splitter"></div>
- <div>
- <button id="search" onclick="search_onclick()">Search</button>
- </div>
- </div>
- </div>
- <div id="content"
- style="height:100%;width:585px;float:left;position:relative;left:20px;overflow:scroll;">
- <div
- style="height:50px;position:relative;top:15px;vertical-align:middle;font-weight:300;font-family: calibri, Arial, Helvetica, sans-serif;font-size:22px;color:black">
- Vehicle Trace:
- </div>
- <div style="height:7px;background-color:#92BDF2;"></div>
-
- <div id="mychart"></div>
-
- <div id="mytable"></div>
- </div>
- </div>
- <div id="footer">
- Big Data Team @ Intel
- </div>
- <script src="js/transport.js"></script>
- <script type="text/javascript">
- function search_onclick() {
- var vehicleId = document.getElementById('vehicleId').value
- initChart("mychart", vehicleId)
- }
- setInterval(updateRecords, 1000, "mytable")
- </script>
-</div>
-</body>
-</html>
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/Data.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/Data.scala b/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/Data.scala
deleted file mode 100644
index 0aaf72c..0000000
--- a/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/Data.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.streaming.examples.transport
-
-case class LocationInfo(id: String, row: Int, column: Int)
-
-// scalastyle:off equals.hash.code
-case class PassRecord(vehicleId: String, locationId: String, timeStamp: Long) {
- override def hashCode: Int = vehicleId.hashCode
-}
-// scalastyle:on equals.hash.code
-
-case class GetTrace(vehicleId: String)
-
-case class VehicleTrace(records: Array[PassRecord])
-
-case class OverSpeedReport(vehicleId: String, speed: String, timestamp: Long, locationId: String)
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/DataSource.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/DataSource.scala b/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/DataSource.scala
deleted file mode 100644
index 555e850..0000000
--- a/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/DataSource.scala
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.streaming.examples.transport
-
-import scala.concurrent.duration._
-
-import org.apache.gearpump.Message
-import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.examples.transport.generator.{MockCity, PassRecordGenerator}
-import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext, TaskId}
-
-class DataSource(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
- import taskContext.{output, parallelism, scheduleOnce, taskId}
- private val overdriveThreshold = conf.getInt(VelocityInspector.OVER_DRIVE_THRESHOLD).get
- private val vehicleNum = conf.getInt(DataSource.VEHICLE_NUM).get / parallelism
- private val citySize = conf.getInt(DataSource.MOCK_CITY_SIZE).get
- private val mockCity = new MockCity(citySize)
- private val recordGenerators: Array[PassRecordGenerator] =
- PassRecordGenerator.create(vehicleNum, getIdentifier(taskId), mockCity, overdriveThreshold)
-
- override def onStart(startTime: StartTime): Unit = {
- self ! Message("start", System.currentTimeMillis())
- }
-
- override def onNext(msg: Message): Unit = {
- recordGenerators.foreach(generator =>
- output(Message(generator.getNextPassRecord(), System.currentTimeMillis())))
- scheduleOnce(1.second)(self ! Message("continue", System.currentTimeMillis()))
- }
-
- private def getIdentifier(taskId: TaskId): String = {
- // scalastyle:off non.ascii.character.disallowed
- s"\u6caaA${taskId.processorId}${taskId.index}"
- // scalastyle:on non.ascii.character.disallowed
- }
-}
-
-object DataSource {
- final val VEHICLE_NUM = "vehicle.number"
- final val MOCK_CITY_SIZE = "mock.city.size"
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/QueryServer.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/QueryServer.scala b/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/QueryServer.scala
deleted file mode 100644
index ff3b4b4..0000000
--- a/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/QueryServer.scala
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.streaming.examples.transport
-
-import java.util.concurrent.TimeUnit
-import scala.concurrent.Future
-import scala.util.{Failure, Success}
-
-import akka.actor.Actor._
-import akka.actor.{Actor, ActorRefFactory, Props}
-import akka.io.IO
-import akka.pattern.ask
-import spray.can.Http
-import spray.http.StatusCodes
-import spray.json._
-import spray.routing.{HttpService, Route}
-import upickle.default.write
-
-import org.apache.gearpump.Message
-import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.partitioner.PartitionerDescription
-import org.apache.gearpump.streaming.appmaster.AppMaster.{LookupTaskActorRef, TaskActorRef}
-import org.apache.gearpump.streaming.examples.transport.QueryServer.{GetAllRecords, WebServer}
-import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext, TaskId}
-import org.apache.gearpump.streaming.{DAG, ProcessorDescription, ProcessorId, StreamApplication}
-import org.apache.gearpump.util.Graph
-
-class QueryServer(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
- import system.dispatcher
- import taskContext.appMaster
-
- var inspector: (ProcessorId, ProcessorDescription) = null
- implicit val timeOut = akka.util.Timeout(3, TimeUnit.SECONDS)
- private var overSpeedRecords = List.empty[OverSpeedReport]
-
- override def onStart(startTime: StartTime): Unit = {
- val dag = DAG(conf.getValue[Graph[ProcessorDescription, PartitionerDescription]](
- StreamApplication.DAG).get)
- inspector = dag.processors.find { kv =>
- val (_, processor) = kv
- processor.taskClass == classOf[VelocityInspector].getName
- }.get
- taskContext.actorOf(Props(new WebServer))
- }
-
- override def onNext(msg: Message): Unit = {
- }
-
- override def receiveUnManagedMessage: Receive = {
- case getTrace@GetTrace(vehicleId: String) =>
- val parallism = inspector._2.parallelism
- val processorId = inspector._1
- val analyzerTaskId = TaskId(processorId, (vehicleId.hashCode & Integer.MAX_VALUE) % parallism)
- val requester = sender
- (appMaster ? LookupTaskActorRef(analyzerTaskId))
- .asInstanceOf[Future[TaskActorRef]].flatMap { task =>
- (task.task ? getTrace).asInstanceOf[Future[VehicleTrace]]
- }.map { trace =>
- LOG.info(s"reporting $trace")
- requester ! trace
- }
- case record@OverSpeedReport(vehicleId, speed, timestamp, locationId) =>
- LOG.info(s"vehicle $vehicleId is over speed, the speed is $speed km/h")
- overSpeedRecords :+= record
- case GetAllRecords =>
- sender ! QueryServer.OverSpeedRecords(overSpeedRecords.toArray.sortBy(_.timestamp))
- overSpeedRecords = List.empty[OverSpeedReport]
- case _ =>
- // Ignore
- }
-}
-
-object QueryServer {
- object GetAllRecords
-
- case class OverSpeedRecords(records: Array[OverSpeedReport])
-
- class WebServer extends Actor with HttpService {
-
- import context.dispatcher
- implicit val timeOut = akka.util.Timeout(3, TimeUnit.SECONDS)
- def actorRefFactory: ActorRefFactory = context
- implicit val system = context.system
-
- IO(Http) ! Http.Bind(self, interface = "0.0.0.0", port = 8080)
-
- override def receive: Receive = runRoute(webServer ~ staticRoute)
-
- def webServer: Route = {
- path("trace" / Segment) { vehicleId =>
- get {
- onComplete((context.parent ? GetTrace(vehicleId)).asInstanceOf[Future[VehicleTrace]]) {
- case Success(trace: VehicleTrace) =>
- val json = write(trace)
- complete(pretty(json))
- case Failure(ex) => complete(StatusCodes.InternalServerError,
- s"An error occurred: ${ex.getMessage}")
- }
- }
- } ~
- path("records") {
- get {
- onComplete((context.parent ? GetAllRecords).asInstanceOf[Future[OverSpeedRecords]]) {
- case Success(records: OverSpeedRecords) =>
- val json = write(records)
- complete(pretty(json))
- case Failure(ex) => complete(StatusCodes.InternalServerError,
- s"An error occurred: ${ex.getMessage}")
- }
- }
- }
- }
-
- val staticRoute = {
- pathEndOrSingleSlash {
- getFromResource("transport/transport.html")
- } ~
- pathPrefix("css") {
- get {
- getFromResourceDirectory("transport/css")
- }
- } ~
- pathPrefix("svg") {
- get {
- getFromResourceDirectory("transport/svg")
- }
- } ~
- pathPrefix("js") {
- get {
- getFromResourceDirectory("transport/js")
- }
- }
- }
-
- private def pretty(json: String): String = {
- json.parseJson.prettyPrint
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/Transport.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/Transport.scala b/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/Transport.scala
deleted file mode 100644
index 5beb2e1..0000000
--- a/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/Transport.scala
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.streaming.examples.transport
-
-import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.cluster.client.ClientContext
-import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult}
-import org.apache.gearpump.partitioner.HashPartitioner
-import org.apache.gearpump.streaming.{Processor, StreamApplication}
-import org.apache.gearpump.util.Graph._
-import org.apache.gearpump.util.{AkkaApp, Graph}
-
-/** A city smart transportation streaming application */
-object Transport extends AkkaApp with ArgumentsParser {
- override val options: Array[(String, CLIOption[Any])] = Array(
- "source" -> CLIOption[Int]("<how many task to generate data>", required = false,
- defaultValue = Some(10)),
- "inspector" -> CLIOption[Int]("<how many over speed inspector>", required = false,
- defaultValue = Some(4)),
- "vehicle" -> CLIOption[Int]("<how many vehicles's to generate>", required = false,
- defaultValue = Some(1000)),
- "citysize" -> CLIOption[Int]("<the blocks number of the mock city>", required = false,
- defaultValue = Some(10)),
- "threshold" -> CLIOption[Int]("<overdrive threshold, km/h>", required = false,
- defaultValue = Some(60)))
-
- def application(config: ParseResult): StreamApplication = {
- val sourceNum = config.getInt("source")
- val inspectorNum = config.getInt("inspector")
- val vehicleNum = config.getInt("vehicle")
- val citysize = config.getInt("citysize")
- val threshold = config.getInt("threshold")
- val source = Processor[DataSource](sourceNum)
- val inspector = Processor[VelocityInspector](inspectorNum)
- val queryServer = Processor[QueryServer](1)
- val partitioner = new HashPartitioner
-
- val userConfig = UserConfig.empty.withInt(DataSource.VEHICLE_NUM, vehicleNum).
- withInt(DataSource.MOCK_CITY_SIZE, citysize).
- withInt(VelocityInspector.OVER_DRIVE_THRESHOLD, threshold).
- withInt(VelocityInspector.FAKE_PLATE_THRESHOLD, 200)
- StreamApplication("transport", Graph(source ~ partitioner ~> inspector,
- Node(queryServer)), userConfig)
- }
-
- override def main(akkaConf: Config, args: Array[String]): Unit = {
- val config = parse(args)
- val context = ClientContext(akkaConf)
- implicit val system = context.system
- context.submit(application(config))
- context.close()
- }
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/VelocityInspector.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/VelocityInspector.scala b/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/VelocityInspector.scala
deleted file mode 100644
index 4d9bd04..0000000
--- a/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/VelocityInspector.scala
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.streaming.examples.transport
-
-import java.util.concurrent.TimeUnit
-import scala.collection.immutable.Queue
-import scala.collection.mutable
-import scala.concurrent.Future
-
-import akka.actor.Actor._
-import akka.actor.ActorRef
-import akka.pattern.ask
-
-import org.apache.gearpump.Message
-import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.partitioner.PartitionerDescription
-import org.apache.gearpump.streaming.appmaster.AppMaster.{LookupTaskActorRef, TaskActorRef}
-import org.apache.gearpump.streaming.examples.transport.generator.MockCity
-import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext, TaskId}
-import org.apache.gearpump.streaming.{DAG, ProcessorDescription, StreamApplication}
-import org.apache.gearpump.util.Graph
-
-class VelocityInspector(taskContext: TaskContext, conf: UserConfig)
- extends Task(taskContext, conf) {
-
- import system.dispatcher
- import taskContext.appMaster
- implicit val timeOut = akka.util.Timeout(3, TimeUnit.SECONDS)
- private val passRecords = mutable.Map.empty[String, Queue[PassRecord]]
- private val fakePlateThreshold = conf.getInt(VelocityInspector.FAKE_PLATE_THRESHOLD).get
- private val overdriveThreshold = conf.getInt(VelocityInspector.OVER_DRIVE_THRESHOLD).get
- private val citySize = conf.getInt(DataSource.MOCK_CITY_SIZE).get
- private val mockCity = new MockCity(citySize)
- private var queryServerActor: ActorRef = null
-
- override def onStart(startTime: StartTime): Unit = {
- val dag = DAG(conf.getValue[Graph[ProcessorDescription, PartitionerDescription]](
- StreamApplication.DAG).get)
- val queryServer = dag.processors.find { kv =>
- val (_, processor) = kv
- processor.taskClass == classOf[QueryServer].getName
- }.get
- val queryServerTaskId = TaskId(queryServer._1, 0)
- (appMaster ? LookupTaskActorRef(queryServerTaskId)).asInstanceOf[Future[TaskActorRef]]
- .map { task =>
- queryServerActor = task.task
- }
- }
-
- import org.apache.gearpump.streaming.examples.transport.VelocityInspector._
- override def onNext(msg: Message): Unit = {
- msg.msg match {
- case passRecord: PassRecord =>
- val records = passRecords.getOrElse(passRecord.vehicleId, Queue.empty[PassRecord])
- if (records.size > 0) {
- val velocity = getVelocity(passRecord, records.last)
- val formatted = "%.2f".format(velocity)
- if (velocity > overdriveThreshold) {
- if (velocity > fakePlateThreshold) {
- LOG.info(s"vehicle ${passRecord.vehicleId} maybe a fake plate, " +
- s"the speed is $formatted km/h")
- }
- if (queryServerActor != null) {
- queryServerActor ! OverSpeedReport(passRecord.vehicleId, formatted,
- passRecord.timeStamp, passRecord.locationId)
- }
- }
- }
- passRecords.update(passRecord.vehicleId, records.enqueueFinite(passRecord, RECORDS_NUM))
- }
- }
-
- override def receiveUnManagedMessage: Receive = {
- case GetTrace(vehicleId) =>
- val records = passRecords.getOrElse(vehicleId, Queue.empty[PassRecord])
- sender ! VehicleTrace(records.toArray.sortBy(_.timeStamp))
- }
-
- private def getVelocity(passRecord: PassRecord, lastPassRecord: PassRecord): Float = {
- val distanceInKm = getDistance(lastPassRecord.locationId, passRecord.locationId)
- val timeInHour = (passRecord.timeStamp - lastPassRecord.timeStamp).toFloat / (1000 * 60 * 60)
- distanceInKm / timeInHour
- }
-
- private def getDistance(location1: String, location2: String): Long = {
- mockCity.getDistance(location1, location2)
- }
-}
-
-object VelocityInspector {
- final val OVER_DRIVE_THRESHOLD = "overdrive.threshold"
- final val FAKE_PLATE_THRESHOLD = "fakeplate.threshold"
- final val RECORDS_NUM = 20
-
- class FiniteQueue[T](q: Queue[T]) {
- def enqueueFinite[B >: T](elem: B, maxSize: Int): Queue[B] = {
- var result = q.enqueue(elem)
- while (result.size > maxSize) {
- result = result.dequeue._2
- }
- result
- }
- }
-
- import scala.language.implicitConversions
-
- implicit def queue2FiniteQueue[T](q: Queue[T]): FiniteQueue[T] = new FiniteQueue[T](q)
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/generator/MockCity.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/generator/MockCity.scala b/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/generator/MockCity.scala
deleted file mode 100644
index 60e0bcf..0000000
--- a/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/generator/MockCity.scala
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.streaming.examples.transport.generator
-
-import scala.util.Random
-
-import org.apache.gearpump.streaming.examples.transport.generator.MockCity._
-
-class MockCity(size: Int) {
- private val random = new Random()
- private val directions = Array(UP, DOWN, LEFT, RIGHT)
-
- def nextLocation(currentLocationId: String): String = {
- val coordinate = idToCoordinate(currentLocationId)
- val direction = directions(random.nextInt(4))
- val newCoordinate = coordinate.addOffset(direction)
- if (inCity(newCoordinate)) {
- coordinateToId(newCoordinate)
- } else {
- nextLocation(currentLocationId)
- }
- }
-
- def getDistance(locationId1: String, locationId2: String): Long = {
- val coordinate1 = idToCoordinate(locationId1)
- val coordinate2 = idToCoordinate(locationId2)
- val blocks = Math.abs(coordinate1.row - coordinate2.row) +
- Math.abs(coordinate1.column - coordinate2.column)
- blocks * LENGTH_PER_BLOCK
- }
-
- def randomLocationId(): String = {
- val row = random.nextInt(size)
- val column = random.nextInt(size)
- coordinateToId(Coordinate(row, column))
- }
-
- private def coordinateToId(coordinate: Coordinate): String = {
- s"Id_${coordinate.row}_${coordinate.column}"
- }
-
- private def idToCoordinate(locationId: String): Coordinate = {
- val attr = locationId.split("_")
- val row = attr(1).toInt
- val column = attr(2).toInt
- Coordinate(row, column)
- }
-
- private def inCity(coordinate: Coordinate): Boolean = {
- coordinate.row >= 0 &&
- coordinate.row < size &&
- coordinate.column >= 0 &&
- coordinate.column < size
- }
-}
-
-object MockCity {
- // The length of the mock city, km
- final val LENGTH_PER_BLOCK = 5
- // The minimal speed, km/h
- final val MINIMAL_SPEED = 10
-
- final val UP = Coordinate(0, 1)
- final val DOWN = Coordinate(0, -1)
- final val LEFT = Coordinate(-1, 0)
- final val RIGHT = Coordinate(1, 0)
-
- case class Coordinate(row: Int, column: Int) {
- def addOffset(coordinate: Coordinate): Coordinate = {
- Coordinate(this.row + coordinate.row, this.column + coordinate.column)
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/generator/PassRecordGenerator.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/generator/PassRecordGenerator.scala b/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/generator/PassRecordGenerator.scala
deleted file mode 100644
index e8c1c59..0000000
--- a/examples/streaming/transport/src/main/scala/org/apache/gearpump/streaming/examples/transport/generator/PassRecordGenerator.scala
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.streaming.examples.transport.generator
-
-import scala.util.Random
-
-import org.apache.gearpump.streaming.examples.transport.PassRecord
-import org.apache.gearpump.util.LogUtil
-
-class PassRecordGenerator(vehicleId: String, city: MockCity, overdriveThreshold: Int) {
- private val LOG = LogUtil.getLogger(getClass)
- LOG.info(s"Generate pass record for vehicle $vehicleId")
- private var timeStamp = System.currentTimeMillis()
-
- private var locationId = city.randomLocationId()
- private val random = new Random()
- private val fakePlate = random.nextInt(1000) < 1000 * PassRecordGenerator.FAKE_PLATE_RATE
- private val (randomMin, randomRange) = {
- val lowerBound = MockCity.LENGTH_PER_BLOCK * 1000 * 60 * 60 / overdriveThreshold.toFloat
- val upperBound = MockCity.LENGTH_PER_BLOCK * 1000 * 60 * 60 / MockCity.MINIMAL_SPEED.toFloat
- val overdrive = (upperBound - lowerBound) * PassRecordGenerator.OVERDRIVE_RATE
- val randomMin = Math.max(lowerBound - overdrive, PassRecordGenerator.TWOMINUTES)
- val randomRange = upperBound - randomMin
- (randomMin.toInt, randomRange.toInt)
- }
-
- def getNextPassRecord(): PassRecord = {
- locationId = if (fakePlate) {
- city.randomLocationId()
- } else {
- city.nextLocation(locationId)
- }
- timeStamp += (random.nextInt(randomRange) + randomMin)
- PassRecord(vehicleId, locationId, timeStamp)
- }
-}
-
-object PassRecordGenerator {
- final val FAKE_PLATE_RATE = 0.01F
- final val OVERDRIVE_RATE = 0.05F
- final val TWOMINUTES = 2 * 60 * 1000
-
- def create(generatorNum: Int, prefix: String, city: MockCity, overdriveThreshold: Int)
- : Array[PassRecordGenerator] = {
- var result = Map.empty[String, PassRecordGenerator]
- val digitsNum = (Math.log10(generatorNum) + 1).toInt
- for (i <- 1 to generatorNum) {
- val vehicleId = prefix + s"%0${digitsNum}d".format(i)
- val generator = new PassRecordGenerator(vehicleId, city, overdriveThreshold)
- result += vehicleId -> generator
- }
- result.values.toArray
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/DataSourceSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/DataSourceSpec.scala b/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/DataSourceSpec.scala
deleted file mode 100644
index 1f525ae..0000000
--- a/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/DataSourceSpec.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.streaming.examples.transport
-
-import org.mockito.Matchers._
-import org.mockito.Mockito._
-import org.scalatest.{FlatSpec, Matchers}
-
-import org.apache.gearpump.Message
-import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.MockUtil
-import org.apache.gearpump.streaming.task.StartTime
-
-class DataSourceSpec extends FlatSpec with Matchers {
- it should "create the pass record" in {
- val vehicleNum = 2
- val context = MockUtil.mockTaskContext
-
- val userConfig = UserConfig.empty.withInt(DataSource.VEHICLE_NUM, vehicleNum).
- withInt(DataSource.MOCK_CITY_SIZE, 10).
- withInt(VelocityInspector.OVER_DRIVE_THRESHOLD, 60).
- withInt(VelocityInspector.FAKE_PLATE_THRESHOLD, 200)
-
- val source = new DataSource(context, userConfig)
- source.onStart(StartTime(0))
- source.onNext(Message("start"))
- verify(context, times(vehicleNum)).output(any[Message])
- source.onStop()
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/TransportSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/TransportSpec.scala b/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/TransportSpec.scala
deleted file mode 100644
index 2f83de5..0000000
--- a/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/TransportSpec.scala
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.streaming.examples.transport
-
-import scala.concurrent.Future
-import scala.util.Success
-
-import org.scalatest.prop.PropertyChecks
-import org.scalatest.{BeforeAndAfterAll, Matchers, PropSpec}
-
-import org.apache.gearpump.cluster.ClientToMaster.SubmitApplication
-import org.apache.gearpump.cluster.MasterToClient.SubmitApplicationResult
-import org.apache.gearpump.cluster.{MasterHarness, TestUtil}
-
-class TransportSpec
- extends PropSpec with PropertyChecks with Matchers with BeforeAndAfterAll with MasterHarness {
-
- override def beforeAll {
- startActorSystem()
- }
-
- override def afterAll {
- shutdownActorSystem()
- }
-
- protected override def config = TestUtil.DEFAULT_CONFIG
-
- property("Transport should succeed to submit application with required arguments") {
- val requiredArgs = Array.empty[String]
- val optionalArgs = Array(
- "-source", "1",
- "-inspector", "1",
- "-vehicle", "100",
- "-citysize", "10",
- "-threshold", "60")
-
- val args = {
- Table(
- ("requiredArgs", "optionalArgs"),
- (requiredArgs, optionalArgs)
- )
- }
- val masterReceiver = createMockMaster()
- forAll(args) { (requiredArgs: Array[String], optionalArgs: Array[String]) =>
- val args = requiredArgs ++ optionalArgs
-
- Future {
- Transport.main(masterConfig, args)
- }
- masterReceiver.expectMsgType[SubmitApplication](PROCESS_BOOT_TIME)
- masterReceiver.reply(SubmitApplicationResult(Success(0)))
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/generator/MockCitySpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/generator/MockCitySpec.scala b/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/generator/MockCitySpec.scala
deleted file mode 100644
index ba4eb2d..0000000
--- a/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/generator/MockCitySpec.scala
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.streaming.examples.transport.generator
-
-import org.scalatest.prop.PropertyChecks
-import org.scalatest.{Matchers, PropSpec}
-
-class MockCitySpec extends PropSpec with PropertyChecks with Matchers {
-
- property("MockCity should maintain the location properly") {
- val city = new MockCity(10)
- val start = city.randomLocationId()
- val nextLocation = city.nextLocation(start)
- assert(city.getDistance(start, nextLocation) == MockCity.LENGTH_PER_BLOCK)
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/generator/PassRecordGeneratorSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/generator/PassRecordGeneratorSpec.scala b/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/generator/PassRecordGeneratorSpec.scala
deleted file mode 100644
index f0eebbf..0000000
--- a/examples/streaming/transport/src/test/scala/org/apache/gearpump/streaming/examples/transport/generator/PassRecordGeneratorSpec.scala
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.streaming.examples.transport.generator
-
-import org.scalatest.prop.PropertyChecks
-import org.scalatest.{Matchers, PropSpec}
-
-class PassRecordGeneratorSpec extends PropSpec with PropertyChecks with Matchers {
-
- property("PassRecordGenerator should generate pass record") {
- val id = "test"
- val city = new MockCity(10)
- val generator = new PassRecordGenerator(id, city, 60)
- val passrecord1 = generator.getNextPassRecord()
- val passrecord2 = generator.getNextPassRecord()
- assert(city.getDistance(passrecord1.locationId, passrecord2.locationId) ==
- MockCity.LENGTH_PER_BLOCK)
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Split.java
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Split.java b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Split.java
index 76069c1..0a8fb4f 100644
--- a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Split.java
+++ b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Split.java
@@ -21,9 +21,10 @@ package org.apache.gearpump.streaming.examples.wordcountjava;
import org.apache.gearpump.Message;
import org.apache.gearpump.cluster.UserConfig;
import org.apache.gearpump.streaming.javaapi.Task;
-import org.apache.gearpump.streaming.task.StartTime;
import org.apache.gearpump.streaming.task.TaskContext;
+import java.time.Instant;
+
public class Split extends Task {
public static String TEXT = "This is a good start for java! bingo! bingo! ";
@@ -37,7 +38,7 @@ public class Split extends Task {
}
@Override
- public void onStart(StartTime startTime) {
+ public void onStart(Instant startTime) {
self().tell(new Message("start", now()), self());
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Sum.java
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Sum.java b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Sum.java
index 89c3b14..3daa6e0 100644
--- a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Sum.java
+++ b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Sum.java
@@ -21,10 +21,10 @@ package org.apache.gearpump.streaming.examples.wordcountjava;
import org.apache.gearpump.Message;
import org.apache.gearpump.cluster.UserConfig;
import org.apache.gearpump.streaming.javaapi.Task;
-import org.apache.gearpump.streaming.task.StartTime;
import org.apache.gearpump.streaming.task.TaskContext;
import org.slf4j.Logger;
+import java.time.Instant;
import java.util.HashMap;
public class Sum extends Task {
@@ -37,7 +37,7 @@ public class Sum extends Task {
}
@Override
- public void onStart(StartTime startTime) {
+ public void onStart(Instant startTime) {
//skip
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Split.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Split.scala b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Split.scala
index ae63f10..af3c04c 100644
--- a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Split.scala
+++ b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Split.scala
@@ -18,16 +18,17 @@
package org.apache.gearpump.streaming.examples.wordcount
+import java.time.Instant
import java.util.concurrent.TimeUnit
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext}
+import org.apache.gearpump.streaming.task.{Task, TaskContext}
class Split(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
import taskContext.output
- override def onStart(startTime: StartTime): Unit = {
+ override def onStart(startTime: Instant): Unit = {
self ! Message("start")
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Sum.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Sum.scala b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Sum.scala
index c3fa82a..dbefc93 100644
--- a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Sum.scala
+++ b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Sum.scala
@@ -18,6 +18,7 @@
package org.apache.gearpump.streaming.examples.wordcount
+import java.time.Instant
import java.util.concurrent.TimeUnit
import scala.collection.mutable
import scala.concurrent.duration.FiniteDuration
@@ -26,7 +27,7 @@ import akka.actor.Cancellable
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext}
+import org.apache.gearpump.streaming.task.{Task, TaskContext}
class Sum(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
private[wordcount] val map: mutable.HashMap[String, Long] = new mutable.HashMap[String, Long]()
@@ -37,7 +38,7 @@ class Sum(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext,
private var scheduler: Cancellable = null
- override def onStart(startTime: StartTime): Unit = {
+ override def onStart(startTime: Instant): Unit = {
scheduler = taskContext.schedule(new FiniteDuration(5, TimeUnit.SECONDS),
new FiniteDuration(30, TimeUnit.SECONDS))(reportWordCount)
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/SplitSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/SplitSpec.scala b/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/SplitSpec.scala
index cef9337..8b50890 100644
--- a/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/SplitSpec.scala
+++ b/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/SplitSpec.scala
@@ -17,6 +17,8 @@
*/
package org.apache.gearpump.streaming.examples.wordcount
+import java.time.Instant
+
import scala.concurrent.Await
import scala.concurrent.duration.Duration
@@ -29,7 +31,6 @@ import org.scalatest.{Matchers, WordSpec}
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.{TestUtil, UserConfig}
import org.apache.gearpump.streaming.MockUtil
-import org.apache.gearpump.streaming.task.StartTime
class SplitSpec extends WordSpec with Matchers {
@@ -47,10 +48,10 @@ class SplitSpec extends WordSpec with Matchers {
val conf = UserConfig.empty
val split = new Split(taskContext, conf)
- split.onStart(StartTime(0))
+ split.onStart(Instant.EPOCH)
mockTaskActor.expectMsgType[Message]
- val expectedWordCount = Split.TEXT_TO_SPLIT.split( """[\s\n]+""").filter(_.nonEmpty).length
+ val expectedWordCount = Split.TEXT_TO_SPLIT.split( """[\s\n]+""").count(_.nonEmpty)
split.onNext(Message("next"))
verify(taskContext, times(expectedWordCount)).output(anyObject())
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/SumSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/SumSpec.scala b/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/SumSpec.scala
index e42d696..17e1765 100644
--- a/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/SumSpec.scala
+++ b/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/SumSpec.scala
@@ -17,6 +17,8 @@
*/
package org.apache.gearpump.streaming.examples.wordcount
+import java.time.Instant
+
import org.scalacheck.Gen
import org.scalatest.prop.PropertyChecks
import org.scalatest.{BeforeAndAfter, Matchers, PropSpec}
@@ -24,7 +26,6 @@ import org.scalatest.{BeforeAndAfter, Matchers, PropSpec}
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.MockUtil
-import org.apache.gearpump.streaming.task.StartTime
class SumSpec extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter {
val stringGenerator = Gen.alphaStr
@@ -39,7 +40,7 @@ class SumSpec extends PropSpec with PropertyChecks with Matchers with BeforeAndA
val sum = new Sum(taskContext, conf)
- sum.onStart(StartTime(0))
+ sum.onStart(Instant.EPOCH)
forAll(stringGenerator) { txt =>
wordcount += 1
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/processor/StormProcessor.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/processor/StormProcessor.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/processor/StormProcessor.scala
index 1d3048e..e3b45fb 100644
--- a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/processor/StormProcessor.scala
+++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/processor/StormProcessor.scala
@@ -18,6 +18,7 @@
package org.apache.gearpump.experiments.storm.processor
+import java.time.Instant
import java.util.concurrent.TimeUnit
import scala.concurrent.duration.Duration
@@ -46,7 +47,7 @@ private[storm] class StormProcessor(gearpumpBolt: GearpumpBolt,
private val freqOpt = gearpumpBolt.getTickFrequency
- override def onStart(startTime: StartTime): Unit = {
+ override def onStart(startTime: Instant): Unit = {
gearpumpBolt.start(startTime)
freqOpt.foreach(scheduleTick)
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/producer/StormProducer.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/producer/StormProducer.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/producer/StormProducer.scala
index 5d4a6a2..b92f037 100644
--- a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/producer/StormProducer.scala
+++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/producer/StormProducer.scala
@@ -18,6 +18,7 @@
package org.apache.gearpump.experiments.storm.producer
+import java.time.Instant
import java.util.concurrent.TimeUnit
import akka.actor.Actor.Receive
@@ -48,7 +49,7 @@ private[storm] class StormProducer(gearpumpSpout: GearpumpSpout,
private val timeoutMillis = gearpumpSpout.getMessageTimeout
- override def onStart(startTime: StartTime): Unit = {
+ override def onStart(startTime: Instant): Unit = {
gearpumpSpout.start(startTime)
if (gearpumpSpout.ackEnabled) {
getCheckpointClock
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponent.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponent.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponent.scala
index d0f2949..a8e061c 100644
--- a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponent.scala
+++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponent.scala
@@ -19,6 +19,7 @@
package org.apache.gearpump.experiments.storm.topology
import java.io.{File, FileOutputStream, IOException}
+import java.time.Instant
import java.util
import java.util.jar.JarFile
import java.util.{HashMap => JHashMap, List => JList, Map => JMap}
@@ -40,7 +41,7 @@ import org.apache.gearpump.experiments.storm.util.StormConstants._
import org.apache.gearpump.experiments.storm.util.StormUtil._
import org.apache.gearpump.experiments.storm.util.{StormOutputCollector, StormUtil}
import org.apache.gearpump.streaming.DAG
-import org.apache.gearpump.streaming.task.{GetDAG, TaskId, TaskContext, StartTime}
+import org.apache.gearpump.streaming.task.{GetDAG, TaskId, TaskContext}
import org.apache.gearpump.util.{Constants, LogUtil}
import org.apache.gearpump.{Message, TimeStamp}
import org.slf4j.Logger
@@ -57,7 +58,7 @@ trait GearpumpStormComponent {
* invoked at Task.onStart
* @param startTime task start time
*/
- def start(startTime: StartTime): Unit
+ def start(startTime: Instant): Unit
/**
* invoked at Task.onNext
@@ -123,7 +124,7 @@ object GearpumpStormComponent {
private var collector: StormSpoutOutputCollector = null
- override def start(startTime: StartTime): Unit = {
+ override def start(startTime: Instant): Unit = {
val dag = getDAG(taskContext.appMaster)
val topologyContext = getTopologyContext(dag, taskContext.taskId)
collector = getOutputCollector(taskContext, topologyContext)
@@ -206,7 +207,7 @@ object GearpumpStormComponent {
private var generalTopologyContext: GeneralTopologyContext = null
private var tickTuple: Tuple = null
- override def start(startTime: StartTime): Unit = {
+ override def start(startTime: Instant): Unit = {
val dag = getDAG(taskContext.appMaster)
topologyContext = getTopologyContext(dag, taskContext.taskId)
generalTopologyContext = getGeneralTopologyContext(dag)
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/processor/StormProcessorSpec.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/processor/StormProcessorSpec.scala b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/processor/StormProcessorSpec.scala
index 2111df6..9bbac58 100644
--- a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/processor/StormProcessorSpec.scala
+++ b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/processor/StormProcessorSpec.scala
@@ -18,11 +18,12 @@
package org.apache.gearpump.experiments.storm.processor
+import java.time.Instant
+
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.experiments.storm.topology.GearpumpStormComponent.GearpumpBolt
import org.apache.gearpump.streaming.MockUtil
-import org.apache.gearpump.streaming.task.StartTime
import org.mockito.Mockito._
import org.scalatest.mock.MockitoSugar
import org.scalatest.{Matchers, WordSpec}
@@ -31,7 +32,7 @@ class StormProcessorSpec extends WordSpec with Matchers with MockitoSugar {
"StormProcessor" should {
"start GearpumpSpout onStart" in {
- val startTime = mock[StartTime]
+ val startTime = Instant.EPOCH
val gearpumpBolt = mock[GearpumpBolt]
when(gearpumpBolt.getTickFrequency).thenReturn(None)
val taskContext = MockUtil.mockTaskContext
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/producer/StormProducerSpec.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/producer/StormProducerSpec.scala b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/producer/StormProducerSpec.scala
index 39a008f..ee89a4a 100644
--- a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/producer/StormProducerSpec.scala
+++ b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/producer/StormProducerSpec.scala
@@ -18,12 +18,13 @@
package org.apache.gearpump.experiments.storm.producer
+import java.time.Instant
+
import akka.testkit.TestProbe
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.experiments.storm.topology.GearpumpStormComponent.GearpumpSpout
import org.apache.gearpump.streaming.MockUtil
-import org.apache.gearpump.streaming.task.StartTime
import org.mockito.Mockito._
import org.scalatest.mock.MockitoSugar
import org.scalatest.{Matchers, WordSpec}
@@ -32,7 +33,7 @@ class StormProducerSpec extends WordSpec with Matchers with MockitoSugar {
"StormProducer" should {
"start GearpumpSpout onStart" in {
- val startTime = mock[StartTime]
+ val startTime = Instant.EPOCH
val gearpumpSpout = mock[GearpumpSpout]
when(gearpumpSpout.getMessageTimeout).thenReturn(None)
val taskContext = MockUtil.mockTaskContext
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponentSpec.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponentSpec.scala b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponentSpec.scala
index bdea50c..0891070 100644
--- a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponentSpec.scala
+++ b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponentSpec.scala
@@ -17,6 +17,7 @@
*/
package org.apache.gearpump.experiments.storm.topology
+import java.time.Instant
import java.util.{Map => JMap}
import akka.actor.ActorRef
@@ -26,7 +27,7 @@ import backtype.storm.tuple.Tuple
import org.apache.gearpump.experiments.storm.producer.StormSpoutOutputCollector
import org.apache.gearpump.experiments.storm.topology.GearpumpStormComponent.{GearpumpBolt, GearpumpSpout}
import org.apache.gearpump.experiments.storm.util.StormOutputCollector
-import org.apache.gearpump.streaming.task.{StartTime, TaskContext, TaskId}
+import org.apache.gearpump.streaming.task.{TaskContext, TaskId}
import org.apache.gearpump.streaming.{DAG, MockUtil}
import org.apache.gearpump.{Message, TimeStamp}
import org.mockito.Matchers.{anyObject, eq => mockitoEq}
@@ -59,8 +60,7 @@ class GearpumpStormComponentSpec
getOutputCollector, ackEnabled = false, taskContext)
// Start
- val startTime = mock[StartTime]
- gearpumpSpout.start(startTime)
+ gearpumpSpout.start(Instant.EPOCH)
verify(spout).open(mockitoEq(config), mockitoEq(topologyContext),
anyObject[SpoutOutputCollector])
@@ -100,8 +100,7 @@ class GearpumpStormComponentSpec
getGeneralTopologyContext, getOutputCollector, getTickTuple, taskContext)
// Start
- val startTime = mock[StartTime]
- gearpumpBolt.start(startTime)
+ gearpumpBolt.start(Instant.EPOCH)
verify(bolt).prepare(mockitoEq(config), mockitoEq(topologyContext),
anyObject[OutputCollector])
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormTopologySpec.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormTopologySpec.scala b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormTopologySpec.scala
index ef383ad..b92b2e1 100644
--- a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormTopologySpec.scala
+++ b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormTopologySpec.scala
@@ -79,7 +79,6 @@ class GearpumpStormTopologySpec extends WordSpec with Matchers with MockitoSugar
"get target processors from source id" in {
val stormTopology = TopologyUtil.getTestTopology
implicit val system = MockUtil.system
- val sysConfig = new JHashMap[AnyRef, AnyRef]
val gearpumpStormTopology =
GearpumpStormTopology("app", stormTopology, null)
val targets0 = gearpumpStormTopology.getTargets("1")
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/AbstractKafkaSource.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/AbstractKafkaSource.scala b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/AbstractKafkaSource.scala
index da08b04..314eae8 100644
--- a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/AbstractKafkaSource.scala
+++ b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/AbstractKafkaSource.scala
@@ -18,6 +18,7 @@
package org.apache.gearpump.streaming.kafka.lib.source
+import java.time.Instant
import java.util.Properties
import com.twitter.bijection.Injection
@@ -87,11 +88,11 @@ abstract class AbstractKafkaSource(
this.checkpointStoreFactory = Some(checkpointStoreFactory)
}
- override def open(context: TaskContext, startTime: TimeStamp): Unit = {
+ override def open(context: TaskContext, startTime: Instant): Unit = {
import context.{parallelism, taskId}
LOG.info("KafkaSource opened at start time {}", startTime)
- this.startTime = startTime
+ this.startTime = startTime.toEpochMilli
val topicList = topic.split(",", -1).toList
val grouper = config.getConfiguredInstance(KafkaConfig.PARTITION_GROUPER_CLASS_CONFIG,
classOf[PartitionGrouper])
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/KafkaSourceSpec.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/KafkaSourceSpec.scala b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/KafkaSourceSpec.scala
index e40276f..6ccb231 100644
--- a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/KafkaSourceSpec.scala
+++ b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/KafkaSourceSpec.scala
@@ -18,6 +18,7 @@
package org.apache.gearpump.streaming.kafka
+import java.time.Instant
import java.util.Properties
import com.twitter.bijection.Injection
@@ -42,7 +43,7 @@ import org.scalatest.{Matchers, PropSpec}
class KafkaSourceSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
- val startTimeGen = Gen.choose[Long](0L, 100L)
+ val startTimeGen = Gen.choose[Long](0L, 100L).map(Instant.ofEpochMilli)
val offsetGen = Gen.choose[Long](0L, 100L)
val topicAndPartitionGen = for {
topic <- Gen.alphaStr suchThat (_.nonEmpty)
@@ -51,7 +52,7 @@ class KafkaSourceSpec extends PropSpec with PropertyChecks with Matchers with Mo
val tpsGen = Gen.listOf[TopicAndPartition](topicAndPartitionGen) suchThat (_.nonEmpty)
property("KafkaSource open should not recover without checkpoint") {
- forAll(startTimeGen, tpsGen) { (startTime: Long, tps: List[TopicAndPartition]) =>
+ forAll(startTimeGen, tpsGen) { (startTime: Instant, tps: List[TopicAndPartition]) =>
val taskContext = MockUtil.mockTaskContext
val fetchThread = mock[FetchThread]
val kafkaClient = mock[KafkaClient]
@@ -84,7 +85,7 @@ class KafkaSourceSpec extends PropSpec with PropertyChecks with Matchers with Mo
property("KafkaSource open should recover with checkpoint") {
forAll(startTimeGen, offsetGen, tpsGen) {
- (startTime: Long, offset: Long, tps: List[TopicAndPartition]) =>
+ (startTime: Instant, offset: Long, tps: List[TopicAndPartition]) =>
val taskContext = MockUtil.mockTaskContext
val checkpointStoreFactory = mock[CheckpointStoreFactory]
val checkpointStores = tps.map(_ -> mock[CheckpointStore]).toMap
@@ -115,7 +116,8 @@ class KafkaSourceSpec extends PropSpec with PropertyChecks with Matchers with Mo
checkpointStores.foreach { case (tp, store) =>
when(checkpointStoreFactory.getCheckpointStore(
KafkaConfig.getCheckpointStoreNameSuffix(tp))).thenReturn(store)
- when(store.recover(startTime)).thenReturn(Some(Injection[Long, Array[Byte]](offset)))
+ when(store.recover(startTime.toEpochMilli))
+ .thenReturn(Some(Injection[Long, Array[Byte]](offset)))
}
source.setCheckpointStore(checkpointStoreFactory)