You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@openwhisk.apache.org by GitBox <gi...@apache.org> on 2018/08/17 12:04:05 UTC

[GitHub] markusthoemmes closed pull request #3963: k8s: implement invoker-node affinity and eliminate usage of kubectl

markusthoemmes closed pull request #3963: k8s: implement invoker-node affinity and eliminate usage of kubectl
URL: https://github.com/apache/incubator-openwhisk/pull/3963
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff of a foreign pull request (one made
from a fork) once it is merged, the diff is reproduced below for the
sake of provenance:

diff --git a/common/scala/build.gradle b/common/scala/build.gradle
index c4ae763960..743d36c853 100644
--- a/common/scala/build.gradle
+++ b/common/scala/build.gradle
@@ -60,7 +60,7 @@ dependencies {
     }
     compile 'com.github.ben-manes.caffeine:caffeine:2.4.0'
     compile 'com.google.code.findbugs:jsr305:3.0.2'
-    compile 'io.fabric8:kubernetes-client:2.5.7'
+    compile 'io.fabric8:kubernetes-client:4.0.3'
     compile 'io.kamon:kamon-core_2.11:0.6.7'
     compile 'io.kamon:kamon-statsd_2.11:0.6.7'
     //for mesos
diff --git a/core/invoker/Dockerfile b/core/invoker/Dockerfile
index d1c5d8cc51..268f24a500 100644
--- a/core/invoker/Dockerfile
+++ b/core/invoker/Dockerfile
@@ -4,7 +4,6 @@
 FROM scala
 
 ENV DOCKER_VERSION 1.12.0
-ENV KUBERNETES_VERSION 1.6.4
 
 RUN apk add --update openssl
 
@@ -17,11 +16,6 @@ rm -f docker-${DOCKER_VERSION}.tgz && \
 chmod +x /usr/bin/docker && \
 chmod +x /usr/bin/docker-runc
 
-# Install kubernetes client
-RUN wget --no-verbose https://storage.googleapis.com/kubernetes-release/release/v${KUBERNETES_VERSION}/bin/linux/amd64/kubectl && \
-chmod +x kubectl && \
-mv kubectl /usr/bin/kubectl
-
 ADD build/distributions/invoker.tar ./
 
 COPY init.sh /
diff --git a/core/invoker/src/main/resources/application.conf b/core/invoker/src/main/resources/application.conf
index c471d1b726..244220afb1 100644
--- a/core/invoker/src/main/resources/application.conf
+++ b/core/invoker/src/main/resources/application.conf
@@ -46,14 +46,17 @@ whisk {
     # Timeouts for k8s commands. Set to "Inf" to disable timeout.
     timeouts {
       run: 1 minute
-      rm: 1 minute
-      inspect: 1 minute
       logs: 1 minute
     }
     invoker-agent {
       enabled: false
       port: 3233
     }
+    user-pod-node-affinity {
+      enabled: true
+      key: "openwhisk-role"
+      value: "invoker"
+    }
   }
 
   # Timeouts for runc commands. Set to "Inf" to disable timeout.
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesClient.scala b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesClient.scala
index 9a19571ea6..c6e5233062 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesClient.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesClient.scala
@@ -17,15 +17,12 @@
 
 package whisk.core.containerpool.kubernetes
 
-import java.io.{FileNotFoundException, IOException}
+import java.io.IOException
 import java.net.SocketTimeoutException
-import java.nio.file.Files
-import java.nio.file.Paths
 import java.time.{Instant, ZoneId}
 import java.time.format.DateTimeFormatterBuilder
 
 import akka.actor.ActorSystem
-import akka.event.Logging.{ErrorLevel, InfoLevel}
 import akka.http.scaladsl.model.Uri
 import akka.http.scaladsl.model.Uri.Path
 import akka.http.scaladsl.model.Uri.Query
@@ -37,7 +34,6 @@ import akka.util.ByteString
 import io.fabric8.kubernetes.api.model._
 import pureconfig.loadConfigOrThrow
 import whisk.common.Logging
-import whisk.common.LoggingMarkers
 import whisk.common.TransactionId
 import whisk.core.ConfigKeys
 import whisk.core.containerpool.ContainerId
@@ -68,17 +64,27 @@ import scala.util.control.NonFatal
 /**
  * Configuration for kubernetes client command timeouts.
  */
-case class KubernetesClientTimeoutConfig(run: Duration, rm: Duration, inspect: Duration, logs: Duration)
+case class KubernetesClientTimeoutConfig(run: Duration, logs: Duration)
 
 /**
  * Configuration for kubernetes invoker-agent
  */
 case class KubernetesInvokerAgentConfig(enabled: Boolean, port: Int)
 
+/**
+ * Configuration for node affinity for the pods that execute user action containers.
+ * The <key,value> pair should match the label with which the invoker worker nodes
+ * are labeled in the Kubernetes cluster. The default pair is <openwhisk-role,invoker>,
+ * but a deployment may override this default if needed.
+ */
+case class KubernetesInvokerNodeAffinity(enabled: Boolean, key: String, value: String)
+
 /**
  * General configuration for kubernetes client
  */
