You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ty...@apache.org on 2020/03/11 19:49:41 UTC

[openwhisk] branch master updated: Kcf - optional cpu resource scaling based on action memory (#4814)

This is an automated email from the ASF dual-hosted git repository.

tysonnorris pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 9038017  Kcf - optional cpu resource scaling based on action memory (#4814)
9038017 is described below

commit 90380176f5d201e77b08ee00b248ed20e6eaf099
Author: tysonnorris <tn...@adobe.com>
AuthorDate: Wed Mar 11 12:49:28 2020 -0700

    Kcf - optional cpu resource scaling based on action memory (#4814)
    
    * cpuscaling to set cpu resources based on memory config for action in kubernetes
---
 core/invoker/src/main/resources/application.conf   |  9 ++++
 .../kubernetes/KubernetesClient.scala              | 10 +++-
 .../kubernetes/KubernetesContainerFactory.scala    |  1 +
 .../containerpool/kubernetes/WhiskPodBuilder.scala | 19 ++++++-
 .../kubernetes/test/WhiskPodBuilderTests.scala     | 63 +++++++++++++++++++++-
 5 files changed, 96 insertions(+), 6 deletions(-)

diff --git a/core/invoker/src/main/resources/application.conf b/core/invoker/src/main/resources/application.conf
index 5760a4b..3264943 100644
--- a/core/invoker/src/main/resources/application.conf
+++ b/core/invoker/src/main/resources/application.conf
@@ -88,6 +88,15 @@ whisk {
     # See https://github.com/fabric8io/kubernetes-client#configuring-the-client for more information.
     # action-namespace = "ns-actions"
 
+    #scale milliCPU config per segment of memory: 100 milliCPU == .1 vcpu per https://kubernetes.io/docs/tasks/configure-pod-container/assign-cpu-resource/
+    #code will append the "m" after calculating the number of milliCPU
+    #if missing, the pod will be created without cpu request/limit (and use the default for that namespace/cluster)
+    #if specified, the pod will be created with cpu request+limit set as (action memory limit / cpu-scaling.memory) * cpu-scaling.millicpus, where the division is an integer division on MB values (rounded down); the result is capped at cpu-scaling.max-millicpus and is never less than cpu-scaling.millicpus
+    #cpu-scaling {
+    #  millicpus = 100
+    #  memory = 256 m
+    #  max-millicpus = 4000
+    #}
   }
 
   # Timeouts for runc commands. Set to "Inf" to disable timeout.
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesClient.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesClient.scala
index 8b4b050..f1bca95 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesClient.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesClient.scala
@@ -64,6 +64,11 @@ import scala.util.{Failure, Success, Try}
 case class KubernetesClientTimeoutConfig(run: FiniteDuration, logs: FiniteDuration)
 
 /**
+ * Configuration for kubernetes cpu resource request/limit scaling based on action memory limit
+ */
+case class KubernetesCpuScalingConfig(millicpus: Int, memory: ByteSize, maxMillicpus: Int)
+
+/**
  * Configuration for node affinity for the pods that execute user action containers
  * The key,value pair should match the <key,value> pair with which the invoker worker nodes
  * are labeled in the Kubernetes cluster.  The default pair is <openwhisk-role,invoker>,
@@ -78,7 +83,8 @@ case class KubernetesClientConfig(timeouts: KubernetesClientTimeoutConfig,
                                   userPodNodeAffinity: KubernetesInvokerNodeAffinity,
                                   portForwardingEnabled: Boolean,
                                   actionNamespace: Option[String] = None,
-                                  podTemplate: Option[ConfigMapValue] = None)
+                                  podTemplate: Option[ConfigMapValue] = None,
+                                  cpuScaling: Option[KubernetesCpuScalingConfig] = None)
 
 /**
  * Serves as an interface to the Kubernetes API by proxying its REST API and/or invoking the kubectl CLI.
@@ -111,7 +117,7 @@ class KubernetesClient(
           environment: Map[String, String] = Map.empty,
           labels: Map[String, String] = Map.empty)(implicit transid: TransactionId): Future[KubernetesContainer] = {
 
-    val pod = podBuilder.buildPodSpec(name, image, memory, environment, labels)
+    val pod = podBuilder.buildPodSpec(name, image, memory, environment, labels, config)
     if (transid.meta.extraLogging) {
       log.info(this, s"Pod spec being created\n${Serialization.asYaml(pod)}")
     }
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesContainerFactory.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesContainerFactory.scala
index a595c21..dbad7c4 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesContainerFactory.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesContainerFactory.scala
@@ -37,6 +37,7 @@ import org.apache.openwhisk.core.containerpool.{
 import org.apache.openwhisk.core.entity.ByteSize
 import org.apache.openwhisk.core.entity.ExecManifest.ImageName
 import org.apache.openwhisk.core.entity.InvokerInstanceId
+import org.apache.openwhisk.core.entity.size._
 import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
 
 class KubernetesContainerFactory(
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/WhiskPodBuilder.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/WhiskPodBuilder.scala
index ec115e1..f90384c 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/WhiskPodBuilder.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/WhiskPodBuilder.scala
@@ -41,7 +41,8 @@ class WhiskPodBuilder(client: NamespacedKubernetesClient,
                    image: String,
                    memory: ByteSize,
                    environment: Map[String, String],
-                   labels: Map[String, String])(implicit transid: TransactionId): Pod = {
+                   labels: Map[String, String],
+                   config: KubernetesClientConfig)(implicit transid: TransactionId): Pod = {
     val envVars = environment.map {
       case (key, value) => new EnvVarBuilder().withName(key).withValue(value).build()
     }.toSeq
@@ -84,11 +85,18 @@ class WhiskPodBuilder(client: NamespacedKubernetesClient,
       specBuilder.editMatchingContainer(actionContainerPredicate)
     } else specBuilder.addNewContainer()
 
+    //if cpu scaling is enabled, calculate cpu from memory using the configured ratio (e.g. 100m per 256Mi); min is cpuScaling.millicpus, max is cpuScaling.maxMillicpus
+    val cpu = config.cpuScaling
+      .map(cpuConfig => Map("cpu" -> new Quantity(calculateCpu(cpuConfig, memory) + "m")))
+      .getOrElse(Map.empty)
+
     //In container its assumed that env, port, resource limits are set explicitly
     //Here if any value exist in template then that would be overridden
     containerBuilder
       .withNewResources()
-      .withLimits(Map("memory" -> new Quantity(memory.toMB + "Mi")).asJava)
+      //explicitly set requests and limits to same values
+      .withLimits((Map("memory" -> new Quantity(memory.toMB + "Mi")) ++ cpu).asJava)
+      .withRequests((Map("memory" -> new Quantity(memory.toMB + "Mi")) ++ cpu).asJava)
       .endResources()
       .withName("user-action")
       .withImage(image)
@@ -113,6 +121,13 @@ class WhiskPodBuilder(client: NamespacedKubernetesClient,
     pod
   }
 
+  def calculateCpu(c: KubernetesCpuScalingConfig, memory: ByteSize): Int = {
+    val cpuPerMemorySegment = c.millicpus
+    val cpuMin = c.millicpus
+    val cpuMax = c.maxMillicpus
+    math.min(math.max((memory.toMB / c.memory.toMB) * cpuPerMemorySegment, cpuMin), cpuMax).toInt
+  }
+
   private def loadPodSpec(bytes: Array[Byte]): Pod = {
     val resources = client.load(new ByteArrayInputStream(bytes))
     resources.get().get(0).asInstanceOf[Pod]
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/kubernetes/test/WhiskPodBuilderTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/kubernetes/test/WhiskPodBuilderTests.scala
index 5c6eee2..5ad9c58 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/kubernetes/test/WhiskPodBuilderTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/kubernetes/test/WhiskPodBuilderTests.scala
@@ -20,13 +20,20 @@ package org.apache.openwhisk.core.containerpool.kubernetes.test
 import io.fabric8.kubernetes.api.model.Pod
 import io.fabric8.kubernetes.client.utils.Serialization
 import org.apache.openwhisk.common.{ConfigMapValue, TransactionId}
-import org.apache.openwhisk.core.containerpool.kubernetes.{KubernetesInvokerNodeAffinity, WhiskPodBuilder}
+import org.apache.openwhisk.core.containerpool.kubernetes.{
+  KubernetesClientConfig,
+  KubernetesClientTimeoutConfig,
+  KubernetesCpuScalingConfig,
+  KubernetesInvokerNodeAffinity,
+  WhiskPodBuilder
+}
 import org.apache.openwhisk.core.entity.size._
 import org.junit.runner.RunWith
 import org.scalatest.junit.JUnitRunner
 import org.scalatest.{FlatSpec, Matchers}
 
 import scala.collection.JavaConverters._
+import scala.concurrent.duration._
 
 @RunWith(classOf[JUnitRunner])
 class WhiskPodBuilderTests extends FlatSpec with Matchers with KubeClientSupport {
@@ -42,7 +49,51 @@ class WhiskPodBuilderTests extends FlatSpec with Matchers with KubeClientSupport
     val builder = new WhiskPodBuilder(kubeClient, affinity)
     assertPodSettings(builder)
   }
+  it should "build set cpu scaled based on memory, if enabled in configuration" in {
+    val builder = new WhiskPodBuilder(kubeClient, affinity)
+    val config = KubernetesClientConfig(
+      KubernetesClientTimeoutConfig(1.second, 1.second),
+      KubernetesInvokerNodeAffinity(false, "k", "v"),
+      true,
+      None,
+      None,
+      Some(KubernetesCpuScalingConfig(300, 3.MB, 1000)))
+
+    val pod = builder.buildPodSpec(name, testImage, 2.MB, Map("foo" -> "bar"), Map("fooL" -> "barV"), config)
+    withClue(Serialization.asYaml(pod)) {
+      val c = getActionContainer(pod)
+      //min cpu is: config.millicpus
+      c.getResources.getLimits.asScala.get("cpu").map(_.getAmount) shouldBe Some("300m")
+    }
+
+    val pod2 = builder.buildPodSpec(name, testImage, 15.MB, Map("foo" -> "bar"), Map("fooL" -> "barV"), config)
+    withClue(Serialization.asYaml(pod2)) {
+      val c = getActionContainer(pod2)
+      //max cpu is: config.maxMillicpus
+      c.getResources.getLimits.asScala.get("cpu").map(_.getAmount) shouldBe Some("1000m")
+    }
+
+    val pod3 = builder.buildPodSpec(name, testImage, 7.MB, Map("foo" -> "bar"), Map("fooL" -> "barV"), config)
+    withClue(Serialization.asYaml(pod3)) {
+      val c = getActionContainer(pod3)
+      //scaled cpu is: (action mem / config.memory, integer division) x config.millicpus, i.e. (7 / 3) x 300 = 600
+      c.getResources.getLimits.asScala.get("cpu").map(_.getAmount) shouldBe Some("600m")
+    }
 
+    val config2 = KubernetesClientConfig(
+      KubernetesClientTimeoutConfig(1.second, 1.second),
+      KubernetesInvokerNodeAffinity(false, "k", "v"),
+      true,
+      None,
+      None)
+    val pod4 = builder.buildPodSpec(name, testImage, 7.MB, Map("foo" -> "bar"), Map("fooL" -> "barV"), config2)
+    withClue(Serialization.asYaml(pod4)) {
+      val c = getActionContainer(pod4)
+      //if scaling config is not provided, no cpu resources are specified
+      c.getResources.getLimits.asScala.get("cpu").map(_.getAmount) shouldBe None
+    }
+
+  }
   it should "extend existing pod template" in {
     val template = """
        |---
@@ -104,12 +155,20 @@ class WhiskPodBuilderTests extends FlatSpec with Matchers with KubeClientSupport
   }
 
   private def assertPodSettings(builder: WhiskPodBuilder): Pod = {
-    val pod = builder.buildPodSpec(name, testImage, memLimit, Map("foo" -> "bar"), Map("fooL" -> "barV"))
+    val config = KubernetesClientConfig(
+      KubernetesClientTimeoutConfig(1.second, 1.second),
+      KubernetesInvokerNodeAffinity(false, "k", "v"),
+      true,
+      None,
+      None,
+      Some(KubernetesCpuScalingConfig(300, 3.MB, 1000)))
+    val pod = builder.buildPodSpec(name, testImage, memLimit, Map("foo" -> "bar"), Map("fooL" -> "barV"), config)
     withClue(Serialization.asYaml(pod)) {
       val c = getActionContainer(pod)
       c.getEnv.asScala.exists(_.getName == "foo") shouldBe true
 
       c.getResources.getLimits.asScala.get("memory").map(_.getAmount) shouldBe Some("10Mi")
+      c.getResources.getLimits.asScala.get("cpu").map(_.getAmount) shouldBe Some("900m")
       c.getSecurityContext.getCapabilities.getDrop.asScala should contain allOf ("NET_RAW", "NET_ADMIN")
       c.getPorts.asScala.find(_.getName == "action").map(_.getContainerPort) shouldBe Some(8080)
       c.getImage shouldBe testImage