You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@openwhisk.apache.org by GitBox <gi...@apache.org> on 2018/08/27 13:40:42 UTC

[GitHub] rabbah closed pull request #3909: Refactor sentinel message into shared place.

rabbah closed pull request #3909: Refactor sentinel message into shared place.
URL: https://github.com/apache/incubator-openwhisk/pull/3909
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/common/scala/src/main/scala/whisk/core/containerpool/Container.scala b/common/scala/src/main/scala/whisk/core/containerpool/Container.scala
index ff0124e8d8..dc59b3ce18 100644
--- a/common/scala/src/main/scala/whisk/core/containerpool/Container.scala
+++ b/common/scala/src/main/scala/whisk/core/containerpool/Container.scala
@@ -42,13 +42,24 @@ import scala.util.{Failure, Success}
  * for different container providers, but the implementation also needs to include
  * OpenWhisk specific behavior, especially for initialize and run.
  */
-case class ContainerId(val asString: String) {
+case class ContainerId(asString: String) {
   require(asString.nonEmpty, "ContainerId must not be empty")
 }
-case class ContainerAddress(val host: String, val port: Int = 8080) {
+case class ContainerAddress(host: String, port: Int = 8080) {
   require(host.nonEmpty, "ContainerIp must not be empty")
 }
 
+object Container {
+
+  /**
+   * The action proxies insert this line in the logs at the end of each activation for stdout/stderr.
+   *
+   * Note: Blackbox containers might not add this sentinel, as we cannot be sure the action developer actually does this.
+   */
+  val ACTIVATION_LOG_SENTINEL = "XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX"
+
+}
+
 trait Container {
 
   implicit protected val as: ActorSystem
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala
index 99ff3c3a2d..fddabd66ae 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala
@@ -42,12 +42,7 @@ import whisk.http.Messages
 
 object DockerContainer {
 
-  /**
-   * The action proxies insert this line in the logs at the end of each activation for stdout/stderr
-   *
-   * Note: Blackbox containers might not add this sentinel, as we cannot be sure the action developer actually does this.
-   */
-  val ActivationSentinel = ByteString("XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX")
+  private val byteStringSentinel = ByteString(Container.ACTIVATION_LOG_SENTINEL)
 
   /**
    * Creates a container running on a docker daemon.
@@ -272,7 +267,7 @@ class DockerContainer(protected val id: ContainerId,
         logFileOffset.addAndGet(size)
         size
       }
-      .via(new CompleteAfterOccurrences(_.containsSlice(DockerContainer.ActivationSentinel), 2, waitForSentinel))
+      .via(new CompleteAfterOccurrences(_.containsSlice(DockerContainer.byteStringSentinel), 2, waitForSentinel))
       // As we're reading the logs after the activation has finished the invariant is that all loglines are already
       // written and we mostly await them being flushed by the docker daemon. Therefore we can timeout based on the time
       // between two loglines appear without relying on the log frequency in the action itself.
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainer.scala b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainer.scala
index 56e8f10d03..0e32be46af 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainer.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainer.scala
@@ -20,20 +20,19 @@ package whisk.core.containerpool.kubernetes
 import akka.actor.ActorSystem
 import java.time.Instant
 import java.util.concurrent.atomic.AtomicReference
+
 import akka.stream.StreamLimitReachedException
 import akka.stream.scaladsl.Framing.FramingException
 import akka.stream.scaladsl.Source
 import akka.util.ByteString
+
 import scala.concurrent.ExecutionContext
 import scala.concurrent.Future
 import scala.concurrent.duration._
 import whisk.common.Logging
 import whisk.common.TransactionId
-import whisk.core.containerpool.Container
-import whisk.core.containerpool.WhiskContainerStartupError
-import whisk.core.containerpool.ContainerId
-import whisk.core.containerpool.ContainerAddress
-import whisk.core.containerpool.docker.{CompleteAfterOccurrences, DockerContainer, OccurrencesNotFoundException}
+import whisk.core.containerpool._
+import whisk.core.containerpool.docker.{CompleteAfterOccurrences, OccurrencesNotFoundException}
 import whisk.core.entity.ByteSize
 import whisk.core.entity.size._
 import whisk.http.Messages
@@ -108,8 +107,6 @@ class KubernetesContainer(protected[core] val id: ContainerId,
     kubernetes.rm(this)
   }
 
-  private val stringSentinel = DockerContainer.ActivationSentinel.utf8String
-
   def logs(limit: ByteSize, waitForSentinel: Boolean)(implicit transid: TransactionId): Source[ByteString, Any] = {
 
     kubernetes
@@ -122,7 +119,7 @@ class KubernetesContainer(protected[core] val id: ContainerId,
         lastTimestamp.set(Option(line.time))
         line
       }
-      .via(new CompleteAfterOccurrences(_.log == stringSentinel, 2, waitForSentinel))
+      .via(new CompleteAfterOccurrences(_.log == Container.ACTIVATION_LOG_SENTINEL, 2, waitForSentinel))
       .recover {
         case _: StreamLimitReachedException =>
           // While the stream has already ended by failing the limitWeighted stage above, we inject a truncation
diff --git a/tests/src/test/scala/actionContainers/ActionContainer.scala b/tests/src/test/scala/actionContainers/ActionContainer.scala
index 213c74e1f8..153b6d2e72 100644
--- a/tests/src/test/scala/actionContainers/ActionContainer.scala
+++ b/tests/src/test/scala/actionContainers/ActionContainer.scala
@@ -35,6 +35,7 @@ import scala.util.{Failure, Success}
 import org.apache.commons.lang3.StringUtils
 import org.scalatest.{FlatSpec, Matchers}
 import akka.actor.ActorSystem
+
 import scala.concurrent.ExecutionContext
 import spray.json._
 import common.StreamLogging
@@ -42,6 +43,7 @@ import whisk.common.Logging
 import whisk.common.TransactionId
 import whisk.core.entity.Exec
 import common.WhiskProperties
+import whisk.core.containerpool.Container
 
 /**
  * For testing convenience, this interface abstracts away the REST calls to a
@@ -167,8 +169,8 @@ object ActionContainer {
     Await.result(proc(docker(cmd)), t)
   }
 
-  // Filters out the sentinel markers inserted by the container (see relevant private code in Invoker.scala)
-  val sentinel = "XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX"
+  // Filters out the sentinel markers inserted by the container (see relevant private code in Invoker)
+  val sentinel = Container.ACTIVATION_LOG_SENTINEL
   def filterSentinel(str: String): String = str.replaceAll(sentinel, "").trim
 
   def withContainer(imageName: String, environment: Map[String, String] = Map.empty)(
diff --git a/tests/src/test/scala/system/basic/WskRestBasicTests.scala b/tests/src/test/scala/system/basic/WskRestBasicTests.scala
index 01fed1a44e..33cbdbdc20 100644
--- a/tests/src/test/scala/system/basic/WskRestBasicTests.scala
+++ b/tests/src/test/scala/system/basic/WskRestBasicTests.scala
@@ -32,6 +32,7 @@ import common.rest.WskRestOperations
 import common.rest.RestResult
 import spray.json._
 import spray.json.DefaultJsonProtocol._
+import whisk.core.containerpool.Container
 import whisk.http.Messages
 
 @RunWith(classOf[JUnitRunner])
@@ -299,7 +300,7 @@ class WskRestBasicTests extends TestHelpers with WskTestHelpers with WskActorSys
         activation.logs shouldBe defined
         val logs = activation.logs.get.toString
         logs should include("This is an example zip used with the docker skeleton action.")
-        logs should not include ("XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX")
+        logs should not include Container.ACTIVATION_LOG_SENTINEL
       }
   }
 
diff --git a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala
index 1f2ab3245c..eb3b2ffed7 100644
--- a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala
@@ -63,8 +63,8 @@ object DockerContainerTests {
     val appendedLog = if (appendSentinel) {
       val lastTime = log.lastOption.map { case LogLine(time, _, _) => time }.getOrElse(Instant.EPOCH.toString)
       log :+
-        LogLine(lastTime, "stderr", s"${DockerContainer.ActivationSentinel.utf8String}\n") :+
-        LogLine(lastTime, "stdout", s"${DockerContainer.ActivationSentinel.utf8String}\n")
+        LogLine(lastTime, "stderr", s"${Container.ACTIVATION_LOG_SENTINEL}\n") :+
+        LogLine(lastTime, "stdout", s"${Container.ACTIVATION_LOG_SENTINEL}\n")
     } else {
       log
     }
diff --git a/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesClientTests.scala b/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesClientTests.scala
index e653b4117c..3336e831b1 100644
--- a/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesClientTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesClientTests.scala
@@ -44,6 +44,7 @@ import whisk.core.containerpool.{ContainerAddress, ContainerId}
 import whisk.core.containerpool.kubernetes._
 import whisk.core.entity.ByteSize
 import whisk.core.entity.size._
+import whisk.core.containerpool.Container.ACTIVATION_LOG_SENTINEL
 
 import scala.collection.mutable
 import scala.collection.immutable
@@ -92,13 +93,13 @@ class KubernetesClientTests
 
   behavior of "KubernetesClient"
 
-  val firstLog = """2018-02-06T00:00:18.419889342Z first activation
-                   |2018-02-06T00:00:18.419929471Z XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX
-                   |2018-02-06T00:00:18.419988733Z XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX
+  val firstLog = s"""2018-02-06T00:00:18.419889342Z first activation
+                   |2018-02-06T00:00:18.419929471Z $ACTIVATION_LOG_SENTINEL
+                   |2018-02-06T00:00:18.419988733Z $ACTIVATION_LOG_SENTINEL
                    |""".stripMargin
-  val secondLog = """2018-02-06T00:09:35.38267193Z second activation
-                    |2018-02-06T00:09:35.382990278Z XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX
-                    |2018-02-06T00:09:35.383116503Z XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX
+  val secondLog = s"""2018-02-06T00:09:35.38267193Z second activation
+                    |2018-02-06T00:09:35.382990278Z $ACTIVATION_LOG_SENTINEL
+                    |2018-02-06T00:09:35.383116503Z $ACTIVATION_LOG_SENTINEL
                     |""".stripMargin
 
   def firstSource(lastTimestamp: Option[Instant] = None): Source[TypedLogLine, Any] =
@@ -141,7 +142,7 @@ class KubernetesClientTests
     val logs = awaitLogs(client.logs(container, None))
     logs should have size 3
     logs(0) shouldBe TypedLogLine("2018-02-06T00:00:18.419889342Z", "stdout", "first activation")
-    logs(2) shouldBe TypedLogLine("2018-02-06T00:00:18.419988733Z", "stdout", "XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX")
+    logs(2) shouldBe TypedLogLine("2018-02-06T00:00:18.419988733Z", "stdout", ACTIVATION_LOG_SENTINEL)
   }
 
   it should "return all logs after the one matching sinceTime" in {
@@ -156,7 +157,7 @@ class KubernetesClientTests
     val logs = awaitLogs(client.logs(container, testDate))
     logs should have size 3
     logs(0) shouldBe TypedLogLine("2018-02-06T00:09:35.38267193Z", "stdout", "second activation")
-    logs(2) shouldBe TypedLogLine("2018-02-06T00:09:35.383116503Z", "stdout", "XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX")
+    logs(2) shouldBe TypedLogLine("2018-02-06T00:09:35.383116503Z", "stdout", ACTIVATION_LOG_SENTINEL)
   }
 
   it should "return all logs if none match sinceTime" in {
@@ -170,7 +171,7 @@ class KubernetesClientTests
     val logs = awaitLogs(client.logs(container, testDate))
     logs should have size 3
     logs(0) shouldBe TypedLogLine("2018-02-06T00:09:35.38267193Z", "stdout", "second activation")
-    logs(2) shouldBe TypedLogLine("2018-02-06T00:09:35.383116503Z", "stdout", "XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX")
+    logs(2) shouldBe TypedLogLine("2018-02-06T00:09:35.383116503Z", "stdout", ACTIVATION_LOG_SENTINEL)
   }
 
 }
diff --git a/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala b/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala
index 2354bc9d18..4214cf7ca6 100644
--- a/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala
@@ -500,8 +500,8 @@ object KubernetesContainerTests {
     if (appendSentinel) {
       val lastTime = log.lastOption.map { case TypedLogLine(time, _, _) => time }.getOrElse(Instant.EPOCH)
       log :+
-        TypedLogLine(lastTime, "stderr", s"${DockerContainer.ActivationSentinel.utf8String}") :+
-        TypedLogLine(lastTime, "stdout", s"${DockerContainer.ActivationSentinel.utf8String}")
+        TypedLogLine(lastTime, "stderr", s"${Container.ACTIVATION_LOG_SENTINEL}") :+
+        TypedLogLine(lastTime, "stdout", s"${Container.ACTIVATION_LOG_SENTINEL}")
     } else {
       log
     }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services