You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by cb...@apache.org on 2017/06/21 11:58:50 UTC

[incubator-openwhisk] branch master updated: Choose target invoker based on specific invoker load.

This is an automated email from the ASF dual-hosted git repository.

cbickel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new f25a8f1  Choose target invoker based on specific invoker load.
f25a8f1 is described below

commit f25a8f1f49e0086b4c9428665d15910d614e15fb
Author: Markus Thoemmes <ma...@de.ibm.com>
AuthorDate: Mon Jun 12 11:57:55 2017 +0200

    Choose target invoker based on specific invoker load.
    
    Currently, the loadbalancer advances from one invoker to the next after a fixed number of invocations. That scheme is not aware of any load in the system, which causes suboptimal behavior.
    
    This change only advances away from the home invoker of an action (determined by hash) if that home invoker is "heavily" loaded. We advance further if the next chosen invoker is busy as well, and so forth. If we arrive at the home invoker again, the system is completely loaded and we fall back to the least loaded invoker. Step sizes are drawn from a set of pairwise coprime numbers (each also coprime to the number of invokers) and are chosen by hashing to prevent chasing behavior.
---
 ansible/group_vars/all                             |   1 +
 ansible/templates/whisk.properties.j2              |   2 +-
 .../src/main/scala/whisk/core/WhiskConfig.scala    |   4 +-
 .../core/loadBalancer/LoadBalancerService.scala    | 119 +++++++++++++++-----
 .../test/LoadBalancerServiceObjectTests.scala      | 125 +++++++++++++++++++++
 5 files changed, 219 insertions(+), 32 deletions(-)

diff --git a/ansible/group_vars/all b/ansible/group_vars/all
index 59ef946..4e42368 100644
--- a/ansible/group_vars/all
+++ b/ansible/group_vars/all
@@ -124,6 +124,7 @@ invoker:
   arguments: "{{ invoker_arguments | default('') }}"
   numcore: 2
   coreshare: 2
+  busyThreshold: "{{ invoker_busy_threshold | default(16) }}"
   serializeDockerOp: true
   serializeDockerPull: true
   useRunc: false
diff --git a/ansible/templates/whisk.properties.j2 b/ansible/templates/whisk.properties.j2
index 05e5f76..5ee84f7 100644
--- a/ansible/templates/whisk.properties.j2
+++ b/ansible/templates/whisk.properties.j2
@@ -107,4 +107,4 @@ apigw.auth.pwd={{apigw_auth_pwd}}
 apigw.host={{apigw_host}}
 apigw.host.v2={{apigw_host_v2}}
 
