You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by bd...@apache.org on 2020/11/13 18:37:49 UTC

[openwhisk] branch master updated: Reset / Overwrite invokerId for unique name in zookeeper manually (#5024)

This is an automated email from the ASF dual-hosted git repository.

bdoyle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 526f011  Reset / Overwrite invokerId for unique name in zookeeper manually (#5024)
526f011 is described below

commit 526f0119ef9e89336b76c483e32c8dad75bfcdb4
Author: Brendan Doyle <bd...@gmail.com>
AuthorDate: Fri Nov 13 10:37:03 2020 -0800

    Reset / Overwrite invokerId for unique name in zookeeper manually (#5024)
    
    * init
    
    * add instance id check for overwrite
    
    * Update core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InstanceIdAssigner.scala
    
    Co-authored-by: rodric rabbah <ro...@gmail.com>
    
    * Update core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InstanceIdAssigner.scala
    
    Co-authored-by: rodric rabbah <ro...@gmail.com>
    
    * assign based on invoker pool size
    
    Co-authored-by: Brendan Doyle <br...@qualtrics.com>
    Co-authored-by: rodric rabbah <ro...@gmail.com>
---
 .../core/invoker/InstanceIdAssigner.scala          | 81 +++++++++++++++-------
 .../apache/openwhisk/core/invoker/Invoker.scala    | 15 ++--
 .../invoker/test/InstanceIdAssignerTests.scala     | 27 ++++++--
 3 files changed, 89 insertions(+), 34 deletions(-)

diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InstanceIdAssigner.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InstanceIdAssigner.scala
index 884c0e7..981522e 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InstanceIdAssigner.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InstanceIdAssigner.scala
@@ -22,6 +22,8 @@ import org.apache.curator.framework.recipes.shared.SharedCount
 import org.apache.curator.retry.RetryUntilElapsed
 import org.apache.openwhisk.common.Logging
 
+import scala.collection.JavaConverters._
+
 /**
  * Computes the instanceId for invoker
  *
@@ -29,7 +31,7 @@ import org.apache.openwhisk.common.Logging
  */
 private[invoker] class InstanceIdAssigner(connectionString: String)(implicit logger: Logging) {
 
-  def getId(name: String): Int = {
+  def setAndGetId(name: String, overwriteId: Option[Int] = None): Int = {
     logger.info(this, s"invokerReg: creating zkClient to $connectionString")
     val retryPolicy = new RetryUntilElapsed(5000, 500) // retry at 500ms intervals until 5 seconds have elapsed
     val zkClient = CuratorFrameworkFactory.newClient(connectionString, retryPolicy)
@@ -37,36 +39,63 @@ private[invoker] class InstanceIdAssigner(connectionString: String)(implicit log
     zkClient.blockUntilConnected()
     logger.info(this, "invokerReg: connected to zookeeper")
 
-    val myIdPath = "/invokers/idAssignment/mapping/" + name
-    val assignedId = Option(zkClient.checkExists().forPath(myIdPath)) match {
-      case None =>
-        // path doesn't exist -> no previous mapping for this invoker
-        logger.info(this, s"invokerReg: no prior assignment of id for invoker $name")
-        val idCounter = new SharedCount(zkClient, "/invokers/idAssignment/counter", 0)
-        idCounter.start()
+    val rootPath = "/invokers/idAssignment/mapping"
+    val myIdPath = s"$rootPath/$name"
+    val assignedId = overwriteId
+      .map(newId => {
+        val invokers = zkClient.getChildren.forPath(rootPath).asScala
 
-        def assignId(): Int = {
-          val current = idCounter.getVersionedValue()
-          if (idCounter.trySetCount(current, current.getValue() + 1)) {
-            current.getValue()
-          } else {
-            assignId()
-          }
-        }
+        if (invokers.size < newId)
+          throw new IllegalArgumentException(s"invokerReg: cannot assign $newId to $name: not enough invokers")
+
+        // check whether newId is already mapped to another unique name and, if so, delete that mapping
+        invokers
+          .map(uniqueName => {
+            val idPath = s"$rootPath/$uniqueName"
+            (idPath, BigInt(zkClient.getData.forPath(idPath)).intValue)
+          })
+          .find(_._2 == newId)
+          .map(id => zkClient.delete().forPath(id._1))
+
+        zkClient.create().orSetData().forPath(myIdPath, BigInt(newId).toByteArray)
 
-        val newId = assignId()
-        idCounter.close()
-        zkClient.create().creatingParentContainersIfNeeded().forPath(myIdPath, BigInt(newId).toByteArray)
         logger.info(this, s"invokerReg: invoker $name was assigned invokerId $newId")
         newId
+      })
+      .getOrElse({
+        Option(zkClient.checkExists().forPath(myIdPath)) match {
+          case None =>
+            // path doesn't exist -> no previous mapping for this invoker
+            logger.info(this, s"invokerReg: no prior assignment of id for invoker $name")
+            val idCounter = new SharedCount(zkClient, "/invokers/idAssignment/counter", 0)
+            idCounter.start()
 
-      case Some(_) =>
-        // path already exists -> there is a previous mapping for this invoker we should use
-        val rawOldId = zkClient.getData().forPath(myIdPath)
-        val oldId = BigInt(rawOldId).intValue
-        logger.info(this, s"invokerReg: invoker $name was assigned its previous invokerId $oldId")
-        oldId
-    }
+            def assignId(): Int = {
+              val current = idCounter.getVersionedValue()
+              val numInvokers = Option(zkClient.checkExists().forPath(rootPath))
+                .map(_ => zkClient.getChildren.forPath(rootPath).size())
+                .getOrElse(0)
+              if (idCounter.trySetCount(current, numInvokers + 1)) {
+                numInvokers
+              } else {
+                assignId()
+              }
+            }
+
+            val newId = assignId()
+            idCounter.close()
+            zkClient.create().creatingParentContainersIfNeeded().forPath(myIdPath, BigInt(newId).toByteArray)
+            logger.info(this, s"invokerReg: invoker $name was assigned invokerId $newId")
+            newId
+
+          case Some(_) =>
+            // path already exists -> there is a previous mapping for this invoker we should use
+            val rawOldId = zkClient.getData.forPath(myIdPath)
+            val oldId = BigInt(rawOldId).intValue
+            logger.info(this, s"invokerReg: invoker $name was assigned its previous invokerId $oldId")
+            oldId
+        }
+      })
 
     zkClient.close()
     assignedId
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala
index 9901f95..1b0c8bf 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala
@@ -40,7 +40,10 @@ import scala.concurrent.duration._
 import scala.concurrent.{Await, ExecutionContext, Future}
 import scala.util.Try
 
-case class CmdLineArgs(uniqueName: Option[String] = None, id: Option[Int] = None, displayedName: Option[String] = None)
+case class CmdLineArgs(uniqueName: Option[String] = None,
+                       id: Option[Int] = None,
+                       displayedName: Option[String] = None,
+                       overwriteId: Option[Int] = None)
 
 object Invoker {
 
@@ -133,6 +136,8 @@ object Invoker {
     //    --uniqueName <value>   a unique name to dynamically assign Kafka topics from Zookeeper
     //    --displayedName <value> a name to identify this invoker via invoker health protocol
     //    --id <value>     proposed invokerId
+    //    --overwriteId <value> invokerId to write for uniqueName in Zookeeper, overwriting its current mapping;
+    //    DO NOT USE overwriteId unless you are sure the invokerId is not in use by another uniqueName
     def parse(ls: List[String], c: CmdLineArgs): CmdLineArgs = {
       ls match {
         case "--uniqueName" :: uniqueName :: tail =>
@@ -141,6 +146,8 @@ object Invoker {
           parse(tail, c.copy(displayedName = nonEmptyString(displayedName)))
         case "--id" :: id :: tail if Try(id.toInt).isSuccess =>
           parse(tail, c.copy(id = Some(id.toInt)))
+        case "--overwriteId" :: overwriteId :: tail if Try(overwriteId.toInt).isSuccess =>
+          parse(tail, c.copy(overwriteId = Some(overwriteId.toInt)))
         case Nil => c
         case _   => abort(s"Error processing command line arguments $ls")
       }
@@ -150,16 +157,16 @@ object Invoker {
 
     val assignedInvokerId = cmdLineArgs match {
       // --id is defined with a valid value, use this id directly.
-      case CmdLineArgs(_, Some(id), _) =>
+      case CmdLineArgs(_, Some(id), _, _) =>
         logger.info(this, s"invokerReg: using proposedInvokerId $id")
         id
 
       // --uniqueName is defined with a valid value, id is empty, assign an id via zookeeper
-      case CmdLineArgs(Some(unique), None, _) =>
+      case CmdLineArgs(Some(unique), None, _, overwriteId) =>
         if (config.zookeeperHosts.startsWith(":") || config.zookeeperHosts.endsWith(":")) {
           abort(s"Must provide valid zookeeper host and port to use dynamicId assignment (${config.zookeeperHosts})")
         }
-        new InstanceIdAssigner(config.zookeeperHosts).getId(unique)
+        new InstanceIdAssigner(config.zookeeperHosts).setAndGetId(unique, overwriteId)
 
       case _ => abort(s"Either --id or --uniqueName must be configured with correct values")
     }
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/InstanceIdAssignerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/InstanceIdAssignerTests.scala
index 038f373..112d119 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/InstanceIdAssignerTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/InstanceIdAssignerTests.scala
@@ -40,14 +40,33 @@ class InstanceIdAssignerTests extends FlatSpec with Matchers with StreamLogging
 
   it should "assign fresh id" in {
     val assigner = new InstanceIdAssigner(zkServer.getConnectString)
-    assigner.getId("foo") shouldBe 0
+    assigner.setAndGetId("foo") shouldBe 0
   }
 
   it should "reuse id if exists" in {
     val assigner = new InstanceIdAssigner(zkServer.getConnectString)
-    assigner.getId("foo") shouldBe 0
-    assigner.getId("bar") shouldBe 1
-    assigner.getId("bar") shouldBe 1
+    assigner.setAndGetId("foo") shouldBe 0
+    assigner.setAndGetId("bar") shouldBe 1
+    assigner.setAndGetId("bar") shouldBe 1
   }
 
+  it should "attempt to overwrite id for unique name if overwrite set" in {
+    val assigner = new InstanceIdAssigner(zkServer.getConnectString)
+    assigner.setAndGetId("foo") shouldBe 0
+    assigner.setAndGetId("bar", Some(0)) shouldBe 0
+  }
+
+  it should "overwrite an id for unique name that already exists and reset overwritten id" in {
+    val assigner = new InstanceIdAssigner(zkServer.getConnectString)
+    assigner.setAndGetId("foo") shouldBe 0
+    assigner.setAndGetId("bar", Some(0)) shouldBe 0
+    assigner.setAndGetId("foo") shouldBe 1
+    assigner.setAndGetId("cat") shouldBe 2
+  }
+
+  it should "fail to overwrite an id too large for the invoker pool size" in {
+    val assigner = new InstanceIdAssigner(zkServer.getConnectString)
+    assigner.setAndGetId("foo") shouldBe 0
+    assertThrows[IllegalArgumentException](assigner.setAndGetId("bar", Some(2)))
+  }
 }