You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by hu...@apache.org on 2016/04/26 11:42:25 UTC

[15/49] incubator-gearpump git commit: GEARPUMP-8, fix "two machines can possibly have same worker id for single-master cluster"

GEARPUMP-8, fix "two machines can possibly have same worker id for single-master cluster"


Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/e7a7f542
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/e7a7f542
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/e7a7f542

Branch: refs/heads/master
Commit: e7a7f54272a263eebb156510abf88616088dff34
Parents: 02597f7
Author: Sean Zhong <cl...@gmail.com>
Authored: Sat Apr 2 12:34:19 2016 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Tue Apr 26 14:24:43 2016 +0800

----------------------------------------------------------------------
 .gitignore                                      |  1 +
 .sbtopts                                        |  1 +
 conf/gear.conf                                  |  2 +-
 .../io/gearpump/cluster/ClusterMessage.scala    | 10 ++---
 .../cluster/appmaster/ExecutorSystem.scala      |  3 +-
 .../appmaster/ExecutorSystemScheduler.scala     |  3 +-
 .../cluster/master/AppMasterLauncher.scala      |  6 +--
 .../gearpump/cluster/master/MasterSummary.scala |  6 ++-
 .../gearpump/cluster/scheduler/Resource.scala   |  5 ++-
 .../gearpump/cluster/worker/WorkerSummary.scala |  5 ++-
 core/src/main/scala/io/gearpump/package.scala   | 44 ++++++++++++++++++++
 .../main/scala/io/gearpump/util/ActorUtil.scala |  3 +-
 .../appmaster/ExecutorSystemLauncherSpec.scala  |  3 +-
 .../appmaster/ExecutorSystemSchedulerSpec.scala |  5 ++-
 .../cluster/master/AppMasterLauncherSpec.scala  |  9 ++--
 .../io/gearpump/cluster/DaemonMessage.scala     |  7 ++--
 .../io/gearpump/cluster/master/Master.scala     | 14 +++++--
 .../cluster/scheduler/PriorityScheduler.scala   |  3 +-
 .../gearpump/cluster/scheduler/Scheduler.scala  |  4 +-
 .../io/gearpump/cluster/worker/Worker.scala     |  3 +-
 .../io/gearpump/cluster/main/MainSpec.scala     | 11 -----
 .../scheduler/PrioritySchedulerSpec.scala       | 29 ++++++-------
 .../io/gearpump/cluster/worker/WorkerSpec.scala |  3 +-
 docs/dev-rest-api.md                            | 14 +++----
 .../DistShellAppMasterSpec.scala                |  5 ++-
 .../distributedshell/ShellExecutorSpec.scala    |  3 +-
 .../DistServiceAppMasterSpec.scala              |  5 ++-
 .../checklist/RestServiceSpec.scala             |  2 +-
 .../minicluster/BaseContainer.scala             |  2 +-
 .../minicluster/MiniCluster.scala               | 17 ++++----
 .../minicluster/RestClient.scala                | 10 +++--
 .../integrationtest/storm/StormClient.scala     |  2 +-
 project/Build.scala                             |  4 +-
 .../io/gearpump/services/AdminService.scala     |  2 +-
 .../io/gearpump/services/AppMasterService.scala |  2 +-
 .../io/gearpump/services/MasterService.scala    |  2 +-
 .../io/gearpump/services/SecurityService.scala  |  2 +
 .../io/gearpump/services/StaticService.scala    |  1 +
 .../gearpump/services/SupervisorService.scala   |  6 ++-
 .../io/gearpump/services/WorkerService.scala    |  7 +++-
 .../io/gearpump/services/util/UpickleUtil.scala | 11 +++++
 .../io/gearpump/services/AdminServiceSpec.scala |  2 +-
 .../services/AppMasterServiceSpec.scala         |  2 +-
 .../gearpump/services/MasterServiceSpec.scala   |  6 ++-
 .../gearpump/services/SecurityServiceSpec.scala |  1 +
 .../gearpump/services/StaticServiceSpec.scala   |  1 +
 .../gearpump/services/WorkerServiceSpec.scala   | 12 +++---
 .../streaming/appmaster/AppMaster.scala         |  7 ++--
 .../streaming/appmaster/ExecutorManager.scala   |  3 +-
 .../streaming/appmaster/JarScheduler.scala      |  6 +--
 .../streaming/appmaster/TaskLocator.scala       |  9 ++--
 .../streaming/appmaster/TaskSchedulerImpl.scala | 20 ++++-----
 .../gearpump/streaming/executor/Executor.scala  |  5 ++-
 .../streaming/appmaster/AppMasterSpec.scala     |  8 ++--
 .../appmaster/ExecutorManagerSpec.scala         |  6 +--
 .../streaming/appmaster/JarSchedulerSpec.scala  |  5 ++-
 .../streaming/appmaster/TaskLocatorSpec.scala   |  3 +-
 .../streaming/appmaster/TaskManagerSpec.scala   |  6 +--
 .../streaming/appmaster/TaskSchedulerSpec.scala | 26 ++++++------
 .../streaming/executor/ExecutorSpec.scala       |  3 +-
 60 files changed, 251 insertions(+), 157 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 0932561..1efaafc 100644
--- a/.gitignore
+++ b/.gitignore
@@ -33,6 +33,7 @@ out/
 .idea_modules/
 atlassian-ide-plugin.xml
 com_crashlytics_export_strings.xml
+intellij
 
 # Eclipse
 .metadata

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/.sbtopts
----------------------------------------------------------------------
diff --git a/.sbtopts b/.sbtopts
index 161b349..f6f24bf 100644
--- a/.sbtopts
+++ b/.sbtopts
@@ -1,3 +1,4 @@
 -J-XX:+CMSClassUnloadingEnabled
 -J-XX:+UseConcMarkSweepGC
 -J-Xss6M
