You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ty...@apache.org on 2018/06/26 20:11:51 UTC
[incubator-openwhisk] branch master updated: allow use of string
for controller id (#3507)
This is an automated email from the ASF dual-hosted git repository.
tysonnorris pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 27ff6bf allow use of string for controller id (#3507)
27ff6bf is described below
commit 27ff6bf0fcaebbe51df1e602bc470acc815cbc19
Author: tysonnorris <ty...@gmail.com>
AuthorDate: Tue Jun 26 13:11:47 2018 -0700
allow use of string for controller id (#3507)
* Replace InstanceId with Controller and Invoker specific ids.
Controller ids are strings that must conform to a max length and legal character match.
Invoker ids must be integers as their ordinality matters.
* Simplify checking of controller id.
* review feedback
---
.../src/main/scala/whisk/common/Logging.scala | 9 ++---
.../main/scala/whisk/core/connector/Message.scala | 6 +--
.../core/containerpool/ContainerFactory.scala | 6 +--
.../core/database/RemoteCacheInvalidation.scala | 9 +++--
.../main/scala/whisk/core/entity/InstanceId.scala | 25 ++++++++++--
.../whisk/core/mesos/MesosContainerFactory.scala | 4 +-
.../scala/whisk/core/controller/Controller.scala | 12 +++---
.../scala/whisk/core/controller/RestAPIs.scala | 6 +--
.../core/controller/actions/PrimitiveActions.scala | 2 +-
.../scala/whisk/core/entitlement/Entitlement.scala | 6 +--
.../whisk/core/entitlement/LocalEntitlement.scala | 4 +-
.../core/loadBalancer/InvokerSupervision.scala | 38 +++++++++---------
.../whisk/core/loadBalancer/LoadBalancer.scala | 9 +++--
.../ShardingContainerPoolBalancer.scala | 18 ++++-----
.../whisk/core/containerpool/ContainerProxy.scala | 10 ++---
.../docker/DockerContainerFactory.scala | 6 +--
.../kubernetes/KubernetesContainerFactory.scala | 4 +-
.../main/scala/whisk/core/invoker/Invoker.scala | 4 +-
.../scala/whisk/core/invoker/InvokerReactive.scala | 6 +--
.../connector/tests/CompletionMessageTests.scala | 8 ++--
.../docker/test/DockerContainerFactoryTests.scala | 4 +-
.../containerpool/test/ContainerPoolTests.scala | 2 +-
.../containerpool/test/ContainerProxyTests.scala | 34 ++++++++--------
.../controller/test/ControllerTestCommon.scala | 2 +-
.../core/controller/test/WebActionsApiTests.scala | 2 +-
.../scala/whisk/core/database/test/DbUtils.scala | 2 +-
.../entity/test/ControllerInstanceIdTests.scala | 32 +++++++++++----
.../test/InvokerSupervisionTests.scala | 45 +++++++++++-----------
.../test/ShardingContainerPoolBalancerTests.scala | 8 ++--
29 files changed, 182 insertions(+), 141 deletions(-)
diff --git a/common/scala/src/main/scala/whisk/common/Logging.scala b/common/scala/src/main/scala/whisk/common/Logging.scala
index e5fcfe2..cf8cb26 100644
--- a/common/scala/src/main/scala/whisk/common/Logging.scala
+++ b/common/scala/src/main/scala/whisk/common/Logging.scala
@@ -20,11 +20,10 @@ package whisk.common
import java.io.PrintStream
import java.time.{Clock, Instant, ZoneId}
import java.time.format.DateTimeFormatter
-
import akka.event.Logging._
import akka.event.LoggingAdapter
import kamon.Kamon
-import whisk.core.entity.InstanceId
+import whisk.core.entity.ControllerInstanceId
trait Logging {
@@ -255,7 +254,7 @@ object LoggingMarkers {
/*
* Controller related markers
*/
- def CONTROLLER_STARTUP(i: Int) = LogMarkerToken(controller, s"startup$i", count)
+ def CONTROLLER_STARTUP(id: String) = LogMarkerToken(controller, s"startup$id", count)
// Time of the activation in controller until it is delivered to Kafka
val CONTROLLER_ACTIVATION = LogMarkerToken(controller, activation, start)
@@ -277,8 +276,8 @@ object LoggingMarkers {
val LOADBALANCER_INVOKER_UNHEALTHY = LogMarkerToken(loadbalancer, "invokerUnhealthy", count)
val LOADBALANCER_ACTIVATION_START = LogMarkerToken(loadbalancer, "activations", count)
- def LOADBALANCER_ACTIVATIONS_INFLIGHT(controllerInstance: InstanceId) =
- LogMarkerToken(loadbalancer + controllerInstance.toInt, "activationsInflight", count)
+ def LOADBALANCER_ACTIVATIONS_INFLIGHT(controllerInstance: ControllerInstanceId) =
+ LogMarkerToken(loadbalancer + controllerInstance.asString, "activationsInflight", count)
// Time that is needed to execute the action
val INVOKER_ACTIVATION_RUN = LogMarkerToken(invoker, "activationRun", start)
diff --git a/common/scala/src/main/scala/whisk/core/connector/Message.scala b/common/scala/src/main/scala/whisk/core/connector/Message.scala
index b17de8f..eb960b3 100644
--- a/common/scala/src/main/scala/whisk/core/connector/Message.scala
+++ b/common/scala/src/main/scala/whisk/core/connector/Message.scala
@@ -46,7 +46,7 @@ case class ActivationMessage(override val transid: TransactionId,
revision: DocRevision,
user: Identity,
activationId: ActivationId,
- rootControllerIndex: InstanceId,
+ rootControllerIndex: ControllerInstanceId,
blocking: Boolean,
content: Option[JsObject],
cause: Option[ActivationId] = None)
@@ -76,7 +76,7 @@ object ActivationMessage extends DefaultJsonProtocol {
*/
case class CompletionMessage(override val transid: TransactionId,
response: Either[ActivationId, WhiskActivation],
- invoker: InstanceId)
+ invoker: InvokerInstanceId)
extends Message {
override def serialize: String = {
@@ -108,7 +108,7 @@ object CompletionMessage extends DefaultJsonProtocol {
private val serdes = jsonFormat3(CompletionMessage.apply)
}
-case class PingMessage(instance: InstanceId) extends Message {
+case class PingMessage(instance: InvokerInstanceId) extends Message {
override def serialize = PingMessage.serdes.write(this).compactPrint
}
diff --git a/common/scala/src/main/scala/whisk/core/containerpool/ContainerFactory.scala b/common/scala/src/main/scala/whisk/core/containerpool/ContainerFactory.scala
index 35d3b8b..ae7f3d1 100644
--- a/common/scala/src/main/scala/whisk/core/containerpool/ContainerFactory.scala
+++ b/common/scala/src/main/scala/whisk/core/containerpool/ContainerFactory.scala
@@ -24,7 +24,7 @@ import whisk.common.TransactionId
import whisk.core.WhiskConfig
import whisk.core.entity.ByteSize
import whisk.core.entity.ExecManifest
-import whisk.core.entity.InstanceId
+import whisk.core.entity.InvokerInstanceId
import whisk.spi.Spi
case class ContainerArgsConfig(network: String,
@@ -74,7 +74,7 @@ object ContainerFactory {
private def isAllowed(c: Char) = c.isLetterOrDigit || c == '_' || c == '.' || c == '-'
/** include the instance name, if specified and strip invalid chars before attempting to use them in the container name */
- def containerNamePrefix(instanceId: InstanceId): String =
+ def containerNamePrefix(instanceId: InvokerInstanceId): String =
s"wsk${instanceId.name.getOrElse("")}${instanceId.toInt}".filter(isAllowed)
}
@@ -86,6 +86,6 @@ trait ContainerFactoryProvider extends Spi {
def getContainerFactory(actorSystem: ActorSystem,
logging: Logging,
config: WhiskConfig,
- instance: InstanceId,
+ instance: InvokerInstanceId,
parameters: Map[String, Set[String]]): ContainerFactory
}
diff --git a/common/scala/src/main/scala/whisk/core/database/RemoteCacheInvalidation.scala b/common/scala/src/main/scala/whisk/core/database/RemoteCacheInvalidation.scala
index 417a8b9..1bfd5aa 100644
--- a/common/scala/src/main/scala/whisk/core/database/RemoteCacheInvalidation.scala
+++ b/common/scala/src/main/scala/whisk/core/database/RemoteCacheInvalidation.scala
@@ -34,7 +34,7 @@ import whisk.core.connector.Message
import whisk.core.connector.MessageFeed
import whisk.core.connector.MessagingProvider
import whisk.core.entity.CacheKey
-import whisk.core.entity.InstanceId
+import whisk.core.entity.ControllerInstanceId
import whisk.core.entity.WhiskAction
import whisk.core.entity.WhiskActionMetaData
import whisk.core.entity.WhiskPackage
@@ -51,13 +51,14 @@ object CacheInvalidationMessage extends DefaultJsonProtocol {
implicit val serdes = jsonFormat(CacheInvalidationMessage.apply _, "key", "instanceId")
}
-class RemoteCacheInvalidation(config: WhiskConfig, component: String, instance: InstanceId)(implicit logging: Logging,
- as: ActorSystem) {
+class RemoteCacheInvalidation(config: WhiskConfig, component: String, instance: ControllerInstanceId)(
+ implicit logging: Logging,
+ as: ActorSystem) {
implicit private val ec = as.dispatchers.lookup("dispatchers.kafka-dispatcher")
private val topic = "cacheInvalidation"
- private val instanceId = s"$component${instance.toInt}"
+ private val instanceId = s"$component${instance.asString}"
private val msgProvider = SpiLoader.get[MessagingProvider]
private val cacheInvalidationConsumer =
diff --git a/common/scala/src/main/scala/whisk/core/entity/InstanceId.scala b/common/scala/src/main/scala/whisk/core/entity/InstanceId.scala
index 8087b8b..9ee2dc0 100644
--- a/common/scala/src/main/scala/whisk/core/entity/InstanceId.scala
+++ b/common/scala/src/main/scala/whisk/core/entity/InstanceId.scala
@@ -18,11 +18,30 @@
package whisk.core.entity
import spray.json.DefaultJsonProtocol
+import whisk.core.entity.ControllerInstanceId.LEGAL_CHARS
+import whisk.core.entity.ControllerInstanceId.MAX_NAME_LENGTH
-case class InstanceId(val instance: Int, name: Option[String] = None) {
+case class InvokerInstanceId(val instance: Int, name: Option[String] = None) {
def toInt: Int = instance
}
-object InstanceId extends DefaultJsonProtocol {
- implicit val serdes = jsonFormat2(InstanceId.apply)
+case class ControllerInstanceId(val asString: String) {
+ require(
+ asString.length <= MAX_NAME_LENGTH && asString.matches(LEGAL_CHARS),
+ "Controller instance id contains invalid characters")
+}
+
+object InvokerInstanceId extends DefaultJsonProtocol {
+ implicit val serdes = jsonFormat2(InvokerInstanceId.apply)
+}
+
+object ControllerInstanceId extends DefaultJsonProtocol {
+ // controller ids become part of a kafka topic, hence allow only certain characters
+ // see https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/internals/Topic.java#L29
+ private val LEGAL_CHARS = "[a-zA-Z0-9._-]+"
+
+ // reserve some number of characters as the prefix to be added to topic names
+ private val MAX_NAME_LENGTH = 249 - 121
+
+ implicit val serdes = jsonFormat1(ControllerInstanceId.apply)
}
diff --git a/common/scala/src/main/scala/whisk/core/mesos/MesosContainerFactory.scala b/common/scala/src/main/scala/whisk/core/mesos/MesosContainerFactory.scala
index 42232b9..c7ae587 100644
--- a/common/scala/src/main/scala/whisk/core/mesos/MesosContainerFactory.scala
+++ b/common/scala/src/main/scala/whisk/core/mesos/MesosContainerFactory.scala
@@ -47,7 +47,7 @@ import whisk.core.containerpool.ContainerFactory
import whisk.core.containerpool.ContainerFactoryProvider
import whisk.core.entity.ByteSize
import whisk.core.entity.ExecManifest
-import whisk.core.entity.InstanceId
+import whisk.core.entity.InvokerInstanceId
import whisk.core.entity.UUID
/**
@@ -198,7 +198,7 @@ object MesosContainerFactoryProvider extends ContainerFactoryProvider {
override def getContainerFactory(actorSystem: ActorSystem,
logging: Logging,
config: WhiskConfig,
- instance: InstanceId,
+ instance: InvokerInstanceId,
parameters: Map[String, Set[String]]): ContainerFactory =
new MesosContainerFactory(config, actorSystem, logging, parameters)
}
diff --git a/core/controller/src/main/scala/whisk/core/controller/Controller.scala b/core/controller/src/main/scala/whisk/core/controller/Controller.scala
index a8b6a4e..04dddcf 100644
--- a/core/controller/src/main/scala/whisk/core/controller/Controller.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/Controller.scala
@@ -74,7 +74,7 @@ import pureconfig.loadConfigOrThrow
* @param verbosity logging verbosity
* @param executionContext Scala runtime support for concurrent operations
*/
-class Controller(val instance: InstanceId,
+class Controller(val instance: ControllerInstanceId,
runtimes: Runtimes,
implicit val whiskConfig: WhiskConfig,
implicit val actorSystem: ActorSystem,
@@ -84,8 +84,8 @@ class Controller(val instance: InstanceId,
TransactionId.controller.mark(
this,
- LoggingMarkers.CONTROLLER_STARTUP(instance.toInt),
- s"starting controller instance ${instance.toInt}",
+ LoggingMarkers.CONTROLLER_STARTUP(instance.asString),
+ s"starting controller instance ${instance.asString}",
logLevel = InfoLevel)
/**
@@ -203,7 +203,7 @@ object Controller {
// if deploying multiple instances (scale out), must pass the controller instance id as the first argument
require(args.length >= 1, "controller instance required")
- val instance = args(0).toInt
+ val instance = ControllerInstanceId(args(0))
def abort(message: String) = {
logger.error(this, message)
@@ -219,7 +219,7 @@ object Controller {
val msgProvider = SpiLoader.get[MessagingProvider]
Map(
- "completed" + instance -> "completed",
+ "completed" + instance.asString -> "completed",
"health" -> "health",
"cacheInvalidation" -> "cache-invalidation",
"events" -> "events").foreach {
@@ -232,7 +232,7 @@ object Controller {
ExecManifest.initialize(config) match {
case Success(_) =>
val controller = new Controller(
- InstanceId(instance),
+ instance,
ExecManifest.runtimesManifest,
config,
actorSystem,
diff --git a/core/controller/src/main/scala/whisk/core/controller/RestAPIs.scala b/core/controller/src/main/scala/whisk/core/controller/RestAPIs.scala
index e524060..a76e64a 100644
--- a/core/controller/src/main/scala/whisk/core/controller/RestAPIs.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/RestAPIs.scala
@@ -155,7 +155,7 @@ protected[controller] trait RespondWithHeaders extends Directives {
case class WhiskInformation(buildNo: String, date: String)
class RestAPIVersion(config: WhiskConfig, apiPath: String, apiVersion: String)(
- implicit val activeAckTopicIndex: InstanceId,
+ implicit val activeAckTopicIndex: ControllerInstanceId,
implicit val actorSystem: ActorSystem,
implicit val materializer: ActorMaterializer,
implicit val logging: Logging,
@@ -235,7 +235,7 @@ class RestAPIVersion(config: WhiskConfig, apiPath: String, apiVersion: String)(
class ActionsApi(val apiPath: String, val apiVersion: String)(
implicit override val actorSystem: ActorSystem,
- override val activeAckTopicIndex: InstanceId,
+ override val activeAckTopicIndex: ControllerInstanceId,
override val entityStore: EntityStore,
override val activationStore: ActivationStore,
override val entitlementProvider: EntitlementProvider,
@@ -303,7 +303,7 @@ class RestAPIVersion(config: WhiskConfig, apiPath: String, apiVersion: String)(
override val webApiDirectives: WebApiDirectives)(
implicit override val authStore: AuthStore,
implicit val entityStore: EntityStore,
- override val activeAckTopicIndex: InstanceId,
+ override val activeAckTopicIndex: ControllerInstanceId,
override val activationStore: ActivationStore,
override val entitlementProvider: EntitlementProvider,
override val activationIdFactory: ActivationIdGenerator,
diff --git a/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala b/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala
index 135bc48..79da2d5 100644
--- a/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala
@@ -55,7 +55,7 @@ protected[actions] trait PrimitiveActions {
* The index of the active ack topic, this controller is listening for.
* Typically this is also the instance number of the controller
*/
- protected val activeAckTopicIndex: InstanceId
+ protected val activeAckTopicIndex: ControllerInstanceId
/** Database service to CRUD actions. */
protected val entityStore: EntityStore
diff --git a/core/controller/src/main/scala/whisk/core/entitlement/Entitlement.scala b/core/controller/src/main/scala/whisk/core/entitlement/Entitlement.scala
index 23007f3..116f3ef 100644
--- a/core/controller/src/main/scala/whisk/core/entitlement/Entitlement.scala
+++ b/core/controller/src/main/scala/whisk/core/entitlement/Entitlement.scala
@@ -78,7 +78,7 @@ protected[core] object EntitlementProvider {
protected[core] abstract class EntitlementProvider(
config: WhiskConfig,
loadBalancer: LoadBalancer,
- controllerInstance: InstanceId)(implicit actorSystem: ActorSystem, logging: Logging) {
+ controllerInstance: ControllerInstanceId)(implicit actorSystem: ActorSystem, logging: Logging) {
private implicit val executionContext: ExecutionContext = actorSystem.dispatcher
@@ -372,7 +372,7 @@ protected[core] abstract class EntitlementProvider(
UserEvents.send(
eventProducer,
EventMessage(
- s"controller${controllerInstance.instance}",
+ s"controller${controllerInstance.asString}",
metric,
user.subject,
user.namespace.name.toString,
@@ -388,7 +388,7 @@ protected[core] abstract class EntitlementProvider(
UserEvents.send(
eventProducer,
EventMessage(
- s"controller${controllerInstance.instance}",
+ s"controller${controllerInstance.asString}",
metric,
user.subject,
user.namespace.name.toString,
diff --git a/core/controller/src/main/scala/whisk/core/entitlement/LocalEntitlement.scala b/core/controller/src/main/scala/whisk/core/entitlement/LocalEntitlement.scala
index fa0edf7..4f240c6 100644
--- a/core/controller/src/main/scala/whisk/core/entitlement/LocalEntitlement.scala
+++ b/core/controller/src/main/scala/whisk/core/entitlement/LocalEntitlement.scala
@@ -23,7 +23,7 @@ import akka.actor.ActorSystem
import whisk.common.Logging
import whisk.common.TransactionId
import whisk.core.WhiskConfig
-import whisk.core.entity.{InstanceId, Subject}
+import whisk.core.entity.{ControllerInstanceId, Subject}
import whisk.core.loadBalancer.LoadBalancer
private object LocalEntitlementProvider {
@@ -35,7 +35,7 @@ private object LocalEntitlementProvider {
protected[core] class LocalEntitlementProvider(
private val config: WhiskConfig,
private val loadBalancer: LoadBalancer,
- private val controllerInstance: InstanceId)(implicit actorSystem: ActorSystem, logging: Logging)
+ private val controllerInstance: ControllerInstanceId)(implicit actorSystem: ActorSystem, logging: Logging)
extends EntitlementProvider(config, loadBalancer, controllerInstance) {
private implicit val executionContext = actorSystem.dispatcher
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala
index 152f555..b8f29f1 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala
@@ -50,8 +50,8 @@ case object Offline extends InvokerState { val asString = "down" }
case object Healthy extends InvokerState { val asString = "up" }
case object UnHealthy extends InvokerState { val asString = "unhealthy" }
-case class ActivationRequest(msg: ActivationMessage, invoker: InstanceId)
-case class InvocationFinishedMessage(invokerInstance: InstanceId, successful: Boolean)
+case class ActivationRequest(msg: ActivationMessage, invoker: InvokerInstanceId)
+case class InvocationFinishedMessage(invokerInstance: InvokerInstanceId, successful: Boolean)
// Sent to a monitor if the state changed
case class CurrentInvokerPoolState(newState: IndexedSeq[InvokerHealth])
@@ -70,8 +70,8 @@ final case class InvokerInfo(buffer: RingBuffer[Boolean])
* Note: An Invoker that never sends an initial Ping will not be considered
* by the InvokerPool and thus might not be caught by monitoring.
*/
-class InvokerPool(childFactory: (ActorRefFactory, InstanceId) => ActorRef,
- sendActivationToInvoker: (ActivationMessage, InstanceId) => Future[RecordMetadata],
+class InvokerPool(childFactory: (ActorRefFactory, InvokerInstanceId) => ActorRef,
+ sendActivationToInvoker: (ActivationMessage, InvokerInstanceId) => Future[RecordMetadata],
pingConsumer: MessageConsumer,
monitor: Option[ActorRef])
extends Actor {
@@ -83,8 +83,8 @@ class InvokerPool(childFactory: (ActorRefFactory, InstanceId) => ActorRef,
// State of the actor. Mutable vars with immutable collections prevents closures or messages
// from leaking the state for external mutation
- var instanceToRef = immutable.Map.empty[InstanceId, ActorRef]
- var refToInstance = immutable.Map.empty[ActorRef, InstanceId]
+ var instanceToRef = immutable.Map.empty[InvokerInstanceId, ActorRef]
+ var refToInstance = immutable.Map.empty[ActorRef, InvokerInstanceId]
var status = IndexedSeq[InvokerHealth]()
def receive = {
@@ -152,10 +152,10 @@ class InvokerPool(childFactory: (ActorRefFactory, InstanceId) => ActorRef,
def padToIndexed[A](list: IndexedSeq[A], n: Int, f: (Int) => A) = list ++ (list.size until n).map(f)
// Register a new invoker
- def registerInvoker(instanceId: InstanceId): ActorRef = {
+ def registerInvoker(instanceId: InvokerInstanceId): ActorRef = {
logging.info(this, s"registered a new invoker: invoker${instanceId.toInt}")(TransactionId.invokerHealth)
- status = padToIndexed(status, instanceId.toInt + 1, i => new InvokerHealth(InstanceId(i), Offline))
+ status = padToIndexed(status, instanceId.toInt + 1, i => new InvokerHealth(InvokerInstanceId(i), Offline))
val ref = childFactory(context, instanceId)
@@ -196,7 +196,7 @@ object InvokerPool {
* @param entityStore store to write the action to
* @return throws an exception on failure to prepare
*/
- def prepare(controllerInstance: InstanceId, entityStore: EntityStore): Unit = {
+ def prepare(controllerInstance: ControllerInstanceId, entityStore: EntityStore): Unit = {
InvokerPool
.healthAction(controllerInstance)
.map {
@@ -211,8 +211,8 @@ object InvokerPool {
}
}
- def props(f: (ActorRefFactory, InstanceId) => ActorRef,
- p: (ActivationMessage, InstanceId) => Future[RecordMetadata],
+ def props(f: (ActorRefFactory, InvokerInstanceId) => ActorRef,
+ p: (ActivationMessage, InvokerInstanceId) => Future[RecordMetadata],
pc: MessageConsumer,
m: Option[ActorRef] = None) = {
Props(new InvokerPool(f, p, pc, m))
@@ -226,11 +226,12 @@ object InvokerPool {
}
/** An action to use for monitoring invoker health. */
- def healthAction(i: InstanceId) = ExecManifest.runtimesManifest.resolveDefaultRuntime("nodejs:6").map { manifest =>
- new WhiskAction(
- namespace = healthActionIdentity.namespace.name.toPath,
- name = EntityName(s"invokerHealthTestAction${i.toInt}"),
- exec = CodeExecAsString(manifest, """function main(params) { return params; }""", None))
+ def healthAction(i: ControllerInstanceId) = ExecManifest.runtimesManifest.resolveDefaultRuntime("nodejs:6").map {
+ manifest =>
+ new WhiskAction(
+ namespace = healthActionIdentity.namespace.name.toPath,
+ name = EntityName(s"invokerHealthTestAction${i.asString}"),
+ exec = CodeExecAsString(manifest, """function main(params) { return params; }""", None))
}
}
@@ -240,7 +241,8 @@ object InvokerPool {
* This finite state-machine represents an Invoker in its possible
* states "Healthy" and "Offline".
*/
-class InvokerActor(invokerInstance: InstanceId, controllerInstance: InstanceId) extends FSM[InvokerState, InvokerInfo] {
+class InvokerActor(invokerInstance: InvokerInstanceId, controllerInstance: ControllerInstanceId)
+ extends FSM[InvokerState, InvokerInfo] {
implicit val transid = TransactionId.invokerHealth
implicit val logging = new AkkaLogging(context.system.log)
val name = s"invoker${invokerInstance.toInt}"
@@ -391,7 +393,7 @@ class InvokerActor(invokerInstance: InstanceId, controllerInstance: InstanceId)
}
object InvokerActor {
- def props(invokerInstance: InstanceId, controllerInstance: InstanceId) =
+ def props(invokerInstance: InvokerInstanceId, controllerInstance: ControllerInstanceId) =
Props(new InvokerActor(invokerInstance, controllerInstance))
val bufferSize = 10
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancer.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancer.scala
index 52ffd73..5d581fd 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancer.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancer.scala
@@ -33,7 +33,7 @@ import whisk.spi.Spi
* @param id a unique instance identifier for the invoker
* @param status its status (healthy, unhealthy, offline)
*/
-class InvokerHealth(val id: InstanceId, val status: InvokerState) {
+class InvokerHealth(val id: InvokerInstanceId, val status: InvokerState) {
override def equals(obj: scala.Any): Boolean = obj match {
case that: InvokerHealth => that.id == this.id && that.status == this.status
case _ => false
@@ -82,9 +82,10 @@ trait LoadBalancer {
trait LoadBalancerProvider extends Spi {
def requiredProperties: Map[String, String]
- def loadBalancer(whiskConfig: WhiskConfig, instance: InstanceId)(implicit actorSystem: ActorSystem,
- logging: Logging,
- materializer: ActorMaterializer): LoadBalancer
+ def loadBalancer(whiskConfig: WhiskConfig, instance: ControllerInstanceId)(
+ implicit actorSystem: ActorSystem,
+ logging: Logging,
+ materializer: ActorMaterializer): LoadBalancer
}
/** Exception thrown by the loadbalancer */
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
index bad3842..eac4aff 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
@@ -50,7 +50,7 @@ import scala.util.{Failure, Success}
* Horizontal sharding means, that each invoker's capacity is evenly divided between the loadbalancers. If an invoker
* has at most 16 slots available, those will be divided to 8 slots for each loadbalancer (if there are 2).
*/
-class ShardingContainerPoolBalancer(config: WhiskConfig, controllerInstance: InstanceId)(
+class ShardingContainerPoolBalancer(config: WhiskConfig, controllerInstance: ControllerInstanceId)(
implicit val actorSystem: ActorSystem,
logging: Logging,
materializer: ActorMaterializer)
@@ -159,7 +159,7 @@ class ShardingContainerPoolBalancer(config: WhiskConfig, controllerInstance: Ins
/** 2. Update local state with the to be executed activation */
private def setupActivation(msg: ActivationMessage,
action: ExecutableWhiskActionMetaData,
- instance: InstanceId): ActivationEntry = {
+ instance: InvokerInstanceId): ActivationEntry = {
totalActivations.increment()
activationsPerNamespace.getOrElseUpdate(msg.user.namespace.uuid, new LongAdder()).increment()
@@ -191,7 +191,7 @@ class ShardingContainerPoolBalancer(config: WhiskConfig, controllerInstance: Ins
/** 3. Send the activation to the invoker */
private def sendActivationToInvoker(producer: MessageProducer,
msg: ActivationMessage,
- invoker: InstanceId): Future[RecordMetadata] = {
+ invoker: InvokerInstanceId): Future[RecordMetadata] = {
implicit val transid: TransactionId = msg.transid
val topic = s"invoker${invoker.toInt}"
@@ -218,7 +218,7 @@ class ShardingContainerPoolBalancer(config: WhiskConfig, controllerInstance: Ins
* Subscribes to active acks (completion messages from the invokers), and
* registers a handler for received active acks from invokers.
*/
- private val activeAckTopic = s"completed${controllerInstance.toInt}"
+ private val activeAckTopic = s"completed${controllerInstance.asString}"
private val maxActiveAcksPerPoll = 128
private val activeAckPollDuration = 1.second
private val activeAckConsumer =
@@ -252,7 +252,7 @@ class ShardingContainerPoolBalancer(config: WhiskConfig, controllerInstance: Ins
private def processCompletion(response: Either[ActivationId, WhiskActivation],
tid: TransactionId,
forced: Boolean,
- invoker: InstanceId): Unit = {
+ invoker: InvokerInstanceId): Unit = {
val aid = response.fold(l => l, r => r.activationId)
// treat left as success (as it is the result of a message exceeding the bus limit)
@@ -295,14 +295,14 @@ class ShardingContainerPoolBalancer(config: WhiskConfig, controllerInstance: Ins
InvokerPool.props(
(f, i) => f.actorOf(InvokerActor.props(i, controllerInstance)),
(m, i) => sendActivationToInvoker(messageProducer, m, i),
- messagingProvider.getConsumer(config, s"health${controllerInstance.toInt}", "health", maxPeek = 128),
+ messagingProvider.getConsumer(config, s"health${controllerInstance.asString}", "health", maxPeek = 128),
Some(monitor)))
}
}
object ShardingContainerPoolBalancer extends LoadBalancerProvider {
- override def loadBalancer(whiskConfig: WhiskConfig, instance: InstanceId)(
+ override def loadBalancer(whiskConfig: WhiskConfig, instance: ControllerInstanceId)(
implicit actorSystem: ActorSystem,
logging: Logging,
materializer: ActorMaterializer): LoadBalancer = new ShardingContainerPoolBalancer(whiskConfig, instance)
@@ -341,7 +341,7 @@ object ShardingContainerPoolBalancer extends LoadBalancerProvider {
dispatched: IndexedSeq[ForcableSemaphore],
index: Int,
step: Int,
- stepsDone: Int = 0)(implicit logging: Logging): Option[InstanceId] = {
+ stepsDone: Int = 0)(implicit logging: Logging): Option[InvokerInstanceId] = {
val numInvokers = invokers.size
if (numInvokers > 0) {
@@ -501,6 +501,6 @@ case class ShardingContainerPoolBalancerConfig(blackboxFraction: Double, invoker
*/
case class ActivationEntry(id: ActivationId,
namespaceId: UUID,
- invokerName: InstanceId,
+ invokerName: InvokerInstanceId,
timeoutHandler: Cancellable,
promise: Promise[Either[ActivationId, WhiskActivation]])
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
index 0a7c1ac..c0914b9 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
@@ -95,10 +95,10 @@ case object RescheduleJob // job is sent back to parent and could not be process
*/
class ContainerProxy(
factory: (TransactionId, String, ImageName, Boolean, ByteSize, Int) => Future[Container],
- sendActiveAck: (TransactionId, WhiskActivation, Boolean, InstanceId, UUID) => Future[Any],
+ sendActiveAck: (TransactionId, WhiskActivation, Boolean, ControllerInstanceId, UUID) => Future[Any],
storeActivation: (TransactionId, WhiskActivation) => Future[Any],
collectLogs: (TransactionId, Identity, WhiskActivation, Container, ExecutableWhiskAction) => Future[ActivationLogs],
- instance: InstanceId,
+ instance: InvokerInstanceId,
poolConfig: ContainerPoolConfig,
unusedTimeout: FiniteDuration,
pauseGrace: FiniteDuration)
@@ -428,10 +428,10 @@ final case class ContainerProxyTimeoutConfig(idleContainer: FiniteDuration, paus
object ContainerProxy {
def props(
factory: (TransactionId, String, ImageName, Boolean, ByteSize, Int) => Future[Container],
- ack: (TransactionId, WhiskActivation, Boolean, InstanceId, UUID) => Future[Any],
+ ack: (TransactionId, WhiskActivation, Boolean, ControllerInstanceId, UUID) => Future[Any],
store: (TransactionId, WhiskActivation) => Future[Any],
collectLogs: (TransactionId, Identity, WhiskActivation, Container, ExecutableWhiskAction) => Future[ActivationLogs],
- instance: InstanceId,
+ instance: InvokerInstanceId,
poolConfig: ContainerPoolConfig,
unusedTimeout: FiniteDuration = timeouts.idleContainer,
pauseGrace: FiniteDuration = timeouts.pauseGrace) =
@@ -449,7 +449,7 @@ object ContainerProxy {
* @param suffix the container name's suffix
* @return a unique container name
*/
- def containerName(instance: InstanceId, prefix: String, suffix: String): String = {
+ def containerName(instance: InvokerInstanceId, prefix: String, suffix: String): String = {
def isAllowed(c: Char): Boolean = c.isLetterOrDigit || c == '_'
val sanitizedPrefix = prefix.filter(isAllowed)
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainerFactory.scala b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainerFactory.scala
index 4d4fcf1..68ada27 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainerFactory.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainerFactory.scala
@@ -29,7 +29,7 @@ import whisk.core.containerpool.ContainerFactory
import whisk.core.containerpool.ContainerFactoryProvider
import whisk.core.entity.ByteSize
import whisk.core.entity.ExecManifest
-import whisk.core.entity.InstanceId
+import whisk.core.entity.InvokerInstanceId
import scala.concurrent.duration._
import java.util.concurrent.TimeoutException
import pureconfig._
@@ -38,7 +38,7 @@ import whisk.core.containerpool.ContainerArgsConfig
case class DockerContainerFactoryConfig(useRunc: Boolean)
-class DockerContainerFactory(instance: InstanceId,
+class DockerContainerFactory(instance: InvokerInstanceId,
parameters: Map[String, Set[String]],
containerArgsConfig: ContainerArgsConfig =
loadConfigOrThrow[ContainerArgsConfig](ConfigKeys.containerArgs),
@@ -136,7 +136,7 @@ object DockerContainerFactoryProvider extends ContainerFactoryProvider {
override def getContainerFactory(actorSystem: ActorSystem,
logging: Logging,
config: WhiskConfig,
- instanceId: InstanceId,
+ instanceId: InvokerInstanceId,
parameters: Map[String, Set[String]]): ContainerFactory = {
new DockerContainerFactory(instanceId, parameters)(
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainerFactory.scala b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainerFactory.scala
index b56a285..ec01bf3 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainerFactory.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainerFactory.scala
@@ -32,7 +32,7 @@ import whisk.core.containerpool.ContainerFactory
import whisk.core.containerpool.ContainerFactoryProvider
import whisk.core.entity.ByteSize
import whisk.core.entity.ExecManifest.ImageName
-import whisk.core.entity.InstanceId
+import whisk.core.entity.InvokerInstanceId
import whisk.core.{ConfigKeys, WhiskConfig}
class KubernetesContainerFactory(label: String, config: WhiskConfig)(implicit actorSystem: ActorSystem,
@@ -87,7 +87,7 @@ object KubernetesContainerFactoryProvider extends ContainerFactoryProvider {
override def getContainerFactory(actorSystem: ActorSystem,
logging: Logging,
config: WhiskConfig,
- instance: InstanceId,
+ instance: InvokerInstanceId,
parameters: Map[String, Set[String]]): ContainerFactory =
new KubernetesContainerFactory(s"invoker${instance.toInt}", config)(actorSystem, actorSystem.dispatcher, logging)
}
diff --git a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
index 02d08eb..1b291d4 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
+++ b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
@@ -38,7 +38,7 @@ import whisk.core.connector.MessagingProvider
import whisk.core.connector.PingMessage
import whisk.core.entity._
import whisk.core.entity.ExecManifest
-import whisk.core.entity.InstanceId
+import whisk.core.entity.InvokerInstanceId
import whisk.http.{BasicHttpService, BasicRasService}
import whisk.spi.SpiLoader
import whisk.utils.ExecutionContextFactory
@@ -165,7 +165,7 @@ object Invoker {
assignedId
}
- val invokerInstance = InstanceId(assignedInvokerId, invokerName)
+ val invokerInstance = InvokerInstanceId(assignedInvokerId, invokerName)
val msgProvider = SpiLoader.get[MessagingProvider]
if (msgProvider.ensureTopic(config, topic = "invoker" + assignedInvokerId, topicConfig = "invoker").isFailure) {
abort(s"failure during msgProvider.ensureTopic for topic invoker$assignedInvokerId")
diff --git a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
index 4ad6bbf..ae4a1ae 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
+++ b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
@@ -43,7 +43,7 @@ import DefaultJsonProtocol._
class InvokerReactive(
config: WhiskConfig,
- instance: InstanceId,
+ instance: InvokerInstanceId,
producer: MessageProducer,
poolConfig: ContainerPoolConfig = loadConfigOrThrow[ContainerPoolConfig](ConfigKeys.containerPool))(
implicit actorSystem: ActorSystem,
@@ -114,13 +114,13 @@ class InvokerReactive(
private val ack = (tid: TransactionId,
activationResult: WhiskActivation,
blockingInvoke: Boolean,
- controllerInstance: InstanceId,
+ controllerInstance: ControllerInstanceId,
userId: UUID) => {
implicit val transid: TransactionId = tid
def send(res: Either[ActivationId, WhiskActivation], recovery: Boolean = false) = {
val msg = CompletionMessage(transid, res, instance)
- producer.send(s"completed${controllerInstance.toInt}", msg).andThen {
+ producer.send(topic = "completed" + controllerInstance.asString, msg).andThen {
case Success(_) =>
logging.info(
this,
diff --git a/tests/src/test/scala/whisk/core/connector/tests/CompletionMessageTests.scala b/tests/src/test/scala/whisk/core/connector/tests/CompletionMessageTests.scala
index 3a9a16e..ff4419d 100644
--- a/tests/src/test/scala/whisk/core/connector/tests/CompletionMessageTests.scala
+++ b/tests/src/test/scala/whisk/core/connector/tests/CompletionMessageTests.scala
@@ -53,7 +53,7 @@ class CompletionMessageTests extends FlatSpec with Matchers {
duration = Some(123))
it should "serialize a left completion message" in {
- val m = CompletionMessage(TransactionId.testing, Left(ActivationId.generate()), InstanceId(0))
+ val m = CompletionMessage(TransactionId.testing, Left(ActivationId.generate()), InvokerInstanceId(0))
m.serialize shouldBe JsObject(
"transid" -> m.transid.toJson,
"response" -> m.response.left.get.toJson,
@@ -61,7 +61,7 @@ class CompletionMessageTests extends FlatSpec with Matchers {
}
it should "serialize a right completion message" in {
- val m = CompletionMessage(TransactionId.testing, Right(activation), InstanceId(0))
+ val m = CompletionMessage(TransactionId.testing, Right(activation), InvokerInstanceId(0))
m.serialize shouldBe JsObject(
"transid" -> m.transid.toJson,
"response" -> m.response.right.get.toJson,
@@ -69,12 +69,12 @@ class CompletionMessageTests extends FlatSpec with Matchers {
}
it should "deserialize a left completion message" in {
- val m = CompletionMessage(TransactionId.testing, Left(ActivationId.generate()), InstanceId(0))
+ val m = CompletionMessage(TransactionId.testing, Left(ActivationId.generate()), InvokerInstanceId(0))
CompletionMessage.parse(m.serialize) shouldBe Success(m)
}
it should "deserialize a right completion message" in {
- val m = CompletionMessage(TransactionId.testing, Right(activation), InstanceId(0))
+ val m = CompletionMessage(TransactionId.testing, Right(activation), InvokerInstanceId(0))
CompletionMessage.parse(m.serialize) shouldBe Success(m)
}
}
diff --git a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerFactoryTests.scala b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerFactoryTests.scala
index a94890b..e6aa3ed 100644
--- a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerFactoryTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerFactoryTests.scala
@@ -39,7 +39,7 @@ import whisk.core.containerpool.docker.DockerContainerFactory
import whisk.core.containerpool.docker.DockerContainerFactoryConfig
import whisk.core.containerpool.docker.RuncApi
import whisk.core.entity.ExecManifest
-import whisk.core.entity.InstanceId
+import whisk.core.entity.InvokerInstanceId
import whisk.core.entity.size._
@RunWith(classOf[JUnitRunner])
@@ -104,7 +104,7 @@ class DockerContainerFactoryTests
val factory =
new DockerContainerFactory(
- InstanceId(0),
+ InvokerInstanceId(0),
Map(),
ContainerArgsConfig("net1", Seq("dns1", "dns2"), Map("env" -> Set("e1", "e2"))),
DockerContainerFactoryConfig(true))(actorSystem, executionContext, logging, dockerApiStub, mock[RuncApi])
diff --git a/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala b/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala
index 46f4559..9388b76 100644
--- a/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala
@@ -78,7 +78,7 @@ class ContainerPoolTests
action.rev,
Identity(Subject(), Namespace(invocationNamespace, uuid), AuthKey(uuid, Secret()), Set()),
ActivationId.generate(),
- InstanceId(0),
+ ControllerInstanceId("0"),
blocking = false,
content = None)
Run(action, message)
diff --git a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
index 8bbf806..87ed494 100644
--- a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
@@ -87,7 +87,7 @@ class ContainerProxyTests
action.rev,
Identity(Subject(), Namespace(invocationNamespace, uuid), AuthKey(uuid, Secret()), Set()),
ActivationId.generate(),
- InstanceId(0),
+ ControllerInstanceId("0"),
blocking = false,
content = None)
@@ -140,7 +140,7 @@ class ContainerProxyTests
/** Creates an inspectable version of the ack method, which records all calls in a buffer */
def createAcker(a: ExecutableWhiskAction = action) = LoggedFunction {
- (_: TransactionId, activation: WhiskActivation, _: Boolean, _: InstanceId, _: UUID) =>
+ (_: TransactionId, activation: WhiskActivation, _: Boolean, _: ControllerInstanceId, _: UUID) =>
activation.annotations.get("limits") shouldBe Some(a.limits.toJson)
activation.annotations.get("path") shouldBe Some(a.fullyQualifiedName(false).toString.toJson)
activation.annotations.get("kind") shouldBe Some(a.exec.kind.toJson)
@@ -186,7 +186,7 @@ class ContainerProxyTests
createAcker(),
store,
createCollector(),
- InstanceId(0, Some("myname")),
+ InvokerInstanceId(0, Some("myname")),
poolConfig,
pauseGrace = timeout))
registerCallback(machine)
@@ -210,7 +210,7 @@ class ContainerProxyTests
val machine =
childActorOf(
ContainerProxy
- .props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
+ .props(factory, acker, store, collector, InvokerInstanceId(0), poolConfig, pauseGrace = timeout))
registerCallback(machine)
preWarm(machine)
@@ -247,7 +247,7 @@ class ContainerProxyTests
val machine =
childActorOf(
ContainerProxy
- .props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
+ .props(factory, acker, store, collector, InvokerInstanceId(0), poolConfig, pauseGrace = timeout))
registerCallback(machine)
preWarm(machine)
@@ -295,7 +295,7 @@ class ContainerProxyTests
val machine =
childActorOf(
ContainerProxy
- .props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
+ .props(factory, acker, store, collector, InvokerInstanceId(0), poolConfig, pauseGrace = timeout))
registerCallback(machine)
preWarm(machine)
@@ -334,7 +334,7 @@ class ContainerProxyTests
val machine =
childActorOf(
ContainerProxy
- .props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
+ .props(factory, acker, store, collector, InvokerInstanceId(0), poolConfig, pauseGrace = timeout))
registerCallback(machine)
run(machine, Uninitialized)
@@ -367,7 +367,7 @@ class ContainerProxyTests
val machine =
childActorOf(
ContainerProxy
- .props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
+ .props(factory, acker, store, collector, InvokerInstanceId(0), poolConfig, pauseGrace = timeout))
registerCallback(machine)
machine ! Run(noLogsAction, message)
@@ -398,7 +398,7 @@ class ContainerProxyTests
val machine =
childActorOf(
ContainerProxy
- .props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
+ .props(factory, acker, store, collector, InvokerInstanceId(0), poolConfig, pauseGrace = timeout))
registerCallback(machine)
machine ! Run(action, message)
expectMsg(Transition(machine, Uninitialized, Running))
@@ -434,7 +434,7 @@ class ContainerProxyTests
val machine =
childActorOf(
ContainerProxy
- .props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
+ .props(factory, acker, store, collector, InvokerInstanceId(0), poolConfig, pauseGrace = timeout))
registerCallback(machine)
machine ! Run(action, message)
expectMsg(Transition(machine, Uninitialized, Running))
@@ -474,7 +474,7 @@ class ContainerProxyTests
val machine =
childActorOf(
ContainerProxy
- .props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
+ .props(factory, acker, store, collector, InvokerInstanceId(0), poolConfig, pauseGrace = timeout))
registerCallback(machine)
machine ! Run(action, message)
expectMsg(Transition(machine, Uninitialized, Running))
@@ -505,7 +505,7 @@ class ContainerProxyTests
val machine =
childActorOf(
ContainerProxy
- .props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
+ .props(factory, acker, store, collector, InvokerInstanceId(0), poolConfig, pauseGrace = timeout))
registerCallback(machine)
machine ! Run(action, message)
expectMsg(Transition(machine, Uninitialized, Running))
@@ -535,7 +535,7 @@ class ContainerProxyTests
val machine =
childActorOf(
ContainerProxy
- .props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
+ .props(factory, acker, store, collector, InvokerInstanceId(0), poolConfig, pauseGrace = timeout))
registerCallback(machine)
machine ! Run(action, message)
expectMsg(Transition(machine, Uninitialized, Running))
@@ -569,7 +569,7 @@ class ContainerProxyTests
val machine =
childActorOf(
ContainerProxy
- .props(factory, acker, store, createCollector(), InstanceId(0), poolConfig, pauseGrace = timeout))
+ .props(factory, acker, store, createCollector(), InvokerInstanceId(0), poolConfig, pauseGrace = timeout))
registerCallback(machine)
run(machine, Uninitialized) // first run an activation
timeout(machine) // times out Ready state so container suspends
@@ -605,7 +605,7 @@ class ContainerProxyTests
val machine =
childActorOf(
ContainerProxy
- .props(factory, acker, store, createCollector(), InstanceId(0), poolConfig, pauseGrace = timeout))
+ .props(factory, acker, store, createCollector(), InvokerInstanceId(0), poolConfig, pauseGrace = timeout))
registerCallback(machine)
run(machine, Uninitialized)
timeout(machine) // times out Ready state so container suspends
@@ -642,7 +642,7 @@ class ContainerProxyTests
val machine =
childActorOf(
ContainerProxy
- .props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
+ .props(factory, acker, store, collector, InvokerInstanceId(0), poolConfig, pauseGrace = timeout))
registerCallback(machine)
// Start running the action
@@ -694,7 +694,7 @@ class ContainerProxyTests
val machine =
childActorOf(
ContainerProxy
- .props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
+ .props(factory, acker, store, collector, InvokerInstanceId(0), poolConfig, pauseGrace = timeout))
registerCallback(machine)
run(machine, Uninitialized)
timeout(machine)
diff --git a/tests/src/test/scala/whisk/core/controller/test/ControllerTestCommon.scala b/tests/src/test/scala/whisk/core/controller/test/ControllerTestCommon.scala
index 0d0b96f..82a47d7 100644
--- a/tests/src/test/scala/whisk/core/controller/test/ControllerTestCommon.scala
+++ b/tests/src/test/scala/whisk/core/controller/test/ControllerTestCommon.scala
@@ -55,7 +55,7 @@ protected trait ControllerTestCommon
with WhiskServices
with StreamLogging {
- val activeAckTopicIndex = InstanceId(0)
+ val activeAckTopicIndex = ControllerInstanceId("0")
implicit val routeTestTimeout = RouteTestTimeout(90 seconds)
diff --git a/tests/src/test/scala/whisk/core/controller/test/WebActionsApiTests.scala b/tests/src/test/scala/whisk/core/controller/test/WebActionsApiTests.scala
index c54797b..d01381d 100644
--- a/tests/src/test/scala/whisk/core/controller/test/WebActionsApiTests.scala
+++ b/tests/src/test/scala/whisk/core/controller/test/WebActionsApiTests.scala
@@ -1755,7 +1755,7 @@ trait WebActionsApiBaseTests extends ControllerTestCommon with BeforeAndAfterEac
}
class TestingEntitlementProvider(config: WhiskConfig, loadBalancer: LoadBalancer)
- extends EntitlementProvider(config, loadBalancer, InstanceId(0)) {
+ extends EntitlementProvider(config, loadBalancer, ControllerInstanceId("0")) {
protected[core] override def checkThrottles(user: Identity)(implicit transid: TransactionId): Future[Unit] = {
val subject = user.subject
diff --git a/tests/src/test/scala/whisk/core/database/test/DbUtils.scala b/tests/src/test/scala/whisk/core/database/test/DbUtils.scala
index 78910fc..10ffcc2 100644
--- a/tests/src/test/scala/whisk/core/database/test/DbUtils.scala
+++ b/tests/src/test/scala/whisk/core/database/test/DbUtils.scala
@@ -48,7 +48,7 @@ import scala.util.{Failure, Random, Success, Try}
*/
trait DbUtils extends Assertions {
implicit val dbOpTimeout = 15 seconds
- val instance = InstanceId(0)
+ val instance = ControllerInstanceId("0")
val docsToDelete = ListBuffer[(ArtifactStore[_], DocInfo)]()
case class RetryOp() extends Throwable
diff --git a/common/scala/src/main/scala/whisk/core/entity/InstanceId.scala b/tests/src/test/scala/whisk/core/entity/test/ControllerInstanceIdTests.scala
similarity index 53%
copy from common/scala/src/main/scala/whisk/core/entity/InstanceId.scala
copy to tests/src/test/scala/whisk/core/entity/test/ControllerInstanceIdTests.scala
index 8087b8b..0845033 100644
--- a/common/scala/src/main/scala/whisk/core/entity/InstanceId.scala
+++ b/tests/src/test/scala/whisk/core/entity/test/ControllerInstanceIdTests.scala
@@ -15,14 +15,32 @@
* limitations under the License.
*/
-package whisk.core.entity
+package whisk.core.entity.test
-import spray.json.DefaultJsonProtocol
+import org.junit.runner.RunWith
+import org.scalatest.FlatSpec
+import org.scalatest.Matchers
+import org.scalatest.junit.JUnitRunner
+import whisk.core.entity.ControllerInstanceId
-case class InstanceId(val instance: Int, name: Option[String] = None) {
- def toInt: Int = instance
-}
+@RunWith(classOf[JUnitRunner])
+class ControllerInstanceIdTests extends FlatSpec with Matchers {
+
+ behavior of "ControllerInstanceId"
+
+ it should "accept usable characters" in {
+ Seq("a", "1", "a.1", "a_1").foreach { s =>
+ ControllerInstanceId(s).asString shouldBe s
+
+ }
+ }
+
+ it should "reject unusable characters" in {
+ Seq(" ", "!", "$", "a" * 129).foreach { s =>
+ an[IllegalArgumentException] shouldBe thrownBy {
+ ControllerInstanceId(s)
+ }
+ }
+ }
-object InstanceId extends DefaultJsonProtocol {
- implicit val serdes = jsonFormat2(InstanceId.apply)
}
diff --git a/tests/src/test/scala/whisk/core/loadBalancer/test/InvokerSupervisionTests.scala b/tests/src/test/scala/whisk/core/loadBalancer/test/InvokerSupervisionTests.scala
index bf0da86..2605058 100644
--- a/tests/src/test/scala/whisk/core/loadBalancer/test/InvokerSupervisionTests.scala
+++ b/tests/src/test/scala/whisk/core/loadBalancer/test/InvokerSupervisionTests.scala
@@ -62,6 +62,7 @@ import whisk.core.loadBalancer.InvokerHealth
import whisk.utils.retry
import whisk.core.connector.test.TestConnector
import whisk.core.entitlement.Privilege
+import whisk.core.entity.ControllerInstanceId
@RunWith(classOf[JUnitRunner])
class InvokerSupervisionTests
@@ -88,11 +89,11 @@ class InvokerSupervisionTests
/** Queries all invokers for their state */
def allStates(pool: ActorRef) =
- Await.result(pool.ask(GetStatus).mapTo[IndexedSeq[(InstanceId, InvokerState)]], timeout.duration)
+ Await.result(pool.ask(GetStatus).mapTo[IndexedSeq[(InvokerInstanceId, InvokerState)]], timeout.duration)
/** Helper to generate a list of (InstanceId, InvokerState) */
def zipWithInstance(list: IndexedSeq[InvokerState]) = list.zipWithIndex.map {
- case (state, index) => new InvokerHealth(InstanceId(index), state)
+ case (state, index) => new InvokerHealth(InvokerInstanceId(index), state)
}
val pC = new TestConnector("pingFeedTtest", 4, false) {}
@@ -103,13 +104,13 @@ class InvokerSupervisionTests
val invoker5 = TestProbe()
val invoker2 = TestProbe()
- val invoker5Instance = InstanceId(5)
- val invoker2Instance = InstanceId(2)
+ val invoker5Instance = InvokerInstanceId(5)
+ val invoker2Instance = InvokerInstanceId(2)
val children = mutable.Queue(invoker5.ref, invoker2.ref)
- val childFactory = (f: ActorRefFactory, instance: InstanceId) => children.dequeue()
+ val childFactory = (f: ActorRefFactory, instance: InvokerInstanceId) => children.dequeue()
- val sendActivationToInvoker = stubFunction[ActivationMessage, InstanceId, Future[RecordMetadata]]
+ val sendActivationToInvoker = stubFunction[ActivationMessage, InvokerInstanceId, Future[RecordMetadata]]
val supervisor = system.actorOf(InvokerPool.props(childFactory, sendActivationToInvoker, pC))
within(timeout.duration) {
@@ -145,10 +146,10 @@ class InvokerSupervisionTests
it should "forward the ActivationResult to the appropriate invoker" in {
val invoker = TestProbe()
- val invokerInstance = InstanceId(0)
+ val invokerInstance = InvokerInstanceId(0)
val invokerName = s"invoker${invokerInstance.toInt}"
- val childFactory = (f: ActorRefFactory, instance: InstanceId) => invoker.ref
- val sendActivationToInvoker = stubFunction[ActivationMessage, InstanceId, Future[RecordMetadata]]
+ val childFactory = (f: ActorRefFactory, instance: InvokerInstanceId) => invoker.ref
+ val sendActivationToInvoker = stubFunction[ActivationMessage, InvokerInstanceId, Future[RecordMetadata]]
val supervisor = system.actorOf(InvokerPool.props(childFactory, sendActivationToInvoker, pC))
@@ -170,11 +171,11 @@ class InvokerSupervisionTests
it should "forward an ActivationMessage to the sendActivation-Method" in {
val invoker = TestProbe()
- val invokerInstance = InstanceId(0)
+ val invokerInstance = InvokerInstanceId(0)
val invokerName = s"invoker${invokerInstance.toInt}"
- val childFactory = (f: ActorRefFactory, instance: InstanceId) => invoker.ref
+ val childFactory = (f: ActorRefFactory, instance: InvokerInstanceId) => invoker.ref
- val sendActivationToInvoker = stubFunction[ActivationMessage, InstanceId, Future[RecordMetadata]]
+ val sendActivationToInvoker = stubFunction[ActivationMessage, InvokerInstanceId, Future[RecordMetadata]]
val supervisor = system.actorOf(InvokerPool.props(childFactory, sendActivationToInvoker, pC))
@@ -190,7 +191,7 @@ class InvokerSupervisionTests
AuthKey(uuid, Secret()),
Set[Privilege]()),
activationId = new ActivationIdGenerator {}.make(),
- rootControllerIndex = InstanceId(0),
+ rootControllerIndex = ControllerInstanceId("0"),
blocking = false,
content = None)
val msg = ActivationRequest(activationMessage, invokerInstance)
@@ -215,7 +216,7 @@ class InvokerSupervisionTests
// offline -> unhealthy
it should "start unhealthy, go offline if the state times out and goes unhealthy on a successful ping again" in {
val pool = TestProbe()
- val invoker = pool.system.actorOf(InvokerActor.props(InstanceId(0), InstanceId(0)))
+ val invoker = pool.system.actorOf(InvokerActor.props(InvokerInstanceId(0), ControllerInstanceId("0")))
within(timeout.duration) {
pool.send(invoker, SubscribeTransitionCallBack(pool.ref))
@@ -223,7 +224,7 @@ class InvokerSupervisionTests
timeout(invoker)
pool.expectMsg(Transition(invoker, UnHealthy, Offline))
- invoker ! PingMessage(InstanceId(0))
+ invoker ! PingMessage(InvokerInstanceId(0))
pool.expectMsg(Transition(invoker, Offline, UnHealthy))
}
}
@@ -231,7 +232,7 @@ class InvokerSupervisionTests
// unhealthy -> healthy
it should "goto healthy again, if unhealthy and error buffer has enough successful invocations" in {
val pool = TestProbe()
- val invoker = pool.system.actorOf(InvokerActor.props(InstanceId(0), InstanceId(0)))
+ val invoker = pool.system.actorOf(InvokerActor.props(InvokerInstanceId(0), ControllerInstanceId("0")))
within(timeout.duration) {
pool.send(invoker, SubscribeTransitionCallBack(pool.ref))
@@ -239,12 +240,12 @@ class InvokerSupervisionTests
// Fill buffer with errors
(1 to InvokerActor.bufferSize).foreach { _ =>
- invoker ! InvocationFinishedMessage(InstanceId(0), false)
+ invoker ! InvocationFinishedMessage(InvokerInstanceId(0), false)
}
// Fill buffer with successful invocations to become healthy again (one below errorTolerance)
(1 to InvokerActor.bufferSize - InvokerActor.bufferErrorTolerance).foreach { _ =>
- invoker ! InvocationFinishedMessage(InstanceId(0), true)
+ invoker ! InvocationFinishedMessage(InvokerInstanceId(0), true)
}
pool.expectMsg(Transition(invoker, UnHealthy, Healthy))
}
@@ -254,7 +255,7 @@ class InvokerSupervisionTests
// offline -> unhealthy
it should "go offline when unhealthy, if the state times out and go unhealthy on a successful ping again" in {
val pool = TestProbe()
- val invoker = pool.system.actorOf(InvokerActor.props(InstanceId(0), InstanceId(0)))
+ val invoker = pool.system.actorOf(InvokerActor.props(InvokerInstanceId(0), ControllerInstanceId("0")))
within(timeout.duration) {
pool.send(invoker, SubscribeTransitionCallBack(pool.ref))
@@ -263,20 +264,20 @@ class InvokerSupervisionTests
timeout(invoker)
pool.expectMsg(Transition(invoker, UnHealthy, Offline))
- invoker ! PingMessage(InstanceId(0))
+ invoker ! PingMessage(InvokerInstanceId(0))
pool.expectMsg(Transition(invoker, Offline, UnHealthy))
}
}
it should "start timer to send testactions when unhealthy" in {
- val invoker = TestFSMRef(new InvokerActor(InstanceId(0), InstanceId(0)))
+ val invoker = TestFSMRef(new InvokerActor(InvokerInstanceId(0), ControllerInstanceId("0")))
invoker.stateName shouldBe UnHealthy
invoker.isTimerActive(InvokerActor.timerName) shouldBe true
// Fill buffer with successful invocations to become healthy again (one below errorTolerance)
(1 to InvokerActor.bufferSize - InvokerActor.bufferErrorTolerance).foreach { _ =>
- invoker ! InvocationFinishedMessage(InstanceId(0), true)
+ invoker ! InvocationFinishedMessage(InvokerInstanceId(0), true)
}
invoker.stateName shouldBe Healthy
diff --git a/tests/src/test/scala/whisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala b/tests/src/test/scala/whisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
index 2f8ce96..4c8d15f 100644
--- a/tests/src/test/scala/whisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
+++ b/tests/src/test/scala/whisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
@@ -22,7 +22,7 @@ import org.junit.runner.RunWith
import org.scalatest.{FlatSpec, Matchers}
import org.scalatest.junit.JUnitRunner
import whisk.common.ForcableSemaphore
-import whisk.core.entity.InstanceId
+import whisk.core.entity.InvokerInstanceId
import whisk.core.loadBalancer._
/**
@@ -35,9 +35,9 @@ import whisk.core.loadBalancer._
class ShardingContainerPoolBalancerTests extends FlatSpec with Matchers with StreamLogging {
behavior of "ShardingContainerPoolBalancerState"
- def healthy(i: Int) = new InvokerHealth(InstanceId(i), Healthy)
- def unhealthy(i: Int) = new InvokerHealth(InstanceId(i), UnHealthy)
- def offline(i: Int) = new InvokerHealth(InstanceId(i), Offline)
+ def healthy(i: Int) = new InvokerHealth(InvokerInstanceId(i), Healthy)
+ def unhealthy(i: Int) = new InvokerHealth(InvokerInstanceId(i), UnHealthy)
+ def offline(i: Int) = new InvokerHealth(InvokerInstanceId(i), Offline)
def semaphores(count: Int, max: Int): IndexedSeq[ForcableSemaphore] =
IndexedSeq.fill(count)(new ForcableSemaphore(max))