You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ty...@apache.org on 2020/03/12 16:40:31 UTC
[openwhisk] branch master updated: enable optional use of PodDisruptionBudget for action pods (#4856)
This is an automated email from the ASF dual-hosted git repository.
tysonnorris pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new a8ed4e4 enable optional use of PodDisruptionBudget for action pods (#4856)
a8ed4e4 is described below
commit a8ed4e4c00e95ae9dca339081552f724f9541e86
Author: tysonnorris <tn...@adobe.com>
AuthorDate: Thu Mar 12 09:40:16 2020 -0700
enable optional use of PodDisruptionBudget for action pods (#4856)
---
core/invoker/src/main/resources/application.conf | 3 +
.../kubernetes/KubernetesClient.scala | 68 ++++++++++++----------
.../containerpool/kubernetes/WhiskPodBuilder.scala | 40 ++++++++++---
.../kubernetes/test/WhiskPodBuilderTests.scala | 44 ++++++++++----
4 files changed, 105 insertions(+), 50 deletions(-)
diff --git a/core/invoker/src/main/resources/application.conf b/core/invoker/src/main/resources/application.conf
index 3264943..6c6d5aa 100644
--- a/core/invoker/src/main/resources/application.conf
+++ b/core/invoker/src/main/resources/application.conf
@@ -97,6 +97,9 @@ whisk {
# memory = 256 m
# max-millicpus = 4000
#}
+
+ # Enable PodDisruptionBudget (PDB) creation for action pods? Each PDB carries the same labels as its pod and specifies minAvailable=1, to prevent voluntary eviction of action pods during cluster maintenance (e.g. node drains).
+ pdb-enabled = false
}
# Timeouts for runc commands. Set to "Inf" to disable timeout.
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesClient.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesClient.scala
index f1bca95..4154166 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesClient.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesClient.scala
@@ -82,9 +82,10 @@ case class KubernetesInvokerNodeAffinity(enabled: Boolean, key: String, value: S
case class KubernetesClientConfig(timeouts: KubernetesClientTimeoutConfig,
userPodNodeAffinity: KubernetesInvokerNodeAffinity,
portForwardingEnabled: Boolean,
- actionNamespace: Option[String] = None,
- podTemplate: Option[ConfigMapValue] = None,
- cpuScaling: Option[KubernetesCpuScalingConfig] = None)
+ actionNamespace: Option[String],
+ podTemplate: Option[ConfigMapValue],
+ cpuScaling: Option[KubernetesCpuScalingConfig],
+ pdbEnabled: Boolean)
/**
* Serves as an interface to the Kubernetes API by proxying its REST API and/or invoking the kubectl CLI.
@@ -117,7 +118,7 @@ class KubernetesClient(
environment: Map[String, String] = Map.empty,
labels: Map[String, String] = Map.empty)(implicit transid: TransactionId): Future[KubernetesContainer] = {
- val pod = podBuilder.buildPodSpec(name, image, memory, environment, labels, config)
+ val (pod, pdb) = podBuilder.buildPodSpec(name, image, memory, environment, labels, config)
if (transid.meta.extraLogging) {
log.info(this, s"Pod spec being created\n${Serialization.asYaml(pod)}")
}
@@ -131,6 +132,12 @@ class KubernetesClient(
//create the pod; catch any failure to end the transaction timer
try {
kubeRestClient.pods.inNamespace(namespace).create(pod)
+ pdb.map(
+ p =>
+ kubeRestClient.policy.podDisruptionBudget
+ .inNamespace(namespace)
+ .withName(name)
+ .create(p))
} catch {
case e: Throwable =>
transid.failed(this, start, s"Failed create pod for '$name': ${e.getClass} - ${e.getMessage}", ErrorLevel)
@@ -171,18 +178,31 @@ class KubernetesClient(
}
def rm(container: KubernetesContainer)(implicit transid: TransactionId): Future[Unit] = {
+ deleteByName(container.id.asString)
+ }
+ def rm(podName: String)(implicit transid: TransactionId): Future[Unit] = {
+ deleteByName(podName)
+ }
+
+ def rm(key: String, value: String, ensureUnpaused: Boolean = false)(implicit transid: TransactionId): Future[Unit] = {
val start = transid.started(
this,
LoggingMarkers.INVOKER_KUBEAPI_CMD("delete"),
- s"Deleting pod ${container.id}",
+ s"Deleting pods with label $key = $value",
logLevel = akka.event.Logging.InfoLevel)
Future {
blocking {
kubeRestClient
.inNamespace(kubeRestClient.getNamespace)
.pods()
- .withName(container.id.asString)
+ .withLabel(key, value)
.delete()
+ if (config.pdbEnabled) {
+ kubeRestClient.policy.podDisruptionBudget
+ .inNamespace(kubeRestClient.getNamespace)
+ .withLabel(key, value)
+ .delete()
+ }
}
}.map(_ => transid.finished(this, start, logLevel = InfoLevel))
.recover {
@@ -190,15 +210,15 @@ class KubernetesClient(
transid.failed(
this,
start,
- s"Failed delete pod for '${container.id}': ${e.getClass} - ${e.getMessage}",
+ s"Failed delete pods with label $key = $value: ${e.getClass} - ${e.getMessage}",
ErrorLevel)
}
}
- def rm(podName: String)(implicit transid: TransactionId): Future[Unit] = {
+ private def deleteByName(podName: String)(implicit transid: TransactionId) = {
val start = transid.started(
this,
LoggingMarkers.INVOKER_KUBEAPI_CMD("delete"),
- s"Deleting pod $podName",
+ s"Deleting pod ${podName}",
logLevel = akka.event.Logging.InfoLevel)
Future {
blocking {
@@ -207,27 +227,12 @@ class KubernetesClient(
.pods()
.withName(podName)
.delete()
- }
- }.map(_ => transid.finished(this, start, logLevel = InfoLevel))
- .recover {
- case e =>
- transid.failed(this, start, s"Failed delete pod for '$podName': ${e.getClass} - ${e.getMessage}", ErrorLevel)
- }
- }
-
- def rm(key: String, value: String, ensureUnpaused: Boolean = false)(implicit transid: TransactionId): Future[Unit] = {
- val start = transid.started(
- this,
- LoggingMarkers.INVOKER_KUBEAPI_CMD("delete"),
- s"Deleting pods with label $key = $value",
- logLevel = akka.event.Logging.InfoLevel)
- Future {
- blocking {
- kubeRestClient
- .inNamespace(kubeRestClient.getNamespace)
- .pods()
- .withLabel(key, value)
- .delete()
+ if (config.pdbEnabled) {
+ kubeRestClient.policy.podDisruptionBudget
+ .inNamespace(kubeRestClient.getNamespace)
+ .withName(podName)
+ .delete()
+ }
}
}.map(_ => transid.finished(this, start, logLevel = InfoLevel))
.recover {
@@ -235,11 +240,10 @@ class KubernetesClient(
transid.failed(
this,
start,
- s"Failed delete pods with label $key = $value: ${e.getClass} - ${e.getMessage}",
+ s"Failed delete pod for '${podName}': ${e.getClass} - ${e.getMessage}",
ErrorLevel)
}
}
-
// suspend is a no-op with the basic KubernetesClient
def suspend(container: KubernetesContainer)(implicit transid: TransactionId): Future[Unit] = Future.successful({})
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/WhiskPodBuilder.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/WhiskPodBuilder.scala
index f90384c..3b606ba 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/WhiskPodBuilder.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/WhiskPodBuilder.scala
@@ -21,7 +21,16 @@ import java.io.ByteArrayInputStream
import java.nio.charset.StandardCharsets.UTF_8
import io.fabric8.kubernetes.api.builder.Predicate
-import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, Pod, PodBuilder, Quantity}
+import io.fabric8.kubernetes.api.model.policy.{PodDisruptionBudget, PodDisruptionBudgetBuilder}
+import io.fabric8.kubernetes.api.model.{
+ ContainerBuilder,
+ EnvVarBuilder,
+ IntOrString,
+ LabelSelectorBuilder,
+ Pod,
+ PodBuilder,
+ Quantity
+}
import io.fabric8.kubernetes.client.NamespacedKubernetesClient
import org.apache.openwhisk.common.{ConfigMapValue, TransactionId}
import org.apache.openwhisk.core.entity.ByteSize
@@ -37,12 +46,13 @@ class WhiskPodBuilder(client: NamespacedKubernetesClient,
def affinityEnabled: Boolean = userPodNodeAffinity.enabled
- def buildPodSpec(name: String,
- image: String,
- memory: ByteSize,
- environment: Map[String, String],
- labels: Map[String, String],
- config: KubernetesClientConfig)(implicit transid: TransactionId): Pod = {
+ def buildPodSpec(
+ name: String,
+ image: String,
+ memory: ByteSize,
+ environment: Map[String, String],
+ labels: Map[String, String],
+ config: KubernetesClientConfig)(implicit transid: TransactionId): (Pod, Option[PodDisruptionBudget]) = {
val envVars = environment.map {
case (key, value) => new EnvVarBuilder().withName(key).withValue(value).build()
}.toSeq
@@ -118,7 +128,21 @@ class WhiskPodBuilder(client: NamespacedKubernetesClient,
.endContainer()
.endSpec()
.build()
- pod
+ val pdb = if (config.pdbEnabled) {
+ Some(
+ new PodDisruptionBudgetBuilder().withNewMetadata
+ .withName(name)
+ .addToLabels(labels.asJava)
+ .endMetadata()
+ .withNewSpec()
+ .withMinAvailable(new IntOrString(1))
+ .withSelector(new LabelSelectorBuilder().withMatchLabels(Map("name" -> name).asJava).build())
+ .and
+ .build)
+ } else {
+ None
+ }
+ (pod, pdb)
}
def calculateCpu(c: KubernetesCpuScalingConfig, memory: ByteSize): Int = {
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/kubernetes/test/WhiskPodBuilderTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/kubernetes/test/WhiskPodBuilderTests.scala
index 5ad9c58..12ed183 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/kubernetes/test/WhiskPodBuilderTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/kubernetes/test/WhiskPodBuilderTests.scala
@@ -17,7 +17,8 @@
package org.apache.openwhisk.core.containerpool.kubernetes.test
-import io.fabric8.kubernetes.api.model.Pod
+import io.fabric8.kubernetes.api.model.policy.PodDisruptionBudgetBuilder
+import io.fabric8.kubernetes.api.model.{IntOrString, LabelSelectorBuilder, Pod}
import io.fabric8.kubernetes.client.utils.Serialization
import org.apache.openwhisk.common.{ConfigMapValue, TransactionId}
import org.apache.openwhisk.core.containerpool.kubernetes.{
@@ -57,23 +58,24 @@ class WhiskPodBuilderTests extends FlatSpec with Matchers with KubeClientSupport
true,
None,
None,
- Some(KubernetesCpuScalingConfig(300, 3.MB, 1000)))
+ Some(KubernetesCpuScalingConfig(300, 3.MB, 1000)),
+ false)
- val pod = builder.buildPodSpec(name, testImage, 2.MB, Map("foo" -> "bar"), Map("fooL" -> "barV"), config)
+ val (pod, _) = builder.buildPodSpec(name, testImage, 2.MB, Map("foo" -> "bar"), Map("fooL" -> "barV"), config)
withClue(Serialization.asYaml(pod)) {
val c = getActionContainer(pod)
//min cpu is: config.millicpus
c.getResources.getLimits.asScala.get("cpu").map(_.getAmount) shouldBe Some("300m")
}
- val pod2 = builder.buildPodSpec(name, testImage, 15.MB, Map("foo" -> "bar"), Map("fooL" -> "barV"), config)
+ val (pod2, _) = builder.buildPodSpec(name, testImage, 15.MB, Map("foo" -> "bar"), Map("fooL" -> "barV"), config)
withClue(Serialization.asYaml(pod2)) {
val c = getActionContainer(pod2)
//max cpu is: config.maxMillicpus
c.getResources.getLimits.asScala.get("cpu").map(_.getAmount) shouldBe Some("1000m")
}
- val pod3 = builder.buildPodSpec(name, testImage, 7.MB, Map("foo" -> "bar"), Map("fooL" -> "barV"), config)
+ val (pod3, _) = builder.buildPodSpec(name, testImage, 7.MB, Map("foo" -> "bar"), Map("fooL" -> "barV"), config)
withClue(Serialization.asYaml(pod3)) {
val c = getActionContainer(pod3)
//scaled cpu is: action mem/config.mem x config.maxMillicpus
@@ -85,8 +87,10 @@ class WhiskPodBuilderTests extends FlatSpec with Matchers with KubeClientSupport
KubernetesInvokerNodeAffinity(false, "k", "v"),
true,
None,
- None)
- val pod4 = builder.buildPodSpec(name, testImage, 7.MB, Map("foo" -> "bar"), Map("fooL" -> "barV"), config2)
+ None,
+ None,
+ false)
+ val (pod4, _) = builder.buildPodSpec(name, testImage, 7.MB, Map("foo" -> "bar"), Map("fooL" -> "barV"), config2)
withClue(Serialization.asYaml(pod4)) {
val c = getActionContainer(pod4)
//if scaling config is not provided, no cpu resources are specified
@@ -131,6 +135,11 @@ class WhiskPodBuilderTests extends FlatSpec with Matchers with KubeClientSupport
pod.getMetadata.getNamespace shouldBe "whiskns"
}
+ it should "build a pod disruption budget for the pod, if enabled" in {
+ val builder = new WhiskPodBuilder(kubeClient, affinity)
+ assertPodSettings(builder, true)
+ }
+
it should "extend existing pod template with affinity" in {
val template = """
|apiVersion: "v1"
@@ -154,15 +163,17 @@ class WhiskPodBuilderTests extends FlatSpec with Matchers with KubeClientSupport
terms.exists(_.getMatchExpressions.asScala.exists(_.getKey == "nodelabel")) shouldBe true
}
- private def assertPodSettings(builder: WhiskPodBuilder): Pod = {
+ private def assertPodSettings(builder: WhiskPodBuilder, pdbEnabled: Boolean = false): Pod = {
val config = KubernetesClientConfig(
KubernetesClientTimeoutConfig(1.second, 1.second),
KubernetesInvokerNodeAffinity(false, "k", "v"),
true,
None,
None,
- Some(KubernetesCpuScalingConfig(300, 3.MB, 1000)))
- val pod = builder.buildPodSpec(name, testImage, memLimit, Map("foo" -> "bar"), Map("fooL" -> "barV"), config)
+ Some(KubernetesCpuScalingConfig(300, 3.MB, 1000)),
+ pdbEnabled)
+ val labels = Map("fooL" -> "barV")
+ val (pod, pdb) = builder.buildPodSpec(name, testImage, memLimit, Map("foo" -> "bar"), labels, config)
withClue(Serialization.asYaml(pod)) {
val c = getActionContainer(pod)
c.getEnv.asScala.exists(_.getName == "foo") shouldBe true
@@ -184,6 +195,19 @@ class WhiskPodBuilderTests extends FlatSpec with Matchers with KubeClientSupport
terms.exists(_.getMatchExpressions.asScala.exists(_.getKey == affinity.key)) shouldBe true
}
}
+ if (pdbEnabled) {
+ println("matching pdb...")
+ pdb shouldBe Some(
+ new PodDisruptionBudgetBuilder().withNewMetadata
+ .withName(name)
+ .addToLabels(labels.asJava)
+ .endMetadata()
+ .withNewSpec()
+ .withMinAvailable(new IntOrString(1))
+ .withSelector(new LabelSelectorBuilder().withMatchLabels(Map("name" -> name).asJava).build())
+ .and
+ .build)
+ }
pod
}