+-J-XX:MaxMetaspaceSize=512m

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/conf/gear.conf
----------------------------------------------------------------------
diff --git a/conf/gear.conf b/conf/gear.conf
index e7f14af..c2352fe 100644
--- a/conf/gear.conf
+++ b/conf/gear.conf
@@ -547,4 +547,4 @@ gearpump-linux {
   ### On windows, the value must be larger than 10ms, check
   ### https://github.com/akka/akka/blob/master/akka-actor/src/main/scala/akka/actor/Scheduler.scala#L204
   akka.scheduler.tick-duration = 1
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/core/src/main/scala/io/gearpump/cluster/ClusterMessage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/cluster/ClusterMessage.scala b/core/src/main/scala/io/gearpump/cluster/ClusterMessage.scala
index 2280920..aac45ab 100644
--- a/core/src/main/scala/io/gearpump/cluster/ClusterMessage.scala
+++ b/core/src/main/scala/io/gearpump/cluster/ClusterMessage.scala
@@ -20,7 +20,7 @@ package io.gearpump.cluster
 
 import akka.actor.ActorRef
 import com.typesafe.config.Config
-import io.gearpump.TimeStamp
+import io.gearpump.{WorkerId, TimeStamp}
 import io.gearpump.cluster.MasterToAppMaster.AppMasterStatus
 import io.gearpump.cluster.master.MasterSummary
 import io.gearpump.cluster.scheduler.{Resource, ResourceAllocation, ResourceRequest}
@@ -53,7 +53,7 @@ object ClientToMaster {
   case class ShutdownApplication(appId: Int)
   case class ResolveAppId(appId: Int)
 
-  case class ResolveWorkerId(workerId: Int)
+  case class ResolveWorkerId(workerId: WorkerId)
 
   case object GetJarStoreServer
 
@@ -61,7 +61,7 @@ object ClientToMaster {
 
   case class QueryAppMasterConfig(appId: Int)
 
-  case class QueryWorkerConfig(workerId: Int)
+  case class QueryWorkerConfig(workerId: WorkerId)
 
   case object QueryMasterConfig
 
@@ -144,7 +144,7 @@ object AppMasterToMaster {
     extends AppMasterSummary
 
   case object GetAllWorkers
-  case class GetWorkerData(workerId: Int)
+  case class GetWorkerData(workerId: WorkerId)
   case class WorkerData(workerDescription: WorkerSummary)
 
   case object GetMasterData
@@ -181,7 +181,7 @@ object MasterToAppMaster {
 
   case class ReplayFromTimestampWindowTrailingEdge(appId: Int)
 
-  case class WorkerList(workers: List[Int])
+  case class WorkerList(workers: List[WorkerId])
 }
 
 object AppMasterToWorker {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystem.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystem.scala b/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystem.scala
index 94f7aa4..400b61c 100644
--- a/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystem.scala
+++ b/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystem.scala
@@ -19,10 +19,11 @@
 package io.gearpump.cluster.appmaster
 
 import akka.actor.{ActorRef, Address, PoisonPill}
+import io.gearpump.WorkerId
 import io.gearpump.cluster.scheduler.Resource
 import io.gearpump.util.ActorSystemBooter.BindLifeCycle
 
-case class WorkerInfo(workerId: Int, ref: ActorRef)
+case class WorkerInfo(workerId: WorkerId, ref: ActorRef)
 
 /**
  * This contains JVM configurations to start an executor system

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystemScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystemScheduler.scala b/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystemScheduler.scala
index 6af68eb..aa980b5 100644
--- a/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystemScheduler.scala
+++ b/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystemScheduler.scala
@@ -20,6 +20,7 @@ package io.gearpump.cluster.appmaster
 
 import akka.actor._
 import com.typesafe.config.Config
+import io.gearpump.WorkerId
 import io.gearpump.cluster.AppMasterToMaster.RequestResource
 import io.gearpump.cluster.MasterToAppMaster.ResourceAllocated
 import io.gearpump.cluster._
@@ -105,7 +106,7 @@ class ExecutorSystemScheduler (appId: Int, masterProxy: ActorRef,
     case LaunchExecutorSystemRejected(resource, reason, session) =>
       if (isSessionAlive(session)) {
         LOG.error(s"Failed to launch executor system, due to $reason, will ask master to allocate new resources $resource")
-        resourceAgents.get(session).map(_ ! RequestResource(appId, ResourceRequest(resource)))
+        resourceAgents.get(session).map(_ ! RequestResource(appId, ResourceRequest(resource, WorkerId.unspecified)))
       }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/core/src/main/scala/io/gearpump/cluster/master/AppMasterLauncher.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/cluster/master/AppMasterLauncher.scala b/core/src/main/scala/io/gearpump/cluster/master/AppMasterLauncher.scala
index ffd1835..d391bba 100644
--- a/core/src/main/scala/io/gearpump/cluster/master/AppMasterLauncher.scala
+++ b/core/src/main/scala/io/gearpump/cluster/master/AppMasterLauncher.scala
@@ -40,7 +40,7 @@ import org.slf4j.Logger
 import scala.collection.JavaConverters._
 import scala.concurrent.duration.Duration
 import scala.util.{Failure, Success}
-
+import io.gearpump.WorkerId
 /**
  *
  * AppMasterLauncher is a child Actor of AppManager, it is responsible
@@ -59,7 +59,7 @@ class AppMasterLauncher(
   val appMasterAkkaConfig: Config = app.clusterConfig
 
   LOG.info(s"Ask Master resource to start AppMaster $appId...")
-  master ! RequestResource(appId, ResourceRequest(Resource(1)))
+  master ! RequestResource(appId, ResourceRequest(Resource(1), WorkerId.unspecified))
 
   def receive : Receive = waitForResourceAllocation
 
@@ -91,7 +91,7 @@ class AppMasterLauncher(
     case ExecutorLaunchRejected(reason, ex) =>
       LOG.error(s"Executor Launch failed reason:$reason", ex)
       LOG.info(s"reallocate resource $resource to start appmaster")
-      master ! RequestResource(appId, ResourceRequest(resource))
+      master ! RequestResource(appId, ResourceRequest(resource, WorkerId.unspecified))
       context.become(waitForResourceAllocation)
     case RegisterActorSystem(systemPath) =>
       LOG.info(s"Received RegisterActorSystem $systemPath for AppMaster")

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/core/src/main/scala/io/gearpump/cluster/master/MasterSummary.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/cluster/master/MasterSummary.scala b/core/src/main/scala/io/gearpump/cluster/master/MasterSummary.scala
index 39e4a41..8847df2 100644
--- a/core/src/main/scala/io/gearpump/cluster/master/MasterSummary.scala
+++ b/core/src/main/scala/io/gearpump/cluster/master/MasterSummary.scala
@@ -26,7 +26,11 @@ object MasterStatus {
 }
 
 
-case class MasterNode(host: String, port: Int)
+case class MasterNode(host: String, port: Int) {
+  def toTuple: (String, Int) = {
+    (host, port)
+  }
+}
 
 /**
  * Master information for REST API call

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/core/src/main/scala/io/gearpump/cluster/scheduler/Resource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/cluster/scheduler/Resource.scala b/core/src/main/scala/io/gearpump/cluster/scheduler/Resource.scala
index 94c7532..da17829 100644
--- a/core/src/main/scala/io/gearpump/cluster/scheduler/Resource.scala
+++ b/core/src/main/scala/io/gearpump/cluster/scheduler/Resource.scala
@@ -18,6 +18,7 @@
 package io.gearpump.cluster.scheduler
 
 import akka.actor.ActorRef
+import io.gearpump.WorkerId
 
 case class Resource(slots : Int) {
   def +(other : Resource): Resource = Resource(slots + other.slots)
@@ -51,9 +52,9 @@ object Relaxation extends Enumeration{
 
 import Relaxation._
 import Priority._
-case class ResourceRequest(resource: Resource,  workerId: Int = 0, priority: Priority = NORMAL, relaxation: Relaxation = ANY, executorNum: Int = 1)
+case class ResourceRequest(resource: Resource,  workerId: WorkerId, priority: Priority = NORMAL, relaxation: Relaxation = ANY, executorNum: Int = 1)
 
-case class ResourceAllocation(resource : Resource, worker : ActorRef, workerId : Int)
+case class ResourceAllocation(resource : Resource, worker : ActorRef, workerId : WorkerId)
 
 object Resource {
   def empty = new Resource(0)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/core/src/main/scala/io/gearpump/cluster/worker/WorkerSummary.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/cluster/worker/WorkerSummary.scala b/core/src/main/scala/io/gearpump/cluster/worker/WorkerSummary.scala
index b1f9e6f..ca700dc 100644
--- a/core/src/main/scala/io/gearpump/cluster/worker/WorkerSummary.scala
+++ b/core/src/main/scala/io/gearpump/cluster/worker/WorkerSummary.scala
@@ -18,13 +18,14 @@
 package io.gearpump.cluster.worker
 
 import akka.actor.ActorRef
+import io.gearpump.WorkerId
 import io.gearpump.util.HistoryMetricsService.HistoryMetricsConfig
 
 /**
  * Worker summary information for REST API.
  */
 case class WorkerSummary(
-  workerId: Int,
+  workerId: WorkerId,
   state: String,
   actorPath: String,
   aliveFor: Long,
@@ -39,7 +40,7 @@ case class WorkerSummary(
   historyMetricsConfig: HistoryMetricsConfig = null)
 
 object WorkerSummary{
-  def empty = WorkerSummary(-1, "", "", 0L, "", Array.empty[ExecutorSlots], 0, 0, "", jvmName = "", resourceManagerContainerId = "")
+  def empty = WorkerSummary(WorkerId.unspecified, "", "", 0L, "", Array.empty[ExecutorSlots], 0, 0, "", jvmName = "", resourceManagerContainerId = "")
 }
 
 case class ExecutorSlots(appId: Int, executorId: Int, slots: Int)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/core/src/main/scala/io/gearpump/package.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/package.scala b/core/src/main/scala/io/gearpump/package.scala
index 05130a9..1ed94a7 100644
--- a/core/src/main/scala/io/gearpump/package.scala
+++ b/core/src/main/scala/io/gearpump/package.scala
@@ -3,4 +3,48 @@ package io
 package object gearpump {
   type TimeStamp = Long
   val LatestTime = -1
+
+  /**
+   * WorkerId is used to uniquely track a worker machine.
+   *
+   * @param sessionId sessionId is assigned by Master node for easy tracking. It is possible that
+   *                  sessionId is **NOT** unique, so always use WorkerId for comparison.
+   * @param registerTime the timestamp at which the worker node registered itself with the master node
+   */
+  case class WorkerId(sessionId: Int, registerTime: Long)
+
+  object WorkerId {
+    val unspecified: WorkerId = new WorkerId(-1, 0L)
+
+    def render(workerId: WorkerId): String = {
+      workerId.registerTime + "_" + workerId.sessionId
+    }
+
+    def parse(str: String): WorkerId = {
+      val pair = str.split("_")
+      new WorkerId(pair(1).toInt, pair(0).toLong)
+    }
+
+    implicit val workerIdOrdering: Ordering[WorkerId] = {
+      new Ordering[WorkerId] {
+
+        /** Compare registerTime first, then sessionId as the tie-breaker */
+        override def compare(x: WorkerId, y: WorkerId): Int = {
+          if (x.registerTime < y.registerTime) {
+            -1
+          } else if (x.registerTime == y.registerTime) {
+            if (x.sessionId < y.sessionId) {
+              -1
+            } else if (x.sessionId == y.sessionId) {
+              0
+            } else {
+              1
+            }
+          } else {
+            1
+          }
+        }
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/core/src/main/scala/io/gearpump/util/ActorUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/util/ActorUtil.scala b/core/src/main/scala/io/gearpump/util/ActorUtil.scala
index 8233b28..cc701d8 100644
--- a/core/src/main/scala/io/gearpump/util/ActorUtil.scala
+++ b/core/src/main/scala/io/gearpump/util/ActorUtil.scala
@@ -21,6 +21,7 @@ package io.gearpump.util
 import akka.actor.Actor.Receive
 import akka.actor._
 import akka.pattern.ask
+import io.gearpump.WorkerId
 import io.gearpump.cluster.AppMasterToMaster.GetAllWorkers
 import io.gearpump.cluster.ClientToMaster.{ResolveAppId, ResolveWorkerId}
 import io.gearpump.cluster.MasterToAppMaster.WorkerList
@@ -110,7 +111,7 @@ object ActorUtil {
     appmaster.flatMap(askActor[T](_, msg))
   }
 
-  def askWorker[T](master: ActorRef, workerId: Int, msg: Any)(implicit ex: ExecutionContext): Future[T] = {
+  def askWorker[T](master: ActorRef, workerId: WorkerId, msg: Any)(implicit ex: ExecutionContext): Future[T] = {
     implicit val timeout = Constants.FUTURE_TIMEOUT
     val worker =  askActor[ResolveWorkerIdResult](master, ResolveWorkerId(workerId)).flatMap { result =>
       if (result.worker.isSuccess) {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/core/src/test/scala/io/gearpump/cluster/appmaster/ExecutorSystemLauncherSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/gearpump/cluster/appmaster/ExecutorSystemLauncherSpec.scala b/core/src/test/scala/io/gearpump/cluster/appmaster/ExecutorSystemLauncherSpec.scala
index d0fded1..dc8e624 100644
--- a/core/src/test/scala/io/gearpump/cluster/appmaster/ExecutorSystemLauncherSpec.scala
+++ b/core/src/test/scala/io/gearpump/cluster/appmaster/ExecutorSystemLauncherSpec.scala
@@ -21,6 +21,7 @@ package io.gearpump.cluster.appmaster
 import akka.actor.{ActorSystem, Props}
 import akka.testkit.TestProbe
 import com.typesafe.config.ConfigValueFactory
+import io.gearpump.WorkerId
 import io.gearpump.cluster.AppMasterToWorker.LaunchExecutor
 import io.gearpump.cluster.AppMasterToWorker.LaunchExecutor
 import io.gearpump.cluster.TestUtil
@@ -36,7 +37,7 @@ import scala.concurrent.duration._
 
 class ExecutorSystemLauncherSpec  extends FlatSpec with Matchers with BeforeAndAfterAll {
   implicit var system: ActorSystem = null
-  val workerId = 0
+  val workerId: WorkerId = WorkerId(0, 0L)
   val appId = 0
   val executorId = 0
   val url = "akka.tcp://worker@127.0.0.1:3000"

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/core/src/test/scala/io/gearpump/cluster/appmaster/ExecutorSystemSchedulerSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/gearpump/cluster/appmaster/ExecutorSystemSchedulerSpec.scala b/core/src/test/scala/io/gearpump/cluster/appmaster/ExecutorSystemSchedulerSpec.scala
index d487582..106d389 100644
--- a/core/src/test/scala/io/gearpump/cluster/appmaster/ExecutorSystemSchedulerSpec.scala
+++ b/core/src/test/scala/io/gearpump/cluster/appmaster/ExecutorSystemSchedulerSpec.scala
@@ -20,6 +20,7 @@ package io.gearpump.cluster.appmaster
 
 import akka.actor.{Actor, ActorSystem, Props}
 import akka.testkit.TestProbe
+import io.gearpump.WorkerId
 import io.gearpump.cluster.AppMasterToMaster.RequestResource
 import io.gearpump.cluster.MasterToAppMaster.ResourceAllocated
 import io.gearpump.cluster.{ExecutorJVMConfig, AppJar, TestUtil}
@@ -34,9 +35,9 @@ import scala.concurrent.duration._
 
 class ExecutorSystemSchedulerSpec extends FlatSpec with Matchers with BeforeAndAfterEach {
   val appId = 0
-  val workerId = 0
+  val workerId = WorkerId(0, 0L)
   val resource = Resource(1)
-  val resourceRequest = ResourceRequest(resource)
+  val resourceRequest = ResourceRequest(resource, WorkerId.unspecified)
   val mockJar = AppJar("for_test", FilePath("PATH"))
   val emptyJvmConfig = ExecutorSystemJvmConfig(Array.empty, Array.empty, Some(mockJar), "")
   val start = StartExecutorSystems(Array(resourceRequest), emptyJvmConfig)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/core/src/test/scala/io/gearpump/cluster/master/AppMasterLauncherSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/gearpump/cluster/master/AppMasterLauncherSpec.scala b/core/src/test/scala/io/gearpump/cluster/master/AppMasterLauncherSpec.scala
index 8db0e0b..8ff2ea1 100644
--- a/core/src/test/scala/io/gearpump/cluster/master/AppMasterLauncherSpec.scala
+++ b/core/src/test/scala/io/gearpump/cluster/master/AppMasterLauncherSpec.scala
@@ -20,6 +20,7 @@ package io.gearpump.cluster.master
 
 import akka.actor._
 import akka.testkit.TestProbe
+import io.gearpump.WorkerId
 import io.gearpump.cluster.AppMasterToMaster.RequestResource
 import io.gearpump.cluster.AppMasterToWorker.{ShutdownExecutor, LaunchExecutor}
 import io.gearpump.cluster.MasterToClient.SubmitApplicationResult
@@ -53,8 +54,8 @@ class AppMasterLauncherSpec extends FlatSpec with Matchers with BeforeAndAfterEa
     appMasterLauncher = getActorSystem.actorOf(AppMasterLauncher.props(appId, executorId,
       TestUtil.dummyApp, None, "username", master.ref, Some(client.ref)))
     watcher watch appMasterLauncher
-    master.expectMsg(RequestResource(appId, ResourceRequest(Resource(1))))
-    val resource = ResourceAllocated(Array(ResourceAllocation(Resource(1), worker.ref, 0)))
+    master.expectMsg(RequestResource(appId, ResourceRequest(Resource(1), WorkerId.unspecified)))
+    val resource = ResourceAllocated(Array(ResourceAllocation(Resource(1), worker.ref, WorkerId(0, 0L))))
     master.reply(resource)
     worker.expectMsgType[LaunchExecutor]
   }
@@ -76,9 +77,9 @@ class AppMasterLauncherSpec extends FlatSpec with Matchers with BeforeAndAfterEa
 
   "AppMasterLauncher" should "reallocate resource if executor launch rejected" in {
     worker.reply(ExecutorLaunchRejected(""))
-    master.expectMsg(RequestResource(appId, ResourceRequest(Resource(1))))
+    master.expectMsg(RequestResource(appId, ResourceRequest(Resource(1), WorkerId.unspecified)))
 
-    val resource = ResourceAllocated(Array(ResourceAllocation(Resource(1), worker.ref, 0)))
+    val resource = ResourceAllocated(Array(ResourceAllocation(Resource(1), worker.ref, WorkerId(0, 0L))))
     master.reply(resource)
     worker.expectMsgType[LaunchExecutor]
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/daemon/src/main/scala/io/gearpump/cluster/DaemonMessage.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/io/gearpump/cluster/DaemonMessage.scala b/daemon/src/main/scala/io/gearpump/cluster/DaemonMessage.scala
index 19ac620..ac942ed 100644
--- a/daemon/src/main/scala/io/gearpump/cluster/DaemonMessage.scala
+++ b/daemon/src/main/scala/io/gearpump/cluster/DaemonMessage.scala
@@ -18,6 +18,7 @@
 package io.gearpump.cluster
 
 import akka.actor.ActorRef
+import io.gearpump.WorkerId
 import io.gearpump.cluster.master.Master.MasterInfo
 import io.gearpump.cluster.scheduler.Resource
 
@@ -26,12 +27,12 @@ import io.gearpump.cluster.scheduler.Resource
  */
 object WorkerToMaster {
   case object RegisterNewWorker
-  case class RegisterWorker(workerId: Int)
-  case class ResourceUpdate(worker: ActorRef, workerId: Int, resource: Resource)
+  case class RegisterWorker(workerId: WorkerId)
+  case class ResourceUpdate(worker: ActorRef, workerId: WorkerId, resource: Resource)
 }
 
 object MasterToWorker {
-  case class WorkerRegistered(workerId : Int, masterInfo: MasterInfo)
+  case class WorkerRegistered(workerId : WorkerId, masterInfo: MasterInfo)
   case class UpdateResourceFailed(reason : String = null, ex: Throwable = null)
   case object UpdateResourceSucceed
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/daemon/src/main/scala/io/gearpump/cluster/master/Master.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/io/gearpump/cluster/master/Master.scala b/daemon/src/main/scala/io/gearpump/cluster/master/Master.scala
index 8db6758..fdef42e 100644
--- a/daemon/src/main/scala/io/gearpump/cluster/master/Master.scala
+++ b/daemon/src/main/scala/io/gearpump/cluster/master/Master.scala
@@ -40,6 +40,8 @@ import io.gearpump.transport.HostPort
 import io.gearpump.util.Constants._
 import io.gearpump.util.HistoryMetricsService.HistoryMetricsConfig
 import io.gearpump.util._
+import io.gearpump.WorkerId
+
 import org.apache.commons.lang.exception.ExceptionUtils
 import org.slf4j.Logger
 
@@ -64,7 +66,7 @@ private[cluster] class Master extends Actor with Stash {
 
   private var scheduler : ActorRef = null
 
-  private var workers = new immutable.HashMap[ActorRef, Int]
+  private var workers = new immutable.HashMap[ActorRef, WorkerId]
 
   private val birth = System.currentTimeMillis()
 
@@ -109,6 +111,8 @@ private[cluster] class Master extends Actor with Stash {
     case GetKVSuccess(_, result) =>
       if(result != null) {
         this.nextWorkerId = result.asInstanceOf[Int]
+      } else {
+        LOG.warn("Cannot find existing state in the distributed cluster...")
       }
       context.become(receiveHandler)
       unstashAll()
@@ -132,9 +136,11 @@ private[cluster] class Master extends Actor with Stash {
 
   def workerMsgHandler : Receive = {
     case RegisterNewWorker =>
-      val workerId = nextWorkerId
+      val workerId = WorkerId(nextWorkerId, System.currentTimeMillis())
       nextWorkerId += 1
       kvService ! PutKV(MASTER_GROUP, WORKER_ID, nextWorkerId)
+      val workerHostname = ActorUtil.getHostname(sender())
+      LOG.info(s"Register new from $workerHostname ....")
       self forward RegisterWorker(workerId)
 
     case RegisterWorker(id) =>
@@ -143,7 +149,7 @@ private[cluster] class Master extends Actor with Stash {
       scheduler forward WorkerRegistered(id, MasterInfo(self, birth))
       workers += (sender() -> id)
       val workerHostname = ActorUtil.getHostname(sender())
-      LOG.info(s"Register Worker $id from $workerHostname ....")
+      LOG.info(s"Register Worker with id $id from $workerHostname ....")
     case resourceUpdate : ResourceUpdate =>
       scheduler forward resourceUpdate
   }
@@ -290,7 +296,7 @@ object Master {
 
   final val WORKER_ID = "next_worker_id"
 
-  case class WorkerTerminated(workerId: Int)
+  case class WorkerTerminated(workerId: WorkerId)
 
   case class MasterInfo(master: ActorRef, startTime : Long = 0L)
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/daemon/src/main/scala/io/gearpump/cluster/scheduler/PriorityScheduler.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/io/gearpump/cluster/scheduler/PriorityScheduler.scala b/daemon/src/main/scala/io/gearpump/cluster/scheduler/PriorityScheduler.scala
index 10e9dcb..3b1bd9f 100644
--- a/daemon/src/main/scala/io/gearpump/cluster/scheduler/PriorityScheduler.scala
+++ b/daemon/src/main/scala/io/gearpump/cluster/scheduler/PriorityScheduler.scala
@@ -19,6 +19,7 @@
 package io.gearpump.cluster.scheduler
 
 import akka.actor.ActorRef
+import io.gearpump.WorkerId
 import io.gearpump.cluster.AppMasterToMaster.RequestResource
 import io.gearpump.cluster.MasterToAppMaster.ResourceAllocated
 import io.gearpump.cluster.scheduler.Relaxation._
@@ -104,7 +105,7 @@ class PriorityScheduler extends Scheduler {
     resourceRequests = resourceRequests.filter(_.appId != appId)
   }
 
-  private def allocateFairly(resources: mutable.HashMap[Int, (ActorRef, Resource)], request: ResourceRequest): List[ResourceAllocation] = {
+  private def allocateFairly(resources: mutable.HashMap[WorkerId, (ActorRef, Resource)], request: ResourceRequest): List[ResourceAllocation] = {
     val workerNum = resources.size
     var allocations = List.empty[ResourceAllocation]
     var totalAvailable = Resource(resources.values.map(_._2.slots).sum)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/daemon/src/main/scala/io/gearpump/cluster/scheduler/Scheduler.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/io/gearpump/cluster/scheduler/Scheduler.scala b/daemon/src/main/scala/io/gearpump/cluster/scheduler/Scheduler.scala
index 36b900f..8ccf1fb 100644
--- a/daemon/src/main/scala/io/gearpump/cluster/scheduler/Scheduler.scala
+++ b/daemon/src/main/scala/io/gearpump/cluster/scheduler/Scheduler.scala
@@ -20,7 +20,7 @@ package io.gearpump.cluster.scheduler
 import akka.actor.{Actor, ActorRef}
 import io.gearpump.cluster.MasterToWorker.UpdateResourceSucceed
 import io.gearpump.util.LogUtil
-import io.gearpump.TimeStamp
+import io.gearpump.{WorkerId, TimeStamp}
 import io.gearpump.cluster.MasterToWorker.{UpdateResourceFailed, UpdateResourceSucceed, WorkerRegistered}
 import io.gearpump.cluster.WorkerToMaster.ResourceUpdate
 import io.gearpump.cluster.master.Master.WorkerTerminated
@@ -35,7 +35,7 @@ import scala.collection.mutable
  */
 abstract class Scheduler extends Actor{
   val LOG: Logger = LogUtil.getLogger(getClass)
-  protected var resources = new mutable.HashMap[Int, (ActorRef, Resource)]
+  protected var resources = new mutable.HashMap[WorkerId, (ActorRef, Resource)]
 
   def handleScheduleMessage : Receive = {
     case WorkerRegistered(id, _) =>

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/daemon/src/main/scala/io/gearpump/cluster/worker/Worker.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/io/gearpump/cluster/worker/Worker.scala b/daemon/src/main/scala/io/gearpump/cluster/worker/Worker.scala
index 8957dff..a746b39 100644
--- a/daemon/src/main/scala/io/gearpump/cluster/worker/Worker.scala
+++ b/daemon/src/main/scala/io/gearpump/cluster/worker/Worker.scala
@@ -26,6 +26,7 @@ import java.util.concurrent.{Executors, TimeUnit}
 import akka.actor.SupervisorStrategy.Stop
 import akka.actor._
 import com.typesafe.config.{Config, ConfigFactory}
+import io.gearpump.WorkerId
 import io.gearpump.cluster.AppMasterToMaster.{GetWorkerData, WorkerData}
 import io.gearpump.cluster.AppMasterToWorker._
 import io.gearpump.cluster.ClientToMaster.{QueryHistoryMetrics, QueryWorkerConfig}
@@ -63,7 +64,7 @@ private[cluster] class Worker(masterProxy : ActorRef) extends Actor with TimeOut
   private var resource = Resource.empty
   private var allocatedResources = Map[ActorRef, Resource]()
   private var executorsInfo = Map[ActorRef, ExecutorSlots]()
-  private var id = -1
+  private var id: WorkerId = WorkerId.unspecified
   private val createdTime = System.currentTimeMillis()
   private var masterInfo: MasterInfo = null
   private var executorNameToActor = Map.empty[String, ActorRef]

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/daemon/src/test/scala/io/gearpump/cluster/main/MainSpec.scala
----------------------------------------------------------------------
diff --git a/daemon/src/test/scala/io/gearpump/cluster/main/MainSpec.scala b/daemon/src/test/scala/io/gearpump/cluster/main/MainSpec.scala
index d412020..9dc0289 100644
--- a/daemon/src/test/scala/io/gearpump/cluster/main/MainSpec.scala
+++ b/daemon/src/test/scala/io/gearpump/cluster/main/MainSpec.scala
@@ -17,23 +17,12 @@
  */
 package io.gearpump.cluster.main
 
-import java.util.concurrent.TimeUnit
-
-import akka.testkit.TestProbe
-import io.gearpump.cluster.ClientToMaster.ResolveAppId
 import io.gearpump.cluster.MasterToAppMaster.{ReplayFromTimestampWindowTrailingEdge, AppMastersDataRequest}
-import io.gearpump.cluster.TestUtil
-import io.gearpump.cluster.main.{Local, Replay, Kill, Worker}
-import io.gearpump.cluster.master.MasterProxy
-import io.gearpump.util.{Constants, Util}
 import io.gearpump.cluster.ClientToMaster.{ResolveAppId, ShutdownApplication}
 import io.gearpump.cluster.MasterToAppMaster._
 import io.gearpump.cluster.MasterToClient.{ResolveAppIdResult, ReplayApplicationResult, ShutdownApplicationResult}
-import io.gearpump.cluster.MasterToWorker.WorkerRegistered
 import io.gearpump.cluster.WorkerToMaster.RegisterNewWorker
-import io.gearpump.cluster.master.MasterProxy
 import io.gearpump.cluster.{MasterHarness, TestUtil}
-import io.gearpump.transport.HostPort
 import io.gearpump.util.Constants._
 import io.gearpump.util.{Constants, LogUtil, Util}
 import org.scalatest._

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/daemon/src/test/scala/io/gearpump/cluster/scheduler/PrioritySchedulerSpec.scala
----------------------------------------------------------------------
diff --git a/daemon/src/test/scala/io/gearpump/cluster/scheduler/PrioritySchedulerSpec.scala b/daemon/src/test/scala/io/gearpump/cluster/scheduler/PrioritySchedulerSpec.scala
index 0a7e1f8..aa2028b 100644
--- a/daemon/src/test/scala/io/gearpump/cluster/scheduler/PrioritySchedulerSpec.scala
+++ b/daemon/src/test/scala/io/gearpump/cluster/scheduler/PrioritySchedulerSpec.scala
@@ -19,6 +19,7 @@ package io.gearpump.cluster.scheduler
 
 import akka.actor.{ActorSystem, Props}
 import akka.testkit.{ImplicitSender, TestKit, TestProbe}
+import io.gearpump.WorkerId
 import io.gearpump.cluster.AppMasterToMaster.RequestResource
 import io.gearpump.cluster.MasterToAppMaster.ResourceAllocated
 import io.gearpump.cluster.MasterToWorker.{UpdateResourceFailed, WorkerRegistered}
@@ -36,8 +37,8 @@ class PrioritySchedulerSpec(_system: ActorSystem) extends TestKit(_system) with
 
   def this() = this(ActorSystem("PrioritySchedulerSpec",  TestUtil.DEFAULT_CONFIG))
   val appId = 0
-  val workerId1 = 1
-  val workerId2 = 2
+  val workerId1: WorkerId = WorkerId(1, 0L)
+  val workerId2: WorkerId = WorkerId(2, 0L)
   val mockAppMaster = TestProbe()
   val mockWorker1 = TestProbe()
   val mockWorker2 = TestProbe()
@@ -55,8 +56,8 @@ class PrioritySchedulerSpec(_system: ActorSystem) extends TestKit(_system) with
 
     "drop application's resource requests when the application is removed" in {
       val scheduler = system.actorOf(Props(classOf[PriorityScheduler]))
-      val request1 = ResourceRequest(Resource(40), 0, Priority.HIGH, Relaxation.ANY)
-      val request2 = ResourceRequest(Resource(20), 0, Priority.HIGH, Relaxation.ANY)
+      val request1 = ResourceRequest(Resource(40), WorkerId.unspecified, Priority.HIGH, Relaxation.ANY)
+      val request2 = ResourceRequest(Resource(20), WorkerId.unspecified, Priority.HIGH, Relaxation.ANY)
       scheduler.tell(RequestResource(appId, request1), mockAppMaster.ref)
       scheduler.tell(RequestResource(appId, request2), mockAppMaster.ref)
       scheduler.tell(ApplicationFinished(appId), mockAppMaster.ref)
@@ -69,9 +70,9 @@ class PrioritySchedulerSpec(_system: ActorSystem) extends TestKit(_system) with
   "The resource request with higher priority" should {
     "be handled first" in {
       val scheduler = system.actorOf(Props(classOf[PriorityScheduler]))
-      val request1 = ResourceRequest(Resource(40), 0, Priority.LOW, Relaxation.ANY)
-      val request2 = ResourceRequest(Resource(20), 0, Priority.NORMAL, Relaxation.ANY)
-      val request3 = ResourceRequest(Resource(30), 0, Priority.HIGH, Relaxation.ANY)
+      val request1 = ResourceRequest(Resource(40), WorkerId.unspecified, Priority.LOW, Relaxation.ANY)
+      val request2 = ResourceRequest(Resource(20), WorkerId.unspecified, Priority.NORMAL, Relaxation.ANY)
+      val request3 = ResourceRequest(Resource(30), WorkerId.unspecified, Priority.HIGH, Relaxation.ANY)
 
       scheduler.tell(RequestResource(appId, request1), mockAppMaster.ref)
       scheduler.tell(RequestResource(appId, request1), mockAppMaster.ref)
@@ -94,8 +95,8 @@ class PrioritySchedulerSpec(_system: ActorSystem) extends TestKit(_system) with
   "The resource request which delivered earlier" should {
     "be handled first if the priorities are the same" in {
       val scheduler = system.actorOf(Props(classOf[PriorityScheduler]))
-      val request1 = ResourceRequest(Resource(40), 0, Priority.HIGH, Relaxation.ANY)
-      val request2 = ResourceRequest(Resource(20), 0, Priority.HIGH, Relaxation.ANY)
+      val request1 = ResourceRequest(Resource(40), WorkerId.unspecified, Priority.HIGH, Relaxation.ANY)
+      val request2 = ResourceRequest(Resource(20), WorkerId.unspecified, Priority.HIGH, Relaxation.ANY)
       scheduler.tell(RequestResource(appId, request1), mockAppMaster.ref)
       scheduler.tell(RequestResource(appId, request2), mockAppMaster.ref)
       scheduler.tell(WorkerRegistered(workerId1, MasterInfo.empty), mockWorker1.ref)
@@ -122,14 +123,14 @@ class PrioritySchedulerSpec(_system: ActorSystem) extends TestKit(_system) with
       scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, Resource(100)), mockWorker2.ref)
       mockAppMaster.expectMsg(5 seconds, ResourceAllocated(Array(ResourceAllocation(Resource(40), mockWorker2.ref, workerId2))))
 
-      val request3 = ResourceRequest(Resource(30), 0, Priority.NORMAL, Relaxation.ANY, executorNum = 2)
+      val request3 = ResourceRequest(Resource(30), WorkerId.unspecified, Priority.NORMAL, Relaxation.ANY, executorNum = 2)
       scheduler.tell(RequestResource(appId, request3), mockAppMaster.ref)
       mockAppMaster.expectMsg(5 seconds, ResourceAllocated(Array(ResourceAllocation(Resource(15), mockWorker1.ref, workerId1), ResourceAllocation(Resource(15), mockWorker2.ref, workerId2))))
 
       //we have to manually update the resource on each worker
       scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(65)), mockWorker1.ref)
       scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, Resource(45)), mockWorker2.ref)
-      val request4 = ResourceRequest(Resource(60), 0, Priority.NORMAL, Relaxation.ONEWORKER)
+      val request4 = ResourceRequest(Resource(60), WorkerId(0, 0L), Priority.NORMAL, Relaxation.ONEWORKER)
       scheduler.tell(RequestResource(appId, request4), mockAppMaster.ref)
       mockAppMaster.expectMsg(5 seconds, ResourceAllocated(Array(ResourceAllocation(Resource(60), mockWorker1.ref, workerId1))))
     }
@@ -144,13 +145,13 @@ class PrioritySchedulerSpec(_system: ActorSystem) extends TestKit(_system) with
       scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, Resource(100)), mockWorker2.ref)
 
       //By default, the request requires only one executor
-      val request2 = ResourceRequest(Resource(20))
+      val request2 = ResourceRequest(Resource(20), WorkerId.unspecified)
       scheduler.tell(RequestResource(appId, request2), mockAppMaster.ref)
       val allocations2 = mockAppMaster.receiveN(1).head.asInstanceOf[ResourceAllocated]
       assert(allocations2.allocations.length == 1)
       assert(allocations2.allocations.head.resource == Resource(20))
 
-      val request3 = ResourceRequest(Resource(24), executorNum = 3)
+      val request3 = ResourceRequest(Resource(24), WorkerId.unspecified, executorNum = 3)
       scheduler.tell(RequestResource(appId, request3), mockAppMaster.ref)
       val allocations3 = mockAppMaster.receiveN(1).head.asInstanceOf[ResourceAllocated]
       assert(allocations3.allocations.length == 3)
@@ -159,7 +160,7 @@ class PrioritySchedulerSpec(_system: ActorSystem) extends TestKit(_system) with
       //The total available resource can not satisfy the requirements with executor number
       scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(30)), mockWorker1.ref)
       scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, Resource(30)), mockWorker2.ref)
-      val request4 = ResourceRequest(Resource(60), executorNum = 3)
+      val request4 = ResourceRequest(Resource(60), WorkerId.unspecified, executorNum = 3)
       scheduler.tell(RequestResource(appId, request4), mockAppMaster.ref)
       val allocations4 = mockAppMaster.receiveN(1).head.asInstanceOf[ResourceAllocated]
       assert(allocations4.allocations.length == 2)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/daemon/src/test/scala/io/gearpump/cluster/worker/WorkerSpec.scala
----------------------------------------------------------------------
diff --git a/daemon/src/test/scala/io/gearpump/cluster/worker/WorkerSpec.scala b/daemon/src/test/scala/io/gearpump/cluster/worker/WorkerSpec.scala
index ae73a3b..9fc4096 100644
--- a/daemon/src/test/scala/io/gearpump/cluster/worker/WorkerSpec.scala
+++ b/daemon/src/test/scala/io/gearpump/cluster/worker/WorkerSpec.scala
@@ -20,6 +20,7 @@ package io.gearpump.cluster.worker
 import akka.actor.{ActorSystem, PoisonPill, Props}
 import akka.testkit.TestProbe
 import com.typesafe.config.ConfigFactory
+import io.gearpump.WorkerId
 import io.gearpump.cluster.WorkerToMaster.RegisterNewWorker
 import io.gearpump.cluster.{TestUtil, MasterHarness}
 import io.gearpump.util.{ActorSystemBooter, ActorUtil, Constants}
@@ -40,7 +41,7 @@ class WorkerSpec extends WordSpec with Matchers with BeforeAndAfterEach with Mas
   override def config = TestUtil.DEFAULT_CONFIG
 
   val appId = 1
-  val workerId = 1
+  val workerId: WorkerId = WorkerId(1, 0L)
   val executorId = 1
   var masterProxy: TestProbe = null
   var mockMaster: TestProbe = null

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/docs/dev-rest-api.md
----------------------------------------------------------------------
diff --git a/docs/dev-rest-api.md b/docs/dev-rest-api.md
index 7fc718a..7b2bb6f 100644
--- a/docs/dev-rest-api.md
+++ b/docs/dev-rest-api.md
@@ -165,7 +165,7 @@ Sample Response:
 ```
 [
   {
-    "workerId": 1,
+    "workerId": "1",
     "state": "active",
     "actorPath": "akka.tcp://master@127.0.0.1:3000/user/Worker0",
     "aliveFor": "431565",
@@ -188,7 +188,7 @@ Sample Response:
     "jvmName": "11788@lisa"
   },
   {
-    "workerId": 0,
+    "workerId": "0",
     "state": "active",
     "actorPath": "akka.tcp://master@127.0.0.1:3000/user/Worker1",
     "aliveFor": "431546",
@@ -397,7 +397,7 @@ Sample Response:
 
 ```
 {
-  "workerId": 0,
+  "workerId": "0",
   "state": "active",
   "actorPath": "akka.tcp://master@127.0.0.1:3000/user/Worker1",
   "aliveFor": "831069",
@@ -738,19 +738,19 @@ Sample Response:
     {
       "executorId": 0,
       "executor": "akka.tcp://app1system0@127.0.0.1:52240/remote/akka.tcp/app1-executor-1@127.0.0.1:52212/user/daemon/appdaemon1/$c/appmaster/executors/0#-1554950276",
-      "workerId": 1,
+      "workerId": "1",
       "status": "active"
     },
     {
       "executorId": 1,
       "executor": "akka.tcp://app1system1@127.0.0.1:52241/remote/akka.tcp/app1-executor-1@127.0.0.1:52212/user/daemon/appdaemon1/$c/appmaster/executors/1#928082134",
-      "workerId": 0,
+      "workerId": "0",
       "status": "active"
     },
     {
       "executorId": -1,
       "executor": "akka://app1-executor-1/user/daemon/appdaemon1/$c/appmaster",
-      "workerId": 1,
+      "workerId": "1",
       "status": "active"
     }
   ],
@@ -1087,7 +1087,7 @@ Sample Response:
 ```
 {
   "id": 1,
-  "workerId": 0,
+  "workerId": "0",
   "actorPath": "akka.tcp://app1system1@127.0.0.1:52241/remote/akka.tcp/app1-executor-1@127.0.0.1:52212/user/daemon/appdaemon1/$c/appmaster/executors/1",
   "logFile": "logs/",
   "status": "active",

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/examples/distributedshell/src/test/scala/io/gearpump/examples/distributedshell/DistShellAppMasterSpec.scala
----------------------------------------------------------------------
diff --git a/examples/distributedshell/src/test/scala/io/gearpump/examples/distributedshell/DistShellAppMasterSpec.scala b/examples/distributedshell/src/test/scala/io/gearpump/examples/distributedshell/DistShellAppMasterSpec.scala
index 2a19bd2..ec6bba1 100644
--- a/examples/distributedshell/src/test/scala/io/gearpump/examples/distributedshell/DistShellAppMasterSpec.scala
+++ b/examples/distributedshell/src/test/scala/io/gearpump/examples/distributedshell/DistShellAppMasterSpec.scala
@@ -19,6 +19,7 @@ package io.gearpump.examples.distributedshell
 
 import akka.actor.ActorSystem
 import akka.testkit.{TestActorRef, TestProbe}
+import io.gearpump.WorkerId
 import io.gearpump.cluster.AppMasterToMaster.{RequestResource, GetAllWorkers, RegisterAppMaster}
 import io.gearpump.cluster.AppMasterToWorker.LaunchExecutor
 import io.gearpump.cluster.MasterToAppMaster.{ResourceAllocated, WorkerList, AppMasterRegistered}
@@ -37,7 +38,7 @@ class DistShellAppMasterSpec extends WordSpec with Matchers with BeforeAndAfter{
   val appId = 0
   val userName = "test"
   val masterExecutorId = 0
-  val workerList = List(1, 2, 3)
+  val workerList = List(WorkerId(1, 0L), WorkerId(2, 0L), WorkerId(3, 0L))
   val resource = Resource(1)
   val appJar = None
   val appDescription = AppDescription("app0", classOf[DistShellAppMaster].getName, UserConfig.empty)
@@ -57,7 +58,7 @@ class DistShellAppMasterSpec extends WordSpec with Matchers with BeforeAndAfter{
       workerList.foreach { workerId =>
         mockMaster.expectMsg(RequestResource(appId, ResourceRequest(Resource(1), workerId, relaxation = Relaxation.SPECIFICWORKER)))
       }
-      mockMaster.reply(ResourceAllocated(Array(ResourceAllocation(resource, mockWorker1.ref, 1))))
+      mockMaster.reply(ResourceAllocated(Array(ResourceAllocation(resource, mockWorker1.ref, WorkerId(1, 0L)))))
       mockWorker1.expectMsgClass(classOf[LaunchExecutor])
       mockWorker1.reply(RegisterActorSystem(ActorUtil.getSystemAddress(system).toString))
     }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/examples/distributedshell/src/test/scala/io/gearpump/examples/distributedshell/ShellExecutorSpec.scala
----------------------------------------------------------------------
diff --git a/examples/distributedshell/src/test/scala/io/gearpump/examples/distributedshell/ShellExecutorSpec.scala b/examples/distributedshell/src/test/scala/io/gearpump/examples/distributedshell/ShellExecutorSpec.scala
index 6eceb16..d51880b 100644
--- a/examples/distributedshell/src/test/scala/io/gearpump/examples/distributedshell/ShellExecutorSpec.scala
+++ b/examples/distributedshell/src/test/scala/io/gearpump/examples/distributedshell/ShellExecutorSpec.scala
@@ -19,6 +19,7 @@ package io.gearpump.examples.distributedshell
 
 import akka.actor.{ActorSystem, Props}
 import akka.testkit.TestProbe
+import io.gearpump.WorkerId
 import io.gearpump.examples.distributedshell.DistShellAppMaster.ShellCommandResult
 import io.gearpump.cluster.appmaster.WorkerInfo
 import io.gearpump.cluster.scheduler.Resource
@@ -34,7 +35,7 @@ class ShellExecutorSpec extends WordSpec with Matchers {
   "ShellExecutor" should {
     "execute the shell command and return the result" in {
       val executorId = 1
-      val workerId = 2
+      val workerId = WorkerId(2, 0L)
       val appId = 0
       val appName = "app"
       val resource = Resource(1)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/examples/distributeservice/src/test/scala/io/gearpump/experiments/distributeservice/DistServiceAppMasterSpec.scala
----------------------------------------------------------------------
diff --git a/examples/distributeservice/src/test/scala/io/gearpump/experiments/distributeservice/DistServiceAppMasterSpec.scala b/examples/distributeservice/src/test/scala/io/gearpump/experiments/distributeservice/DistServiceAppMasterSpec.scala
index da5001e..fcdbf14 100644
--- a/examples/distributeservice/src/test/scala/io/gearpump/experiments/distributeservice/DistServiceAppMasterSpec.scala
+++ b/examples/distributeservice/src/test/scala/io/gearpump/experiments/distributeservice/DistServiceAppMasterSpec.scala
@@ -19,6 +19,7 @@ package io.gearpump.experiments.distributeservice
 
 import akka.actor.ActorSystem
 import akka.testkit.{TestActorRef, TestProbe}
+import io.gearpump.WorkerId
 import io.gearpump.cluster.AppMasterToMaster.{RequestResource, GetAllWorkers, RegisterAppMaster}
 import io.gearpump.cluster.AppMasterToWorker.LaunchExecutor
 import io.gearpump.cluster.MasterToAppMaster.{ResourceAllocated, WorkerList, AppMasterRegistered}
@@ -41,7 +42,7 @@ class DistServiceAppMasterSpec extends WordSpec with Matchers with BeforeAndAfte
   val appId = 0
   val userName = "test"
   val masterExecutorId = 0
-  val workerList = List(1, 2, 3)
+  val workerList = List(WorkerId(1, 0L), WorkerId(2, 0L), WorkerId(3, 0L))
   val resource = Resource(1)
   val appJar = None
   val appDescription = AppDescription("app0", classOf[DistServiceAppMaster].getName, UserConfig.empty)
@@ -64,7 +65,7 @@ class DistServiceAppMasterSpec extends WordSpec with Matchers with BeforeAndAfte
       workerList.foreach { workerId =>
         mockMaster.expectMsg(RequestResource(appId, ResourceRequest(Resource(1), workerId, relaxation = Relaxation.SPECIFICWORKER)))
       }
-      mockMaster.reply(ResourceAllocated(Array(ResourceAllocation(resource, mockWorker1.ref, 1))))
+      mockMaster.reply(ResourceAllocated(Array(ResourceAllocation(resource, mockWorker1.ref, WorkerId(1, 0L)))))
       mockWorker1.expectMsgClass(classOf[LaunchExecutor])
       mockWorker1.reply(RegisterActorSystem(ActorUtil.getSystemAddress(system).toString))
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/RestServiceSpec.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/RestServiceSpec.scala b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/RestServiceSpec.scala
index 2830390..92ed3ec 100644
--- a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/RestServiceSpec.scala
+++ b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/RestServiceSpec.scala
@@ -183,7 +183,7 @@ class RestServiceSpec extends TestSpecBase {
     "retrieve 1 master for a non-HA cluster" in {
       // exercise
       val masterSummary = restClient.queryMaster()
-      masterSummary.cluster shouldEqual cluster.getMastersAddresses
+      masterSummary.cluster.map(_.toTuple) shouldEqual cluster.getMastersAddresses
       masterSummary.aliveFor should be > 0L
       masterSummary.masterStatus shouldEqual MasterStatus.Synced
     }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/BaseContainer.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/BaseContainer.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/BaseContainer.scala
index 1d7ddd9..9d7e39f 100644
--- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/BaseContainer.scala
+++ b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/BaseContainer.scala
@@ -25,7 +25,7 @@ import scala.sys.process._
  * A helper to instantiate the base image for different usage.
  */
 class BaseContainer(val host: String, command: String,
-                    masterAddrs: Seq[(String, Int)],
+                    masterAddrs: List[(String, Int)],
                     tunnelPorts: Set[Int] = Set.empty) {
 
   private val IMAGE_NAME = "stanleyxu2005/gearpump-launcher"

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/MiniCluster.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/MiniCluster.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/MiniCluster.scala
index 7c9a69a..c17a20d 100644
--- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/MiniCluster.scala
+++ b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/MiniCluster.scala
@@ -17,6 +17,7 @@
  */
 package io.gearpump.integrationtest.minicluster
 
+import io.gearpump.cluster.master.MasterNode
 import io.gearpump.integrationtest.{Docker, Util}
 import org.apache.log4j.Logger
 
@@ -32,15 +33,15 @@ class MiniCluster {
 
   private val REST_SERVICE_PORT = 8090
   private val MASTER_PORT = 3000
-  private val MASTER_ADDRS = {
+  private val MASTER_ADDRS: List[(String, Int)] = {
     (0 to 0).map(index =>
       ("master" + index, MASTER_PORT)
-    )
+    ).toList
   }
 
-  lazy val commandLineClient = new CommandLineClient(getMasterHosts.head)
+  lazy val commandLineClient: CommandLineClient = new CommandLineClient(getMasterHosts.head)
 
-  lazy val restClient = {
+  lazy val restClient: RestClient = {
     val client = new RestClient(getMasterHosts.head, REST_SERVICE_PORT)
     client
   }
@@ -129,16 +130,16 @@ class MiniCluster {
     start()
   }
 
-  def getMastersAddresses = {
+  def getMastersAddresses: List[(String, Int)] = {
     MASTER_ADDRS
   }
 
-  def getMasterHosts = {
+  def getMasterHosts: List[String] = {
     MASTER_ADDRS.map({ case (host, port) => host })
   }
 
-  def getWorkerHosts = {
-    workers
+  def getWorkerHosts: List[String] = {
+    workers.toList
   }
 
   def nodeIsOnline(host: String): Boolean = {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/RestClient.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/RestClient.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/RestClient.scala
index f11ff9c..7d83c93 100644
--- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/RestClient.scala
+++ b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/RestClient.scala
@@ -18,6 +18,7 @@
 package io.gearpump.integrationtest.minicluster
 
 import com.typesafe.config.{Config, ConfigFactory}
+import io.gearpump.WorkerId
 import io.gearpump.cluster.{AppJar, MasterToAppMaster}
 import io.gearpump.cluster.MasterToAppMaster.{AppMasterData, AppMastersData}
 import io.gearpump.cluster.MasterToClient.HistoryMetrics
@@ -36,6 +37,7 @@ import io.gearpump.util.{Constants, Graph}
 import org.apache.log4j.Logger
 import upickle.Js
 import upickle.default._
+import io.gearpump.services.util.UpickleUtil._
 
 /**
  * A REST client to operate a Gearpump cluster
@@ -168,14 +170,14 @@ class RestClient(host: String, port: Int) {
     ConfigFactory.parseString(resp)
   }
 
-  def queryWorkerMetrics(workerId: Int, current: Boolean): HistoryMetrics = {
+  def queryWorkerMetrics(workerId: WorkerId, current: Boolean): HistoryMetrics = {
     val args = if (current) "?readLatest=true" else ""
-    val resp = callApi(s"worker/$workerId/metrics/worker$workerId?$args")
+    val resp = callApi(s"worker/${WorkerId.render(workerId)}/metrics/worker$workerId?$args")
     decodeAs[HistoryMetrics](resp)
   }
 
-  def queryWorkerConfig(workerId: Int): Config = {
-    val resp = callApi(s"worker/$workerId/config")
+  def queryWorkerConfig(workerId: WorkerId): Config = {
+    val resp = callApi(s"worker/${WorkerId.render(workerId)}/config")
     ConfigFactory.parseString(resp)
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/integrationtest/core/src/main/scala/io/gearpump/integrationtest/storm/StormClient.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/storm/StormClient.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/storm/StormClient.scala
index a018280..dbcc53d 100644
--- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/storm/StormClient.scala
+++ b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/storm/StormClient.scala
@@ -22,7 +22,7 @@ import backtype.storm.utils.{Utils, DRPCClient}
 import io.gearpump.integrationtest.{Util, Docker}
 import io.gearpump.integrationtest.minicluster.{RestClient, BaseContainer}
 
-class StormClient(masterAddrs: Seq[(String, Int)], restClient: RestClient) {
+class StormClient(masterAddrs: List[(String, Int)], restClient: RestClient) {
 
   private val CONFIG_FILE = "/opt/gearpump/storm.yaml"
   private val DRPC_HOST = "storm0"

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/project/Build.scala
----------------------------------------------------------------------
diff --git a/project/Build.scala b/project/Build.scala
index a99530f..b521e53 100644
--- a/project/Build.scala
+++ b/project/Build.scala
@@ -317,8 +317,8 @@ object Build extends sbt.Build {
     checksums := Seq(""),
     requiresDOM := true,
     libraryDependencies ++= Seq(
-      "com.lihaoyi" %%% "upickle" % upickleVersion,
-      "com.lihaoyi" %%% "utest" % "0.3.1"
+//      "com.lihaoyi" %%% "upickle" % upickleVersion,
+//      "com.lihaoyi" %%% "utest" % "0.3.1"
     ),
     scalaJSStage in Global := FastOptStage,
     testFrameworks += new TestFramework("utest.runner.Framework"),

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/services/jvm/src/main/scala/io/gearpump/services/AdminService.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/main/scala/io/gearpump/services/AdminService.scala b/services/jvm/src/main/scala/io/gearpump/services/AdminService.scala
index b159a19..aa88d13 100644
--- a/services/jvm/src/main/scala/io/gearpump/services/AdminService.scala
+++ b/services/jvm/src/main/scala/io/gearpump/services/AdminService.scala
@@ -23,7 +23,7 @@ import akka.http.scaladsl.model._
 import akka.http.scaladsl.server.Directives._
 import akka.stream.{Materializer}
 import io.gearpump.util.{Constants, Util}
-
+import io.gearpump.services.util.UpickleUtil._
 /**
  * AdminService is for cluster-wide managements. it is not related with
  * specific application.

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/services/jvm/src/main/scala/io/gearpump/services/AppMasterService.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/main/scala/io/gearpump/services/AppMasterService.scala b/services/jvm/src/main/scala/io/gearpump/services/AppMasterService.scala
index 98ce0b1..30ec061 100644
--- a/services/jvm/src/main/scala/io/gearpump/services/AppMasterService.scala
+++ b/services/jvm/src/main/scala/io/gearpump/services/AppMasterService.scala
@@ -39,7 +39,7 @@ import io.gearpump.util.ActorUtil.{askActor, askAppMaster}
 import io.gearpump.util.FileDirective._
 import io.gearpump.util.{Constants, Util}
 import upickle.default.{read, write}
-
+import io.gearpump.services.util.UpickleUtil._
 import scala.util.{Failure, Success, Try}
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/services/jvm/src/main/scala/io/gearpump/services/MasterService.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/main/scala/io/gearpump/services/MasterService.scala b/services/jvm/src/main/scala/io/gearpump/services/MasterService.scala
index 3e2b9ca..91f25fe 100644
--- a/services/jvm/src/main/scala/io/gearpump/services/MasterService.scala
+++ b/services/jvm/src/main/scala/io/gearpump/services/MasterService.scala
@@ -44,6 +44,7 @@ import io.gearpump.streaming.{ProcessorDescription, ProcessorId, StreamApplicati
 import io.gearpump.util.ActorUtil._
 import io.gearpump.util.FileDirective._
 import io.gearpump.util.{Constants, Graph, Util}
+import io.gearpump.services.util.UpickleUtil._
 
 import scala.collection.JavaConversions._
 import scala.concurrent.Future
@@ -154,7 +155,6 @@ class MasterService(val master: ActorRef,
     path("submitdag") {
       post {
         entity(as[String]) { request =>
-          import io.gearpump.services.util.UpickleUtil._
           val msg = java.net.URLDecoder.decode(request, "UTF-8")
           val submitApplicationRequest = read[SubmitApplicationRequest](msg)
           import submitApplicationRequest.{appName, dag, processors, userconfig}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/services/jvm/src/main/scala/io/gearpump/services/SecurityService.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/main/scala/io/gearpump/services/SecurityService.scala b/services/jvm/src/main/scala/io/gearpump/services/SecurityService.scala
index 8e03c43..a5bbe2f 100644
--- a/services/jvm/src/main/scala/io/gearpump/services/SecurityService.scala
+++ b/services/jvm/src/main/scala/io/gearpump/services/SecurityService.scala
@@ -34,10 +34,12 @@ import io.gearpump.services.SecurityService.{User, UserSession}
 import io.gearpump.services.security.oauth2.OAuth2Authenticator
 import io.gearpump.util.{Constants, LogUtil}
 import upickle.default.{write}
+import io.gearpump.services.util.UpickleUtil._
 import io.gearpump.security.{Authenticator => BaseAuthenticator}
 import scala.concurrent.{ExecutionContext, Future}
 import scala.util.{Failure, Success}
 
+
 /**
  * When user cannot be authenticated, will reject with 401 AuthenticationFailedRejection
  * When user can be authenticated, but are not authorized to access certail resource, will

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/services/jvm/src/main/scala/io/gearpump/services/StaticService.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/main/scala/io/gearpump/services/StaticService.scala b/services/jvm/src/main/scala/io/gearpump/services/StaticService.scala
index b38e5cb..09180c3 100644
--- a/services/jvm/src/main/scala/io/gearpump/services/StaticService.scala
+++ b/services/jvm/src/main/scala/io/gearpump/services/StaticService.scala
@@ -23,6 +23,7 @@ import akka.http.scaladsl.model._
 import akka.http.scaladsl.server.Directives._
 import akka.stream.{Materializer}
 import io.gearpump.util.{Constants, Util}
+import io.gearpump.services.util.UpickleUtil._
 
 /**
  * static resource files.

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/services/jvm/src/main/scala/io/gearpump/services/SupervisorService.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/main/scala/io/gearpump/services/SupervisorService.scala b/services/jvm/src/main/scala/io/gearpump/services/SupervisorService.scala
index 9ecd078..5d552d6 100644
--- a/services/jvm/src/main/scala/io/gearpump/services/SupervisorService.scala
+++ b/services/jvm/src/main/scala/io/gearpump/services/SupervisorService.scala
@@ -22,6 +22,7 @@ import akka.actor.{ActorRef, ActorSystem}
 import akka.http.scaladsl.server.Directives._
 import akka.http.scaladsl.server.Route
 import akka.stream.Materializer
+import io.gearpump.WorkerId
 import io.gearpump.cluster.AppMasterToMaster.{GetWorkerData, WorkerData, GetAllWorkers}
 import io.gearpump.cluster.ClientToMaster._
 import io.gearpump.services.SupervisorService.{Path, Status}
@@ -30,6 +31,7 @@ import io.gearpump.util.ActorUtil._
 import scala.concurrent.Future
 import scala.util.{Failure, Success}
 import upickle.default.{read, write}
+import io.gearpump.services.util.UpickleUtil._
 
 class SupervisorService(val master: ActorRef, val supervisor: ActorRef, override val system: ActorSystem)
   extends BasicService {
@@ -80,10 +82,10 @@ class SupervisorService(val master: ActorRef, val supervisor: ActorRef, override
         }
       }
     } ~
-    path("removeworker" / IntNumber) { workerId =>
+    path("removeworker" / Segment) { workerIdString =>
       post {
         authorize {
-
+          val workerId = WorkerId.parse(workerIdString)
           def future(): Future[CommandResult] = {
             askWorker[WorkerData](master, workerId, GetWorkerData(workerId)).flatMap{workerData =>
               val containerId = workerData.workerDescription.resourceManagerContainerId

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/services/jvm/src/main/scala/io/gearpump/services/WorkerService.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/main/scala/io/gearpump/services/WorkerService.scala b/services/jvm/src/main/scala/io/gearpump/services/WorkerService.scala
index 5efce44..a1edbe4 100644
--- a/services/jvm/src/main/scala/io/gearpump/services/WorkerService.scala
+++ b/services/jvm/src/main/scala/io/gearpump/services/WorkerService.scala
@@ -21,12 +21,14 @@ package io.gearpump.services
 import akka.actor.{ActorSystem, ActorRef}
 import akka.http.scaladsl.server.Directives._
 import akka.stream.{Materializer}
+import io.gearpump.WorkerId
 import io.gearpump.cluster.AppMasterToMaster.{GetWorkerData, WorkerData}
 import io.gearpump.cluster.ClientToMaster.{ReadOption, QueryHistoryMetrics, QueryWorkerConfig}
 import io.gearpump.cluster.ClusterConfig
 import io.gearpump.cluster.MasterToClient.{HistoryMetrics, WorkerConfig}
 import io.gearpump.util.ActorUtil._
 import io.gearpump.util.Constants
+import io.gearpump.services.util.UpickleUtil._
 
 import scala.util.{Failure, Success}
 
@@ -37,8 +39,9 @@ class WorkerService(val master: ActorRef, override val system: ActorSystem)
   private val systemConfig = system.settings.config
   private val concise = systemConfig.getBoolean(Constants.GEARPUMP_SERVICE_RENDER_CONFIG_CONCISE)
 
-  override def doRoute(implicit mat: Materializer) = pathPrefix("worker" / IntNumber) { workerId =>
+  override def doRoute(implicit mat: Materializer) = pathPrefix("worker" / Segment) { workerIdString =>
     pathEnd {
+      val workerId = WorkerId.parse(workerIdString)
       onComplete(askWorker[WorkerData](master, workerId, GetWorkerData(workerId))) {
         case Success(value: WorkerData) =>
           complete(write(value.workerDescription))
@@ -46,6 +49,7 @@ class WorkerService(val master: ActorRef, override val system: ActorSystem)
       }
     } ~
     path("config") {
+      val workerId = WorkerId.parse(workerIdString)
       onComplete(askWorker[WorkerConfig](master, workerId, QueryWorkerConfig(workerId))) {
         case Success(value: WorkerConfig) =>
           val config = Option(value.config).map(ClusterConfig.render(_, concise)).getOrElse("{}")
@@ -55,6 +59,7 @@ class WorkerService(val master: ActorRef, override val system: ActorSystem)
       }
     } ~
     path("metrics" / RestPath ) { path =>
+      val workerId = WorkerId.parse(workerIdString)
       parameter(ReadOption.Key ? ReadOption.ReadLatest) { readOption =>
         val query = QueryHistoryMetrics(path.head.toString, readOption)
         onComplete(askWorker[HistoryMetrics](master, workerId, query)) {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/services/jvm/src/main/scala/io/gearpump/services/util/UpickleUtil.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/main/scala/io/gearpump/services/util/UpickleUtil.scala b/services/jvm/src/main/scala/io/gearpump/services/util/UpickleUtil.scala
index 8ca2c65..0ae4505 100644
--- a/services/jvm/src/main/scala/io/gearpump/services/util/UpickleUtil.scala
+++ b/services/jvm/src/main/scala/io/gearpump/services/util/UpickleUtil.scala
@@ -18,6 +18,7 @@
 
 package io.gearpump.services.util
 
+import io.gearpump.WorkerId
 import io.gearpump.util.Graph
 import upickle.Js
 
@@ -31,4 +32,14 @@ object UpickleUtil {
       val edgeList = upickle.default.readJs[List[(Int, String, Int)]](edges._2)
       Graph(vertexList, edgeList)
   }
+
+  implicit val workerIdReader: upickle.default.Reader[WorkerId] = upickle.default.Reader[WorkerId] {
+    case Js.Str(str) =>
+      WorkerId.parse(str)
+  }
+
+  implicit val workerIdWriter: upickle.default.Writer[WorkerId] = upickle.default.Writer[WorkerId] {
+    case workerId: WorkerId =>
+      Js.Str(WorkerId.render(workerId))
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/services/jvm/src/test/scala/io/gearpump/services/AdminServiceSpec.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/test/scala/io/gearpump/services/AdminServiceSpec.scala b/services/jvm/src/test/scala/io/gearpump/services/AdminServiceSpec.scala
index d4ea2b7..738bbad 100644
--- a/services/jvm/src/test/scala/io/gearpump/services/AdminServiceSpec.scala
+++ b/services/jvm/src/test/scala/io/gearpump/services/AdminServiceSpec.scala
@@ -23,7 +23,7 @@ import io.gearpump.cluster.TestUtil
 import akka.http.scaladsl.testkit.{ScalatestRouteTest, RouteTestTimeout}
 import com.typesafe.config.{Config, ConfigFactory}
 import org.scalatest.{BeforeAndAfterAll, Matchers, FlatSpec}
-
+import io.gearpump.services.util.UpickleUtil._
 import scala.concurrent.duration._
 
 import scala.util.Try

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/services/jvm/src/test/scala/io/gearpump/services/AppMasterServiceSpec.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/test/scala/io/gearpump/services/AppMasterServiceSpec.scala b/services/jvm/src/test/scala/io/gearpump/services/AppMasterServiceSpec.scala
index 3cfe162..7b98cb6 100644
--- a/services/jvm/src/test/scala/io/gearpump/services/AppMasterServiceSpec.scala
+++ b/services/jvm/src/test/scala/io/gearpump/services/AppMasterServiceSpec.scala
@@ -37,7 +37,7 @@ import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
 import org.slf4j.Logger
 import upickle.default.read
 import akka.http.scaladsl.testkit.ScalatestRouteTest
-
+import io.gearpump.services.util.UpickleUtil._
 import scala.concurrent.duration._
 import scala.util.{Success, Try}
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/services/jvm/src/test/scala/io/gearpump/services/MasterServiceSpec.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/test/scala/io/gearpump/services/MasterServiceSpec.scala b/services/jvm/src/test/scala/io/gearpump/services/MasterServiceSpec.scala
index 3a1c4fe..1cfc01a 100644
--- a/services/jvm/src/test/scala/io/gearpump/services/MasterServiceSpec.scala
+++ b/services/jvm/src/test/scala/io/gearpump/services/MasterServiceSpec.scala
@@ -28,6 +28,7 @@ import akka.stream.scaladsl.Source
 import akka.testkit.TestActor.{AutoPilot, KeepRunning}
 import akka.testkit.TestProbe
 import com.typesafe.config.{Config, ConfigFactory}
+import io.gearpump.WorkerId
 import io.gearpump.cluster.AppMasterToMaster.{GetAllWorkers, GetMasterData, GetWorkerData, MasterData, WorkerData}
 import io.gearpump.cluster.ClientToMaster.{QueryHistoryMetrics, QueryMasterConfig, ResolveWorkerId, SubmitApplication}
 import io.gearpump.cluster.MasterToAppMaster.{AppMasterData, AppMastersData, AppMastersDataRequest, WorkerList}
@@ -44,6 +45,7 @@ import scala.concurrent.{Future, ExecutionContext}
 import scala.concurrent.duration._
 import scala.util.{Success, Try}
 import akka.stream.scaladsl.FileIO
+import io.gearpump.services.util.UpickleUtil._
 
 class MasterServiceSpec extends FlatSpec with ScalatestRouteTest with
   Matchers with BeforeAndAfterAll {
@@ -87,9 +89,9 @@ class MasterServiceSpec extends FlatSpec with ScalatestRouteTest with
           sender ! AppMastersData(List.empty[AppMasterData])
           KeepRunning
         case GetAllWorkers =>
-          sender ! WorkerList(List(0))
+          sender ! WorkerList(List(WorkerId(0, 0L)))
           KeepRunning
-        case ResolveWorkerId(0) =>
+        case ResolveWorkerId(WorkerId(0, 0L)) =>
           sender ! ResolveWorkerIdResult(Success(mockWorker.ref))
           KeepRunning
         case QueryHistoryMetrics(path, _, _, _) =>

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/services/jvm/src/test/scala/io/gearpump/services/SecurityServiceSpec.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/test/scala/io/gearpump/services/SecurityServiceSpec.scala b/services/jvm/src/test/scala/io/gearpump/services/SecurityServiceSpec.scala
index c276286..10b45b9 100644
--- a/services/jvm/src/test/scala/io/gearpump/services/SecurityServiceSpec.scala
+++ b/services/jvm/src/test/scala/io/gearpump/services/SecurityServiceSpec.scala
@@ -21,6 +21,7 @@ package io.gearpump.services
 import akka.http.scaladsl.testkit.{RouteTestTimeout}
 import com.typesafe.config.Config
 import io.gearpump.cluster.TestUtil
+import io.gearpump.services.util.UpickleUtil._
 import org.scalatest.{BeforeAndAfterAll, Matchers, FlatSpec}
 import akka.actor.{ActorSystem}
 import akka.http.scaladsl.server._

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/services/jvm/src/test/scala/io/gearpump/services/StaticServiceSpec.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/test/scala/io/gearpump/services/StaticServiceSpec.scala b/services/jvm/src/test/scala/io/gearpump/services/StaticServiceSpec.scala
index 2ab62e8..501776e 100644
--- a/services/jvm/src/test/scala/io/gearpump/services/StaticServiceSpec.scala
+++ b/services/jvm/src/test/scala/io/gearpump/services/StaticServiceSpec.scala
@@ -24,6 +24,7 @@ import akka.testkit.TestProbe
 import com.typesafe.config.{Config, ConfigFactory}
 import io.gearpump.cluster.TestUtil
 import io.gearpump.util.Constants
+import io.gearpump.services.util.UpickleUtil._
 import org.scalatest.{BeforeAndAfterAll, Matchers, FlatSpec}
 
 import scala.util.Try

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/services/jvm/src/test/scala/io/gearpump/services/WorkerServiceSpec.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/test/scala/io/gearpump/services/WorkerServiceSpec.scala b/services/jvm/src/test/scala/io/gearpump/services/WorkerServiceSpec.scala
index e9256a4..dc8d5a7 100644
--- a/services/jvm/src/test/scala/io/gearpump/services/WorkerServiceSpec.scala
+++ b/services/jvm/src/test/scala/io/gearpump/services/WorkerServiceSpec.scala
@@ -23,12 +23,14 @@ import akka.http.scaladsl.model.headers.`Cache-Control`
 import akka.testkit.TestActor.{AutoPilot, KeepRunning}
 import akka.testkit.{TestKit, TestProbe}
 import com.typesafe.config.{Config, ConfigFactory}
+import io.gearpump.WorkerId
 import io.gearpump.cluster.AppMasterToMaster.{GetWorkerData, WorkerData}
 import io.gearpump.cluster.ClientToMaster.{QueryHistoryMetrics, QueryWorkerConfig, ResolveWorkerId}
 import io.gearpump.cluster.MasterToClient.{HistoryMetrics, HistoryMetricsItem, ResolveWorkerIdResult, WorkerConfig}
 import io.gearpump.cluster.TestUtil
 import io.gearpump.cluster.worker.WorkerSummary
 import io.gearpump.jarstore.JarStoreService
+import io.gearpump.services.util.UpickleUtil._
 import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
 
 import scala.concurrent.duration._
@@ -52,10 +54,10 @@ class WorkerServiceSpec extends FlatSpec with ScalatestRouteTest  with Matchers
   mockWorker.setAutoPilot {
     new AutoPilot {
       def run(sender: ActorRef, msg: Any) = msg match {
-        case GetWorkerData(appId) =>
+        case GetWorkerData(workerId) =>
           sender ! WorkerData(WorkerSummary.empty)
           KeepRunning
-        case QueryWorkerConfig(appId) =>
+        case QueryWorkerConfig(workerId) =>
           sender ! WorkerConfig(null)
           KeepRunning
         case QueryHistoryMetrics(path, _, _, _) =>
@@ -79,7 +81,7 @@ class WorkerServiceSpec extends FlatSpec with ScalatestRouteTest  with Matchers
 
   "ConfigQueryService" should "return config for worker" in {
     implicit val customTimeout = RouteTestTimeout(15.seconds)
-    (Get(s"/api/$REST_VERSION/worker/1/config") ~> workerRoute) ~> check{
+    (Get(s"/api/$REST_VERSION/worker/${WorkerId.render(WorkerId(0, 0L))}/config") ~> workerRoute) ~> check{
       val responseBody = responseAs[String]
       val config = Try(ConfigFactory.parseString(responseBody))
       assert(config.isSuccess)
@@ -88,7 +90,7 @@ class WorkerServiceSpec extends FlatSpec with ScalatestRouteTest  with Matchers
 
   it should "return WorkerData" in {
     implicit val customTimeout = RouteTestTimeout(15.seconds)
-    (Get(s"/api/$REST_VERSION/worker/1") ~> workerRoute) ~> check{
+    (Get(s"/api/$REST_VERSION/worker/${WorkerId.render(WorkerId(1, 0L))}") ~> workerRoute) ~> check{
       val responseBody = responseAs[String]
       val config = Try(ConfigFactory.parseString(responseBody))
       assert(config.isSuccess)
@@ -102,7 +104,7 @@ class WorkerServiceSpec extends FlatSpec with ScalatestRouteTest  with Matchers
 
   "MetricsQueryService" should "return history metrics" in {
     implicit val customTimeout = RouteTestTimeout(15.seconds)
-    (Get(s"/api/$REST_VERSION/worker/0/metrics/worker") ~> workerRoute) ~> check {
+    (Get(s"/api/$REST_VERSION/worker/${WorkerId.render(WorkerId(0, 0L))}/metrics/worker") ~> workerRoute) ~> check {
       val responseBody = responseAs[String]
       val config = Try(ConfigFactory.parseString(responseBody))
       assert(config.isSuccess)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/streaming/src/main/scala/io/gearpump/streaming/appmaster/AppMaster.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/appmaster/AppMaster.scala b/streaming/src/main/scala/io/gearpump/streaming/appmaster/AppMaster.scala
index fe90c6a..6fcc47c 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/appmaster/AppMaster.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/appmaster/AppMaster.scala
@@ -82,7 +82,8 @@ class AppMaster(appContext : AppMasterContext, app : AppDescription)  extends Ap
   private val systemConfig = context.system.settings.config
   private var lastFailure = LastFailure(0L, null)
 
-  private val appMasterBrief = ExecutorBrief(APPMASTER_DEFAULT_EXECUTOR_ID, self.path.toString, Option(appContext.workerInfo).map(_.workerId).getOrElse(-1), "active")
+  private val appMasterBrief = ExecutorBrief(APPMASTER_DEFAULT_EXECUTOR_ID,
+    self.path.toString, Option(appContext.workerInfo).map(_.workerId).getOrElse(WorkerId.unspecified), "active")
 
   private val getHistoryMetricsConfig = HistoryMetricsConfig(systemConfig)
 
@@ -93,7 +94,7 @@ class AppMaster(appContext : AppMasterContext, app : AppDescription)  extends Ap
 
   private val appMasterExecutorSummary = ExecutorSummary(
     APPMASTER_DEFAULT_EXECUTOR_ID,
-    Option(appContext.workerInfo).map(_.workerId).getOrElse(-1),
+    Option(appContext.workerInfo).map(_.workerId).getOrElse(WorkerId.unspecified),
     self.path.toString,
     logFile.getAbsolutePath,
     status = "Active",
@@ -334,6 +335,6 @@ object AppMaster {
 
   class ServiceNotAvailableException(reason: String) extends Exception(reason)
 
-  case class ExecutorBrief(executorId: ExecutorId, executor: String, workerId: Int, status: String)
+  case class ExecutorBrief(executorId: ExecutorId, executor: String, workerId: WorkerId, status: String)
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/streaming/src/main/scala/io/gearpump/streaming/appmaster/ExecutorManager.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/appmaster/ExecutorManager.scala b/streaming/src/main/scala/io/gearpump/streaming/appmaster/ExecutorManager.scala
index 526fbc3..ebe6c11 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/appmaster/ExecutorManager.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/appmaster/ExecutorManager.scala
@@ -22,6 +22,7 @@ import akka.actor.SupervisorStrategy.Stop
 import akka.actor._
 import akka.remote.RemoteScope
 import com.typesafe.config.Config
+import io.gearpump.WorkerId
 import io.gearpump.cluster.AppMasterToWorker.ChangeExecutorResource
 import io.gearpump.cluster.appmaster.ExecutorSystemScheduler.{ExecutorSystemJvmConfig, ExecutorSystemStarted, StartExecutorSystemTimeout, StartExecutorSystems}
 import io.gearpump.cluster.appmaster.WorkerInfo
@@ -167,7 +168,7 @@ private[appmaster] object ExecutorManager {
 
   case object GetExecutorInfo
 
-  case class ExecutorStarted(executorId: Int, resource: Resource, workerId: Int, boundedJar: Option[AppJar])
+  case class ExecutorStarted(executorId: Int, resource: Resource, workerId: WorkerId, boundedJar: Option[AppJar])
   case class ExecutorStopped(executorId: Int)
 
   case class SetTaskManager(taskManager: ActorRef)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/streaming/src/main/scala/io/gearpump/streaming/appmaster/JarScheduler.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/appmaster/JarScheduler.scala b/streaming/src/main/scala/io/gearpump/streaming/appmaster/JarScheduler.scala
index fa2e444..e769a0d 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/appmaster/JarScheduler.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/appmaster/JarScheduler.scala
@@ -19,7 +19,7 @@ package io.gearpump.streaming.appmaster
 
 import akka.actor._
 import com.typesafe.config.Config
-import io.gearpump.TimeStamp
+import io.gearpump.{WorkerId, TimeStamp}
 import io.gearpump.streaming.task.{StartClock, TaskId}
 import io.gearpump.streaming.{ProcessorDescription, DAG}
 import io.gearpump.cluster.AppJar
@@ -52,7 +52,7 @@ class JarScheduler(appId : Int, appName: String, config: Config, factory: ActorR
     (actor ? GetRequestDetails).asInstanceOf[Future[Array[ResourceRequestDetail]]]
   }
 
-  def scheduleTask(appJar: AppJar, workerId: Int, executorId: Int, resource: Resource): Future[List[TaskId]] = {
+  def scheduleTask(appJar: AppJar, workerId: WorkerId, executorId: Int, resource: Resource): Future[List[TaskId]] = {
     (actor ? ScheduleTask(appJar, workerId, executorId, resource)).asInstanceOf[Future[List[TaskId]]]
   }
 
@@ -71,7 +71,7 @@ object JarScheduler{
 
   case object GetRequestDetails
 
-  case class ScheduleTask(appJar: AppJar, workerId: Int, executorId: Int, resource: Resource)
+  case class ScheduleTask(appJar: AppJar, workerId: WorkerId, executorId: Int, resource: Resource)
 
   case class ExecutorFailed(executorId: Int)
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskLocator.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskLocator.scala b/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskLocator.scala
index 10ce15b..c456a06 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskLocator.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskLocator.scala
@@ -19,6 +19,7 @@ package io.gearpump.streaming.appmaster
 
 import com.typesafe.config.{ConfigValueFactory, ConfigFactory, ConfigRenderOptions, Config}
 import TaskLocator.{Localities, WorkerLocality, NonLocality, Locality}
+import io.gearpump.WorkerId
 import io.gearpump.streaming.Constants
 import io.gearpump.streaming.task.TaskId
 import scala.util.Try
@@ -56,12 +57,10 @@ object TaskLocator {
 
   trait Locality
 
-  case class WorkerLocality(workerId: Int) extends Locality
+  case class WorkerLocality(workerId: WorkerId) extends Locality
 
   object NonLocality extends Locality
 
-  type WorkerId = Int
-
   case class Localities(localities: Map[WorkerId, Array[TaskId]])
 
   object Localities {
@@ -70,7 +69,7 @@ object TaskLocator {
     def fromJson(json: String): Localities = {
       val localities = ConfigFactory.parseString(json).getAnyRef("localities")
         .asInstanceOf[java.util.Map[String, String]].asScala.map { pair =>
-        val workerId: WorkerId = pair._1.toInt
+        val workerId: WorkerId = WorkerId.parse(pair._1)
         val tasks = pair._2.split(",").map { task =>
           val pattern(processorId, taskIndex) = task
           TaskId(processorId.toInt, taskIndex.toInt)
@@ -82,7 +81,7 @@ object TaskLocator {
 
     def toJson(localities: Localities): String = {
       val map = localities.localities.toList.map {pair =>
-        (pair._1.toString, pair._2.map(task => s"task_${task.processorId}_${task.index}").mkString(","))
+        (WorkerId.render(pair._1), pair._2.map(task => s"task_${task.processorId}_${task.index}").mkString(","))
       }.toMap.asJava
       ConfigFactory.empty().withValue("localities", ConfigValueFactory.fromAnyRef(map)).
         root.render(ConfigRenderOptions.concise())