You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ma...@apache.org on 2017/11/10 09:49:09 UTC
[incubator-openwhisk] branch master updated: Add ability to use
controllers round-robin. (#2888)
This is an automated email from the ASF dual-hosted git repository.
markusthoemmes pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new a2aa07e Add ability to use controllers round-robin. (#2888)
a2aa07e is described below
commit a2aa07ec93c6230e52e99c89901394acfa8b872e
Author: Vadim Raskin <ra...@gmail.com>
AuthorDate: Fri Nov 10 10:49:06 2017 +0100
Add ability to use controllers round-robin. (#2888)
Per-minute limits need to be divided up in an active/active setup of controllers: each controller enforces the configured limit divided by the number of controllers in the system, so that the controllers do not need to share their state. Each per-controller share is then slightly increased (a 20% overcommit) to account for inconsistent usage of a specific controller when invoking actions or firing triggers.
Some tests assumed that only a single controller is active at any given time. These tests have been adjusted to also account for controllers that are used round-robin.
---
ansible/group_vars/all | 2 +
ansible/roles/controller/tasks/deploy.yml | 3 +-
ansible/roles/nginx/templates/nginx.conf.j2 | 2 +-
ansible/templates/whisk.properties.j2 | 1 +
.../src/main/scala/whisk/core/WhiskConfig.scala | 2 +
.../MultipleReadersSingleWriterCache.scala | 8 ++--
.../scala/whisk/core/entitlement/Entitlement.scala | 34 +++++++++++++--
tests/src/test/scala/ha/ShootComponentsTests.scala | 20 ++++++---
tests/src/test/scala/limits/ThrottleTests.scala | 17 +++++---
.../core/database/test/CacheConcurrencyTests.scala | 51 +++++++++++++++-------
.../MultipleReadersSingleWriterCacheTests.scala | 11 ++---
11 files changed, 108 insertions(+), 43 deletions(-)
diff --git a/ansible/group_vars/all b/ansible/group_vars/all
index 79642fe..c6ad149 100644
--- a/ansible/group_vars/all
+++ b/ansible/group_vars/all
@@ -117,6 +117,8 @@ controller:
bindPort: 2551
# at this moment all controllers are seed nodes
seedNodes: "{{ groups['controllers'] | map('extract', hostvars, 'ansible_host') | list }}"
+ # We recommend enabling HA for the controllers only if bookkeeping data are shared too (localBookkeeping: false).
+ ha: "{{ controller_enable_ha | default(false) }}"
registry:
confdir: "{{ config_root_dir }}/registry"
diff --git a/ansible/roles/controller/tasks/deploy.yml b/ansible/roles/controller/tasks/deploy.yml
index c669395..bba75d7 100644
--- a/ansible/roles/controller/tasks/deploy.yml
+++ b/ansible/roles/controller/tasks/deploy.yml
@@ -82,12 +82,11 @@
"AKKA_CLUSTER_SEED_NODES": "{{seed_nodes_list | join(' ') }}"
"AKKA_CLUSTER_BIND_PORT": "{{ controller.akka.cluster.bindPort }}"
"AKKA_ACTOR_PROVIDER": "{{ controller.akka.provider }}"
-
"METRICS_KAMON": "{{ metrics.kamon.enabled }}"
"METRICS_LOG": "{{ metrics.log.enabled }}"
"METRICS_KAMON_HOST": "{{ metrics.kamon.host }}"
"METRICS_KAMON_PORT": "{{ metrics.kamon.port }}"
-
+ "CONTROLLER_HA": "{{ controller.ha }}"
volumes:
- "{{ whisk_logs_dir }}/controller{{ groups['controllers'].index(inventory_hostname) }}:/logs"
ports:
diff --git a/ansible/roles/nginx/templates/nginx.conf.j2 b/ansible/roles/nginx/templates/nginx.conf.j2
index 6a0b4dc..9efae00 100644
--- a/ansible/roles/nginx/templates/nginx.conf.j2
+++ b/ansible/roles/nginx/templates/nginx.conf.j2
@@ -30,7 +30,7 @@ http {
server {{ hostvars[groups['controllers'] | first].ansible_host }}:{{ controller.basePort }} fail_timeout=60s;
{% for ip in groups['controllers'] %}
{% if groups['controllers'].index(ip) > 0 %}
- server {{ hostvars[ip].ansible_host }}:{{ controller.basePort + groups['controllers'].index(ip) }} backup;
+ server {{ hostvars[ip].ansible_host }}:{{ controller.basePort + groups['controllers'].index(ip) }} {% if controller.ha %}fail_timeout=60s{% else %}backup{% endif %};
{% endif %}
{% endfor %}
keepalive 512;
diff --git a/ansible/templates/whisk.properties.j2 b/ansible/templates/whisk.properties.j2
index 2c1d644..ecef131 100644
--- a/ansible/templates/whisk.properties.j2
+++ b/ansible/templates/whisk.properties.j2
@@ -58,6 +58,7 @@ invoker.hosts.baseport={{ invoker.port }}
controller.hosts={{ groups["controllers"] | map('extract', hostvars, 'ansible_host') | list | join(",") }}
controller.host.basePort={{ controller.basePort }}
controller.instances={{ controller.instances }}
+controller.ha={{ controller.ha }}
invoker.container.network=bridge
invoker.container.policy={{ invoker_container_policy_name | default()}}
diff --git a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
index 726d1bc..bc63dc2 100644
--- a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
+++ b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
@@ -105,6 +105,7 @@ class WhiskConfig(requiredProperties: Map[String, String],
val actionSequenceLimit = this(WhiskConfig.actionSequenceMaxLimit)
val controllerSeedNodes = this(WhiskConfig.controllerSeedNodes)
val controllerLocalBookkeeping = getAsBoolean(WhiskConfig.controllerLocalBookkeeping, false)
+ val controllerHighAvailability = getAsBoolean(WhiskConfig.controllerHighAvailability, false)
}
object WhiskConfig {
@@ -240,4 +241,5 @@ object WhiskConfig {
val triggerFirePerMinuteLimit = "limits.triggers.fires.perMinute"
val controllerSeedNodes = "akka.cluster.seed.nodes"
val controllerLocalBookkeeping = "controller.localBookkeeping"
+ val controllerHighAvailability = "controller.ha"
}
diff --git a/common/scala/src/main/scala/whisk/core/database/MultipleReadersSingleWriterCache.scala b/common/scala/src/main/scala/whisk/core/database/MultipleReadersSingleWriterCache.scala
index ef51097..6fa76d2 100644
--- a/common/scala/src/main/scala/whisk/core/database/MultipleReadersSingleWriterCache.scala
+++ b/common/scala/src/main/scala/whisk/core/database/MultipleReadersSingleWriterCache.scala
@@ -161,8 +161,6 @@ trait MultipleReadersSingleWriterCache[W, Winfo] {
if (cacheEnabled) {
logger.info(this, s"invalidating $key on delete")
- notifier.foreach(_(key))
-
// try inserting our desired entry...
val desiredEntry = Entry(transid, InvalidateInProgress, None)
cache(key)(desiredEntry) flatMap { actualEntry =>
@@ -206,6 +204,8 @@ trait MultipleReadersSingleWriterCache[W, Winfo] {
// a pre-existing owner will take care of the invalidation
invalidator
}
+ } andThen {
+ case _ => notifier.foreach(_(key))
}
} else invalidator // not caching
}
@@ -265,8 +265,6 @@ trait MultipleReadersSingleWriterCache[W, Winfo] {
notifier: Option[CacheChangeNotification]): Future[Winfo] = {
if (cacheEnabled) {
- notifier.foreach(_(key))
-
// try inserting our desired entry...
val desiredEntry = Entry(transid, WriteInProgress, Some(Future.successful(doc)))
cache(key)(desiredEntry) flatMap { actualEntry =>
@@ -292,6 +290,8 @@ trait MultipleReadersSingleWriterCache[W, Winfo] {
invalidateEntryAfter(generator, key, actualEntry)
}
}
+ } andThen {
+ case _ => notifier.foreach(_(key))
}
} else generator // not caching
}
diff --git a/core/controller/src/main/scala/whisk/core/entitlement/Entitlement.scala b/core/controller/src/main/scala/whisk/core/entitlement/Entitlement.scala
index 9afa83a..c068281 100644
--- a/core/controller/src/main/scala/whisk/core/entitlement/Entitlement.scala
+++ b/core/controller/src/main/scala/whisk/core/entitlement/Entitlement.scala
@@ -65,7 +65,9 @@ protected[core] object EntitlementProvider {
WhiskConfig.actionInvokePerMinuteLimit -> null,
WhiskConfig.actionInvokeConcurrentLimit -> null,
WhiskConfig.triggerFirePerMinuteLimit -> null,
- WhiskConfig.actionInvokeSystemOverloadLimit -> null)
+ WhiskConfig.actionInvokeSystemOverloadLimit -> null,
+ WhiskConfig.controllerInstances -> null,
+ WhiskConfig.controllerHighAvailability -> null)
}
/**
@@ -78,10 +80,36 @@ protected[core] abstract class EntitlementProvider(config: WhiskConfig, loadBala
private implicit val executionContext = actorSystem.dispatcher
+ /**
+ * The number of controllers if HA is enabled, 1 otherwise
+ */
+ private val diviser = if (config.controllerHighAvailability) config.controllerInstances.toInt else 1
+
+ /**
+ * Allows 20% of additional requests on top of the limit to mitigate possible unfair round-robin loadbalancing between
+ * controllers
+ */
+ private val overcommit = if (config.controllerHighAvailability) 1.2 else 1
+
+ /**
+ * Adjust the throttles for a single controller with the diviser and the overcommit.
+ *
+ * @param originalThrottle The throttle that needs to be adjusted for this controller.
+ */
+ private def dilateThrottle(originalThrottle: Int): Int = {
+ Math.ceil((originalThrottle.toDouble / diviser.toDouble) * overcommit).toInt
+ }
+
private val invokeRateThrottler =
- new RateThrottler("actions per minute", config.actionInvokePerMinuteLimit.toInt, _.limits.invocationsPerMinute)
+ new RateThrottler(
+ "actions per minute",
+ dilateThrottle(config.actionInvokePerMinuteLimit.toInt),
+ _.limits.invocationsPerMinute.map(dilateThrottle))
private val triggerRateThrottler =
- new RateThrottler("triggers per minute", config.triggerFirePerMinuteLimit.toInt, _.limits.firesPerMinute)
+ new RateThrottler(
+ "triggers per minute",
+ dilateThrottle(config.triggerFirePerMinuteLimit.toInt),
+ _.limits.firesPerMinute.map(dilateThrottle))
private val concurrentInvokeThrottler = new ActivationThrottler(
loadBalancer,
config.actionInvokeConcurrentLimit.toInt,
diff --git a/tests/src/test/scala/ha/ShootComponentsTests.scala b/tests/src/test/scala/ha/ShootComponentsTests.scala
index fa1aba1..5fd1525 100644
--- a/tests/src/test/scala/ha/ShootComponentsTests.scala
+++ b/tests/src/test/scala/ha/ShootComponentsTests.scala
@@ -53,6 +53,13 @@ class ShootComponentsTests extends FlatSpec with Matchers with WskTestHelpers wi
implicit val materializer = ActorMaterializer()
implicit val testConfig = PatienceConfig(1.minute)
+ // Throttle requests to the remaining controllers to avoid getting 429s. (60 req/min)
+ val amountOfControllers = WhiskProperties.getProperty(WhiskConfig.controllerInstances).toInt
+ val limit = WhiskProperties.getProperty(WhiskConfig.actionInvokeConcurrentLimit).toDouble
+ val limitPerController = limit / amountOfControllers
+ val allowedRequestsPerMinute = (amountOfControllers - 1.0) * limitPerController
+ val timeBeweenRequests = 60.seconds / allowedRequestsPerMinute
+
val controller0DockerHost = WhiskProperties.getBaseControllerHost() + ":" + WhiskProperties.getProperty(
WhiskConfig.dockerPort)
@@ -94,9 +101,8 @@ class ShootComponentsTests extends FlatSpec with Matchers with WskTestHelpers wi
println(s"Done rerquests with responses: invoke: ${invokeExit.futureValue} and get: ${getExit.futureValue}")
- // Do at most one action invocation per second to avoid getting 429s. (60 req/min - limit)
- val wait = 1000 - (Instant.now.toEpochMilli - start.toEpochMilli)
- Thread.sleep(if (wait < 0) 0L else if (wait > 1000) 1000L else wait)
+ val remainingWait = timeBeweenRequests.toMillis - (Instant.now.toEpochMilli - start.toEpochMilli)
+ Thread.sleep(if (remainingWait < 0) 0L else remainingWait)
(invokeExit.futureValue, getExit.futureValue)
}
}
@@ -104,15 +110,15 @@ class ShootComponentsTests extends FlatSpec with Matchers with WskTestHelpers wi
behavior of "Controllers hot standby"
it should "use controller1 if controller0 goes down" in withAssetCleaner(wskprops) { (wp, assetHelper) =>
- if (WhiskProperties.getProperty(WhiskConfig.controllerInstances).toInt >= 2) {
+ if (amountOfControllers >= 2) {
val actionName = "shootcontroller"
assetHelper.withCleaner(wsk.action, actionName) { (action, _) =>
action.create(actionName, defaultAction)
}
- // Produce some load on the system for 100 seconds (each second one request). Kill the controller after 4 requests
- val totalRequests = 100
+ // Produce some load on the system for 100 seconds. Kill the controller after 4 requests
+ val totalRequests = (100.seconds / timeBeweenRequests).toInt
val requestsBeforeRestart = doRequests(4, actionName)
@@ -130,7 +136,7 @@ class ShootComponentsTests extends FlatSpec with Matchers with WskTestHelpers wi
val requests = requestsBeforeRestart ++ requestsAfterRestart
val unsuccessfulInvokes = requests.map(_._1).count(_ != TestUtils.SUCCESS_EXIT)
- // Allow 3 failures for the 90 seconds
+ // Allow 3 failures for the 100 seconds
unsuccessfulInvokes should be <= 3
val unsuccessfulGets = requests.map(_._2).count(_ != TestUtils.SUCCESS_EXIT)
diff --git a/tests/src/test/scala/limits/ThrottleTests.scala b/tests/src/test/scala/limits/ThrottleTests.scala
index 511a1b3..7c59aec 100644
--- a/tests/src/test/scala/limits/ThrottleTests.scala
+++ b/tests/src/test/scala/limits/ThrottleTests.scala
@@ -27,11 +27,13 @@ import scala.concurrent.Promise
import scala.concurrent.duration._
import org.junit.runner.RunWith
+import org.scalatest.BeforeAndAfterAll
import org.scalatest.FlatSpec
import org.scalatest.Matchers
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.junit.JUnitRunner
+import common.RunWskAdminCmd
import common.TestHelpers
import common.TestUtils
import common.TestUtils._
@@ -42,10 +44,9 @@ import common.WskProps
import common.WskTestHelpers
import spray.json._
import spray.json.DefaultJsonProtocol._
+import whisk.core.WhiskConfig
import whisk.http.Messages._
import whisk.utils.ExecutionContextFactory
-import org.scalatest.BeforeAndAfterAll
-import common.RunWskAdminCmd
import whisk.utils.retry
protected[limits] trait LocalHelper {
@@ -72,8 +73,10 @@ class ThrottleTests
val throttleWindow = 1.minute
- val maximumInvokesPerMinute = getLimit("limits.actions.invokes.perMinute")
- val maximumFiringsPerMinute = getLimit("limits.triggers.fires.perMinute")
+ // Due to the overhead of the per minute limit in the controller, we add this overhead here as well.
+ val overhead = if (WhiskProperties.getProperty(WhiskConfig.controllerHighAvailability).toBoolean) 1.2 else 1.0
+ val maximumInvokesPerMinute = math.ceil(getLimit("limits.actions.invokes.perMinute") * overhead).toInt
+ val maximumFiringsPerMinute = math.ceil(getLimit("limits.triggers.fires.perMinute") * overhead).toInt
val maximumConcurrentInvokes = getLimit("limits.actions.invokes.concurrent")
println(s"maximumInvokesPerMinute = $maximumInvokesPerMinute")
@@ -368,11 +371,13 @@ class NamespaceSpecificThrottleTests
trigger.create(triggerName)
}
+ val deployedControllers = WhiskProperties.getControllerHosts.split(",").length
+
// One invoke should be allowed, the second one throttled.
// Due to the current implementation of the rate throttling,
// it is possible that the counter gets deleted, because the minute switches.
retry({
- val results = (1 to 2).map { _ =>
+ val results = (1 to deployedControllers + 1).map { _ =>
wsk.action.invoke(actionName, expectedExitCode = TestUtils.DONTCARE_EXIT)
}
results.map(_.exitCode) should contain(TestUtils.THROTTLED)
@@ -385,7 +390,7 @@ class NamespaceSpecificThrottleTests
// Due to the current implementation of the rate throttling,
// it is possible, that the counter gets deleted, because the minute switches.
retry({
- val results = (1 to 2).map { _ =>
+ val results = (1 to deployedControllers + 1).map { _ =>
wsk.trigger.fire(triggerName, expectedExitCode = TestUtils.DONTCARE_EXIT)
}
results.map(_.exitCode) should contain(TestUtils.THROTTLED)
diff --git a/tests/src/test/scala/whisk/core/database/test/CacheConcurrencyTests.scala b/tests/src/test/scala/whisk/core/database/test/CacheConcurrencyTests.scala
index 1121522..5ae8167 100644
--- a/tests/src/test/scala/whisk/core/database/test/CacheConcurrencyTests.scala
+++ b/tests/src/test/scala/whisk/core/database/test/CacheConcurrencyTests.scala
@@ -17,9 +17,8 @@
package whisk.core.database.test
-import akka.http.scaladsl.model.StatusCodes.NotFound
-
import scala.collection.parallel._
+import scala.concurrent.duration.DurationInt
import scala.concurrent.forkjoin.ForkJoinPool
import org.junit.runner.RunWith
@@ -27,13 +26,16 @@ import org.scalatest.BeforeAndAfter
import org.scalatest.FlatSpec
import org.scalatest.junit.JUnitRunner
-import common.TestUtils
+import akka.http.scaladsl.model.StatusCodes.NotFound
import common.TestUtils._
-import common.rest.WskRest
+import common.TestUtils
+import common.WhiskProperties
import common.WskProps
import common.WskTestHelpers
+import common.rest.WskRest
import spray.json.JsString
import whisk.common.TransactionId
+import whisk.utils.retry
@RunWith(classOf[JUnitRunner])
class CacheConcurrencyTests extends FlatSpec with WskTestHelpers with BeforeAndAfter {
@@ -98,9 +100,18 @@ class CacheConcurrencyTests extends FlatSpec with WskTestHelpers with BeforeAndA
}
}
- run("get after delete") { name =>
- wsk.action.get(name, expectedExitCode = NotFound.intValue)
- }
+ // Give some time to replicate the state between the controllers
+ retry(
+ {
+ // Check that every controller has the correct state (used round robin)
+ WhiskProperties.getControllerHosts.split(",").foreach { _ =>
+ run("get after delete") { name =>
+ wsk.action.get(name, expectedExitCode = NotFound.intValue)
+ }
+ }
+ },
+ 10,
+ Some(2.second))
run("recreate") { name =>
wsk.action.create(name, Some(actionFile))
@@ -123,14 +134,24 @@ class CacheConcurrencyTests extends FlatSpec with WskTestHelpers with BeforeAndA
}
}
- run("get after update") { name =>
- wsk.action.get(name)
- } map {
- case (name, rr) =>
- withClue(s"get after update: failed check for $name") {
- rr.stdout should include("blue")
- rr.stdout should not include ("red")
+ // All controllers should have the correct action
+ // As they are used round robin, we ask every controller for the action.
+ // We add a retry to tolerate a short interval to bring the controllers in sync.
+ retry(
+ {
+ WhiskProperties.getControllerHosts.split(",").foreach { _ =>
+ run("get after update") { name =>
+ wsk.action.get(name)
+ } map {
+ case (name, rr) =>
+ withClue(s"get after update: failed check for $name") {
+ rr.stdout should include("blue")
+ rr.stdout should not include ("red")
+ }
+ }
}
- }
+ },
+ 10,
+ Some(2.second))
}
}
diff --git a/tests/src/test/scala/whisk/core/database/test/MultipleReadersSingleWriterCacheTests.scala b/tests/src/test/scala/whisk/core/database/test/MultipleReadersSingleWriterCacheTests.scala
index 3b046a3..7196722 100644
--- a/tests/src/test/scala/whisk/core/database/test/MultipleReadersSingleWriterCacheTests.scala
+++ b/tests/src/test/scala/whisk/core/database/test/MultipleReadersSingleWriterCacheTests.scala
@@ -19,8 +19,9 @@ package whisk.core.database.test
import java.util.concurrent.atomic.AtomicInteger
-import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.Await
import scala.concurrent.Future
+import scala.concurrent.duration.DurationInt
import org.junit.runner.RunWith
import org.scalatest.FlatSpec
@@ -59,17 +60,17 @@ class MultipleReadersSingleWriterCacheTests
}
// Create an cache entry
- cacheUpdate("doc", key, Future.successful("db save successful"))
+ Await.ready(cacheUpdate("doc", key, Future.successful("db save successful")), 10.seconds)
ctr.get shouldBe 1
// Callback should be called if entry exists
- cacheInvalidate(key, Future.successful(()))
+ Await.ready(cacheInvalidate(key, Future.successful(())), 10.seconds)
ctr.get shouldBe 2
- cacheUpdate("docdoc", key, Future.successful("update in db successful"))
+ Await.ready(cacheUpdate("docdoc", key, Future.successful("update in db successful")), 10.seconds)
ctr.get shouldBe 3
// Callback should be called if entry does not exist
- cacheInvalidate(CacheKey("abc"), Future.successful(()))
+ Await.ready(cacheInvalidate(CacheKey("abc"), Future.successful(())), 10.seconds)
ctr.get shouldBe 4
}
}
--
To stop receiving notification emails like this one, please contact
['"commits@openwhisk.apache.org" <co...@openwhisk.apache.org>'].