You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ty...@apache.org on 2020/03/11 19:49:41 UTC
[openwhisk] branch master updated: Kcf - optional cpu resource
scaling based on action memory (#4814)
This is an automated email from the ASF dual-hosted git repository.
tysonnorris pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 9038017 Kcf - optional cpu resource scaling based on action memory (#4814)
9038017 is described below
commit 90380176f5d201e77b08ee00b248ed20e6eaf099
Author: tysonnorris <tn...@adobe.com>
AuthorDate: Wed Mar 11 12:49:28 2020 -0700
Kcf - optional cpu resource scaling based on action memory (#4814)
* cpuscaling to set cpu resources based on memory config for action in kubernetes
---
core/invoker/src/main/resources/application.conf | 9 ++++
.../kubernetes/KubernetesClient.scala | 10 +++-
.../kubernetes/KubernetesContainerFactory.scala | 1 +
.../containerpool/kubernetes/WhiskPodBuilder.scala | 19 ++++++-
.../kubernetes/test/WhiskPodBuilderTests.scala | 63 +++++++++++++++++++++-
5 files changed, 96 insertions(+), 6 deletions(-)
diff --git a/core/invoker/src/main/resources/application.conf b/core/invoker/src/main/resources/application.conf
index 5760a4b..3264943 100644
--- a/core/invoker/src/main/resources/application.conf
+++ b/core/invoker/src/main/resources/application.conf
@@ -88,6 +88,15 @@ whisk {
# See https://github.com/fabric8io/kubernetes-client#configuring-the-client for more information.
# action-namespace = "ns-actions"
+ #scale milliCPU config per segment of memory: 100 milliCPU == .1 vcpu per https://kubernetes.io/docs/tasks/configure-pod-container/assign-cpu-resource/
+ #code will append the "m" after calculating the number of milliCPU
+ #if missing, the pod will be created without cpu request/limit (and use the default for that namespace/cluster)
+ #if specified, the pod will be created with cpu request and limit both set to (action memory limit / cpu-scaling.memory, rounded down to a whole number) * cpu-scaling.millicpus, clamped to a minimum of cpu-scaling.millicpus and a maximum of cpu-scaling.max-millicpus
+ #cpu-scaling {
+ # millicpus = 100
+ # memory = 256 m
+ # max-millicpus = 4000
+ #}
}
# Timeouts for runc commands. Set to "Inf" to disable timeout.
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesClient.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesClient.scala
index 8b4b050..f1bca95 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesClient.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesClient.scala
@@ -64,6 +64,11 @@ import scala.util.{Failure, Success, Try}
case class KubernetesClientTimeoutConfig(run: FiniteDuration, logs: FiniteDuration)
/**
+ * Configuration for kubernetes cpu resource request/limit scaling based on action memory limit
+ */
+case class KubernetesCpuScalingConfig(millicpus: Int, memory: ByteSize, maxMillicpus: Int)
+
+/**
* Configuration for node affinity for the pods that execute user action containers
* The key,value pair should match the <key,value> pair with which the invoker worker nodes
* are labeled in the Kubernetes cluster. The default pair is <openwhisk-role,invoker>,
@@ -78,7 +83,8 @@ case class KubernetesClientConfig(timeouts: KubernetesClientTimeoutConfig,
userPodNodeAffinity: KubernetesInvokerNodeAffinity,
portForwardingEnabled: Boolean,
actionNamespace: Option[String] = None,
- podTemplate: Option[ConfigMapValue] = None)
+ podTemplate: Option[ConfigMapValue] = None,
+ cpuScaling: Option[KubernetesCpuScalingConfig] = None)
/**
* Serves as an interface to the Kubernetes API by proxying its REST API and/or invoking the kubectl CLI.
@@ -111,7 +117,7 @@ class KubernetesClient(
environment: Map[String, String] = Map.empty,
labels: Map[String, String] = Map.empty)(implicit transid: TransactionId): Future[KubernetesContainer] = {
- val pod = podBuilder.buildPodSpec(name, image, memory, environment, labels)
+ val pod = podBuilder.buildPodSpec(name, image, memory, environment, labels, config)
if (transid.meta.extraLogging) {
log.info(this, s"Pod spec being created\n${Serialization.asYaml(pod)}")
}
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesContainerFactory.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesContainerFactory.scala
index a595c21..dbad7c4 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesContainerFactory.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesContainerFactory.scala
@@ -37,6 +37,7 @@ import org.apache.openwhisk.core.containerpool.{
import org.apache.openwhisk.core.entity.ByteSize
import org.apache.openwhisk.core.entity.ExecManifest.ImageName
import org.apache.openwhisk.core.entity.InvokerInstanceId
+import org.apache.openwhisk.core.entity.size._
import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
class KubernetesContainerFactory(
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/WhiskPodBuilder.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/WhiskPodBuilder.scala
index ec115e1..f90384c 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/WhiskPodBuilder.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/WhiskPodBuilder.scala
@@ -41,7 +41,8 @@ class WhiskPodBuilder(client: NamespacedKubernetesClient,
image: String,
memory: ByteSize,
environment: Map[String, String],
- labels: Map[String, String])(implicit transid: TransactionId): Pod = {
+ labels: Map[String, String],
+ config: KubernetesClientConfig)(implicit transid: TransactionId): Pod = {
val envVars = environment.map {
case (key, value) => new EnvVarBuilder().withName(key).withValue(value).build()
}.toSeq
@@ -84,11 +85,18 @@ class WhiskPodBuilder(client: NamespacedKubernetesClient,
specBuilder.editMatchingContainer(actionContainerPredicate)
} else specBuilder.addNewContainer()
+ //if cpu scaling is enabled, calculate cpu (in milliCPU) from memory: cpuScaling.millicpus per cpuScaling.memory segment, with a min of cpuScaling.millicpus and a max of cpuScaling.maxMillicpus
+ val cpu = config.cpuScaling
+ .map(cpuConfig => Map("cpu" -> new Quantity(calculateCpu(cpuConfig, memory) + "m")))
+ .getOrElse(Map.empty)
+
//In the container it's assumed that env, port, and resource limits are set explicitly
//Here, if any value exists in the template, it will be overridden
containerBuilder
.withNewResources()
- .withLimits(Map("memory" -> new Quantity(memory.toMB + "Mi")).asJava)
+ //explicitly set requests and limits to same values
+ .withLimits((Map("memory" -> new Quantity(memory.toMB + "Mi")) ++ cpu).asJava)
+ .withRequests((Map("memory" -> new Quantity(memory.toMB + "Mi")) ++ cpu).asJava)
.endResources()
.withName("user-action")
.withImage(image)
@@ -113,6 +121,13 @@ class WhiskPodBuilder(client: NamespacedKubernetesClient,
pod
}
+ def calculateCpu(c: KubernetesCpuScalingConfig, memory: ByteSize): Int = {
+ val cpuPerMemorySegment = c.millicpus
+ val cpuMin = c.millicpus
+ val cpuMax = c.maxMillicpus
+ math.min(math.max((memory.toMB / c.memory.toMB) * cpuPerMemorySegment, cpuMin), cpuMax).toInt
+ }
+
private def loadPodSpec(bytes: Array[Byte]): Pod = {
val resources = client.load(new ByteArrayInputStream(bytes))
resources.get().get(0).asInstanceOf[Pod]
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/kubernetes/test/WhiskPodBuilderTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/kubernetes/test/WhiskPodBuilderTests.scala
index 5c6eee2..5ad9c58 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/kubernetes/test/WhiskPodBuilderTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/kubernetes/test/WhiskPodBuilderTests.scala
@@ -20,13 +20,20 @@ package org.apache.openwhisk.core.containerpool.kubernetes.test
import io.fabric8.kubernetes.api.model.Pod
import io.fabric8.kubernetes.client.utils.Serialization
import org.apache.openwhisk.common.{ConfigMapValue, TransactionId}
-import org.apache.openwhisk.core.containerpool.kubernetes.{KubernetesInvokerNodeAffinity, WhiskPodBuilder}
+import org.apache.openwhisk.core.containerpool.kubernetes.{
+ KubernetesClientConfig,
+ KubernetesClientTimeoutConfig,
+ KubernetesCpuScalingConfig,
+ KubernetesInvokerNodeAffinity,
+ WhiskPodBuilder
+}
import org.apache.openwhisk.core.entity.size._
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import org.scalatest.{FlatSpec, Matchers}
import scala.collection.JavaConverters._
+import scala.concurrent.duration._
@RunWith(classOf[JUnitRunner])
class WhiskPodBuilderTests extends FlatSpec with Matchers with KubeClientSupport {
@@ -42,7 +49,51 @@ class WhiskPodBuilderTests extends FlatSpec with Matchers with KubeClientSupport
val builder = new WhiskPodBuilder(kubeClient, affinity)
assertPodSettings(builder)
}
+ it should "build set cpu scaled based on memory, if enabled in configuration" in {
+ val builder = new WhiskPodBuilder(kubeClient, affinity)
+ val config = KubernetesClientConfig(
+ KubernetesClientTimeoutConfig(1.second, 1.second),
+ KubernetesInvokerNodeAffinity(false, "k", "v"),
+ true,
+ None,
+ None,
+ Some(KubernetesCpuScalingConfig(300, 3.MB, 1000)))
+
+ val pod = builder.buildPodSpec(name, testImage, 2.MB, Map("foo" -> "bar"), Map("fooL" -> "barV"), config)
+ withClue(Serialization.asYaml(pod)) {
+ val c = getActionContainer(pod)
+ //min cpu is: config.millicpus
+ c.getResources.getLimits.asScala.get("cpu").map(_.getAmount) shouldBe Some("300m")
+ }
+
+ val pod2 = builder.buildPodSpec(name, testImage, 15.MB, Map("foo" -> "bar"), Map("fooL" -> "barV"), config)
+ withClue(Serialization.asYaml(pod2)) {
+ val c = getActionContainer(pod2)
+ //max cpu is: config.maxMillicpus
+ c.getResources.getLimits.asScala.get("cpu").map(_.getAmount) shouldBe Some("1000m")
+ }
+
+ val pod3 = builder.buildPodSpec(name, testImage, 7.MB, Map("foo" -> "bar"), Map("fooL" -> "barV"), config)
+ withClue(Serialization.asYaml(pod3)) {
+ val c = getActionContainer(pod3)
+ //scaled cpu is: (action mem / config.mem, rounded down) x config.millicpus, i.e. (7 / 3) x 300 = 600
+ c.getResources.getLimits.asScala.get("cpu").map(_.getAmount) shouldBe Some("600m")
+ }
+ val config2 = KubernetesClientConfig(
+ KubernetesClientTimeoutConfig(1.second, 1.second),
+ KubernetesInvokerNodeAffinity(false, "k", "v"),
+ true,
+ None,
+ None)
+ val pod4 = builder.buildPodSpec(name, testImage, 7.MB, Map("foo" -> "bar"), Map("fooL" -> "barV"), config2)
+ withClue(Serialization.asYaml(pod4)) {
+ val c = getActionContainer(pod4)
+ //if scaling config is not provided, no cpu resources are specified
+ c.getResources.getLimits.asScala.get("cpu").map(_.getAmount) shouldBe None
+ }
+
+ }
it should "extend existing pod template" in {
val template = """
|---
@@ -104,12 +155,20 @@ class WhiskPodBuilderTests extends FlatSpec with Matchers with KubeClientSupport
}
private def assertPodSettings(builder: WhiskPodBuilder): Pod = {
- val pod = builder.buildPodSpec(name, testImage, memLimit, Map("foo" -> "bar"), Map("fooL" -> "barV"))
+ val config = KubernetesClientConfig(
+ KubernetesClientTimeoutConfig(1.second, 1.second),
+ KubernetesInvokerNodeAffinity(false, "k", "v"),
+ true,
+ None,
+ None,
+ Some(KubernetesCpuScalingConfig(300, 3.MB, 1000)))
+ val pod = builder.buildPodSpec(name, testImage, memLimit, Map("foo" -> "bar"), Map("fooL" -> "barV"), config)
withClue(Serialization.asYaml(pod)) {
val c = getActionContainer(pod)
c.getEnv.asScala.exists(_.getName == "foo") shouldBe true
c.getResources.getLimits.asScala.get("memory").map(_.getAmount) shouldBe Some("10Mi")
+ c.getResources.getLimits.asScala.get("cpu").map(_.getAmount) shouldBe Some("900m")
c.getSecurityContext.getCapabilities.getDrop.asScala should contain allOf ("NET_RAW", "NET_ADMIN")
c.getPorts.asScala.find(_.getName == "action").map(_.getContainerPort) shouldBe Some(8080)
c.getImage shouldBe testImage