You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ma...@apache.org on 2018/07/13 09:37:04 UTC

[incubator-openwhisk] branch master updated: Add an optional display name to the invokers. (#3855)

This is an automated email from the ASF dual-hosted git repository.

markusthoemmes pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new c4f71ef  Add an optional display name to the invokers. (#3855)
c4f71ef is described below

commit c4f71efeca19e22d98226797fac14db4e34941bd
Author: Vadim Raskin <ra...@gmail.com>
AuthorDate: Fri Jul 13 11:36:58 2018 +0200

    Add an optional display name to the invokers. (#3855)
    
    When running the invokers on a container orchestrator, their names in that orchestrator will differ from the topic names used to schedule to them. While the former are usually auto-generated, the latter follow a strict format based on a strictly increasing number, which is needed for scheduling.
    
    This adds a display name to the invoker's instance id, so that the operator of such a system can directly correlate the scheduling entity (topic name) and the corresponding container name in her system.
    
    This display name differs from the "unique name" in that the latter might be something more cryptic, like the node's host IP, so that it stays stable between redeployments of the system. The unique name is not necessarily useful for the operator though.
---
 .../core/containerpool/ContainerFactory.scala      |  2 +-
 .../main/scala/whisk/core/entity/InstanceId.scala  | 15 ++++++++--
 .../scala/whisk/core/controller/Controller.scala   |  2 +-
 .../ShardingContainerPoolBalancer.scala            |  2 +-
 .../main/scala/whisk/core/invoker/Invoker.scala    | 32 +++++++++++++---------
 5 files changed, 35 insertions(+), 18 deletions(-)

diff --git a/common/scala/src/main/scala/whisk/core/containerpool/ContainerFactory.scala b/common/scala/src/main/scala/whisk/core/containerpool/ContainerFactory.scala
index ae7f3d1..9810cff 100644
--- a/common/scala/src/main/scala/whisk/core/containerpool/ContainerFactory.scala
+++ b/common/scala/src/main/scala/whisk/core/containerpool/ContainerFactory.scala
@@ -75,7 +75,7 @@ object ContainerFactory {
 
   /** include the instance name, if specified and strip invalid chars before attempting to use them in the container name */
   def containerNamePrefix(instanceId: InvokerInstanceId): String =
-    s"wsk${instanceId.name.getOrElse("")}${instanceId.toInt}".filter(isAllowed)
+    s"wsk${instanceId.uniqueName.getOrElse("")}${instanceId.toInt}".filter(isAllowed)
 }
 
 /**
diff --git a/common/scala/src/main/scala/whisk/core/entity/InstanceId.scala b/common/scala/src/main/scala/whisk/core/entity/InstanceId.scala
index 9ee2dc0..5122980 100644
--- a/common/scala/src/main/scala/whisk/core/entity/InstanceId.scala
+++ b/common/scala/src/main/scala/whisk/core/entity/InstanceId.scala
@@ -21,8 +21,19 @@ import spray.json.DefaultJsonProtocol
 import whisk.core.entity.ControllerInstanceId.LEGAL_CHARS
 import whisk.core.entity.ControllerInstanceId.MAX_NAME_LENGTH
 
-case class InvokerInstanceId(val instance: Int, name: Option[String] = None) {
+/**
+ * An instance id representing an invoker
+ *
+ * @param instance a numeric value used for the load balancing and Kafka topic creation
+ * @param uniqueName an identifier required for dynamic instance assignment by Zookeeper
+ * @param displayedName an identifier that is required for the health protocol to correlate Kafka topics with invoker container names
+ */
+case class InvokerInstanceId(val instance: Int,
+                             uniqueName: Option[String] = None,
+                             displayedName: Option[String] = None) {
   def toInt: Int = instance
+
+  override def toString: String = (Seq("invoker" + instance) ++ uniqueName ++ displayedName).mkString("/")
 }
 
 case class ControllerInstanceId(val asString: String) {
@@ -32,7 +43,7 @@ case class ControllerInstanceId(val asString: String) {
 }
 
 object InvokerInstanceId extends DefaultJsonProtocol {
-  implicit val serdes = jsonFormat2(InvokerInstanceId.apply)
+  implicit val serdes = jsonFormat3(InvokerInstanceId.apply)
 }
 
 object ControllerInstanceId extends DefaultJsonProtocol {
diff --git a/core/controller/src/main/scala/whisk/core/controller/Controller.scala b/core/controller/src/main/scala/whisk/core/controller/Controller.scala
index d3260ed..5546e0f 100644
--- a/core/controller/src/main/scala/whisk/core/controller/Controller.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/Controller.scala
@@ -146,7 +146,7 @@ class Controller(val instance: ControllerInstanceId,
         complete {
           loadBalancer
             .invokerHealth()
-            .map(_.map(i => s"invoker${i.id.toInt}" -> i.status.asString).toMap.toJson.asJsObject)
+            .map(_.map(i => i.id.toString -> i.status.asString).toMap.toJson.asJsObject)
         }
       } ~ path("healthy" / "count") {
         complete {
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
index 9ddbf0a..a334b41 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
@@ -291,7 +291,7 @@ class ShardingContainerPoolBalancer(config: WhiskConfig, controllerInstance: Con
     val start = transid.started(
       this,
       LoggingMarkers.CONTROLLER_KAFKA,
-      s"posting topic '$topic' with activation id '${msg.activationId}'",
+      s"posting to '$invoker' with activation id '${msg.activationId}'",
       logLevel = InfoLevel)
 
     producer.send(topic, msg).andThen {
diff --git a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
index 1b291d4..c5690f2 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
+++ b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
@@ -44,7 +44,7 @@ import whisk.spi.SpiLoader
 import whisk.utils.ExecutionContextFactory
 import whisk.common.TransactionId
 
-case class CmdLineArgs(name: Option[String] = None, id: Option[Int] = None)
+case class CmdLineArgs(uniqueName: Option[String] = None, id: Option[Int] = None, displayedName: Option[String] = None)
 
 object Invoker {
 
@@ -96,11 +96,14 @@ object Invoker {
     // process command line arguments
     // We accept the command line grammar of:
     // Usage: invoker [options] [<proposedInvokerId>]
-    //    --name <value>   a unique name to use for this invoker
+    //    --uniqueName <value>   a unique name to dynamically assign Kafka topics from Zookeeper
+    //    --displayedName <value> a name to identify this invoker via invoker health protocol
     //    --id <value>     proposed invokerId
+
     def parse(ls: List[String], c: CmdLineArgs): CmdLineArgs = {
       ls match {
-        case "--name" :: name :: tail                        => parse(tail, c.copy(name = Some(name)))
+        case "--uniqueName" :: uniqueName :: tail            => parse(tail, c.copy(uniqueName = Some(uniqueName)))
+        case "--displayedName" :: displayedName :: tail      => parse(tail, c.copy(displayedName = Some(displayedName)))
         case "--id" :: id :: tail if Try(id.toInt).isSuccess => parse(tail, c.copy(id = Some(id.toInt)))
         case id :: Nil if Try(id.toInt).isSuccess            => c.copy(id = Some(id.toInt))
         case Nil                                             => c
@@ -109,7 +112,8 @@ object Invoker {
     }
     val cmdLineArgs = parse(args.toList, CmdLineArgs())
     logger.info(this, "Command line arguments parsed to yield " + cmdLineArgs)
-    val invokerName = cmdLineArgs.name.orElse(if (config.invokerName.trim.isEmpty) None else Some(config.invokerName))
+    val invokerUniqueName =
+      cmdLineArgs.uniqueName.orElse(if (config.invokerName.trim.isEmpty) None else Some(config.invokerName))
     val assignedInvokerId = cmdLineArgs.id
       .map { id =>
         logger.info(this, s"invokerReg: using proposedInvokerId ${id}")
@@ -119,7 +123,7 @@ object Invoker {
         if (config.zookeeperHosts.startsWith(":") || config.zookeeperHosts.endsWith(":")) {
           abort(s"Must provide valid zookeeper host and port to use dynamicId assignment (${config.zookeeperHosts})")
         }
-        if (invokerName.isEmpty || invokerName.get.trim.isEmpty) {
+        if (invokerUniqueName.isEmpty || invokerUniqueName.get.trim.isEmpty) {
           abort("Invoker name can't be empty to use dynamicId assignment.")
         }
 
@@ -130,11 +134,11 @@ object Invoker {
         zkClient.blockUntilConnected()
         logger.info(this, "invokerReg: connected to zookeeper")
 
-        val myIdPath = "/invokers/idAssignment/mapping/" + invokerName
+        val myIdPath = "/invokers/idAssignment/mapping/" + invokerUniqueName
         val assignedId = Option(zkClient.checkExists().forPath(myIdPath)) match {
           case None =>
             // path doesn't exist -> no previous mapping for this invoker
-            logger.info(this, s"invokerReg: no prior assignment of id for invoker $invokerName")
+            logger.info(this, s"invokerReg: no prior assignment of id for invoker $invokerUniqueName")
             val idCounter = new SharedCount(zkClient, "/invokers/idAssignment/counter", 0)
             idCounter.start()
 
@@ -150,25 +154,27 @@ object Invoker {
             val newId = assignId()
             idCounter.close()
             zkClient.create().creatingParentContainersIfNeeded().forPath(myIdPath, BigInt(newId).toByteArray)
-            logger.info(this, s"invokerReg: invoker ${invokerName} was assigned invokerId ${newId}")
+            logger.info(this, s"invokerReg: invoker ${invokerUniqueName} was assigned invokerId ${newId}")
             newId
 
           case Some(_) =>
             // path already exists -> there is a previous mapping for this invoker we should use
             val rawOldId = zkClient.getData().forPath(myIdPath)
             val oldId = BigInt(rawOldId).intValue
-            logger.info(this, s"invokerReg: invoker ${invokerName} was assigned its previous invokerId ${oldId}")
+            logger.info(this, s"invokerReg: invoker ${invokerUniqueName} was assigned its previous invokerId ${oldId}")
             oldId
         }
 
         zkClient.close()
         assignedId
       }
-
-    val invokerInstance = InvokerInstanceId(assignedInvokerId, invokerName)
+    val topicBaseName = "invoker"
+    val topicName = topicBaseName + assignedInvokerId
+    val invokerDisplayedName = cmdLineArgs.displayedName
+    val invokerInstance = InvokerInstanceId(assignedInvokerId, invokerUniqueName, invokerDisplayedName)
     val msgProvider = SpiLoader.get[MessagingProvider]
-    if (msgProvider.ensureTopic(config, topic = "invoker" + assignedInvokerId, topicConfig = "invoker").isFailure) {
-      abort(s"failure during msgProvider.ensureTopic for topic invoker$assignedInvokerId")
+    if (msgProvider.ensureTopic(config, topic = topicName, topicConfig = topicBaseName).isFailure) {
+      abort(s"failure during msgProvider.ensureTopic for topic $topicName")
     }
     val producer = msgProvider.getProducer(config)
     val invoker = try {