-loadbalancer.activationCountBeforeNextInvoker={{ loadbalancer_activation_count_before_next_invoker | default(10) }}
+loadbalancer.invokerBusyThreshold={{ invoker.busyThreshold }}
diff --git a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
index 7fa2c94..267ad38 100644
--- a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
+++ b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
@@ -83,7 +83,7 @@ class WhiskConfig(
     val wskApiHost = this(WhiskConfig.wskApiProtocol) + "://" + this(WhiskConfig.wskApiHostname) + ":" + this(WhiskConfig.wskApiPort)
     val controllerHost = this(WhiskConfig.controllerHostName) + ":" + this(WhiskConfig.controllerHostPort)
     val controllerBlackboxFraction = this.getAsDouble(WhiskConfig.controllerBlackboxFraction, 0.10)
-    val loadbalancerActivationCountBeforeNextInvoker = this.getAsInt(WhiskConfig.loadbalancerActivationCountBeforeNextInvoker, 10)
+    val loadbalancerInvokerBusyThreshold = this.getAsInt(WhiskConfig.loadbalancerInvokerBusyThreshold, 16)
 
     val edgeHost = this(WhiskConfig.edgeHostName) + ":" + this(WhiskConfig.edgeHostApiPort)
     val kafkaHost = this(WhiskConfig.kafkaHostName) + ":" + this(WhiskConfig.kafkaHostPort)
@@ -251,7 +251,7 @@ object WhiskConfig {
     private val controllerHostPort = "controller.host.port"
     private val controllerBlackboxFraction = "controller.blackboxFraction"
 
-    val loadbalancerActivationCountBeforeNextInvoker = "loadbalancer.activationCountBeforeNextInvoker"
+    val loadbalancerInvokerBusyThreshold = "loadbalancer.invokerBusyThreshold"
 
     val kafkaHostName = "kafka.host"
     val loadbalancerHostName = "loadbalancer.host"
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala
index 201826f..7e633c5 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala
@@ -20,7 +20,6 @@ package whisk.core.loadBalancer
 import java.nio.charset.StandardCharsets
 
 import java.time.{ Clock, Instant }
-import java.util.concurrent.atomic.AtomicInteger
 
 import scala.collection.concurrent.TrieMap
 import scala.concurrent.Await
@@ -46,13 +45,14 @@ import whisk.common.TransactionId
 import whisk.connector.kafka.KafkaConsumerConnector
 import whisk.connector.kafka.KafkaProducerConnector
 import whisk.core.WhiskConfig
-import whisk.core.WhiskConfig.{ consulServer, kafkaHost, loadbalancerActivationCountBeforeNextInvoker }
+import whisk.core.WhiskConfig._
 import whisk.core.connector.{ ActivationMessage, CompletionMessage }
 import whisk.core.connector.MessageProducer
 import whisk.core.database.NoDocumentException
 import whisk.core.entity.{ ActivationId, CodeExec, WhiskAction, WhiskActivation }
 import whisk.core.entity.WhiskAction
 import whisk.core.entity.types.EntityStore
+import scala.annotation.tailrec
 
 trait LoadBalancer {
 
@@ -86,10 +86,6 @@ class LoadBalancerService(config: WhiskConfig, entityStore: EntityStore)(implici
     private val blackboxFraction: Double = Math.max(0.0, Math.min(1.0, config.controllerBlackboxFraction))
     logging.info(this, s"blackboxFraction = $blackboxFraction")
 
-    /** We run this often on an invoker before going onto the next. */
-    private val activationCountBeforeNextInvoker = Math.max(1, config.loadbalancerActivationCountBeforeNextInvoker)
-    logging.info(this, s"activationCountBeforeNextInvoker = $activationCountBeforeNextInvoker")
-
     override def getActiveUserActivationCounts: Map[String, Int] = activationBySubject.toMap mapValues { _.size }
 
     override def publish(action: WhiskAction, msg: ActivationMessage, timeout: FiniteDuration)(
@@ -278,18 +274,19 @@ class LoadBalancerService(config: WhiskConfig, entityStore: EntityStore)(implici
             case _              => false
         }
         val invokers = if (isBlackbox) blackboxInvokers else managedInvokers
-        val (hash, count) = hashAndCountSubjectAction(msg)
+        val hash = hashSubjectAction(msg)
 
         invokers.flatMap { invokers =>
-            val numInvokers = invokers.length
-            if (numInvokers > 0) {
-                val hashCount = math.abs(hash + count / activationCountBeforeNextInvoker)
-                val invokerIndex = hashCount % numInvokers
-                Future.successful(invokers(invokerIndex))
-            } else {
-                logging.error(this, s"all invokers down")(TransactionId.invokerHealth)
-                Future.failed(new LoadBalancerException("no invokers available"))
-            }
+            LoadBalancerService.schedule(
+                invokers,
+                activationByInvoker.mapValues(_.size),
+                config.loadbalancerInvokerBusyThreshold,
+                hash) match {
+                    case Some(invoker) => Future.successful(invoker)
+                    case None =>
+                        logging.error(this, s"all invokers down")(TransactionId.invokerHealth)
+                        Future.failed(new LoadBalancerException("no invokers available"))
+                }
         }
     }
 
@@ -300,26 +297,90 @@ class LoadBalancerService(config: WhiskConfig, entityStore: EntityStore)(implici
      * Invoker is currently using and which is better avoid if/until
      * these are moved to some common place (like a subclass of Message?)
      */
-    private val activationCountMap = TrieMap[(String, String), AtomicInteger]()
-    private def hashAndCountSubjectAction(msg: ActivationMessage): (Int, Int) = {
+    private def hashSubjectAction(msg: ActivationMessage): Int = {
         val subject = msg.user.subject.asString
         val path = msg.action.toString
-        val hash = subject.hashCode() ^ path.hashCode()
-        val key = (subject, path)
-        val count = activationCountMap.get(key) match {
-            case Some(counter) => counter.getAndIncrement()
-            case None => {
-                activationCountMap.put(key, new AtomicInteger(0))
-                0
-            }
-        }
-        return (hash, count)
+        (subject.hashCode() ^ path.hashCode()).abs
     }
 }
 
 object LoadBalancerService {
     def requiredProperties = kafkaHost ++ consulServer ++
-        Map(loadbalancerActivationCountBeforeNextInvoker -> null)
+        Map(loadbalancerInvokerBusyThreshold -> null)
+
+    /** Memoizes the result of `f` for later use. */
+    def memoize[I, O](f: I => O): I => O = new scala.collection.mutable.HashMap[I, O]() {
+        override def apply(key: I) = getOrElseUpdate(key, f(key))
+    }
+
+    /** Euclidean algorithm to determine the greatest-common-divisor */
+    @tailrec
+    def gcd(a: Int, b: Int): Int = if (b == 0) a else gcd(b, a % b)
+
+    /** Returns a greedily built sequence of pairwise coprime numbers from 1 to x, each also coprime to x. Result is memoized. */
+    val pairwiseCoprimeNumbersUntil: Int => IndexedSeq[Int] = LoadBalancerService.memoize {
+        case x =>
+            (1 to x).foldLeft(IndexedSeq.empty[Int])((primes, cur) => {
+                if (gcd(cur, x) == 1 && primes.forall(i => gcd(i, cur) == 1)) {
+                    primes :+ cur
+                } else primes
+            })
+    }
+
+    /**
+     * Scans through all invokers and searches for an invoker that has a queue length
+     * below the defined threshold. Iff no "underloaded" invoker was found it will
+     * default to the least loaded invoker in the list.
+     *
+     * @param availableInvokers a list of available (healthy) invokers to search in
+     * @param activationsPerInvoker a map of the number of outstanding activations per invoker
+     * @param invokerBusyThreshold defines when an invoker is considered overloaded
+     * @param hash stable identifier of the entity to be scheduled
+     * @return an invoker to schedule to or None if no invoker is available
+     */
+    def schedule[A](
+        availableInvokers: Seq[A],
+        activationsPerInvoker: collection.Map[A, Int],
+        invokerBusyThreshold: Int,
+        hash: Int): Option[A] = {
+
+        val numInvokers = availableInvokers.length
+        if (numInvokers > 0) {
+            val homeInvoker = hash % numInvokers
+
+            val stepSizes = LoadBalancerService.pairwiseCoprimeNumbersUntil(numInvokers)
+            val step = stepSizes(hash % stepSizes.size)
+
+            @tailrec
+            def search(targetInvoker: Int, seenInvokers: Int): A = {
+                // map the computed index to the actual invoker index
+                val invokerName = availableInvokers(targetInvoker)
+
+                // send the request to the target invoker if it has capacity...
+                if (activationsPerInvoker.get(invokerName).getOrElse(0) < invokerBusyThreshold) {
+                    invokerName
+                } else {
+                    // ... otherwise look for a less loaded invoker by stepping through a pre computed
+                    // list of invokers; there are two possible outcomes:
+                    // 1. the search lands on a new invoker that has capacity, choose it
+                    // 2. walked through the entire list and found no better invoker than the
+                    //    "home invoker", choose the least loaded invoker
+                    val newTarget = (targetInvoker + step) % numInvokers
+                    if (newTarget == homeInvoker || seenInvokers > numInvokers) {
+                        // fall back to the invoker with the least load.
+                        activationsPerInvoker.minBy(_._2)._1
+                    } else {
+                        search(newTarget, seenInvokers + 1)
+                    }
+                }
+            }
+
+            Some(search(homeInvoker, 0))
+        } else {
+            None
+        }
+    }
+
 }
 
 private case class ActiveAckTimeout(activationId: ActivationId) extends TimeoutException
diff --git a/tests/src/test/scala/whisk/core/loadBalancer/test/LoadBalancerServiceObjectTests.scala b/tests/src/test/scala/whisk/core/loadBalancer/test/LoadBalancerServiceObjectTests.scala
new file mode 100644
index 0000000..7066f06
--- /dev/null
+++ b/tests/src/test/scala/whisk/core/loadBalancer/test/LoadBalancerServiceObjectTests.scala
@@ -0,0 +1,125 @@
+/*
+ * Copyright 2015-2016 IBM Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.loadBalancer.test
+
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.FlatSpec
+import org.scalatest.Matchers
+import whisk.core.loadBalancer.LoadBalancerService
+
+/**
+ * Unit tests for the LoadBalancerService object.
+ *
+ * These tests cover only the "static" methods "memoize",
+ * "pairwiseCoprimeNumbersUntil" and "schedule" of the LoadBalancerService object.
+ */
+@RunWith(classOf[JUnitRunner])
+class LoadBalancerServiceObjectTests extends FlatSpec with Matchers {
+    behavior of "memoize"
+
+    it should "not recompute a value which was already given" in {
+        var calls = 0
+        val add1: Int => Int = LoadBalancerService.memoize {
+            case second =>
+                calls += 1
+                1 + second
+        }
+
+        add1(1) shouldBe 2
+        calls shouldBe 1
+        add1(1) shouldBe 2
+        calls shouldBe 1
+        add1(2) shouldBe 3
+        calls shouldBe 2
+        add1(1) shouldBe 2
+        calls shouldBe 2
+    }
+
+    behavior of "pairwiseCoprimeNumbersUntil"
+
+    it should "return an empty set for malformed inputs" in {
+        LoadBalancerService.pairwiseCoprimeNumbersUntil(0) shouldBe Seq()
+        LoadBalancerService.pairwiseCoprimeNumbersUntil(-1) shouldBe Seq()
+    }
+
+    it should "return all coprime numbers until the number given" in {
+        LoadBalancerService.pairwiseCoprimeNumbersUntil(1) shouldBe Seq(1)
+        LoadBalancerService.pairwiseCoprimeNumbersUntil(2) shouldBe Seq(1)
+        LoadBalancerService.pairwiseCoprimeNumbersUntil(3) shouldBe Seq(1, 2)
+        LoadBalancerService.pairwiseCoprimeNumbersUntil(4) shouldBe Seq(1, 3)
+        LoadBalancerService.pairwiseCoprimeNumbersUntil(5) shouldBe Seq(1, 2, 3)
+        LoadBalancerService.pairwiseCoprimeNumbersUntil(9) shouldBe Seq(1, 2, 5, 7)
+        LoadBalancerService.pairwiseCoprimeNumbersUntil(10) shouldBe Seq(1, 3, 7)
+    }
+
+    behavior of "chooseInvoker"
+
+    def invokers(n: Int) = (0 until n).map(i => s"invoker$i")
+    def hashInto[A](list: Seq[A], hash: Int) = list(hash % list.size)
+
+    it should "return None on an empty invokers list" in {
+        LoadBalancerService.schedule(Seq(), Map(), 0, 1) shouldBe None
+    }
+
+    it should "schedule to the home invoker" in {
+        val invs = invokers(10)
+        val hash = 2
+
+        LoadBalancerService.schedule(invs, Map(), 1, hash) shouldBe Some(hashInto(invs, hash))
+    }
+
+    it should "jump to the next invoker determined by a hashed stepsize if the home invoker is overloaded" in {
+        val invokerCount = 10
+        val invs = invokers(invokerCount)
+        val hash = 2
+
+        val targetInvoker = hashInto(invs, hash)
+        val step = hashInto(LoadBalancerService.pairwiseCoprimeNumbersUntil(invokerCount), hash)
+
+        LoadBalancerService.schedule(invs, Map(targetInvoker -> 1), 1, hash) shouldBe Some(hashInto(invs, hash + step))
+    }
+
+    it should "wrap the search at the end of the invoker list" in {
+        val invokerCount = 3
+        val invs = invokers(invokerCount)
+        val hash = 1
+
+        val targetInvoker = hashInto(invs, hash) // will be invoker1
+        val step = hashInto(LoadBalancerService.pairwiseCoprimeNumbersUntil(invokerCount), hash) // will be 2
+        step shouldBe 2
+
+        // invoker1 is overloaded so it will step (2 steps) to the next one --> 1 2 0 --> invoker0 is next target
+        // invoker0 is overloaded so it will step to the next one --> 0 1 2 --> invoker2 is next target and underloaded
+        LoadBalancerService.schedule(
+            invs,
+            Map("invoker0" -> 1, "invoker1" -> 1),
+            1, hash) shouldBe Some(hashInto(invs, hash + step + step))
+    }
+
+    it should "choose the least loaded invoker if all invokers are overloaded to begin with" in {
+        val invokerCount = 3
+        val invs = invokers(invokerCount)
+        val hash = 1
+
+        LoadBalancerService.schedule(
+            invs,
+            Map("invoker0" -> 3, "invoker1" -> 3, "invoker2" -> 2),
+            1,
+            hash) shouldBe Some("invoker2")
+    }
+}

-- 
To stop receiving notification emails like this one, please contact
['"commits@openwhisk.apache.org" <co...@openwhisk.apache.org>'].