You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2016/05/10 02:42:18 UTC
[08/49] incubator-gearpump git commit: fix GEARPUMP-118 change
package name to org.apache
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/task/TaskControlMessage.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/task/TaskControlMessage.scala b/streaming/src/main/scala/io/gearpump/streaming/task/TaskControlMessage.scala
deleted file mode 100644
index d89387a..0000000
--- a/streaming/src/main/scala/io/gearpump/streaming/task/TaskControlMessage.scala
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.task
-
-import io.gearpump.TimeStamp
-import io.gearpump.streaming.ProcessorId
-
-/*
- * Initial AckRequest
- */
-case class InitialAckRequest(taskId: TaskId, sessionId: Int)
-
-/*
- Here the sessionId filed is used to distinguish messages
- between different replays after the application restart
- */
-case class AckRequest(taskId: TaskId, seq: Short, sessionId: Int)
-
-/**
- * Ack back to sender task actor.
- *
- * @param seq The seq field represents the expected number of received messages and the
- * actualReceivedNum field means the actual received number since start.
- */
-case class Ack(taskId: TaskId, seq: Short, actualReceivedNum: Short, sessionId: Int)
-
-sealed trait ClockEvent
-
-case class UpdateClock(taskId: TaskId, time: TimeStamp) extends ClockEvent
-
-object GetLatestMinClock extends ClockEvent
-
-case class GetUpstreamMinClock(taskId: TaskId) extends ClockEvent
-
-case class UpstreamMinClock(latestMinClock: TimeStamp)
-
-case class LatestMinClock(clock: TimeStamp)
-
-case class ReportCheckpointClock(taskId: TaskId, clock: TimeStamp)
-
-case object GetCheckpointClock
-
-case class CheckpointClock(clock: Option[TimeStamp])
-
-case object GetStartClock
-
-case class StartClock(clock: TimeStamp)
-
-/** Probe the latency between two upstream to downstream tasks. */
-case class LatencyProbe(timestamp: Long)
-
-case class SendMessageLoss()
-
-case object GetDAG
-
-case class CheckProcessorDeath(processorId: ProcessorId)
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/task/TaskId.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/task/TaskId.scala b/streaming/src/main/scala/io/gearpump/streaming/task/TaskId.scala
deleted file mode 100644
index 66c3c52..0000000
--- a/streaming/src/main/scala/io/gearpump/streaming/task/TaskId.scala
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.task
-
-import io.gearpump.streaming._
-
-case class TaskId(processorId: ProcessorId, index: TaskIndex)
-
-object TaskId {
- def toLong(id: TaskId): Long = (id.processorId.toLong << 32) + id.index
- def fromLong(id: Long): TaskId = TaskId(((id >> 32) & 0xFFFFFFFF).toInt, (id & 0xFFFFFFFF).toInt)
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/task/TaskMessageSerializer.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/task/TaskMessageSerializer.scala b/streaming/src/main/scala/io/gearpump/streaming/task/TaskMessageSerializer.scala
deleted file mode 100644
index 500f8b3..0000000
--- a/streaming/src/main/scala/io/gearpump/streaming/task/TaskMessageSerializer.scala
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.streaming.task
-
-import java.io.{DataInput, DataOutput}
-
-trait TaskMessageSerializer[T] {
- def write(dataOutput: DataOutput, obj: T)
-
- def read(dataInput: DataInput): T
-
- def getLength(obj: T): Int
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/task/TaskUtil.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/task/TaskUtil.scala b/streaming/src/main/scala/io/gearpump/streaming/task/TaskUtil.scala
deleted file mode 100644
index 040fc2e..0000000
--- a/streaming/src/main/scala/io/gearpump/streaming/task/TaskUtil.scala
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.task
-
-object TaskUtil {
-
- /**
- * Resolves a classname to a Task class.
- *
- * @param className the class name to resolve
- * @return resolved class
- */
- def loadClass(className: String): Class[_ <: Task] = {
- val loader = Thread.currentThread().getContextClassLoader()
- loader.loadClass(className).asSubclass(classOf[Task])
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/task/TaskWrapper.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/task/TaskWrapper.scala b/streaming/src/main/scala/io/gearpump/streaming/task/TaskWrapper.scala
deleted file mode 100644
index e7e883c..0000000
--- a/streaming/src/main/scala/io/gearpump/streaming/task/TaskWrapper.scala
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.task
-
-import scala.concurrent.duration.FiniteDuration
-
-import akka.actor.Actor._
-import akka.actor.{ActorRef, ActorSystem, Cancellable, Props}
-import org.slf4j.Logger
-
-import io.gearpump.cluster.UserConfig
-import io.gearpump.util.LogUtil
-import io.gearpump.{Message, TimeStamp}
-
-/**
- * This provides TaskContext for user defined tasks
- *
- * @param taskClass task class
- * @param context context class
- * @param userConf user config
- */
-class TaskWrapper(
- val taskId: TaskId, taskClass: Class[_ <: Task], context: TaskContextData,
- userConf: UserConfig) extends TaskContext with TaskInterface {
-
- private val LOG = LogUtil.getLogger(taskClass, task = taskId)
-
- private var actor: TaskActor = null
-
- private var task: Option[Task] = None
-
- def setTaskActor(actor: TaskActor): Unit = this.actor = actor
-
- override def appId: Int = context.appId
-
- override def appName: String = context.appName
-
- override def executorId: Int = context.executorId
-
- override def parallelism: Int = context.parallelism
-
- override def appMaster: ActorRef = context.appMaster
-
- override def output(msg: Message): Unit = actor.output(msg)
-
- /**
- * See [[io.gearpump.streaming.task.TaskActor]] output(arrayIndex: Int, msg: Message): Unit
- *
- * @param index, not same as ProcessorId
- */
- def output(index: Int, msg: Message): Unit = actor.output(index, msg)
-
- /**
- * Use with caution, output unmanaged message to target tasks
- *
- * @param msg message to output
- * @param tasks the tasks to output to
- */
- def outputUnManaged(msg: AnyRef, tasks: TaskId*): Unit = {
- actor.transport(msg, tasks: _*)
- }
-
- override def self: ActorRef = actor.context.self
-
- override def sender: ActorRef = actor.context.sender()
-
- def system: ActorSystem = actor.context.system
-
- override def actorOf(props: Props): ActorRef = actor.context.actorOf(props)
-
- override def actorOf(props: Props, name: String): ActorRef = actor.context.actorOf(props, name)
-
- override def onStart(startTime: StartTime): Unit = {
- if (None != task) {
- LOG.error(s"Task.onStart should not be called multiple times... ${task.getClass}")
- }
- val constructor = taskClass.getConstructor(classOf[TaskContext], classOf[UserConfig])
- task = Some(constructor.newInstance(this, userConf))
- task.foreach(_.onStart(startTime))
- }
-
- override def onNext(msg: Message): Unit = task.foreach(_.onNext(msg))
-
- override def onStop(): Unit = {
- task.foreach(_.onStop())
- task = None
- }
-
- override def receiveUnManagedMessage: Receive = {
- task.map(_.receiveUnManagedMessage).getOrElse(defaultMessageHandler)
- }
-
- override def upstreamMinClock: TimeStamp = {
- actor.getUpstreamMinClock
- }
-
- def schedule(initialDelay: FiniteDuration, interval: FiniteDuration)(f: => Unit): Cancellable = {
- val dispatcher = actor.context.system.dispatcher
- actor.context.system.scheduler.schedule(initialDelay, interval)(f)(dispatcher)
- }
-
- def scheduleOnce(initialDelay: FiniteDuration)(f: => Unit): Cancellable = {
- val dispatcher = actor.context.system.dispatcher
- actor.context.system.scheduler.scheduleOnce(initialDelay)(f)(dispatcher)
- }
-
- private def defaultMessageHandler: Receive = {
- case msg =>
- LOG.error("Failed! Received unknown message " + "taskId: " + taskId + ", " + msg.toString)
- }
-
- /**
- * Logger is environment dependant, it should be provided by
- * containing environment.
- */
- override def logger: Logger = LOG
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/transaction/api/CheckpointStore.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/transaction/api/CheckpointStore.scala b/streaming/src/main/scala/io/gearpump/streaming/transaction/api/CheckpointStore.scala
deleted file mode 100644
index f3894ea..0000000
--- a/streaming/src/main/scala/io/gearpump/streaming/transaction/api/CheckpointStore.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.transaction.api
-
-import io.gearpump.TimeStamp
-import io.gearpump.cluster.UserConfig
-import io.gearpump.streaming.task.TaskContext
-
-/**
- * CheckpointStore persistently stores mapping of timestamp to checkpoint
- * it's possible that two checkpoints have the same timestamp
- * CheckpointStore needs to handle this either during write or read
- */
-trait CheckpointStore {
-
- def persist(timeStamp: TimeStamp, checkpoint: Array[Byte]): Unit
-
- def recover(timestamp: TimeStamp): Option[Array[Byte]]
-
- def close(): Unit
-}
-
-trait CheckpointStoreFactory extends java.io.Serializable {
- def getCheckpointStore(conf: UserConfig, taskContext: TaskContext): CheckpointStore
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/transaction/api/MessageDecoder.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/transaction/api/MessageDecoder.scala b/streaming/src/main/scala/io/gearpump/streaming/transaction/api/MessageDecoder.scala
deleted file mode 100644
index 7039b71..0000000
--- a/streaming/src/main/scala/io/gearpump/streaming/transaction/api/MessageDecoder.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.transaction.api
-
-import io.gearpump.Message
-
-/**
- * MessageDecoder decodes raw bytes to Message It is usually written by end user and
- * passed into TimeReplayableSource
- */
-trait MessageDecoder extends java.io.Serializable {
- def fromBytes(bytes: Array[Byte]): Message
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/transaction/api/OffsetManager.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/transaction/api/OffsetManager.scala b/streaming/src/main/scala/io/gearpump/streaming/transaction/api/OffsetManager.scala
deleted file mode 100644
index 412ddcc..0000000
--- a/streaming/src/main/scala/io/gearpump/streaming/transaction/api/OffsetManager.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.transaction.api
-
-import scala.util.Try
-
-import io.gearpump.{Message, TimeStamp}
-
-/**
- * Filters offsets and store the mapping from timestamp to offset
- */
-trait MessageFilter {
- def filter(messageAndOffset: (Message, Long)): Option[Message]
-}
-
-/**
- * Resolves timestamp to offset by look up the underlying storage
- */
-trait OffsetTimeStampResolver {
- def resolveOffset(time: TimeStamp): Try[Long]
-}
-
-/**
- * Manages message's offset on TimeReplayableSource and timestamp
- */
-trait OffsetManager extends MessageFilter with OffsetTimeStampResolver {
- def close(): Unit
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/transaction/api/OffsetStorage.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/transaction/api/OffsetStorage.scala b/streaming/src/main/scala/io/gearpump/streaming/transaction/api/OffsetStorage.scala
deleted file mode 100644
index fa7161c..0000000
--- a/streaming/src/main/scala/io/gearpump/streaming/transaction/api/OffsetStorage.scala
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.transaction.api
-
-import scala.util.Try
-
-import io.gearpump.TimeStamp
-
-object OffsetStorage {
-
- /**
- * StorageEmpty means no data has been stored
- */
- case object StorageEmpty extends Throwable
-
- /**
- * Overflow means the looked up time is
- * larger than the maximum stored TimeStamp
- */
- case class Overflow(maxTimestamp: Array[Byte]) extends Throwable
-
- /**
- * Underflow means the looked up time is
- * smaller than the minimum stored TimeStamp
- */
- case class Underflow(minTimestamp: Array[Byte]) extends Throwable
-}
-
-/**
- * OffsetStorage stores the mapping from TimeStamp to Offset
- */
-trait OffsetStorage {
- /**
- * Tries to look up the time in the OffsetStorage return the corresponding Offset if the time is
- * in the range of stored TimeStamps or one of the failure info (StorageEmpty, Overflow,
- * Underflow)
- *
- * @param time the time to look for
- * @return the corresponding offset if the time is in the range, otherwise failure
- */
- def lookUp(time: TimeStamp): Try[Array[Byte]]
-
- def append(time: TimeStamp, offset: Array[Byte]): Unit
-
- def close(): Unit
-}
-
-trait OffsetStorageFactory extends java.io.Serializable {
- def getOffsetStorage(dir: String): OffsetStorage
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/transaction/api/TimeReplayableSource.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/transaction/api/TimeReplayableSource.scala b/streaming/src/main/scala/io/gearpump/streaming/transaction/api/TimeReplayableSource.scala
deleted file mode 100644
index 50711ee..0000000
--- a/streaming/src/main/scala/io/gearpump/streaming/transaction/api/TimeReplayableSource.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.transaction.api
-
-import io.gearpump.streaming.source.DataSource
-
-/**
- * AT-LEAST-ONCE API. Represents a data source which allow replaying.
- *
- * Subclass should be able to replay messages on recovery from the time
- * when an application crashed.
- */
-trait TimeReplayableSource extends DataSource
-
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/transaction/api/TimeStampFilter.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/transaction/api/TimeStampFilter.scala b/streaming/src/main/scala/io/gearpump/streaming/transaction/api/TimeStampFilter.scala
deleted file mode 100644
index 7c34e1a..0000000
--- a/streaming/src/main/scala/io/gearpump/streaming/transaction/api/TimeStampFilter.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.transaction.api
-
-import io.gearpump.{Message, TimeStamp}
-
-/**
- * TimeStampFilter filters out messages that are obsolete.
- */
-trait TimeStampFilter extends java.io.Serializable {
- def filter(msg: Message, predicate: TimeStamp): Option[Message]
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/util/ActorPathUtil.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/util/ActorPathUtil.scala b/streaming/src/main/scala/io/gearpump/streaming/util/ActorPathUtil.scala
deleted file mode 100644
index c2ac32a..0000000
--- a/streaming/src/main/scala/io/gearpump/streaming/util/ActorPathUtil.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.util
-
-import akka.actor.{ActorPath, ActorRef}
-
-import io.gearpump.streaming.task.TaskId
-
-object ActorPathUtil {
-
- def executorActorName(executorId: Int): String = executorId.toString
-
- def taskActorName(taskId: TaskId): String = {
- s"processor_${taskId.processorId}_task_${taskId.index}"
- }
-
- def taskActorPath(appMaster: ActorRef, executorId: Int, taskId: TaskId): ActorPath = {
- val executorManager = appMaster.path.child(executorManagerActorName)
- val executor = executorManager.child(executorActorName(executorId))
- val task = executor.child(taskActorName(taskId))
- task
- }
-
- def executorManagerActorName: String = "executors"
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/ClusterMessage.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/ClusterMessage.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/ClusterMessage.scala
new file mode 100644
index 0000000..4b801a2
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/ClusterMessage.scala
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming
+
+import scala.language.existentials
+
+import akka.actor.ActorRef
+
+import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.cluster.appmaster.WorkerInfo
+import org.apache.gearpump.cluster.scheduler.Resource
+import org.apache.gearpump.streaming.appmaster.TaskRegistry.TaskLocations
+import org.apache.gearpump.streaming.task.{Subscriber, TaskId}
+import org.apache.gearpump.transport.HostPort
+
+object AppMasterToExecutor { // AppMaster-to-executor protocol messages (and, it appears, replies)
+ case class LaunchTasks(
+ taskId: List[TaskId], dagVersion: Int, processorDescription: ProcessorDescription,
+ subscribers: List[Subscriber])
+ // NOTE(review): presumably sent back once a LaunchTasks request is handled — confirm.
+ case object TasksLaunched
+
+ /**
+ * Changes `dagVersion`, `life` and `subscribers` of every task listed in `taskId`.
+ */
+ case class ChangeTasks(
+ taskId: List[TaskId], dagVersion: Int, life: LifeTime, subscribers: List[Subscriber])
+ // Presumably the reply to ChangeTasks, naming the tasks that were changed (confirm).
+ case class TasksChanged(taskIds: List[TaskId])
+ // Single-task counterpart of ChangeTasks; TaskChanged is presumably its reply (confirm).
+ case class ChangeTask(
+ taskId: TaskId, dagVersion: Int, life: LifeTime, subscribers: List[Subscriber])
+
+ case class TaskChanged(taskId: TaskId, dagVersion: Int)
+ // Start / stop messages addressed to a single task.
+ case class StartTask(taskId: TaskId)
+
+ case class StopTask(taskId: TaskId)
+ // Task-location exchange, tagged with a DAG version; a rejection carries reason and cause.
+ case class TaskLocationsReady(taskLocations: TaskLocations, dagVersion: Int)
+
+ case class TaskLocationsReceived(dagVersion: Int, executorId: ExecutorId)
+
+ case class TaskLocationsRejected(
+ dagVersion: Int, executorId: ExecutorId, reason: String, ex: Throwable)
+ // Messages scoped to a DAG version, followed by task-registration outcomes.
+ case class StartAllTasks(dagVersion: Int)
+
+ case class StartDynamicDag(dagVersion: Int)
+ case class TaskRegistered(taskId: TaskId, sessionId: Int, startClock: TimeStamp)
+ case class TaskRejected(taskId: TaskId)
+ // Clock-service restart request, and (judging by its name) the exception for lost messages.
+ case object RestartClockService
+ class MsgLostException extends Exception
+}
+
+object ExecutorToAppMaster { // Messages an executor sends to the AppMaster (per the name).
+ case class RegisterExecutor(
+ executor: ActorRef, executorId: Int, resource: Resource, worker : WorkerInfo)
+ // Task (un)registration for `executorId`; `task` is the HostPort recorded for the task.
+ case class RegisterTask(taskId: TaskId, executorId: Int, task: HostPort)
+ case class UnRegisterTask(taskId: TaskId, executorId: Int)
+ // Reports message loss for `taskId` on `executorId`, with a textual `cause`.
+ case class MessageLoss(executorId: Int, taskId: TaskId, cause: String)
+}
+
+object AppMasterToMaster { // Messages the AppMaster sends to the Master (per the name).
+ case class StallingTasks(tasks: List[TaskId]) // tasks the AppMaster reports as stalling
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala
new file mode 100644
index 0000000..445f26c
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming
+
+object Constants { // Key-name strings for streaming settings (presumably config keys — confirm).
+ // Keys under "gearpump.streaming.dsl.*" (operator, source, sink, group-by function).
+ val GEARPUMP_STREAMING_OPERATOR = "gearpump.streaming.dsl.operator"
+ val GEARPUMP_STREAMING_SOURCE = "gearpump.streaming.dsl.source"
+ val GEARPUMP_STREAMING_SINK = "gearpump.streaming.dsl.sink"
+ val GEARPUMP_STREAMING_GROUPBY_FUNCTION = "gearpump.streaming.dsl.groupby-function"
+ // Key for streaming task localities.
+ val GEARPUMP_STREAMING_LOCALITIES = "gearpump.streaming.localities"
+ // Task-registration timeout key; the "-ms" suffix indicates milliseconds.
+ val GEARPUMP_STREAMING_REGISTER_TASK_TIMEOUT_MS = "gearpump.streaming.register-task-timeout-ms"
+ // Maximum pending-message count, applied per connection (per the key's suffix).
+ val GEARPUMP_STREAMING_MAX_PENDING_MESSAGE_COUNT =
+ "gearpump.streaming.max-pending-message-count-per-connection"
+ // Ack frequency: one ack every N messages, as the key name reads (confirm usage).
+ val GEARPUMP_STREAMING_ACK_ONCE_EVERY_MESSAGE_COUNT =
+ "gearpump.streaming.ack-once-every-message-count"
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/DAG.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/DAG.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/DAG.scala
new file mode 100644
index 0000000..4a94ad3
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/DAG.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming
+
+import org.apache.gearpump.partitioner.PartitionerDescription
+import org.apache.gearpump.streaming.task.TaskId
+import org.apache.gearpump.util.Graph
+
+/**
+ * A streaming DAG: a wrapper around [[org.apache.gearpump.util.Graph]] that keeps each
+ * processor's description, keyed by [[ProcessorId]], alongside a graph over processor ids.
+ *
+ * @param version the DAG version
+ * @param processors processor descriptions keyed by processor id
+ * @param graph graph over processor ids whose edges carry partitioner descriptions
+ */
+case class DAG(version: Int, processors: Map[ProcessorId, ProcessorDescription],
+    graph: Graph[ProcessorId, PartitionerDescription])
+  extends Serializable {
+
+  /** True when the DAG contains no processors. */
+  def isEmpty: Boolean = processors.isEmpty
+
+  /** Total number of tasks: the sum of every processor's parallelism. */
+  def taskCount: Int = processors.values.map(_.parallelism).sum
+
+  /**
+   * Every task of the DAG: for each processor (in the map's iteration order), the task ids
+   * with indices `0 until parallelism`.
+   */
+  def tasks: List[TaskId] = {
+    for {
+      (processorId, processor) <- processors.toList
+      index <- 0 until processor.parallelism
+    } yield TaskId(processorId, index)
+  }
+}
+
+/** Factory methods for [[DAG]]. */
+object DAG {
+
+  /**
+   * Builds a DAG from a graph of processor descriptions. Descriptions are indexed by their
+   * `id`, and the graph is rewritten so that its vertices are those ids.
+   *
+   * @param graph processor descriptions connected by partitioner descriptions
+   * @param version version of the resulting DAG; 0 unless given
+   */
+  def apply(graph: Graph[ProcessorDescription, PartitionerDescription], version: Int = 0): DAG = {
+    val processorsById = graph.vertices.map(description => description.id -> description).toMap
+    val idGraph = graph.mapVertex(description => description.id)
+    new DAG(version, processorsById, idGraph)
+  }
+
+  /** A version-0 DAG built from an empty graph. */
+  def empty: DAG = apply(Graph.empty[ProcessorDescription, PartitionerDescription])
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/MessageSerializer.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/MessageSerializer.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/MessageSerializer.scala
new file mode 100644
index 0000000..20e2529
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/MessageSerializer.scala
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming
+
+import java.io.{DataInput, DataOutput}
+
+import org.apache.gearpump.streaming.task._
+
+/**
+ * Fixed-size binary codec for [[TaskId]]: `processorId` followed by `index`, each written
+ * as a 4-byte int.
+ */
+class TaskIdSerializer extends TaskMessageSerializer[TaskId] {
+
+  /** Always 8 bytes: two ints. */
+  override def getLength(obj: TaskId): Int = 8
+
+  override def write(dataOutput: DataOutput, obj: TaskId): Unit = {
+    val TaskIdFields = (obj.processorId, obj.index)
+    dataOutput.writeInt(TaskIdFields._1)
+    dataOutput.writeInt(TaskIdFields._2)
+  }
+
+  override def read(dataInput: DataInput): TaskId = {
+    // Field order must mirror write(): processorId first, then index.
+    val readProcessorId = dataInput.readInt()
+    val readIndex = dataInput.readInt()
+    new TaskId(readProcessorId, readIndex)
+  }
+}
+
+/**
+ * Binary codec for [[Ack]]: the task id, then `seq` (short), `actualReceivedNum` (short)
+ * and `sessionId` (int).
+ */
+class AckSerializer extends TaskMessageSerializer[Ack] {
+  val taskIdSerializer = new TaskIdSerializer
+
+  /** Task id bytes plus 2 + 2 + 4 bytes for the remaining fields. */
+  override def getLength(obj: Ack): Int = {
+    val fieldBytes = 2 + 2 + 4
+    taskIdSerializer.getLength(obj.taskId) + fieldBytes
+  }
+
+  override def write(dataOutput: DataOutput, obj: Ack): Unit = {
+    taskIdSerializer.write(dataOutput, obj.taskId)
+    dataOutput.writeShort(obj.seq)
+    dataOutput.writeShort(obj.actualReceivedNum)
+    dataOutput.writeInt(obj.sessionId)
+  }
+
+  override def read(dataInput: DataInput): Ack = {
+    // Scala evaluates arguments left to right, which matches the order used by write().
+    Ack(
+      taskIdSerializer.read(dataInput),
+      dataInput.readShort(),
+      dataInput.readShort(),
+      dataInput.readInt())
+  }
+}
+
+/** Binary codec for [[InitialAckRequest]]: the task id followed by `sessionId` (int). */
+class InitialAckRequestSerializer extends TaskMessageSerializer[InitialAckRequest] {
+  // NOTE(review): the name is misspelled ("Serialzer"), but it is a public member, so renaming
+  // it would change this class's interface.
+  val taskIdSerialzer = new TaskIdSerializer
+
+  /** Task id bytes plus 4 bytes for the session id. */
+  override def getLength(obj: InitialAckRequest): Int = {
+    val sessionIdBytes = 4
+    taskIdSerialzer.getLength(obj.taskId) + sessionIdBytes
+  }
+
+  override def write(dataOutput: DataOutput, obj: InitialAckRequest): Unit = {
+    taskIdSerialzer.write(dataOutput, obj.taskId)
+    dataOutput.writeInt(obj.sessionId)
+  }
+
+  override def read(dataInput: DataInput): InitialAckRequest = {
+    // Arguments are evaluated left to right: task id first, then the session id.
+    InitialAckRequest(taskIdSerialzer.read(dataInput), dataInput.readInt())
+  }
+}
+
+/** Binary codec for [[AckRequest]]: the task id, then `seq` (short) and `sessionId` (int). */
+class AckRequestSerializer extends TaskMessageSerializer[AckRequest] {
+  val taskIdSerializer = new TaskIdSerializer
+
+  /** Task id bytes plus 2 + 4 bytes for seq and session id. */
+  override def getLength(obj: AckRequest): Int = {
+    val fieldBytes = 2 + 4
+    taskIdSerializer.getLength(obj.taskId) + fieldBytes
+  }
+
+  override def write(dataOutput: DataOutput, obj: AckRequest): Unit = {
+    taskIdSerializer.write(dataOutput, obj.taskId)
+    dataOutput.writeShort(obj.seq)
+    dataOutput.writeInt(obj.sessionId)
+  }
+
+  override def read(dataInput: DataInput): AckRequest = {
+    // Arguments are evaluated left to right, matching the order used by write().
+    AckRequest(taskIdSerializer.read(dataInput), dataInput.readShort(), dataInput.readInt())
+  }
+}
+
+/** Binary codec for [[LatencyProbe]]: its `timestamp` written as an 8-byte long. */
+class LatencyProbeSerializer extends TaskMessageSerializer[LatencyProbe] {
+
+  /** Always 8 bytes: one long. */
+  override def getLength(obj: LatencyProbe): Int = 8
+
+  override def write(dataOutput: DataOutput, obj: LatencyProbe): Unit =
+    dataOutput.writeLong(obj.timestamp)
+
+  override def read(dataInput: DataInput): LatencyProbe =
+    LatencyProbe(dataInput.readLong())
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala
new file mode 100644
index 0000000..66ec873
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming
+
+import scala.language.implicitConversions
+import scala.reflect.ClassTag
+
+import akka.actor.ActorSystem
+
+import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.cluster._
+import org.apache.gearpump.partitioner.{HashPartitioner, Partitioner, PartitionerDescription, PartitionerObject}
+import org.apache.gearpump.streaming.appmaster.AppMaster
+import org.apache.gearpump.streaming.task.Task
+import org.apache.gearpump.util.{Graph, LogUtil, ReferenceEqual}
+
+/**
+ * Blueprint for the tasks of one processor: which task class to instantiate, how many
+ * instances to run, and the configuration and description that go with them.
+ *
+ * @tparam T task type of this processor
+ */
+trait Processor[+T <: Task] extends ReferenceEqual {
+
+  /** The task class, a subtype of Task; each runtime instance of it is one task. */
+  def taskClass: Class[_ <: Task]
+
+  /** Number of tasks to run for this processor. */
+  def parallelism: Int
+
+  /**
+   * The custom [[org.apache.gearpump.cluster.UserConfig]] used to initialize a task at
+   * runtime.
+   */
+  def taskConf: UserConfig
+
+  /** Free-form description text for this processor. */
+  def description: String
+}
+
+/** Factory and conversion helpers for [[Processor]]. */
+object Processor {
+
+  /**
+   * Builds the [[ProcessorDescription]] of `processor` under the given id. The task class is
+   * recorded via `getName`; `life` and `jar` keep the ProcessorDescription defaults.
+   */
+  def ProcessorToProcessorDescription(id: ProcessorId, processor: Processor[_ <: Task])
+    : ProcessorDescription = {
+    val taskClassName = processor.taskClass.getName
+    ProcessorDescription(id, taskClassName, processor.parallelism, processor.description,
+      processor.taskConf)
+  }
+
+  /**
+   * Creates a [[DefaultProcessor]] whose task class comes from the `ClassTag` of `T`.
+   *
+   * @param parallelism number of tasks
+   * @param description description text; empty by default
+   * @param taskConf task configuration; `UserConfig.empty` by default
+   */
+  def apply[T <: Task](
+      parallelism: Int, description: String = "",
+      taskConf: UserConfig = UserConfig.empty)(implicit classtag: ClassTag[T])
+    : DefaultProcessor[T] = {
+    val runtimeTaskClass = classtag.runtimeClass.asInstanceOf[Class[T]]
+    DefaultProcessor[T](parallelism, description, taskConf, runtimeTaskClass)
+  }
+
+  /** Creates a [[DefaultProcessor]] for an explicitly given task class. */
+  def apply[T <: Task](
+      taskClazz: Class[T], parallelism: Int, description: String, taskConf: UserConfig)
+    : DefaultProcessor[T] = {
+    DefaultProcessor[T](parallelism, description, taskConf, taskClazz)
+  }
+
+  /** Default [[Processor]] implementation; the `with*` methods return modified copies. */
+  case class DefaultProcessor[T <: Task](
+      parallelism: Int, description: String, taskConf: UserConfig, taskClass: Class[T])
+    extends Processor[T] {
+
+    /** Copy of this processor with a different number of tasks. */
+    def withParallelism(parallel: Int): DefaultProcessor[T] = copy(parallelism = parallel)
+
+    /** Copy of this processor with a different description. */
+    def withDescription(desc: String): DefaultProcessor[T] = copy(description = desc)
+
+    /** Copy of this processor with a different task configuration. */
+    def withConfig(conf: UserConfig): DefaultProcessor[T] = copy(taskConf = conf)
+  }
+}
+
+/**
+ * The half-open timestamp range `[birth, death)` during which a processor is alive.
+ *
+ * A message whose timestamp lies outside this range is not processed by the processor.
+ */
+case class LifeTime(birth: TimeStamp, death: TimeStamp) {
+
+  /** True when `birth <= timestamp < death`. */
+  def contains(timestamp: TimeStamp): Boolean = birth <= timestamp && timestamp < death
+
+  /**
+   * Intersection of this lifetime with `another`. If the two ranges do not overlap, the
+   * result has `birth >= death`, so `contains` is false for every timestamp.
+   */
+  def cross(another: LifeTime): LifeTime = {
+    val latestBirth = Math.max(birth, another.birth)
+    val earliestDeath = Math.min(death, another.death)
+    LifeTime(latestBirth, earliestDeath)
+  }
+}
+
+/** Predefined [[LifeTime]] values. */
+object LifeTime {
+
+  /** Lifetime covering every timestamp from 0 up to, but excluding, `Long.MaxValue`. */
+  val Immortal: LifeTime = LifeTime(0L, Long.MaxValue)
+}
+
+/**
+ * A streaming application: a named graph of processor descriptions connected by partitioner
+ * descriptions, run by the streaming [[AppMaster]].
+ *
+ * @param name application name
+ * @param inputUserConfig caller-supplied config; `userConfig` adds the DAG to it
+ * @param dag processor graph; construction fails if it has duplicated edges
+ */
+class StreamApplication(
+    override val name: String, val inputUserConfig: UserConfig,
+    val dag: Graph[ProcessorDescription, PartitionerDescription])
+  extends Application {
+
+  require(!dag.hasDuplicatedEdge(), "Graph should not have duplicated edges")
+
+  /** Streaming applications are driven by [[AppMaster]]. */
+  override def appMaster: Class[_ <: ApplicationMaster] = classOf[AppMaster]
+
+  /** `inputUserConfig` with the DAG stored under the `StreamApplication.DAG` key. */
+  override def userConfig(implicit system: ActorSystem): UserConfig =
+    inputUserConfig.withValue(StreamApplication.DAG, dag)
+}
+
+/**
+ * Description of a processor as stored in a streaming DAG.
+ *
+ * @param id processor id
+ * @param taskClass name of the task class (as produced by `Class.getName`)
+ * @param parallelism number of tasks
+ * @param description free-form description; empty by default
+ * @param taskConf task configuration; null by default
+ * @param life lifetime of the processor; [[LifeTime.Immortal]] by default
+ * @param jar jar associated with the processor; null by default
+ */
+case class ProcessorDescription(
+    id: ProcessorId,
+    taskClass: String,
+    parallelism: Int,
+    description: String = "",
+    taskConf: UserConfig = null,
+    life: LifeTime = LifeTime.Immortal,
+    jar: AppJar = null)
+  extends ReferenceEqual
+
+/** Factory for [[StreamApplication]], plus the config key under which its DAG is stored. */
+object StreamApplication {
+
+  private val hashPartitioner = new HashPartitioner()
+  private val LOG = LogUtil.getLogger(getClass)
+
+  /**
+   * Builds a [[StreamApplication]] from a graph of processors and partitioners.
+   *
+   * Each processor's id is its position in `dag.topologicalOrderWithCirclesIterator`. A null
+   * edge partitioner is replaced by a [[HashPartitioner]]. A cyclic graph is accepted, but a
+   * warning is logged.
+   */
+  def apply[T <: Processor[Task], P <: Partitioner](
+      name: String, dag: Graph[T, P], userConfig: UserConfig): StreamApplication = {
+    if (dag.hasCycle()) {
+      LOG.warn(s"Detected cycles in DAG of application $name!")
+    }
+
+    val idOf = dag.topologicalOrderWithCirclesIterator.toList.zipWithIndex.toMap
+    val describedVertices = dag.mapVertex { processor =>
+      Processor.ProcessorToProcessorDescription(idOf(processor), processor)
+    }
+    val graph = describedVertices.mapEdge { (_, partitioner, _) =>
+      PartitionerDescription(new PartitionerObject(Option(partitioner).getOrElse(hashPartitioner)))
+    }
+    new StreamApplication(name, userConfig, graph)
+  }
+
+  /** Key under which the application's DAG is stored in its [[UserConfig]]. */
+  val DAG = "DAG"
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala
new file mode 100644
index 0000000..7c08b9b
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala
@@ -0,0 +1,362 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.appmaster
+
+import java.lang.management.ManagementFactory
+
+import akka.actor._
+import org.apache.gearpump._
+import org.apache.gearpump.cluster.ClientToMaster.{GetLastFailure, GetStallingTasks, QueryHistoryMetrics, ShutdownApplication}
+import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterDataDetailRequest, ReplayFromTimestampWindowTrailingEdge}
+import org.apache.gearpump.cluster.MasterToClient.{HistoryMetrics, HistoryMetricsItem, LastFailure}
+import org.apache.gearpump.cluster._
+import org.apache.gearpump.cluster.worker.WorkerId
+import org.apache.gearpump.metrics.Metrics.ReportMetrics
+import org.apache.gearpump.metrics.{JvmMetricsSet, Metrics, MetricsReporterService}
+import org.apache.gearpump.partitioner.PartitionerDescription
+import org.apache.gearpump.streaming.ExecutorToAppMaster.{MessageLoss, RegisterExecutor, RegisterTask, UnRegisterTask}
+import org.apache.gearpump.streaming._
+import org.apache.gearpump.streaming.appmaster.AppMaster._
+import org.apache.gearpump.streaming.appmaster.DagManager.{GetLatestDAG, LatestDAG, ReplaceProcessor}
+import org.apache.gearpump.streaming.appmaster.ExecutorManager.{ExecutorInfo, GetExecutorInfo}
+import org.apache.gearpump.streaming.appmaster.TaskManager.{FailedToRecover, GetTaskList, TaskList}
+import org.apache.gearpump.streaming.executor.Executor.{ExecutorConfig, ExecutorSummary, GetExecutorSummary, QueryExecutorConfig}
+import org.apache.gearpump.streaming.storage.InMemoryAppStoreOnMaster
+import org.apache.gearpump.streaming.task._
+import org.apache.gearpump.streaming.util.ActorPathUtil
+import org.apache.gearpump.util.Constants.{APPMASTER_DEFAULT_EXECUTOR_ID, _}
+import org.apache.gearpump.util.HistoryMetricsService.HistoryMetricsConfig
+import org.apache.gearpump.util._
+import org.slf4j.Logger
+
+import scala.concurrent.Future
+
+/**
+ * AppMaster is the head of a streaming application.
+ *
+ * It contains:
+ * 1. ExecutorManager to manage all executors.
+ * 2. TaskManager to manage all tasks.
+ * 3. ClockService to track the global clock for this streaming application.
+ * 4. JarScheduler to decide where each task should be scheduled.
+ *
+ * ClockService and TaskManager are created only after the initial DAG has been fetched from
+ * the DagManager. Until then, messages routed to them are dropped.
+ */
+class AppMaster(appContext: AppMasterContext, app: AppDescription) extends ApplicationMaster {
+  import app.userConfig
+  import appContext.{appId, masterProxy, username}
+
+  private implicit val actorSystem = context.system
+  private implicit val timeOut = FUTURE_TIMEOUT
+
+  import akka.pattern.ask
+  private implicit val dispatcher = context.dispatcher
+
+  private val startTime: TimeStamp = System.currentTimeMillis()
+
+  private val address = ActorUtil.getFullPath(context.system, self.path)
+
+  private val LOG: Logger = LogUtil.getLogger(getClass, app = appId)
+  LOG.info(s"AppMaster[$appId] is launched by $username, app: $app")
+  LOG.info(s"AppMaster actor path: $address")
+
+  private val store = new InMemoryAppStoreOnMaster(appId, appContext.masterProxy)
+  private val dagManager = context.actorOf(Props(new DagManager(appContext.appId, userConfig, store,
+    Some(getUpdatedDAG()))))
+
+  // Both are assigned once, on the actor thread, when the initial DAG arrives.
+  private var taskManager: Option[ActorRef] = None
+  private var clockService: Option[ActorRef] = None
+  private val systemConfig = context.system.settings.config
+  private var lastFailure = LastFailure(0L, null)
+
+  private val appMasterBrief = ExecutorBrief(APPMASTER_DEFAULT_EXECUTOR_ID,
+    self.path.toString,
+    Option(appContext.workerInfo).map(_.workerId).getOrElse(WorkerId.unspecified), "active")
+
+  private val getHistoryMetricsConfig = HistoryMetricsConfig(systemConfig)
+
+  private val metricsEnabled = systemConfig.getBoolean(GEARPUMP_METRIC_ENABLED)
+
+  private val userDir = System.getProperty("user.dir")
+  private val logFile = LogUtil.applicationLogDir(actorSystem.settings.config)
+
+  private val appMasterExecutorSummary = ExecutorSummary(
+    APPMASTER_DEFAULT_EXECUTOR_ID,
+    Option(appContext.workerInfo).map(_.workerId).getOrElse(WorkerId.unspecified),
+    self.path.toString,
+    logFile.getAbsolutePath,
+    status = "Active",
+    taskCount = 0,
+    tasks = Map.empty[ProcessorId, List[TaskId]],
+    jvmName = ManagementFactory.getRuntimeMXBean().getName()
+  )
+
+  private val historyMetricsService = if (metricsEnabled) {
+    // Registers jvm metrics
+    Metrics(context.system).register(new JvmMetricsSet(
+      s"app${appId}.executor${APPMASTER_DEFAULT_EXECUTOR_ID}"))
+
+    val historyMetricsService = context.actorOf(Props(new HistoryMetricsService(
+      s"app$appId", getHistoryMetricsConfig)))
+
+    val metricsReportService = context.actorOf(Props(
+      new MetricsReporterService(Metrics(context.system))))
+    historyMetricsService.tell(ReportMetrics, metricsReportService)
+
+    Some(historyMetricsService)
+  } else {
+    None
+  }
+
+  private val executorManager: ActorRef =
+    context.actorOf(ExecutorManager.props(userConfig, appContext, app.clusterConfig, app.name),
+      ActorPathUtil.executorManagerActorName)
+
+  /** Self-message carrying the initial DAG fetched from the DagManager. */
+  private case class InitialDagReady(dag: DAG)
+
+  // The future callback runs outside the actor's thread. It therefore must not call `context`
+  // or touch the mutable fields above; it only messages `self`. The DAG-dependent services are
+  // then created in `initialization`, on the actor thread.
+  getDAG.foreach(dag => self ! InitialDagReady(dag))
+
+  override def receive: Receive = {
+    initialization orElse
+      taskMessageHandler orElse
+      executorMessageHandler orElse
+      recover orElse
+      appMasterService orElse
+      ActorUtil.defaultMsgHandler(self)
+  }
+
+  /** Creates the ClockService and TaskManager once the initial DAG is available. */
+  private def initialization: Receive = {
+    case InitialDagReady(dag) =>
+      val clock = context.actorOf(Props(new ClockService(dag, store)))
+      clockService = Some(clock)
+      val jarScheduler = new JarScheduler(appId, app.name, systemConfig, context)
+      // Pass the local `clock` so the Props thunk does not read the mutable field later.
+      taskManager = Some(context.actorOf(Props(new TaskManager(appContext.appId, dagManager,
+        jarScheduler, executorManager, clock, self, app.name))))
+  }
+
+  /** Handles messages from Tasks */
+  def taskMessageHandler: Receive = {
+    case clock: ClockEvent =>
+      taskManager.foreach(_ forward clock)
+    case register: RegisterTask =>
+      taskManager.foreach(_ forward register)
+    case unRegister: UnRegisterTask =>
+      taskManager.foreach(_ forward unRegister)
+      // Checks whether this processor dead, if it is, then we should remove it from clockService.
+      clockService.foreach(_ forward CheckProcessorDeath(unRegister.taskId.processorId))
+    case replay: ReplayFromTimestampWindowTrailingEdge =>
+      taskManager.foreach(_ forward replay)
+    case messageLoss: MessageLoss =>
+      lastFailure = LastFailure(System.currentTimeMillis(), messageLoss.cause)
+      taskManager.foreach(_ forward messageLoss)
+    case lookupTask: LookupTaskActorRef =>
+      taskManager.foreach(_ forward lookupTask)
+    case checkpoint: ReportCheckpointClock =>
+      clockService.foreach(_ forward checkpoint)
+    case GetDAG =>
+      // Capture the sender now; the reply is sent from a future callback.
+      val task = sender
+      getDAG.foreach(dag => task ! dag)
+    case GetCheckpointClock =>
+      clockService.foreach(_ forward GetCheckpointClock)
+  }
+
+  /** Handles messages from Executors */
+  def executorMessageHandler: Receive = {
+    case register: RegisterExecutor =>
+      executorManager forward register
+    case ReportMetrics =>
+      historyMetricsService.foreach(_ forward ReportMetrics)
+  }
+
+  /** Handles messages from AppMaster */
+  def appMasterService: Receive = {
+    case appMasterDataDetailRequest: AppMasterDataDetailRequest =>
+      LOG.debug(s"AppMaster got AppMasterDataDetailRequest for $appId ")
+
+      // Start every query before composing them so that they run concurrently.
+      val executorsFuture = executorBrief
+      val clockFuture = getMinClock
+      val taskFuture = getTaskList
+      val dagFuture = getDAG
+
+      val appMasterDataDetail = for {
+        executors <- executorsFuture
+        clock <- clockFuture
+        tasks <- taskFuture
+        dag <- dagFuture
+      } yield {
+        val graph = dag.graph
+
+        // A strict map, not a `mapValues` view, so each executor's task list is built once
+        // rather than once per processor.
+        val executorToTasks = tasks.tasks.groupBy(_._2).map { case (executorId, taskToExecutor) =>
+          executorId -> taskToExecutor.keys.toList
+        }
+
+        val processors = dag.processors.map { case (_, processor) =>
+          val taskCounts = executorToTasks.map { case (executorId, taskIds) =>
+            executorId -> TaskCount(taskIds.count(_.processorId == processor.id))
+          }.filter(_._2.count != 0)
+          processor.id -> ProcessorSummary(processor.id, processor.taskClass,
+            processor.parallelism, processor.description, processor.taskConf, processor.life,
+            taskCounts.keys.toList, taskCounts)
+        }
+
+        StreamAppMasterSummary(
+          appId = appId,
+          appName = app.name,
+          actorPath = address,
+          clock = clock,
+          status = MasterToAppMaster.AppMasterActive,
+          startTime = startTime,
+          uptime = System.currentTimeMillis() - startTime,
+          user = username,
+          homeDirectory = userDir,
+          logFile = logFile.getAbsolutePath,
+          processors = processors,
+          processorLevels = graph.vertexHierarchyLevelMap(),
+          dag = graph.mapEdge { (_, edge, _) =>
+            edge.partitionerFactory.name
+          },
+          executors = executors,
+          historyMetricsConfig = getHistoryMetricsConfig
+        )
+      }
+
+      val client = sender()
+      appMasterDataDetail.foreach(appData => client ! appData)
+    // TODO: WebSocket is buggy and disabled.
+    // case appMasterMetricsRequest: AppMasterMetricsRequest =>
+    //   val client = sender()
+    //   actorSystem.eventStream.subscribe(client, classOf[MetricType])
+    case query: QueryHistoryMetrics =>
+      historyMetricsService match {
+        case Some(service) =>
+          service forward query
+        case None =>
+          // Metrics are disabled: return empty metrics so that we don't hang the UI.
+          sender ! HistoryMetrics(query.path, List.empty[HistoryMetricsItem])
+      }
+    case getStalling: GetStallingTasks =>
+      clockService.foreach(_ forward getStalling)
+    case replaceDAG: ReplaceProcessor =>
+      dagManager forward replaceDAG
+    case GetLastFailure(_) =>
+      sender ! lastFailure
+    case get@GetExecutorSummary(executorId) =>
+      val client = sender
+      if (executorId == APPMASTER_DEFAULT_EXECUTOR_ID) {
+        client ! appMasterExecutorSummary
+      } else {
+        forwardToExecutor(executorId, get, client)
+      }
+    case query@QueryExecutorConfig(executorId) =>
+      val client = sender
+      if (executorId == -1) {
+        client ! ExecutorConfig(ClusterConfig.filterOutDefaultConfig(systemConfig))
+      } else {
+        forwardToExecutor(executorId, query, client)
+      }
+  }
+
+  /** Error handling */
+  def recover: Receive = {
+    case FailedToRecover(errorMsg) =>
+      // Only failures reported by one of our own children shut the application down.
+      if (context.children.toList.contains(sender())) {
+        LOG.error(errorMsg)
+        masterProxy ! ShutdownApplication(appId)
+      }
+    case AllocateResourceTimeOut =>
+      LOG.error(s"Failed to allocate resource in time, shutdown application $appId")
+      masterProxy ! ShutdownApplication(appId)
+      context.stop(self)
+  }
+
+  /**
+   * Asks the ExecutorManager for the current executors. If `executorId` is among them, `msg`
+   * is delivered to that executor with `client` as the sender; otherwise nothing is sent.
+   */
+  private def forwardToExecutor(executorId: ExecutorId, msg: Any, client: ActorRef): Unit = {
+    ActorUtil.askActor[Map[ExecutorId, ExecutorInfo]](executorManager, GetExecutorInfo)
+      .foreach { executors =>
+        executors.get(executorId).foreach(_.executor.tell(msg, client))
+      }
+  }
+
+  /** Latest min clock, or a failed future while the clock service does not exist yet. */
+  private def getMinClock: Future[TimeStamp] = {
+    clockService match {
+      case Some(service) =>
+        (service ? GetLatestMinClock).mapTo[LatestMinClock].map(_.clock)
+      case None =>
+        Future.failed(new ServiceNotAvailableException("clock service not ready"))
+    }
+  }
+
+  /** Brief of every executor known to the ExecutorManager, plus the AppMaster's own brief. */
+  private def executorBrief: Future[List[ExecutorBrief]] = {
+    ActorUtil.askActor[Map[ExecutorId, ExecutorInfo]](executorManager, GetExecutorInfo)
+      .map { infos =>
+        infos.values.map { info =>
+          ExecutorBrief(info.executorId,
+            info.executor.path.toSerializationFormat,
+            info.worker.workerId,
+            "active")
+        }.toList :+ appMasterBrief
+      }
+  }
+
+  /** Task list, or a failed future while the task manager does not exist yet. */
+  private def getTaskList: Future[TaskList] = {
+    taskManager match {
+      case Some(manager) =>
+        (manager ? GetTaskList).mapTo[TaskList]
+      case None =>
+        Future.failed(new ServiceNotAvailableException("task manager not ready"))
+    }
+  }
+
+  /** The latest DAG as reported by the DagManager. */
+  private def getDAG: Future[DAG] = {
+    (dagManager ? GetLatestDAG).mapTo[LatestDAG].map(_.dag)
+  }
+
+  /**
+   * Reads the DAG stored in the user config under `StreamApplication.DAG`. Any processor
+   * without its own jar is given the application's jar (or null if there is none).
+   * Throws NoSuchElementException if the config holds no DAG.
+   */
+  private def getUpdatedDAG(): DAG = {
+    val dag = DAG(userConfig.getValue[Graph[ProcessorDescription,
+      PartitionerDescription]](StreamApplication.DAG).get)
+    val updated = dag.processors.map { case (id, oldProcessor) =>
+      val newProcessor = if (oldProcessor.jar == null) {
+        oldProcessor.copy(jar = appContext.appJar.orNull)
+      } else {
+        oldProcessor
+      }
+      (id, newProcessor)
+    }
+    DAG(dag.version, updated, dag.graph)
+  }
+}
+
+/** Messages and helper types used by [[AppMaster]]. */
+object AppMaster {
+
+  /**
+   * The Master did not return resources in time. On receipt, the AppMaster asks the Master
+   * to shut the application down and stops itself.
+   */
+  case object AllocateResourceTimeOut
+
+  /** Asks for the ActorRef of the task identified by `taskId`. */
+  case class LookupTaskActorRef(taskId: TaskId)
+
+  /** Wraps a task's ActorRef (presumably the reply to LookupTaskActorRef; verify). */
+  case class TaskActorRef(task: ActorRef)
+
+  /** A service the AppMaster relies on, such as the clock service or task manager, is not ready. */
+  class ServiceNotAvailableException(reason: String) extends Exception(reason)
+
+  /** Short summary of one executor: id, actor path, hosting worker and status text. */
+  case class ExecutorBrief(
+      executorId: ExecutorId, executor: String, workerId: WorkerId, status: String)
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala
new file mode 100644
index 0000000..458fded
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala
@@ -0,0 +1,447 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.appmaster
+
+import java.util
+import java.util.Date
+import java.util.concurrent.TimeUnit
+
+import akka.actor.{Actor, Cancellable, Stash}
+import io.gearpump.google.common.primitives.Longs
+import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.cluster.ClientToMaster.GetStallingTasks
+import org.apache.gearpump.streaming.AppMasterToMaster.StallingTasks
+import org.apache.gearpump.streaming._
+import org.apache.gearpump.streaming.appmaster.ClockService.HealthChecker.ClockValue
+import org.apache.gearpump.streaming.appmaster.ClockService._
+import org.apache.gearpump.streaming.storage.AppDataStore
+import org.apache.gearpump.streaming.task._
+import org.apache.gearpump.util.LogUtil
+import org.slf4j.Logger
+
+import scala.concurrent.Future
+import scala.concurrent.duration.FiniteDuration
+import scala.language.implicitConversions
+
+/**
+ * Maintains a global view of message timestamps in the application.
+ *
+ * Keeps the clock of every processor and of each of its tasks, answers upstream-min-clock
+ * queries, runs a periodic health check and periodically snapshots the start clock (the min
+ * checkpoint clock if one is set, otherwise the global min clock) to `store`.
+ *
+ * @param dag   the DAG the service starts with; replaced when a newer version arrives via
+ *              `ChangeToNewDAG`
+ * @param store storage used to persist and recover the start clock
+ */
+class ClockService(private var dag: DAG, store: AppDataStore) extends Actor with Stash {
+  private val LOG: Logger = LogUtil.getLogger(getClass)
+
+  import context.dispatcher
+
+  private val healthChecker = new HealthChecker(stallingThresholdSeconds = 60)
+  private var healthCheckScheduler: Cancellable = null
+  private var snapshotScheduler: Cancellable = null
+
+  // The real behavior is installed with context.become in preStart.
+  override def receive: Receive = null
+
+  override def preStart(): Unit = {
+    LOG.info("Initializing Clock service, get snapshotted StartClock ....")
+    // The callback runs outside the actor, so it only sends a message; actor state is
+    // updated when StoredStartClock is handled.
+    store.get(START_CLOCK).asInstanceOf[Future[TimeStamp]].map { clock =>
+      val startClock = Option(clock).getOrElse(0L)
+
+      // Recover the application by restarting from last persisted startClock.
+      // Only messages after startClock will be replayed.
+      self ! StoredStartClock(startClock)
+      LOG.info(s"Start Clock Retrieved, starting ClockService, startClock: $startClock")
+    }.failed.foreach { ex =>
+      // Without StoredStartClock the actor keeps stashing; make that visible.
+      LOG.error("Failed to retrieve the stored start clock", ex)
+    }
+
+    context.become(waitForStartClock)
+  }
+
+  override def postStop(): Unit = {
+    Option(healthCheckScheduler).foreach(_.cancel())
+    Option(snapshotScheduler).foreach(_.cancel())
+  }
+
+  // Keep track of clock value of all processors.
+  private var clocks = Map.empty[ProcessorId, ProcessorClock]
+
+  // Each process can have multiple upstream processors. This keep track of the upstream clocks.
+  private var upstreamClocks = Map.empty[ProcessorId, Array[ProcessorClock]]
+
+  // We use Array instead of List for Performance consideration
+  private var processorClocks = Array.empty[ProcessorClock]
+
+  // Checkpoint times reported per task; only tasks of checkpoint-enabled processors are keys.
+  private var checkpointClocks: Map[TaskId, Vector[TimeStamp]] = null
+
+  private var minCheckpointClock: Option[TimeStamp] = None
+
+  private def checkpointEnabled(processor: ProcessorDescription): Boolean = {
+    val taskConf = processor.taskConf
+    taskConf != null && taskConf.getBoolean("state.checkpoint.enable") == Some(true)
+  }
+
+  private def resetCheckpointClocks(dag: DAG, startClock: TimeStamp): Unit = {
+    this.checkpointClocks = dag.processors.filter(startClock < _._2.life.death)
+      .filter { case (_, processor) =>
+        checkpointEnabled(processor)
+      }.flatMap { case (id, processor) =>
+        (0 until processor.parallelism).map(TaskId(id, _) -> Vector.empty[TimeStamp])
+      }
+    if (this.checkpointClocks.isEmpty) {
+      minCheckpointClock = None
+    }
+  }
+
+  private def initDag(startClock: TimeStamp): Unit = {
+    recoverDag(this.dag, startClock)
+  }
+
+  /** Rebuilds all clocks of `dag` from scratch, every task starting at `startClock`. */
+  private def recoverDag(dag: DAG, startClock: TimeStamp): Unit = {
+    this.clocks = dag.processors.filter(startClock < _._2.life.death).map {
+      case (processorId, processor) =>
+        val clock = new ProcessorClock(processorId, processor.life, processor.parallelism)
+        clock.init(startClock)
+        (processorId, clock)
+    }
+
+    this.upstreamClocks = clocks.map { case (processorId, _) =>
+      val upstreams = dag.graph.incomingEdgesOf(processorId).map(_._1)
+      val upstreamClocks = upstreams.flatMap(clocks.get(_))
+      (processorId, upstreamClocks.toArray)
+    }
+
+    this.processorClocks = clocks.toArray.map(_._2)
+
+    resetCheckpointClocks(dag, startClock)
+  }
+
+  /** Moves to a new DAG version, keeping the task clocks of processors already tracked. */
+  private def dynamicDAG(dag: DAG, startClock: TimeStamp): Unit = {
+    val newClocks = dag.processors.filter(startClock < _._2.life.death).map {
+      case (processorId, processor) =>
+        // Look up existing clocks by the map key, the same key used to store them.
+        val clock = clocks.get(processorId) match {
+          case Some(existing) => existing.copy(life = processor.life)
+          case None => new ProcessorClock(processorId, processor.life, processor.parallelism)
+        }
+        (processorId, clock)
+    }
+
+    this.clocks = newClocks
+
+    this.upstreamClocks = newClocks.map { case (processorId, _) =>
+      val upstreams = dag.graph.incomingEdgesOf(processorId).map(_._1)
+      val upstreamClocks = upstreams.flatMap(newClocks.get(_))
+      (processorId, upstreamClocks.toArray)
+    }
+
+    // Inits the clock of all processors. ProcessorClock.init only acts on clocks whose task
+    // clocks are not set yet, i.e. the newly created ones.
+    newClocks.foreach { case (processorId, processorClock) =>
+      val upstreamClock = getUpStreamMinClock(processorId)
+      val birth = processorClock.life.birth
+
+      if (dag.graph.inDegreeOf(processorId) == 0) {
+        processorClock.init(Longs.max(birth, startClock))
+      } else {
+        processorClock.init(upstreamClock)
+      }
+    }
+
+    this.processorClocks = clocks.toArray.map(_._2)
+
+    resetCheckpointClocks(dag, startClock)
+  }
+
+  def waitForStartClock: Receive = {
+    case StoredStartClock(startClock) =>
+      // The stored start clock is the initial min checkpoint clock; resetCheckpointClocks
+      // (reached through initDag) clears it again when no processor checkpoints.
+      minCheckpointClock = Some(startClock)
+      initDag(startClock)
+
+      // Period report current clock
+      healthCheckScheduler = context.system.scheduler.schedule(
+        new FiniteDuration(5, TimeUnit.SECONDS),
+        new FiniteDuration(60, TimeUnit.SECONDS), self, HealthCheck)
+
+      // Period snapshot latest min startclock to external storage
+      snapshotScheduler = context.system.scheduler.schedule(
+        new FiniteDuration(5, TimeUnit.SECONDS),
+        new FiniteDuration(5, TimeUnit.SECONDS), self, SnapshotStartClock)
+
+      unstashAll()
+      context.become(clockService)
+
+    case _ =>
+      stash()
+  }
+
+  /** Min clock over the upstream processors of `processorId`; Long.MaxValue if there are none. */
+  private def getUpStreamMinClock(processorId: ProcessorId): TimeStamp = {
+    upstreamClocks.get(processorId) match {
+      case Some(upstreams) if upstreams != null && upstreams.nonEmpty =>
+        ProcessorClocks.minClock(upstreams)
+      case _ =>
+        Long.MaxValue
+    }
+  }
+
+  def clockService: Receive = {
+    case GetUpstreamMinClock(task) =>
+      sender ! UpstreamMinClock(getUpStreamMinClock(task.processorId))
+
+    case UpdateClock(task, clock) =>
+      val upstreamMinClock = getUpStreamMinClock(task.processorId)
+
+      clocks.get(task.processorId) match {
+        case Some(processorClock) => processorClock.updateMinClock(task.index, clock)
+        case None => LOG.error(s"Cannot updateClock for task $task")
+      }
+      sender ! UpstreamMinClock(upstreamMinClock)
+
+    case GetLatestMinClock =>
+      sender ! LatestMinClock(minClock)
+
+    case GetStartClock =>
+      sender ! StartClock(getStartClock)
+
+    case deathCheck: CheckProcessorDeath =>
+      val processorId = deathCheck.processorId
+      clocks.get(processorId).foreach { processorClock =>
+        val life = processorClock.life
+        if (processorClock.min >= life.death) {
+          LOG.info(s"Removing $processorId from clock service...")
+          removeProcessor(processorId)
+        } else {
+          LOG.info(s"Unsuccessfully in removing $processorId from clock service...," +
+            s" min: ${processorClock.min}, life: $life")
+        }
+      }
+
+    case HealthCheck =>
+      selfCheck()
+
+    case SnapshotStartClock =>
+      snapshotStartClock()
+
+    case ReportCheckpointClock(task, time) =>
+      updateCheckpointClocks(task, time)
+
+    case GetCheckpointClock =>
+      sender ! CheckpointClock(minCheckpointClock)
+
+    case _: GetStallingTasks =>
+      sender ! StallingTasks(healthChecker.getReport.stallingTasks)
+
+    case ChangeToNewDAG(newDag) =>
+      if (newDag.version > dag.version) {
+        // Transits to a new dag version
+        dag = newDag
+        dynamicDAG(newDag, getStartClock)
+      } else {
+        // Restarts current dag.
+        recoverDag(newDag, getStartClock)
+      }
+      LOG.info(s"Change to new DAG(dag = ${newDag.version}), send back ChangeToNewDAGSuccess")
+      sender ! ChangeToNewDAGSuccess(clocks.map { case (id, clock) => (id, clock.min) })
+  }
+
+  private def removeProcessor(processorId: ProcessorId): Unit = {
+    clocks = clocks - processorId
+    processorClocks = processorClocks.filter(_.processorId != processorId)
+
+    // Drop the processor as a key and from every other processor's upstream list.
+    upstreamClocks = (upstreamClocks - processorId).map { case (id, upstreams) =>
+      (id, upstreams.filter(_.processorId != processorId))
+    }
+
+    // Removes dead processor from checkpoints.
+    checkpointClocks = checkpointClocks.filter { case (taskId, _) =>
+      taskId.processorId != processorId
+    }
+  }
+
+  private def minClock: TimeStamp = {
+    ProcessorClocks.minClock(processorClocks)
+  }
+
+  def selfCheck(): Unit = {
+    val minTimestamp = minClock
+
+    if (Long.MaxValue == minTimestamp) {
+      processorClocks.foreach { clock =>
+        LOG.info(s"Processor ${clock.processorId} Clock: min: ${clock.min}, " +
+          s"taskClocks: " + clock.taskClocks.mkString(","))
+      }
+    }
+
+    healthChecker.check(minTimestamp, clocks, dag, System.currentTimeMillis())
+  }
+
+  private def getStartClock: TimeStamp = {
+    minCheckpointClock.getOrElse(minClock)
+  }
+
+  private def snapshotStartClock(): Unit = {
+    store.put(START_CLOCK, getStartClock)
+  }
+
+  /**
+   * Records checkpoint `time` for `task`. Once every tracked task has reported `time`, it
+   * becomes the min checkpoint clock and all recorded times up to it are discarded.
+   */
+  private def updateCheckpointClocks(task: TaskId, time: TimeStamp): Unit = {
+    checkpointClocks.get(task) match {
+      case None =>
+        // The task is not tracked (its processor was removed, or checkpointing is disabled for
+        // it); looking it up with apply() would throw and crash the actor.
+        LOG.warn(s"Ignoring checkpoint clock $time reported by untracked task $task")
+      case Some(reported) =>
+        checkpointClocks += task -> (reported :+ time)
+
+        if (checkpointClocks.forall(_._2.contains(time))) {
+          minCheckpointClock = Some(time)
+          LOG.info(s"minCheckpointTime $minCheckpointClock")
+
+          // Strict map: mapValues would return a lazy view that re-runs dropWhile on every
+          // access.
+          checkpointClocks = checkpointClocks.map { case (taskId, times) =>
+            taskId -> times.dropWhile(_ <= time)
+          }
+        }
+    }
+  }
+}
+
+object ClockService {
+  val START_CLOCK = "startClock"
+
+  case object HealthCheck
+
+  /**
+   * Mutable clock state of one processor: the clock of each task and their minimum.
+   *
+   * @param processorId the processor this clock belongs to
+   * @param life        the processor's life time
+   * @param parallism   number of tasks (the misspelled name is public API and kept as is)
+   * @param _min        current minimum over all task clocks
+   * @param _taskClocks per-task clocks indexed by task index; `null` until `init` is called
+   */
+  class ProcessorClock(val processorId: ProcessorId, val life: LifeTime, val parallism: Int,
+      private var _min: TimeStamp = 0L, private var _taskClocks: Array[TimeStamp] = null) {
+
+    /**
+     * Returns a clock with a different life time. The task clock array is shared with this
+     * instance, not copied; `min` is tracked separately per instance.
+     */
+    def copy(life: LifeTime): ProcessorClock = {
+      new ProcessorClock(processorId, life, parallism, _min, _taskClocks)
+    }
+
+    def min: TimeStamp = _min
+    def taskClocks: Array[TimeStamp] = _taskClocks
+
+    /** Sets every task clock and the min to `startClock`; no-op if task clocks already exist. */
+    def init(startClock: TimeStamp): Unit = {
+      if (taskClocks == null) {
+        this._min = startClock
+        this._taskClocks = new Array(parallism)
+        util.Arrays.fill(taskClocks, startClock)
+      }
+    }
+
+    /** Records the clock of task `taskIndex` and recomputes the processor minimum. */
+    def updateMinClock(taskIndex: Int, clock: TimeStamp): Unit = {
+      taskClocks(taskIndex) = clock
+      _min = Longs.min(taskClocks: _*)
+    }
+  }
+
+  case object SnapshotStartClock
+
+  case class Report(stallingTasks: List[TaskId])
+
+  /**
+   * Check whether the clock is advancing normally
+   *
+   * @param stallingThresholdSeconds how long the min clock may stay unchanged before the
+   *                                 application is considered stalling
+   */
+  class HealthChecker(stallingThresholdSeconds: Int) {
+    private val LOG: Logger = LogUtil.getLogger(getClass)
+
+    private var minClock: ClockValue = null
+    // Long arithmetic so that large thresholds cannot overflow Int.
+    private val stallingThresholdMilliseconds = stallingThresholdSeconds * 1000L
+    private var stallingTasks = Array.empty[TaskId]
+
+    /** Check for stalling tasks */
+    def check(
+        currentMinClock: TimeStamp, processorClocks: Map[ProcessorId, ProcessorClock],
+        dag: DAG, now: TimeStamp): Unit = {
+      var isClockStalling = false
+      if (null == minClock || currentMinClock > minClock.appClock) {
+        minClock = ClockValue(systemClock = now, appClock = currentMinClock)
+      } else {
+        // Clock not advancing
+        if (now > minClock.systemClock + stallingThresholdMilliseconds) {
+          LOG.warn(s"Clock has not advanced for ${(now - minClock.systemClock) / 1000} seconds " +
+            s"since ${minClock.prettyPrint}...")
+          isClockStalling = true
+        }
+      }
+
+      if (isClockStalling) {
+        // First processor, in topological order, whose min equals the stalled app clock.
+        val stallingProcessor = dag.graph.topologicalOrderWithCirclesIterator.find {
+          processorId => processorClocks.get(processorId).exists(_.min == minClock.appClock)
+        }
+        // Reset when no processor matches, so an earlier report is not returned again.
+        stallingTasks = stallingProcessor.map { processorId =>
+          processorClocks(processorId).taskClocks.zipWithIndex
+            .filter(_._1 == minClock.appClock)
+            .map { case (_, index) => TaskId(processorId, index) }
+        }.getOrElse(Array.empty[TaskId])
+        LOG.info(s"Stalling Tasks: ${stallingTasks.mkString(",")}")
+      } else {
+        stallingTasks = Array.empty[TaskId]
+      }
+    }
+
+    def getReport: Report = {
+      Report(stallingTasks.toList)
+    }
+  }
+
+  object HealthChecker {
+    case class ClockValue(systemClock: TimeStamp, appClock: TimeStamp) {
+      def prettyPrint: String = {
+        "(system clock: " + new Date(systemClock).toString + ", app clock: " + appClock + ")"
+      }
+    }
+  }
+
+  object ProcessorClocks {
+
+    /**
+     * Get the Min clock of all processors.
+     *
+     * NOTE(review): returns 0 for an empty array, while ClockService's upstream lookup treats
+     * "no processors" as Long.MaxValue — confirm 0 is intended here.
+     */
+    def minClock(clock: Array[ProcessorClock]): TimeStamp = {
+      if (clock.length == 0) {
+        0L
+      } else {
+        var min = clock(0).min
+        var i = 1
+        while (i < clock.length) {
+          min = Math.min(min, clock(i).min)
+          i += 1
+        }
+        min
+      }
+    }
+  }
+
+  case class ChangeToNewDAG(dag: DAG)
+  case class ChangeToNewDAGSuccess(clocks: Map[ProcessorId, TimeStamp])
+
+  case class StoredStartClock(clock: TimeStamp)
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/DagManager.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/DagManager.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/DagManager.scala
new file mode 100644
index 0000000..3341d4f
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/DagManager.scala
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.appmaster
+
+import akka.actor.{Actor, ActorRef, Stash}
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.partitioner.PartitionerDescription
+import org.apache.gearpump.streaming._
+import org.apache.gearpump.streaming.appmaster.DagManager._
+import org.apache.gearpump.streaming.storage.AppDataStore
+import org.apache.gearpump.streaming.task.Subscriber
+import org.apache.gearpump.util.{Graph, LogUtil}
+import org.slf4j.Logger
+
+import scala.concurrent.Future
+
+/**
+ * Handles dag modification and other stuff related with DAG
+ *
+ * DagManager maintains multiple version of DAGs. For each version, the DAG is immutable.
+ * For operations like modifying a processor, it will create a new version of DAG.
+ *
+ * @param appId      application id, used for logging
+ * @param userConfig config holding the initial DAG graph under `StreamApplication.DAG`
+ * @param store      storage for the latest deployed DAG
+ * @param dag        initial DAG to use when `store` has none; falls back to `userConfig`
+ */
+class DagManager(appId: Int, userConfig: UserConfig, store: AppDataStore, dag: Option[DAG])
+  extends Actor with Stash {
+
+  import context.dispatcher
+  private val LOG: Logger = LogUtil.getLogger(getClass, app = appId)
+  private val NOT_INITIALIZED = -1
+
+  private var dags = List.empty[DAG]
+  private var maxProcessorId = -1
+  private implicit val system = context.system
+
+  private var watchers = List.empty[ActorRef]
+
+  // The real behavior is installed with context.become in preStart.
+  override def receive: Receive = null
+
+  override def preStart(): Unit = {
+    LOG.info("Initializing Dag Service, get stored Dag ....")
+    store.get(StreamApplication.DAG).asInstanceOf[Future[DAG]].map { storedDag =>
+      // NOTE: this callback runs outside the actor. The writes below are safe only because
+      // the actor ignores `dags`/`maxProcessorId` until it handles DagInitiated, and that
+      // message is sent after the writes.
+      val initialDag = Option(storedDag).getOrElse(
+        dag.getOrElse(DAG(userConfig.getValue[Graph[ProcessorDescription,
+          PartitionerDescription]](StreamApplication.DAG).get)))
+      dags :+= initialDag
+      val keys = initialDag.processors.keys
+      maxProcessorId = if (keys.isEmpty) 0 else keys.max
+      self ! DagInitiated
+    }.failed.foreach { ex =>
+      // Without DagInitiated the actor keeps stashing; make that visible.
+      LOG.error("Failed to initialize the DAG, DagManager keeps stashing requests", ex)
+    }
+    context.become(waitForDagInitiate)
+  }
+
+  def waitForDagInitiate: Receive = {
+    case DagInitiated =>
+      unstashAll()
+      context.become(dagService)
+    case _ =>
+      stash()
+  }
+
+  private def nextProcessorId: ProcessorId = {
+    maxProcessorId += 1
+    maxProcessorId
+  }
+
+  private def taskLaunchData(dag: DAG, processorId: Int, context: AnyRef): TaskLaunchData = {
+    val processorDescription = dag.processors(processorId)
+    val subscribers = Subscriber.of(processorId, dag)
+    TaskLaunchData(processorDescription, subscribers, context)
+  }
+
+  def dagService: Receive = {
+    case GetLatestDAG =>
+      // Get the latest version of DAG.
+      sender ! LatestDAG(dags.last)
+
+    case GetTaskLaunchData(version, processorId, launchContext) =>
+      // Task information like Processor class, downstream subscriber processors and etc.
+      dags.find(_.version == version) match {
+        case Some(dag) =>
+          LOG.info(s"Get task launcher data for processor: $processorId, dagVersion: $version")
+          sender ! taskLaunchData(dag, processorId, launchContext)
+        case None =>
+          // No reply is sent; log it so a requester's timeout can be diagnosed.
+          LOG.warn(s"Cannot find DAG version $version for task launch data of processor " +
+            s"$processorId")
+      }
+
+    case ReplaceProcessor(oldProcessorId, inputNewProcessor) =>
+      // Replace a processor with new implementation. The upstream processors and downstream
+      // processors are NOT changed.
+      if (dags.length > 1) {
+        sender ! DAGOperationFailed(
+          "We are in the process of handling previous dynamic dag change")
+      } else {
+        val oldDAG = dags.last
+        oldDAG.processors.get(oldProcessorId) match {
+          case None =>
+            sender ! DAGOperationFailed(s"Processor $oldProcessorId does not exist")
+          case Some(oldProcessor) =>
+            // Allocate a processor id only once the request has been accepted.
+            val withId = inputNewProcessor.copy(id = nextProcessorId)
+            // Without an explicit jar, the new processor reuses the replaced one's jar.
+            val newProcessor =
+              if (inputNewProcessor.jar == null) withId.copy(jar = oldProcessor.jar) else withId
+            val newVersion = oldDAG.version + 1
+            val newDAG = replaceDAG(oldDAG, oldProcessorId, newProcessor, newVersion)
+            dags :+= newDAG
+
+            LOG.info(s"ReplaceProcessor old: $oldProcessorId, new: $newProcessor")
+            LOG.info(s"new DAG: $newDAG")
+            watchers.foreach(_ ! LatestDAG(newDAG))
+            sender ! DAGOperationSuccess
+        }
+      }
+
+    case WatchChange(watcher) =>
+      // Checks whether there are modifications for this DAG.
+      if (!this.watchers.contains(watcher)) {
+        this.watchers :+= watcher
+      }
+
+    case NewDAGDeployed(dagVersion) =>
+      // Means dynamic Dag transition completed, and the new DAG version has been successfully
+      // deployed. The obsolete dag versions will be removed.
+      if (dagVersion != NOT_INITIALIZED) {
+        val deployed = dags.filter(_.version == dagVersion)
+        if (deployed.isEmpty) {
+          // Keep the known versions; an empty list would make every later dags.last throw.
+          LOG.warn(s"Ignoring NewDAGDeployed for unknown DAG version $dagVersion")
+        } else {
+          dags = deployed
+          store.put(StreamApplication.DAG, dags.last)
+        }
+      }
+  }
+
+  /**
+   * Builds version `newVersion` of `dag` in which `newProcessor` takes over the edges of
+   * `oldProcessorId`; the old processor's life ends at the new processor's birth.
+   */
+  private def replaceDAG(
+      dag: DAG, oldProcessorId: ProcessorId, newProcessor: ProcessorDescription, newVersion: Int)
+    : DAG = {
+    val oldProcessorLife = LifeTime(dag.processors(oldProcessorId).life.birth,
+      newProcessor.life.birth)
+
+    val newProcessorMap = dag.processors ++
+      Map(oldProcessorId -> dag.processors(oldProcessorId).copy(life = oldProcessorLife),
+        newProcessor.id -> newProcessor)
+
+    val newGraph = dag.graph.subGraph(oldProcessorId).
+      replaceVertex(oldProcessorId, newProcessor.id).addGraph(dag.graph)
+    new DAG(newVersion, newProcessorMap, newGraph)
+  }
+}
+
+/** Messages exchanged with the DagManager actor. */
+object DagManager {
+
+  /** Sent by DagManager to itself once the initial DAG has been loaded. */
+  case object DagInitiated
+
+  /** Registers `watcher` to receive a `LatestDAG` whenever a new DAG version is created. */
+  case class WatchChange(watcher: ActorRef)
+
+  /** Asks for the most recent DAG version; answered with [[LatestDAG]]. */
+  case object GetLatestDAG
+
+  /** Carries one DAG version, either as a reply or as a change notification. */
+  case class LatestDAG(dag: DAG)
+
+  /**
+   * Asks for the data needed to launch tasks of `processorId` in DAG version `dagVersion`;
+   * answered with [[TaskLaunchData]], which carries `context` back unchanged.
+   */
+  case class GetTaskLaunchData(dagVersion: Int, processorId: Int, context: AnyRef = null)
+
+  /** Processor description plus its downstream subscribers, used to launch a task. */
+  case class TaskLaunchData(
+      processorDescription: ProcessorDescription,
+      subscribers: List[Subscriber],
+      context: AnyRef = null)
+
+  /** Marker for requests that modify the DAG. */
+  sealed trait DAGOperation
+
+  /** Replaces processor `oldProcessorId` by `newProcessorDescription`, keeping its edges. */
+  case class ReplaceProcessor(
+      oldProcessorId: ProcessorId,
+      newProcessorDescription: ProcessorDescription)
+    extends DAGOperation
+
+  /** Outcome of a [[DAGOperation]]. */
+  sealed trait DAGOperationResult
+  case object DAGOperationSuccess extends DAGOperationResult
+  case class DAGOperationFailed(reason: String) extends DAGOperationResult
+
+  /** Signals that `dagVersion` has been deployed, so older versions can be dropped. */
+  case class NewDAGDeployed(dagVersion: Int)
+}
\ No newline at end of file