You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ra...@apache.org on 2018/08/27 13:40:49 UTC
[incubator-openwhisk] branch master updated: Refactor sentinel
message into shared place. (#3909)
This is an automated email from the ASF dual-hosted git repository.
rabbah pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 3a0f248 Refactor sentinel message into shared place. (#3909)
3a0f248 is described below
commit 3a0f2488c0ff5111b39b265db73f473f2f3fdc67
Author: Tzu-Chiao Yeh <su...@gmail.com>
AuthorDate: Mon Aug 27 21:40:40 2018 +0800
Refactor sentinel message into shared place. (#3909)
Signed-off-by: Tzu-Chiao Yeh <su...@gmail.com>
---
.../scala/whisk/core/containerpool/Container.scala | 15 +++++++++++++--
.../core/containerpool/docker/DockerContainer.scala | 9 ++-------
.../kubernetes/KubernetesContainer.scala | 13 +++++--------
.../test/scala/actionContainers/ActionContainer.scala | 6 ++++--
.../test/scala/system/basic/WskRestBasicTests.scala | 3 ++-
.../docker/test/DockerContainerTests.scala | 4 ++--
.../kubernetes/test/KubernetesClientTests.scala | 19 ++++++++++---------
.../kubernetes/test/KubernetesContainerTests.scala | 4 ++--
8 files changed, 40 insertions(+), 33 deletions(-)
diff --git a/common/scala/src/main/scala/whisk/core/containerpool/Container.scala b/common/scala/src/main/scala/whisk/core/containerpool/Container.scala
index ff0124e..dc59b3c 100644
--- a/common/scala/src/main/scala/whisk/core/containerpool/Container.scala
+++ b/common/scala/src/main/scala/whisk/core/containerpool/Container.scala
@@ -42,13 +42,24 @@ import scala.util.{Failure, Success}
* for different container providers, but the implementation also needs to include
* OpenWhisk specific behavior, especially for initialize and run.
*/
-case class ContainerId(val asString: String) {
+case class ContainerId(asString: String) {
require(asString.nonEmpty, "ContainerId must not be empty")
}
-case class ContainerAddress(val host: String, val port: Int = 8080) {
+case class ContainerAddress(host: String, port: Int = 8080) {
require(host.nonEmpty, "ContainerIp must not be empty")
}
+object Container {
+
+ /**
+ * The action proxies insert this line in the logs at the end of each activation for stdout/stderr.
+ *
+ * Note: Blackbox containers might not add this sentinel, as we cannot be sure the action developer actually does this.
+ */
+ val ACTIVATION_LOG_SENTINEL = "XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX"
+
+}
+
trait Container {
implicit protected val as: ActorSystem
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala
index 99ff3c3..fddabd6 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala
@@ -42,12 +42,7 @@ import whisk.http.Messages
object DockerContainer {
- /**
- * The action proxies insert this line in the logs at the end of each activation for stdout/stderr
- *
- * Note: Blackbox containers might not add this sentinel, as we cannot be sure the action developer actually does this.
- */
- val ActivationSentinel = ByteString("XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX")
+ private val byteStringSentinel = ByteString(Container.ACTIVATION_LOG_SENTINEL)
/**
* Creates a container running on a docker daemon.
@@ -272,7 +267,7 @@ class DockerContainer(protected val id: ContainerId,
logFileOffset.addAndGet(size)
size
}
- .via(new CompleteAfterOccurrences(_.containsSlice(DockerContainer.ActivationSentinel), 2, waitForSentinel))
+ .via(new CompleteAfterOccurrences(_.containsSlice(DockerContainer.byteStringSentinel), 2, waitForSentinel))
// As we're reading the logs after the activation has finished the invariant is that all loglines are already
// written and we mostly await them being flushed by the docker daemon. Therefore we can timeout based on the time
// between two loglines appear without relying on the log frequency in the action itself.
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainer.scala b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainer.scala
index ca91bf4..7c86c11 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainer.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainer.scala
@@ -20,20 +20,19 @@ package whisk.core.containerpool.kubernetes
import akka.actor.ActorSystem
import java.time.Instant
import java.util.concurrent.atomic.AtomicReference
+
import akka.stream.StreamLimitReachedException
import akka.stream.scaladsl.Framing.FramingException
import akka.stream.scaladsl.Source
import akka.util.ByteString
+
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.duration._
import whisk.common.Logging
import whisk.common.TransactionId
-import whisk.core.containerpool.Container
-import whisk.core.containerpool.WhiskContainerStartupError
-import whisk.core.containerpool.ContainerId
-import whisk.core.containerpool.ContainerAddress
-import whisk.core.containerpool.docker.{CompleteAfterOccurrences, DockerContainer, OccurrencesNotFoundException}
+import whisk.core.containerpool._
+import whisk.core.containerpool.docker.{CompleteAfterOccurrences, OccurrencesNotFoundException}
import whisk.core.entity.ByteSize
import whisk.core.entity.size._
import whisk.http.Messages
@@ -110,8 +109,6 @@ class KubernetesContainer(protected[core] val id: ContainerId,
kubernetes.rm(this)
}
- private val stringSentinel = DockerContainer.ActivationSentinel.utf8String
-
def logs(limit: ByteSize, waitForSentinel: Boolean)(implicit transid: TransactionId): Source[ByteString, Any] = {
kubernetes
@@ -124,7 +121,7 @@ class KubernetesContainer(protected[core] val id: ContainerId,
lastTimestamp.set(Option(line.time))
line
}
- .via(new CompleteAfterOccurrences(_.log == stringSentinel, 2, waitForSentinel))
+ .via(new CompleteAfterOccurrences(_.log == Container.ACTIVATION_LOG_SENTINEL, 2, waitForSentinel))
.recover {
case _: StreamLimitReachedException =>
// While the stream has already ended by failing the limitWeighted stage above, we inject a truncation
diff --git a/tests/src/test/scala/actionContainers/ActionContainer.scala b/tests/src/test/scala/actionContainers/ActionContainer.scala
index 213c74e..153b6d2 100644
--- a/tests/src/test/scala/actionContainers/ActionContainer.scala
+++ b/tests/src/test/scala/actionContainers/ActionContainer.scala
@@ -35,6 +35,7 @@ import scala.util.{Failure, Success}
import org.apache.commons.lang3.StringUtils
import org.scalatest.{FlatSpec, Matchers}
import akka.actor.ActorSystem
+
import scala.concurrent.ExecutionContext
import spray.json._
import common.StreamLogging
@@ -42,6 +43,7 @@ import whisk.common.Logging
import whisk.common.TransactionId
import whisk.core.entity.Exec
import common.WhiskProperties
+import whisk.core.containerpool.Container
/**
* For testing convenience, this interface abstracts away the REST calls to a
@@ -167,8 +169,8 @@ object ActionContainer {
Await.result(proc(docker(cmd)), t)
}
- // Filters out the sentinel markers inserted by the container (see relevant private code in Invoker.scala)
- val sentinel = "XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX"
+ // Filters out the sentinel markers inserted by the container (see relevant private code in Invoker)
+ val sentinel = Container.ACTIVATION_LOG_SENTINEL
def filterSentinel(str: String): String = str.replaceAll(sentinel, "").trim
def withContainer(imageName: String, environment: Map[String, String] = Map.empty)(
diff --git a/tests/src/test/scala/system/basic/WskRestBasicTests.scala b/tests/src/test/scala/system/basic/WskRestBasicTests.scala
index 01fed1a..33cbdbd 100644
--- a/tests/src/test/scala/system/basic/WskRestBasicTests.scala
+++ b/tests/src/test/scala/system/basic/WskRestBasicTests.scala
@@ -32,6 +32,7 @@ import common.rest.WskRestOperations
import common.rest.RestResult
import spray.json._
import spray.json.DefaultJsonProtocol._
+import whisk.core.containerpool.Container
import whisk.http.Messages
@RunWith(classOf[JUnitRunner])
@@ -299,7 +300,7 @@ class WskRestBasicTests extends TestHelpers with WskTestHelpers with WskActorSys
activation.logs shouldBe defined
val logs = activation.logs.get.toString
logs should include("This is an example zip used with the docker skeleton action.")
- logs should not include ("XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX")
+ logs should not include Container.ACTIVATION_LOG_SENTINEL
}
}
diff --git a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala
index 1f2ab32..eb3b2ff 100644
--- a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala
@@ -63,8 +63,8 @@ object DockerContainerTests {
val appendedLog = if (appendSentinel) {
val lastTime = log.lastOption.map { case LogLine(time, _, _) => time }.getOrElse(Instant.EPOCH.toString)
log :+
- LogLine(lastTime, "stderr", s"${DockerContainer.ActivationSentinel.utf8String}\n") :+
- LogLine(lastTime, "stdout", s"${DockerContainer.ActivationSentinel.utf8String}\n")
+ LogLine(lastTime, "stderr", s"${Container.ACTIVATION_LOG_SENTINEL}\n") :+
+ LogLine(lastTime, "stdout", s"${Container.ACTIVATION_LOG_SENTINEL}\n")
} else {
log
}
diff --git a/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesClientTests.scala b/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesClientTests.scala
index e653b41..3336e83 100644
--- a/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesClientTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesClientTests.scala
@@ -44,6 +44,7 @@ import whisk.core.containerpool.{ContainerAddress, ContainerId}
import whisk.core.containerpool.kubernetes._
import whisk.core.entity.ByteSize
import whisk.core.entity.size._
+import whisk.core.containerpool.Container.ACTIVATION_LOG_SENTINEL
import scala.collection.mutable
import scala.collection.immutable
@@ -92,13 +93,13 @@ class KubernetesClientTests
behavior of "KubernetesClient"
- val firstLog = """2018-02-06T00:00:18.419889342Z first activation
- |2018-02-06T00:00:18.419929471Z XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX
- |2018-02-06T00:00:18.419988733Z XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX
+ val firstLog = s"""2018-02-06T00:00:18.419889342Z first activation
+ |2018-02-06T00:00:18.419929471Z $ACTIVATION_LOG_SENTINEL
+ |2018-02-06T00:00:18.419988733Z $ACTIVATION_LOG_SENTINEL
|""".stripMargin
- val secondLog = """2018-02-06T00:09:35.38267193Z second activation
- |2018-02-06T00:09:35.382990278Z XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX
- |2018-02-06T00:09:35.383116503Z XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX
+ val secondLog = s"""2018-02-06T00:09:35.38267193Z second activation
+ |2018-02-06T00:09:35.382990278Z $ACTIVATION_LOG_SENTINEL
+ |2018-02-06T00:09:35.383116503Z $ACTIVATION_LOG_SENTINEL
|""".stripMargin
def firstSource(lastTimestamp: Option[Instant] = None): Source[TypedLogLine, Any] =
@@ -141,7 +142,7 @@ class KubernetesClientTests
val logs = awaitLogs(client.logs(container, None))
logs should have size 3
logs(0) shouldBe TypedLogLine("2018-02-06T00:00:18.419889342Z", "stdout", "first activation")
- logs(2) shouldBe TypedLogLine("2018-02-06T00:00:18.419988733Z", "stdout", "XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX")
+ logs(2) shouldBe TypedLogLine("2018-02-06T00:00:18.419988733Z", "stdout", ACTIVATION_LOG_SENTINEL)
}
it should "return all logs after the one matching sinceTime" in {
@@ -156,7 +157,7 @@ class KubernetesClientTests
val logs = awaitLogs(client.logs(container, testDate))
logs should have size 3
logs(0) shouldBe TypedLogLine("2018-02-06T00:09:35.38267193Z", "stdout", "second activation")
- logs(2) shouldBe TypedLogLine("2018-02-06T00:09:35.383116503Z", "stdout", "XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX")
+ logs(2) shouldBe TypedLogLine("2018-02-06T00:09:35.383116503Z", "stdout", ACTIVATION_LOG_SENTINEL)
}
it should "return all logs if none match sinceTime" in {
@@ -170,7 +171,7 @@ class KubernetesClientTests
val logs = awaitLogs(client.logs(container, testDate))
logs should have size 3
logs(0) shouldBe TypedLogLine("2018-02-06T00:09:35.38267193Z", "stdout", "second activation")
- logs(2) shouldBe TypedLogLine("2018-02-06T00:09:35.383116503Z", "stdout", "XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX")
+ logs(2) shouldBe TypedLogLine("2018-02-06T00:09:35.383116503Z", "stdout", ACTIVATION_LOG_SENTINEL)
}
}
diff --git a/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala b/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala
index 2354bc9..4214cf7 100644
--- a/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala
@@ -500,8 +500,8 @@ object KubernetesContainerTests {
if (appendSentinel) {
val lastTime = log.lastOption.map { case TypedLogLine(time, _, _) => time }.getOrElse(Instant.EPOCH)
log :+
- TypedLogLine(lastTime, "stderr", s"${DockerContainer.ActivationSentinel.utf8String}") :+
- TypedLogLine(lastTime, "stdout", s"${DockerContainer.ActivationSentinel.utf8String}")
+ TypedLogLine(lastTime, "stderr", s"${Container.ACTIVATION_LOG_SENTINEL}") :+
+ TypedLogLine(lastTime, "stdout", s"${Container.ACTIVATION_LOG_SENTINEL}")
} else {
log
}