You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by hu...@apache.org on 2016/04/26 11:42:25 UTC
[15/49] incubator-gearpump git commit: GEARPUMP-8,
fix "two machines can possibly have same worker id for single-master
cluster"
GEARPUMP-8, fix "two machines can possibly have same worker id for single-master cluster"
Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/e7a7f542
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/e7a7f542
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/e7a7f542
Branch: refs/heads/master
Commit: e7a7f54272a263eebb156510abf88616088dff34
Parents: 02597f7
Author: Sean Zhong <cl...@gmail.com>
Authored: Sat Apr 2 12:34:19 2016 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Tue Apr 26 14:24:43 2016 +0800
----------------------------------------------------------------------
.gitignore | 1 +
.sbtopts | 1 +
conf/gear.conf | 2 +-
.../io/gearpump/cluster/ClusterMessage.scala | 10 ++---
.../cluster/appmaster/ExecutorSystem.scala | 3 +-
.../appmaster/ExecutorSystemScheduler.scala | 3 +-
.../cluster/master/AppMasterLauncher.scala | 6 +--
.../gearpump/cluster/master/MasterSummary.scala | 6 ++-
.../gearpump/cluster/scheduler/Resource.scala | 5 ++-
.../gearpump/cluster/worker/WorkerSummary.scala | 5 ++-
core/src/main/scala/io/gearpump/package.scala | 44 ++++++++++++++++++++
.../main/scala/io/gearpump/util/ActorUtil.scala | 3 +-
.../appmaster/ExecutorSystemLauncherSpec.scala | 3 +-
.../appmaster/ExecutorSystemSchedulerSpec.scala | 5 ++-
.../cluster/master/AppMasterLauncherSpec.scala | 9 ++--
.../io/gearpump/cluster/DaemonMessage.scala | 7 ++--
.../io/gearpump/cluster/master/Master.scala | 14 +++++--
.../cluster/scheduler/PriorityScheduler.scala | 3 +-
.../gearpump/cluster/scheduler/Scheduler.scala | 4 +-
.../io/gearpump/cluster/worker/Worker.scala | 3 +-
.../io/gearpump/cluster/main/MainSpec.scala | 11 -----
.../scheduler/PrioritySchedulerSpec.scala | 29 ++++++-------
.../io/gearpump/cluster/worker/WorkerSpec.scala | 3 +-
docs/dev-rest-api.md | 14 +++----
.../DistShellAppMasterSpec.scala | 5 ++-
.../distributedshell/ShellExecutorSpec.scala | 3 +-
.../DistServiceAppMasterSpec.scala | 5 ++-
.../checklist/RestServiceSpec.scala | 2 +-
.../minicluster/BaseContainer.scala | 2 +-
.../minicluster/MiniCluster.scala | 17 ++++----
.../minicluster/RestClient.scala | 10 +++--
.../integrationtest/storm/StormClient.scala | 2 +-
project/Build.scala | 4 +-
.../io/gearpump/services/AdminService.scala | 2 +-
.../io/gearpump/services/AppMasterService.scala | 2 +-
.../io/gearpump/services/MasterService.scala | 2 +-
.../io/gearpump/services/SecurityService.scala | 2 +
.../io/gearpump/services/StaticService.scala | 1 +
.../gearpump/services/SupervisorService.scala | 6 ++-
.../io/gearpump/services/WorkerService.scala | 7 +++-
.../io/gearpump/services/util/UpickleUtil.scala | 11 +++++
.../io/gearpump/services/AdminServiceSpec.scala | 2 +-
.../services/AppMasterServiceSpec.scala | 2 +-
.../gearpump/services/MasterServiceSpec.scala | 6 ++-
.../gearpump/services/SecurityServiceSpec.scala | 1 +
.../gearpump/services/StaticServiceSpec.scala | 1 +
.../gearpump/services/WorkerServiceSpec.scala | 12 +++---
.../streaming/appmaster/AppMaster.scala | 7 ++--
.../streaming/appmaster/ExecutorManager.scala | 3 +-
.../streaming/appmaster/JarScheduler.scala | 6 +--
.../streaming/appmaster/TaskLocator.scala | 9 ++--
.../streaming/appmaster/TaskSchedulerImpl.scala | 20 ++++-----
.../gearpump/streaming/executor/Executor.scala | 5 ++-
.../streaming/appmaster/AppMasterSpec.scala | 8 ++--
.../appmaster/ExecutorManagerSpec.scala | 6 +--
.../streaming/appmaster/JarSchedulerSpec.scala | 5 ++-
.../streaming/appmaster/TaskLocatorSpec.scala | 3 +-
.../streaming/appmaster/TaskManagerSpec.scala | 6 +--
.../streaming/appmaster/TaskSchedulerSpec.scala | 26 ++++++------
.../streaming/executor/ExecutorSpec.scala | 3 +-
60 files changed, 251 insertions(+), 157 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 0932561..1efaafc 100644
--- a/.gitignore
+++ b/.gitignore
@@ -33,6 +33,7 @@ out/
.idea_modules/
atlassian-ide-plugin.xml
com_crashlytics_export_strings.xml
+intellij
# Eclipse
.metadata
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/.sbtopts
----------------------------------------------------------------------
diff --git a/.sbtopts b/.sbtopts
index 161b349..f6f24bf 100644
--- a/.sbtopts
+++ b/.sbtopts
@@ -1,3 +1,4 @@
-J-XX:+CMSClassUnloadingEnabled
-J-XX:+UseConcMarkSweepGC
-J-Xss6M
+-J-XX:MaxMetaspaceSize=512m
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/conf/gear.conf
----------------------------------------------------------------------
diff --git a/conf/gear.conf b/conf/gear.conf
index e7f14af..c2352fe 100644
--- a/conf/gear.conf
+++ b/conf/gear.conf
@@ -547,4 +547,4 @@ gearpump-linux {
### On windows, the value must be larger than 10ms, check
### https://github.com/akka/akka/blob/master/akka-actor/src/main/scala/akka/actor/Scheduler.scala#L204
akka.scheduler.tick-duration = 1
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/core/src/main/scala/io/gearpump/cluster/ClusterMessage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/cluster/ClusterMessage.scala b/core/src/main/scala/io/gearpump/cluster/ClusterMessage.scala
index 2280920..aac45ab 100644
--- a/core/src/main/scala/io/gearpump/cluster/ClusterMessage.scala
+++ b/core/src/main/scala/io/gearpump/cluster/ClusterMessage.scala
@@ -20,7 +20,7 @@ package io.gearpump.cluster
import akka.actor.ActorRef
import com.typesafe.config.Config
-import io.gearpump.TimeStamp
+import io.gearpump.{WorkerId, TimeStamp}
import io.gearpump.cluster.MasterToAppMaster.AppMasterStatus
import io.gearpump.cluster.master.MasterSummary
import io.gearpump.cluster.scheduler.{Resource, ResourceAllocation, ResourceRequest}
@@ -53,7 +53,7 @@ object ClientToMaster {
case class ShutdownApplication(appId: Int)
case class ResolveAppId(appId: Int)
- case class ResolveWorkerId(workerId: Int)
+ case class ResolveWorkerId(workerId: WorkerId)
case object GetJarStoreServer
@@ -61,7 +61,7 @@ object ClientToMaster {
case class QueryAppMasterConfig(appId: Int)
- case class QueryWorkerConfig(workerId: Int)
+ case class QueryWorkerConfig(workerId: WorkerId)
case object QueryMasterConfig
@@ -144,7 +144,7 @@ object AppMasterToMaster {
extends AppMasterSummary
case object GetAllWorkers
- case class GetWorkerData(workerId: Int)
+ case class GetWorkerData(workerId: WorkerId)
case class WorkerData(workerDescription: WorkerSummary)
case object GetMasterData
@@ -181,7 +181,7 @@ object MasterToAppMaster {
case class ReplayFromTimestampWindowTrailingEdge(appId: Int)
- case class WorkerList(workers: List[Int])
+ case class WorkerList(workers: List[WorkerId])
}
object AppMasterToWorker {
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystem.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystem.scala b/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystem.scala
index 94f7aa4..400b61c 100644
--- a/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystem.scala
+++ b/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystem.scala
@@ -19,10 +19,11 @@
package io.gearpump.cluster.appmaster
import akka.actor.{ActorRef, Address, PoisonPill}
+import io.gearpump.WorkerId
import io.gearpump.cluster.scheduler.Resource
import io.gearpump.util.ActorSystemBooter.BindLifeCycle
-case class WorkerInfo(workerId: Int, ref: ActorRef)
+case class WorkerInfo(workerId: WorkerId, ref: ActorRef)
/**
* This contains JVM configurations to start an executor system
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystemScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystemScheduler.scala b/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystemScheduler.scala
index 6af68eb..aa980b5 100644
--- a/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystemScheduler.scala
+++ b/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystemScheduler.scala
@@ -20,6 +20,7 @@ package io.gearpump.cluster.appmaster
import akka.actor._
import com.typesafe.config.Config
+import io.gearpump.WorkerId
import io.gearpump.cluster.AppMasterToMaster.RequestResource
import io.gearpump.cluster.MasterToAppMaster.ResourceAllocated
import io.gearpump.cluster._
@@ -105,7 +106,7 @@ class ExecutorSystemScheduler (appId: Int, masterProxy: ActorRef,
case LaunchExecutorSystemRejected(resource, reason, session) =>
if (isSessionAlive(session)) {
LOG.error(s"Failed to launch executor system, due to $reason, will ask master to allocate new resources $resource")
- resourceAgents.get(session).map(_ ! RequestResource(appId, ResourceRequest(resource)))
+ resourceAgents.get(session).map(_ ! RequestResource(appId, ResourceRequest(resource, WorkerId.unspecified)))
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/core/src/main/scala/io/gearpump/cluster/master/AppMasterLauncher.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/cluster/master/AppMasterLauncher.scala b/core/src/main/scala/io/gearpump/cluster/master/AppMasterLauncher.scala
index ffd1835..d391bba 100644
--- a/core/src/main/scala/io/gearpump/cluster/master/AppMasterLauncher.scala
+++ b/core/src/main/scala/io/gearpump/cluster/master/AppMasterLauncher.scala
@@ -40,7 +40,7 @@ import org.slf4j.Logger
import scala.collection.JavaConverters._
import scala.concurrent.duration.Duration
import scala.util.{Failure, Success}
-
+import io.gearpump.WorkerId
/**
*
* AppMasterLauncher is a child Actor of AppManager, it is responsible
@@ -59,7 +59,7 @@ class AppMasterLauncher(
val appMasterAkkaConfig: Config = app.clusterConfig
LOG.info(s"Ask Master resource to start AppMaster $appId...")
- master ! RequestResource(appId, ResourceRequest(Resource(1)))
+ master ! RequestResource(appId, ResourceRequest(Resource(1), WorkerId.unspecified))
def receive : Receive = waitForResourceAllocation
@@ -91,7 +91,7 @@ class AppMasterLauncher(
case ExecutorLaunchRejected(reason, ex) =>
LOG.error(s"Executor Launch failed reason:$reason", ex)
LOG.info(s"reallocate resource $resource to start appmaster")
- master ! RequestResource(appId, ResourceRequest(resource))
+ master ! RequestResource(appId, ResourceRequest(resource, WorkerId.unspecified))
context.become(waitForResourceAllocation)
case RegisterActorSystem(systemPath) =>
LOG.info(s"Received RegisterActorSystem $systemPath for AppMaster")
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/core/src/main/scala/io/gearpump/cluster/master/MasterSummary.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/cluster/master/MasterSummary.scala b/core/src/main/scala/io/gearpump/cluster/master/MasterSummary.scala
index 39e4a41..8847df2 100644
--- a/core/src/main/scala/io/gearpump/cluster/master/MasterSummary.scala
+++ b/core/src/main/scala/io/gearpump/cluster/master/MasterSummary.scala
@@ -26,7 +26,11 @@ object MasterStatus {
}
-case class MasterNode(host: String, port: Int)
+case class MasterNode(host: String, port: Int) {
+ def toTuple: (String, Int) = {
+ (host, port)
+ }
+}
/**
* Master information for REST API call
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/core/src/main/scala/io/gearpump/cluster/scheduler/Resource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/cluster/scheduler/Resource.scala b/core/src/main/scala/io/gearpump/cluster/scheduler/Resource.scala
index 94c7532..da17829 100644
--- a/core/src/main/scala/io/gearpump/cluster/scheduler/Resource.scala
+++ b/core/src/main/scala/io/gearpump/cluster/scheduler/Resource.scala
@@ -18,6 +18,7 @@
package io.gearpump.cluster.scheduler
import akka.actor.ActorRef
+import io.gearpump.WorkerId
case class Resource(slots : Int) {
def +(other : Resource): Resource = Resource(slots + other.slots)
@@ -51,9 +52,9 @@ object Relaxation extends Enumeration{
import Relaxation._
import Priority._
-case class ResourceRequest(resource: Resource, workerId: Int = 0, priority: Priority = NORMAL, relaxation: Relaxation = ANY, executorNum: Int = 1)
+case class ResourceRequest(resource: Resource, workerId: WorkerId, priority: Priority = NORMAL, relaxation: Relaxation = ANY, executorNum: Int = 1)
-case class ResourceAllocation(resource : Resource, worker : ActorRef, workerId : Int)
+case class ResourceAllocation(resource : Resource, worker : ActorRef, workerId : WorkerId)
object Resource {
def empty = new Resource(0)
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/core/src/main/scala/io/gearpump/cluster/worker/WorkerSummary.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/cluster/worker/WorkerSummary.scala b/core/src/main/scala/io/gearpump/cluster/worker/WorkerSummary.scala
index b1f9e6f..ca700dc 100644
--- a/core/src/main/scala/io/gearpump/cluster/worker/WorkerSummary.scala
+++ b/core/src/main/scala/io/gearpump/cluster/worker/WorkerSummary.scala
@@ -18,13 +18,14 @@
package io.gearpump.cluster.worker
import akka.actor.ActorRef
+import io.gearpump.WorkerId
import io.gearpump.util.HistoryMetricsService.HistoryMetricsConfig
/**
* Worker summary information for REST API.
*/
case class WorkerSummary(
- workerId: Int,
+ workerId: WorkerId,
state: String,
actorPath: String,
aliveFor: Long,
@@ -39,7 +40,7 @@ case class WorkerSummary(
historyMetricsConfig: HistoryMetricsConfig = null)
object WorkerSummary{
- def empty = WorkerSummary(-1, "", "", 0L, "", Array.empty[ExecutorSlots], 0, 0, "", jvmName = "", resourceManagerContainerId = "")
+ def empty = WorkerSummary(WorkerId.unspecified, "", "", 0L, "", Array.empty[ExecutorSlots], 0, 0, "", jvmName = "", resourceManagerContainerId = "")
}
case class ExecutorSlots(appId: Int, executorId: Int, slots: Int)
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/core/src/main/scala/io/gearpump/package.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/package.scala b/core/src/main/scala/io/gearpump/package.scala
index 05130a9..1ed94a7 100644
--- a/core/src/main/scala/io/gearpump/package.scala
+++ b/core/src/main/scala/io/gearpump/package.scala
@@ -3,4 +3,48 @@ package io
package object gearpump {
type TimeStamp = Long
val LatestTime = -1
+
+ /**
+ * WorkerId is used to uniquely track a worker machine.
+ *
+ * @param sessionId id assigned by the Master node for easy tracking. It is possible that
+ * sessionId is **NOT** unique, so always compare the whole WorkerId.
+ * @param registerTime the timestamp at which the worker node registered itself with the master node
+ */
+ case class WorkerId(sessionId: Int, registerTime: Long)
+
+ object WorkerId {
+ val unspecified: WorkerId = new WorkerId(-1, 0L)
+
+ def render(workerId: WorkerId): String = {
+ workerId.registerTime + "_" + workerId.sessionId
+ }
+
+ def parse(str: String): WorkerId = {
+ val pair = str.split("_")
+ new WorkerId(pair(1).toInt, pair(0).toLong)
+ }
+
+ implicit val workerIdOrdering: Ordering[WorkerId] = {
+ new Ordering[WorkerId] {
+
+ /** Compares registerTime first, then sessionId as the tie-breaker. */
+ override def compare(x: WorkerId, y: WorkerId): Int = {
+ if (x.registerTime < y.registerTime) {
+ -1
+ } else if (x.registerTime == y.registerTime) {
+ if (x.sessionId < y.sessionId) {
+ -1
+ } else if (x.sessionId == y.sessionId) {
+ 0
+ } else {
+ 1
+ }
+ } else {
+ 1
+ }
+ }
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/core/src/main/scala/io/gearpump/util/ActorUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/util/ActorUtil.scala b/core/src/main/scala/io/gearpump/util/ActorUtil.scala
index 8233b28..cc701d8 100644
--- a/core/src/main/scala/io/gearpump/util/ActorUtil.scala
+++ b/core/src/main/scala/io/gearpump/util/ActorUtil.scala
@@ -21,6 +21,7 @@ package io.gearpump.util
import akka.actor.Actor.Receive
import akka.actor._
import akka.pattern.ask
+import io.gearpump.WorkerId
import io.gearpump.cluster.AppMasterToMaster.GetAllWorkers
import io.gearpump.cluster.ClientToMaster.{ResolveAppId, ResolveWorkerId}
import io.gearpump.cluster.MasterToAppMaster.WorkerList
@@ -110,7 +111,7 @@ object ActorUtil {
appmaster.flatMap(askActor[T](_, msg))
}
- def askWorker[T](master: ActorRef, workerId: Int, msg: Any)(implicit ex: ExecutionContext): Future[T] = {
+ def askWorker[T](master: ActorRef, workerId: WorkerId, msg: Any)(implicit ex: ExecutionContext): Future[T] = {
implicit val timeout = Constants.FUTURE_TIMEOUT
val worker = askActor[ResolveWorkerIdResult](master, ResolveWorkerId(workerId)).flatMap { result =>
if (result.worker.isSuccess) {
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/core/src/test/scala/io/gearpump/cluster/appmaster/ExecutorSystemLauncherSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/gearpump/cluster/appmaster/ExecutorSystemLauncherSpec.scala b/core/src/test/scala/io/gearpump/cluster/appmaster/ExecutorSystemLauncherSpec.scala
index d0fded1..dc8e624 100644
--- a/core/src/test/scala/io/gearpump/cluster/appmaster/ExecutorSystemLauncherSpec.scala
+++ b/core/src/test/scala/io/gearpump/cluster/appmaster/ExecutorSystemLauncherSpec.scala
@@ -21,6 +21,7 @@ package io.gearpump.cluster.appmaster
import akka.actor.{ActorSystem, Props}
import akka.testkit.TestProbe
import com.typesafe.config.ConfigValueFactory
+import io.gearpump.WorkerId
import io.gearpump.cluster.AppMasterToWorker.LaunchExecutor
import io.gearpump.cluster.AppMasterToWorker.LaunchExecutor
import io.gearpump.cluster.TestUtil
@@ -36,7 +37,7 @@ import scala.concurrent.duration._
class ExecutorSystemLauncherSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
implicit var system: ActorSystem = null
- val workerId = 0
+ val workerId: WorkerId = WorkerId(0, 0L)
val appId = 0
val executorId = 0
val url = "akka.tcp://worker@127.0.0.1:3000"
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/core/src/test/scala/io/gearpump/cluster/appmaster/ExecutorSystemSchedulerSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/gearpump/cluster/appmaster/ExecutorSystemSchedulerSpec.scala b/core/src/test/scala/io/gearpump/cluster/appmaster/ExecutorSystemSchedulerSpec.scala
index d487582..106d389 100644
--- a/core/src/test/scala/io/gearpump/cluster/appmaster/ExecutorSystemSchedulerSpec.scala
+++ b/core/src/test/scala/io/gearpump/cluster/appmaster/ExecutorSystemSchedulerSpec.scala
@@ -20,6 +20,7 @@ package io.gearpump.cluster.appmaster
import akka.actor.{Actor, ActorSystem, Props}
import akka.testkit.TestProbe
+import io.gearpump.WorkerId
import io.gearpump.cluster.AppMasterToMaster.RequestResource
import io.gearpump.cluster.MasterToAppMaster.ResourceAllocated
import io.gearpump.cluster.{ExecutorJVMConfig, AppJar, TestUtil}
@@ -34,9 +35,9 @@ import scala.concurrent.duration._
class ExecutorSystemSchedulerSpec extends FlatSpec with Matchers with BeforeAndAfterEach {
val appId = 0
- val workerId = 0
+ val workerId = WorkerId(0, 0L)
val resource = Resource(1)
- val resourceRequest = ResourceRequest(resource)
+ val resourceRequest = ResourceRequest(resource, WorkerId.unspecified)
val mockJar = AppJar("for_test", FilePath("PATH"))
val emptyJvmConfig = ExecutorSystemJvmConfig(Array.empty, Array.empty, Some(mockJar), "")
val start = StartExecutorSystems(Array(resourceRequest), emptyJvmConfig)
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/core/src/test/scala/io/gearpump/cluster/master/AppMasterLauncherSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/gearpump/cluster/master/AppMasterLauncherSpec.scala b/core/src/test/scala/io/gearpump/cluster/master/AppMasterLauncherSpec.scala
index 8db0e0b..8ff2ea1 100644
--- a/core/src/test/scala/io/gearpump/cluster/master/AppMasterLauncherSpec.scala
+++ b/core/src/test/scala/io/gearpump/cluster/master/AppMasterLauncherSpec.scala
@@ -20,6 +20,7 @@ package io.gearpump.cluster.master
import akka.actor._
import akka.testkit.TestProbe
+import io.gearpump.WorkerId
import io.gearpump.cluster.AppMasterToMaster.RequestResource
import io.gearpump.cluster.AppMasterToWorker.{ShutdownExecutor, LaunchExecutor}
import io.gearpump.cluster.MasterToClient.SubmitApplicationResult
@@ -53,8 +54,8 @@ class AppMasterLauncherSpec extends FlatSpec with Matchers with BeforeAndAfterEa
appMasterLauncher = getActorSystem.actorOf(AppMasterLauncher.props(appId, executorId,
TestUtil.dummyApp, None, "username", master.ref, Some(client.ref)))
watcher watch appMasterLauncher
- master.expectMsg(RequestResource(appId, ResourceRequest(Resource(1))))
- val resource = ResourceAllocated(Array(ResourceAllocation(Resource(1), worker.ref, 0)))
+ master.expectMsg(RequestResource(appId, ResourceRequest(Resource(1), WorkerId.unspecified)))
+ val resource = ResourceAllocated(Array(ResourceAllocation(Resource(1), worker.ref, WorkerId(0, 0L))))
master.reply(resource)
worker.expectMsgType[LaunchExecutor]
}
@@ -76,9 +77,9 @@ class AppMasterLauncherSpec extends FlatSpec with Matchers with BeforeAndAfterEa
"AppMasterLauncher" should "reallocate resource if executor launch rejected" in {
worker.reply(ExecutorLaunchRejected(""))
- master.expectMsg(RequestResource(appId, ResourceRequest(Resource(1))))
+ master.expectMsg(RequestResource(appId, ResourceRequest(Resource(1), WorkerId.unspecified)))
- val resource = ResourceAllocated(Array(ResourceAllocation(Resource(1), worker.ref, 0)))
+ val resource = ResourceAllocated(Array(ResourceAllocation(Resource(1), worker.ref, WorkerId(0, 0L))))
master.reply(resource)
worker.expectMsgType[LaunchExecutor]
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/daemon/src/main/scala/io/gearpump/cluster/DaemonMessage.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/io/gearpump/cluster/DaemonMessage.scala b/daemon/src/main/scala/io/gearpump/cluster/DaemonMessage.scala
index 19ac620..ac942ed 100644
--- a/daemon/src/main/scala/io/gearpump/cluster/DaemonMessage.scala
+++ b/daemon/src/main/scala/io/gearpump/cluster/DaemonMessage.scala
@@ -18,6 +18,7 @@
package io.gearpump.cluster
import akka.actor.ActorRef
+import io.gearpump.WorkerId
import io.gearpump.cluster.master.Master.MasterInfo
import io.gearpump.cluster.scheduler.Resource
@@ -26,12 +27,12 @@ import io.gearpump.cluster.scheduler.Resource
*/
object WorkerToMaster {
case object RegisterNewWorker
- case class RegisterWorker(workerId: Int)
- case class ResourceUpdate(worker: ActorRef, workerId: Int, resource: Resource)
+ case class RegisterWorker(workerId: WorkerId)
+ case class ResourceUpdate(worker: ActorRef, workerId: WorkerId, resource: Resource)
}
object MasterToWorker {
- case class WorkerRegistered(workerId : Int, masterInfo: MasterInfo)
+ case class WorkerRegistered(workerId : WorkerId, masterInfo: MasterInfo)
case class UpdateResourceFailed(reason : String = null, ex: Throwable = null)
case object UpdateResourceSucceed
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/daemon/src/main/scala/io/gearpump/cluster/master/Master.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/io/gearpump/cluster/master/Master.scala b/daemon/src/main/scala/io/gearpump/cluster/master/Master.scala
index 8db6758..fdef42e 100644
--- a/daemon/src/main/scala/io/gearpump/cluster/master/Master.scala
+++ b/daemon/src/main/scala/io/gearpump/cluster/master/Master.scala
@@ -40,6 +40,8 @@ import io.gearpump.transport.HostPort
import io.gearpump.util.Constants._
import io.gearpump.util.HistoryMetricsService.HistoryMetricsConfig
import io.gearpump.util._
+import io.gearpump.WorkerId
+
import org.apache.commons.lang.exception.ExceptionUtils
import org.slf4j.Logger
@@ -64,7 +66,7 @@ private[cluster] class Master extends Actor with Stash {
private var scheduler : ActorRef = null
- private var workers = new immutable.HashMap[ActorRef, Int]
+ private var workers = new immutable.HashMap[ActorRef, WorkerId]
private val birth = System.currentTimeMillis()
@@ -109,6 +111,8 @@ private[cluster] class Master extends Actor with Stash {
case GetKVSuccess(_, result) =>
if(result != null) {
this.nextWorkerId = result.asInstanceOf[Int]
+ } else {
+ LOG.warn("Cannot find existing state in the distributed cluster...")
}
context.become(receiveHandler)
unstashAll()
@@ -132,9 +136,11 @@ private[cluster] class Master extends Actor with Stash {
def workerMsgHandler : Receive = {
case RegisterNewWorker =>
- val workerId = nextWorkerId
+ val workerId = WorkerId(nextWorkerId, System.currentTimeMillis())
nextWorkerId += 1
kvService ! PutKV(MASTER_GROUP, WORKER_ID, nextWorkerId)
+ val workerHostname = ActorUtil.getHostname(sender())
+ LOG.info(s"Register new from $workerHostname ....")
self forward RegisterWorker(workerId)
case RegisterWorker(id) =>
@@ -143,7 +149,7 @@ private[cluster] class Master extends Actor with Stash {
scheduler forward WorkerRegistered(id, MasterInfo(self, birth))
workers += (sender() -> id)
val workerHostname = ActorUtil.getHostname(sender())
- LOG.info(s"Register Worker $id from $workerHostname ....")
+ LOG.info(s"Register Worker with id $id from $workerHostname ....")
case resourceUpdate : ResourceUpdate =>
scheduler forward resourceUpdate
}
@@ -290,7 +296,7 @@ object Master {
final val WORKER_ID = "next_worker_id"
- case class WorkerTerminated(workerId: Int)
+ case class WorkerTerminated(workerId: WorkerId)
case class MasterInfo(master: ActorRef, startTime : Long = 0L)
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/daemon/src/main/scala/io/gearpump/cluster/scheduler/PriorityScheduler.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/io/gearpump/cluster/scheduler/PriorityScheduler.scala b/daemon/src/main/scala/io/gearpump/cluster/scheduler/PriorityScheduler.scala
index 10e9dcb..3b1bd9f 100644
--- a/daemon/src/main/scala/io/gearpump/cluster/scheduler/PriorityScheduler.scala
+++ b/daemon/src/main/scala/io/gearpump/cluster/scheduler/PriorityScheduler.scala
@@ -19,6 +19,7 @@
package io.gearpump.cluster.scheduler
import akka.actor.ActorRef
+import io.gearpump.WorkerId
import io.gearpump.cluster.AppMasterToMaster.RequestResource
import io.gearpump.cluster.MasterToAppMaster.ResourceAllocated
import io.gearpump.cluster.scheduler.Relaxation._
@@ -104,7 +105,7 @@ class PriorityScheduler extends Scheduler {
resourceRequests = resourceRequests.filter(_.appId != appId)
}
- private def allocateFairly(resources: mutable.HashMap[Int, (ActorRef, Resource)], request: ResourceRequest): List[ResourceAllocation] = {
+ private def allocateFairly(resources: mutable.HashMap[WorkerId, (ActorRef, Resource)], request: ResourceRequest): List[ResourceAllocation] = {
val workerNum = resources.size
var allocations = List.empty[ResourceAllocation]
var totalAvailable = Resource(resources.values.map(_._2.slots).sum)
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/daemon/src/main/scala/io/gearpump/cluster/scheduler/Scheduler.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/io/gearpump/cluster/scheduler/Scheduler.scala b/daemon/src/main/scala/io/gearpump/cluster/scheduler/Scheduler.scala
index 36b900f..8ccf1fb 100644
--- a/daemon/src/main/scala/io/gearpump/cluster/scheduler/Scheduler.scala
+++ b/daemon/src/main/scala/io/gearpump/cluster/scheduler/Scheduler.scala
@@ -20,7 +20,7 @@ package io.gearpump.cluster.scheduler
import akka.actor.{Actor, ActorRef}
import io.gearpump.cluster.MasterToWorker.UpdateResourceSucceed
import io.gearpump.util.LogUtil
-import io.gearpump.TimeStamp
+import io.gearpump.{WorkerId, TimeStamp}
import io.gearpump.cluster.MasterToWorker.{UpdateResourceFailed, UpdateResourceSucceed, WorkerRegistered}
import io.gearpump.cluster.WorkerToMaster.ResourceUpdate
import io.gearpump.cluster.master.Master.WorkerTerminated
@@ -35,7 +35,7 @@ import scala.collection.mutable
*/
abstract class Scheduler extends Actor{
val LOG: Logger = LogUtil.getLogger(getClass)
- protected var resources = new mutable.HashMap[Int, (ActorRef, Resource)]
+ protected var resources = new mutable.HashMap[WorkerId, (ActorRef, Resource)]
def handleScheduleMessage : Receive = {
case WorkerRegistered(id, _) =>
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/daemon/src/main/scala/io/gearpump/cluster/worker/Worker.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/io/gearpump/cluster/worker/Worker.scala b/daemon/src/main/scala/io/gearpump/cluster/worker/Worker.scala
index 8957dff..a746b39 100644
--- a/daemon/src/main/scala/io/gearpump/cluster/worker/Worker.scala
+++ b/daemon/src/main/scala/io/gearpump/cluster/worker/Worker.scala
@@ -26,6 +26,7 @@ import java.util.concurrent.{Executors, TimeUnit}
import akka.actor.SupervisorStrategy.Stop
import akka.actor._
import com.typesafe.config.{Config, ConfigFactory}
+import io.gearpump.WorkerId
import io.gearpump.cluster.AppMasterToMaster.{GetWorkerData, WorkerData}
import io.gearpump.cluster.AppMasterToWorker._
import io.gearpump.cluster.ClientToMaster.{QueryHistoryMetrics, QueryWorkerConfig}
@@ -63,7 +64,7 @@ private[cluster] class Worker(masterProxy : ActorRef) extends Actor with TimeOut
private var resource = Resource.empty
private var allocatedResources = Map[ActorRef, Resource]()
private var executorsInfo = Map[ActorRef, ExecutorSlots]()
- private var id = -1
+ private var id: WorkerId = WorkerId.unspecified
private val createdTime = System.currentTimeMillis()
private var masterInfo: MasterInfo = null
private var executorNameToActor = Map.empty[String, ActorRef]
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/daemon/src/test/scala/io/gearpump/cluster/main/MainSpec.scala
----------------------------------------------------------------------
diff --git a/daemon/src/test/scala/io/gearpump/cluster/main/MainSpec.scala b/daemon/src/test/scala/io/gearpump/cluster/main/MainSpec.scala
index d412020..9dc0289 100644
--- a/daemon/src/test/scala/io/gearpump/cluster/main/MainSpec.scala
+++ b/daemon/src/test/scala/io/gearpump/cluster/main/MainSpec.scala
@@ -17,23 +17,12 @@
*/
package io.gearpump.cluster.main
-import java.util.concurrent.TimeUnit
-
-import akka.testkit.TestProbe
-import io.gearpump.cluster.ClientToMaster.ResolveAppId
import io.gearpump.cluster.MasterToAppMaster.{ReplayFromTimestampWindowTrailingEdge, AppMastersDataRequest}
-import io.gearpump.cluster.TestUtil
-import io.gearpump.cluster.main.{Local, Replay, Kill, Worker}
-import io.gearpump.cluster.master.MasterProxy
-import io.gearpump.util.{Constants, Util}
import io.gearpump.cluster.ClientToMaster.{ResolveAppId, ShutdownApplication}
import io.gearpump.cluster.MasterToAppMaster._
import io.gearpump.cluster.MasterToClient.{ResolveAppIdResult, ReplayApplicationResult, ShutdownApplicationResult}
-import io.gearpump.cluster.MasterToWorker.WorkerRegistered
import io.gearpump.cluster.WorkerToMaster.RegisterNewWorker
-import io.gearpump.cluster.master.MasterProxy
import io.gearpump.cluster.{MasterHarness, TestUtil}
-import io.gearpump.transport.HostPort
import io.gearpump.util.Constants._
import io.gearpump.util.{Constants, LogUtil, Util}
import org.scalatest._
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/daemon/src/test/scala/io/gearpump/cluster/scheduler/PrioritySchedulerSpec.scala
----------------------------------------------------------------------
diff --git a/daemon/src/test/scala/io/gearpump/cluster/scheduler/PrioritySchedulerSpec.scala b/daemon/src/test/scala/io/gearpump/cluster/scheduler/PrioritySchedulerSpec.scala
index 0a7e1f8..aa2028b 100644
--- a/daemon/src/test/scala/io/gearpump/cluster/scheduler/PrioritySchedulerSpec.scala
+++ b/daemon/src/test/scala/io/gearpump/cluster/scheduler/PrioritySchedulerSpec.scala
@@ -19,6 +19,7 @@ package io.gearpump.cluster.scheduler
import akka.actor.{ActorSystem, Props}
import akka.testkit.{ImplicitSender, TestKit, TestProbe}
+import io.gearpump.WorkerId
import io.gearpump.cluster.AppMasterToMaster.RequestResource
import io.gearpump.cluster.MasterToAppMaster.ResourceAllocated
import io.gearpump.cluster.MasterToWorker.{UpdateResourceFailed, WorkerRegistered}
@@ -36,8 +37,8 @@ class PrioritySchedulerSpec(_system: ActorSystem) extends TestKit(_system) with
def this() = this(ActorSystem("PrioritySchedulerSpec", TestUtil.DEFAULT_CONFIG))
val appId = 0
- val workerId1 = 1
- val workerId2 = 2
+ val workerId1: WorkerId = WorkerId(1, 0L)
+ val workerId2: WorkerId = WorkerId(2, 0L)
val mockAppMaster = TestProbe()
val mockWorker1 = TestProbe()
val mockWorker2 = TestProbe()
@@ -55,8 +56,8 @@ class PrioritySchedulerSpec(_system: ActorSystem) extends TestKit(_system) with
"drop application's resource requests when the application is removed" in {
val scheduler = system.actorOf(Props(classOf[PriorityScheduler]))
- val request1 = ResourceRequest(Resource(40), 0, Priority.HIGH, Relaxation.ANY)
- val request2 = ResourceRequest(Resource(20), 0, Priority.HIGH, Relaxation.ANY)
+ val request1 = ResourceRequest(Resource(40), WorkerId.unspecified, Priority.HIGH, Relaxation.ANY)
+ val request2 = ResourceRequest(Resource(20), WorkerId.unspecified, Priority.HIGH, Relaxation.ANY)
scheduler.tell(RequestResource(appId, request1), mockAppMaster.ref)
scheduler.tell(RequestResource(appId, request2), mockAppMaster.ref)
scheduler.tell(ApplicationFinished(appId), mockAppMaster.ref)
@@ -69,9 +70,9 @@ class PrioritySchedulerSpec(_system: ActorSystem) extends TestKit(_system) with
"The resource request with higher priority" should {
"be handled first" in {
val scheduler = system.actorOf(Props(classOf[PriorityScheduler]))
- val request1 = ResourceRequest(Resource(40), 0, Priority.LOW, Relaxation.ANY)
- val request2 = ResourceRequest(Resource(20), 0, Priority.NORMAL, Relaxation.ANY)
- val request3 = ResourceRequest(Resource(30), 0, Priority.HIGH, Relaxation.ANY)
+ val request1 = ResourceRequest(Resource(40), WorkerId.unspecified, Priority.LOW, Relaxation.ANY)
+ val request2 = ResourceRequest(Resource(20), WorkerId.unspecified, Priority.NORMAL, Relaxation.ANY)
+ val request3 = ResourceRequest(Resource(30), WorkerId.unspecified, Priority.HIGH, Relaxation.ANY)
scheduler.tell(RequestResource(appId, request1), mockAppMaster.ref)
scheduler.tell(RequestResource(appId, request1), mockAppMaster.ref)
@@ -94,8 +95,8 @@ class PrioritySchedulerSpec(_system: ActorSystem) extends TestKit(_system) with
"The resource request which delivered earlier" should {
"be handled first if the priorities are the same" in {
val scheduler = system.actorOf(Props(classOf[PriorityScheduler]))
- val request1 = ResourceRequest(Resource(40), 0, Priority.HIGH, Relaxation.ANY)
- val request2 = ResourceRequest(Resource(20), 0, Priority.HIGH, Relaxation.ANY)
+ val request1 = ResourceRequest(Resource(40), WorkerId.unspecified, Priority.HIGH, Relaxation.ANY)
+ val request2 = ResourceRequest(Resource(20), WorkerId.unspecified, Priority.HIGH, Relaxation.ANY)
scheduler.tell(RequestResource(appId, request1), mockAppMaster.ref)
scheduler.tell(RequestResource(appId, request2), mockAppMaster.ref)
scheduler.tell(WorkerRegistered(workerId1, MasterInfo.empty), mockWorker1.ref)
@@ -122,14 +123,14 @@ class PrioritySchedulerSpec(_system: ActorSystem) extends TestKit(_system) with
scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, Resource(100)), mockWorker2.ref)
mockAppMaster.expectMsg(5 seconds, ResourceAllocated(Array(ResourceAllocation(Resource(40), mockWorker2.ref, workerId2))))
- val request3 = ResourceRequest(Resource(30), 0, Priority.NORMAL, Relaxation.ANY, executorNum = 2)
+ val request3 = ResourceRequest(Resource(30), WorkerId.unspecified, Priority.NORMAL, Relaxation.ANY, executorNum = 2)
scheduler.tell(RequestResource(appId, request3), mockAppMaster.ref)
mockAppMaster.expectMsg(5 seconds, ResourceAllocated(Array(ResourceAllocation(Resource(15), mockWorker1.ref, workerId1), ResourceAllocation(Resource(15), mockWorker2.ref, workerId2))))
//we have to manually update the resource on each worker
scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(65)), mockWorker1.ref)
scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, Resource(45)), mockWorker2.ref)
- val request4 = ResourceRequest(Resource(60), 0, Priority.NORMAL, Relaxation.ONEWORKER)
+ val request4 = ResourceRequest(Resource(60), WorkerId(0, 0L), Priority.NORMAL, Relaxation.ONEWORKER)
scheduler.tell(RequestResource(appId, request4), mockAppMaster.ref)
mockAppMaster.expectMsg(5 seconds, ResourceAllocated(Array(ResourceAllocation(Resource(60), mockWorker1.ref, workerId1))))
}
@@ -144,13 +145,13 @@ class PrioritySchedulerSpec(_system: ActorSystem) extends TestKit(_system) with
scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, Resource(100)), mockWorker2.ref)
//By default, the request requires only one executor
- val request2 = ResourceRequest(Resource(20))
+ val request2 = ResourceRequest(Resource(20), WorkerId.unspecified)
scheduler.tell(RequestResource(appId, request2), mockAppMaster.ref)
val allocations2 = mockAppMaster.receiveN(1).head.asInstanceOf[ResourceAllocated]
assert(allocations2.allocations.length == 1)
assert(allocations2.allocations.head.resource == Resource(20))
- val request3 = ResourceRequest(Resource(24), executorNum = 3)
+ val request3 = ResourceRequest(Resource(24), WorkerId.unspecified, executorNum = 3)
scheduler.tell(RequestResource(appId, request3), mockAppMaster.ref)
val allocations3 = mockAppMaster.receiveN(1).head.asInstanceOf[ResourceAllocated]
assert(allocations3.allocations.length == 3)
@@ -159,7 +160,7 @@ class PrioritySchedulerSpec(_system: ActorSystem) extends TestKit(_system) with
//The total available resource can not satisfy the requirements with executor number
scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(30)), mockWorker1.ref)
scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, Resource(30)), mockWorker2.ref)
- val request4 = ResourceRequest(Resource(60), executorNum = 3)
+ val request4 = ResourceRequest(Resource(60), WorkerId.unspecified, executorNum = 3)
scheduler.tell(RequestResource(appId, request4), mockAppMaster.ref)
val allocations4 = mockAppMaster.receiveN(1).head.asInstanceOf[ResourceAllocated]
assert(allocations4.allocations.length == 2)
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/daemon/src/test/scala/io/gearpump/cluster/worker/WorkerSpec.scala
----------------------------------------------------------------------
diff --git a/daemon/src/test/scala/io/gearpump/cluster/worker/WorkerSpec.scala b/daemon/src/test/scala/io/gearpump/cluster/worker/WorkerSpec.scala
index ae73a3b..9fc4096 100644
--- a/daemon/src/test/scala/io/gearpump/cluster/worker/WorkerSpec.scala
+++ b/daemon/src/test/scala/io/gearpump/cluster/worker/WorkerSpec.scala
@@ -20,6 +20,7 @@ package io.gearpump.cluster.worker
import akka.actor.{ActorSystem, PoisonPill, Props}
import akka.testkit.TestProbe
import com.typesafe.config.ConfigFactory
+import io.gearpump.WorkerId
import io.gearpump.cluster.WorkerToMaster.RegisterNewWorker
import io.gearpump.cluster.{TestUtil, MasterHarness}
import io.gearpump.util.{ActorSystemBooter, ActorUtil, Constants}
@@ -40,7 +41,7 @@ class WorkerSpec extends WordSpec with Matchers with BeforeAndAfterEach with Mas
override def config = TestUtil.DEFAULT_CONFIG
val appId = 1
- val workerId = 1
+ val workerId: WorkerId = WorkerId(1, 0L)
val executorId = 1
var masterProxy: TestProbe = null
var mockMaster: TestProbe = null
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/docs/dev-rest-api.md
----------------------------------------------------------------------
diff --git a/docs/dev-rest-api.md b/docs/dev-rest-api.md
index 7fc718a..7b2bb6f 100644
--- a/docs/dev-rest-api.md
+++ b/docs/dev-rest-api.md
@@ -165,7 +165,7 @@ Sample Response:
```
[
{
- "workerId": 1,
+ "workerId": "1",
"state": "active",
"actorPath": "akka.tcp://master@127.0.0.1:3000/user/Worker0",
"aliveFor": "431565",
@@ -188,7 +188,7 @@ Sample Response:
"jvmName": "11788@lisa"
},
{
- "workerId": 0,
+ "workerId": "0",
"state": "active",
"actorPath": "akka.tcp://master@127.0.0.1:3000/user/Worker1",
"aliveFor": "431546",
@@ -397,7 +397,7 @@ Sample Response:
```
{
- "workerId": 0,
+ "workerId": "0",
"state": "active",
"actorPath": "akka.tcp://master@127.0.0.1:3000/user/Worker1",
"aliveFor": "831069",
@@ -738,19 +738,19 @@ Sample Response:
{
"executorId": 0,
"executor": "akka.tcp://app1system0@127.0.0.1:52240/remote/akka.tcp/app1-executor-1@127.0.0.1:52212/user/daemon/appdaemon1/$c/appmaster/executors/0#-1554950276",
- "workerId": 1,
+ "workerId": "1",
"status": "active"
},
{
"executorId": 1,
"executor": "akka.tcp://app1system1@127.0.0.1:52241/remote/akka.tcp/app1-executor-1@127.0.0.1:52212/user/daemon/appdaemon1/$c/appmaster/executors/1#928082134",
- "workerId": 0,
+ "workerId": "0",
"status": "active"
},
{
"executorId": -1,
"executor": "akka://app1-executor-1/user/daemon/appdaemon1/$c/appmaster",
- "workerId": 1,
+ "workerId": "1",
"status": "active"
}
],
@@ -1087,7 +1087,7 @@ Sample Response:
```
{
"id": 1,
- "workerId": 0,
+ "workerId": "0",
"actorPath": "akka.tcp://app1system1@127.0.0.1:52241/remote/akka.tcp/app1-executor-1@127.0.0.1:52212/user/daemon/appdaemon1/$c/appmaster/executors/1",
"logFile": "logs/",
"status": "active",
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/examples/distributedshell/src/test/scala/io/gearpump/examples/distributedshell/DistShellAppMasterSpec.scala
----------------------------------------------------------------------
diff --git a/examples/distributedshell/src/test/scala/io/gearpump/examples/distributedshell/DistShellAppMasterSpec.scala b/examples/distributedshell/src/test/scala/io/gearpump/examples/distributedshell/DistShellAppMasterSpec.scala
index 2a19bd2..ec6bba1 100644
--- a/examples/distributedshell/src/test/scala/io/gearpump/examples/distributedshell/DistShellAppMasterSpec.scala
+++ b/examples/distributedshell/src/test/scala/io/gearpump/examples/distributedshell/DistShellAppMasterSpec.scala
@@ -19,6 +19,7 @@ package io.gearpump.examples.distributedshell
import akka.actor.ActorSystem
import akka.testkit.{TestActorRef, TestProbe}
+import io.gearpump.WorkerId
import io.gearpump.cluster.AppMasterToMaster.{RequestResource, GetAllWorkers, RegisterAppMaster}
import io.gearpump.cluster.AppMasterToWorker.LaunchExecutor
import io.gearpump.cluster.MasterToAppMaster.{ResourceAllocated, WorkerList, AppMasterRegistered}
@@ -37,7 +38,7 @@ class DistShellAppMasterSpec extends WordSpec with Matchers with BeforeAndAfter{
val appId = 0
val userName = "test"
val masterExecutorId = 0
- val workerList = List(1, 2, 3)
+ val workerList = List(WorkerId(1, 0L), WorkerId(2, 0L), WorkerId(3, 0L))
val resource = Resource(1)
val appJar = None
val appDescription = AppDescription("app0", classOf[DistShellAppMaster].getName, UserConfig.empty)
@@ -57,7 +58,7 @@ class DistShellAppMasterSpec extends WordSpec with Matchers with BeforeAndAfter{
workerList.foreach { workerId =>
mockMaster.expectMsg(RequestResource(appId, ResourceRequest(Resource(1), workerId, relaxation = Relaxation.SPECIFICWORKER)))
}
- mockMaster.reply(ResourceAllocated(Array(ResourceAllocation(resource, mockWorker1.ref, 1))))
+ mockMaster.reply(ResourceAllocated(Array(ResourceAllocation(resource, mockWorker1.ref, WorkerId(1, 0L)))))
mockWorker1.expectMsgClass(classOf[LaunchExecutor])
mockWorker1.reply(RegisterActorSystem(ActorUtil.getSystemAddress(system).toString))
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/examples/distributedshell/src/test/scala/io/gearpump/examples/distributedshell/ShellExecutorSpec.scala
----------------------------------------------------------------------
diff --git a/examples/distributedshell/src/test/scala/io/gearpump/examples/distributedshell/ShellExecutorSpec.scala b/examples/distributedshell/src/test/scala/io/gearpump/examples/distributedshell/ShellExecutorSpec.scala
index 6eceb16..d51880b 100644
--- a/examples/distributedshell/src/test/scala/io/gearpump/examples/distributedshell/ShellExecutorSpec.scala
+++ b/examples/distributedshell/src/test/scala/io/gearpump/examples/distributedshell/ShellExecutorSpec.scala
@@ -19,6 +19,7 @@ package io.gearpump.examples.distributedshell
import akka.actor.{ActorSystem, Props}
import akka.testkit.TestProbe
+import io.gearpump.WorkerId
import io.gearpump.examples.distributedshell.DistShellAppMaster.ShellCommandResult
import io.gearpump.cluster.appmaster.WorkerInfo
import io.gearpump.cluster.scheduler.Resource
@@ -34,7 +35,7 @@ class ShellExecutorSpec extends WordSpec with Matchers {
"ShellExecutor" should {
"execute the shell command and return the result" in {
val executorId = 1
- val workerId = 2
+ val workerId = WorkerId(2, 0L)
val appId = 0
val appName = "app"
val resource = Resource(1)
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/examples/distributeservice/src/test/scala/io/gearpump/experiments/distributeservice/DistServiceAppMasterSpec.scala
----------------------------------------------------------------------
diff --git a/examples/distributeservice/src/test/scala/io/gearpump/experiments/distributeservice/DistServiceAppMasterSpec.scala b/examples/distributeservice/src/test/scala/io/gearpump/experiments/distributeservice/DistServiceAppMasterSpec.scala
index da5001e..fcdbf14 100644
--- a/examples/distributeservice/src/test/scala/io/gearpump/experiments/distributeservice/DistServiceAppMasterSpec.scala
+++ b/examples/distributeservice/src/test/scala/io/gearpump/experiments/distributeservice/DistServiceAppMasterSpec.scala
@@ -19,6 +19,7 @@ package io.gearpump.experiments.distributeservice
import akka.actor.ActorSystem
import akka.testkit.{TestActorRef, TestProbe}
+import io.gearpump.WorkerId
import io.gearpump.cluster.AppMasterToMaster.{RequestResource, GetAllWorkers, RegisterAppMaster}
import io.gearpump.cluster.AppMasterToWorker.LaunchExecutor
import io.gearpump.cluster.MasterToAppMaster.{ResourceAllocated, WorkerList, AppMasterRegistered}
@@ -41,7 +42,7 @@ class DistServiceAppMasterSpec extends WordSpec with Matchers with BeforeAndAfte
val appId = 0
val userName = "test"
val masterExecutorId = 0
- val workerList = List(1, 2, 3)
+ val workerList = List(WorkerId(1, 0L), WorkerId(2, 0L), WorkerId(3, 0L))
val resource = Resource(1)
val appJar = None
val appDescription = AppDescription("app0", classOf[DistServiceAppMaster].getName, UserConfig.empty)
@@ -64,7 +65,7 @@ class DistServiceAppMasterSpec extends WordSpec with Matchers with BeforeAndAfte
workerList.foreach { workerId =>
mockMaster.expectMsg(RequestResource(appId, ResourceRequest(Resource(1), workerId, relaxation = Relaxation.SPECIFICWORKER)))
}
- mockMaster.reply(ResourceAllocated(Array(ResourceAllocation(resource, mockWorker1.ref, 1))))
+ mockMaster.reply(ResourceAllocated(Array(ResourceAllocation(resource, mockWorker1.ref, WorkerId(1, 0L)))))
mockWorker1.expectMsgClass(classOf[LaunchExecutor])
mockWorker1.reply(RegisterActorSystem(ActorUtil.getSystemAddress(system).toString))
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/RestServiceSpec.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/RestServiceSpec.scala b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/RestServiceSpec.scala
index 2830390..92ed3ec 100644
--- a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/RestServiceSpec.scala
+++ b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/RestServiceSpec.scala
@@ -183,7 +183,7 @@ class RestServiceSpec extends TestSpecBase {
"retrieve 1 master for a non-HA cluster" in {
// exercise
val masterSummary = restClient.queryMaster()
- masterSummary.cluster shouldEqual cluster.getMastersAddresses
+ masterSummary.cluster.map(_.toTuple) shouldEqual cluster.getMastersAddresses
masterSummary.aliveFor should be > 0L
masterSummary.masterStatus shouldEqual MasterStatus.Synced
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/BaseContainer.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/BaseContainer.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/BaseContainer.scala
index 1d7ddd9..9d7e39f 100644
--- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/BaseContainer.scala
+++ b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/BaseContainer.scala
@@ -25,7 +25,7 @@ import scala.sys.process._
* A helper to instantiate the base image for different usage.
*/
class BaseContainer(val host: String, command: String,
- masterAddrs: Seq[(String, Int)],
+ masterAddrs: List[(String, Int)],
tunnelPorts: Set[Int] = Set.empty) {
private val IMAGE_NAME = "stanleyxu2005/gearpump-launcher"
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/MiniCluster.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/MiniCluster.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/MiniCluster.scala
index 7c9a69a..c17a20d 100644
--- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/MiniCluster.scala
+++ b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/MiniCluster.scala
@@ -17,6 +17,7 @@
*/
package io.gearpump.integrationtest.minicluster
+import io.gearpump.cluster.master.MasterNode
import io.gearpump.integrationtest.{Docker, Util}
import org.apache.log4j.Logger
@@ -32,15 +33,15 @@ class MiniCluster {
private val REST_SERVICE_PORT = 8090
private val MASTER_PORT = 3000
- private val MASTER_ADDRS = {
+ private val MASTER_ADDRS: List[(String, Int)] = {
(0 to 0).map(index =>
("master" + index, MASTER_PORT)
- )
+ ).toList
}
- lazy val commandLineClient = new CommandLineClient(getMasterHosts.head)
+ lazy val commandLineClient: CommandLineClient = new CommandLineClient(getMasterHosts.head)
- lazy val restClient = {
+ lazy val restClient: RestClient = {
val client = new RestClient(getMasterHosts.head, REST_SERVICE_PORT)
client
}
@@ -129,16 +130,16 @@ class MiniCluster {
start()
}
- def getMastersAddresses = {
+ def getMastersAddresses: List[(String, Int)] = {
MASTER_ADDRS
}
- def getMasterHosts = {
+ def getMasterHosts: List[String] = {
MASTER_ADDRS.map({ case (host, port) => host })
}
- def getWorkerHosts = {
- workers
+ def getWorkerHosts: List[String] = {
+ workers.toList
}
def nodeIsOnline(host: String): Boolean = {
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/RestClient.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/RestClient.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/RestClient.scala
index f11ff9c..7d83c93 100644
--- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/RestClient.scala
+++ b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/RestClient.scala
@@ -18,6 +18,7 @@
package io.gearpump.integrationtest.minicluster
import com.typesafe.config.{Config, ConfigFactory}
+import io.gearpump.WorkerId
import io.gearpump.cluster.{AppJar, MasterToAppMaster}
import io.gearpump.cluster.MasterToAppMaster.{AppMasterData, AppMastersData}
import io.gearpump.cluster.MasterToClient.HistoryMetrics
@@ -36,6 +37,7 @@ import io.gearpump.util.{Constants, Graph}
import org.apache.log4j.Logger
import upickle.Js
import upickle.default._
+import io.gearpump.services.util.UpickleUtil._
/**
* A REST client to operate a Gearpump cluster
@@ -168,14 +170,14 @@ class RestClient(host: String, port: Int) {
ConfigFactory.parseString(resp)
}
- def queryWorkerMetrics(workerId: Int, current: Boolean): HistoryMetrics = {
+ def queryWorkerMetrics(workerId: WorkerId, current: Boolean): HistoryMetrics = {
val args = if (current) "?readLatest=true" else ""
- val resp = callApi(s"worker/$workerId/metrics/worker$workerId?$args")
+ val resp = callApi(s"worker/${WorkerId.render(workerId)}/metrics/worker$workerId?$args")
decodeAs[HistoryMetrics](resp)
}
- def queryWorkerConfig(workerId: Int): Config = {
- val resp = callApi(s"worker/$workerId/config")
+ def queryWorkerConfig(workerId: WorkerId): Config = {
+ val resp = callApi(s"worker/${WorkerId.render(workerId)}/config")
ConfigFactory.parseString(resp)
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/integrationtest/core/src/main/scala/io/gearpump/integrationtest/storm/StormClient.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/storm/StormClient.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/storm/StormClient.scala
index a018280..dbcc53d 100644
--- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/storm/StormClient.scala
+++ b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/storm/StormClient.scala
@@ -22,7 +22,7 @@ import backtype.storm.utils.{Utils, DRPCClient}
import io.gearpump.integrationtest.{Util, Docker}
import io.gearpump.integrationtest.minicluster.{RestClient, BaseContainer}
-class StormClient(masterAddrs: Seq[(String, Int)], restClient: RestClient) {
+class StormClient(masterAddrs: List[(String, Int)], restClient: RestClient) {
private val CONFIG_FILE = "/opt/gearpump/storm.yaml"
private val DRPC_HOST = "storm0"
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/project/Build.scala
----------------------------------------------------------------------
diff --git a/project/Build.scala b/project/Build.scala
index a99530f..b521e53 100644
--- a/project/Build.scala
+++ b/project/Build.scala
@@ -317,8 +317,8 @@ object Build extends sbt.Build {
checksums := Seq(""),
requiresDOM := true,
libraryDependencies ++= Seq(
- "com.lihaoyi" %%% "upickle" % upickleVersion,
- "com.lihaoyi" %%% "utest" % "0.3.1"
+// "com.lihaoyi" %%% "upickle" % upickleVersion,
+// "com.lihaoyi" %%% "utest" % "0.3.1"
),
scalaJSStage in Global := FastOptStage,
testFrameworks += new TestFramework("utest.runner.Framework"),
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/services/jvm/src/main/scala/io/gearpump/services/AdminService.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/main/scala/io/gearpump/services/AdminService.scala b/services/jvm/src/main/scala/io/gearpump/services/AdminService.scala
index b159a19..aa88d13 100644
--- a/services/jvm/src/main/scala/io/gearpump/services/AdminService.scala
+++ b/services/jvm/src/main/scala/io/gearpump/services/AdminService.scala
@@ -23,7 +23,7 @@ import akka.http.scaladsl.model._
import akka.http.scaladsl.server.Directives._
import akka.stream.{Materializer}
import io.gearpump.util.{Constants, Util}
-
+import io.gearpump.services.util.UpickleUtil._
/**
* AdminService is for cluster-wide managements. it is not related with
* specific application.
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/services/jvm/src/main/scala/io/gearpump/services/AppMasterService.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/main/scala/io/gearpump/services/AppMasterService.scala b/services/jvm/src/main/scala/io/gearpump/services/AppMasterService.scala
index 98ce0b1..30ec061 100644
--- a/services/jvm/src/main/scala/io/gearpump/services/AppMasterService.scala
+++ b/services/jvm/src/main/scala/io/gearpump/services/AppMasterService.scala
@@ -39,7 +39,7 @@ import io.gearpump.util.ActorUtil.{askActor, askAppMaster}
import io.gearpump.util.FileDirective._
import io.gearpump.util.{Constants, Util}
import upickle.default.{read, write}
-
+import io.gearpump.services.util.UpickleUtil._
import scala.util.{Failure, Success, Try}
/**
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/services/jvm/src/main/scala/io/gearpump/services/MasterService.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/main/scala/io/gearpump/services/MasterService.scala b/services/jvm/src/main/scala/io/gearpump/services/MasterService.scala
index 3e2b9ca..91f25fe 100644
--- a/services/jvm/src/main/scala/io/gearpump/services/MasterService.scala
+++ b/services/jvm/src/main/scala/io/gearpump/services/MasterService.scala
@@ -44,6 +44,7 @@ import io.gearpump.streaming.{ProcessorDescription, ProcessorId, StreamApplicati
import io.gearpump.util.ActorUtil._
import io.gearpump.util.FileDirective._
import io.gearpump.util.{Constants, Graph, Util}
+import io.gearpump.services.util.UpickleUtil._
import scala.collection.JavaConversions._
import scala.concurrent.Future
@@ -154,7 +155,6 @@ class MasterService(val master: ActorRef,
path("submitdag") {
post {
entity(as[String]) { request =>
- import io.gearpump.services.util.UpickleUtil._
val msg = java.net.URLDecoder.decode(request, "UTF-8")
val submitApplicationRequest = read[SubmitApplicationRequest](msg)
import submitApplicationRequest.{appName, dag, processors, userconfig}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/services/jvm/src/main/scala/io/gearpump/services/SecurityService.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/main/scala/io/gearpump/services/SecurityService.scala b/services/jvm/src/main/scala/io/gearpump/services/SecurityService.scala
index 8e03c43..a5bbe2f 100644
--- a/services/jvm/src/main/scala/io/gearpump/services/SecurityService.scala
+++ b/services/jvm/src/main/scala/io/gearpump/services/SecurityService.scala
@@ -34,10 +34,12 @@ import io.gearpump.services.SecurityService.{User, UserSession}
import io.gearpump.services.security.oauth2.OAuth2Authenticator
import io.gearpump.util.{Constants, LogUtil}
import upickle.default.{write}
+import io.gearpump.services.util.UpickleUtil._
import io.gearpump.security.{Authenticator => BaseAuthenticator}
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}
+
/**
* When user cannot be authenticated, will reject with 401 AuthenticationFailedRejection
* When user can be authenticated, but are not authorized to access certail resource, will
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/services/jvm/src/main/scala/io/gearpump/services/StaticService.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/main/scala/io/gearpump/services/StaticService.scala b/services/jvm/src/main/scala/io/gearpump/services/StaticService.scala
index b38e5cb..09180c3 100644
--- a/services/jvm/src/main/scala/io/gearpump/services/StaticService.scala
+++ b/services/jvm/src/main/scala/io/gearpump/services/StaticService.scala
@@ -23,6 +23,7 @@ import akka.http.scaladsl.model._
import akka.http.scaladsl.server.Directives._
import akka.stream.{Materializer}
import io.gearpump.util.{Constants, Util}
+import io.gearpump.services.util.UpickleUtil._
/**
* static resource files.
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/services/jvm/src/main/scala/io/gearpump/services/SupervisorService.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/main/scala/io/gearpump/services/SupervisorService.scala b/services/jvm/src/main/scala/io/gearpump/services/SupervisorService.scala
index 9ecd078..5d552d6 100644
--- a/services/jvm/src/main/scala/io/gearpump/services/SupervisorService.scala
+++ b/services/jvm/src/main/scala/io/gearpump/services/SupervisorService.scala
@@ -22,6 +22,7 @@ import akka.actor.{ActorRef, ActorSystem}
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import akka.stream.Materializer
+import io.gearpump.WorkerId
import io.gearpump.cluster.AppMasterToMaster.{GetWorkerData, WorkerData, GetAllWorkers}
import io.gearpump.cluster.ClientToMaster._
import io.gearpump.services.SupervisorService.{Path, Status}
@@ -30,6 +31,7 @@ import io.gearpump.util.ActorUtil._
import scala.concurrent.Future
import scala.util.{Failure, Success}
import upickle.default.{read, write}
+import io.gearpump.services.util.UpickleUtil._
class SupervisorService(val master: ActorRef, val supervisor: ActorRef, override val system: ActorSystem)
extends BasicService {
@@ -80,10 +82,10 @@ class SupervisorService(val master: ActorRef, val supervisor: ActorRef, override
}
}
} ~
- path("removeworker" / IntNumber) { workerId =>
+ path("removeworker" / Segment) { workerIdString =>
post {
authorize {
-
+ val workerId = WorkerId.parse(workerIdString)
def future(): Future[CommandResult] = {
askWorker[WorkerData](master, workerId, GetWorkerData(workerId)).flatMap{workerData =>
val containerId = workerData.workerDescription.resourceManagerContainerId
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/services/jvm/src/main/scala/io/gearpump/services/WorkerService.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/main/scala/io/gearpump/services/WorkerService.scala b/services/jvm/src/main/scala/io/gearpump/services/WorkerService.scala
index 5efce44..a1edbe4 100644
--- a/services/jvm/src/main/scala/io/gearpump/services/WorkerService.scala
+++ b/services/jvm/src/main/scala/io/gearpump/services/WorkerService.scala
@@ -21,12 +21,14 @@ package io.gearpump.services
import akka.actor.{ActorSystem, ActorRef}
import akka.http.scaladsl.server.Directives._
import akka.stream.{Materializer}
+import io.gearpump.WorkerId
import io.gearpump.cluster.AppMasterToMaster.{GetWorkerData, WorkerData}
import io.gearpump.cluster.ClientToMaster.{ReadOption, QueryHistoryMetrics, QueryWorkerConfig}
import io.gearpump.cluster.ClusterConfig
import io.gearpump.cluster.MasterToClient.{HistoryMetrics, WorkerConfig}
import io.gearpump.util.ActorUtil._
import io.gearpump.util.Constants
+import io.gearpump.services.util.UpickleUtil._
import scala.util.{Failure, Success}
@@ -37,8 +39,9 @@ class WorkerService(val master: ActorRef, override val system: ActorSystem)
private val systemConfig = system.settings.config
private val concise = systemConfig.getBoolean(Constants.GEARPUMP_SERVICE_RENDER_CONFIG_CONCISE)
- override def doRoute(implicit mat: Materializer) = pathPrefix("worker" / IntNumber) { workerId =>
+ override def doRoute(implicit mat: Materializer) = pathPrefix("worker" / Segment) { workerIdString =>
pathEnd {
+ val workerId = WorkerId.parse(workerIdString)
onComplete(askWorker[WorkerData](master, workerId, GetWorkerData(workerId))) {
case Success(value: WorkerData) =>
complete(write(value.workerDescription))
@@ -46,6 +49,7 @@ class WorkerService(val master: ActorRef, override val system: ActorSystem)
}
} ~
path("config") {
+ val workerId = WorkerId.parse(workerIdString)
onComplete(askWorker[WorkerConfig](master, workerId, QueryWorkerConfig(workerId))) {
case Success(value: WorkerConfig) =>
val config = Option(value.config).map(ClusterConfig.render(_, concise)).getOrElse("{}")
@@ -55,6 +59,7 @@ class WorkerService(val master: ActorRef, override val system: ActorSystem)
}
} ~
path("metrics" / RestPath ) { path =>
+ val workerId = WorkerId.parse(workerIdString)
parameter(ReadOption.Key ? ReadOption.ReadLatest) { readOption =>
val query = QueryHistoryMetrics(path.head.toString, readOption)
onComplete(askWorker[HistoryMetrics](master, workerId, query)) {
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/services/jvm/src/main/scala/io/gearpump/services/util/UpickleUtil.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/main/scala/io/gearpump/services/util/UpickleUtil.scala b/services/jvm/src/main/scala/io/gearpump/services/util/UpickleUtil.scala
index 8ca2c65..0ae4505 100644
--- a/services/jvm/src/main/scala/io/gearpump/services/util/UpickleUtil.scala
+++ b/services/jvm/src/main/scala/io/gearpump/services/util/UpickleUtil.scala
@@ -18,6 +18,7 @@
package io.gearpump.services.util
+import io.gearpump.WorkerId
import io.gearpump.util.Graph
import upickle.Js
@@ -31,4 +32,14 @@ object UpickleUtil {
val edgeList = upickle.default.readJs[List[(Int, String, Int)]](edges._2)
Graph(vertexList, edgeList)
}
+
+ implicit val workerIdReader: upickle.default.Reader[WorkerId] = upickle.default.Reader[WorkerId] {
+ case Js.Str(str) =>
+ WorkerId.parse(str)
+ }
+
+ implicit val workerIdWriter: upickle.default.Writer[WorkerId] = upickle.default.Writer[WorkerId] {
+ case workerId: WorkerId =>
+ Js.Str(WorkerId.render(workerId))
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/services/jvm/src/test/scala/io/gearpump/services/AdminServiceSpec.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/test/scala/io/gearpump/services/AdminServiceSpec.scala b/services/jvm/src/test/scala/io/gearpump/services/AdminServiceSpec.scala
index d4ea2b7..738bbad 100644
--- a/services/jvm/src/test/scala/io/gearpump/services/AdminServiceSpec.scala
+++ b/services/jvm/src/test/scala/io/gearpump/services/AdminServiceSpec.scala
@@ -23,7 +23,7 @@ import io.gearpump.cluster.TestUtil
import akka.http.scaladsl.testkit.{ScalatestRouteTest, RouteTestTimeout}
import com.typesafe.config.{Config, ConfigFactory}
import org.scalatest.{BeforeAndAfterAll, Matchers, FlatSpec}
-
+import io.gearpump.services.util.UpickleUtil._
import scala.concurrent.duration._
import scala.util.Try
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/services/jvm/src/test/scala/io/gearpump/services/AppMasterServiceSpec.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/test/scala/io/gearpump/services/AppMasterServiceSpec.scala b/services/jvm/src/test/scala/io/gearpump/services/AppMasterServiceSpec.scala
index 3cfe162..7b98cb6 100644
--- a/services/jvm/src/test/scala/io/gearpump/services/AppMasterServiceSpec.scala
+++ b/services/jvm/src/test/scala/io/gearpump/services/AppMasterServiceSpec.scala
@@ -37,7 +37,7 @@ import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
import org.slf4j.Logger
import upickle.default.read
import akka.http.scaladsl.testkit.ScalatestRouteTest
-
+import io.gearpump.services.util.UpickleUtil._
import scala.concurrent.duration._
import scala.util.{Success, Try}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/services/jvm/src/test/scala/io/gearpump/services/MasterServiceSpec.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/test/scala/io/gearpump/services/MasterServiceSpec.scala b/services/jvm/src/test/scala/io/gearpump/services/MasterServiceSpec.scala
index 3a1c4fe..1cfc01a 100644
--- a/services/jvm/src/test/scala/io/gearpump/services/MasterServiceSpec.scala
+++ b/services/jvm/src/test/scala/io/gearpump/services/MasterServiceSpec.scala
@@ -28,6 +28,7 @@ import akka.stream.scaladsl.Source
import akka.testkit.TestActor.{AutoPilot, KeepRunning}
import akka.testkit.TestProbe
import com.typesafe.config.{Config, ConfigFactory}
+import io.gearpump.WorkerId
import io.gearpump.cluster.AppMasterToMaster.{GetAllWorkers, GetMasterData, GetWorkerData, MasterData, WorkerData}
import io.gearpump.cluster.ClientToMaster.{QueryHistoryMetrics, QueryMasterConfig, ResolveWorkerId, SubmitApplication}
import io.gearpump.cluster.MasterToAppMaster.{AppMasterData, AppMastersData, AppMastersDataRequest, WorkerList}
@@ -44,6 +45,7 @@ import scala.concurrent.{Future, ExecutionContext}
import scala.concurrent.duration._
import scala.util.{Success, Try}
import akka.stream.scaladsl.FileIO
+import io.gearpump.services.util.UpickleUtil._
class MasterServiceSpec extends FlatSpec with ScalatestRouteTest with
Matchers with BeforeAndAfterAll {
@@ -87,9 +89,9 @@ class MasterServiceSpec extends FlatSpec with ScalatestRouteTest with
sender ! AppMastersData(List.empty[AppMasterData])
KeepRunning
case GetAllWorkers =>
- sender ! WorkerList(List(0))
+ sender ! WorkerList(List(WorkerId(0, 0L)))
KeepRunning
- case ResolveWorkerId(0) =>
+ case ResolveWorkerId(WorkerId(0, 0L)) =>
sender ! ResolveWorkerIdResult(Success(mockWorker.ref))
KeepRunning
case QueryHistoryMetrics(path, _, _, _) =>
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/services/jvm/src/test/scala/io/gearpump/services/SecurityServiceSpec.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/test/scala/io/gearpump/services/SecurityServiceSpec.scala b/services/jvm/src/test/scala/io/gearpump/services/SecurityServiceSpec.scala
index c276286..10b45b9 100644
--- a/services/jvm/src/test/scala/io/gearpump/services/SecurityServiceSpec.scala
+++ b/services/jvm/src/test/scala/io/gearpump/services/SecurityServiceSpec.scala
@@ -21,6 +21,7 @@ package io.gearpump.services
import akka.http.scaladsl.testkit.{RouteTestTimeout}
import com.typesafe.config.Config
import io.gearpump.cluster.TestUtil
+import io.gearpump.services.util.UpickleUtil._
import org.scalatest.{BeforeAndAfterAll, Matchers, FlatSpec}
import akka.actor.{ActorSystem}
import akka.http.scaladsl.server._
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/services/jvm/src/test/scala/io/gearpump/services/StaticServiceSpec.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/test/scala/io/gearpump/services/StaticServiceSpec.scala b/services/jvm/src/test/scala/io/gearpump/services/StaticServiceSpec.scala
index 2ab62e8..501776e 100644
--- a/services/jvm/src/test/scala/io/gearpump/services/StaticServiceSpec.scala
+++ b/services/jvm/src/test/scala/io/gearpump/services/StaticServiceSpec.scala
@@ -24,6 +24,7 @@ import akka.testkit.TestProbe
import com.typesafe.config.{Config, ConfigFactory}
import io.gearpump.cluster.TestUtil
import io.gearpump.util.Constants
+import io.gearpump.services.util.UpickleUtil._
import org.scalatest.{BeforeAndAfterAll, Matchers, FlatSpec}
import scala.util.Try
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/services/jvm/src/test/scala/io/gearpump/services/WorkerServiceSpec.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/test/scala/io/gearpump/services/WorkerServiceSpec.scala b/services/jvm/src/test/scala/io/gearpump/services/WorkerServiceSpec.scala
index e9256a4..dc8d5a7 100644
--- a/services/jvm/src/test/scala/io/gearpump/services/WorkerServiceSpec.scala
+++ b/services/jvm/src/test/scala/io/gearpump/services/WorkerServiceSpec.scala
@@ -23,12 +23,14 @@ import akka.http.scaladsl.model.headers.`Cache-Control`
import akka.testkit.TestActor.{AutoPilot, KeepRunning}
import akka.testkit.{TestKit, TestProbe}
import com.typesafe.config.{Config, ConfigFactory}
+import io.gearpump.WorkerId
import io.gearpump.cluster.AppMasterToMaster.{GetWorkerData, WorkerData}
import io.gearpump.cluster.ClientToMaster.{QueryHistoryMetrics, QueryWorkerConfig, ResolveWorkerId}
import io.gearpump.cluster.MasterToClient.{HistoryMetrics, HistoryMetricsItem, ResolveWorkerIdResult, WorkerConfig}
import io.gearpump.cluster.TestUtil
import io.gearpump.cluster.worker.WorkerSummary
import io.gearpump.jarstore.JarStoreService
+import io.gearpump.services.util.UpickleUtil._
import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
import scala.concurrent.duration._
@@ -52,10 +54,10 @@ class WorkerServiceSpec extends FlatSpec with ScalatestRouteTest with Matchers
mockWorker.setAutoPilot {
new AutoPilot {
def run(sender: ActorRef, msg: Any) = msg match {
- case GetWorkerData(appId) =>
+ case GetWorkerData(workerId) =>
sender ! WorkerData(WorkerSummary.empty)
KeepRunning
- case QueryWorkerConfig(appId) =>
+ case QueryWorkerConfig(workerId) =>
sender ! WorkerConfig(null)
KeepRunning
case QueryHistoryMetrics(path, _, _, _) =>
@@ -79,7 +81,7 @@ class WorkerServiceSpec extends FlatSpec with ScalatestRouteTest with Matchers
"ConfigQueryService" should "return config for worker" in {
implicit val customTimeout = RouteTestTimeout(15.seconds)
- (Get(s"/api/$REST_VERSION/worker/1/config") ~> workerRoute) ~> check{
+ (Get(s"/api/$REST_VERSION/worker/${WorkerId.render(WorkerId(0, 0L))}/config") ~> workerRoute) ~> check{
val responseBody = responseAs[String]
val config = Try(ConfigFactory.parseString(responseBody))
assert(config.isSuccess)
@@ -88,7 +90,7 @@ class WorkerServiceSpec extends FlatSpec with ScalatestRouteTest with Matchers
it should "return WorkerData" in {
implicit val customTimeout = RouteTestTimeout(15.seconds)
- (Get(s"/api/$REST_VERSION/worker/1") ~> workerRoute) ~> check{
+ (Get(s"/api/$REST_VERSION/worker/${WorkerId.render(WorkerId(1, 0L))}") ~> workerRoute) ~> check{
val responseBody = responseAs[String]
val config = Try(ConfigFactory.parseString(responseBody))
assert(config.isSuccess)
@@ -102,7 +104,7 @@ class WorkerServiceSpec extends FlatSpec with ScalatestRouteTest with Matchers
"MetricsQueryService" should "return history metrics" in {
implicit val customTimeout = RouteTestTimeout(15.seconds)
- (Get(s"/api/$REST_VERSION/worker/0/metrics/worker") ~> workerRoute) ~> check {
+ (Get(s"/api/$REST_VERSION/worker/${WorkerId.render(WorkerId(0, 0L))}/metrics/worker") ~> workerRoute) ~> check {
val responseBody = responseAs[String]
val config = Try(ConfigFactory.parseString(responseBody))
assert(config.isSuccess)
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/streaming/src/main/scala/io/gearpump/streaming/appmaster/AppMaster.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/appmaster/AppMaster.scala b/streaming/src/main/scala/io/gearpump/streaming/appmaster/AppMaster.scala
index fe90c6a..6fcc47c 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/appmaster/AppMaster.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/appmaster/AppMaster.scala
@@ -82,7 +82,8 @@ class AppMaster(appContext : AppMasterContext, app : AppDescription) extends Ap
private val systemConfig = context.system.settings.config
private var lastFailure = LastFailure(0L, null)
- private val appMasterBrief = ExecutorBrief(APPMASTER_DEFAULT_EXECUTOR_ID, self.path.toString, Option(appContext.workerInfo).map(_.workerId).getOrElse(-1), "active")
+ private val appMasterBrief = ExecutorBrief(APPMASTER_DEFAULT_EXECUTOR_ID,
+ self.path.toString, Option(appContext.workerInfo).map(_.workerId).getOrElse(WorkerId.unspecified), "active")
private val getHistoryMetricsConfig = HistoryMetricsConfig(systemConfig)
@@ -93,7 +94,7 @@ class AppMaster(appContext : AppMasterContext, app : AppDescription) extends Ap
private val appMasterExecutorSummary = ExecutorSummary(
APPMASTER_DEFAULT_EXECUTOR_ID,
- Option(appContext.workerInfo).map(_.workerId).getOrElse(-1),
+ Option(appContext.workerInfo).map(_.workerId).getOrElse(WorkerId.unspecified),
self.path.toString,
logFile.getAbsolutePath,
status = "Active",
@@ -334,6 +335,6 @@ object AppMaster {
class ServiceNotAvailableException(reason: String) extends Exception(reason)
- case class ExecutorBrief(executorId: ExecutorId, executor: String, workerId: Int, status: String)
+ case class ExecutorBrief(executorId: ExecutorId, executor: String, workerId: WorkerId, status: String)
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/streaming/src/main/scala/io/gearpump/streaming/appmaster/ExecutorManager.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/appmaster/ExecutorManager.scala b/streaming/src/main/scala/io/gearpump/streaming/appmaster/ExecutorManager.scala
index 526fbc3..ebe6c11 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/appmaster/ExecutorManager.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/appmaster/ExecutorManager.scala
@@ -22,6 +22,7 @@ import akka.actor.SupervisorStrategy.Stop
import akka.actor._
import akka.remote.RemoteScope
import com.typesafe.config.Config
+import io.gearpump.WorkerId
import io.gearpump.cluster.AppMasterToWorker.ChangeExecutorResource
import io.gearpump.cluster.appmaster.ExecutorSystemScheduler.{ExecutorSystemJvmConfig, ExecutorSystemStarted, StartExecutorSystemTimeout, StartExecutorSystems}
import io.gearpump.cluster.appmaster.WorkerInfo
@@ -167,7 +168,7 @@ private[appmaster] object ExecutorManager {
case object GetExecutorInfo
- case class ExecutorStarted(executorId: Int, resource: Resource, workerId: Int, boundedJar: Option[AppJar])
+ case class ExecutorStarted(executorId: Int, resource: Resource, workerId: WorkerId, boundedJar: Option[AppJar])
case class ExecutorStopped(executorId: Int)
case class SetTaskManager(taskManager: ActorRef)
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/streaming/src/main/scala/io/gearpump/streaming/appmaster/JarScheduler.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/appmaster/JarScheduler.scala b/streaming/src/main/scala/io/gearpump/streaming/appmaster/JarScheduler.scala
index fa2e444..e769a0d 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/appmaster/JarScheduler.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/appmaster/JarScheduler.scala
@@ -19,7 +19,7 @@ package io.gearpump.streaming.appmaster
import akka.actor._
import com.typesafe.config.Config
-import io.gearpump.TimeStamp
+import io.gearpump.{WorkerId, TimeStamp}
import io.gearpump.streaming.task.{StartClock, TaskId}
import io.gearpump.streaming.{ProcessorDescription, DAG}
import io.gearpump.cluster.AppJar
@@ -52,7 +52,7 @@ class JarScheduler(appId : Int, appName: String, config: Config, factory: ActorR
(actor ? GetRequestDetails).asInstanceOf[Future[Array[ResourceRequestDetail]]]
}
- def scheduleTask(appJar: AppJar, workerId: Int, executorId: Int, resource: Resource): Future[List[TaskId]] = {
+ def scheduleTask(appJar: AppJar, workerId: WorkerId, executorId: Int, resource: Resource): Future[List[TaskId]] = {
(actor ? ScheduleTask(appJar, workerId, executorId, resource)).asInstanceOf[Future[List[TaskId]]]
}
@@ -71,7 +71,7 @@ object JarScheduler{
case object GetRequestDetails
- case class ScheduleTask(appJar: AppJar, workerId: Int, executorId: Int, resource: Resource)
+ case class ScheduleTask(appJar: AppJar, workerId: WorkerId, executorId: Int, resource: Resource)
case class ExecutorFailed(executorId: Int)
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskLocator.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskLocator.scala b/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskLocator.scala
index 10ce15b..c456a06 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskLocator.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskLocator.scala
@@ -19,6 +19,7 @@ package io.gearpump.streaming.appmaster
import com.typesafe.config.{ConfigValueFactory, ConfigFactory, ConfigRenderOptions, Config}
import TaskLocator.{Localities, WorkerLocality, NonLocality, Locality}
+import io.gearpump.WorkerId
import io.gearpump.streaming.Constants
import io.gearpump.streaming.task.TaskId
import scala.util.Try
@@ -56,12 +57,10 @@ object TaskLocator {
trait Locality
- case class WorkerLocality(workerId: Int) extends Locality
+ case class WorkerLocality(workerId: WorkerId) extends Locality
object NonLocality extends Locality
- type WorkerId = Int
-
case class Localities(localities: Map[WorkerId, Array[TaskId]])
object Localities {
@@ -70,7 +69,7 @@ object TaskLocator {
def fromJson(json: String): Localities = {
val localities = ConfigFactory.parseString(json).getAnyRef("localities")
.asInstanceOf[java.util.Map[String, String]].asScala.map { pair =>
- val workerId: WorkerId = pair._1.toInt
+ val workerId: WorkerId = WorkerId.parse(pair._1)
val tasks = pair._2.split(",").map { task =>
val pattern(processorId, taskIndex) = task
TaskId(processorId.toInt, taskIndex.toInt)
@@ -82,7 +81,7 @@ object TaskLocator {
def toJson(localities: Localities): String = {
val map = localities.localities.toList.map {pair =>
- (pair._1.toString, pair._2.map(task => s"task_${task.processorId}_${task.index}").mkString(","))
+ (WorkerId.render(pair._1), pair._2.map(task => s"task_${task.processorId}_${task.index}").mkString(","))
}.toMap.asJava
ConfigFactory.empty().withValue("localities", ConfigValueFactory.fromAnyRef(map)).
root.render(ConfigRenderOptions.concise())