You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@openwhisk.apache.org by GitBox <gi...@apache.org> on 2017/11/07 10:57:50 UTC

[GitHub] markusthoemmes closed pull request #2916: use ZooKeeper as persistent store for invokerId assignment

markusthoemmes closed pull request #2916: use ZooKeeper as persistent store for invokerId assignment
URL: https://github.com/apache/incubator-openwhisk/pull/2916
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a forked repository, GitHub does not
display its diff after the merge, so the diff is reproduced below:

diff --git a/ansible/roles/invoker/tasks/deploy.yml b/ansible/roles/invoker/tasks/deploy.yml
index 0669362b89..2b8dc7cbb0 100644
--- a/ansible/roles/invoker/tasks/deploy.yml
+++ b/ansible/roles/invoker/tasks/deploy.yml
@@ -119,8 +119,8 @@
         -e PORT='8080'
         -e KAFKA_HOST='{{ groups['kafka']|first }}'
         -e KAFKA_HOST_PORT='{{ kafka.port }}'
-        -e REDIS_HOST='{{ groups['redis'] | default([""]) | first }}'
-        -e REDIS_HOST_PORT='{{ redis.port }}'
+        -e ZOOKEEPER_HOST='{{ groups['kafka']|first }}'
+        -e ZOOKEEPER_HOST_PORT='{{ zookeeper.port }}'
         -e DB_PROTOCOL='{{ db_protocol }}'
         -e DB_PROVIDER='{{ db_provider }}'
         -e DB_HOST='{{ db_host }}'
