You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by mc...@apache.org on 2020/08/21 16:15:42 UTC

[openwhisk] branch master updated: Label pods before they are initialized (#4927)

This is an automated email from the ASF dual-hosted git repository.

mcdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new df2c499  Label pods before they are initialized (#4927)
df2c499 is described below

commit df2c49970e804dfa64b55d54cebf85f394d934ea
Author: dan mcweeney <mc...@adobe.com>
AuthorDate: Fri Aug 21 12:15:24 2020 -0400

    Label pods before they are initialized (#4927)
---
 .../openwhisk/core/containerpool/Container.scala   |  8 +++++---
 .../core/containerpool/ContainerProxy.scala        |  3 ++-
 .../kubernetes/KubernetesClient.scala              | 22 +++++++++++++++++++++-
 .../kubernetes/KubernetesContainer.scala           | 17 ++++++++++++++++-
 .../kubernetes/test/KubernetesClientTests.scala    |  4 ++++
 .../containerpool/test/ContainerProxyTests.scala   | 12 ++++++++----
 6 files changed, 56 insertions(+), 10 deletions(-)

diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/Container.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/Container.scala
index 6717198..e3b5b5e 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/Container.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/Container.scala
@@ -30,7 +30,7 @@ import spray.json.JsObject
 import org.apache.openwhisk.common.{Logging, LoggingMarkers, TransactionId}
 import org.apache.openwhisk.core.ConfigKeys
 import org.apache.openwhisk.core.entity.ActivationResponse.{ContainerConnectionError, ContainerResponse}
-import org.apache.openwhisk.core.entity.{ActivationEntityLimit, ActivationResponse, ByteSize}
+import org.apache.openwhisk.core.entity.{ActivationEntityLimit, ActivationResponse, ByteSize, WhiskAction}
 import org.apache.openwhisk.core.entity.size._
 import org.apache.openwhisk.http.Messages
 
@@ -110,8 +110,10 @@ trait Container {
   }
 
   /** Initializes code in the container. */
-  def initialize(initializer: JsObject, timeout: FiniteDuration, maxConcurrent: Int)(
-    implicit transid: TransactionId): Future[Interval] = {
+  def initialize(initializer: JsObject,
+                 timeout: FiniteDuration,
+                 maxConcurrent: Int,
+                 entity: Option[WhiskAction] = None)(implicit transid: TransactionId): Future[Interval] = {
     val start = transid.started(
       this,
       LoggingMarkers.INVOKER_ACTIVATION_INIT,
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
index cf1dad6..b1625a3 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
@@ -778,7 +778,8 @@ class ContainerProxy(factory: (TransactionId,
           .initialize(
             job.action.containerInitializer(env ++ owEnv),
             actionTimeout,
-            job.action.limits.concurrency.maxConcurrent)
+            job.action.limits.concurrency.maxConcurrent,
+            Some(job.action.toWhiskAction))
           .map(Some(_))
     }
 
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesClient.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesClient.scala
index 87e650f..57b14be 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesClient.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesClient.scala
@@ -201,6 +201,7 @@ class KubernetesClient(
   def rm(container: KubernetesContainer)(implicit transid: TransactionId): Future[Unit] = {
     deleteByName(container.id.asString)
   }
+
   def rm(podName: String)(implicit transid: TransactionId): Future[Unit] = {
     deleteByName(podName)
   }
@@ -235,6 +236,7 @@ class KubernetesClient(
             ErrorLevel)
       }
   }
+
   private def deleteByName(podName: String)(implicit transid: TransactionId) = {
     val start = transid.started(
       this,
@@ -265,6 +267,7 @@ class KubernetesClient(
             ErrorLevel)
       }
   }
+
   // suspend is a no-op with the basic KubernetesClient
   def suspend(container: KubernetesContainer)(implicit transid: TransactionId): Future[Unit] = Future.successful({})
 
@@ -299,6 +302,7 @@ class KubernetesClient(
     implicit val kubernetes = this
     new KubernetesContainer(id, addr, workerIP, nativeContainerId, portFwd)
   }
+
   // check for ready status every 1 second until timeout (minus the start time, which is the time for the pod create call) has past
   private def waitForPod(namespace: String,
                          pod: Pod,
@@ -322,8 +326,22 @@ class KubernetesClient(
       Future.successful(readyPod.get())
     }
   }
-}
 
+  /** Adds `labels` to the pod named `container.id`; the edit runs synchronously before the Future is returned. */
+  def addLabel(container: KubernetesContainer, labels: Map[String, String]): Future[Unit] =
+    try {
+      kubeRestClient.pods().withName(container.id.asString)
+        .edit()
+        .editMetadata()
+        .addToLabels(labels.asJava)
+        .endMetadata()
+        .done()
+      Future.successful({})
+    } catch {
+      // Only non-fatal errors become a failed Future; fatal ones (e.g. OutOfMemoryError) propagate to the caller.
+      case scala.util.control.NonFatal(e) => Future.failed(e)
+    }
+}
 object KubernetesClient {
 
   // Necessary, as Kubernetes uses nanosecond precision in logs, but java.time.Instant toString uses milliseconds
@@ -363,6 +381,8 @@ trait KubernetesApi {
 
   def logs(container: KubernetesContainer, sinceTime: Option[Instant], waitForSentinel: Boolean = false)(
     implicit transid: TransactionId): Source[TypedLogLine, Any]
+
+  def addLabel(container: KubernetesContainer, labels: Map[String, String]): Future[Unit]
 }
 
 object KubernetesRestLogSourceStage {
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesContainer.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesContainer.scala
index 8c90e29..0e23470 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesContainer.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesContainer.scala
@@ -34,9 +34,10 @@ import org.apache.openwhisk.common.Logging
 import org.apache.openwhisk.common.TransactionId
 import org.apache.openwhisk.core.containerpool._
 import org.apache.openwhisk.core.containerpool.docker.{CompleteAfterOccurrences, OccurrencesNotFoundException}
-import org.apache.openwhisk.core.entity.ByteSize
+import org.apache.openwhisk.core.entity.{ByteSize, WhiskAction}
 import org.apache.openwhisk.core.entity.size._
 import org.apache.openwhisk.http.Messages
+import spray.json.JsObject
 
 import scala.util.Failure
 
@@ -129,6 +130,20 @@ class KubernetesContainer(protected[core] val id: ContainerId,
     kubernetes.rm(this)
   }
 
+  /** Labels the pod with the action's name and namespace (if given) and initializes only after labeling succeeds. */
+  override def initialize(initializer: JsObject,
+                          timeout: FiniteDuration,
+                          maxConcurrent: Int,
+                          entity: Option[WhiskAction] = None)(implicit transid: TransactionId): Future[Interval] = {
+    entity match {
+      case Some(e) =>
+        kubernetes
+          .addLabel(this, Map("openwhisk/action" -> e.name.toString, "openwhisk/namespace" -> e.namespace.toString))
+          .flatMap(_ => super.initialize(initializer, timeout, maxConcurrent, entity))
+      case None => super.initialize(initializer, timeout, maxConcurrent, entity)
+    }
+  }
+
   def logs(limit: ByteSize, waitForSentinel: Boolean)(implicit transid: TransactionId): Source[ByteString, Any] = {
 
     kubernetes
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/kubernetes/test/KubernetesClientTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/kubernetes/test/KubernetesClientTests.scala
index 942f903..093974a 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/kubernetes/test/KubernetesClientTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/kubernetes/test/KubernetesClientTests.scala
@@ -233,5 +233,9 @@ object KubernetesClientTests {
       logCalls += ((container.id, sinceTime))
       Source(List.empty[TypedLogLine])
     }
+
+    /** Test stub: labeling performs no work and always completes successfully. */
+    override def addLabel(container: KubernetesContainer, labels: Map[String, String]): Future[Unit] =
+      Future.successful(())
   }
 }
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
index f64a0f8..a63e108 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
@@ -1405,7 +1405,8 @@ class ContainerProxyTests
     val container = new TestContainer {
       override def initialize(initializer: JsObject,
                               timeout: FiniteDuration,
-                              concurrent: Int)(implicit transid: TransactionId): Future[Interval] = {
+                              concurrent: Int,
+                              entity: Option[WhiskAction] = None)(implicit transid: TransactionId): Future[Interval] = {
         initializeCount += 1
         Future.failed(InitializationError(initInterval, ActivationResponse.developerError("boom")))
       }
@@ -1806,7 +1807,8 @@ class ContainerProxyTests
     val container = new TestContainer {
       override def initialize(initializer: JsObject,
                               timeout: FiniteDuration,
-                              concurrent: Int)(implicit transid: TransactionId): Future[Interval] = {
+                              concurrent: Int,
+                              entity: Option[WhiskAction] = None)(implicit transid: TransactionId): Future[Interval] = {
         initializeCount += 1
         initPromise.future
       }
@@ -2055,8 +2057,10 @@ class ContainerProxyTests
       destroyCount += 1
       super.destroy()
     }
-    override def initialize(initializer: JsObject, timeout: FiniteDuration, concurrent: Int)(
-      implicit transid: TransactionId): Future[Interval] = {
+    override def initialize(initializer: JsObject,
+                            timeout: FiniteDuration,
+                            concurrent: Int,
+                            entity: Option[WhiskAction] = None)(implicit transid: TransactionId): Future[Interval] = {
       initializeCount += 1
 
       val envField = "env"