You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2016/05/10 02:42:54 UTC
[44/49] incubator-gearpump git commit: fix GEARPUMP-118 change
package name to org.apache
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/cluster/UserConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/UserConfig.scala b/core/src/main/scala/org/apache/gearpump/cluster/UserConfig.scala
new file mode 100644
index 0000000..28a4907
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/cluster/UserConfig.scala
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster
+
+import akka.actor.{ActorSystem, ExtendedActorSystem}
+import akka.serialization.JavaSerializer
+
+import io.gearpump.google.common.io.BaseEncoding
+
+/**
+ * Immutable string-keyed configuration.
+ *
+ * Every mutator (`withXxx`, `without`, `filter`, `withConfig`) returns a new instance and
+ * leaves the receiver unchanged. Primitive values are stored as their `toString` text and are
+ * parsed back by the matching `getXxx`. Byte arrays and Java-serialized objects are stored as
+ * Base64 text.
+ */
+final class UserConfig(private val _config: Map[String, String]) extends Serializable {
+
+  /** Returns a copy in which `key` maps to `value.toString`. */
+  def withBoolean(key: String, value: Boolean): UserConfig =
+    new UserConfig(_config.updated(key, value.toString))
+
+  /** Returns a copy in which `key` maps to `value.toString`. */
+  def withDouble(key: String, value: Double): UserConfig =
+    new UserConfig(_config.updated(key, value.toString))
+
+  /** Returns a copy in which `key` maps to `value.toString`. */
+  def withFloat(key: String, value: Float): UserConfig =
+    new UserConfig(_config.updated(key, value.toString))
+
+  /** Returns a copy in which `key` maps to `value.toString`. */
+  def withInt(key: String, value: Int): UserConfig =
+    new UserConfig(_config.updated(key, value.toString))
+
+  /** Returns a copy in which `key` maps to `value.toString`. */
+  def withLong(key: String, value: Long): UserConfig =
+    new UserConfig(_config.updated(key, value.toString))
+
+  /**
+   * Returns a copy in which `key` maps to `value`.
+   * A null `value` is ignored: this instance is returned unchanged, and any existing entry stays.
+   */
+  def withString(key: String, value: String): UserConfig =
+    if (value eq null) this else new UserConfig(_config.updated(key, value))
+
+  /** Returns a copy without `key`. */
+  def without(key: String): UserConfig = new UserConfig(_config - key)
+
+  /** Returns a copy that keeps only the entries satisfying `p`. */
+  def filter(p: ((String, String)) => Boolean): UserConfig = new UserConfig(_config.filter(p))
+
+  /** The value of `key` parsed as a Boolean; throws if the stored text is not parseable. */
+  def getBoolean(key: String): Option[Boolean] = getString(key).map(_.toBoolean)
+
+  /** The value of `key` parsed as a Double; throws if the stored text is not parseable. */
+  def getDouble(key: String): Option[Double] = getString(key).map(_.toDouble)
+
+  /** The value of `key` parsed as a Float; throws if the stored text is not parseable. */
+  def getFloat(key: String): Option[Float] = getString(key).map(_.toFloat)
+
+  /** The value of `key` parsed as an Int; throws if the stored text is not parseable. */
+  def getInt(key: String): Option[Int] = getString(key).map(_.toInt)
+
+  /** The value of `key` parsed as a Long; throws if the stored text is not parseable. */
+  def getLong(key: String): Option[Long] = getString(key).map(_.toLong)
+
+  /** The raw text stored under `key`, or None when absent. */
+  def getString(key: String): Option[String] = _config.get(key)
+
+  /** The value of `key` decoded from Base64, or None when absent. */
+  def getBytes(key: String): Option[Array[Byte]] =
+    getString(key).map(encoded => BaseEncoding.base64().decode(encoded))
+
+  /** Returns a copy storing `value` Base64-encoded under `key`; a null `value` is ignored. */
+  def withBytes(key: String, value: Array[Byte]): UserConfig =
+    if (value eq null) this else withString(key, BaseEncoding.base64().encode(value))
+
+  // scalastyle:off line.size.limit
+  /**
+   * Reads the value stored under `key` and Java-deserializes it into an object instance.
+   *
+   * An implicit ActorSystem is required because deserializing ActorRef (and possibly other
+   * akka classes) needs one.
+   *
+   * See Link:
+   * http://doc.akka.io/docs/akka/snapshot/scala/serialization.html#A_Word_About_Java_Serialization
+   */
+  def getValue[T](key: String)(implicit system: ActorSystem): Option[T] = {
+    val serializer = new JavaSerializer(system.asInstanceOf[ExtendedActorSystem])
+    getString(key).map { encoded =>
+      val bytes = BaseEncoding.base64().decode(encoded)
+      serializer.fromBinary(bytes).asInstanceOf[T]
+    }
+  }
+
+  /**
+   * Java-serializes `value` and stores it as a Base64 string under `key`.
+   * A null `value` is ignored and this instance is returned.
+   *
+   * An implicit ActorSystem is required because serializing ActorRef (and possibly other
+   * akka classes) needs one.
+   *
+   * See Link:
+   * http://doc.akka.io/docs/akka/snapshot/scala/serialization.html#A_Word_About_Java_Serialization
+   */
+  def withValue[T <: AnyRef](key: String, value: T)(implicit system: ActorSystem): UserConfig = {
+    if (value eq null) {
+      this
+    } else {
+      val serializer = new JavaSerializer(system.asInstanceOf[ExtendedActorSystem])
+      withString(key, BaseEncoding.base64().encode(serializer.toBinary(value)))
+    }
+  }
+  // scalastyle:on line.size.limit
+
+  /** Returns a copy merged with `other`; on key collision `other` wins. A null `other` is ignored. */
+  def withConfig(other: UserConfig): UserConfig =
+    if (other eq null) this else new UserConfig(_config ++ other._config)
+}
+
+object UserConfig {
+
+  /** A configuration with no entries; a fresh instance is built on every call. */
+  def empty: UserConfig = new UserConfig(Map.empty[String, String])
+
+  /** Wraps `config` as a UserConfig (the map is used as is, not copied). */
+  def apply(config: Map[String, String]): UserConfig = new UserConfig(config)
+
+  /**
+   * Extractor exposing the underlying map.
+   *
+   * Returns None instead of throwing a NullPointerException when `config` itself is null, and
+   * None when it wraps a null map.
+   */
+  def unapply(config: UserConfig): Option[Map[String, String]] =
+    Option(config).flatMap(c => Option(c._config))
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/cluster/appmaster/AppMasterRuntimeEnvironment.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/appmaster/AppMasterRuntimeEnvironment.scala b/core/src/main/scala/org/apache/gearpump/cluster/appmaster/AppMasterRuntimeEnvironment.scala
new file mode 100644
index 0000000..170e56a
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/cluster/appmaster/AppMasterRuntimeEnvironment.scala
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.appmaster
+
+import akka.actor._
+
+import org.apache.gearpump.cluster.AppMasterToMaster.RegisterAppMaster
+import org.apache.gearpump.cluster.appmaster.AppMasterRuntimeEnvironment._
+import org.apache.gearpump.cluster.appmaster.ExecutorSystemScheduler.{Session, StartExecutorSystems}
+import org.apache.gearpump.cluster.appmaster.MasterConnectionKeeper.MasterConnectionStatus._
+import org.apache.gearpump.cluster.master.MasterProxy
+import org.apache.gearpump.cluster.{AppDescription, AppMasterContext}
+import org.apache.gearpump.util.LogUtil
+
+/**
+ * Runtime environment hosting an AppMaster.
+ *
+ * On construction it creates, as children and in this order:
+ *  - a MasterProxy for `masters`, wrapped by the actor built from `masterFactory`;
+ *  - the AppMaster actor built from `appMasterFactory`, which is watched;
+ *  - the master connection keeper built from `masterConnectionKeeperFactory`, which is watched
+ *    and is given this actor as its status listener.
+ *
+ * Once MasterConnected arrives, the AppMaster is sent StartAppMaster. This actor stops itself
+ * when MasterStopped arrives, or when the AppMaster or the connection keeper terminates.
+ *
+ * The default factories in the companion also extend the Master with a scheduler service for
+ * executor systems. The AppMaster can therefore ask for executor systems directly, while
+ * requesting resources, contacting workers and starting the executor system stay hidden.
+ *
+ * Please use AppMasterRuntimeEnvironment.props() to construct this actor.
+ */
+private[appmaster]
+class AppMasterRuntimeEnvironment(
+    appContextInput: AppMasterContext,
+    app: AppDescription,
+    masters: Iterable[ActorPath],
+    masterFactory: (AppId, MasterActorRef) => Props,
+    appMasterFactory: (AppMasterContext, AppDescription) => Props,
+    masterConnectionKeeperFactory: (MasterActorRef, RegisterAppMaster, ListenerActorRef) => Props)
+  extends Actor {
+
+  val appId = appContextInput.appId
+  private val LOG = LogUtil.getLogger(getClass, app = appId)
+
+  import scala.concurrent.duration._
+
+  // The proxy child is created first; the master-side actor wraps it.
+  private val master = {
+    val masterProxy = context.actorOf(Props(new MasterProxy(masters, 30.seconds)))
+    context.actorOf(masterFactory(appId, masterProxy))
+  }
+  private val appContext = appContextInput.copy(masterProxy = master)
+
+  // Create appMaster proxy to receive command and forward to appmaster
+  private val appMaster = context.actorOf(appMasterFactory(appContext, app))
+  context.watch(appMaster)
+
+  private val registerAppMaster = RegisterAppMaster(appMaster, appContext.registerData)
+  private val masterConnectionKeeper = context.actorOf(
+    masterConnectionKeeperFactory(master, registerAppMaster, self))
+  context.watch(masterConnectionKeeper)
+
+  def receive: Receive = {
+    case MasterConnected =>
+      LOG.info(s"Master is connected, start AppMaster ${appId}...")
+      appMaster ! StartAppMaster
+    case MasterStopped =>
+      LOG.error(s"Master is stopped, stop AppMaster ${appId}...")
+      context.stop(self)
+    case Terminated(`appMaster`) =>
+      LOG.error(s"AppMaster ${appId} is stopped, shutdown myself")
+      context.stop(self)
+    case Terminated(`masterConnectionKeeper`) =>
+      LOG.error(s"Master connection keeper is stopped, appId: ${appId}, shutdown myself")
+      context.stop(self)
+    case Terminated(_) => // Skip: not one of the two children we depend on
+  }
+}
+
+object AppMasterRuntimeEnvironment {
+
+  /**
+   * Props for an AppMasterRuntimeEnvironment wired with the default factories:
+   * MasterWithExecutorSystemProvider on the master side, LazyStartAppMaster for the AppMaster,
+   * and MasterConnectionKeeper for master liveness.
+   */
+  def props(
+      masters: Iterable[ActorPath], app: AppDescription, appContextInput: AppMasterContext)
+    : Props = {
+
+    val master = (appId: AppId, masterProxy: MasterActorRef) =>
+      MasterWithExecutorSystemProvider.props(appId, masterProxy)
+
+    val appMaster = (appContext: AppMasterContext, description: AppDescription) =>
+      LazyStartAppMaster.props(appContext, description)
+
+    val masterConnectionKeeper = (masterRef: MasterActorRef, registerAppMaster: RegisterAppMaster,
+      listener: ListenerActorRef) => Props(new MasterConnectionKeeper(
+        registerAppMaster, masterRef, masterStatusListener = listener))
+
+    Props(new AppMasterRuntimeEnvironment(
+      appContextInput, app, masters, master, appMaster, masterConnectionKeeper))
+  }
+
+  /**
+   * Behaves like an AppMaster, but starts the real AppMaster lazily.
+   *
+   * Until StartAppMaster arrives, every message is stashed. On StartAppMaster the real AppMaster
+   * is created as child "appmaster" from `appMasterProps` and watched. The stash is then
+   * replayed, and from that point every message is forwarded to it. When the real AppMaster
+   * terminates, this actor stops itself.
+   *
+   * Please use LazyStartAppMaster.props to construct this actor.
+   *
+   * @param appId id of the application, used for logging
+   * @param appMasterProps underlying AppMaster Props
+   */
+  private[appmaster]
+  class LazyStartAppMaster(appId: Int, appMasterProps: Props) extends Actor with Stash {
+
+    private val LOG = LogUtil.getLogger(getClass, app = appId)
+
+    // Initial behavior: stash everything until we are told to start the real AppMaster.
+    def receive: Receive = startAppMaster
+
+    def startAppMaster: Receive = {
+      case StartAppMaster =>
+        val appMaster = context.actorOf(appMasterProps, "appmaster")
+        context.watch(appMaster)
+        context.become(terminationWatch(appMaster) orElse appMasterService(appMaster))
+        unstashAll()
+      case _ =>
+        stash()
+    }
+
+    def terminationWatch(appMaster: ActorRef): Receive = {
+      // Backquotes compare against the given `appMaster`; a bare name would bind a fresh
+      // variable and match the Terminated of any actor.
+      case Terminated(`appMaster`) =>
+        LOG.error("appmaster is stopped")
+        context.stop(self)
+    }
+
+    def appMasterService(appMaster: ActorRef): Receive = {
+      case msg => appMaster forward msg
+    }
+  }
+
+  private[appmaster]
+  object LazyStartAppMaster {
+    /** Props whose underlying AppMaster is `app.appMaster` instantiated with (appContext, app). */
+    def props(appContext: AppMasterContext, app: AppDescription): Props = {
+      val appMasterProps = Props(Class.forName(app.appMaster), appContext, app)
+      Props(new LazyStartAppMaster(appContext.appId, appMasterProps))
+    }
+  }
+
+  /** Tells a LazyStartAppMaster to create the real AppMaster. */
+  private[appmaster] case object StartAppMaster
+
+  /**
+   * Enhances the Master with one extra service, StartExecutorSystems.
+   *
+   * StartExecutorSystems requests are forwarded to a child created from
+   * `executorSystemProviderProps`. Every other message is forwarded to `master`.
+   *
+   * Please use MasterWithExecutorSystemProvider.props to construct this actor.
+   */
+  private[appmaster]
+  class MasterWithExecutorSystemProvider(master: ActorRef, executorSystemProviderProps: Props)
+    extends Actor {
+
+    val executorSystemProvider: ActorRef = context.actorOf(executorSystemProviderProps)
+
+    override def receive: Receive = {
+      case request: StartExecutorSystems =>
+        executorSystemProvider forward request
+      case msg =>
+        master forward msg
+    }
+  }
+
+  private[appmaster]
+  object MasterWithExecutorSystemProvider {
+    /** Props whose executor system provider is an ExecutorSystemScheduler for `appId`. */
+    def props(appId: Int, master: ActorRef): Props = {
+
+      val executorSystemLauncher = (launcherAppId: Int, session: Session) =>
+        Props(new ExecutorSystemLauncher(launcherAppId, session))
+
+      val scheduler = Props(new ExecutorSystemScheduler(appId, master, executorSystemLauncher))
+
+      Props(new MasterWithExecutorSystemProvider(master, scheduler))
+    }
+  }
+
+  private[appmaster] type AppId = Int
+  private[appmaster] type MasterActorRef = ActorRef
+  private[appmaster] type ListenerActorRef = ActorRef
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/cluster/appmaster/AppMasterRuntimeInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/appmaster/AppMasterRuntimeInfo.scala b/core/src/main/scala/org/apache/gearpump/cluster/appmaster/AppMasterRuntimeInfo.scala
new file mode 100644
index 0000000..b3ec88c
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/cluster/appmaster/AppMasterRuntimeInfo.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.appmaster
+
+import akka.actor.ActorRef
+import com.typesafe.config.Config
+
+import org.apache.gearpump._
+import org.apache.gearpump.cluster.AppMasterRegisterData
+
+/**
+ * Run time info used to start an AppMaster.
+ *
+ * @param appId id of the application
+ * @param appName application name; the unique Id for an application
+ * @param worker worker actor reference, null by default (presumably the worker hosting the
+ *               AppMaster; TODO confirm)
+ * @param user submitting user, null by default
+ * @param submissionTime submission timestamp, 0 by default
+ * @param startTime start timestamp, 0 by default
+ * @param finishTime finish timestamp, 0 by default
+ * @param config configuration, null by default
+ */
+case class AppMasterRuntimeInfo(
+    appId: Int,
+    appName: String,
+    worker: ActorRef = null,
+    user: String = null,
+    submissionTime: TimeStamp = 0,
+    startTime: TimeStamp = 0,
+    finishTime: TimeStamp = 0,
+    config: Config = null)
+  extends AppMasterRegisterData
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/cluster/appmaster/ApplicationState.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/appmaster/ApplicationState.scala b/core/src/main/scala/org/apache/gearpump/cluster/appmaster/ApplicationState.scala
new file mode 100644
index 0000000..7240113
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/cluster/appmaster/ApplicationState.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.appmaster
+
+import org.apache.gearpump.cluster.{AppDescription, AppJar}
+
+/**
+ * State of a single application; it is distributed across the masters.
+ *
+ * Identity is the pair (appId, attemptId). `equals` and `hashCode` ignore every other field.
+ */
+case class ApplicationState(
+    appId: Int,
+    appName: String,
+    attemptId: Int,
+    app: AppDescription,
+    jar: Option[AppJar],
+    username: String,
+    state: Any) extends Serializable {
+
+  override def equals(other: Any): Boolean = other match {
+    case that: ApplicationState => appId == that.appId && attemptId == that.attemptId
+    case _ => false
+  }
+
+  // Hashes exactly the two fields compared by equals, keeping the two consistent.
+  override def hashCode: Int = {
+    import akka.routing.MurmurHash._
+    extendHash(appId, attemptId, startMagicA, startMagicB)
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/cluster/appmaster/ExecutorSystem.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/appmaster/ExecutorSystem.scala b/core/src/main/scala/org/apache/gearpump/cluster/appmaster/ExecutorSystem.scala
new file mode 100644
index 0000000..5c44652
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/cluster/appmaster/ExecutorSystem.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.appmaster
+
+import akka.actor.{ActorRef, Address, PoisonPill}
+
+import org.apache.gearpump.cluster.scheduler.Resource
+import org.apache.gearpump.cluster.worker.WorkerId
+import org.apache.gearpump.util.ActorSystemBooter.BindLifeCycle
+
+/** A worker's id paired with the ActorRef used to send it messages (e.g. LaunchExecutor). */
+case class WorkerInfo(
+    workerId: WorkerId,
+    ref: ActorRef)
+
+/**
+ * Handle to an executor actor system started on a remote machine.
+ *
+ * @param executorSystemId id of this executor system
+ * @param address Remote address where we start an Actor System.
+ * @param daemon actor in the remote system; it receives BindLifeCycle and PoisonPill from here
+ * @param resource resource the system was launched with
+ * @param worker worker on which the system was launched
+ */
+case class ExecutorSystem(
+    executorSystemId: Int,
+    address: Address,
+    daemon: ActorRef,
+    resource: Resource,
+    worker: WorkerInfo) {
+
+  /** Sends BindLifeCycle(actor) to the daemon, without a sender. */
+  def bindLifeCycleWith(actor: ActorRef): Unit =
+    daemon.tell(BindLifeCycle(actor), ActorRef.noSender)
+
+  /** Sends a PoisonPill to the daemon, without a sender. */
+  def shutdown(): Unit =
+    daemon.tell(PoisonPill, ActorRef.noSender)
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/cluster/appmaster/ExecutorSystemLauncher.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/appmaster/ExecutorSystemLauncher.scala b/core/src/main/scala/org/apache/gearpump/cluster/appmaster/ExecutorSystemLauncher.scala
new file mode 100644
index 0000000..0c5dbff
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/cluster/appmaster/ExecutorSystemLauncher.scala
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.appmaster
+
+import scala.concurrent.duration._
+
+import akka.actor._
+import org.slf4j.Logger
+
+import org.apache.gearpump.cluster.AppMasterToWorker.LaunchExecutor
+import org.apache.gearpump.cluster.ExecutorJVMConfig
+import org.apache.gearpump.cluster.WorkerToAppMaster._
+import org.apache.gearpump.cluster.appmaster.ExecutorSystemLauncher._
+import org.apache.gearpump.cluster.appmaster.ExecutorSystemScheduler.{ExecutorSystemJvmConfig, Session}
+import org.apache.gearpump.cluster.scheduler.Resource
+import org.apache.gearpump.util.ActorSystemBooter.{ActorSystemRegistered, RegisterActorSystem}
+import org.apache.gearpump.util.{ActorSystemBooter, ActorUtil, Constants, LogUtil}
+
+/**
+ * Launches a single executor system on a target worker.
+ *
+ * The actor waits for LaunchExecutorSystem, asks the worker to launch the executor, and then
+ * waits for the new actor system to register back. It replies to the sender of
+ * LaunchExecutorSystem with LaunchExecutorSystemSuccess, LaunchExecutorSystemRejected or
+ * LaunchExecutorSystemTimeout, and then stops itself.
+ *
+ * A timeout of GEARPUMP_START_EXECUTOR_SYSTEM_TIMEOUT_MS milliseconds is armed at construction.
+ * It is cancelled on registration, and in any case when the actor stops.
+ *
+ * The companion has no props() helper; construct it with
+ * `Props(new ExecutorSystemLauncher(appId, session))`.
+ *
+ * @param appId id of the application the executor system belongs to
+ * @param session The session that request to launch executor system
+ */
+private[appmaster]
+class ExecutorSystemLauncher(appId: Int, session: Session) extends Actor {
+
+  private val LOG: Logger = LogUtil.getLogger(getClass, app = appId)
+
+  val scheduler = context.system.scheduler
+  implicit val executionContext = context.dispatcher
+
+  private val systemConfig = context.system.settings.config
+  val timeoutSetting = systemConfig.getInt(Constants.GEARPUMP_START_EXECUTOR_SYSTEM_TIMEOUT_MS)
+
+  val timeout = scheduler.scheduleOnce(timeoutSetting.milliseconds,
+    self, LaunchExecutorSystemTimeout(session))
+
+  // Whatever path stops this actor (rejection, timeout, external stop), drop the pending timer
+  // so a stray LaunchExecutorSystemTimeout is not delivered to dead letters.
+  override def postStop(): Unit = {
+    timeout.cancel()
+  }
+
+  def receive: Receive = waitForLaunchCommand
+
+  def waitForLaunchCommand: Receive = {
+    case LaunchExecutorSystem(worker, executorSystemId, resource) =>
+      val launcherPath = ActorUtil.getFullPath(context.system, self.path)
+      // The JVM config is optional; the worker receives null when the session has none.
+      val jvmConfig = Option(session.executorSystemJvmConfig)
+        .map(getExecutorJvmConfig(_, s"app${appId}system${executorSystemId}", launcherPath)).orNull
+
+      val launch = LaunchExecutor(appId, executorSystemId, resource, jvmConfig)
+      LOG.info(s"Launching Executor ...appId: $appId, executorSystemId: $executorSystemId, " +
+        s"slots: ${resource.slots} on worker $worker")
+
+      worker.ref ! launch
+      context.become(waitForActorSystemToStart(sender(), launch, worker, executorSystemId))
+  }
+
+  def waitForActorSystemToStart(
+      replyTo: ActorRef, launch: LaunchExecutor, worker: WorkerInfo, executorSystemId: Int)
+    : Receive = {
+    case RegisterActorSystem(systemPath) =>
+      timeout.cancel()
+      LOG.info(s"Received RegisterActorSystem $systemPath for session ${session.requestor}")
+      sender() ! ActorSystemRegistered(worker.ref)
+      // The registering actor becomes the daemon of the new executor system.
+      val system = ExecutorSystem(
+        launch.executorId, AddressFromURIString(systemPath), sender(), launch.resource, worker)
+      replyTo ! LaunchExecutorSystemSuccess(system, session)
+      context.stop(self)
+    case ExecutorLaunchRejected(reason, ex) =>
+      LOG.error(s"Executor Launch ${launch.resource} failed reason: $reason", ex)
+      replyTo ! LaunchExecutorSystemRejected(launch.resource, reason, session)
+      context.stop(self)
+    case expired: LaunchExecutorSystemTimeout =>
+      LOG.error(s"The Executor ActorSystem $executorSystemId has not been started in time")
+      replyTo ! expired
+      context.stop(self)
+  }
+}
+
+private[appmaster]
+object ExecutorSystemLauncher {
+
+  /** Asks the launcher to start executor system `systemId` on `worker` with `resource`. */
+  case class LaunchExecutorSystem(worker: WorkerInfo, systemId: Int, resource: Resource)
+
+  /** Reply: the executor system is up and registered. */
+  case class LaunchExecutorSystemSuccess(system: ExecutorSystem, session: Session)
+
+  /** Reply: the worker rejected the launch of `resource` for `reason`. */
+  case class LaunchExecutorSystemRejected(resource: Resource, reason: Any, session: Session)
+
+  /** Reply (and self-message): the executor system did not register in time. */
+  case class LaunchExecutorSystemTimeout(session: Session)
+
+  /**
+   * Builds the executor JVM config. The main class is ActorSystemBooter and the arguments are
+   * (systemName, reportBack); all other settings are copied from `conf`.
+   * Returns null when `conf` is null.
+   */
+  private def getExecutorJvmConfig(conf: ExecutorSystemJvmConfig, systemName: String,
+      reportBack: String): ExecutorJVMConfig = {
+    if (conf eq null) {
+      null
+    } else {
+      val bootArgs = Array(systemName, reportBack)
+      ExecutorJVMConfig(conf.classPath, conf.jvmArguments, classOf[ActorSystemBooter].getName,
+        bootArgs, conf.jar, conf.username, conf.executorAkkaConfig)
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/cluster/appmaster/ExecutorSystemScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/appmaster/ExecutorSystemScheduler.scala b/core/src/main/scala/org/apache/gearpump/cluster/appmaster/ExecutorSystemScheduler.scala
new file mode 100644
index 0000000..d73cc2f
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/cluster/appmaster/ExecutorSystemScheduler.scala
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.appmaster
+
+import org.apache.gearpump.cluster.worker.WorkerId
+
+import scala.concurrent.duration._
+
+import akka.actor._
+import com.typesafe.config.Config
+
+import org.apache.gearpump.cluster.AppMasterToMaster.RequestResource
+import org.apache.gearpump.cluster.MasterToAppMaster.ResourceAllocated
+import org.apache.gearpump.cluster._
+import org.apache.gearpump.cluster.appmaster.ExecutorSystemLauncher._
+import org.apache.gearpump.cluster.appmaster.ExecutorSystemScheduler._
+import org.apache.gearpump.cluster.scheduler.{ResourceAllocation, ResourceRequest}
+import org.apache.gearpump.util.{Constants, LogUtil}
+
+/**
+ * ExecutorSystem is also a type of resource; this class schedules ExecutorSystems for an
+ * AppMaster.
+ *
+ * The flow for a StartExecutorSystems request is:
+ *  1. Open a Session for the requestor.
+ *  2. Ask the Master for resources through a per-session ResourceAgent.
+ *  3. Start one ExecutorSystemLauncher per allocated resource.
+ *  4. Reply to the requestor with ExecutorSystemStarted, or with StartExecutorSystemTimeout.
+ *
+ * If a launch is rejected, the resource is requested again on an unspecified worker. The
+ * communication with Master and Worker is hidden from the AppMaster.
+ *
+ * The companion has no props() helper; construct it with
+ * `Props(new ExecutorSystemScheduler(appId, masterProxy, launcherFactory))`.
+ */
+private[appmaster]
+class ExecutorSystemScheduler(appId: Int, masterProxy: ActorRef,
+    executorSystemLauncher: (Int, Session) => Props) extends Actor {
+
+  private val LOG = LogUtil.getLogger(getClass, app = appId)
+  implicit val timeout = Constants.FUTURE_TIMEOUT
+  implicit val actorSystem = context.system
+  var currentSystemId = 0
+
+  var resourceAgents = Map.empty[Session, ActorRef]
+
+  def receive: Receive = {
+    clientCommands orElse resourceAllocationMessageHandler orElse executorSystemMessageHandler
+  }
+
+  def clientCommands: Receive = {
+    case start: StartExecutorSystems =>
+      LOG.info(s"starting executor systems (ExecutorSystemConfig(${start.executorSystemConfig}), " +
+        s"Resources(${start.resources.mkString(",")}))")
+      val requestor = sender()
+      val executorSystemConfig = start.executorSystemConfig
+      val session = Session(requestor, executorSystemConfig)
+      val agent = resourceAgents.getOrElse(session,
+        context.actorOf(Props(new ResourceAgent(masterProxy, session))))
+      resourceAgents = resourceAgents + (session -> agent)
+
+      start.resources.foreach { resource =>
+        agent ! RequestResource(appId, resource)
+      }
+
+    case StopExecutorSystem(executorSystem) =>
+      executorSystem.shutdown()
+  }
+
+  def resourceAllocationMessageHandler: Receive = {
+    case ResourceAllocatedForSession(allocations, session) =>
+      if (isSessionAlive(session)) {
+        allocations.foreach { resourceAllocation =>
+          val ResourceAllocation(resource, worker, workerId) = resourceAllocation
+
+          val launcher = context.actorOf(executorSystemLauncher(appId, session))
+          launcher ! LaunchExecutorSystem(WorkerInfo(workerId, worker), currentSystemId, resource)
+          currentSystemId = currentSystemId + 1
+        }
+      }
+    case ResourceAllocationTimeOut(session) =>
+      if (isSessionAlive(session)) {
+        resourceAgents = resourceAgents - session
+        session.requestor ! StartExecutorSystemTimeout
+      }
+  }
+
+  def executorSystemMessageHandler: Receive = {
+    case LaunchExecutorSystemSuccess(system, session) =>
+      if (isSessionAlive(session)) {
+        LOG.info("LaunchExecutorSystemSuccess, send back to " + session.requestor)
+        system.bindLifeCycleWith(self)
+        // The launcher accepts a null executorSystemJvmConfig, so it must not be dereferenced
+        // blindly here; in that case no jar is bound.
+        val boundJar = Option(session.executorSystemJvmConfig).flatMap(_.jar)
+        session.requestor ! ExecutorSystemStarted(system, boundJar)
+      } else {
+        LOG.error("We get a ExecutorSystem back, but resource requestor is no longer valid. " +
+          "Will shutdown the allocated system")
+        system.shutdown()
+      }
+    case LaunchExecutorSystemTimeout(session) =>
+      if (isSessionAlive(session)) {
+        LOG.error(s"Failed to launch executor system for ${session.requestor} due to timeout")
+        session.requestor ! StartExecutorSystemTimeout
+      }
+
+    case LaunchExecutorSystemRejected(resource, reason, session) =>
+      if (isSessionAlive(session)) {
+        LOG.error(s"Failed to launch executor system, due to $reason, " +
+          s"will ask master to allocate new resources $resource")
+        resourceAgents.get(session).foreach { resourceAgent =>
+          resourceAgent ! RequestResource(appId, ResourceRequest(resource, WorkerId.unspecified))
+        }
+      }
+  }
+
+  /** A session is alive while it still has a registered ResourceAgent; null is never alive. */
+  private def isSessionAlive(session: Session): Boolean = {
+    Option(session).exists(resourceAgents.contains)
+  }
+}
+
+object ExecutorSystemScheduler {
+
+  /** Request: start one executor system per entry of `resources`. */
+  case class StartExecutorSystems(
+      resources: Array[ResourceRequest], executorSystemConfig: ExecutorSystemJvmConfig)
+
+  /** Reply: an executor system is up; `boundedJar` is the jar from the session's JVM config. */
+  case class ExecutorSystemStarted(system: ExecutorSystem, boundedJar: Option[AppJar])
+
+  /** Request: shut down `system`. */
+  case class StopExecutorSystem(system: ExecutorSystem)
+
+  /** Reply: resources could not be allocated, or a system could not be launched, in time. */
+  case object StartExecutorSystemTimeout
+
+  /** JVM settings used to launch an executor system process. */
+  case class ExecutorSystemJvmConfig(classPath: Array[String], jvmArguments: Array[String],
+      jar: Option[AppJar], username: String, executorAkkaConfig: Config = null)
+
+  /**
+   * For each client which asks for an executor system, the scheduler creates a session for it.
+   */
+  private[appmaster]
+  case class Session(requestor: ActorRef, executorSystemJvmConfig: ExecutorSystemJvmConfig)
+
+  /**
+   * An agent through which a session requests resources from the master.
+   *
+   * Each RequestResource resets a single allocation timer of
+   * GEARPUMP_RESOURCE_ALLOCATION_TIMEOUT seconds. If the timer fires while requested slots are
+   * still unallocated, the requestor is told ResourceAllocationTimeOut and the agent stops.
+   *
+   * @param master actor that receives the RequestResource messages
+   * @param session the original requester of the resource requests
+   */
+  private[appmaster]
+  class ResourceAgent(master: ActorRef, session: Session) extends Actor {
+    private var resourceRequestor: ActorRef = null
+    var timeOutClock: Cancellable = null
+    private var unallocatedResource: Int = 0
+
+    import context.dispatcher
+
+    import org.apache.gearpump.util.Constants._
+
+    val timeout = context.system.settings.config.getInt(GEARPUMP_RESOURCE_ALLOCATION_TIMEOUT)
+
+    def receive: Receive = {
+      case request: RequestResource =>
+        unallocatedResource += request.request.resource.slots
+        Option(timeOutClock).foreach(_.cancel())
+        timeOutClock = context.system.scheduler.scheduleOnce(
+          timeout.seconds, self, ResourceAllocationTimeOut(session))
+        resourceRequestor = sender()
+        master ! request
+      case ResourceAllocated(allocations) =>
+        unallocatedResource -= allocations.map(_.resource.slots).sum
+        resourceRequestor forward ResourceAllocatedForSession(allocations, session)
+      case _: ResourceAllocationTimeOut =>
+        if (unallocatedResource > 0) {
+          resourceRequestor ! ResourceAllocationTimeOut(session)
+          // We will not receive any ResourceAllocation after timeout
+          context.stop(self)
+        }
+    }
+
+    // Do not leave a pending allocation timer behind once the agent is gone.
+    override def postStop(): Unit = {
+      Option(timeOutClock).foreach(_.cancel())
+    }
+  }
+
+  private[ExecutorSystemScheduler]
+  case class ResourceAllocatedForSession(resource: Array[ResourceAllocation], session: Session)
+
+  private[ExecutorSystemScheduler]
+  case class ResourceAllocationTimeOut(session: Session)
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/cluster/appmaster/MasterConnectionKeeper.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/appmaster/MasterConnectionKeeper.scala b/core/src/main/scala/org/apache/gearpump/cluster/appmaster/MasterConnectionKeeper.scala
new file mode 100644
index 0000000..fd2d11a
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/cluster/appmaster/MasterConnectionKeeper.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.appmaster
+
+import java.util.concurrent.TimeUnit
+import scala.concurrent.duration.FiniteDuration
+
+import akka.actor._
+
+import org.apache.gearpump.cluster.AppMasterToMaster.RegisterAppMaster
+import org.apache.gearpump.cluster.MasterToAppMaster.AppMasterRegistered
+import org.apache.gearpump.cluster.appmaster.MasterConnectionKeeper.AppMasterRegisterTimeout
+import org.apache.gearpump.cluster.appmaster.MasterConnectionKeeper.MasterConnectionStatus.{MasterConnected, MasterStopped}
+import org.apache.gearpump.cluster.master.MasterProxy.{MasterRestarted, WatchMaster}
+import org.apache.gearpump.util.LogUtil
+
/**
 * Watches the liveness of Master.
 *
 * On construction it subscribes to `masterProxy` with WatchMaster and sends `register` through
 * the proxy.
 *
 * Notifications sent to `masterStatusListener`:
 *  - MasterConnected, once the Master answers with AppMasterRegistered.
 *  - MasterStopped, if no confirmation arrives within 30 seconds. This actor then stops.
 *  - MasterStopped, if the proxy later reports the Master as stopped. This actor then stops.
 *
 * When Master is restarted (MasterRestarted from the proxy), it sends RegisterAppMaster to the
 * new Master instance and waits for confirmation again.
 *
 * @param register registration message re-sent on every (re)connection
 * @param masterProxy proxy actor used to reach the Master and to watch its liveness
 * @param masterStatusListener receiver of MasterConnectionStatus notifications
 */
private[appmaster]
class MasterConnectionKeeper(
    register: RegisterAppMaster, masterProxy: ActorRef, masterStatusListener: ActorRef)
  extends Actor {

  import context.dispatcher

  private val LOG = LogUtil.getLogger(getClass)

  // Subscribe self to masterProxy to get MasterRestarted / MasterStopped notifications.
  masterProxy ! WatchMaster(self)

  /** Sends the registration through the proxy; returns the 30 second confirmation timeout. */
  def registerAppMaster: Cancellable = {
    masterProxy ! register
    context.system.scheduler.scheduleOnce(FiniteDuration(30, TimeUnit.SECONDS),
      self, AppMasterRegisterTimeout)
  }

  context.become(waitMasterToConfirm(registerAppMaster))

  /** Waits for the Master to acknowledge the registration, or for the timeout. */
  def waitMasterToConfirm(cancelRegister: Cancellable): Receive = {
    case AppMasterRegistered(appId) =>
      cancelRegister.cancel()
      masterStatusListener ! MasterConnected
      context.become(masterLivenessListener)
    case AppMasterRegisterTimeout =>
      cancelRegister.cancel()
      masterStatusListener ! MasterStopped
      context.stop(self)
  }

  /** Reacts to liveness notifications published by the master proxy. */
  def masterLivenessListener: Receive = {
    case MasterRestarted =>
      LOG.info("Master restarted, re-registering appmaster....")
      context.become(waitMasterToConfirm(registerAppMaster))
    // MasterProxy announces a dead Master with its own MasterProxy.MasterStopped object. That
    // is a different object from MasterConnectionStatus.MasterStopped imported here, so match
    // both; otherwise the proxy's notification would be dropped as unhandled.
    case MasterStopped | org.apache.gearpump.cluster.master.MasterProxy.MasterStopped =>
      LOG.info("Master is dead, killing this AppMaster....")
      masterStatusListener ! MasterStopped
      context.stop(self)
  }

  // The constructor already installs waitMasterToConfirm via context.become, so this initial
  // behavior is only a non-null placeholder.
  def receive: Receive = Actor.emptyBehavior
}
+
/** Messages used by the MasterConnectionKeeper actor. */
private[appmaster] object MasterConnectionKeeper {

  /** Scheduled by the keeper to itself when it sends a registration; fires if unconfirmed. */
  case object AppMasterRegisterTimeout

  /** Notifications the keeper sends to its masterStatusListener. */
  object MasterConnectionStatus {

    /** The Master acknowledged the AppMaster registration. */
    case object MasterConnected

    /** The Master is considered gone (registration timed out or the Master stopped). */
    case object MasterStopped

  }

}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/cluster/client/ClientContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/client/ClientContext.scala b/core/src/main/scala/org/apache/gearpump/cluster/client/ClientContext.scala
new file mode 100644
index 0000000..245f1bc
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/cluster/client/ClientContext.scala
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.client
+
+import java.util.concurrent.TimeUnit
+import scala.collection.JavaConverters._
+import scala.concurrent.duration.Duration
+import scala.concurrent.{Await, Future}
+import scala.util.Try
+
+import akka.actor.{ActorRef, ActorSystem}
+import akka.util.Timeout
+import com.typesafe.config.{Config, ConfigValueFactory}
+import org.slf4j.Logger
+
+import org.apache.gearpump.cluster.MasterToAppMaster.{AppMastersData, ReplayFromTimestampWindowTrailingEdge}
+import org.apache.gearpump.cluster.MasterToClient.ReplayApplicationResult
+import org.apache.gearpump.cluster._
+import org.apache.gearpump.cluster.master.MasterProxy
+import org.apache.gearpump.jarstore.JarStoreService
+import org.apache.gearpump.util.Constants._
+import org.apache.gearpump.util.{ActorUtil, Constants, LogUtil, Util}
+
/**
 * ClientContext is a user facing util to submit/manage an application.
 *
 * When no ActorSystem is supplied, one is created from `config`; only such a self-created
 * system is terminated by [[close]].
 *
 * TODO: add interface to query master here
 *
 * @param config cluster configuration; also supplies the master list and jar store settings
 * @param sys actor system to use, or null to create one
 * @param _master master actor to talk to, or null to lazily create a MasterProxy for the
 *                masters listed under GEARPUMP_CLUSTER_MASTERS
 */
class ClientContext(config: Config, sys: ActorSystem, _master: ActorRef) {

  def this(system: ActorSystem) = {
    this(system.settings.config, system, null)
  }

  def this(config: Config) = {
    this(config, null, null)
  }

  private val LOG: Logger = LogUtil.getLogger(getClass)
  private implicit val timeout = Timeout(5, TimeUnit.SECONDS)

  implicit val system = Option(sys).getOrElse(ActorSystem(s"client${Util.randInt()}", config))
  LOG.info(s"Starting system ${system.name}")
  // Only a system created by this context is owned, and later terminated, by it.
  val shouldCleanupSystem = Option(sys).isEmpty

  private val jarStoreService = JarStoreService.get(config)
  jarStoreService.init(config, system)

  private lazy val master: ActorRef = {
    val masters = config.getStringList(Constants.GEARPUMP_CLUSTER_MASTERS).asScala
      .flatMap(Util.parseHostList)
    val proxy = Option(_master).getOrElse(system.actorOf(MasterProxy.props(masters),
      s"masterproxy${system.name}"))
    LOG.info(s"Creating master proxy ${proxy} for master list: $masters")
    proxy
  }

  /**
   * Submits an application with default jar setting. Use java property "gearpump.app.jar" if
   * defined. Otherwise, it assumes the jar is on the target runtime classpath, thus will
   * not send the jar across the wire.
   *
   * @return the application id returned by the Master
   */
  def submit(app: Application): Int = {
    submit(app, System.getProperty(GEARPUMP_APP_JAR))
  }

  /** Submits `app` with `jar` (null for none), reading the executor number from properties. */
  def submit(app: Application, jar: String): Int = {
    submit(app, jar, getExecutorNum())
  }

  /**
   * Submits `app`.
   *
   * @param jar local jar path copied to the jar store, or null to send no jar
   * @param executorNum stored under APPLICATION_EXECUTOR_NUMBER in the submitted config
   * @return the application id returned by the Master
   * @throws Exception if the (prefixed) application name is invalid
   */
  def submit(app: Application, jar: String, executorNum: Int): Int = {
    val client = getMasterClient
    val appName = checkAndAddNamePrefix(app.name, System.getProperty(GEARPUMP_APP_NAME_PREFIX))
    val submissionConfig = getSubmissionConfig(config)
      .withValue(APPLICATION_EXECUTOR_NUMBER, ConfigValueFactory.fromAnyRef(executorNum))
    val appDescription =
      AppDescription(appName, app.appMaster.getName, app.userConfig, submissionConfig)
    val appJar = Option(jar).map(loadFile)
    client.submitApplication(appDescription, appJar)
  }

  /** Executor number from system property APPLICATION_EXECUTOR_NUMBER; 1 if absent/invalid. */
  private def getExecutorNum(): Int = {
    Try(System.getProperty(APPLICATION_EXECUTOR_NUMBER).toInt).getOrElse(1)
  }

  /** The part of `config` that differs from the cluster defaults. */
  private def getSubmissionConfig(config: Config): Config = {
    ClusterConfig.filterOutDefaultConfig(config)
  }

  /** Asks the AppMaster of `appId` to replay; blocks until the reply arrives. */
  def replayFromTimestampWindowTrailingEdge(appId: Int): ReplayApplicationResult = {
    import scala.concurrent.ExecutionContext.Implicits.global
    val result = Await.result(
      ActorUtil.askAppMaster[ReplayApplicationResult](master,
        appId, ReplayFromTimestampWindowTrailingEdge(appId)), Duration.Inf)
    result
  }

  /** Sends `msg` to the AppMaster of `appId` and returns the future reply. */
  def askAppMaster[T](appId: Int, msg: Any): Future[T] = {
    import scala.concurrent.ExecutionContext.Implicits.global
    ActorUtil.askAppMaster[T](master, appId, msg)
  }

  def listApps: AppMastersData = {
    val client = getMasterClient
    client.listApplications
  }

  def shutdown(appId: Int): Unit = {
    val client = getMasterClient
    client.shutdownApplication(appId)
  }

  def resolveAppID(appId: Int): ActorRef = {
    val client = getMasterClient
    client.resolveAppId(appId)
  }

  /** Terminates the actor system, but only if this context created it. */
  def close(): Unit = {
    if (shouldCleanupSystem) {
      LOG.info(s"Shutting down system ${system.name}")
      system.terminate()
    }
  }

  /** Copies a local jar into the jar store and describes it as an AppJar. */
  private def loadFile(jarPath: String): AppJar = {
    val jarFile = new java.io.File(jarPath)
    val path = jarStoreService.copyFromLocal(jarFile)
    AppJar(jarFile.getName, path)
  }

  /**
   * Returns `namePrefix + "_" + appName` (or just `appName` when the prefix is null/empty).
   * Closes this context and throws if the resulting name is not a valid application name.
   */
  private def checkAndAddNamePrefix(appName: String, namePrefix: String): String = {
    val fullName = if (namePrefix != null && namePrefix != "") {
      namePrefix + "_" + appName
    } else {
      appName
    }
    if (!Util.validApplicationName(fullName)) {
      close()
      // Report the name that was actually validated: the prefix may be the invalid part.
      val error = s"The application name $fullName is not a proper name. An app name can " +
        "be a sequence of letters, numbers or underscore character \"_\""
      throw new Exception(error)
    }
    fullName
  }

  /** New MasterClient; timeout from GEARPUMP_MASTERCLIENT_TIMEOUT seconds, default 90. */
  private def getMasterClient: MasterClient = {
    val timeout = Try(config.getInt(Constants.GEARPUMP_MASTERCLIENT_TIMEOUT)).getOrElse(90)
    new MasterClient(master, akka.util.Timeout(timeout, TimeUnit.SECONDS))
  }
}
+
/** Factories for ClientContext; omitted pieces are null or ClusterConfig.default(). */
object ClientContext {

  /** Full-control factory; every other overload delegates here. */
  def apply(config: Config, system: ActorSystem, master: ActorRef): ClientContext = {
    new ClientContext(config, system, master)
  }

  def apply(): ClientContext = apply(ClusterConfig.default(), null, null)

  def apply(system: ActorSystem): ClientContext = apply(ClusterConfig.default(), system, null)

  def apply(system: ActorSystem, master: ActorRef): ClientContext =
    apply(ClusterConfig.default(), system, master)

  def apply(config: Config): ClientContext = apply(config, null, null)
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/cluster/client/MasterClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/client/MasterClient.scala b/core/src/main/scala/org/apache/gearpump/cluster/client/MasterClient.scala
new file mode 100644
index 0000000..77ebedf
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/cluster/client/MasterClient.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.client
+
+import scala.concurrent.duration.Duration
+import scala.concurrent.{Await, Future}
+import scala.util.{Failure, Success}
+
+import akka.actor.ActorRef
+import akka.pattern.ask
+import akka.util.Timeout
+
+import org.apache.gearpump.cluster.ClientToMaster._
+import org.apache.gearpump.cluster.MasterToAppMaster.{AppMastersData, AppMastersDataRequest}
+import org.apache.gearpump.cluster.MasterToClient.{ResolveAppIdResult, ShutdownApplicationResult, SubmitApplicationResult}
+import org.apache.gearpump.cluster.{AppDescription, AppJar}
+
/**
 * Client to inter-operate with Master node.
 *
 * Every call asks `master` (using `timeout` as the implicit ask timeout) and blocks the caller
 * until the reply arrives. A failed result is rethrown to the caller.
 *
 * NOTE: Stateless, thread safe
 */
class MasterClient(master: ActorRef, timeout: Timeout) {
  implicit val masterClientTimeout = timeout

  /** Asks `master` with `msg` and waits for the reply, cast (unchecked) to `T`. */
  private def askAndWait[T](msg: Any): T = {
    val reply = (master ? msg).asInstanceOf[Future[T]]
    Await.result(reply, Duration.Inf)
  }

  /** Submits an application and returns its id; throws the Master's failure if rejected. */
  def submitApplication(app: AppDescription, appJar: Option[AppJar]): Int = {
    askAndWait[SubmitApplicationResult](SubmitApplication(app, appJar)).appId match {
      case Failure(ex) => throw ex
      case Success(submittedId) =>
        // scalastyle:off println
        Console.println(s"Submit application succeed. The application id is $submittedId")
        // scalastyle:on println
        submittedId
    }
  }

  /** Returns the AppMaster actor of `appId`; throws the Master's failure if unknown. */
  def resolveAppId(appId: Int): ActorRef = {
    askAndWait[ResolveAppIdResult](ResolveAppId(appId)).appMaster match {
      case Failure(ex) => throw ex
      case Success(appMasterRef) => appMasterRef
    }
  }

  /** Shuts down `appId`; throws the Master's failure if it cannot. */
  def shutdownApplication(appId: Int): Unit = {
    askAndWait[ShutdownApplicationResult](ShutdownApplication(appId)).appId match {
      case Failure(ex) => throw ex
      case Success(_) => ()
    }
  }

  /** Lists the applications known to the Master. */
  def listApplications: AppMastersData = askAndWait[AppMastersData](AppMastersDataRequest)
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/cluster/main/ArgumentsParser.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/main/ArgumentsParser.scala b/core/src/main/scala/org/apache/gearpump/cluster/main/ArgumentsParser.scala
new file mode 100644
index 0000000..02c6f1a
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/cluster/main/ArgumentsParser.scala
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.main
+
+import org.apache.gearpump.cluster.main.ArgumentsParser.Syntax
+
/**
 * Declaration of one command line option.
 *
 * @param description help text printed after the option name by ArgumentsParser.help
 * @param required when true, parsing fails if the option is absent
 * @param defaultValue used (via toString) for an absent, non-required option; when None the
 *                     option's value becomes the empty string
 */
case class CLIOption[+T](
    description: String = "", required: Boolean = false, defaultValue: Option[T] = None)
+
/**
 * Result of parsing a command line: option values keyed by option name (without the leading
 * "-"), plus the arguments that were not consumed as options.
 */
class ParseResult(optionMap: Map[String, String], remainArguments: Array[String]) {

  /**
   * Value of option `key` as an Int.
   *
   * @throws NoSuchElementException if `key` is not present (message names the key)
   * @throws NumberFormatException if the value is not an Int
   */
  def getInt(key: String): Int = optionMap(key).toInt

  /**
   * Value of option `key`.
   *
   * @throws NoSuchElementException if `key` is not present (message names the key)
   */
  def getString(key: String): String = optionMap(key)

  /**
   * Value of option `key` as a Boolean ("true"/"false", case-insensitive).
   *
   * @throws NoSuchElementException if `key` is not present (message names the key)
   * @throws IllegalArgumentException if the value is not a boolean
   */
  def getBoolean(key: String): Boolean = optionMap(key).toBoolean

  /** True when `key` is present with a non-empty value. */
  def exists(key: String): Boolean = optionMap.get(key).exists(_.nonEmpty)

  /** Arguments left over after option parsing, in order. */
  def remainArgs: Array[String] = this.remainArguments
}
+
/**
 * Parser for command line arguments
 *
 * Grammar: -option1 value1 -option2 value3 -flag1 -flag2 remainArg1 remainArg2...
 *
 * Implementations override `description`, `options`, `remainArgs` and, if needed,
 * `ignoreUnknownArgument`, then call `parse`.
 */
trait ArgumentsParser {

  val ignoreUnknownArgument = false

  // scalastyle:off println
  /** Prints the description, one usage line per option, then the positional argument names. */
  def help(): Unit = {
    Console.println(s"\nHelp: $description")
    val optionLines = options.toList.map { case (name, opt) =>
      if (opt.required) {
        s"-$name (required:${opt.required})${opt.description}"
      } else {
        s"-$name (required:${opt.required}, " +
          s"default:${opt.defaultValue.getOrElse("")})${opt.description}"
      }
    }
    val positionalLine = remainArgs.map(arg => s"<$arg>").mkString(" ")
    (optionLines :+ positionalLine).foreach(line => Console.println(line))
  }
  // scalastyle:on println

  /** Parses `args` against this parser's options and positional arguments. */
  def parse(args: Array[String]): ParseResult = {
    ArgumentsParser.parse(Syntax(options, remainArgs, ignoreUnknownArgument), args)
  }

  val description: String = ""
  val options: Array[(String, CLIOption[Any])] = Array.empty[(String, CLIOption[Any])]
  val remainArgs: Array[String] = Array.empty[String]
}
+
object ArgumentsParser {

  /**
   * Description of the accepted command line.
   *
   * @param options declared options, keyed by name without the leading "-"
   * @param remainArgs names of the positional arguments that must be present
   * @param ignoreUnknownArgument when true, unknown options are kept in the remaining arguments
   *                              instead of failing the parse
   */
  case class Syntax(
      val options: Array[(String, CLIOption[Any])], val remainArgs: Array[String],
      val ignoreUnknownArgument: Boolean)

  /**
   * Parses `args` against `syntax`.
   *
   * A "-key value" pair (value not starting with "-") sets option `key`, and a lone "-key" sets
   * it to "true". The first argument not starting with "-" ends option parsing: it and
   * everything after it become remaining arguments. Absent non-required options get their
   * default value, or "" when they have none.
   *
   * @throws Exception on an unknown option (unless `ignoreUnknownArgument`), a missing
   *                   required option, or fewer remaining arguments than `remainArgs`
   */
  def parse(syntax: Syntax, args: Array[String]): ParseResult = {
    import syntax.{ignoreUnknownArgument, options, remainArgs}
    // Looked up once per argument, so build it once.
    val knownOptions = options.map(_._1).toSet
    var config = Map.empty[String, String]
    var remain = Array.empty[String]

    @scala.annotation.tailrec
    def doParse(argument: List[String]): Unit = {
      argument match {
        case Nil => () // everything processed

        case key :: value :: rest if key.startsWith("-") && !value.startsWith("-") =>
          val fixedKey = key.substring(1)
          if (knownOptions.contains(fixedKey)) {
            config += fixedKey -> value
          } else if (ignoreUnknownArgument) {
            remain ++= Array(key, value)
          } else {
            throw new Exception(s"found unknown option $fixedKey")
          }
          doParse(rest)

        case key :: rest if key.startsWith("-") =>
          val fixedKey = key.substring(1)
          if (knownOptions.contains(fixedKey)) {
            config += fixedKey -> "true"
          } else if (ignoreUnknownArgument) {
            // Unknown flags are passed through just like unknown "-key value" pairs above.
            remain ++= Array(key)
          } else {
            throw new Exception(s"found unknown option $fixedKey")
          }
          doParse(rest)

        case value :: rest =>
          // scalastyle:off println
          Console.err.println(s"Warning: get unknown argument $value, maybe it is a main class")
          // scalastyle:on println
          remain ++= value :: rest
          doParse(Nil)
      }
    }
    doParse(args.toList)

    // Fill in defaults for absent optional options.
    options.foreach { case (key, option) =>
      if (!config.contains(key) && !option.required) {
        config += key -> option.defaultValue.getOrElse("").toString
      }
    }

    // Anything still missing is a required option that was not given.
    options.foreach { case (key, _) =>
      if (!config.contains(key)) {
        throw new Exception(s"Missing option ${key}...")
      }
    }

    if (remain.length < remainArgs.length) {
      throw new Exception(s"Missing arguments ...")
    }

    new ParseResult(config, remain)
  }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/cluster/master/AppMasterLauncher.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/master/AppMasterLauncher.scala b/core/src/main/scala/org/apache/gearpump/cluster/master/AppMasterLauncher.scala
new file mode 100644
index 0000000..9305d5c
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/cluster/master/AppMasterLauncher.scala
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.master
+
+import java.util.concurrent.{TimeUnit, TimeoutException}
+import org.apache.gearpump.cluster.worker.WorkerId
+
+import scala.collection.JavaConverters._
+import scala.concurrent.duration.Duration
+import scala.util.{Failure, Success}
+
+import akka.actor.{Actor, ActorRef, Props, _}
+import com.typesafe.config.Config
+import org.slf4j.Logger
+
+import org.apache.gearpump.cluster.AppMasterToMaster.RequestResource
+import org.apache.gearpump.cluster.AppMasterToWorker.{LaunchExecutor, ShutdownExecutor}
+import org.apache.gearpump.cluster.MasterToAppMaster.ResourceAllocated
+import org.apache.gearpump.cluster.MasterToClient.SubmitApplicationResult
+import org.apache.gearpump.cluster.WorkerToAppMaster.ExecutorLaunchRejected
+import org.apache.gearpump.cluster.appmaster.{AppMasterRuntimeEnvironment, AppMasterRuntimeInfo, WorkerInfo}
+import org.apache.gearpump.cluster.scheduler.{Resource, ResourceAllocation, ResourceRequest}
+import org.apache.gearpump.cluster.{AppDescription, AppJar, _}
+import org.apache.gearpump.transport.HostPort
+import org.apache.gearpump.util.ActorSystemBooter._
+import org.apache.gearpump.util.Constants._
+import org.apache.gearpump.util.{ActorSystemBooter, ActorUtil, LogUtil, Util}
+
/**
 * AppMasterLauncher is a child Actor of AppManager, it is responsible
 * for launching the AppMaster on the cluster.
 *
 * Steps:
 *  1. On construction, asks `master` for one slot on any worker.
 *  2. On ResourceAllocated, asks that worker to launch an executor JVM running
 *     ActorSystemBooter, which registers back with this actor.
 *  3. If the worker rejects the launch, the same resource is requested again.
 *  4. On RegisterActorSystem, asks the new system to create AppMasterRuntimeEnvironment and
 *     waits up to TIMEOUT for ActorCreated.
 *  5. Replies to `client` (if any) with SubmitApplicationResult and stops itself.
 */
class AppMasterLauncher(
    appId: Int, executorId: Int, app: AppDescription,
    jar: Option[AppJar], username: String, master: ActorRef, client: Option[ActorRef])
  extends Actor {
  private val LOG: Logger = LogUtil.getLogger(getClass, app = appId)

  val scheduler = context.system.scheduler
  val systemConfig = context.system.settings.config
  // How long to wait for the AppMaster actor to be created in the new actor system.
  val TIMEOUT = Duration(15, TimeUnit.SECONDS)

  val appMasterAkkaConfig: Config = app.clusterConfig

  LOG.info(s"Ask Master resource to start AppMaster $appId...")
  master ! RequestResource(appId, ResourceRequest(Resource(1), WorkerId.unspecified))

  def receive: Receive = waitForResourceAllocation

  /** Waits for the requested slot, then asks the chosen worker to launch the executor JVM. */
  def waitForResourceAllocation: Receive = {
    case ResourceAllocated(allocations) =>

      // Only the first allocation is used for the AppMaster.
      val ResourceAllocation(resource, worker, workerId) = allocations(0)
      LOG.info(s"Resource allocated for appMaster $appId on worker ${workerId}(${worker.path})")

      val submissionTime = System.currentTimeMillis()

      val appMasterInfo = AppMasterRuntimeInfo(appId, app.name, worker, username,
        submissionTime, config = appMasterAkkaConfig)
      val workerInfo = WorkerInfo(workerId, worker)
      val appMasterContext =
        AppMasterContext(appId, username, resource, workerInfo, jar, null, appMasterInfo)
      LOG.info(s"Try to launch a executor for AppMaster on worker ${workerId} for app $appId")
      val name = ActorUtil.actorNameForExecutor(appId, executorId)
      val selfPath = ActorUtil.getFullPath(context.system, self.path)

      val jvmSetting =
        Util.resolveJvmSetting(appMasterAkkaConfig.withFallback(systemConfig)).appMater
      // The booted JVM gets (actor system name, this actor's path) so it can register back.
      val executorJVM = ExecutorJVMConfig(jvmSetting.classPath, jvmSetting.vmargs,
        classOf[ActorSystemBooter].getName, Array(name, selfPath), jar,
        username, appMasterAkkaConfig)

      worker ! LaunchExecutor(appId, executorId, resource, executorJVM)
      context.become(waitForActorSystemToStart(worker, appMasterContext, app.userConfig, resource))
  }

  /** Waits for the executor's actor system to register, then asks it to create the AppMaster. */
  def waitForActorSystemToStart(
      worker: ActorRef, appContext: AppMasterContext, user: UserConfig, resource: Resource)
    : Receive = {
    case ExecutorLaunchRejected(reason, ex) =>
      LOG.error(s"Executor Launch failed reason: $reason", ex)
      LOG.info(s"reallocate resource $resource to start appmaster")
      master ! RequestResource(appId, ResourceRequest(resource, WorkerId.unspecified))
      context.become(waitForResourceAllocation)
    case RegisterActorSystem(systemPath) =>
      LOG.info(s"Received RegisterActorSystem $systemPath for AppMaster")
      sender ! ActorSystemRegistered(worker)

      val masterAddress = systemConfig.getStringList(GEARPUMP_CLUSTER_MASTERS)
        .asScala.map(HostPort(_)).map(ActorUtil.getMasterActorPath)
      sender ! CreateActor(
        AppMasterRuntimeEnvironment.props(masterAddress, app, appContext), s"appdaemon$appId")

      import context.dispatcher
      // Give the timeout a message: it is forwarded to the worker (ShutdownExecutor reason)
      // and to the client, where a null message would carry no context.
      val appMasterTimeout = scheduler.scheduleOnce(TIMEOUT, self,
        CreateActorFailed(app.appMaster, new TimeoutException(
          s"AppMaster for app $appId was not created within $TIMEOUT")))
      context.become(waitForAppMasterToStart(worker, appMasterTimeout))
  }

  /** Waits for AppMaster creation (or its failure/timeout) and reports the outcome. */
  def waitForAppMasterToStart(worker: ActorRef, cancel: Cancellable): Receive = {
    case ActorCreated(appMaster, _) =>
      cancel.cancel()
      sender ! BindLifeCycle(appMaster)
      LOG.info(s"AppMaster is created, mission complete...")
      replyToClient(SubmitApplicationResult(Success(appId)))
      context.stop(self)
    case CreateActorFailed(_, reason) =>
      cancel.cancel()
      worker ! ShutdownExecutor(appId, executorId, reason.getMessage)
      replyToClient(SubmitApplicationResult(Failure(reason)))
      context.stop(self)
  }

  /** Sends `result` to the submitting client, if any, with `master` as the sender. */
  def replyToClient(result: SubmitApplicationResult): Unit = {
    client.foreach(_.tell(result, master))
  }
}
+
/** Default AppMasterLauncherFactory: builds Props for an AppMasterLauncher actor. */
object AppMasterLauncher extends AppMasterLauncherFactory {
  def props(appId: Int, executorId: Int, app: AppDescription, jar: Option[AppJar],
    username: String, master: ActorRef, client: Option[ActorRef]): Props = {
    Props(new AppMasterLauncher(appId, executorId, app, jar, username, master, client))
  }
}
+
/**
 * Creates Props for the actor that launches an application's AppMaster.
 * The AppMasterLauncher companion object implements it. Presumably this is abstracted so that
 * callers (e.g. tests) can substitute another launcher; verify against call sites.
 */
trait AppMasterLauncherFactory {
  def props(appId: Int, executorId: Int, app: AppDescription, jar: Option[AppJar],
    username: String, master: ActorRef, client: Option[ActorRef]): Props
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/cluster/master/MasterProxy.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/master/MasterProxy.scala b/core/src/main/scala/org/apache/gearpump/cluster/master/MasterProxy.scala
new file mode 100644
index 0000000..e2ee00e
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/cluster/master/MasterProxy.scala
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.master
+
+import scala.concurrent.duration.FiniteDuration
+
+import akka.actor._
+import org.slf4j.Logger
+
+import org.apache.gearpump.transport.HostPort
+import org.apache.gearpump.util.{ActorUtil, LogUtil}
+
/**
 * This works with Master HA. When there are multiple Master nodes,
 * this finds an active one and forwards every other message to it.
 *
 * Connection behavior:
 *  - While connecting, it sends Identify to every contact point immediately and then every
 *    2 seconds. The first ActorIdentity carrying a reference wins.
 *  - If no Master answers within `timeout`, the proxy stops itself.
 *  - Messages received before a Master is found are stashed and replayed afterwards.
 *  - If the connected Master terminates, the proxy starts looking again.
 *
 * Watchers registered with WatchMaster get MasterRestarted on each (re)connection and
 * MasterStopped when this proxy stops.
 *
 * @param masters candidate Master actor paths
 * @param timeout how long to keep trying to (re)connect before giving up
 */
class MasterProxy(masters: Iterable[ActorPath], timeout: FiniteDuration)
  extends Actor with Stash {
  import org.apache.gearpump.cluster.master.MasterProxy._

  val LOG: Logger = LogUtil.getLogger(getClass, name = self.path.name)

  val contacts = masters.map { url =>
    LOG.info(s"Contacts point URL: $url")
    context.actorSelection(url)
  }

  var watchers: List[ActorRef] = List.empty[ActorRef]

  import context.dispatcher

  /** Starts identifying all contact points; the proxy stops itself after `timeout`. */
  def findMaster(): Cancellable = {
    repeatActionUntil(timeout) {
      contacts foreach { contact =>
        LOG.info(s"sending identity to $contact")
        contact ! Identify(None)
      }
    }
  }

  context.become(establishing(findMaster()))

  LOG.info("Master Proxy is started...")

  override def postStop(): Unit = {
    watchers.foreach(_ ! MasterStopped)
    super.postStop()
  }

  // The constructor already switches to `establishing`, so this fallback is presumably
  // never consulted.
  override def receive: Receive = {
    case _ =>
  }

  /** Searching for a Master: the first positive ActorIdentity wins; other messages are stashed. */
  def establishing(findMaster: Cancellable): Actor.Receive = {
    case ActorIdentity(_, Some(receptionist)) =>
      context.watch(receptionist)
      LOG.info("Connected to [{}]", receptionist.path)

      watchers.foreach(_ ! MasterRestarted)
      unstashAll()
      findMaster.cancel()
      context.become(active(receptionist) orElse messageHandler(receptionist))
    case ActorIdentity(_, None) => // ok, use another instead
    case msg =>
      LOG.info(s"Stashing ${msg.getClass.getSimpleName}")
      stash()
  }

  /** Connected to `receptionist`: handles proxy-level messages; the rest is forwarded. */
  def active(receptionist: ActorRef): Actor.Receive = {
    // Backquotes compare against the current receptionist. A bare name would bind a new
    // variable and treat any Terminated as loss of the Master.
    case Terminated(`receptionist`) =>
      LOG.info("Lost contact with [{}], restablishing connection", receptionist)
      context.become(establishing(findMaster()))
    case _: Terminated => // stale notification about some other actor; ignore
    case _: ActorIdentity => // ok, from previous establish, already handled
    case WatchMaster(watcher) =>
      watchers = watchers :+ watcher
  }

  /** Forwards any message to `master`, preserving the original sender. */
  def messageHandler(master: ActorRef): Receive = {
    case msg =>
      LOG.debug(s"Get msg ${msg.getClass.getSimpleName}, forwarding to ${master.path}")
      master forward msg
  }

  def scheduler: Scheduler = context.system.scheduler
  import scala.concurrent.duration._

  /**
   * Runs `action` now and then every 2 seconds. After `timeout` the repetition is cancelled
   * and this actor is sent a PoisonPill. The returned Cancellable cancels both schedules.
   */
  private def repeatActionUntil(timeout: FiniteDuration)(action: => Unit): Cancellable = {
    val send = scheduler.schedule(0.seconds, 2.seconds)(action)
    val suicide = scheduler.scheduleOnce(timeout) {
      send.cancel()
      self ! PoisonPill
    }

    new Cancellable {
      def cancel(): Boolean = {
        val result1 = send.cancel()
        val result2 = suicide.cancel()
        result1 && result2
      }

      def isCancelled: Boolean = {
        send.isCancelled && suicide.isCancelled
      }
    }
  }
}
+
/** Messages exchanged with a MasterProxy, plus its Props factory. */
object MasterProxy {
  /** Sent to watchers each time the proxy (re)connects to a Master. */
  case object MasterRestarted
  /** Sent to watchers when the proxy stops. */
  case object MasterStopped
  /** Registers `watcher` for MasterRestarted / MasterStopped notifications. */
  case class WatchMaster(watcher: ActorRef)

  import scala.concurrent.duration._

  /**
   * Props for a proxy over the given Master endpoints.
   *
   * @param masters host/port of every Master node
   * @param duration how long the proxy keeps trying to reach a Master before stopping itself
   */
  def props(masters: Iterable[HostPort], duration: FiniteDuration = 30.seconds): Props = {
    val masterPaths = masters.map(hostPort => ActorUtil.getMasterActorPath(hostPort))
    Props(new MasterProxy(masterPaths, duration))
  }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/cluster/master/MasterSummary.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/master/MasterSummary.scala b/core/src/main/scala/org/apache/gearpump/cluster/master/MasterSummary.scala
new file mode 100644
index 0000000..d182b22
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/cluster/master/MasterSummary.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.master
+
+import org.apache.gearpump.util.HistoryMetricsService.HistoryMetricsConfig
+
/**
 * Master status, as carried in MasterSummary.masterStatus.
 * Synced means all masters are live and synced.
 */
object MasterStatus {
  // Statuses are plain strings; Type is only an alias naming their intent.
  type Type = String
  val Synced = "synced"
  val UnSynced = "unsynced"
}
+
/** Address of one Master node. */
case class MasterNode(host: String, port: Int) {
  /** This node as a `(host, port)` pair. */
  def toTuple: (String, Int) = host -> port
}
+
/**
 * Master information returned for REST API call
 *
 * @param leader the leading Master node
 * @param cluster all Master nodes of the cluster
 * @param aliveFor how long the Master has been up (unit not visible here; presumably
 *                 milliseconds, TODO confirm)
 * @param logFile location of the Master's log file
 * @param jarStore description of the jar store in use
 * @param masterStatus one of MasterStatus.Synced / MasterStatus.UnSynced
 * @param homeDirectory Gearpump home directory of the Master
 * @param activities recent Master activities
 * @param jvmName name of the Master JVM
 * @param historyMetricsConfig metrics history settings; null when not provided
 */
case class MasterSummary(
    leader: MasterNode,
    cluster: List[MasterNode],
    aliveFor: Long,
    logFile: String,
    jarStore: String,
    masterStatus: MasterStatus.Type,
    homeDirectory: String,
    activities: List[MasterActivity],
    jvmName: String,
    historyMetricsConfig: HistoryMetricsConfig = null)
+
/**
 * One Master activity entry: an event description and when it happened.
 * NOTE(review): `time` is presumably epoch milliseconds; confirm with the producer.
 */
case class MasterActivity(time: Long, event: String)
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/cluster/scheduler/Resource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/scheduler/Resource.scala b/core/src/main/scala/org/apache/gearpump/cluster/scheduler/Resource.scala
new file mode 100644
index 0000000..c21d396
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/cluster/scheduler/Resource.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.scheduler
+
+import akka.actor.ActorRef
+import org.apache.gearpump.cluster.worker.WorkerId
+
/**
 * An amount of schedulable resource, measured in slots.
 *
 * Arithmetic and comparison operators act on the slot count only.
 */
case class Resource(slots: Int) {

  // scalastyle:off spaces.after.plus
  /** Sum of both slot counts. */
  def +(other: Resource): Resource = Resource(slots + other.slots)
  // scalastyle:on spaces.after.plus

  /** Difference of the slot counts; may be negative. */
  def -(other: Resource): Resource = Resource(slots - other.slots)

  def >(other: Resource): Boolean = slots > other.slots

  def >=(other: Resource): Boolean = slots >= other.slots

  def <(other: Resource): Boolean = slots < other.slots

  def <=(other: Resource): Boolean = slots <= other.slots

  /** True only when the slot count is exactly zero. */
  def isEmpty: Boolean = slots == 0
}
+
/**
 * Scheduling priority of a streaming job.
 *
 * A job with higher priority gets its resources scheduled earlier than jobs with lower
 * priority. Values are declared in ascending order, so `LOW < NORMAL < HIGH`.
 */
object Priority extends Enumeration {
  type Priority = Value

  val LOW: Value = Value

  /** Default priority of a `ResourceRequest`. */
  val NORMAL: Value = Value

  val HIGH: Value = Value
}
+
/**
 * How strictly a resource request is tied to particular workers.
 *
 * ONEWORKER has been described as meaning "only resource (slot) from that worker will be
 * accepted by the requestor application job", and as allowing a user "to schedule a task on
 * specific worker".
 *
 * NOTE(review): the second description overlaps with what the name SPECIFICWORKER suggests.
 * Confirm the exact meaning of each value against the scheduler implementation.
 */
object Relaxation extends Enumeration {
  type Relaxation = Value

  /** Default relaxation of a `ResourceRequest`. */
  val ANY: Value = Value

  val ONEWORKER: Value = Value

  val SPECIFICWORKER: Value = Value
}
+
+import org.apache.gearpump.cluster.scheduler.Priority._
+import org.apache.gearpump.cluster.scheduler.Relaxation._
+
/**
 * A request for `resource` submitted to the scheduler.
 *
 * @param workerId the worker the request refers to
 * @param priority defaults to `NORMAL`
 * @param relaxation defaults to `ANY`
 * @param executorNum defaults to 1
 */
case class ResourceRequest(
    resource: Resource,
    workerId: WorkerId,
    priority: Priority = NORMAL,
    relaxation: Relaxation = ANY,
    executorNum: Int = 1)
+
/**
 * An allocation of `resource` on a worker.
 *
 * The worker is identified both by its actor reference and by its [[WorkerId]].
 */
case class ResourceAllocation(
    resource: Resource,
    worker: ActorRef,
    workerId: WorkerId)
+
/** Factory helpers for [[Resource]]. */
object Resource {

  /** A resource with zero slots. */
  def empty: Resource = new Resource(0)

  /**
   * Returns whichever argument has fewer slots.
   *
   * On a tie, `res2` is returned.
   */
  def min(res1: Resource, res2: Resource): Resource = {
    if (res2.slots <= res1.slots) res2 else res1
  }
}
+
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/cluster/worker/ExecutorProcessLauncher.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/worker/ExecutorProcessLauncher.scala b/core/src/main/scala/org/apache/gearpump/cluster/worker/ExecutorProcessLauncher.scala
new file mode 100644
index 0000000..58f7dd8
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/cluster/worker/ExecutorProcessLauncher.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.worker
+
+import com.typesafe.config.Config
+
+import org.apache.gearpump.cluster.scheduler.Resource
+import org.apache.gearpump.util.RichProcess
+
/**
 * ExecutorProcessLauncher is used to launch a process for an Executor using given parameters.
 *
 * Users can implement this interface to decide the behavior of launching a process.
 * Set "gearpump.worker.executor-process-launcher" to your implemented class name.
 */
trait ExecutorProcessLauncher {

  /** Configuration available to the launcher implementation. */
  val config: Config

  /**
   * Launches a process for an Executor using the given parameters.
   *
   * @param appId The appId of the executor to be launched
   * @param executorId The executorId of the executor to be launched
   * @param resource The resource allocated for that executor
   * @param config The configuration passed for this launch. Inside an implementation this
   *               parameter shadows the trait's own `config` member.
   * @param options The command options
   * @param classPath The classpath of the process
   * @param mainClass The main class of the process
   * @param arguments The rest of the arguments
   * @return the launched process
   */
  def createProcess(
      appId: Int,
      executorId: Int,
      resource: Resource,
      config: Config,
      options: Array[String],
      classPath: Array[String],
      mainClass: String,
      arguments: Array[String]): RichProcess

  /**
   * Cleans up resources held for a previously launched process.
   *
   * @param appId The appId of the launched executor
   * @param executorId The executorId of the launched executor
   */
  def cleanProcess(appId: Int, executorId: Int): Unit
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/cluster/worker/WorkerId.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/worker/WorkerId.scala b/core/src/main/scala/org/apache/gearpump/cluster/worker/WorkerId.scala
new file mode 100644
index 0000000..93213e4
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/cluster/worker/WorkerId.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.worker
+
/**
 * WorkerId is used to uniquely track a worker machine.
 *
 * @param sessionId sessionId is assigned by the Master node for easy tracking. It is possible
 *                  that sessionId is **NOT** unique, so always use WorkerId for comparison.
 * @param registerTime the timestamp when a worker node registers itself to the master node
 */
case class WorkerId(sessionId: Int, registerTime: Long)

object WorkerId {

  /**
   * Sentinel id (sessionId -1, registerTime 0) meaning "no particular worker".
   *
   * Used, for example, by `WorkerSummary.empty`.
   */
  val unspecified: WorkerId = new WorkerId(-1, 0L)

  /**
   * Serializes a WorkerId as `"<registerTime>_<sessionId>"`.
   *
   * This is the inverse of [[parse]].
   */
  def render(workerId: WorkerId): String = {
    s"${workerId.registerTime}_${workerId.sessionId}"
  }

  /**
   * Parses a string produced by [[render]].
   *
   * @throws IllegalArgumentException if `str` does not consist of exactly two `_`-separated
   *         parts. A `NumberFormatException` (a subclass) is thrown when a part is not numeric.
   */
  def parse(str: String): WorkerId = {
    // The -1 limit keeps trailing empty fields, so input such as "5_3_" is rejected instead of
    // being silently accepted as "5_3".
    val parts = str.split("_", -1)
    require(parts.length == 2,
      s"Invalid WorkerId string '$str': expected '<registerTime>_<sessionId>'")
    new WorkerId(parts(1).toInt, parts(0).toLong)
  }

  /** Orders by registerTime first, then by sessionId. */
  implicit val workerIdOrdering: Ordering[WorkerId] = new Ordering[WorkerId] {
    override def compare(x: WorkerId, y: WorkerId): Int = {
      val byTime = java.lang.Long.compare(x.registerTime, y.registerTime)
      if (byTime != 0) byTime else java.lang.Integer.compare(x.sessionId, y.sessionId)
    }
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/cluster/worker/WorkerSummary.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/worker/WorkerSummary.scala b/core/src/main/scala/org/apache/gearpump/cluster/worker/WorkerSummary.scala
new file mode 100644
index 0000000..f4c0587
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/cluster/worker/WorkerSummary.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.worker
+import org.apache.gearpump.util.HistoryMetricsService.HistoryMetricsConfig
+
/**
 * Worker summary information for the REST API.
 *
 * NOTE(review): `executors` is an Array, so the case-class generated `equals`/`hashCode`
 * compare it by reference, not by content.
 *
 * @param executors slot usage per executor running on this worker
 * @param historyMetricsConfig defaults to `null` when not supplied
 */
case class WorkerSummary(
    workerId: WorkerId,
    state: String,
    actorPath: String,
    aliveFor: Long,
    logFile: String,
    executors: Array[ExecutorSlots],
    totalSlots: Int,
    availableSlots: Int,
    homeDirectory: String,
    jvmName: String,
    // Id used to uniquely identify this worker process in a low-level resource manager like YARN.
    resourceManagerContainerId: String,
    historyMetricsConfig: HistoryMetricsConfig = null)
+
/** Factory for placeholder [[WorkerSummary]] values. */
object WorkerSummary {

  /**
   * A placeholder summary.
   *
   * Uses `WorkerId.unspecified`, empty strings, zero counters, no executors and the default
   * (`null`) history metrics config.
   */
  def empty: WorkerSummary = WorkerSummary(
    workerId = WorkerId.unspecified,
    state = "",
    actorPath = "",
    aliveFor = 0L,
    logFile = "",
    executors = Array.empty[ExecutorSlots],
    totalSlots = 0,
    availableSlots = 0,
    homeDirectory = "",
    jvmName = "",
    resourceManagerContainerId = "")
}
+
/** Number of slots used by one executor of one application. */
case class ExecutorSlots(
    appId: Int,
    executorId: Int,
    slots: Int)
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/jarstore/JarStoreService.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/jarstore/JarStoreService.scala b/core/src/main/scala/org/apache/gearpump/jarstore/JarStoreService.scala
new file mode 100644
index 0000000..0ba9558
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/jarstore/JarStoreService.scala
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.jarstore
+
+import java.io.File
+import java.net.URI
+import java.util.ServiceLoader
+import scala.collection.JavaConverters._
+
+import akka.actor.ActorSystem
+import com.typesafe.config.Config
+
+import org.apache.gearpump.util.{Constants, Util}
+
/** Location of a file inside a JarStore, as returned by `JarStoreService.copyFromLocal`. */
case class FilePath(
    path: String)
+
/**
 * JarStoreService is used to manage the upload and download of binary files,
 * such as a user-submitted application jar.
 */
trait JarStoreService {

  /**
   * The scheme of the JarStoreService, e.g. "hdfs" for the HDFS file system or "file" for
   * a local file system.
   */
  val scheme: String

  /** Initializes the Jar Store. */
  def init(config: Config, system: ActorSystem): Unit

  /**
   * Copies a local file to the remote JarStore; called from the client side.
   *
   * @param localFile The local file
   * @return the path of the copy inside the JarStore
   */
  def copyFromLocal(localFile: File): FilePath

  /**
   * Copies a file from the JarStore to the local file system.
   *
   * @param localFile The destination file
   * @param remotePath The remote file path inside the JarStore
   */
  def copyToLocalFile(localFile: File, remotePath: FilePath): Unit
}
+
object JarStoreService {

  /**
   * Gets an active JarStoreService whose scheme matches the configured jar store root path.
   *
   * Please see config [[org.apache.gearpump.util.Constants#GEARPUMP_APP_JAR_STORE_ROOT_PATH]]
   * for more information.
   *
   * @throws NoSuchElementException if no JarStoreService discovered via ServiceLoader
   *         handles the root path's scheme
   */
  def get(config: Config): JarStoreService = {
    val jarStoreRootPath = config.getString(Constants.GEARPUMP_APP_JAR_STORE_ROOT_PATH)
    get(jarStoreRootPath)
  }

  /**
   * All JarStoreService implementations discoverable through java.util.ServiceLoader.
   *
   * The lookup runs lazily, on first use.
   */
  private lazy val jarstoreServices: List[JarStoreService] = {
    ServiceLoader.load(classOf[JarStoreService]).asScala.toList
  }

  /** Picks the service whose `scheme` equals the URI scheme of the resolved root path. */
  private def get(rootPath: String): JarStoreService = {
    val scheme = new URI(Util.resolvePath(rootPath)).getScheme
    jarstoreServices.find(_.scheme == scheme).getOrElse {
      // Same exception type Option.get used to throw, but with enough context to diagnose a
      // missing service registration or a misconfigured root path.
      throw new NoSuchElementException(
        s"No JarStoreService found for scheme '$scheme' (root path '$rootPath'); " +
          s"available schemes: [${jarstoreServices.map(_.scheme).mkString(", ")}]")
    }
  }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/metrics/AkkaReporter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/metrics/AkkaReporter.scala b/core/src/main/scala/org/apache/gearpump/metrics/AkkaReporter.scala
new file mode 100644
index 0000000..7010048
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/metrics/AkkaReporter.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.metrics
+
+import scala.collection.JavaConverters._
+
+import akka.actor.{ActorRef, ActorSystem}
+
+import io.gearpump.codahale.metrics.{Gauge => CodaGauge, MetricRegistry}
+import org.apache.gearpump.metrics.Metrics.{Counter => CounterData, Gauge => GaugeData, Histogram => HistogramData, Meter => MeterData}
+import org.apache.gearpump.metrics.MetricsReporterService.ReportTo
+import org.apache.gearpump.util.LogUtil
+
/**
 * A reporter that sends a snapshot of every metric in `registry` to a target actor each time
 * `report` is called.
 *
 * It is presumably invoked periodically by the metrics reporting service; confirm with caller.
 */
class AkkaReporter(
    system: ActorSystem,
    registry: MetricRegistry)
  extends ReportTo {
  private val LOG = LogUtil.getLogger(getClass)
  LOG.info("Start Metrics AkkaReporter")

  /**
   * Sends one message per metric to `to`, in this order: counters, histograms, meters,
   * then gauges.
   *
   * Gauges whose value is not a `java.lang.Number` (including `null`) are skipped.
   */
  override def report(to: ActorRef): Unit = {
    // Fetch all metric maps up front, before any messages are sent.
    val counters = registry.getCounters()
    val histograms = registry.getHistograms()
    val meters = registry.getMeters()
    val gauges = registry.getGauges()

    counters.entrySet().asScala.foreach { pair =>
      to ! CounterData(pair.getKey, pair.getValue.getCount)
    }

    histograms.entrySet().asScala.foreach { pair =>
      val key = pair.getKey
      val s = pair.getValue.getSnapshot
      to ! HistogramData(
        key, s.getMean, s.getStdDev, s.getMedian,
        s.get95thPercentile, s.get99thPercentile, s.get999thPercentile)
    }

    meters.entrySet().asScala.foreach { pair =>
      val key = pair.getKey
      val value = pair.getValue
      to ! MeterData(key,
        value.getCount,
        value.getMeanRate,
        value.getOneMinuteRate,
        getRateUnit)
    }

    gauges.entrySet().asScala.foreach { kv =>
      val gauge: CodaGauge[_] = kv.getValue
      // GaugeData carries a Long, so only numeric gauges can be reported. A registry may also
      // contain gauges with non-numeric or null values. Blindly casting those would throw and
      // abort reporting of every remaining gauge, so they are skipped instead.
      (gauge.getValue: Any) match {
        case number: Number => to ! GaugeData(kv.getKey, number.longValue())
        case _ => // not representable as a Long; skip
      }
    }
  }

  private def getRateUnit: String = {
    "events/s"
  }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/metrics/Counter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/metrics/Counter.scala b/core/src/main/scala/org/apache/gearpump/metrics/Counter.scala
new file mode 100644
index 0000000..a6ce799
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/metrics/Counter.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.metrics
+
+import io.gearpump.codahale.metrics.{Counter => CodaHaleCounter}
+
/**
 * Wrapper around a CodaHale counter that batches increments.
 *
 * Increments accumulate locally and are forwarded to `counter` on every `sampleRate`-th call
 * to `inc`. When `counter` is null, increments are only accumulated and never forwarded.
 *
 * NOTE(review): `sampleRate` must be non-zero when `counter` is non-null; `% 0` throws an
 * ArithmeticException.
 *
 * @see io.gearpump.codahale.metrics.Counter
 */
class Counter(val name: String, counter: CodaHaleCounter, sampleRate: Int = 1) {
  // Number of `inc` calls seen so far.
  private var sampleCount = 0L
  // Sum of increments not yet forwarded to `counter`.
  private var toBeIncremented = 0L

  /** Increments by one. */
  def inc(): Unit = inc(1)

  /** Increments by `n`, flushing the accumulated total on every `sampleRate`-th call. */
  def inc(n: Long): Unit = {
    sampleCount += 1
    toBeIncremented += n
    val shouldFlush = counter != null && sampleCount % sampleRate == 0
    if (shouldFlush) {
      counter.inc(toBeIncremented)
      toBeIncremented = 0
    }
  }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/metrics/Histogram.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/metrics/Histogram.scala b/core/src/main/scala/org/apache/gearpump/metrics/Histogram.scala
new file mode 100644
index 0000000..7d7e19b
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/metrics/Histogram.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.metrics
+
+import io.gearpump.codahale.metrics.{Histogram => CodaHaleHistogram}
+
/**
 * Wrapper around a CodaHale histogram that records only every `sampleRate`-th value.
 *
 * A null `histogram` is tolerated throughout. Updates are ignored, and the statistics read
 * as 0.0, the same value an empty CodaHale snapshot reports.
 *
 * NOTE(review): `sampleRate` must be non-zero when `histogram` is non-null; `% 0` throws an
 * ArithmeticException.
 *
 * @see io.gearpump.codahale.metrics.Histogram
 */
class Histogram(val name: String, histogram: CodaHaleHistogram, sampleRate: Int = 1) {
  private var sampleCount = 0L

  /** Counts a sample and, on every `sampleRate`-th call, records `value` in the histogram. */
  def update(value: Long): Unit = {
    sampleCount += 1
    if (null != histogram && sampleCount % sampleRate == 0) {
      histogram.update(value)
    }
  }

  /** Mean of the current snapshot, or 0.0 when there is no underlying histogram. */
  def getMean(): Double = {
    // Guard for null like `update` does, instead of throwing a NullPointerException.
    if (null == histogram) 0.0 else histogram.getSnapshot.getMean
  }

  /** Standard deviation of the current snapshot, or 0.0 when there is no underlying histogram. */
  def getStdDev(): Double = {
    if (null == histogram) 0.0 else histogram.getSnapshot.getStdDev
  }
}
\ No newline at end of file