You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ma...@apache.org on 2017/11/07 10:57:48 UTC

[incubator-openwhisk] branch master updated: use ZooKeeper as persistent store for invokerId assignment (#2916)

This is an automated email from the ASF dual-hosted git repository.

markusthoemmes pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 6c0cc90  use ZooKeeper as persistent store for invokerId assignment (#2916)
6c0cc90 is described below

commit 6c0cc9021ebff10a627d62bacd06e6e0d6a1676c
Author: David Grove <dg...@users.noreply.github.com>
AuthorDate: Tue Nov 7 05:57:46 2017 -0500

    use ZooKeeper as persistent store for invokerId assignment (#2916)
    
    Use ZooKeeper instead of Redis to persist mapping from invoker names to invoker ids and the counter for the next available id.
    
    The basic algorithm for dynamic invoker id assignment is unchanged, just using a different persistent store whose durability is a better match for this purpose (we want the invoker id assignment information to be as durable as the kafka topics it is being used to assign).
---
 ansible/roles/invoker/tasks/deploy.yml             |  4 +-
 common/scala/build.gradle                          |  1 -
 .../src/main/scala/whisk/core/WhiskConfig.scala    |  4 +-
 core/invoker/build.gradle                          |  1 +
 .../main/scala/whisk/core/invoker/Invoker.scala    | 64 +++++++++++++---------
 5 files changed, 43 insertions(+), 31 deletions(-)

diff --git a/ansible/roles/invoker/tasks/deploy.yml b/ansible/roles/invoker/tasks/deploy.yml
index 0669362..2b8dc7c 100644
--- a/ansible/roles/invoker/tasks/deploy.yml
+++ b/ansible/roles/invoker/tasks/deploy.yml
@@ -119,8 +119,8 @@
         -e PORT='8080'
         -e KAFKA_HOST='{{ groups['kafka']|first }}'
         -e KAFKA_HOST_PORT='{{ kafka.port }}'
-        -e REDIS_HOST='{{ groups['redis'] | default([""]) | first }}'
-        -e REDIS_HOST_PORT='{{ redis.port }}'
+        -e ZOOKEEPER_HOST='{{ groups['kafka']|first }}'
+        -e ZOOKEEPER_HOST_PORT='{{ zookeeper.port }}'
         -e DB_PROTOCOL='{{ db_protocol }}'
         -e DB_PROVIDER='{{ db_provider }}'
         -e DB_HOST='{{ db_host }}'
diff --git a/common/scala/build.gradle b/common/scala/build.gradle
index c5d70a0..aa52e5b 100644
--- a/common/scala/build.gradle
+++ b/common/scala/build.gradle
@@ -22,7 +22,6 @@ dependencies {
     compile 'ch.qos.logback:logback-classic:1.2.3'
     compile 'org.slf4j:jcl-over-slf4j:1.7.25'
     compile 'org.slf4j:log4j-over-slf4j:1.7.25'
-    compile 'net.debasishg:redisclient_2.11:3.4'
     compile 'commons-codec:commons-codec:1.9'
     compile 'commons-io:commons-io:2.4'
     compile 'commons-collections:commons-collections:3.2.2'
diff --git a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
index 7e9d7c1..56da847 100644
--- a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
+++ b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
@@ -208,13 +208,13 @@ object WhiskConfig {
   val loadbalancerInvokerBusyThreshold = "loadbalancer.invokerBusyThreshold"
 
   val kafkaHostName = "kafka.host"
-  private val zookeeperHostName = "zookeeper.host"
+  val zookeeperHostName = "zookeeper.host"
   val redisHostName = "redis.host"
 
   private val edgeHostApiPort = "edge.host.apiport"
   val kafkaHostPort = "kafka.host.port"
   val redisHostPort = "redis.host.port"
-  private val zookeeperHostPort = "zookeeper.host.port"
+  val zookeeperHostPort = "zookeeper.host.port"
 
   val invokerHostsList = "invoker.hosts"
 
diff --git a/core/invoker/build.gradle b/core/invoker/build.gradle
index 0c2b245..e6a41db 100644
--- a/core/invoker/build.gradle
+++ b/core/invoker/build.gradle
@@ -13,6 +13,7 @@ repositories {
 dependencies {
     compile "org.scala-lang:scala-library:${gradle.scala.version}"
     compile project(':common:scala')
+    compile 'org.apache.curator:curator-recipes:2.12.0'
 }
 
 tasks.withType(ScalaCompile) {
diff --git a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
index e1fec08..788fbe8 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
+++ b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
@@ -22,10 +22,12 @@ import scala.concurrent.duration._
 import scala.concurrent.Future
 import scala.util.Failure
 
-import com.redis.RedisClient
-
 import kamon.Kamon
 
+import org.apache.curator.retry.RetryUntilElapsed
+import org.apache.curator.framework.CuratorFrameworkFactory
+import org.apache.curator.framework.recipes.shared.SharedCount
+
 import akka.Done
 import akka.actor.ActorSystem
 import akka.actor.CoordinatedShutdown
@@ -56,7 +58,7 @@ object Invoker {
       WhiskEntityStore.requiredProperties ++
       WhiskActivationStore.requiredProperties ++
       kafkaHost ++
-      Map(redisHostName -> "", redisHostPort -> "") ++
+      Map(zookeeperHostName -> "", zookeeperHostPort -> "") ++
       wskApiHost ++ Map(
       dockerImageTag -> "latest",
       invokerNumCore -> "4",
@@ -109,37 +111,47 @@ object Invoker {
         id
       }
       .getOrElse {
-        if (config.redisHostName.trim.isEmpty || config.redisHostPort.trim.isEmpty) {
-          abort(
-            s"Must provide valid Redis host and port to use dynamicId assignment (${config.redisHostName}:${config.redisHostPort})")
+        if (config.zookeeperHost.startsWith(":") || config.zookeeperHost.endsWith(":")) {
+          abort(s"Must provide valid zookeeper host and port to use dynamicId assignment (${config.zookeeperHost})")
         }
         val invokerName = config.invokerName
         if (invokerName.trim.isEmpty) {
           abort("Invoker name can't be empty to use dynamicId assignment.")
         }
-        val redisClient = new RedisClient(config.redisHostName, config.redisHostPort.toInt)
-        val assignedId = redisClient
-          .hget("controller:registar:idAssignments", invokerName)
-          .map { oldId =>
-            logger.info(this, s"invokerReg: invoker ${invokerName} was assigned its previous invokerId ${oldId}")
-            oldId.toInt
-          }
-          .getOrElse {
-            // If key not present, incr initializes to 0 before applying increment.
-            // Convert from 1-based to 0-based invokerIds by subtracting 1 from incr's result
-            val newId = redisClient
-              .incr("controller:registrar:nextInvokerId")
-              .map { id =>
-                id.toInt - 1
+        logger.info(this, s"invokerReg: creating zkClient to ${config.zookeeperHost}")
+        val retryPolicy = new RetryUntilElapsed(5000, 500) // retry at 500ms intervals until 5 seconds have elapsed
+        val zkClient = CuratorFrameworkFactory.newClient(config.zookeeperHost, retryPolicy)
+        zkClient.start()
+        zkClient.blockUntilConnected();
+        logger.info(this, "invokerReg: connected to zookeeper")
+        val myIdPath = "/invokers/idAssignment/mapping/" + invokerName
+        val assignedId = Option(zkClient.checkExists().forPath(myIdPath)) match {
+          case None =>
+            // path doesn't exist ==> no previous mapping for this invoker
+            logger.info(this, s"invokerReg: no prior assignment of id for invoker $invokerName")
+            val idCounter = new SharedCount(zkClient, "/invokers/idAssignment/counter", 0)
+            idCounter.start()
+            def assignId(): Int = {
+              val current = idCounter.getVersionedValue()
+              if (idCounter.trySetCount(current, current.getValue() + 1)) {
+                current.getValue()
+              } else {
+                assignId()
               }
-              .getOrElse {
-                abort("Failed to increment invokerId")
-              }
-            redisClient.hset("controller:registar:idAssignments", invokerName, newId)
+            }
+            val newId = assignId()
+            idCounter.close()
+            zkClient.create().creatingParentContainersIfNeeded().forPath(myIdPath, BigInt(newId).toByteArray)
             logger.info(this, s"invokerReg: invoker ${invokerName} was assigned invokerId ${newId}")
             newId
-          }
-        redisClient.quit
+          case Some(_) =>
+            // path already exists ==> there is a previous mapping for this invoker we should use
+            val rawOldId = zkClient.getData().forPath(myIdPath)
+            val oldId = BigInt(rawOldId).intValue
+            logger.info(this, s"invokerReg: invoker ${invokerName} was assigned its previous invokerId ${oldId}")
+            oldId
+        }
+        zkClient.close()
         assignedId
       }
     val invokerInstance = InstanceId(assignedInvokerId);

-- 
To stop receiving notification emails like this one, please contact
['"commits@openwhisk.apache.org" <co...@openwhisk.apache.org>'].