You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2016/05/10 02:42:17 UTC

[07/49] incubator-gearpump git commit: fix GEARPUMP-118 change package name to org.apache

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ExecutorManager.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ExecutorManager.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ExecutorManager.scala
new file mode 100644
index 0000000..79a65c4
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ExecutorManager.scala
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.appmaster
+
+import scala.concurrent.duration._
+import scala.util.{Failure, Try}
+
+import akka.actor.SupervisorStrategy.Stop
+import akka.actor._
+import akka.remote.RemoteScope
+import com.typesafe.config.Config
+import org.apache.commons.lang.exception.ExceptionUtils
+
+import org.apache.gearpump.cluster.AppMasterToWorker.ChangeExecutorResource
+import org.apache.gearpump.cluster.appmaster.ExecutorSystemScheduler.{ExecutorSystemJvmConfig, ExecutorSystemStarted, StartExecutorSystemTimeout, StartExecutorSystems}
+import org.apache.gearpump.cluster.appmaster.WorkerInfo
+import org.apache.gearpump.cluster.scheduler.{Resource, ResourceRequest}
+import org.apache.gearpump.cluster.worker.WorkerId
+import org.apache.gearpump.cluster.{AppJar, AppMasterContext, ExecutorContext, UserConfig}
+import org.apache.gearpump.streaming.ExecutorId
+import org.apache.gearpump.streaming.ExecutorToAppMaster.RegisterExecutor
+import org.apache.gearpump.streaming.appmaster.ExecutorManager._
+import org.apache.gearpump.streaming.executor.Executor
+import org.apache.gearpump.util.{LogUtil, Util}
+
+/**
+ * ExecutorManager manages the start and stop of all executors.
+ *
+ * ExecutorManager launches an Executor when asked. It hides details such as starting
+ * a new ExecutorSystem from the user. Please use ExecutorManager.props() to construct this
+ * actor.
+ *
+ * The actor only starts serving requests after it has received a SetTaskManager message.
+ *
+ * @param userConfig user configuration handed to the executor factory
+ * @param appContext provides the appId, masterProxy and username used by this actor
+ * @param executorFactory builds the Props of the executor actor for a started executor system
+ * @param clusterConfig config used to resolve the executor JVM settings
+ * @param appName application name, passed to executors through ExecutorContext
+ */
+private[appmaster] class ExecutorManager(
+    userConfig: UserConfig,
+    appContext: AppMasterContext,
+    executorFactory: (ExecutorContext, UserConfig, Address, ExecutorId) => Props,
+    clusterConfig: Config,
+    appName: String)
+  extends Actor {
+
+  private val LOG = LogUtil.getLogger(getClass)
+
+  import appContext.{appId, masterProxy, username}
+
+  // Assigned by SetTaskManager before the actor switches to the service behavior.
+  private var taskManager: ActorRef = null
+
+  private implicit val actorSystem = context.system
+  private val systemConfig = context.system.settings.config
+
+  // Known executors keyed by executor id. ExecutorInfo.executor stays null between
+  // ExecutorSystemStarted and the matching RegisterExecutor.
+  private var executors = Map.empty[Int, ExecutorInfo]
+
+  def receive: Receive = waitForTaskManager
+
+  /** Initial state: waits for the task manager reference before serving any request. */
+  def waitForTaskManager: Receive = {
+    case SetTaskManager(taskManager) =>
+      this.taskManager = taskManager
+      context.become(service orElse terminationWatch)
+  }
+
+  // If something wrong on executor, ExecutorManager will stop the current executor,
+  // and wait for AppMaster to start a new executor.
+  override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10,
+    withinTimeRange = 1.minute) {
+    case ex: Throwable =>
+      // Executor actors are named after their executor id (see ExecutorSystemStarted below).
+      Try(sender.path.name.toInt) match {
+        case scala.util.Success(id) =>
+          executors -= id
+          LOG.error(s"Executor $id throws exception, stop it...\n" +
+            ExceptionUtils.getStackTrace(ex))
+        case Failure(_) =>
+          // Keep the child's failure in the log instead of dropping it.
+          LOG.error(s"Sender ${sender.path} is dead, but seems it is not an executor...\n" +
+            ExceptionUtils.getStackTrace(ex))
+      }
+      Stop
+  }
+
+  /** Responds to outside queries. */
+  def service: Receive = {
+    case StartExecutors(resources, jar) =>
+      masterProxy ! StartExecutorSystems(resources, getExecutorJvmConfig(Some(jar)))
+    case ExecutorSystemStarted(executorSystem, boundedJar) =>
+      import executorSystem.{address, executorSystemId, resource => executorResource, worker}
+
+      val executorId = executorSystemId
+      val executorContext = ExecutorContext(executorId, worker, appId, appName,
+        appMaster = context.parent, executorResource)
+      // The executor ActorRef is filled in later by RegisterExecutor.
+      executors += executorId -> ExecutorInfo(executorId, null, worker, boundedJar)
+
+      // Starts executor
+      val executor = context.actorOf(executorFactory(executorContext, userConfig,
+        address, executorId), executorId.toString)
+      executorSystem.bindLifeCycleWith(executor)
+    case StartExecutorSystemTimeout =>
+      taskManager ! StartExecutorsTimeOut
+
+    case RegisterExecutor(executor, executorId, resource, worker) =>
+      executors.get(executorId) match {
+        case Some(executorInfo) =>
+          LOG.info(s"executor $executorId has been launched")
+          // Watches for executor termination
+          context.watch(executor)
+          executors += executorId -> executorInfo.copy(executor = executor)
+          taskManager ! ExecutorStarted(executorId, resource, worker.workerId,
+            executorInfo.boundedJar)
+        case None =>
+          // The entry may already have been removed by the supervisor strategy or by
+          // terminationWatch; without it there is no bounded jar to report.
+          LOG.error(s"Ignore RegisterExecutor from unknown executor $executorId")
+      }
+
+    // Broadcasts message to all executors
+    case BroadCast(msg) =>
+      LOG.info(s"Broadcast ${msg.getClass.getSimpleName} to all executors")
+      context.children.foreach(_ forward msg)
+
+    // Unicasts message to single executor
+    case UniCast(executorId, msg) =>
+      LOG.info(s"Unicast ${msg.getClass.getSimpleName} to executor $executorId")
+      // The ActorRef is null until the executor has registered, so guard against it.
+      executors.get(executorId).flatMap(info => Option(info.executor)) match {
+        case Some(executor) =>
+          executor forward msg
+        case None =>
+          LOG.warn(s"Cannot unicast to executor $executorId, it is not registered")
+      }
+
+    case GetExecutorInfo =>
+      sender ! executors
+
+    // Tells Executor manager resources that are occupied. The Executor Manager can use this
+    // information to tell worker to reclaim un-used resources
+    case ExecutorResourceUsageSummary(resources) =>
+      executors.foreach { case (executorId, executor) =>
+        // Executors missing from the summary are reported as using no resource.
+        val usedResource = resources.getOrElse(executorId, Resource(0))
+        // Notifies the worker the actual resource used by this application.
+        executor.worker.ref ! ChangeExecutorResource(appId, executorId, usedResource)
+      }
+  }
+
+  /** Forgets a terminated executor and reports it to the task manager. */
+  def terminationWatch: Receive = {
+    case Terminated(actor) =>
+      val executorId = Try(actor.path.name.toInt)
+      executorId match {
+        case scala.util.Success(id) => {
+          executors -= id
+          LOG.error(s"Executor $id is down")
+          taskManager ! ExecutorStopped(id)
+        }
+        case scala.util.Failure(ex) =>
+          LOG.error(s"failed to get the executor Id from path string ${actor.path}", ex)
+      }
+  }
+
+  /** Resolves the executor JVM settings from clusterConfig, falling back to systemConfig. */
+  private def getExecutorJvmConfig(jar: Option[AppJar]): ExecutorSystemJvmConfig = {
+    val executorAkkaConfig = clusterConfig
+    val jvmSetting = Util.resolveJvmSetting(executorAkkaConfig.withFallback(systemConfig)).executor
+
+    ExecutorSystemJvmConfig(jvmSetting.classPath, jvmSetting.vmargs, jar,
+      username, executorAkkaConfig)
+  }
+}
+
+/** Messages understood or emitted by the ExecutorManager actor, plus its Props factory. */
+private[appmaster] object ExecutorManager {
+
+  /** Asks to start executor systems for the given resource requests and jar. */
+  case class StartExecutors(resources: Array[ResourceRequest], jar: AppJar)
+
+  /** Forwards msg to every child of the ExecutorManager. */
+  case class BroadCast(msg: Any)
+
+  /** Forwards msg to the executor with the given id. */
+  case class UniCast(executorId: Int, msg: Any)
+
+  /** Query; answered with the map from executor id to ExecutorInfo. */
+  case object GetExecutorInfo
+
+  /** Sent to the task manager once an executor has registered itself. */
+  case class ExecutorStarted(
+      executorId: Int, resource: Resource, workerId: WorkerId, boundedJar: Option[AppJar])
+
+  /** Sent to the task manager when a watched executor has terminated. */
+  case class ExecutorStopped(executorId: Int)
+
+  /** Tells the ExecutorManager which actor is the task manager. */
+  case class SetTaskManager(taskManager: ActorRef)
+
+  /** Sent to the task manager when starting executor systems timed out. */
+  case object StartExecutorsTimeOut
+
+  /** Creates the Props of an ExecutorManager whose executors are deployed remotely. */
+  def props(
+      userConfig: UserConfig, appContext: AppMasterContext, clusterConfig: Config, appName: String)
+    : Props = {
+    // Props of an Executor deployed into the executor system at the given address.
+    def remoteExecutorProps(
+        executorContext: ExecutorContext,
+        userConfig: UserConfig,
+        address: Address,
+        executorId: ExecutorId): Props = {
+      val remoteDeploy = Deploy(scope = RemoteScope(address))
+      Props(classOf[Executor], executorContext, userConfig).withDeploy(remoteDeploy)
+    }
+
+    val executorFactory = remoteExecutorProps _
+    Props(new ExecutorManager(userConfig, appContext, executorFactory, clusterConfig, appName))
+  }
+
+  /** Resources actually in use, keyed by executor id. */
+  case class ExecutorResourceUsageSummary(resources: Map[ExecutorId, Resource])
+
+  /** Book-keeping entry of the ExecutorManager for a single executor. */
+  case class ExecutorInfo(
+      executorId: ExecutorId, executor: ActorRef, worker: WorkerInfo, boundedJar: Option[AppJar])
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/JarScheduler.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/JarScheduler.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/JarScheduler.scala
new file mode 100644
index 0000000..6de5306
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/JarScheduler.scala
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.appmaster
+
+import akka.actor._
+import akka.pattern.ask
+import com.typesafe.config.Config
+import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.cluster.AppJar
+import org.apache.gearpump.cluster.scheduler.{Resource, ResourceRequest}
+import org.apache.gearpump.cluster.worker.WorkerId
+import org.apache.gearpump.partitioner.PartitionerDescription
+import org.apache.gearpump.streaming.appmaster.JarScheduler._
+import org.apache.gearpump.streaming.task.TaskId
+import org.apache.gearpump.streaming.{DAG, ProcessorDescription}
+import org.apache.gearpump.util.{Constants, Graph, LogUtil}
+
+import scala.concurrent.Future
+
+/**
+ * Different processors of the stream application can use different jars. JarScheduler is the
+ * scheduler for different jars.
+ *
+ * For a DAG of multiple processors, each processor can have its own jar. Tasks of the same jar
+ * are scheduled by a TaskScheduler, and TaskSchedulers are scheduled by JarScheduler.
+ *
+ * At runtime, the implementation is delegated to the actor JarSchedulerImpl, which is created
+ * through the given factory.
+ */
+class JarScheduler(appId: Int, appName: String, config: Config, factory: ActorRefFactory) {
+  private val LOG = LogUtil.getLogger(getClass)
+  private val actor: ActorRef = factory.actorOf(Props(new JarSchedulerImpl(appId, appName, config)))
+  private implicit val dispatcher = factory.dispatcher
+  private implicit val timeout = Constants.FUTURE_TIMEOUT
+
+  /**
+   * Set the current DAG version active.
+   *
+   * The actor is told immediately that a new DAG is coming; the DAG itself is delivered once
+   * startClock completes. A failed startClock is logged instead of being silently dropped.
+   */
+  def setDag(dag: DAG, startClock: Future[TimeStamp]): Unit = {
+    actor ! TransitToNewDag
+    startClock.onComplete {
+      case scala.util.Success(start) =>
+        actor ! NewDag(dag, start)
+      case scala.util.Failure(ex) =>
+        LOG.error(s"Failed to get the start clock for dag version ${dag.version}, " +
+          "JarScheduler keeps waiting for a new DAG", ex)
+    }
+  }
+
+  /** AppMaster asks JarScheduler how many resources it wants. */
+  def getResourceRequestDetails(): Future[Array[ResourceRequestDetail]] = {
+    // mapTo fails the future on an unexpected reply type instead of deferring the
+    // ClassCastException to whoever consumes the value.
+    (actor ? GetResourceRequestDetails).mapTo[Array[ResourceRequestDetail]]
+  }
+
+  /**
+   * AppMaster has resource allocated, and asks the jar scheduler to schedule tasks
+   * for this executor.
+   */
+  def scheduleTask(appJar: AppJar, workerId: WorkerId, executorId: Int, resource: Resource)
+  : Future[List[TaskId]] = {
+    (actor ? ScheduleTask(appJar, workerId, executorId, resource)).mapTo[List[TaskId]]
+  }
+
+  /**
+   * Some executor JVM process is dead. AppMaster asks jar scheduler to re-schedule the impacted
+   * tasks.
+   */
+  def executorFailed(executorId: Int): Future[Option[ResourceRequestDetail]] = {
+    (actor ? ExecutorFailed(executorId)).mapTo[Option[ResourceRequestDetail]]
+  }
+}
+
+/** Messages and actor implementation backing the JarScheduler facade. */
+object JarScheduler {
+
+  /** Resource requests of one jar. */
+  case class ResourceRequestDetail(jar: AppJar, requests: Array[ResourceRequest])
+
+  /** Delivers the new DAG together with the clock it starts from. */
+  case class NewDag(dag: DAG, startTime: TimeStamp)
+
+  /** Announces that a new DAG is coming; requests are stashed until NewDag arrives. */
+  case object TransitToNewDag
+
+  case object GetResourceRequestDetails
+
+  /**
+   * Schedule tasks for one appJar.
+   *
+   * @param appJar Application jar.
+   * @param workerId Worker machine Id.
+   * @param executorId Executor Id.
+   * @param resource Slots that are available.
+   */
+  case class ScheduleTask(appJar: AppJar, workerId: WorkerId, executorId: Int, resource: Resource)
+
+  /** Some executor JVM is dead, try to recover tasks that are located on failed executor */
+  case class ExecutorFailed(executorId: Int)
+
+  /**
+   * Actor holding one TaskScheduler per jar. It alternates between waitForNewDag, where
+   * every request is stashed, and ready, where requests are answered.
+   */
+  class JarSchedulerImpl(appId: Int, appName: String, config: Config) extends Actor with Stash {
+
+    // Each TaskScheduler maps to a jar.
+    private var taskSchedulers = Map.empty[AppJar, TaskScheduler]
+
+    private val LOG = LogUtil.getLogger(getClass)
+
+    def receive: Receive = waitForNewDag
+
+    def waitForNewDag: Receive = {
+      case TransitToNewDag => // Continue current state
+      case NewDag(dag, startTime) =>
+
+        LOG.info(s"Init JarScheduler, dag version: ${dag.version}, startTime: $startTime")
+
+        val processorsByJar = dag.processors.values.groupBy(_.jar)
+
+        taskSchedulers = processorsByJar.map { case (jar, jarProcessors) =>
+          // Construct the sub DAG, each sub DAG maps to a separate jar.
+          val subGraph = Graph.empty[ProcessorDescription, PartitionerDescription]
+          // Only processors that are still alive at startTime take part in scheduling.
+          jarProcessors.foreach { processor =>
+            if (startTime < processor.life.death) {
+              subGraph.addVertex(processor)
+            }
+          }
+          val subDagForSingleJar = DAG(subGraph)
+
+          // Reuse the scheduler of a jar that is already known from the previous DAG.
+          val taskScheduler = taskSchedulers
+            .getOrElse(jar, new TaskSchedulerImpl(appId, appName, config))
+
+          LOG.info("Set DAG for TaskScheduler, count: " + subDagForSingleJar.processors.size)
+          taskScheduler.setDAG(subDagForSingleJar)
+          jar -> taskScheduler
+        }
+        unstashAll()
+        context.become(ready)
+      case _ =>
+        // Anything else has to wait until the new DAG has been installed.
+        stash()
+    }
+
+    def ready: Receive = {
+      // Notifies there is a new DAG coming.
+      case TransitToNewDag =>
+        context.become(waitForNewDag)
+
+      case GetResourceRequestDetails =>
+
+        // Asks each TaskScheduler (Each for one jar) the resource requests.
+        val result: Array[ResourceRequestDetail] = taskSchedulers.map { case (jar, scheduler) =>
+          ResourceRequestDetail(jar, scheduler.getResourceRequests())
+        }.toArray
+        LOG.info("GetRequestDetails " + result.map(describe).mkString(";"))
+        sender ! result
+
+      case ScheduleTask(appJar, workerId, executorId, resource) =>
+        // An unknown jar yields an empty task list.
+        val result: List[TaskId] = taskSchedulers.get(appJar).map { scheduler =>
+          scheduler.schedule(workerId, executorId, resource)
+        }.getOrElse(List.empty)
+        LOG.info("ScheduleTask " + result.mkString(";"))
+        sender ! result
+      case ExecutorFailed(executorId) =>
+        // Only the first scheduler that has tasks on the failed executor is asked to recover.
+        val result: Option[ResourceRequestDetail] = taskSchedulers.
+          find(_._2.scheduledTasks(executorId).nonEmpty).map { jarAndScheduler =>
+          ResourceRequestDetail(jarAndScheduler._1, jarAndScheduler._2.executorFailed(executorId))
+        }
+        LOG.info("ExecutorFailed " + result.map(describe).mkString(";"))
+        sender ! result
+    }
+
+    // Array.toString only prints an identity hash, so the requests are rendered explicitly
+    // to keep the log lines readable.
+    private def describe(detail: ResourceRequestDetail): String = {
+      val requests = detail.requests.mkString("[", ",", "]")
+      s"ResourceRequestDetail(${detail.jar},$requests)"
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/StreamAppMasterSummary.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/StreamAppMasterSummary.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/StreamAppMasterSummary.scala
new file mode 100644
index 0000000..3d43ee7
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/StreamAppMasterSummary.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.appmaster
+
+import org.apache.gearpump._
+import org.apache.gearpump.cluster.AppMasterToMaster.AppMasterSummary
+import org.apache.gearpump.cluster.MasterToAppMaster.AppMasterStatus
+import org.apache.gearpump.cluster.{MasterToAppMaster, UserConfig}
+import org.apache.gearpump.streaming.appmaster.AppMaster.ExecutorBrief
+import org.apache.gearpump.streaming.{ExecutorId, LifeTime, ProcessorId}
+import org.apache.gearpump.util.Graph
+import org.apache.gearpump.util.HistoryMetricsService.HistoryMetricsConfig
+
+/**
+ * Stream application summary, used for REST API.
+ *
+ * Only appId has no default value; every other field has a default and can be
+ * omitted when the summary is constructed.
+ */
+case class StreamAppMasterSummary(
+    appType: String = "streaming",
+    appId: Int,
+    appName: String = null,
+    actorPath: String = null,
+    clock: TimeStamp = 0L,
+    status: AppMasterStatus = MasterToAppMaster.AppMasterActive,
+    startTime: TimeStamp = 0L,
+    uptime: TimeStamp = 0L,
+    user: String = null,
+    homeDirectory: String = "",
+    logFile: String = "",
+    dag: Graph[ProcessorId, String] = null,
+    executors: List[ExecutorBrief] = null,
+    processors: Map[ProcessorId, ProcessorSummary] = Map.empty[ProcessorId, ProcessorSummary],
+    // Hierarchy level for each processor
+    processorLevels: Map[ProcessorId, Int] = Map.empty[ProcessorId, Int],
+    historyMetricsConfig: HistoryMetricsConfig = null)
+  extends AppMasterSummary
+
+case class TaskCount(count: Int) // Wraps a task count; used as the value type of ProcessorSummary.taskCount
+
+case class ProcessorSummary( // Summary of a single processor, stored per ProcessorId in StreamAppMasterSummary.processors
+    id: ProcessorId,
+    taskClass: String,
+    parallelism: Int,
+    description: String,
+    taskConf: UserConfig,
+    life: LifeTime,
+    executors: List[ExecutorId],
+    taskCount: Map[ExecutorId, TaskCount]) // presumably the number of this processor's tasks per executor -- TODO confirm
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskLocator.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskLocator.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskLocator.scala
new file mode 100644
index 0000000..87630b7
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskLocator.scala
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.appmaster
+
+import com.typesafe.config.{Config, ConfigFactory, ConfigRenderOptions, ConfigValueFactory}
+import org.apache.gearpump.cluster.worker.WorkerId
+import org.apache.gearpump.streaming.appmaster.TaskLocator.{Localities, Locality, NonLocality, WorkerLocality}
+import org.apache.gearpump.streaming.task.TaskId
+
+import scala.collection.JavaConverters._
+import scala.util.Try
+
+/**
+ * TaskLocator decides which machine a task should run on.
+ *
+ * Users can configure [[org.apache.gearpump.streaming.Constants#GEARPUMP_STREAMING_LOCALITIES]]
+ * to control which machine a task is running on.
+ */
+class TaskLocator(appName: String, config: Config) {
+  private val taskLocalities: Map[TaskId, Locality] = loadTaskLocalities(config)
+
+  /** Returns the configured locality of the task, or NonLocality when none is configured. */
+  def locateTask(taskId: TaskId): Locality = {
+    taskLocalities.getOrElse(taskId, NonLocality)
+  }
+
+  /**
+   * Reads the locality settings of this application from the config. Any failure while
+   * reading or parsing the settings results in an empty mapping.
+   */
+  private def loadTaskLocalities(config: Config): Map[TaskId, Locality] = {
+    import org.apache.gearpump.streaming.Constants.GEARPUMP_STREAMING_LOCALITIES
+    Try {
+      val appConfig = config.getConfig(s"$GEARPUMP_STREAMING_LOCALITIES.$appName")
+      val json = appConfig.root().render(ConfigRenderOptions.concise)
+      val tasksByWorker = Localities.fromJson(json).localities
+      // Inverts the worker -> tasks mapping into task -> worker locality.
+      tasksByWorker.keySet.flatMap { workerId =>
+        tasksByWorker(workerId).map(task => (task, WorkerLocality(workerId)))
+      }.toArray.toMap
+    }.getOrElse(Map.empty[TaskId, Locality])
+  }
+}
+
+/** Locality model of TaskLocator and its JSON (de)serialization. */
+object TaskLocator {
+
+  trait Locality
+
+  /** Means we require the resource from the specific worker */
+  case class WorkerLocality(workerId: WorkerId) extends Locality
+
+  /** Means no preference on worker */
+  object NonLocality extends Locality
+
+  /** Localities settings. Mapping from workerId to list of taskId */
+  case class Localities(localities: Map[WorkerId, Array[TaskId]])
+
+  object Localities {
+    // A task is written as task_<processorId>_<taskIndex>.
+    val pattern = "task_([0-9]+)_([0-9]+)".r
+
+    /**
+     * Parses the JSON produced by toJson: an object with a "localities" field that maps a
+     * rendered worker id to a comma separated list of tasks.
+     *
+     * Whitespace around the tasks and empty entries are ignored, so a worker without any task
+     * (which toJson renders as an empty string) is read back as an empty task array.
+     *
+     * To avoid polluting the classpath, we do the JSON translation ourselves instead of
+     * introducing JSON library dependencies directly.
+     *
+     * @throws MatchError if a non-empty task entry does not match the task pattern
+     */
+    def fromJson(json: String): Localities = {
+      val localities = ConfigFactory.parseString(json).getAnyRef("localities")
+        .asInstanceOf[java.util.Map[String, String]].asScala.map { pair =>
+        val workerId: WorkerId = WorkerId.parse(pair._1)
+        val taskNames = pair._2.split(",").map(_.trim).filter(_.nonEmpty)
+        val tasks = taskNames.map { task =>
+          val pattern(processorId, taskIndex) = task
+          TaskId(processorId.toInt, taskIndex.toInt)
+        }
+        (workerId, tasks)
+      }.toMap
+      new Localities(localities)
+    }
+
+    /** Renders the localities as concise JSON that can be read back with fromJson. */
+    def toJson(localities: Localities): String = {
+      val map = localities.localities.toList.map { pair =>
+        (WorkerId.render(pair._1), pair._2.map(task =>
+          s"task_${task.processorId}_${task.index}").mkString(","))
+      }.toMap.asJava
+      ConfigFactory.empty().withValue("localities", ConfigValueFactory.fromAnyRef(map)).
+        root.render(ConfigRenderOptions.concise())
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskManager.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskManager.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskManager.scala
new file mode 100644
index 0000000..662418c
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskManager.scala
@@ -0,0 +1,496 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.appmaster
+
+import akka.actor._
+import akka.pattern.ask
+import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.cluster.MasterToAppMaster.ReplayFromTimestampWindowTrailingEdge
+import org.apache.gearpump.streaming.AppMasterToExecutor._
+import org.apache.gearpump.streaming.ExecutorToAppMaster.{MessageLoss, RegisterTask, UnRegisterTask}
+import org.apache.gearpump.streaming._
+import org.apache.gearpump.streaming.appmaster.AppMaster.{AllocateResourceTimeOut, LookupTaskActorRef, TaskActorRef}
+import org.apache.gearpump.streaming.appmaster.ClockService.{ChangeToNewDAG, ChangeToNewDAGSuccess}
+import org.apache.gearpump.streaming.appmaster.DagManager.{GetLatestDAG, GetTaskLaunchData, LatestDAG, NewDAGDeployed, TaskLaunchData, WatchChange}
+import org.apache.gearpump.streaming.appmaster.ExecutorManager.{ExecutorStarted, StartExecutorsTimeOut, _}
+import org.apache.gearpump.streaming.appmaster.TaskManager._
+import org.apache.gearpump.streaming.appmaster.TaskRegistry.{Accept, TaskLocation}
+import org.apache.gearpump.streaming.executor.Executor.RestartTasks
+import org.apache.gearpump.streaming.executor.ExecutorRestartPolicy
+import org.apache.gearpump.streaming.task._
+import org.apache.gearpump.streaming.util.ActorPathUtil
+import org.apache.gearpump.util.{Constants, LogUtil}
+import org.slf4j.Logger
+
+import scala.concurrent.Future
+import scala.concurrent.duration._
+
+/**
+ *
+ * TaskManager tracks the status of all tasks.
+ *
+ * It is a state machine with three states:
+ *  1. applicationReady
+ *  2. recovery
+ *  3. dynamicDag
+ *
+ * When in state applicationReady:
+ *  1. When there is message-loss or JVM crash, transit to state recovery.
+ *  2. When user modify the DAG, transit to dynamicDag.
+ *
+ * When in state recovery:
+ *  1. When all tasks has been recovered, transit to applicationReady.
+ *
+ * When in state dynamicDag:
+ *  1. When dynamic dag transition is complete, transit to applicationReady.
+ *  2. When there is message loss or JVM crash, transit to state recovery.
+ */
+private[appmaster] class TaskManager(
+    appId: Int,
+    dagManager: ActorRef,
+    jarScheduler: JarScheduler,
+    executorManager: ActorRef,
+    clockService: ActorRef,
+    appMaster: ActorRef,
+    appName: String)
+  extends Actor {
+
+  private val LOG: Logger = LogUtil.getLogger(getClass, app = appId)
+  private val systemConfig = context.system.settings.config
+
+  private val ids = new SessionIdFactory()
+
+  private val executorRestartPolicy = new ExecutorRestartPolicy(maxNrOfRetries = 5,
+    withinTimeRange = 20.seconds)
+
+  private implicit val timeout = Constants.FUTURE_TIMEOUT
+  private implicit val actorSystem = context.system
+
+  import context.dispatcher
+
+  dagManager ! WatchChange(watcher = self)
+  executorManager ! SetTaskManager(self)
+
+  /** Asks the clock service for the start clock and unwraps the timestamp from the reply. */
+  private def getStartClock: Future[TimeStamp] = {
+    val reply = (clockService ? GetStartClock).asInstanceOf[Future[StartClock]]
+    reply.map(startClock => startClock.clock)
+  }
+
+  private var startClock: Future[TimeStamp] = getStartClock
+
+  def receive: Receive = applicationReady(DagReadyState.empty)
+
+  /**
+   * Handles queries that only read from the given task registry: clock events are forwarded
+   * to the clock service, GetTaskList is answered from the registry, and LookupTaskActorRef
+   * resolves the actor of a task. A task without a known executor gets no reply.
+   */
+  private def onClientQuery(taskRegistry: TaskRegistry): Receive = {
+    case clockEvent: ClockEvent =>
+      clockService forward clockEvent
+    case GetTaskList =>
+      val taskToExecutor = taskRegistry.getTaskExecutorMap
+      sender ! TaskList(taskToExecutor)
+    case LookupTaskActorRef(taskId) =>
+      val executorOfTask = taskRegistry.getExecutorId(taskId)
+      // Remember the sender now, the reply is sent from a future callback.
+      val requestor = sender
+      executorOfTask.foreach { executorId =>
+        val taskPath = ActorPathUtil.taskActorPath(appMaster, executorId, taskId)
+        val resolved = context.actorSelection(taskPath).resolveOne(3.seconds)
+        resolved.foreach { taskActorRef =>
+          requestor ! TaskActorRef(taskActorRef)
+        }
+      }
+  }
+
+  /**
+   * State applicationReady
+   *
+   * Building this behavior has side effects: it sends the used resources of the task
+   * registry to executorManager, tells dagManager that state.dag.version is deployed and
+   * asks it for the latest DAG. These run every time applicationReady(...) is invoked,
+   * including the re-entry done by onUnRegisterTask.
+   *
+   * A recoverState is prepared up front from the expected tasks of the current DAG and the
+   * known dead tasks; it is the state every transition to recovery starts from.
+   *
+   * The returned behavior is composed, in order, of:
+   *  - onClientQuery: read-only queries against the current task registry.
+   *  - onError: ExecutorStopped for an executor that has registered tasks is re-sent to self
+   *    and handled in recovery; MessageLoss goes to recovery when the executor has registered
+   *    tasks and the restart policy allows it, otherwise appMaster receives FailedToRecover;
+   *    ReplayFromTimestampWindowTrailingEdge always goes to recovery.
+   *    NOTE(review): the FailedToRecover branch is also taken when the executor has no
+   *    registered task, although the message says "fails too many times" -- confirm intent.
+   *  - onNewDag: for a LatestDAG with a higher version, broadcasts StartDynamicDag, hands the
+   *    new DAG to jarScheduler, starts executors for every non-empty resource request, asks
+   *    dagManager for the launch data of tasks of modified or impacted upstream processors,
+   *    and transits to dynamicDag.
+   *  - onUnRegisterTask: replies StopTask, moves the task from registered to dead and
+   *    re-enters applicationReady with the updated registry.
+   *  - unHandled("applicationReady") for everything else.
+   *
+   * @param state the DAG and task registry that are currently deployed
+   */
+  def applicationReady(state: DagReadyState): Receive = {
+    executorManager ! state.taskRegistry.usedResource
+    dagManager ! NewDAGDeployed(state.dag.version)
+    dagManager ! GetLatestDAG
+    LOG.info(s"goto state ApplicationReady(dag = ${state.dag.version})...")
+
+    val recoverRegistry = new TaskRegistry(expectedTasks = state.dag.tasks,
+      deadTasks = state.taskRegistry.deadTasks)
+
+    val recoverState = new StartDagState(state.dag, recoverRegistry)
+
+    val onError: Receive = {
+      case executorStopped@ExecutorStopped(executorId) =>
+        if (state.taskRegistry.isTaskRegisteredForExecutor(executorId)) {
+          self ! executorStopped
+          context.become(recovery(recoverState))
+        }
+      case MessageLoss(executorId, taskId, cause) =>
+        if (state.taskRegistry.isTaskRegisteredForExecutor(executorId) &&
+          executorRestartPolicy.allowRestartExecutor(executorId)) {
+          context.become(recovery(recoverState))
+        } else {
+          val errorMsg = s"Task $taskId fails too many times to recover"
+          appMaster ! FailedToRecover(errorMsg)
+        }
+      case replay: ReplayFromTimestampWindowTrailingEdge =>
+        LOG.error(s"Received $replay")
+        context.become(recovery(recoverState))
+    }
+
+    val onNewDag: Receive = {
+      case LatestDAG(newDag) =>
+
+        if (newDag.version > state.dag.version) {
+
+          executorManager ! BroadCast(StartDynamicDag(newDag.version))
+          LOG.info("Broadcasting StartDynamicDag")
+
+          val dagDiff = migrate(state.dag, newDag)
+          jarScheduler.setDag(newDag, startClock)
+          val resourceRequestsDetails = jarScheduler.getResourceRequestDetails()
+          resourceRequestsDetails.map { details =>
+            details.foreach { detail =>
+              if (detail.requests.length > 0 && detail.requests.exists(!_.resource.isEmpty)) {
+                executorManager ! StartExecutors(detail.requests, detail.jar)
+              }
+            }
+          }
+
+          var modifiedTasks = List.empty[TaskId]
+          for (processorId <- dagDiff.modifiedProcessors ++ dagDiff.impactedUpstream) {
+            val executors = state.taskRegistry.processorExecutors(processorId)
+            executors.foreach { pair =>
+              val (executorId, tasks) = pair
+              modifiedTasks ++= tasks
+              dagManager ! GetTaskLaunchData(newDag.version, processorId,
+                ChangeTasksOnExecutor(executorId, tasks))
+            }
+          }
+
+          val taskChangeRegistry = new TaskChangeRegistry(modifiedTasks)
+
+          val deadTasks = state.taskRegistry.deadTasks
+          val registeredTasks = state.taskRegistry.registeredTasks
+          val dynamicTaskRegistry = new TaskRegistry(newDag.tasks, registeredTasks, deadTasks)
+
+          val nextState = new StartDagState(newDag, dynamicTaskRegistry, taskChangeRegistry)
+          context.become(dynamicDag(nextState, recoverState))
+        }
+    }
+
+    val onUnRegisterTask: Receive = {
+      case unRegister: UnRegisterTask =>
+
+        LOG.info(s"Received $unRegister, stop task ${unRegister.taskId}")
+        sender ! StopTask(unRegister.taskId)
+
+        val taskId = unRegister.taskId
+        val registry = state.taskRegistry
+        val deadTasks = registry.deadTasks
+
+        val newRegistry = registry.copy(registeredTasks = registry.registeredTasks - taskId,
+          deadTasks = deadTasks + taskId)
+
+        val newState = new DagReadyState(state.dag, newRegistry)
+        context.become(applicationReady(newState))
+    }
+
+    // Recovers to same version
+    onClientQuery(state.taskRegistry) orElse onError orElse onNewDag orElse
+      onUnRegisterTask orElse unHandled("applicationReady")
+  }
+
+  /**
+   * State dynamicDag: active while the application transits to another DAG version.
+   *
+   * @param state start-up state of the DAG version being transited to
+   * @param recoverState state passed to `recovery` when the transition has to be abandoned
+   */
+  def dynamicDag(state: StartDagState, recoverState: StartDagState): Receive = {
+    LOG.info(s"DynamicDag transit to dag version: ${state.dag.version}...")
+
+    // Failure handling: fall back to recovery, or report to the AppMaster that we give up.
+    val onMessageLoss: Receive = {
+      case ExecutorStopped(_) =>
+        context.become(recovery(recoverState))
+      case MessageLoss(executorId, taskId, _) =>
+        // `&&` short-circuits on purpose: the restart policy is only consulted when the
+        // executor hosts a registered task.
+        val recoverable = state.taskRegistry.isTaskRegisteredForExecutor(executorId) &&
+          executorRestartPolicy.allowRestartExecutor(executorId)
+        if (!recoverable) {
+          val errorMsg = s"Task $taskId fails too many times to recover"
+          appMaster ! FailedToRecover(errorMsg)
+        } else {
+          context.become(recovery(recoverState))
+        }
+    }
+
+    val clientQueries = onClientQuery(state.taskRegistry)
+    val startUp = startDag(state, recoverState)
+    val fallback = unHandled("dynamicDag")
+    onMessageLoss orElse clientQueries orElse startUp orElse fallback
+  }
+
+  /**
+   * Message handlers shared by the states that start (or restart) a DAG.
+   *
+   * Schedules tasks onto newly started executors, launches or changes tasks once their launch
+   * data arrives, records task and executor registration in `state`, and switches to
+   * `applicationReady` once `ChangeToNewDAGSuccess` is received and everything is registered.
+   *
+   * @param state registries tracking the progress of the DAG being started
+   * @param recoverState state passed to `recovery` when task locations are rejected
+   */
+  private def startDag(state: StartDagState, recoverState: StartDagState): Receive = {
+    case executor: ExecutorStarted =>
+      import executor.{boundedJar, executorId, resource, workerId}
+      // NOTE(review): boundedJar.get throws if the executor carries no jar; presumably every
+      // executor started through this path is bound to one - confirm.
+      val taskIdsFuture = jarScheduler.scheduleTask(boundedJar.get, workerId, executorId, resource)
+      taskIdsFuture.foreach { taskIds =>
+        LOG.info(s"Executor $executor has been started, " +
+          s"start to schedule tasks: ${taskIds.mkString(",")}")
+        // Launch data is requested per processor; the reply comes back as TaskLaunchData
+        // carrying the StartTasksOnExecutor command built here.
+        taskIds.groupBy(_.processorId).foreach { pair =>
+          val (processorId, tasks) = pair
+          dagManager ! GetTaskLaunchData(state.dag.version, processorId,
+            StartTasksOnExecutor(executor.executorId, tasks))
+        }
+      }
+
+    case StartExecutorsTimeOut =>
+      appMaster ! AllocateResourceTimeOut
+    case TaskLaunchData(processorDescription, subscribers, command) =>
+      command match {
+        case StartTasksOnExecutor(executorId, tasks) =>
+          LOG.info(s"Start tasks on Executor($executorId), tasks: " + tasks)
+          val launchTasks = LaunchTasks(tasks, state.dag.version, processorDescription, subscribers)
+          executorManager ! UniCast(executorId, launchTasks)
+          tasks.foreach(executorRestartPolicy.addTaskToExecutor(executorId, _))
+        case ChangeTasksOnExecutor(executorId, tasks) =>
+          LOG.info("change Task on executor: " + executorId + ", tasks: " + tasks)
+          val changeTasks = ChangeTasks(tasks, state.dag.version, processorDescription.life,
+            subscribers)
+          executorManager ! UniCast(executorId, changeTasks)
+        case other =>
+          // NOTE(review): the log text mentions ExecutorStarted, but this branch is reached for
+          // an unexpected launch command - the wording looks stale.
+          LOG.error(s"severe error! we expect ExecutorStarted but get ${other.getClass.toString}")
+      }
+    case TasksLaunched =>
+    // We will track all launched task by message RegisterTask
+    case TasksChanged(tasks) =>
+      tasks.foreach(task => state.taskChangeRegistry.taskChanged(task))
+
+      if (allTasksReady(state)) {
+        broadcastLocations(state)
+      }
+
+    case RegisterTask(taskId, executorId, host) =>
+      // Capture the sender now: it is used later inside the startClock callback.
+      val client = sender
+      val register = state.taskRegistry
+      val status = register.registerTask(taskId, TaskLocation(executorId, host))
+      if (status == Accept) {
+        LOG.info(s"RegisterTask($taskId) TaskLocation: $host, Executor: $executorId")
+        val sessionId = ids.newSessionId
+
+        startClock.foreach(clock => client ! TaskRegistered(taskId, sessionId, clock))
+        if (allTasksReady(state)) {
+          broadcastLocations(state)
+        }
+      } else {
+        sender ! TaskRejected(taskId)
+      }
+
+    case TaskChanged(taskId, dagVersion) =>
+      state.taskChangeRegistry.taskChanged(taskId)
+      if (allTasksReady(state)) {
+        broadcastLocations(state)
+      }
+    case locationReceived: TaskLocationsReceived =>
+      state.executorReadyRegistry.registerExecutor(locationReceived.executorId)
+      // Only once every task is ready and every executor has acknowledged the locations is
+      // the clock service asked to switch to the new DAG.
+      if (allTasksReady(state) &&
+        state.executorReadyRegistry.allRegistered(state.taskRegistry.executors)) {
+        LOG.info("All executors are ready to start...")
+        clockService ! ChangeToNewDAG(state.dag)
+      }
+    case locationRejected: TaskLocationsRejected =>
+      LOG.error(s"received $locationRejected, start to recover")
+      context.become(recovery(recoverState))
+
+    case ChangeToNewDAGSuccess(_) =>
+      // Re-check readiness before starting the tasks and entering applicationReady.
+      if (allTasksReady(state) &&
+        state.executorReadyRegistry.allRegistered(state.taskRegistry.executors)) {
+        executorManager ! BroadCast(StartAllTasks(state.dag.version))
+        context.become(applicationReady(new DagReadyState(state.dag, state.taskRegistry)))
+      }
+  }
+
+  /**
+   * Reacts to a stopped executor: when the restart policy still allows it, asks the jar
+   * scheduler for replacement resources and requests new executors; otherwise reports the
+   * failure to the AppMaster.
+   */
+  def onExecutorError: Receive = {
+    case ExecutorStopped(executorId) =>
+      if (!executorRestartPolicy.allowRestartExecutor(executorId)) {
+        val errorMsg = "Executor restarted too many times to recover"
+        appMaster ! FailedToRecover(errorMsg)
+      } else {
+        jarScheduler.executorFailed(executorId).foreach { resourceRequestDetail =>
+          // Nothing is requested when the scheduler returns no detail.
+          resourceRequestDetail.foreach { detail =>
+            executorManager ! StartExecutors(detail.requests, detail.jar)
+          }
+        }
+      }
+  }
+
+  /** True once every live task is registered and every task awaiting a change has changed. */
+  private def allTasksReady(state: StartDagState): Boolean = {
+    val everyTaskRegistered = state.taskRegistry.isAllTasksRegistered
+    everyTaskRegistered && state.taskChangeRegistry.allTaskChanged
+  }
+
+  /** Broadcasts the registered task locations, tagged with the DAG version, to all executors. */
+  private def broadcastLocations(state: StartDagState): Unit = {
+    LOG.info(s"All tasks have been launched; send Task locations to all executors")
+    val ready = TaskLocationsReady(state.taskRegistry.getTaskLocations, state.dag.version)
+    executorManager ! BroadCast(ready)
+  }
+
+  /**
+   * State recovery: restarts all tasks for the DAG held in `state`, then waits for them to
+   * come back using the start-up handlers.
+   *
+   * @param state the state (DAG and registries) to recover to
+   */
+  def recovery(state: StartDagState): Receive = {
+    val dag = state.dag
+    val recoverDagVersion = dag.version
+    executorManager ! BroadCast(RestartTasks(recoverDagVersion))
+
+    // Fetch a fresh start clock so that recovery resumes from the point of failure.
+    startClock = getStartClock
+
+    jarScheduler.setDag(dag, startClock)
+
+    LOG.info(s"goto state Recovery(recoverDag = $recoverDagVersion)...")
+
+    if (dag.isEmpty) {
+      applicationReady(new DagReadyState(dag, state.taskRegistry))
+    } else {
+      // Clock events are swallowed silently while recovering.
+      val ignoreClock: Receive = {
+        case _: ClockEvent =>
+      }
+
+      // A further failure during recovery restarts from an empty registration table that
+      // still remembers the dead tasks.
+      val freshRegistry = new TaskRegistry(expectedTasks = dag.tasks,
+        deadTasks = state.taskRegistry.deadTasks)
+      val recoverState = new StartDagState(dag, freshRegistry)
+
+      ignoreClock orElse startDag(state, recoverState) orElse onExecutorError orElse
+        unHandled("recovery")
+    }
+  }
+
+  /** Catch-all handler that logs any message not handled by the named state. */
+  private def unHandled(state: String): Receive = {
+    case unknown =>
+      LOG.info(s"Received unknown message $unknown in state $state")
+  }
+}
+
+private[appmaster] object TaskManager {
+
+  /**
+   * State data used once the application is ready.
+   */
+  class DagReadyState(val dag: DAG, val taskRegistry: TaskRegistry)
+
+  object DagReadyState {
+    /** An empty DAG tagged with version -1, paired with a registry expecting no tasks. */
+    def empty: DagReadyState = {
+      val emptyDag = DAG.empty.copy(version = -1)
+      val emptyRegistry = new TaskRegistry(List.empty[TaskId])
+      new DagReadyState(emptyDag, emptyRegistry)
+    }
+  }
+
+  /**
+   * State data used while the application is booting up or recovering.
+   */
+  class StartDagState(
+      val dag: DAG,
+      val taskRegistry: TaskRegistry,
+      val taskChangeRegistry: TaskChangeRegistry = new TaskChangeRegistry(List.empty[TaskId]),
+      val executorReadyRegistry: ExecutorRegistry = new ExecutorRegistry)
+
+  case object GetTaskList
+
+  case class TaskList(tasks: Map[TaskId, ExecutorId])
+
+  case class FailedToRecover(errorMsg: String)
+
+  /**
+   * Command: start new tasks on executor `executorId`.
+   */
+  case class StartTasksOnExecutor(executorId: Int, tasks: List[TaskId])
+
+  /**
+   * Command: change already existing tasks on executor `executorId`.
+   */
+  case class ChangeTasksOnExecutor(executorId: Int, tasks: List[TaskId])
+
+  /**
+   * Remembers which executors have registered.
+   */
+  class ExecutorRegistry {
+    private var seen = Set.empty[ExecutorId]
+
+    def registerExecutor(executorId: ExecutorId): Unit = {
+      seen = seen + executorId
+    }
+
+    /** True when every executor in `all` has registered (vacuously true for an empty list). */
+    def allRegistered(all: List[ExecutorId]): Boolean = all.forall(seen.contains)
+  }
+
+  /**
+   * Remembers which of the target tasks have reported a change.
+   */
+  class TaskChangeRegistry(targetTasks: List[TaskId]) {
+    private var changed = Set.empty[TaskId]
+
+    def taskChanged(taskId: TaskId): Unit = {
+      changed = changed + taskId
+    }
+
+    /** True when every target task has reported a change. */
+    def allTaskChanged: Boolean = targetTasks.forall(changed.contains)
+  }
+
+  object TaskChangeRegistry {
+    def empty: TaskChangeRegistry = new TaskChangeRegistry(List.empty[TaskId])
+  }
+
+  /**
+   * Describes which processors are affected by a dynamic DAG change.
+   */
+  case class DAGDiff(
+      addedProcessors: List[ProcessorId],
+      modifiedProcessors: List[ProcessorId],
+      impactedUpstream: List[ProcessorId])
+
+  /**
+   * Compares the old DAG (`leftDAG`) with the new DAG (`rightDAG`).
+   *
+   * @return the processors that were added, the ones present in both DAGs whose description
+   *         differs, and the direct upstream processors (in the new DAG) of either group
+   */
+  def migrate(leftDAG: DAG, rightDAG: DAG): DAGDiff = {
+    val oldIds = leftDAG.processors.keySet
+    val newIds = rightDAG.processors.keySet
+
+    val added = newIds -- oldIds
+    val kept = newIds -- added
+
+    val modified = kept.filter(id => leftDAG.processors(id) != rightDAG.processors(id))
+
+    // Direct upstream processors of every added or modified processor, excluding the
+    // added/modified processors themselves.
+    val touched = added ++ modified
+    val impactedUpstream = touched.flatMap { id =>
+      rightDAG.graph.incomingEdgesOf(id).map(_._1).toSet
+    } -- touched
+
+    DAGDiff(added.toList, modified.toList, impactedUpstream.toList)
+  }
+
+  /**
+   * Hands out session ids, starting at 1 and increasing by one per call.
+   */
+  class SessionIdFactory {
+    private var nextSessionId = 1
+
+    /**
+     * Returns the next unused session id.
+     */
+    final def newSessionId: Int = {
+      nextSessionId += 1
+      nextSessionId - 1
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskRegistry.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskRegistry.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskRegistry.scala
new file mode 100644
index 0000000..b910d57
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskRegistry.scala
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.appmaster
+
+import org.apache.gearpump.cluster.scheduler.Resource
+import org.apache.gearpump.streaming.appmaster.ExecutorManager.ExecutorResourceUsageSummary
+import org.apache.gearpump.streaming.appmaster.TaskRegistry._
+import org.apache.gearpump.streaming.task.TaskId
+import org.apache.gearpump.streaming.{ExecutorId, ProcessorId}
+import org.apache.gearpump.transport.HostPort
+import org.apache.gearpump.util.LogUtil
+import org.slf4j.Logger
+
+/**
+ * Records which of the expected tasks have registered, where they run, and which tasks are
+ * considered dead.
+ *
+ * @param expectedTasks tasks this registry expects to see
+ * @param registeredTasks tasks registered so far, with their locations
+ * @param deadTasks tasks left out of the "all tasks registered" check
+ */
+class TaskRegistry(val expectedTasks: List[TaskId],
+    var registeredTasks: Map[TaskId, TaskLocation] = Map.empty[TaskId, TaskLocation],
+    var deadTasks: Set[TaskId] = Set.empty[TaskId]) {
+
+  private val LOG: Logger = LogUtil.getLogger(getClass)
+
+  private val processors = expectedTasks.map(_.processorId).toSet
+
+  /**
+   * Called by a task after it boots. The task is accepted when its processor belongs to one
+   * of the expected tasks, and rejected otherwise.
+   *
+   * @param taskId Task that registers itself.
+   * @param location The host and port where this task is running on. NOTE: The host and port
+   *                 is NOT the same host and port of Akka remoting. Instead, it is host and port
+   *                 of custom netty layer, see [[org.apache.gearpump.transport.netty.Context]].
+   */
+  def registerTask(taskId: TaskId, location: TaskLocation): RegisterTaskStatus = {
+    if (!processors.contains(taskId.processorId)) {
+      LOG.error(s" the task is not accepted for registration, taskId: ${taskId}")
+      Reject
+    } else {
+      registeredTasks = registeredTasks + (taskId -> location)
+      Accept
+    }
+  }
+
+  def copy(expectedTasks: List[TaskId] = this.expectedTasks,
+      registeredTasks: Map[TaskId, TaskLocation] = this.registeredTasks,
+      deadTasks: Set[TaskId] = this.deadTasks): TaskRegistry = {
+    new TaskRegistry(expectedTasks, registeredTasks, deadTasks)
+  }
+
+  /** Registered tasks grouped by the host/port they run on. */
+  def getTaskLocations: TaskLocations = {
+    val byHost = registeredTasks.toList.groupBy { case (_, location) => location.host }
+    val taskLocations = byHost.map { case (host, entries) =>
+      val taskIds = entries.map(_._1)
+      (host, taskIds.toSet)
+    }
+    TaskLocations(taskLocations)
+  }
+
+  /** Maps every registered task to its executor id (-1 if the lookup yields nothing). */
+  def getTaskExecutorMap: Map[TaskId, ExecutorId] = {
+    getTaskLocations.locations.flatMap { case (_, taskSet) =>
+      taskSet.map(taskId => (taskId, getExecutorId(taskId).getOrElse(-1)))
+    }
+  }
+
+  /** Query the executor Id where the task is running on */
+  def getExecutorId(taskId: TaskId): Option[Int] = {
+    registeredTasks.get(taskId).map(location => location.executorId)
+  }
+
+  /** Executor id of every registered task; one entry per task, so ids may repeat. */
+  def executors: List[ExecutorId] = {
+    registeredTasks.toList.map { case (_, location) => location.executorId }
+  }
+
+  /** True when every expected task that is not dead has registered. */
+  def isAllTasksRegistered: Boolean = {
+    (expectedTasks.toSet -- deadTasks).forall(registeredTasks.contains)
+  }
+
+  def isTaskRegisteredForExecutor(executorId: ExecutorId): Boolean = {
+    registeredTasks.exists { case (_, location) => location.executorId == executorId }
+  }
+
+  private def filterTasks(processorId: ProcessorId): List[TaskId] = {
+    registeredTasks.keys.toList.filter(taskId => taskId.processorId == processorId)
+  }
+
+  /** Executors that the tasks of the given processor are running on, with those tasks */
+  def processorExecutors(processorId: ProcessorId): Map[ExecutorId, List[TaskId]] = {
+    val taskToExecutor = for {
+      taskId <- filterTasks(processorId)
+      executorId <- getExecutorId(taskId)
+    } yield (taskId, executorId)
+
+    taskToExecutor.groupBy(_._2).map { case (executorId, pairs) =>
+      (executorId, pairs.map(_._1))
+    }
+  }
+
+  /** Summary of resource usage per executor: one `Resource(1)` per registered task */
+  def usedResource: ExecutorResourceUsageSummary = {
+    val resourceMap = registeredTasks.foldLeft(Map.empty[ExecutorId, Resource]) {
+      case (usage, (_, location)) =>
+        val updated = usage.getOrElse(location.executorId, Resource(0)) + Resource(1)
+        usage + (location.executorId -> updated)
+    }
+    ExecutorResourceUsageSummary(resourceMap)
+  }
+}
+
+object TaskRegistry {
+  /** Outcome of `TaskRegistry.registerTask`. */
+  sealed trait RegisterTaskStatus
+  case object Accept extends RegisterTaskStatus
+  case object Reject extends RegisterTaskStatus
+
+  /** Where a task runs: the executor id plus the host/port it registered with. */
+  case class TaskLocation(executorId: Int, host: HostPort)
+
+  /** Tasks grouped by the host/port they run on. */
+  case class TaskLocations(locations: Map[HostPort, Set[TaskId]])
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskSchedulerImpl.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskSchedulerImpl.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskSchedulerImpl.scala
new file mode 100644
index 0000000..df4490c
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskSchedulerImpl.scala
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.appmaster
+
+import com.typesafe.config.Config
+import org.apache.gearpump.cluster.scheduler.{Resource, ResourceRequest}
+import org.apache.gearpump.cluster.worker.WorkerId
+import org.apache.gearpump.streaming.DAG
+import org.apache.gearpump.streaming.appmaster.TaskLocator.{Locality, WorkerLocality}
+import org.apache.gearpump.streaming.appmaster.TaskScheduler.{Location, TaskStatus}
+import org.apache.gearpump.streaming.task.TaskId
+import org.apache.gearpump.util.Constants
+
+/**
+ * Schedules tasks to run on newly allocated resources. A TaskScheduler only schedules tasks
+ * that share the same jar. For scheduling across multiple jars, see
+ * [[org.apache.gearpump.streaming.appmaster.JarScheduler]].
+ */
+trait TaskScheduler {
+
+  /**
+   * Notifies the scheduler that the task DAG has been created.
+   * @param dag task dag
+   */
+  def setDAG(dag: DAG): Unit
+
+  /**
+   * Gets the resource requirements for all unscheduled tasks.
+   */
+  def getResourceRequests(): Array[ResourceRequest]
+
+  /**
+   * Notifies the scheduler that a resource slot on {workerId} and {executorId} has been
+   * allocated, and expects the tasks to run there in return.
+   * Task locality should be considered when deciding whether to offer a task on the target
+   * {worker} and {executor}.
+   *
+   * @param workerId the worker on which this resource is located.
+   * @param executorId the executor to which this resource belongs.
+   * @param resource the allocated resource.
+   * @return a list of tasks
+   */
+  def schedule(workerId: WorkerId, executorId: Int, resource: Resource): List[TaskId]
+
+  /**
+   * Notifies the scheduler that {executorId} has failed, and expects a set of
+   * ResourceRequest for all failed tasks on that executor.
+   *
+   * @param executorId executor that failed
+   * @return resource requests of the failed executor
+   */
+  def executorFailed(executorId: Int): Array[ResourceRequest]
+
+  /**
+   * Queries the tasks that are already scheduled on the executor.
+   *
+   * @param executorId executor to query
+   * @return a list of tasks
+   */
+  def scheduledTasks(executorId: Int): List[TaskId]
+}
+
+object TaskScheduler {
+  /** A resource slot, identified by the worker and the executor it belongs to. */
+  case class Location(workerId: WorkerId, executorId: Int)
+
+  /**
+   * Scheduling status of one task: its preferred locality and the location it is currently
+   * allocated to. TaskSchedulerImpl uses a null `allocation` to mean "not scheduled".
+   */
+  class TaskStatus(val taskId: TaskId, val preferLocality: Locality, var allocation: Location)
+}
+
+/**
+ * Default [[TaskScheduler]]: keeps one [[TaskScheduler.TaskStatus]] per task of the current
+ * DAG and assigns unallocated tasks to resource slots as they are offered.
+ *
+ * A task whose `allocation` is null has not been scheduled yet.
+ */
+class TaskSchedulerImpl(appId: Int, appName: String, config: Config) extends TaskScheduler {
+  private val executorNum = config.getInt(Constants.APPLICATION_EXECUTOR_NUMBER)
+
+  private var tasks = List.empty[TaskStatus]
+
+  // Finds the locality of the tasks
+  private val taskLocator = new TaskLocator(appName, config)
+
+  /**
+   * Rebuilds the task list from `dag`, ordered by task index. Tasks already known keep
+   * their existing status (and therefore their allocation); new tasks start unallocated.
+   */
+  override def setDAG(dag: DAG): Unit = {
+    val taskMap = tasks.map(_.taskId).zip(tasks).toMap
+
+    tasks = dag.tasks.sortBy(_.index).map { taskId =>
+      val locality = taskLocator.locateTask(taskId)
+      taskMap.getOrElse(taskId, new TaskStatus(taskId, locality, allocation = null))
+    }
+  }
+
+  def getResourceRequests(): Array[ResourceRequest] = {
+    fetchResourceRequests(fromOneWorker = false)
+  }
+
+  import org.apache.gearpump.cluster.scheduler.Relaxation._
+
+  /**
+   * Builds one resource request per preferred worker, counting one slot per unallocated
+   * task. Tasks without a worker preference are pooled under `WorkerId.unspecified`.
+   *
+   * @param fromOneWorker currently not consulted by the implementation
+   */
+  private def fetchResourceRequests(fromOneWorker: Boolean = false): Array[ResourceRequest] = {
+    var workersResourceRequest = Map.empty[WorkerId, Resource]
+
+    tasks.filter(_.allocation == null).foreach { task =>
+      val workerId = task.preferLocality match {
+        case WorkerLocality(preferred) => preferred
+        case _ => WorkerId.unspecified
+      }
+      val current = workersResourceRequest.getOrElse(workerId, Resource.empty)
+      workersResourceRequest += workerId -> (current + Resource(1))
+    }
+
+    workersResourceRequest.map { workerIdAndResource =>
+      val (workerId, resource) = workerIdAndResource
+      if (workerId == WorkerId.unspecified) {
+        ResourceRequest(resource, workerId = WorkerId.unspecified, executorNum = executorNum)
+      } else {
+        ResourceRequest(resource, workerId, relaxation = SPECIFICWORKER)
+      }
+    }.toArray
+  }
+
+  /**
+   * Assigns unallocated tasks to the offered slot: tasks that prefer `workerId` first, then
+   * any remaining unallocated task with whatever slots are left.
+   */
+  override def schedule(workerId: WorkerId, executorId: Int, resource: Resource): List[TaskId] = {
+    var scheduledTasks = List.empty[TaskId]
+    val location = Location(workerId, executorId)
+    // Schedules tasks for specific worker
+    scheduledTasks ++= scheduleTasksForLocality(resource, location,
+      (locality) => locality == WorkerLocality(workerId))
+
+    // Schedules tasks without specific location preference
+    scheduledTasks ++= scheduleTasksForLocality(resource - Resource(scheduledTasks.length),
+      location, (locality) => true)
+    scheduledTasks
+  }
+
+  /**
+   * Allocates up to `resource.slots` unallocated tasks whose locality satisfies `matcher`
+   * to `resourceLocation`.
+   *
+   * @return the newly allocated tasks, most recently allocated first
+   */
+  private def scheduleTasksForLocality(
+      resource: Resource, resourceLocation: Location, matcher: (Locality) => Boolean)
+    : List[TaskId] = {
+    var scheduledTasks = List.empty[TaskId]
+    var remain = resource.slots
+    // Walk the list with an iterator: indexing an immutable List (and asking for its length
+    // on every pass) is linear, which made the original loop quadratic.
+    val candidates = tasks.iterator
+    while (remain > 0 && candidates.hasNext) {
+      val taskStatus = candidates.next()
+      if (taskStatus.allocation == null && matcher(taskStatus.preferLocality)) {
+        taskStatus.allocation = resourceLocation
+        scheduledTasks +:= taskStatus.taskId
+        remain -= 1
+      }
+    }
+    scheduledTasks
+  }
+
+  /**
+   * Clears the allocation of every task on the failed executor and returns a single request
+   * sized to the number of those tasks.
+   */
+  override def executorFailed(executorId: Int): Array[ResourceRequest] = {
+    val failedTasks = tasks.filter { status =>
+      status.allocation != null && status.allocation.executorId == executorId
+    }
+    // Cleans the location of failed tasks
+    failedTasks.foreach(_.allocation = null)
+
+    Array(ResourceRequest(Resource(failedTasks.length),
+      workerId = WorkerId.unspecified, relaxation = ONEWORKER))
+  }
+
+  override def scheduledTasks(executorId: Int): List[TaskId] = {
+    tasks.filter { status =>
+      status.allocation != null && status.allocation.executorId == executorId
+    }.map(_.taskId)
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/Stream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/Stream.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/Stream.scala
new file mode 100644
index 0000000..5ca92dd
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/Stream.scala
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.dsl
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.dsl.op._
+import org.apache.gearpump.streaming.sink.DataSink
+import org.apache.gearpump.streaming.task.{Task, TaskContext}
+import org.apache.gearpump.util.Graph
+import org.slf4j.{Logger, LoggerFactory}
+
+import scala.language.implicitConversions
+
+/**
+ * A node of the DSL operator graph, typed by the messages it emits. Every combinator adds a
+ * new operator vertex to the shared graph, connects it to this node and returns the stream
+ * rooted at the new vertex.
+ *
+ * @param graph the operator graph being built
+ * @param thisNode the operator this stream ends at
+ * @param edge edge to use for the next connection; when absent each combinator picks its own
+ */
+class Stream[T](
+    private val graph: Graph[Op, OpEdge], private val thisNode: Op,
+    private val edge: Option[OpEdge] = None) {
+
+  /**
+   * Adds `op` to the graph, connects this node to it (using `fallbackEdge` when this stream
+   * carries no edge of its own) and returns the stream rooted at `op`.
+   */
+  private def connect[R](
+      op: Op, fallbackEdge: OpEdge, outgoing: Option[OpEdge] = None): Stream[R] = {
+    graph.addVertex(op)
+    graph.addEdge(thisNode, edge.getOrElse(fallbackEdge), op)
+    new Stream[R](graph, op, outgoing)
+  }
+
+  /**
+   * Turns every value of type T into zero or more values of type R.
+   *
+   * @param fun FlatMap function
+   * @param description The description message for this operation
+   * @return A new stream with type [R]
+   */
+  def flatMap[R](fun: T => TraversableOnce[R], description: String = null): Stream[R] = {
+    val label = Option(description).getOrElse("flatmap")
+    connect[R](FlatMapOp(fun, label), Direct)
+  }
+
+  /**
+   * Maps a message of type T to a message of type R. A null result produces no output.
+   *
+   * @param fun Function
+   * @return A new stream with type [R]
+   */
+  def map[R](fun: T => R, description: String = null): Stream[R] = {
+    val label = Option(description).getOrElse("map")
+    flatMap[R](data => Option(fun(data)), label)
+  }
+
+  /**
+   * Keeps only the records for which fun(T) == true
+   *
+   * @param fun  the filter
+   * @return  a new stream after filter
+   */
+  def filter(fun: T => Boolean, description: String = null): Stream[T] = {
+    val label = Option(description).getOrElse("filter")
+    flatMap[T](data => if (!fun(data)) None else Option(data), label)
+  }
+
+  /**
+   * Reduces the stream with the given function.
+   *
+   * @param fun  reduction function
+   * @param description description message for this operator
+   * @return a new stream after reduction
+   */
+  def reduce(fun: (T, T) => T, description: String = null): Stream[T] = {
+    val label = Option(description).getOrElse("reduce")
+    connect[T](ReduceOp(fun, label), Direct)
+  }
+
+  /**
+   * Logs every message (via toString) to the "dsl" logger
+   */
+  def log(): Unit = {
+    map(msg => LoggerFactory.getLogger("dsl").info(msg.toString), "log")
+    ()
+  }
+
+  /**
+   * Merges data from two streams into one
+   *
+   * @param other the other stream
+   * @return  the merged stream
+   */
+  def merge(other: Stream[T], description: String = null): Stream[T] = {
+    val mergeOp = MergeOp(Option(description).getOrElse("merge"))
+    val merged = connect[T](mergeOp, Direct)
+    // The second input falls back to a Shuffle edge when it carries none of its own.
+    graph.addEdge(other.thisNode, other.edge.getOrElse(Shuffle), mergeOp)
+    merged
+  }
+
+  /**
+   * Group by function (T => Group)
+   *
+   * For example, we have T type, People(name: String, gender: String, age: Int)
+   * groupBy[People](_.gender) will group the people by gender.
+   *
+   * Other combinators can be appended after groupBy
+   *
+   * For example,
+   * {{{
+   * Stream[People].groupBy(_.gender).flatmap(..).filter.(..).reduce(..)
+   * }}}
+   *
+   * @param fun  Group by function
+   * @param parallelism  Parallelism level
+   * @param description  The description
+   * @return  the grouped stream
+   */
+  def groupBy[Group](fun: T => Group, parallelism: Int = 1, description: String = null)
+    : Stream[T] = {
+    val label = Option(description).getOrElse("groupBy")
+    connect[T](GroupByOp(fun, parallelism, label), Shuffle)
+  }
+
+  /**
+   * Connects with a low level Processor(TaskDescription)
+   *
+   * @param processor  a user defined processor
+   * @param parallelism  parallelism level
+   * @param conf  user configuration handed to the processor operator
+   * @param description  description for this operator
+   * @return  new stream after processing with type [R]
+   */
+  def process[R](
+      processor: Class[_ <: Task], parallelism: Int, conf: UserConfig = UserConfig.empty,
+      description: String = null): Stream[R] = {
+    val label = Option(description).getOrElse("process")
+    // The resulting stream carries a Shuffle edge for its next connection.
+    connect[R](ProcessorOp(processor, parallelism, conf, label), Shuffle, Some(Shuffle))
+  }
+}
+
+/**
+ * Extra combinators for streams of key/value tuples.
+ */
+class KVStream[K, V](stream: Stream[Tuple2[K, V]]) {
+  /**
+   * Groups the tuples by their key (the first tuple element).
+   *
+   * Applies to Stream[Tuple2[K,V]]
+   *
+   * @param parallelism  the parallelism for this operation
+   * @return  the new KV stream
+   */
+  def groupByKey(parallelism: Int = 1): Stream[Tuple2[K, V]] = {
+    val keyOf = Stream.getTupleKey[K, V] _
+    stream.groupBy(keyOf, parallelism, description = "groupByKey")
+  }
+
+  /**
+   * Sums the values of the tuples.
+   *
+   * Applies to Stream[Tuple2[K,V]]; V needs a Numeric instance.
+   *
+   * For input (key, value1), (key, value2), will generate (key, value1 + value2)
+   * @param numeric  the numeric operations
+   * @return  the sum stream
+   */
+  def sum(implicit numeric: Numeric[V]): Stream[(K, V)] = {
+    val add = Stream.sumByValue[K, V](numeric)
+    stream.reduce(add, description = "sum")
+  }
+}
+
+/**
+ * Factory, tuple helpers and implicit extensions for [[Stream]].
+ */
+object Stream {
+
+  /** Creates a stream rooted at `node` in `graph`, carrying the given optional edge. */
+  def apply[T](graph: Graph[Op, OpEdge], node: Op, edge: Option[OpEdge]): Stream[T] = {
+    new Stream[T](graph, node, edge)
+  }
+
+  /** Returns the key (first element) of a key/value tuple. */
+  def getTupleKey[K, V](tuple: Tuple2[K, V]): K = tuple._1
+
+  /**
+   * Builds a reducer that keeps the key of the first tuple and adds both values using
+   * `numeric`.
+   */
+  def sumByValue[K, V](numeric: Numeric[V]): (Tuple2[K, V], Tuple2[K, V]) => Tuple2[K, V]
+  = (tuple1, tuple2) => Tuple2(tuple1._1, numeric.plus(tuple1._2, tuple2._2))
+
+  implicit def streamToKVStream[K, V](stream: Stream[Tuple2[K, V]]): KVStream[K, V] = {
+    new KVStream(stream)
+  }
+
+  /**
+   * Adds `sink` combinators to a stream. Both variants connect the stream to the sink
+   * operator with a Shuffle edge.
+   */
+  implicit class Sink[T](stream: Stream[T]) extends java.io.Serializable {
+    /**
+     * Attaches a [[DataSink]] to the stream.
+     *
+     * @param description operator description; a null value falls back to "traversable"
+     */
+    def sink[T](dataSink: DataSink, parallism: Int, conf: UserConfig, description: String)
+      : Stream[T] = {
+      // Option(...) rather than Some(...): Some(null) is non-empty, so the fallback was never
+      // applied and a null description leaked into the operator.
+      implicit val sink = DataSinkOp[T](dataSink, parallism, conf,
+        Option(description).getOrElse("traversable"))
+      stream.graph.addVertex(sink)
+      stream.graph.addEdge(stream.thisNode, Shuffle, sink)
+      new Stream[T](stream.graph, sink)
+    }
+
+    /**
+     * Attaches a low level Task class as the sink of the stream.
+     *
+     * @param description operator description; a null value falls back to "source"
+     */
+    def sink[T](
+        sink: Class[_ <: Task], parallism: Int, conf: UserConfig = UserConfig.empty,
+        description: String = null): Stream[T] = {
+      // NOTE(review): the "source" fallback label looks like a copy/paste leftover for a sink;
+      // left unchanged because the label may be relied upon elsewhere.
+      val sinkOp = ProcessorOp(sink, parallism, conf, Option(description).getOrElse("source"))
+      stream.graph.addVertex(sinkOp)
+      stream.graph.addEdge(stream.thisNode, Shuffle, sinkOp)
+      new Stream[T](stream.graph, sinkOp)
+    }
+  }
+}
+
+/**
+ * A [[DataSink]] that writes every message to the logger of the task it was opened with.
+ */
+class LoggerSink[T] extends DataSink {
+  var logger: Logger = null
+
+  // NOTE(review): never assigned or read within this class.
+  private var context: TaskContext = null
+
+  /** Picks up the logger of the given task context; must be called before `write`. */
+  override def open(context: TaskContext): Unit = {
+    this.logger = context.logger
+  }
+
+  override def write(message: Message): Unit = {
+    logger.info("logging message " + message.msg)
+  }
+
+  // Return the unit value; `Unit` on its own refers to the companion object.
+  override def close(): Unit = ()
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/StreamApp.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/StreamApp.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/StreamApp.scala
new file mode 100644
index 0000000..5027500
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/StreamApp.scala
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.dsl
+
+import akka.actor.ActorSystem
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.cluster.client.ClientContext
+import org.apache.gearpump.streaming.StreamApplication
+import org.apache.gearpump.streaming.dsl.op.{DataSourceOp, Op, OpEdge, ProcessorOp}
+import org.apache.gearpump.streaming.dsl.plan.Planner
+import org.apache.gearpump.streaming.source.DataSource
+import org.apache.gearpump.streaming.task.{Task, TaskContext}
+import org.apache.gearpump.util.Graph
+import org.apache.gearpump.{Message, TimeStamp}
+
+import scala.language.implicitConversions
+
+/**
+ * Example:
+ * {{{
+ * val data = "This is a good start, bingo!! bingo!!"
+ * app.fromCollection(data.lines.toList).
+ * // word => (word, count)
+ * flatMap(line => line.split("[\\s]+")).map((_, 1)).
+ * // (word, count1), (word, count2) => (word, count1 + count2)
+ * groupBy(kv => kv._1).reduce(sum(_, _))
+ *
+ * val appId = context.submit(app)
+ * context.close()
+ * }}}
+ *
+ * @param name name of app
+ */
+class StreamApp(
+    val name: String, system: ActorSystem, userConfig: UserConfig, val graph: Graph[Op, OpEdge]) {
+
+  /** Creates an application that starts with an empty operator graph. */
+  def this(name: String, system: ActorSystem, userConfig: UserConfig) =
+    this(name, system, userConfig, Graph.empty[Op, OpEdge])
+
+  /** Runs the Planner over the operator graph and wraps the result in a StreamApplication. */
+  def plan(): StreamApplication = {
+    // Kept implicit so the planner can pick up the actor system, as before.
+    implicit val actorSystem: ActorSystem = system
+    val dag = (new Planner).plan(graph)
+    StreamApplication(name, dag, userConfig)
+  }
+}
+
+object StreamApp {
+  /** Creates a StreamApp with an empty operator graph, using the ActorSystem of `context`. */
+  def apply(name: String, context: ClientContext, userConfig: UserConfig = UserConfig.empty)
+    : StreamApp = {
+    new StreamApp(name, context.system, userConfig)
+  }
+
+  /**
+   * Plans the DSL graph of `streamApp`, so that a StreamApp can be passed where a
+   * StreamApplication is expected.
+   */
+  implicit def streamAppToApplication(streamApp: StreamApp): StreamApplication = {
+    streamApp.plan()
+  }
+
+  /**
+   * Adds `source` methods to a StreamApp. Each method adds a source vertex to the app's
+   * graph and returns a Stream positioned at that vertex.
+   */
+  implicit class Source(app: StreamApp) extends java.io.Serializable {
+
+    /** Reads from `dataSource` with an empty user config and no explicit description. */
+    def source[T](dataSource: DataSource, parallism: Int): Stream[T] = {
+      source(dataSource, parallism, UserConfig.empty)
+    }
+
+    /** Reads from `dataSource` with an empty user config. */
+    def source[T](dataSource: DataSource, parallism: Int, description: String): Stream[T] = {
+      source(dataSource, parallism, UserConfig.empty, description)
+    }
+
+    /** Reads from `dataSource` with no explicit description. */
+    def source[T](dataSource: DataSource, parallism: Int, conf: UserConfig): Stream[T] = {
+      source(dataSource, parallism, conf, description = null)
+    }
+
+    /**
+     * Reads from `dataSource`.
+     *
+     * @param dataSource source to read messages from
+     * @param parallism parallelism passed to the source operator
+     * @param conf user configuration passed to the source operator
+     * @param description description of the source operator; "source" is used when null
+     * @return a Stream positioned at the new source vertex
+     */
+    def source[T](dataSource: DataSource, parallism: Int, conf: UserConfig, description: String)
+      : Stream[T] = {
+      // The overloads without a description pass null. Fall back to "source", like the
+      // Task-class overload below, instead of storing null in the operator.
+      implicit val sourceOp = DataSourceOp(
+        dataSource, parallism, conf, Option(description).getOrElse("source"))
+      app.graph.addVertex(sourceOp)
+      new Stream[T](app.graph, sourceOp)
+    }
+
+    /** Reads the elements of `seq` through a CollectionDataSource. */
+    def source[T](seq: Seq[T], parallism: Int, description: String): Stream[T] = {
+      this.source(new CollectionDataSource[T](seq), parallism, UserConfig.empty, description)
+    }
+
+    /**
+     * Uses a low level Task as the source.
+     *
+     * @param source Task class that produces the stream
+     * @param parallism parallelism passed to the processor operator
+     * @param conf user configuration passed to the processor operator
+     * @param description description of the operator; "source" is used when null
+     * @return a Stream positioned at the new processor vertex
+     */
+    def source[T](source: Class[_ <: Task], parallism: Int, conf: UserConfig, description: String)
+      : Stream[T] = {
+      val sourceOp = ProcessorOp(source, parallism, conf, Option(description).getOrElse("source"))
+      app.graph.addVertex(sourceOp)
+      new Stream[T](app.graph, sourceOp)
+    }
+  }
+}
+
+/**
+ * A test message source that emits the elements of `seq` once, in iteration order.
+ *
+ * After the last element has been read, `read()` returns null.
+ */
+class CollectionDataSource[T](seq: Seq[T]) extends DataSource {
+  private lazy val iterator: Iterator[T] = seq.iterator
+
+  /** Nothing to set up. */
+  override def open(context: TaskContext, startTime: TimeStamp): Unit = {}
+
+  /** Wraps the next element in a Message, or returns null once the sequence is used up. */
+  override def read(): Message = {
+    if (!iterator.hasNext) null else Message(iterator.next())
+  }
+
+  /** Nothing to release. */
+  override def close(): Unit = {}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala
new file mode 100644
index 0000000..6eff20c
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.dsl.javaapi
+
+import scala.collection.JavaConverters._
+
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.dsl.Stream
+import org.apache.gearpump.streaming.javaapi.dsl.functions._
+import org.apache.gearpump.streaming.task.Task
+
+/**
+ * Java DSL
+ */
+class JavaStream[T](val stream: Stream[T]) {
+
+  /** Delegates to Stream.flatMap, converting the Java result of `fn` with asScala. */
+  def flatMap[R](fn: FlatMapFunction[T, R], description: String): JavaStream[R] = {
+    val flattened: Stream[R] = stream.flatMap((t: T) => fn.apply(t).asScala, description)
+    new JavaStream[R](flattened)
+  }
+
+  /** Delegates to Stream.map with `fn` as the mapping function. */
+  def map[R](fn: MapFunction[T, R], description: String): JavaStream[R] = {
+    val mapped: Stream[R] = stream.map((t: T) => fn.apply(t), description)
+    new JavaStream[R](mapped)
+  }
+
+  /** Only keeps the messages for which the FilterFunction returns true. */
+  def filter(fn: FilterFunction[T], description: String): JavaStream[T] = {
+    val filtered: Stream[T] = stream.filter((t: T) => fn.apply(t), description)
+    new JavaStream[T](filtered)
+  }
+
+  /** Aggregates the stream by delegating to Stream.reduce with `fn`. */
+  def reduce(fn: ReduceFunction[T], description: String): JavaStream[T] = {
+    val reduced: Stream[T] =
+      stream.reduce((left: T, right: T) => fn.apply(left, right), description)
+    new JavaStream[T](reduced)
+  }
+
+  /** Delegates to Stream.log. */
+  def log(): Unit = stream.log()
+
+  /** Merges this stream with another stream of the same type. */
+  def merge(other: JavaStream[T], description: String): JavaStream[T] = {
+    val merged: Stream[T] = stream.merge(other.stream, description)
+    new JavaStream[T](merged)
+  }
+
+  /**
+   * Groups the stream by the key computed by `fn`. Operations chained after groupBy apply
+   * to the resulting sub-streams.
+   */
+  def groupBy[Group](fn: GroupByFunction[T, Group], parallelism: Int, description: String)
+    : JavaStream[T] = {
+    val grouped: Stream[T] = stream.groupBy((t: T) => fn.apply(t), parallelism, description)
+    new JavaStream[T](grouped)
+  }
+
+  /** Adds a low level Processor (a Task class) to process messages. */
+  def process[R](
+      processor: Class[_ <: Task], parallelism: Int, conf: UserConfig, description: String)
+    : JavaStream[R] = {
+    // The explicit Stream[R] type keeps R fixed; it cannot be inferred from the arguments.
+    val processed: Stream[R] = stream.process(processor, parallelism, conf, description)
+    new JavaStream[R](processed)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStreamApp.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStreamApp.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStreamApp.scala
new file mode 100644
index 0000000..0d841be
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStreamApp.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.dsl.javaapi
+
+import java.util.Collection
+import scala.collection.JavaConverters._
+
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.cluster.client.ClientContext
+import org.apache.gearpump.streaming.dsl.{CollectionDataSource, StreamApp}
+import org.apache.gearpump.streaming.source.DataSource
+
+/** Java-facing wrapper around a StreamApp created from the given name, context and config. */
+class JavaStreamApp(name: String, context: ClientContext, userConfig: UserConfig) {
+
+  private val streamApp = StreamApp(name, context, userConfig)
+
+  /** Creates a stream from a Java collection by wrapping it in a CollectionDataSource. */
+  def source[T](collection: Collection[T], parallelism: Int,
+      conf: UserConfig, description: String): JavaStream[T] = {
+    val elements = collection.asScala.toSeq
+    source[T](new CollectionDataSource(elements), parallelism, conf, description)
+  }
+
+  /** Creates a stream that reads from `dataSource`. */
+  def source[T](dataSource: DataSource, parallelism: Int,
+      conf: UserConfig, description: String): JavaStream[T] = {
+    new JavaStream[T](streamApp.source[T](dataSource, parallelism, conf, description))
+  }
+
+  /** Submits the application through the client context. */
+  def run(): Unit = {
+    context.submit(streamApp)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/op/OP.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/op/OP.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/op/OP.scala
new file mode 100644
index 0000000..49d9dec
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/op/OP.scala
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.dsl.op
+
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.sink.DataSink
+import org.apache.gearpump.streaming.source.DataSource
+import org.apache.gearpump.streaming.task.Task
+
+/**
+ * Operators for the DSL
+ */
+sealed trait Op {
+  /** Description attached to this operator. */
+  def description: String
+
+  /** User configuration attached to this operator. */
+  def conf: UserConfig
+}
+
+/**
+ * When translated to a running DAG, a SlaveOp can be attached to a MasterOp or to another
+ * SlaveOp. "Attached" means running in the same Actor.
+ */
+trait SlaveOp[T] extends Op
+
+/** SlaveOp carrying a function that turns one input of type T into a collection of R. */
+case class FlatMapOp[T, R](
+    fun: (T) => TraversableOnce[R],
+    description: String,
+    conf: UserConfig = UserConfig.empty)
+  extends SlaveOp[T]
+
+/** SlaveOp carrying a function that combines two values of type T into one. */
+case class ReduceOp[T](
+    fun: (T, T) => T,
+    description: String,
+    conf: UserConfig = UserConfig.empty)
+  extends SlaveOp[T]
+
+/** An operator that SlaveOps can be attached to. */
+trait MasterOp extends Op
+
+/** A MasterOp that carries a type parameter. */
+trait ParameterizedOp[T] extends MasterOp
+
+/** MasterOp for merging; it carries only a description and a config. */
+case class MergeOp(
+    description: String,
+    override val conf: UserConfig = UserConfig.empty)
+  extends MasterOp
+
+/** MasterOp carrying the grouping function and the parallelism of a groupBy. */
+case class GroupByOp[T, R](
+    fun: T => R,
+    parallelism: Int,
+    description: String,
+    override val conf: UserConfig = UserConfig.empty)
+  extends ParameterizedOp[T]
+
+/** MasterOp carrying a low level Task class. */
+case class ProcessorOp[T <: Task](
+    processor: Class[T],
+    parallelism: Int,
+    conf: UserConfig,
+    description: String)
+  extends ParameterizedOp[T]
+
+/** MasterOp carrying a DataSource. */
+case class DataSourceOp[T](
+    dataSource: DataSource,
+    parallelism: Int,
+    conf: UserConfig,
+    description: String)
+  extends ParameterizedOp[T]
+
+/** MasterOp carrying a DataSink. */
+case class DataSinkOp[T](
+    dataSink: DataSink,
+    parallelism: Int,
+    conf: UserConfig,
+    description: String)
+  extends ParameterizedOp[T]
+
+/**
+ * Contains operators which can be chained to single one.
+ *
+ * For example, flatmap().map().reduce() can be chained to single operator as
+ * no data shuffling is required.
+ * @param ops list of operations
+ */
+case class OpChain(ops: List[Op]) extends Op {
+  /** First operator of the chain; fails on an empty chain. */
+  def head: Op = ops.head
+
+  /** Last operator of the chain; fails on an empty chain. */
+  def last: Op = ops.last
+
+  /** A chain has no description of its own. */
+  def description: String = null
+
+  /** Merges the configs of all chained operators, starting from an empty config. */
+  override def conf: UserConfig = {
+    // Folding from the right merges the last operator's conf first and the head's conf
+    // last: the head's conf has priority.
+    ops.foldRight(UserConfig.empty) { (op, merged) =>
+      merged.withConfig(op.conf)
+    }
+  }
+}
+
+/** Kind of edge that connects an upstream operator to a downstream operator. */
+trait OpEdge
+
+/**
+ * The upstream OP and downstream OP don't require network data shuffle.
+ *
+ * For example, map and flatmap operations don't require network shuffle, so we can use
+ * Direct to represent the relation with upstream operators.
+ */
+case object Direct extends OpEdge
+
+/**
+ * The upstream OP and downstream OP DO require network data shuffle.
+ *
+ * In contrast to Direct, which is used for operations such as map and flatmap that don't
+ * require network shuffle, Shuffle represents the relation with upstream operators when
+ * the data has to be shuffled over the network.
+ */
+case object Shuffle extends OpEdge
+

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupbyPartitioner.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupbyPartitioner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupbyPartitioner.scala
new file mode 100644
index 0000000..b2e2932
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupbyPartitioner.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.dsl.partitioner
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.partitioner.UnicastPartitioner
+
+/**
+ * Partition messages by applying group by function first.
+ *
+ * For example:
+ * {{{
+ * case class People(name: String, gender: String)
+ *
+ * object Test{
+ *
+ *   val groupBy: (People => String) = people => people.gender
+ *   val partitioner = new GroupByPartitioner(groupBy)
+ * }
+ * }}}
+ *
+ * @param groupBy First apply message with groupBy function, then pick the hashCode of the output
+ *   to do the partitioning. You must define hashCode() for output type of groupBy function.
+ */
+class GroupByPartitioner[T, GROUP](groupBy: T => GROUP = null) extends UnicastPartitioner {
+  override def getPartition(msg: Message, partitionNum: Int, currentPartitionId: Int): Int = {
+    // NOTE(review): groupBy defaults to null; a partitioner built without it fails here.
+    val group = groupBy(msg.msg.asInstanceOf[T])
+    // Clearing the sign bit keeps the partition index non-negative.
+    val nonNegativeHash = group.hashCode() & Int.MaxValue
+    nonNegativeHash % partitionNum
+  }
+}
\ No newline at end of file