You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2016/05/10 02:42:57 UTC
[47/49] incubator-gearpump git commit: fix GEARPUMP-118 change
package name to org.apache
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/cluster/appmaster/ApplicationState.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/cluster/appmaster/ApplicationState.scala b/core/src/main/scala/io/gearpump/cluster/appmaster/ApplicationState.scala
deleted file mode 100644
index 3b967f4..0000000
--- a/core/src/main/scala/io/gearpump/cluster/appmaster/ApplicationState.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.cluster.appmaster
-
-import io.gearpump.cluster.{AppDescription, AppJar}
-
-/**
- * This is the state for a single application; it is distributed across the masters.
- */
-case class ApplicationState(
- appId: Int, appName: String, attemptId: Int, app: AppDescription, jar: Option[AppJar],
- username: String, state: Any) extends Serializable {
-
- override def equals(other: Any): Boolean = {
- other match {
- case that: ApplicationState =>
- if (appId == that.appId && attemptId == that.attemptId) {
- true
- } else {
- false
- }
- case _ =>
- false
- }
- }
-
- override def hashCode: Int = {
- import akka.routing.MurmurHash._
- extendHash(appId, attemptId, startMagicA, startMagicB)
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystem.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystem.scala b/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystem.scala
deleted file mode 100644
index 6fcb5e7..0000000
--- a/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystem.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.cluster.appmaster
-
-import akka.actor.{ActorRef, Address, PoisonPill}
-
-import io.gearpump.cluster.scheduler.Resource
-import io.gearpump.cluster.worker.WorkerId
-import io.gearpump.util.ActorSystemBooter.BindLifeCycle
-
-case class WorkerInfo(workerId: WorkerId, ref: ActorRef)
-
-/**
- * Configurations to start an executor system on a remote machine
- *
- * @param address Remote address where we start an Actor System.
- */
-case class ExecutorSystem(executorSystemId: Int, address: Address, daemon:
- ActorRef, resource: Resource, worker: WorkerInfo) {
- def bindLifeCycleWith(actor: ActorRef): Unit = {
- daemon ! BindLifeCycle(actor)
- }
-
- def shutdown(): Unit = {
- daemon ! PoisonPill
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystemLauncher.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystemLauncher.scala b/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystemLauncher.scala
deleted file mode 100644
index 78432f4..0000000
--- a/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystemLauncher.scala
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.cluster.appmaster
-
-import scala.concurrent.duration._
-
-import akka.actor._
-import org.slf4j.Logger
-
-import io.gearpump.cluster.AppMasterToWorker.LaunchExecutor
-import io.gearpump.cluster.ExecutorJVMConfig
-import io.gearpump.cluster.WorkerToAppMaster._
-import io.gearpump.cluster.appmaster.ExecutorSystemLauncher._
-import io.gearpump.cluster.appmaster.ExecutorSystemScheduler.{ExecutorSystemJvmConfig, Session}
-import io.gearpump.cluster.scheduler.Resource
-import io.gearpump.util.ActorSystemBooter.{ActorSystemRegistered, RegisterActorSystem}
-import io.gearpump.util.{ActorSystemBooter, ActorUtil, Constants, LogUtil}
-
-/**
- * This launches a single executor system on the target worker.
- *
- * Please use ExecutorSystemLauncher.props() to construct this actor
- *
- * @param session The session that requested the launch of the executor system
- */
-private[appmaster]
-class ExecutorSystemLauncher(appId: Int, session: Session) extends Actor {
-
- private val LOG: Logger = LogUtil.getLogger(getClass)
-
- val scheduler = context.system.scheduler
- implicit val executionContext = context.dispatcher
-
- private val systemConfig = context.system.settings.config
- val timeoutSetting = systemConfig.getInt(Constants.GEARPUMP_START_EXECUTOR_SYSTEM_TIMEOUT_MS)
-
- val timeout = scheduler.scheduleOnce(timeoutSetting.milliseconds,
- self, LaunchExecutorSystemTimeout(session))
-
- def receive: Receive = waitForLaunchCommand
-
- def waitForLaunchCommand: Receive = {
- case LaunchExecutorSystem(worker, executorSystemId, resource) =>
- val launcherPath = ActorUtil.getFullPath(context.system, self.path)
- val jvmConfig = Option(session.executorSystemJvmConfig)
- .map(getExecutorJvmConfig(_, s"app${appId}system${executorSystemId}", launcherPath)).orNull
-
- val launch = LaunchExecutor(appId, executorSystemId, resource, jvmConfig)
- LOG.info(s"Launching Executor ...appId: $appId, executorSystemId: $executorSystemId, " +
- s"slots: ${resource.slots} on worker $worker")
-
- worker.ref ! launch
- context.become(waitForActorSystemToStart(sender, launch, worker, executorSystemId))
- }
-
- def waitForActorSystemToStart(
- replyTo: ActorRef, launch: LaunchExecutor, worker: WorkerInfo, executorSystemId: Int)
- : Receive = {
- case RegisterActorSystem(systemPath) =>
- import launch._
- timeout.cancel()
- LOG.info(s"Received RegisterActorSystem $systemPath for session ${session.requestor}")
- sender ! ActorSystemRegistered(worker.ref)
- val system =
- ExecutorSystem(executorId, AddressFromURIString(systemPath), sender, resource, worker)
- replyTo ! LaunchExecutorSystemSuccess(system, session)
- context.stop(self)
- case reject@ExecutorLaunchRejected(reason, ex) =>
- LOG.error(s"Executor Launch ${launch.resource} failed reason: $reason", ex)
- replyTo ! LaunchExecutorSystemRejected(launch.resource, reason, session)
- context.stop(self)
- case timeout: LaunchExecutorSystemTimeout =>
- LOG.error(s"The Executor ActorSystem $executorSystemId has not been started in time")
- replyTo ! timeout
- context.stop(self)
- }
-}
-
-private[appmaster]
-object ExecutorSystemLauncher {
-
- case class LaunchExecutorSystem(worker: WorkerInfo, systemId: Int, resource: Resource)
-
- case class LaunchExecutorSystemSuccess(system: ExecutorSystem, session: Session)
-
- case class LaunchExecutorSystemRejected(resource: Resource, reason: Any, session: Session)
-
- case class LaunchExecutorSystemTimeout(session: Session)
-
- private def getExecutorJvmConfig(conf: ExecutorSystemJvmConfig, systemName: String,
- reportBack: String): ExecutorJVMConfig = {
- Option(conf).map { conf =>
- import conf._
- ExecutorJVMConfig(classPath, jvmArguments, classOf[ActorSystemBooter].getName,
- Array(systemName, reportBack), jar, username, executorAkkaConfig)
- }.getOrElse(null)
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystemScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystemScheduler.scala b/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystemScheduler.scala
deleted file mode 100644
index c5ec600..0000000
--- a/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystemScheduler.scala
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.cluster.appmaster
-
-import scala.concurrent.duration._
-
-import akka.actor._
-import com.typesafe.config.Config
-
-import io.gearpump.cluster.AppMasterToMaster.RequestResource
-import io.gearpump.cluster.MasterToAppMaster.ResourceAllocated
-import io.gearpump.cluster._
-import io.gearpump.cluster.appmaster.ExecutorSystemLauncher._
-import io.gearpump.cluster.appmaster.ExecutorSystemScheduler._
-import io.gearpump.cluster.scheduler.{ResourceAllocation, ResourceRequest}
-import io.gearpump.cluster.worker.WorkerId
-import io.gearpump.util.{Constants, LogUtil}
-
-/**
- * ExecutorSystem is also a type of resource, this class schedules ExecutorSystem for AppMaster.
- * AppMaster can use this class to directly request live executor actor systems. The communication
- * in the background with Master and Worker is hidden from AppMaster.
- *
- * Please use ExecutorSystemScheduler.props() to construct this actor
- */
-private[appmaster]
-class ExecutorSystemScheduler(appId: Int, masterProxy: ActorRef,
- executorSystemLauncher: (Int, Session) => Props) extends Actor {
-
- private val LOG = LogUtil.getLogger(getClass, app = appId)
- implicit val timeout = Constants.FUTURE_TIMEOUT
- implicit val actorSystem = context.system
- var currentSystemId = 0
-
- var resourceAgents = Map.empty[Session, ActorRef]
-
- def receive: Receive = {
- clientCommands orElse resourceAllocationMessageHandler orElse executorSystemMessageHandler
- }
-
- def clientCommands: Receive = {
- case start: StartExecutorSystems =>
- LOG.info(s"starting executor systems (ExecutorSystemConfig(${start.executorSystemConfig}), " +
- s"Resources(${start.resources.mkString(",")}))")
- val requestor = sender()
- val executorSystemConfig = start.executorSystemConfig
- val session = Session(requestor, executorSystemConfig)
- val agent = resourceAgents.getOrElse(session,
- context.actorOf(Props(new ResourceAgent(masterProxy, session))))
- resourceAgents = resourceAgents + (session -> agent)
-
- start.resources.foreach { resource =>
- agent ! RequestResource(appId, resource)
- }
-
- case StopExecutorSystem(executorSystem) =>
- executorSystem.shutdown
- }
-
- def resourceAllocationMessageHandler: Receive = {
- case ResourceAllocatedForSession(allocations, session) =>
- if (isSessionAlive(session)) {
- allocations.foreach { resourceAllocation =>
- val ResourceAllocation(resource, worker, workerId) = resourceAllocation
-
- val launcher = context.actorOf(executorSystemLauncher(appId, session))
- launcher ! LaunchExecutorSystem(WorkerInfo(workerId, worker), currentSystemId, resource)
- currentSystemId = currentSystemId + 1
- }
- }
- case ResourceAllocationTimeOut(session) =>
- if (isSessionAlive(session)) {
- resourceAgents = resourceAgents - session
- session.requestor ! StartExecutorSystemTimeout
- }
- }
-
- def executorSystemMessageHandler: Receive = {
- case LaunchExecutorSystemSuccess(system, session) =>
- if (isSessionAlive(session)) {
- LOG.info("LaunchExecutorSystemSuccess, send back to " + session.requestor)
- system.bindLifeCycleWith(self)
- session.requestor ! ExecutorSystemStarted(system, session.executorSystemJvmConfig.jar)
- } else {
- LOG.error("We get a ExecutorSystem back, but resource requestor is no longer valid. " +
- "Will shutdown the allocated system")
- system.shutdown
- }
- case LaunchExecutorSystemTimeout(session) =>
- if (isSessionAlive(session)) {
- LOG.error(s"Failed to launch executor system for ${session.requestor} due to timeout")
- session.requestor ! StartExecutorSystemTimeout
- }
-
- case LaunchExecutorSystemRejected(resource, reason, session) =>
- if (isSessionAlive(session)) {
- LOG.error(s"Failed to launch executor system, due to $reason, " +
- s"will ask master to allocate new resources $resource")
- resourceAgents.get(session).map { resourceAgent: ActorRef =>
- resourceAgent ! RequestResource(appId, ResourceRequest(resource, WorkerId.unspecified))
- }
- }
- }
-
- private def isSessionAlive(session: Session): Boolean = {
- Option(session).flatMap(session => resourceAgents.get(session)).nonEmpty
- }
-}
-
-object ExecutorSystemScheduler {
-
- case class StartExecutorSystems(
- resources: Array[ResourceRequest], executorSystemConfig: ExecutorSystemJvmConfig)
-
- case class ExecutorSystemStarted(system: ExecutorSystem, boundedJar: Option[AppJar])
-
- case class StopExecutorSystem(system: ExecutorSystem)
-
- case object StartExecutorSystemTimeout
-
- case class ExecutorSystemJvmConfig(classPath: Array[String], jvmArguments: Array[String],
- jar: Option[AppJar], username: String, executorAkkaConfig: Config = null)
-
- /**
- * For each client that asks for an executor system, the scheduler will create a session for it.
- *
- */
- private[appmaster]
- case class Session(requestor: ActorRef, executorSystemJvmConfig: ExecutorSystemJvmConfig)
-
- /**
- * This is an agent for a session to request resources
- *
- * @param session the original requester of the resource requests
- */
- private[appmaster]
- class ResourceAgent(master: ActorRef, session: Session) extends Actor {
- private var resourceRequestor: ActorRef = null
- var timeOutClock: Cancellable = null
- private var unallocatedResource: Int = 0
-
- import context.dispatcher
-
- import io.gearpump.util.Constants._
-
- val timeout = context.system.settings.config.getInt(GEARPUMP_RESOURCE_ALLOCATION_TIMEOUT)
-
- def receive: Receive = {
- case request: RequestResource =>
- unallocatedResource += request.request.resource.slots
- Option(timeOutClock).map(_.cancel)
- timeOutClock = context.system.scheduler.scheduleOnce(
- timeout.seconds, self, ResourceAllocationTimeOut(session))
- resourceRequestor = sender
- master ! request
- case ResourceAllocated(allocations) =>
- unallocatedResource -= allocations.map(_.resource.slots).sum
- resourceRequestor forward ResourceAllocatedForSession(allocations, session)
- case timeout: ResourceAllocationTimeOut =>
- if (unallocatedResource > 0) {
- resourceRequestor ! ResourceAllocationTimeOut(session)
- // We will not receive any ResourceAllocation after timeout
- context.stop(self)
- }
- }
- }
-
- private[ExecutorSystemScheduler]
- case class ResourceAllocatedForSession(resource: Array[ResourceAllocation], session: Session)
-
- private[ExecutorSystemScheduler]
- case class ResourceAllocationTimeOut(session: Session)
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/cluster/appmaster/MasterConnectionKeeper.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/cluster/appmaster/MasterConnectionKeeper.scala b/core/src/main/scala/io/gearpump/cluster/appmaster/MasterConnectionKeeper.scala
deleted file mode 100644
index f8c8503..0000000
--- a/core/src/main/scala/io/gearpump/cluster/appmaster/MasterConnectionKeeper.scala
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.cluster.appmaster
-
-import java.util.concurrent.TimeUnit
-import scala.concurrent.duration.FiniteDuration
-
-import akka.actor._
-
-import io.gearpump.cluster.AppMasterToMaster.RegisterAppMaster
-import io.gearpump.cluster.MasterToAppMaster.AppMasterRegistered
-import io.gearpump.cluster.appmaster.MasterConnectionKeeper.AppMasterRegisterTimeout
-import io.gearpump.cluster.appmaster.MasterConnectionKeeper.MasterConnectionStatus.{MasterConnected, MasterStopped}
-import io.gearpump.cluster.master.MasterProxy.{MasterRestarted, WatchMaster}
-import io.gearpump.util.LogUtil
-
-/**
- * Watches the liveness of Master.
- *
- * When Master is restarted, it sends RegisterAppMaster to the new Master instance.
- * If Master is stopped, it sends the MasterConnectionStatus to listener
- *
- * Please use MasterConnectionKeeper.props() to construct this actor.
- */
-private[appmaster]
-class MasterConnectionKeeper(
- register: RegisterAppMaster, masterProxy: ActorRef, masterStatusListener: ActorRef)
- extends Actor {
-
- import context.dispatcher
-
- private val LOG = LogUtil.getLogger(getClass)
- private var master: ActorRef = null
-
- // Subscribe self to masterProxy,
- masterProxy ! WatchMaster(self)
-
- def registerAppMaster: Cancellable = {
- masterProxy ! register
- context.system.scheduler.scheduleOnce(FiniteDuration(30, TimeUnit.SECONDS),
- self, AppMasterRegisterTimeout)
- }
-
- context.become(waitMasterToConfirm(registerAppMaster))
-
- def waitMasterToConfirm(cancelRegister: Cancellable): Receive = {
- case AppMasterRegistered(appId) =>
- cancelRegister.cancel()
- masterStatusListener ! MasterConnected
- context.become(masterLivenessListener)
- case AppMasterRegisterTimeout =>
- cancelRegister.cancel()
- masterStatusListener ! MasterStopped
- context.stop(self)
- }
-
- def masterLivenessListener: Receive = {
- case MasterRestarted =>
- LOG.info("Master restarted, re-registering appmaster....")
- context.become(waitMasterToConfirm(registerAppMaster))
- case MasterStopped =>
- LOG.info("Master is dead, killing this AppMaster....")
- masterStatusListener ! MasterStopped
- context.stop(self)
- }
-
- def receive: Receive = null
-}
-
-private[appmaster] object MasterConnectionKeeper {
-
- case object AppMasterRegisterTimeout
-
- object MasterConnectionStatus {
-
- case object MasterConnected
-
- case object MasterStopped
-
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/cluster/client/ClientContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/cluster/client/ClientContext.scala b/core/src/main/scala/io/gearpump/cluster/client/ClientContext.scala
deleted file mode 100644
index 41c01d8..0000000
--- a/core/src/main/scala/io/gearpump/cluster/client/ClientContext.scala
+++ /dev/null
@@ -1,185 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.cluster.client
-
-import java.util.concurrent.TimeUnit
-import scala.collection.JavaConverters._
-import scala.concurrent.duration.Duration
-import scala.concurrent.{Await, Future}
-import scala.util.Try
-
-import akka.actor.{ActorRef, ActorSystem}
-import akka.util.Timeout
-import com.typesafe.config.{Config, ConfigValueFactory}
-import org.slf4j.Logger
-
-import io.gearpump.cluster.MasterToAppMaster.{AppMastersData, ReplayFromTimestampWindowTrailingEdge}
-import io.gearpump.cluster.MasterToClient.ReplayApplicationResult
-import io.gearpump.cluster._
-import io.gearpump.cluster.master.MasterProxy
-import io.gearpump.jarstore.JarStoreService
-import io.gearpump.util.Constants._
-import io.gearpump.util.{ActorUtil, Constants, LogUtil, Util}
-
-/**
- * ClientContext is a user-facing utility to submit/manage an application.
- *
- * TODO: add interface to query master here
- */
-class ClientContext(config: Config, sys: ActorSystem, _master: ActorRef) {
-
- def this(system: ActorSystem) = {
- this(system.settings.config, system, null)
- }
-
- def this(config: Config) = {
- this(config, null, null)
- }
-
- private val LOG: Logger = LogUtil.getLogger(getClass)
- private implicit val timeout = Timeout(5, TimeUnit.SECONDS)
-
- implicit val system = Option(sys).getOrElse(ActorSystem(s"client${Util.randInt()}", config))
- LOG.info(s"Starting system ${system.name}")
- val shouldCleanupSystem = Option(sys).isEmpty
-
- private val jarStoreService = JarStoreService.get(config)
- jarStoreService.init(config, system)
-
- private lazy val master: ActorRef = {
- val masters = config.getStringList(Constants.GEARPUMP_CLUSTER_MASTERS).asScala
- .flatMap(Util.parseHostList)
- val master = Option(_master).getOrElse(system.actorOf(MasterProxy.props(masters),
- s"masterproxy${system.name}"))
- LOG.info(s"Creating master proxy ${master} for master list: $masters")
- master
- }
-
- /**
- * Submits an application with default jar setting. Use java property "gearpump.app.jar" if
- * defined. Otherwise, it assumes the jar is on the target runtime classpath, thus will
- * not send the jar across the wire.
- */
- def submit(app: Application): Int = {
- submit(app, System.getProperty(GEARPUMP_APP_JAR))
- }
-
- def submit(app: Application, jar: String): Int = {
- submit(app, jar, getExecutorNum())
- }
-
- def submit(app: Application, jar: String, executorNum: Int): Int = {
- val client = getMasterClient
- val appName = checkAndAddNamePrefix(app.name, System.getProperty(GEARPUMP_APP_NAME_PREFIX))
- val submissionConfig = getSubmissionConfig(config)
- .withValue(APPLICATION_EXECUTOR_NUMBER, ConfigValueFactory.fromAnyRef(executorNum))
- val appDescription =
- AppDescription(appName, app.appMaster.getName, app.userConfig, submissionConfig)
- val appJar = Option(jar).map(loadFile)
- client.submitApplication(appDescription, appJar)
- }
-
- private def getExecutorNum(): Int = {
- Try(System.getProperty(APPLICATION_EXECUTOR_NUMBER).toInt).getOrElse(1)
- }
-
- private def getSubmissionConfig(config: Config): Config = {
- ClusterConfig.filterOutDefaultConfig(config)
- }
-
- def replayFromTimestampWindowTrailingEdge(appId: Int): ReplayApplicationResult = {
- import scala.concurrent.ExecutionContext.Implicits.global
- val result = Await.result(
- ActorUtil.askAppMaster[ReplayApplicationResult](master,
- appId, ReplayFromTimestampWindowTrailingEdge(appId)), Duration.Inf)
- result
- }
-
- def askAppMaster[T](appId: Int, msg: Any): Future[T] = {
- import scala.concurrent.ExecutionContext.Implicits.global
- ActorUtil.askAppMaster[T](master, appId, msg)
- }
-
- def listApps: AppMastersData = {
- val client = getMasterClient
- client.listApplications
- }
-
- def shutdown(appId: Int): Unit = {
- val client = getMasterClient
- client.shutdownApplication(appId)
- }
-
- def resolveAppID(appId: Int): ActorRef = {
- val client = getMasterClient
- client.resolveAppId(appId)
- }
-
- def close(): Unit = {
- if (shouldCleanupSystem) {
- LOG.info(s"Shutting down system ${system.name}")
- system.terminate()
- }
- }
-
- private def loadFile(jarPath: String): AppJar = {
- val jarFile = new java.io.File(jarPath)
- val path = jarStoreService.copyFromLocal(jarFile)
- AppJar(jarFile.getName, path)
- }
-
- private def checkAndAddNamePrefix(appName: String, namePrefix: String): String = {
- val fullName = if (namePrefix != null && namePrefix != "") {
- namePrefix + "_" + appName
- } else {
- appName
- }
- if (!Util.validApplicationName(fullName)) {
- close()
- val error = s"The application name $appName is not a proper name. An app name can " +
- "be a sequence of letters, numbers or underscore character \"_\""
- throw new Exception(error)
- }
- fullName
- }
-
- private def getMasterClient: MasterClient = {
- val timeout = Try(config.getInt(Constants.GEARPUMP_MASTERCLIENT_TIMEOUT)).getOrElse(90)
- new MasterClient(master, akka.util.Timeout(timeout, TimeUnit.SECONDS))
- }
-}
-
-object ClientContext {
-
- def apply(): ClientContext = new ClientContext(ClusterConfig.default(), null, null)
-
- def apply(system: ActorSystem): ClientContext = {
- new ClientContext(ClusterConfig.default(), system, null)
- }
-
- def apply(system: ActorSystem, master: ActorRef): ClientContext = {
- new ClientContext(ClusterConfig.default(), system, master)
- }
-
- def apply(config: Config): ClientContext = new ClientContext(config, null, null)
-
- def apply(config: Config, system: ActorSystem, master: ActorRef): ClientContext = {
- new ClientContext(config, system, master)
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/cluster/client/MasterClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/cluster/client/MasterClient.scala b/core/src/main/scala/io/gearpump/cluster/client/MasterClient.scala
deleted file mode 100644
index 9edaf46..0000000
--- a/core/src/main/scala/io/gearpump/cluster/client/MasterClient.scala
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.cluster.client
-
-import scala.concurrent.duration.Duration
-import scala.concurrent.{Await, Future}
-import scala.util.{Failure, Success}
-
-import akka.actor.ActorRef
-import akka.pattern.ask
-import akka.util.Timeout
-
-import io.gearpump.cluster.ClientToMaster._
-import io.gearpump.cluster.MasterToAppMaster.{AppMastersData, AppMastersDataRequest}
-import io.gearpump.cluster.MasterToClient.{ResolveAppIdResult, ShutdownApplicationResult, SubmitApplicationResult}
-import io.gearpump.cluster.{AppDescription, AppJar}
-
-/**
- * Client to interoperate with the Master node.
- *
- * NOTE: Stateless, thread safe
- */
-class MasterClient(master: ActorRef, timeout: Timeout) {
- implicit val masterClientTimeout = timeout
-
- def submitApplication(app: AppDescription, appJar: Option[AppJar]): Int = {
- val result = Await.result(
- (master ? SubmitApplication(app, appJar)).asInstanceOf[Future[SubmitApplicationResult]],
- Duration.Inf)
- val appId = result.appId match {
- case Success(appId) =>
- // scalastyle:off println
- Console.println(s"Submit application succeed. The application id is $appId")
- // scalastyle:on println
- appId
- case Failure(ex) => throw ex
- }
- appId
- }
-
- def resolveAppId(appId: Int): ActorRef = {
- val result = Await.result(
- (master ? ResolveAppId(appId)).asInstanceOf[Future[ResolveAppIdResult]], Duration.Inf)
- result.appMaster match {
- case Success(appMaster) => appMaster
- case Failure(ex) => throw ex
- }
- }
-
- def shutdownApplication(appId: Int): Unit = {
- val result = Await.result(
- (master ? ShutdownApplication(appId)).asInstanceOf[Future[ShutdownApplicationResult]],
- Duration.Inf)
- result.appId match {
- case Success(_) =>
- case Failure(ex) => throw ex
- }
- }
-
- def listApplications: AppMastersData = {
- val result = Await.result(
- (master ? AppMastersDataRequest).asInstanceOf[Future[AppMastersData]], Duration.Inf)
- result
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/cluster/main/ArgumentsParser.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/cluster/main/ArgumentsParser.scala b/core/src/main/scala/io/gearpump/cluster/main/ArgumentsParser.scala
deleted file mode 100644
index 209f831..0000000
--- a/core/src/main/scala/io/gearpump/cluster/main/ArgumentsParser.scala
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.cluster.main
-
-import io.gearpump.cluster.main.ArgumentsParser.Syntax
-
-case class CLIOption[+T](
- description: String = "", required: Boolean = false, defaultValue: Option[T] = None)
-
-class ParseResult(optionMap: Map[String, String], remainArguments: Array[String]) {
- def getInt(key: String): Int = optionMap.get(key).get.toInt
-
- def getString(key: String): String = optionMap.get(key).get
-
- def getBoolean(key: String): Boolean = optionMap.get(key).get.toBoolean
-
- def exists(key: String): Boolean = !(optionMap.getOrElse(key, "").isEmpty)
-
- def remainArgs: Array[String] = this.remainArguments
-}
-
-/**
- * Parser for command line arguments
- *
- * Grammar: -option1 value1 -option2 value3 -flag1 -flag2 remainArg1 remainArg2...
- */
-trait ArgumentsParser {
-
- val ignoreUnknownArgument = false
-
- // scalastyle:off println
- def help(): Unit = {
- Console.println(s"\nHelp: $description")
- var usage = List.empty[String]
- options.map(kv => if (kv._2.required) {
- usage = usage :+ s"-${kv._1} (required:${kv._2.required})${kv._2.description}"
- } else {
- usage = usage :+ s"-${kv._1} (required:${kv._2.required}, " +
- s"default:${kv._2.defaultValue.getOrElse("")})${kv._2.description}"
- })
- usage :+= remainArgs.map(k => s"<$k>").mkString(" ")
- usage.foreach(Console.println(_))
- }
- // scalastyle:on println
-
- def parse(args: Array[String]): ParseResult = {
- val syntax = Syntax(options, remainArgs, ignoreUnknownArgument)
- ArgumentsParser.parse(syntax, args)
- }
-
- val description: String = ""
- val options: Array[(String, CLIOption[Any])] = Array.empty[(String, CLIOption[Any])]
- val remainArgs: Array[String] = Array.empty[String]
-}
-
-object ArgumentsParser {
-
- case class Syntax(
- val options: Array[(String, CLIOption[Any])], val remainArgs: Array[String],
- val ignoreUnknownArgument: Boolean)
-
- def parse(syntax: Syntax, args: Array[String]): ParseResult = {
- import syntax.{ignoreUnknownArgument, options, remainArgs}
- var config = Map.empty[String, String]
- var remain = Array.empty[String]
-
- def doParse(argument: List[String]): Unit = {
- argument match {
- case Nil => Unit // true if everything processed successfully
-
- case key :: value :: rest if key.startsWith("-") && !value.startsWith("-") =>
- val fixedKey = key.substring(1)
- if (!options.map(_._1).contains(fixedKey)) {
- if (!ignoreUnknownArgument) {
- throw new Exception(s"found unknown option $fixedKey")
- } else {
- remain ++= Array(key, value)
- }
- } else {
- config += fixedKey -> value
- }
- doParse(rest)
-
- case key :: rest if key.startsWith("-") =>
- val fixedKey = key.substring(1)
- if (!options.map(_._1).contains(fixedKey)) {
- throw new Exception(s"found unknown option $fixedKey")
- } else {
- config += fixedKey -> "true"
- }
- doParse(rest)
-
- case value :: rest =>
- // scalastyle:off println
- Console.err.println(s"Warning: get unknown argument $value, maybe it is a main class")
- // scalastyle:on println
- remain ++= value :: rest
- doParse(Nil)
- }
- }
- doParse(args.toList)
-
- options.foreach { pair =>
- val (key, option) = pair
- if (!config.contains(key) && !option.required) {
- config += key -> option.defaultValue.getOrElse("").toString
- }
- }
-
- options.foreach { pair =>
- val (key, value) = pair
- if (config.get(key).isEmpty) {
- throw new Exception(s"Missing option ${key}...")
- }
- }
-
- if (remain.length < remainArgs.length) {
- throw new Exception(s"Missing arguments ...")
- }
-
- new ParseResult(config, remain)
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/cluster/master/AppMasterLauncher.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/cluster/master/AppMasterLauncher.scala b/core/src/main/scala/io/gearpump/cluster/master/AppMasterLauncher.scala
deleted file mode 100644
index fb3e5c4..0000000
--- a/core/src/main/scala/io/gearpump/cluster/master/AppMasterLauncher.scala
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.cluster.master
-
-import java.util.concurrent.{TimeUnit, TimeoutException}
-import scala.collection.JavaConverters._
-import scala.concurrent.duration.Duration
-import scala.util.{Failure, Success}
-
-import akka.actor.{Actor, ActorRef, Props, _}
-import com.typesafe.config.Config
-import org.slf4j.Logger
-
-import io.gearpump.cluster.AppMasterToMaster.RequestResource
-import io.gearpump.cluster.AppMasterToWorker.{LaunchExecutor, ShutdownExecutor}
-import io.gearpump.cluster.MasterToAppMaster.ResourceAllocated
-import io.gearpump.cluster.MasterToClient.SubmitApplicationResult
-import io.gearpump.cluster.WorkerToAppMaster.ExecutorLaunchRejected
-import io.gearpump.cluster.appmaster.{AppMasterRuntimeEnvironment, AppMasterRuntimeInfo, WorkerInfo}
-import io.gearpump.cluster.scheduler.{Resource, ResourceAllocation, ResourceRequest}
-import io.gearpump.cluster.worker.WorkerId
-import io.gearpump.cluster.{AppDescription, AppJar, _}
-import io.gearpump.transport.HostPort
-import io.gearpump.util.ActorSystemBooter._
-import io.gearpump.util.Constants._
-import io.gearpump.util.{ActorSystemBooter, ActorUtil, LogUtil, Util}
-
-/**
- *
- * AppMasterLauncher is a child Actor of AppManager; it is responsible
- * for launching the AppMaster on the cluster.
- */
-class AppMasterLauncher(
- appId: Int, executorId: Int, app: AppDescription,
- jar: Option[AppJar], username: String, master: ActorRef, client: Option[ActorRef])
- extends Actor {
- private val LOG: Logger = LogUtil.getLogger(getClass, app = appId)
-
- val scheduler = context.system.scheduler
- val systemConfig = context.system.settings.config
- val TIMEOUT = Duration(15, TimeUnit.SECONDS)
-
- val appMasterAkkaConfig: Config = app.clusterConfig
-
- LOG.info(s"Ask Master resource to start AppMaster $appId...")
- master ! RequestResource(appId, ResourceRequest(Resource(1), WorkerId.unspecified))
-
- def receive: Receive = waitForResourceAllocation
-
- def waitForResourceAllocation: Receive = {
- case ResourceAllocated(allocations) =>
-
- val ResourceAllocation(resource, worker, workerId) = allocations(0)
- LOG.info(s"Resource allocated for appMaster $appId on worker ${workerId}(${worker.path})")
-
- val submissionTime = System.currentTimeMillis()
-
- val appMasterInfo = AppMasterRuntimeInfo(appId, app.name, worker, username,
- submissionTime, config = appMasterAkkaConfig)
- val workerInfo = WorkerInfo(workerId, worker)
- val appMasterContext =
- AppMasterContext(appId, username, resource, workerInfo, jar, null, appMasterInfo)
- LOG.info(s"Try to launch a executor for AppMaster on worker ${workerId} for app $appId")
- val name = ActorUtil.actorNameForExecutor(appId, executorId)
- val selfPath = ActorUtil.getFullPath(context.system, self.path)
-
- val jvmSetting =
- Util.resolveJvmSetting(appMasterAkkaConfig.withFallback(systemConfig)).appMater
- val executorJVM = ExecutorJVMConfig(jvmSetting.classPath, jvmSetting.vmargs,
- classOf[ActorSystemBooter].getName, Array(name, selfPath), jar,
- username, appMasterAkkaConfig)
-
- worker ! LaunchExecutor(appId, executorId, resource, executorJVM)
- context.become(waitForActorSystemToStart(worker, appMasterContext, app.userConfig, resource))
- }
-
- def waitForActorSystemToStart(
- worker: ActorRef, appContext: AppMasterContext, user: UserConfig, resource: Resource)
- : Receive = {
- case ExecutorLaunchRejected(reason, ex) =>
- LOG.error(s"Executor Launch failed reason: $reason", ex)
- LOG.info(s"reallocate resource $resource to start appmaster")
- master ! RequestResource(appId, ResourceRequest(resource, WorkerId.unspecified))
- context.become(waitForResourceAllocation)
- case RegisterActorSystem(systemPath) =>
- LOG.info(s"Received RegisterActorSystem $systemPath for AppMaster")
- sender ! ActorSystemRegistered(worker)
-
- val masterAddress = systemConfig.getStringList(GEARPUMP_CLUSTER_MASTERS)
- .asScala.map(HostPort(_)).map(ActorUtil.getMasterActorPath)
- sender ! CreateActor(
- AppMasterRuntimeEnvironment.props(masterAddress, app, appContext), s"appdaemon$appId")
-
- import context.dispatcher
- val appMasterTimeout = scheduler.scheduleOnce(TIMEOUT, self,
- CreateActorFailed(app.appMaster, new TimeoutException))
- context.become(waitForAppMasterToStart(worker, appMasterTimeout))
- }
-
- def waitForAppMasterToStart(worker: ActorRef, cancel: Cancellable): Receive = {
- case ActorCreated(appMaster, _) =>
- cancel.cancel()
- sender ! BindLifeCycle(appMaster)
- LOG.info(s"AppMaster is created, mission complete...")
- replyToClient(SubmitApplicationResult(Success(appId)))
- context.stop(self)
- case CreateActorFailed(name, reason) =>
- cancel.cancel()
- worker ! ShutdownExecutor(appId, executorId, reason.getMessage)
- replyToClient(SubmitApplicationResult(Failure(reason)))
- context.stop(self)
- }
-
- def replyToClient(result: SubmitApplicationResult): Unit = {
- if (client.isDefined) {
- client.get.tell(result, master)
- }
- }
-}
-
-object AppMasterLauncher extends AppMasterLauncherFactory {
- def props(appId: Int, executorId: Int, app: AppDescription, jar: Option[AppJar],
- username: String, master: ActorRef, client: Option[ActorRef]): Props = {
- Props(new AppMasterLauncher(appId, executorId, app, jar, username, master, client))
- }
-}
-
-trait AppMasterLauncherFactory {
- def props(appId: Int, executorId: Int, app: AppDescription, jar: Option[AppJar],
- username: String, master: ActorRef, client: Option[ActorRef]): Props
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/cluster/master/MasterProxy.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/cluster/master/MasterProxy.scala b/core/src/main/scala/io/gearpump/cluster/master/MasterProxy.scala
deleted file mode 100644
index 61d95dc..0000000
--- a/core/src/main/scala/io/gearpump/cluster/master/MasterProxy.scala
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.cluster.master
-
-import scala.concurrent.duration.FiniteDuration
-
-import akka.actor._
-import org.slf4j.Logger
-
-import io.gearpump.transport.HostPort
-import io.gearpump.util.{ActorUtil, LogUtil}
-
-/**
- * This works with Master HA. When there are multiple Master nodes,
- * this will find an active one.
- */
-class MasterProxy(masters: Iterable[ActorPath], timeout: FiniteDuration)
- extends Actor with Stash {
- import io.gearpump.cluster.master.MasterProxy._
-
- val LOG: Logger = LogUtil.getLogger(getClass, name = self.path.name)
-
- val contacts = masters.map { url =>
- LOG.info(s"Contacts point URL: $url")
- context.actorSelection(url)
- }
-
- var watchers: List[ActorRef] = List.empty[ActorRef]
-
- import context.dispatcher
-
- def findMaster(): Cancellable = {
- repeatActionUtil(timeout) {
- contacts foreach { contact =>
- LOG.info(s"sending identity to $contact")
- contact ! Identify(None)
- }
- }
- }
-
- context.become(establishing(findMaster()))
-
- LOG.info("Master Proxy is started...")
-
- override def postStop(): Unit = {
- watchers.foreach(_ ! MasterStopped)
- super.postStop()
- }
-
- override def receive: Receive = {
- case _ =>
- }
-
- def establishing(findMaster: Cancellable): Actor.Receive = {
- case ActorIdentity(_, Some(receptionist)) =>
- context watch receptionist
- LOG.info("Connected to [{}]", receptionist.path)
- context.watch(receptionist)
-
- watchers.foreach(_ ! MasterRestarted)
- unstashAll()
- findMaster.cancel()
- context.become(active(receptionist) orElse messageHandler(receptionist))
- case ActorIdentity(_, None) => // ok, use another instead
- case msg =>
- LOG.info(s"Stashing ${msg.getClass.getSimpleName}")
- stash()
- }
-
- def active(receptionist: ActorRef): Actor.Receive = {
- case Terminated(receptionist) =>
- LOG.info("Lost contact with [{}], restablishing connection", receptionist)
- context.become(establishing(findMaster))
- case _: ActorIdentity => // ok, from previous establish, already handled
- case WatchMaster(watcher) =>
- watchers = watchers :+ watcher
- }
-
- def messageHandler(master: ActorRef): Receive = {
- case msg =>
- LOG.debug(s"Get msg ${msg.getClass.getSimpleName}, forwarding to ${master.path}")
- master forward msg
- }
-
- def scheduler: Scheduler = context.system.scheduler
- import scala.concurrent.duration._
- private def repeatActionUtil(timeout: FiniteDuration)(action: => Unit): Cancellable = {
- val send = scheduler.schedule(0.seconds, 2.seconds)(action)
- val suicide = scheduler.scheduleOnce(timeout) {
- send.cancel()
- self ! PoisonPill
- }
-
- new Cancellable {
- def cancel(): Boolean = {
- val result1 = send.cancel()
- val result2 = suicide.cancel()
- result1 && result2
- }
-
- def isCancelled: Boolean = {
- send.isCancelled && suicide.isCancelled
- }
- }
- }
-}
-
-object MasterProxy {
- case object MasterRestarted
- case object MasterStopped
- case class WatchMaster(watcher: ActorRef)
-
- import scala.concurrent.duration._
- def props(masters: Iterable[HostPort], duration: FiniteDuration = 30.seconds): Props = {
- val contacts = masters.map(ActorUtil.getMasterActorPath(_))
- Props(new MasterProxy(contacts, duration))
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/cluster/master/MasterSummary.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/cluster/master/MasterSummary.scala b/core/src/main/scala/io/gearpump/cluster/master/MasterSummary.scala
deleted file mode 100644
index 0996381..0000000
--- a/core/src/main/scala/io/gearpump/cluster/master/MasterSummary.scala
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.cluster.master
-
-import io.gearpump.util.HistoryMetricsService.HistoryMetricsConfig
-
-/** Master status. Synced means all masters are live and synced. */
-object MasterStatus {
- type Type = String
- val Synced = "synced"
- val UnSynced = "unsynced"
-}
-
-case class MasterNode(host: String, port: Int) {
- def toTuple: (String, Int) = {
- (host, port)
- }
-}
-
-/**
- * Master information returned for REST API call
- */
-case class MasterSummary(
- leader: MasterNode,
- cluster: List[MasterNode],
- aliveFor: Long,
- logFile: String,
- jarStore: String,
- masterStatus: MasterStatus.Type,
- homeDirectory: String,
- activities: List[MasterActivity],
- jvmName: String,
- historyMetricsConfig: HistoryMetricsConfig = null)
-
-case class MasterActivity(time: Long, event: String)
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/cluster/scheduler/Resource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/cluster/scheduler/Resource.scala b/core/src/main/scala/io/gearpump/cluster/scheduler/Resource.scala
deleted file mode 100644
index b25162e..0000000
--- a/core/src/main/scala/io/gearpump/cluster/scheduler/Resource.scala
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.cluster.scheduler
-
-import akka.actor.ActorRef
-
-import io.gearpump.cluster.worker.WorkerId
-
-case class Resource(slots: Int) {
-
- // scalastyle:off spaces.after.plus
- def +(other: Resource): Resource = Resource(slots + other.slots)
- // scalastyle:on spaces.after.plus
-
- def -(other: Resource): Resource = Resource(slots - other.slots)
-
- def >(other: Resource): Boolean = slots > other.slots
-
- def >=(other: Resource): Boolean = !(this < other)
-
- def <(other: Resource): Boolean = slots < other.slots
-
- def <=(other: Resource): Boolean = !(this > other)
-
- def isEmpty: Boolean = {
- slots == 0
- }
-}
-
-/**
- * Each streaming job can have a priority; a job with higher priority
- * will get its resources scheduled earlier than those with lower priority.
- */
-object Priority extends Enumeration {
- type Priority = Value
- val LOW, NORMAL, HIGH = Value
-}
-
-/**
- * Relaxation.ONEWORKER means only resource (slot) from that worker will be accepted by
- * the requestor application job.
- */
-object Relaxation extends Enumeration {
- type Relaxation = Value
-
- // Option ONEWORKER allows the user to schedule a task on a specific worker.
- val ANY, ONEWORKER, SPECIFICWORKER = Value
-}
-
-import io.gearpump.cluster.scheduler.Priority._
-import io.gearpump.cluster.scheduler.Relaxation._
-
-case class ResourceRequest(
- resource: Resource, workerId: WorkerId, priority: Priority = NORMAL,
- relaxation: Relaxation = ANY, executorNum: Int = 1)
-
-case class ResourceAllocation(resource: Resource, worker: ActorRef, workerId: WorkerId)
-
-object Resource {
- def empty: Resource = new Resource(0)
-
- def min(res1: Resource, res2: Resource): Resource = if (res1.slots < res2.slots) res1 else res2
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/cluster/worker/ExecutorProcessLauncher.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/cluster/worker/ExecutorProcessLauncher.scala b/core/src/main/scala/io/gearpump/cluster/worker/ExecutorProcessLauncher.scala
deleted file mode 100644
index 8581467..0000000
--- a/core/src/main/scala/io/gearpump/cluster/worker/ExecutorProcessLauncher.scala
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.cluster.worker
-
-import com.typesafe.config.Config
-
-import io.gearpump.cluster.scheduler.Resource
-import io.gearpump.util.RichProcess
-
-/**
- * ExecutorProcessLauncher is used to launch a process for Executor using given parameters.
- *
- * User can implement this interface to decide the behavior of launching a process.
- * Set "gearpump.worker.executor-process-launcher" to your implemented class name.
- */
-trait ExecutorProcessLauncher {
- val config: Config
-
- /**
- * This function launches a process for Executor using given parameters.
- *
- * @param appId The appId of the executor to be launched
- * @param executorId The executorId of the executor to be launched
- * @param resource The resource allocated for that executor
- * @param options The command options
- * @param classPath The classpath of the process
- * @param mainClass The main class of the process
- * @param arguments The rest arguments
- */
- def createProcess(
- appId: Int, executorId: Int, resource: Resource, config: Config, options: Array[String],
- classPath: Array[String], mainClass: String, arguments: Array[String]): RichProcess
-
- /**
- * This function will clean resources for a launched process.
- * @param appId The appId of the launched executor
- * @param executorId The executorId of launched executor
- */
- def cleanProcess(appId: Int, executorId: Int): Unit
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/cluster/worker/WorkerId.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/cluster/worker/WorkerId.scala b/core/src/main/scala/io/gearpump/cluster/worker/WorkerId.scala
deleted file mode 100644
index 24c6ad2..0000000
--- a/core/src/main/scala/io/gearpump/cluster/worker/WorkerId.scala
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.cluster.worker
-
-/**
- * WorkerId is used to uniquely track a worker machine.
- *
- * @param sessionId sessionId is assigned by Master node for easy tracking. It is possible that
- * sessionId is **NOT** unique, so always use WorkerId for comparison.
- * @param registerTime the timestamp when a worker node register itself to master node
- */
-case class WorkerId(sessionId: Int, registerTime: Long)
-
-object WorkerId {
- val unspecified: WorkerId = new WorkerId(-1, 0L)
-
- def render(workerId: WorkerId): String = {
- workerId.registerTime + "_" + workerId.sessionId
- }
-
- def parse(str: String): WorkerId = {
- val pair = str.split("_")
- new WorkerId(pair(1).toInt, pair(0).toLong)
- }
-
- implicit val workerIdOrdering: Ordering[WorkerId] = {
- new Ordering[WorkerId] {
-
- /** Compare timestamp first, then id */
- override def compare(x: WorkerId, y: WorkerId): Int = {
- if (x.registerTime < y.registerTime) {
- -1
- } else if (x.registerTime == y.registerTime) {
- if (x.sessionId < y.sessionId) {
- -1
- } else if (x.sessionId == y.sessionId) {
- 0
- } else {
- 1
- }
- } else {
- 1
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/cluster/worker/WorkerSummary.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/cluster/worker/WorkerSummary.scala b/core/src/main/scala/io/gearpump/cluster/worker/WorkerSummary.scala
deleted file mode 100644
index cdf2d03..0000000
--- a/core/src/main/scala/io/gearpump/cluster/worker/WorkerSummary.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.cluster.worker
-import io.gearpump.util.HistoryMetricsService.HistoryMetricsConfig
-
-/**
- * Worker summary information for REST API.
- */
-case class WorkerSummary(
- workerId: WorkerId,
- state: String,
- actorPath: String,
- aliveFor: Long,
- logFile: String,
- executors: Array[ExecutorSlots],
- totalSlots: Int,
- availableSlots: Int,
- homeDirectory: String,
- jvmName: String,
- // Id used to uniquely identify this worker process in a low-level resource manager like YARN.
- resourceManagerContainerId: String,
- historyMetricsConfig: HistoryMetricsConfig = null)
-
-object WorkerSummary {
- def empty: WorkerSummary = {
- WorkerSummary(WorkerId.unspecified, "", "", 0L, "",
- Array.empty[ExecutorSlots], 0, 0, "", jvmName = "", resourceManagerContainerId = "")
- }
-}
-
-case class ExecutorSlots(appId: Int, executorId: Int, slots: Int)
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/jarstore/JarStoreService.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/jarstore/JarStoreService.scala b/core/src/main/scala/io/gearpump/jarstore/JarStoreService.scala
deleted file mode 100644
index 54d5431..0000000
--- a/core/src/main/scala/io/gearpump/jarstore/JarStoreService.scala
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.jarstore
-
-import java.io.File
-import java.net.URI
-import java.util.ServiceLoader
-import scala.collection.JavaConverters._
-
-import akka.actor.ActorSystem
-import com.typesafe.config.Config
-
-import io.gearpump.util.{Constants, Util}
-
-case class FilePath(path: String)
-
-/**
- * JarStoreService is used to manage the upload/download of binary files,
- * like user submitted application jar.
- */
-trait JarStoreService {
- /**
- * The scheme of the JarStoreService.
- * Like "hdfs" for HDFS file system, and "file" for a local
- * file system.
- */
- val scheme: String
-
- /**
- * Init the Jar Store.
- */
- def init(config: Config, system: ActorSystem)
-
- /**
- * This function will copy the local file to the remote JarStore, called from client side.
- * @param localFile The local file
- */
- def copyFromLocal(localFile: File): FilePath
-
- /**
- * This function will copy the remote file to local file system, called from client side.
- *
- * @param localFile The destination of file path
- * @param remotePath The remote file path from JarStore
- */
- def copyToLocalFile(localFile: File, remotePath: FilePath)
-}
-
-object JarStoreService {
-
- /**
- * Get an active JarStoreService by specifying a scheme.
- *
- * Please see config [[io.gearpump.util.Constants#GEARPUMP_APP_JAR_STORE_ROOT_PATH]] for more
- * information.
- */
- def get(config: Config): JarStoreService = {
- val jarStoreRootPath = config.getString(Constants.GEARPUMP_APP_JAR_STORE_ROOT_PATH)
- get(jarStoreRootPath)
- }
-
- private lazy val jarstoreServices: List[JarStoreService] = {
- ServiceLoader.load(classOf[JarStoreService]).asScala.toList
- }
-
- private def get(rootPath: String): JarStoreService = {
- val scheme = new URI(Util.resolvePath(rootPath)).getScheme
- jarstoreServices.find(_.scheme == scheme).get
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/metrics/AkkaReporter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/metrics/AkkaReporter.scala b/core/src/main/scala/io/gearpump/metrics/AkkaReporter.scala
deleted file mode 100644
index 3a581fb..0000000
--- a/core/src/main/scala/io/gearpump/metrics/AkkaReporter.scala
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.metrics
-
-import scala.collection.JavaConverters._
-
-import akka.actor.{ActorRef, ActorSystem}
-
-import io.gearpump.codahale.metrics.{Gauge => CodaGauge, MetricRegistry}
-import io.gearpump.metrics.Metrics.{Counter => CounterData, Gauge => GaugeData, Histogram => HistogramData, Meter => MeterData}
-import io.gearpump.metrics.MetricsReporterService.ReportTo
-import io.gearpump.util.LogUtil
-
-/**
- * Reporter that sends the current values of all metrics in a registry to an actor.
- */
-class AkkaReporter(
-    system: ActorSystem,
-    registry: MetricRegistry)
-  extends ReportTo {
-  private val LOG = LogUtil.getLogger(getClass)
-  LOG.info("Start Metrics AkkaReporter")
-
-  /**
-   * Sends one message per counter, histogram, meter and gauge found in the registry.
-   *
-   * @param to the actor that receives the metric messages
-   */
-  override def report(to: ActorRef): Unit = {
-    // Fetch all four metric maps before the first message is sent.
-    val counterMap = registry.getCounters()
-    val histogramMap = registry.getHistograms()
-    val meterMap = registry.getMeters()
-    val gaugeMap = registry.getGauges()
-
-    for (entry <- counterMap.entrySet().asScala) {
-      to ! CounterData(entry.getKey, entry.getValue.getCount)
-    }
-
-    for (entry <- histogramMap.entrySet().asScala) {
-      val snapshot = entry.getValue.getSnapshot
-      to ! HistogramData(
-        entry.getKey,
-        snapshot.getMean, snapshot.getStdDev, snapshot.getMedian,
-        snapshot.get95thPercentile, snapshot.get99thPercentile, snapshot.get999thPercentile)
-    }
-
-    for (entry <- meterMap.entrySet().asScala) {
-      val meter = entry.getValue
-      val rateUnit = "events/s"
-      to ! MeterData(
-        entry.getKey, meter.getCount, meter.getMeanRate,
-        meter.getOneMinuteRate, rateUnit)
-    }
-
-    for (entry <- gaugeMap.entrySet().asScala) {
-      // Every gauge is treated as numeric (unchecked cast) and reported as a Long.
-      val gauge = entry.getValue.asInstanceOf[CodaGauge[Number]]
-      to ! GaugeData(entry.getKey, gauge.getValue.longValue())
-    }
-  }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/metrics/Counter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/metrics/Counter.scala b/core/src/main/scala/io/gearpump/metrics/Counter.scala
deleted file mode 100644
index 70c7bae..0000000
--- a/core/src/main/scala/io/gearpump/metrics/Counter.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.metrics
-
-import io.gearpump.codahale.metrics.{Counter => CodaHaleCounter}
-
-/**
- * Buffers increments and forwards the accumulated amount on every `sampleRate`-th call.
- * @see io.gearpump.codahale.metrics.Counter
- */
-class Counter(val name: String, counter: CodaHaleCounter, sampleRate: Int = 1) {
-  private var calls = 0L
-  private var pending = 0L
-
-  def inc(): Unit = inc(1) // increments by one through the sampled path below
-
-  /** Adds `n` to the pending amount and flushes it to the wrapped counter when due. */
-  def inc(n: Long): Unit = {
-    pending += n
-    calls += 1
-    if (null != counter && calls % sampleRate == 0) {
-      counter.inc(pending)
-      pending = 0
-    }
-  }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/metrics/Histogram.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/metrics/Histogram.scala b/core/src/main/scala/io/gearpump/metrics/Histogram.scala
deleted file mode 100644
index 4673050..0000000
--- a/core/src/main/scala/io/gearpump/metrics/Histogram.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.metrics
-
-import io.gearpump.codahale.metrics.{Histogram => CodaHaleHistogram}
-
-/**
- * Sampled wrapper around a CodaHale histogram that tolerates a null delegate.
- * @see io.gearpump.codahale.metrics.Histogram
- */
-class Histogram(val name: String, histogram: CodaHaleHistogram, sampleRate: Int = 1) {
-  private var sampleCount = 0L
-
-  /** Records `value` on every `sampleRate`-th call; other calls only advance the count. */
-  def update(value: Long): Unit = {
-    sampleCount += 1
-    if (null != histogram && sampleCount % sampleRate == 0) {
-      histogram.update(value)
-    }
-  }
-
-  /** Mean of the current snapshot, or 0 when there is no wrapped histogram. */
-  def getMean(): Double = if (null == histogram) 0.0 else histogram.getSnapshot.getMean
-
-  /** Standard deviation of the current snapshot, or 0 when there is no wrapped histogram. */
-  def getStdDev(): Double = if (null == histogram) 0.0 else histogram.getSnapshot.getStdDev
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/metrics/JvmMetricsSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/metrics/JvmMetricsSet.scala b/core/src/main/scala/io/gearpump/metrics/JvmMetricsSet.scala
deleted file mode 100644
index 28d420a..0000000
--- a/core/src/main/scala/io/gearpump/metrics/JvmMetricsSet.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.metrics
-
-import java.util
-import scala.collection.JavaConverters._
-
-import io.gearpump.codahale.metrics.jvm.{MemoryUsageGaugeSet, ThreadStatesGaugeSet}
-import io.gearpump.codahale.metrics.{Metric, MetricSet}
-
-/** Metric set exposing selected JVM memory and thread-state gauges, keyed with a name prefix. */
-class JvmMetricsSet(name: String) extends MetricSet {
-
-  /** Returns the selected gauges keyed as `name:memory.KEY` and `name:thread.KEY`. */
-  override def getMetrics: util.Map[String, Metric] = {
-    val memoryGauges = new MemoryUsageGaugeSet().getMetrics.asScala
-    val threadGauges = new ThreadStatesGaugeSet().getMetrics.asScala
-    val memoryKeys =
-      List("total.used", "total.committed", "total.max", "heap.used", "heap.committed", "heap.max")
-    val threadKeys = List("count", "daemon.count")
-
-    // Lookups use apply, so a key missing from either gauge set raises NoSuchElementException.
-    val memoryEntries = memoryKeys.map(key => s"$name:memory.$key" -> memoryGauges(key))
-    val threadEntries = threadKeys.map(key => s"$name:thread.$key" -> threadGauges(key))
-    (memoryEntries ++ threadEntries).toMap.asJava
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/metrics/Meter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/metrics/Meter.scala b/core/src/main/scala/io/gearpump/metrics/Meter.scala
deleted file mode 100644
index ca79a37..0000000
--- a/core/src/main/scala/io/gearpump/metrics/Meter.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.metrics
-
-import io.gearpump.codahale.metrics.{Meter => CodaHaleMeter}
-
-/** Sampled wrapper that tolerates a null delegate; see io.gearpump.codahale.metrics.Meter */
-class Meter(val name: String, meter: CodaHaleMeter, sampleRate: Int = 1) {
-  private var sampleCount = 0L
-  private var toBeMarked = 0L
-
-  /** Marks one event; routed through mark(n) so sampling and the null guard both apply. */
-  def mark(): Unit = mark(1)
-
-  /** Accumulates `n` events and forwards the total on every `sampleRate`-th call. */
-  def mark(n: Long): Unit = {
-    toBeMarked += n
-    sampleCount += 1
-    if (null != meter && sampleCount % sampleRate == 0) {
-      meter.mark(toBeMarked)
-      toBeMarked = 0
-    }
-  }
-
-  /** One-minute rate of the wrapped meter, or 0 when there is no wrapped meter. */
-  def getOneMinuteRate(): Double =
-    if (null == meter) 0.0 else meter.getOneMinuteRate
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/metrics/Metrics.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/metrics/Metrics.scala b/core/src/main/scala/io/gearpump/metrics/Metrics.scala
deleted file mode 100644
index aad1af0..0000000
--- a/core/src/main/scala/io/gearpump/metrics/Metrics.scala
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.metrics
-
-import scala.collection.JavaConverters._
-
-import akka.actor._
-import org.slf4j.Logger
-
-import io.gearpump.codahale.metrics._
-import io.gearpump.metrics
-import io.gearpump.util.LogUtil
-
-/** Akka extension owning one MetricRegistry and handing out sampled metric wrappers. */
-class Metrics(sampleRate: Int) extends Extension {
-
-  val registry = new MetricRegistry()
-
-  /** Wraps the registry's meter named `name`, using the extension-wide sample rate. */
-  def meter(name: String): metrics.Meter =
-    new metrics.Meter(name, registry.meter(name), sampleRate)
-
-  /** Wraps the registry's histogram named `name`, using the extension-wide sample rate. */
-  def histogram(name: String): Histogram =
-    new Histogram(name, registry.histogram(name), sampleRate)
-
-  /** Wraps the registry's histogram named `name`, using the sample rate passed in. */
-  def histogram(name: String, sampleRate: Int): Histogram =
-    new Histogram(name, registry.histogram(name), sampleRate)
-
-  /** Wraps the registry's counter named `name`, using the extension-wide sample rate. */
-  def counter(name: String): Counter =
-    new Counter(name, registry.counter(name), sampleRate)
-
-  /** Registers every metric of `set` whose name is not already among the registry's names. */
-  def register(set: MetricSet): Unit = {
-    val existing = registry.getNames
-    for ((key, metric) <- set.getMetrics.asScala if !existing.contains(key)) {
-      registry.register(key, metric)
-    }
-  }
-}
-
-object Metrics extends ExtensionId[Metrics] with ExtensionIdProvider {
-
-  val LOG: Logger = LogUtil.getLogger(getClass)
-  import io.gearpump.util.Constants._
-
-  /** A named metric sample; implemented only by the case classes of this object. */
-  sealed trait MetricType {
-    def name: String
-  }
-
-  /** Converts between a MetricType and a 5-slot tuple in which only the matching slot is set. */
-  object MetricType {
-    def unapply(obj: MetricType): Option[(Histogram, Counter, Meter, Timer, Gauge)] = {
-      obj match {
-        case x: Histogram => Some((x, null, null, null, null))
-        case x: Counter => Some((null, x, null, null, null))
-        case x: Meter => Some((null, null, x, null, null))
-        case x: Timer => Some((null, null, null, x, null))
-        case g: Gauge => Some((null, null, null, null, g))
-      }
-    }
-
-    /** Returns the first non-null argument, checked in parameter order, or null if none. */
-    def apply(h: Histogram, c: Counter, m: Meter, t: Timer, g: Gauge): MetricType = {
-      if (h != null) h
-      else if (c != null) c
-      else if (m != null) m
-      else if (t != null) t
-      else if (g != null) g
-      else null
-    }
-  }
-
-  case class Histogram (
-      name: String, mean: Double,
-      stddev: Double, median: Double,
-      p95: Double, p99: Double, p999: Double)
-    extends MetricType
-
-  case class Counter(name: String, value: Long) extends MetricType
-
-  case class Meter(
-      name: String, count: Long, meanRate: Double,
-      m1: Double, rateUnit: String)
-    extends MetricType
-
-  case class Timer(
-      name: String, count: Long, min: Double, max: Double,
-      mean: Double, stddev: Double, median: Double,
-      p75: Double, p95: Double, p98: Double,
-      p99: Double, p999: Double, meanRate: Double,
-      m1: Double, m5: Double, m15: Double,
-      rateUnit: String, durationUnit: String)
-    extends MetricType
-
-  case class Gauge(name: String, value: Long) extends MetricType
-
-  case object ReportMetrics
-
-  case class DemandMoreMetrics(subscriber: ActorRef)
-
-  override def get(system: ActorSystem): Metrics = super.get(system)
-
-  override def lookup: ExtensionId[Metrics] = Metrics
-
-  /** Creates a sampling Metrics when GEARPUMP_METRIC_ENABLED is true, else a DummyMetrics. */
-  override def createExtension(system: ExtendedActorSystem): Metrics = {
-    val metricsEnabled = system.settings.config.getBoolean(GEARPUMP_METRIC_ENABLED)
-    LOG.info(s"Metrics is enabled..., $metricsEnabled")
-    val sampleRate = system.settings.config.getInt(GEARPUMP_METRIC_SAMPLE_RATE)
-    if (metricsEnabled) new Metrics(sampleRate) else new DummyMetrics
-  }
-
-  /** Stand-in used when metrics are disabled: register and all metric wrappers do nothing. */
-  class DummyMetrics extends Metrics(1) {
-    override def register(set: MetricSet): Unit = ()
-
-    private val meter = new metrics.Meter("", null) {
-      override def mark(): Unit = ()
-      override def mark(n: Long): Unit = ()
-      override def getOneMinuteRate(): Double = 0
-    }
-
-    private val histogram = new metrics.Histogram("", null) {
-      override def update(value: Long): Unit = ()
-      override def getMean(): Double = 0
-      override def getStdDev(): Double = 0
-    }
-
-    private val counter = new metrics.Counter("", null) {
-      override def inc(): Unit = ()
-      override def inc(n: Long): Unit = ()
-    }
-
-    // Every factory overload hands out the shared no-op instances; nothing touches the registry.
-    override def meter(name: String): metrics.Meter = meter
-    override def histogram(name: String): metrics.Histogram = histogram
-    override def histogram(name: String, sampleRate: Int): metrics.Histogram = histogram
-    override def counter(name: String): metrics.Counter = counter
-  }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/metrics/MetricsAggregator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/metrics/MetricsAggregator.scala b/core/src/main/scala/io/gearpump/metrics/MetricsAggregator.scala
deleted file mode 100644
index f52a060..0000000
--- a/core/src/main/scala/io/gearpump/metrics/MetricsAggregator.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.metrics
-
-import io.gearpump.cluster.MasterToClient.HistoryMetricsItem
-
-/**
- * Aggregates a larger set of metrics into a smaller set.
- *
- * Subclasses must implement a constructor with a signature like this:
- * MetricsAggregator(config: Config)
- */
-trait MetricsAggregator {
- def aggregate(options: Map[String, String], inputs: Iterator[HistoryMetricsItem])
- : List[HistoryMetricsItem]
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/metrics/MetricsReporterService.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/metrics/MetricsReporterService.scala b/core/src/main/scala/io/gearpump/metrics/MetricsReporterService.scala
deleted file mode 100644
index 05decdd..0000000
--- a/core/src/main/scala/io/gearpump/metrics/MetricsReporterService.scala
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.metrics
-
-import java.net.InetSocketAddress
-import java.util.concurrent.TimeUnit
-import scala.concurrent.duration._
-
-import akka.actor.{Actor, ActorRef}
-
-import io.gearpump.codahale.metrics.graphite.{Graphite, GraphiteReporter}
-import io.gearpump.codahale.metrics.{MetricFilter, Slf4jReporter}
-import io.gearpump.metrics.Metrics.{DemandMoreMetrics, ReportMetrics}
-import io.gearpump.metrics.MetricsReporterService.ReportTo
-import io.gearpump.util.Constants._
-import io.gearpump.util.LogUtil
-
-/**
- * Reports the metrics data to a configured target: Graphite, a log file, or an Akka actor.
- *
- * @param metrics Holds the registry whose metrics are reported.
- */
-class MetricsReporterService(metrics: Metrics) extends Actor {
-
-  private val LOG = LogUtil.getLogger(getClass)
-  private implicit val system = context.system
-
-  private val reportInterval = system.settings.config.getInt(GEARPUMP_METRIC_REPORT_INTERVAL)
-  private val reporter = getReporter
-  implicit val dispatcher = context.dispatcher
-
-  def receive: Receive = {
-    // The subscriber is demanding more messages: report, then send it ReportMetrics later.
-    case DemandMoreMetrics(subscriber) =>
-      reporter.report(subscriber)
-      context.system.scheduler.scheduleOnce(reportInterval.milliseconds, subscriber, ReportMetrics)
-  }
-
-  /** Creates a reporter for the configured Graphite host and port; it ignores the actor. */
-  def startGraphiteReporter(): ReportTo = {
-    val graphiteHost = system.settings.config.getString(GEARPUMP_METRIC_GRAPHITE_HOST)
-    val graphitePort = system.settings.config.getInt(GEARPUMP_METRIC_GRAPHITE_PORT)
-    val graphite = new Graphite(new InetSocketAddress(graphiteHost, graphitePort))
-    LOG.info(s"reporting to $graphiteHost, $graphitePort")
-    new ReportTo {
-      private val reporter = GraphiteReporter.forRegistry(metrics.registry)
-        .convertRatesTo(TimeUnit.SECONDS)
-        .convertDurationsTo(TimeUnit.MILLISECONDS)
-        .filter(MetricFilter.ALL)
-        .build(graphite)
-
-      override def report(to: ActorRef): Unit = reporter.report()
-    }
-  }
-
-  /** Creates a reporter that writes to this actor's logger; it ignores the actor argument. */
-  def startSlf4jReporter(): ReportTo = {
-    new ReportTo {
-      val reporter = Slf4jReporter.forRegistry(metrics.registry)
-        .convertRatesTo(TimeUnit.SECONDS)
-        .convertDurationsTo(TimeUnit.MILLISECONDS)
-        .filter(MetricFilter.ALL)
-        .outputTo(LOG)
-        .build()
-
-      override def report(to: ActorRef): Unit = reporter.report()
-    }
-  }
-
-  /** Creates a reporter that sends metric messages to the given actor. */
-  def startAkkaReporter(): ReportTo = new AkkaReporter(system, metrics.registry)
-
-  /** Picks the reporter named by the config; an unknown name fails with a descriptive error. */
-  def getReporter: ReportTo = {
-    val reporterType = system.settings.config.getString(GEARPUMP_METRIC_REPORTER)
-    LOG.info(s"Metrics reporter is enabled, using $reporterType reporter")
-    reporterType match {
-      case "graphite" => startGraphiteReporter()
-      case "logfile" => startSlf4jReporter()
-      case "akka" => startAkkaReporter()
-      case other =>
-        throw new IllegalArgumentException(s"Unsupported metrics reporter type: $other")
-    }
-  }
-}
-
-object MetricsReporterService {
-
- /** Target where the user wants the metrics data to be reported to */
- trait ReportTo {
- def report(to: ActorRef): Unit
- }
-}
\ No newline at end of file