You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2017/08/19 11:58:03 UTC
[2/3] incubator-gearpump git commit: [GEARPUMP-217] Merge master into
sql branch
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/KafkaStoreSpec.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/KafkaStoreSpec.scala b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/KafkaStoreSpec.scala
index 67c64c4..da99d64 100644
--- a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/KafkaStoreSpec.scala
+++ b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/KafkaStoreSpec.scala
@@ -22,7 +22,7 @@ import java.util.Properties
import com.twitter.bijection.Injection
import kafka.api.OffsetRequest
import kafka.common.TopicAndPartition
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.Time.MilliSeconds
import org.apache.gearpump.streaming.MockUtil
import org.apache.gearpump.streaming.kafka.lib.source.consumer.{KafkaMessage, KafkaConsumer}
import org.apache.gearpump.streaming.kafka.lib.util.KafkaClient
@@ -92,7 +92,7 @@ class KafkaStoreSpec extends PropSpec with PropertyChecks with Matchers with Moc
property("KafkaStore should read checkpoint from timestamp on recover") {
forAll(Gen.alphaStr, timestampGen) {
- (topic: String, recoverTime: TimeStamp) =>
+ (topic: String, recoverTime: MilliSeconds) =>
val consumer = mock[KafkaConsumer]
val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]]
val kafkaStore = new KafkaStore(topic, producer, Some(consumer))
@@ -104,7 +104,7 @@ class KafkaStoreSpec extends PropSpec with PropertyChecks with Matchers with Moc
}
forAll(Gen.alphaStr, timestampGen) {
- (topic: String, recoverTime: TimeStamp) =>
+ (topic: String, recoverTime: MilliSeconds) =>
val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]]
val kafkaStore = new KafkaStore(topic, producer, None)
@@ -113,12 +113,12 @@ class KafkaStoreSpec extends PropSpec with PropertyChecks with Matchers with Moc
}
forAll(Gen.alphaStr, timestampGen, timestampGen) {
- (topic: String, recoverTime: TimeStamp, checkpointTime: TimeStamp) =>
+ (topic: String, recoverTime: MilliSeconds, checkpointTime: MilliSeconds) =>
val consumer = mock[KafkaConsumer]
val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]]
val kafkaStore = new KafkaStore(topic, producer, Some(consumer))
- val key = Injection[TimeStamp, Array[Byte]](checkpointTime)
+ val key = Injection[MilliSeconds, Array[Byte]](checkpointTime)
val msg = key
val kafkaMsg = KafkaMessage(TopicAndPartition(topic, 0), 0, Some(key), msg)
@@ -139,7 +139,7 @@ class KafkaStoreSpec extends PropSpec with PropertyChecks with Matchers with Moc
property("KafkaStore persist should write checkpoint with monotonically increasing timestamp") {
forAll(Gen.alphaStr, timestampGen, Gen.alphaStr) {
- (topic: String, checkpointTime: TimeStamp, data: String) =>
+ (topic: String, checkpointTime: MilliSeconds, data: String) =>
val consumer = mock[KafkaConsumer]
val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]]
val kafkaStore = new KafkaStore(topic, producer, Some(consumer))
@@ -155,12 +155,12 @@ class KafkaStoreSpec extends PropSpec with PropertyChecks with Matchers with Moc
}
def verifyProducer(producer: Producer[Array[Byte], Array[Byte]], count: Int,
- topic: String, partition: Int, time: TimeStamp, data: String): Unit = {
+ topic: String, partition: Int, time: MilliSeconds, data: String): Unit = {
verify(producer, times(count)).send(
MockUtil.argMatch[ProducerRecord[Array[Byte], Array[Byte]]](record =>
record.topic() == topic
&& record.partition() == partition
- && Injection.invert[TimeStamp, Array[Byte]](record.key()).get == time
+ && Injection.invert[MilliSeconds, Array[Byte]](record.key()).get == time
&& Injection.invert[String, Array[Byte]](record.value()).get == data
))
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/project/BuildExamples.scala
----------------------------------------------------------------------
diff --git a/project/BuildExamples.scala b/project/BuildExamples.scala
index afb7459..b3a8e4a 100644
--- a/project/BuildExamples.scala
+++ b/project/BuildExamples.scala
@@ -106,7 +106,7 @@ object BuildExamples extends sbt.Build {
"commons-io" % "commons-io" % commonsIOVersion,
"io.spray" %% "spray-can" % sprayVersion,
"io.spray" %% "spray-routing-shapeless2" % sprayVersion
- )
+ ) ++ annotationDependencies
) ++ include("examples/distributeservice")
).dependsOn(core % "provided; test->test")
@@ -160,11 +160,12 @@ object BuildExamples extends sbt.Build {
CrossVersion.binaryScalaVersion(scalaVersion.value)
)
- private def include(files: String*): Seq[Def.Setting[_]] = Seq(
- assemblyExcludedJars in assembly := {
- val cp = (fullClasspath in assembly).value
- cp.filterNot(p =>
- files.exists(p.data.getAbsolutePath.contains))
- }
- )
+ private def include(files: String*): Seq[Def.Setting[_]] =
+ Seq(
+ assemblyExcludedJars in assembly := {
+ val cp = (fullClasspath in assembly).value
+ cp.filterNot(p =>
+ files.exists(p.data.getAbsolutePath.contains))
+ }
+ )
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/project/BuildExperiments.scala
----------------------------------------------------------------------
diff --git a/project/BuildExperiments.scala b/project/BuildExperiments.scala
index c603ec1..84c80f0 100644
--- a/project/BuildExperiments.scala
+++ b/project/BuildExperiments.scala
@@ -60,7 +60,7 @@ object BuildExperiments extends sbt.Build {
libraryDependencies ++= Seq(
"org.json4s" %% "json4s-jackson" % "3.2.11",
"com.typesafe.akka" %% "akka-stream" % akkaVersion
- ),
+ ) ++ annotationDependencies,
mainClass in(Compile, packageBin) := Some("akka.stream.gearpump.example.Test")
))
.dependsOn (core % "provided", streaming % "test->test; provided")
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/project/BuildGearpump.scala
----------------------------------------------------------------------
diff --git a/project/BuildGearpump.scala b/project/BuildGearpump.scala
index 895c042..6c2acad 100644
--- a/project/BuildGearpump.scala
+++ b/project/BuildGearpump.scala
@@ -54,7 +54,17 @@ object BuildGearpump extends sbt.Build {
useGpg := false,
pgpSecretRing := file("./secring.asc"),
pgpPublicRing := file("./pubring.asc"),
- scalacOptions ++= Seq("-Yclosure-elim", "-Yinline"),
+ scalacOptions ++= Seq(
+ "-deprecation", // Emit warning and location for usages of deprecated APIs
+ "-encoding", "UTF-8", // Specify character encoding used by source files
+ "-feature", // Emit warning and location for usages of features
+ // that should be imported explicitly
+ "-language:existentials", // Enable existential types
+ "-language:implicitConversions", // Enable implicit conversions
+ "-Yclosure-elim", // Perform closure elimination
+ "-Yinline", // Perform inlining when possible
+ "-Ywarn-unused-import" // Warn on unused imports
+ ),
publishMavenStyle := true,
pgpPassphrase := Option(System.getenv().get("PASSPHRASE")).map(_.toArray),
@@ -74,30 +84,32 @@ object BuildGearpump extends sbt.Build {
}
},
- publishArtifact in Test := true,
-
pomExtra := {
+ // scalastyle:off line.size.limit
<url>https://github.com/apache/incubator-gearpump</url>
- <licenses>
- <license>
- <name>Apache 2</name>
- <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
- </license>
- </licenses>
- <scm>
- <connection>scm:git://git.apache.org/incubator-gearpump.git</connection>
- <developerConnection>scm:git:git@github.com:apache/incubator-gearpump</developerConnection>
- <url>github.com/apache/incubator-gearpump</url>
- </scm>
- <developers>
- <developer>
- <id>gearpump</id>
- <name>Gearpump Team</name>
- <url>http://gearpump.incubator.apache.org/community.html#who-we-are</url>
- </developer>
- </developers>
+ <licenses>
+ <license>
+ <name>Apache 2</name>
+ <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+ </license>
+ </licenses>
+ <scm>
+ <connection>scm:git://git.apache.org/incubator-gearpump.git</connection>
+ <developerConnection>scm:git:git@github.com:apache/incubator-gearpump</developerConnection>
+ <url>github.com/apache/incubator-gearpump</url>
+ </scm>
+ <developers>
+ <developer>
+ <id>gearpump</id>
+ <name>Gearpump Team</name>
+ <url>http://gearpump.incubator.apache.org/community.html#who-we-are</url>
+ </developer>
+ </developers>
+ // scalastyle:on line.size.limit
},
+ publishArtifact in Test := true,
+
pomPostProcess := {
(node: xml.Node) => changeShadedDeps(
Set(
@@ -194,7 +206,7 @@ object BuildGearpump extends sbt.Build {
libraryDependencies ++= Seq(
"com.goldmansachs" % "gs-collections" % gsCollectionsVersion
- ),
+ ) ++ annotationDependencies,
pomPostProcess := {
(node: xml.Node) => changeShadedDeps(
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/project/Dependencies.scala
----------------------------------------------------------------------
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index 40b6380..b146c08 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -56,6 +56,14 @@ object Dependencies {
val rabbitmqVersion = "3.5.3"
val calciteVersion = "1.12.0"
+ val annotationDependencies = Seq(
+ // work around for compiler warnings like
+ // "Class javax.annotation.CheckReturnValue not found - continuing with a stub"
+ // see https://issues.scala-lang.org/browse/SI-8978
+ // marked as "provided" to be excluded from assembling
+ "com.google.code.findbugs" % "jsr305" % "3.0.2" % "provided"
+ )
+
val coreDependencies = Seq(
libraryDependencies ++= Seq(
"org.slf4j" % "slf4j-api" % slf4jVersion,
@@ -94,11 +102,12 @@ object Dependencies {
exclude("org.slf4j", "slf4j-api"),
"com.codahale.metrics" % "metrics-jvm" % codahaleVersion
exclude("org.slf4j", "slf4j-api"),
+
"com.typesafe.akka" %% "akka-testkit" % akkaVersion % "test",
"org.scalatest" %% "scalatest" % scalaTestVersion % "test",
"org.scalacheck" %% "scalacheck" % scalaCheckVersion % "test",
"org.mockito" % "mockito-core" % mockitoVersion % "test",
"junit" % "junit" % junitVersion % "test"
- )
+ ) ++ annotationDependencies
)
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala b/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala
index be96577..1f6141a 100644
--- a/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala
+++ b/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala
@@ -40,7 +40,7 @@ import org.apache.gearpump.cluster.MasterToClient.{HistoryMetrics, MasterConfig,
import org.apache.gearpump.cluster.client.ClientContext
import org.apache.gearpump.cluster.worker.WorkerSummary
import org.apache.gearpump.cluster.{ClusterConfig, UserConfig}
-import org.apache.gearpump.jarstore.{JarStoreClient, FileDirective, JarStoreServer}
+import org.apache.gearpump.jarstore.{JarStoreClient, FileDirective}
import org.apache.gearpump.streaming.partitioner.{PartitionerByClassName, PartitionerDescription}
import org.apache.gearpump.services.MasterService.{BuiltinPartitioners, SubmitApplicationRequest}
// NOTE: This cannot be removed!!!
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/services/jvm/src/main/scala/org/apache/gearpump/services/StaticService.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/main/scala/org/apache/gearpump/services/StaticService.scala b/services/jvm/src/main/scala/org/apache/gearpump/services/StaticService.scala
index 7b33987..762b9e4 100644
--- a/services/jvm/src/main/scala/org/apache/gearpump/services/StaticService.scala
+++ b/services/jvm/src/main/scala/org/apache/gearpump/services/StaticService.scala
@@ -23,7 +23,6 @@ import akka.http.scaladsl.marshalling.ToResponseMarshallable
import akka.http.scaladsl.model._
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.marshalling.ToResponseMarshallable._
-import akka.http.scaladsl.server.{RejectionHandler, StandardRoute}
import akka.stream.Materializer
import org.apache.gearpump.util.Util
// NOTE: This cannot be removed!!!
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/services/jvm/src/test/scala/org/apache/gearpump/services/AppMasterServiceSpec.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/test/scala/org/apache/gearpump/services/AppMasterServiceSpec.scala b/services/jvm/src/test/scala/org/apache/gearpump/services/AppMasterServiceSpec.scala
index 80264d2..4f2b642 100644
--- a/services/jvm/src/test/scala/org/apache/gearpump/services/AppMasterServiceSpec.scala
+++ b/services/jvm/src/test/scala/org/apache/gearpump/services/AppMasterServiceSpec.scala
@@ -27,16 +27,14 @@ import akka.testkit.TestActor.{AutoPilot, KeepRunning}
import akka.testkit.{TestKit, TestProbe}
import com.typesafe.config.{Config, ConfigFactory}
import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
-import org.slf4j.Logger
import upickle.default.read
import org.apache.gearpump.cluster.AppMasterToMaster.GeneralAppMasterSummary
import org.apache.gearpump.cluster.ClientToMaster.{GetLastFailure, QueryAppMasterConfig, QueryHistoryMetrics, ResolveAppId}
import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterData, AppMasterDataDetailRequest, AppMasterDataRequest}
import org.apache.gearpump.cluster.MasterToClient._
import org.apache.gearpump.cluster.{ApplicationStatus, TestUtil}
-import org.apache.gearpump.jarstore.{JarStoreClient, JarStoreServer}
+import org.apache.gearpump.jarstore.JarStoreClient
import org.apache.gearpump.streaming.executor.Executor.{ExecutorConfig, ExecutorSummary, GetExecutorSummary, QueryExecutorConfig}
-import org.apache.gearpump.util.LogUtil
// NOTE: This cannot be removed!!!
import org.apache.gearpump.services.util.UpickleUtil._
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/services/jvm/src/test/scala/org/apache/gearpump/services/MasterServiceSpec.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/test/scala/org/apache/gearpump/services/MasterServiceSpec.scala b/services/jvm/src/test/scala/org/apache/gearpump/services/MasterServiceSpec.scala
index 39c0de0..07e44c1 100644
--- a/services/jvm/src/test/scala/org/apache/gearpump/services/MasterServiceSpec.scala
+++ b/services/jvm/src/test/scala/org/apache/gearpump/services/MasterServiceSpec.scala
@@ -40,7 +40,7 @@ import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterData, AppMastersD
import org.apache.gearpump.cluster.MasterToClient._
import org.apache.gearpump.cluster.TestUtil
import org.apache.gearpump.cluster.worker.{WorkerId, WorkerSummary}
-import org.apache.gearpump.jarstore.{JarStoreClient, JarStoreServer}
+import org.apache.gearpump.jarstore.JarStoreClient
import org.apache.gearpump.services.MasterService.{BuiltinPartitioners, SubmitApplicationRequest}
// NOTE: This cannot be removed!!!
import org.apache.gearpump.services.util.UpickleUtil._
@@ -166,7 +166,7 @@ class MasterServiceSpec extends FlatSpec with ScalatestRouteTest
private def entity(file: File)(implicit ec: ExecutionContext): Future[RequestEntity] = {
val entity = HttpEntity(MediaTypes.`application/octet-stream`, file.length(),
- FileIO.fromFile(file, chunkSize = 100000))
+ FileIO.fromPath(file.toPath, chunkSize = 100000))
val body = Source.single(
Multipart.FormData.BodyPart(
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/ClusterMessage.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/ClusterMessage.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/ClusterMessage.scala
index 8a76916..f0e4f84 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/ClusterMessage.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/ClusterMessage.scala
@@ -18,11 +18,9 @@
package org.apache.gearpump.streaming
-import scala.language.existentials
-
import akka.actor.ActorRef
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.Time.MilliSeconds
import org.apache.gearpump.cluster.appmaster.WorkerInfo
import org.apache.gearpump.cluster.scheduler.Resource
import org.apache.gearpump.streaming.appmaster.TaskRegistry.TaskLocations
@@ -63,7 +61,7 @@ object AppMasterToExecutor {
case class StartAllTasks(dagVersion: Int)
case class StartDynamicDag(dagVersion: Int)
- case class TaskRegistered(taskId: TaskId, sessionId: Int, startClock: TimeStamp)
+ case class TaskRegistered(taskId: TaskId, sessionId: Int, startClock: MilliSeconds)
case class TaskRejected(taskId: TaskId)
case object RestartClockService
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala
index 435414b..f15e1b3 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala
@@ -18,10 +18,10 @@
package org.apache.gearpump.streaming
-import scala.language.implicitConversions
import scala.reflect.ClassTag
import akka.actor.ActorSystem
-import org.apache.gearpump.{MAX_TIME_MILLIS, MIN_TIME_MILLIS, TimeStamp}
+import org.apache.gearpump.Time
+import org.apache.gearpump.Time.MilliSeconds
import org.apache.gearpump.cluster._
import org.apache.gearpump.streaming.partitioner.{HashPartitioner, Partitioner, PartitionerDescription, PartitionerObject}
import org.apache.gearpump.streaming.appmaster.AppMaster
@@ -102,8 +102,8 @@ object Processor {
* When input message's timestamp is beyond current processor's lifetime,
* then it will not be processed by this processor.
*/
-case class LifeTime(birth: TimeStamp, death: TimeStamp) {
- def contains(timestamp: TimeStamp): Boolean = {
+case class LifeTime(birth: MilliSeconds, death: MilliSeconds) {
+ def contains(timestamp: MilliSeconds): Boolean = {
timestamp >= birth && timestamp < death
}
@@ -113,8 +113,7 @@ case class LifeTime(birth: TimeStamp, death: TimeStamp) {
}
object LifeTime {
- // MAX_TIME_MILLIS is Long.MaxValue - 1
- val Immortal = LifeTime(MIN_TIME_MILLIS, MAX_TIME_MILLIS + 1)
+ val Immortal = LifeTime(Time.MIN_TIME_MILLIS, Time.UNREACHABLE)
}
/**
@@ -159,7 +158,7 @@ object StreamApplication {
val graph = dag.mapVertex { processor =>
val updatedProcessor = ProcessorToProcessorDescription(indices(processor), processor)
updatedProcessor
- }.mapEdge { (node1, edge, node2) =>
+ }.mapEdge { (_, edge, _) =>
PartitionerDescription(new PartitionerObject(
Option(edge).getOrElse(StreamApplication.hashPartitioner)))
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala
index ba4b058..3c5c7da 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala
@@ -21,7 +21,7 @@ package org.apache.gearpump.streaming.appmaster
import java.lang.management.ManagementFactory
import akka.actor._
-import org.apache.gearpump._
+import org.apache.gearpump.Time.MilliSeconds
import org.apache.gearpump.cluster.AppMasterToMaster.ApplicationStatusChanged
import org.apache.gearpump.cluster.ClientToMaster._
import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterActivated, AppMasterDataDetailRequest, ReplayFromTimestampWindowTrailingEdge}
@@ -67,7 +67,7 @@ class AppMaster(appContext: AppMasterContext, app: AppDescription) extends Appli
import akka.pattern.ask
private implicit val dispatcher = context.dispatcher
- private val startTime: TimeStamp = System.currentTimeMillis()
+ private val startTime: MilliSeconds = System.currentTimeMillis()
private val LOG: Logger = LogUtil.getLogger(getClass, app = appId)
LOG.info(s"AppMaster[$appId] is launched by $username, app: $app xxxxxxxxxxxxxxxxx")
@@ -322,7 +322,7 @@ class AppMaster(appContext: AppMasterContext, app: AppDescription) extends Appli
context.stop(self)
}
- private def getMinClock: Future[TimeStamp] = {
+ private def getMinClock: Future[MilliSeconds] = {
clockService match {
case Some(service) =>
(service ? GetLatestMinClock).asInstanceOf[Future[LatestMinClock]].map(_.clock)
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala
index 77a966a..90141d4 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala
@@ -25,7 +25,8 @@ import java.util.concurrent.TimeUnit
import akka.actor.{Actor, ActorRef, Cancellable, Stash}
import com.google.common.primitives.Longs
-import org.apache.gearpump.{MIN_TIME_MILLIS, TimeStamp}
+import org.apache.gearpump.Time
+import org.apache.gearpump.Time.MilliSeconds
import org.apache.gearpump.cluster.ClientToMaster.GetStallingTasks
import org.apache.gearpump.streaming.AppMasterToMaster.StallingTasks
import org.apache.gearpump.streaming._
@@ -38,7 +39,6 @@ import org.apache.gearpump.util.LogUtil
import org.slf4j.Logger
import scala.concurrent.duration.FiniteDuration
-import scala.language.implicitConversions
/**
* Maintains a global view of message timestamp in the application
@@ -61,8 +61,8 @@ class ClockService(
LOG.info("Initializing Clock service, get snapshotted StartClock ....")
store.get(START_CLOCK).map { clock =>
// check for null first since
- // (null).asInstanceOf[TimeStamp] is zero
- val startClock = if (clock != null) clock.asInstanceOf[TimeStamp] else MIN_TIME_MILLIS
+ // (null).asInstanceOf[MilliSeconds] is zero
+ val startClock = if (clock != null) clock.asInstanceOf[MilliSeconds] else Time.MIN_TIME_MILLIS
minCheckpointClock = Some(startClock)
@@ -89,32 +89,32 @@ class ClockService(
// We use Array instead of List for Performance consideration
private var processorClocks = Array.empty[ProcessorClock]
- private var checkpointClocks: Map[TaskId, Vector[TimeStamp]] = _
+ private var checkpointClocks: Map[TaskId, Vector[MilliSeconds]] = _
- private var minCheckpointClock: Option[TimeStamp] = None
+ private var minCheckpointClock: Option[MilliSeconds] = None
private def checkpointEnabled(processor: ProcessorDescription): Boolean = {
val taskConf = processor.taskConf
taskConf != null && taskConf.getBoolean("state.checkpoint.enable").contains(true)
}
- private def resetCheckpointClocks(dag: DAG, startClock: TimeStamp): Unit = {
+ private def resetCheckpointClocks(dag: DAG, startClock: MilliSeconds): Unit = {
this.checkpointClocks = dag.processors.filter(startClock < _._2.life.death)
.filter { case (_, processor) =>
checkpointEnabled(processor)
}.flatMap { case (id, processor) =>
- (0 until processor.parallelism).map(TaskId(id, _) -> Vector.empty[TimeStamp])
+ (0 until processor.parallelism).map(TaskId(id, _) -> Vector.empty[MilliSeconds])
}
if (this.checkpointClocks.isEmpty) {
minCheckpointClock = None
}
}
- private def initDag(startClock: TimeStamp): Unit = {
+ private def initDag(startClock: MilliSeconds): Unit = {
recoverDag(this.dag, startClock)
}
- private def recoverDag(dag: DAG, startClock: TimeStamp): Unit = {
+ private def recoverDag(dag: DAG, startClock: MilliSeconds): Unit = {
this.clocks = dag.processors.filter(startClock < _._2.life.death).
map { pair =>
val (processorId, processor) = pair
@@ -131,7 +131,7 @@ class ClockService(
resetCheckpointClocks(dag, startClock)
}
- private def dynamicDAG(dag: DAG, startClock: TimeStamp): Unit = {
+ private def dynamicDAG(dag: DAG, startClock: MilliSeconds): Unit = {
val newClocks = dag.processors.filter(startClock < _._2.life.death).
map { pair =>
val (processorId, processor) = pair
@@ -208,7 +208,7 @@ class ClockService(
}
}
- private def getUpStreamMinClock(processorId: ProcessorId): Option[TimeStamp] = {
+ private def getUpStreamMinClock(processorId: ProcessorId): Option[MilliSeconds] = {
upstreamClocks.get(processorId).map(ProcessorClocks.minClock)
}
@@ -304,7 +304,7 @@ class ClockService(
}
}
- private def minClock: TimeStamp = {
+ private def minClock: MilliSeconds = {
ProcessorClocks.minClock(processorClocks)
}
@@ -314,7 +314,7 @@ class ClockService(
healthChecker.check(minTimestamp, clocks, dag, System.currentTimeMillis())
}
- private def getStartClock: TimeStamp = {
+ private def getStartClock: MilliSeconds = {
minCheckpointClock.getOrElse(minClock)
}
@@ -322,7 +322,7 @@ class ClockService(
store.put(START_CLOCK, getStartClock)
}
- private def updateCheckpointClocks(task: TaskId, time: TimeStamp): Unit = {
+ private def updateCheckpointClocks(task: TaskId, time: MilliSeconds): Unit = {
val clocks = checkpointClocks(task) :+ time
checkpointClocks += task -> clocks
@@ -341,17 +341,17 @@ object ClockService {
case object HealthCheck
class ProcessorClock(val processorId: ProcessorId, val life: LifeTime, val parallelism: Int,
- private var _min: TimeStamp = MIN_TIME_MILLIS,
- private var _taskClocks: Array[TimeStamp] = null) {
+ private var _min: MilliSeconds = Time.MIN_TIME_MILLIS,
+ private var _taskClocks: Array[MilliSeconds] = null) {
def copy(life: LifeTime): ProcessorClock = {
new ProcessorClock(processorId, life, parallelism, _min, _taskClocks)
}
- def min: TimeStamp = _min
- def taskClocks: Array[TimeStamp] = _taskClocks
+ def min: MilliSeconds = _min
+ def taskClocks: Array[MilliSeconds] = _taskClocks
- def init(startClock: TimeStamp): Unit = {
+ def init(startClock: MilliSeconds): Unit = {
if (taskClocks == null) {
this._min = startClock
this._taskClocks = new Array(parallelism)
@@ -359,7 +359,7 @@ object ClockService {
}
}
- def updateMinClock(taskIndex: Int, clock: TimeStamp): Unit = {
+ def updateMinClock(taskIndex: Int, clock: MilliSeconds): Unit = {
taskClocks(taskIndex) = clock
_min = Longs.min(taskClocks: _*)
}
@@ -382,8 +382,8 @@ object ClockService {
/** Check for stalling tasks */
def check(
- currentMinClock: TimeStamp, processorClocks: Map[ProcessorId, ProcessorClock],
- dag: DAG, now: TimeStamp): Unit = {
+ currentMinClock: MilliSeconds, processorClocks: Map[ProcessorId, ProcessorClock],
+ dag: DAG, now: MilliSeconds): Unit = {
var isClockStalling = false
if (null == minClock || currentMinClock > minClock.appClock) {
minClock = ClockValue(systemClock = now, appClock = currentMinClock)
@@ -424,7 +424,7 @@ object ClockService {
}
object HealthChecker {
- case class ClockValue(systemClock: TimeStamp, appClock: TimeStamp) {
+ case class ClockValue(systemClock: MilliSeconds, appClock: MilliSeconds) {
def prettyPrint: String = {
"(system clock: " + new Date(systemClock).toString + ", app clock: " + appClock + ")"
}
@@ -434,7 +434,7 @@ object ClockService {
object ProcessorClocks {
// Get the Min clock of all processors
- def minClock(clock: Array[ProcessorClock]): TimeStamp = {
+ def minClock(clock: Array[ProcessorClock]): MilliSeconds = {
var i = 0
var min = if (clock.length == 0) 0L else clock(0).min
while (i < clock.length) {
@@ -446,7 +446,7 @@ object ClockService {
}
case class ChangeToNewDAG(dag: DAG)
- case class ChangeToNewDAGSuccess(clocks: Map[ProcessorId, TimeStamp])
+ case class ChangeToNewDAGSuccess(clocks: Map[ProcessorId, MilliSeconds])
- case class StoredStartClock(clock: TimeStamp)
+ case class StoredStartClock(clock: MilliSeconds)
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/JarScheduler.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/JarScheduler.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/JarScheduler.scala
index e023cdf..e31f863 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/JarScheduler.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/JarScheduler.scala
@@ -20,7 +20,7 @@ package org.apache.gearpump.streaming.appmaster
import akka.actor._
import akka.pattern.ask
import com.typesafe.config.Config
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.Time.MilliSeconds
import org.apache.gearpump.cluster.AppJar
import org.apache.gearpump.cluster.scheduler.{Resource, ResourceRequest}
import org.apache.gearpump.cluster.worker.WorkerId
@@ -47,7 +47,7 @@ class JarScheduler(appId: Int, appName: String, config: Config, factory: ActorRe
private implicit val timeout = Constants.FUTURE_TIMEOUT
/** Set the current DAG version active */
- def setDag(dag: DAG, startClock: Future[TimeStamp]): Unit = {
+ def setDag(dag: DAG, startClock: Future[MilliSeconds]): Unit = {
actor ! TransitToNewDag
startClock.map { start =>
actor ! NewDag(dag, start)
@@ -82,7 +82,7 @@ object JarScheduler {
case class ResourceRequestDetail(jar: AppJar, requests: Array[ResourceRequest])
- case class NewDag(dag: DAG, startTime: TimeStamp)
+ case class NewDag(dag: DAG, startTime: MilliSeconds)
case object TransitToNewDag
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/StreamAppMasterSummary.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/StreamAppMasterSummary.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/StreamAppMasterSummary.scala
index 126ab92..1214cd0 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/StreamAppMasterSummary.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/StreamAppMasterSummary.scala
@@ -18,7 +18,7 @@
package org.apache.gearpump.streaming.appmaster
-import org.apache.gearpump._
+import org.apache.gearpump.Time.MilliSeconds
import org.apache.gearpump.cluster.AppMasterToMaster.AppMasterSummary
import org.apache.gearpump.cluster.{ApplicationStatus, UserConfig}
import org.apache.gearpump.streaming.appmaster.AppMaster.ExecutorBrief
@@ -32,10 +32,10 @@ case class StreamAppMasterSummary(
appId: Int,
appName: String = null,
actorPath: String = null,
- clock: TimeStamp = 0L,
+ clock: MilliSeconds = 0L,
status: ApplicationStatus = ApplicationStatus.ACTIVE,
- startTime: TimeStamp = 0L,
- uptime: TimeStamp = 0L,
+ startTime: MilliSeconds = 0L,
+ uptime: MilliSeconds = 0L,
user: String = null,
homeDirectory: String = "",
logFile: String = "",
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskManager.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskManager.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskManager.scala
index 51c4de9..bae5c02 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskManager.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskManager.scala
@@ -20,7 +20,7 @@ package org.apache.gearpump.streaming.appmaster
import akka.actor._
import akka.pattern.ask
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.Time.MilliSeconds
import org.apache.gearpump.cluster.MasterToAppMaster.ReplayFromTimestampWindowTrailingEdge
import org.apache.gearpump.streaming.AppMasterToExecutor._
import org.apache.gearpump.streaming.ExecutorToAppMaster.{MessageLoss, RegisterTask, UnRegisterTask}
@@ -86,11 +86,11 @@ private[appmaster] class TaskManager(
dagManager ! WatchChange(watcher = self)
executorManager ! SetTaskManager(self)
- private def getStartClock: Future[TimeStamp] = {
+ private def getStartClock: Future[MilliSeconds] = {
(clockService ? GetStartClock).asInstanceOf[Future[StartClock]].map(_.clock)
}
- private var startClock: Future[TimeStamp] = getStartClock
+ private var startClock: Future[MilliSeconds] = getStartClock
def receive: Receive = applicationReady(DagReadyState.empty)
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FilterFunction.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FilterFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FilterFunction.scala
index 25a0929..8d3ffb3 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FilterFunction.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FilterFunction.scala
@@ -17,8 +17,6 @@
*/
package org.apache.gearpump.streaming.dsl.api.functions
-import org.apache.gearpump.streaming.dsl.scalaapi.functions.SerializableFunction
-
object FilterFunction {
def apply[T](fn: T => Boolean): FilterFunction[T] = {
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FoldFunction.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FoldFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FoldFunction.scala
index 9ff44a8..1525d6e 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FoldFunction.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FoldFunction.scala
@@ -18,8 +18,6 @@
package org.apache.gearpump.streaming.dsl.api.functions
-import org.apache.gearpump.streaming.dsl.scalaapi.functions.SerializableFunction
-
/**
* Combines input into an accumulator.
*
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/MapFunction.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/MapFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/MapFunction.scala
index a4fdca6..7880c2f 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/MapFunction.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/MapFunction.scala
@@ -17,8 +17,6 @@
*/
package org.apache.gearpump.streaming.dsl.api.functions
-import org.apache.gearpump.streaming.dsl.scalaapi.functions.SerializableFunction
-
object MapFunction {
def apply[T, R](fn: T => R): MapFunction[T, R] = {
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/SerializableFunction.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/SerializableFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/SerializableFunction.scala
new file mode 100644
index 0000000..b90ba28
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/SerializableFunction.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.api.functions
+
+/**
+ * Superclass for all user defined function interfaces.
+ * This ensures all functions are serializable and provides common methods
+ * like setup and teardown. Users should not extend this class directly
+ * but subclasses like [[org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction]].
+ */
+abstract class SerializableFunction extends java.io.Serializable {
+
+ def setup(): Unit = {}
+
+ def teardown(): Unit = {}
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/FlatMapFunction.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/FlatMapFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/FlatMapFunction.scala
index 11e2416..adad878 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/FlatMapFunction.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/FlatMapFunction.scala
@@ -17,7 +17,7 @@
*/
package org.apache.gearpump.streaming.dsl.javaapi.functions
-import org.apache.gearpump.streaming.dsl.scalaapi.functions.SerializableFunction
+import org.apache.gearpump.streaming.dsl.api.functions.SerializableFunction
/**
* Transforms one input into zero or more outputs of possibly different types.
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/package.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/package.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/package.scala
new file mode 100644
index 0000000..6d43f16
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/package.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming
+
+
+// scalastyle:off line.size.limit
+/**
+ *
+ * The architecture of Gearpump Streaming DSL consists of several layers:
+ *
+ * * User facing [[org.apache.gearpump.streaming.dsl.scalaapi.Stream]] DSL. Stream is created by [[org.apache.gearpump.streaming.dsl.scalaapi.StreamApp]]
+ * from input source like Kafka or by applying high level operations (e.g. flatMap, window, groupBy) to user defined functions(UDFs). UDFs are subclasses
+ * of [[org.apache.gearpump.streaming.dsl.api.functions.SerializableFunction]], represented by [[org.apache.gearpump.streaming.dsl.plan.Op]]
+ * in the underlying [[org.apache.gearpump.util.Graph]].
+ * * [[org.apache.gearpump.streaming.dsl.plan.Planner]], responsible for interpreting the Op Graph, optimizing it and building a low level Graph of
+ * [[org.apache.gearpump.streaming.Processor]]. Finally, it creates a runnable Graph of [[org.apache.gearpump.streaming.task.Task]].
+ * * The execution layer is usually composed of the following four tasks.
+ *
+ * * [[org.apache.gearpump.streaming.source.DataSourceTask]] for [[org.apache.gearpump.streaming.source.DataSource]] to ingest data into Gearpump
+ * * [[org.apache.gearpump.streaming.sink.DataSinkTask]] for [[org.apache.gearpump.streaming.sink.DataSink]] to write data out.
+ * * [[org.apache.gearpump.streaming.dsl.task.GroupByTask]] to execute Ops followed by [[org.apache.gearpump.streaming.dsl.plan.GroupByOp]]
+ * * [[org.apache.gearpump.streaming.dsl.task.TransformTask]] to execute all other Ops.
+ *
+ * All but [[org.apache.gearpump.streaming.sink.DataSinkTask]] delegate execution to [[org.apache.gearpump.streaming.dsl.window.impl.WindowRunner]], which internally
+ * runs a chain of [[org.apache.gearpump.streaming.dsl.plan.functions.FunctionRunner]] grouped by windows. Window assignments are either explicitly defined with
+ * [[org.apache.gearpump.streaming.dsl.window.api.Windows]] API or implicitly in [[org.apache.gearpump.streaming.dsl.window.api.GlobalWindows]]. UDFs are eventually
+ * executed by [[org.apache.gearpump.streaming.dsl.plan.functions.FunctionRunner]].
+ *
+ */
+// scalastyle:on line.size.limit
+package object dsl {
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
index 2a45a8f..c37ced6 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
@@ -35,16 +35,29 @@ import scala.reflect.ClassTag
object Op {
+ /**
+ * Concatenates two descriptions with "." or returns one if the other is empty.
+ */
def concatenate(desc1: String, desc2: String): String = {
if (desc1 == null || desc1.isEmpty) desc2
else if (desc2 == null || desc2.isEmpty) desc1
else desc1 + "." + desc2
}
+ /**
+ * Concatenates two configs according to the following rules
+ * 1. The first config cannot be null.
+ * 2. The first config is returned if the second config is null
+ * 3. The second config takes precedence for overlapping config keys
+ */
def concatenate(config1: UserConfig, config2: UserConfig): UserConfig = {
config1.withConfig(config2)
}
+ /**
+ * This adds a [[org.apache.gearpump.streaming.dsl.plan.functions.DummyRunner]] in
+ * [[GlobalWindows]] if a target [[Task]] has no executable UDF.
+ */
def withGlobalWindowsDummyRunner(op: Op, userConfig: UserConfig,
processor: Processor[_ <: Task])(implicit system: ActorSystem): Processor[_ <: Task] = {
if (userConfig.getValue(Constants.GEARPUMP_STREAMING_OPERATOR).isEmpty) {
@@ -59,22 +72,37 @@ object Op {
}
/**
- * This is a vertex on the logical plan.
+ * This is a vertex on the logical Graph, representing user defined functions in
+ * [[org.apache.gearpump.streaming.dsl.scalaapi.Stream]] DSL.
*/
sealed trait Op {
+ /**
+ * This comes from user function description and is used to display it on front end.
+ */
def description: String
+ /**
+ * This will ship user function to [[org.apache.gearpump.streaming.task.Task]] to be executed.
+ */
def userConfig: UserConfig
+ /**
+ * This creates a new Op by merging their user functions, user configs and descriptions.
+ */
def chain(op: Op)(implicit system: ActorSystem): Op
+ /**
+ * This creates a Processor after chaining.
+ */
def toProcessor(implicit system: ActorSystem): Processor[_ <: Task]
}
/**
- * This represents a low level Processor.
+ * This represents a low level Processor. It is deprecated since it
+ * doesn't work with other Ops.
*/
+@deprecated
case class ProcessorOp[T <: Task](
processor: Class[T],
parallelism: Int,
@@ -99,7 +127,8 @@ case class ProcessorOp[T <: Task](
}
/**
- * This represents a DataSource.
+ * This represents a DataSource and creates a
+ * [[org.apache.gearpump.streaming.source.DataSourceTask]]
*/
case class DataSourceOp(
dataSource: DataSource,
@@ -142,7 +171,7 @@ case class DataSourceOp(
}
/**
- * This represents a DataSink.
+ * This represents a DataSink and creates a [[org.apache.gearpump.streaming.sink.DataSinkTask]].
*/
case class DataSinkOp(
dataSink: DataSink,
@@ -163,7 +192,7 @@ case class DataSinkOp(
/**
* This represents operations that can be chained together
* (e.g. flatMap, map, filter, reduce) and further chained
- * to another Op to be used
+ * to another Op to be executed
*/
case class TransformOp[IN, OUT](
fn: FunctionRunner[IN, OUT],
@@ -201,61 +230,7 @@ case class TransformOp[IN, OUT](
}
}
-/**
- * This is an intermediate operation, produced by chaining WindowOp and TransformOp.
- * Usually, it will be chained to a DataSourceOp, GroupByOp or MergeOp.
- * Otherwise, it will be translated to a Processor of TransformTask.
- */
-case class WindowTransformOp[IN, OUT](
- windowRunner: WindowRunner[IN, OUT],
- description: String,
- userConfig: UserConfig) extends Op {
-
- override def chain(other: Op)(implicit system: ActorSystem): Op = {
- other match {
- case op: WindowTransformOp[OUT, _] =>
- WindowTransformOp(
- WindowRunnerAT(windowRunner, op.windowRunner),
- Op.concatenate(description, op.description),
- Op.concatenate(userConfig, op.userConfig)
- )
- case _ =>
- throw new OpChainException(this, other)
- }
- }
-
- override def toProcessor(implicit system: ActorSystem): Processor[_ <: Task] = {
- // TODO: this should be chained to DataSourceOp / GroupByOp / MergeOp
- Processor[TransformTask[Any, Any]](1, description, userConfig.withValue(
- Constants.GEARPUMP_STREAMING_OPERATOR, windowRunner))
- }
-}
-
-/**
- * This is an intermediate operation, produced by chaining TransformOp and WindowOp.
- * It will later be chained to a WindowOp, which results in two WindowTransformOps.
- * Finally, they will be chained to a single WindowTransformOp.
- */
-case class TransformWindowTransformOp[IN, MIDDLE, OUT](
- transformOp: TransformOp[IN, MIDDLE],
- windowTransformOp: WindowTransformOp[MIDDLE, OUT]) extends Op {
-
- override def description: String = {
- throw new UnsupportedOperationException(s"description is not supported on $this")
- }
-
- override def userConfig: UserConfig = {
- throw new UnsupportedOperationException(s"userConfig is not supported on $this")
- }
-
- override def chain(op: Op)(implicit system: ActorSystem): Op = {
- throw new UnsupportedOperationException(s"chain is not supported on $this")
- }
- override def toProcessor(implicit system: ActorSystem): Processor[_ <: Task] = {
- WindowOp(GlobalWindows()).chain(this).toProcessor
- }
-}
/**
* This represents a window aggregation, together with a following TransformOp
@@ -290,7 +265,14 @@ case class WindowOp(
}
/**
- * This represents a Processor with groupBy and window aggregation
+ * This represents an operation with groupBy followed by window aggregation.
+ *
+ * It can only be chained with [[WindowTransformOp]] to be executed in
+ * [[org.apache.gearpump.streaming.dsl.task.GroupByTask]].
+ * However, it's possible a window function has no following aggregations. In that case,
+ * we manually tail a [[WindowOp]] with [[TransformOp]] of
+ * [[org.apache.gearpump.streaming.dsl.plan.functions.DummyRunner]] to create a
+ * [[WindowTransformOp]].
*/
case class GroupByOp[IN, GROUP] private(
groupBy: IN => GROUP,
@@ -325,7 +307,14 @@ case class GroupByOp[IN, GROUP] private(
}
/**
- * This represents a Processor transforming merged streams
+ * This represents an operation with merge followed by window aggregation.
+ *
+ * It can only be chained with [[WindowTransformOp]] to be executed in
+ * [[org.apache.gearpump.streaming.dsl.task.TransformTask]].
+ * However, it's possible a merge function has no following aggregations. In that case,
+ * we manually tail a [[WindowOp]] with [[TransformOp]] of
+ * [[org.apache.gearpump.streaming.dsl.plan.functions.DummyRunner]] to create a
+ * [[WindowTransformOp]].
*/
case class MergeOp(
parallelism: Int = 1,
@@ -357,7 +346,65 @@ case class MergeOp(
}
/**
- * This is an edge on the logical plan.
+ * This is an intermediate operation, produced by chaining [[WindowOp]] and [[TransformOp]].
+ * Usually, it will be chained to a [[DataSourceOp]], [[GroupByOp]] or [[MergeOp]]. Nonetheless,
+ * Op with more than 1 outgoing edge or incoming edge cannot be chained. In that case,
+ * it will be translated to a [[org.apache.gearpump.streaming.dsl.task.TransformTask]].
+ */
+private case class WindowTransformOp[IN, OUT](
+ windowRunner: WindowRunner[IN, OUT],
+ description: String,
+ userConfig: UserConfig) extends Op {
+
+ override def chain(other: Op)(implicit system: ActorSystem): Op = {
+ other match {
+ case op: WindowTransformOp[OUT, _] =>
+ WindowTransformOp(
+ WindowRunnerAT(windowRunner, op.windowRunner),
+ Op.concatenate(description, op.description),
+ Op.concatenate(userConfig, op.userConfig)
+ )
+ case _ =>
+ throw new OpChainException(this, other)
+ }
+ }
+
+ override def toProcessor(implicit system: ActorSystem): Processor[_ <: Task] = {
+ // TODO: this should be chained to DataSourceOp / GroupByOp / MergeOp
+ Processor[TransformTask[Any, Any]](1, description, userConfig.withValue(
+ Constants.GEARPUMP_STREAMING_OPERATOR, windowRunner))
+ }
+}
+
+/**
+ * This is an intermediate operation, produced by chaining [[TransformOp]] and
+ * [[WindowTransformOp]]. It will later be chained to a [[WindowOp]], which results in
+ * two [[WindowTransformOp]]s. Finally, they will be chained to a single WindowTransformOp.
+ */
+private case class TransformWindowTransformOp[IN, MIDDLE, OUT](
+ transformOp: TransformOp[IN, MIDDLE],
+ windowTransformOp: WindowTransformOp[MIDDLE, OUT]) extends Op {
+
+ override def description: String = {
+ throw new UnsupportedOperationException(s"description is not supported on $this")
+ }
+
+ override def userConfig: UserConfig = {
+ throw new UnsupportedOperationException(s"userConfig is not supported on $this")
+ }
+
+ override def chain(op: Op)(implicit system: ActorSystem): Op = {
+ throw new UnsupportedOperationException(s"chain is not supported on $this")
+ }
+
+ override def toProcessor(implicit system: ActorSystem): Processor[_ <: Task] = {
+ WindowOp(GlobalWindows()).chain(this).toProcessor
+ }
+}
+
+/**
+ * This is an edge on the logical plan. It defines whether data should be transported locally
+ * or shuffled remotely between [[Op]].
*/
trait OpEdge
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala
index b1b39c9..04b5337 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala
@@ -24,11 +24,26 @@ import org.apache.gearpump.streaming.Processor
import org.apache.gearpump.streaming.task.Task
import org.apache.gearpump.util.Graph
+/**
+ * This class is responsible for turning the high level
+ * [[org.apache.gearpump.streaming.dsl.scalaapi.Stream]] DSL into low level
+ * [[org.apache.gearpump.streaming.Processor]] API.
+ */
class Planner {
/**
- * Converts Dag of Op to Dag of TaskDescription. TaskDescription is part of the low
- * level Graph API.
+ * This method interprets a Graph of [[Op]] and creates a Graph of
+ * [[org.apache.gearpump.streaming.Processor]].
+ *
+ * It firstly reversely traverses the Graph from a leaf Op and merges it with
+ * its downstream Op according to the following rules.
+ *
+ * 1. The Op has only one outgoing edge and the downstream Op has only one incoming edge
+ * 2. Neither Op is [[ProcessorOp]]
+ * 3. The edge is [[Direct]]
+ *
+ * Finally the vertices of the optimized Graph are translated to Processors
+ * and the edges to Partitioners.
*/
def plan(dag: Graph[Op, OpEdge])
(implicit system: ActorSystem): Graph[Processor[_ <: Task], _ <: Partitioner] = {
@@ -43,6 +58,7 @@ class Planner {
case _ => new HashPartitioner
}
case Direct =>
+ // FIXME: This is never used
new CoLocationPartitioner
}
}.mapVertex(_.toProcessor)
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala
index ef2753e..d0c733e 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala
@@ -210,6 +210,7 @@ class Stream[T](
* @param parallelism parallelism level
* @return new stream after processing with type [R]
*/
+ @deprecated
def process[R](
processor: Class[_ <: Task], parallelism: Int, conf: UserConfig = UserConfig.empty,
description: String = "process"): Stream[R] = {
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamApp.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamApp.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamApp.scala
index bce8c0c..17d77bc 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamApp.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamApp.scala
@@ -62,6 +62,17 @@ class StreamApp(
val dag = planner.plan(graph)
StreamApplication(name, dag, userConfig)
}
+
+ def source[T](dataSource: DataSource, parallelism: Int = 1,
+ conf: UserConfig = UserConfig.empty, description: String = "source"): Stream[T] = {
+ val sourceOp = DataSourceOp(dataSource, parallelism, description, conf)
+ graph.addVertex(sourceOp)
+ new Stream[T](graph, sourceOp)
+ }
+
+ def source[T](seq: Seq[T], parallelism: Int, description: String): Stream[T] = {
+ this.source(new CollectionDataSource[T](seq), parallelism, UserConfig.empty, description)
+ }
}
object StreamApp {
@@ -73,20 +84,6 @@ object StreamApp {
implicit def streamAppToApplication(streamApp: StreamApp): StreamApplication = {
streamApp.plan()
}
-
- implicit class Source(app: StreamApp) extends java.io.Serializable {
-
- def source[T](dataSource: DataSource, parallelism: Int = 1,
- conf: UserConfig = UserConfig.empty, description: String = "source"): Stream[T] = {
- implicit val sourceOp = DataSourceOp(dataSource, parallelism, description, conf)
- app.graph.addVertex(sourceOp)
- new Stream[T](app.graph, sourceOp)
- }
-
- def source[T](seq: Seq[T], parallelism: Int, description: String): Stream[T] = {
- this.source(new CollectionDataSource[T](seq), parallelism, UserConfig.empty, description)
- }
- }
}
/** A test message source which generated message sequence repeatedly. */
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/FlatMapFunction.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/FlatMapFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/FlatMapFunction.scala
index 252b5bd..2d26df6 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/FlatMapFunction.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/FlatMapFunction.scala
@@ -17,7 +17,7 @@
*/
package org.apache.gearpump.streaming.dsl.scalaapi.functions
-import org.apache.gearpump.streaming.dsl.api.functions.{FilterFunction, MapFunction}
+import org.apache.gearpump.streaming.dsl.api.functions.{FilterFunction, MapFunction, SerializableFunction}
import org.apache.gearpump.streaming.dsl.javaapi.functions.{FlatMapFunction => JFlatMapFunction}
import scala.collection.JavaConverters._
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/SerializableFunction.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/SerializableFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/SerializableFunction.scala
deleted file mode 100644
index ab88bf1..0000000
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/SerializableFunction.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.streaming.dsl.scalaapi.functions
-
-/**
- * Superclass for all user defined function interfaces.
- * This ensures all functions are serializable and provides common methods
- * like setup and teardown. Users should not extend this class directly
- * but subclasses like [[FlatMapFunction]].
- */
-abstract class SerializableFunction extends java.io.Serializable {
-
- def setup(): Unit = {}
-
- def teardown(): Unit = {}
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/AccumulationMode.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/AccumulationMode.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/AccumulationMode.scala
index a4524a8..46b8e92 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/AccumulationMode.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/AccumulationMode.scala
@@ -17,8 +17,18 @@
*/
package org.apache.gearpump.streaming.dsl.window.api
+
+/**
+ * Determines relationship between multiple results for the same window.
+ */
sealed trait AccumulationMode
+/**
+ * Window results are accumulated.
+ */
case object Accumulating extends AccumulationMode
+/**
+ * Window results are independent.
+ */
case object Discarding extends AccumulationMode
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Trigger.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Trigger.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Trigger.scala
index 02d52a0..b9a8695 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Trigger.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Trigger.scala
@@ -17,7 +17,16 @@
*/
package org.apache.gearpump.streaming.dsl.window.api
+/**
+ * Determines when window results are emitted.
+ * For now, [[EventTimeTrigger]] is used for all applications.
+ */
+// TODO: Make this a public API
sealed trait Trigger
+/**
+ * Triggers emitting when the watermark passes the end of the window in event time.
+ */
+// FIXME: This is no more than a tag now and the logic is hard coded in WindowRunner
case object EventTimeTrigger extends Trigger
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFunction.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFunction.scala
index a2f51c7..4db02e7 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFunction.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFunction.scala
@@ -19,7 +19,8 @@ package org.apache.gearpump.streaming.dsl.window.api
import java.time.{Duration, Instant}
-import org.apache.gearpump.{MIN_TIME_MILLIS, MAX_TIME_MILLIS, TimeStamp}
+import org.apache.gearpump.Time
+import org.apache.gearpump.Time.MilliSeconds
import org.apache.gearpump.streaming.dsl.window.impl.Window
import scala.collection.mutable.ArrayBuffer
@@ -32,8 +33,14 @@ object WindowFunction {
}
}
+/**
+ * Determines how elements are assigned to windows for calculation.
+ */
trait WindowFunction {
+ /**
+ * Assigns elements into windows.
+ */
def apply[T](context: WindowFunction.Context[T]): Array[Window]
def isNonMerging: Boolean
@@ -46,10 +53,13 @@ abstract class NonMergingWindowFunction extends WindowFunction {
object GlobalWindowFunction {
- val globalWindow = Array(Window(Instant.ofEpochMilli(MIN_TIME_MILLIS),
- Instant.ofEpochMilli(MAX_TIME_MILLIS)))
+ val globalWindow = Array(Window(Instant.ofEpochMilli(Time.MIN_TIME_MILLIS),
+ Instant.ofEpochMilli(Time.MAX_TIME_MILLIS)))
}
+/**
+ * All elements are assigned to the same global window for calculation.
+ */
case class GlobalWindowFunction() extends NonMergingWindowFunction {
override def apply[T](context: WindowFunction.Context[T]): Array[Window] = {
@@ -57,6 +67,12 @@ case class GlobalWindowFunction() extends NonMergingWindowFunction {
}
}
+/**
+ * Elements are assigned to non-merging sliding windows for calculation.
+ *
+ * @param size window size
+ * @param step window step to slide forward
+ */
case class SlidingWindowFunction(size: Duration, step: Duration)
extends NonMergingWindowFunction {
@@ -80,11 +96,17 @@ case class SlidingWindowFunction(size: Duration, step: Duration)
windows.toArray
}
- private def lastStartFor(timestamp: TimeStamp, windowStep: Long): TimeStamp = {
+ private def lastStartFor(timestamp: MilliSeconds, windowStep: Long): MilliSeconds = {
timestamp - (timestamp + windowStep) % windowStep
}
}
+/**
+ * Elements are assigned to merging windows for calculation. Windows are merged
+ * if their distance is within the defined gap.
+ *
+ * @param gap session gap
+ */
case class SessionWindowFunction(gap: Duration) extends WindowFunction {
override def apply[T](context: WindowFunction.Context[T]): Array[Window] = {
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Windows.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Windows.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Windows.scala
index d53bc96..e15b5c4 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Windows.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Windows.scala
@@ -20,11 +20,14 @@ package org.apache.gearpump.streaming.dsl.window.api
import java.time.Duration
/**
- * Defines how to apply window functions.
+ * User facing Window DSL.
+ * Defines how to apply [[WindowFunction]], [[Trigger]]
+ * and [[AccumulationMode]].
*
* @param windowFn how to divide windows
* @param trigger when to trigger window result
- * @param accumulationMode whether to accumulate results across windows
+ * @param accumulationMode whether to accumulate window results
+ * @param description window description
*/
case class Windows(
windowFn: WindowFunction,
@@ -47,6 +50,11 @@ case class Windows(
object GlobalWindows {
+ /**
+ * Defines a [[GlobalWindowFunction]].
+ *
+ * @return a Window definition
+ */
def apply(): Windows = {
Windows(GlobalWindowFunction(), description = "globalWindows")
}
@@ -55,7 +63,7 @@ object GlobalWindows {
object FixedWindows {
/**
- * Defines a FixedWindow.
+ * Defines a non-overlapping [[SlidingWindowFunction]].
*
* @param size window size
* @return a Window definition
@@ -68,7 +76,7 @@ object FixedWindows {
object SlidingWindows {
/**
- * Defines a SlidingWindow.
+ * Defines an overlapping [[SlidingWindowFunction]].
*
* @param size window size
* @param step window step to slide forward
@@ -82,7 +90,7 @@ object SlidingWindows {
object SessionWindows {
/**
- * Defines a SessionWindow.
+ * Defines a [[SessionWindowFunction]].
*
* @param gap session gap
* @return a Window definition
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/ReduceFnRunner.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/ReduceFnRunner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/ReduceFnRunner.scala
deleted file mode 100644
index e978983..0000000
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/ReduceFnRunner.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.streaming.dsl.window.impl
-
-import org.apache.gearpump.Message
-import org.apache.gearpump.streaming.dsl.window.api.Trigger
-
-trait ReduceFnRunner {
-
- def process(message: Message): Unit
-
- def onTrigger(trigger: Trigger): Unit
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala
index 870c334..d6d08c9 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala
@@ -19,22 +19,16 @@ package org.apache.gearpump.streaming.dsl.window.impl
import java.time.Instant
-import akka.actor.ActorSystem
-import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.Constants._
-import org.apache.gearpump.streaming.Processor
-import org.apache.gearpump.{Message, TimeStamp}
-import org.apache.gearpump.streaming.dsl.window.api._
-import org.apache.gearpump.streaming.task.Task
+import org.apache.gearpump.Time.MilliSeconds
object Window {
- def ofEpochMilli(startTime: TimeStamp, endTime: TimeStamp): Window = {
+ def ofEpochMilli(startTime: MilliSeconds, endTime: MilliSeconds): Window = {
Window(Instant.ofEpochMilli(startTime), Instant.ofEpochMilli(endTime))
}
}
/**
- * A window unit including startTime and excluding endTime.
+ * A window unit from startTime (inclusive) to endTime (exclusive).
*/
case class Window(startTime: Instant, endTime: Instant) extends Comparable[Window] {
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
index 17a9525..ee3c067 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
@@ -31,11 +31,23 @@ import org.apache.gearpump.streaming.task.TaskUtil
import scala.collection.mutable.ArrayBuffer
+/**
+ * Inputs for [[WindowRunner]].
+ */
case class TimestampedValue[T](value: T, timestamp: Instant)
+/**
+ * Outputs triggered by [[WindowRunner]]
+ */
case class TriggeredOutputs[T](outputs: TraversableOnce[TimestampedValue[T]],
watermark: Instant)
+/**
+ * This is responsible for executing window calculation.
+ * 1. Groups elements into windows as defined by window function
+ * 2. Applies window calculation to each group
+ * 3. Emits results on triggering
+ */
trait WindowRunner[IN, OUT] extends java.io.Serializable {
def process(timestampedValue: TimestampedValue[IN]): Unit
@@ -43,6 +55,10 @@ trait WindowRunner[IN, OUT] extends java.io.Serializable {
def trigger(time: Instant): TriggeredOutputs[OUT]
}
+/**
+ * A composite WindowRunner that first executes its left child and feeds results
+ * into its right child.
+ */
case class AndThen[IN, MIDDLE, OUT](left: WindowRunner[IN, MIDDLE],
right: WindowRunner[MIDDLE, OUT]) extends WindowRunner[IN, OUT] {
@@ -57,6 +73,9 @@ case class AndThen[IN, MIDDLE, OUT](left: WindowRunner[IN, MIDDLE],
}
}
+/**
+ * Default implementation for [[WindowRunner]].
+ */
class DefaultWindowRunner[IN, OUT](
windows: Windows,
fnRunner: FunctionRunner[IN, OUT])
@@ -137,11 +156,15 @@ class DefaultWindowRunner[IN, OUT](
}
onTrigger(outputs, newWmk)
} else {
- // minimum of end of last triggered window and start of first un-triggered window
+ // The output watermark is the minimum of end of last triggered window
+ // and start of first un-triggered window
TriggeredOutputs(outputs, TaskUtil.min(wmk, firstWin.startTime))
}
} else {
+ // All windows have been triggered.
if (time == Watermark.MAX) {
+ // This means there will be no more inputs
+ // so it's safe to advance to the maximum watermark.
TriggeredOutputs(outputs, Watermark.MAX)
} else {
TriggeredOutputs(outputs, wmk)
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/metrics/ProcessorAggregator.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/metrics/ProcessorAggregator.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/metrics/ProcessorAggregator.scala
index 8f8b7ab..058d36b 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/metrics/ProcessorAggregator.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/metrics/ProcessorAggregator.scala
@@ -22,7 +22,7 @@ import java.util
import com.google.common.collect.Iterators
import com.typesafe.config.Config
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.Time.MilliSeconds
import org.apache.gearpump.cluster.ClientToMaster.ReadOption
import org.apache.gearpump.cluster.MasterToClient.HistoryMetricsItem
import org.apache.gearpump.metrics.Metrics.{Histogram, Meter}
@@ -64,7 +64,7 @@ class ProcessorAggregator(historyMetricConfig: HistoryMetricsConfig) extends Met
}
def aggregate(
- readOption: ReadOption.ReadOption, inputs: Iterator[HistoryMetricsItem], now: TimeStamp)
+ readOption: ReadOption.ReadOption, inputs: Iterator[HistoryMetricsItem], now: MilliSeconds)
: List[HistoryMetricsItem] = {
val (start, end, interval) = getTimeRange(readOption, now)
val timeSlotsCount = ((end - start - 1) / interval + 1).toInt
@@ -103,8 +103,8 @@ class ProcessorAggregator(historyMetricConfig: HistoryMetricsConfig) extends Met
}
// Returns (start, end, interval)
- private def getTimeRange(readOption: ReadOption.ReadOption, now: TimeStamp)
- : (TimeStamp, TimeStamp, TimeStamp) = {
+ private def getTimeRange(readOption: ReadOption.ReadOption, now: MilliSeconds)
+ : (MilliSeconds, MilliSeconds, MilliSeconds) = {
readOption match {
case ReadOption.ReadRecent =>
val end = now
@@ -229,7 +229,7 @@ object ProcessorAggregator {
var p99: Double = 0
var p999: Double = 0
- var startTime: TimeStamp = Long.MaxValue
+ var startTime: MilliSeconds = Long.MaxValue
override def aggregate(item: HistoryMetricsItem): Unit = {
val input = item.value.asInstanceOf[Histogram]
@@ -263,7 +263,7 @@ object ProcessorAggregator {
var m1: Double = 0
var rateUnit: String = null
- var startTime: TimeStamp = Long.MaxValue
+ var startTime: MilliSeconds = Long.MaxValue
override def aggregate(item: HistoryMetricsItem): Unit = {
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/sink/DataSinkTask.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/sink/DataSinkTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/sink/DataSinkTask.scala
index 0db44f2..932c750 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/sink/DataSinkTask.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/sink/DataSinkTask.scala
@@ -52,4 +52,8 @@ class DataSinkTask private[sink](context: TaskContext, conf: UserConfig, sink: D
LOG.info("closing data sink...")
sink.close()
}
+
+ override def onWatermarkProgress(watermark: Instant): Unit = {
+ context.updateWatermark(watermark)
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala
index 14abff8..607af85 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala
@@ -19,7 +19,7 @@ package org.apache.gearpump.streaming.source
import java.time.Instant
-import org.apache.gearpump.{MAX_TIME_MILLIS, MIN_TIME_MILLIS, Message}
+import org.apache.gearpump.{Message, Time}
/**
* message used by source task to report source watermark.
@@ -28,9 +28,14 @@ case class Watermark(instant: Instant) {
def toMessage: Message = Message("watermark", instant)
}
+/**
+ * All input data with event times less than the watermark have been observed.
+ */
object Watermark {
- val MAX: Instant = Instant.ofEpochMilli(MAX_TIME_MILLIS + 1)
+ // all input data have been observed
+ val MAX: Instant = Instant.ofEpochMilli(Time.MAX_TIME_MILLIS + 1)
- val MIN: Instant = Instant.ofEpochMilli(MIN_TIME_MILLIS)
+ // no input data have been observed
+ val MIN: Instant = Instant.ofEpochMilli(Time.MIN_TIME_MILLIS)
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/MonoidState.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/MonoidState.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/MonoidState.scala
index 0e2f83a..0118c07 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/MonoidState.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/MonoidState.scala
@@ -18,7 +18,7 @@
package org.apache.gearpump.streaming.state.api
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.Time.MilliSeconds
/**
* MonoidState uses Algebird Monoid to aggregate state
@@ -37,11 +37,11 @@ abstract class MonoidState[T](monoid: Monoid[T]) extends PersistentState[T] {
override def get: Option[T] = Option(monoid.plus(left, right))
- override def setNextCheckpointTime(nextCheckpointTime: TimeStamp): Unit = {
+ override def setNextCheckpointTime(nextCheckpointTime: MilliSeconds): Unit = {
checkpointTime = nextCheckpointTime
}
- protected def updateState(timestamp: TimeStamp, t: T): Unit = {
+ protected def updateState(timestamp: MilliSeconds, t: T): Unit = {
if (timestamp < checkpointTime) {
left = monoid.plus(left, t)
} else {
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentState.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentState.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentState.scala
index 906d331..39b17c9 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentState.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentState.scala
@@ -18,7 +18,7 @@
package org.apache.gearpump.streaming.state.api
-import org.apache.gearpump._
+import org.apache.gearpump.Time.MilliSeconds
/**
* PersistentState is part of the transaction API
@@ -33,19 +33,19 @@ trait PersistentState[T] {
* Recovers state to a previous checkpoint
* usually invoked by the framework
*/
- def recover(timestamp: TimeStamp, bytes: Array[Byte]): Unit
+ def recover(timestamp: MilliSeconds, bytes: Array[Byte]): Unit
/**
* Updates state on a new message
* this is invoked by user
*/
- def update(timestamp: TimeStamp, t: T): Unit
+ def update(timestamp: MilliSeconds, t: T): Unit
/**
* Sets next checkpoint time
* should be invoked by the framework
*/
- def setNextCheckpointTime(timeStamp: TimeStamp): Unit
+ def setNextCheckpointTime(timeStamp: MilliSeconds): Unit
/**
* Gets a binary snapshot of state
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentTask.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentTask.scala
index d3ffaa9..3a3b0a7 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentTask.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentTask.scala
@@ -20,12 +20,13 @@ package org.apache.gearpump.streaming.state.api
import java.time.Instant
+import org.apache.gearpump.Message
+import org.apache.gearpump.Time.MilliSeconds
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.state.impl.{CheckpointManager, PersistentStateConfig}
import org.apache.gearpump.streaming.task.{Task, TaskContext, UpdateCheckpointClock}
import org.apache.gearpump.streaming.transaction.api.CheckpointStoreFactory
import org.apache.gearpump.util.LogUtil
-import org.apache.gearpump.{Message, TimeStamp}
object PersistentTask {
val LOG = LogUtil.getLogger(getClass)
@@ -41,8 +42,6 @@ abstract class PersistentTask[T](taskContext: TaskContext, conf: UserConfig)
extends Task(taskContext, conf) {
import taskContext._
- import org.apache.gearpump.streaming.state.api.PersistentTask._
-
val checkpointStoreFactory = conf.getValue[CheckpointStoreFactory](
PersistentStateConfig.STATE_CHECKPOINT_STORE_FACTORY).get
val checkpointStore = checkpointStoreFactory.getCheckpointStore(
@@ -99,7 +98,7 @@ abstract class PersistentTask[T](taskContext: TaskContext, conf: UserConfig)
checkpointManager.close()
}
- private def reportCheckpointClock(timestamp: TimeStamp): Unit = {
+ private def reportCheckpointClock(timestamp: MilliSeconds): Unit = {
appMaster ! UpdateCheckpointClock(taskContext.taskId, timestamp)
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/CheckpointManager.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/CheckpointManager.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/CheckpointManager.scala
index 82b7952..7d9e92a 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/CheckpointManager.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/CheckpointManager.scala
@@ -18,7 +18,7 @@
package org.apache.gearpump.streaming.state.impl
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.Time.MilliSeconds
import org.apache.gearpump.streaming.transaction.api.CheckpointStore
/** Manages physical checkpoints to persistent storage like HDFS */
@@ -28,11 +28,11 @@ class CheckpointManager(checkpointInterval: Long,
private var maxMessageTime: Long = 0L
private var checkpointTime: Option[Long] = None
- def recover(timestamp: TimeStamp): Option[Array[Byte]] = {
+ def recover(timestamp: MilliSeconds): Option[Array[Byte]] = {
checkpointStore.recover(timestamp)
}
- def checkpoint(timestamp: TimeStamp, checkpoint: Array[Byte]): Option[TimeStamp] = {
+ def checkpoint(timestamp: MilliSeconds, checkpoint: Array[Byte]): Option[MilliSeconds] = {
checkpointStore.persist(timestamp, checkpoint)
checkpointTime = checkpointTime.collect { case time if maxMessageTime > time =>
time + (1 + (maxMessageTime - time) / checkpointInterval) * checkpointInterval
@@ -41,7 +41,7 @@ class CheckpointManager(checkpointInterval: Long,
checkpointTime
}
- def update(messageTime: TimeStamp): Option[TimeStamp] = {
+ def update(messageTime: MilliSeconds): Option[MilliSeconds] = {
maxMessageTime = Math.max(maxMessageTime, messageTime)
if (checkpointTime.isEmpty) {
checkpointTime = Some((1 + messageTime / checkpointInterval) * checkpointInterval)
@@ -50,15 +50,15 @@ class CheckpointManager(checkpointInterval: Long,
checkpointTime
}
- def shouldCheckpoint(upstreamMinClock: TimeStamp): Boolean = {
+ def shouldCheckpoint(upstreamMinClock: MilliSeconds): Boolean = {
checkpointTime.exists(time => upstreamMinClock >= time)
}
- def getCheckpointTime: Option[TimeStamp] = checkpointTime
+ def getCheckpointTime: Option[MilliSeconds] = checkpointTime
def close(): Unit = {
checkpointStore.close()
}
- private[impl] def getMaxMessageTime: TimeStamp = maxMessageTime
+ private[impl] def getMaxMessageTime: MilliSeconds = maxMessageTime
}