diff --git a/common/scala/build.gradle b/common/scala/build.gradle
index c5d70a0096..aa52e5b456 100644
--- a/common/scala/build.gradle
+++ b/common/scala/build.gradle
@@ -22,7 +22,6 @@ dependencies {
     compile 'ch.qos.logback:logback-classic:1.2.3'
     compile 'org.slf4j:jcl-over-slf4j:1.7.25'
     compile 'org.slf4j:log4j-over-slf4j:1.7.25'
-    compile 'net.debasishg:redisclient_2.11:3.4'
     compile 'commons-codec:commons-codec:1.9'
     compile 'commons-io:commons-io:2.4'
     compile 'commons-collections:commons-collections:3.2.2'
diff --git a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
index 7e9d7c1e2f..56da847b5c 100644
--- a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
+++ b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
@@ -208,13 +208,13 @@ object WhiskConfig {
   val loadbalancerInvokerBusyThreshold = "loadbalancer.invokerBusyThreshold"
 
   val kafkaHostName = "kafka.host"
-  private val zookeeperHostName = "zookeeper.host"
+  val zookeeperHostName = "zookeeper.host"
   val redisHostName = "redis.host"
 
   private val edgeHostApiPort = "edge.host.apiport"
   val kafkaHostPort = "kafka.host.port"
   val redisHostPort = "redis.host.port"
-  private val zookeeperHostPort = "zookeeper.host.port"
+  val zookeeperHostPort = "zookeeper.host.port"
 
   val invokerHostsList = "invoker.hosts"
 
diff --git a/core/invoker/build.gradle b/core/invoker/build.gradle
index 0c2b245d83..e6a41db548 100644
--- a/core/invoker/build.gradle
+++ b/core/invoker/build.gradle
@@ -13,6 +13,7 @@ repositories {
 dependencies {
     compile "org.scala-lang:scala-library:${gradle.scala.version}"
     compile project(':common:scala')
+    compile 'org.apache.curator:curator-recipes:2.12.0'
 }
 
 tasks.withType(ScalaCompile) {
diff --git a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
index e1fec08f2b..788fbe8731 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
+++ b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
@@ -22,10 +22,12 @@ import scala.concurrent.duration._
 import scala.concurrent.Future
 import scala.util.Failure
 
-import com.redis.RedisClient
-
 import kamon.Kamon
 
+import org.apache.curator.retry.RetryUntilElapsed
+import org.apache.curator.framework.CuratorFrameworkFactory
+import org.apache.curator.framework.recipes.shared.SharedCount
+
 import akka.Done
 import akka.actor.ActorSystem
 import akka.actor.CoordinatedShutdown
@@ -56,7 +58,7 @@ object Invoker {
       WhiskEntityStore.requiredProperties ++
       WhiskActivationStore.requiredProperties ++
       kafkaHost ++
-      Map(redisHostName -> "", redisHostPort -> "") ++
+      Map(zookeeperHostName -> "", zookeeperHostPort -> "") ++
       wskApiHost ++ Map(
       dockerImageTag -> "latest",
       invokerNumCore -> "4",
@@ -109,37 +111,47 @@ object Invoker {
         id
       }
       .getOrElse {
-        if (config.redisHostName.trim.isEmpty || config.redisHostPort.trim.isEmpty) {
-          abort(
-            s"Must provide valid Redis host and port to use dynamicId assignment (${config.redisHostName}:${config.redisHostPort})")
+        if (config.zookeeperHost.startsWith(":") || config.zookeeperHost.endsWith(":")) {
+          abort(s"Must provide valid zookeeper host and port to use dynamicId assignment (${config.zookeeperHost})")
         }
         val invokerName = config.invokerName
         if (invokerName.trim.isEmpty) {
           abort("Invoker name can't be empty to use dynamicId assignment.")
         }
-        val redisClient = new RedisClient(config.redisHostName, config.redisHostPort.toInt)
-        val assignedId = redisClient
-          .hget("controller:registar:idAssignments", invokerName)
-          .map { oldId =>
-            logger.info(this, s"invokerReg: invoker ${invokerName} was assigned its previous invokerId ${oldId}")
-            oldId.toInt
-          }
-          .getOrElse {
-            // If key not present, incr initializes to 0 before applying increment.
-            // Convert from 1-based to 0-based invokerIds by subtracting 1 from incr's result
-            val newId = redisClient
-              .incr("controller:registrar:nextInvokerId")
-              .map { id =>
-                id.toInt - 1
+        logger.info(this, s"invokerReg: creating zkClient to ${config.zookeeperHost}")
+        val retryPolicy = new RetryUntilElapsed(5000, 500) // retry at 500ms intervals until 5 seconds have elapsed
+        val zkClient = CuratorFrameworkFactory.newClient(config.zookeeperHost, retryPolicy)
+        zkClient.start()
+        zkClient.blockUntilConnected();
+        logger.info(this, "invokerReg: connected to zookeeper")
+        val myIdPath = "/invokers/idAssignment/mapping/" + invokerName
+        val assignedId = Option(zkClient.checkExists().forPath(myIdPath)) match {
+          case None =>
+            // path doesn't exist ==> no previous mapping for this invoker
+            logger.info(this, s"invokerReg: no prior assignment of id for invoker $invokerName")
+            val idCounter = new SharedCount(zkClient, "/invokers/idAssignment/counter", 0)
+            idCounter.start()
+            def assignId(): Int = {
+              val current = idCounter.getVersionedValue()
+              if (idCounter.trySetCount(current, current.getValue() + 1)) {
+                current.getValue()
+              } else {
+                assignId()
               }
-              .getOrElse {
-                abort("Failed to increment invokerId")
-              }
-            redisClient.hset("controller:registar:idAssignments", invokerName, newId)
+            }
+            val newId = assignId()
+            idCounter.close()
+            zkClient.create().creatingParentContainersIfNeeded().forPath(myIdPath, BigInt(newId).toByteArray)
             logger.info(this, s"invokerReg: invoker ${invokerName} was assigned invokerId ${newId}")
             newId
-          }
-        redisClient.quit
+          case Some(_) =>
+            // path already exists ==> there is a previous mapping for this invoker we should use
+            val rawOldId = zkClient.getData().forPath(myIdPath)
+            val oldId = BigInt(rawOldId).intValue
+            logger.info(this, s"invokerReg: invoker ${invokerName} was assigned its previous invokerId ${oldId}")
+            oldId
+        }
+        zkClient.close()
         assignedId
       }
     val invokerInstance = InstanceId(assignedInvokerId);


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services