You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by cb...@apache.org on 2018/02/12 09:59:31 UTC

[incubator-openwhisk] branch master updated: Wait for logs based on intervals not based on total processing time. (#3273)

This is an automated email from the ASF dual-hosted git repository.

cbickel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new ebe7788  Wait for logs based on intervals not based on total processing time. (#3273)
ebe7788 is described below

commit ebe7788b5261bb845e43b74525fa2ad2cacbf80b
Author: Markus Thömmes <ma...@me.com>
AuthorDate: Mon Feb 12 10:59:28 2018 +0100

    Wait for logs based on intervals not based on total processing time. (#3273)
    
    Writing a large chunk of logs can take quite some time to process. The standard timeout for this process is 2 seconds today. It is bounded because an action developer might break the action proxy so that sentinels never appear at all, which would cause us to wait for them indefinitely.
    
    As we process logs after an activation has run, though, we can safely rely on the time **between** two log lines not exceeding a certain threshold. That way, the complete processing is not bounded by some arbitrary timeout (which can even be too short for large volumes) and is still tight enough to exit early if sentinels really are missing.
    
    Furthermore, if this timeout is hit, an error line is inserted into the logs to inform the user that something might have gone wrong.
---
 .../core/containerpool/docker/DockerContainer.scala      | 16 ++++++++++------
 .../containerpool/docker/test/DockerContainerTests.scala |  7 +++----
 2 files changed, 13 insertions(+), 10 deletions(-)

diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala
index 265a450..5d3083c 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala
@@ -18,6 +18,7 @@
 package whisk.core.containerpool.docker
 
 import java.time.Instant
+import java.util.concurrent.TimeoutException
 import java.util.concurrent.atomic.AtomicLong
 
 import akka.actor.ActorSystem
@@ -80,7 +81,7 @@ object DockerContainer {
                                                             as: ActorSystem,
                                                             ec: ExecutionContext,
                                                             log: Logging): Future[DockerContainer] = {
-    implicit val tid = transid
+    implicit val tid: TransactionId = transid
 
     val environmentArgs = environment.flatMap {
       case (key, value) => Seq("-e", s"$key=$value")
@@ -246,18 +247,21 @@ class DockerContainer(protected val id: ContainerId,
         size
       }
       .via(new CompleteAfterOccurrences(_.containsSlice(DockerContainer.ActivationSentinel), 2, waitForSentinel))
+      // As we're reading the logs after the activation has finished the invariant is that all loglines are already
+      // written and we mostly await them being flushed by the docker daemon. Therefore we can timeout based on the time
+      // between two loglines appear without relying on the log frequency in the action itself.
+      .idleTimeout(waitForLogs)
       .recover {
         case _: StreamLimitReachedException =>
           // While the stream has already ended by failing the limitWeighted stage above, we inject a truncation
           // notice downstream, which will be processed as usual. This will be the last element of the stream.
           ByteString(LogLine(Instant.now.toString, "stderr", Messages.truncateLogs(limit)).toJson.compactPrint)
-        case _: OccurrencesNotFoundException | _: FramingException =>
+        case _: OccurrencesNotFoundException | _: FramingException | _: TimeoutException =>
           // Stream has already ended and we insert a notice that data might be missing from the logs. While a
           // FramingException can also mean exceeding the limits, we cannot decide which case happened so we resort
           // to the general error message. This will be the last element of the stream.
           ByteString(LogLine(Instant.now.toString, "stderr", Messages.logFailure).toJson.compactPrint)
       }
-      .takeWithin(waitForLogs)
   }
 
   /** Delimiter used to split log-lines as written by the json-log-driver. */
@@ -279,9 +283,9 @@ class DockerContainer(protected val id: ContainerId,
  */
 class CompleteAfterOccurrences[T](isInEvent: T => Boolean, neededOccurrences: Int, errorOnNotEnough: Boolean)
     extends GraphStage[FlowShape[T, T]] {
-  val in = Inlet[T]("WaitForOccurances.in")
-  val out = Outlet[T]("WaitForOccurances.out")
-  override val shape = FlowShape.of(in, out)
+  val in: Inlet[T] = Inlet[T]("WaitForOccurrences.in")
+  val out: Outlet[T] = Outlet[T]("WaitForOccurrences.out")
+  override val shape: FlowShape[T, T] = FlowShape.of(in, out)
 
   override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
     new GraphStageLogic(shape) with InHandler with OutHandler {
diff --git a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala
index 5f9898e..ea77810 100644
--- a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala
@@ -34,7 +34,6 @@ import org.scalamock.scalatest.MockFactory
 import org.scalatest.BeforeAndAfterEach
 import org.scalatest.FlatSpec
 import whisk.core.containerpool.logging.{DockerToActivationLogStore, LogLine}
-
 import org.scalatest.junit.JUnitRunner
 import org.scalatest.Matchers
 import common.{StreamLogging, WskActorSystem}
@@ -49,7 +48,6 @@ import whisk.core.entity.ActivationResponse.ContainerResponse
 import whisk.core.entity.ActivationResponse.Timeout
 import whisk.core.entity.size._
 import whisk.http.Messages
-
 import whisk.core.entity.size._
 
 /**
@@ -583,8 +581,9 @@ class DockerContainerTests
 
     docker.rawContainerLogsInvocations should have size 1
 
-    processedLog should have size expectedLog.length
-    processedLog shouldBe expectedLog.map(_.toFormattedString)
+    processedLog should have size expectedLog.length + 1 //error log should be appended
+    processedLog.head shouldBe expectedLog.head.toFormattedString
+    processedLog(1) should include(Messages.logFailure)
   }
 
   it should "truncate logs and advance reading position to end of current read" in {

-- 
To stop receiving notification emails like this one, please contact
cbickel@apache.org.