You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by bd...@apache.org on 2020/11/13 18:37:49 UTC
[openwhisk] branch master updated: Reset / Overwrite invokerId for unique name in zookeeper manually (#5024)
This is an automated email from the ASF dual-hosted git repository.
bdoyle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 526f011 Reset / Overwrite invokerId for unique name in zookeeper manually (#5024)
526f011 is described below
commit 526f0119ef9e89336b76c483e32c8dad75bfcdb4
Author: Brendan Doyle <bd...@gmail.com>
AuthorDate: Fri Nov 13 10:37:03 2020 -0800
Reset / Overwrite invokerId for unique name in zookeeper manually (#5024)
* init
* add instance id check for overwrite
* Update core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InstanceIdAssigner.scala
Co-authored-by: rodric rabbah <ro...@gmail.com>
* Update core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InstanceIdAssigner.scala
Co-authored-by: rodric rabbah <ro...@gmail.com>
* assign based on invoker pool size
Co-authored-by: Brendan Doyle <br...@qualtrics.com>
Co-authored-by: rodric rabbah <ro...@gmail.com>
---
.../core/invoker/InstanceIdAssigner.scala | 81 +++++++++++++++-------
.../apache/openwhisk/core/invoker/Invoker.scala | 15 ++--
.../invoker/test/InstanceIdAssignerTests.scala | 27 ++++++--
3 files changed, 89 insertions(+), 34 deletions(-)
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InstanceIdAssigner.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InstanceIdAssigner.scala
index 884c0e7..981522e 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InstanceIdAssigner.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InstanceIdAssigner.scala
@@ -22,6 +22,8 @@ import org.apache.curator.framework.recipes.shared.SharedCount
import org.apache.curator.retry.RetryUntilElapsed
import org.apache.openwhisk.common.Logging
+import scala.collection.JavaConverters._
+
/**
* Computes the instanceId for invoker
*
@@ -29,7 +31,7 @@ import org.apache.openwhisk.common.Logging
*/
private[invoker] class InstanceIdAssigner(connectionString: String)(implicit logger: Logging) {
- def getId(name: String): Int = {
+ def setAndGetId(name: String, overwriteId: Option[Int] = None): Int = {
logger.info(this, s"invokerReg: creating zkClient to $connectionString")
val retryPolicy = new RetryUntilElapsed(5000, 500) // retry at 500ms intervals until 5 seconds have elapsed
val zkClient = CuratorFrameworkFactory.newClient(connectionString, retryPolicy)
@@ -37,36 +39,63 @@ private[invoker] class InstanceIdAssigner(connectionString: String)(implicit log
zkClient.blockUntilConnected()
logger.info(this, "invokerReg: connected to zookeeper")
- val myIdPath = "/invokers/idAssignment/mapping/" + name
- val assignedId = Option(zkClient.checkExists().forPath(myIdPath)) match {
- case None =>
- // path doesn't exist -> no previous mapping for this invoker
- logger.info(this, s"invokerReg: no prior assignment of id for invoker $name")
- val idCounter = new SharedCount(zkClient, "/invokers/idAssignment/counter", 0)
- idCounter.start()
+ val rootPath = "/invokers/idAssignment/mapping"
+ val myIdPath = s"$rootPath/$name"
+ val assignedId = overwriteId
+ .map(newId => {
+ val invokers = zkClient.getChildren.forPath(rootPath).asScala
- def assignId(): Int = {
- val current = idCounter.getVersionedValue()
- if (idCounter.trySetCount(current, current.getValue() + 1)) {
- current.getValue()
- } else {
- assignId()
- }
- }
+ if (invokers.size < newId)
+ throw new IllegalArgumentException(s"invokerReg: cannot assign $newId to $name: not enough invokers")
+
+ //check if the invokerId already exists for another unique name and delete if it does
+ invokers
+ .map(uniqueName => {
+ val idPath = s"$rootPath/$uniqueName"
+ (idPath, BigInt(zkClient.getData.forPath(idPath)).intValue)
+ })
+ .find(_._2 == newId)
+ .map(id => zkClient.delete().forPath(id._1))
+
+ zkClient.create().orSetData().forPath(myIdPath, BigInt(newId).toByteArray)
- val newId = assignId()
- idCounter.close()
- zkClient.create().creatingParentContainersIfNeeded().forPath(myIdPath, BigInt(newId).toByteArray)
logger.info(this, s"invokerReg: invoker $name was assigned invokerId $newId")
newId
+ })
+ .getOrElse({
+ Option(zkClient.checkExists().forPath(myIdPath)) match {
+ case None =>
+ // path doesn't exist -> no previous mapping for this invoker
+ logger.info(this, s"invokerReg: no prior assignment of id for invoker $name")
+ val idCounter = new SharedCount(zkClient, "/invokers/idAssignment/counter", 0)
+ idCounter.start()
- case Some(_) =>
- // path already exists -> there is a previous mapping for this invoker we should use
- val rawOldId = zkClient.getData().forPath(myIdPath)
- val oldId = BigInt(rawOldId).intValue
- logger.info(this, s"invokerReg: invoker $name was assigned its previous invokerId $oldId")
- oldId
- }
+ def assignId(): Int = {
+ val current = idCounter.getVersionedValue()
+ val numInvokers = Option(zkClient.checkExists().forPath(rootPath))
+ .map(_ => zkClient.getChildren.forPath(rootPath).size())
+ .getOrElse(0)
+ if (idCounter.trySetCount(current, numInvokers + 1)) {
+ numInvokers
+ } else {
+ assignId()
+ }
+ }
+
+ val newId = assignId()
+ idCounter.close()
+ zkClient.create().creatingParentContainersIfNeeded().forPath(myIdPath, BigInt(newId).toByteArray)
+ logger.info(this, s"invokerReg: invoker $name was assigned invokerId $newId")
+ newId
+
+ case Some(_) =>
+ // path already exists -> there is a previous mapping for this invoker we should use
+ val rawOldId = zkClient.getData.forPath(myIdPath)
+ val oldId = BigInt(rawOldId).intValue
+ logger.info(this, s"invokerReg: invoker $name was assigned its previous invokerId $oldId")
+ oldId
+ }
+ })
zkClient.close()
assignedId
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala
index 9901f95..1b0c8bf 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala
@@ -40,7 +40,10 @@ import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.util.Try
-case class CmdLineArgs(uniqueName: Option[String] = None, id: Option[Int] = None, displayedName: Option[String] = None)
+case class CmdLineArgs(uniqueName: Option[String] = None,
+ id: Option[Int] = None,
+ displayedName: Option[String] = None,
+ overwriteId: Option[Int] = None)
object Invoker {
@@ -133,6 +136,8 @@ object Invoker {
// --uniqueName <value> a unique name to dynamically assign Kafka topics from Zookeeper
// --displayedName <value> a name to identify this invoker via invoker health protocol
// --id <value> proposed invokerId
+ // --overwriteId <value> proposed invokerId to re-write with uniqueName in Zookeeper,
+ // DO NOT USE overwriteId unless sure invokerId does not exist for other uniqueName
def parse(ls: List[String], c: CmdLineArgs): CmdLineArgs = {
ls match {
case "--uniqueName" :: uniqueName :: tail =>
@@ -141,6 +146,8 @@ object Invoker {
parse(tail, c.copy(displayedName = nonEmptyString(displayedName)))
case "--id" :: id :: tail if Try(id.toInt).isSuccess =>
parse(tail, c.copy(id = Some(id.toInt)))
+ case "--overwriteId" :: overwriteId :: tail if Try(overwriteId.toInt).isSuccess =>
+ parse(tail, c.copy(overwriteId = Some(overwriteId.toInt)))
case Nil => c
case _ => abort(s"Error processing command line arguments $ls")
}
@@ -150,16 +157,16 @@ object Invoker {
val assignedInvokerId = cmdLineArgs match {
// --id is defined with a valid value, use this id directly.
- case CmdLineArgs(_, Some(id), _) =>
+ case CmdLineArgs(_, Some(id), _, _) =>
logger.info(this, s"invokerReg: using proposedInvokerId $id")
id
// --uniqueName is defined with a valid value, id is empty, assign an id via zookeeper
- case CmdLineArgs(Some(unique), None, _) =>
+ case CmdLineArgs(Some(unique), None, _, overwriteId) =>
if (config.zookeeperHosts.startsWith(":") || config.zookeeperHosts.endsWith(":")) {
abort(s"Must provide valid zookeeper host and port to use dynamicId assignment (${config.zookeeperHosts})")
}
- new InstanceIdAssigner(config.zookeeperHosts).getId(unique)
+ new InstanceIdAssigner(config.zookeeperHosts).setAndGetId(unique, overwriteId)
case _ => abort(s"Either --id or --uniqueName must be configured with correct values")
}
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/InstanceIdAssignerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/InstanceIdAssignerTests.scala
index 038f373..112d119 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/InstanceIdAssignerTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/InstanceIdAssignerTests.scala
@@ -40,14 +40,33 @@ class InstanceIdAssignerTests extends FlatSpec with Matchers with StreamLogging
it should "assign fresh id" in {
val assigner = new InstanceIdAssigner(zkServer.getConnectString)
- assigner.getId("foo") shouldBe 0
+ assigner.setAndGetId("foo") shouldBe 0
}
it should "reuse id if exists" in {
val assigner = new InstanceIdAssigner(zkServer.getConnectString)
- assigner.getId("foo") shouldBe 0
- assigner.getId("bar") shouldBe 1
- assigner.getId("bar") shouldBe 1
+ assigner.setAndGetId("foo") shouldBe 0
+ assigner.setAndGetId("bar") shouldBe 1
+ assigner.setAndGetId("bar") shouldBe 1
}
+ it should "attempt to overwrite id for unique name if overwrite set" in {
+ val assigner = new InstanceIdAssigner(zkServer.getConnectString)
+ assigner.setAndGetId("foo") shouldBe 0
+ assigner.setAndGetId("bar", Some(0)) shouldBe 0
+ }
+
+ it should "overwrite an id for unique name that already exists and reset overwritten id" in {
+ val assigner = new InstanceIdAssigner(zkServer.getConnectString)
+ assigner.setAndGetId("foo") shouldBe 0
+ assigner.setAndGetId("bar", Some(0)) shouldBe 0
+ assigner.setAndGetId("foo") shouldBe 1
+ assigner.setAndGetId("cat") shouldBe 2
+ }
+
+ it should "fail to overwrite an id too large for the invoker pool size" in {
+ val assigner = new InstanceIdAssigner(zkServer.getConnectString)
+ assigner.setAndGetId("foo") shouldBe 0
+ assertThrows[IllegalArgumentException](assigner.setAndGetId("bar", Some(2)))
+ }
}