You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2016/05/10 02:42:17 UTC
[07/49] incubator-gearpump git commit: fix GEARPUMP-118 change
package name to org.apache
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ExecutorManager.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ExecutorManager.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ExecutorManager.scala
new file mode 100644
index 0000000..79a65c4
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ExecutorManager.scala
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.appmaster
+
+import scala.concurrent.duration._
+import scala.util.{Failure, Try}
+
+import akka.actor.SupervisorStrategy.Stop
+import akka.actor._
+import akka.remote.RemoteScope
+import com.typesafe.config.Config
+import org.apache.commons.lang.exception.ExceptionUtils
+
+import org.apache.gearpump.cluster.AppMasterToWorker.ChangeExecutorResource
+import org.apache.gearpump.cluster.appmaster.ExecutorSystemScheduler.{ExecutorSystemJvmConfig, ExecutorSystemStarted, StartExecutorSystemTimeout, StartExecutorSystems}
+import org.apache.gearpump.cluster.appmaster.WorkerInfo
+import org.apache.gearpump.cluster.scheduler.{Resource, ResourceRequest}
+import org.apache.gearpump.cluster.worker.WorkerId
+import org.apache.gearpump.cluster.{AppJar, AppMasterContext, ExecutorContext, UserConfig}
+import org.apache.gearpump.streaming.ExecutorId
+import org.apache.gearpump.streaming.ExecutorToAppMaster.RegisterExecutor
+import org.apache.gearpump.streaming.appmaster.ExecutorManager._
+import org.apache.gearpump.streaming.executor.Executor
+import org.apache.gearpump.util.{LogUtil, Util}
+
+/**
+ * ExecutorManager manages the start and stop of all executors.
+ *
+ * ExecutorManager launches executors when asked. It hides details such as starting
+ * a new ExecutorSystem from the user. Please use ExecutorManager.props() to construct this actor.
+ */
+private[appmaster] class ExecutorManager(
+    userConfig: UserConfig,
+    appContext: AppMasterContext,
+    executorFactory: (ExecutorContext, UserConfig, Address, ExecutorId) => Props,
+    clusterConfig: Config,
+    appName: String)
+  extends Actor {
+
+  private val LOG = LogUtil.getLogger(getClass)
+
+  import appContext.{appId, masterProxy, username}
+
+  // Set by SetTaskManager; no other message is handled before that (see waitForTaskManager).
+  private var taskManager: ActorRef = null
+
+  private implicit val actorSystem = context.system
+  private val systemConfig = context.system.settings.config
+
+  // Known executors keyed by executor id. The `executor` ActorRef of an entry stays null
+  // until that executor has sent RegisterExecutor.
+  private var executors = Map.empty[Int, ExecutorInfo]
+
+  def receive: Receive = waitForTaskManager
+
+  /** Initial state: waits for the TaskManager reference before serving any request. */
+  def waitForTaskManager: Receive = {
+    case SetTaskManager(taskManager) =>
+      this.taskManager = taskManager
+      context.become(service orElse terminationWatch)
+  }
+
+  // If something goes wrong on an executor, ExecutorManager stops that executor
+  // and waits for the AppMaster to start a new one.
+  override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10,
+    withinTimeRange = 1.minute) {
+    case ex: Throwable =>
+      // Executor actors are named after their executor id (see ExecutorSystemStarted).
+      Try(sender.path.name.toInt) match {
+        case scala.util.Success(id) =>
+          executors -= id
+          LOG.error(s"Executor $id throws exception, stop it...\n" +
+            ExceptionUtils.getStackTrace(ex))
+        case Failure(_) =>
+          LOG.error(s"Sender ${sender.path} is dead, but seems it is not an executor...")
+      }
+      Stop
+  }
+
+  /** Main state: responds to requests from the AppMaster, the TaskManager and executors. */
+  def service: Receive = {
+    case StartExecutors(resources, jar) =>
+      masterProxy ! StartExecutorSystems(resources, getExecutorJvmConfig(Some(jar)))
+
+    case ExecutorSystemStarted(executorSystem, boundedJar) =>
+      import executorSystem.{address, executorSystemId, resource => executorResource, worker}
+
+      val executorId = executorSystemId
+      val executorContext = ExecutorContext(executorId, worker, appId, appName,
+        appMaster = context.parent, executorResource)
+      // The executor ActorRef is filled in once the executor sends RegisterExecutor.
+      executors += executorId -> ExecutorInfo(executorId, null, worker, boundedJar)
+
+      // Starts executor; the actor name is the executor id.
+      val executor = context.actorOf(executorFactory(executorContext, userConfig,
+        address, executorId), executorId.toString)
+      executorSystem.bindLifeCycleWith(executor)
+
+    case StartExecutorSystemTimeout =>
+      taskManager ! StartExecutorsTimeOut
+
+    case RegisterExecutor(executor, executorId, resource, worker) =>
+      executors.get(executorId) match {
+        case Some(executorInfo) =>
+          LOG.info(s"executor $executorId has been launched")
+          // Watches for executor termination
+          context.watch(executor)
+          executors += executorId -> executorInfo.copy(executor = executor)
+          taskManager ! ExecutorStarted(executorId, resource, worker.workerId,
+            executorInfo.boundedJar)
+        case None =>
+          // The entry may already be gone, e.g. removed by supervisorStrategy after the
+          // executor failed. Ignore the registration instead of throwing inside this actor.
+          LOG.warn(s"Received RegisterExecutor from unknown executor $executorId, ignore it")
+      }
+
+    // Broadcasts message to all executors
+    case BroadCast(msg) =>
+      LOG.info(s"Broadcast ${msg.getClass.getSimpleName} to all executors")
+      context.children.foreach(_ forward msg)
+
+    // Unicasts message to single executor
+    case UniCast(executorId, msg) =>
+      LOG.info(s"Unicast ${msg.getClass.getSimpleName} to executor $executorId")
+      // Unknown executors and executors that have not registered yet (null ActorRef)
+      // cannot receive the message; forwarding to null would throw.
+      executors.get(executorId).flatMap(info => Option(info.executor)) match {
+        case Some(executor) =>
+          executor forward msg
+        case None =>
+          LOG.warn(s"Executor $executorId is not available, drop ${msg.getClass.getSimpleName}")
+      }
+
+    case GetExecutorInfo =>
+      sender ! executors
+
+    // Tells Executor manager resources that are occupied. The Executor Manager can use this
+    // information to tell worker to reclaim un-used resources
+    case ExecutorResourceUsageSummary(resources) =>
+      executors.foreach { case (executorId, executor) =>
+        // Executors missing from the summary are reported as using no resource.
+        val resource = resources.getOrElse(executorId, Resource(0))
+        // Notifies the worker the actual resource used by this application.
+        executor.worker.ref ! ChangeExecutorResource(appId, executorId, resource)
+      }
+  }
+
+  /** Forgets executors whose (watched) actor terminated and notifies the TaskManager. */
+  def terminationWatch: Receive = {
+    case Terminated(actor) =>
+      Try(actor.path.name.toInt) match {
+        case scala.util.Success(id) =>
+          executors -= id
+          LOG.error(s"Executor $id is down")
+          taskManager ! ExecutorStopped(id)
+        case Failure(ex) =>
+          LOG.error(s"failed to get the executor Id from path string ${actor.path}", ex)
+      }
+  }
+
+  /**
+   * Builds the JVM config for new executor systems. The JVM settings are resolved from
+   * `clusterConfig`, falling back to this actor system's config.
+   */
+  private def getExecutorJvmConfig(jar: Option[AppJar]): ExecutorSystemJvmConfig = {
+    val executorAkkaConfig = clusterConfig
+    val jvmSetting = Util.resolveJvmSetting(executorAkkaConfig.withFallback(systemConfig)).executor
+
+    ExecutorSystemJvmConfig(jvmSetting.classPath, jvmSetting.vmargs, jar,
+      username, executorAkkaConfig)
+  }
+}
+
+private[appmaster] object ExecutorManager {
+
+  /** Asks ExecutorManager to start executor systems for `resources`, running `jar`. */
+  case class StartExecutors(resources: Array[ResourceRequest], jar: AppJar)
+
+  /** Forwards `msg` to every executor child of ExecutorManager. */
+  case class BroadCast(msg: Any)
+
+  /** Forwards `msg` to the executor with id `executorId`. */
+  case class UniCast(executorId: Int, msg: Any)
+
+  /** Query: ExecutorManager replies with its current map of executors. */
+  case object GetExecutorInfo
+
+  /** Sent to the TaskManager once an executor has registered. */
+  case class ExecutorStarted(
+      executorId: Int, resource: Resource, workerId: WorkerId, boundedJar: Option[AppJar])
+
+  /** Sent to the TaskManager when a watched executor terminates. */
+  case class ExecutorStopped(executorId: Int)
+
+  /** Provides the TaskManager; ExecutorManager handles nothing else before receiving it. */
+  case class SetTaskManager(taskManager: ActorRef)
+
+  /** Sent to the TaskManager when starting executor systems timed out. */
+  case object StartExecutorsTimeOut
+
+  /**
+   * Props for an ExecutorManager whose executors are deployed remotely, on the actor system
+   * at the address of their executor system.
+   */
+  def props(
+      userConfig: UserConfig, appContext: AppMasterContext, clusterConfig: Config, appName: String)
+    : Props = {
+    Props(new ExecutorManager(userConfig, appContext, remoteExecutorProps, clusterConfig, appName))
+  }
+
+  /** Props of an Executor deployed on the remote actor system at `address`. */
+  private def remoteExecutorProps(
+      executorContext: ExecutorContext,
+      executorConf: UserConfig,
+      address: Address,
+      executorId: ExecutorId): Props = {
+    val remoteDeploy = Deploy(scope = RemoteScope(address))
+    Props(classOf[Executor], executorContext, executorConf).withDeploy(remoteDeploy)
+  }
+
+  /** Resources actually used, per executor; unlisted executors are treated as using none. */
+  case class ExecutorResourceUsageSummary(resources: Map[ExecutorId, Resource])
+
+  /** Bookkeeping entry; `executor` is null until the executor has registered. */
+  case class ExecutorInfo(
+      executorId: ExecutorId, executor: ActorRef, worker: WorkerInfo, boundedJar: Option[AppJar])
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/JarScheduler.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/JarScheduler.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/JarScheduler.scala
new file mode 100644
index 0000000..6de5306
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/JarScheduler.scala
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.appmaster
+
+import akka.actor._
+import akka.pattern.ask
+import com.typesafe.config.Config
+import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.cluster.AppJar
+import org.apache.gearpump.cluster.scheduler.{Resource, ResourceRequest}
+import org.apache.gearpump.cluster.worker.WorkerId
+import org.apache.gearpump.partitioner.PartitionerDescription
+import org.apache.gearpump.streaming.appmaster.JarScheduler._
+import org.apache.gearpump.streaming.task.TaskId
+import org.apache.gearpump.streaming.{DAG, ProcessorDescription}
+import org.apache.gearpump.util.{Constants, Graph, LogUtil}
+
+import scala.concurrent.Future
+
+/**
+ * Different processors of the stream application can use different jars. JarScheduler
+ * schedules work across these jars.
+ *
+ * For a DAG of multiple processors, each processor can have its own jar. Tasks of the same jar
+ * are scheduled by one TaskScheduler, and the TaskSchedulers are coordinated by JarScheduler.
+ *
+ * At runtime, the implementation is delegated to the actor JarSchedulerImpl.
+ */
+class JarScheduler(appId: Int, appName: String, config: Config, factory: ActorRefFactory) {
+  private val LOG = LogUtil.getLogger(getClass)
+  private val actor: ActorRef = factory.actorOf(Props(new JarSchedulerImpl(appId, appName, config)))
+  private implicit val dispatcher = factory.dispatcher
+  private implicit val timeout = Constants.FUTURE_TIMEOUT
+
+  /**
+   * Sets the current DAG version active.
+   *
+   * The scheduler actor stashes other requests until the DAG is delivered, which only
+   * happens once `startClock` completes. A failed `startClock` is therefore logged, since
+   * the DAG would otherwise never arrive and nobody would notice.
+   */
+  def setDag(dag: DAG, startClock: Future[TimeStamp]): Unit = {
+    actor ! TransitToNewDag
+    startClock.foreach { start =>
+      actor ! NewDag(dag, start)
+    }
+    startClock.failed.foreach { ex =>
+      LOG.error(s"Failed to get start clock, DAG version ${dag.version} is not scheduled", ex)
+    }
+  }
+
+  /** AppMaster asks JarScheduler how many resources it wants, per jar. */
+  def getResourceRequestDetails(): Future[Array[ResourceRequestDetail]] = {
+    // mapTo fails the Future on an unexpected reply instead of an unchecked cast.
+    (actor ? GetResourceRequestDetails).mapTo[Array[ResourceRequestDetail]]
+  }
+
+  /**
+   * AppMaster has resource allocated, and asks the jar scheduler to schedule tasks
+   * for this executor.
+   *
+   * @return ids of the tasks to launch on the executor; empty when `appJar` is unknown
+   */
+  def scheduleTask(appJar: AppJar, workerId: WorkerId, executorId: Int, resource: Resource)
+    : Future[List[TaskId]] = {
+    (actor ? ScheduleTask(appJar, workerId, executorId, resource)).mapTo[List[TaskId]]
+  }
+
+  /**
+   * Some executor JVM process is dead. AppMaster asks the jar scheduler to re-schedule the
+   * impacted tasks.
+   *
+   * @return the resources needed to recover, or None if no scheduler had tasks on it
+   */
+  def executorFailed(executorId: Int): Future[Option[ResourceRequestDetail]] = {
+    (actor ? ExecutorFailed(executorId)).mapTo[Option[ResourceRequestDetail]]
+  }
+}
+
+object JarScheduler {
+
+  /** Resource requests of the tasks that use `jar`. */
+  case class ResourceRequestDetail(jar: AppJar, requests: Array[ResourceRequest])
+
+  /** The DAG to schedule and the start time used to filter out processors that already died. */
+  case class NewDag(dag: DAG, startTime: TimeStamp)
+
+  /** Announces a new DAG; requests are stashed until the matching NewDag arrives. */
+  case object TransitToNewDag
+
+  /** Asks for the resource requests of every jar. */
+  case object GetResourceRequestDetails
+
+  /**
+   * Schedule tasks for one appJar.
+   *
+   * @param appJar Application jar.
+   * @param workerId Worker machine Id.
+   * @param executorId Executor Id.
+   * @param resource Slots that are available.
+   */
+  case class ScheduleTask(appJar: AppJar, workerId: WorkerId, executorId: Int, resource: Resource)
+
+  /** Some executor JVM is dead, try to recover tasks that are located on failed executor */
+  case class ExecutorFailed(executorId: Int)
+
+  /**
+   * Log-friendly rendering of a detail. The case-class toString would print the `requests`
+   * array only as an identity hash.
+   */
+  private def renderDetail(detail: ResourceRequestDetail): String = {
+    s"ResourceRequestDetail(${detail.jar}, ${detail.requests.mkString("[", ",", "]")})"
+  }
+
+  class JarSchedulerImpl(appId: Int, appName: String, config: Config) extends Actor with Stash {
+
+    // Each TaskScheduler maps to a jar.
+    private var taskSchedulers = Map.empty[AppJar, TaskScheduler]
+
+    private val LOG = LogUtil.getLogger(getClass)
+
+    def receive: Receive = waitForNewDag
+
+    /**
+     * Waits for the next DAG. Every message other than TransitToNewDag / NewDag is stashed
+     * and replayed after the DAG has been applied.
+     */
+    def waitForNewDag: Receive = {
+      case TransitToNewDag => // Continue current state
+      case NewDag(dag, startTime) =>
+
+        LOG.info(s"Init JarScheduler, dag version: ${dag.version}, startTime: $startTime")
+
+        val processorsByJar = dag.processors.values.groupBy(_.jar)
+
+        taskSchedulers = processorsByJar.map { jarAndProcessors =>
+          val (jar, jarProcessors) = jarAndProcessors
+          // Construct the sub DAG, each sub DAG maps to a separate jar. Processors whose
+          // life ended at or before startTime are left out.
+          val subGraph = Graph.empty[ProcessorDescription, PartitionerDescription]
+          jarProcessors.foreach { processor =>
+            if (startTime < processor.life.death) {
+              subGraph.addVertex(processor)
+            }
+          }
+          val subDagForSingleJar = DAG(subGraph)
+
+          // Reuses the existing scheduler of this jar if there is one.
+          val taskScheduler = taskSchedulers
+            .getOrElse(jar, new TaskSchedulerImpl(appId, appName, config))
+
+          LOG.info("Set DAG for TaskScheduler, count: " + subDagForSingleJar.processors.size)
+          taskScheduler.setDAG(subDagForSingleJar)
+          jar -> taskScheduler
+        }
+        unstashAll()
+        context.become(ready)
+      case other =>
+        stash()
+    }
+
+    /** Serves scheduling requests for the current DAG. */
+    def ready: Receive = {
+      // Notifies there is a new DAG coming.
+      case TransitToNewDag =>
+        context.become(waitForNewDag)
+
+      case GetResourceRequestDetails =>
+        // Asks each TaskScheduler (each for one jar) for its resource requests.
+        val result: Array[ResourceRequestDetail] = taskSchedulers.map { jarAndScheduler =>
+          val (jar, scheduler) = jarAndScheduler
+          ResourceRequestDetail(jar, scheduler.getResourceRequests())
+        }.toArray
+        LOG.info("GetRequestDetails " + result.map(renderDetail).mkString(";"))
+        sender ! result
+
+      case ScheduleTask(appJar, workerId, executorId, resource) =>
+        // Unknown jars get no tasks.
+        val result: List[TaskId] = taskSchedulers.get(appJar).map { scheduler =>
+          scheduler.schedule(workerId, executorId, resource)
+        }.getOrElse(List.empty)
+        LOG.info("ScheduleTask " + result.mkString(";"))
+        sender ! result
+
+      case ExecutorFailed(executorId) =>
+        // Only the first scheduler that has tasks on the failed executor is asked to recover.
+        val result: Option[ResourceRequestDetail] = taskSchedulers.
+          find(_._2.scheduledTasks(executorId).nonEmpty).map { jarAndScheduler =>
+          ResourceRequestDetail(jarAndScheduler._1, jarAndScheduler._2.executorFailed(executorId))
+        }
+        LOG.info("ExecutorFailed " + result.map(renderDetail).mkString(";"))
+        sender ! result
+    }
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/StreamAppMasterSummary.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/StreamAppMasterSummary.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/StreamAppMasterSummary.scala
new file mode 100644
index 0000000..3d43ee7
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/StreamAppMasterSummary.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.appmaster
+
+import org.apache.gearpump._
+import org.apache.gearpump.cluster.AppMasterToMaster.AppMasterSummary
+import org.apache.gearpump.cluster.MasterToAppMaster.AppMasterStatus
+import org.apache.gearpump.cluster.{MasterToAppMaster, UserConfig}
+import org.apache.gearpump.streaming.appmaster.AppMaster.ExecutorBrief
+import org.apache.gearpump.streaming.{ExecutorId, LifeTime, ProcessorId}
+import org.apache.gearpump.util.Graph
+import org.apache.gearpump.util.HistoryMetricsService.HistoryMetricsConfig
+
+/**
+ * Stream application summary, used for REST API.
+ *
+ * Several fields default to `null` (`appName`, `actorPath`, `user`, `dag`, `executors`,
+ * `historyMetricsConfig`), so consumers should not assume they are always populated.
+ *
+ * @param dag processor graph with String edge labels (presumably rendered partitioners —
+ *            verify against the code that builds this summary)
+ * @param processorLevels hierarchy level of each processor
+ */
+case class StreamAppMasterSummary(
+ appType: String = "streaming",
+ appId: Int,
+ appName: String = null,
+ actorPath: String = null,
+ clock: TimeStamp = 0L,
+ status: AppMasterStatus = MasterToAppMaster.AppMasterActive,
+ startTime: TimeStamp = 0L,
+ uptime: TimeStamp = 0L,
+ user: String = null,
+ homeDirectory: String = "",
+ logFile: String = "",
+ dag: Graph[ProcessorId, String] = null,
+ executors: List[ExecutorBrief] = null,
+ processors: Map[ProcessorId, ProcessorSummary] = Map.empty[ProcessorId, ProcessorSummary],
+ // Hierarchy level for each processor
+ processorLevels: Map[ProcessorId, Int] = Map.empty[ProcessorId, Int],
+ historyMetricsConfig: HistoryMetricsConfig = null)
+ extends AppMasterSummary
+
+/** Wrapped task count; `ProcessorSummary.taskCount` maps each executor id to one of these. */
+case class TaskCount(count: Int)
+
+/**
+ * Summary of a single processor, as listed in `StreamAppMasterSummary.processors`.
+ *
+ * @param executors ids of the executors associated with this processor
+ *                  (presumably those running its tasks — verify)
+ * @param taskCount task count per executor id
+ */
+case class ProcessorSummary(
+ id: ProcessorId,
+ taskClass: String,
+ parallelism: Int,
+ description: String,
+ taskConf: UserConfig,
+ life: LifeTime,
+ executors: List[ExecutorId],
+ taskCount: Map[ExecutorId, TaskCount])
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskLocator.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskLocator.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskLocator.scala
new file mode 100644
index 0000000..87630b7
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskLocator.scala
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.appmaster
+
+import com.typesafe.config.{Config, ConfigFactory, ConfigRenderOptions, ConfigValueFactory}
+import org.apache.gearpump.cluster.worker.WorkerId
+import org.apache.gearpump.streaming.appmaster.TaskLocator.{Localities, Locality, NonLocality, WorkerLocality}
+import org.apache.gearpump.streaming.task.TaskId
+
+import scala.collection.JavaConverters._
+import scala.util.Try
+
+/**
+ * TaskLocator decides which machine a task should run on.
+ *
+ * Users can set [[org.apache.gearpump.streaming.Constants#GEARPUMP_STREAMING_LOCALITIES]]
+ * (under a sub-key named after the application) to control which worker each task runs on.
+ */
+class TaskLocator(appName: String, config: Config) {
+  // Placement loaded once at construction; tasks that are not listed have no preference.
+  private val taskLocalities: Map[TaskId, Locality] = loadTaskLocalities(config)
+
+  /** Finds where a task should run; tasks without configured locality get NonLocality. */
+  def locateTask(taskId: TaskId): Locality = {
+    taskLocalities.get(taskId).getOrElse(NonLocality)
+  }
+
+  /**
+   * Reads the localities configured for this application. Any non-fatal failure (missing
+   * path, malformed content) yields an empty map, i.e. no preference for any task.
+   */
+  private def loadTaskLocalities(config: Config): Map[TaskId, Locality] = {
+    import org.apache.gearpump.streaming.Constants.GEARPUMP_STREAMING_LOCALITIES
+    val loaded = Try {
+      val appPath = s"$GEARPUMP_STREAMING_LOCALITIES.$appName"
+      val rendered = config.getConfig(appPath).root().render(ConfigRenderOptions.concise)
+      val byWorker = Localities.fromJson(rendered).localities
+      val placements = for {
+        workerId <- byWorker.keySet
+        taskId <- byWorker(workerId)
+      } yield (taskId, WorkerLocality(workerId))
+      placements.toArray.toMap
+    }
+    loaded.getOrElse(Map.empty[TaskId, Locality])
+  }
+}
+
+object TaskLocator {
+
+  /** Placement preference of a task. */
+  trait Locality
+
+  /** Means we require the resource from the specific worker */
+  case class WorkerLocality(workerId: WorkerId) extends Locality
+
+  /** Means no preference on worker */
+  object NonLocality extends Locality
+
+  /** Localities settings. Mapping from workerId to list of taskId */
+  case class Localities(localities: Map[WorkerId, Array[TaskId]])
+
+  object Localities {
+    /** Text form of a task id: `task_<processorId>_<taskIndex>`. */
+    val pattern = "task_([0-9]+)_([0-9]+)".r
+
+    /**
+     * Parses `{"localities": {"<workerId>": "task_0_0,task_0_1", ...}}`.
+     *
+     * To avoid polluting the classpath, the JSON translation is done here instead of
+     * introducing JSON library dependencies directly.
+     *
+     * Task entries are trimmed and empty entries are skipped, so hand-written values such as
+     * "task_0_0, task_0_1" or "" are accepted. Any other entry that does not match
+     * [[pattern]] throws a MatchError.
+     */
+    def fromJson(json: String): Localities = {
+      val localities = ConfigFactory.parseString(json).getAnyRef("localities")
+        .asInstanceOf[java.util.Map[String, String]].asScala.map { case (worker, taskList) =>
+          val workerId: WorkerId = WorkerId.parse(worker)
+          val tasks = taskList.split(",").map(_.trim).filter(_.nonEmpty).map { task =>
+            val pattern(processorId, taskIndex) = task
+            TaskId(processorId.toInt, taskIndex.toInt)
+          }
+          (workerId, tasks)
+        }.toMap
+      Localities(localities)
+    }
+
+    /** Renders `localities` in the format accepted by [[fromJson]]. */
+    def toJson(localities: Localities): String = {
+      val map = localities.localities.toList.map { pair =>
+        (WorkerId.render(pair._1), pair._2.map(task =>
+          s"task_${task.processorId}_${task.index}").mkString(","))
+      }.toMap.asJava
+      ConfigFactory.empty().withValue("localities", ConfigValueFactory.fromAnyRef(map)).
+        root.render(ConfigRenderOptions.concise())
+    }
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskManager.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskManager.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskManager.scala
new file mode 100644
index 0000000..662418c
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskManager.scala
@@ -0,0 +1,496 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.appmaster
+
+import akka.actor._
+import akka.pattern.ask
+import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.cluster.MasterToAppMaster.ReplayFromTimestampWindowTrailingEdge
+import org.apache.gearpump.streaming.AppMasterToExecutor._
+import org.apache.gearpump.streaming.ExecutorToAppMaster.{MessageLoss, RegisterTask, UnRegisterTask}
+import org.apache.gearpump.streaming._
+import org.apache.gearpump.streaming.appmaster.AppMaster.{AllocateResourceTimeOut, LookupTaskActorRef, TaskActorRef}
+import org.apache.gearpump.streaming.appmaster.ClockService.{ChangeToNewDAG, ChangeToNewDAGSuccess}
+import org.apache.gearpump.streaming.appmaster.DagManager.{GetLatestDAG, GetTaskLaunchData, LatestDAG, NewDAGDeployed, TaskLaunchData, WatchChange}
+import org.apache.gearpump.streaming.appmaster.ExecutorManager.{ExecutorStarted, StartExecutorsTimeOut, _}
+import org.apache.gearpump.streaming.appmaster.TaskManager._
+import org.apache.gearpump.streaming.appmaster.TaskRegistry.{Accept, TaskLocation}
+import org.apache.gearpump.streaming.executor.Executor.RestartTasks
+import org.apache.gearpump.streaming.executor.ExecutorRestartPolicy
+import org.apache.gearpump.streaming.task._
+import org.apache.gearpump.streaming.util.ActorPathUtil
+import org.apache.gearpump.util.{Constants, LogUtil}
+import org.slf4j.Logger
+
+import scala.concurrent.Future
+import scala.concurrent.duration._
+
+/**
+ * TaskManager tracks the status of all tasks.
+ *
+ * It is a state machine with three states:
+ * 1. applicationReady
+ * 2. recovery
+ * 3. dynamicDag
+ *
+ * When in state applicationReady:
+ * 1. When there is message loss or a JVM crash, transit to state recovery.
+ * 2. When the user modifies the DAG, transit to dynamicDag.
+ *
+ * When in state recovery:
+ * 1. When all tasks have been recovered, transit to applicationReady.
+ *
+ * When in state dynamicDag:
+ * 1. When the dynamic DAG transition is complete, transit to applicationReady.
+ * 2. When there is message loss or a JVM crash, transit to state recovery.
+ */
+private[appmaster] class TaskManager(
+ appId: Int,
+ dagManager: ActorRef,
+ jarScheduler: JarScheduler,
+ executorManager: ActorRef,
+ clockService: ActorRef,
+ appMaster: ActorRef,
+ appName: String)
+ extends Actor {
+
+ private val LOG: Logger = LogUtil.getLogger(getClass, app = appId)
+ private val systemConfig = context.system.settings.config
+
+ private val ids = new SessionIdFactory()
+
+ // Restart budget per executor (maxNrOfRetries = 5 within 20 seconds); consulted through
+ // allowRestartExecutor when a MessageLoss arrives.
+ private val executorRestartPolicy = new ExecutorRestartPolicy(maxNrOfRetries = 5,
+ withinTimeRange = 20.seconds)
+
+ private implicit val timeout = Constants.FUTURE_TIMEOUT
+ private implicit val actorSystem = context.system
+
+ // ExecutionContext for the Future callbacks in this actor.
+ import context.dispatcher
+
+ // Constructor side effects: subscribe to DAG changes and hand this actor to ExecutorManager.
+ dagManager ! WatchChange(watcher = self)
+ executorManager ! SetTaskManager(self)
+
+ /** Asks the clock service for the start clock. */
+ private def getStartClock: Future[TimeStamp] = {
+ // NOTE(review): `mapTo[StartClock]` would fail the Future on an unexpected reply
+ // instead of relying on an unchecked cast.
+ (clockService ? GetStartClock).asInstanceOf[Future[StartClock]].map(_.clock)
+ }
+
+ // Start clock, first requested at construction; passed to jarScheduler.setDag when a newer
+ // DAG is deployed.
+ private var startClock: Future[TimeStamp] = getStartClock
+
+ // Initial state: applicationReady with DagReadyState.empty.
+ def receive: Receive = applicationReady(DagReadyState.empty)
+
+  /**
+   * Query handlers shared by the states of this actor: clock events are forwarded to the
+   * clock service; task lists and task ActorRef lookups are answered from `taskRegistry`.
+   */
+  private def onClientQuery(taskRegistry: TaskRegistry): Receive = {
+    case clock: ClockEvent =>
+      clockService forward clock
+    case GetTaskList =>
+      sender ! TaskList(taskRegistry.getTaskExecutorMap)
+    case LookupTaskActorRef(taskId) =>
+      // Capture the sender now; it must not be read later inside a Future callback.
+      val requestor = sender
+      taskRegistry.getExecutorId(taskId) match {
+        case Some(executorId) =>
+          val taskPath = ActorPathUtil.taskActorPath(appMaster, executorId, taskId)
+          val lookup = context.actorSelection(taskPath).resolveOne(3.seconds)
+          lookup.foreach(taskActorRef => requestor ! TaskActorRef(taskActorRef))
+          // Without a reply the requestor can only time out, so make the failure visible.
+          lookup.failed.foreach { ex =>
+            LOG.warn(s"Failed to resolve task $taskId at $taskPath", ex)
+          }
+        case None =>
+          LOG.warn(s"Cannot find the executor of task $taskId, ignore the lookup")
+      }
+  }
+
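+ /**
+ * State applicationReady: the current DAG version has been deployed. See the class
+ * documentation for the transitions out of this state.
+ */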
+ def applicationReady(state: DagReadyState): Receive = {
+ // These sends run every time this state is (re-)entered: report the used resources
+ // (presumably an ExecutorResourceUsageSummary — verify TaskRegistry.usedResource),
+ // confirm the deployed DAG version, and ask DagManager for the latest DAG, which is
+ // handled by onNewDag below.
+ executorManager ! state.taskRegistry.usedResource
+ dagManager ! NewDAGDeployed(state.dag.version)
+ dagManager ! GetLatestDAG
+ LOG.info(s"goto state ApplicationReady(dag = ${state.dag.version})...")
+
+ // Fallback used when this state has to recover: same DAG version, all tasks of the DAG
+ // expected again, dead tasks carried over.
+ val recoverRegistry = new TaskRegistry(expectedTasks = state.dag.tasks,
+ deadTasks = state.taskRegistry.deadTasks)
+
+ val recoverState = new StartDagState(state.dag, recoverRegistry)
+
+ val onError: Receive = {
+ case executorStopped@ExecutorStopped(executorId) =>
+ // Only executors that host registered tasks trigger recovery; others are ignored.
+ if (state.taskRegistry.isTaskRegisteredForExecutor(executorId)) {
+ // Re-sent to self so that the recovery state also handles this ExecutorStopped.
+ self ! executorStopped
+ context.become(recovery(recoverState))
+ }
+ case MessageLoss(executorId, taskId, cause) =>
+ // Recover while the executor hosts registered tasks and is within its restart budget;
+ // otherwise report the application as unrecoverable.
+ // NOTE(review): a MessageLoss from an executor without registered tasks also ends up in
+ // FailedToRecover — confirm that this is intended.
+ if (state.taskRegistry.isTaskRegisteredForExecutor(executorId) &&
+ executorRestartPolicy.allowRestartExecutor(executorId)) {
+ context.become(recovery(recoverState))
+ } else {
+ val errorMsg = s"Task $taskId fails too many times to recover"
+ appMaster ! FailedToRecover(errorMsg)
+ }
+ case replay: ReplayFromTimestampWindowTrailingEdge =>
+ LOG.error(s"Received $replay")
+ context.become(recovery(recoverState))
+ }
+
+ val onNewDag: Receive = {
+ case LatestDAG(newDag) =>
+
+ // Only a strictly newer DAG version starts a dynamic DAG transition.
+ if (newDag.version > state.dag.version) {
+
+ executorManager ! BroadCast(StartDynamicDag(newDag.version))
+ LOG.info("Broadcasting StartDynamicDag")
+
+ val dagDiff = migrate(state.dag, newDag)
+ jarScheduler.setDag(newDag, startClock)
+ // Asynchronously start executors for every jar that requests some non-empty resource.
+ // (`exists` already implies a non-empty array; the length check is redundant.)
+ val resourceRequestsDetails = jarScheduler.getResourceRequestDetails()
+ resourceRequestsDetails.map { details =>
+ details.foreach { detail =>
+ if (detail.requests.length > 0 && detail.requests.exists(!_.resource.isEmpty)) {
+ executorManager ! StartExecutors(detail.requests, detail.jar)
+ }
+ }
+ }
+
+ // Tasks of modified processors and of their impacted upstream processors are changed
+ // on the executors they currently run on; collect them to track the changes.
+ var modifiedTasks = List.empty[TaskId]
+ for (processorId <- dagDiff.modifiedProcessors ++ dagDiff.impactedUpstream) {
+ val executors = state.taskRegistry.processorExecutors(processorId)
+ executors.foreach { pair =>
+ val (executorId, tasks) = pair
+ modifiedTasks ++= tasks
+ dagManager ! GetTaskLaunchData(newDag.version, processorId,
+ ChangeTasksOnExecutor(executorId, tasks))
+ }
+ }
+
+ val taskChangeRegistry = new TaskChangeRegistry(modifiedTasks)
+
+ // Tasks already registered (and dead) carry over into the registry of the new DAG.
+ val deadTasks = state.taskRegistry.deadTasks
+ val registeredTasks = state.taskRegistry.registeredTasks
+ val dynamicTaskRegistry = new TaskRegistry(newDag.tasks, registeredTasks, deadTasks)
+
+ // recoverState still refers to the current (old) DAG version.
+ val nextState = new StartDagState(newDag, dynamicTaskRegistry, taskChangeRegistry)
+ context.become(dynamicDag(nextState, recoverState))
+ }
+ }
+
+ val onUnRegisterTask: Receive = {
+ case unRegister: UnRegisterTask =>
+
+ LOG.info(s"Received $unRegister, stop task ${unRegister.taskId}")
+ sender ! StopTask(unRegister.taskId)
+
+ val taskId = unRegister.taskId
+ val registry = state.taskRegistry
+ val deadTasks = registry.deadTasks
+
+ // Move the task from registered to dead.
+ val newRegistry = registry.copy(registeredTasks = registry.registeredTasks - taskId,
+ deadTasks = deadTasks + taskId)
+
+ // Re-entering applicationReady also re-runs the sends at the top of this method.
+ val newState = new DagReadyState(state.dag, newRegistry)
+ context.become(applicationReady(newState))
+ }
+
+ // Handler order: client queries, errors (recovery to the same version), new DAGs, task
+ // unregistration; anything else falls through to unHandled.
+ onClientQuery(state.taskRegistry) orElse onError orElse onNewDag orElse
+ onUnRegisterTask orElse unHandled("applicationReady")
+ }
+
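+ /**
+ * State dynamicDag: transition to the newer DAG version in `state`. `recoverState` is used
+ * when an executor stops or messages are lost during the transition.
+ */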
+  def dynamicDag(state: StartDagState, recoverState: StartDagState): Receive = {
+    LOG.info(s"DynamicDag transit to dag version: ${state.dag.version}...")
+
+    // Any executor stop, or a MessageLoss within the restart budget, abandons the transition
+    // and switches to recovery with `recoverState`.
+    val onMessageLoss: Receive = {
+      case ExecutorStopped(_) =>
+        context.become(recovery(recoverState))
+      case MessageLoss(executorId, taskId, _) =>
+        // Short-circuit: the restart budget is only consulted for executors with tasks.
+        val recoverable = state.taskRegistry.isTaskRegisteredForExecutor(executorId) &&
+          executorRestartPolicy.allowRestartExecutor(executorId)
+        if (recoverable) {
+          context.become(recovery(recoverState))
+        } else {
+          appMaster ! FailedToRecover(s"Task $taskId fails too many times to recover")
+        }
+    }
+
+    onMessageLoss
+      .orElse(onClientQuery(state.taskRegistry))
+      .orElse(startDag(state, recoverState))
+      .orElse(unHandled("dynamicDag"))
+  }
+
/**
 * Shared handler for bringing a DAG version up. It is used by both `dynamicDag` and
 * `recovery`.
 *
 *  - `ExecutorStarted`: asks the jar scheduler which tasks go onto the new executor, then
 *    asks the DAG manager for launch data per processor.
 *  - `TaskLaunchData`: launches (or changes) the tasks on the target executor.
 *  - `RegisterTask` / `TaskChanged` / `TasksChanged`: track progress. Once every expected
 *    task is registered and every changed task has reported, task locations are broadcast.
 *  - `TaskLocationsReceived`: once all tasks are ready and every executor has acknowledged
 *    the locations, asks the clock service to switch to `state.dag`.
 *  - `ChangeToNewDAGSuccess`: if everything is still ready, starts all tasks and enters
 *    `applicationReady`.
 *  - `TaskLocationsRejected`: falls back to `recovery(recoverState)`.
 *
 * @param state start state of the DAG version being brought up
 * @param recoverState state handed to `recovery` if an executor rejects the locations
 */
private def startDag(state: StartDagState, recoverState: StartDagState): Receive = {
  case executor: ExecutorStarted =>
    import executor.{boundedJar, executorId, resource, workerId}
    // NOTE(review): `boundedJar.get` assumes every ExecutorStarted carries a jar - confirm.
    val taskIdsFuture = jarScheduler.scheduleTask(boundedJar.get, workerId, executorId, resource)
    taskIdsFuture.foreach { taskIds =>
      LOG.info(s"Executor $executor has been started, " +
        s"start to schedule tasks: ${taskIds.mkString(",")}")
      taskIds.groupBy(_.processorId).foreach { pair =>
        val (processorId, tasks) = pair
        dagManager ! GetTaskLaunchData(state.dag.version, processorId,
          StartTasksOnExecutor(executor.executorId, tasks))
      }
    }

  case StartExecutorsTimeOut =>
    appMaster ! AllocateResourceTimeOut

  case TaskLaunchData(processorDescription, subscribers, command) =>
    command match {
      case StartTasksOnExecutor(executorId, tasks) =>
        LOG.info(s"Start tasks on Executor($executorId), tasks: " + tasks)
        val launchTasks = LaunchTasks(tasks, state.dag.version, processorDescription, subscribers)
        executorManager ! UniCast(executorId, launchTasks)
        tasks.foreach(executorRestartPolicy.addTaskToExecutor(executorId, _))

      case ChangeTasksOnExecutor(executorId, tasks) =>
        LOG.info("change Task on executor: " + executorId + ", tasks: " + tasks)
        val changeTasks = ChangeTasks(tasks, state.dag.version, processorDescription.life,
          subscribers)
        executorManager ! UniCast(executorId, changeTasks)

      case other =>
        // This actor only sends GetTaskLaunchData with StartTasksOnExecutor or
        // ChangeTasksOnExecutor commands (presumably echoed back by the DAG manager).
        LOG.error(s"severe error! we expect StartTasksOnExecutor or ChangeTasksOnExecutor " +
          s"but get ${other.getClass.toString}")
    }

  case TasksLaunched =>
    // Launched tasks are tracked through their subsequent RegisterTask messages.

  case TasksChanged(tasks) =>
    tasks.foreach(task => state.taskChangeRegistry.taskChanged(task))
    if (allTasksReady(state)) {
      broadcastLocations(state)
    }

  case RegisterTask(taskId, executorId, host) =>
    // Capture the sender now: `startClock.foreach` may run its callback after this
    // message has been processed (if startClock is a Future).
    val client = sender
    val status = state.taskRegistry.registerTask(taskId, TaskLocation(executorId, host))
    if (status == Accept) {
      LOG.info(s"RegisterTask($taskId) TaskLocation: $host, Executor: $executorId")
      val sessionId = ids.newSessionId
      startClock.foreach(clock => client ! TaskRegistered(taskId, sessionId, clock))
      if (allTasksReady(state)) {
        broadcastLocations(state)
      }
    } else {
      client ! TaskRejected(taskId)
    }

  case TaskChanged(taskId, _) =>
    state.taskChangeRegistry.taskChanged(taskId)
    if (allTasksReady(state)) {
      broadcastLocations(state)
    }

  case locationReceived: TaskLocationsReceived =>
    state.executorReadyRegistry.registerExecutor(locationReceived.executorId)
    if (allTasksReady(state) &&
      state.executorReadyRegistry.allRegistered(state.taskRegistry.executors)) {
      LOG.info("All executors are ready to start...")
      clockService ! ChangeToNewDAG(state.dag)
    }

  case locationRejected: TaskLocationsRejected =>
    LOG.error(s"received $locationRejected, start to recover")
    context.become(recovery(recoverState))

  case ChangeToNewDAGSuccess(_) =>
    if (allTasksReady(state) &&
      state.executorReadyRegistry.allRegistered(state.taskRegistry.executors)) {
      executorManager ! BroadCast(StartAllTasks(state.dag.version))
      context.become(applicationReady(new DagReadyState(state.dag, state.taskRegistry)))
    }
}
+
/**
 * Handles `ExecutorStopped` (used by `recovery`).
 *
 * If the restart policy still allows restarting the executor, the jar scheduler is asked
 * for the resources its tasks need. When it returns a request, replacement executors are
 * started with it. Otherwise `FailedToRecover` is reported to the AppMaster.
 */
def onExecutorError: Receive = {
  case ExecutorStopped(executorId) =>
    if (executorRestartPolicy.allowRestartExecutor(executorId)) {
      jarScheduler.executorFailed(executorId).foreach { resourceRequestDetail =>
        // The scheduler may answer with no request (None); then nothing is restarted.
        resourceRequestDetail.foreach { detail =>
          executorManager ! StartExecutors(detail.requests, detail.jar)
        }
      }
    } else {
      appMaster ! FailedToRecover("Executor restarted too many times to recover")
    }
}
+
/**
 * True when every expected, non-dead task has registered and every task whose change
 * was requested has reported back.
 */
private def allTasksReady(state: StartDagState): Boolean = {
  val everyoneRegistered = state.taskRegistry.isAllTasksRegistered
  everyoneRegistered && state.taskChangeRegistry.allTaskChanged
}
+
/** Broadcasts the current task locations, tagged with this DAG version, to every executor. */
private def broadcastLocations(state: StartDagState): Unit = {
  LOG.info("All tasks have been launched; send Task locations to all executors")
  val ready = TaskLocationsReady(state.taskRegistry.getTaskLocations, state.dag.version)
  executorManager ! BroadCast(ready)
}
+
/**
 * State recovery: restarts the tasks of `state.dag`.
 *
 * On entry it does three things:
 *  - broadcasts `RestartTasks` for the DAG version;
 *  - takes a fresh start clock;
 *  - re-seeds the jar scheduler with the DAG and that clock.
 *
 * An empty DAG goes straight to `applicationReady`. Otherwise tasks are brought up again
 * through [[startDag]]. A fresh start state is prepared for any further recovery: the same
 * expected tasks, the known-dead tasks, and nothing registered yet. Executor failures go
 * to [[onExecutorError]], and clock events are dropped meanwhile.
 */
def recovery(state: StartDagState): Receive = {
  val version = state.dag.version
  executorManager ! BroadCast(RestartTasks(version))

  // Use a new start clock so that we resume at the point where we failed.
  startClock = getStartClock
  jarScheduler.setDag(state.dag, startClock)

  LOG.info(s"goto state Recovery(recoverDag = $version)...")

  if (state.dag.isEmpty) {
    applicationReady(new DagReadyState(state.dag, state.taskRegistry))
  } else {
    val ignoreClock: Receive = {
      case _: ClockEvent => // Clock events are ignored while recovering.
    }

    val freshRegistry = new TaskRegistry(expectedTasks = state.dag.tasks,
      deadTasks = state.taskRegistry.deadTasks)
    val nextRecoverState = new StartDagState(state.dag, freshRegistry)

    ignoreClock orElse startDag(state, nextRecoverState) orElse onExecutorError orElse
      unHandled("recovery")
  }
}
+
/** Catch-all handler: logs any message that the named `state` does not handle. */
private def unHandled(state: String): Receive = {
  case msg =>
    LOG.info(s"Received unknown message $msg in state $state")
}
+}
+
private[appmaster] object TaskManager {

  /**
   * State used once a DAG is deployed and the application is ready.
   *
   * @param dag the deployed DAG
   * @param taskRegistry registration info for the DAG's tasks
   */
  class DagReadyState(val dag: DAG, val taskRegistry: TaskRegistry)

  object DagReadyState {
    /** Placeholder ready state: an empty DAG at version -1 with no expected tasks. */
    def empty: DagReadyState = {
      val placeholderDag = DAG.empty.copy(version = -1)
      val noTasks = new TaskRegistry(List.empty[TaskId])
      new DagReadyState(placeholderDag, noTasks)
    }
  }

  /**
   * State used while a DAG is being started: at boot-up, during a dynamic DAG change, or
   * during recovery.
   *
   * @param dag the DAG being started
   * @param taskRegistry tracks registration of the DAG's tasks
   * @param taskChangeRegistry tracks which modified tasks have reported their change
   *                           (no target tasks by default)
   * @param executorReadyRegistry tracks which executors have acknowledged task locations
   */
  class StartDagState(
      val dag: DAG,
      val taskRegistry: TaskRegistry,
      val taskChangeRegistry: TaskChangeRegistry = TaskChangeRegistry.empty,
      val executorReadyRegistry: ExecutorRegistry = new ExecutorRegistry)

  /** Query for the current task list. */
  case object GetTaskList

  /** Maps each task to the executor it runs on. */
  case class TaskList(tasks: Map[TaskId, ExecutorId])

  /** Signals that the application could not be recovered, with the reason. */
  case class FailedToRecover(errorMsg: String)

  /** Starts the new tasks `tasks` on executor `executorId`. */
  case class StartTasksOnExecutor(executorId: Int, tasks: List[TaskId])

  /** Changes the existing tasks `tasks` on executor `executorId`. */
  case class ChangeTasksOnExecutor(executorId: Int, tasks: List[TaskId])

  /** Tracks which newly started executors have registered. */
  class ExecutorRegistry {
    private var seen = Set.empty[ExecutorId]

    /** Records that `executorId` has registered. */
    def registerExecutor(executorId: ExecutorId): Unit = {
      seen = seen + executorId
    }

    /** True when every executor in `all` has registered (vacuously true when `all` is empty). */
    def allRegistered(all: List[ExecutorId]): Boolean = !all.exists(id => !seen.contains(id))
  }

  /** Tracks which of `targetTasks` have reported that they changed. */
  class TaskChangeRegistry(targetTasks: List[TaskId]) {
    private var changed = Set.empty[TaskId]

    /** Records that `taskId` has changed. */
    def taskChanged(taskId: TaskId): Unit = {
      changed = changed + taskId
    }

    /** True once every target task has reported (vacuously true when there are none). */
    def allTaskChanged: Boolean = !targetTasks.exists(id => !changed.contains(id))
  }

  object TaskChangeRegistry {
    /** A registry with no target tasks, so `allTaskChanged` is always true. */
    def empty: TaskChangeRegistry = new TaskChangeRegistry(List.empty[TaskId])
  }

  /**
   * Processors affected by a dynamic DAG change.
   *
   * @param addedProcessors processors present only in the new DAG
   * @param modifiedProcessors processors in both DAGs whose definition differs
   * @param impactedUpstream processors in the new DAG that feed an added or modified
   *                         processor but are neither added nor modified themselves
   */
  case class DAGDiff(
      addedProcessors: List[ProcessorId],
      modifiedProcessors: List[ProcessorId],
      impactedUpstream: List[ProcessorId])

  /**
   * Compares the old DAG `leftDAG` with the new DAG `rightDAG`.
   *
   * Reports the processors that were added or modified, plus those directly upstream of
   * them in the new DAG. Processors that exist only in `leftDAG` are not reported.
   */
  def migrate(leftDAG: DAG, rightDAG: DAG): DAGDiff = {
    val before = leftDAG.processors.keySet
    val after = rightDAG.processors.keySet

    val added = after -- before
    // Processors present in both DAGs.
    val shared = after -- added

    val modified = shared.filterNot { id =>
      leftDAG.processors(id) == rightDAG.processors(id)
    }

    // Direct predecessors (in the new DAG) of `targets`, excluding `targets` themselves.
    def upstreamOf(targets: Set[ProcessorId]): Set[ProcessorId] = {
      val feeders = targets.flatMap { id =>
        rightDAG.graph.incomingEdgesOf(id).map(_._1).toSet
      }
      feeders -- targets
    }

    // Upstream processors are affected too and must be handled by the caller.
    val impacted = upstreamOf(added ++ modified)
    DAGDiff(added.toList, modified.toList, impacted.toList)
  }

  /** Assigns each new task a unique session id: 1, 2, 3, ... */
  class SessionIdFactory {
    private var lastIssued = 0

    /** Returns the next session id; the first call returns 1. */
    final def newSessionId: Int = {
      lastIssued += 1
      lastIssued
    }
  }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskRegistry.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskRegistry.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskRegistry.scala
new file mode 100644
index 0000000..b910d57
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskRegistry.scala
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.appmaster
+
+import org.apache.gearpump.cluster.scheduler.Resource
+import org.apache.gearpump.streaming.appmaster.ExecutorManager.ExecutorResourceUsageSummary
+import org.apache.gearpump.streaming.appmaster.TaskRegistry._
+import org.apache.gearpump.streaming.task.TaskId
+import org.apache.gearpump.streaming.{ExecutorId, ProcessorId}
+import org.apache.gearpump.transport.HostPort
+import org.apache.gearpump.util.LogUtil
+import org.slf4j.Logger
+
/**
 * Tracks the registration of all tasks while the application is booting up or recovering.
 *
 * @param expectedTasks tasks the DAG is expected to run
 * @param registeredTasks tasks registered so far, with their location
 * @param deadTasks tasks known to be gone; they are not required to register
 */
class TaskRegistry(val expectedTasks: List[TaskId],
    var registeredTasks: Map[TaskId, TaskLocation] = Map.empty[TaskId, TaskLocation],
    var deadTasks: Set[TaskId] = Set.empty[TaskId]) {

  private val LOG: Logger = LogUtil.getLogger(getClass)

  // Processors that appear among the expected tasks.
  private val processors = expectedTasks.map(_.processorId).toSet

  /**
   * When a task is booted, it calls registerTask to register itself. The task is
   * accepted if its processor is one of the expected processors, otherwise it is rejected.
   *
   * @param taskId Task that registers itself to TaskRegistry.
   * @param location The host and port where this task is running. NOTE: The host and port
   *                 are NOT those of Akka remoting. Instead, they are the host and port
   *                 of the custom netty layer, see [[org.apache.gearpump.transport.netty.Context]].
   * @return [[Accept]] if the task was recorded, [[Reject]] otherwise
   */
  def registerTask(taskId: TaskId, location: TaskLocation): RegisterTaskStatus = {
    val processorId = taskId.processorId

    if (processors.contains(processorId)) {
      registeredTasks += taskId -> location
      Accept
    } else {
      LOG.error(s" the task is not accepted for registration, taskId: ${taskId}")
      Reject
    }
  }

  /** Returns a new registry; by default it has the same contents as this one. */
  def copy(expectedTasks: List[TaskId] = this.expectedTasks,
      registeredTasks: Map[TaskId, TaskLocation] = this.registeredTasks,
      deadTasks: Set[TaskId] = this.deadTasks): TaskRegistry = {
    new TaskRegistry(expectedTasks, registeredTasks, deadTasks)
  }

  /** Registered tasks grouped by the host/port they run on. */
  def getTaskLocations: TaskLocations = {
    val taskLocations = registeredTasks.toList.groupBy(_._2.host).map { pair =>
      val (k, v) = pair
      val taskIds = v.map(_._1)
      (k, taskIds.toSet)
    }
    TaskLocations(taskLocations)
  }

  /** Maps every registered task to the executor it is running on. */
  def getTaskExecutorMap: Map[TaskId, ExecutorId] = {
    registeredTasks.map { case (taskId, location) => taskId -> location.executorId }
  }

  /** Queries the executor Id where the task is running. */
  def getExecutorId(taskId: TaskId): Option[Int] = {
    registeredTasks.get(taskId).map(_.executorId)
  }

  /** Ids of the executors hosting at least one registered task, without duplicates. */
  def executors: List[ExecutorId] = {
    // Several tasks usually share one executor; report each executor once.
    registeredTasks.toList.map(_._2.executorId).distinct
  }

  /** True when every expected task that is not dead has registered. */
  def isAllTasksRegistered: Boolean = {
    val aliveTasks = (expectedTasks.toSet -- deadTasks)
    aliveTasks.forall(task => registeredTasks.contains(task))
  }

  /** True when at least one registered task runs on `executorId`. */
  def isTaskRegisteredForExecutor(executorId: ExecutorId): Boolean = {
    registeredTasks.exists(_._2.executorId == executorId)
  }

  private def filterTasks(processorId: ProcessorId): List[TaskId] = {
    registeredTasks.keys.toList.filter(_.processorId == processorId)
  }

  /** Executors running registered tasks of `processorId`, each mapped to those tasks. */
  def processorExecutors(processorId: ProcessorId): Map[ExecutorId, List[TaskId]] = {
    val taskToExecutor = filterTasks(processorId).flatMap { taskId =>
      getExecutorId(taskId).map { executorId =>
        (taskId, executorId)
      }
    }

    val executorToTasks = taskToExecutor.groupBy(_._2).map { kv =>
      val (k, v) = kv
      (k, v.map(_._1))
    }
    executorToTasks
  }

  /** Resources in use per executor, counting one slot per registered task. */
  def usedResource: ExecutorResourceUsageSummary = {
    val resourceMap = registeredTasks.foldLeft(Map.empty[ExecutorId, Resource]) { (map, task) =>
      val resource = map.getOrElse(task._2.executorId, Resource(0)) + Resource(1)
      map + (task._2.executorId -> resource)
    }
    ExecutorResourceUsageSummary(resourceMap)
  }
}
+
/** Result and location types used by [[TaskRegistry]]. */
object TaskRegistry {

  /**
   * Where a registered task runs: its executor id, and the host/port of the custom
   * netty transport layer (not Akka remoting).
   */
  case class TaskLocation(executorId: Int, host: HostPort)

  /** Registered tasks grouped by the host/port they run on. */
  case class TaskLocations(locations: Map[HostPort, Set[TaskId]])

  /** Outcome of `TaskRegistry.registerTask`. */
  sealed trait RegisterTaskStatus

  /** The task's processor is expected; the task was recorded. */
  case object Accept extends RegisterTaskStatus

  /** The task's processor is not expected; the task was not recorded. */
  case object Reject extends RegisterTaskStatus
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskSchedulerImpl.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskSchedulerImpl.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskSchedulerImpl.scala
new file mode 100644
index 0000000..df4490c
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskSchedulerImpl.scala
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.appmaster
+
+import com.typesafe.config.Config
+import org.apache.gearpump.cluster.scheduler.{Resource, ResourceRequest}
+import org.apache.gearpump.cluster.worker.WorkerId
+import org.apache.gearpump.streaming.DAG
+import org.apache.gearpump.streaming.appmaster.TaskLocator.{Locality, WorkerLocality}
+import org.apache.gearpump.streaming.appmaster.TaskScheduler.{Location, TaskStatus}
+import org.apache.gearpump.streaming.task.TaskId
+import org.apache.gearpump.util.Constants
+
/**
 * Decides which tasks run on newly allocated resources.
 *
 * A TaskScheduler only handles tasks that share one jar. Scheduling across several jars is
 * done by [[org.apache.gearpump.streaming.appmaster.JarScheduler]].
 */
trait TaskScheduler {

  /**
   * Tells the scheduler which task DAG it should schedule.
   *
   * @param dag the task DAG
   */
  def setDAG(dag: DAG): Unit

  /** Returns the resource requirements of every task that has not been scheduled yet. */
  def getResourceRequests(): Array[ResourceRequest]

  /**
   * Tells the scheduler that a resource slot on `workerId` / `executorId` was allocated,
   * and asks which tasks should use it. Task locality should be considered when deciding
   * whether to place a task on that worker and executor.
   *
   * @param workerId the worker the resource is located on
   * @param executorId the executor the resource belongs to
   * @param resource the allocated resource
   * @return the tasks assigned to this resource
   */
  def schedule(workerId: WorkerId, executorId: Int, resource: Resource): List[TaskId]

  /**
   * Tells the scheduler that `executorId` failed, and asks for the resource requests
   * needed to run that executor's tasks again.
   *
   * @param executorId the executor that failed
   * @return resource requests for the failed executor's tasks
   */
  def executorFailed(executorId: Int): Array[ResourceRequest]

  /**
   * Returns the tasks already scheduled on `executorId`.
   *
   * @param executorId the executor to query
   * @return the scheduled tasks
   */
  def scheduledTasks(executorId: Int): List[TaskId]
}
+
/** Types shared by [[TaskScheduler]] implementations. */
object TaskScheduler {

  /** A resource slot, identified by the worker and the executor it lives on. */
  case class Location(workerId: WorkerId, executorId: Int)

  /**
   * Scheduling state of one task.
   *
   * @param taskId the task
   * @param preferLocality where the task would like to run
   * @param allocation where the task has been placed; `TaskSchedulerImpl` uses `null` to
   *                   mean "not scheduled yet"
   */
  class TaskStatus(val taskId: TaskId, val preferLocality: Locality, var allocation: Location)
}
+
/**
 * Default [[TaskScheduler]].
 *
 * Tasks are tracked in task-index order. For each allocated slot, tasks that prefer that
 * worker are placed first, then any remaining unscheduled tasks.
 *
 * @param appId application id (not used by this class)
 * @param appName application name, passed to the [[TaskLocator]]
 * @param config application config; supplies the executor number and the task locator's config
 */
class TaskSchedulerImpl(appId: Int, appName: String, config: Config) extends TaskScheduler {
  private val executorNum = config.getInt(Constants.APPLICATION_EXECUTOR_NUMBER)

  // Scheduling state of every task in the current DAG, sorted by task index.
  private var tasks = List.empty[TaskStatus]

  // Finds the locality of the tasks
  private val taskLocator = new TaskLocator(appName, config)

  override def setDAG(dag: DAG): Unit = {
    // Tasks already known keep their existing status (and allocation).
    val taskMap = tasks.map(status => status.taskId -> status).toMap

    tasks = dag.tasks.sortBy(_.index).map { taskId =>
      val locality = taskLocator.locateTask(taskId)
      taskMap.getOrElse(taskId, new TaskStatus(taskId, locality, allocation = null))
    }
  }

  def getResourceRequests(): Array[ResourceRequest] = {
    fetchResourceRequests(fromOneWorker = false)
  }

  import org.apache.gearpump.cluster.scheduler.Relaxation._

  /**
   * Builds the resource requests for all unscheduled tasks, one slot per task.
   *
   * Tasks that prefer a specific worker are requested on that worker with SPECIFICWORKER
   * relaxation. All other tasks are pooled into one request on an unspecified worker,
   * with `executorNum` executors.
   *
   * NOTE(review): `fromOneWorker` is currently unused.
   */
  private def fetchResourceRequests(fromOneWorker: Boolean = false): Array[ResourceRequest] = {
    var workersResourceRequest = Map.empty[WorkerId, Resource]

    tasks.filter(_.allocation == null).foreach { task =>
      val workerId = task.preferLocality match {
        case WorkerLocality(preferred) => preferred
        case _ => WorkerId.unspecified
      }
      val current = workersResourceRequest.getOrElse(workerId, Resource.empty)
      workersResourceRequest += workerId -> (current + Resource(1))
    }

    workersResourceRequest.map { case (workerId, resource) =>
      if (workerId == WorkerId.unspecified) {
        ResourceRequest(resource, workerId = WorkerId.unspecified, executorNum = executorNum)
      } else {
        ResourceRequest(resource, workerId, relaxation = SPECIFICWORKER)
      }
    }.toArray
  }

  override def schedule(workerId: WorkerId, executorId: Int, resource: Resource): List[TaskId] = {
    val location = Location(workerId, executorId)
    // First, tasks that prefer exactly this worker.
    val preferred = scheduleTasksForLocality(resource, location,
      (locality) => locality == WorkerLocality(workerId))

    // Then fill the remaining slots with any unscheduled task, whatever its preference.
    val others = scheduleTasksForLocality(resource - Resource(preferred.length),
      location, (locality) => true)
    preferred ++ others
  }

  /**
   * Assigns up to `resource.slots` unscheduled tasks accepted by `matcher` to
   * `resourceLocation`, scanning in task order.
   *
   * @return the assigned tasks, most recently assigned first
   */
  private def scheduleTasksForLocality(
      resource: Resource, resourceLocation: Location, matcher: (Locality) => Boolean)
    : List[TaskId] = {
    var scheduledTasks = List.empty[TaskId]
    var remain = resource.slots
    // Iterate rather than index: `tasks` is a List, so `tasks(i)` and `tasks.length`
    // are O(n) each, which made the previous loop quadratic.
    val it = tasks.iterator
    while (it.hasNext && remain > 0) {
      val taskStatus = it.next()
      if (taskStatus.allocation == null && matcher(taskStatus.preferLocality)) {
        taskStatus.allocation = resourceLocation
        scheduledTasks ::= taskStatus.taskId
        remain -= 1
      }
    }
    scheduledTasks
  }

  // Task statuses currently allocated on `executorId`.
  private def tasksOnExecutor(executorId: Int): List[TaskStatus] = {
    tasks.filter { status =>
      status.allocation != null && status.allocation.executorId == executorId
    }
  }

  override def executorFailed(executorId: Int): Array[ResourceRequest] = {
    val failedTasks = tasksOnExecutor(executorId)
    // Clears the allocation of the failed tasks so they can be scheduled again.
    failedTasks.foreach(_.allocation = null)

    Array(ResourceRequest(Resource(failedTasks.length),
      workerId = WorkerId.unspecified, relaxation = ONEWORKER))
  }

  override def scheduledTasks(executorId: Int): List[TaskId] = {
    tasksOnExecutor(executorId).map(_.taskId)
  }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/Stream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/Stream.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/Stream.scala
new file mode 100644
index 0000000..5ca92dd
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/Stream.scala
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.dsl
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.dsl.op._
+import org.apache.gearpump.streaming.sink.DataSink
+import org.apache.gearpump.streaming.task.{Task, TaskContext}
+import org.apache.gearpump.util.Graph
+import org.slf4j.{Logger, LoggerFactory}
+
+import scala.language.implicitConversions
+
/**
 * One node of a DSL operator graph, typed by its element type `T`.
 *
 * Each combinator adds an operator node to the shared `graph`, links it from `thisNode`,
 * and returns a stream for the new node.
 *
 * @param graph the operator graph being built; it is shared by all derived streams
 * @param thisNode the operator this stream represents
 * @param edge edge type for the next link; when absent, each combinator uses its own default
 */
class Stream[T](
    private val graph: Graph[Op, OpEdge], private val thisNode: Op,
    private val edge: Option[OpEdge] = None) {

  // Adds `op` to the graph and links it from this node, using `edge` if set, else `fallback`.
  private def attach(op: Op, fallback: OpEdge): Unit = {
    graph.addVertex(op)
    graph.addEdge(thisNode, edge.getOrElse(fallback), op)
  }

  /**
   * Converts each value of type T into zero or more values of type R.
   *
   * @param fun FlatMap function
   * @param description The description message for this operation (default "flatmap")
   * @return A new stream with type [R]
   */
  def flatMap[R](fun: T => TraversableOnce[R], description: String = null): Stream[R] = {
    val op = FlatMapOp(fun, Option(description).getOrElse("flatmap"))
    attach(op, Direct)
    new Stream[R](graph, op)
  }

  /**
   * Maps each message of type T to a message of type R. A `null` result is dropped.
   *
   * @param fun Function
   * @return A new stream with type [R]
   */
  def map[R](fun: T => R, description: String = null): Stream[R] = {
    val desc = Option(description).getOrElse("map")
    this.flatMap({ data =>
      Option(fun(data))
    }, desc)
  }

  /**
   * Keeps only the records for which fun(T) == true.
   *
   * @param fun the filter
   * @return a new stream after filter
   */
  def filter(fun: T => Boolean, description: String = null): Stream[T] = {
    val desc = Option(description).getOrElse("filter")
    this.flatMap({ data =>
      if (fun(data)) Option(data) else None
    }, desc)
  }

  /**
   * Reduces the stream with `fun`.
   *
   * @param fun reduction function
   * @param description description message for this operator (default "reduce")
   * @return a new stream after reduction
   */
  def reduce(fun: (T, T) => T, description: String = null): Stream[T] = {
    val op = ReduceOp(fun, Option(description).getOrElse("reduce"))
    attach(op, Direct)
    new Stream[T](graph, op)
  }

  /**
   * Logs every element (via `toString`) to the task log, using the "dsl" logger at INFO.
   */
  def log(): Unit = {
    this.map(msg => LoggerFactory.getLogger("dsl").info(msg.toString), "log")
  }

  /**
   * Merges data from two streams into one. This stream is linked with its edge (default
   * Direct); `other` is linked with its own edge (default Shuffle).
   *
   * @param other the other stream
   * @return the merged stream
   */
  def merge(other: Stream[T], description: String = null): Stream[T] = {
    val op = MergeOp(Option(description).getOrElse("merge"))
    attach(op, Direct)
    graph.addEdge(other.thisNode, other.edge.getOrElse(Shuffle), op)
    new Stream[T](graph, op)
  }

  /**
   * Groups by function (T => Group).
   *
   * For example, with T = People(name: String, gender: String, age: Int),
   * groupBy[People](_.gender) groups the people by gender.
   *
   * You can append other combinators after groupBy, for example:
   * {{{
   * Stream[People].groupBy(_.gender).flatMap(..).filter(..).reduce(..)
   * }}}
   *
   * @param fun Group by function
   * @param parallelism Parallelism level
   * @param description The description
   * @return the grouped stream
   */
  def groupBy[Group](fun: T => Group, parallelism: Int = 1, description: String = null)
    : Stream[T] = {
    val op = GroupByOp(fun, parallelism, Option(description).getOrElse("groupBy"))
    attach(op, Shuffle)
    new Stream[T](graph, op)
  }

  /**
   * Connects with a low-level Processor (TaskDescription). The returned stream links its
   * next operator with a Shuffle edge.
   *
   * @param processor a user defined processor
   * @param parallelism parallelism level
   * @return new stream after processing with type [R]
   */
  def process[R](
      processor: Class[_ <: Task], parallelism: Int, conf: UserConfig = UserConfig.empty,
      description: String = null): Stream[R] = {
    val op = ProcessorOp(processor, parallelism, conf,
      Option(description).getOrElse("process"))
    attach(op, Shuffle)
    new Stream[R](graph, op, Some(Shuffle))
  }
}
+
/**
 * Extra operations for streams of key/value pairs. They are reachable through the
 * implicit conversion `Stream.streamToKVStream`.
 */
class KVStream[K, V](stream: Stream[Tuple2[K, V]]) {

  /**
   * Groups the pairs by key (their first element).
   *
   * @param parallelism the parallelism for this operation
   * @return the grouped stream of pairs
   */
  def groupByKey(parallelism: Int = 1): Stream[Tuple2[K, V]] = {
    stream.groupBy(Stream.getTupleKey[K, V], parallelism = parallelism,
      description = "groupByKey")
  }

  /**
   * Reduces the stream by adding values with `numeric`. The result keeps the key of the
   * first tuple: for (key, value1), (key, value2) it produces (key, value1 + value2).
   *
   * @param numeric the numeric operations for V
   * @return the summed stream
   */
  def sum(implicit numeric: Numeric[V]): Stream[(K, V)] = {
    val add = Stream.sumByValue[K, V](numeric)
    stream.reduce(add, description = "sum")
  }
}
+
object Stream {

  /** Creates a stream for `node` in `graph`, with an optional edge type for the next link. */
  def apply[T](graph: Graph[Op, OpEdge], node: Op, edge: Option[OpEdge]): Stream[T] = {
    new Stream[T](graph, node, edge)
  }

  /** Returns the key (first element) of a pair. */
  def getTupleKey[K, V](tuple: Tuple2[K, V]): K = tuple._1

  /** Returns a reducer that adds the values of two pairs, keeping the first pair's key. */
  def sumByValue[K, V](numeric: Numeric[V]): (Tuple2[K, V], Tuple2[K, V]) => Tuple2[K, V]
    = (tuple1, tuple2) => Tuple2(tuple1._1, numeric.plus(tuple1._2, tuple2._2))

  /** Makes the [[KVStream]] operations available on streams of pairs. */
  implicit def streamToKVStream[K, V](stream: Stream[Tuple2[K, V]]): KVStream[K, V] = {
    new KVStream(stream)
  }

  /**
   * Adds `sink` operations to any stream.
   *
   * NOTE: the type parameter `R` of the `sink` methods is independent of the stream's
   * element type `T`; it only types the returned stream.
   */
  implicit class Sink[T](stream: Stream[T]) extends java.io.Serializable {

    /**
     * Terminates the stream in `dataSink`, linked with a Shuffle edge.
     *
     * @param dataSink the sink to write to
     * @param parallism parallelism of the sink
     * @param conf user config for the sink
     * @param description description of the sink; "traversable" when null
     */
    def sink[R](dataSink: DataSink, parallism: Int, conf: UserConfig, description: String)
      : Stream[R] = {
      // Option (not Some): a null description must fall back to the default.
      val sinkOp = DataSinkOp[R](dataSink, parallism, conf,
        Option(description).getOrElse("traversable"))
      stream.graph.addVertex(sinkOp)
      stream.graph.addEdge(stream.thisNode, Shuffle, sinkOp)
      new Stream[R](stream.graph, sinkOp)
    }

    /**
     * Terminates the stream in the user-defined sink task class `sink`, linked with a
     * Shuffle edge.
     *
     * @param sink the task class that implements the sink
     * @param parallism parallelism of the sink
     * @param conf user config for the sink
     * @param description description of the sink; "sink" when null
     */
    def sink[R](
        sink: Class[_ <: Task], parallism: Int, conf: UserConfig = UserConfig.empty,
        description: String = null): Stream[R] = {
      val sinkOp = ProcessorOp(sink, parallism, conf, Option(description).getOrElse("sink"))
      stream.graph.addVertex(sinkOp)
      stream.graph.addEdge(stream.thisNode, Shuffle, sinkOp)
      new Stream[R](stream.graph, sinkOp)
    }
  }
}
+
/**
 * A [[DataSink]] that writes every message to the task's logger at INFO level.
 *
 * @tparam T nominal element type; not used by the implementation
 */
class LoggerSink[T] extends DataSink {
  // Taken from the task context in open(); null until then.
  var logger: Logger = null

  override def open(context: TaskContext): Unit = {
    this.logger = context.logger
  }

  override def write(message: Message): Unit = {
    logger.info("logging message " + message.msg)
  }

  // Nothing to release.
  override def close(): Unit = ()
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/StreamApp.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/StreamApp.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/StreamApp.scala
new file mode 100644
index 0000000..5027500
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/StreamApp.scala
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.dsl
+
+import akka.actor.ActorSystem
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.cluster.client.ClientContext
+import org.apache.gearpump.streaming.StreamApplication
+import org.apache.gearpump.streaming.dsl.op.{DataSourceOp, Op, OpEdge, ProcessorOp}
+import org.apache.gearpump.streaming.dsl.plan.Planner
+import org.apache.gearpump.streaming.source.DataSource
+import org.apache.gearpump.streaming.task.{Task, TaskContext}
+import org.apache.gearpump.util.Graph
+import org.apache.gearpump.{Message, TimeStamp}
+
+import scala.language.implicitConversions
+
+/**
+ * Entry point of the Scala DSL: a named application whose operators are collected into an
+ * operator graph, which `plan()` turns into a [[StreamApplication]].
+ *
+ * Example:
+ * {{{
+ *   val app = StreamApp("wordcount", context)
+ *   val data = "This is a good start, bingo!! bingo!!"
+ *   app.source(data.lines.toList, 1, "source").
+ *     // word => (word, count)
+ *     flatMap(line => line.split("[\\s]+")).map((_, 1)).
+ *     // (word, count1), (word, count2) => (word, count1 + count2)
+ *     groupBy(kv => kv._1).reduce((kv1, kv2) => (kv1._1, kv1._2 + kv2._2))
+ *
+ *   val appId = context.submit(app)
+ *   context.close()
+ * }}}
+ *
+ * @param name name of app
+ * @param system actor system made available as an implicit while planning
+ * @param userConfig application-level configuration handed to the planned [[StreamApplication]]
+ * @param graph operator graph that DSL calls add vertices and edges to
+ */
+class StreamApp(
+    val name: String, system: ActorSystem, userConfig: UserConfig, val graph: Graph[Op, OpEdge]) {
+
+  /** Creates an application whose operator graph starts out empty. */
+  def this(name: String, system: ActorSystem, userConfig: UserConfig) =
+    this(name, system, userConfig, Graph.empty[Op, OpEdge])
+
+  /** Translates the operator graph into a runnable [[StreamApplication]]. */
+  def plan(): StreamApplication = {
+    implicit val actorSystem: ActorSystem = system
+    val processorDag = (new Planner).plan(graph)
+    StreamApplication(name, processorDag, userConfig)
+  }
+}
+
+object StreamApp {
+
+  /**
+   * Creates an empty [[StreamApp]] that uses the actor system of `context`.
+   *
+   * @param name name of the application
+   * @param context client context whose actor system is used
+   * @param userConfig application-level configuration
+   */
+  def apply(name: String, context: ClientContext, userConfig: UserConfig = UserConfig.empty)
+    : StreamApp = {
+    new StreamApp(name, context.system, userConfig)
+  }
+
+  /**
+   * Plans a [[StreamApp]] wherever a [[StreamApplication]] is expected,
+   * e.g. `context.submit(app)`.
+   */
+  implicit def streamAppToApplication(streamApp: StreamApp): StreamApplication = {
+    // plan() is declared with an empty parameter list; call it that way (no auto-application).
+    streamApp.plan()
+  }
+
+  /**
+   * Adds `source` operators to a [[StreamApp]]. Each call registers a new vertex in the app's
+   * operator graph and returns a [[Stream]] that starts at that vertex.
+   */
+  implicit class Source(app: StreamApp) extends java.io.Serializable {
+
+    /** Reads from `dataSource` with an empty config and a null description. */
+    def source[T](dataSource: DataSource, parallism: Int): Stream[T] = {
+      source(dataSource, parallism, UserConfig.empty)
+    }
+
+    /** Reads from `dataSource` with an empty config. */
+    def source[T](dataSource: DataSource, parallism: Int, description: String): Stream[T] = {
+      source(dataSource, parallism, UserConfig.empty, description)
+    }
+
+    /** Reads from `dataSource` with a null description. */
+    def source[T](dataSource: DataSource, parallism: Int, conf: UserConfig): Stream[T] = {
+      source(dataSource, parallism, conf, description = null)
+    }
+
+    /**
+     * Reads from `dataSource`.
+     *
+     * @param dataSource the source implementation
+     * @param parallism parallelism of the source operator
+     * @param conf user configuration attached to the operator
+     * @param description operator description; passed through unchanged, so it may be null
+     * @return a stream starting at the new source operator
+     */
+    def source[T](dataSource: DataSource, parallism: Int, conf: UserConfig, description: String)
+      : Stream[T] = {
+      // A plain val: the operator is passed explicitly below, so exposing it as an implicit
+      // in this scope served no purpose.
+      val sourceOp = DataSourceOp(dataSource, parallism, conf, description)
+      app.graph.addVertex(sourceOp)
+      new Stream[T](app.graph, sourceOp)
+    }
+
+    /** Emits the elements of `seq` through a [[CollectionDataSource]]. */
+    def source[T](seq: Seq[T], parallism: Int, description: String): Stream[T] = {
+      this.source(new CollectionDataSource[T](seq), parallism, UserConfig.empty, description)
+    }
+
+    /** Uses a low-level [[Task]] class as the source; a null description becomes "source". */
+    def source[T](source: Class[_ <: Task], parallism: Int, conf: UserConfig, description: String)
+      : Stream[T] = {
+      val sourceOp = ProcessorOp(source, parallism, conf, Option(description).getOrElse("source"))
+      app.graph.addVertex(sourceOp)
+      new Stream[T](app.graph, sourceOp)
+    }
+  }
+}
+
+/**
+ * A message source, mainly for tests, that emits each element of `seq` once, in order.
+ *
+ * The sequence is not repeated: once it is exhausted, `read()` returns null. The iterator is
+ * created lazily on the first `read()`.
+ */
+class CollectionDataSource[T](seq: Seq[T]) extends DataSource {
+  private lazy val remaining: Iterator[T] = seq.iterator
+
+  override def read(): Message =
+    if (!remaining.hasNext) null
+    else Message(remaining.next())
+
+  override def open(context: TaskContext, startTime: TimeStamp): Unit = {}
+
+  override def close(): Unit = {}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala
new file mode 100644
index 0000000..6eff20c
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.dsl.javaapi
+
+import scala.collection.JavaConverters._
+
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.dsl.Stream
+import org.apache.gearpump.streaming.javaapi.dsl.functions._
+import org.apache.gearpump.streaming.task.Task
+
+/**
+ * Java DSL: a Java-friendly wrapper around a Scala DSL [[Stream]].
+ *
+ * Each operation adapts the Java function object to a Scala function, delegates to the same
+ * operation on the wrapped stream, and wraps the resulting stream again.
+ *
+ * @param stream the underlying Scala DSL stream
+ */
+class JavaStream[T](val stream: Stream[T]) {
+
+  /** FlatMap on stream; the collection returned by `fn` is converted with `asScala`. */
+  def flatMap[R](fn: FlatMapFunction[T, R], description: String): JavaStream[R] =
+    wrap[R](stream.flatMap({ elem: T => fn(elem).asScala }, description))
+
+  /** Map on stream: transforms each element with `fn`. */
+  def map[R](fn: MapFunction[T, R], description: String): JavaStream[R] =
+    wrap[R](stream.map({ elem: T => fn(elem) }, description))
+
+  /** Keeps only the messages for which `fn` returns true. */
+  def filter(fn: FilterFunction[T], description: String): JavaStream[T] =
+    wrap[T](stream.filter({ elem: T => fn(elem) }, description))
+
+  /** Aggregates the stream by combining elements pairwise with `fn`. */
+  def reduce(fn: ReduceFunction[T], description: String): JavaStream[T] =
+    wrap[T](stream.reduce({ (first: T, second: T) => fn(first, second) }, description))
+
+  /** Delegates to the underlying stream's `log()`. */
+  def log(): Unit = stream.log()
+
+  /** Merges this stream with `other`, a stream of the same element type. */
+  def merge(other: JavaStream[T], description: String): JavaStream[T] =
+    wrap[T](stream.merge(other.stream, description))
+
+  /**
+   * Groups the stream by the key returned by `fn`, turning it into sub-streams. Operations
+   * chained after groupBy apply to the sub-streams.
+   */
+  def groupBy[Group](fn: GroupByFunction[T, Group], parallelism: Int, description: String)
+    : JavaStream[T] =
+    wrap[T](stream.groupBy({ elem: T => fn(elem) }, parallelism, description))
+
+  /** Adds a low-level processor, implemented by a [[Task]] class, to process messages. */
+  def process[R](
+      processor: Class[_ <: Task], parallelism: Int, conf: UserConfig, description: String)
+    : JavaStream[R] =
+    wrap[R](stream.process(processor, parallelism, conf, description))
+
+  /** Wraps a Scala DSL stream produced by one of the operations above. */
+  private def wrap[R](underlying: Stream[R]): JavaStream[R] = new JavaStream[R](underlying)
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStreamApp.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStreamApp.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStreamApp.scala
new file mode 100644
index 0000000..0d841be
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStreamApp.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.dsl.javaapi
+
+import java.util.Collection
+import scala.collection.JavaConverters._
+
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.cluster.client.ClientContext
+import org.apache.gearpump.streaming.dsl.{CollectionDataSource, StreamApp}
+import org.apache.gearpump.streaming.source.DataSource
+
+/**
+ * Java-friendly entry point of the DSL: builds a [[StreamApp]] and exposes its sources as
+ * [[JavaStream]]s.
+ *
+ * @param name application name
+ * @param context client context used to create the app and to submit it in `run()`
+ * @param userConfig application-level configuration
+ */
+class JavaStreamApp(name: String, context: ClientContext, userConfig: UserConfig) {
+
+  private val streamApp = StreamApp(name, context, userConfig)
+
+  /**
+   * Creates a source that emits the elements of a Java collection.
+   *
+   * The collection is copied when this method is called, so later changes to it are not seen.
+   */
+  def source[T](collection: Collection[T], parallelism: Int,
+      conf: UserConfig, description: String): JavaStream[T] = {
+    // Copy eagerly into an immutable List. On a wrapped java.util.Collection, `asScala.toSeq`
+    // can yield a lazy Stream that keeps a live Java iterator: not a snapshot, not serializable.
+    val dataSource = new CollectionDataSource[T](collection.asScala.toList)
+    source[T](dataSource, parallelism, conf, description)
+  }
+
+  /** Creates a source that reads from `dataSource`. */
+  def source[T](dataSource: DataSource, parallelism: Int,
+      conf: UserConfig, description: String): JavaStream[T] = {
+    new JavaStream[T](streamApp.source(dataSource, parallelism, conf, description))
+  }
+
+  /** Plans the application and submits it through `context`. */
+  def run(): Unit = {
+    context.submit(streamApp)
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/op/OP.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/op/OP.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/op/OP.scala
new file mode 100644
index 0000000..49d9dec
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/op/OP.scala
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.dsl.op
+
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.sink.DataSink
+import org.apache.gearpump.streaming.source.DataSource
+import org.apache.gearpump.streaming.task.Task
+
+/**
+ * Operators for the DSL.
+ *
+ * Every vertex of the DSL operator graph (`Graph[Op, OpEdge]`) is an Op. The trait is sealed,
+ * so all of its direct subtypes are declared in this file.
+ */
+sealed trait Op {
+ /** Human-readable description of the operator; may be null (OpChain returns null). */
+ def description: String
+ /** User configuration attached to the operator. */
+ def conf: UserConfig
+}
+
+/**
+ * When translated to the running DAG, a SlaveOp can be attached to a MasterOp or to another
+ * SlaveOp. "Attached" means running in the same actor.
+ */
+trait SlaveOp[T] extends Op
+
+/**
+ * Slave operator holding a flat-map function `fun`, which maps one element to zero or more
+ * results.
+ */
+case class FlatMapOp[T, R](
+ fun: (T) => TraversableOnce[R], description: String, conf: UserConfig = UserConfig.empty)
+ extends SlaveOp[T]
+
+/** Slave operator holding a binary function `fun` that combines two elements into one. */
+case class ReduceOp[T](fun: (T, T) => T, description: String, conf: UserConfig = UserConfig.empty)
+ extends SlaveOp[T]
+
+/** An operator that SlaveOps can be attached to when the graph is translated to a running DAG. */
+trait MasterOp extends Op
+
+/** A MasterOp with a type parameter; NOTE(review): `T` is not referenced by the trait itself. */
+trait ParameterizedOp[T] extends MasterOp
+
+/** Master operator that presumably merges several upstream streams of the same type into one. */
+case class MergeOp(description: String, override val conf: UserConfig = UserConfig.empty)
+ extends MasterOp
+
+/** Master operator that groups elements by the key computed with `fun`, at `parallelism`. */
+case class GroupByOp[T, R](
+ fun: T => R, parallelism: Int, description: String,
+ override val conf: UserConfig = UserConfig.empty)
+ extends ParameterizedOp[T]
+
+/** Master operator backed by a user-provided low-level [[Task]] class. */
+case class ProcessorOp[T <: Task](
+ processor: Class[T], parallelism: Int, conf: UserConfig, description: String)
+ extends ParameterizedOp[T]
+
+/** Master operator that reads messages from a [[DataSource]]. */
+case class DataSourceOp[T](
+ dataSource: DataSource, parallelism: Int, conf: UserConfig, description: String)
+ extends ParameterizedOp[T]
+
+/** Master operator that writes messages to a [[DataSink]]. */
+case class DataSinkOp[T](
+ dataSink: DataSink, parallelism: Int, conf: UserConfig, description: String)
+ extends ParameterizedOp[T]
+
+/**
+ * A list of operators that can be executed as one chained operator.
+ *
+ * For example, flatmap().map().reduce() can be chained into a single operator because no
+ * data shuffling is required between them.
+ *
+ * @param ops the chained operators; `head` and `last` assume the list is non-empty
+ */
+case class OpChain(ops: List[Op]) extends Op {
+  def head: Op = ops.head
+  def last: Op = ops.last
+
+  /** A chain carries no description of its own. */
+  def description: String = null
+
+  /**
+   * Merged configuration of all chained operators. Configs are folded from the last operator
+   * towards the head, so the head's conf is applied last and has priority.
+   */
+  override def conf: UserConfig =
+    ops.foldRight(UserConfig.empty) { (op, merged) => merged.withConfig(op.conf) }
+}
+
+/** Kind of an edge between two operators in the DSL operator graph. */
+trait OpEdge
+
+/**
+ * The upstream and downstream operators do not require a network data shuffle.
+ *
+ * For example, map and flatmap operations don't require a network shuffle, so Direct is used
+ * to represent their relation with upstream operators.
+ */
+case object Direct extends OpEdge
+
+/**
+ * The upstream and downstream operators DO require a network data shuffle, so messages are
+ * repartitioned between them (and, presumably, the two cannot be chained into one operator).
+ *
+ * For example, the DSL's `sink` operators are connected to their upstream operator with a
+ * Shuffle edge.
+ */
+case object Shuffle extends OpEdge
+
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupbyPartitioner.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupbyPartitioner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupbyPartitioner.scala
new file mode 100644
index 0000000..b2e2932
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupbyPartitioner.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.dsl.partitioner
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.partitioner.UnicastPartitioner
+
+/**
+ * Partitions messages by applying a group-by function first.
+ *
+ * For example:
+ * {{{
+ *   case class People(name: String, gender: String)
+ *
+ *   object Test {
+ *     val groupBy: (People => String) = people => people.gender
+ *     val partitioner = new GroupByPartitioner(groupBy)
+ *   }
+ * }}}
+ *
+ * @param groupBy applied to each message payload. The hashCode of its output picks the
+ *                partition, so the output type must define hashCode(). A null output is
+ *                treated as hash code 0. NOTE(review): the null default presumably exists only
+ *                for argument-less construction; getPartition on such an instance throws
+ *                NullPointerException.
+ */
+class GroupByPartitioner[T, GROUP](groupBy: T => GROUP = null) extends UnicastPartitioner {
+  override def getPartition(msg: Message, partitionNum: Int, currentPartitionId: Int): Int = {
+    val group = groupBy(msg.msg.asInstanceOf[T])
+    // A null group used to throw NullPointerException inside the task; send it to hash 0.
+    val hashCode = if (group == null) 0 else group.hashCode()
+    // Clear the sign bit so the modulo result is never negative.
+    (hashCode & Integer.MAX_VALUE) % partitionNum
+  }
+}
\ No newline at end of file