You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ma...@apache.org on 2017/11/10 09:49:09 UTC

[incubator-openwhisk] branch master updated: Add ability to use controllers round-robin. (#2888)

This is an automated email from the ASF dual-hosted git repository.

markusthoemmes pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new a2aa07e  Add ability to use controllers round-robin. (#2888)
a2aa07e is described below

commit a2aa07ec93c6230e52e99c89901394acfa8b872e
Author: Vadim Raskin <ra...@gmail.com>
AuthorDate: Fri Nov 10 10:49:06 2017 +0100

    Add ability to use controllers round-robin. (#2888)
    
    Per-minute limits need to be diluted in an active/active controller setup, since they are divided by the number of controllers in the system (so that the controllers do not need to share their state). A slight increase of these per-controller values (a 20% overcommit) accounts for uneven usage of a specific controller when invoking actions or firing triggers.
    
    Some tests assumed that only a single controller is active at any given time. These tests have been adjusted to also account for controllers being used in round-robin fashion.
---
 ansible/group_vars/all                             |  2 +
 ansible/roles/controller/tasks/deploy.yml          |  3 +-
 ansible/roles/nginx/templates/nginx.conf.j2        |  2 +-
 ansible/templates/whisk.properties.j2              |  1 +
 .../src/main/scala/whisk/core/WhiskConfig.scala    |  2 +
 .../MultipleReadersSingleWriterCache.scala         |  8 ++--
 .../scala/whisk/core/entitlement/Entitlement.scala | 34 +++++++++++++--
 tests/src/test/scala/ha/ShootComponentsTests.scala | 20 ++++++---
 tests/src/test/scala/limits/ThrottleTests.scala    | 17 +++++---
 .../core/database/test/CacheConcurrencyTests.scala | 51 +++++++++++++++-------
 .../MultipleReadersSingleWriterCacheTests.scala    | 11 ++---
 11 files changed, 108 insertions(+), 43 deletions(-)

diff --git a/ansible/group_vars/all b/ansible/group_vars/all
index 79642fe..c6ad149 100644
--- a/ansible/group_vars/all
+++ b/ansible/group_vars/all
@@ -117,6 +117,8 @@ controller:
       bindPort: 2551
       # at this moment all controllers are seed nodes
       seedNodes: "{{ groups['controllers'] | map('extract', hostvars, 'ansible_host') | list }}"
+  # We recommend to enable HA for the controllers only, if bookkeeping data are shared too. (localBookkeeping: false)
+  ha: "{{ controller_enable_ha | default(false) }}"
 
 registry:
   confdir: "{{ config_root_dir }}/registry"
diff --git a/ansible/roles/controller/tasks/deploy.yml b/ansible/roles/controller/tasks/deploy.yml
index c669395..bba75d7 100644
--- a/ansible/roles/controller/tasks/deploy.yml
+++ b/ansible/roles/controller/tasks/deploy.yml
@@ -82,12 +82,11 @@
       "AKKA_CLUSTER_SEED_NODES": "{{seed_nodes_list | join(' ') }}"
       "AKKA_CLUSTER_BIND_PORT": "{{ controller.akka.cluster.bindPort }}"
       "AKKA_ACTOR_PROVIDER": "{{ controller.akka.provider }}"
-
       "METRICS_KAMON": "{{ metrics.kamon.enabled }}"
       "METRICS_LOG": "{{ metrics.log.enabled }}"
       "METRICS_KAMON_HOST": "{{ metrics.kamon.host }}"
       "METRICS_KAMON_PORT": "{{ metrics.kamon.port }}"
-
+      "CONTROLLER_HA": "{{ controller.ha }}"
     volumes:
       - "{{ whisk_logs_dir }}/controller{{ groups['controllers'].index(inventory_hostname) }}:/logs"
     ports:
diff --git a/ansible/roles/nginx/templates/nginx.conf.j2 b/ansible/roles/nginx/templates/nginx.conf.j2
index 6a0b4dc..9efae00 100644
--- a/ansible/roles/nginx/templates/nginx.conf.j2
+++ b/ansible/roles/nginx/templates/nginx.conf.j2
@@ -30,7 +30,7 @@ http {
         server {{ hostvars[groups['controllers'] | first].ansible_host }}:{{ controller.basePort }} fail_timeout=60s;
 {% for ip in groups['controllers'] %}
 {% if groups['controllers'].index(ip) > 0 %}
-        server {{ hostvars[ip].ansible_host }}:{{ controller.basePort + groups['controllers'].index(ip) }} backup;
+        server {{ hostvars[ip].ansible_host }}:{{ controller.basePort + groups['controllers'].index(ip) }} {% if controller.ha %}fail_timeout=60s{% else %}backup{% endif %};
 {% endif %}
 {% endfor %}
         keepalive 512;
diff --git a/ansible/templates/whisk.properties.j2 b/ansible/templates/whisk.properties.j2
index 2c1d644..ecef131 100644
--- a/ansible/templates/whisk.properties.j2
+++ b/ansible/templates/whisk.properties.j2
@@ -58,6 +58,7 @@ invoker.hosts.baseport={{ invoker.port }}
 controller.hosts={{ groups["controllers"] | map('extract', hostvars, 'ansible_host') | list | join(",") }}
 controller.host.basePort={{ controller.basePort }}
 controller.instances={{ controller.instances }}
+controller.ha={{ controller.ha }}
 
 invoker.container.network=bridge
 invoker.container.policy={{ invoker_container_policy_name | default()}}
diff --git a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
index 726d1bc..bc63dc2 100644
--- a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
+++ b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
@@ -105,6 +105,7 @@ class WhiskConfig(requiredProperties: Map[String, String],
   val actionSequenceLimit = this(WhiskConfig.actionSequenceMaxLimit)
   val controllerSeedNodes = this(WhiskConfig.controllerSeedNodes)
   val controllerLocalBookkeeping = getAsBoolean(WhiskConfig.controllerLocalBookkeeping, false)
+  val controllerHighAvailability = getAsBoolean(WhiskConfig.controllerHighAvailability, false)
 }
 
 object WhiskConfig {
@@ -240,4 +241,5 @@ object WhiskConfig {
   val triggerFirePerMinuteLimit = "limits.triggers.fires.perMinute"
   val controllerSeedNodes = "akka.cluster.seed.nodes"
   val controllerLocalBookkeeping = "controller.localBookkeeping"
+  val controllerHighAvailability = "controller.ha"
 }
diff --git a/common/scala/src/main/scala/whisk/core/database/MultipleReadersSingleWriterCache.scala b/common/scala/src/main/scala/whisk/core/database/MultipleReadersSingleWriterCache.scala
index ef51097..6fa76d2 100644
--- a/common/scala/src/main/scala/whisk/core/database/MultipleReadersSingleWriterCache.scala
+++ b/common/scala/src/main/scala/whisk/core/database/MultipleReadersSingleWriterCache.scala
@@ -161,8 +161,6 @@ trait MultipleReadersSingleWriterCache[W, Winfo] {
     if (cacheEnabled) {
       logger.info(this, s"invalidating $key on delete")
 
-      notifier.foreach(_(key))
-
       // try inserting our desired entry...
       val desiredEntry = Entry(transid, InvalidateInProgress, None)
       cache(key)(desiredEntry) flatMap { actualEntry =>
@@ -206,6 +204,8 @@ trait MultipleReadersSingleWriterCache[W, Winfo] {
             // a pre-existing owner will take care of the invalidation
             invalidator
         }
+      } andThen {
+        case _ => notifier.foreach(_(key))
       }
     } else invalidator // not caching
   }
@@ -265,8 +265,6 @@ trait MultipleReadersSingleWriterCache[W, Winfo] {
     notifier: Option[CacheChangeNotification]): Future[Winfo] = {
     if (cacheEnabled) {
 
-      notifier.foreach(_(key))
-
       // try inserting our desired entry...
       val desiredEntry = Entry(transid, WriteInProgress, Some(Future.successful(doc)))
       cache(key)(desiredEntry) flatMap { actualEntry =>
@@ -292,6 +290,8 @@ trait MultipleReadersSingleWriterCache[W, Winfo] {
             invalidateEntryAfter(generator, key, actualEntry)
           }
         }
+      } andThen {
+        case _ => notifier.foreach(_(key))
       }
     } else generator // not caching
   }
diff --git a/core/controller/src/main/scala/whisk/core/entitlement/Entitlement.scala b/core/controller/src/main/scala/whisk/core/entitlement/Entitlement.scala
index 9afa83a..c068281 100644
--- a/core/controller/src/main/scala/whisk/core/entitlement/Entitlement.scala
+++ b/core/controller/src/main/scala/whisk/core/entitlement/Entitlement.scala
@@ -65,7 +65,9 @@ protected[core] object EntitlementProvider {
     WhiskConfig.actionInvokePerMinuteLimit -> null,
     WhiskConfig.actionInvokeConcurrentLimit -> null,
     WhiskConfig.triggerFirePerMinuteLimit -> null,
-    WhiskConfig.actionInvokeSystemOverloadLimit -> null)
+    WhiskConfig.actionInvokeSystemOverloadLimit -> null,
+    WhiskConfig.controllerInstances -> null,
+    WhiskConfig.controllerHighAvailability -> null)
 }
 
 /**
@@ -78,10 +80,36 @@ protected[core] abstract class EntitlementProvider(config: WhiskConfig, loadBala
 
   private implicit val executionContext = actorSystem.dispatcher
 
+  /**
+   * The number of controllers if HA is enabled, 1 otherwise
+   */
+  private val diviser = if (config.controllerHighAvailability) config.controllerInstances.toInt else 1
+
+  /**
+   * Allows 20% of additional requests on top of the limit to mitigate possible unfair round-robin loadbalancing between
+   * controllers
+   */
+  private val overcommit = if (config.controllerHighAvailability) 1.2 else 1
+
+  /**
+   * Adjust the throttles for a single controller with the diviser and the overcommit.
+   *
+   * @param originalThrottle The throttle that needs to be adjusted for this controller.
+   */
+  private def dilateThrottle(originalThrottle: Int): Int = {
+    Math.ceil((originalThrottle.toDouble / diviser.toDouble) * overcommit).toInt
+  }
+
   private val invokeRateThrottler =
-    new RateThrottler("actions per minute", config.actionInvokePerMinuteLimit.toInt, _.limits.invocationsPerMinute)
+    new RateThrottler(
+      "actions per minute",
+      dilateThrottle(config.actionInvokePerMinuteLimit.toInt),
+      _.limits.invocationsPerMinute.map(dilateThrottle))
   private val triggerRateThrottler =
-    new RateThrottler("triggers per minute", config.triggerFirePerMinuteLimit.toInt, _.limits.firesPerMinute)
+    new RateThrottler(
+      "triggers per minute",
+      dilateThrottle(config.triggerFirePerMinuteLimit.toInt),
+      _.limits.firesPerMinute.map(dilateThrottle))
   private val concurrentInvokeThrottler = new ActivationThrottler(
     loadBalancer,
     config.actionInvokeConcurrentLimit.toInt,
diff --git a/tests/src/test/scala/ha/ShootComponentsTests.scala b/tests/src/test/scala/ha/ShootComponentsTests.scala
index fa1aba1..5fd1525 100644
--- a/tests/src/test/scala/ha/ShootComponentsTests.scala
+++ b/tests/src/test/scala/ha/ShootComponentsTests.scala
@@ -53,6 +53,13 @@ class ShootComponentsTests extends FlatSpec with Matchers with WskTestHelpers wi
   implicit val materializer = ActorMaterializer()
   implicit val testConfig = PatienceConfig(1.minute)
 
+  // Throttle requests to the remaining controllers to avoid getting 429s. (60 req/min)
+  val amountOfControllers = WhiskProperties.getProperty(WhiskConfig.controllerInstances).toInt
+  val limit = WhiskProperties.getProperty(WhiskConfig.actionInvokeConcurrentLimit).toDouble
+  val limitPerController = limit / amountOfControllers
+  val allowedRequestsPerMinute = (amountOfControllers - 1.0) * limitPerController
+  val timeBeweenRequests = 60.seconds / allowedRequestsPerMinute
+
   val controller0DockerHost = WhiskProperties.getBaseControllerHost() + ":" + WhiskProperties.getProperty(
     WhiskConfig.dockerPort)
 
@@ -94,9 +101,8 @@ class ShootComponentsTests extends FlatSpec with Matchers with WskTestHelpers wi
 
       println(s"Done rerquests with responses: invoke: ${invokeExit.futureValue} and get: ${getExit.futureValue}")
 
-      // Do at most one action invocation per second to avoid getting 429s. (60 req/min - limit)
-      val wait = 1000 - (Instant.now.toEpochMilli - start.toEpochMilli)
-      Thread.sleep(if (wait < 0) 0L else if (wait > 1000) 1000L else wait)
+      val remainingWait = timeBeweenRequests.toMillis - (Instant.now.toEpochMilli - start.toEpochMilli)
+      Thread.sleep(if (remainingWait < 0) 0L else remainingWait)
       (invokeExit.futureValue, getExit.futureValue)
     }
   }
@@ -104,15 +110,15 @@ class ShootComponentsTests extends FlatSpec with Matchers with WskTestHelpers wi
   behavior of "Controllers hot standby"
 
   it should "use controller1 if controller0 goes down" in withAssetCleaner(wskprops) { (wp, assetHelper) =>
-    if (WhiskProperties.getProperty(WhiskConfig.controllerInstances).toInt >= 2) {
+    if (amountOfControllers >= 2) {
       val actionName = "shootcontroller"
 
       assetHelper.withCleaner(wsk.action, actionName) { (action, _) =>
         action.create(actionName, defaultAction)
       }
 
-      // Produce some load on the system for 100 seconds (each second one request). Kill the controller after 4 requests
-      val totalRequests = 100
+      // Produce some load on the system for 100 seconds. Kill the controller after 4 requests
+      val totalRequests = (100.seconds / timeBeweenRequests).toInt
 
       val requestsBeforeRestart = doRequests(4, actionName)
 
@@ -130,7 +136,7 @@ class ShootComponentsTests extends FlatSpec with Matchers with WskTestHelpers wi
       val requests = requestsBeforeRestart ++ requestsAfterRestart
 
       val unsuccessfulInvokes = requests.map(_._1).count(_ != TestUtils.SUCCESS_EXIT)
-      // Allow 3 failures for the 90 seconds
+      // Allow 3 failures for the 100 seconds
       unsuccessfulInvokes should be <= 3
 
       val unsuccessfulGets = requests.map(_._2).count(_ != TestUtils.SUCCESS_EXIT)
diff --git a/tests/src/test/scala/limits/ThrottleTests.scala b/tests/src/test/scala/limits/ThrottleTests.scala
index 511a1b3..7c59aec 100644
--- a/tests/src/test/scala/limits/ThrottleTests.scala
+++ b/tests/src/test/scala/limits/ThrottleTests.scala
@@ -27,11 +27,13 @@ import scala.concurrent.Promise
 import scala.concurrent.duration._
 
 import org.junit.runner.RunWith
+import org.scalatest.BeforeAndAfterAll
 import org.scalatest.FlatSpec
 import org.scalatest.Matchers
 import org.scalatest.concurrent.ScalaFutures
 import org.scalatest.junit.JUnitRunner
 
+import common.RunWskAdminCmd
 import common.TestHelpers
 import common.TestUtils
 import common.TestUtils._
@@ -42,10 +44,9 @@ import common.WskProps
 import common.WskTestHelpers
 import spray.json._
 import spray.json.DefaultJsonProtocol._
+import whisk.core.WhiskConfig
 import whisk.http.Messages._
 import whisk.utils.ExecutionContextFactory
-import org.scalatest.BeforeAndAfterAll
-import common.RunWskAdminCmd
 import whisk.utils.retry
 
 protected[limits] trait LocalHelper {
@@ -72,8 +73,10 @@ class ThrottleTests
 
   val throttleWindow = 1.minute
 
-  val maximumInvokesPerMinute = getLimit("limits.actions.invokes.perMinute")
-  val maximumFiringsPerMinute = getLimit("limits.triggers.fires.perMinute")
+  // Due to the overhead of the per minute limit in the controller, we add this overhead here as well.
+  val overhead = if (WhiskProperties.getProperty(WhiskConfig.controllerHighAvailability).toBoolean) 1.2 else 1.0
+  val maximumInvokesPerMinute = math.ceil(getLimit("limits.actions.invokes.perMinute") * overhead).toInt
+  val maximumFiringsPerMinute = math.ceil(getLimit("limits.triggers.fires.perMinute") * overhead).toInt
   val maximumConcurrentInvokes = getLimit("limits.actions.invokes.concurrent")
 
   println(s"maximumInvokesPerMinute  = $maximumInvokesPerMinute")
@@ -368,11 +371,13 @@ class NamespaceSpecificThrottleTests
       trigger.create(triggerName)
     }
 
+    val deployedControllers = WhiskProperties.getControllerHosts.split(",").length
+
     // One invoke should be allowed, the second one throttled.
     // Due to the current implementation of the rate throttling,
     // it is possible that the counter gets deleted, because the minute switches.
     retry({
-      val results = (1 to 2).map { _ =>
+      val results = (1 to deployedControllers + 1).map { _ =>
         wsk.action.invoke(actionName, expectedExitCode = TestUtils.DONTCARE_EXIT)
       }
       results.map(_.exitCode) should contain(TestUtils.THROTTLED)
@@ -385,7 +390,7 @@ class NamespaceSpecificThrottleTests
     // Due to the current implementation of the rate throttling,
     // it is possible, that the counter gets deleted, because the minute switches.
     retry({
-      val results = (1 to 2).map { _ =>
+      val results = (1 to deployedControllers + 1).map { _ =>
         wsk.trigger.fire(triggerName, expectedExitCode = TestUtils.DONTCARE_EXIT)
       }
       results.map(_.exitCode) should contain(TestUtils.THROTTLED)
diff --git a/tests/src/test/scala/whisk/core/database/test/CacheConcurrencyTests.scala b/tests/src/test/scala/whisk/core/database/test/CacheConcurrencyTests.scala
index 1121522..5ae8167 100644
--- a/tests/src/test/scala/whisk/core/database/test/CacheConcurrencyTests.scala
+++ b/tests/src/test/scala/whisk/core/database/test/CacheConcurrencyTests.scala
@@ -17,9 +17,8 @@
 
 package whisk.core.database.test
 
-import akka.http.scaladsl.model.StatusCodes.NotFound
-
 import scala.collection.parallel._
+import scala.concurrent.duration.DurationInt
 import scala.concurrent.forkjoin.ForkJoinPool
 
 import org.junit.runner.RunWith
@@ -27,13 +26,16 @@ import org.scalatest.BeforeAndAfter
 import org.scalatest.FlatSpec
 import org.scalatest.junit.JUnitRunner
 
-import common.TestUtils
+import akka.http.scaladsl.model.StatusCodes.NotFound
 import common.TestUtils._
-import common.rest.WskRest
+import common.TestUtils
+import common.WhiskProperties
 import common.WskProps
 import common.WskTestHelpers
+import common.rest.WskRest
 import spray.json.JsString
 import whisk.common.TransactionId
+import whisk.utils.retry
 
 @RunWith(classOf[JUnitRunner])
 class CacheConcurrencyTests extends FlatSpec with WskTestHelpers with BeforeAndAfter {
@@ -98,9 +100,18 @@ class CacheConcurrencyTests extends FlatSpec with WskTestHelpers with BeforeAndA
         }
       }
 
-      run("get after delete") { name =>
-        wsk.action.get(name, expectedExitCode = NotFound.intValue)
-      }
+      // Give some time to replicate the state between the controllers
+      retry(
+        {
+          // Check that every controller has the correct state (used round robin)
+          WhiskProperties.getControllerHosts.split(",").foreach { _ =>
+            run("get after delete") { name =>
+              wsk.action.get(name, expectedExitCode = NotFound.intValue)
+            }
+          }
+        },
+        10,
+        Some(2.second))
 
       run("recreate") { name =>
         wsk.action.create(name, Some(actionFile))
@@ -123,14 +134,24 @@ class CacheConcurrencyTests extends FlatSpec with WskTestHelpers with BeforeAndA
         }
       }
 
-      run("get after update") { name =>
-        wsk.action.get(name)
-      } map {
-        case (name, rr) =>
-          withClue(s"get after update: failed check for $name") {
-            rr.stdout should include("blue")
-            rr.stdout should not include ("red")
+      // All controllers should have the correct action
+      // As they are used round robin, we ask every controller for the action.
+      // We add a retry to tollarate a short interval to bring the controllers in sync.
+      retry(
+        {
+          WhiskProperties.getControllerHosts.split(",").foreach { _ =>
+            run("get after update") { name =>
+              wsk.action.get(name)
+            } map {
+              case (name, rr) =>
+                withClue(s"get after update: failed check for $name") {
+                  rr.stdout should include("blue")
+                  rr.stdout should not include ("red")
+                }
+            }
           }
-      }
+        },
+        10,
+        Some(2.second))
     }
 }
diff --git a/tests/src/test/scala/whisk/core/database/test/MultipleReadersSingleWriterCacheTests.scala b/tests/src/test/scala/whisk/core/database/test/MultipleReadersSingleWriterCacheTests.scala
index 3b046a3..7196722 100644
--- a/tests/src/test/scala/whisk/core/database/test/MultipleReadersSingleWriterCacheTests.scala
+++ b/tests/src/test/scala/whisk/core/database/test/MultipleReadersSingleWriterCacheTests.scala
@@ -19,8 +19,9 @@ package whisk.core.database.test
 
 import java.util.concurrent.atomic.AtomicInteger
 
-import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.Await
 import scala.concurrent.Future
+import scala.concurrent.duration.DurationInt
 
 import org.junit.runner.RunWith
 import org.scalatest.FlatSpec
@@ -59,17 +60,17 @@ class MultipleReadersSingleWriterCacheTests
     }
 
     // Create an cache entry
-    cacheUpdate("doc", key, Future.successful("db save successful"))
+    Await.ready(cacheUpdate("doc", key, Future.successful("db save successful")), 10.seconds)
     ctr.get shouldBe 1
 
     // Callback should be called if entry exists
-    cacheInvalidate(key, Future.successful(()))
+    Await.ready(cacheInvalidate(key, Future.successful(())), 10.seconds)
     ctr.get shouldBe 2
-    cacheUpdate("docdoc", key, Future.successful("update in db successful"))
+    Await.ready(cacheUpdate("docdoc", key, Future.successful("update in db successful")), 10.seconds)
     ctr.get shouldBe 3
 
     // Callback should be called if entry does not exist
-    cacheInvalidate(CacheKey("abc"), Future.successful(()))
+    Await.ready(cacheInvalidate(CacheKey("abc"), Future.successful(())), 10.seconds)
     ctr.get shouldBe 4
   }
 }

-- 
To stop receiving notification emails like this one, please contact
['"commits@openwhisk.apache.org" <co...@openwhisk.apache.org>'].