You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by mc...@apache.org on 2020/08/21 16:15:42 UTC
[openwhisk] branch master updated: Label pods before they are
initialized (#4927)
This is an automated email from the ASF dual-hosted git repository.
mcdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new df2c499 Label pods before they are initialized (#4927)
df2c499 is described below
commit df2c49970e804dfa64b55d54cebf85f394d934ea
Author: dan mcweeney <mc...@adobe.com>
AuthorDate: Fri Aug 21 12:15:24 2020 -0400
Label pods before they are initialized (#4927)
---
.../openwhisk/core/containerpool/Container.scala | 8 +++++---
.../core/containerpool/ContainerProxy.scala | 3 ++-
.../kubernetes/KubernetesClient.scala | 22 +++++++++++++++++++++-
.../kubernetes/KubernetesContainer.scala | 17 ++++++++++++++++-
.../kubernetes/test/KubernetesClientTests.scala | 4 ++++
.../containerpool/test/ContainerProxyTests.scala | 12 ++++++++----
6 files changed, 56 insertions(+), 10 deletions(-)
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/Container.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/Container.scala
index 6717198..e3b5b5e 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/Container.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/Container.scala
@@ -30,7 +30,7 @@ import spray.json.JsObject
import org.apache.openwhisk.common.{Logging, LoggingMarkers, TransactionId}
import org.apache.openwhisk.core.ConfigKeys
import org.apache.openwhisk.core.entity.ActivationResponse.{ContainerConnectionError, ContainerResponse}
-import org.apache.openwhisk.core.entity.{ActivationEntityLimit, ActivationResponse, ByteSize}
+import org.apache.openwhisk.core.entity.{ActivationEntityLimit, ActivationResponse, ByteSize, WhiskAction}
import org.apache.openwhisk.core.entity.size._
import org.apache.openwhisk.http.Messages
@@ -110,8 +110,10 @@ trait Container {
}
/** Initializes code in the container. */
- def initialize(initializer: JsObject, timeout: FiniteDuration, maxConcurrent: Int)(
- implicit transid: TransactionId): Future[Interval] = {
+ def initialize(initializer: JsObject,
+ timeout: FiniteDuration,
+ maxConcurrent: Int,
+ entity: Option[WhiskAction] = None)(implicit transid: TransactionId): Future[Interval] = {
val start = transid.started(
this,
LoggingMarkers.INVOKER_ACTIVATION_INIT,
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
index cf1dad6..b1625a3 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
@@ -778,7 +778,8 @@ class ContainerProxy(factory: (TransactionId,
.initialize(
job.action.containerInitializer(env ++ owEnv),
actionTimeout,
- job.action.limits.concurrency.maxConcurrent)
+ job.action.limits.concurrency.maxConcurrent,
+ Some(job.action.toWhiskAction))
.map(Some(_))
}
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesClient.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesClient.scala
index 87e650f..57b14be 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesClient.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesClient.scala
@@ -201,6 +201,7 @@ class KubernetesClient(
def rm(container: KubernetesContainer)(implicit transid: TransactionId): Future[Unit] = {
deleteByName(container.id.asString)
}
+
def rm(podName: String)(implicit transid: TransactionId): Future[Unit] = {
deleteByName(podName)
}
@@ -235,6 +236,7 @@ class KubernetesClient(
ErrorLevel)
}
}
+
private def deleteByName(podName: String)(implicit transid: TransactionId) = {
val start = transid.started(
this,
@@ -265,6 +267,7 @@ class KubernetesClient(
ErrorLevel)
}
}
+
// suspend is a no-op with the basic KubernetesClient
def suspend(container: KubernetesContainer)(implicit transid: TransactionId): Future[Unit] = Future.successful({})
@@ -299,6 +302,7 @@ class KubernetesClient(
implicit val kubernetes = this
new KubernetesContainer(id, addr, workerIP, nativeContainerId, portFwd)
}
+
// check for ready status every 1 second until timeout (minus the start time, which is the time for the pod create call) has past
private def waitForPod(namespace: String,
pod: Pod,
@@ -322,8 +326,22 @@ class KubernetesClient(
Future.successful(readyPod.get())
}
}
-}
+ def addLabel(container: KubernetesContainer, labels: Map[String, String]): Future[Unit] =
+ try {
+ kubeRestClient
+ .pods()
+ .withName(container.id.asString)
+ .edit()
+ .editMetadata()
+ .addToLabels(labels.asJava)
+ .endMetadata()
+ .done()
+ Future.successful({})
+ } catch {
+ case e: Throwable => Future.failed(e)
+ }
+}
object KubernetesClient {
// Necessary, as Kubernetes uses nanosecond precision in logs, but java.time.Instant toString uses milliseconds
@@ -363,6 +381,8 @@ trait KubernetesApi {
def logs(container: KubernetesContainer, sinceTime: Option[Instant], waitForSentinel: Boolean = false)(
implicit transid: TransactionId): Source[TypedLogLine, Any]
+
+ def addLabel(container: KubernetesContainer, labels: Map[String, String]): Future[Unit]
}
object KubernetesRestLogSourceStage {
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesContainer.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesContainer.scala
index 8c90e29..0e23470 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesContainer.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesContainer.scala
@@ -34,9 +34,10 @@ import org.apache.openwhisk.common.Logging
import org.apache.openwhisk.common.TransactionId
import org.apache.openwhisk.core.containerpool._
import org.apache.openwhisk.core.containerpool.docker.{CompleteAfterOccurrences, OccurrencesNotFoundException}
-import org.apache.openwhisk.core.entity.ByteSize
+import org.apache.openwhisk.core.entity.{ByteSize, WhiskAction}
import org.apache.openwhisk.core.entity.size._
import org.apache.openwhisk.http.Messages
+import spray.json.JsObject
import scala.util.Failure
@@ -129,6 +130,20 @@ class KubernetesContainer(protected[core] val id: ContainerId,
kubernetes.rm(this)
}
+ override def initialize(initializer: JsObject,
+ timeout: FiniteDuration,
+ maxConcurrent: Int,
+ entity: Option[WhiskAction] = None)(implicit transid: TransactionId): Future[Interval] = {
+ entity match {
+ case Some(e) => {
+ kubernetes
+ .addLabel(this, Map("openwhisk/action" -> e.name.toString, "openwhisk/namespace" -> e.namespace.toString))
+ .map(return super.initialize(initializer, timeout, maxConcurrent, entity))
+ }
+ case None => super.initialize(initializer, timeout, maxConcurrent, entity)
+ }
+ }
+
def logs(limit: ByteSize, waitForSentinel: Boolean)(implicit transid: TransactionId): Source[ByteString, Any] = {
kubernetes
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/kubernetes/test/KubernetesClientTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/kubernetes/test/KubernetesClientTests.scala
index 942f903..093974a 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/kubernetes/test/KubernetesClientTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/kubernetes/test/KubernetesClientTests.scala
@@ -233,5 +233,9 @@ object KubernetesClientTests {
logCalls += ((container.id, sinceTime))
Source(List.empty[TypedLogLine])
}
+
+ override def addLabel(container: KubernetesContainer, labels: Map[String, String]): Future[Unit] = {
+ Future.successful({})
+ }
}
}
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
index f64a0f8..a63e108 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
@@ -1405,7 +1405,8 @@ class ContainerProxyTests
val container = new TestContainer {
override def initialize(initializer: JsObject,
timeout: FiniteDuration,
- concurrent: Int)(implicit transid: TransactionId): Future[Interval] = {
+ concurrent: Int,
+ entity: Option[WhiskAction] = None)(implicit transid: TransactionId): Future[Interval] = {
initializeCount += 1
Future.failed(InitializationError(initInterval, ActivationResponse.developerError("boom")))
}
@@ -1806,7 +1807,8 @@ class ContainerProxyTests
val container = new TestContainer {
override def initialize(initializer: JsObject,
timeout: FiniteDuration,
- concurrent: Int)(implicit transid: TransactionId): Future[Interval] = {
+ concurrent: Int,
+ entity: Option[WhiskAction] = None)(implicit transid: TransactionId): Future[Interval] = {
initializeCount += 1
initPromise.future
}
@@ -2055,8 +2057,10 @@ class ContainerProxyTests
destroyCount += 1
super.destroy()
}
- override def initialize(initializer: JsObject, timeout: FiniteDuration, concurrent: Int)(
- implicit transid: TransactionId): Future[Interval] = {
+ override def initialize(initializer: JsObject,
+ timeout: FiniteDuration,
+ concurrent: Int,
+ entity: Option[WhiskAction] = None)(implicit transid: TransactionId): Future[Interval] = {
initializeCount += 1
val envField = "env"