You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by hu...@apache.org on 2016/04/26 11:42:58 UTC
[48/49] incubator-gearpump git commit: GEARPUMP-11, fix code style
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/cluster/AppDescription.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/cluster/AppDescription.scala b/core/src/main/scala/io/gearpump/cluster/AppDescription.scala
index e8af854..799c20a 100644
--- a/core/src/main/scala/io/gearpump/cluster/AppDescription.scala
+++ b/core/src/main/scala/io/gearpump/cluster/AppDescription.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,44 +18,64 @@
package io.gearpump.cluster
+import scala.reflect.ClassTag
+
import akka.actor.{Actor, ActorRef, ActorSystem}
import com.typesafe.config.{Config, ConfigFactory}
+
import io.gearpump.cluster.appmaster.WorkerInfo
import io.gearpump.cluster.scheduler.Resource
import io.gearpump.jarstore.FilePath
-import io.gearpump.util.Util
-
-import scala.reflect.ClassTag
/**
* This contains all information to run an application
*
- * @param name: The name of this application
- * @param appMaster: The class name of AppMaster Actor
- * @param userConfig: user configuration.
- * @param clusterConfig: The user provided cluster config, it will override gear.conf when starting
- * new applications. In most cases, you wouldnot need to change it. If you do need to change it,
- * use ClusterConfigSource(filePath) to construct the object, while filePath points to the .conf file.
+ * @param name The name of this application
+ * @param appMaster The class name of AppMaster Actor
+ * @param userConfig user configuration.
+ * @param clusterConfig User provided cluster config, it overrides gear.conf when starting
+ * new applications. In most cases, you should not need to change it. If you do
+ * really need to change it, please use ClusterConfigSource(filePath) to
+ * construct the object, while filePath points to the .conf file.
*/
+case class AppDescription(
+ name: String, appMaster: String, userConfig: UserConfig,
+ clusterConfig: Config = ConfigFactory.empty())
-case class AppDescription(name : String, appMaster : String, userConfig: UserConfig, clusterConfig: Config = ConfigFactory.empty())
-
+/**
+ * Each job, streaming or not streaming, needs to provide an Application class.
+ * The master uses this class to start AppMaster.
+ */
trait Application {
+
+ /** Name of this application, must be unique in the system */
def name: String
+
+ /** Custom user configuration */
def userConfig(implicit system: ActorSystem): UserConfig
+
+ /**
+ * AppMaster class, must have a constructor like this:
+ * this(appContext: AppMasterContext, app: AppDescription)
+ */
def appMaster: Class[_ <: ApplicationMaster]
}
object Application {
- def apply[T <: ApplicationMaster](name: String, userConfig: UserConfig)(implicit tag: ClassTag[T]): Application = {
- new DefaultApplication(name, userConfig, tag.runtimeClass.asInstanceOf[Class[_ <: ApplicationMaster]])
+ def apply[T <: ApplicationMaster](
+ name: String, userConfig: UserConfig)(implicit tag: ClassTag[T]): Application = {
+ new DefaultApplication(name, userConfig,
+ tag.runtimeClass.asInstanceOf[Class[_ <: ApplicationMaster]])
}
- class DefaultApplication(override val name: String, inputUserConfig: UserConfig, val appMaster: Class[_ <: ApplicationMaster]) extends Application {
+ class DefaultApplication(
+ override val name: String, inputUserConfig: UserConfig,
+ val appMaster: Class[_ <: ApplicationMaster]) extends Application {
override def userConfig(implicit system: ActorSystem): UserConfig = inputUserConfig
}
- def ApplicationToAppDescription(app: Application)(implicit system: ActorSystem): AppDescription = {
+ def ApplicationToAppDescription(app: Application)(implicit system: ActorSystem)
+ : AppDescription = {
val filterJvmReservedKeys = ClusterConfig.filterOutDefaultConfig(system.settings.config)
AppDescription(app.name, app.appMaster.getName, app.userConfig, filterJvmReservedKeys)
}
@@ -69,57 +89,57 @@ abstract class ApplicationMaster extends Actor
/**
* This contains context information when starting an AppMaster
*
- * @param appId: application instance id assigned, it is unique in the cluster
- * @param username: The username who submitted this application
- * @param resource: Resouce allocated to start this AppMaster daemon. AppMaster are allowed to
- * request more resource from Master.
- * @param appJar: application Jar. If the jar is already in classpath, then it can be None.
- * @param masterProxy: The proxy to master actor, it will bridge the messages between appmaster and master
- * @param registerData: The AppMaster are required to register this data back to Master by RegisterAppMaster
- *
+ * @param appId application instance id assigned, it is unique in the cluster
+ * @param username The username who submitted this application
+ * @param resource Resource allocated to start this AppMaster daemon. AppMaster is allowed to
+ * request more resource from Master.
+ * @param appJar application Jar. If the jar is already in classpath, then it can be None.
+ * @param masterProxy The proxy to master actor, it bridges the messages between appmaster
+ * and master
+ * @param registerData AppMaster is required to send this data to Master when doing
+ * RegisterAppMaster.
*/
case class AppMasterContext(
- appId : Int,
- username : String,
- resource : Resource,
+ appId: Int,
+ username: String,
+ resource: Resource,
workerInfo: WorkerInfo,
- appJar : Option[AppJar],
- masterProxy : ActorRef,
- registerData : AppMasterRegisterData)
+ appJar: Option[AppJar],
+ masterProxy: ActorRef,
+ registerData: AppMasterRegisterData)
/**
* Jar file container in the cluster
*
- * @param name: A meaningful name to represent this jar
- * @param filePath: Where the jar file is stored.
+ * @param name A meaningful name to represent this jar
+ * @param filePath Where the jar file is stored.
*/
case class AppJar(name: String, filePath: FilePath)
-
/**
- * TODO: ExecutorContext doesn't belong here.
- * Need to move to other places
+ * Serves as the context to start an Executor JVM.
*/
-case class ExecutorContext(executorId : Int, worker: WorkerInfo, appId : Int, appName: String,
- appMaster : ActorRef, resource : Resource)
-
+// TODO: ExecutorContext doesn't belong to this package in logic.
+case class ExecutorContext(
+ executorId: Int, worker: WorkerInfo, appId: Int, appName: String,
+ appMaster: ActorRef, resource: Resource)
/**
- * TODO: ExecutorJVMConfig doesn't belong here.
- * Need to move to other places
- */
-/**
- * @param classPath: When a worker create a executor, the parent worker's classpath will
- * be automatically inherited, the application jar will also be added to runtime
- * classpath automatically. Sometimes, you still want to add some extraclasspath,
- * you can do this by specify classPath option.
+ * JVM configurations to start an Executor JVM.
+ *
+ * @param classPath When executor is created by a worker JVM, executor automatically inherits
+ * parent worker's classpath. Sometimes, you still want to add some extra
+ * classpath, you can do this by specifying the classPath option.
* @param jvmArguments java arguments like -Dxx=yy
* @param mainClass Executor main class name like io.gearpump.xx.AppMaster
* @param arguments Executor command line arguments
* @param jar application jar
- * @param executorAkkaConfig Akka config used to initialize the actor system of this executor. It will
- * use io.gearpump.util.Constants.GEARPUMP_CUSTOM_CONFIG_FILE to pass the config to executor
- * process
- *
+ * @param executorAkkaConfig Akka config used to initialize the actor system of this executor.
+ * It uses io.gearpump.util.Constants.GEARPUMP_CUSTOM_CONFIG_FILE
+ * to pass the config to executor process
*/
-case class ExecutorJVMConfig(classPath : Array[String], jvmArguments : Array[String], mainClass : String, arguments : Array[String], jar: Option[AppJar], username : String, executorAkkaConfig: Config = ConfigFactory.empty())
\ No newline at end of file
+// TODO: ExecutorJVMConfig doesn't belong to this package in logic.
+case class ExecutorJVMConfig(
+ classPath: Array[String], jvmArguments: Array[String], mainClass: String,
+ arguments: Array[String], jar: Option[AppJar], username: String,
+ executorAkkaConfig: Config = ConfigFactory.empty())
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/cluster/ClusterConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/cluster/ClusterConfig.scala b/core/src/main/scala/io/gearpump/cluster/ClusterConfig.scala
index 64d5c42..7bae6d6 100644
--- a/core/src/main/scala/io/gearpump/cluster/ClusterConfig.scala
+++ b/core/src/main/scala/io/gearpump/cluster/ClusterConfig.scala
@@ -19,24 +19,27 @@
package io.gearpump.cluster
import java.io.File
+
import com.typesafe.config._
+
import io.gearpump.util.Constants._
-import io.gearpump.util.{Util, FileUtils, Constants, LogUtil}
-import scala.collection.JavaConversions._
+import io.gearpump.util.{Constants, FileUtils, LogUtil, Util}
/**
*
* All Gearpump application should use this class to load configurations.
*
- * Compared with Akka built-in [[ConfigFactory]], this class will also
- * resolve file gear.conf and geardefault.conf.
+ * Compared with Akka built-in com.typesafe.config.ConfigFactory, this class also
+ * resolves config from files gear.conf and geardefault.conf.
*
* Overriding order:
+ * {{{
* System Properties
* > Custom configuration file (by using system property -Dgearpump.config.file) >
* > gear.conf
* > geardefault.conf
* > reference.conf
+ * }}}
*/
object ClusterConfig {
@@ -81,11 +84,10 @@ object ClusterConfig {
load(configFile).ui
}
-
/**
* try to load system property gearpump.config.file, or use configFile
*/
- private def load(configFile: String) : Configs = {
+ private def load(configFile: String): Configs = {
val file = Option(System.getProperty(GEARPUMP_CUSTOM_CONFIG_FILE))
file match {
case Some(path) =>
@@ -100,7 +102,7 @@ object ClusterConfig {
val APPLICATION = "application.conf"
val LOG = LogUtil.getLogger(getClass)
- def saveConfig(conf : Config, file : File) : Unit = {
+ def saveConfig(conf: Config, file: File): Unit = {
val serialized = conf.root().render()
FileUtils.write(file, serialized)
}
@@ -113,13 +115,13 @@ object ClusterConfig {
}
}
- // filter JVM reserved keys and akka default reference.conf
+ /** filter JVM reserved keys and akka default reference.conf */
def filterOutDefaultConfig(input: Config): Config = {
val updated = filterOutJvmReservedKeys(input)
Util.filterOutOrigin(updated, "reference.conf")
}
- private[gearpump] def load(source: ClusterConfigSource) : Configs = {
+ private[gearpump] def load(source: ClusterConfigSource): Configs = {
val systemProperties = getSystemProperties
@@ -162,8 +164,8 @@ object ClusterConfig {
)
private def getSystemProperties: Config = {
- // exclude default java system properties
- JVM_RESERVED_PROPERTIES.foldLeft(ConfigFactory.systemProperties()) {(config, property) =>
+ // Excludes default java system properties
+ JVM_RESERVED_PROPERTIES.foldLeft(ConfigFactory.systemProperties()) { (config, property) =>
config.withoutPath(property)
}
}
@@ -177,5 +179,6 @@ object ClusterConfig {
filterJvmReservedKeys
}
- protected class Configs (val master: Config, val worker: Config, val ui: Config, val default: Config)
+ protected class Configs(
+ val master: Config, val worker: Config, val ui: Config, val default: Config)
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/cluster/ClusterConfigSource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/cluster/ClusterConfigSource.scala b/core/src/main/scala/io/gearpump/cluster/ClusterConfigSource.scala
index 7e8a91d..3a248d7 100644
--- a/core/src/main/scala/io/gearpump/cluster/ClusterConfigSource.scala
+++ b/core/src/main/scala/io/gearpump/cluster/ClusterConfigSource.scala
@@ -19,9 +19,10 @@
package io.gearpump.cluster
import java.io.File
-import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions}
import scala.language.implicitConversions
+import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions}
+
/**
* Data Source of ClusterConfig
*
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/cluster/ClusterMessage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/cluster/ClusterMessage.scala b/core/src/main/scala/io/gearpump/cluster/ClusterMessage.scala
index aac45ab..5a42ea3 100644
--- a/core/src/main/scala/io/gearpump/cluster/ClusterMessage.scala
+++ b/core/src/main/scala/io/gearpump/cluster/ClusterMessage.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,26 +18,25 @@
package io.gearpump.cluster
+import scala.util.Try
+
import akka.actor.ActorRef
import com.typesafe.config.Config
-import io.gearpump.{WorkerId, TimeStamp}
+
+import io.gearpump.TimeStamp
import io.gearpump.cluster.MasterToAppMaster.AppMasterStatus
import io.gearpump.cluster.master.MasterSummary
import io.gearpump.cluster.scheduler.{Resource, ResourceAllocation, ResourceRequest}
-import io.gearpump.cluster.worker.WorkerSummary
+import io.gearpump.cluster.worker.{WorkerId, WorkerSummary}
import io.gearpump.metrics.Metrics.MetricType
-import scala.util.Try
-
-/**
- * Application Flow
- */
-
object ClientToMaster {
case object AddMaster
case class AddWorker(count: Int)
case class RemoveMaster(masterContainerId: String)
case class RemoveWorker(workerContainerId: String)
+
+ /** Command result of AddMaster, RemoveMaster, etc. */
case class CommandResult(success: Boolean, exception: String = null) {
override def toString: String = {
val tag = getClass.getSimpleName
@@ -48,50 +47,89 @@ object ClientToMaster {
}
}
}
- case class SubmitApplication(appDescription: AppDescription, appJar: Option[AppJar], username : String = System.getProperty("user.name"))
+
+ /** Submit an application to master */
+ case class SubmitApplication(
+ appDescription: AppDescription, appJar: Option[AppJar],
+ username: String = System.getProperty("user.name"))
+
case class RestartApplication(appId: Int)
case class ShutdownApplication(appId: Int)
+
+ /** Client sends ResolveAppId to Master to resolve the AppMaster actor path by providing appId */
case class ResolveAppId(appId: Int)
+ /** Client sends ResolveWorkerId to master to get the Actor path of worker. */
case class ResolveWorkerId(workerId: WorkerId)
+ /** Get an active Jar store to upload job jars, like wordcount.jar */
case object GetJarStoreServer
+ /** Service address of JarStore */
case class JarStoreServerAddress(url: String)
+ /** Query AppMaster config by providing appId */
case class QueryAppMasterConfig(appId: Int)
+ /** Query worker config */
case class QueryWorkerConfig(workerId: WorkerId)
+ /** Query master config */
case object QueryMasterConfig
+ /** Options for reading the metrics from the cluster */
object ReadOption {
type ReadOption = String
val Key: String = "readOption"
+ /** Read the latest record of the metrics, only return 1 record for one metric name (id) */
val ReadLatest: ReadOption = "readLatest"
+ /** Read recent metrics from cluster, typically it contains metrics in 5 minutes */
val ReadRecent = "readRecent"
+ /**
+ * Read the history metrics, typically it contains metrics for 48 hours
+ *
+ * NOTE: Each hour only contains one or two data points.
+ */
val ReadHistory = "readHistory"
}
+ /** Query history metrics from master or app master. */
+ case class QueryHistoryMetrics(
+ path: String, readOption: ReadOption.ReadOption = ReadOption.ReadLatest,
+ aggregatorClazz: String = "", options: Map[String, String] = Map.empty[String, String])
- case class QueryHistoryMetrics(path: String, readOption: ReadOption.ReadOption = ReadOption.ReadLatest, aggregatorClazz: String = "", options: Map[String, String] = Map.empty[String, String])
-
+ /**
+ * If there is message loss, the clock would pause for a while. This message is used to
+ * pin-point which task has a stalling clock value, and usually it means something wrong on
+ * that machine.
+ */
case class GetStallingTasks(appId: Int)
+ /**
+ * Request app master for a short list of cluster app that administrators should be aware of.
+ */
case class GetLastFailure(appId: Int)
}
object MasterToClient {
- case class SubmitApplicationResult(appId : Try[Int])
+
+ /** Result of SubmitApplication */
+ // TODO: Merge with SubmitApplicationResultValue and change this to (appId: Option, ex: Exception)
+ case class SubmitApplicationResult(appId: Try[Int])
+
case class SubmitApplicationResultValue(appId: Int)
- case class ShutdownApplicationResult(appId : Try[Int])
+
+ case class ShutdownApplicationResult(appId: Try[Int])
case class ReplayApplicationResult(appId: Try[Int])
+
+ /** Return Actor ref of app master */
case class ResolveAppIdResult(appMaster: Try[ActorRef])
+ /** Return Actor ref of worker */
case class ResolveWorkerIdResult(worker: Try[ActorRef])
case class AppMasterConfig(config: Config)
@@ -102,25 +140,64 @@ object MasterToClient {
case class HistoryMetricsItem(time: TimeStamp, value: MetricType)
+ /**
+ * History metrics returned from master, worker, or app master.
+ *
+ * All metric items are organized like a tree, path is used to navigate through the tree.
+ * For example, when querying with path == "executor0.task1.throughput*", the metrics
+ * provider picks metrics whose source matches the path.
+ *
+ * @param path The path client provided. The returned metrics are the query result for this path.
+ * @param metrics The detailed metrics.
+ */
case class HistoryMetrics(path: String, metrics: List[HistoryMetricsItem])
+ /** Return the last error of this streaming application job */
case class LastFailure(time: TimeStamp, error: String)
}
trait AppMasterRegisterData
object AppMasterToMaster {
- case class RegisterAppMaster(appMaster: ActorRef, registerData : AppMasterRegisterData)
+
+ /**
+ * Register an AppMaster by providing an ActorRef and registerData
+ * @param registerData The registerData is provided by Master when starting the app master.
+ * App master should return the registerData back to master.
+ * Typically registerData holds some context information for this app Master.
+ */
+
+ case class RegisterAppMaster(appMaster: ActorRef, registerData: AppMasterRegisterData)
+
case class InvalidAppMaster(appId: Int, appMaster: String, reason: Throwable)
+
case class RequestResource(appId: Int, request: ResourceRequest)
+ /**
+ * Each application job can save some data in the distributed cluster storage on master nodes.
+ *
+ * @param appId App Id of the client application that sends the request.
+ * @param key Key name
+ * @param value Value to store on distributed cluster storage on master nodes
+ */
case class SaveAppData(appId: Int, key: String, value: Any)
+
+ /** The application specific data is successfully stored */
case object AppDataSaved
+
+ /** Fail to store the application data */
case object SaveAppDataFailed
+ /** Fetch the application specific data that was stored previously */
case class GetAppData(appId: Int, key: String)
+
+ /** The KV data returned for query GetAppData */
case class GetAppDataResult(key: String, value: Any)
+ /**
+ * AppMasterSummary returned to REST API query. Streaming and Non-streaming
+ * have very different application info. AppMasterSummary is the common interface.
+ */
trait AppMasterSummary {
def appType: String
def appId: Int
@@ -132,37 +209,43 @@ object AppMasterToMaster {
def user: String
}
+ /** Represents a generic application that is not a streaming job */
case class GeneralAppMasterSummary(
- appId: Int,
- appType: String = "general",
- appName: String = null,
- actorPath: String = null,
- status: AppMasterStatus = MasterToAppMaster.AppMasterActive,
- startTime: TimeStamp = 0L,
- uptime: TimeStamp = 0L,
- user: String = null)
+ appId: Int,
+ appType: String = "general",
+ appName: String = null,
+ actorPath: String = null,
+ status: AppMasterStatus = MasterToAppMaster.AppMasterActive,
+ startTime: TimeStamp = 0L,
+ uptime: TimeStamp = 0L,
+ user: String = null)
extends AppMasterSummary
+ /** Fetches the list of workers from Master */
case object GetAllWorkers
+
+ /** Get worker data of workerId */
case class GetWorkerData(workerId: WorkerId)
+
+ /** Response to GetWorkerData */
case class WorkerData(workerDescription: WorkerSummary)
+ /** Get Master data */
case object GetMasterData
+
+ /** Response to GetMasterData */
case class MasterData(masterDescription: MasterSummary)
}
object MasterToAppMaster {
- case class ResourceAllocated(allocations: Array[ResourceAllocation]){
- override def equals(other: Any): Boolean = {
- other match {
- case that: ResourceAllocated =>
- allocations.sortBy(_.workerId).sameElements(that.allocations.sortBy(_.workerId))
- case _ =>
- false
- }
- }
- }
+
+ /** Resource allocated for application xx */
+ case class ResourceAllocated(allocations: Array[ResourceAllocation])
+
+ /** Master confirms reception of RegisterAppMaster message */
case class AppMasterRegistered(appId: Int)
+
+ /** Shutdown the application job */
case object ShutdownAppMaster
type AppMasterStatus = String
@@ -171,7 +254,11 @@ object MasterToAppMaster {
val AppMasterNonExist: AppMasterStatus = "nonexist"
sealed trait StreamingType
- case class AppMasterData(status: AppMasterStatus, appId: Int = 0, appName: String = null, appMasterPath: String = null, workerPath: String = null, submissionTime: TimeStamp = 0, startTime: TimeStamp = 0, finishTime: TimeStamp = 0, user: String = null)
+ case class AppMasterData(
+ status: AppMasterStatus, appId: Int = 0, appName: String = null, appMasterPath: String = null,
+ workerPath: String = null, submissionTime: TimeStamp = 0, startTime: TimeStamp = 0,
+ finishTime: TimeStamp = 0, user: String = null)
+
case class AppMasterDataRequest(appId: Int, detail: Boolean = false)
case class AppMastersData(appMasters: List[AppMasterData])
@@ -185,8 +272,10 @@ object MasterToAppMaster {
}
object AppMasterToWorker {
- case class LaunchExecutor(appId: Int, executorId: Int, resource: Resource, executorJvmConfig: ExecutorJVMConfig)
- case class ShutdownExecutor(appId : Int, executorId : Int, reason : String)
+ case class LaunchExecutor(
+ appId: Int, executorId: Int, resource: Resource, executorJvmConfig: ExecutorJVMConfig)
+
+ case class ShutdownExecutor(appId: Int, executorId: Int, reason: String)
case class ChangeExecutorResource(appId: Int, executorId: Int, resource: Resource)
}
@@ -196,4 +285,3 @@ object WorkerToAppMaster {
case class ShutdownExecutorFailed(reason: String = null, ex: Throwable = null)
}
-
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/cluster/UserConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/cluster/UserConfig.scala b/core/src/main/scala/io/gearpump/cluster/UserConfig.scala
index 0756d54..61de1dd 100644
--- a/core/src/main/scala/io/gearpump/cluster/UserConfig.scala
+++ b/core/src/main/scala/io/gearpump/cluster/UserConfig.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,14 +18,15 @@
package io.gearpump.cluster
-import akka.actor.{ExtendedActorSystem, ActorSystem}
+import akka.actor.{ActorSystem, ExtendedActorSystem}
import akka.serialization.JavaSerializer
+
import io.gearpump.google.common.io.BaseEncoding
/**
* Immutable configuration
*/
-final class UserConfig(private val _config: Map[String, String]) extends Serializable{
+final class UserConfig(private val _config: Map[String, String]) extends Serializable {
def withBoolean(key: String, value: Boolean): UserConfig = {
new UserConfig(_config + (key -> value.toString))
@@ -39,7 +40,7 @@ final class UserConfig(private val _config: Map[String, String]) extends Serial
new UserConfig(_config + (key -> value.toString))
}
- def withInt(key: String, value: Int) : UserConfig = {
+ def withInt(key: String, value: Int): UserConfig = {
new UserConfig(_config + (key -> value.toString))
}
@@ -77,7 +78,7 @@ final class UserConfig(private val _config: Map[String, String]) extends Serial
_config.get(key).map(_.toFloat)
}
- def getInt(key : String) : Option[Int] = {
+ def getInt(key: String): Option[Int] = {
_config.get(key).map(_.toInt)
}
@@ -85,11 +86,11 @@ final class UserConfig(private val _config: Map[String, String]) extends Serial
_config.get(key).map(_.toLong)
}
- def getString(key : String) : Option[String] = {
+ def getString(key: String): Option[String] = {
_config.get(key)
}
- def getBytes(key: String) : Option[Array[Byte]] = {
+ def getBytes(key: String): Option[Array[Byte]] = {
_config.get(key).map(BaseEncoding.base64().decode(_))
}
@@ -101,31 +102,35 @@ final class UserConfig(private val _config: Map[String, String]) extends Serial
}
}
+ // scalastyle:off line.size.limit
/**
- * This will de-serialize value to object instance
+ * This de-serializes value to object instance
*
* To do de-serialization, this requires an implicit ActorSystem, as
* the ActorRef and possibly other akka classes deserialization
* requires an implicit ActorSystem.
*
- * @see [[http://doc.akka.io/docs/akka/snapshot/scala/serialization.html#A_Word_About_Java_Serialization]]
+ * See Link:
+ * http://doc.akka.io/docs/akka/snapshot/scala/serialization.html#A_Word_About_Java_Serialization
*/
- def getValue[T](key: String)(implicit system: ActorSystem): Option[T] = {
+
+ def getValue[T](key: String)(implicit system: ActorSystem): Option[T] = {
val serializer = new JavaSerializer(system.asInstanceOf[ExtendedActorSystem])
_config.get(key).map(BaseEncoding.base64().decode(_))
.map(serializer.fromBinary(_).asInstanceOf[T])
}
/**
- * This will serialize the object and store it as string.
+ * This serializes the object and stores it as a string.
*
* To do serialization, this requires an implicit ActorSystem, as
* the ActorRef and possibly other akka classes serialization
* requires an implicit ActorSystem.
*
- * @see [[http://doc.akka.io/docs/akka/snapshot/scala/serialization.html#A_Word_About_Java_Serialization]]
+ * See Link:
+ * http://doc.akka.io/docs/akka/snapshot/scala/serialization.html#A_Word_About_Java_Serialization
*/
- def withValue[T<: AnyRef](key: String, value: T)(implicit system: ActorSystem): UserConfig = {
+ def withValue[T <: AnyRef](key: String, value: T)(implicit system: ActorSystem): UserConfig = {
if (null == value) {
this
@@ -136,8 +141,9 @@ final class UserConfig(private val _config: Map[String, String]) extends Serial
this.withString(key, encoded)
}
}
+ // scalastyle:on line.size.limit
- def withConfig(other: UserConfig) = {
+ def withConfig(other: UserConfig): UserConfig = {
if (null == other) {
this
} else {
@@ -146,11 +152,11 @@ final class UserConfig(private val _config: Map[String, String]) extends Serial
}
}
-object UserConfig{
+object UserConfig {
- def empty = new UserConfig(Map.empty[String, String])
+ def empty: UserConfig = new UserConfig(Map.empty[String, String])
- def apply(config : Map[String, String]) = new UserConfig(config)
+ def apply(config: Map[String, String]): UserConfig = new UserConfig(config)
def unapply(config: UserConfig): Option[Map[String, String]] = Option(config._config)
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/cluster/appmaster/AppMasterRuntimeEnvironment.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/cluster/appmaster/AppMasterRuntimeEnvironment.scala b/core/src/main/scala/io/gearpump/cluster/appmaster/AppMasterRuntimeEnvironment.scala
index 1e34678..4e8582b 100644
--- a/core/src/main/scala/io/gearpump/cluster/appmaster/AppMasterRuntimeEnvironment.scala
+++ b/core/src/main/scala/io/gearpump/cluster/appmaster/AppMasterRuntimeEnvironment.scala
@@ -19,32 +19,34 @@
package io.gearpump.cluster.appmaster
import akka.actor._
+
import io.gearpump.cluster.AppMasterToMaster.RegisterAppMaster
import io.gearpump.cluster.appmaster.AppMasterRuntimeEnvironment._
import io.gearpump.cluster.appmaster.ExecutorSystemScheduler.{Session, StartExecutorSystems}
import io.gearpump.cluster.appmaster.MasterConnectionKeeper.MasterConnectionStatus._
import io.gearpump.cluster.master.MasterProxy
-import io.gearpump.cluster.{AppMasterContext, AppDescription}
+import io.gearpump.cluster.{AppDescription, AppMasterContext}
import io.gearpump.util.LogUtil
/**
* This serves as runtime environment for AppMaster.
- * When starting an AppMaster, we need to setup the connection to master, and prepare other environemnts.
+ * When starting an AppMaster, we need to setup the connection to master,
+ * and prepare other environments.
*
* This also extend the function of Master, by providing a scheduler service for Executor System.
* AppMaster can ask Master for executor system directly. details like requesting resource,
- * contacting worker to start a process, and then starting an executor system is hidden from AppMaster.
+ * contacting worker to start a process, and then starting an executor system is hidden from
+ * AppMaster.
*
* Please use AppMasterRuntimeEnvironment.props() to construct this actor.
- *
*/
private[appmaster]
-class AppMasterRuntimeEnvironment (
+class AppMasterRuntimeEnvironment(
appContextInput: AppMasterContext,
app: AppDescription,
masters: Iterable[ActorPath],
masterFactory: (AppId, MasterActorRef) => Props,
- appMasterFactory: (AppMasterContext, AppDescription)=> Props,
+ appMasterFactory: (AppMasterContext, AppDescription) => Props,
masterConnectionKeeperFactory: (MasterActorRef, RegisterAppMaster, ListenerActorRef) => Props)
extends Actor {
@@ -52,15 +54,18 @@ class AppMasterRuntimeEnvironment (
private val LOG = LogUtil.getLogger(getClass, app = appId)
import scala.concurrent.duration._
- private val master = context.actorOf(masterFactory(appId, context.actorOf(Props(new MasterProxy(masters, 30 seconds)))))
+
+ private val master = context.actorOf(
+ masterFactory(appId, context.actorOf(Props(new MasterProxy(masters, 30.seconds)))))
private val appContext = appContextInput.copy(masterProxy = master)
- //create appMaster proxy to receive command and forward to appmaster
+ // Create appMaster proxy to receive command and forward to appmaster
private val appMaster = context.actorOf(appMasterFactory(appContext, app))
context.watch(appMaster)
private val registerAppMaster = RegisterAppMaster(appMaster, appContext.registerData)
- private val masterConnectionKeeper = context.actorOf(masterConnectionKeeperFactory(master, registerAppMaster, self))
+ private val masterConnectionKeeper = context.actorOf(
+ masterConnectionKeeperFactory(master, registerAppMaster, self))
context.watch(masterConnectionKeeper)
def receive: Receive = {
@@ -72,20 +77,21 @@ class AppMasterRuntimeEnvironment (
context.stop(self)
case Terminated(actor) => actor match {
case `appMaster` =>
- LOG.error (s"AppMaster ${appId} is stopped, shutdown myself")
- context.stop (self)
+ LOG.error(s"AppMaster ${appId} is stopped, shutdown myself")
+ context.stop(self)
case `masterConnectionKeeper` =>
- LOG.error (s"Master connection keeper is stopped, appId: ${appId}, shutdown myself")
- context.stop (self)
- case _ => //skip
+ LOG.error(s"Master connection keeper is stopped, appId: ${appId}, shutdown myself")
+ context.stop(self)
+ case _ => // Skip
}
}
}
object AppMasterRuntimeEnvironment {
-
- def props(masters: Iterable[ActorPath], app : AppDescription, appContextInput: AppMasterContext): Props = {
+ def props(
+ masters: Iterable[ActorPath], app: AppDescription, appContextInput: AppMasterContext)
+ : Props = {
val master = (appId: AppId, masterProxy: MasterActorRef) =>
MasterWithExecutorSystemProvider.props(appId, masterProxy)
@@ -93,26 +99,25 @@ object AppMasterRuntimeEnvironment {
val appMaster = (appContext: AppMasterContext, app: AppDescription) =>
LazyStartAppMaster.props(appContext, app)
- val masterConnectionKeeper =
- (master: MasterActorRef, registerAppMaster: RegisterAppMaster, listener: ListenerActorRef) =>
- Props(new MasterConnectionKeeper(registerAppMaster, master, masterStatusListener = listener))
+ val masterConnectionKeeper = (master: MasterActorRef, registerAppMaster:
+ RegisterAppMaster, listener: ListenerActorRef) => Props(new MasterConnectionKeeper(
+ registerAppMaster, master, masterStatusListener = listener))
Props(new AppMasterRuntimeEnvironment(
appContextInput, app, masters, master, appMaster, masterConnectionKeeper))
}
/**
- * This behavior as AppMaster, and will lazy start the real AppMaster. When real AppMaster
- * is not started yet, all messages will be stashed. The stashed messages will be forwarded to
- * real AppMaster when it is started.
+ * This behaves like an AppMaster. Under the hood, it starts the real AppMaster in a lazy
+ * way. When the real AppMaster is not started yet, all messages are stashed. The stashed
+ * messages are forwarded to the real AppMaster once it is started.
*
* Please use LazyStartAppMaster.props to construct this actor
*
- * @param appId
* @param appMasterProps underlying AppMaster Props
*/
private[appmaster]
- class LazyStartAppMaster (appId: Int, appMasterProps: Props) extends Actor with Stash {
+ class LazyStartAppMaster(appId: Int, appMasterProps: Props) extends Actor with Stash {
private val LOG = LogUtil.getLogger(getClass, app = appId)
@@ -151,14 +156,11 @@ object AppMasterRuntimeEnvironment {
private[appmaster] case object StartAppMaster
-
/**
* This enhance Master by providing new service: StartExecutorSystems
*
- * * Please use MasterWithExecutorSystemProvider.props to construct this actor
+ * Please use MasterWithExecutorSystemProvider.props to construct this actor
*
- * @param master
- * @param executorSystemProviderProps
*/
private[appmaster]
class MasterWithExecutorSystemProvider(master: ActorRef, executorSystemProviderProps: Props)
@@ -168,7 +170,7 @@ object AppMasterRuntimeEnvironment {
override def receive: Receive = {
case request: StartExecutorSystems =>
- executorSystemProvider forward request
+ executorSystemProvider forward request
case msg =>
master forward msg
}
@@ -181,13 +183,12 @@ object AppMasterRuntimeEnvironment {
val executorSystemLauncher = (appId: Int, session: Session) =>
Props(new ExecutorSystemLauncher(appId, session))
- val scheduler = Props(new ExecutorSystemScheduler(appId, master, executorSystemLauncher))
+ val scheduler = Props(new ExecutorSystemScheduler(appId, master, executorSystemLauncher))
Props(new MasterWithExecutorSystemProvider(master, scheduler))
}
}
-
private[appmaster] type AppId = Int
private[appmaster] type MasterActorRef = ActorRef
private[appmaster] type ListenerActorRef = ActorRef
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/cluster/appmaster/AppMasterRuntimeInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/cluster/appmaster/AppMasterRuntimeInfo.scala b/core/src/main/scala/io/gearpump/cluster/appmaster/AppMasterRuntimeInfo.scala
index 11414cf..5b3e0c5 100644
--- a/core/src/main/scala/io/gearpump/cluster/appmaster/AppMasterRuntimeInfo.scala
+++ b/core/src/main/scala/io/gearpump/cluster/appmaster/AppMasterRuntimeInfo.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -15,21 +15,24 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package io.gearpump.cluster.appmaster
import akka.actor.ActorRef
import com.typesafe.config.Config
+
import io.gearpump._
import io.gearpump.cluster.AppMasterRegisterData
+/** Run time info used to start an AppMaster */
case class AppMasterRuntimeInfo(
- appId: Int,
- // appName is the unique Id for an application
- appName: String,
- worker : ActorRef = null,
- user: String = null,
- submissionTime: TimeStamp = 0,
- startTime: TimeStamp = 0,
- finishTime: TimeStamp = 0,
- config: Config = null)
+ appId: Int,
+ // AppName is the unique Id for an application
+ appName: String,
+ worker: ActorRef = null,
+ user: String = null,
+ submissionTime: TimeStamp = 0,
+ startTime: TimeStamp = 0,
+ finishTime: TimeStamp = 0,
+ config: Config = null)
extends AppMasterRegisterData
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/cluster/appmaster/ApplicationState.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/cluster/appmaster/ApplicationState.scala b/core/src/main/scala/io/gearpump/cluster/appmaster/ApplicationState.scala
index 430c958..3b967f4 100644
--- a/core/src/main/scala/io/gearpump/cluster/appmaster/ApplicationState.scala
+++ b/core/src/main/scala/io/gearpump/cluster/appmaster/ApplicationState.scala
@@ -18,28 +18,30 @@
package io.gearpump.cluster.appmaster
-import io.gearpump.cluster.{AppJar, AppDescription}
+import io.gearpump.cluster.{AppDescription, AppJar}
/**
- * This state will be persisted across the masters.
- */
-case class ApplicationState(appId : Int, appName: String, attemptId : Int, app : AppDescription, jar: Option[AppJar], username : String, state : Any) extends Serializable {
+ * The state of a single application; it is distributed across the masters.
+ */
+case class ApplicationState(
+ appId: Int, appName: String, attemptId: Int, app: AppDescription, jar: Option[AppJar],
+ username: String, state: Any) extends Serializable {
- override def equals(other: Any): Boolean = {
- other match {
- case that: ApplicationState =>
- if (appId == that.appId && attemptId == that.attemptId) {
- true
- } else {
- false
- }
- case _ =>
- false
- }
- }
+ override def equals(other: Any): Boolean = {
+ other match {
+ case that: ApplicationState =>
+ if (appId == that.appId && attemptId == that.attemptId) {
+ true
+ } else {
+ false
+ }
+ case _ =>
+ false
+ }
+ }
- override def hashCode: Int = {
- import akka.routing.MurmurHash._
- extendHash(appId, attemptId, startMagicA, startMagicB)
- }
- }
+ override def hashCode: Int = {
+ import akka.routing.MurmurHash._
+ extendHash(appId, attemptId, startMagicA, startMagicB)
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystem.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystem.scala b/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystem.scala
index 400b61c..6fcb5e7 100644
--- a/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystem.scala
+++ b/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystem.scala
@@ -19,22 +19,25 @@
package io.gearpump.cluster.appmaster
import akka.actor.{ActorRef, Address, PoisonPill}
-import io.gearpump.WorkerId
+
import io.gearpump.cluster.scheduler.Resource
+import io.gearpump.cluster.worker.WorkerId
import io.gearpump.util.ActorSystemBooter.BindLifeCycle
case class WorkerInfo(workerId: WorkerId, ref: ActorRef)
/**
- * This contains JVM configurations to start an executor system
+ * Configurations to start an executor system on remote machine
+ *
+ * @param address Remote address where we start an Actor System.
*/
case class ExecutorSystem(executorSystemId: Int, address: Address, daemon:
-ActorRef, resource: Resource, worker: WorkerInfo) {
+ ActorRef, resource: Resource, worker: WorkerInfo) {
def bindLifeCycleWith(actor: ActorRef): Unit = {
daemon ! BindLifeCycle(actor)
}
- def shutdown: Unit = {
+ def shutdown(): Unit = {
daemon ! PoisonPill
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystemLauncher.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystemLauncher.scala b/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystemLauncher.scala
index 80af748..78432f4 100644
--- a/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystemLauncher.scala
+++ b/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystemLauncher.scala
@@ -18,41 +18,42 @@
package io.gearpump.cluster.appmaster
+import scala.concurrent.duration._
+
import akka.actor._
+import org.slf4j.Logger
+
import io.gearpump.cluster.AppMasterToWorker.LaunchExecutor
import io.gearpump.cluster.ExecutorJVMConfig
-import io.gearpump.cluster.scheduler.Resource
-import io.gearpump.util.{Constants, ActorSystemBooter, ActorUtil, LogUtil}
import io.gearpump.cluster.WorkerToAppMaster._
import io.gearpump.cluster.appmaster.ExecutorSystemLauncher._
import io.gearpump.cluster.appmaster.ExecutorSystemScheduler.{ExecutorSystemJvmConfig, Session}
-import io.gearpump.util.ActorSystemBooter.{ActorSystemRegistered, BindLifeCycle, RegisterActorSystem}
-import org.slf4j.Logger
-
-import scala.concurrent.duration._
+import io.gearpump.cluster.scheduler.Resource
+import io.gearpump.util.ActorSystemBooter.{ActorSystemRegistered, RegisterActorSystem}
+import io.gearpump.util.{ActorSystemBooter, ActorUtil, Constants, LogUtil}
/**
* This launches single executor system on target worker.
*
* Please use ExecutorSystemLauncher.props() to construct this actor
*
- * @param appId
* @param session The session that request to launch executor system
*/
private[appmaster]
-class ExecutorSystemLauncher (appId: Int, session: Session) extends Actor {
+class ExecutorSystemLauncher(appId: Int, session: Session) extends Actor {
private val LOG: Logger = LogUtil.getLogger(getClass)
val scheduler = context.system.scheduler
implicit val executionContext = context.dispatcher
- val timeoutSetting = context.system.settings.config.getInt(Constants.GEARPUMP_START_EXECUTOR_SYSTEM_TIMEOUT_MS)
+ private val systemConfig = context.system.settings.config
+ val timeoutSetting = systemConfig.getInt(Constants.GEARPUMP_START_EXECUTOR_SYSTEM_TIMEOUT_MS)
- val timeout = scheduler.scheduleOnce(timeoutSetting milliseconds,
+ val timeout = scheduler.scheduleOnce(timeoutSetting.milliseconds,
self, LaunchExecutorSystemTimeout(session))
- def receive : Receive = waitForLaunchCommand
+ def receive: Receive = waitForLaunchCommand
def waitForLaunchCommand: Receive = {
case LaunchExecutorSystem(worker, executorSystemId, resource) =>
@@ -61,23 +62,27 @@ class ExecutorSystemLauncher (appId: Int, session: Session) extends Actor {
.map(getExecutorJvmConfig(_, s"app${appId}system${executorSystemId}", launcherPath)).orNull
val launch = LaunchExecutor(appId, executorSystemId, resource, jvmConfig)
- LOG.info(s"Launching Executor ...appId: $appId, executorSystemId: $executorSystemId, slots: ${resource.slots} on worker $worker")
+ LOG.info(s"Launching Executor ...appId: $appId, executorSystemId: $executorSystemId, " +
+ s"slots: ${resource.slots} on worker $worker")
worker.ref ! launch
context.become(waitForActorSystemToStart(sender, launch, worker, executorSystemId))
}
- def waitForActorSystemToStart(replyTo: ActorRef, launch: LaunchExecutor, worker: WorkerInfo, executorSystemId: Int) : Receive = {
+ def waitForActorSystemToStart(
+ replyTo: ActorRef, launch: LaunchExecutor, worker: WorkerInfo, executorSystemId: Int)
+ : Receive = {
case RegisterActorSystem(systemPath) =>
import launch._
timeout.cancel()
LOG.info(s"Received RegisterActorSystem $systemPath for session ${session.requestor}")
sender ! ActorSystemRegistered(worker.ref)
- val system = ExecutorSystem(executorId, AddressFromURIString(systemPath), sender, resource, worker)
+ val system =
+ ExecutorSystem(executorId, AddressFromURIString(systemPath), sender, resource, worker)
replyTo ! LaunchExecutorSystemSuccess(system, session)
context.stop(self)
- case reject @ ExecutorLaunchRejected(reason, ex) =>
- LOG.error(s"Executor Launch ${launch.resource} failed reason:$reason", ex)
+ case reject@ExecutorLaunchRejected(reason, ex) =>
+ LOG.error(s"Executor Launch ${launch.resource} failed reason: $reason", ex)
replyTo ! LaunchExecutorSystemRejected(launch.resource, reason, session)
context.stop(self)
case timeout: LaunchExecutorSystemTimeout =>
@@ -89,6 +94,7 @@ class ExecutorSystemLauncher (appId: Int, session: Session) extends Actor {
private[appmaster]
object ExecutorSystemLauncher {
+
case class LaunchExecutorSystem(worker: WorkerInfo, systemId: Int, resource: Resource)
case class LaunchExecutorSystemSuccess(system: ExecutorSystem, session: Session)
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystemScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystemScheduler.scala b/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystemScheduler.scala
index aa980b5..c5ec600 100644
--- a/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystemScheduler.scala
+++ b/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystemScheduler.scala
@@ -18,29 +18,29 @@
package io.gearpump.cluster.appmaster
+import scala.concurrent.duration._
+
import akka.actor._
import com.typesafe.config.Config
-import io.gearpump.WorkerId
+
import io.gearpump.cluster.AppMasterToMaster.RequestResource
import io.gearpump.cluster.MasterToAppMaster.ResourceAllocated
import io.gearpump.cluster._
import io.gearpump.cluster.appmaster.ExecutorSystemLauncher._
import io.gearpump.cluster.appmaster.ExecutorSystemScheduler._
import io.gearpump.cluster.scheduler.{ResourceAllocation, ResourceRequest}
+import io.gearpump.cluster.worker.WorkerId
import io.gearpump.util.{Constants, LogUtil}
-import scala.concurrent.duration._
-
/**
* ExecutorSystem is also a type of resource, this class schedules ExecutorSystem for AppMaster.
* AppMaster can use this class to directly request a live executor actor systems. The communication
* in the background with Master and Worker is hidden from AppMaster.
*
* Please use ExecutorSystemScheduler.props() to construct this actor
- *
-*/
+ */
private[appmaster]
-class ExecutorSystemScheduler (appId: Int, masterProxy: ActorRef,
+class ExecutorSystemScheduler(appId: Int, masterProxy: ActorRef,
executorSystemLauncher: (Int, Session) => Props) extends Actor {
private val LOG = LogUtil.getLogger(getClass, app = appId)
@@ -50,18 +50,22 @@ class ExecutorSystemScheduler (appId: Int, masterProxy: ActorRef,
var resourceAgents = Map.empty[Session, ActorRef]
- def receive: Receive = clientCommands orElse resourceAllocationMessageHandler orElse executorSystemMessageHandler
+ def receive: Receive = {
+ clientCommands orElse resourceAllocationMessageHandler orElse executorSystemMessageHandler
+ }
def clientCommands: Receive = {
case start: StartExecutorSystems =>
- LOG.info(s"starting executor systems (ExecutorSystemConfig(${start.executorSystemConfig}), Resources(${start.resources.mkString(",")}))")
+ LOG.info(s"starting executor systems (ExecutorSystemConfig(${start.executorSystemConfig}), " +
+ s"Resources(${start.resources.mkString(",")}))")
val requestor = sender()
val executorSystemConfig = start.executorSystemConfig
- val session = Session(requestor, executorSystemConfig)
- val agent = resourceAgents.getOrElse(session, context.actorOf(Props(new ResourceAgent(masterProxy, session))))
+ val session = Session(requestor, executorSystemConfig)
+ val agent = resourceAgents.getOrElse(session,
+ context.actorOf(Props(new ResourceAgent(masterProxy, session))))
resourceAgents = resourceAgents + (session -> agent)
- start.resources.foreach {resource =>
+ start.resources.foreach { resource =>
agent ! RequestResource(appId, resource)
}
@@ -87,14 +91,15 @@ class ExecutorSystemScheduler (appId: Int, masterProxy: ActorRef,
}
}
- def executorSystemMessageHandler : Receive = {
+ def executorSystemMessageHandler: Receive = {
case LaunchExecutorSystemSuccess(system, session) =>
if (isSessionAlive(session)) {
LOG.info("LaunchExecutorSystemSuccess, send back to " + session.requestor)
system.bindLifeCycleWith(self)
session.requestor ! ExecutorSystemStarted(system, session.executorSystemJvmConfig.jar)
} else {
- LOG.error("We get a ExecutorSystem back, but resource requestor is no longer valid. Will shutdown the allocated system")
+ LOG.error("We get a ExecutorSystem back, but resource requestor is no longer valid. " +
+ "Will shutdown the allocated system")
system.shutdown
}
case LaunchExecutorSystemTimeout(session) =>
@@ -105,8 +110,11 @@ class ExecutorSystemScheduler (appId: Int, masterProxy: ActorRef,
case LaunchExecutorSystemRejected(resource, reason, session) =>
if (isSessionAlive(session)) {
- LOG.error(s"Failed to launch executor system, due to $reason, will ask master to allocate new resources $resource")
- resourceAgents.get(session).map(_ ! RequestResource(appId, ResourceRequest(resource, WorkerId.unspecified)))
+ LOG.error(s"Failed to launch executor system, due to $reason, " +
+ s"will ask master to allocate new resources $resource")
+ resourceAgents.get(session).map { resourceAgent: ActorRef =>
+ resourceAgent ! RequestResource(appId, ResourceRequest(resource, WorkerId.unspecified))
+ }
}
}
@@ -117,27 +125,28 @@ class ExecutorSystemScheduler (appId: Int, masterProxy: ActorRef,
object ExecutorSystemScheduler {
- case class StartExecutorSystems(resources: Array[ResourceRequest], executorSystemConfig: ExecutorSystemJvmConfig)
+ case class StartExecutorSystems(
+ resources: Array[ResourceRequest], executorSystemConfig: ExecutorSystemJvmConfig)
+
case class ExecutorSystemStarted(system: ExecutorSystem, boundedJar: Option[AppJar])
+
case class StopExecutorSystem(system: ExecutorSystem)
- case object StartExecutorSystemTimeout
- case class ExecutorSystemJvmConfig(classPath : Array[String], jvmArguments : Array[String],
- jar: Option[AppJar], username : String, executorAkkaConfig: Config = null)
+ case object StartExecutorSystemTimeout
+ case class ExecutorSystemJvmConfig(classPath: Array[String], jvmArguments: Array[String],
+ jar: Option[AppJar], username: String, executorAkkaConfig: Config = null)
/**
* For each client which ask for an executor system, the scheduler will create a session for it.
*
- * @param requestor
- * @param executorSystemJvmConfig
*/
- private [appmaster]
+ private[appmaster]
case class Session(requestor: ActorRef, executorSystemJvmConfig: ExecutorSystemJvmConfig)
/**
* This is a agent for session to request resource
- * @param master
+ *
* @param session the original requester of the resource requests
*/
private[appmaster]
@@ -145,17 +154,19 @@ object ExecutorSystemScheduler {
private var resourceRequestor: ActorRef = null
var timeOutClock: Cancellable = null
private var unallocatedResource: Int = 0
+
import context.dispatcher
- import Constants._
- val timeout = context.system.settings.config.getInt(GEARPUMP_RESOURCE_ALLOCATION_TIMEOUT)
+ import io.gearpump.util.Constants._
+ val timeout = context.system.settings.config.getInt(GEARPUMP_RESOURCE_ALLOCATION_TIMEOUT)
def receive: Receive = {
case request: RequestResource =>
unallocatedResource += request.request.resource.slots
Option(timeOutClock).map(_.cancel)
- timeOutClock = context.system.scheduler.scheduleOnce(timeout seconds, self, ResourceAllocationTimeOut(session))
+ timeOutClock = context.system.scheduler.scheduleOnce(
+ timeout.seconds, self, ResourceAllocationTimeOut(session))
resourceRequestor = sender
master ! request
case ResourceAllocated(allocations) =>
@@ -164,7 +175,7 @@ object ExecutorSystemScheduler {
case timeout: ResourceAllocationTimeOut =>
if (unallocatedResource > 0) {
resourceRequestor ! ResourceAllocationTimeOut(session)
- //we will not receive any ResourceAllocation after timeout
+ // We will not receive any ResourceAllocation after timeout
context.stop(self)
}
}
@@ -175,4 +186,5 @@ object ExecutorSystemScheduler {
private[ExecutorSystemScheduler]
case class ResourceAllocationTimeOut(session: Session)
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/cluster/appmaster/MasterConnectionKeeper.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/cluster/appmaster/MasterConnectionKeeper.scala b/core/src/main/scala/io/gearpump/cluster/appmaster/MasterConnectionKeeper.scala
index 300c4ea..f8c8503 100644
--- a/core/src/main/scala/io/gearpump/cluster/appmaster/MasterConnectionKeeper.scala
+++ b/core/src/main/scala/io/gearpump/cluster/appmaster/MasterConnectionKeeper.scala
@@ -19,31 +19,27 @@
package io.gearpump.cluster.appmaster
import java.util.concurrent.TimeUnit
+import scala.concurrent.duration.FiniteDuration
import akka.actor._
-import io.gearpump.cluster.master.MasterProxy.MasterRestarted
+
import io.gearpump.cluster.AppMasterToMaster.RegisterAppMaster
import io.gearpump.cluster.MasterToAppMaster.AppMasterRegistered
import io.gearpump.cluster.appmaster.MasterConnectionKeeper.AppMasterRegisterTimeout
import io.gearpump.cluster.appmaster.MasterConnectionKeeper.MasterConnectionStatus.{MasterConnected, MasterStopped}
-import io.gearpump.cluster.master.MasterProxy.WatchMaster
+import io.gearpump.cluster.master.MasterProxy.{MasterRestarted, WatchMaster}
import io.gearpump.util.LogUtil
-import scala.concurrent.duration.FiniteDuration
-
/**
- * This will watch the liveness of Master.
- * When Master is restarted, it will send RegisterAppMaster to the new Master instance.
- * If Master is stopped, it will send the MasterConnectionStatus to listener
+ * Watches the liveness of Master.
*
- * please use MasterConnectionKeeper.props() to construct this actor
+ * When Master is restarted, it sends RegisterAppMaster to the new Master instance.
+ * If Master is stopped, it sends the MasterConnectionStatus to listener
*
- * @param register
- * @param masterProxy
- * @param masterStatusListener
+ * Please use MasterConnectionKeeper.props() to construct this actor.
*/
private[appmaster]
-class MasterConnectionKeeper (
+class MasterConnectionKeeper(
register: RegisterAppMaster, masterProxy: ActorRef, masterStatusListener: ActorRef)
extends Actor {
@@ -52,12 +48,13 @@ class MasterConnectionKeeper (
private val LOG = LogUtil.getLogger(getClass)
private var master: ActorRef = null
- //Subscribe self to masterProxy,
+ // Subscribe self to masterProxy.
masterProxy ! WatchMaster(self)
def registerAppMaster: Cancellable = {
masterProxy ! register
- context.system.scheduler.scheduleOnce(FiniteDuration(30, TimeUnit.SECONDS), self, AppMasterRegisterTimeout)
+ context.system.scheduler.scheduleOnce(FiniteDuration(30, TimeUnit.SECONDS),
+ self, AppMasterRegisterTimeout)
}
context.become(waitMasterToConfirm(registerAppMaster))
@@ -87,10 +84,15 @@ class MasterConnectionKeeper (
}
private[appmaster] object MasterConnectionKeeper {
+
case object AppMasterRegisterTimeout
object MasterConnectionStatus {
+
case object MasterConnected
+
case object MasterStopped
+
}
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/cluster/client/ClientContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/cluster/client/ClientContext.scala b/core/src/main/scala/io/gearpump/cluster/client/ClientContext.scala
index 3a2868c..41c01d8 100644
--- a/core/src/main/scala/io/gearpump/cluster/client/ClientContext.scala
+++ b/core/src/main/scala/io/gearpump/cluster/client/ClientContext.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,10 +19,16 @@
package io.gearpump.cluster.client
import java.util.concurrent.TimeUnit
+import scala.collection.JavaConverters._
+import scala.concurrent.duration.Duration
+import scala.concurrent.{Await, Future}
+import scala.util.Try
import akka.actor.{ActorRef, ActorSystem}
import akka.util.Timeout
-import com.typesafe.config.{ConfigValueFactory, Config}
+import com.typesafe.config.{Config, ConfigValueFactory}
+import org.slf4j.Logger
+
import io.gearpump.cluster.MasterToAppMaster.{AppMastersData, ReplayFromTimestampWindowTrailingEdge}
import io.gearpump.cluster.MasterToClient.ReplayApplicationResult
import io.gearpump.cluster._
@@ -30,17 +36,12 @@ import io.gearpump.cluster.master.MasterProxy
import io.gearpump.jarstore.JarStoreService
import io.gearpump.util.Constants._
import io.gearpump.util.{ActorUtil, Constants, LogUtil, Util}
-import org.slf4j.Logger
-
-import scala.collection.JavaConversions._
-import scala.concurrent.{Await, Future}
-import scala.concurrent.duration.Duration
-import scala.util.Try
/**
* ClientContext is a user facing util to submit/manage an application.
+ *
+ * TODO: add interface to query master here
*/
-//TODO: add interface to query master here
class ClientContext(config: Config, sys: ActorSystem, _master: ActorRef) {
def this(system: ActorSystem) = {
@@ -54,7 +55,7 @@ class ClientContext(config: Config, sys: ActorSystem, _master: ActorRef) {
private val LOG: Logger = LogUtil.getLogger(getClass)
private implicit val timeout = Timeout(5, TimeUnit.SECONDS)
- implicit val system = Option(sys).getOrElse(ActorSystem(s"client${Util.randInt}" , config))
+ implicit val system = Option(sys).getOrElse(ActorSystem(s"client${Util.randInt()}", config))
LOG.info(s"Starting system ${system.name}")
val shouldCleanupSystem = Option(sys).isEmpty
@@ -62,16 +63,18 @@ class ClientContext(config: Config, sys: ActorSystem, _master: ActorRef) {
jarStoreService.init(config, system)
private lazy val master: ActorRef = {
- val masters = config.getStringList(Constants.GEARPUMP_CLUSTER_MASTERS).toList.flatMap(Util.parseHostList)
- val master = Option(_master).getOrElse(system.actorOf(MasterProxy.props(masters), s"masterproxy${system.name}"))
+ val masters = config.getStringList(Constants.GEARPUMP_CLUSTER_MASTERS).asScala
+ .flatMap(Util.parseHostList)
+ val master = Option(_master).getOrElse(system.actorOf(MasterProxy.props(masters),
+ s"masterproxy${system.name}"))
LOG.info(s"Creating master proxy ${master} for master list: $masters")
master
}
/**
- * Submit an application with default jar setting. Use java property
- * "gearpump.app.jar" if defined. Otherwise, will assume the jar is on
- * the target runtime classpath, and will not send it.
+ * Submits an application with default jar setting. Use java property "gearpump.app.jar" if
+ * defined. Otherwise, it assumes the jar is on the target runtime classpath, thus will
+ * not send the jar across the wire.
*/
def submit(app: Application): Int = {
submit(app, System.getProperty(GEARPUMP_APP_JAR))
@@ -86,7 +89,8 @@ class ClientContext(config: Config, sys: ActorSystem, _master: ActorRef) {
val appName = checkAndAddNamePrefix(app.name, System.getProperty(GEARPUMP_APP_NAME_PREFIX))
val submissionConfig = getSubmissionConfig(config)
.withValue(APPLICATION_EXECUTOR_NUMBER, ConfigValueFactory.fromAnyRef(executorNum))
- val appDescription = AppDescription(appName, app.appMaster.getName, app.userConfig, submissionConfig)
+ val appDescription =
+ AppDescription(appName, app.appMaster.getName, app.userConfig, submissionConfig)
val appJar = Option(jar).map(loadFile)
client.submitApplication(appDescription, appJar)
}
@@ -99,9 +103,11 @@ class ClientContext(config: Config, sys: ActorSystem, _master: ActorRef) {
ClusterConfig.filterOutDefaultConfig(config)
}
- def replayFromTimestampWindowTrailingEdge(appId : Int): ReplayApplicationResult = {
+ def replayFromTimestampWindowTrailingEdge(appId: Int): ReplayApplicationResult = {
import scala.concurrent.ExecutionContext.Implicits.global
- val result = Await.result(ActorUtil.askAppMaster[ReplayApplicationResult](master, appId,ReplayFromTimestampWindowTrailingEdge(appId)), Duration.Inf)
+ val result = Await.result(
+ ActorUtil.askAppMaster[ReplayApplicationResult](master,
+ appId, ReplayFromTimestampWindowTrailingEdge(appId)), Duration.Inf)
result
}
@@ -115,30 +121,30 @@ class ClientContext(config: Config, sys: ActorSystem, _master: ActorRef) {
client.listApplications
}
- def shutdown(appId : Int) : Unit = {
+ def shutdown(appId: Int): Unit = {
val client = getMasterClient
client.shutdownApplication(appId)
}
- def resolveAppID(appId: Int) : ActorRef = {
+ def resolveAppID(appId: Int): ActorRef = {
val client = getMasterClient
client.resolveAppId(appId)
}
- def close() : Unit = {
+ def close(): Unit = {
if (shouldCleanupSystem) {
LOG.info(s"Shutting down system ${system.name}")
- system.shutdown()
+ system.terminate()
}
}
- private def loadFile(jarPath : String) : AppJar = {
+ private def loadFile(jarPath: String): AppJar = {
val jarFile = new java.io.File(jarPath)
val path = jarStoreService.copyFromLocal(jarFile)
AppJar(jarFile.getName, path)
}
- private def checkAndAddNamePrefix(appName: String, namePrefix: String) : String = {
+ private def checkAndAddNamePrefix(appName: String, namePrefix: String): String = {
val fullName = if (namePrefix != null && namePrefix != "") {
namePrefix + "_" + appName
} else {
@@ -163,12 +169,17 @@ object ClientContext {
def apply(): ClientContext = new ClientContext(ClusterConfig.default(), null, null)
- def apply(system: ActorSystem) = new ClientContext(ClusterConfig.default(), system, null)
+ def apply(system: ActorSystem): ClientContext = {
+ new ClientContext(ClusterConfig.default(), system, null)
+ }
- def apply(system: ActorSystem, master: ActorRef) = new ClientContext(ClusterConfig.default(), system, master)
+ def apply(system: ActorSystem, master: ActorRef): ClientContext = {
+ new ClientContext(ClusterConfig.default(), system, master)
+ }
def apply(config: Config): ClientContext = new ClientContext(config, null, null)
- def apply(config: Config, system: ActorSystem, master: ActorRef): ClientContext
- = new ClientContext(config, system, master)
+ def apply(config: Config, system: ActorSystem, master: ActorRef): ClientContext = {
+ new ClientContext(config, system, master)
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/cluster/client/MasterClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/cluster/client/MasterClient.scala b/core/src/main/scala/io/gearpump/cluster/client/MasterClient.scala
index 5800e8d..9edaf46 100644
--- a/core/src/main/scala/io/gearpump/cluster/client/MasterClient.scala
+++ b/core/src/main/scala/io/gearpump/cluster/client/MasterClient.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,31 +18,36 @@
package io.gearpump.cluster.client
+import scala.concurrent.duration.Duration
+import scala.concurrent.{Await, Future}
+import scala.util.{Failure, Success}
+
import akka.actor.ActorRef
import akka.pattern.ask
import akka.util.Timeout
-import io.gearpump.cluster.ClientToMaster._
-import io.gearpump.cluster.MasterToAppMaster.{AppMastersData, AppMastersDataRequest, ReplayFromTimestampWindowTrailingEdge}
-import io.gearpump.cluster.MasterToClient.{ReplayApplicationResult, ResolveAppIdResult, ShutdownApplicationResult, SubmitApplicationResult}
-import io.gearpump.cluster.{AppJar, AppDescription}
-import io.gearpump.util.{ActorUtil, Constants}
-import scala.concurrent.duration.Duration
-import scala.concurrent.{ExecutionContext, Await, Future}
-import scala.util.{Failure, Success}
+import io.gearpump.cluster.ClientToMaster._
+import io.gearpump.cluster.MasterToAppMaster.{AppMastersData, AppMastersDataRequest}
+import io.gearpump.cluster.MasterToClient.{ResolveAppIdResult, ShutdownApplicationResult, SubmitApplicationResult}
+import io.gearpump.cluster.{AppDescription, AppJar}
/**
- * Client to Master node.
- * Stateless, thread safe
+ * Client to inter-operate with Master node.
+ *
+ * NOTE: Stateless, thread safe
*/
-class MasterClient(master : ActorRef, timeout: Timeout) {
+class MasterClient(master: ActorRef, timeout: Timeout) {
implicit val masterClientTimeout = timeout
- def submitApplication(app : AppDescription, appJar: Option[AppJar]) : Int = {
- val result = Await.result( (master ? SubmitApplication(app, appJar)).asInstanceOf[Future[SubmitApplicationResult]], Duration.Inf)
+ def submitApplication(app: AppDescription, appJar: Option[AppJar]): Int = {
+ val result = Await.result(
+ (master ? SubmitApplication(app, appJar)).asInstanceOf[Future[SubmitApplicationResult]],
+ Duration.Inf)
val appId = result.appId match {
case Success(appId) =>
+ // scalastyle:off println
Console.println(s"Submit application succeed. The application id is $appId")
+ // scalastyle:on println
appId
case Failure(ex) => throw ex
}
@@ -50,15 +55,18 @@ class MasterClient(master : ActorRef, timeout: Timeout) {
}
def resolveAppId(appId: Int): ActorRef = {
- val result = Await.result((master ? ResolveAppId(appId)).asInstanceOf[Future[ResolveAppIdResult]], Duration.Inf)
+ val result = Await.result(
+ (master ? ResolveAppId(appId)).asInstanceOf[Future[ResolveAppIdResult]], Duration.Inf)
result.appMaster match {
case Success(appMaster) => appMaster
case Failure(ex) => throw ex
}
}
- def shutdownApplication(appId : Int) : Unit = {
- val result = Await.result((master ? ShutdownApplication(appId)).asInstanceOf[Future[ShutdownApplicationResult]], Duration.Inf)
+ def shutdownApplication(appId: Int): Unit = {
+ val result = Await.result(
+ (master ? ShutdownApplication(appId)).asInstanceOf[Future[ShutdownApplicationResult]],
+ Duration.Inf)
result.appId match {
case Success(_) =>
case Failure(ex) => throw ex
@@ -66,7 +74,8 @@ class MasterClient(master : ActorRef, timeout: Timeout) {
}
def listApplications: AppMastersData = {
- val result = Await.result((master ? AppMastersDataRequest).asInstanceOf[Future[AppMastersData]], Duration.Inf)
+ val result = Await.result(
+ (master ? AppMastersDataRequest).asInstanceOf[Future[AppMastersData]], Duration.Inf)
result
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/cluster/main/ArgumentsParser.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/cluster/main/ArgumentsParser.scala b/core/src/main/scala/io/gearpump/cluster/main/ArgumentsParser.scala
index f290241..209f831 100644
--- a/core/src/main/scala/io/gearpump/cluster/main/ArgumentsParser.scala
+++ b/core/src/main/scala/io/gearpump/cluster/main/ArgumentsParser.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -20,39 +20,44 @@ package io.gearpump.cluster.main
import io.gearpump.cluster.main.ArgumentsParser.Syntax
-case class CLIOption[+T] (description:String = "", required: Boolean = false, defaultValue: Option[T] = None)
+case class CLIOption[+T](
+ description: String = "", required: Boolean = false, defaultValue: Option[T] = None)
-class ParseResult(optionMap : Map[String, String], remainArguments : Array[String]) {
- def getInt(key : String) = optionMap.get(key).get.toInt
+class ParseResult(optionMap: Map[String, String], remainArguments: Array[String]) {
+ def getInt(key: String): Int = optionMap.get(key).get.toInt
- def getString (key : String) = optionMap.get(key).get
+ def getString(key: String): String = optionMap.get(key).get
- def getBoolean (key : String) = optionMap.get(key).get.toBoolean
+ def getBoolean(key: String): Boolean = optionMap.get(key).get.toBoolean
- def exists(key : String) = !optionMap.getOrElse(key,"").isEmpty
+ def exists(key: String): Boolean = !(optionMap.getOrElse(key, "").isEmpty)
- def remainArgs : Array[String] = this.remainArguments
+ def remainArgs: Array[String] = this.remainArguments
}
/**
- * Parse command line arguments
+ * Parser for command line arguments
+ *
* Grammar: -option1 value1 -option2 value3 -flag1 -flag2 remainArg1 remainArg2...
*/
-trait ArgumentsParser {
+trait ArgumentsParser {
val ignoreUnknownArgument = false
- def help: Unit = {
- Console.err.println(s"\nHelp: $description")
+ // scalastyle:off println
+ def help(): Unit = {
+ Console.println(s"\nHelp: $description")
var usage = List.empty[String]
- options.map(kv => if(kv._2.required) {
+ options.map(kv => if (kv._2.required) {
usage = usage :+ s"-${kv._1} (required:${kv._2.required})${kv._2.description}"
} else {
- usage = usage :+ s"-${kv._1} (required:${kv._2.required}, default:${kv._2.defaultValue.getOrElse("")})${kv._2.description}"
+ usage = usage :+ s"-${kv._1} (required:${kv._2.required}, " +
+ s"default:${kv._2.defaultValue.getOrElse("")})${kv._2.description}"
})
usage :+= remainArgs.map(k => s"<$k>").mkString(" ")
- usage.foreach(Console.err.println(_))
+ usage.foreach(Console.println(_))
}
+ // scalastyle:on println
def parse(args: Array[String]): ParseResult = {
val syntax = Syntax(options, remainArgs, ignoreUnknownArgument)
@@ -60,16 +65,18 @@ trait ArgumentsParser {
}
val description: String = ""
- val options : Array[(String, CLIOption[Any])] = Array.empty[(String, CLIOption[Any])]
- val remainArgs : Array[String] = Array.empty[String]
+ val options: Array[(String, CLIOption[Any])] = Array.empty[(String, CLIOption[Any])]
+ val remainArgs: Array[String] = Array.empty[String]
}
object ArgumentsParser {
- case class Syntax(val options: Array[(String, CLIOption[Any])], val remainArgs : Array[String], val ignoreUnknownArgument: Boolean)
+ case class Syntax(
+ val options: Array[(String, CLIOption[Any])], val remainArgs: Array[String],
+ val ignoreUnknownArgument: Boolean)
def parse(syntax: Syntax, args: Array[String]): ParseResult = {
- import syntax.{options, remainArgs, ignoreUnknownArgument}
+ import syntax.{ignoreUnknownArgument, options, remainArgs}
var config = Map.empty[String, String]
var remain = Array.empty[String]
@@ -100,14 +107,16 @@ object ArgumentsParser {
doParse(rest)
case value :: rest =>
+ // scalastyle:off println
Console.err.println(s"Warning: get unknown argument $value, maybe it is a main class")
+ // scalastyle:on println
remain ++= value :: rest
doParse(Nil)
}
}
doParse(args.toList)
- options.foreach{pair =>
+ options.foreach { pair =>
val (key, option) = pair
if (!config.contains(key) && !option.required) {
config += key -> option.defaultValue.getOrElse("").toString
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/cluster/master/AppMasterLauncher.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/cluster/master/AppMasterLauncher.scala b/core/src/main/scala/io/gearpump/cluster/master/AppMasterLauncher.scala
index d391bba..fb3e5c4 100644
--- a/core/src/main/scala/io/gearpump/cluster/master/AppMasterLauncher.scala
+++ b/core/src/main/scala/io/gearpump/cluster/master/AppMasterLauncher.scala
@@ -19,36 +19,36 @@
package io.gearpump.cluster.master
import java.util.concurrent.{TimeUnit, TimeoutException}
+import scala.collection.JavaConverters._
+import scala.concurrent.duration.Duration
+import scala.util.{Failure, Success}
import akka.actor.{Actor, ActorRef, Props, _}
import com.typesafe.config.Config
-import io.gearpump.cluster.{AppJar, AppDescription}
+import org.slf4j.Logger
+
import io.gearpump.cluster.AppMasterToMaster.RequestResource
import io.gearpump.cluster.AppMasterToWorker.{LaunchExecutor, ShutdownExecutor}
import io.gearpump.cluster.MasterToAppMaster.ResourceAllocated
import io.gearpump.cluster.MasterToClient.SubmitApplicationResult
import io.gearpump.cluster.WorkerToAppMaster.ExecutorLaunchRejected
-import io.gearpump.cluster._
-import io.gearpump.cluster.appmaster.{WorkerInfo, AppMasterRuntimeEnvironment, AppMasterRuntimeInfo}
+import io.gearpump.cluster.appmaster.{AppMasterRuntimeEnvironment, AppMasterRuntimeInfo, WorkerInfo}
import io.gearpump.cluster.scheduler.{Resource, ResourceAllocation, ResourceRequest}
+import io.gearpump.cluster.worker.WorkerId
+import io.gearpump.cluster.{AppDescription, AppJar, _}
import io.gearpump.transport.HostPort
import io.gearpump.util.ActorSystemBooter._
import io.gearpump.util.Constants._
import io.gearpump.util.{ActorSystemBooter, ActorUtil, LogUtil, Util}
-import org.slf4j.Logger
-import scala.collection.JavaConverters._
-import scala.concurrent.duration.Duration
-import scala.util.{Failure, Success}
-import io.gearpump.WorkerId
/**
*
* AppMasterLauncher is a child Actor of AppManager; it is responsible
* for launching the AppMaster on the cluster.
*/
class AppMasterLauncher(
- appId : Int, executorId: Int, app : AppDescription,
- jar: Option[AppJar], username : String, master : ActorRef, client: Option[ActorRef])
+ appId: Int, executorId: Int, app: AppDescription,
+ jar: Option[AppJar], username: String, master: ActorRef, client: Option[ActorRef])
extends Actor {
private val LOG: Logger = LogUtil.getLogger(getClass, app = appId)
@@ -61,9 +61,9 @@ class AppMasterLauncher(
LOG.info(s"Ask Master resource to start AppMaster $appId...")
master ! RequestResource(appId, ResourceRequest(Resource(1), WorkerId.unspecified))
- def receive : Receive = waitForResourceAllocation
+ def receive: Receive = waitForResourceAllocation
- def waitForResourceAllocation : Receive = {
+ def waitForResourceAllocation: Receive = {
case ResourceAllocated(allocations) =>
val ResourceAllocation(resource, worker, workerId) = allocations(0)
@@ -74,22 +74,27 @@ class AppMasterLauncher(
val appMasterInfo = AppMasterRuntimeInfo(appId, app.name, worker, username,
submissionTime, config = appMasterAkkaConfig)
val workerInfo = WorkerInfo(workerId, worker)
- val appMasterContext = AppMasterContext(appId, username, resource, workerInfo, jar, null, appMasterInfo)
+ val appMasterContext =
+ AppMasterContext(appId, username, resource, workerInfo, jar, null, appMasterInfo)
LOG.info(s"Try to launch a executor for AppMaster on worker ${workerId} for app $appId")
val name = ActorUtil.actorNameForExecutor(appId, executorId)
val selfPath = ActorUtil.getFullPath(context.system, self.path)
- val jvmSetting = Util.resolveJvmSetting(appMasterAkkaConfig.withFallback(systemConfig)).appMater
- val executorJVM = ExecutorJVMConfig(jvmSetting.classPath ,jvmSetting.vmargs,
- classOf[ActorSystemBooter].getName, Array(name, selfPath), jar, username, appMasterAkkaConfig)
+ val jvmSetting =
+ Util.resolveJvmSetting(appMasterAkkaConfig.withFallback(systemConfig)).appMater
+ val executorJVM = ExecutorJVMConfig(jvmSetting.classPath, jvmSetting.vmargs,
+ classOf[ActorSystemBooter].getName, Array(name, selfPath), jar,
+ username, appMasterAkkaConfig)
worker ! LaunchExecutor(appId, executorId, resource, executorJVM)
context.become(waitForActorSystemToStart(worker, appMasterContext, app.userConfig, resource))
}
- def waitForActorSystemToStart(worker : ActorRef, appContext : AppMasterContext, user : UserConfig, resource: Resource) : Receive = {
+ def waitForActorSystemToStart(
+ worker: ActorRef, appContext: AppMasterContext, user: UserConfig, resource: Resource)
+ : Receive = {
case ExecutorLaunchRejected(reason, ex) =>
- LOG.error(s"Executor Launch failed reason:$reason", ex)
+ LOG.error(s"Executor Launch failed reason: $reason", ex)
LOG.info(s"reallocate resource $resource to start appmaster")
master ! RequestResource(appId, ResourceRequest(resource, WorkerId.unspecified))
context.become(waitForResourceAllocation)
@@ -99,7 +104,8 @@ class AppMasterLauncher(
val masterAddress = systemConfig.getStringList(GEARPUMP_CLUSTER_MASTERS)
.asScala.map(HostPort(_)).map(ActorUtil.getMasterActorPath)
- sender ! CreateActor(AppMasterRuntimeEnvironment.props(masterAddress, app, appContext), s"appdaemon$appId")
+ sender ! CreateActor(
+ AppMasterRuntimeEnvironment.props(masterAddress, app, appContext), s"appdaemon$appId")
import context.dispatcher
val appMasterTimeout = scheduler.scheduleOnce(TIMEOUT, self,
@@ -107,7 +113,7 @@ class AppMasterLauncher(
context.become(waitForAppMasterToStart(worker, appMasterTimeout))
}
- def waitForAppMasterToStart(worker : ActorRef, cancel: Cancellable) : Receive = {
+ def waitForAppMasterToStart(worker: ActorRef, cancel: Cancellable): Receive = {
case ActorCreated(appMaster, _) =>
cancel.cancel()
sender ! BindLifeCycle(appMaster)
@@ -121,19 +127,21 @@ class AppMasterLauncher(
context.stop(self)
}
- def replyToClient(result : SubmitApplicationResult) : Unit = {
+ def replyToClient(result: SubmitApplicationResult): Unit = {
if (client.isDefined) {
client.get.tell(result, master)
}
}
}
-object AppMasterLauncher extends AppMasterLauncherFactory{
- def props(appId : Int, executorId: Int, app : AppDescription, jar: Option[AppJar], username : String, master : ActorRef, client: Option[ActorRef]) = {
+object AppMasterLauncher extends AppMasterLauncherFactory {
+ def props(appId: Int, executorId: Int, app: AppDescription, jar: Option[AppJar],
+ username: String, master: ActorRef, client: Option[ActorRef]): Props = {
Props(new AppMasterLauncher(appId, executorId, app, jar, username, master, client))
}
}
trait AppMasterLauncherFactory {
- def props(appId : Int, executorId: Int, app : AppDescription, jar: Option[AppJar], username : String, master : ActorRef, client: Option[ActorRef]) : Props
+ def props(appId: Int, executorId: Int, app: AppDescription, jar: Option[AppJar],
+ username: String, master: ActorRef, client: Option[ActorRef]): Props
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/cluster/master/MasterProxy.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/cluster/master/MasterProxy.scala b/core/src/main/scala/io/gearpump/cluster/master/MasterProxy.scala
index 5d9e410..61d95dc 100644
--- a/core/src/main/scala/io/gearpump/cluster/master/MasterProxy.scala
+++ b/core/src/main/scala/io/gearpump/cluster/master/MasterProxy.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,24 +18,21 @@
package io.gearpump.cluster.master
+import scala.concurrent.duration.FiniteDuration
import akka.actor._
-import io.gearpump.transport.HostPort
-import io.gearpump.util.{ActorUtil, LogUtil}
import org.slf4j.Logger
-import scala.concurrent.duration.{FiniteDuration, Duration}
+import io.gearpump.transport.HostPort
+import io.gearpump.util.{ActorUtil, LogUtil}
/**
* This works with Master HA. When there are multiple Master nodes,
* this will find an active one.
- *
- *
- * @param masters
*/
-class MasterProxy (masters: Iterable[ActorPath], timeout: FiniteDuration)
+class MasterProxy(masters: Iterable[ActorPath], timeout: FiniteDuration)
extends Actor with Stash {
- import MasterProxy._
+ import io.gearpump.cluster.master.MasterProxy._
val LOG: Logger = LogUtil.getLogger(getClass, name = self.path.name)
@@ -48,10 +45,12 @@ class MasterProxy (masters: Iterable[ActorPath], timeout: FiniteDuration)
import context.dispatcher
- def findMaster() = repeatActionUtil(timeout){
- contacts foreach { contact =>
- LOG.info(s"sending identity to $contact")
- contact ! Identify(None)
+ def findMaster(): Cancellable = {
+ repeatActionUtil(timeout) {
+ contacts foreach { contact =>
+ LOG.info(s"sending identity to $contact")
+ contact ! Identify(None)
+ }
}
}
@@ -64,11 +63,11 @@ class MasterProxy (masters: Iterable[ActorPath], timeout: FiniteDuration)
super.postStop()
}
- override def receive : Receive = {
- case _=>
+ override def receive: Receive = {
+ case _ =>
}
- def establishing(findMaster : Cancellable): Actor.Receive = {
+ def establishing(findMaster: Cancellable): Actor.Receive = {
case ActorIdentity(_, Some(receptionist)) =>
context watch receptionist
LOG.info("Connected to [{}]", receptionist.path)
@@ -85,10 +84,10 @@ class MasterProxy (masters: Iterable[ActorPath], timeout: FiniteDuration)
}
def active(receptionist: ActorRef): Actor.Receive = {
- case Terminated(receptionist) ⇒
+ case Terminated(receptionist) =>
LOG.info("Lost contact with [{}], restablishing connection", receptionist)
context.become(establishing(findMaster))
- case _: ActorIdentity ⇒ // ok, from previous establish, already handled
+ case _: ActorIdentity => // ok, from previous establish, already handled
case WatchMaster(watcher) =>
watchers = watchers :+ watcher
}
@@ -99,10 +98,10 @@ class MasterProxy (masters: Iterable[ActorPath], timeout: FiniteDuration)
master forward msg
}
- def scheduler = context.system.scheduler
+ def scheduler: Scheduler = context.system.scheduler
import scala.concurrent.duration._
- private def repeatActionUtil(timeout: FiniteDuration)(action : => Unit) : Cancellable = {
- val send = scheduler.schedule(0 seconds, 2 seconds)(action)
+ private def repeatActionUtil(timeout: FiniteDuration)(action: => Unit): Cancellable = {
+ val send = scheduler.schedule(0.seconds, 2.seconds)(action)
val suicide = scheduler.scheduleOnce(timeout) {
send.cancel()
self ! PoisonPill
@@ -128,7 +127,7 @@ object MasterProxy {
case class WatchMaster(watcher: ActorRef)
import scala.concurrent.duration._
- def props(masters: Iterable[HostPort], duration: FiniteDuration = 30 seconds): Props = {
+ def props(masters: Iterable[HostPort], duration: FiniteDuration = 30.seconds): Props = {
val contacts = masters.map(ActorUtil.getMasterActorPath(_))
Props(new MasterProxy(contacts, duration))
}