You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ty...@apache.org on 2020/03/12 16:40:31 UTC

[openwhisk] branch master updated: enable optional use of PodDisruptionBudget for action pods (#4856)

This is an automated email from the ASF dual-hosted git repository.

tysonnorris pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new a8ed4e4  enable optional use of PodDisruptionBudget for action pods (#4856)
a8ed4e4 is described below

commit a8ed4e4c00e95ae9dca339081552f724f9541e86
Author: tysonnorris <tn...@adobe.com>
AuthorDate: Thu Mar 12 09:40:16 2020 -0700

    enable optional use of PodDisruptionBudget for action pods (#4856)
---
 core/invoker/src/main/resources/application.conf   |  3 +
 .../kubernetes/KubernetesClient.scala              | 68 ++++++++++++----------
 .../containerpool/kubernetes/WhiskPodBuilder.scala | 40 ++++++++++---
 .../kubernetes/test/WhiskPodBuilderTests.scala     | 44 ++++++++++----
 4 files changed, 105 insertions(+), 50 deletions(-)

diff --git a/core/invoker/src/main/resources/application.conf b/core/invoker/src/main/resources/application.conf
index 3264943..6c6d5aa 100644
--- a/core/invoker/src/main/resources/application.conf
+++ b/core/invoker/src/main/resources/application.conf
@@ -97,6 +97,9 @@ whisk {
     #  memory = 256 m
     #  max-millicpus = 4000
     #}
+
+    # Enable PodDisruptionBudget creation for action pods? When true, a PDB is created alongside each pod, carrying the same labels as the pod and specifying minAvailable=1 to prevent termination of action pods during maintenance.
+    pdb-enabled = false
   }
 
   # Timeouts for runc commands. Set to "Inf" to disable timeout.
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesClient.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesClient.scala
index f1bca95..4154166 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesClient.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesClient.scala
@@ -82,9 +82,10 @@ case class KubernetesInvokerNodeAffinity(enabled: Boolean, key: String, value: S
 case class KubernetesClientConfig(timeouts: KubernetesClientTimeoutConfig,
                                   userPodNodeAffinity: KubernetesInvokerNodeAffinity,
                                   portForwardingEnabled: Boolean,
-                                  actionNamespace: Option[String] = None,
-                                  podTemplate: Option[ConfigMapValue] = None,
-                                  cpuScaling: Option[KubernetesCpuScalingConfig] = None)
+                                  actionNamespace: Option[String],
+                                  podTemplate: Option[ConfigMapValue],
+                                  cpuScaling: Option[KubernetesCpuScalingConfig],
+                                  pdbEnabled: Boolean)
 
 /**
  * Serves as an interface to the Kubernetes API by proxying its REST API and/or invoking the kubectl CLI.
@@ -117,7 +118,7 @@ class KubernetesClient(
           environment: Map[String, String] = Map.empty,
           labels: Map[String, String] = Map.empty)(implicit transid: TransactionId): Future[KubernetesContainer] = {
 
-    val pod = podBuilder.buildPodSpec(name, image, memory, environment, labels, config)
+    val (pod, pdb) = podBuilder.buildPodSpec(name, image, memory, environment, labels, config)
     if (transid.meta.extraLogging) {
       log.info(this, s"Pod spec being created\n${Serialization.asYaml(pod)}")
     }
@@ -131,6 +132,12 @@ class KubernetesClient(
     //create the pod; catch any failure to end the transaction timer
     try {
       kubeRestClient.pods.inNamespace(namespace).create(pod)
+      pdb.map(
+        p =>
+          kubeRestClient.policy.podDisruptionBudget
+            .inNamespace(namespace)
+            .withName(name)
+            .create(p))
     } catch {
       case e: Throwable =>
         transid.failed(this, start, s"Failed create pod for '$name': ${e.getClass} - ${e.getMessage}", ErrorLevel)
@@ -171,18 +178,31 @@ class KubernetesClient(
   }
 
   def rm(container: KubernetesContainer)(implicit transid: TransactionId): Future[Unit] = {
+    deleteByName(container.id.asString)
+  }
+  def rm(podName: String)(implicit transid: TransactionId): Future[Unit] = {
+    deleteByName(podName)
+  }
+
+  def rm(key: String, value: String, ensureUnpaused: Boolean = false)(implicit transid: TransactionId): Future[Unit] = {
     val start = transid.started(
       this,
       LoggingMarkers.INVOKER_KUBEAPI_CMD("delete"),
-      s"Deleting pod ${container.id}",
+      s"Deleting pods with label $key = $value",
       logLevel = akka.event.Logging.InfoLevel)
     Future {
       blocking {
         kubeRestClient
           .inNamespace(kubeRestClient.getNamespace)
           .pods()
-          .withName(container.id.asString)
+          .withLabel(key, value)
           .delete()
+        if (config.pdbEnabled) {
+          kubeRestClient.policy.podDisruptionBudget
+            .inNamespace(kubeRestClient.getNamespace)
+            .withLabel(key, value)
+            .delete()
+        }
       }
     }.map(_ => transid.finished(this, start, logLevel = InfoLevel))
       .recover {
@@ -190,15 +210,15 @@ class KubernetesClient(
           transid.failed(
             this,
             start,
-            s"Failed delete pod for '${container.id}': ${e.getClass} - ${e.getMessage}",
+            s"Failed delete pods with label $key = $value: ${e.getClass} - ${e.getMessage}",
             ErrorLevel)
       }
   }
-  def rm(podName: String)(implicit transid: TransactionId): Future[Unit] = {
+  private def deleteByName(podName: String)(implicit transid: TransactionId) = {
     val start = transid.started(
       this,
       LoggingMarkers.INVOKER_KUBEAPI_CMD("delete"),
-      s"Deleting pod $podName",
+      s"Deleting pod ${podName}",
       logLevel = akka.event.Logging.InfoLevel)
     Future {
       blocking {
@@ -207,27 +227,12 @@ class KubernetesClient(
           .pods()
           .withName(podName)
           .delete()
-      }
-    }.map(_ => transid.finished(this, start, logLevel = InfoLevel))
-      .recover {
-        case e =>
-          transid.failed(this, start, s"Failed delete pod for '$podName': ${e.getClass} - ${e.getMessage}", ErrorLevel)
-      }
-  }
-
-  def rm(key: String, value: String, ensureUnpaused: Boolean = false)(implicit transid: TransactionId): Future[Unit] = {
-    val start = transid.started(
-      this,
-      LoggingMarkers.INVOKER_KUBEAPI_CMD("delete"),
-      s"Deleting pods with label $key = $value",
-      logLevel = akka.event.Logging.InfoLevel)
-    Future {
-      blocking {
-        kubeRestClient
-          .inNamespace(kubeRestClient.getNamespace)
-          .pods()
-          .withLabel(key, value)
-          .delete()
+        if (config.pdbEnabled) {
+          kubeRestClient.policy.podDisruptionBudget
+            .inNamespace(kubeRestClient.getNamespace)
+            .withName(podName)
+            .delete()
+        }
       }
     }.map(_ => transid.finished(this, start, logLevel = InfoLevel))
       .recover {
@@ -235,11 +240,10 @@ class KubernetesClient(
           transid.failed(
             this,
             start,
-            s"Failed delete pods with label $key = $value: ${e.getClass} - ${e.getMessage}",
+            s"Failed delete pod for '${podName}': ${e.getClass} - ${e.getMessage}",
             ErrorLevel)
       }
   }
-
   // suspend is a no-op with the basic KubernetesClient
   def suspend(container: KubernetesContainer)(implicit transid: TransactionId): Future[Unit] = Future.successful({})
 
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/WhiskPodBuilder.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/WhiskPodBuilder.scala
index f90384c..3b606ba 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/WhiskPodBuilder.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/WhiskPodBuilder.scala
@@ -21,7 +21,16 @@ import java.io.ByteArrayInputStream
 import java.nio.charset.StandardCharsets.UTF_8
 
 import io.fabric8.kubernetes.api.builder.Predicate
-import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, Pod, PodBuilder, Quantity}
+import io.fabric8.kubernetes.api.model.policy.{PodDisruptionBudget, PodDisruptionBudgetBuilder}
+import io.fabric8.kubernetes.api.model.{
+  ContainerBuilder,
+  EnvVarBuilder,
+  IntOrString,
+  LabelSelectorBuilder,
+  Pod,
+  PodBuilder,
+  Quantity
+}
 import io.fabric8.kubernetes.client.NamespacedKubernetesClient
 import org.apache.openwhisk.common.{ConfigMapValue, TransactionId}
 import org.apache.openwhisk.core.entity.ByteSize
@@ -37,12 +46,13 @@ class WhiskPodBuilder(client: NamespacedKubernetesClient,
 
   def affinityEnabled: Boolean = userPodNodeAffinity.enabled
 
-  def buildPodSpec(name: String,
-                   image: String,
-                   memory: ByteSize,
-                   environment: Map[String, String],
-                   labels: Map[String, String],
-                   config: KubernetesClientConfig)(implicit transid: TransactionId): Pod = {
+  def buildPodSpec(
+    name: String,
+    image: String,
+    memory: ByteSize,
+    environment: Map[String, String],
+    labels: Map[String, String],
+    config: KubernetesClientConfig)(implicit transid: TransactionId): (Pod, Option[PodDisruptionBudget]) = {
     val envVars = environment.map {
       case (key, value) => new EnvVarBuilder().withName(key).withValue(value).build()
     }.toSeq
@@ -118,7 +128,21 @@ class WhiskPodBuilder(client: NamespacedKubernetesClient,
       .endContainer()
       .endSpec()
       .build()
-    pod
+    val pdb = if (config.pdbEnabled) {
+      Some(
+        new PodDisruptionBudgetBuilder().withNewMetadata
+          .withName(name)
+          .addToLabels(labels.asJava)
+          .endMetadata()
+          .withNewSpec()
+          .withMinAvailable(new IntOrString(1))
+          .withSelector(new LabelSelectorBuilder().withMatchLabels(Map("name" -> name).asJava).build())
+          .and
+          .build)
+    } else {
+      None
+    }
+    (pod, pdb)
   }
 
   def calculateCpu(c: KubernetesCpuScalingConfig, memory: ByteSize): Int = {
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/kubernetes/test/WhiskPodBuilderTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/kubernetes/test/WhiskPodBuilderTests.scala
index 5ad9c58..12ed183 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/kubernetes/test/WhiskPodBuilderTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/kubernetes/test/WhiskPodBuilderTests.scala
@@ -17,7 +17,8 @@
 
 package org.apache.openwhisk.core.containerpool.kubernetes.test
 
-import io.fabric8.kubernetes.api.model.Pod
+import io.fabric8.kubernetes.api.model.policy.PodDisruptionBudgetBuilder
+import io.fabric8.kubernetes.api.model.{IntOrString, LabelSelectorBuilder, Pod}
 import io.fabric8.kubernetes.client.utils.Serialization
 import org.apache.openwhisk.common.{ConfigMapValue, TransactionId}
 import org.apache.openwhisk.core.containerpool.kubernetes.{
@@ -57,23 +58,24 @@ class WhiskPodBuilderTests extends FlatSpec with Matchers with KubeClientSupport
       true,
       None,
       None,
-      Some(KubernetesCpuScalingConfig(300, 3.MB, 1000)))
+      Some(KubernetesCpuScalingConfig(300, 3.MB, 1000)),
+      false)
 
-    val pod = builder.buildPodSpec(name, testImage, 2.MB, Map("foo" -> "bar"), Map("fooL" -> "barV"), config)
+    val (pod, _) = builder.buildPodSpec(name, testImage, 2.MB, Map("foo" -> "bar"), Map("fooL" -> "barV"), config)
     withClue(Serialization.asYaml(pod)) {
       val c = getActionContainer(pod)
       //min cpu is: config.millicpus
       c.getResources.getLimits.asScala.get("cpu").map(_.getAmount) shouldBe Some("300m")
     }
 
-    val pod2 = builder.buildPodSpec(name, testImage, 15.MB, Map("foo" -> "bar"), Map("fooL" -> "barV"), config)
+    val (pod2, _) = builder.buildPodSpec(name, testImage, 15.MB, Map("foo" -> "bar"), Map("fooL" -> "barV"), config)
     withClue(Serialization.asYaml(pod2)) {
       val c = getActionContainer(pod2)
       //max cpu is: config.maxMillicpus
       c.getResources.getLimits.asScala.get("cpu").map(_.getAmount) shouldBe Some("1000m")
     }
 
-    val pod3 = builder.buildPodSpec(name, testImage, 7.MB, Map("foo" -> "bar"), Map("fooL" -> "barV"), config)
+    val (pod3, _) = builder.buildPodSpec(name, testImage, 7.MB, Map("foo" -> "bar"), Map("fooL" -> "barV"), config)
     withClue(Serialization.asYaml(pod3)) {
       val c = getActionContainer(pod3)
       //scaled cpu is: action mem/config.mem x config.maxMillicpus
@@ -85,8 +87,10 @@ class WhiskPodBuilderTests extends FlatSpec with Matchers with KubeClientSupport
       KubernetesInvokerNodeAffinity(false, "k", "v"),
       true,
       None,
-      None)
-    val pod4 = builder.buildPodSpec(name, testImage, 7.MB, Map("foo" -> "bar"), Map("fooL" -> "barV"), config2)
+      None,
+      None,
+      false)
+    val (pod4, _) = builder.buildPodSpec(name, testImage, 7.MB, Map("foo" -> "bar"), Map("fooL" -> "barV"), config2)
     withClue(Serialization.asYaml(pod4)) {
       val c = getActionContainer(pod4)
       //if scaling config is not provided, no cpu resources are specified
@@ -131,6 +135,11 @@ class WhiskPodBuilderTests extends FlatSpec with Matchers with KubeClientSupport
     pod.getMetadata.getNamespace shouldBe "whiskns"
   }
 
+  it should "build a pod disruption budget for the pod, if enabled" in {
+    val builder = new WhiskPodBuilder(kubeClient, affinity)
+    assertPodSettings(builder, true)
+  }
+
   it should "extend existing pod template with affinity" in {
     val template = """
        |apiVersion: "v1"
@@ -154,15 +163,17 @@ class WhiskPodBuilderTests extends FlatSpec with Matchers with KubeClientSupport
     terms.exists(_.getMatchExpressions.asScala.exists(_.getKey == "nodelabel")) shouldBe true
   }
 
-  private def assertPodSettings(builder: WhiskPodBuilder): Pod = {
+  private def assertPodSettings(builder: WhiskPodBuilder, pdbEnabled: Boolean = false): Pod = {
     val config = KubernetesClientConfig(
       KubernetesClientTimeoutConfig(1.second, 1.second),
       KubernetesInvokerNodeAffinity(false, "k", "v"),
       true,
       None,
       None,
-      Some(KubernetesCpuScalingConfig(300, 3.MB, 1000)))
-    val pod = builder.buildPodSpec(name, testImage, memLimit, Map("foo" -> "bar"), Map("fooL" -> "barV"), config)
+      Some(KubernetesCpuScalingConfig(300, 3.MB, 1000)),
+      pdbEnabled)
+    val labels = Map("fooL" -> "barV")
+    val (pod, pdb) = builder.buildPodSpec(name, testImage, memLimit, Map("foo" -> "bar"), labels, config)
     withClue(Serialization.asYaml(pod)) {
       val c = getActionContainer(pod)
       c.getEnv.asScala.exists(_.getName == "foo") shouldBe true
@@ -184,6 +195,19 @@ class WhiskPodBuilderTests extends FlatSpec with Matchers with KubeClientSupport
         terms.exists(_.getMatchExpressions.asScala.exists(_.getKey == affinity.key)) shouldBe true
       }
     }
+    if (pdbEnabled) {
+      println("matching pdb...")
+      pdb shouldBe Some(
+        new PodDisruptionBudgetBuilder().withNewMetadata
+          .withName(name)
+          .addToLabels(labels.asJava)
+          .endMetadata()
+          .withNewSpec()
+          .withMinAvailable(new IntOrString(1))
+          .withSelector(new LabelSelectorBuilder().withMatchLabels(Map("name" -> name).asJava).build())
+          .and
+          .build)
+    }
     pod
   }