You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2016/05/10 02:42:57 UTC

[47/49] incubator-gearpump git commit: fix GEARPUMP-118 change package name to org.apache

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/cluster/appmaster/ApplicationState.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/cluster/appmaster/ApplicationState.scala b/core/src/main/scala/io/gearpump/cluster/appmaster/ApplicationState.scala
deleted file mode 100644
index 3b967f4..0000000
--- a/core/src/main/scala/io/gearpump/cluster/appmaster/ApplicationState.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.cluster.appmaster
-
-import io.gearpump.cluster.{AppDescription, AppJar}
-
-/**
- * State of a single application; it is distributed across the masters.
- *
- * Equality and hashing take only `appId` and `attemptId` into account. All other fields are
- * ignored when two states are compared.
- */
-case class ApplicationState(
-    appId: Int,
-    appName: String,
-    attemptId: Int,
-    app: AppDescription,
-    jar: Option[AppJar],
-    username: String,
-    state: Any) extends Serializable {
-
-  /** Two states are equal when both `appId` and `attemptId` match. */
-  override def equals(other: Any): Boolean = other match {
-    case that: ApplicationState => appId == that.appId && attemptId == that.attemptId
-    case _ => false
-  }
-
-  /** Hashes the same two fields that `equals` compares, using akka's MurmurHash helper. */
-  override def hashCode: Int = {
-    import akka.routing.MurmurHash.{extendHash, startMagicA, startMagicB}
-    extendHash(appId, attemptId, startMagicA, startMagicB)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystem.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystem.scala b/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystem.scala
deleted file mode 100644
index 6fcb5e7..0000000
--- a/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystem.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.cluster.appmaster
-
-import akka.actor.{ActorRef, Address, PoisonPill}
-
-import io.gearpump.cluster.scheduler.Resource
-import io.gearpump.cluster.worker.WorkerId
-import io.gearpump.util.ActorSystemBooter.BindLifeCycle
-
-case class WorkerInfo(workerId: WorkerId, ref: ActorRef) // a worker's id plus its ActorRef
-
-/**
- * Configurations to start an executor system on remote machine
- *
- * @param executorSystemId Id of this executor system.
- * @param address Remote address where we start an Actor System.
- * @param daemon Actor that receives the lifecycle messages sent by the methods below.
- * @param resource Resource this executor system was created with.
- * @param worker Id and ActorRef of the worker this executor system was created with.
- */
-case class ExecutorSystem(
-    executorSystemId: Int,
-    address: Address,
-    daemon: ActorRef,
-    resource: Resource,
-    worker: WorkerInfo) {
-
-  /** Sends `BindLifeCycle(actor)` to the daemon. */
-  def bindLifeCycleWith(actor: ActorRef): Unit = daemon ! BindLifeCycle(actor)
-
-  /** Sends a `PoisonPill` to the daemon. */
-  def shutdown(): Unit = daemon ! PoisonPill
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystemLauncher.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystemLauncher.scala b/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystemLauncher.scala
deleted file mode 100644
index 78432f4..0000000
--- a/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystemLauncher.scala
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.cluster.appmaster
-
-import scala.concurrent.duration._
-
-import akka.actor._
-import org.slf4j.Logger
-
-import io.gearpump.cluster.AppMasterToWorker.LaunchExecutor
-import io.gearpump.cluster.ExecutorJVMConfig
-import io.gearpump.cluster.WorkerToAppMaster._
-import io.gearpump.cluster.appmaster.ExecutorSystemLauncher._
-import io.gearpump.cluster.appmaster.ExecutorSystemScheduler.{ExecutorSystemJvmConfig, Session}
-import io.gearpump.cluster.scheduler.Resource
-import io.gearpump.util.ActorSystemBooter.{ActorSystemRegistered, RegisterActorSystem}
-import io.gearpump.util.{ActorSystemBooter, ActorUtil, Constants, LogUtil}
-
-/**
- * This launches a single executor system on the target worker.
- *
- * Flow, as implemented below:
- *  - On `LaunchExecutorSystem` it sends a `LaunchExecutor` to the worker and remembers the sender
- *    as the party to reply to.
- *  - It then answers with exactly one of `LaunchExecutorSystemSuccess`,
- *    `LaunchExecutorSystemRejected` or `LaunchExecutorSystemTimeout`, and stops itself.
- *
- * The timeout clock is started when the actor is constructed, not when the launch command
- * arrives.
- *
- * NOTE(review): earlier documentation asked callers to use ExecutorSystemLauncher.props(), but the
- * companion object in this file defines no such method - confirm how the Props are built.
- *
- * @param appId Id of the application the executor system is launched for
- * @param session The session that requests to launch the executor system
- */
-private[appmaster]
-class ExecutorSystemLauncher(appId: Int, session: Session) extends Actor {
-
-  private val LOG: Logger = LogUtil.getLogger(getClass)
-
-  val scheduler = context.system.scheduler
-  implicit val executionContext = context.dispatcher
-
-  private val systemConfig = context.system.settings.config
-  val timeoutSetting = systemConfig.getInt(Constants.GEARPUMP_START_EXECUTOR_SYSTEM_TIMEOUT_MS)
-
-  // One-shot timer that delivers LaunchExecutorSystemTimeout(session) to this actor.
-  val timeout = scheduler.scheduleOnce(timeoutSetting.milliseconds,
-    self, LaunchExecutorSystemTimeout(session))
-
-  /**
-   * Cancels the timeout timer whenever this actor stops.
-   *
-   * Previously only the success path cancelled it, so after a rejected launch the timer stayed
-   * scheduled and later fired at an already stopped actor. Cancelling a timer that has already
-   * fired or been cancelled is harmless.
-   */
-  override def postStop(): Unit = {
-    Option(timeout).foreach(_.cancel())
-    super.postStop()
-  }
-
-  def receive: Receive = waitForLaunchCommand
-
-  /** Initial behavior: waits for the command that tells which worker to launch on. */
-  def waitForLaunchCommand: Receive = {
-    case LaunchExecutorSystem(worker, executorSystemId, resource) =>
-      val launcherPath = ActorUtil.getFullPath(context.system, self.path)
-      // The JVM config stays null when the session carries no executor system JVM config.
-      val jvmConfig = Option(session.executorSystemJvmConfig)
-        .map(getExecutorJvmConfig(_, s"app${appId}system${executorSystemId}", launcherPath)).orNull
-
-      val launch = LaunchExecutor(appId, executorSystemId, resource, jvmConfig)
-      LOG.info(s"Launching Executor ...appId: $appId, executorSystemId: $executorSystemId, " +
-        s"slots: ${resource.slots} on worker $worker")
-
-      worker.ref ! launch
-      context.become(waitForActorSystemToStart(sender, launch, worker, executorSystemId))
-  }
-
-  /**
-   * Second behavior: waits for the outcome of the launch and reports it to `replyTo`.
-   *
-   * @param replyTo Sender of the original LaunchExecutorSystem command
-   * @param launch The LaunchExecutor message that was sent to the worker
-   * @param worker The worker the launch was sent to
-   * @param executorSystemId Id of the executor system being launched
-   */
-  def waitForActorSystemToStart(
-      replyTo: ActorRef, launch: LaunchExecutor, worker: WorkerInfo, executorSystemId: Int)
-    : Receive = {
-    case RegisterActorSystem(systemPath) =>
-      import launch._
-      timeout.cancel()
-      LOG.info(s"Received RegisterActorSystem $systemPath for session ${session.requestor}")
-      sender ! ActorSystemRegistered(worker.ref)
-      val system =
-        ExecutorSystem(executorId, AddressFromURIString(systemPath), sender, resource, worker)
-      replyTo ! LaunchExecutorSystemSuccess(system, session)
-      context.stop(self)
-    case ExecutorLaunchRejected(reason, ex) =>
-      LOG.error(s"Executor Launch ${launch.resource} failed reason: $reason", ex)
-      replyTo ! LaunchExecutorSystemRejected(launch.resource, reason, session)
-      context.stop(self)
-    case expired: LaunchExecutorSystemTimeout =>
-      // The timeout message itself is forwarded, so the receiver sees the original session.
-      LOG.error(s"The Executor ActorSystem $executorSystemId has not been started in time")
-      replyTo ! expired
-      context.stop(self)
-  }
-}
-
-private[appmaster]
-object ExecutorSystemLauncher {
-
-  /** Command: launch executor system `systemId` on `worker` with the given `resource`. */
-  case class LaunchExecutorSystem(worker: WorkerInfo, systemId: Int, resource: Resource)
-
-  /** Reply: the executor system was started for `session`. */
-  case class LaunchExecutorSystemSuccess(system: ExecutorSystem, session: Session)
-
-  /** Reply: the launch for `resource` was rejected, with the reported `reason`. */
-  case class LaunchExecutorSystemRejected(resource: Resource, reason: Any, session: Session)
-
-  /** Reply: the executor system did not register before the timeout elapsed. */
-  case class LaunchExecutorSystemTimeout(session: Session)
-
-  /**
-   * Builds the JVM launch description for an executor system.
-   *
-   * @param conf JVM settings of the session; may be null
-   * @param systemName First program argument handed to the ActorSystemBooter main class
-   * @param reportBack Second program argument handed to the ActorSystemBooter main class
-   * @return The JVM config, or null when `conf` is null
-   */
-  private def getExecutorJvmConfig(conf: ExecutorSystemJvmConfig, systemName: String,
-      reportBack: String): ExecutorJVMConfig = {
-    val bootArguments = Array(systemName, reportBack)
-    Option(conf).map { jvm =>
-      ExecutorJVMConfig(jvm.classPath, jvm.jvmArguments, classOf[ActorSystemBooter].getName,
-        bootArguments, jvm.jar, jvm.username, jvm.executorAkkaConfig)
-    }.orNull
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystemScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystemScheduler.scala b/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystemScheduler.scala
deleted file mode 100644
index c5ec600..0000000
--- a/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystemScheduler.scala
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.cluster.appmaster
-
-import scala.concurrent.duration._
-
-import akka.actor._
-import com.typesafe.config.Config
-
-import io.gearpump.cluster.AppMasterToMaster.RequestResource
-import io.gearpump.cluster.MasterToAppMaster.ResourceAllocated
-import io.gearpump.cluster._
-import io.gearpump.cluster.appmaster.ExecutorSystemLauncher._
-import io.gearpump.cluster.appmaster.ExecutorSystemScheduler._
-import io.gearpump.cluster.scheduler.{ResourceAllocation, ResourceRequest}
-import io.gearpump.cluster.worker.WorkerId
-import io.gearpump.util.{Constants, LogUtil}
-
-/**
- * ExecutorSystem is also a type of resource, this class schedules ExecutorSystem for AppMaster.
- * AppMaster can use this class to directly request live executor actor systems. The communication
- * in the background with Master and Worker is hidden from AppMaster.
- *
- * For every StartExecutorSystems request a Session (requestor + JVM config) is looked up or
- * created together with one ResourceAgent child actor. Each resource allocation reported for a
- * live session spawns one launcher child actor built from `executorSystemLauncher`.
- *
- * Please use ExecutorSystemScheduler.props() to construct this actor
- * NOTE(review): the companion object in this file defines no props() method - confirm.
- *
- * @param appId Id of the application; used in resource requests and passed to each launcher
- * @param masterProxy Actor handed to every ResourceAgent as the target of resource requests
- * @param executorSystemLauncher Factory for the Props of a launcher, given appId and session
- */
-private[appmaster]
-class ExecutorSystemScheduler(appId: Int, masterProxy: ActorRef,
-    executorSystemLauncher: (Int, Session) => Props) extends Actor {
-
-  private val LOG = LogUtil.getLogger(getClass, app = appId)
-  implicit val timeout = Constants.FUTURE_TIMEOUT
-  implicit val actorSystem = context.system
-  var currentSystemId = 0 // id given to the next launched executor system; bumped per launch
-
-  var resourceAgents = Map.empty[Session, ActorRef] // live sessions and their ResourceAgent
-
-  def receive: Receive = {
-    clientCommands orElse resourceAllocationMessageHandler orElse executorSystemMessageHandler
-  }
-
-  def clientCommands: Receive = { // requests sent by the party that wants executor systems
-    case start: StartExecutorSystems =>
-      LOG.info(s"starting executor systems (ExecutorSystemConfig(${start.executorSystemConfig}), " +
-        s"Resources(${start.resources.mkString(",")}))")
-      val requestor = sender()
-      val executorSystemConfig = start.executorSystemConfig
-      val session = Session(requestor, executorSystemConfig)
-      val agent = resourceAgents.getOrElse(session, // reuse the agent of an equal session
-        context.actorOf(Props(new ResourceAgent(masterProxy, session))))
-      resourceAgents = resourceAgents + (session -> agent)
-
-      start.resources.foreach { resource =>
-        agent ! RequestResource(appId, resource)
-      }
-
-    case StopExecutorSystem(executorSystem) =>
-      executorSystem.shutdown
-  }
-
-  def resourceAllocationMessageHandler: Receive = { // messages sent by ResourceAgent children
-    case ResourceAllocatedForSession(allocations, session) =>
-      if (isSessionAlive(session)) {
-        allocations.foreach { resourceAllocation =>
-          val ResourceAllocation(resource, worker, workerId) = resourceAllocation
-
-          val launcher = context.actorOf(executorSystemLauncher(appId, session))
-          launcher ! LaunchExecutorSystem(WorkerInfo(workerId, worker), currentSystemId, resource)
-          currentSystemId = currentSystemId + 1
-        }
-      }
-    case ResourceAllocationTimeOut(session) =>
-      if (isSessionAlive(session)) {
-        resourceAgents = resourceAgents - session // the session is treated as dead from now on
-        session.requestor ! StartExecutorSystemTimeout
-      }
-  }
-
-  def executorSystemMessageHandler: Receive = { // results reported by launcher children
-    case LaunchExecutorSystemSuccess(system, session) =>
-      if (isSessionAlive(session)) {
-        LOG.info("LaunchExecutorSystemSuccess, send back to " + session.requestor)
-        system.bindLifeCycleWith(self)
-        session.requestor ! ExecutorSystemStarted(system, session.executorSystemJvmConfig.jar)
-      } else {
-        LOG.error("We get a ExecutorSystem back, but resource requestor is no longer valid. " +
-          "Will shutdown the allocated system")
-        system.shutdown
-      }
-    case LaunchExecutorSystemTimeout(session) =>
-      if (isSessionAlive(session)) { // note: the session stays registered in resourceAgents here
-        LOG.error(s"Failed to launch executor system for ${session.requestor} due to timeout")
-        session.requestor ! StartExecutorSystemTimeout
-      }
-
-    case LaunchExecutorSystemRejected(resource, reason, session) =>
-      if (isSessionAlive(session)) {
-        LOG.error(s"Failed to launch executor system, due to $reason, " +
-          s"will ask master to allocate new resources $resource")
-        resourceAgents.get(session).map { resourceAgent: ActorRef => // map used for side effect
-          resourceAgent ! RequestResource(appId, ResourceRequest(resource, WorkerId.unspecified))
-        }
-      }
-  }
-
-  private def isSessionAlive(session: Session): Boolean = { // non-null and still registered
-    Option(session).flatMap(session => resourceAgents.get(session)).nonEmpty
-  }
-}
-
-object ExecutorSystemScheduler {
-
-  /** Client request: start executor systems for the given resource requests. */
-  case class StartExecutorSystems(
-      resources: Array[ResourceRequest], executorSystemConfig: ExecutorSystemJvmConfig)
-
-  /** Reply to the requestor for an executor system that was started. */
-  case class ExecutorSystemStarted(system: ExecutorSystem, boundedJar: Option[AppJar])
-
-  /** Client request: shut the given executor system down. */
-  case class StopExecutorSystem(system: ExecutorSystem)
-
-  /** Reply to the requestor when resource allocation or the launch itself timed out. */
-  case object StartExecutorSystemTimeout
-
-  /** JVM settings used when launching an executor system. */
-  case class ExecutorSystemJvmConfig(classPath: Array[String], jvmArguments: Array[String],
-      jar: Option[AppJar], username: String, executorAkkaConfig: Config = null)
-
-  /**
-   * For each client which asks for an executor system, the scheduler will create a session for it.
-   */
-  private[appmaster]
-  case class Session(requestor: ActorRef, executorSystemJvmConfig: ExecutorSystemJvmConfig)
-
-  /**
-   * This is an agent for a session to request resources.
-   *
-   * It passes every RequestResource on to `master`, tracks how many slots are still
-   * unallocated, and reports a timeout to the last requester when slots are still missing at
-   * the time the clock fires.
-   *
-   * @param master actor the resource requests are sent to
-   * @param session the original requester of the resource requests
-   */
-  private[appmaster]
-  class ResourceAgent(master: ActorRef, session: Session) extends Actor {
-    private var resourceRequestor: ActorRef = null
-    var timeOutClock: Cancellable = null
-    private var unallocatedResource: Int = 0
-
-    import context.dispatcher
-
-    import io.gearpump.util.Constants._
-
-    val timeout = context.system.settings.config.getInt(GEARPUMP_RESOURCE_ALLOCATION_TIMEOUT)
-
-    /** Cancels a running clock, if any, and starts a new one of `timeout` seconds. */
-    private def restartTimeOutClock(): Unit = {
-      Option(timeOutClock).foreach(_.cancel())
-      timeOutClock = context.system.scheduler.scheduleOnce(
-        timeout.seconds, self, ResourceAllocationTimeOut(session))
-    }
-
-    def receive: Receive = {
-      case request: RequestResource =>
-        unallocatedResource = unallocatedResource + request.request.resource.slots
-        restartTimeOutClock()
-        resourceRequestor = sender
-        master ! request
-      case ResourceAllocated(allocations) =>
-        val allocatedSlots = allocations.map(_.resource.slots).sum
-        unallocatedResource = unallocatedResource - allocatedSlots
-        resourceRequestor forward ResourceAllocatedForSession(allocations, session)
-      case _: ResourceAllocationTimeOut =>
-        val slotsStillMissing = unallocatedResource > 0
-        if (slotsStillMissing) {
-          resourceRequestor ! ResourceAllocationTimeOut(session)
-          // We will not receive any ResourceAllocation after timeout
-          context.stop(self)
-        }
-    }
-  }
-
-  /** Internal message: allocations received by a ResourceAgent, tagged with its session. */
-  private[ExecutorSystemScheduler]
-  case class ResourceAllocatedForSession(resource: Array[ResourceAllocation], session: Session)
-
-  /** Internal message: the allocation clock of a session's ResourceAgent expired. */
-  private[ExecutorSystemScheduler]
-  case class ResourceAllocationTimeOut(session: Session)
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/cluster/appmaster/MasterConnectionKeeper.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/cluster/appmaster/MasterConnectionKeeper.scala b/core/src/main/scala/io/gearpump/cluster/appmaster/MasterConnectionKeeper.scala
deleted file mode 100644
index f8c8503..0000000
--- a/core/src/main/scala/io/gearpump/cluster/appmaster/MasterConnectionKeeper.scala
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.cluster.appmaster
-
-import java.util.concurrent.TimeUnit
-import scala.concurrent.duration.FiniteDuration
-
-import akka.actor._
-
-import io.gearpump.cluster.AppMasterToMaster.RegisterAppMaster
-import io.gearpump.cluster.MasterToAppMaster.AppMasterRegistered
-import io.gearpump.cluster.appmaster.MasterConnectionKeeper.AppMasterRegisterTimeout
-import io.gearpump.cluster.appmaster.MasterConnectionKeeper.MasterConnectionStatus.{MasterConnected, MasterStopped}
-import io.gearpump.cluster.master.MasterProxy.{MasterRestarted, WatchMaster}
-import io.gearpump.util.LogUtil
-
-/**
- * Watches the liveness of Master.
- *
- * When Master is restarted, it sends RegisterAppMaster to the new Master instance.
- * If Master is stopped, it sends the MasterConnectionStatus to listener
- *
- * On construction it subscribes itself at masterProxy (WatchMaster), sends the registration and
- * waits for AppMasterRegistered. If no confirmation arrives within 30 seconds it reports
- * MasterStopped to the listener and stops itself.
- *
- * please use MasterConnectionKeeper.props() to construct this actor
- * NOTE(review): the companion object in this file defines no props() method - confirm.
- *
- * @param register The registration message sent to masterProxy on every (re-)registration
- * @param masterProxy Actor that receives WatchMaster and the registration message
- * @param masterStatusListener Actor that is told MasterConnected / MasterStopped
- */
-private[appmaster]
-class MasterConnectionKeeper(
-    register: RegisterAppMaster, masterProxy: ActorRef, masterStatusListener: ActorRef)
-  extends Actor {
-
-  import context.dispatcher
-
-  private val LOG = LogUtil.getLogger(getClass)
-  private var master: ActorRef = null // NOTE(review): never read or assigned in this class
-
-  // Subscribe self to masterProxy,
-  masterProxy ! WatchMaster(self)
-
-  def registerAppMaster: Cancellable = { // sends `register`; returns the handle of its timeout
-    masterProxy ! register
-    context.system.scheduler.scheduleOnce(FiniteDuration(30, TimeUnit.SECONDS),
-      self, AppMasterRegisterTimeout)
-  }
-
-  context.become(waitMasterToConfirm(registerAppMaster)) // constructor-time: initial behavior
-
-  def waitMasterToConfirm(cancelRegister: Cancellable): Receive = {
-    case AppMasterRegistered(appId) =>
-      cancelRegister.cancel() // confirmation arrived, the pending timeout is no longer needed
-      masterStatusListener ! MasterConnected
-      context.become(masterLivenessListener)
-    case AppMasterRegisterTimeout =>
-      cancelRegister.cancel()
-      masterStatusListener ! MasterStopped
-      context.stop(self)
-  }
-
-  def masterLivenessListener: Receive = { // behavior while the registration is confirmed
-    case MasterRestarted =>
-      LOG.info("Master restarted, re-registering appmaster....")
-      context.become(waitMasterToConfirm(registerAppMaster))
-    case MasterStopped =>
-      LOG.info("Master is dead, killing this AppMaster....")
-      masterStatusListener ! MasterStopped
-      context.stop(self)
-  }
-
-  // NOTE(review): null here presumably relies on the constructor-time context.become above
-  // having already installed a behavior - confirm against the Akka version in use.
-  def receive: Receive = null
-}
-
-private[appmaster] object MasterConnectionKeeper {
-
-  case object AppMasterRegisterTimeout // scheduled to the keeper itself 30s after registering
-
-  object MasterConnectionStatus { // status values reported to the masterStatusListener
-
-    case object MasterConnected // told to the listener once AppMasterRegistered arrives
-
-    case object MasterStopped // told to the listener on register timeout or master stop
-
-  }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/cluster/client/ClientContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/cluster/client/ClientContext.scala b/core/src/main/scala/io/gearpump/cluster/client/ClientContext.scala
deleted file mode 100644
index 41c01d8..0000000
--- a/core/src/main/scala/io/gearpump/cluster/client/ClientContext.scala
+++ /dev/null
@@ -1,185 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.cluster.client
-
-import java.util.concurrent.TimeUnit
-import scala.collection.JavaConverters._
-import scala.concurrent.duration.Duration
-import scala.concurrent.{Await, Future}
-import scala.util.Try
-
-import akka.actor.{ActorRef, ActorSystem}
-import akka.util.Timeout
-import com.typesafe.config.{Config, ConfigValueFactory}
-import org.slf4j.Logger
-
-import io.gearpump.cluster.MasterToAppMaster.{AppMastersData, ReplayFromTimestampWindowTrailingEdge}
-import io.gearpump.cluster.MasterToClient.ReplayApplicationResult
-import io.gearpump.cluster._
-import io.gearpump.cluster.master.MasterProxy
-import io.gearpump.jarstore.JarStoreService
-import io.gearpump.util.Constants._
-import io.gearpump.util.{ActorUtil, Constants, LogUtil, Util}
-
-/**
- * ClientContext is a user facing util to submit/manage an application.
- *
- * When no ActorSystem is passed in (`sys` is null) the context creates its own system and
- * terminates it again in close(). When no master ActorRef is passed in (`_master` is null) a
- * MasterProxy is created lazily from the master list found in `config`.
- *
- * TODO: add interface to query master here
- *
- * @param config Configuration used for the actor system, the jar store and the master list
- * @param sys ActorSystem to use, or null to let the context create one
- * @param _master Master ActorRef to use, or null to create a MasterProxy on first use
- */
-class ClientContext(config: Config, sys: ActorSystem, _master: ActorRef) {
-
-  def this(system: ActorSystem) = {
-    this(system.settings.config, system, null)
-  }
-
-  def this(config: Config) = {
-    this(config, null, null)
-  }
-
-  private val LOG: Logger = LogUtil.getLogger(getClass)
-  private implicit val timeout = Timeout(5, TimeUnit.SECONDS)
-
-  implicit val system = Option(sys).getOrElse(ActorSystem(s"client${Util.randInt()}", config))
-  LOG.info(s"Starting system ${system.name}")
-  val shouldCleanupSystem = Option(sys).isEmpty // true only when this context created `system`
-
-  private val jarStoreService = JarStoreService.get(config)
-  jarStoreService.init(config, system)
-
-  private lazy val master: ActorRef = { // resolved on first use, not at construction
-    val masters = config.getStringList(Constants.GEARPUMP_CLUSTER_MASTERS).asScala
-      .flatMap(Util.parseHostList)
-    val master = Option(_master).getOrElse(system.actorOf(MasterProxy.props(masters),
-      s"masterproxy${system.name}"))
-    LOG.info(s"Creating master proxy ${master} for master list: $masters")
-    master
-  }
-
-  /**
-   * Submits an application with default jar setting. Use java property "gearpump.app.jar" if
-   * defined. Otherwise, it assumes the jar is on the target runtime classpath, thus will
-   * not send the jar across the wire.
-   */
-  def submit(app: Application): Int = {
-    submit(app, System.getProperty(GEARPUMP_APP_JAR))
-  }
-
-  def submit(app: Application, jar: String): Int = { // executor number comes from getExecutorNum
-    submit(app, jar, getExecutorNum())
-  }
-
-  def submit(app: Application, jar: String, executorNum: Int): Int = {
-    val client = getMasterClient
-    val appName = checkAndAddNamePrefix(app.name, System.getProperty(GEARPUMP_APP_NAME_PREFIX))
-    val submissionConfig = getSubmissionConfig(config)
-      .withValue(APPLICATION_EXECUTOR_NUMBER, ConfigValueFactory.fromAnyRef(executorNum))
-    val appDescription =
-      AppDescription(appName, app.appMaster.getName, app.userConfig, submissionConfig)
-    val appJar = Option(jar).map(loadFile) // a null jar path means no jar is uploaded
-    client.submitApplication(appDescription, appJar)
-  }
-
-  private def getExecutorNum(): Int = { // 1 when the property is missing or not an integer
-    Try(System.getProperty(APPLICATION_EXECUTOR_NUMBER).toInt).getOrElse(1)
-  }
-
-  private def getSubmissionConfig(config: Config): Config = {
-    ClusterConfig.filterOutDefaultConfig(config)
-  }
-
-  def replayFromTimestampWindowTrailingEdge(appId: Int): ReplayApplicationResult = {
-    import scala.concurrent.ExecutionContext.Implicits.global
-    val result = Await.result( // blocks the calling thread; Duration.Inf means no upper bound
-      ActorUtil.askAppMaster[ReplayApplicationResult](master,
-        appId, ReplayFromTimestampWindowTrailingEdge(appId)), Duration.Inf)
-    result
-  }
-
-  def askAppMaster[T](appId: Int, msg: Any): Future[T] = {
-    import scala.concurrent.ExecutionContext.Implicits.global
-    ActorUtil.askAppMaster[T](master, appId, msg)
-  }
-
-  def listApps: AppMastersData = {
-    val client = getMasterClient
-    client.listApplications
-  }
-
-  def shutdown(appId: Int): Unit = {
-    val client = getMasterClient
-    client.shutdownApplication(appId)
-  }
-
-  def resolveAppID(appId: Int): ActorRef = {
-    val client = getMasterClient
-    client.resolveAppId(appId)
-  }
-
-  def close(): Unit = { // terminates the actor system only if this context created it
-    if (shouldCleanupSystem) {
-      LOG.info(s"Shutting down system ${system.name}")
-      system.terminate()
-    }
-  }
-
-  private def loadFile(jarPath: String): AppJar = { // copies the local jar into the jar store
-    val jarFile = new java.io.File(jarPath)
-    val path = jarStoreService.copyFromLocal(jarFile)
-    AppJar(jarFile.getName, path)
-  }
-
-  private def checkAndAddNamePrefix(appName: String, namePrefix: String): String = {
-    val fullName = if (namePrefix != null && namePrefix != "") {
-      namePrefix + "_" + appName
-    } else {
-      appName
-    }
-    if (!Util.validApplicationName(fullName)) {
-      close() // release an owned actor system before failing
-      // NOTE(review): the check is on fullName but the message reports appName without prefix.
-      val error = s"The application name $appName is not a proper name. An app name can " +
-        "be a sequence of letters, numbers or underscore character \"_\""
-      throw new Exception(error)
-    }
-    fullName
-  }
-
-  private def getMasterClient: MasterClient = { // a new client is built on every call
-    val timeout = Try(config.getInt(Constants.GEARPUMP_MASTERCLIENT_TIMEOUT)).getOrElse(90)
-    new MasterClient(master, akka.util.Timeout(timeout, TimeUnit.SECONDS))
-  }
-}
-
-object ClientContext {
-
-  /** Default cluster config; no actor system and no master supplied. */
-  def apply(): ClientContext = apply(ClusterConfig.default(), null, null)
-
-  /** Default cluster config on top of an existing actor system. */
-  def apply(system: ActorSystem): ClientContext = apply(ClusterConfig.default(), system, null)
-
-  /** Default cluster config with an existing actor system and master reference. */
-  def apply(system: ActorSystem, master: ActorRef): ClientContext =
-    apply(ClusterConfig.default(), system, master)
-
-  /** Explicit config; no actor system and no master supplied. */
-  def apply(config: Config): ClientContext = apply(config, null, null)
-
-  /** The general factory every other overload delegates to. */
-  def apply(config: Config, system: ActorSystem, master: ActorRef): ClientContext =
-    new ClientContext(config, system, master)
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/cluster/client/MasterClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/cluster/client/MasterClient.scala b/core/src/main/scala/io/gearpump/cluster/client/MasterClient.scala
deleted file mode 100644
index 9edaf46..0000000
--- a/core/src/main/scala/io/gearpump/cluster/client/MasterClient.scala
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.cluster.client
-
-import scala.concurrent.duration.Duration
-import scala.concurrent.{Await, Future}
-import scala.util.{Failure, Success}
-
-import akka.actor.ActorRef
-import akka.pattern.ask
-import akka.util.Timeout
-
-import io.gearpump.cluster.ClientToMaster._
-import io.gearpump.cluster.MasterToAppMaster.{AppMastersData, AppMastersDataRequest}
-import io.gearpump.cluster.MasterToClient.{ResolveAppIdResult, ShutdownApplicationResult, SubmitApplicationResult}
-import io.gearpump.cluster.{AppDescription, AppJar}
-
-/**
- * Client to inter-operate with Master node.
- *
- * Every call sends one message to `master` with the ask pattern and blocks until the reply
- * arrives. A failure carried inside the reply is rethrown to the caller.
- *
- * NOTE: Stateless, thread safe
- */
-class MasterClient(master: ActorRef, timeout: Timeout) {
-  implicit val masterClientTimeout = timeout
-
-  /** Asks the master and blocks for the reply, which is expected to be of type `T`. */
-  private def askMaster[T](request: Any): T = {
-    Await.result((master ? request).asInstanceOf[Future[T]], Duration.Inf)
-  }
-
-  /** Submits the application, prints the assigned id to the console and returns it. */
-  def submitApplication(app: AppDescription, appJar: Option[AppJar]): Int = {
-    val reply = askMaster[SubmitApplicationResult](SubmitApplication(app, appJar))
-    reply.appId match {
-      case Failure(cause) => throw cause
-      case Success(id) =>
-        // scalastyle:off println
-        Console.println(s"Submit application succeed. The application id is $id")
-        // scalastyle:on println
-        id
-    }
-  }
-
-  /** Returns the ActorRef the master reports for `appId`. */
-  def resolveAppId(appId: Int): ActorRef = {
-    val reply = askMaster[ResolveAppIdResult](ResolveAppId(appId))
-    reply.appMaster match {
-      case Failure(cause) => throw cause
-      case Success(appMaster) => appMaster
-    }
-  }
-
-  /** Asks the master to shut the application down; throws when the master reports a failure. */
-  def shutdownApplication(appId: Int): Unit = {
-    val reply = askMaster[ShutdownApplicationResult](ShutdownApplication(appId))
-    reply.appId match {
-      case Failure(cause) => throw cause
-      case Success(_) =>
-    }
-  }
-
-  /** Returns the master's reply to an AppMastersDataRequest. */
-  def listApplications: AppMastersData = askMaster[AppMastersData](AppMastersDataRequest)
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/cluster/main/ArgumentsParser.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/cluster/main/ArgumentsParser.scala b/core/src/main/scala/io/gearpump/cluster/main/ArgumentsParser.scala
deleted file mode 100644
index 209f831..0000000
--- a/core/src/main/scala/io/gearpump/cluster/main/ArgumentsParser.scala
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.cluster.main
-
-import io.gearpump.cluster.main.ArgumentsParser.Syntax
-
-case class CLIOption[+T](
-    description: String = "", required: Boolean = false, defaultValue: Option[T] = None)
-
-class ParseResult(optionMap: Map[String, String], remainArguments: Array[String]) {
-  def getInt(key: String): Int = optionMap.get(key).get.toInt
-
-  def getString(key: String): String = optionMap.get(key).get
-
-  def getBoolean(key: String): Boolean = optionMap.get(key).get.toBoolean
-
-  def exists(key: String): Boolean = !(optionMap.getOrElse(key, "").isEmpty)
-
-  def remainArgs: Array[String] = this.remainArguments
-}
-
-/**
- * Parser for command line arguments
- *
- * Grammar: -option1 value1 -option2 value3 -flag1 -flag2 remainArg1 remainArg2...
- */
-trait ArgumentsParser {
-
-  val ignoreUnknownArgument = false
-
-  // scalastyle:off println
-  def help(): Unit = {
-    Console.println(s"\nHelp: $description")
-    var usage = List.empty[String]
-    options.map(kv => if (kv._2.required) {
-      usage = usage :+ s"-${kv._1} (required:${kv._2.required})${kv._2.description}"
-    } else {
-      usage = usage :+ s"-${kv._1} (required:${kv._2.required}, " +
-        s"default:${kv._2.defaultValue.getOrElse("")})${kv._2.description}"
-    })
-    usage :+= remainArgs.map(k => s"<$k>").mkString(" ")
-    usage.foreach(Console.println(_))
-  }
-  // scalastyle:on println
-
-  def parse(args: Array[String]): ParseResult = {
-    val syntax = Syntax(options, remainArgs, ignoreUnknownArgument)
-    ArgumentsParser.parse(syntax, args)
-  }
-
-  val description: String = ""
-  val options: Array[(String, CLIOption[Any])] = Array.empty[(String, CLIOption[Any])]
-  val remainArgs: Array[String] = Array.empty[String]
-}
-
-object ArgumentsParser {
-
-  case class Syntax(
-      val options: Array[(String, CLIOption[Any])], val remainArgs: Array[String],
-      val ignoreUnknownArgument: Boolean)
-
-  def parse(syntax: Syntax, args: Array[String]): ParseResult = {
-    import syntax.{ignoreUnknownArgument, options, remainArgs}
-    var config = Map.empty[String, String]
-    var remain = Array.empty[String]
-
-    def doParse(argument: List[String]): Unit = {
-      argument match {
-        case Nil => Unit // true if everything processed successfully
-
-        case key :: value :: rest if key.startsWith("-") && !value.startsWith("-") =>
-          val fixedKey = key.substring(1)
-          if (!options.map(_._1).contains(fixedKey)) {
-            if (!ignoreUnknownArgument) {
-              throw new Exception(s"found unknown option $fixedKey")
-            } else {
-              remain ++= Array(key, value)
-            }
-          } else {
-            config += fixedKey -> value
-          }
-          doParse(rest)
-
-        case key :: rest if key.startsWith("-") =>
-          val fixedKey = key.substring(1)
-          if (!options.map(_._1).contains(fixedKey)) {
-            throw new Exception(s"found unknown option $fixedKey")
-          } else {
-            config += fixedKey -> "true"
-          }
-          doParse(rest)
-
-        case value :: rest =>
-          // scalastyle:off println
-          Console.err.println(s"Warning: get unknown argument $value, maybe it is a main class")
-          // scalastyle:on println
-          remain ++= value :: rest
-          doParse(Nil)
-      }
-    }
-    doParse(args.toList)
-
-    options.foreach { pair =>
-      val (key, option) = pair
-      if (!config.contains(key) && !option.required) {
-        config += key -> option.defaultValue.getOrElse("").toString
-      }
-    }
-
-    options.foreach { pair =>
-      val (key, value) = pair
-      if (config.get(key).isEmpty) {
-        throw new Exception(s"Missing option ${key}...")
-      }
-    }
-
-    if (remain.length < remainArgs.length) {
-      throw new Exception(s"Missing arguments ...")
-    }
-
-    new ParseResult(config, remain)
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/cluster/master/AppMasterLauncher.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/cluster/master/AppMasterLauncher.scala b/core/src/main/scala/io/gearpump/cluster/master/AppMasterLauncher.scala
deleted file mode 100644
index fb3e5c4..0000000
--- a/core/src/main/scala/io/gearpump/cluster/master/AppMasterLauncher.scala
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.cluster.master
-
-import java.util.concurrent.{TimeUnit, TimeoutException}
-import scala.collection.JavaConverters._
-import scala.concurrent.duration.Duration
-import scala.util.{Failure, Success}
-
-import akka.actor.{Actor, ActorRef, Props, _}
-import com.typesafe.config.Config
-import org.slf4j.Logger
-
-import io.gearpump.cluster.AppMasterToMaster.RequestResource
-import io.gearpump.cluster.AppMasterToWorker.{LaunchExecutor, ShutdownExecutor}
-import io.gearpump.cluster.MasterToAppMaster.ResourceAllocated
-import io.gearpump.cluster.MasterToClient.SubmitApplicationResult
-import io.gearpump.cluster.WorkerToAppMaster.ExecutorLaunchRejected
-import io.gearpump.cluster.appmaster.{AppMasterRuntimeEnvironment, AppMasterRuntimeInfo, WorkerInfo}
-import io.gearpump.cluster.scheduler.{Resource, ResourceAllocation, ResourceRequest}
-import io.gearpump.cluster.worker.WorkerId
-import io.gearpump.cluster.{AppDescription, AppJar, _}
-import io.gearpump.transport.HostPort
-import io.gearpump.util.ActorSystemBooter._
-import io.gearpump.util.Constants._
-import io.gearpump.util.{ActorSystemBooter, ActorUtil, LogUtil, Util}
-
-/**
- *
- * AppMasterLauncher is a child Actor of AppManager, it is responsible
- * to launch the AppMaster on the cluster.
- */
-class AppMasterLauncher(
-    appId: Int, executorId: Int, app: AppDescription,
-    jar: Option[AppJar], username: String, master: ActorRef, client: Option[ActorRef])
-  extends Actor {
-  private val LOG: Logger = LogUtil.getLogger(getClass, app = appId)
-
-  val scheduler = context.system.scheduler
-  val systemConfig = context.system.settings.config
-  val TIMEOUT = Duration(15, TimeUnit.SECONDS)
-
-  val appMasterAkkaConfig: Config = app.clusterConfig
-
-  LOG.info(s"Ask Master resource to start AppMaster $appId...")
-  master ! RequestResource(appId, ResourceRequest(Resource(1), WorkerId.unspecified))
-
-  def receive: Receive = waitForResourceAllocation
-
-  def waitForResourceAllocation: Receive = {
-    case ResourceAllocated(allocations) =>
-
-      val ResourceAllocation(resource, worker, workerId) = allocations(0)
-      LOG.info(s"Resource allocated for appMaster $appId on worker ${workerId}(${worker.path})")
-
-      val submissionTime = System.currentTimeMillis()
-
-      val appMasterInfo = AppMasterRuntimeInfo(appId, app.name, worker, username,
-        submissionTime, config = appMasterAkkaConfig)
-      val workerInfo = WorkerInfo(workerId, worker)
-      val appMasterContext =
-        AppMasterContext(appId, username, resource, workerInfo, jar, null, appMasterInfo)
-      LOG.info(s"Try to launch a executor for AppMaster on worker ${workerId} for app $appId")
-      val name = ActorUtil.actorNameForExecutor(appId, executorId)
-      val selfPath = ActorUtil.getFullPath(context.system, self.path)
-
-      val jvmSetting =
-        Util.resolveJvmSetting(appMasterAkkaConfig.withFallback(systemConfig)).appMater
-      val executorJVM = ExecutorJVMConfig(jvmSetting.classPath, jvmSetting.vmargs,
-        classOf[ActorSystemBooter].getName, Array(name, selfPath), jar,
-        username, appMasterAkkaConfig)
-
-      worker ! LaunchExecutor(appId, executorId, resource, executorJVM)
-      context.become(waitForActorSystemToStart(worker, appMasterContext, app.userConfig, resource))
-  }
-
-  def waitForActorSystemToStart(
-      worker: ActorRef, appContext: AppMasterContext, user: UserConfig, resource: Resource)
-    : Receive = {
-    case ExecutorLaunchRejected(reason, ex) =>
-      LOG.error(s"Executor Launch failed reason: $reason", ex)
-      LOG.info(s"reallocate resource $resource to start appmaster")
-      master ! RequestResource(appId, ResourceRequest(resource, WorkerId.unspecified))
-      context.become(waitForResourceAllocation)
-    case RegisterActorSystem(systemPath) =>
-      LOG.info(s"Received RegisterActorSystem $systemPath for AppMaster")
-      sender ! ActorSystemRegistered(worker)
-
-      val masterAddress = systemConfig.getStringList(GEARPUMP_CLUSTER_MASTERS)
-        .asScala.map(HostPort(_)).map(ActorUtil.getMasterActorPath)
-      sender ! CreateActor(
-        AppMasterRuntimeEnvironment.props(masterAddress, app, appContext), s"appdaemon$appId")
-
-      import context.dispatcher
-      val appMasterTimeout = scheduler.scheduleOnce(TIMEOUT, self,
-        CreateActorFailed(app.appMaster, new TimeoutException))
-      context.become(waitForAppMasterToStart(worker, appMasterTimeout))
-  }
-
-  def waitForAppMasterToStart(worker: ActorRef, cancel: Cancellable): Receive = {
-    case ActorCreated(appMaster, _) =>
-      cancel.cancel()
-      sender ! BindLifeCycle(appMaster)
-      LOG.info(s"AppMaster is created, mission complete...")
-      replyToClient(SubmitApplicationResult(Success(appId)))
-      context.stop(self)
-    case CreateActorFailed(name, reason) =>
-      cancel.cancel()
-      worker ! ShutdownExecutor(appId, executorId, reason.getMessage)
-      replyToClient(SubmitApplicationResult(Failure(reason)))
-      context.stop(self)
-  }
-
-  def replyToClient(result: SubmitApplicationResult): Unit = {
-    if (client.isDefined) {
-      client.get.tell(result, master)
-    }
-  }
-}
-
-object AppMasterLauncher extends AppMasterLauncherFactory {
-  def props(appId: Int, executorId: Int, app: AppDescription, jar: Option[AppJar],
-      username: String, master: ActorRef, client: Option[ActorRef]): Props = {
-    Props(new AppMasterLauncher(appId, executorId, app, jar, username, master, client))
-  }
-}
-
-trait AppMasterLauncherFactory {
-  def props(appId: Int, executorId: Int, app: AppDescription, jar: Option[AppJar],
-      username: String, master: ActorRef, client: Option[ActorRef]): Props
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/cluster/master/MasterProxy.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/cluster/master/MasterProxy.scala b/core/src/main/scala/io/gearpump/cluster/master/MasterProxy.scala
deleted file mode 100644
index 61d95dc..0000000
--- a/core/src/main/scala/io/gearpump/cluster/master/MasterProxy.scala
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.cluster.master
-
-import scala.concurrent.duration.FiniteDuration
-
-import akka.actor._
-import org.slf4j.Logger
-
-import io.gearpump.transport.HostPort
-import io.gearpump.util.{ActorUtil, LogUtil}
-
-/**
- * This works with Master HA. When there are multiple Master nodes,
- * This will find a active one.
- */
-class MasterProxy(masters: Iterable[ActorPath], timeout: FiniteDuration)
-  extends Actor with Stash {
-  import io.gearpump.cluster.master.MasterProxy._
-
-  val LOG: Logger = LogUtil.getLogger(getClass, name = self.path.name)
-
-  val contacts = masters.map { url =>
-    LOG.info(s"Contacts point URL: $url")
-    context.actorSelection(url)
-  }
-
-  var watchers: List[ActorRef] = List.empty[ActorRef]
-
-  import context.dispatcher
-
-  def findMaster(): Cancellable = {
-    repeatActionUtil(timeout) {
-      contacts foreach { contact =>
-        LOG.info(s"sending identity to $contact")
-        contact ! Identify(None)
-      }
-    }
-  }
-
-  context.become(establishing(findMaster()))
-
-  LOG.info("Master Proxy is started...")
-
-  override def postStop(): Unit = {
-    watchers.foreach(_ ! MasterStopped)
-    super.postStop()
-  }
-
-  override def receive: Receive = {
-    case _ =>
-  }
-
-  def establishing(findMaster: Cancellable): Actor.Receive = {
-    case ActorIdentity(_, Some(receptionist)) =>
-      context watch receptionist
-      LOG.info("Connected to [{}]", receptionist.path)
-      context.watch(receptionist)
-
-      watchers.foreach(_ ! MasterRestarted)
-      unstashAll()
-      findMaster.cancel()
-      context.become(active(receptionist) orElse messageHandler(receptionist))
-    case ActorIdentity(_, None) => // ok, use another instead
-    case msg =>
-      LOG.info(s"Stashing ${msg.getClass.getSimpleName}")
-      stash()
-  }
-
-  def active(receptionist: ActorRef): Actor.Receive = {
-    case Terminated(receptionist) =>
-      LOG.info("Lost contact with [{}], restablishing connection", receptionist)
-      context.become(establishing(findMaster))
-    case _: ActorIdentity => // ok, from previous establish, already handled
-    case WatchMaster(watcher) =>
-      watchers = watchers :+ watcher
-  }
-
-  def messageHandler(master: ActorRef): Receive = {
-    case msg =>
-      LOG.debug(s"Get msg ${msg.getClass.getSimpleName}, forwarding to ${master.path}")
-      master forward msg
-  }
-
-  def scheduler: Scheduler = context.system.scheduler
-  import scala.concurrent.duration._
-  private def repeatActionUtil(timeout: FiniteDuration)(action: => Unit): Cancellable = {
-    val send = scheduler.schedule(0.seconds, 2.seconds)(action)
-    val suicide = scheduler.scheduleOnce(timeout) {
-      send.cancel()
-      self ! PoisonPill
-    }
-
-    new Cancellable {
-      def cancel(): Boolean = {
-        val result1 = send.cancel()
-        val result2 = suicide.cancel()
-        result1 && result2
-      }
-
-      def isCancelled: Boolean = {
-        send.isCancelled && suicide.isCancelled
-      }
-    }
-  }
-}
-
-object MasterProxy {
-  case object MasterRestarted
-  case object MasterStopped
-  case class WatchMaster(watcher: ActorRef)
-
-  import scala.concurrent.duration._
-  def props(masters: Iterable[HostPort], duration: FiniteDuration = 30.seconds): Props = {
-    val contacts = masters.map(ActorUtil.getMasterActorPath(_))
-    Props(new MasterProxy(contacts, duration))
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/cluster/master/MasterSummary.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/cluster/master/MasterSummary.scala b/core/src/main/scala/io/gearpump/cluster/master/MasterSummary.scala
deleted file mode 100644
index 0996381..0000000
--- a/core/src/main/scala/io/gearpump/cluster/master/MasterSummary.scala
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.cluster.master
-
-import io.gearpump.util.HistoryMetricsService.HistoryMetricsConfig
-
-/** Master status. Synced means all masters are live and synced. */
-object MasterStatus {
-  type Type = String
-  val Synced = "synced"
-  val UnSynced = "unsynced"
-}
-
-case class MasterNode(host: String, port: Int) {
-  def toTuple: (String, Int) = {
-    (host, port)
-  }
-}
-
-/**
- * Master information returned for REST API call
- */
-case class MasterSummary(
-    leader: MasterNode,
-    cluster: List[MasterNode],
-    aliveFor: Long,
-    logFile: String,
-    jarStore: String,
-    masterStatus: MasterStatus.Type,
-    homeDirectory: String,
-    activities: List[MasterActivity],
-    jvmName: String,
-    historyMetricsConfig: HistoryMetricsConfig = null)
-
-case class MasterActivity(time: Long, event: String)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/cluster/scheduler/Resource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/cluster/scheduler/Resource.scala b/core/src/main/scala/io/gearpump/cluster/scheduler/Resource.scala
deleted file mode 100644
index b25162e..0000000
--- a/core/src/main/scala/io/gearpump/cluster/scheduler/Resource.scala
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.cluster.scheduler
-
-import akka.actor.ActorRef
-
-import io.gearpump.cluster.worker.WorkerId
-
-case class Resource(slots: Int) {
-
-  // scalastyle:off spaces.after.plus
-  def +(other: Resource): Resource = Resource(slots + other.slots)
-  // scalastyle:on spaces.after.plus
-
-  def -(other: Resource): Resource = Resource(slots - other.slots)
-
-  def >(other: Resource): Boolean = slots > other.slots
-
-  def >=(other: Resource): Boolean = !(this < other)
-
-  def <(other: Resource): Boolean = slots < other.slots
-
-  def <=(other: Resource): Boolean = !(this > other)
-
-  def isEmpty: Boolean = {
-    slots == 0
-  }
-}
-
-/**
- * Each streaming job can have a priority, the job with higher priority
- * will get scheduled resource earlier than those with lower priority.
- */
-object Priority extends Enumeration {
-  type Priority = Value
-  val LOW, NORMAL, HIGH = Value
-}
-
-/**
- * Relaxation.ONEWORKER means only resource (slot) from that worker will be accepted by
- * the requestor application job.
- */
-object Relaxation extends Enumeration {
-  type Relaxation = Value
-
-  // Option ONEWORKER allow user to schedule a task on specific worker.
-  val ANY, ONEWORKER, SPECIFICWORKER = Value
-}
-
-import io.gearpump.cluster.scheduler.Priority._
-import io.gearpump.cluster.scheduler.Relaxation._
-
-case class ResourceRequest(
-    resource: Resource, workerId: WorkerId, priority: Priority = NORMAL,
-    relaxation: Relaxation = ANY, executorNum: Int = 1)
-
-case class ResourceAllocation(resource: Resource, worker: ActorRef, workerId: WorkerId)
-
-object Resource {
-  def empty: Resource = new Resource(0)
-
-  def min(res1: Resource, res2: Resource): Resource = if (res1.slots < res2.slots) res1 else res2
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/cluster/worker/ExecutorProcessLauncher.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/cluster/worker/ExecutorProcessLauncher.scala b/core/src/main/scala/io/gearpump/cluster/worker/ExecutorProcessLauncher.scala
deleted file mode 100644
index 8581467..0000000
--- a/core/src/main/scala/io/gearpump/cluster/worker/ExecutorProcessLauncher.scala
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.cluster.worker
-
-import com.typesafe.config.Config
-
-import io.gearpump.cluster.scheduler.Resource
-import io.gearpump.util.RichProcess
-
-/**
- * ExecutorProcessLauncher is used to launch a process for Executor using given parameters.
- *
- * User can implement this interface to decide the behavior of launching a process.
- * Set "gearpump.worker.executor-process-launcher" to your implemented class name.
- */
-trait ExecutorProcessLauncher {
-  val config: Config
-
-  /**
-   * This function launches a process for Executor using given parameters.
-   *
-   * @param appId The appId of the executor to be launched
-   * @param executorId The executorId of the executor to be launched
-   * @param resource The resource allocated for that executor
-   * @param options The command options
-   * @param classPath The classpath of the process
-   * @param mainClass The main class of the process
-   * @param arguments The rest arguments
-   */
-  def createProcess(
-      appId: Int, executorId: Int, resource: Resource, config: Config, options: Array[String],
-      classPath: Array[String], mainClass: String, arguments: Array[String]): RichProcess
-
-  /**
-   * This function will clean resources for a launched process.
-   * @param appId The appId of the launched executor
-   * @param executorId The executorId of launched executor
-   */
-  def cleanProcess(appId: Int, executorId: Int): Unit
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/cluster/worker/WorkerId.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/cluster/worker/WorkerId.scala b/core/src/main/scala/io/gearpump/cluster/worker/WorkerId.scala
deleted file mode 100644
index 24c6ad2..0000000
--- a/core/src/main/scala/io/gearpump/cluster/worker/WorkerId.scala
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.cluster.worker
-
-/**
- * WorkerId is used to uniquely track a worker machine.
- *
- * @param sessionId sessionId is assigned by Master node for easy tracking. It is possible that
- *                  sessionId is **NOT** unique, so always use WorkerId for comparison.
- * @param registerTime the timestamp when a worker node register itself to master node
- */
-case class WorkerId(sessionId: Int, registerTime: Long)
-
-object WorkerId {
-  val unspecified: WorkerId = new WorkerId(-1, 0L)
-
-  def render(workerId: WorkerId): String = {
-    workerId.registerTime + "_" + workerId.sessionId
-  }
-
-  def parse(str: String): WorkerId = {
-    val pair = str.split("_")
-    new WorkerId(pair(1).toInt, pair(0).toLong)
-  }
-
-  implicit val workerIdOrdering: Ordering[WorkerId] = {
-    new Ordering[WorkerId] {
-
-      /** Compare timestamp first, then id */
-      override def compare(x: WorkerId, y: WorkerId): Int = {
-        if (x.registerTime < y.registerTime) {
-          -1
-        } else if (x.registerTime == y.registerTime) {
-          if (x.sessionId < y.sessionId) {
-            -1
-          } else if (x.sessionId == y.sessionId) {
-            0
-          } else {
-            1
-          }
-        } else {
-          1
-        }
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/cluster/worker/WorkerSummary.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/cluster/worker/WorkerSummary.scala b/core/src/main/scala/io/gearpump/cluster/worker/WorkerSummary.scala
deleted file mode 100644
index cdf2d03..0000000
--- a/core/src/main/scala/io/gearpump/cluster/worker/WorkerSummary.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.cluster.worker
-import io.gearpump.util.HistoryMetricsService.HistoryMetricsConfig
-
-/**
- * Worker summary information for REST API.
- */
-case class WorkerSummary(
-    workerId: WorkerId,
-    state: String,
-    actorPath: String,
-    aliveFor: Long,
-    logFile: String,
-    executors: Array[ExecutorSlots],
-    totalSlots: Int,
-    availableSlots: Int,
-    homeDirectory: String,
-    jvmName: String,
-    // Id used to uniquely identity this worker process in low level resource manager like YARN.
-    resourceManagerContainerId: String,
-    historyMetricsConfig: HistoryMetricsConfig = null)
-
-object WorkerSummary {
-  def empty: WorkerSummary = {
-    WorkerSummary(WorkerId.unspecified, "", "", 0L, "",
-      Array.empty[ExecutorSlots], 0, 0, "", jvmName = "", resourceManagerContainerId = "")
-  }
-}
-
-case class ExecutorSlots(appId: Int, executorId: Int, slots: Int)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/jarstore/JarStoreService.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/jarstore/JarStoreService.scala b/core/src/main/scala/io/gearpump/jarstore/JarStoreService.scala
deleted file mode 100644
index 54d5431..0000000
--- a/core/src/main/scala/io/gearpump/jarstore/JarStoreService.scala
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.jarstore
-
-import java.io.File
-import java.net.URI
-import java.util.ServiceLoader
-import scala.collection.JavaConverters._
-
-import akka.actor.ActorSystem
-import com.typesafe.config.Config
-
-import io.gearpump.util.{Constants, Util}
-
-case class FilePath(path: String)
-
-/**
- * JarStoreService is used to manage the upload/download of binary files,
- * like user submitted application jar.
- */
-trait JarStoreService {
-  /**
-   * The scheme of the JarStoreService.
-   * Like "hdfs" for HDFS file system, and "file" for a local
-   * file system.
-   */
-  val scheme: String
-
-  /**
-   * Init the Jar Store.
-   */
-  def init(config: Config, system: ActorSystem)
-
-  /**
-   * This function will copy the local file to the remote JarStore, called from client side.
-   * @param localFile The local file
-   */
-  def copyFromLocal(localFile: File): FilePath
-
-  /**
-   * This function will copy the remote file to local file system, called from client side.
-   *
-   * @param localFile The destination of file path
-   * @param remotePath The remote file path from JarStore
-   */
-  def copyToLocalFile(localFile: File, remotePath: FilePath)
-}
-
-object JarStoreService {
-
-  /**
-   * Get a active JarStoreService by specifying a scheme.
-   *
-   * Please see config [[io.gearpump.util.Constants#GEARPUMP_APP_JAR_STORE_ROOT_PATH]] for more
-   * information.
-   */
-  def get(config: Config): JarStoreService = {
-    val jarStoreRootPath = config.getString(Constants.GEARPUMP_APP_JAR_STORE_ROOT_PATH)
-    get(jarStoreRootPath)
-  }
-
-  private lazy val jarstoreServices: List[JarStoreService] = {
-    ServiceLoader.load(classOf[JarStoreService]).asScala.toList
-  }
-
-  private def get(rootPath: String): JarStoreService = {
-    val scheme = new URI(Util.resolvePath(rootPath)).getScheme
-    jarstoreServices.find(_.scheme == scheme).get
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/metrics/AkkaReporter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/metrics/AkkaReporter.scala b/core/src/main/scala/io/gearpump/metrics/AkkaReporter.scala
deleted file mode 100644
index 3a581fb..0000000
--- a/core/src/main/scala/io/gearpump/metrics/AkkaReporter.scala
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.metrics
-
-import scala.collection.JavaConverters._
-
-import akka.actor.{ActorRef, ActorSystem}
-
-import io.gearpump.codahale.metrics.{Gauge => CodaGauge, MetricRegistry}
-import io.gearpump.metrics.Metrics.{Counter => CounterData, Gauge => GaugeData, Histogram => HistogramData, Meter => MeterData}
-import io.gearpump.metrics.MetricsReporterService.ReportTo
-import io.gearpump.util.LogUtil
-
-/**
- * A reporter class for logging metrics values to a remote actor periodically
- */
-class AkkaReporter(
-    system: ActorSystem,
-    registry: MetricRegistry)
-  extends ReportTo {
-  private val LOG = LogUtil.getLogger(getClass)
-  LOG.info("Start Metrics AkkaReporter")
-
-  override def report(to: ActorRef): Unit = {
-    val counters = registry.getCounters()
-    val histograms = registry.getHistograms()
-    val meters = registry.getMeters()
-    val gauges = registry.getGauges()
-
-    counters.entrySet().asScala.foreach { pair =>
-      to ! CounterData(pair.getKey, pair.getValue.getCount)
-    }
-
-    histograms.entrySet().asScala.foreach { pair =>
-      val key = pair.getKey
-      val value = pair.getValue
-      val s = value.getSnapshot
-      to ! HistogramData(
-        key, s.getMean, s.getStdDev, s.getMedian,
-        s.get95thPercentile, s.get99thPercentile, s.get999thPercentile)
-    }
-
-    meters.entrySet().asScala.foreach { pair =>
-      val key = pair.getKey
-      val value = pair.getValue
-      to ! MeterData(key,
-        value.getCount,
-        value.getMeanRate,
-        value.getOneMinuteRate,
-        getRateUnit)
-    }
-
-    gauges.entrySet().asScala.foreach { kv =>
-      val value = kv.getValue.asInstanceOf[CodaGauge[Number]].getValue.longValue()
-      to ! GaugeData(kv.getKey, value)
-    }
-  }
-
-  private def getRateUnit: String = {
-    "events/s"
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/metrics/Counter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/metrics/Counter.scala b/core/src/main/scala/io/gearpump/metrics/Counter.scala
deleted file mode 100644
index 70c7bae..0000000
--- a/core/src/main/scala/io/gearpump/metrics/Counter.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.metrics
-
-import io.gearpump.codahale.metrics.{Counter => CodaHaleCounter}
-
-/**
- * @see io.gearpump.codahale.metrics.Counter
- */
-class Counter(val name: String, counter: CodaHaleCounter, sampleRate: Int = 1) {
-  private var sampleCount = 0L
-  private var toBeIncremented = 0L
-
-  def inc() {
-    inc(1)
-  }
-
-  def inc(n: Long) {
-    toBeIncremented += n
-    sampleCount += 1
-    if (null != counter && sampleCount % sampleRate == 0) {
-      counter.inc(toBeIncremented)
-      toBeIncremented = 0
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/metrics/Histogram.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/metrics/Histogram.scala b/core/src/main/scala/io/gearpump/metrics/Histogram.scala
deleted file mode 100644
index 4673050..0000000
--- a/core/src/main/scala/io/gearpump/metrics/Histogram.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.metrics
-
-import io.gearpump.codahale.metrics.{Histogram => CodaHaleHistogram}
-
-/**
- * @see io.gearpump.codahale.metrics.Histogram
- */
-class Histogram(val name: String, histogram: CodaHaleHistogram, sampleRate: Int = 1) {
-  private var sampleCount = 0L
-
-  def update(value: Long) {
-    sampleCount += 1
-    if (null != histogram && sampleCount % sampleRate == 0) {
-      histogram.update(value)
-    }
-  }
-
-  def getMean(): Double = {
-    histogram.getSnapshot.getMean
-  }
-
-  def getStdDev(): Double = {
-    histogram.getSnapshot.getStdDev
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/metrics/JvmMetricsSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/metrics/JvmMetricsSet.scala b/core/src/main/scala/io/gearpump/metrics/JvmMetricsSet.scala
deleted file mode 100644
index 28d420a..0000000
--- a/core/src/main/scala/io/gearpump/metrics/JvmMetricsSet.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.metrics
-
-import java.util
-import scala.collection.JavaConverters._
-
-import io.gearpump.codahale.metrics.jvm.{MemoryUsageGaugeSet, ThreadStatesGaugeSet}
-import io.gearpump.codahale.metrics.{Metric, MetricSet}
-
-class JvmMetricsSet(name: String) extends MetricSet {
-
-  override def getMetrics: util.Map[String, Metric] = {
-    val memoryMetrics = new MemoryUsageGaugeSet().getMetrics.asScala
-    val threadMetrics = new ThreadStatesGaugeSet().getMetrics.asScala
-    Map(
-      s"$name:memory.total.used" -> memoryMetrics("total.used"),
-      s"$name:memory.total.committed" -> memoryMetrics("total.committed"),
-      s"$name:memory.total.max" -> memoryMetrics("total.max"),
-      s"$name:memory.heap.used" -> memoryMetrics("heap.used"),
-      s"$name:memory.heap.committed" -> memoryMetrics("heap.committed"),
-      s"$name:memory.heap.max" -> memoryMetrics("heap.max"),
-      s"$name:thread.count" -> threadMetrics("count"),
-      s"$name:thread.daemon.count" -> threadMetrics("daemon.count")
-    ).asJava
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/metrics/Meter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/metrics/Meter.scala b/core/src/main/scala/io/gearpump/metrics/Meter.scala
deleted file mode 100644
index ca79a37..0000000
--- a/core/src/main/scala/io/gearpump/metrics/Meter.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.metrics
-
-import io.gearpump.codahale.metrics.{Meter => CodaHaleMeter}
-
-/** See io.gearpump.codahale.metrics.Meter */
-class Meter(val name: String, meter: CodaHaleMeter, sampleRate: Int = 1) {
-  private var sampleCount = 0L
-  private var toBeMarked = 0L
-
-  def mark() {
-    meter.mark(1)
-  }
-
-  def mark(n: Long) {
-    toBeMarked += n
-    sampleCount += 1
-    if (null != meter && sampleCount % sampleRate == 0) {
-      meter.mark(toBeMarked)
-      toBeMarked = 0
-    }
-  }
-
-  def getOneMinuteRate(): Double = {
-    meter.getOneMinuteRate
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/metrics/Metrics.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/metrics/Metrics.scala b/core/src/main/scala/io/gearpump/metrics/Metrics.scala
deleted file mode 100644
index aad1af0..0000000
--- a/core/src/main/scala/io/gearpump/metrics/Metrics.scala
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.metrics
-
-import scala.collection.JavaConverters._
-
-import akka.actor._
-import org.slf4j.Logger
-
-import io.gearpump.codahale.metrics._
-import io.gearpump.metrics
-import io.gearpump.util.LogUtil
-
-/** Metric objects registry */
-class Metrics(sampleRate: Int) extends Extension {
-
-  val registry = new MetricRegistry()
-
-  def meter(name: String): metrics.Meter = {
-    new metrics.Meter(name, registry.meter(name), sampleRate)
-  }
-
-  def histogram(name: String): Histogram = {
-    new Histogram(name, registry.histogram(name), sampleRate)
-  }
-
-  def histogram(name: String, sampleRate: Int): Histogram = {
-    new Histogram(name, registry.histogram(name), sampleRate)
-  }
-
-  def counter(name: String): Counter = {
-    new Counter(name, registry.counter(name), sampleRate)
-  }
-
-  def register(set: MetricSet): Unit = {
-    val names = registry.getNames
-    val metrics = set.getMetrics.asScala.filterKeys { key => !names.contains(key) }
-    metrics.foreach { kv =>
-      registry.register(kv._1, kv._2)
-    }
-  }
-}
-
-object Metrics extends ExtensionId[Metrics] with ExtensionIdProvider {
-
-  val LOG: Logger = LogUtil.getLogger(getClass)
-  import io.gearpump.util.Constants._
-
-  sealed trait MetricType {
-    def name: String
-  }
-
-  object MetricType {
-    def unapply(obj: MetricType): Option[(Histogram, Counter, Meter, Timer, Gauge)] = {
-      obj match {
-        case x: Histogram => Some((x, null, null, null, null))
-        case x: Counter => Some((null, x, null, null, null))
-        case x: Meter => Some((null, null, x, null, null))
-        case x: Timer => Some((null, null, null, x, null))
-        case g: Gauge => Some((null, null, null, null, g))
-      }
-    }
-
-    def apply(h: Histogram, c: Counter, m: Meter, t: Timer, g: Gauge): MetricType = {
-      val result =
-        if (h != null) h
-        else if (c != null) c
-        else if (m != null) m
-        else if (t != null) t
-        else if (g != null) g
-        else null
-      result
-    }
-  }
-
-  case class Histogram (
-      name: String, mean: Double,
-      stddev: Double, median: Double,
-      p95: Double, p99: Double, p999: Double)
-    extends MetricType
-
-  case class Counter(name: String, value: Long) extends MetricType
-
-  case class Meter(
-      name: String, count: Long, meanRate: Double,
-      m1: Double, rateUnit: String)
-    extends MetricType
-
-  case class Timer(
-      name: String, count: Long, min: Double, max: Double,
-      mean: Double, stddev: Double, median: Double,
-      p75: Double, p95: Double, p98: Double,
-      p99: Double, p999: Double, meanRate: Double,
-      m1: Double, m5: Double, m15: Double,
-      rateUnit: String, durationUnit: String)
-    extends MetricType
-
-  case class Gauge(name: String, value: Long) extends MetricType
-
-  case object ReportMetrics
-
-  case class DemandMoreMetrics(subscriber: ActorRef)
-
-  override def get(system: ActorSystem): Metrics = super.get(system)
-
-  override def lookup: ExtensionId[Metrics] = Metrics
-
-  override def createExtension(system: ExtendedActorSystem): Metrics = {
-    val metricsEnabled = system.settings.config.getBoolean(GEARPUMP_METRIC_ENABLED)
-    LOG.info(s"Metrics is enabled...,  $metricsEnabled")
-    val sampleRate = system.settings.config.getInt(GEARPUMP_METRIC_SAMPLE_RATE)
-    if (metricsEnabled) {
-      val meters = new Metrics(sampleRate)
-      meters
-    } else {
-      new DummyMetrics
-    }
-  }
-
-  class DummyMetrics extends Metrics(1) {
-    override def register(set: MetricSet): Unit = Unit
-
-    private val meter = new metrics.Meter("", null) {
-      override def mark(): Unit = Unit
-      override def mark(n: Long): Unit = Unit
-      override def getOneMinuteRate(): Double = 0
-    }
-
-    private val histogram = new metrics.Histogram("", null) {
-      override def update(value: Long): Unit = Unit
-      override def getMean(): Double = 0
-      override def getStdDev(): Double = 0
-    }
-
-    private val counter = new metrics.Counter("", null) {
-      override def inc(): Unit = Unit
-      override def inc(n: Long): Unit = Unit
-    }
-
-    override def meter(name: String): metrics.Meter = meter
-    override def histogram(name: String): metrics.Histogram = histogram
-    override def counter(name: String): metrics.Counter = counter
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/metrics/MetricsAggregator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/metrics/MetricsAggregator.scala b/core/src/main/scala/io/gearpump/metrics/MetricsAggregator.scala
deleted file mode 100644
index f52a060..0000000
--- a/core/src/main/scala/io/gearpump/metrics/MetricsAggregator.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.metrics
-
-import io.gearpump.cluster.MasterToClient.HistoryMetricsItem
-
-/**
- * Aggregates a larger set of metrics into a smaller set
- *
- * Subclasses must implement a constructor with a signature like this:
- * MetricsAggregator(config: Config)
- */
-trait MetricsAggregator {
-  def aggregate(options: Map[String, String], inputs: Iterator[HistoryMetricsItem])
-  : List[HistoryMetricsItem]
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/metrics/MetricsReporterService.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/metrics/MetricsReporterService.scala b/core/src/main/scala/io/gearpump/metrics/MetricsReporterService.scala
deleted file mode 100644
index 05decdd..0000000
--- a/core/src/main/scala/io/gearpump/metrics/MetricsReporterService.scala
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.metrics
-
-import java.net.InetSocketAddress
-import java.util.concurrent.TimeUnit
-import scala.concurrent.duration._
-
-import akka.actor.{Actor, ActorRef}
-
-import io.gearpump.codahale.metrics.graphite.{Graphite, GraphiteReporter}
-import io.gearpump.codahale.metrics.{MetricFilter, Slf4jReporter}
-import io.gearpump.metrics.Metrics.{DemandMoreMetrics, ReportMetrics}
-import io.gearpump.metrics.MetricsReporterService.ReportTo
-import io.gearpump.util.Constants._
-import io.gearpump.util.LogUtil
-
-/**
- * Reports the metrics data somewhere, like Graphite, a remote Akka actor, or log files.
- *
- * @param metrics Holds the registry of metric objects.
- */
-class MetricsReporterService(metrics: Metrics) extends Actor {
-
-  private val LOG = LogUtil.getLogger(getClass)
-  private implicit val system = context.system
-
-  private val reportInterval = system.settings.config.getInt(GEARPUMP_METRIC_REPORT_INTERVAL)
-  private val reporter = getReporter
-  implicit val dispatcher = context.dispatcher
-
-  def receive: Receive = {
-    // The subscriber is demanding more messages.
-    case DemandMoreMetrics(subscriber) => {
-      reporter.report(subscriber)
-      context.system.scheduler.scheduleOnce(reportInterval.milliseconds,
-        subscriber, ReportMetrics)
-    }
-  }
-
-  def startGraphiteReporter(): ReportTo = {
-    val graphiteHost = system.settings.config.getString(GEARPUMP_METRIC_GRAPHITE_HOST)
-    val graphitePort = system.settings.config.getInt(GEARPUMP_METRIC_GRAPHITE_PORT)
-
-    val graphite = new Graphite(new InetSocketAddress(graphiteHost, graphitePort))
-    LOG.info(s"reporting to $graphiteHost, $graphitePort")
-    new ReportTo {
-      private val reporter = GraphiteReporter.forRegistry(metrics.registry)
-        .convertRatesTo(TimeUnit.SECONDS)
-        .convertDurationsTo(TimeUnit.MILLISECONDS)
-        .filter(MetricFilter.ALL)
-        .build(graphite)
-
-      override def report(to: ActorRef): Unit = reporter.report()
-    }
-  }
-
-  def startSlf4jReporter(): ReportTo = {
-    new ReportTo {
-      val reporter = Slf4jReporter.forRegistry(metrics.registry)
-        .convertRatesTo(TimeUnit.SECONDS)
-        .convertDurationsTo(TimeUnit.MILLISECONDS)
-        .filter(MetricFilter.ALL)
-        .outputTo(LOG)
-        .build()
-
-      override def report(to: ActorRef): Unit = reporter.report()
-    }
-  }
-
-  def startAkkaReporter(): ReportTo = {
-    new AkkaReporter(system, metrics.registry)
-  }
-
-  def getReporter: ReportTo = {
-    val reporterType = system.settings.config.getString(GEARPUMP_METRIC_REPORTER)
-    LOG.info(s"Metrics reporter is enabled, using $reporterType reporter")
-    val reporter = reporterType match {
-      case "graphite" => startGraphiteReporter()
-      case "logfile" => startSlf4jReporter()
-      case "akka" => startAkkaReporter()
-    }
-    reporter
-  }
-}
-
-object MetricsReporterService {
-
-  /** Target to which the user wants to report the metrics data */
-  trait ReportTo {
-    def report(to: ActorRef): Unit
-  }
-}
\ No newline at end of file