You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by dg...@apache.org on 2018/07/13 19:08:04 UTC

[incubator-openwhisk] branch master updated: Fix invokerId assignment path in zookeeper (#3866)

This is an automated email from the ASF dual-hosted git repository.

dgrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new ae2ba84  Fix invokerId assignment path in zookeeper (#3866)
ae2ba84 is described below

commit ae2ba8497c45bef38eeff8643e95687b307f67ff
Author: Chetan Mehrotra <ch...@apache.org>
AuthorDate: Sat Jul 14 00:37:58 2018 +0530

    Fix invokerId assignment path in zookeeper (#3866)
    
    Fix the id assignment path to eliminate the Some(...) wrapping that came from interpolating a Scala Option, and add some tests based on Curator Test Support. To enable testing, the existing id assignment logic was extracted into a new class, InstanceIdAssigner.
---
 core/invoker/build.gradle                          |  4 +-
 .../whisk/core/invoker/InstanceIdAssigner.scala    | 75 ++++++++++++++++++++++
 .../main/scala/whisk/core/invoker/Invoker.scala    | 45 +------------
 settings.gradle                                    |  2 +
 tests/build.gradle                                 |  1 +
 .../invoker/test/InstanceIdAssignerTests.scala     | 53 +++++++++++++++
 6 files changed, 135 insertions(+), 45 deletions(-)

diff --git a/core/invoker/build.gradle b/core/invoker/build.gradle
index c25d5b7..327dc78 100644
--- a/core/invoker/build.gradle
+++ b/core/invoker/build.gradle
@@ -35,7 +35,9 @@ dependencies {
     compile "org.scala-lang:scala-library:${gradle.scala.version}"
     compile project(':common:scala')
 
-    compile 'org.apache.curator:curator-recipes:4.0.0', { exclude group: 'org.apache.zookeeper', module:'zookeeper' }
+    compile ("org.apache.curator:curator-recipes:${gradle.curator.version}") {
+        exclude group: 'org.apache.zookeeper', module:'zookeeper'
+    }
     compile ('org.apache.zookeeper:zookeeper:3.4.11') {
         exclude group: 'org.slf4j'
         exclude group: 'log4j'
diff --git a/core/invoker/src/main/scala/whisk/core/invoker/InstanceIdAssigner.scala b/core/invoker/src/main/scala/whisk/core/invoker/InstanceIdAssigner.scala
new file mode 100644
index 0000000..dd6babb
--- /dev/null
+++ b/core/invoker/src/main/scala/whisk/core/invoker/InstanceIdAssigner.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.invoker
+
+import org.apache.curator.framework.CuratorFrameworkFactory
+import org.apache.curator.framework.recipes.shared.SharedCount
+import org.apache.curator.retry.RetryUntilElapsed
+import whisk.common.Logging
+
+/**
+ * Computes the instanceId for an invoker, using ZooKeeper to persist the invoker name -> id mapping.
+ *
+ * @param connectionString zooKeeper connection string
+ * @param logger logging facility used for the `invokerReg` progress messages
+ */
+private[invoker] class InstanceIdAssigner(connectionString: String)(implicit logger: Logging) {
+
+  /**
+   * Returns the id assigned to the invoker with the given name.
+   *
+   * If a mapping node already exists for `name`, the id stored in that node is returned. Otherwise the
+   * current value of the shared counter is claimed (and the counter incremented), the claimed value is
+   * stored in a new mapping node for `name`, and that value is returned.
+   *
+   * A new ZooKeeper client is created on every call and is closed before the call returns, both on
+   * normal completion and when an exception is thrown.
+   *
+   * @param name unique name of the invoker
+   * @return the id assigned to the invoker
+   */
+  def getId(name: String): Int = {
+    logger.info(this, s"invokerReg: creating zkClient to $connectionString")
+    val retryPolicy = new RetryUntilElapsed(5000, 500) // retry at 500ms intervals until 5 seconds have elapsed
+    val zkClient = CuratorFrameworkFactory.newClient(connectionString, retryPolicy)
+    zkClient.start()
+    try {
+      zkClient.blockUntilConnected()
+      logger.info(this, "invokerReg: connected to zookeeper")
+
+      val myIdPath = "/invokers/idAssignment/mapping/" + name
+      // checkExists() yields null when the node is absent, hence the Option wrapper
+      Option(zkClient.checkExists().forPath(myIdPath)) match {
+        case None =>
+          // path doesn't exist -> no previous mapping for this invoker
+          logger.info(this, s"invokerReg: no prior assignment of id for invoker $name")
+          val idCounter = new SharedCount(zkClient, "/invokers/idAssignment/counter", 0)
+          idCounter.start()
+
+          // Optimistic increment: retry until our versioned update of the counter is accepted.
+          @scala.annotation.tailrec
+          def assignId(): Int = {
+            val current = idCounter.getVersionedValue()
+            if (idCounter.trySetCount(current, current.getValue() + 1)) {
+              current.getValue()
+            } else {
+              assignId()
+            }
+          }
+
+          // release the counter even if claiming an id fails
+          val newId = try assignId()
+          finally idCounter.close()
+
+          zkClient.create().creatingParentContainersIfNeeded().forPath(myIdPath, BigInt(newId).toByteArray)
+          logger.info(this, s"invokerReg: invoker $name was assigned invokerId $newId")
+          newId
+
+        case Some(_) =>
+          // path already exists -> there is a previous mapping for this invoker we should use
+          val rawOldId = zkClient.getData().forPath(myIdPath)
+          val oldId = BigInt(rawOldId).intValue
+          logger.info(this, s"invokerReg: invoker $name was assigned its previous invokerId $oldId")
+          oldId
+      }
+    } finally {
+      // always release the client so a failed lookup/assignment does not leak the connection
+      zkClient.close()
+    }
+  }
+
+}
diff --git a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
index c5690f2..4f5b341 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
+++ b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
@@ -23,9 +23,6 @@ import scala.concurrent.Future
 import scala.util.Failure
 import scala.util.Try
 import kamon.Kamon
-import org.apache.curator.retry.RetryUntilElapsed
-import org.apache.curator.framework.CuratorFrameworkFactory
-import org.apache.curator.framework.recipes.shared.SharedCount
 import akka.Done
 import akka.actor.ActorSystem
 import akka.actor.CoordinatedShutdown
@@ -36,7 +33,6 @@ import whisk.core.WhiskConfig
 import whisk.core.WhiskConfig._
 import whisk.core.connector.MessagingProvider
 import whisk.core.connector.PingMessage
-import whisk.core.entity._
 import whisk.core.entity.ExecManifest
 import whisk.core.entity.InvokerInstanceId
 import whisk.http.{BasicHttpService, BasicRasService}
@@ -127,46 +123,7 @@ object Invoker {
           abort("Invoker name can't be empty to use dynamicId assignment.")
         }
 
-        logger.info(this, s"invokerReg: creating zkClient to ${config.zookeeperHosts}")
-        val retryPolicy = new RetryUntilElapsed(5000, 500) // retry at 500ms intervals until 5 seconds have elapsed
-        val zkClient = CuratorFrameworkFactory.newClient(config.zookeeperHosts, retryPolicy)
-        zkClient.start()
-        zkClient.blockUntilConnected()
-        logger.info(this, "invokerReg: connected to zookeeper")
-
-        val myIdPath = "/invokers/idAssignment/mapping/" + invokerUniqueName
-        val assignedId = Option(zkClient.checkExists().forPath(myIdPath)) match {
-          case None =>
-            // path doesn't exist -> no previous mapping for this invoker
-            logger.info(this, s"invokerReg: no prior assignment of id for invoker $invokerUniqueName")
-            val idCounter = new SharedCount(zkClient, "/invokers/idAssignment/counter", 0)
-            idCounter.start()
-
-            def assignId(): Int = {
-              val current = idCounter.getVersionedValue()
-              if (idCounter.trySetCount(current, current.getValue() + 1)) {
-                current.getValue()
-              } else {
-                assignId()
-              }
-            }
-
-            val newId = assignId()
-            idCounter.close()
-            zkClient.create().creatingParentContainersIfNeeded().forPath(myIdPath, BigInt(newId).toByteArray)
-            logger.info(this, s"invokerReg: invoker ${invokerUniqueName} was assigned invokerId ${newId}")
-            newId
-
-          case Some(_) =>
-            // path already exists -> there is a previous mapping for this invoker we should use
-            val rawOldId = zkClient.getData().forPath(myIdPath)
-            val oldId = BigInt(rawOldId).intValue
-            logger.info(this, s"invokerReg: invoker ${invokerUniqueName} was assigned its previous invokerId ${oldId}")
-            oldId
-        }
-
-        zkClient.close()
-        assignedId
+        new InstanceIdAssigner(config.zookeeperHosts).getId(invokerUniqueName.get)
       }
     val topicBaseName = "invoker"
     val topicName = topicBaseName + assignedInvokerId
diff --git a/settings.gradle b/settings.gradle
index d0f0d09..17dd43b 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -46,3 +46,5 @@ gradle.ext.scoverage = [
         'org.scoverage:scalac-scoverage-runtime_2.11:1.3.1'
     ]
 ]
+
+gradle.ext.curator = [version:'4.0.0']
diff --git a/tests/build.gradle b/tests/build.gradle
index 919d27e..b90c99d 100644
--- a/tests/build.gradle
+++ b/tests/build.gradle
@@ -146,6 +146,7 @@ dependencies {
     compile 'com.github.java-json-tools:json-schema-validator:2.2.8'
     compile "org.mockito:mockito-core:2.15.0"
     compile 'io.opentracing:opentracing-mock:0.31.0'
+    compile "org.apache.curator:curator-test:${gradle.curator.version}"
 
     compile project(':common:scala')
     compile project(':core:controller')
diff --git a/tests/src/test/scala/whisk/core/invoker/test/InstanceIdAssignerTests.scala b/tests/src/test/scala/whisk/core/invoker/test/InstanceIdAssignerTests.scala
new file mode 100644
index 0000000..ef9b038
--- /dev/null
+++ b/tests/src/test/scala/whisk/core/invoker/test/InstanceIdAssignerTests.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.invoker.test
+
+import common.StreamLogging
+import org.apache.curator.test.TestingServer
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
+import whisk.core.invoker.InstanceIdAssigner
+
+/**
+ * Tests for InstanceIdAssigner, run against an in-process ZooKeeper provided by Curator's TestingServer.
+ * A fresh server is created for every test, so each test starts without any prior id assignments.
+ */
+@RunWith(classOf[JUnitRunner])
+class InstanceIdAssignerTests extends FlatSpec with Matchers with StreamLogging with BeforeAndAfterEach {
+  behavior of "Id Assignment"
+
+  // (re)created in beforeEach, released in afterEach
+  private var zkServer: TestingServer = _
+
+  override protected def beforeEach(): Unit = {
+    zkServer = new TestingServer()
+    // keep the stackable-trait chain intact for any other mixed-in fixtures
+    super.beforeEach()
+  }
+
+  override protected def afterEach(): Unit = {
+    try super.afterEach()
+    finally {
+      // close() rather than stop(): it also deletes the server's temporary data directory
+      zkServer.close()
+    }
+  }
+
+  it should "assign fresh id" in {
+    val assigner = new InstanceIdAssigner(zkServer.getConnectString)
+    assigner.getId("foo") shouldBe 0
+  }
+
+  it should "reuse id if exists" in {
+    val assigner = new InstanceIdAssigner(zkServer.getConnectString)
+    assigner.getId("foo") shouldBe 0
+    assigner.getId("bar") shouldBe 1
+    // a second lookup for the same name must return the stored id, not a new one
+    assigner.getId("bar") shouldBe 1
+  }
+
+}