You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ra...@apache.org on 2018/08/27 13:40:49 UTC

[incubator-openwhisk] branch master updated: Refactor sentinel message into shared place. (#3909)

This is an automated email from the ASF dual-hosted git repository.

rabbah pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 3a0f248  Refactor sentinel message into shared place. (#3909)
3a0f248 is described below

commit 3a0f2488c0ff5111b39b265db73f473f2f3fdc67
Author: Tzu-Chiao Yeh <su...@gmail.com>
AuthorDate: Mon Aug 27 21:40:40 2018 +0800

    Refactor sentinel message into shared place. (#3909)
    
    Signed-off-by: Tzu-Chiao Yeh <su...@gmail.com>
---
 .../scala/whisk/core/containerpool/Container.scala    | 15 +++++++++++++--
 .../core/containerpool/docker/DockerContainer.scala   |  9 ++-------
 .../kubernetes/KubernetesContainer.scala              | 13 +++++--------
 .../test/scala/actionContainers/ActionContainer.scala |  6 ++++--
 .../test/scala/system/basic/WskRestBasicTests.scala   |  3 ++-
 .../docker/test/DockerContainerTests.scala            |  4 ++--
 .../kubernetes/test/KubernetesClientTests.scala       | 19 ++++++++++---------
 .../kubernetes/test/KubernetesContainerTests.scala    |  4 ++--
 8 files changed, 40 insertions(+), 33 deletions(-)

diff --git a/common/scala/src/main/scala/whisk/core/containerpool/Container.scala b/common/scala/src/main/scala/whisk/core/containerpool/Container.scala
index ff0124e..dc59b3c 100644
--- a/common/scala/src/main/scala/whisk/core/containerpool/Container.scala
+++ b/common/scala/src/main/scala/whisk/core/containerpool/Container.scala
@@ -42,13 +42,24 @@ import scala.util.{Failure, Success}
  * for different container providers, but the implementation also needs to include
  * OpenWhisk specific behavior, especially for initialize and run.
  */
-case class ContainerId(val asString: String) {
+case class ContainerId(asString: String) {
   require(asString.nonEmpty, "ContainerId must not be empty")
 }
-case class ContainerAddress(val host: String, val port: Int = 8080) {
+case class ContainerAddress(host: String, port: Int = 8080) {
   require(host.nonEmpty, "ContainerIp must not be empty")
 }
 
+object Container {
+
+  /**
+   * The action proxies insert this line in the logs at the end of each activation for stdout/stderr.
+   *
+   * Note: Blackbox containers might not add this sentinel, as we cannot be sure the action developer actually does this.
+   */
+  val ACTIVATION_LOG_SENTINEL = "XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX"
+
+}
+
 trait Container {
 
   implicit protected val as: ActorSystem
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala
index 99ff3c3..fddabd6 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala
@@ -42,12 +42,7 @@ import whisk.http.Messages
 
 object DockerContainer {
 
-  /**
-   * The action proxies insert this line in the logs at the end of each activation for stdout/stderr
-   *
-   * Note: Blackbox containers might not add this sentinel, as we cannot be sure the action developer actually does this.
-   */
-  val ActivationSentinel = ByteString("XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX")
+  private val byteStringSentinel = ByteString(Container.ACTIVATION_LOG_SENTINEL)
 
   /**
    * Creates a container running on a docker daemon.
@@ -272,7 +267,7 @@ class DockerContainer(protected val id: ContainerId,
         logFileOffset.addAndGet(size)
         size
       }
-      .via(new CompleteAfterOccurrences(_.containsSlice(DockerContainer.ActivationSentinel), 2, waitForSentinel))
+      .via(new CompleteAfterOccurrences(_.containsSlice(DockerContainer.byteStringSentinel), 2, waitForSentinel))
       // As we're reading the logs after the activation has finished the invariant is that all loglines are already
       // written and we mostly await them being flushed by the docker daemon. Therefore we can timeout based on the time
       // between two loglines appear without relying on the log frequency in the action itself.
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainer.scala b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainer.scala
index ca91bf4..7c86c11 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainer.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainer.scala
@@ -20,20 +20,19 @@ package whisk.core.containerpool.kubernetes
 import akka.actor.ActorSystem
 import java.time.Instant
 import java.util.concurrent.atomic.AtomicReference
+
 import akka.stream.StreamLimitReachedException
 import akka.stream.scaladsl.Framing.FramingException
 import akka.stream.scaladsl.Source
 import akka.util.ByteString
+
 import scala.concurrent.ExecutionContext
 import scala.concurrent.Future
 import scala.concurrent.duration._
 import whisk.common.Logging
 import whisk.common.TransactionId
-import whisk.core.containerpool.Container
-import whisk.core.containerpool.WhiskContainerStartupError
-import whisk.core.containerpool.ContainerId
-import whisk.core.containerpool.ContainerAddress
-import whisk.core.containerpool.docker.{CompleteAfterOccurrences, DockerContainer, OccurrencesNotFoundException}
+import whisk.core.containerpool._
+import whisk.core.containerpool.docker.{CompleteAfterOccurrences, OccurrencesNotFoundException}
 import whisk.core.entity.ByteSize
 import whisk.core.entity.size._
 import whisk.http.Messages
@@ -110,8 +109,6 @@ class KubernetesContainer(protected[core] val id: ContainerId,
     kubernetes.rm(this)
   }
 
-  private val stringSentinel = DockerContainer.ActivationSentinel.utf8String
-
   def logs(limit: ByteSize, waitForSentinel: Boolean)(implicit transid: TransactionId): Source[ByteString, Any] = {
 
     kubernetes
@@ -124,7 +121,7 @@ class KubernetesContainer(protected[core] val id: ContainerId,
         lastTimestamp.set(Option(line.time))
         line
       }
-      .via(new CompleteAfterOccurrences(_.log == stringSentinel, 2, waitForSentinel))
+      .via(new CompleteAfterOccurrences(_.log == Container.ACTIVATION_LOG_SENTINEL, 2, waitForSentinel))
       .recover {
         case _: StreamLimitReachedException =>
           // While the stream has already ended by failing the limitWeighted stage above, we inject a truncation
diff --git a/tests/src/test/scala/actionContainers/ActionContainer.scala b/tests/src/test/scala/actionContainers/ActionContainer.scala
index 213c74e..153b6d2 100644
--- a/tests/src/test/scala/actionContainers/ActionContainer.scala
+++ b/tests/src/test/scala/actionContainers/ActionContainer.scala
@@ -35,6 +35,7 @@ import scala.util.{Failure, Success}
 import org.apache.commons.lang3.StringUtils
 import org.scalatest.{FlatSpec, Matchers}
 import akka.actor.ActorSystem
+
 import scala.concurrent.ExecutionContext
 import spray.json._
 import common.StreamLogging
@@ -42,6 +43,7 @@ import whisk.common.Logging
 import whisk.common.TransactionId
 import whisk.core.entity.Exec
 import common.WhiskProperties
+import whisk.core.containerpool.Container
 
 /**
  * For testing convenience, this interface abstracts away the REST calls to a
@@ -167,8 +169,8 @@ object ActionContainer {
     Await.result(proc(docker(cmd)), t)
   }
 
-  // Filters out the sentinel markers inserted by the container (see relevant private code in Invoker.scala)
-  val sentinel = "XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX"
+  // Filters out the sentinel markers inserted by the container (see relevant private code in Invoker)
+  val sentinel = Container.ACTIVATION_LOG_SENTINEL
   def filterSentinel(str: String): String = str.replaceAll(sentinel, "").trim
 
   def withContainer(imageName: String, environment: Map[String, String] = Map.empty)(
diff --git a/tests/src/test/scala/system/basic/WskRestBasicTests.scala b/tests/src/test/scala/system/basic/WskRestBasicTests.scala
index 01fed1a..33cbdbd 100644
--- a/tests/src/test/scala/system/basic/WskRestBasicTests.scala
+++ b/tests/src/test/scala/system/basic/WskRestBasicTests.scala
@@ -32,6 +32,7 @@ import common.rest.WskRestOperations
 import common.rest.RestResult
 import spray.json._
 import spray.json.DefaultJsonProtocol._
+import whisk.core.containerpool.Container
 import whisk.http.Messages
 
 @RunWith(classOf[JUnitRunner])
@@ -299,7 +300,7 @@ class WskRestBasicTests extends TestHelpers with WskTestHelpers with WskActorSys
         activation.logs shouldBe defined
         val logs = activation.logs.get.toString
         logs should include("This is an example zip used with the docker skeleton action.")
-        logs should not include ("XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX")
+        logs should not include Container.ACTIVATION_LOG_SENTINEL
       }
   }
 
diff --git a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala
index 1f2ab32..eb3b2ff 100644
--- a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala
@@ -63,8 +63,8 @@ object DockerContainerTests {
     val appendedLog = if (appendSentinel) {
       val lastTime = log.lastOption.map { case LogLine(time, _, _) => time }.getOrElse(Instant.EPOCH.toString)
       log :+
-        LogLine(lastTime, "stderr", s"${DockerContainer.ActivationSentinel.utf8String}\n") :+
-        LogLine(lastTime, "stdout", s"${DockerContainer.ActivationSentinel.utf8String}\n")
+        LogLine(lastTime, "stderr", s"${Container.ACTIVATION_LOG_SENTINEL}\n") :+
+        LogLine(lastTime, "stdout", s"${Container.ACTIVATION_LOG_SENTINEL}\n")
     } else {
       log
     }
diff --git a/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesClientTests.scala b/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesClientTests.scala
index e653b41..3336e83 100644
--- a/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesClientTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesClientTests.scala
@@ -44,6 +44,7 @@ import whisk.core.containerpool.{ContainerAddress, ContainerId}
 import whisk.core.containerpool.kubernetes._
 import whisk.core.entity.ByteSize
 import whisk.core.entity.size._
+import whisk.core.containerpool.Container.ACTIVATION_LOG_SENTINEL
 
 import scala.collection.mutable
 import scala.collection.immutable
@@ -92,13 +93,13 @@ class KubernetesClientTests
 
   behavior of "KubernetesClient"
 
-  val firstLog = """2018-02-06T00:00:18.419889342Z first activation
-                   |2018-02-06T00:00:18.419929471Z XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX
-                   |2018-02-06T00:00:18.419988733Z XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX
+  val firstLog = s"""2018-02-06T00:00:18.419889342Z first activation
+                   |2018-02-06T00:00:18.419929471Z $ACTIVATION_LOG_SENTINEL
+                   |2018-02-06T00:00:18.419988733Z $ACTIVATION_LOG_SENTINEL
                    |""".stripMargin
-  val secondLog = """2018-02-06T00:09:35.38267193Z second activation
-                    |2018-02-06T00:09:35.382990278Z XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX
-                    |2018-02-06T00:09:35.383116503Z XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX
+  val secondLog = s"""2018-02-06T00:09:35.38267193Z second activation
+                    |2018-02-06T00:09:35.382990278Z $ACTIVATION_LOG_SENTINEL
+                    |2018-02-06T00:09:35.383116503Z $ACTIVATION_LOG_SENTINEL
                     |""".stripMargin
 
   def firstSource(lastTimestamp: Option[Instant] = None): Source[TypedLogLine, Any] =
@@ -141,7 +142,7 @@ class KubernetesClientTests
     val logs = awaitLogs(client.logs(container, None))
     logs should have size 3
     logs(0) shouldBe TypedLogLine("2018-02-06T00:00:18.419889342Z", "stdout", "first activation")
-    logs(2) shouldBe TypedLogLine("2018-02-06T00:00:18.419988733Z", "stdout", "XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX")
+    logs(2) shouldBe TypedLogLine("2018-02-06T00:00:18.419988733Z", "stdout", ACTIVATION_LOG_SENTINEL)
   }
 
   it should "return all logs after the one matching sinceTime" in {
@@ -156,7 +157,7 @@ class KubernetesClientTests
     val logs = awaitLogs(client.logs(container, testDate))
     logs should have size 3
     logs(0) shouldBe TypedLogLine("2018-02-06T00:09:35.38267193Z", "stdout", "second activation")
-    logs(2) shouldBe TypedLogLine("2018-02-06T00:09:35.383116503Z", "stdout", "XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX")
+    logs(2) shouldBe TypedLogLine("2018-02-06T00:09:35.383116503Z", "stdout", ACTIVATION_LOG_SENTINEL)
   }
 
   it should "return all logs if none match sinceTime" in {
@@ -170,7 +171,7 @@ class KubernetesClientTests
     val logs = awaitLogs(client.logs(container, testDate))
     logs should have size 3
     logs(0) shouldBe TypedLogLine("2018-02-06T00:09:35.38267193Z", "stdout", "second activation")
-    logs(2) shouldBe TypedLogLine("2018-02-06T00:09:35.383116503Z", "stdout", "XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX")
+    logs(2) shouldBe TypedLogLine("2018-02-06T00:09:35.383116503Z", "stdout", ACTIVATION_LOG_SENTINEL)
   }
 
 }
diff --git a/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala b/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala
index 2354bc9..4214cf7 100644
--- a/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala
@@ -500,8 +500,8 @@ object KubernetesContainerTests {
     if (appendSentinel) {
       val lastTime = log.lastOption.map { case TypedLogLine(time, _, _) => time }.getOrElse(Instant.EPOCH)
       log :+
-        TypedLogLine(lastTime, "stderr", s"${DockerContainer.ActivationSentinel.utf8String}") :+
-        TypedLogLine(lastTime, "stdout", s"${DockerContainer.ActivationSentinel.utf8String}")
+        TypedLogLine(lastTime, "stderr", s"${Container.ACTIVATION_LOG_SENTINEL}") :+
+        TypedLogLine(lastTime, "stdout", s"${Container.ACTIVATION_LOG_SENTINEL}")
     } else {
       log
     }