You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@openwhisk.apache.org by GitBox <gi...@apache.org> on 2018/06/26 20:11:49 UTC

[GitHub] tysonnorris closed pull request #3507: allow use of string for controller id

tysonnorris closed pull request #3507: allow use of string for controller id
URL: https://github.com/apache/incubator-openwhisk/pull/3507
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/common/scala/src/main/scala/whisk/common/Logging.scala b/common/scala/src/main/scala/whisk/common/Logging.scala
index e5fcfe2eb7..cf8cb264cc 100644
--- a/common/scala/src/main/scala/whisk/common/Logging.scala
+++ b/common/scala/src/main/scala/whisk/common/Logging.scala
@@ -20,11 +20,10 @@ package whisk.common
 import java.io.PrintStream
 import java.time.{Clock, Instant, ZoneId}
 import java.time.format.DateTimeFormatter
-
 import akka.event.Logging._
 import akka.event.LoggingAdapter
 import kamon.Kamon
-import whisk.core.entity.InstanceId
+import whisk.core.entity.ControllerInstanceId
 
 trait Logging {
 
@@ -255,7 +254,7 @@ object LoggingMarkers {
   /*
    * Controller related markers
    */
-  def CONTROLLER_STARTUP(i: Int) = LogMarkerToken(controller, s"startup$i", count)
+  def CONTROLLER_STARTUP(id: String) = LogMarkerToken(controller, s"startup$id", count)
 
   // Time of the activation in controller until it is delivered to Kafka
   val CONTROLLER_ACTIVATION = LogMarkerToken(controller, activation, start)
@@ -277,8 +276,8 @@ object LoggingMarkers {
   val LOADBALANCER_INVOKER_UNHEALTHY = LogMarkerToken(loadbalancer, "invokerUnhealthy", count)
   val LOADBALANCER_ACTIVATION_START = LogMarkerToken(loadbalancer, "activations", count)
 
-  def LOADBALANCER_ACTIVATIONS_INFLIGHT(controllerInstance: InstanceId) =
-    LogMarkerToken(loadbalancer + controllerInstance.toInt, "activationsInflight", count)
+  def LOADBALANCER_ACTIVATIONS_INFLIGHT(controllerInstance: ControllerInstanceId) =
+    LogMarkerToken(loadbalancer + controllerInstance.asString, "activationsInflight", count)
 
   // Time that is needed to execute the action
   val INVOKER_ACTIVATION_RUN = LogMarkerToken(invoker, "activationRun", start)
diff --git a/common/scala/src/main/scala/whisk/core/connector/Message.scala b/common/scala/src/main/scala/whisk/core/connector/Message.scala
index b17de8f31a..eb960b3ca5 100644
--- a/common/scala/src/main/scala/whisk/core/connector/Message.scala
+++ b/common/scala/src/main/scala/whisk/core/connector/Message.scala
@@ -46,7 +46,7 @@ case class ActivationMessage(override val transid: TransactionId,
                              revision: DocRevision,
                              user: Identity,
                              activationId: ActivationId,
-                             rootControllerIndex: InstanceId,
+                             rootControllerIndex: ControllerInstanceId,
                              blocking: Boolean,
                              content: Option[JsObject],
                              cause: Option[ActivationId] = None)
@@ -76,7 +76,7 @@ object ActivationMessage extends DefaultJsonProtocol {
  */
 case class CompletionMessage(override val transid: TransactionId,
                              response: Either[ActivationId, WhiskActivation],
-                             invoker: InstanceId)
+                             invoker: InvokerInstanceId)
     extends Message {
 
   override def serialize: String = {
@@ -108,7 +108,7 @@ object CompletionMessage extends DefaultJsonProtocol {
   private val serdes = jsonFormat3(CompletionMessage.apply)
 }
 
-case class PingMessage(instance: InstanceId) extends Message {
+case class PingMessage(instance: InvokerInstanceId) extends Message {
   override def serialize = PingMessage.serdes.write(this).compactPrint
 }
 
diff --git a/common/scala/src/main/scala/whisk/core/containerpool/ContainerFactory.scala b/common/scala/src/main/scala/whisk/core/containerpool/ContainerFactory.scala
index 35d3b8bb0c..ae7f3d1772 100644
--- a/common/scala/src/main/scala/whisk/core/containerpool/ContainerFactory.scala
+++ b/common/scala/src/main/scala/whisk/core/containerpool/ContainerFactory.scala
@@ -24,7 +24,7 @@ import whisk.common.TransactionId
 import whisk.core.WhiskConfig
 import whisk.core.entity.ByteSize
 import whisk.core.entity.ExecManifest
-import whisk.core.entity.InstanceId
+import whisk.core.entity.InvokerInstanceId
 import whisk.spi.Spi
 
 case class ContainerArgsConfig(network: String,
@@ -74,7 +74,7 @@ object ContainerFactory {
   private def isAllowed(c: Char) = c.isLetterOrDigit || c == '_' || c == '.' || c == '-'
 
   /** include the instance name, if specified and strip invalid chars before attempting to use them in the container name */
-  def containerNamePrefix(instanceId: InstanceId): String =
+  def containerNamePrefix(instanceId: InvokerInstanceId): String =
     s"wsk${instanceId.name.getOrElse("")}${instanceId.toInt}".filter(isAllowed)
 }
 
@@ -86,6 +86,6 @@ trait ContainerFactoryProvider extends Spi {
   def getContainerFactory(actorSystem: ActorSystem,
                           logging: Logging,
                           config: WhiskConfig,
-                          instance: InstanceId,
+                          instance: InvokerInstanceId,
                           parameters: Map[String, Set[String]]): ContainerFactory
 }
diff --git a/common/scala/src/main/scala/whisk/core/database/RemoteCacheInvalidation.scala b/common/scala/src/main/scala/whisk/core/database/RemoteCacheInvalidation.scala
index 417a8b9213..1bfd5aaefd 100644
--- a/common/scala/src/main/scala/whisk/core/database/RemoteCacheInvalidation.scala
+++ b/common/scala/src/main/scala/whisk/core/database/RemoteCacheInvalidation.scala
@@ -34,7 +34,7 @@ import whisk.core.connector.Message
 import whisk.core.connector.MessageFeed
 import whisk.core.connector.MessagingProvider
 import whisk.core.entity.CacheKey
-import whisk.core.entity.InstanceId
+import whisk.core.entity.ControllerInstanceId
 import whisk.core.entity.WhiskAction
 import whisk.core.entity.WhiskActionMetaData
 import whisk.core.entity.WhiskPackage
@@ -51,13 +51,14 @@ object CacheInvalidationMessage extends DefaultJsonProtocol {
   implicit val serdes = jsonFormat(CacheInvalidationMessage.apply _, "key", "instanceId")
 }
 
-class RemoteCacheInvalidation(config: WhiskConfig, component: String, instance: InstanceId)(implicit logging: Logging,
-                                                                                            as: ActorSystem) {
+class RemoteCacheInvalidation(config: WhiskConfig, component: String, instance: ControllerInstanceId)(
+  implicit logging: Logging,
+  as: ActorSystem) {
 
   implicit private val ec = as.dispatchers.lookup("dispatchers.kafka-dispatcher")
 
   private val topic = "cacheInvalidation"
-  private val instanceId = s"$component${instance.toInt}"
+  private val instanceId = s"$component${instance.asString}"
 
   private val msgProvider = SpiLoader.get[MessagingProvider]
   private val cacheInvalidationConsumer =
diff --git a/common/scala/src/main/scala/whisk/core/entity/InstanceId.scala b/common/scala/src/main/scala/whisk/core/entity/InstanceId.scala
index 8087b8b7bd..9ee2dc086a 100644
--- a/common/scala/src/main/scala/whisk/core/entity/InstanceId.scala
+++ b/common/scala/src/main/scala/whisk/core/entity/InstanceId.scala
@@ -18,11 +18,30 @@
 package whisk.core.entity
 
 import spray.json.DefaultJsonProtocol
+import whisk.core.entity.ControllerInstanceId.LEGAL_CHARS
+import whisk.core.entity.ControllerInstanceId.MAX_NAME_LENGTH
 
-case class InstanceId(val instance: Int, name: Option[String] = None) {
+case class InvokerInstanceId(val instance: Int, name: Option[String] = None) {
   def toInt: Int = instance
 }
 
-object InstanceId extends DefaultJsonProtocol {
-  implicit val serdes = jsonFormat2(InstanceId.apply)
+case class ControllerInstanceId(val asString: String) {
+  require(
+    asString.length <= MAX_NAME_LENGTH && asString.matches(LEGAL_CHARS),
+    "Controller instance id contains invalid characters")
+}
+
+object InvokerInstanceId extends DefaultJsonProtocol {
+  implicit val serdes = jsonFormat2(InvokerInstanceId.apply)
+}
+
+object ControllerInstanceId extends DefaultJsonProtocol {
+  // controller ids become part of a kafka topic, hence, hence allow only certain characters
+  // see https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/internals/Topic.java#L29
+  private val LEGAL_CHARS = "[a-zA-Z0-9._-]+"
+
+  // reserve some number of characters as the prefix to be added to topic names
+  private val MAX_NAME_LENGTH = 249 - 121
+
+  implicit val serdes = jsonFormat1(ControllerInstanceId.apply)
 }
diff --git a/common/scala/src/main/scala/whisk/core/mesos/MesosContainerFactory.scala b/common/scala/src/main/scala/whisk/core/mesos/MesosContainerFactory.scala
index 42232b9d1e..c7ae5871d8 100644
--- a/common/scala/src/main/scala/whisk/core/mesos/MesosContainerFactory.scala
+++ b/common/scala/src/main/scala/whisk/core/mesos/MesosContainerFactory.scala
@@ -47,7 +47,7 @@ import whisk.core.containerpool.ContainerFactory
 import whisk.core.containerpool.ContainerFactoryProvider
 import whisk.core.entity.ByteSize
 import whisk.core.entity.ExecManifest
-import whisk.core.entity.InstanceId
+import whisk.core.entity.InvokerInstanceId
 import whisk.core.entity.UUID
 
 /**
@@ -198,7 +198,7 @@ object MesosContainerFactoryProvider extends ContainerFactoryProvider {
   override def getContainerFactory(actorSystem: ActorSystem,
                                    logging: Logging,
                                    config: WhiskConfig,
-                                   instance: InstanceId,
+                                   instance: InvokerInstanceId,
                                    parameters: Map[String, Set[String]]): ContainerFactory =
     new MesosContainerFactory(config, actorSystem, logging, parameters)
 }
diff --git a/core/controller/src/main/scala/whisk/core/controller/Controller.scala b/core/controller/src/main/scala/whisk/core/controller/Controller.scala
index a8b6a4e960..04dddcf301 100644
--- a/core/controller/src/main/scala/whisk/core/controller/Controller.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/Controller.scala
@@ -74,7 +74,7 @@ import pureconfig.loadConfigOrThrow
  * @param verbosity logging verbosity
  * @param executionContext Scala runtime support for concurrent operations
  */
-class Controller(val instance: InstanceId,
+class Controller(val instance: ControllerInstanceId,
                  runtimes: Runtimes,
                  implicit val whiskConfig: WhiskConfig,
                  implicit val actorSystem: ActorSystem,
@@ -84,8 +84,8 @@ class Controller(val instance: InstanceId,
 
   TransactionId.controller.mark(
     this,
-    LoggingMarkers.CONTROLLER_STARTUP(instance.toInt),
-    s"starting controller instance ${instance.toInt}",
+    LoggingMarkers.CONTROLLER_STARTUP(instance.asString),
+    s"starting controller instance ${instance.asString}",
     logLevel = InfoLevel)
 
   /**
@@ -203,7 +203,7 @@ object Controller {
 
     // if deploying multiple instances (scale out), must pass the instance number as the
     require(args.length >= 1, "controller instance required")
-    val instance = args(0).toInt
+    val instance = ControllerInstanceId(args(0))
 
     def abort(message: String) = {
       logger.error(this, message)
@@ -219,7 +219,7 @@ object Controller {
     val msgProvider = SpiLoader.get[MessagingProvider]
 
     Map(
-      "completed" + instance -> "completed",
+      "completed" + instance.asString -> "completed",
       "health" -> "health",
       "cacheInvalidation" -> "cache-invalidation",
       "events" -> "events").foreach {
@@ -232,7 +232,7 @@ object Controller {
     ExecManifest.initialize(config) match {
       case Success(_) =>
         val controller = new Controller(
-          InstanceId(instance),
+          instance,
           ExecManifest.runtimesManifest,
           config,
           actorSystem,
diff --git a/core/controller/src/main/scala/whisk/core/controller/RestAPIs.scala b/core/controller/src/main/scala/whisk/core/controller/RestAPIs.scala
index 9791fa8bb8..3bb3664c31 100644
--- a/core/controller/src/main/scala/whisk/core/controller/RestAPIs.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/RestAPIs.scala
@@ -163,7 +163,7 @@ protected[controller] trait RespondWithHeaders extends Directives {
 }
 
 class RestAPIVersion(config: WhiskConfig, apiPath: String, apiVersion: String)(
-  implicit val activeAckTopicIndex: InstanceId,
+  implicit val activeAckTopicIndex: ControllerInstanceId,
   implicit val actorSystem: ActorSystem,
   implicit val materializer: ActorMaterializer,
   implicit val logging: Logging,
@@ -242,7 +242,7 @@ class RestAPIVersion(config: WhiskConfig, apiPath: String, apiVersion: String)(
 
   class ActionsApi(val apiPath: String, val apiVersion: String)(
     implicit override val actorSystem: ActorSystem,
-    override val activeAckTopicIndex: InstanceId,
+    override val activeAckTopicIndex: ControllerInstanceId,
     override val entityStore: EntityStore,
     override val activationStore: ActivationStore,
     override val entitlementProvider: EntitlementProvider,
@@ -310,7 +310,7 @@ class RestAPIVersion(config: WhiskConfig, apiPath: String, apiVersion: String)(
                                             override val webApiDirectives: WebApiDirectives)(
     implicit override val authStore: AuthStore,
     implicit val entityStore: EntityStore,
-    override val activeAckTopicIndex: InstanceId,
+    override val activeAckTopicIndex: ControllerInstanceId,
     override val activationStore: ActivationStore,
     override val entitlementProvider: EntitlementProvider,
     override val activationIdFactory: ActivationIdGenerator,
diff --git a/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala b/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala
index 135bc48d88..79da2d5236 100644
--- a/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala
@@ -55,7 +55,7 @@ protected[actions] trait PrimitiveActions {
    *  The index of the active ack topic, this controller is listening for.
    *  Typically this is also the instance number of the controller
    */
-  protected val activeAckTopicIndex: InstanceId
+  protected val activeAckTopicIndex: ControllerInstanceId
 
   /** Database service to CRUD actions. */
   protected val entityStore: EntityStore
diff --git a/core/controller/src/main/scala/whisk/core/entitlement/Entitlement.scala b/core/controller/src/main/scala/whisk/core/entitlement/Entitlement.scala
index 23007f323f..116f3efa53 100644
--- a/core/controller/src/main/scala/whisk/core/entitlement/Entitlement.scala
+++ b/core/controller/src/main/scala/whisk/core/entitlement/Entitlement.scala
@@ -78,7 +78,7 @@ protected[core] object EntitlementProvider {
 protected[core] abstract class EntitlementProvider(
   config: WhiskConfig,
   loadBalancer: LoadBalancer,
-  controllerInstance: InstanceId)(implicit actorSystem: ActorSystem, logging: Logging) {
+  controllerInstance: ControllerInstanceId)(implicit actorSystem: ActorSystem, logging: Logging) {
 
   private implicit val executionContext: ExecutionContext = actorSystem.dispatcher
 
@@ -372,7 +372,7 @@ protected[core] abstract class EntitlementProvider(
             UserEvents.send(
               eventProducer,
               EventMessage(
-                s"controller${controllerInstance.instance}",
+                s"controller${controllerInstance.asString}",
                 metric,
                 user.subject,
                 user.namespace.name.toString,
@@ -388,7 +388,7 @@ protected[core] abstract class EntitlementProvider(
         UserEvents.send(
           eventProducer,
           EventMessage(
-            s"controller${controllerInstance.instance}",
+            s"controller${controllerInstance.asString}",
             metric,
             user.subject,
             user.namespace.name.toString,
diff --git a/core/controller/src/main/scala/whisk/core/entitlement/LocalEntitlement.scala b/core/controller/src/main/scala/whisk/core/entitlement/LocalEntitlement.scala
index fa0edf7e0b..4f240c6ed7 100644
--- a/core/controller/src/main/scala/whisk/core/entitlement/LocalEntitlement.scala
+++ b/core/controller/src/main/scala/whisk/core/entitlement/LocalEntitlement.scala
@@ -23,7 +23,7 @@ import akka.actor.ActorSystem
 import whisk.common.Logging
 import whisk.common.TransactionId
 import whisk.core.WhiskConfig
-import whisk.core.entity.{InstanceId, Subject}
+import whisk.core.entity.{ControllerInstanceId, Subject}
 import whisk.core.loadBalancer.LoadBalancer
 
 private object LocalEntitlementProvider {
@@ -35,7 +35,7 @@ private object LocalEntitlementProvider {
 protected[core] class LocalEntitlementProvider(
   private val config: WhiskConfig,
   private val loadBalancer: LoadBalancer,
-  private val controllerInstance: InstanceId)(implicit actorSystem: ActorSystem, logging: Logging)
+  private val controllerInstance: ControllerInstanceId)(implicit actorSystem: ActorSystem, logging: Logging)
     extends EntitlementProvider(config, loadBalancer, controllerInstance) {
 
   private implicit val executionContext = actorSystem.dispatcher
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala
index 152f555827..b8f29f1860 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala
@@ -50,8 +50,8 @@ case object Offline extends InvokerState { val asString = "down" }
 case object Healthy extends InvokerState { val asString = "up" }
 case object UnHealthy extends InvokerState { val asString = "unhealthy" }
 
-case class ActivationRequest(msg: ActivationMessage, invoker: InstanceId)
-case class InvocationFinishedMessage(invokerInstance: InstanceId, successful: Boolean)
+case class ActivationRequest(msg: ActivationMessage, invoker: InvokerInstanceId)
+case class InvocationFinishedMessage(invokerInstance: InvokerInstanceId, successful: Boolean)
 
 // Sent to a monitor if the state changed
 case class CurrentInvokerPoolState(newState: IndexedSeq[InvokerHealth])
@@ -70,8 +70,8 @@ final case class InvokerInfo(buffer: RingBuffer[Boolean])
  * Note: An Invoker that never sends an initial Ping will not be considered
  * by the InvokerPool and thus might not be caught by monitoring.
  */
-class InvokerPool(childFactory: (ActorRefFactory, InstanceId) => ActorRef,
-                  sendActivationToInvoker: (ActivationMessage, InstanceId) => Future[RecordMetadata],
+class InvokerPool(childFactory: (ActorRefFactory, InvokerInstanceId) => ActorRef,
+                  sendActivationToInvoker: (ActivationMessage, InvokerInstanceId) => Future[RecordMetadata],
                   pingConsumer: MessageConsumer,
                   monitor: Option[ActorRef])
     extends Actor {
@@ -83,8 +83,8 @@ class InvokerPool(childFactory: (ActorRefFactory, InstanceId) => ActorRef,
 
   // State of the actor. Mutable vars with immutable collections prevents closures or messages
   // from leaking the state for external mutation
-  var instanceToRef = immutable.Map.empty[InstanceId, ActorRef]
-  var refToInstance = immutable.Map.empty[ActorRef, InstanceId]
+  var instanceToRef = immutable.Map.empty[InvokerInstanceId, ActorRef]
+  var refToInstance = immutable.Map.empty[ActorRef, InvokerInstanceId]
   var status = IndexedSeq[InvokerHealth]()
 
   def receive = {
@@ -152,10 +152,10 @@ class InvokerPool(childFactory: (ActorRefFactory, InstanceId) => ActorRef,
   def padToIndexed[A](list: IndexedSeq[A], n: Int, f: (Int) => A) = list ++ (list.size until n).map(f)
 
   // Register a new invoker
-  def registerInvoker(instanceId: InstanceId): ActorRef = {
+  def registerInvoker(instanceId: InvokerInstanceId): ActorRef = {
     logging.info(this, s"registered a new invoker: invoker${instanceId.toInt}")(TransactionId.invokerHealth)
 
-    status = padToIndexed(status, instanceId.toInt + 1, i => new InvokerHealth(InstanceId(i), Offline))
+    status = padToIndexed(status, instanceId.toInt + 1, i => new InvokerHealth(InvokerInstanceId(i), Offline))
 
     val ref = childFactory(context, instanceId)
 
@@ -196,7 +196,7 @@ object InvokerPool {
    * @param entityStore store to write the action to
    * @return throws an exception on failure to prepare
    */
-  def prepare(controllerInstance: InstanceId, entityStore: EntityStore): Unit = {
+  def prepare(controllerInstance: ControllerInstanceId, entityStore: EntityStore): Unit = {
     InvokerPool
       .healthAction(controllerInstance)
       .map {
@@ -211,8 +211,8 @@ object InvokerPool {
       }
   }
 
-  def props(f: (ActorRefFactory, InstanceId) => ActorRef,
-            p: (ActivationMessage, InstanceId) => Future[RecordMetadata],
+  def props(f: (ActorRefFactory, InvokerInstanceId) => ActorRef,
+            p: (ActivationMessage, InvokerInstanceId) => Future[RecordMetadata],
             pc: MessageConsumer,
             m: Option[ActorRef] = None) = {
     Props(new InvokerPool(f, p, pc, m))
@@ -226,11 +226,12 @@ object InvokerPool {
   }
 
   /** An action to use for monitoring invoker health. */
-  def healthAction(i: InstanceId) = ExecManifest.runtimesManifest.resolveDefaultRuntime("nodejs:6").map { manifest =>
-    new WhiskAction(
-      namespace = healthActionIdentity.namespace.name.toPath,
-      name = EntityName(s"invokerHealthTestAction${i.toInt}"),
-      exec = CodeExecAsString(manifest, """function main(params) { return params; }""", None))
+  def healthAction(i: ControllerInstanceId) = ExecManifest.runtimesManifest.resolveDefaultRuntime("nodejs:6").map {
+    manifest =>
+      new WhiskAction(
+        namespace = healthActionIdentity.namespace.name.toPath,
+        name = EntityName(s"invokerHealthTestAction${i.asString}"),
+        exec = CodeExecAsString(manifest, """function main(params) { return params; }""", None))
   }
 }
 
@@ -240,7 +241,8 @@ object InvokerPool {
  * This finite state-machine represents an Invoker in its possible
  * states "Healthy" and "Offline".
  */
-class InvokerActor(invokerInstance: InstanceId, controllerInstance: InstanceId) extends FSM[InvokerState, InvokerInfo] {
+class InvokerActor(invokerInstance: InvokerInstanceId, controllerInstance: ControllerInstanceId)
+    extends FSM[InvokerState, InvokerInfo] {
   implicit val transid = TransactionId.invokerHealth
   implicit val logging = new AkkaLogging(context.system.log)
   val name = s"invoker${invokerInstance.toInt}"
@@ -391,7 +393,7 @@ class InvokerActor(invokerInstance: InstanceId, controllerInstance: InstanceId)
 }
 
 object InvokerActor {
-  def props(invokerInstance: InstanceId, controllerInstance: InstanceId) =
+  def props(invokerInstance: InvokerInstanceId, controllerInstance: ControllerInstanceId) =
     Props(new InvokerActor(invokerInstance, controllerInstance))
 
   val bufferSize = 10
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancer.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancer.scala
index 52ffd73d4e..5d581fda76 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancer.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancer.scala
@@ -33,7 +33,7 @@ import whisk.spi.Spi
  * @param id a unique instance identifier for the invoker
  * @param status it status (healthy, unhealthy, offline)
  */
-class InvokerHealth(val id: InstanceId, val status: InvokerState) {
+class InvokerHealth(val id: InvokerInstanceId, val status: InvokerState) {
   override def equals(obj: scala.Any): Boolean = obj match {
     case that: InvokerHealth => that.id == this.id && that.status == this.status
     case _                   => false
@@ -82,9 +82,10 @@ trait LoadBalancer {
 trait LoadBalancerProvider extends Spi {
   def requiredProperties: Map[String, String]
 
-  def loadBalancer(whiskConfig: WhiskConfig, instance: InstanceId)(implicit actorSystem: ActorSystem,
-                                                                   logging: Logging,
-                                                                   materializer: ActorMaterializer): LoadBalancer
+  def loadBalancer(whiskConfig: WhiskConfig, instance: ControllerInstanceId)(
+    implicit actorSystem: ActorSystem,
+    logging: Logging,
+    materializer: ActorMaterializer): LoadBalancer
 }
 
 /** Exception thrown by the loadbalancer */
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
index bad384288b..eac4aff4f4 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
@@ -50,7 +50,7 @@ import scala.util.{Failure, Success}
  * Horizontal sharding means, that each invoker's capacity is evenly divided between the loadbalancers. If an invoker
  * has at most 16 slots available, those will be divided to 8 slots for each loadbalancer (if there are 2).
  */
-class ShardingContainerPoolBalancer(config: WhiskConfig, controllerInstance: InstanceId)(
+class ShardingContainerPoolBalancer(config: WhiskConfig, controllerInstance: ControllerInstanceId)(
   implicit val actorSystem: ActorSystem,
   logging: Logging,
   materializer: ActorMaterializer)
@@ -159,7 +159,7 @@ class ShardingContainerPoolBalancer(config: WhiskConfig, controllerInstance: Ins
   /** 2. Update local state with the to be executed activation */
   private def setupActivation(msg: ActivationMessage,
                               action: ExecutableWhiskActionMetaData,
-                              instance: InstanceId): ActivationEntry = {
+                              instance: InvokerInstanceId): ActivationEntry = {
 
     totalActivations.increment()
     activationsPerNamespace.getOrElseUpdate(msg.user.namespace.uuid, new LongAdder()).increment()
@@ -191,7 +191,7 @@ class ShardingContainerPoolBalancer(config: WhiskConfig, controllerInstance: Ins
   /** 3. Send the activation to the invoker */
   private def sendActivationToInvoker(producer: MessageProducer,
                                       msg: ActivationMessage,
-                                      invoker: InstanceId): Future[RecordMetadata] = {
+                                      invoker: InvokerInstanceId): Future[RecordMetadata] = {
     implicit val transid: TransactionId = msg.transid
 
     val topic = s"invoker${invoker.toInt}"
@@ -218,7 +218,7 @@ class ShardingContainerPoolBalancer(config: WhiskConfig, controllerInstance: Ins
    * Subscribes to active acks (completion messages from the invokers), and
    * registers a handler for received active acks from invokers.
    */
-  private val activeAckTopic = s"completed${controllerInstance.toInt}"
+  private val activeAckTopic = s"completed${controllerInstance.asString}"
   private val maxActiveAcksPerPoll = 128
   private val activeAckPollDuration = 1.second
   private val activeAckConsumer =
@@ -252,7 +252,7 @@ class ShardingContainerPoolBalancer(config: WhiskConfig, controllerInstance: Ins
   private def processCompletion(response: Either[ActivationId, WhiskActivation],
                                 tid: TransactionId,
                                 forced: Boolean,
-                                invoker: InstanceId): Unit = {
+                                invoker: InvokerInstanceId): Unit = {
     val aid = response.fold(l => l, r => r.activationId)
 
     // treat left as success (as it is the result of a message exceeding the bus limit)
@@ -295,14 +295,14 @@ class ShardingContainerPoolBalancer(config: WhiskConfig, controllerInstance: Ins
       InvokerPool.props(
         (f, i) => f.actorOf(InvokerActor.props(i, controllerInstance)),
         (m, i) => sendActivationToInvoker(messageProducer, m, i),
-        messagingProvider.getConsumer(config, s"health${controllerInstance.toInt}", "health", maxPeek = 128),
+        messagingProvider.getConsumer(config, s"health${controllerInstance.asString}", "health", maxPeek = 128),
         Some(monitor)))
   }
 }
 
 object ShardingContainerPoolBalancer extends LoadBalancerProvider {
 
-  override def loadBalancer(whiskConfig: WhiskConfig, instance: InstanceId)(
+  override def loadBalancer(whiskConfig: WhiskConfig, instance: ControllerInstanceId)(
     implicit actorSystem: ActorSystem,
     logging: Logging,
     materializer: ActorMaterializer): LoadBalancer = new ShardingContainerPoolBalancer(whiskConfig, instance)
@@ -341,7 +341,7 @@ object ShardingContainerPoolBalancer extends LoadBalancerProvider {
                dispatched: IndexedSeq[ForcableSemaphore],
                index: Int,
                step: Int,
-               stepsDone: Int = 0)(implicit logging: Logging): Option[InstanceId] = {
+               stepsDone: Int = 0)(implicit logging: Logging): Option[InvokerInstanceId] = {
     val numInvokers = invokers.size
 
     if (numInvokers > 0) {
@@ -501,6 +501,6 @@ case class ShardingContainerPoolBalancerConfig(blackboxFraction: Double, invoker
  */
 case class ActivationEntry(id: ActivationId,
                            namespaceId: UUID,
-                           invokerName: InstanceId,
+                           invokerName: InvokerInstanceId,
                            timeoutHandler: Cancellable,
                            promise: Promise[Either[ActivationId, WhiskActivation]])
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
index 0a7c1aca29..c0914b90c5 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
@@ -95,10 +95,10 @@ case object RescheduleJob // job is sent back to parent and could not be process
  */
 class ContainerProxy(
   factory: (TransactionId, String, ImageName, Boolean, ByteSize, Int) => Future[Container],
-  sendActiveAck: (TransactionId, WhiskActivation, Boolean, InstanceId, UUID) => Future[Any],
+  sendActiveAck: (TransactionId, WhiskActivation, Boolean, ControllerInstanceId, UUID) => Future[Any],
   storeActivation: (TransactionId, WhiskActivation) => Future[Any],
   collectLogs: (TransactionId, Identity, WhiskActivation, Container, ExecutableWhiskAction) => Future[ActivationLogs],
-  instance: InstanceId,
+  instance: InvokerInstanceId,
   poolConfig: ContainerPoolConfig,
   unusedTimeout: FiniteDuration,
   pauseGrace: FiniteDuration)
@@ -428,10 +428,10 @@ final case class ContainerProxyTimeoutConfig(idleContainer: FiniteDuration, paus
 object ContainerProxy {
   def props(
     factory: (TransactionId, String, ImageName, Boolean, ByteSize, Int) => Future[Container],
-    ack: (TransactionId, WhiskActivation, Boolean, InstanceId, UUID) => Future[Any],
+    ack: (TransactionId, WhiskActivation, Boolean, ControllerInstanceId, UUID) => Future[Any],
     store: (TransactionId, WhiskActivation) => Future[Any],
     collectLogs: (TransactionId, Identity, WhiskActivation, Container, ExecutableWhiskAction) => Future[ActivationLogs],
-    instance: InstanceId,
+    instance: InvokerInstanceId,
     poolConfig: ContainerPoolConfig,
     unusedTimeout: FiniteDuration = timeouts.idleContainer,
     pauseGrace: FiniteDuration = timeouts.pauseGrace) =
@@ -449,7 +449,7 @@ object ContainerProxy {
    * @param suffix the container name's suffix
    * @return a unique container name
    */
-  def containerName(instance: InstanceId, prefix: String, suffix: String): String = {
+  def containerName(instance: InvokerInstanceId, prefix: String, suffix: String): String = {
     def isAllowed(c: Char): Boolean = c.isLetterOrDigit || c == '_'
 
     val sanitizedPrefix = prefix.filter(isAllowed)
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainerFactory.scala b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainerFactory.scala
index 4d4fcf1781..68ada27674 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainerFactory.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainerFactory.scala
@@ -29,7 +29,7 @@ import whisk.core.containerpool.ContainerFactory
 import whisk.core.containerpool.ContainerFactoryProvider
 import whisk.core.entity.ByteSize
 import whisk.core.entity.ExecManifest
-import whisk.core.entity.InstanceId
+import whisk.core.entity.InvokerInstanceId
 import scala.concurrent.duration._
 import java.util.concurrent.TimeoutException
 import pureconfig._
@@ -38,7 +38,7 @@ import whisk.core.containerpool.ContainerArgsConfig
 
 case class DockerContainerFactoryConfig(useRunc: Boolean)
 
-class DockerContainerFactory(instance: InstanceId,
+class DockerContainerFactory(instance: InvokerInstanceId,
                              parameters: Map[String, Set[String]],
                              containerArgsConfig: ContainerArgsConfig =
                                loadConfigOrThrow[ContainerArgsConfig](ConfigKeys.containerArgs),
@@ -136,7 +136,7 @@ object DockerContainerFactoryProvider extends ContainerFactoryProvider {
   override def getContainerFactory(actorSystem: ActorSystem,
                                    logging: Logging,
                                    config: WhiskConfig,
-                                   instanceId: InstanceId,
+                                   instanceId: InvokerInstanceId,
                                    parameters: Map[String, Set[String]]): ContainerFactory = {
 
     new DockerContainerFactory(instanceId, parameters)(
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainerFactory.scala b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainerFactory.scala
index b56a2856cb..ec01bf3224 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainerFactory.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainerFactory.scala
@@ -32,7 +32,7 @@ import whisk.core.containerpool.ContainerFactory
 import whisk.core.containerpool.ContainerFactoryProvider
 import whisk.core.entity.ByteSize
 import whisk.core.entity.ExecManifest.ImageName
-import whisk.core.entity.InstanceId
+import whisk.core.entity.InvokerInstanceId
 import whisk.core.{ConfigKeys, WhiskConfig}
 
 class KubernetesContainerFactory(label: String, config: WhiskConfig)(implicit actorSystem: ActorSystem,
@@ -87,7 +87,7 @@ object KubernetesContainerFactoryProvider extends ContainerFactoryProvider {
   override def getContainerFactory(actorSystem: ActorSystem,
                                    logging: Logging,
                                    config: WhiskConfig,
-                                   instance: InstanceId,
+                                   instance: InvokerInstanceId,
                                    parameters: Map[String, Set[String]]): ContainerFactory =
     new KubernetesContainerFactory(s"invoker${instance.toInt}", config)(actorSystem, actorSystem.dispatcher, logging)
 }
diff --git a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
index 02d08eb0fa..1b291d4e04 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
+++ b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
@@ -38,7 +38,7 @@ import whisk.core.connector.MessagingProvider
 import whisk.core.connector.PingMessage
 import whisk.core.entity._
 import whisk.core.entity.ExecManifest
-import whisk.core.entity.InstanceId
+import whisk.core.entity.InvokerInstanceId
 import whisk.http.{BasicHttpService, BasicRasService}
 import whisk.spi.SpiLoader
 import whisk.utils.ExecutionContextFactory
@@ -165,7 +165,7 @@ object Invoker {
         assignedId
       }
 
-    val invokerInstance = InstanceId(assignedInvokerId, invokerName)
+    val invokerInstance = InvokerInstanceId(assignedInvokerId, invokerName)
     val msgProvider = SpiLoader.get[MessagingProvider]
     if (msgProvider.ensureTopic(config, topic = "invoker" + assignedInvokerId, topicConfig = "invoker").isFailure) {
       abort(s"failure during msgProvider.ensureTopic for topic invoker$assignedInvokerId")
diff --git a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
index 4ad6bbf988..ae4a1ae081 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
+++ b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
@@ -43,7 +43,7 @@ import DefaultJsonProtocol._
 
 class InvokerReactive(
   config: WhiskConfig,
-  instance: InstanceId,
+  instance: InvokerInstanceId,
   producer: MessageProducer,
   poolConfig: ContainerPoolConfig = loadConfigOrThrow[ContainerPoolConfig](ConfigKeys.containerPool))(
   implicit actorSystem: ActorSystem,
@@ -114,13 +114,13 @@ class InvokerReactive(
   private val ack = (tid: TransactionId,
                      activationResult: WhiskActivation,
                      blockingInvoke: Boolean,
-                     controllerInstance: InstanceId,
+                     controllerInstance: ControllerInstanceId,
                      userId: UUID) => {
     implicit val transid: TransactionId = tid
 
     def send(res: Either[ActivationId, WhiskActivation], recovery: Boolean = false) = {
       val msg = CompletionMessage(transid, res, instance)
-      producer.send(s"completed${controllerInstance.toInt}", msg).andThen {
+      producer.send(topic = "completed" + controllerInstance.asString, msg).andThen {
         case Success(_) =>
           logging.info(
             this,
diff --git a/tests/src/test/scala/whisk/core/connector/tests/CompletionMessageTests.scala b/tests/src/test/scala/whisk/core/connector/tests/CompletionMessageTests.scala
index 3a9a16e820..ff4419d453 100644
--- a/tests/src/test/scala/whisk/core/connector/tests/CompletionMessageTests.scala
+++ b/tests/src/test/scala/whisk/core/connector/tests/CompletionMessageTests.scala
@@ -53,7 +53,7 @@ class CompletionMessageTests extends FlatSpec with Matchers {
     duration = Some(123))
 
   it should "serialize a left completion message" in {
-    val m = CompletionMessage(TransactionId.testing, Left(ActivationId.generate()), InstanceId(0))
+    val m = CompletionMessage(TransactionId.testing, Left(ActivationId.generate()), InvokerInstanceId(0))
     m.serialize shouldBe JsObject(
       "transid" -> m.transid.toJson,
       "response" -> m.response.left.get.toJson,
@@ -61,7 +61,7 @@ class CompletionMessageTests extends FlatSpec with Matchers {
   }
 
   it should "serialize a right completion message" in {
-    val m = CompletionMessage(TransactionId.testing, Right(activation), InstanceId(0))
+    val m = CompletionMessage(TransactionId.testing, Right(activation), InvokerInstanceId(0))
     m.serialize shouldBe JsObject(
       "transid" -> m.transid.toJson,
       "response" -> m.response.right.get.toJson,
@@ -69,12 +69,12 @@ class CompletionMessageTests extends FlatSpec with Matchers {
   }
 
   it should "deserialize a left completion message" in {
-    val m = CompletionMessage(TransactionId.testing, Left(ActivationId.generate()), InstanceId(0))
+    val m = CompletionMessage(TransactionId.testing, Left(ActivationId.generate()), InvokerInstanceId(0))
     CompletionMessage.parse(m.serialize) shouldBe Success(m)
   }
 
   it should "deserialize a right completion message" in {
-    val m = CompletionMessage(TransactionId.testing, Right(activation), InstanceId(0))
+    val m = CompletionMessage(TransactionId.testing, Right(activation), InvokerInstanceId(0))
     CompletionMessage.parse(m.serialize) shouldBe Success(m)
   }
 }
diff --git a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerFactoryTests.scala b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerFactoryTests.scala
index a94890b36d..e6aa3edb33 100644
--- a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerFactoryTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerFactoryTests.scala
@@ -39,7 +39,7 @@ import whisk.core.containerpool.docker.DockerContainerFactory
 import whisk.core.containerpool.docker.DockerContainerFactoryConfig
 import whisk.core.containerpool.docker.RuncApi
 import whisk.core.entity.ExecManifest
-import whisk.core.entity.InstanceId
+import whisk.core.entity.InvokerInstanceId
 import whisk.core.entity.size._
 
 @RunWith(classOf[JUnitRunner])
@@ -104,7 +104,7 @@ class DockerContainerFactoryTests
 
     val factory =
       new DockerContainerFactory(
-        InstanceId(0),
+        InvokerInstanceId(0),
         Map(),
         ContainerArgsConfig("net1", Seq("dns1", "dns2"), Map("env" -> Set("e1", "e2"))),
         DockerContainerFactoryConfig(true))(actorSystem, executionContext, logging, dockerApiStub, mock[RuncApi])
diff --git a/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala b/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala
index 46f4559f82..9388b76945 100644
--- a/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala
@@ -78,7 +78,7 @@ class ContainerPoolTests
       action.rev,
       Identity(Subject(), Namespace(invocationNamespace, uuid), AuthKey(uuid, Secret()), Set()),
       ActivationId.generate(),
-      InstanceId(0),
+      ControllerInstanceId("0"),
       blocking = false,
       content = None)
     Run(action, message)
diff --git a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
index 8bbf80665a..87ed49497b 100644
--- a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
@@ -87,7 +87,7 @@ class ContainerProxyTests
     action.rev,
     Identity(Subject(), Namespace(invocationNamespace, uuid), AuthKey(uuid, Secret()), Set()),
     ActivationId.generate(),
-    InstanceId(0),
+    ControllerInstanceId("0"),
     blocking = false,
     content = None)
 
@@ -140,7 +140,7 @@ class ContainerProxyTests
 
   /** Creates an inspectable version of the ack method, which records all calls in a buffer */
   def createAcker(a: ExecutableWhiskAction = action) = LoggedFunction {
-    (_: TransactionId, activation: WhiskActivation, _: Boolean, _: InstanceId, _: UUID) =>
+    (_: TransactionId, activation: WhiskActivation, _: Boolean, _: ControllerInstanceId, _: UUID) =>
       activation.annotations.get("limits") shouldBe Some(a.limits.toJson)
       activation.annotations.get("path") shouldBe Some(a.fullyQualifiedName(false).toString.toJson)
       activation.annotations.get("kind") shouldBe Some(a.exec.kind.toJson)
@@ -186,7 +186,7 @@ class ContainerProxyTests
             createAcker(),
             store,
             createCollector(),
-            InstanceId(0, Some("myname")),
+            InvokerInstanceId(0, Some("myname")),
             poolConfig,
             pauseGrace = timeout))
     registerCallback(machine)
@@ -210,7 +210,7 @@ class ContainerProxyTests
     val machine =
       childActorOf(
         ContainerProxy
-          .props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
+          .props(factory, acker, store, collector, InvokerInstanceId(0), poolConfig, pauseGrace = timeout))
     registerCallback(machine)
 
     preWarm(machine)
@@ -247,7 +247,7 @@ class ContainerProxyTests
     val machine =
       childActorOf(
         ContainerProxy
-          .props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
+          .props(factory, acker, store, collector, InvokerInstanceId(0), poolConfig, pauseGrace = timeout))
     registerCallback(machine)
     preWarm(machine)
 
@@ -295,7 +295,7 @@ class ContainerProxyTests
     val machine =
       childActorOf(
         ContainerProxy
-          .props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
+          .props(factory, acker, store, collector, InvokerInstanceId(0), poolConfig, pauseGrace = timeout))
     registerCallback(machine)
     preWarm(machine)
 
@@ -334,7 +334,7 @@ class ContainerProxyTests
     val machine =
       childActorOf(
         ContainerProxy
-          .props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
+          .props(factory, acker, store, collector, InvokerInstanceId(0), poolConfig, pauseGrace = timeout))
     registerCallback(machine)
     run(machine, Uninitialized)
 
@@ -367,7 +367,7 @@ class ContainerProxyTests
     val machine =
       childActorOf(
         ContainerProxy
-          .props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
+          .props(factory, acker, store, collector, InvokerInstanceId(0), poolConfig, pauseGrace = timeout))
     registerCallback(machine)
 
     machine ! Run(noLogsAction, message)
@@ -398,7 +398,7 @@ class ContainerProxyTests
     val machine =
       childActorOf(
         ContainerProxy
-          .props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
+          .props(factory, acker, store, collector, InvokerInstanceId(0), poolConfig, pauseGrace = timeout))
     registerCallback(machine)
     machine ! Run(action, message)
     expectMsg(Transition(machine, Uninitialized, Running))
@@ -434,7 +434,7 @@ class ContainerProxyTests
     val machine =
       childActorOf(
         ContainerProxy
-          .props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
+          .props(factory, acker, store, collector, InvokerInstanceId(0), poolConfig, pauseGrace = timeout))
     registerCallback(machine)
     machine ! Run(action, message)
     expectMsg(Transition(machine, Uninitialized, Running))
@@ -474,7 +474,7 @@ class ContainerProxyTests
     val machine =
       childActorOf(
         ContainerProxy
-          .props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
+          .props(factory, acker, store, collector, InvokerInstanceId(0), poolConfig, pauseGrace = timeout))
     registerCallback(machine)
     machine ! Run(action, message)
     expectMsg(Transition(machine, Uninitialized, Running))
@@ -505,7 +505,7 @@ class ContainerProxyTests
     val machine =
       childActorOf(
         ContainerProxy
-          .props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
+          .props(factory, acker, store, collector, InvokerInstanceId(0), poolConfig, pauseGrace = timeout))
     registerCallback(machine)
     machine ! Run(action, message)
     expectMsg(Transition(machine, Uninitialized, Running))
@@ -535,7 +535,7 @@ class ContainerProxyTests
     val machine =
       childActorOf(
         ContainerProxy
-          .props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
+          .props(factory, acker, store, collector, InvokerInstanceId(0), poolConfig, pauseGrace = timeout))
     registerCallback(machine)
     machine ! Run(action, message)
     expectMsg(Transition(machine, Uninitialized, Running))
@@ -569,7 +569,7 @@ class ContainerProxyTests
     val machine =
       childActorOf(
         ContainerProxy
-          .props(factory, acker, store, createCollector(), InstanceId(0), poolConfig, pauseGrace = timeout))
+          .props(factory, acker, store, createCollector(), InvokerInstanceId(0), poolConfig, pauseGrace = timeout))
     registerCallback(machine)
     run(machine, Uninitialized) // first run an activation
     timeout(machine) // times out Ready state so container suspends
@@ -605,7 +605,7 @@ class ContainerProxyTests
     val machine =
       childActorOf(
         ContainerProxy
-          .props(factory, acker, store, createCollector(), InstanceId(0), poolConfig, pauseGrace = timeout))
+          .props(factory, acker, store, createCollector(), InvokerInstanceId(0), poolConfig, pauseGrace = timeout))
     registerCallback(machine)
     run(machine, Uninitialized)
     timeout(machine) // times out Ready state so container suspends
@@ -642,7 +642,7 @@ class ContainerProxyTests
     val machine =
       childActorOf(
         ContainerProxy
-          .props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
+          .props(factory, acker, store, collector, InvokerInstanceId(0), poolConfig, pauseGrace = timeout))
     registerCallback(machine)
 
     // Start running the action
@@ -694,7 +694,7 @@ class ContainerProxyTests
     val machine =
       childActorOf(
         ContainerProxy
-          .props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
+          .props(factory, acker, store, collector, InvokerInstanceId(0), poolConfig, pauseGrace = timeout))
     registerCallback(machine)
     run(machine, Uninitialized)
     timeout(machine)
diff --git a/tests/src/test/scala/whisk/core/controller/test/ControllerTestCommon.scala b/tests/src/test/scala/whisk/core/controller/test/ControllerTestCommon.scala
index 0d0b96f77c..82a47d724b 100644
--- a/tests/src/test/scala/whisk/core/controller/test/ControllerTestCommon.scala
+++ b/tests/src/test/scala/whisk/core/controller/test/ControllerTestCommon.scala
@@ -55,7 +55,7 @@ protected trait ControllerTestCommon
     with WhiskServices
     with StreamLogging {
 
-  val activeAckTopicIndex = InstanceId(0)
+  val activeAckTopicIndex = ControllerInstanceId("0")
 
   implicit val routeTestTimeout = RouteTestTimeout(90 seconds)
 
diff --git a/tests/src/test/scala/whisk/core/controller/test/WebActionsApiTests.scala b/tests/src/test/scala/whisk/core/controller/test/WebActionsApiTests.scala
index c54797b663..d01381d4b4 100644
--- a/tests/src/test/scala/whisk/core/controller/test/WebActionsApiTests.scala
+++ b/tests/src/test/scala/whisk/core/controller/test/WebActionsApiTests.scala
@@ -1755,7 +1755,7 @@ trait WebActionsApiBaseTests extends ControllerTestCommon with BeforeAndAfterEac
   }
 
   class TestingEntitlementProvider(config: WhiskConfig, loadBalancer: LoadBalancer)
-      extends EntitlementProvider(config, loadBalancer, InstanceId(0)) {
+      extends EntitlementProvider(config, loadBalancer, ControllerInstanceId("0")) {
 
     protected[core] override def checkThrottles(user: Identity)(implicit transid: TransactionId): Future[Unit] = {
       val subject = user.subject
diff --git a/tests/src/test/scala/whisk/core/database/test/DbUtils.scala b/tests/src/test/scala/whisk/core/database/test/DbUtils.scala
index 78910fcdef..10ffcc2704 100644
--- a/tests/src/test/scala/whisk/core/database/test/DbUtils.scala
+++ b/tests/src/test/scala/whisk/core/database/test/DbUtils.scala
@@ -48,7 +48,7 @@ import scala.util.{Failure, Random, Success, Try}
  */
 trait DbUtils extends Assertions {
   implicit val dbOpTimeout = 15 seconds
-  val instance = InstanceId(0)
+  val instance = ControllerInstanceId("0")
   val docsToDelete = ListBuffer[(ArtifactStore[_], DocInfo)]()
   case class RetryOp() extends Throwable
 
diff --git a/tests/src/test/scala/whisk/core/entity/test/ControllerInstanceIdTests.scala b/tests/src/test/scala/whisk/core/entity/test/ControllerInstanceIdTests.scala
new file mode 100644
index 0000000000..0845033d3a
--- /dev/null
+++ b/tests/src/test/scala/whisk/core/entity/test/ControllerInstanceIdTests.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.entity.test
+
+import org.junit.runner.RunWith
+import org.scalatest.FlatSpec
+import org.scalatest.Matchers
+import org.scalatest.junit.JUnitRunner
+import whisk.core.entity.ControllerInstanceId
+
+@RunWith(classOf[JUnitRunner])
+class ControllerInstanceIdTests extends FlatSpec with Matchers {
+
+  behavior of "ControllerInstanceId"
+
+  it should "accept usable characters" in {
+    Seq("a", "1", "a.1", "a_1").foreach { s =>
+      ControllerInstanceId(s).asString shouldBe s
+
+    }
+  }
+
+  it should "reject unusable characters" in {
+    Seq(" ", "!", "$", "a" * 129).foreach { s =>
+      an[IllegalArgumentException] shouldBe thrownBy {
+        ControllerInstanceId(s)
+      }
+    }
+  }
+
+}
diff --git a/tests/src/test/scala/whisk/core/loadBalancer/test/InvokerSupervisionTests.scala b/tests/src/test/scala/whisk/core/loadBalancer/test/InvokerSupervisionTests.scala
index bf0da86d2b..2605058d50 100644
--- a/tests/src/test/scala/whisk/core/loadBalancer/test/InvokerSupervisionTests.scala
+++ b/tests/src/test/scala/whisk/core/loadBalancer/test/InvokerSupervisionTests.scala
@@ -62,6 +62,7 @@ import whisk.core.loadBalancer.InvokerHealth
 import whisk.utils.retry
 import whisk.core.connector.test.TestConnector
 import whisk.core.entitlement.Privilege
+import whisk.core.entity.ControllerInstanceId
 
 @RunWith(classOf[JUnitRunner])
 class InvokerSupervisionTests
@@ -88,11 +89,11 @@ class InvokerSupervisionTests
 
   /** Queries all invokers for their state */
   def allStates(pool: ActorRef) =
-    Await.result(pool.ask(GetStatus).mapTo[IndexedSeq[(InstanceId, InvokerState)]], timeout.duration)
+    Await.result(pool.ask(GetStatus).mapTo[IndexedSeq[(InvokerInstanceId, InvokerState)]], timeout.duration)
 
   /** Helper to generate a list of (InstanceId, InvokerState) */
   def zipWithInstance(list: IndexedSeq[InvokerState]) = list.zipWithIndex.map {
-    case (state, index) => new InvokerHealth(InstanceId(index), state)
+    case (state, index) => new InvokerHealth(InvokerInstanceId(index), state)
   }
 
   val pC = new TestConnector("pingFeedTtest", 4, false) {}
@@ -103,13 +104,13 @@ class InvokerSupervisionTests
     val invoker5 = TestProbe()
     val invoker2 = TestProbe()
 
-    val invoker5Instance = InstanceId(5)
-    val invoker2Instance = InstanceId(2)
+    val invoker5Instance = InvokerInstanceId(5)
+    val invoker2Instance = InvokerInstanceId(2)
 
     val children = mutable.Queue(invoker5.ref, invoker2.ref)
-    val childFactory = (f: ActorRefFactory, instance: InstanceId) => children.dequeue()
+    val childFactory = (f: ActorRefFactory, instance: InvokerInstanceId) => children.dequeue()
 
-    val sendActivationToInvoker = stubFunction[ActivationMessage, InstanceId, Future[RecordMetadata]]
+    val sendActivationToInvoker = stubFunction[ActivationMessage, InvokerInstanceId, Future[RecordMetadata]]
     val supervisor = system.actorOf(InvokerPool.props(childFactory, sendActivationToInvoker, pC))
 
     within(timeout.duration) {
@@ -145,10 +146,10 @@ class InvokerSupervisionTests
 
   it should "forward the ActivationResult to the appropriate invoker" in {
     val invoker = TestProbe()
-    val invokerInstance = InstanceId(0)
+    val invokerInstance = InvokerInstanceId(0)
     val invokerName = s"invoker${invokerInstance.toInt}"
-    val childFactory = (f: ActorRefFactory, instance: InstanceId) => invoker.ref
-    val sendActivationToInvoker = stubFunction[ActivationMessage, InstanceId, Future[RecordMetadata]]
+    val childFactory = (f: ActorRefFactory, instance: InvokerInstanceId) => invoker.ref
+    val sendActivationToInvoker = stubFunction[ActivationMessage, InvokerInstanceId, Future[RecordMetadata]]
 
     val supervisor = system.actorOf(InvokerPool.props(childFactory, sendActivationToInvoker, pC))
 
@@ -170,11 +171,11 @@ class InvokerSupervisionTests
 
   it should "forward an ActivationMessage to the sendActivation-Method" in {
     val invoker = TestProbe()
-    val invokerInstance = InstanceId(0)
+    val invokerInstance = InvokerInstanceId(0)
     val invokerName = s"invoker${invokerInstance.toInt}"
-    val childFactory = (f: ActorRefFactory, instance: InstanceId) => invoker.ref
+    val childFactory = (f: ActorRefFactory, instance: InvokerInstanceId) => invoker.ref
 
-    val sendActivationToInvoker = stubFunction[ActivationMessage, InstanceId, Future[RecordMetadata]]
+    val sendActivationToInvoker = stubFunction[ActivationMessage, InvokerInstanceId, Future[RecordMetadata]]
 
     val supervisor = system.actorOf(InvokerPool.props(childFactory, sendActivationToInvoker, pC))
 
@@ -190,7 +191,7 @@ class InvokerSupervisionTests
         AuthKey(uuid, Secret()),
         Set[Privilege]()),
       activationId = new ActivationIdGenerator {}.make(),
-      rootControllerIndex = InstanceId(0),
+      rootControllerIndex = ControllerInstanceId("0"),
       blocking = false,
       content = None)
     val msg = ActivationRequest(activationMessage, invokerInstance)
@@ -215,7 +216,7 @@ class InvokerSupervisionTests
   // offline -> unhealthy
   it should "start unhealthy, go offline if the state times out and goes unhealthy on a successful ping again" in {
     val pool = TestProbe()
-    val invoker = pool.system.actorOf(InvokerActor.props(InstanceId(0), InstanceId(0)))
+    val invoker = pool.system.actorOf(InvokerActor.props(InvokerInstanceId(0), ControllerInstanceId("0")))
 
     within(timeout.duration) {
       pool.send(invoker, SubscribeTransitionCallBack(pool.ref))
@@ -223,7 +224,7 @@ class InvokerSupervisionTests
       timeout(invoker)
       pool.expectMsg(Transition(invoker, UnHealthy, Offline))
 
-      invoker ! PingMessage(InstanceId(0))
+      invoker ! PingMessage(InvokerInstanceId(0))
       pool.expectMsg(Transition(invoker, Offline, UnHealthy))
     }
   }
@@ -231,7 +232,7 @@ class InvokerSupervisionTests
   // unhealthy -> healthy
   it should "goto healthy again, if unhealthy and error buffer has enough successful invocations" in {
     val pool = TestProbe()
-    val invoker = pool.system.actorOf(InvokerActor.props(InstanceId(0), InstanceId(0)))
+    val invoker = pool.system.actorOf(InvokerActor.props(InvokerInstanceId(0), ControllerInstanceId("0")))
 
     within(timeout.duration) {
       pool.send(invoker, SubscribeTransitionCallBack(pool.ref))
@@ -239,12 +240,12 @@ class InvokerSupervisionTests
 
       // Fill buffer with errors
       (1 to InvokerActor.bufferSize).foreach { _ =>
-        invoker ! InvocationFinishedMessage(InstanceId(0), false)
+        invoker ! InvocationFinishedMessage(InvokerInstanceId(0), false)
       }
 
       // Fill buffer with successful invocations to become healthy again (one below errorTolerance)
       (1 to InvokerActor.bufferSize - InvokerActor.bufferErrorTolerance).foreach { _ =>
-        invoker ! InvocationFinishedMessage(InstanceId(0), true)
+        invoker ! InvocationFinishedMessage(InvokerInstanceId(0), true)
       }
       pool.expectMsg(Transition(invoker, UnHealthy, Healthy))
     }
@@ -254,7 +255,7 @@ class InvokerSupervisionTests
   // offline -> unhealthy
   it should "go offline when unhealthy, if the state times out and go unhealthy on a successful ping again" in {
     val pool = TestProbe()
-    val invoker = pool.system.actorOf(InvokerActor.props(InstanceId(0), InstanceId(0)))
+    val invoker = pool.system.actorOf(InvokerActor.props(InvokerInstanceId(0), ControllerInstanceId("0")))
 
     within(timeout.duration) {
       pool.send(invoker, SubscribeTransitionCallBack(pool.ref))
@@ -263,20 +264,20 @@ class InvokerSupervisionTests
       timeout(invoker)
       pool.expectMsg(Transition(invoker, UnHealthy, Offline))
 
-      invoker ! PingMessage(InstanceId(0))
+      invoker ! PingMessage(InvokerInstanceId(0))
       pool.expectMsg(Transition(invoker, Offline, UnHealthy))
     }
   }
 
   it should "start timer to send testactions when unhealthy" in {
-    val invoker = TestFSMRef(new InvokerActor(InstanceId(0), InstanceId(0)))
+    val invoker = TestFSMRef(new InvokerActor(InvokerInstanceId(0), ControllerInstanceId("0")))
     invoker.stateName shouldBe UnHealthy
 
     invoker.isTimerActive(InvokerActor.timerName) shouldBe true
 
     // Fill buffer with successful invocations to become healthy again (one below errorTolerance)
     (1 to InvokerActor.bufferSize - InvokerActor.bufferErrorTolerance).foreach { _ =>
-      invoker ! InvocationFinishedMessage(InstanceId(0), true)
+      invoker ! InvocationFinishedMessage(InvokerInstanceId(0), true)
     }
     invoker.stateName shouldBe Healthy
 
diff --git a/tests/src/test/scala/whisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala b/tests/src/test/scala/whisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
index 2f8ce9639e..4c8d15f240 100644
--- a/tests/src/test/scala/whisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
+++ b/tests/src/test/scala/whisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
@@ -22,7 +22,7 @@ import org.junit.runner.RunWith
 import org.scalatest.{FlatSpec, Matchers}
 import org.scalatest.junit.JUnitRunner
 import whisk.common.ForcableSemaphore
-import whisk.core.entity.InstanceId
+import whisk.core.entity.InvokerInstanceId
 import whisk.core.loadBalancer._
 
 /**
@@ -35,9 +35,9 @@ import whisk.core.loadBalancer._
 class ShardingContainerPoolBalancerTests extends FlatSpec with Matchers with StreamLogging {
   behavior of "ShardingContainerPoolBalancerState"
 
-  def healthy(i: Int) = new InvokerHealth(InstanceId(i), Healthy)
-  def unhealthy(i: Int) = new InvokerHealth(InstanceId(i), UnHealthy)
-  def offline(i: Int) = new InvokerHealth(InstanceId(i), Offline)
+  def healthy(i: Int) = new InvokerHealth(InvokerInstanceId(i), Healthy)
+  def unhealthy(i: Int) = new InvokerHealth(InvokerInstanceId(i), UnHealthy)
+  def offline(i: Int) = new InvokerHealth(InvokerInstanceId(i), Offline)
 
   def semaphores(count: Int, max: Int): IndexedSeq[ForcableSemaphore] =
     IndexedSeq.fill(count)(new ForcableSemaphore(max))


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services