-case class KubernetesClientConfig(timeouts: KubernetesClientTimeoutConfig, invokerAgent: KubernetesInvokerAgentConfig)
+case class KubernetesClientConfig(timeouts: KubernetesClientTimeoutConfig,
+                                  invokerAgent: KubernetesInvokerAgentConfig,
+                                  userPodNodeAffinity: KubernetesInvokerNodeAffinity)
 
 /**
  * Serves as an interface to the Kubernetes API by proxying its REST API and/or invoking the kubectl CLI.
@@ -101,19 +107,6 @@ class KubernetesClient(
       .withRequestTimeout(config.timeouts.logs.toMillis.toInt)
       .build())
 
-  // Determines how to run kubectl. Failure to find a kubectl binary implies
-  // a failure to initialize this instance of KubernetesClient.
-  protected def findKubectlCmd(): String = {
-    val alternatives = List("/usr/bin/kubectl", "/usr/local/bin/kubectl")
-    val kubectlBin = Try {
-      alternatives.find(a => Files.isExecutable(Paths.get(a))).get
-    } getOrElse {
-      throw new FileNotFoundException(s"Couldn't locate kubectl binary (tried: ${alternatives.mkString(", ")}).")
-    }
-    kubectlBin
-  }
-  protected val kubectlCmd = Seq(findKubectlCmd)
-
   def run(name: String,
           image: String,
           memory: ByteSize = 256.MB,
@@ -124,7 +117,7 @@ class KubernetesClient(
       case (key, value) => new EnvVarBuilder().withName(key).withValue(value).build()
     }.toSeq
 
-    val pod = new PodBuilder()
+    val podBuilder = new PodBuilder()
       .withNewMetadata()
       .withName(name)
       .addToLabels("name", name)
@@ -132,6 +125,23 @@ class KubernetesClient(
       .endMetadata()
       .withNewSpec()
       .withRestartPolicy("Always")
+    if (config.userPodNodeAffinity.enabled) {
+      val invokerNodeAffinity = new AffinityBuilder()
+        .withNewNodeAffinity()
+        .withNewRequiredDuringSchedulingIgnoredDuringExecution()
+        .addNewNodeSelectorTerm()
+        .addNewMatchExpression()
+        .withKey(config.userPodNodeAffinity.key)
+        .withOperator("In")
+        .withValues(config.userPodNodeAffinity.value)
+        .endMatchExpression()
+        .endNodeSelectorTerm()
+        .endRequiredDuringSchedulingIgnoredDuringExecution()
+        .endNodeAffinity()
+        .build()
+      podBuilder.withAffinity(invokerNodeAffinity)
+    }
+    val pod = podBuilder
       .addNewContainer()
       .withNewResources()
       .withLimits(Map("memory" -> new Quantity(memory.toMB + "Mi")).asJava)
@@ -166,11 +176,27 @@ class KubernetesClient(
   }
 
   def rm(container: KubernetesContainer)(implicit transid: TransactionId): Future[Unit] = {
-    runCmd(Seq("delete", "--now", "pod", container.id.asString), config.timeouts.rm).map(_ => ())
+    Future {
+      blocking {
+        kubeRestClient
+          .inNamespace(kubeRestClient.getNamespace)
+          .pods()
+          .withName(container.id.asString)
+          .delete()
+      }
+    }.map(_ => ())
   }
 
   def rm(key: String, value: String, ensureUnpaused: Boolean = false)(implicit transid: TransactionId): Future[Unit] = {
-    runCmd(Seq("delete", "--now", "pod", "-l", s"$key=$value"), config.timeouts.rm).map(_ => ())
+    Future {
+      blocking {
+        kubeRestClient
+          .inNamespace(kubeRestClient.getNamespace)
+          .pods()
+          .withLabel(key, value)
+          .delete()
+      }
+    }.map(_ => ())
   }
 
   // suspend is a no-op with the basic KubernetesClient
@@ -200,19 +226,6 @@ class KubernetesClient(
     implicit val kubernetes = this
     new KubernetesContainer(id, addr, workerIP, nativeContainerId)
   }
-
-  protected def runCmd(args: Seq[String], timeout: Duration)(implicit transid: TransactionId): Future[String] = {
-    val cmd = kubectlCmd ++ args
-    val start = transid.started(
-      this,
-      LoggingMarkers.INVOKER_KUBECTL_CMD(args.head),
-      s"running ${cmd.mkString(" ")} (timeout: $timeout)",
-      logLevel = InfoLevel)
-    executeProcess(cmd, timeout).andThen {
-      case Success(_) => transid.finished(this, start)
-      case Failure(t) => transid.failed(this, start, t.getMessage, ErrorLevel)
-    }
-  }
 }
 
 object KubernetesClient {
diff --git a/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesClientTests.scala b/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesClientTests.scala
index 1f584d7d41..e653b4117c 100644
--- a/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesClientTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesClientTests.scala
@@ -76,13 +76,9 @@ class KubernetesClientTests
   val id = ContainerId("55db56ee082239428b27d3728b4dd324c09068458aad9825727d5bfc1bba6d52")
   val container = kubernetesContainer(id)
 
-  val kubectlCommand = "kubectl"
-
   /** Returns a KubernetesClient with a mocked result for 'executeProcess' */
   def kubernetesClient(fixture: => Future[String]) = {
     new KubernetesClient()(global) {
-      override def findKubectlCmd() = kubectlCommand
-
       override def executeProcess(args: Seq[String], timeout: Duration)(implicit ec: ExecutionContext,
                                                                         as: ActorSystem) =
         fixture


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services