You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ma...@apache.org on 2018/08/17 12:04:09 UTC
[incubator-openwhisk] branch master updated: Implement invoker-node
affinity and eliminate usage of kubectl. (#3963)
This is an automated email from the ASF dual-hosted git repository.
markusthoemmes pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 10ba53c Implement invoker-node affinity and eliminate usage of kubectl. (#3963)
10ba53c is described below
commit 10ba53c747d91a486cb43c3a34334c26193d8575
Author: David Grove <dg...@users.noreply.github.com>
AuthorDate: Fri Aug 17 08:04:03 2018 -0400
Implement invoker-node affinity and eliminate usage of kubectl. (#3963)
1. Upgrade to the latest released version of the fabric8 Kubernetes client
to get access to an implementation of node affinity. Use that implementation
to optionally add a scheduling affinity to the pods created for actions
to bind them to worker nodes labeled as invoker nodes.
2. Implement the container removal operation via the Kubernetes REST client
instead of via an exec to the kubectl CLI. This eliminates the last
usage of kubectl in the KubernetesClient and allows the kubectl
executable to be removed from the invoker Docker image.
---
common/scala/build.gradle | 2 +-
core/invoker/Dockerfile | 6 --
core/invoker/src/main/resources/application.conf | 7 +-
.../kubernetes/KubernetesClient.scala | 85 +++++++++++++---------
.../kubernetes/test/KubernetesClientTests.scala | 4 -
5 files changed, 55 insertions(+), 49 deletions(-)
diff --git a/common/scala/build.gradle b/common/scala/build.gradle
index c4ae763..743d36c 100644
--- a/common/scala/build.gradle
+++ b/common/scala/build.gradle
@@ -60,7 +60,7 @@ dependencies {
}
compile 'com.github.ben-manes.caffeine:caffeine:2.4.0'
compile 'com.google.code.findbugs:jsr305:3.0.2'
- compile 'io.fabric8:kubernetes-client:2.5.7'
+ compile 'io.fabric8:kubernetes-client:4.0.3'
compile 'io.kamon:kamon-core_2.11:0.6.7'
compile 'io.kamon:kamon-statsd_2.11:0.6.7'
//for mesos
diff --git a/core/invoker/Dockerfile b/core/invoker/Dockerfile
index d1c5d8c..268f24a 100644
--- a/core/invoker/Dockerfile
+++ b/core/invoker/Dockerfile
@@ -4,7 +4,6 @@
FROM scala
ENV DOCKER_VERSION 1.12.0
-ENV KUBERNETES_VERSION 1.6.4
RUN apk add --update openssl
@@ -17,11 +16,6 @@ rm -f docker-${DOCKER_VERSION}.tgz && \
chmod +x /usr/bin/docker && \
chmod +x /usr/bin/docker-runc
-# Install kubernetes client
-RUN wget --no-verbose https://storage.googleapis.com/kubernetes-release/release/v${KUBERNETES_VERSION}/bin/linux/amd64/kubectl && \
-chmod +x kubectl && \
-mv kubectl /usr/bin/kubectl
-
ADD build/distributions/invoker.tar ./
COPY init.sh /
diff --git a/core/invoker/src/main/resources/application.conf b/core/invoker/src/main/resources/application.conf
index c471d1b..244220a 100644
--- a/core/invoker/src/main/resources/application.conf
+++ b/core/invoker/src/main/resources/application.conf
@@ -46,14 +46,17 @@ whisk {
# Timeouts for k8s commands. Set to "Inf" to disable timeout.
timeouts {
run: 1 minute
- rm: 1 minute
- inspect: 1 minute
logs: 1 minute
}
invoker-agent {
enabled: false
port: 3233
}
+ user-pod-node-affinity {
+ enabled: true
+ key: "openwhisk-role"
+ value: "invoker"
+ }
}
# Timeouts for runc commands. Set to "Inf" to disable timeout.
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesClient.scala b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesClient.scala
index 9a19571..c6e5233 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesClient.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesClient.scala
@@ -17,15 +17,12 @@
package whisk.core.containerpool.kubernetes
-import java.io.{FileNotFoundException, IOException}
+import java.io.IOException
import java.net.SocketTimeoutException
-import java.nio.file.Files
-import java.nio.file.Paths
import java.time.{Instant, ZoneId}
import java.time.format.DateTimeFormatterBuilder
import akka.actor.ActorSystem
-import akka.event.Logging.{ErrorLevel, InfoLevel}
import akka.http.scaladsl.model.Uri
import akka.http.scaladsl.model.Uri.Path
import akka.http.scaladsl.model.Uri.Query
@@ -37,7 +34,6 @@ import akka.util.ByteString
import io.fabric8.kubernetes.api.model._
import pureconfig.loadConfigOrThrow
import whisk.common.Logging
-import whisk.common.LoggingMarkers
import whisk.common.TransactionId
import whisk.core.ConfigKeys
import whisk.core.containerpool.ContainerId
@@ -68,7 +64,7 @@ import scala.util.control.NonFatal
/**
* Configuration for kubernetes client command timeouts.
*/
-case class KubernetesClientTimeoutConfig(run: Duration, rm: Duration, inspect: Duration, logs: Duration)
+case class KubernetesClientTimeoutConfig(run: Duration, logs: Duration)
/**
* Configuration for kubernetes invoker-agent
@@ -76,9 +72,19 @@ case class KubernetesClientTimeoutConfig(run: Duration, rm: Duration, inspect: D
case class KubernetesInvokerAgentConfig(enabled: Boolean, port: Int)
/**
+ * Configuration for node affinity for the pods that execute user action containers.
+ * The key,value pair should match the <key,value> pair with which the invoker worker nodes
+ * are labeled in the Kubernetes cluster. The default pair is <openwhisk-role,invoker>,
+ * but a deployment may override this default if needed.
+ */
+case class KubernetesInvokerNodeAffinity(enabled: Boolean, key: String, value: String)
+
+/**
* General configuration for kubernetes client
*/
-case class KubernetesClientConfig(timeouts: KubernetesClientTimeoutConfig, invokerAgent: KubernetesInvokerAgentConfig)
+case class KubernetesClientConfig(timeouts: KubernetesClientTimeoutConfig,
+ invokerAgent: KubernetesInvokerAgentConfig,
+ userPodNodeAffinity: KubernetesInvokerNodeAffinity)
/**
* Serves as an interface to the Kubernetes API by proxying its REST API and/or invoking the kubectl CLI.
@@ -101,19 +107,6 @@ class KubernetesClient(
.withRequestTimeout(config.timeouts.logs.toMillis.toInt)
.build())
- // Determines how to run kubectl. Failure to find a kubectl binary implies
- // a failure to initialize this instance of KubernetesClient.
- protected def findKubectlCmd(): String = {
- val alternatives = List("/usr/bin/kubectl", "/usr/local/bin/kubectl")
- val kubectlBin = Try {
- alternatives.find(a => Files.isExecutable(Paths.get(a))).get
- } getOrElse {
- throw new FileNotFoundException(s"Couldn't locate kubectl binary (tried: ${alternatives.mkString(", ")}).")
- }
- kubectlBin
- }
- protected val kubectlCmd = Seq(findKubectlCmd)
-
def run(name: String,
image: String,
memory: ByteSize = 256.MB,
@@ -124,7 +117,7 @@ class KubernetesClient(
case (key, value) => new EnvVarBuilder().withName(key).withValue(value).build()
}.toSeq
- val pod = new PodBuilder()
+ val podBuilder = new PodBuilder()
.withNewMetadata()
.withName(name)
.addToLabels("name", name)
@@ -132,6 +125,23 @@ class KubernetesClient(
.endMetadata()
.withNewSpec()
.withRestartPolicy("Always")
+ if (config.userPodNodeAffinity.enabled) {
+ val invokerNodeAffinity = new AffinityBuilder()
+ .withNewNodeAffinity()
+ .withNewRequiredDuringSchedulingIgnoredDuringExecution()
+ .addNewNodeSelectorTerm()
+ .addNewMatchExpression()
+ .withKey(config.userPodNodeAffinity.key)
+ .withOperator("In")
+ .withValues(config.userPodNodeAffinity.value)
+ .endMatchExpression()
+ .endNodeSelectorTerm()
+ .endRequiredDuringSchedulingIgnoredDuringExecution()
+ .endNodeAffinity()
+ .build()
+ podBuilder.withAffinity(invokerNodeAffinity)
+ }
+ val pod = podBuilder
.addNewContainer()
.withNewResources()
.withLimits(Map("memory" -> new Quantity(memory.toMB + "Mi")).asJava)
@@ -166,11 +176,27 @@ class KubernetesClient(
}
def rm(container: KubernetesContainer)(implicit transid: TransactionId): Future[Unit] = {
- runCmd(Seq("delete", "--now", "pod", container.id.asString), config.timeouts.rm).map(_ => ())
+ Future {
+ blocking {
+ kubeRestClient
+ .inNamespace(kubeRestClient.getNamespace)
+ .pods()
+ .withName(container.id.asString)
+ .delete()
+ }
+ }.map(_ => ())
}
def rm(key: String, value: String, ensureUnpaused: Boolean = false)(implicit transid: TransactionId): Future[Unit] = {
- runCmd(Seq("delete", "--now", "pod", "-l", s"$key=$value"), config.timeouts.rm).map(_ => ())
+ Future {
+ blocking {
+ kubeRestClient
+ .inNamespace(kubeRestClient.getNamespace)
+ .pods()
+ .withLabel(key, value)
+ .delete()
+ }
+ }.map(_ => ())
}
// suspend is a no-op with the basic KubernetesClient
@@ -200,19 +226,6 @@ class KubernetesClient(
implicit val kubernetes = this
new KubernetesContainer(id, addr, workerIP, nativeContainerId)
}
-
- protected def runCmd(args: Seq[String], timeout: Duration)(implicit transid: TransactionId): Future[String] = {
- val cmd = kubectlCmd ++ args
- val start = transid.started(
- this,
- LoggingMarkers.INVOKER_KUBECTL_CMD(args.head),
- s"running ${cmd.mkString(" ")} (timeout: $timeout)",
- logLevel = InfoLevel)
- executeProcess(cmd, timeout).andThen {
- case Success(_) => transid.finished(this, start)
- case Failure(t) => transid.failed(this, start, t.getMessage, ErrorLevel)
- }
- }
}
object KubernetesClient {
diff --git a/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesClientTests.scala b/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesClientTests.scala
index 1f584d7..e653b41 100644
--- a/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesClientTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesClientTests.scala
@@ -76,13 +76,9 @@ class KubernetesClientTests
val id = ContainerId("55db56ee082239428b27d3728b4dd324c09068458aad9825727d5bfc1bba6d52")
val container = kubernetesContainer(id)
- val kubectlCommand = "kubectl"
-
/** Returns a KubernetesClient with a mocked result for 'executeProcess' */
def kubernetesClient(fixture: => Future[String]) = {
new KubernetesClient()(global) {
- override def findKubectlCmd() = kubectlCommand
-
override def executeProcess(args: Seq[String], timeout: Duration)(implicit ec: ExecutionContext,
as: ActorSystem) =
fixture