You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2016/05/10 02:42:18 UTC

[08/49] incubator-gearpump git commit: fix GEARPUMP-118 change package name to org.apache

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/task/TaskControlMessage.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/task/TaskControlMessage.scala b/streaming/src/main/scala/io/gearpump/streaming/task/TaskControlMessage.scala
deleted file mode 100644
index d89387a..0000000
--- a/streaming/src/main/scala/io/gearpump/streaming/task/TaskControlMessage.scala
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.task
-
-import io.gearpump.TimeStamp
-import io.gearpump.streaming.ProcessorId
-
-/*
- * Initial AckRequest
- */
-case class InitialAckRequest(taskId: TaskId, sessionId: Int)
-
-/*
-  Here the sessionId filed is used to distinguish messages
-    between different replays after the application restart
- */
-case class AckRequest(taskId: TaskId, seq: Short, sessionId: Int)
-
-/**
- * Ack back to sender task actor.
- *
- * @param seq The seq field represents the expected number of received messages and the
- *            actualReceivedNum field means the actual received number since start.
- */
-case class Ack(taskId: TaskId, seq: Short, actualReceivedNum: Short, sessionId: Int)
-
-sealed trait ClockEvent
-
-case class UpdateClock(taskId: TaskId, time: TimeStamp) extends ClockEvent
-
-object GetLatestMinClock extends ClockEvent
-
-case class GetUpstreamMinClock(taskId: TaskId) extends ClockEvent
-
-case class UpstreamMinClock(latestMinClock: TimeStamp)
-
-case class LatestMinClock(clock: TimeStamp)
-
-case class ReportCheckpointClock(taskId: TaskId, clock: TimeStamp)
-
-case object GetCheckpointClock
-
-case class CheckpointClock(clock: Option[TimeStamp])
-
-case object GetStartClock
-
-case class StartClock(clock: TimeStamp)
-
-/** Probe the latency between two upstream to downstream tasks. */
-case class LatencyProbe(timestamp: Long)
-
-case class SendMessageLoss()
-
-case object GetDAG
-
-case class CheckProcessorDeath(processorId: ProcessorId)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/task/TaskId.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/task/TaskId.scala b/streaming/src/main/scala/io/gearpump/streaming/task/TaskId.scala
deleted file mode 100644
index 66c3c52..0000000
--- a/streaming/src/main/scala/io/gearpump/streaming/task/TaskId.scala
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.task
-
-import io.gearpump.streaming._
-
-case class TaskId(processorId: ProcessorId, index: TaskIndex)
-
-object TaskId {
-  def toLong(id: TaskId): Long = (id.processorId.toLong << 32) + id.index
-  def fromLong(id: Long): TaskId = TaskId(((id >> 32) & 0xFFFFFFFF).toInt, (id & 0xFFFFFFFF).toInt)
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/task/TaskMessageSerializer.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/task/TaskMessageSerializer.scala b/streaming/src/main/scala/io/gearpump/streaming/task/TaskMessageSerializer.scala
deleted file mode 100644
index 500f8b3..0000000
--- a/streaming/src/main/scala/io/gearpump/streaming/task/TaskMessageSerializer.scala
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.streaming.task
-
-import java.io.{DataInput, DataOutput}
-
-trait TaskMessageSerializer[T] {
-  def write(dataOutput: DataOutput, obj: T)
-
-  def read(dataInput: DataInput): T
-
-  def getLength(obj: T): Int
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/task/TaskUtil.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/task/TaskUtil.scala b/streaming/src/main/scala/io/gearpump/streaming/task/TaskUtil.scala
deleted file mode 100644
index 040fc2e..0000000
--- a/streaming/src/main/scala/io/gearpump/streaming/task/TaskUtil.scala
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.task
-
-object TaskUtil {
-
-  /**
-   * Resolves a classname to a Task class.
-   *
-   * @param className  the class name to resolve
-   * @return resolved class
-   */
-  def loadClass(className: String): Class[_ <: Task] = {
-    val loader = Thread.currentThread().getContextClassLoader()
-    loader.loadClass(className).asSubclass(classOf[Task])
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/task/TaskWrapper.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/task/TaskWrapper.scala b/streaming/src/main/scala/io/gearpump/streaming/task/TaskWrapper.scala
deleted file mode 100644
index e7e883c..0000000
--- a/streaming/src/main/scala/io/gearpump/streaming/task/TaskWrapper.scala
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.task
-
-import scala.concurrent.duration.FiniteDuration
-
-import akka.actor.Actor._
-import akka.actor.{ActorRef, ActorSystem, Cancellable, Props}
-import org.slf4j.Logger
-
-import io.gearpump.cluster.UserConfig
-import io.gearpump.util.LogUtil
-import io.gearpump.{Message, TimeStamp}
-
-/**
- * This provides TaskContext for user defined tasks
- *
- * @param taskClass task class
- * @param context context class
- * @param userConf user config
- */
-class TaskWrapper(
-    val taskId: TaskId, taskClass: Class[_ <: Task], context: TaskContextData,
-    userConf: UserConfig) extends TaskContext with TaskInterface {
-
-  private val LOG = LogUtil.getLogger(taskClass, task = taskId)
-
-  private var actor: TaskActor = null
-
-  private var task: Option[Task] = None
-
-  def setTaskActor(actor: TaskActor): Unit = this.actor = actor
-
-  override def appId: Int = context.appId
-
-  override def appName: String = context.appName
-
-  override def executorId: Int = context.executorId
-
-  override def parallelism: Int = context.parallelism
-
-  override def appMaster: ActorRef = context.appMaster
-
-  override def output(msg: Message): Unit = actor.output(msg)
-
-  /**
-   * See [[io.gearpump.streaming.task.TaskActor]] output(arrayIndex: Int, msg: Message): Unit
-   *
-   * @param index, not same as ProcessorId
-   */
-  def output(index: Int, msg: Message): Unit = actor.output(index, msg)
-
-  /**
-   * Use with caution, output unmanaged message to target tasks
-   *
-   * @param msg  message to output
-   * @param tasks  the tasks to output to
-   */
-  def outputUnManaged(msg: AnyRef, tasks: TaskId*): Unit = {
-    actor.transport(msg, tasks: _*)
-  }
-
-  override def self: ActorRef = actor.context.self
-
-  override def sender: ActorRef = actor.context.sender()
-
-  def system: ActorSystem = actor.context.system
-
-  override def actorOf(props: Props): ActorRef = actor.context.actorOf(props)
-
-  override def actorOf(props: Props, name: String): ActorRef = actor.context.actorOf(props, name)
-
-  override def onStart(startTime: StartTime): Unit = {
-    if (None != task) {
-      LOG.error(s"Task.onStart should not be called multiple times... ${task.getClass}")
-    }
-    val constructor = taskClass.getConstructor(classOf[TaskContext], classOf[UserConfig])
-    task = Some(constructor.newInstance(this, userConf))
-    task.foreach(_.onStart(startTime))
-  }
-
-  override def onNext(msg: Message): Unit = task.foreach(_.onNext(msg))
-
-  override def onStop(): Unit = {
-    task.foreach(_.onStop())
-    task = None
-  }
-
-  override def receiveUnManagedMessage: Receive = {
-    task.map(_.receiveUnManagedMessage).getOrElse(defaultMessageHandler)
-  }
-
-  override def upstreamMinClock: TimeStamp = {
-    actor.getUpstreamMinClock
-  }
-
-  def schedule(initialDelay: FiniteDuration, interval: FiniteDuration)(f: => Unit): Cancellable = {
-    val dispatcher = actor.context.system.dispatcher
-    actor.context.system.scheduler.schedule(initialDelay, interval)(f)(dispatcher)
-  }
-
-  def scheduleOnce(initialDelay: FiniteDuration)(f: => Unit): Cancellable = {
-    val dispatcher = actor.context.system.dispatcher
-    actor.context.system.scheduler.scheduleOnce(initialDelay)(f)(dispatcher)
-  }
-
-  private def defaultMessageHandler: Receive = {
-    case msg =>
-      LOG.error("Failed! Received unknown message " + "taskId: " + taskId + ", " + msg.toString)
-  }
-
-  /**
-   * Logger is environment dependant, it should be provided by
-   * containing environment.
-   */
-  override def logger: Logger = LOG
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/transaction/api/CheckpointStore.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/transaction/api/CheckpointStore.scala b/streaming/src/main/scala/io/gearpump/streaming/transaction/api/CheckpointStore.scala
deleted file mode 100644
index f3894ea..0000000
--- a/streaming/src/main/scala/io/gearpump/streaming/transaction/api/CheckpointStore.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.transaction.api
-
-import io.gearpump.TimeStamp
-import io.gearpump.cluster.UserConfig
-import io.gearpump.streaming.task.TaskContext
-
-/**
- * CheckpointStore persistently stores mapping of timestamp to checkpoint
- * it's possible that two checkpoints have the same timestamp
- * CheckpointStore needs to handle this either during write or read
- */
-trait CheckpointStore {
-
-  def persist(timeStamp: TimeStamp, checkpoint: Array[Byte]): Unit
-
-  def recover(timestamp: TimeStamp): Option[Array[Byte]]
-
-  def close(): Unit
-}
-
-trait CheckpointStoreFactory extends java.io.Serializable {
-  def getCheckpointStore(conf: UserConfig, taskContext: TaskContext): CheckpointStore
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/transaction/api/MessageDecoder.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/transaction/api/MessageDecoder.scala b/streaming/src/main/scala/io/gearpump/streaming/transaction/api/MessageDecoder.scala
deleted file mode 100644
index 7039b71..0000000
--- a/streaming/src/main/scala/io/gearpump/streaming/transaction/api/MessageDecoder.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.transaction.api
-
-import io.gearpump.Message
-
-/**
- * MessageDecoder decodes raw bytes to Message It is usually written by end user and
- * passed into TimeReplayableSource
- */
-trait MessageDecoder extends java.io.Serializable {
-  def fromBytes(bytes: Array[Byte]): Message
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/transaction/api/OffsetManager.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/transaction/api/OffsetManager.scala b/streaming/src/main/scala/io/gearpump/streaming/transaction/api/OffsetManager.scala
deleted file mode 100644
index 412ddcc..0000000
--- a/streaming/src/main/scala/io/gearpump/streaming/transaction/api/OffsetManager.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.transaction.api
-
-import scala.util.Try
-
-import io.gearpump.{Message, TimeStamp}
-
-/**
- * Filters offsets and store the mapping from timestamp to offset
- */
-trait MessageFilter {
-  def filter(messageAndOffset: (Message, Long)): Option[Message]
-}
-
-/**
- * Resolves timestamp to offset by look up the underlying storage
- */
-trait OffsetTimeStampResolver {
-  def resolveOffset(time: TimeStamp): Try[Long]
-}
-
-/**
- * Manages message's offset on TimeReplayableSource and timestamp
- */
-trait OffsetManager extends MessageFilter with OffsetTimeStampResolver {
-  def close(): Unit
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/transaction/api/OffsetStorage.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/transaction/api/OffsetStorage.scala b/streaming/src/main/scala/io/gearpump/streaming/transaction/api/OffsetStorage.scala
deleted file mode 100644
index fa7161c..0000000
--- a/streaming/src/main/scala/io/gearpump/streaming/transaction/api/OffsetStorage.scala
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.transaction.api
-
-import scala.util.Try
-
-import io.gearpump.TimeStamp
-
-object OffsetStorage {
-
-  /**
-   * StorageEmpty means no data has been stored
-   */
-  case object StorageEmpty extends Throwable
-
-  /**
-   * Overflow means the looked up time is
-   * larger than the maximum stored TimeStamp
-   */
-  case class Overflow(maxTimestamp: Array[Byte]) extends Throwable
-
-  /**
-   * Underflow means the looked up time is
-   * smaller than the minimum stored TimeStamp
-   */
-  case class Underflow(minTimestamp: Array[Byte]) extends Throwable
-}
-
-/**
- * OffsetStorage stores the mapping from TimeStamp to Offset
- */
-trait OffsetStorage {
-  /**
-   * Tries to look up the time in the OffsetStorage return the corresponding Offset if the time is
-   * in the range of stored TimeStamps or one of the failure info (StorageEmpty, Overflow,
-   * Underflow)
-   *
-   * @param time the time to look for
-   * @return the corresponding offset if the time is in the range, otherwise failure
-   */
-  def lookUp(time: TimeStamp): Try[Array[Byte]]
-
-  def append(time: TimeStamp, offset: Array[Byte]): Unit
-
-  def close(): Unit
-}
-
-trait OffsetStorageFactory extends java.io.Serializable {
-  def getOffsetStorage(dir: String): OffsetStorage
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/transaction/api/TimeReplayableSource.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/transaction/api/TimeReplayableSource.scala b/streaming/src/main/scala/io/gearpump/streaming/transaction/api/TimeReplayableSource.scala
deleted file mode 100644
index 50711ee..0000000
--- a/streaming/src/main/scala/io/gearpump/streaming/transaction/api/TimeReplayableSource.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.transaction.api
-
-import io.gearpump.streaming.source.DataSource
-
-/**
- * AT-LEAST-ONCE API. Represents a data source which allow replaying.
- *
- * Subclass should be able to replay messages on recovery from the time
- * when an application crashed.
- */
-trait TimeReplayableSource extends DataSource
-

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/transaction/api/TimeStampFilter.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/transaction/api/TimeStampFilter.scala b/streaming/src/main/scala/io/gearpump/streaming/transaction/api/TimeStampFilter.scala
deleted file mode 100644
index 7c34e1a..0000000
--- a/streaming/src/main/scala/io/gearpump/streaming/transaction/api/TimeStampFilter.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.transaction.api
-
-import io.gearpump.{Message, TimeStamp}
-
-/**
- * TimeStampFilter filters out messages that are obsolete.
- */
-trait TimeStampFilter extends java.io.Serializable {
-  def filter(msg: Message, predicate: TimeStamp): Option[Message]
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/util/ActorPathUtil.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/util/ActorPathUtil.scala b/streaming/src/main/scala/io/gearpump/streaming/util/ActorPathUtil.scala
deleted file mode 100644
index c2ac32a..0000000
--- a/streaming/src/main/scala/io/gearpump/streaming/util/ActorPathUtil.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.util
-
-import akka.actor.{ActorPath, ActorRef}
-
-import io.gearpump.streaming.task.TaskId
-
-object ActorPathUtil {
-
-  def executorActorName(executorId: Int): String = executorId.toString
-
-  def taskActorName(taskId: TaskId): String = {
-    s"processor_${taskId.processorId}_task_${taskId.index}"
-  }
-
-  def taskActorPath(appMaster: ActorRef, executorId: Int, taskId: TaskId): ActorPath = {
-    val executorManager = appMaster.path.child(executorManagerActorName)
-    val executor = executorManager.child(executorActorName(executorId))
-    val task = executor.child(taskActorName(taskId))
-    task
-  }
-
-  def executorManagerActorName: String = "executors"
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/ClusterMessage.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/ClusterMessage.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/ClusterMessage.scala
new file mode 100644
index 0000000..4b801a2
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/ClusterMessage.scala
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming
+
+import scala.language.existentials
+
+import akka.actor.ActorRef
+
+import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.cluster.appmaster.WorkerInfo
+import org.apache.gearpump.cluster.scheduler.Resource
+import org.apache.gearpump.streaming.appmaster.TaskRegistry.TaskLocations
+import org.apache.gearpump.streaming.task.{Subscriber, TaskId}
+import org.apache.gearpump.transport.HostPort
+
+object AppMasterToExecutor {
+  case class LaunchTasks(
+      taskId: List[TaskId], dagVersion: Int, processorDescription: ProcessorDescription,
+      subscribers: List[Subscriber])
+
+  case object TasksLaunched
+
+  /**
+   * dagVersion, life, and subscribers will be changed on target task list.
+   */
+  case class ChangeTasks(
+      taskId: List[TaskId], dagVersion: Int, life: LifeTime, subscribers: List[Subscriber])
+
+  case class TasksChanged(taskIds: List[TaskId])
+
+  case class ChangeTask(
+      taskId: TaskId, dagVersion: Int, life: LifeTime, subscribers: List[Subscriber])
+
+  case class TaskChanged(taskId: TaskId, dagVersion: Int)
+
+  case class StartTask(taskId: TaskId)
+
+  case class StopTask(taskId: TaskId)
+
+  case class TaskLocationsReady(taskLocations: TaskLocations, dagVersion: Int)
+
+  case class TaskLocationsReceived(dagVersion: Int, executorId: ExecutorId)
+
+  case class TaskLocationsRejected(
+      dagVersion: Int, executorId: ExecutorId, reason: String, ex: Throwable)
+
+  case class StartAllTasks(dagVersion: Int)
+
+  case class StartDynamicDag(dagVersion: Int)
+  case class TaskRegistered(taskId: TaskId, sessionId: Int, startClock: TimeStamp)
+  case class TaskRejected(taskId: TaskId)
+
+  case object RestartClockService
+  class MsgLostException extends Exception
+}
+
+object ExecutorToAppMaster {
+  case class RegisterExecutor(
+      executor: ActorRef, executorId: Int, resource: Resource, worker : WorkerInfo)
+
+  case class RegisterTask(taskId: TaskId, executorId: Int, task: HostPort)
+  case class UnRegisterTask(taskId: TaskId, executorId: Int)
+
+  case class MessageLoss(executorId: Int, taskId: TaskId, cause: String)
+}
+
+object AppMasterToMaster {
+  case class StallingTasks(tasks: List[TaskId])
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala
new file mode 100644
index 0000000..445f26c
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming
+
+/**
+ * Configuration key names used by the streaming module.
+ *
+ * Every value here is the name of a key, not the setting's value.
+ */
+object Constants {
+
+  // Keys under the "gearpump.streaming.dsl" prefix.
+  val GEARPUMP_STREAMING_OPERATOR: String = "gearpump.streaming.dsl.operator"
+  val GEARPUMP_STREAMING_SOURCE: String = "gearpump.streaming.dsl.source"
+  val GEARPUMP_STREAMING_SINK: String = "gearpump.streaming.dsl.sink"
+  val GEARPUMP_STREAMING_GROUPBY_FUNCTION: String = "gearpump.streaming.dsl.groupby-function"
+
+  val GEARPUMP_STREAMING_LOCALITIES: String = "gearpump.streaming.localities"
+
+  // The "-ms" suffix in the key name indicates a value in milliseconds.
+  val GEARPUMP_STREAMING_REGISTER_TASK_TIMEOUT_MS: String =
+    "gearpump.streaming.register-task-timeout-ms"
+
+  // Message-count keys (pending messages per connection, ack frequency).
+  val GEARPUMP_STREAMING_MAX_PENDING_MESSAGE_COUNT: String =
+    "gearpump.streaming.max-pending-message-count-per-connection"
+  val GEARPUMP_STREAMING_ACK_ONCE_EVERY_MESSAGE_COUNT: String =
+    "gearpump.streaming.ack-once-every-message-count"
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/DAG.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/DAG.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/DAG.scala
new file mode 100644
index 0000000..4a94ad3
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/DAG.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming
+
+import org.apache.gearpump.partitioner.PartitionerDescription
+import org.apache.gearpump.streaming.task.TaskId
+import org.apache.gearpump.util.Graph
+
+/**
+ * Streaming-application view of an [[org.apache.gearpump.util.Graph]].
+ *
+ * @param version    version number of this DAG
+ * @param processors processor descriptions keyed by processor id
+ * @param graph      topology whose vertices are processor ids and whose edges are the
+ *                   partitioner descriptions between processors
+ */
+case class DAG(version: Int, processors: Map[ProcessorId, ProcessorDescription],
+    graph: Graph[ProcessorId, PartitionerDescription])
+  extends Serializable {
+
+  /** True when the DAG has no processors. */
+  def isEmpty: Boolean = processors.isEmpty
+
+  /** Total number of tasks: the sum of every processor's parallelism. */
+  def taskCount: Int = processors.values.map(_.parallelism).sum
+
+  /** One TaskId per processor and index, with index in `0 until parallelism`. */
+  def tasks: List[TaskId] = {
+    processors.toList.flatMap { case (processorId, processor) =>
+      (0 until processor.parallelism).map(index => TaskId(processorId, index))
+    }
+  }
+}
+
+object DAG {
+
+  /**
+   * Builds a DAG from a graph of processor descriptions.
+   *
+   * Each vertex is replaced by its processor id, and the descriptions are collected into a
+   * map keyed by that id.
+   */
+  def apply(graph: Graph[ProcessorDescription, PartitionerDescription], version: Int = 0): DAG = {
+    val processorsById = graph.vertices.map(description => description.id -> description).toMap
+    val idGraph = graph.mapVertex { description => description.id }
+    new DAG(version, processorsById, idGraph)
+  }
+
+  /** A DAG built from an empty graph (version 0). */
+  def empty: DAG = apply(Graph.empty[ProcessorDescription, PartitionerDescription])
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/MessageSerializer.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/MessageSerializer.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/MessageSerializer.scala
new file mode 100644
index 0000000..20e2529
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/MessageSerializer.scala
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming
+
+import java.io.{DataInput, DataOutput}
+
+import org.apache.gearpump.streaming.task._
+
+/** Binary serializer for [[TaskId]]: processorId then index, each a 4-byte int. */
+class TaskIdSerializer extends TaskMessageSerializer[TaskId] {
+
+  // Two ints: processorId and index.
+  override def getLength(obj: TaskId): Int = 4 + 4
+
+  override def write(dataOutput: DataOutput, obj: TaskId): Unit = {
+    val (processor, idx) = (obj.processorId, obj.index)
+    dataOutput.writeInt(processor)
+    dataOutput.writeInt(idx)
+  }
+
+  override def read(dataInput: DataInput): TaskId = {
+    // Field order must match write(): processorId first, then index.
+    val processor = dataInput.readInt()
+    val idx = dataInput.readInt()
+    new TaskId(processor, idx)
+  }
+}
+
+/**
+ * Binary serializer for [[Ack]]: the TaskId, then seq (short), actualReceivedNum (short)
+ * and sessionId (int).
+ */
+class AckSerializer extends TaskMessageSerializer[Ack] {
+  val taskIdSerializer = new TaskIdSerializer
+
+  // TaskId + two shorts (2 bytes each) + one int (4 bytes).
+  override def getLength(obj: Ack): Int = taskIdSerializer.getLength(obj.taskId) + 2 + 2 + 4
+
+  override def write(dataOutput: DataOutput, obj: Ack): Unit = {
+    taskIdSerializer.write(dataOutput, obj.taskId)
+    dataOutput.writeShort(obj.seq)
+    dataOutput.writeShort(obj.actualReceivedNum)
+    dataOutput.writeInt(obj.sessionId)
+  }
+
+  override def read(dataInput: DataInput): Ack = {
+    val id = taskIdSerializer.read(dataInput)
+    val sequence = dataInput.readShort()
+    val received = dataInput.readShort()
+    // Arguments are evaluated left to right, so sessionId is read last, as written.
+    Ack(id, sequence, received, dataInput.readInt())
+  }
+}
+
+/** Binary serializer for [[InitialAckRequest]]: the TaskId followed by sessionId (int). */
+class InitialAckRequestSerializer extends TaskMessageSerializer[InitialAckRequest] {
+  // The misspelled name is a public member of this class, so it is kept as-is.
+  val taskIdSerialzer = new TaskIdSerializer()
+
+  // TaskId + one int (4 bytes).
+  override def getLength(obj: InitialAckRequest): Int = 4 + taskIdSerialzer.getLength(obj.taskId)
+
+  override def write(dataOutput: DataOutput, obj: InitialAckRequest): Unit = {
+    taskIdSerialzer.write(dataOutput, obj.taskId)
+    dataOutput.writeInt(obj.sessionId)
+  }
+
+  override def read(dataInput: DataInput): InitialAckRequest = {
+    val id = taskIdSerialzer.read(dataInput)
+    InitialAckRequest(id, dataInput.readInt())
+  }
+}
+
+/** Binary serializer for [[AckRequest]]: the TaskId, then seq (short) and sessionId (int). */
+class AckRequestSerializer extends TaskMessageSerializer[AckRequest] {
+  val taskIdSerializer = new TaskIdSerializer
+
+  // TaskId + one short (2 bytes) + one int (4 bytes).
+  override def getLength(obj: AckRequest): Int = taskIdSerializer.getLength(obj.taskId) + 2 + 4
+
+  override def write(dataOutput: DataOutput, obj: AckRequest): Unit = {
+    taskIdSerializer.write(dataOutput, obj.taskId)
+    dataOutput.writeShort(obj.seq)
+    dataOutput.writeInt(obj.sessionId)
+  }
+
+  override def read(dataInput: DataInput): AckRequest = {
+    val id = taskIdSerializer.read(dataInput)
+    val sequence = dataInput.readShort()
+    AckRequest(id, sequence, dataInput.readInt())
+  }
+}
+
+/** Binary serializer for [[LatencyProbe]]: only its timestamp, as an 8-byte long. */
+class LatencyProbeSerializer extends TaskMessageSerializer[LatencyProbe] {
+
+  // One long.
+  override def getLength(obj: LatencyProbe): Int = 8
+
+  override def write(dataOutput: DataOutput, obj: LatencyProbe): Unit =
+    dataOutput.writeLong(obj.timestamp)
+
+  override def read(dataInput: DataInput): LatencyProbe =
+    LatencyProbe(dataInput.readLong())
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala
new file mode 100644
index 0000000..66ec873
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming
+
+import scala.language.implicitConversions
+import scala.reflect.ClassTag
+
+import akka.actor.ActorSystem
+
+import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.cluster._
+import org.apache.gearpump.partitioner.{HashPartitioner, Partitioner, PartitionerDescription, PartitionerObject}
+import org.apache.gearpump.streaming.appmaster.AppMaster
+import org.apache.gearpump.streaming.task.Task
+import org.apache.gearpump.util.{Graph, LogUtil, ReferenceEqual}
+
+/**
+ * Processor is the blueprint for tasks.
+ *
+ * It names the task class to instantiate, how many instances to run, the configuration
+ * handed to each instance, and a free-form description.
+ *
+ * @tparam T the task type this processor creates
+ */
+trait Processor[+T <: Task] extends ReferenceEqual {
+
+  /** The task class (a subtype of Task); each runtime instance of it is one task. */
+  def taskClass: Class[_ <: Task]
+
+  /** How many tasks to run for this processor. */
+  def parallelism: Int
+
+  /**
+   * Custom [[org.apache.gearpump.cluster.UserConfig]] used to initialize each task at
+   * runtime.
+   */
+  def taskConf: UserConfig
+
+  /** Free-form description text for this processor. */
+  def description: String
+}
+
+object Processor {
+
+  /** Converts `processor` into a [[ProcessorDescription]] with the given `id`. */
+  def ProcessorToProcessorDescription(id: ProcessorId, processor: Processor[_ <: Task])
+    : ProcessorDescription = {
+    val className = processor.taskClass.getName
+    ProcessorDescription(id, className, processor.parallelism, processor.description,
+      processor.taskConf)
+  }
+
+  /**
+   * Creates a [[DefaultProcessor]] for task type `T`; the task class comes from the
+   * implicit ClassTag.
+   */
+  def apply[T <: Task](
+      parallelism: Int, description: String = "",
+      taskConf: UserConfig = UserConfig.empty)(implicit classtag: ClassTag[T])
+    : DefaultProcessor[T] = {
+    val taskClazz = classtag.runtimeClass.asInstanceOf[Class[T]]
+    DefaultProcessor[T](parallelism, description, taskConf, taskClazz)
+  }
+
+  /** Creates a [[DefaultProcessor]] from an explicit task class. */
+  def apply[T <: Task](
+      taskClazz: Class[T], parallelism: Int, description: String, taskConf: UserConfig)
+    : DefaultProcessor[T] = {
+    DefaultProcessor[T](parallelism, description, taskConf, taskClazz)
+  }
+
+  /** Immutable [[Processor]] backed by a concrete task class. */
+  case class DefaultProcessor[T <: Task](
+      parallelism: Int, description: String, taskConf: UserConfig, taskClass: Class[T])
+    extends Processor[T] {
+
+    /** Returns a copy of this processor with a different parallelism. */
+    def withParallelism(parallel: Int): DefaultProcessor[T] = copy(parallelism = parallel)
+
+    /** Returns a copy of this processor with a different description. */
+    def withDescription(desc: String): DefaultProcessor[T] = copy(description = desc)
+
+    /** Returns a copy of this processor with a different task configuration. */
+    def withConfig(conf: UserConfig): DefaultProcessor[T] = copy(taskConf = conf)
+  }
+}
+
+/**
+ * The half-open time interval `[birth, death)` during which a processor is alive.
+ *
+ * When an input message's timestamp falls outside this interval, the message is not
+ * processed by the processor.
+ */
+case class LifeTime(birth: TimeStamp, death: TimeStamp) {
+
+  /** True iff `birth <= timestamp < death`. */
+  def contains(timestamp: TimeStamp): Boolean = birth <= timestamp && timestamp < death
+
+  /**
+   * Intersection with `another`: the later birth and the earlier death.
+   *
+   * The result contains no timestamp (birth >= death) when the two do not overlap.
+   */
+  def cross(another: LifeTime): LifeTime = {
+    val latestBirth = math.max(birth, another.birth)
+    val earliestDeath = math.min(death, another.death)
+    LifeTime(latestBirth, earliestDeath)
+  }
+}
+
+object LifeTime {
+
+  /** Lifetime spanning `[0, Long.MaxValue)`, i.e. effectively every timestamp. */
+  val Immortal: LifeTime = LifeTime(0L, Long.MaxValue)
+}
+
+/**
+ * A streaming application: a named graph of processor descriptions connected by
+ * partitioner descriptions, run by [[org.apache.gearpump.streaming.appmaster.AppMaster]].
+ *
+ * @param name            application name
+ * @param inputUserConfig user configuration supplied by the caller
+ * @param dag             processor graph; must not contain duplicated edges
+ */
+class StreamApplication(
+    override val name: String, val inputUserConfig: UserConfig,
+    val dag: Graph[ProcessorDescription, PartitionerDescription])
+  extends Application {
+
+  require(!dag.hasDuplicatedEdge(), "Graph should not have duplicated edges")
+
+  override def appMaster: Class[_ <: ApplicationMaster] = classOf[AppMaster]
+
+  /** The input config with the graph stored under the key `StreamApplication.DAG`. */
+  override def userConfig(implicit system: ActorSystem): UserConfig =
+    inputUserConfig.withValue(StreamApplication.DAG, dag)
+}
+
+/**
+ * Description of one processor as stored in a [[DAG]].
+ *
+ * @param id          processor id
+ * @param taskClass   task class name, as returned by `Class.getName`
+ * @param parallelism number of task instances
+ * @param description free-form description text
+ * @param taskConf    task configuration; null when not supplied
+ * @param life        processor lifetime, [[LifeTime.Immortal]] by default
+ * @param jar         jar holding the task class; null when not supplied
+ */
+case class ProcessorDescription(
+    id: ProcessorId,
+    taskClass: String,
+    parallelism: Int,
+    description: String = "",
+    taskConf: UserConfig = null,
+    life: LifeTime = LifeTime.Immortal,
+    jar: AppJar = null) extends ReferenceEqual
+
+object StreamApplication {
+
+  /** Key under which the processor graph is stored in the application's UserConfig. */
+  val DAG = "DAG"
+
+  private val hashPartitioner = new HashPartitioner()
+  private val LOG = LogUtil.getLogger(getClass)
+
+  /**
+   * Builds a [[StreamApplication]] from a graph of processors and partitioners.
+   *
+   * Processors get ids from their position in topological order (cycles are tolerated,
+   * but logged); a null edge is replaced by a shared [[HashPartitioner]].
+   */
+  def apply[T <: Processor[Task], P <: Partitioner](
+      name: String, dag: Graph[T, P], userConfig: UserConfig): StreamApplication = {
+    if (dag.hasCycle()) {
+      LOG.warn(s"Detected cycles in DAG of application $name!")
+    }
+
+    val processorIds = dag.topologicalOrderWithCirclesIterator.toList.zipWithIndex.toMap
+    val descriptionGraph = dag.mapVertex { processor =>
+      Processor.ProcessorToProcessorDescription(processorIds(processor), processor)
+    }.mapEdge { (_, partitioner, _) =>
+      val effective = Option(partitioner).getOrElse(hashPartitioner)
+      PartitionerDescription(new PartitionerObject(effective))
+    }
+    new StreamApplication(name, userConfig, descriptionGraph)
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala
new file mode 100644
index 0000000..7c08b9b
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala
@@ -0,0 +1,362 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.appmaster
+
+import java.lang.management.ManagementFactory
+
+import akka.actor._
+import org.apache.gearpump._
+import org.apache.gearpump.cluster.ClientToMaster.{GetLastFailure, GetStallingTasks, QueryHistoryMetrics, ShutdownApplication}
+import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterDataDetailRequest, ReplayFromTimestampWindowTrailingEdge}
+import org.apache.gearpump.cluster.MasterToClient.{HistoryMetrics, HistoryMetricsItem, LastFailure}
+import org.apache.gearpump.cluster._
+import org.apache.gearpump.cluster.worker.WorkerId
+import org.apache.gearpump.metrics.Metrics.ReportMetrics
+import org.apache.gearpump.metrics.{JvmMetricsSet, Metrics, MetricsReporterService}
+import org.apache.gearpump.partitioner.PartitionerDescription
+import org.apache.gearpump.streaming.ExecutorToAppMaster.{MessageLoss, RegisterExecutor, RegisterTask, UnRegisterTask}
+import org.apache.gearpump.streaming._
+import org.apache.gearpump.streaming.appmaster.AppMaster._
+import org.apache.gearpump.streaming.appmaster.DagManager.{GetLatestDAG, LatestDAG, ReplaceProcessor}
+import org.apache.gearpump.streaming.appmaster.ExecutorManager.{ExecutorInfo, GetExecutorInfo}
+import org.apache.gearpump.streaming.appmaster.TaskManager.{FailedToRecover, GetTaskList, TaskList}
+import org.apache.gearpump.streaming.executor.Executor.{ExecutorConfig, ExecutorSummary, GetExecutorSummary, QueryExecutorConfig}
+import org.apache.gearpump.streaming.storage.InMemoryAppStoreOnMaster
+import org.apache.gearpump.streaming.task._
+import org.apache.gearpump.streaming.util.ActorPathUtil
+import org.apache.gearpump.util.Constants.{APPMASTER_DEFAULT_EXECUTOR_ID, _}
+import org.apache.gearpump.util.HistoryMetricsService.HistoryMetricsConfig
+import org.apache.gearpump.util._
+import org.slf4j.Logger
+
+import scala.concurrent.Future
+
+/**
+ * AppMaster is the head of a streaming application.
+ *
+ * It contains:
+ *  1. ExecutorManager to manage all executors.
+ *  2. TaskManager to manage all tasks.
+ *  3. ClockService to track the global clock for this streaming application.
+ *  4. Scheduler to decide where a task should be scheduled.
+ *
+ * Incoming messages are tried, in order, against taskMessageHandler,
+ * executorMessageHandler, recover and appMasterService.
+ */
+class AppMaster(appContext: AppMasterContext, app: AppDescription) extends ApplicationMaster {
+  import app.userConfig
+  import appContext.{appId, masterProxy, username}
+
+  private implicit val actorSystem = context.system
+  private implicit val timeOut = FUTURE_TIMEOUT
+
+  import akka.pattern.ask
+  private implicit val dispatcher = context.dispatcher
+
+  private val startTime: TimeStamp = System.currentTimeMillis()
+
+  private val LOG: Logger = LogUtil.getLogger(getClass, app = appId)
+  private val address = ActorUtil.getFullPath(context.system, self.path)
+
+  LOG.info(s"AppMaster[$appId] is launched by $username, app: $app")
+  LOG.info(s"AppMaster actor path: $address")
+
+  private val store = new InMemoryAppStoreOnMaster(appId, appContext.masterProxy)
+  private val dagManager = context.actorOf(Props(new DagManager(appContext.appId, userConfig, store,
+    Some(getUpdatedDAG()))))
+
+  private var taskManager: Option[ActorRef] = None
+  private var clockService: Option[ActorRef] = None
+  private val systemConfig = context.system.settings.config
+  private var lastFailure = LastFailure(0L, null)
+
+  private val appMasterBrief = ExecutorBrief(APPMASTER_DEFAULT_EXECUTOR_ID,
+    self.path.toString,
+    Option(appContext.workerInfo).map(_.workerId).getOrElse(WorkerId.unspecified), "active")
+
+  private val getHistoryMetricsConfig = HistoryMetricsConfig(systemConfig)
+
+  private val metricsEnabled = systemConfig.getBoolean(GEARPUMP_METRIC_ENABLED)
+
+  private val userDir = System.getProperty("user.dir")
+  private val logFile = LogUtil.applicationLogDir(actorSystem.settings.config)
+
+  private val appMasterExecutorSummary = ExecutorSummary(
+    APPMASTER_DEFAULT_EXECUTOR_ID,
+    Option(appContext.workerInfo).map(_.workerId).getOrElse(WorkerId.unspecified),
+    self.path.toString,
+    logFile.getAbsolutePath,
+    status = "Active",
+    taskCount = 0,
+    tasks = Map.empty[ProcessorId, List[TaskId]],
+    jvmName = ManagementFactory.getRuntimeMXBean().getName()
+  )
+
+  private val historyMetricsService = if (metricsEnabled) {
+    // Registers jvm metrics
+    Metrics(context.system).register(new JvmMetricsSet(
+      s"app${appId}.executor${APPMASTER_DEFAULT_EXECUTOR_ID}"))
+
+    val historyMetricsService = context.actorOf(Props(new HistoryMetricsService(
+      s"app$appId", getHistoryMetricsConfig)))
+
+    val metricsReportService = context.actorOf(Props(
+      new MetricsReporterService(Metrics(context.system))))
+    historyMetricsService.tell(ReportMetrics, metricsReportService)
+
+    Some(historyMetricsService)
+  } else {
+    None
+  }
+
+  private val executorManager: ActorRef =
+    context.actorOf(ExecutorManager.props(userConfig, appContext, app.clusterConfig, app.name),
+      ActorPathUtil.executorManagerActorName)
+
+  // NOTE(review): this Future callback runs on the dispatcher, outside the actor's message
+  // processing, yet it calls context.actorOf and assigns the clockService/taskManager vars.
+  // Akka's documentation warns against touching actor state or `context` from Future
+  // callbacks; consider piping the DAG to self and creating these children in receive.
+  for (dag <- getDAG) {
+    clockService = Some(context.actorOf(Props(new ClockService(dag, store))))
+    val jarScheduler = new JarScheduler(appId, app.name, systemConfig, context)
+
+    taskManager = Some(context.actorOf(Props(new TaskManager(appContext.appId, dagManager,
+      jarScheduler, executorManager, clockService.get, self, app.name))))
+  }
+
+  override def receive: Receive = {
+    taskMessageHandler orElse
+      executorMessageHandler orElse
+      recover orElse
+      appMasterService orElse
+      ActorUtil.defaultMsgHandler(self)
+  }
+
+  /** Handles messages from Tasks */
+  def taskMessageHandler: Receive = {
+    case clock: ClockEvent =>
+      taskManager.foreach(_ forward clock)
+    case register: RegisterTask =>
+      taskManager.foreach(_ forward register)
+    case unRegister: UnRegisterTask =>
+      taskManager.foreach(_ forward unRegister)
+      // Checks whether this processor dead, if it is, then we should remove it from clockService.
+      clockService.foreach(_ forward CheckProcessorDeath(unRegister.taskId.processorId))
+    case replay: ReplayFromTimestampWindowTrailingEdge =>
+      taskManager.foreach(_ forward replay)
+    case messageLoss: MessageLoss =>
+      lastFailure = LastFailure(System.currentTimeMillis(), messageLoss.cause)
+      taskManager.foreach(_ forward messageLoss)
+    case lookupTask: LookupTaskActorRef =>
+      taskManager.foreach(_ forward lookupTask)
+    case checkpoint: ReportCheckpointClock =>
+      clockService.foreach(_ forward checkpoint)
+    case GetDAG =>
+      val task = sender
+      getDAG.foreach {
+        dag => task ! dag
+      }
+    case GetCheckpointClock =>
+      clockService.foreach(_ forward GetCheckpointClock)
+  }
+
+  /** Handles messages from Executors */
+  def executorMessageHandler: Receive = {
+    case register: RegisterExecutor =>
+      executorManager forward register
+    case ReportMetrics =>
+      historyMetricsService.foreach(_ forward ReportMetrics)
+  }
+
+  /** Handles queries and commands addressed to the AppMaster itself. */
+  def appMasterService: Receive = {
+    case appMasterDataDetailRequest: AppMasterDataDetailRequest =>
+      LOG.debug(s"AppMaster got AppMasterDataDetailRequest for $appId ")
+
+      // Start all four queries before combining them so that they run concurrently.
+      val executorsFuture = executorBrief
+      val clockFuture = getMinClock
+      val taskFuture = getTaskList
+      val dagFuture = getDAG
+
+      val appMasterDataDetail = for {
+        executors <- executorsFuture
+        clock <- clockFuture
+        tasks <- taskFuture
+        dag <- dagFuture
+      } yield {
+        val graph = dag.graph
+
+        // Strict map (not the lazy mapValues view) so the grouping is computed once.
+        val executorToTasks = tasks.tasks.groupBy(_._2).map { case (executorId, taskMap) =>
+          (executorId, taskMap.keys.toList)
+        }
+
+        val processors = dag.processors.map { kv =>
+          val processor = kv._2
+          import processor._
+          val taskCountByExecutor = executorToTasks.map { kv =>
+            (kv._1, TaskCount(kv._2.count(_.processorId == id)))
+          }.filter(_._2.count != 0)
+          (id,
+            ProcessorSummary(id, taskClass, parallelism, description, taskConf, life,
+              taskCountByExecutor.keys.toList, taskCountByExecutor))
+        }
+
+        StreamAppMasterSummary(
+          appId = appId,
+          appName = app.name,
+          actorPath = address,
+          clock = clock,
+          status = MasterToAppMaster.AppMasterActive,
+          startTime = startTime,
+          uptime = System.currentTimeMillis() - startTime,
+          user = username,
+          homeDirectory = userDir,
+          logFile = logFile.getAbsolutePath,
+          processors = processors,
+          processorLevels = graph.vertexHierarchyLevelMap(),
+          dag = graph.mapEdge { (node1, edge, node2) =>
+            edge.partitionerFactory.name
+          },
+          executors = executors,
+          historyMetricsConfig = getHistoryMetricsConfig
+        )
+      }
+
+      val client = sender()
+
+      appMasterDataDetail.foreach { appData =>
+        client ! appData
+      }
+      // Log failures (e.g. ServiceNotAvailableException while the clock service or task
+      // manager is not ready yet) instead of dropping them silently. The client gets no reply.
+      appMasterDataDetail.failed.foreach { ex =>
+        LOG.warn(s"Failed to build AppMasterDataDetail for application $appId", ex)
+      }
+    // TODO: WebSocket is buggy and disabled.
+    //    case appMasterMetricsRequest: AppMasterMetricsRequest =>
+    //      val client = sender()
+    //      actorSystem.eventStream.subscribe(client, classOf[MetricType])
+    case query: QueryHistoryMetrics =>
+      if (historyMetricsService.isEmpty) {
+        // Returns empty metrics so that we don't hang the UI
+        sender ! HistoryMetrics(query.path, List.empty[HistoryMetricsItem])
+      } else {
+        historyMetricsService.get forward query
+      }
+    case getStalling: GetStallingTasks =>
+      clockService.foreach(_ forward getStalling)
+    case replaceDAG: ReplaceProcessor =>
+      dagManager forward replaceDAG
+    case GetLastFailure(_) =>
+      sender ! lastFailure
+    case get@GetExecutorSummary(executorId) =>
+      val client = sender
+      if (executorId == APPMASTER_DEFAULT_EXECUTOR_ID) {
+        client ! appMasterExecutorSummary
+      } else {
+        ActorUtil.askActor[Map[ExecutorId, ExecutorInfo]](executorManager, GetExecutorInfo)
+          .map { map =>
+            map.get(executorId).foreach { executor =>
+              executor.executor.tell(get, client)
+            }
+          }
+      }
+    case query@QueryExecutorConfig(executorId) =>
+      val client = sender
+      // The AppMaster's own executor id, as used for GetExecutorSummary above.
+      if (executorId == APPMASTER_DEFAULT_EXECUTOR_ID) {
+        client ! ExecutorConfig(ClusterConfig.filterOutDefaultConfig(systemConfig))
+      } else {
+        ActorUtil.askActor[Map[ExecutorId, ExecutorInfo]](executorManager, GetExecutorInfo)
+          .map { map =>
+            map.get(executorId).foreach { executor =>
+              executor.executor.tell(query, client)
+            }
+          }
+      }
+  }
+
+  /** Error handling: shuts the application down on unrecoverable failures. */
+  def recover: Receive = {
+    case FailedToRecover(errorMsg) =>
+      // Only trust the failure report when it comes from one of our own children.
+      if (context.children.toList.contains(sender())) {
+        LOG.error(errorMsg)
+        masterProxy ! ShutdownApplication(appId)
+      }
+    case AllocateResourceTimeOut =>
+      LOG.error(s"Failed to allocate resource in time, shutdown application $appId")
+      masterProxy ! ShutdownApplication(appId)
+      context.stop(self)
+  }
+
+  /** Latest min clock from the ClockService; fails if the service is not created yet. */
+  private def getMinClock: Future[TimeStamp] = {
+    clockService match {
+      case Some(service) =>
+        (service ? GetLatestMinClock).mapTo[LatestMinClock].map(_.clock)
+      case None =>
+        Future.failed(new ServiceNotAvailableException("clock service not ready"))
+    }
+  }
+
+  /** Briefs of all executors known to the ExecutorManager, plus the AppMaster's own. */
+  private def executorBrief: Future[List[ExecutorBrief]] = {
+    ActorUtil.askActor[Map[ExecutorId, ExecutorInfo]](executorManager, GetExecutorInfo)
+      .map { infos =>
+        infos.values.map { info =>
+          ExecutorBrief(info.executorId,
+            info.executor.path.toSerializationFormat,
+            info.worker.workerId,
+            "active")
+        }.toList :+ appMasterBrief
+      }
+  }
+
+  /** Task list from the TaskManager; fails if the manager is not created yet. */
+  private def getTaskList: Future[TaskList] = {
+    taskManager match {
+      case Some(manager) =>
+        (manager ? GetTaskList).mapTo[TaskList]
+      case None =>
+        Future.failed(new ServiceNotAvailableException("task manager not ready"))
+    }
+  }
+
+  /** Latest DAG held by the DagManager. */
+  private def getDAG: Future[DAG] = {
+    (dagManager ? GetLatestDAG).mapTo[LatestDAG].map(_.dag)
+  }
+
+  /**
+   * DAG read from the user config, with every processor lacking a jar given the
+   * application jar (or null when the application has none).
+   */
+  private def getUpdatedDAG(): DAG = {
+    val dag = DAG(userConfig.getValue[Graph[ProcessorDescription,
+      PartitionerDescription]](StreamApplication.DAG).get)
+    val updated = dag.processors.map { idAndProcessor =>
+      val (id, oldProcessor) = idAndProcessor
+      val newProcessor = if (oldProcessor.jar == null) {
+        oldProcessor.copy(jar = appContext.appJar.orNull)
+      } else {
+        oldProcessor
+      }
+      (id, newProcessor)
+    }
+    DAG(dag.version, updated, dag.graph)
+  }
+}
+
+object AppMaster {
+
+  /** Signals that the Master did not return the requested resource in time. */
+  case object AllocateResourceTimeOut
+
+  /** Asks for the ActorRef of the task identified by `taskId`. */
+  case class LookupTaskActorRef(taskId: TaskId)
+
+  /** Carries the ActorRef of a task. */
+  case class TaskActorRef(task: ActorRef)
+
+  /** Used to fail a Future when a service the AppMaster relies on is not ready yet. */
+  class ServiceNotAvailableException(reason: String) extends Exception(reason)
+
+  /** Summary of one executor: its id, actor path, hosting worker id and status text. */
+  case class ExecutorBrief(
+      executorId: ExecutorId,
+      executor: String,
+      workerId: WorkerId,
+      status: String)
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala
new file mode 100644
index 0000000..458fded
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala
@@ -0,0 +1,447 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.appmaster
+
+import java.util
+import java.util.Date
+import java.util.concurrent.TimeUnit
+
+import akka.actor.{Actor, Cancellable, Stash}
+import io.gearpump.google.common.primitives.Longs
+import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.cluster.ClientToMaster.GetStallingTasks
+import org.apache.gearpump.streaming.AppMasterToMaster.StallingTasks
+import org.apache.gearpump.streaming._
+import org.apache.gearpump.streaming.appmaster.ClockService.HealthChecker.ClockValue
+import org.apache.gearpump.streaming.appmaster.ClockService._
+import org.apache.gearpump.streaming.storage.AppDataStore
+import org.apache.gearpump.streaming.task._
+import org.apache.gearpump.util.LogUtil
+import org.slf4j.Logger
+
+import scala.concurrent.Future
+import scala.concurrent.duration.FiniteDuration
+import scala.language.implicitConversions
+
+/**
+ * Maintains a global view of message timestamp in the application.
+ *
+ * On start the last persisted start clock is read from `store` (key `START_CLOCK`); until
+ * it arrives every incoming message is stashed. Afterwards the actor answers clock queries,
+ * tracks per-processor and per-task min clocks, runs a health check every 60 seconds and
+ * snapshots the start clock to `store` every 5 seconds.
+ *
+ * @param dag the current DAG; replaced when a DAG with a higher version is received
+ * @param store storage used to persist and restore the start clock
+ */
+class ClockService(private var dag: DAG, store: AppDataStore) extends Actor with Stash {
+  private val LOG: Logger = LogUtil.getLogger(getClass)
+
+  import context.dispatcher
+
+  private val healthChecker = new HealthChecker(stallingThresholdSeconds = 60)
+  private var healthCheckScheduler: Cancellable = null
+  private var snapshotScheduler: Cancellable = null
+
+  // The real behavior is installed with context.become in preStart.
+  override def receive: Receive = null
+
+  override def preStart(): Unit = {
+    LOG.info("Initializing Clock service, get snapshotted StartClock ....")
+    store.get(START_CLOCK).asInstanceOf[Future[TimeStamp]].map { clock =>
+      val startClock = Option(clock).getOrElse(0L)
+
+      // Recover the application by restarting from last persisted startClock.
+      // Only messages after startClock will be replayed. This callback runs outside the
+      // actor, so actor state is updated by the StoredStartClock handler, not here.
+      self ! StoredStartClock(startClock)
+      LOG.info(s"Start Clock Retrieved, starting ClockService, startClock: $startClock")
+    }.failed.foreach { ex =>
+      // Without a start clock the actor never leaves waitForStartClock; make that visible.
+      LOG.error("Failed to retrieve the stored start clock, ClockService is not started", ex)
+    }
+
+    context.become(waitForStartClock)
+  }
+
+  override def postStop(): Unit = {
+    Option(healthCheckScheduler).foreach(_.cancel())
+    Option(snapshotScheduler).foreach(_.cancel())
+  }
+
+  // Keep track of clock value of all processors.
+  private var clocks = Map.empty[ProcessorId, ProcessorClock]
+
+  // Each process can have multiple upstream processors. This keep track of the upstream clocks.
+  private var upstreamClocks = Map.empty[ProcessorId, Array[ProcessorClock]]
+
+  // We use Array instead of List for Performance consideration
+  private var processorClocks = Array.empty[ProcessorClock]
+
+  // Checkpoint clocks reported so far, per task of every checkpoint-enabled live processor.
+  private var checkpointClocks: Map[TaskId, Vector[TimeStamp]] = null
+
+  private var minCheckpointClock: Option[TimeStamp] = None
+
+  /** True when the processor's task config sets "state.checkpoint.enable" to true. */
+  private def checkpointEnabled(processor: ProcessorDescription): Boolean = {
+    val taskConf = processor.taskConf
+    taskConf != null && taskConf.getBoolean("state.checkpoint.enable") == Some(true)
+  }
+
+  /**
+   * Starts an empty checkpoint-clock history for every task of every processor that is
+   * still alive at `startClock` and has checkpointing enabled. When there is no such task,
+   * `minCheckpointClock` is cleared.
+   */
+  private def resetCheckpointClocks(dag: DAG, startClock: TimeStamp): Unit = {
+    this.checkpointClocks = dag.processors
+      .filter { case (_, processor) =>
+        startClock < processor.life.death && checkpointEnabled(processor)
+      }
+      .flatMap { case (id, processor) =>
+        (0 until processor.parallelism).map(TaskId(id, _) -> Vector.empty[TimeStamp])
+      }
+    if (this.checkpointClocks.isEmpty) {
+      minCheckpointClock = None
+    }
+  }
+
+  private def initDag(startClock: TimeStamp): Unit = {
+    recoverDag(this.dag, startClock)
+  }
+
+  /** Rebuilds all clock state from scratch, initializing every live processor at `startClock`. */
+  private def recoverDag(dag: DAG, startClock: TimeStamp): Unit = {
+    this.clocks = dag.processors.filter(startClock < _._2.life.death).map {
+      case (processorId, processor) =>
+        val clock = new ProcessorClock(processorId, processor.life, processor.parallelism)
+        clock.init(startClock)
+        (processorId, clock)
+    }
+
+    this.upstreamClocks = clocks.map { case (processorId, _) =>
+      val upstreams = dag.graph.incomingEdgesOf(processorId).map(_._1)
+      val upstreamClocks = upstreams.flatMap(clocks.get(_))
+      (processorId, upstreamClocks.toArray)
+    }
+
+    this.processorClocks = clocks.toArray.map(_._2)
+
+    resetCheckpointClocks(dag, startClock)
+  }
+
+  /**
+   * Switches to a new DAG version. Processors already known keep their clock state, with
+   * only their lifetime refreshed. New processors start at max(birth, startClock) when they
+   * have no upstream; otherwise they start at their upstream min clock.
+   */
+  private def dynamicDAG(dag: DAG, startClock: TimeStamp): Unit = {
+    val newClocks = dag.processors.filter(startClock < _._2.life.death).map {
+      case (processorId, processor) =>
+        val clock = clocks.get(processorId) match {
+          case Some(existing) => existing.copy(life = processor.life)
+          case None => new ProcessorClock(processorId, processor.life, processor.parallelism)
+        }
+        (processorId, clock)
+    }
+
+    this.clocks = newClocks
+
+    this.upstreamClocks = newClocks.map { case (processorId, _) =>
+      val upstreams = dag.graph.incomingEdgesOf(processorId).map(_._1)
+      val upstreamClocks = upstreams.flatMap(newClocks.get(_))
+      (processorId, upstreamClocks.toArray)
+    }
+
+    // Inits the clock of all processors (init is a no-op for already initialized clocks).
+    newClocks.foreach { case (processorId, processorClock) =>
+      if (dag.graph.inDegreeOf(processorId) == 0) {
+        processorClock.init(Longs.max(processorClock.life.birth, startClock))
+      } else {
+        processorClock.init(getUpStreamMinClock(processorId))
+      }
+    }
+
+    this.processorClocks = clocks.toArray.map(_._2)
+
+    resetCheckpointClocks(dag, startClock)
+  }
+
+  /** Initial behavior: stash everything until the stored start clock has been retrieved. */
+  def waitForStartClock: Receive = {
+    case StoredStartClock(startClock) =>
+      // Restore the checkpoint clock on the actor's own thread. initDag may reset it to None
+      // when no live processor has checkpointing enabled.
+      minCheckpointClock = Some(startClock)
+      initDag(startClock)
+
+      // Periodically report current clock
+      healthCheckScheduler = context.system.scheduler.schedule(
+        new FiniteDuration(5, TimeUnit.SECONDS),
+        new FiniteDuration(60, TimeUnit.SECONDS), self, HealthCheck)
+
+      // Periodically snapshot latest min startclock to external storage
+      snapshotScheduler = context.system.scheduler.schedule(
+        new FiniteDuration(5, TimeUnit.SECONDS),
+        new FiniteDuration(5, TimeUnit.SECONDS), self, SnapshotStartClock)
+
+      unstashAll()
+      context.become(clockService)
+
+    case _ =>
+      stash()
+  }
+
+  /** Min clock over the upstream processors of `processorId`, or Long.MaxValue if it has none. */
+  private def getUpStreamMinClock(processorId: ProcessorId): TimeStamp = {
+    upstreamClocks.get(processorId) match {
+      case Some(upstreams) if upstreams != null && upstreams.nonEmpty =>
+        ProcessorClocks.minClock(upstreams)
+      case _ =>
+        Long.MaxValue
+    }
+  }
+
+  /** Main behavior once the start clock is known. */
+  def clockService: Receive = {
+    case GetUpstreamMinClock(task) =>
+      sender ! UpstreamMinClock(getUpStreamMinClock(task.processorId))
+
+    case UpdateClock(task, clock) =>
+      // The upstream min clock is read before this task's own clock is updated.
+      val upstreamMinClock = getUpStreamMinClock(task.processorId)
+      clocks.get(task.processorId) match {
+        case Some(processorClock) => processorClock.updateMinClock(task.index, clock)
+        case None => LOG.error(s"Cannot updateClock for task $task")
+      }
+      sender ! UpstreamMinClock(upstreamMinClock)
+
+    case GetLatestMinClock =>
+      sender ! LatestMinClock(minClock)
+
+    case GetStartClock =>
+      sender ! StartClock(getStartClock)
+
+    case deathCheck: CheckProcessorDeath =>
+      val processorId = deathCheck.processorId
+      clocks.get(processorId).foreach { processorClock =>
+        val life = processorClock.life
+        if (processorClock.min >= life.death) {
+          LOG.info(s"Removing $processorId from clock service...")
+          removeProcessor(processorId)
+        } else {
+          LOG.info(s"Unsuccessfully in removing $processorId from clock service...," +
+            s" min: ${processorClock.min}, life: $life")
+        }
+      }
+
+    case HealthCheck =>
+      selfCheck()
+
+    case SnapshotStartClock =>
+      snapshotStartClock()
+
+    case ReportCheckpointClock(task, time) =>
+      updateCheckpointClocks(task, time)
+
+    case GetCheckpointClock =>
+      sender ! CheckpointClock(minCheckpointClock)
+
+    case _: GetStallingTasks =>
+      sender ! StallingTasks(healthChecker.getReport.stallingTasks)
+
+    case ChangeToNewDAG(newDag) =>
+      if (newDag.version > this.dag.version) {
+        // Transits to a new dag version
+        this.dag = newDag
+        dynamicDAG(newDag, getStartClock)
+      } else {
+        // Restarts current dag.
+        recoverDag(newDag, getStartClock)
+      }
+      LOG.info(s"Change to new DAG(dag = ${newDag.version}), send back ChangeToNewDAGSuccess")
+      sender ! ChangeToNewDAGSuccess(clocks.map { case (id, clock) => (id, clock.min) })
+  }
+
+  /** Drops every trace of `processorId`: own clock, upstream links and checkpoint clocks. */
+  private def removeProcessor(processorId: ProcessorId): Unit = {
+    clocks -= processorId
+    processorClocks = processorClocks.filter(_.processorId != processorId)
+
+    upstreamClocks = upstreamClocks.map { case (id, upstreams) =>
+      (id, upstreams.filter(_.processorId != processorId))
+    }
+    upstreamClocks -= processorId
+
+    // Removes dead processor from checkpoints.
+    checkpointClocks = checkpointClocks.filter { case (taskId, _) =>
+      taskId.processorId != processorId
+    }
+  }
+
+  private def minClock: TimeStamp = {
+    ProcessorClocks.minClock(processorClocks)
+  }
+
+  /** Runs the health check; logs every processor clock when the min clock is Long.MaxValue. */
+  def selfCheck(): Unit = {
+    val minTimestamp = minClock
+
+    if (Long.MaxValue == minTimestamp) {
+      processorClocks.foreach { clock =>
+        LOG.info(s"Processor ${clock.processorId} Clock: min: ${clock.min}, " +
+          s"taskClocks: " + clock.taskClocks.mkString(","))
+      }
+    }
+
+    healthChecker.check(minTimestamp, clocks, dag, System.currentTimeMillis())
+  }
+
+  /** The last complete checkpoint clock if any, otherwise the global min clock. */
+  private def getStartClock: TimeStamp = {
+    minCheckpointClock.getOrElse(minClock)
+  }
+
+  private def snapshotStartClock(): Unit = {
+    store.put(START_CLOCK, getStartClock)
+  }
+
+  /**
+   * Records a checkpoint clock reported by `task`. Once every tracked task has reported
+   * `time`, it becomes the new min checkpoint clock and older entries are pruned.
+   * Reports from tasks that are not tracked (e.g. of a removed processor) are ignored.
+   */
+  private def updateCheckpointClocks(task: TaskId, time: TimeStamp): Unit = {
+    checkpointClocks.get(task) match {
+      case None =>
+        LOG.warn(s"Ignoring checkpoint clock $time reported by untracked task $task")
+      case Some(reported) =>
+        checkpointClocks += task -> (reported :+ time)
+
+        if (checkpointClocks.forall(_._2.contains(time))) {
+          minCheckpointClock = Some(time)
+          LOG.info(s"minCheckpointTime $minCheckpointClock")
+
+          // Built eagerly: mapValues would return a lazy view re-evaluated on every access.
+          checkpointClocks = checkpointClocks.map { case (taskId, times) =>
+            taskId -> times.dropWhile(_ <= time)
+          }
+        }
+    }
+  }
+}
+
+object ClockService {
+  /** Key under which the start clock is persisted in the AppDataStore. */
+  val START_CLOCK = "startClock"
+
+  /** Sent periodically by ClockService to itself to trigger a health check. */
+  case object HealthCheck
+
+  /**
+   * Mutable clock state of one processor: one clock per task plus their minimum.
+   *
+   * @param processorId id of the processor
+   * @param life lifetime (birth/death) of the processor
+   * @param parallism number of tasks of the processor (name kept for compatibility)
+   */
+  class ProcessorClock(val processorId: ProcessorId, val life: LifeTime, val parallism: Int,
+      private var _min: TimeStamp = 0L, private var _taskClocks: Array[TimeStamp] = null) {
+
+    /** Returns a clock with a new lifetime that shares this clock's task-clock array. */
+    def copy(life: LifeTime): ProcessorClock = {
+      new ProcessorClock(processorId, life, parallism, _min, _taskClocks)
+    }
+
+    def min: TimeStamp = _min
+    def taskClocks: Array[TimeStamp] = _taskClocks
+
+    /** Sets every task clock to `startClock`; a no-op once the clock has been initialized. */
+    def init(startClock: TimeStamp): Unit = {
+      if (taskClocks == null) {
+        this._min = startClock
+        this._taskClocks = new Array(parallism)
+        util.Arrays.fill(taskClocks, startClock)
+      }
+    }
+
+    /** Records `clock` for task `taskIndex` and recomputes the processor minimum. */
+    def updateMinClock(taskIndex: Int, clock: TimeStamp): Unit = {
+      taskClocks(taskIndex) = clock
+      _min = Longs.min(taskClocks: _*)
+    }
+  }
+
+  /** Sent periodically by ClockService to itself to persist the start clock. */
+  case object SnapshotStartClock
+
+  case class Report(stallingTasks: List[TaskId])
+
+  /**
+   * Check whether the clock is advancing normally.
+   *
+   * @param stallingThresholdSeconds how long the app clock may stay unchanged before the
+   *                                 application is considered stalling
+   */
+  class HealthChecker(stallingThresholdSeconds: Int) {
+    private val LOG: Logger = LogUtil.getLogger(getClass)
+
+    // App clock and the system time at which that app clock was last seen advancing.
+    private var minClock: ClockValue = null
+    private val stallingThresholdMilliseconds = stallingThresholdSeconds * 1000
+    private var stallingTasks = Array.empty[TaskId]
+
+    /**
+     * Check for stalling tasks.
+     *
+     * When the app clock has not advanced for longer than the threshold, the tasks holding
+     * it back are recorded. Otherwise the stalling-task list is cleared.
+     */
+    def check(
+        currentMinClock: TimeStamp, processorClocks: Map[ProcessorId, ProcessorClock],
+        dag: DAG, now: TimeStamp): Unit = {
+      val clockAdvanced = null == minClock || currentMinClock > minClock.appClock
+      if (clockAdvanced) {
+        minClock = ClockValue(systemClock = now, appClock = currentMinClock)
+      }
+      val isClockStalling =
+        !clockAdvanced && now > minClock.systemClock + stallingThresholdMilliseconds
+
+      if (isClockStalling) {
+        LOG.warn(s"Clock has not advanced for ${(now - minClock.systemClock) / 1000} seconds " +
+          s"since ${minClock.prettyPrint}...")
+        // Recomputed every time, so a result from an earlier check is never reported again.
+        stallingTasks = findStallingTasks(processorClocks, dag)
+        LOG.info(s"Stalling Tasks: ${stallingTasks.mkString(",")}")
+      } else {
+        stallingTasks = Array.empty[TaskId]
+      }
+    }
+
+    /**
+     * Finds the first processor, in topological order, whose min clock equals the stalled
+     * app clock, and returns its tasks sitting at that clock. Returns an empty array when no
+     * processor matches.
+     */
+    private def findStallingTasks(
+        processorClocks: Map[ProcessorId, ProcessorClock], dag: DAG): Array[TaskId] = {
+      val stalledAt = minClock.appClock
+      val stalledProcessor = dag.graph.topologicalOrderWithCirclesIterator.toList.find { id =>
+        processorClocks.get(id).exists(_.min == stalledAt)
+      }
+      stalledProcessor.map { processorId =>
+        val taskClocks = Option(processorClocks(processorId).taskClocks)
+          .getOrElse(Array.empty[TimeStamp])
+        taskClocks.zipWithIndex.collect {
+          case (taskClock, index) if taskClock == stalledAt => TaskId(processorId, index)
+        }
+      }.getOrElse(Array.empty[TaskId])
+    }
+
+    def getReport: Report = {
+      Report(stallingTasks.toList)
+    }
+  }
+
+  object HealthChecker {
+    case class ClockValue(systemClock: TimeStamp, appClock: TimeStamp) {
+      def prettyPrint: String = {
+        "(system clock: " + new Date(systemClock).toString + ", app clock: " + appClock + ")"
+      }
+    }
+  }
+
+  object ProcessorClocks {
+
+    /** Get the Min clock of all processors; 0L when `clock` is empty. */
+    def minClock(clock: Array[ProcessorClock]): TimeStamp = {
+      if (clock.length == 0) {
+        0L
+      } else {
+        var min = clock(0).min
+        var i = 1
+        while (i < clock.length) {
+          min = Math.min(min, clock(i).min)
+          i += 1
+        }
+        min
+      }
+    }
+  }
+
+  case class ChangeToNewDAG(dag: DAG)
+  case class ChangeToNewDAGSuccess(clocks: Map[ProcessorId, TimeStamp])
+
+  /** Carries the start clock retrieved from storage back to the ClockService actor. */
+  case class StoredStartClock(clock: TimeStamp)
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/DagManager.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/DagManager.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/DagManager.scala
new file mode 100644
index 0000000..3341d4f
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/DagManager.scala
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.appmaster
+
+import akka.actor.{Actor, ActorRef, Stash}
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.partitioner.PartitionerDescription
+import org.apache.gearpump.streaming._
+import org.apache.gearpump.streaming.appmaster.DagManager._
+import org.apache.gearpump.streaming.storage.AppDataStore
+import org.apache.gearpump.streaming.task.Subscriber
+import org.apache.gearpump.util.{Graph, LogUtil}
+import org.slf4j.Logger
+
+import scala.concurrent.Future
+
+/**
+ * Handles dag modification and other stuff related with DAG
+ *
+ * DagManager maintains multiple version of DAGs. For each version, the DAG is immutable.
+ * For operations like modifying a processor, it will create a new version of DAG.
+ *
+ * On start it loads the DAG persisted in `store`. If none is stored, it falls back to `dag`,
+ * then to the graph in `userConfig`. Messages are stashed until that DAG is available.
+ */
+class DagManager(appId: Int, userConfig: UserConfig, store: AppDataStore, dag: Option[DAG])
+  extends Actor with Stash {
+
+  import context.dispatcher
+  private val LOG: Logger = LogUtil.getLogger(getClass, app = appId)
+  private val NOT_INITIALIZED = -1
+
+  private var dags = List.empty[DAG]
+  private var maxProcessorId = -1
+  private implicit val system = context.system
+
+  private var watchers = List.empty[ActorRef]
+
+  // The real behavior is installed with context.become in preStart.
+  override def receive: Receive = null
+
+  override def preStart(): Unit = {
+    LOG.info("Initializing Dag Service, get stored Dag ....")
+    store.get(StreamApplication.DAG).asInstanceOf[Future[DAG]].map { storedDag =>
+      val initialDag = if (storedDag != null) {
+        storedDag
+      } else {
+        dag.getOrElse(DAG(userConfig.getValue[Graph[ProcessorDescription,
+          PartitionerDescription]](StreamApplication.DAG).get))
+      }
+      // NOTE: this callback runs outside the actor. The writes below happen before the
+      // `self ! DagInitiated` send, and the actor only reads them after receiving it.
+      dags :+= initialDag
+      maxProcessorId = {
+        val keys = initialDag.processors.keys
+        if (keys.isEmpty) 0 else keys.max
+      }
+      self ! DagInitiated
+    }.failed.foreach { ex =>
+      // Without an initial DAG the actor never leaves waitForDagInitiate; make that visible.
+      LOG.error("Failed to initialize the DAG, DagManager is not started", ex)
+    }
+    context.become(waitForDagInitiate)
+  }
+
+  /** Initial behavior: stash everything until the initial DAG has been loaded. */
+  def waitForDagInitiate: Receive = {
+    case DagInitiated =>
+      unstashAll()
+      context.become(dagService)
+    case _ =>
+      stash()
+  }
+
+  /** Allocates a fresh processor id. */
+  private def nextProcessorId: ProcessorId = {
+    maxProcessorId += 1
+    maxProcessorId
+  }
+
+  private def taskLaunchData(dag: DAG, processorId: Int, context: AnyRef): TaskLaunchData = {
+    val processorDescription = dag.processors(processorId)
+    val subscribers = Subscriber.of(processorId, dag)
+    TaskLaunchData(processorDescription, subscribers, context)
+  }
+
+  /** Main behavior once the initial DAG is known. */
+  def dagService: Receive = {
+    case GetLatestDAG =>
+      // Get the latest version of DAG.
+      sender ! LatestDAG(dags.last)
+
+    case GetTaskLaunchData(version, processorId, context) =>
+      // Task information like Processor class, downstream subscriber processors and etc.
+      dags.find(_.version == version) match {
+        case Some(dag) =>
+          LOG.info(s"Get task launcher data for processor: $processorId, dagVersion: $version")
+          sender ! taskLaunchData(dag, processorId, context)
+        case None =>
+          LOG.warn(s"Cannot get task launcher data for processor: $processorId, " +
+            s"unknown dagVersion: $version")
+      }
+
+    case ReplaceProcessor(oldProcessorId, inputNewProcessor) =>
+      // Replace a processor with new implementation. The upstream processors and downstream
+      // processors are NOT changed.
+      if (dags.length > 1) {
+        sender ! DAGOperationFailed(
+          "We are in the process of handling previous dynamic dag change")
+      } else {
+        val oldDAG = dags.last
+        oldDAG.processors.get(oldProcessorId) match {
+          case None =>
+            sender ! DAGOperationFailed(s"Cannot find processor $oldProcessorId in current DAG")
+          case Some(oldProcessor) =>
+            // A replacement without its own jar reuses the jar of the processor it replaces.
+            val jar = if (inputNewProcessor.jar == null) oldProcessor.jar else inputNewProcessor.jar
+            // The id is only allocated once the operation is known to proceed.
+            val newProcessor = inputNewProcessor.copy(id = nextProcessorId, jar = jar)
+            val newDAG = replaceDAG(oldDAG, oldProcessorId, newProcessor, oldDAG.version + 1)
+            dags :+= newDAG
+
+            LOG.info(s"ReplaceProcessor old: $oldProcessorId, new: $newProcessor")
+            LOG.info(s"new DAG: $newDAG")
+            watchers.foreach(_ ! LatestDAG(newDAG))
+            sender ! DAGOperationSuccess
+        }
+      }
+
+    case WatchChange(watcher) =>
+      // Registers a watcher that is sent every newly created DAG version.
+      if (!this.watchers.contains(watcher)) {
+        this.watchers :+= watcher
+      }
+
+    case NewDAGDeployed(dagVersion) =>
+      // Means dynamic Dag transition completed, and the new DAG version has been successfully
+      // deployed. The obsolete dag versions will be removed.
+      if (dagVersion != NOT_INITIALIZED) {
+        val deployed = dags.filter(_.version == dagVersion)
+        if (deployed.isEmpty) {
+          // Keep the known versions instead of wiping them all out.
+          LOG.warn(s"Ignoring NewDAGDeployed for unknown dagVersion: $dagVersion")
+        } else {
+          dags = deployed
+          store.put(StreamApplication.DAG, dags.last)
+        }
+      }
+  }
+
+  /**
+   * Builds DAG `newVersion` in which `newProcessor` takes over the edges of `oldProcessorId`.
+   * The old processor stays in the processor map with its life ending at the new one's birth.
+   */
+  private def replaceDAG(
+      dag: DAG, oldProcessorId: ProcessorId, newProcessor: ProcessorDescription, newVersion: Int)
+    : DAG = {
+    val oldProcessorLife = LifeTime(dag.processors(oldProcessorId).life.birth,
+      newProcessor.life.birth)
+
+    val newProcessorMap = dag.processors ++
+      Map(oldProcessorId -> dag.processors(oldProcessorId).copy(life = oldProcessorLife),
+        newProcessor.id -> newProcessor)
+
+    val newGraph = dag.graph.subGraph(oldProcessorId).
+      replaceVertex(oldProcessorId, newProcessor.id).addGraph(dag.graph)
+    new DAG(newVersion, newProcessorMap, newGraph)
+  }
+}
+
+object DagManager {
+  /** Sent by DagManager to itself once the initial DAG has been loaded. */
+  case object DagInitiated
+
+  /** Registers `watcher` to be sent a [[LatestDAG]] whenever a new DAG version is created. */
+  case class WatchChange(watcher: ActorRef)
+
+  /** Asks for the most recent DAG version; answered with [[LatestDAG]]. */
+  case object GetLatestDAG
+
+  /** Carries the most recent DAG. */
+  case class LatestDAG(dag: DAG)
+
+  /** Asks for the launch data of `processorId` in DAG version `dagVersion`. */
+  case class GetTaskLaunchData(dagVersion: Int, processorId: Int, context: AnyRef = null)
+
+  /** Processor description plus downstream subscribers; `context` is copied from the request. */
+  case class TaskLaunchData(
+      processorDescription: ProcessorDescription,
+      subscribers: List[Subscriber],
+      context: AnyRef = null)
+
+  /** Marker for requests that modify the DAG. */
+  sealed trait DAGOperation
+
+  /** Replaces `oldProcessorId` by a new processor, keeping upstream and downstream links. */
+  case class ReplaceProcessor(
+      oldProcessorId: ProcessorId,
+      newProcessorDescription: ProcessorDescription)
+    extends DAGOperation
+
+  /** Outcome of a [[DAGOperation]]. */
+  sealed trait DAGOperationResult
+  case object DAGOperationSuccess extends DAGOperationResult
+  case class DAGOperationFailed(reason: String) extends DAGOperationResult
+
+  /** Reports that DAG `dagVersion` is deployed; older versions can then be discarded. */
+  case class NewDAGDeployed(dagVersion: Int)
+}
\ No newline at end of file