You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2017/08/04 02:12:37 UTC
[1/2] incubator-gearpump git commit: [GEARPUMP-338] Improve time
related types and constants
Repository: incubator-gearpump
Updated Branches:
refs/heads/master b6f5ccd6e -> f96aca995
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
index 1fb61bd..fb2aaed 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
@@ -24,6 +24,8 @@ import java.util.concurrent.TimeUnit
import akka.actor._
import com.gs.collections.impl.map.mutable.primitive.IntShortHashMap
+import org.apache.gearpump.Message
+import org.apache.gearpump.Time.MilliSeconds
import org.apache.gearpump.streaming.source.Watermark
import org.slf4j.Logger
import org.apache.gearpump.cluster.UserConfig
@@ -35,7 +37,6 @@ import org.apache.gearpump.streaming.ExecutorToAppMaster._
import org.apache.gearpump.streaming.ProcessorId
import org.apache.gearpump.streaming.task.TaskActor._
import org.apache.gearpump.util.{LogUtil, TimeOutScheduler}
-import org.apache.gearpump.{Message, TimeStamp}
import scala.collection.JavaConverters._
import scala.concurrent.duration._
@@ -141,7 +142,7 @@ class TaskActor(
context.become(waitForStartTask(startClock))
}
- def waitForStartTask(startClock: TimeStamp): Receive = {
+ def waitForStartTask(startClock: MilliSeconds): Receive = {
case start@StartTask(tid) =>
assert(tid == this.taskId, s"$start sent to the wrong task ${this.taskId}")
onStartTask(startClock)
@@ -227,7 +228,7 @@ class TaskActor(
/**
* Returns min clock of upstream task
*/
- def getUpstreamMinClock: TimeStamp = upstreamWatermark.toEpochMilli
+ def getUpstreamMinClock: MilliSeconds = upstreamWatermark.toEpochMilli
def getProcessingWatermark: Instant = processingWatermark
@@ -265,7 +266,7 @@ class TaskActor(
count
}
- private def onStartTask(startClock: TimeStamp): Unit = {
+ private def onStartTask(startClock: MilliSeconds): Unit = {
LOG.info(s"received start, clock: $startClock, sessionId: $sessionId")
subscriptions = taskContextData.subscribers.map { subscriber =>
(subscriber.processorId,
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskControlMessage.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskControlMessage.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskControlMessage.scala
index c2e2faa..4ba9315 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskControlMessage.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskControlMessage.scala
@@ -18,7 +18,7 @@
package org.apache.gearpump.streaming.task
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.Time.MilliSeconds
import org.apache.gearpump.streaming.ProcessorId
/*
@@ -42,25 +42,25 @@ case class Ack(taskId: TaskId, seq: Short, actualReceivedNum: Short, sessionId:
sealed trait ClockEvent
-case class UpdateClock(taskId: TaskId, time: TimeStamp) extends ClockEvent
+case class UpdateClock(taskId: TaskId, time: MilliSeconds) extends ClockEvent
object GetLatestMinClock extends ClockEvent
case class GetUpstreamMinClock(taskId: TaskId) extends ClockEvent
-case class UpdateCheckpointClock(taskId: TaskId, clock: TimeStamp) extends ClockEvent
+case class UpdateCheckpointClock(taskId: TaskId, clock: MilliSeconds) extends ClockEvent
case object GetCheckpointClock extends ClockEvent
-case class CheckpointClock(clock: Option[TimeStamp])
+case class CheckpointClock(clock: Option[MilliSeconds])
-case class UpstreamMinClock(latestMinClock: Option[TimeStamp])
+case class UpstreamMinClock(latestMinClock: Option[MilliSeconds])
-case class LatestMinClock(clock: TimeStamp)
+case class LatestMinClock(clock: MilliSeconds)
case object GetStartClock
-case class StartClock(clock: TimeStamp)
+case class StartClock(clock: MilliSeconds)
case object EndingClock
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskWrapper.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskWrapper.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskWrapper.scala
index 82cae96..1e4430b 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskWrapper.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskWrapper.scala
@@ -23,10 +23,11 @@ import java.time.Instant
import scala.concurrent.duration.FiniteDuration
import akka.actor.Actor._
import akka.actor.{ActorRef, ActorSystem, Cancellable, Props}
-import org.slf4j.Logger
+import org.apache.gearpump.Message
+import org.apache.gearpump.Time.MilliSeconds
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.util.LogUtil
-import org.apache.gearpump.{Message, TimeStamp}
+import org.slf4j.Logger
/**
* This provides TaskContext for user defined tasks
@@ -107,7 +108,7 @@ class TaskWrapper(
task.map(_.receiveUnManagedMessage).getOrElse(defaultMessageHandler)
}
- override def upstreamMinClock: TimeStamp = {
+ override def upstreamMinClock: MilliSeconds = {
actor.getUpstreamMinClock
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/CheckpointStore.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/CheckpointStore.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/CheckpointStore.scala
index 1ef255e..8d026db 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/CheckpointStore.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/CheckpointStore.scala
@@ -18,7 +18,7 @@
package org.apache.gearpump.streaming.transaction.api
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.Time.MilliSeconds
/**
* CheckpointStore persistently stores mapping of timestamp to checkpoint
@@ -27,9 +27,9 @@ import org.apache.gearpump.TimeStamp
*/
trait CheckpointStore {
- def persist(timeStamp: TimeStamp, checkpoint: Array[Byte]): Unit
+ def persist(timeStamp: MilliSeconds, checkpoint: Array[Byte]): Unit
- def recover(timestamp: TimeStamp): Option[Array[Byte]]
+ def recover(timestamp: MilliSeconds): Option[Array[Byte]]
def close(): Unit
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/TimeStampFilter.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/TimeStampFilter.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/TimeStampFilter.scala
index 2ddca3a..856b1c5 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/TimeStampFilter.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/TimeStampFilter.scala
@@ -18,12 +18,13 @@
package org.apache.gearpump.streaming.transaction.api
-import org.apache.gearpump.{Message, TimeStamp}
+import org.apache.gearpump.Message
+import org.apache.gearpump.Time.MilliSeconds
/**
- * TimeStampFilter filters out messages that are obsolete.
+ * TimeStampFilter filters out messages that are obsolete.
*/
trait TimeStampFilter extends java.io.Serializable {
- def filter(msg: Message, predicate: TimeStamp): Option[Message]
+ def filter(msg: Message, predicate: MilliSeconds): Option[Message]
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala
index c223a53..d3bd51b 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala
@@ -41,7 +41,7 @@ import org.apache.gearpump.streaming.{DAG, LifeTime, ProcessorDescription, Proce
import org.apache.gearpump.transport.HostPort
import org.apache.gearpump.util.Graph
import org.apache.gearpump.util.Graph._
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.Time.MilliSeconds
import org.mockito.Mockito._
import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
@@ -238,7 +238,7 @@ class TaskManagerSpec extends FlatSpec with Matchers with BeforeAndAfterEach {
// Step11: Tell ClockService to update DAG.
clockService.expectMsgType[ChangeToNewDAG]
- clockService.reply(ChangeToNewDAGSuccess(Map.empty[ProcessorId, TimeStamp]))
+ clockService.reply(ChangeToNewDAGSuccess(Map.empty[ProcessorId, MilliSeconds]))
// Step12: start all tasks
import scala.concurrent.duration._
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/CheckpointManagerSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/CheckpointManagerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/CheckpointManagerSpec.scala
index 9e42e85..bf50fad 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/CheckpointManagerSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/CheckpointManagerSpec.scala
@@ -25,7 +25,7 @@ import org.scalatest.mock.MockitoSugar
import org.scalatest.prop.PropertyChecks
import org.scalatest.{Matchers, PropSpec}
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.Time.MilliSeconds
import org.apache.gearpump.streaming.transaction.api.CheckpointStore
class CheckpointManagerSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
@@ -34,7 +34,7 @@ class CheckpointManagerSpec extends PropSpec with PropertyChecks with Matchers w
val checkpointIntervalGen = Gen.chooseNum[Long](100L, 10000L)
property("CheckpointManager should recover from CheckpointStore") {
forAll(timestampGen, checkpointIntervalGen) {
- (timestamp: TimeStamp, checkpointInterval: Long) =>
+ (timestamp: MilliSeconds, checkpointInterval: Long) =>
val checkpointStore = mock[CheckpointStore]
val checkpointManager =
new CheckpointManager(checkpointInterval, checkpointStore)
@@ -47,7 +47,7 @@ class CheckpointManagerSpec extends PropSpec with PropertyChecks with Matchers w
property("CheckpointManager should write checkpoint to CheckpointStore") {
val checkpointGen = Gen.alphaStr.map(_.getBytes("UTF-8"))
forAll(timestampGen, checkpointIntervalGen, checkpointGen) {
- (timestamp: TimeStamp, checkpointInterval: Long, checkpoint: Array[Byte]) =>
+ (timestamp: MilliSeconds, checkpointInterval: Long, checkpoint: Array[Byte]) =>
val checkpointStore = mock[CheckpointStore]
val checkpointManager =
new CheckpointManager(checkpointInterval, checkpointStore)
@@ -70,7 +70,7 @@ class CheckpointManagerSpec extends PropSpec with PropertyChecks with Matchers w
property("CheckpointManager should update checkpoint time according to max message timestamp") {
forAll(timestampGen, checkpointIntervalGen) {
- (timestamp: TimeStamp, checkpointInterval: Long) =>
+ (timestamp: MilliSeconds, checkpointInterval: Long) =>
val checkpointStore = mock[CheckpointStore]
val checkpointManager =
new CheckpointManager(checkpointInterval, checkpointStore)
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/NonWindowStateSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/NonWindowStateSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/NonWindowStateSpec.scala
index 4cdff95..41b0624 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/NonWindowStateSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/NonWindowStateSpec.scala
@@ -26,7 +26,7 @@ import org.scalatest.mock.MockitoSugar
import org.scalatest.prop.PropertyChecks
import org.scalatest.{Matchers, PropSpec}
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.Time.MilliSeconds
import org.apache.gearpump.streaming.state.api.{Monoid, Serializer}
class NonWindowStateSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
@@ -35,7 +35,7 @@ class NonWindowStateSpec extends PropSpec with PropertyChecks with Matchers with
property("NonWindowState should recover checkpointed state at given timestamp") {
forAll(longGen) {
- (timestamp: TimeStamp) =>
+ (timestamp: MilliSeconds) =>
val monoid = mock[Monoid[AnyRef]]
val serializer = mock[Serializer[AnyRef]]
val bytes = Array.empty[Byte]
@@ -61,7 +61,7 @@ class NonWindowStateSpec extends PropSpec with PropertyChecks with Matchers with
property("NonWindowState checkpoints state") {
forAll(longGen) {
- (checkpointTime: TimeStamp) =>
+ (checkpointTime: MilliSeconds) =>
val monoid = mock[Monoid[AnyRef]]
val serializer = mock[Serializer[AnyRef]]
@@ -95,7 +95,7 @@ class NonWindowStateSpec extends PropSpec with PropertyChecks with Matchers with
property("NonWindowState updates state") {
forAll(longGen) {
- (checkpointTime: TimeStamp) =>
+ (checkpointTime: MilliSeconds) =>
val monoid = mock[Monoid[AnyRef]]
val serializer = mock[Serializer[AnyRef]]
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/WindowSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/WindowSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/WindowSpec.scala
index d9282ae..9975f49 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/WindowSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/WindowSpec.scala
@@ -23,7 +23,7 @@ import org.scalatest.mock.MockitoSugar
import org.scalatest.prop.PropertyChecks
import org.scalatest.{Matchers, PropSpec}
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.Time.MilliSeconds
class WindowSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
@@ -32,7 +32,7 @@ class WindowSpec extends PropSpec with PropertyChecks with Matchers with Mockito
val timestampGen = Gen.chooseNum[Long](0L, 1000L)
property("Window should only slide when time passes window end") {
forAll(timestampGen, windowSizeGen, windowStepGen) {
- (timestamp: TimeStamp, windowSize: Long, windowStep: Long) =>
+ (timestamp: MilliSeconds, windowSize: Long, windowStep: Long) =>
val window = new Window(windowSize, windowStep)
window.shouldSlide shouldBe false
window.update(timestamp)
@@ -42,7 +42,7 @@ class WindowSpec extends PropSpec with PropertyChecks with Matchers with Mockito
property("Window should slide by one or to given timestamp") {
forAll(timestampGen, windowSizeGen, windowStepGen) {
- (timestamp: TimeStamp, windowSize: Long, windowStep: Long) =>
+ (timestamp: MilliSeconds, windowSize: Long, windowStep: Long) =>
val window = new Window(windowSize, windowStep)
window.range shouldBe(0L, windowSize)
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/WindowStateSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/WindowStateSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/WindowStateSpec.scala
index 299a626..2b784bf 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/WindowStateSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/WindowStateSpec.scala
@@ -18,16 +18,15 @@
package org.apache.gearpump.streaming.state.impl
+import org.apache.gearpump.Time.MilliSeconds
+
import scala.collection.immutable.TreeMap
import scala.util.Success
-
import org.mockito.Mockito._
import org.scalacheck.Gen
import org.scalatest.mock.MockitoSugar
import org.scalatest.prop.PropertyChecks
import org.scalatest.{Matchers, PropSpec}
-
-import org.apache.gearpump._
import org.apache.gearpump.streaming.MockUtil
import org.apache.gearpump.streaming.state.api.{Group, Serializer}
@@ -74,7 +73,7 @@ class WindowStateSpec extends PropSpec with PropertyChecks with Matchers with Mo
}
property("WindowState checkpoints") {
- forAll(longGen) { (checkpointTime: TimeStamp) =>
+ forAll(longGen) { (checkpointTime: MilliSeconds) =>
val window = mock[Window]
val taskContext = MockUtil.mockTaskContext
val group = mock[Group[AnyRef]]
@@ -120,7 +119,7 @@ class WindowStateSpec extends PropSpec with PropertyChecks with Matchers with Mo
}
property("WindowState updates state") {
- forAll(longGen) { (checkpointTime: TimeStamp) =>
+ forAll(longGen) { (checkpointTime: MilliSeconds) =>
val window = mock[Window]
val taskContext = MockUtil.mockTaskContext
val group = mock[Group[AnyRef]]
@@ -205,7 +204,7 @@ class WindowStateSpec extends PropSpec with PropertyChecks with Matchers with Mo
property("WindowState gets interval for timestamp") {
forAll(longGen, longGen, longGen, longGen) {
- (timestamp: TimeStamp, checkpointTime: TimeStamp, windowSize: Long, windowStep: Long) =>
+ (timestamp: MilliSeconds, checkpointTime: MilliSeconds, windowSize: Long, windowStep: Long) =>
val windowManager = new Window(windowSize, windowStep)
val taskContext = MockUtil.mockTaskContext
val group = mock[Group[AnyRef]]
@@ -225,8 +224,8 @@ class WindowStateSpec extends PropSpec with PropertyChecks with Matchers with Mo
interval.endTime shouldBe nextInterval.startTime
}
- def intervalSpec(interval: Interval, timestamp: TimeStamp,
- checkpointTime: TimeStamp, windowSize: Long, windowStep: Long): Unit = {
+ def intervalSpec(interval: Interval, timestamp: MilliSeconds,
+ checkpointTime: MilliSeconds, windowSize: Long, windowStep: Long): Unit = {
interval.startTime should be <= interval.endTime
timestamp / windowStep * windowStep should (be <= interval.startTime)
(timestamp - windowSize) / windowStep * windowStep should (be <= interval.startTime)
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala
index 285bf44..b05befa 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala
@@ -24,7 +24,7 @@ import java.util.Random
import org.mockito.Mockito._
import org.scalatest.mock.MockitoSugar
import org.scalatest.{FlatSpec, Matchers}
-import org.apache.gearpump.{MIN_TIME_MILLIS, Message}
+import org.apache.gearpump.{Message, Time}
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.partitioner.{HashPartitioner, Partitioner}
import org.apache.gearpump.streaming.source.Watermark
@@ -73,14 +73,14 @@ class SubscriptionSpec extends FlatSpec with Matchers with MockitoSugar {
subscription.sendMessage(msg1)
verify(sender, times(1)).transport(msg1, TaskId(1, 1))
- assert(subscription.watermark == MIN_TIME_MILLIS)
+ assert(subscription.watermark == Time.MIN_TIME_MILLIS)
val msg2 = Message("0", timestamp = Instant.ofEpochMilli(50))
when(sender.getProcessingWatermark).thenReturn(msg2.timestamp)
subscription.sendMessage(msg2)
verify(sender, times(1)).transport(msg2, TaskId(1, 0))
- assert(subscription.watermark == MIN_TIME_MILLIS)
+ assert(subscription.watermark == Time.MIN_TIME_MILLIS)
val initialMinClock = subscription.watermark
[2/2] incubator-gearpump git commit: [GEARPUMP-338] Improve time
related types and constants
Posted by ma...@apache.org.
[GEARPUMP-338] Improve time related types and constants
Author: manuzhang <ow...@gmail.com>
Closes #208 from manuzhang/fix_time.
Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/f96aca99
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/f96aca99
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/f96aca99
Branch: refs/heads/master
Commit: f96aca995d6acd0770529a39fb25a4c4ef18c31e
Parents: b6f5ccd
Author: manuzhang <ow...@gmail.com>
Authored: Fri Aug 4 10:11:58 2017 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Fri Aug 4 10:12:12 2017 +0800
----------------------------------------------------------------------
.../scala/org/apache/gearpump/Message.scala | 6 ++-
.../main/scala/org/apache/gearpump/Time.scala | 34 +++++++++++++
.../gearpump/cluster/ClusterMessage.scala | 20 ++++----
.../appmaster/ApplicationRuntimeInfo.scala | 12 ++---
.../gearpump/cluster/master/AppManager.scala | 4 +-
.../gearpump/cluster/scheduler/Scheduler.scala | 4 +-
.../scala/org/apache/gearpump/package.scala | 29 -----------
.../gearpump/util/HistoryMetricsService.scala | 4 +-
.../gearpump/util/RestartPolicySpec.scala | 2 -
.../producer/StormSpoutOutputCollector.scala | 8 +--
.../storm/topology/GearpumpStormComponent.scala | 5 +-
.../storm/topology/GearpumpTuple.scala | 6 +--
.../storm/util/StormOutputCollector.scala | 9 ++--
.../storm/topology/GearpumpTupleSpec.scala | 4 +-
external/hadoopfs/README.md | 2 +-
.../hadoop/HadoopCheckpointStore.scala | 6 +--
.../lib/HadoopCheckpointStoreReader.scala | 8 +--
.../lib/HadoopCheckpointStoreWriter.scala | 4 +-
.../lib/rotation/FileSizeRotationSpec.scala | 4 +-
.../kafka/lib/source/AbstractKafkaSource.scala | 5 +-
.../streaming/kafka/lib/store/KafkaStore.scala | 10 ++--
.../streaming/kafka/KafkaStoreSpec.scala | 16 +++---
.../gearpump/streaming/ClusterMessage.scala | 4 +-
.../gearpump/streaming/StreamApplication.scala | 12 ++---
.../streaming/appmaster/AppMaster.scala | 6 +--
.../streaming/appmaster/ClockService.scala | 53 ++++++++++----------
.../streaming/appmaster/JarScheduler.scala | 6 +--
.../appmaster/StreamAppMasterSummary.scala | 8 +--
.../streaming/appmaster/TaskManager.scala | 6 +--
.../dsl/window/api/WindowFunction.scala | 9 ++--
.../streaming/dsl/window/impl/Window.scala | 4 +-
.../streaming/metrics/ProcessorAggregator.scala | 12 ++---
.../gearpump/streaming/source/Watermark.scala | 11 ++--
.../streaming/state/api/MonoidState.scala | 6 +--
.../streaming/state/api/PersistentState.scala | 8 +--
.../streaming/state/api/PersistentTask.scala | 5 +-
.../state/impl/CheckpointManager.scala | 14 +++---
.../state/impl/InMemoryCheckpointStore.scala | 10 ++--
.../streaming/state/impl/NonWindowState.scala | 6 +--
.../gearpump/streaming/state/impl/Window.scala | 10 ++--
.../streaming/state/impl/WindowState.scala | 16 +++---
.../streaming/task/SerializedMessage.scala | 4 +-
.../gearpump/streaming/task/Subscription.scala | 13 ++---
.../apache/gearpump/streaming/task/Task.scala | 5 +-
.../gearpump/streaming/task/TaskActor.scala | 9 ++--
.../streaming/task/TaskControlMessage.scala | 14 +++---
.../gearpump/streaming/task/TaskWrapper.scala | 7 +--
.../transaction/api/CheckpointStore.scala | 6 +--
.../transaction/api/TimeStampFilter.scala | 7 +--
.../streaming/appmaster/TaskManagerSpec.scala | 4 +-
.../state/impl/CheckpointManagerSpec.scala | 8 +--
.../state/impl/NonWindowStateSpec.scala | 8 +--
.../streaming/state/impl/WindowSpec.scala | 6 +--
.../streaming/state/impl/WindowStateSpec.scala | 15 +++---
.../streaming/task/SubscriptionSpec.scala | 6 +--
55 files changed, 270 insertions(+), 250 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/core/src/main/scala/org/apache/gearpump/Message.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/Message.scala b/core/src/main/scala/org/apache/gearpump/Message.scala
index 4dc5c09..7051396 100644
--- a/core/src/main/scala/org/apache/gearpump/Message.scala
+++ b/core/src/main/scala/org/apache/gearpump/Message.scala
@@ -20,6 +20,8 @@ package org.apache.gearpump
import java.time.Instant
+import org.apache.gearpump.Time.MilliSeconds
+
trait Message {
val value: Any
@@ -35,7 +37,7 @@ trait Message {
*
* @param value Accept any type except Null, Nothing and Unit
*/
-case class DefaultMessage(value: Any, timeInMillis: TimeStamp) extends Message {
+case class DefaultMessage(value: Any, timeInMillis: MilliSeconds) extends Message {
/**
* @param value Accept any type except Null, Nothing and Unit
@@ -74,7 +76,7 @@ object Message {
* @param value Accept any type except Null, Nothing and Unit
* @param timestamp timestamp must be smaller than Long.MaxValue
*/
- def apply(value: Any, timestamp: TimeStamp): Message = {
+ def apply(value: Any, timestamp: MilliSeconds): Message = {
DefaultMessage(value, timestamp)
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/core/src/main/scala/org/apache/gearpump/Time.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/Time.scala b/core/src/main/scala/org/apache/gearpump/Time.scala
new file mode 100644
index 0000000..054becf
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/Time.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump
+
+/**
+ * Types and constants of time in gearpump
+ */
+object Time {
+ type MilliSeconds = Long
+
+ // maximum valid time that won't overflow when being converted to milli-seconds
+ // Long.MaxValue is reserved for unreachable time
+ val MAX_TIME_MILLIS: Long = Long.MaxValue - 1
+
+ // minimum valid time that won't overflow when being converted to milli-seconds
+ val MIN_TIME_MILLIS: Long = Long.MinValue
+
+ val UNREACHABLE: Long = Long.MaxValue
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/core/src/main/scala/org/apache/gearpump/cluster/ClusterMessage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/ClusterMessage.scala b/core/src/main/scala/org/apache/gearpump/cluster/ClusterMessage.scala
index e8956ac..8a067b5 100644
--- a/core/src/main/scala/org/apache/gearpump/cluster/ClusterMessage.scala
+++ b/core/src/main/scala/org/apache/gearpump/cluster/ClusterMessage.scala
@@ -23,7 +23,7 @@ import org.apache.gearpump.cluster.worker.{WorkerId, WorkerSummary}
import scala.util.Try
import akka.actor.ActorRef
import com.typesafe.config.Config
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.Time.MilliSeconds
import org.apache.gearpump.cluster.appmaster.WorkerInfo
import org.apache.gearpump.cluster.master.MasterSummary
import org.apache.gearpump.cluster.scheduler.{Resource, ResourceAllocation, ResourceRequest}
@@ -142,7 +142,7 @@ object MasterToClient {
case class MasterConfig(config: Config)
- case class HistoryMetricsItem(time: TimeStamp, value: MetricType)
+ case class HistoryMetricsItem(time: MilliSeconds, value: MetricType)
/**
* History metrics returned from master, worker, or app master.
@@ -157,7 +157,7 @@ object MasterToClient {
case class HistoryMetrics(path: String, metrics: List[HistoryMetricsItem])
/** Return the last error of this streaming application job */
- case class LastFailure(time: TimeStamp, error: String)
+ case class LastFailure(time: MilliSeconds, error: String)
sealed trait ApplicationResult
@@ -208,8 +208,8 @@ object AppMasterToMaster {
def appName: String
def actorPath: String
def status: ApplicationStatus
- def startTime: TimeStamp
- def uptime: TimeStamp
+ def startTime: MilliSeconds
+ def uptime: MilliSeconds
def user: String
}
@@ -220,8 +220,8 @@ object AppMasterToMaster {
appName: String = null,
actorPath: String = null,
status: ApplicationStatus = ApplicationStatus.ACTIVE,
- startTime: TimeStamp = 0L,
- uptime: TimeStamp = 0L,
+ startTime: MilliSeconds = 0L,
+ uptime: MilliSeconds = 0L,
user: String = null)
extends AppMasterSummary
@@ -244,7 +244,7 @@ object AppMasterToMaster {
* Denotes the application state change of an app.
*/
case class ApplicationStatusChanged(appId: Int, newStatus: ApplicationStatus,
- timeStamp: TimeStamp, error: Throwable)
+ timeStamp: MilliSeconds, error: Throwable)
}
object MasterToAppMaster {
@@ -263,8 +263,8 @@ object MasterToAppMaster {
sealed trait StreamingType
case class AppMasterData(status: ApplicationStatus, appId: Int = 0, appName: String = null,
- appMasterPath: String = null, workerPath: String = null, submissionTime: TimeStamp = 0,
- startTime: TimeStamp = 0, finishTime: TimeStamp = 0, user: String = null)
+ appMasterPath: String = null, workerPath: String = null, submissionTime: MilliSeconds = 0,
+ startTime: MilliSeconds = 0, finishTime: MilliSeconds = 0, user: String = null)
case class AppMasterDataRequest(appId: Int, detail: Boolean = false)
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/core/src/main/scala/org/apache/gearpump/cluster/appmaster/ApplicationRuntimeInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/appmaster/ApplicationRuntimeInfo.scala b/core/src/main/scala/org/apache/gearpump/cluster/appmaster/ApplicationRuntimeInfo.scala
index d9b73e2..1054628 100644
--- a/core/src/main/scala/org/apache/gearpump/cluster/appmaster/ApplicationRuntimeInfo.scala
+++ b/core/src/main/scala/org/apache/gearpump/cluster/appmaster/ApplicationRuntimeInfo.scala
@@ -20,7 +20,7 @@ package org.apache.gearpump.cluster.appmaster
import akka.actor.ActorRef
import com.typesafe.config.{Config, ConfigFactory}
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.Time.MilliSeconds
import org.apache.gearpump.cluster.{ApplicationStatus, ApplicationTerminalStatus}
/** Run time info of Application */
@@ -31,9 +31,9 @@ case class ApplicationRuntimeInfo(
appMaster: ActorRef = ActorRef.noSender,
worker: ActorRef = ActorRef.noSender,
user: String = "",
- submissionTime: TimeStamp = 0,
- startTime: TimeStamp = 0,
- finishTime: TimeStamp = 0,
+ submissionTime: MilliSeconds = 0,
+ startTime: MilliSeconds = 0,
+ finishTime: MilliSeconds = 0,
config: Config = ConfigFactory.empty(),
status: ApplicationStatus = ApplicationStatus.NONEXIST) {
@@ -41,11 +41,11 @@ case class ApplicationRuntimeInfo(
this.copy(appMaster = appMaster, worker = worker)
}
- def onAppMasterActivated(timeStamp: TimeStamp): ApplicationRuntimeInfo = {
+ def onAppMasterActivated(timeStamp: MilliSeconds): ApplicationRuntimeInfo = {
this.copy(startTime = timeStamp, status = ApplicationStatus.ACTIVE)
}
- def onFinalStatus(timeStamp: TimeStamp, finalStatus: ApplicationTerminalStatus):
+ def onFinalStatus(timeStamp: MilliSeconds, finalStatus: ApplicationTerminalStatus):
ApplicationRuntimeInfo = {
this.copy(finishTime = timeStamp, status = finalStatus)
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/core/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala b/core/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala
index b00cc17..450d512 100644
--- a/core/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala
+++ b/core/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala
@@ -21,7 +21,7 @@ package org.apache.gearpump.cluster.master
import akka.actor._
import akka.pattern.ask
import com.typesafe.config.ConfigFactory
-import org.apache.gearpump._
+import org.apache.gearpump.Time.MilliSeconds
import org.apache.gearpump.cluster.AppMasterToMaster.{AppDataSaved, SaveAppDataFailed, _}
import org.apache.gearpump.cluster.AppMasterToWorker._
import org.apache.gearpump.cluster.{ApplicationStatus, ApplicationTerminalStatus}
@@ -227,7 +227,7 @@ private[cluster] class AppManager(kvService: ActorRef, launcher: AppMasterLaunch
}
private def onApplicationStatusChanged(appId: Int, newStatus: ApplicationStatus,
- timeStamp: TimeStamp, error: Throwable): Unit = {
+ timeStamp: MilliSeconds, error: Throwable): Unit = {
applicationRegistry.get(appId) match {
case Some(appRuntimeInfo) =>
if (appRuntimeInfo.status.canTransitTo(newStatus)) {
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/core/src/main/scala/org/apache/gearpump/cluster/scheduler/Scheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/scheduler/Scheduler.scala b/core/src/main/scala/org/apache/gearpump/cluster/scheduler/Scheduler.scala
index ec9f1ba..1329127 100644
--- a/core/src/main/scala/org/apache/gearpump/cluster/scheduler/Scheduler.scala
+++ b/core/src/main/scala/org/apache/gearpump/cluster/scheduler/Scheduler.scala
@@ -18,7 +18,7 @@
package org.apache.gearpump.cluster.scheduler
import akka.actor.{Actor, ActorRef}
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.Time.MilliSeconds
import org.apache.gearpump.cluster.MasterToWorker.{UpdateResourceFailed, UpdateResourceSucceed, WorkerRegistered}
import org.apache.gearpump.cluster.WorkerToMaster.ResourceUpdate
import org.apache.gearpump.cluster.master.Master.WorkerTerminated
@@ -71,7 +71,7 @@ abstract class Scheduler extends Actor {
object Scheduler {
case class PendingRequest(
- appId: Int, appMaster: ActorRef, request: ResourceRequest, timeStamp: TimeStamp)
+ appId: Int, appMaster: ActorRef, request: ResourceRequest, timeStamp: MilliSeconds)
case class ApplicationFinished(appId: Int)
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/core/src/main/scala/org/apache/gearpump/package.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/package.scala b/core/src/main/scala/org/apache/gearpump/package.scala
deleted file mode 100644
index 6e20277..0000000
--- a/core/src/main/scala/org/apache/gearpump/package.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache
-
-package object gearpump {
- type TimeStamp = Long
-
- // maximum time won't overflow when converted to milli-seconds
- val MAX_TIME_MILLIS: Long = Long.MaxValue - 1
-
- // minimum time won't overflow when converted to milli-seconds
- val MIN_TIME_MILLIS: Long = Long.MinValue
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/core/src/main/scala/org/apache/gearpump/util/HistoryMetricsService.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/util/HistoryMetricsService.scala b/core/src/main/scala/org/apache/gearpump/util/HistoryMetricsService.scala
index ee59678..d45d761 100644
--- a/core/src/main/scala/org/apache/gearpump/util/HistoryMetricsService.scala
+++ b/core/src/main/scala/org/apache/gearpump/util/HistoryMetricsService.scala
@@ -25,7 +25,7 @@ import akka.actor.Actor
import com.typesafe.config.Config
import org.slf4j.Logger
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.Time.MilliSeconds
import org.apache.gearpump.cluster.ClientToMaster.{QueryHistoryMetrics, ReadOption}
import org.apache.gearpump.cluster.MasterToClient.{HistoryMetrics, HistoryMetricsItem}
import org.apache.gearpump.metrics.Metrics._
@@ -217,7 +217,7 @@ object HistoryMetricsService {
add(inputMetrics, System.currentTimeMillis())
}
- def add(inputMetrics: MetricType, now: TimeStamp): Unit = {
+ def add(inputMetrics: MetricType, now: MilliSeconds): Unit = {
val metrics = HistoryMetricsItem(now, inputMetrics)
latest = List(metrics)
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/core/src/test/scala/org/apache/gearpump/util/RestartPolicySpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/gearpump/util/RestartPolicySpec.scala b/core/src/test/scala/org/apache/gearpump/util/RestartPolicySpec.scala
index 5d0c66d..2dcae2f 100644
--- a/core/src/test/scala/org/apache/gearpump/util/RestartPolicySpec.scala
+++ b/core/src/test/scala/org/apache/gearpump/util/RestartPolicySpec.scala
@@ -20,8 +20,6 @@ package org.apache.gearpump.util
import org.scalatest.{FlatSpec, Matchers}
-import scala.concurrent.duration._
-
class RestartPolicySpec extends FlatSpec with Matchers {
"RestartPolicy" should "forbid too many restarts" in {
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/producer/StormSpoutOutputCollector.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/producer/StormSpoutOutputCollector.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/producer/StormSpoutOutputCollector.scala
index 5794b1d..9b9bea7 100644
--- a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/producer/StormSpoutOutputCollector.scala
+++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/producer/StormSpoutOutputCollector.scala
@@ -21,10 +21,10 @@ package org.apache.gearpump.experiments.storm.producer
import java.util.{List => JList}
import backtype.storm.spout.{ISpout, ISpoutOutputCollector}
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.Time.MilliSeconds
import org.apache.gearpump.experiments.storm.util.StormOutputCollector
-case class PendingMessage(id: Object, messageTime: TimeStamp, startTime: TimeStamp)
+case class PendingMessage(id: Object, messageTime: MilliSeconds, startTime: MilliSeconds)
/**
* this is used by Storm Spout to emit messages
@@ -57,7 +57,7 @@ private[storm] class StormSpoutOutputCollector(
setPendingOrAck(messageId, curTime, curTime)
}
- def ackPendingMessage(checkpointClock: TimeStamp): Unit = {
+ def ackPendingMessage(checkpointClock: MilliSeconds): Unit = {
this.checkpointClock = checkpointClock
nextPendingMessage.foreach { case PendingMessage(_, messageTime, _) =>
if (messageTime <= this.checkpointClock) {
@@ -83,7 +83,7 @@ private[storm] class StormSpoutOutputCollector(
nextPendingMessage = None
}
- private def setPendingOrAck(messageId: Object, startTime: TimeStamp, messageTime: TimeStamp)
+ private def setPendingOrAck(messageId: Object, startTime: MilliSeconds, messageTime: MilliSeconds)
: Unit = {
if (ackEnabled) {
val newPendingMessage = PendingMessage(messageId, messageTime, startTime)
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponent.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponent.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponent.scala
index 248ca44..6aa5dc9 100644
--- a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponent.scala
+++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponent.scala
@@ -43,7 +43,8 @@ import org.apache.gearpump.experiments.storm.util.{StormOutputCollector, StormUt
import org.apache.gearpump.streaming.DAG
import org.apache.gearpump.streaming.task.{GetDAG, TaskContext, TaskId}
import org.apache.gearpump.util.{Constants, LogUtil}
-import org.apache.gearpump.{Message, TimeStamp}
+import org.apache.gearpump.Message
+import org.apache.gearpump.Time.MilliSeconds
import org.slf4j.Logger
import scala.collection.JavaConverters._
@@ -149,7 +150,7 @@ object GearpumpStormComponent {
}
}
- def checkpoint(clock: TimeStamp): Unit = {
+ def checkpoint(clock: MilliSeconds): Unit = {
collector.ackPendingMessage(clock)
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpTuple.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpTuple.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpTuple.scala
index eb61acb..9f2fa1f 100644
--- a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpTuple.scala
+++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpTuple.scala
@@ -23,7 +23,7 @@ import java.util.{List => JList}
import backtype.storm.task.GeneralTopologyContext
import backtype.storm.tuple.{Tuple, TupleImpl}
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.Time.MilliSeconds
/**
* this carries Storm tuple values in the Gearpump world
@@ -42,7 +42,7 @@ private[storm] class GearpumpTuple(
* @param topologyContext topology context used for all tasks
* @return a Tuple
*/
- def toTuple(topologyContext: GeneralTopologyContext, timestamp: TimeStamp): Tuple = {
+ def toTuple(topologyContext: GeneralTopologyContext, timestamp: MilliSeconds): Tuple = {
TimedTuple(topologyContext, values, sourceTaskId, sourceStreamId, timestamp)
}
@@ -64,6 +64,6 @@ private[storm] class GearpumpTuple(
}
case class TimedTuple(topologyContext: GeneralTopologyContext, tuple: JList[AnyRef],
- sourceTaskId: Integer, sourceStreamId: String, timestamp: TimeStamp)
+ sourceTaskId: Integer, sourceStreamId: String, timestamp: MilliSeconds)
extends TupleImpl(topologyContext, tuple, sourceTaskId, sourceStreamId, null)
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollector.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollector.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollector.scala
index fd023a9..a95725e 100644
--- a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollector.scala
+++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollector.scala
@@ -28,7 +28,8 @@ import backtype.storm.task.TopologyContext
import backtype.storm.tuple.Fields
import backtype.storm.utils.Utils
import org.slf4j.Logger
-import org.apache.gearpump.{MIN_TIME_MILLIS, Message, TimeStamp}
+import org.apache.gearpump.{Message, Time}
+import org.apache.gearpump.Time.MilliSeconds
import org.apache.gearpump.experiments.storm.topology.GearpumpTuple
import org.apache.gearpump.experiments.storm.util.StormUtil._
import org.apache.gearpump.streaming.ProcessorId
@@ -56,7 +57,7 @@ object StormOutputCollector {
streamGroupers, componentToProcessorId, values)
}
new StormOutputCollector(stormTaskId, taskToComponent, targets, getTargetPartitionsFn,
- taskContext, MIN_TIME_MILLIS)
+ taskContext, Time.MIN_TIME_MILLIS)
}
/**
@@ -164,7 +165,7 @@ class StormOutputCollector(
targets: JMap[String, JMap[String, Grouping]],
getTargetPartitionsFn: (String, JList[AnyRef]) => (Map[String, Array[Int]], JList[Integer]),
val taskContext: TaskContext,
- private var timestamp: TimeStamp) {
+ private var timestamp: MilliSeconds) {
import org.apache.gearpump.experiments.storm.util.StormOutputCollector._
/**
@@ -213,7 +214,7 @@ class StormOutputCollector(
/**
* set timestamp from each incoming Message if not attached.
*/
- def setTimestamp(timestamp: TimeStamp): Unit = {
+ def setTimestamp(timestamp: MilliSeconds): Unit = {
this.timestamp = timestamp
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpTupleSpec.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpTupleSpec.scala b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpTupleSpec.scala
index f12e54f..dacbdfd 100644
--- a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpTupleSpec.scala
+++ b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpTupleSpec.scala
@@ -21,7 +21,7 @@ import java.util.{List => JList}
import backtype.storm.task.GeneralTopologyContext
import backtype.storm.tuple.Fields
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.Time.MilliSeconds
import org.mockito.Mockito._
import org.scalacheck.Gen
import org.scalatest.mock.MockitoSugar
@@ -40,7 +40,7 @@ class GearpumpTupleSpec extends PropSpec with PropertyChecks with Matchers with
} yield new GearpumpTuple(values, new Integer(sourceTaskId), sourceStreamId, null)
forAll(tupleGen, Gen.alphaStr, Gen.chooseNum[Long](0, Long.MaxValue)) {
- (gearpumpTuple: GearpumpTuple, componentId: String, timestamp: TimeStamp) =>
+ (gearpumpTuple: GearpumpTuple, componentId: String, timestamp: MilliSeconds) =>
val topologyContext = mock[GeneralTopologyContext]
val fields = new Fields(gearpumpTuple.values.asScala.map(_.asInstanceOf[String]): _*)
when(topologyContext.getComponentId(gearpumpTuple.sourceTaskId)).thenReturn(componentId)
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/external/hadoopfs/README.md
----------------------------------------------------------------------
diff --git a/external/hadoopfs/README.md b/external/hadoopfs/README.md
index 7a9aeef..b02c378 100644
--- a/external/hadoopfs/README.md
+++ b/external/hadoopfs/README.md
@@ -7,7 +7,7 @@ Gearpump components for interacting with HDFS file systems.
1. File Rotation interface
```scala
trait Rotation extends Serializable {
- def mark(timestamp: TimeStamp, offset: Long): Unit
+ def mark(timestamp: MilliSeconds, offset: Long): Unit
def shouldRotate: Boolean
def rotate: Unit
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/HadoopCheckpointStore.scala
----------------------------------------------------------------------
diff --git a/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/HadoopCheckpointStore.scala b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/HadoopCheckpointStore.scala
index e26a2ee..5f3ca74 100644
--- a/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/HadoopCheckpointStore.scala
+++ b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/HadoopCheckpointStore.scala
@@ -23,7 +23,7 @@ import java.time.Instant
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.slf4j.Logger
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.Time.MilliSeconds
import org.apache.gearpump.streaming.hadoop.lib.rotation.Rotation
import org.apache.gearpump.streaming.hadoop.lib.{HadoopCheckpointStoreReader, HadoopCheckpointStoreWriter}
import org.apache.gearpump.streaming.transaction.api.CheckpointStore
@@ -72,7 +72,7 @@ class HadoopCheckpointStore(
* b. closes current writer and reset
* c. rotation rotates
*/
- override def persist(timestamp: TimeStamp, checkpoint: Array[Byte]): Unit = {
+ override def persist(timestamp: MilliSeconds, checkpoint: Array[Byte]): Unit = {
curTime = timestamp
if (curWriter.isEmpty) {
curStartTime = curTime
@@ -110,7 +110,7 @@ class HadoopCheckpointStore(
* 5. looks for the checkpoint in the found store
* }}}
*/
- override def recover(timestamp: TimeStamp): Option[Array[Byte]] = {
+ override def recover(timestamp: MilliSeconds): Option[Array[Byte]] = {
var checkpoint: Option[Array[Byte]] = None
if (fs.exists(dir)) {
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreReader.scala
----------------------------------------------------------------------
diff --git a/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreReader.scala b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreReader.scala
index 082e963..cce4b5d 100644
--- a/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreReader.scala
+++ b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreReader.scala
@@ -23,15 +23,15 @@ import java.io.EOFException
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.Time.MilliSeconds
class HadoopCheckpointStoreReader(
path: Path,
hadoopConfig: Configuration)
- extends Iterator[(TimeStamp, Array[Byte])] {
+ extends Iterator[(MilliSeconds, Array[Byte])] {
private val stream = HadoopUtil.getInputStream(path, hadoopConfig)
- private var nextTimeStamp: Option[TimeStamp] = None
+ private var nextTimeStamp: Option[MilliSeconds] = None
private var nextData: Option[Array[Byte]] = None
override def hasNext: Boolean = {
@@ -56,7 +56,7 @@ class HadoopCheckpointStoreReader(
}
}
- override def next(): (TimeStamp, Array[Byte]) = {
+ override def next(): (MilliSeconds, Array[Byte]) = {
val timeAndData = for {
time <- nextTimeStamp
data <- nextData
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreWriter.scala
----------------------------------------------------------------------
diff --git a/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreWriter.scala b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreWriter.scala
index 11c12c4..ce7154a 100644
--- a/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreWriter.scala
+++ b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreWriter.scala
@@ -21,12 +21,12 @@ package org.apache.gearpump.streaming.hadoop.lib
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.Time.MilliSeconds
class HadoopCheckpointStoreWriter(path: Path, hadoopConfig: Configuration) {
private lazy val stream = HadoopUtil.getOutputStream(path, hadoopConfig)
- def write(timestamp: TimeStamp, data: Array[Byte]): Long = {
+ def write(timestamp: MilliSeconds, data: Array[Byte]): Long = {
stream.writeLong(timestamp)
stream.writeInt(data.length)
stream.write(data)
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/external/hadoopfs/src/test/scala/org/apache/gearpump/streaming/hadoop/lib/rotation/FileSizeRotationSpec.scala
----------------------------------------------------------------------
diff --git a/external/hadoopfs/src/test/scala/org/apache/gearpump/streaming/hadoop/lib/rotation/FileSizeRotationSpec.scala b/external/hadoopfs/src/test/scala/org/apache/gearpump/streaming/hadoop/lib/rotation/FileSizeRotationSpec.scala
index a469956..8d0170e 100644
--- a/external/hadoopfs/src/test/scala/org/apache/gearpump/streaming/hadoop/lib/rotation/FileSizeRotationSpec.scala
+++ b/external/hadoopfs/src/test/scala/org/apache/gearpump/streaming/hadoop/lib/rotation/FileSizeRotationSpec.scala
@@ -23,7 +23,7 @@ import java.time.Instant
import org.scalacheck.Gen
import org.scalatest.prop.PropertyChecks
import org.scalatest.{Matchers, PropSpec}
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.Time.MilliSeconds
class FileSizeRotationSpec extends PropSpec with PropertyChecks with Matchers {
@@ -31,7 +31,7 @@ class FileSizeRotationSpec extends PropSpec with PropertyChecks with Matchers {
val fileSizeGen = Gen.chooseNum[Long](1, Long.MaxValue)
property("FileSize rotation rotates on file size") {
- forAll(timestampGen, fileSizeGen) { (timestamp: TimeStamp, fileSize: Long) =>
+ forAll(timestampGen, fileSizeGen) { (timestamp: MilliSeconds, fileSize: Long) =>
val rotation = new FileSizeRotation(fileSize)
rotation.shouldRotate shouldBe false
rotation.mark(Instant.ofEpochMilli(timestamp), rotation.maxBytes / 2)
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/AbstractKafkaSource.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/AbstractKafkaSource.scala b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/AbstractKafkaSource.scala
index d5a8729..6633bf4 100644
--- a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/AbstractKafkaSource.scala
+++ b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/AbstractKafkaSource.scala
@@ -36,7 +36,8 @@ import org.apache.gearpump.streaming.kafka.util.KafkaConfig.KafkaConfigFactory
import org.apache.gearpump.streaming.task.TaskContext
import org.apache.gearpump.streaming.transaction.api._
import org.apache.gearpump.util.LogUtil
-import org.apache.gearpump.{Message, TimeStamp}
+import org.apache.gearpump.Message
+import org.apache.gearpump.Time.MilliSeconds
import org.slf4j.Logger
object AbstractKafkaSource {
@@ -147,7 +148,7 @@ abstract class AbstractKafkaSource(
}
}
- private def maybeRecover(startTime: TimeStamp): Unit = {
+ private def maybeRecover(startTime: MilliSeconds): Unit = {
checkpointStores.foreach { case (tp, store) =>
for {
bytes <- store.recover(startTime)
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/store/KafkaStore.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/store/KafkaStore.scala b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/store/KafkaStore.scala
index e2450f4..dbbd0ea 100644
--- a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/store/KafkaStore.scala
+++ b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/store/KafkaStore.scala
@@ -22,7 +22,7 @@ import java.util.Properties
import com.twitter.bijection.Injection
import kafka.api.OffsetRequest
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.Time.MilliSeconds
import org.apache.gearpump.streaming.kafka.lib.source.consumer.KafkaConsumer
import org.apache.gearpump.streaming.kafka.util.KafkaConfig
import org.apache.gearpump.streaming.kafka.util.KafkaConfig.KafkaConfigFactory
@@ -82,9 +82,9 @@ class KafkaStore private[kafka](
extends CheckpointStore {
import org.apache.gearpump.streaming.kafka.lib.store.KafkaStore._
- private var maxTime: TimeStamp = 0L
+ private var maxTime: MilliSeconds = 0L
- override def persist(time: TimeStamp, checkpoint: Array[Byte]): Unit = {
+ override def persist(time: MilliSeconds, checkpoint: Array[Byte]): Unit = {
// make sure checkpointed timestamp is monotonically increasing
// hence (1, 1), (3, 2), (2, 3) is checkpointed as (1, 1), (3, 2), (3, 3)
if (time > maxTime) {
@@ -98,14 +98,14 @@ class KafkaStore private[kafka](
LOG.debug("KafkaStore persisted state ({}, {})", key, value)
}
- override def recover(time: TimeStamp): Option[Array[Byte]] = {
+ override def recover(time: MilliSeconds): Option[Array[Byte]] = {
var checkpoint: Option[Array[Byte]] = None
optConsumer.foreach { consumer =>
while (consumer.hasNext && checkpoint.isEmpty) {
val kafkaMsg = consumer.next()
checkpoint = for {
k <- kafkaMsg.key
- t <- Injection.invert[TimeStamp, Array[Byte]](k).toOption
+ t <- Injection.invert[MilliSeconds, Array[Byte]](k).toOption
c = kafkaMsg.msg if t >= time
} yield c
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/KafkaStoreSpec.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/KafkaStoreSpec.scala b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/KafkaStoreSpec.scala
index 67c64c4..da99d64 100644
--- a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/KafkaStoreSpec.scala
+++ b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/KafkaStoreSpec.scala
@@ -22,7 +22,7 @@ import java.util.Properties
import com.twitter.bijection.Injection
import kafka.api.OffsetRequest
import kafka.common.TopicAndPartition
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.Time.MilliSeconds
import org.apache.gearpump.streaming.MockUtil
import org.apache.gearpump.streaming.kafka.lib.source.consumer.{KafkaMessage, KafkaConsumer}
import org.apache.gearpump.streaming.kafka.lib.util.KafkaClient
@@ -92,7 +92,7 @@ class KafkaStoreSpec extends PropSpec with PropertyChecks with Matchers with Moc
property("KafkaStore should read checkpoint from timestamp on recover") {
forAll(Gen.alphaStr, timestampGen) {
- (topic: String, recoverTime: TimeStamp) =>
+ (topic: String, recoverTime: MilliSeconds) =>
val consumer = mock[KafkaConsumer]
val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]]
val kafkaStore = new KafkaStore(topic, producer, Some(consumer))
@@ -104,7 +104,7 @@ class KafkaStoreSpec extends PropSpec with PropertyChecks with Matchers with Moc
}
forAll(Gen.alphaStr, timestampGen) {
- (topic: String, recoverTime: TimeStamp) =>
+ (topic: String, recoverTime: MilliSeconds) =>
val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]]
val kafkaStore = new KafkaStore(topic, producer, None)
@@ -113,12 +113,12 @@ class KafkaStoreSpec extends PropSpec with PropertyChecks with Matchers with Moc
}
forAll(Gen.alphaStr, timestampGen, timestampGen) {
- (topic: String, recoverTime: TimeStamp, checkpointTime: TimeStamp) =>
+ (topic: String, recoverTime: MilliSeconds, checkpointTime: MilliSeconds) =>
val consumer = mock[KafkaConsumer]
val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]]
val kafkaStore = new KafkaStore(topic, producer, Some(consumer))
- val key = Injection[TimeStamp, Array[Byte]](checkpointTime)
+ val key = Injection[MilliSeconds, Array[Byte]](checkpointTime)
val msg = key
val kafkaMsg = KafkaMessage(TopicAndPartition(topic, 0), 0, Some(key), msg)
@@ -139,7 +139,7 @@ class KafkaStoreSpec extends PropSpec with PropertyChecks with Matchers with Moc
property("KafkaStore persist should write checkpoint with monotonically increasing timestamp") {
forAll(Gen.alphaStr, timestampGen, Gen.alphaStr) {
- (topic: String, checkpointTime: TimeStamp, data: String) =>
+ (topic: String, checkpointTime: MilliSeconds, data: String) =>
val consumer = mock[KafkaConsumer]
val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]]
val kafkaStore = new KafkaStore(topic, producer, Some(consumer))
@@ -155,12 +155,12 @@ class KafkaStoreSpec extends PropSpec with PropertyChecks with Matchers with Moc
}
def verifyProducer(producer: Producer[Array[Byte], Array[Byte]], count: Int,
- topic: String, partition: Int, time: TimeStamp, data: String): Unit = {
+ topic: String, partition: Int, time: MilliSeconds, data: String): Unit = {
verify(producer, times(count)).send(
MockUtil.argMatch[ProducerRecord[Array[Byte], Array[Byte]]](record =>
record.topic() == topic
&& record.partition() == partition
- && Injection.invert[TimeStamp, Array[Byte]](record.key()).get == time
+ && Injection.invert[MilliSeconds, Array[Byte]](record.key()).get == time
&& Injection.invert[String, Array[Byte]](record.value()).get == data
))
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/main/scala/org/apache/gearpump/streaming/ClusterMessage.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/ClusterMessage.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/ClusterMessage.scala
index d85d042..f0e4f84 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/ClusterMessage.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/ClusterMessage.scala
@@ -20,7 +20,7 @@ package org.apache.gearpump.streaming
import akka.actor.ActorRef
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.Time.MilliSeconds
import org.apache.gearpump.cluster.appmaster.WorkerInfo
import org.apache.gearpump.cluster.scheduler.Resource
import org.apache.gearpump.streaming.appmaster.TaskRegistry.TaskLocations
@@ -61,7 +61,7 @@ object AppMasterToExecutor {
case class StartAllTasks(dagVersion: Int)
case class StartDynamicDag(dagVersion: Int)
- case class TaskRegistered(taskId: TaskId, sessionId: Int, startClock: TimeStamp)
+ case class TaskRegistered(taskId: TaskId, sessionId: Int, startClock: MilliSeconds)
case class TaskRejected(taskId: TaskId)
case object RestartClockService
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala
index 125612b..f15e1b3 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala
@@ -20,7 +20,8 @@ package org.apache.gearpump.streaming
import scala.reflect.ClassTag
import akka.actor.ActorSystem
-import org.apache.gearpump.{MAX_TIME_MILLIS, MIN_TIME_MILLIS, TimeStamp}
+import org.apache.gearpump.Time
+import org.apache.gearpump.Time.MilliSeconds
import org.apache.gearpump.cluster._
import org.apache.gearpump.streaming.partitioner.{HashPartitioner, Partitioner, PartitionerDescription, PartitionerObject}
import org.apache.gearpump.streaming.appmaster.AppMaster
@@ -101,8 +102,8 @@ object Processor {
* When input message's timestamp is beyond current processor's lifetime,
* then it will not be processed by this processor.
*/
-case class LifeTime(birth: TimeStamp, death: TimeStamp) {
- def contains(timestamp: TimeStamp): Boolean = {
+case class LifeTime(birth: MilliSeconds, death: MilliSeconds) {
+ def contains(timestamp: MilliSeconds): Boolean = {
timestamp >= birth && timestamp < death
}
@@ -112,8 +113,7 @@ case class LifeTime(birth: TimeStamp, death: TimeStamp) {
}
object LifeTime {
- // MAX_TIME_MILLIS is Long.MaxValue - 1
- val Immortal = LifeTime(MIN_TIME_MILLIS, MAX_TIME_MILLIS + 1)
+ val Immortal = LifeTime(Time.MIN_TIME_MILLIS, Time.UNREACHABLE)
}
/**
@@ -158,7 +158,7 @@ object StreamApplication {
val graph = dag.mapVertex { processor =>
val updatedProcessor = ProcessorToProcessorDescription(indices(processor), processor)
updatedProcessor
- }.mapEdge { (node1, edge, node2) =>
+ }.mapEdge { (_, edge, _) =>
PartitionerDescription(new PartitionerObject(
Option(edge).getOrElse(StreamApplication.hashPartitioner)))
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala
index ba4b058..3c5c7da 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala
@@ -21,7 +21,7 @@ package org.apache.gearpump.streaming.appmaster
import java.lang.management.ManagementFactory
import akka.actor._
-import org.apache.gearpump._
+import org.apache.gearpump.Time.MilliSeconds
import org.apache.gearpump.cluster.AppMasterToMaster.ApplicationStatusChanged
import org.apache.gearpump.cluster.ClientToMaster._
import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterActivated, AppMasterDataDetailRequest, ReplayFromTimestampWindowTrailingEdge}
@@ -67,7 +67,7 @@ class AppMaster(appContext: AppMasterContext, app: AppDescription) extends Appli
import akka.pattern.ask
private implicit val dispatcher = context.dispatcher
- private val startTime: TimeStamp = System.currentTimeMillis()
+ private val startTime: MilliSeconds = System.currentTimeMillis()
private val LOG: Logger = LogUtil.getLogger(getClass, app = appId)
LOG.info(s"AppMaster[$appId] is launched by $username, app: $app xxxxxxxxxxxxxxxxx")
@@ -322,7 +322,7 @@ class AppMaster(appContext: AppMasterContext, app: AppDescription) extends Appli
context.stop(self)
}
- private def getMinClock: Future[TimeStamp] = {
+ private def getMinClock: Future[MilliSeconds] = {
clockService match {
case Some(service) =>
(service ? GetLatestMinClock).asInstanceOf[Future[LatestMinClock]].map(_.clock)
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala
index b514d6f..90141d4 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala
@@ -25,7 +25,8 @@ import java.util.concurrent.TimeUnit
import akka.actor.{Actor, ActorRef, Cancellable, Stash}
import com.google.common.primitives.Longs
-import org.apache.gearpump.{MIN_TIME_MILLIS, TimeStamp}
+import org.apache.gearpump.Time
+import org.apache.gearpump.Time.MilliSeconds
import org.apache.gearpump.cluster.ClientToMaster.GetStallingTasks
import org.apache.gearpump.streaming.AppMasterToMaster.StallingTasks
import org.apache.gearpump.streaming._
@@ -60,8 +61,8 @@ class ClockService(
LOG.info("Initializing Clock service, get snapshotted StartClock ....")
store.get(START_CLOCK).map { clock =>
// check for null first since
- // (null).asInstanceOf[TimeStamp] is zero
- val startClock = if (clock != null) clock.asInstanceOf[TimeStamp] else MIN_TIME_MILLIS
+ // (null).asInstanceOf[MilliSeconds] is zero
+ val startClock = if (clock != null) clock.asInstanceOf[MilliSeconds] else Time.MIN_TIME_MILLIS
minCheckpointClock = Some(startClock)
@@ -88,32 +89,32 @@ class ClockService(
// We use Array instead of List for Performance consideration
private var processorClocks = Array.empty[ProcessorClock]
- private var checkpointClocks: Map[TaskId, Vector[TimeStamp]] = _
+ private var checkpointClocks: Map[TaskId, Vector[MilliSeconds]] = _
- private var minCheckpointClock: Option[TimeStamp] = None
+ private var minCheckpointClock: Option[MilliSeconds] = None
private def checkpointEnabled(processor: ProcessorDescription): Boolean = {
val taskConf = processor.taskConf
taskConf != null && taskConf.getBoolean("state.checkpoint.enable").contains(true)
}
- private def resetCheckpointClocks(dag: DAG, startClock: TimeStamp): Unit = {
+ private def resetCheckpointClocks(dag: DAG, startClock: MilliSeconds): Unit = {
this.checkpointClocks = dag.processors.filter(startClock < _._2.life.death)
.filter { case (_, processor) =>
checkpointEnabled(processor)
}.flatMap { case (id, processor) =>
- (0 until processor.parallelism).map(TaskId(id, _) -> Vector.empty[TimeStamp])
+ (0 until processor.parallelism).map(TaskId(id, _) -> Vector.empty[MilliSeconds])
}
if (this.checkpointClocks.isEmpty) {
minCheckpointClock = None
}
}
- private def initDag(startClock: TimeStamp): Unit = {
+ private def initDag(startClock: MilliSeconds): Unit = {
recoverDag(this.dag, startClock)
}
- private def recoverDag(dag: DAG, startClock: TimeStamp): Unit = {
+ private def recoverDag(dag: DAG, startClock: MilliSeconds): Unit = {
this.clocks = dag.processors.filter(startClock < _._2.life.death).
map { pair =>
val (processorId, processor) = pair
@@ -130,7 +131,7 @@ class ClockService(
resetCheckpointClocks(dag, startClock)
}
- private def dynamicDAG(dag: DAG, startClock: TimeStamp): Unit = {
+ private def dynamicDAG(dag: DAG, startClock: MilliSeconds): Unit = {
val newClocks = dag.processors.filter(startClock < _._2.life.death).
map { pair =>
val (processorId, processor) = pair
@@ -207,7 +208,7 @@ class ClockService(
}
}
- private def getUpStreamMinClock(processorId: ProcessorId): Option[TimeStamp] = {
+ private def getUpStreamMinClock(processorId: ProcessorId): Option[MilliSeconds] = {
upstreamClocks.get(processorId).map(ProcessorClocks.minClock)
}
@@ -303,7 +304,7 @@ class ClockService(
}
}
- private def minClock: TimeStamp = {
+ private def minClock: MilliSeconds = {
ProcessorClocks.minClock(processorClocks)
}
@@ -313,7 +314,7 @@ class ClockService(
healthChecker.check(minTimestamp, clocks, dag, System.currentTimeMillis())
}
- private def getStartClock: TimeStamp = {
+ private def getStartClock: MilliSeconds = {
minCheckpointClock.getOrElse(minClock)
}
@@ -321,7 +322,7 @@ class ClockService(
store.put(START_CLOCK, getStartClock)
}
- private def updateCheckpointClocks(task: TaskId, time: TimeStamp): Unit = {
+ private def updateCheckpointClocks(task: TaskId, time: MilliSeconds): Unit = {
val clocks = checkpointClocks(task) :+ time
checkpointClocks += task -> clocks
@@ -340,17 +341,17 @@ object ClockService {
case object HealthCheck
class ProcessorClock(val processorId: ProcessorId, val life: LifeTime, val parallelism: Int,
- private var _min: TimeStamp = MIN_TIME_MILLIS,
- private var _taskClocks: Array[TimeStamp] = null) {
+ private var _min: MilliSeconds = Time.MIN_TIME_MILLIS,
+ private var _taskClocks: Array[MilliSeconds] = null) {
def copy(life: LifeTime): ProcessorClock = {
new ProcessorClock(processorId, life, parallelism, _min, _taskClocks)
}
- def min: TimeStamp = _min
- def taskClocks: Array[TimeStamp] = _taskClocks
+ def min: MilliSeconds = _min
+ def taskClocks: Array[MilliSeconds] = _taskClocks
- def init(startClock: TimeStamp): Unit = {
+ def init(startClock: MilliSeconds): Unit = {
if (taskClocks == null) {
this._min = startClock
this._taskClocks = new Array(parallelism)
@@ -358,7 +359,7 @@ object ClockService {
}
}
- def updateMinClock(taskIndex: Int, clock: TimeStamp): Unit = {
+ def updateMinClock(taskIndex: Int, clock: MilliSeconds): Unit = {
taskClocks(taskIndex) = clock
_min = Longs.min(taskClocks: _*)
}
@@ -381,8 +382,8 @@ object ClockService {
/** Check for stalling tasks */
def check(
- currentMinClock: TimeStamp, processorClocks: Map[ProcessorId, ProcessorClock],
- dag: DAG, now: TimeStamp): Unit = {
+ currentMinClock: MilliSeconds, processorClocks: Map[ProcessorId, ProcessorClock],
+ dag: DAG, now: MilliSeconds): Unit = {
var isClockStalling = false
if (null == minClock || currentMinClock > minClock.appClock) {
minClock = ClockValue(systemClock = now, appClock = currentMinClock)
@@ -423,7 +424,7 @@ object ClockService {
}
object HealthChecker {
- case class ClockValue(systemClock: TimeStamp, appClock: TimeStamp) {
+ case class ClockValue(systemClock: MilliSeconds, appClock: MilliSeconds) {
def prettyPrint: String = {
"(system clock: " + new Date(systemClock).toString + ", app clock: " + appClock + ")"
}
@@ -433,7 +434,7 @@ object ClockService {
object ProcessorClocks {
// Get the Min clock of all processors
- def minClock(clock: Array[ProcessorClock]): TimeStamp = {
+ def minClock(clock: Array[ProcessorClock]): MilliSeconds = {
var i = 0
var min = if (clock.length == 0) 0L else clock(0).min
while (i < clock.length) {
@@ -445,7 +446,7 @@ object ClockService {
}
case class ChangeToNewDAG(dag: DAG)
- case class ChangeToNewDAGSuccess(clocks: Map[ProcessorId, TimeStamp])
+ case class ChangeToNewDAGSuccess(clocks: Map[ProcessorId, MilliSeconds])
- case class StoredStartClock(clock: TimeStamp)
+ case class StoredStartClock(clock: MilliSeconds)
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/JarScheduler.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/JarScheduler.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/JarScheduler.scala
index e023cdf..e31f863 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/JarScheduler.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/JarScheduler.scala
@@ -20,7 +20,7 @@ package org.apache.gearpump.streaming.appmaster
import akka.actor._
import akka.pattern.ask
import com.typesafe.config.Config
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.Time.MilliSeconds
import org.apache.gearpump.cluster.AppJar
import org.apache.gearpump.cluster.scheduler.{Resource, ResourceRequest}
import org.apache.gearpump.cluster.worker.WorkerId
@@ -47,7 +47,7 @@ class JarScheduler(appId: Int, appName: String, config: Config, factory: ActorRe
private implicit val timeout = Constants.FUTURE_TIMEOUT
/** Set the current DAG version active */
- def setDag(dag: DAG, startClock: Future[TimeStamp]): Unit = {
+ def setDag(dag: DAG, startClock: Future[MilliSeconds]): Unit = {
actor ! TransitToNewDag
startClock.map { start =>
actor ! NewDag(dag, start)
@@ -82,7 +82,7 @@ object JarScheduler {
case class ResourceRequestDetail(jar: AppJar, requests: Array[ResourceRequest])
- case class NewDag(dag: DAG, startTime: TimeStamp)
+ case class NewDag(dag: DAG, startTime: MilliSeconds)
case object TransitToNewDag
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/StreamAppMasterSummary.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/StreamAppMasterSummary.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/StreamAppMasterSummary.scala
index 126ab92..1214cd0 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/StreamAppMasterSummary.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/StreamAppMasterSummary.scala
@@ -18,7 +18,7 @@
package org.apache.gearpump.streaming.appmaster
-import org.apache.gearpump._
+import org.apache.gearpump.Time.MilliSeconds
import org.apache.gearpump.cluster.AppMasterToMaster.AppMasterSummary
import org.apache.gearpump.cluster.{ApplicationStatus, UserConfig}
import org.apache.gearpump.streaming.appmaster.AppMaster.ExecutorBrief
@@ -32,10 +32,10 @@ case class StreamAppMasterSummary(
appId: Int,
appName: String = null,
actorPath: String = null,
- clock: TimeStamp = 0L,
+ clock: MilliSeconds = 0L,
status: ApplicationStatus = ApplicationStatus.ACTIVE,
- startTime: TimeStamp = 0L,
- uptime: TimeStamp = 0L,
+ startTime: MilliSeconds = 0L,
+ uptime: MilliSeconds = 0L,
user: String = null,
homeDirectory: String = "",
logFile: String = "",
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskManager.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskManager.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskManager.scala
index 51c4de9..bae5c02 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskManager.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskManager.scala
@@ -20,7 +20,7 @@ package org.apache.gearpump.streaming.appmaster
import akka.actor._
import akka.pattern.ask
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.Time.MilliSeconds
import org.apache.gearpump.cluster.MasterToAppMaster.ReplayFromTimestampWindowTrailingEdge
import org.apache.gearpump.streaming.AppMasterToExecutor._
import org.apache.gearpump.streaming.ExecutorToAppMaster.{MessageLoss, RegisterTask, UnRegisterTask}
@@ -86,11 +86,11 @@ private[appmaster] class TaskManager(
dagManager ! WatchChange(watcher = self)
executorManager ! SetTaskManager(self)
- private def getStartClock: Future[TimeStamp] = {
+ private def getStartClock: Future[MilliSeconds] = {
(clockService ? GetStartClock).asInstanceOf[Future[StartClock]].map(_.clock)
}
- private var startClock: Future[TimeStamp] = getStartClock
+ private var startClock: Future[MilliSeconds] = getStartClock
def receive: Receive = applicationReady(DagReadyState.empty)
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFunction.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFunction.scala
index a2f51c7..85ca969 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFunction.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFunction.scala
@@ -19,7 +19,8 @@ package org.apache.gearpump.streaming.dsl.window.api
import java.time.{Duration, Instant}
-import org.apache.gearpump.{MIN_TIME_MILLIS, MAX_TIME_MILLIS, TimeStamp}
+import org.apache.gearpump.Time
+import org.apache.gearpump.Time.MilliSeconds
import org.apache.gearpump.streaming.dsl.window.impl.Window
import scala.collection.mutable.ArrayBuffer
@@ -46,8 +47,8 @@ abstract class NonMergingWindowFunction extends WindowFunction {
object GlobalWindowFunction {
- val globalWindow = Array(Window(Instant.ofEpochMilli(MIN_TIME_MILLIS),
- Instant.ofEpochMilli(MAX_TIME_MILLIS)))
+ val globalWindow = Array(Window(Instant.ofEpochMilli(Time.MIN_TIME_MILLIS),
+ Instant.ofEpochMilli(Time.MAX_TIME_MILLIS)))
}
case class GlobalWindowFunction() extends NonMergingWindowFunction {
@@ -80,7 +81,7 @@ case class SlidingWindowFunction(size: Duration, step: Duration)
windows.toArray
}
- private def lastStartFor(timestamp: TimeStamp, windowStep: Long): TimeStamp = {
+ private def lastStartFor(timestamp: MilliSeconds, windowStep: Long): MilliSeconds = {
timestamp - (timestamp + windowStep) % windowStep
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala
index 2425ff2..7536473 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala
@@ -19,10 +19,10 @@ package org.apache.gearpump.streaming.dsl.window.impl
import java.time.Instant
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.Time.MilliSeconds
object Window {
- def ofEpochMilli(startTime: TimeStamp, endTime: TimeStamp): Window = {
+ def ofEpochMilli(startTime: MilliSeconds, endTime: MilliSeconds): Window = {
Window(Instant.ofEpochMilli(startTime), Instant.ofEpochMilli(endTime))
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/main/scala/org/apache/gearpump/streaming/metrics/ProcessorAggregator.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/metrics/ProcessorAggregator.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/metrics/ProcessorAggregator.scala
index 8f8b7ab..058d36b 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/metrics/ProcessorAggregator.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/metrics/ProcessorAggregator.scala
@@ -22,7 +22,7 @@ import java.util
import com.google.common.collect.Iterators
import com.typesafe.config.Config
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.Time.MilliSeconds
import org.apache.gearpump.cluster.ClientToMaster.ReadOption
import org.apache.gearpump.cluster.MasterToClient.HistoryMetricsItem
import org.apache.gearpump.metrics.Metrics.{Histogram, Meter}
@@ -64,7 +64,7 @@ class ProcessorAggregator(historyMetricConfig: HistoryMetricsConfig) extends Met
}
def aggregate(
- readOption: ReadOption.ReadOption, inputs: Iterator[HistoryMetricsItem], now: TimeStamp)
+ readOption: ReadOption.ReadOption, inputs: Iterator[HistoryMetricsItem], now: MilliSeconds)
: List[HistoryMetricsItem] = {
val (start, end, interval) = getTimeRange(readOption, now)
val timeSlotsCount = ((end - start - 1) / interval + 1).toInt
@@ -103,8 +103,8 @@ class ProcessorAggregator(historyMetricConfig: HistoryMetricsConfig) extends Met
}
// Returns (start, end, interval)
- private def getTimeRange(readOption: ReadOption.ReadOption, now: TimeStamp)
- : (TimeStamp, TimeStamp, TimeStamp) = {
+ private def getTimeRange(readOption: ReadOption.ReadOption, now: MilliSeconds)
+ : (MilliSeconds, MilliSeconds, MilliSeconds) = {
readOption match {
case ReadOption.ReadRecent =>
val end = now
@@ -229,7 +229,7 @@ object ProcessorAggregator {
var p99: Double = 0
var p999: Double = 0
- var startTime: TimeStamp = Long.MaxValue
+ var startTime: MilliSeconds = Long.MaxValue
override def aggregate(item: HistoryMetricsItem): Unit = {
val input = item.value.asInstanceOf[Histogram]
@@ -263,7 +263,7 @@ object ProcessorAggregator {
var m1: Double = 0
var rateUnit: String = null
- var startTime: TimeStamp = Long.MaxValue
+ var startTime: MilliSeconds = Long.MaxValue
override def aggregate(item: HistoryMetricsItem): Unit = {
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala
index 14abff8..607af85 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala
@@ -19,7 +19,7 @@ package org.apache.gearpump.streaming.source
import java.time.Instant
-import org.apache.gearpump.{MAX_TIME_MILLIS, MIN_TIME_MILLIS, Message}
+import org.apache.gearpump.{Message, Time}
/**
* message used by source task to report source watermark.
@@ -28,9 +28,14 @@ case class Watermark(instant: Instant) {
def toMessage: Message = Message("watermark", instant)
}
+/**
+ * All input data with event times less than watermark have been observed
+ */
object Watermark {
- val MAX: Instant = Instant.ofEpochMilli(MAX_TIME_MILLIS + 1)
+ // all input data have been observed
+ val MAX: Instant = Instant.ofEpochMilli(Time.MAX_TIME_MILLIS + 1)
- val MIN: Instant = Instant.ofEpochMilli(MIN_TIME_MILLIS)
+ // no input data have been observed
+ val MIN: Instant = Instant.ofEpochMilli(Time.MIN_TIME_MILLIS)
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/MonoidState.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/MonoidState.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/MonoidState.scala
index 0e2f83a..0118c07 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/MonoidState.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/MonoidState.scala
@@ -18,7 +18,7 @@
package org.apache.gearpump.streaming.state.api
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.Time.MilliSeconds
/**
* MonoidState uses Algebird Monoid to aggregate state
@@ -37,11 +37,11 @@ abstract class MonoidState[T](monoid: Monoid[T]) extends PersistentState[T] {
override def get: Option[T] = Option(monoid.plus(left, right))
- override def setNextCheckpointTime(nextCheckpointTime: TimeStamp): Unit = {
+ override def setNextCheckpointTime(nextCheckpointTime: MilliSeconds): Unit = {
checkpointTime = nextCheckpointTime
}
- protected def updateState(timestamp: TimeStamp, t: T): Unit = {
+ protected def updateState(timestamp: MilliSeconds, t: T): Unit = {
if (timestamp < checkpointTime) {
left = monoid.plus(left, t)
} else {
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentState.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentState.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentState.scala
index 906d331..39b17c9 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentState.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentState.scala
@@ -18,7 +18,7 @@
package org.apache.gearpump.streaming.state.api
-import org.apache.gearpump._
+import org.apache.gearpump.Time.MilliSeconds
/**
* PersistentState is part of the transaction API
@@ -33,19 +33,19 @@ trait PersistentState[T] {
* Recovers state to a previous checkpoint
* usually invoked by the framework
*/
- def recover(timestamp: TimeStamp, bytes: Array[Byte]): Unit
+ def recover(timestamp: MilliSeconds, bytes: Array[Byte]): Unit
/**
* Updates state on a new message
* this is invoked by user
*/
- def update(timestamp: TimeStamp, t: T): Unit
+ def update(timestamp: MilliSeconds, t: T): Unit
/**
* Sets next checkpoint time
* should be invoked by the framework
*/
- def setNextCheckpointTime(timeStamp: TimeStamp): Unit
+ def setNextCheckpointTime(timeStamp: MilliSeconds): Unit
/**
* Gets a binary snapshot of state
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentTask.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentTask.scala
index df37ba1..3a3b0a7 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentTask.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentTask.scala
@@ -20,12 +20,13 @@ package org.apache.gearpump.streaming.state.api
import java.time.Instant
+import org.apache.gearpump.Message
+import org.apache.gearpump.Time.MilliSeconds
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.state.impl.{CheckpointManager, PersistentStateConfig}
import org.apache.gearpump.streaming.task.{Task, TaskContext, UpdateCheckpointClock}
import org.apache.gearpump.streaming.transaction.api.CheckpointStoreFactory
import org.apache.gearpump.util.LogUtil
-import org.apache.gearpump.{Message, TimeStamp}
object PersistentTask {
val LOG = LogUtil.getLogger(getClass)
@@ -97,7 +98,7 @@ abstract class PersistentTask[T](taskContext: TaskContext, conf: UserConfig)
checkpointManager.close()
}
- private def reportCheckpointClock(timestamp: TimeStamp): Unit = {
+ private def reportCheckpointClock(timestamp: MilliSeconds): Unit = {
appMaster ! UpdateCheckpointClock(taskContext.taskId, timestamp)
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/CheckpointManager.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/CheckpointManager.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/CheckpointManager.scala
index 82b7952..7d9e92a 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/CheckpointManager.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/CheckpointManager.scala
@@ -18,7 +18,7 @@
package org.apache.gearpump.streaming.state.impl
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.Time.MilliSeconds
import org.apache.gearpump.streaming.transaction.api.CheckpointStore
/** Manage physical checkpoints to persitent storage like HDFS */
@@ -28,11 +28,11 @@ class CheckpointManager(checkpointInterval: Long,
private var maxMessageTime: Long = 0L
private var checkpointTime: Option[Long] = None
- def recover(timestamp: TimeStamp): Option[Array[Byte]] = {
+ def recover(timestamp: MilliSeconds): Option[Array[Byte]] = {
checkpointStore.recover(timestamp)
}
- def checkpoint(timestamp: TimeStamp, checkpoint: Array[Byte]): Option[TimeStamp] = {
+ def checkpoint(timestamp: MilliSeconds, checkpoint: Array[Byte]): Option[MilliSeconds] = {
checkpointStore.persist(timestamp, checkpoint)
checkpointTime = checkpointTime.collect { case time if maxMessageTime > time =>
time + (1 + (maxMessageTime - time) / checkpointInterval) * checkpointInterval
@@ -41,7 +41,7 @@ class CheckpointManager(checkpointInterval: Long,
checkpointTime
}
- def update(messageTime: TimeStamp): Option[TimeStamp] = {
+ def update(messageTime: MilliSeconds): Option[MilliSeconds] = {
maxMessageTime = Math.max(maxMessageTime, messageTime)
if (checkpointTime.isEmpty) {
checkpointTime = Some((1 + messageTime / checkpointInterval) * checkpointInterval)
@@ -50,15 +50,15 @@ class CheckpointManager(checkpointInterval: Long,
checkpointTime
}
- def shouldCheckpoint(upstreamMinClock: TimeStamp): Boolean = {
+ def shouldCheckpoint(upstreamMinClock: MilliSeconds): Boolean = {
checkpointTime.exists(time => upstreamMinClock >= time)
}
- def getCheckpointTime: Option[TimeStamp] = checkpointTime
+ def getCheckpointTime: Option[MilliSeconds] = checkpointTime
def close(): Unit = {
checkpointStore.close()
}
- private[impl] def getMaxMessageTime: TimeStamp = maxMessageTime
+ private[impl] def getMaxMessageTime: MilliSeconds = maxMessageTime
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/InMemoryCheckpointStore.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/InMemoryCheckpointStore.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/InMemoryCheckpointStore.scala
index 8853d07..cecf127 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/InMemoryCheckpointStore.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/InMemoryCheckpointStore.scala
@@ -18,7 +18,7 @@
package org.apache.gearpump.streaming.state.impl
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.Time.MilliSeconds
import org.apache.gearpump.streaming.transaction.api.{CheckpointStore, CheckpointStoreFactory}
/**
@@ -26,18 +26,18 @@ import org.apache.gearpump.streaming.transaction.api.{CheckpointStore, Checkpoin
* should not be used in real cases
*/
class InMemoryCheckpointStore extends CheckpointStore {
- private var checkpoints = Map.empty[TimeStamp, Array[Byte]]
+ private var checkpoints = Map.empty[MilliSeconds, Array[Byte]]
- override def persist(timestamp: TimeStamp, checkpoint: Array[Byte]): Unit = {
+ override def persist(timestamp: MilliSeconds, checkpoint: Array[Byte]): Unit = {
checkpoints += timestamp -> checkpoint
}
- override def recover(timestamp: TimeStamp): Option[Array[Byte]] = {
+ override def recover(timestamp: MilliSeconds): Option[Array[Byte]] = {
checkpoints.get(timestamp)
}
override def close(): Unit = {
- checkpoints = Map.empty[TimeStamp, Array[Byte]]
+ checkpoints = Map.empty[MilliSeconds, Array[Byte]]
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/NonWindowState.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/NonWindowState.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/NonWindowState.scala
index b161713..1393f4a 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/NonWindowState.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/NonWindowState.scala
@@ -20,7 +20,7 @@ package org.apache.gearpump.streaming.state.impl
import org.slf4j.Logger
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.Time.MilliSeconds
import org.apache.gearpump.streaming.state.api.{Monoid, MonoidState, Serializer}
import org.apache.gearpump.streaming.state.impl.NonWindowState._
import org.apache.gearpump.util.LogUtil
@@ -35,11 +35,11 @@ object NonWindowState {
class NonWindowState[T](monoid: Monoid[T], serializer: Serializer[T])
extends MonoidState[T](monoid) {
- override def recover(timestamp: TimeStamp, bytes: Array[Byte]): Unit = {
+ override def recover(timestamp: MilliSeconds, bytes: Array[Byte]): Unit = {
serializer.deserialize(bytes).foreach(left = _)
}
- override def update(timestamp: TimeStamp, t: T): Unit = {
+ override def update(timestamp: MilliSeconds, t: T): Unit = {
updateState(timestamp, t)
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/Window.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/Window.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/Window.scala
index c1f647e..0318d3d 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/Window.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/Window.scala
@@ -17,7 +17,7 @@
*/
package org.apache.gearpump.streaming.state.impl
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.Time.MilliSeconds
/**
* Used in window applications
@@ -29,10 +29,10 @@ class Window(val windowSize: Long, val windowStep: Long) {
this(windowConfig.windowSize, windowConfig.windowStep)
}
- private var clock: TimeStamp = 0L
+ private var clock: MilliSeconds = 0L
private var startTime = 0L
- def update(clock: TimeStamp): Unit = {
+ def update(clock: MilliSeconds): Unit = {
this.clock = clock
}
@@ -40,7 +40,7 @@ class Window(val windowSize: Long, val windowStep: Long) {
startTime += windowStep
}
- def slideTo(timestamp: TimeStamp): Unit = {
+ def slideTo(timestamp: MilliSeconds): Unit = {
startTime = timestamp / windowStep * windowStep
}
@@ -48,7 +48,7 @@ class Window(val windowSize: Long, val windowStep: Long) {
clock >= (startTime + windowSize)
}
- def range: (TimeStamp, TimeStamp) = {
+ def range: (MilliSeconds, MilliSeconds) = {
startTime -> (startTime + windowSize)
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/WindowState.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/WindowState.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/WindowState.scala
index 348f09e..a73b6db 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/WindowState.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/WindowState.scala
@@ -22,7 +22,7 @@ import scala.collection.immutable.TreeMap
import org.slf4j.Logger
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.Time.MilliSeconds
import org.apache.gearpump.streaming.state.api.{Group, MonoidState, Serializer}
import org.apache.gearpump.streaming.state.impl.WindowState._
import org.apache.gearpump.streaming.task.TaskContext
@@ -31,7 +31,7 @@ import org.apache.gearpump.util.LogUtil
/**
* an interval is a dynamic time range that is divided by window boundary and checkpoint time
*/
-case class Interval(startTime: TimeStamp, endTime: TimeStamp) extends Ordered[Interval] {
+case class Interval(startTime: MilliSeconds, endTime: MilliSeconds) extends Ordered[Interval] {
override def compare(that: Interval): Int = {
if (startTime < that.startTime) -1
else if (startTime > that.startTime) 1
@@ -63,7 +63,7 @@ class WindowState[T](group: Group[T],
private var lastCheckpointTime = 0L
- override def recover(timestamp: TimeStamp, bytes: Array[Byte]): Unit = {
+ override def recover(timestamp: MilliSeconds, bytes: Array[Byte]): Unit = {
window.slideTo(timestamp)
serializer.deserialize(bytes)
.foreach { states =>
@@ -74,7 +74,7 @@ class WindowState[T](group: Group[T],
}
}
- override def update(timestamp: TimeStamp, t: T): Unit = {
+ override def update(timestamp: MilliSeconds, t: T): Unit = {
val (startTime, endTime) = window.range
if (timestamp >= startTime && timestamp < endTime) {
updateState(timestamp, t)
@@ -127,7 +127,7 @@ class WindowState[T](group: Group[T],
* upperBound2 = step * Nmin2 + size > t
* }}}
*/
- private[impl] def getInterval(timestamp: TimeStamp, checkpointTime: TimeStamp): Interval = {
+ private[impl] def getInterval(timestamp: MilliSeconds, checkpointTime: MilliSeconds): Interval = {
val windowSize = window.windowSize
val windowStep = window.windowStep
val lowerBound1 = timestamp / windowStep * windowStep
@@ -147,8 +147,8 @@ class WindowState[T](group: Group[T],
}
}
- private[impl] def updateIntervalStates(timestamp: TimeStamp, t: T, checkpointTime: TimeStamp)
- : Unit = {
+ private[impl] def updateIntervalStates(timestamp: MilliSeconds, t: T,
+ checkpointTime: MilliSeconds): Unit = {
val interval = getInterval(timestamp, checkpointTime)
intervalStates.get(interval) match {
case Some(st) =>
@@ -158,7 +158,7 @@ class WindowState[T](group: Group[T],
}
}
- private[impl] def getIntervalStates(startTime: TimeStamp, endTime: TimeStamp)
+ private[impl] def getIntervalStates(startTime: MilliSeconds, endTime: MilliSeconds)
: TreeMap[Interval, T] = {
intervalStates.dropWhile(_._1.endTime <= startTime).takeWhile(_._1.endTime <= endTime)
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/main/scala/org/apache/gearpump/streaming/task/SerializedMessage.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/SerializedMessage.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/SerializedMessage.scala
index 675d5cc..c3e3b14 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/SerializedMessage.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/SerializedMessage.scala
@@ -19,9 +19,9 @@ package org.apache.gearpump.streaming.task
import java.io.{DataInput, DataOutput}
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.Time.MilliSeconds
-case class SerializedMessage(timeStamp: TimeStamp, bytes: Array[Byte])
+case class SerializedMessage(timeStamp: MilliSeconds, bytes: Array[Byte])
class SerializedMessageSerializer extends TaskMessageSerializer[SerializedMessage] {
override def getLength(obj: SerializedMessage): Int = 12 + obj.bytes.length
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala
index 8a6f04f..24f1763 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala
@@ -20,13 +20,14 @@ package org.apache.gearpump.streaming.task
import org.slf4j.Logger
import com.google.common.primitives.Shorts
+import org.apache.gearpump.{Message, Time}
+import org.apache.gearpump.Time.MilliSeconds
import org.apache.gearpump.streaming.partitioner.{MulticastPartitioner, Partitioner, UnicastPartitioner}
import org.apache.gearpump.streaming.AppMasterToExecutor.MsgLostException
import org.apache.gearpump.streaming.LifeTime
import org.apache.gearpump.streaming.source.Watermark
import org.apache.gearpump.streaming.task.Subscription._
import org.apache.gearpump.util.LogUtil
-import org.apache.gearpump.{MIN_TIME_MILLIS, Message, TimeStamp}
/**
* Manages the output and message clock for single downstream processor
@@ -59,9 +60,9 @@ class Subscription(
private val pendingMessageCount: Array[Short] = new Array[Short](parallelism)
private val processingWatermarkSince: Array[Short] = new Array[Short](parallelism)
- private val outputWatermark: Array[TimeStamp] = Array.fill(parallelism)(
+ private val outputWatermark: Array[MilliSeconds] = Array.fill(parallelism)(
Watermark.MIN.toEpochMilli)
- private val processingWatermark: Array[TimeStamp] = Array.fill(parallelism)(
+ private val processingWatermark: Array[MilliSeconds] = Array.fill(parallelism)(
Watermark.MIN.toEpochMilli)
private var maxPendingCount: Short = 0
@@ -135,7 +136,7 @@ class Subscription(
}
}
- private var lastFlushTime: Long = MIN_TIME_MILLIS
+ private var lastFlushTime: Long = Time.MIN_TIME_MILLIS
private val FLUSH_INTERVAL = 5 * 1000 // ms
private def needFlush: Boolean = {
System.currentTimeMillis() - lastFlushTime > FLUSH_INTERVAL &&
@@ -181,7 +182,7 @@ class Subscription(
}
}
- def watermark: TimeStamp = {
+ def watermark: MilliSeconds = {
outputWatermark.min
}
@@ -189,7 +190,7 @@ class Subscription(
maxPendingCount < maxPendingMessageCount
}
- def onStallingTime(stallingTime: TimeStamp): Unit = {
+ def onStallingTime(stallingTime: MilliSeconds): Unit = {
outputWatermark.indices.foreach { i =>
if (outputWatermark(i) == stallingTime &&
pendingMessageCount(i) > 0 &&
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/main/scala/org/apache/gearpump/streaming/task/Task.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/Task.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/Task.scala
index dc80511..b587cc7 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/Task.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/Task.scala
@@ -23,10 +23,11 @@ import java.time.Instant
import scala.concurrent.duration.FiniteDuration
import akka.actor.Actor.Receive
import akka.actor.{ActorRef, ActorSystem, Cancellable, Props}
+import org.apache.gearpump.Message
+import org.apache.gearpump.Time.MilliSeconds
import org.slf4j.Logger
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.util.LogUtil
-import org.apache.gearpump.{Message, TimeStamp}
/**
* This provides context information for a task.
@@ -113,7 +114,7 @@ trait TaskContext {
*
* @return the min clock
*/
- def upstreamMinClock: TimeStamp
+ def upstreamMinClock: MilliSeconds
/**
* Update TaskActor with the processing progress (watermark)