You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@openwhisk.apache.org by GitBox <gi...@apache.org> on 2017/11/25 14:09:50 UTC

[GitHub] rabbah closed pull request #2878: Streamingly read user-logs.

rabbah closed pull request #2878: Streamingly read user-logs.
URL: https://github.com/apache/incubator-openwhisk/pull/2878
 
 
   

This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign pull request (one made
from a fork) once it is merged, the diff is reproduced below for the sake
of provenance:

diff --git a/common/scala/build.gradle b/common/scala/build.gradle
index 145b61a92e..8777f47d64 100644
--- a/common/scala/build.gradle
+++ b/common/scala/build.gradle
@@ -13,9 +13,9 @@ dependencies {
 
     compile 'io.spray:spray-json_2.11:1.3.3'
 
-    compile 'com.typesafe.akka:akka-actor_2.11:2.5.4'
-    compile 'com.typesafe.akka:akka-stream_2.11:2.5.4'
-    compile 'com.typesafe.akka:akka-slf4j_2.11:2.5.4'
+    compile 'com.typesafe.akka:akka-actor_2.11:2.5.6'
+    compile 'com.typesafe.akka:akka-stream_2.11:2.5.6'
+    compile 'com.typesafe.akka:akka-slf4j_2.11:2.5.6'
     compile 'com.typesafe.akka:akka-http-core_2.11:10.0.10'
     compile 'com.typesafe.akka:akka-http-spray-json_2.11:10.0.10'
 
diff --git a/common/scala/src/main/scala/whisk/common/Logging.scala b/common/scala/src/main/scala/whisk/common/Logging.scala
index 2b971b0506..e63c824f30 100644
--- a/common/scala/src/main/scala/whisk/common/Logging.scala
+++ b/common/scala/src/main/scala/whisk/common/Logging.scala
@@ -241,6 +241,9 @@ object LoggingMarkers {
   // Time that is needed to init the action
   val INVOKER_ACTIVATION_INIT = LogMarkerToken(invoker, "activationInit", start)
 
+  // Time needed to collect the logs
+  val INVOKER_COLLECT_LOGS = LogMarkerToken(invoker, "collectLogs", start)
+
   // Time in invoker
   val INVOKER_ACTIVATION = LogMarkerToken(invoker, activation, start)
   def INVOKER_DOCKER_CMD(cmd: String) = LogMarkerToken(invoker, s"docker.$cmd", start)
diff --git a/common/scala/src/main/scala/whisk/core/containerpool/Container.scala b/common/scala/src/main/scala/whisk/core/containerpool/Container.scala
index 2e913abad5..fae0f2d8b5 100644
--- a/common/scala/src/main/scala/whisk/core/containerpool/Container.scala
+++ b/common/scala/src/main/scala/whisk/core/containerpool/Container.scala
@@ -18,6 +18,10 @@
 package whisk.core.containerpool
 
 import java.time.Instant
+
+import akka.stream.scaladsl.Source
+import akka.util.ByteString
+
 import scala.concurrent.ExecutionContext
 import scala.concurrent.Future
 import scala.concurrent.duration.Duration
@@ -66,7 +70,7 @@ trait Container {
   def resume()(implicit transid: TransactionId): Future[Unit]
 
   /** Obtains logs up to a given threshold from the container. Optionally waits for a sentinel to appear. */
-  def logs(limit: ByteSize, waitForSentinel: Boolean)(implicit transid: TransactionId): Future[Vector[String]]
+  def logs(limit: ByteSize, waitForSentinel: Boolean)(implicit transid: TransactionId): Source[ByteString, Any]
 
   /** Completely destroys this instance of the container. */
   def destroy()(implicit transid: TransactionId): Future[Unit] = {
diff --git a/common/scala/src/main/scala/whisk/core/containerpool/logging/DockerLogStore.scala b/common/scala/src/main/scala/whisk/core/containerpool/logging/DockerLogStore.scala
index ed9393430c..64434ad027 100644
--- a/common/scala/src/main/scala/whisk/core/containerpool/logging/DockerLogStore.scala
+++ b/common/scala/src/main/scala/whisk/core/containerpool/logging/DockerLogStore.scala
@@ -17,13 +17,38 @@
 
 package whisk.core.containerpool.logging
 
+import akka.NotUsed
 import akka.actor.ActorSystem
+import akka.stream.ActorMaterializer
+import akka.stream.scaladsl.Sink
+import akka.stream.scaladsl.Flow
+import akka.util.ByteString
 import whisk.common.TransactionId
 import whisk.core.containerpool.Container
 import whisk.core.entity.{ActivationLogs, ExecutableWhiskAction, WhiskActivation}
+import spray.json._
+import whisk.http.Messages
 
 import scala.concurrent.{ExecutionContext, Future}
 
+/**
+ * Represents a single log line as read from a docker log
+ */
+protected[core] case class LogLine(time: String, stream: String, log: String) {
+  def toFormattedString = f"$time%-30s $stream: ${log.trim}"
+}
+
+protected[core] object LogLine extends DefaultJsonProtocol {
+  implicit val serdes = jsonFormat3(LogLine.apply)
+}
+
+object DockerLogStore {
+
+  /** Transforms chunked JsObjects into formatted strings */
+  val toFormattedString: Flow[ByteString, String, NotUsed] =
+    Flow[ByteString].map(_.utf8String.parseJson.convertTo[LogLine].toFormattedString)
+}
+
 /**
  * Docker based implementation of a LogStore.
  *
@@ -33,6 +58,7 @@ import scala.concurrent.{ExecutionContext, Future}
  */
 class DockerLogStore(system: ActorSystem) extends LogStore {
   implicit val ec: ExecutionContext = system.dispatcher
+  implicit val mat: ActorMaterializer = ActorMaterializer()(system)
 
   /* "json-file" is the log-driver that writes out to file */
   override val containerParameters = Map("--log-driver" -> Set("json-file"))
@@ -43,7 +69,22 @@ class DockerLogStore(system: ActorSystem) extends LogStore {
   override def collectLogs(transid: TransactionId,
                            container: Container,
                            action: ExecutableWhiskAction): Future[ActivationLogs] = {
-    container.logs(action.limits.logs.asMegaBytes, action.exec.sentinelledLogs)(transid).map(ActivationLogs(_))
+
+    val possibleErrors = Set(Messages.logFailure, Messages.truncateLogs(action.limits.logs.asMegaBytes))
+
+    container
+      .logs(action.limits.logs.asMegaBytes, action.exec.sentinelledLogs)(transid)
+      .via(DockerLogStore.toFormattedString)
+      .runWith(Sink.seq)
+      .flatMap { seq =>
+        val errored = seq.lastOption.exists(last => possibleErrors.exists(last.contains))
+        val logs = ActivationLogs(seq.toVector)
+        if (!errored) {
+          Future.successful(logs)
+        } else {
+          Future.failed(LogCollectingException(logs))
+        }
+      }
   }
 }
 
diff --git a/common/scala/src/main/scala/whisk/core/containerpool/logging/LogStore.scala b/common/scala/src/main/scala/whisk/core/containerpool/logging/LogStore.scala
index bb7f8f5e13..7df2f2cad5 100644
--- a/common/scala/src/main/scala/whisk/core/containerpool/logging/LogStore.scala
+++ b/common/scala/src/main/scala/whisk/core/containerpool/logging/LogStore.scala
@@ -74,3 +74,6 @@ trait LogStore {
 trait LogStoreProvider extends Spi {
   def logStore(actorSystem: ActorSystem): LogStore
 }
+
+/** Indicates that reading logs has failed, either terminally or because the logs were truncated */
+case class LogCollectingException(partialLogs: ActivationLogs) extends Exception("Failed to read logs")
diff --git a/common/scala/src/main/scala/whisk/http/ErrorResponse.scala b/common/scala/src/main/scala/whisk/http/ErrorResponse.scala
index 7486cd94e9..0feadc5c58 100644
--- a/common/scala/src/main/scala/whisk/http/ErrorResponse.scala
+++ b/common/scala/src/main/scala/whisk/http/ErrorResponse.scala
@@ -138,6 +138,7 @@ object Messages {
   def truncateLogs(limit: ByteSize) = {
     s"Logs were truncated because the total bytes size exceeds the limit of ${limit.toBytes} bytes."
   }
+  val logFailure = "There was an issue while collecting your logs. Data might be missing."
 
   /** Error for meta api. */
   val propertyNotFound = "Response does not include requested property."
diff --git a/core/invoker/build.gradle b/core/invoker/build.gradle
index e6a41db548..93dd8f7468 100644
--- a/core/invoker/build.gradle
+++ b/core/invoker/build.gradle
@@ -13,7 +13,9 @@ repositories {
 dependencies {
     compile "org.scala-lang:scala-library:${gradle.scala.version}"
     compile project(':common:scala')
+
     compile 'org.apache.curator:curator-recipes:2.12.0'
+    compile 'com.lightbend.akka:akka-stream-alpakka-file_2.11:0.14'
 }
 
 tasks.withType(ScalaCompile) {
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
index a1b9658650..a626a7f164 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
@@ -23,7 +23,6 @@ import scala.concurrent.Future
 import scala.concurrent.duration._
 import scala.util.Success
 import scala.util.Failure
-
 import akka.actor.FSM
 import akka.actor.Props
 import akka.actor.Stash
@@ -31,13 +30,12 @@ import akka.actor.Status.{Failure => FailureMessage}
 import akka.pattern.pipe
 import spray.json._
 import spray.json.DefaultJsonProtocol._
-import whisk.common.TransactionId
+import whisk.common.{AkkaLogging, Counter, LoggingMarkers, TransactionId}
 import whisk.core.connector.ActivationMessage
+import whisk.core.containerpool.logging.LogCollectingException
 import whisk.core.entity._
 import whisk.core.entity.size._
-import whisk.common.Counter
 import whisk.core.entity.ExecManifest.ImageName
-import whisk.common.AkkaLogging
 import whisk.http.Messages
 
 // States
@@ -362,28 +360,36 @@ class ContainerProxy(factory: (TransactionId, String, ImageName, Boolean, ByteSi
             ActivationResponse.whiskError(Messages.abnormalRun))
       }
 
-    // Sending active ack and storing the activation are concurrent side-effects
-    // and do not block further execution of the future. They are completely
-    // asynchronous.
-    activation
-      .andThen {
-        // the activation future will always complete with Success
-        case Success(ack) => sendActiveAck(tid, ack, job.msg.blocking, job.msg.rootControllerIndex)
-      }
-      .flatMap { activation =>
-        collectLogs(tid, container, job.action).map { logs =>
-          activation.withLogs(logs)
-        }
-      }
-      .andThen {
-        case Success(activation) => storeActivation(tid, activation)
-      }
+    // Sending active ack. Entirely asynchronous and not waited upon.
+    activation.foreach(sendActiveAck(tid, _, job.msg.blocking, job.msg.rootControllerIndex))
+
+    // Adds logs to the raw activation.
+    val activationWithLogs: Future[Either[ActivationLogReadingError, WhiskActivation]] = activation
       .flatMap { activation =>
-        // Fail the future iff the activation was unsuccessful to facilitate
-        // better cleanup logic.
-        if (activation.response.isSuccess) Future.successful(activation)
-        else Future.failed(ActivationUnsuccessfulError(activation))
+        val start = tid.started(this, LoggingMarkers.INVOKER_COLLECT_LOGS)
+        collectLogs(tid, container, job.action)
+          .andThen {
+            case Success(_) => tid.finished(this, start)
+            case Failure(t) => tid.failed(this, start, s"reading logs failed: $t")
+          }
+          .map(logs => Right(activation.withLogs(logs)))
+          .recover {
+            case LogCollectingException(logs) =>
+              Left(ActivationLogReadingError(activation.withLogs(logs)))
+            case _ =>
+              Left(ActivationLogReadingError(activation.withLogs(ActivationLogs(Vector(Messages.logFailure)))))
+          }
       }
+
+    // Storing the record. Entirely asynchronous and not waited upon.
+    activationWithLogs.map(_.fold(_.activation, identity)).foreach(storeActivation(tid, _))
+
+    // Disambiguate activation errors and transform the Either into a failed/successful Future respectively.
+    activationWithLogs.flatMap {
+      case Right(act) if !act.response.isSuccess => Future.failed(ActivationUnsuccessfulError(act))
+      case Left(error)                           => Future.failed(error)
+      case Right(act)                            => Future.successful(act)
+    }
   }
 }
 
@@ -440,6 +446,13 @@ object ContainerProxy {
   }
 }
 
+/** Indicates that something went wrong with an activation and the container should be removed */
+trait ActivationError extends Exception {
+  val activation: WhiskActivation
+}
+
 /** Indicates an activation with a non-successful response */
-case class ActivationUnsuccessfulError(activation: WhiskActivation)
-    extends Exception(s"activation ${activation.activationId} failed")
+case class ActivationUnsuccessfulError(activation: WhiskActivation) extends ActivationError
+
+/** Indicates reading logs for an activation failed (terminally, truncated) */
+case class ActivationLogReadingError(activation: WhiskActivation) extends ActivationError
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerActionLogDriver.scala b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerActionLogDriver.scala
deleted file mode 100644
index f8a3f83f7f..0000000000
--- a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerActionLogDriver.scala
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package whisk.core.containerpool.docker
-
-import java.nio.charset.StandardCharsets
-import scala.util.{Failure, Success, Try}
-import spray.json._
-import spray.json.DefaultJsonProtocol
-import whisk.common.{Logging, TransactionId}
-import whisk.core.entity._
-import whisk.core.entity.size.{SizeInt, SizeString}
-import scala.collection.mutable.Buffer
-import whisk.http.Messages
-
-/**
- * Represents a single log line as read from a docker log
- */
-protected[core] case class LogLine(time: String, stream: String, log: String) {
-  def toFormattedString = f"$time%-30s $stream: ${log.trim}"
-  def dropRight(maxBytes: ByteSize) = {
-    val bytes = log.getBytes(StandardCharsets.UTF_8).dropRight(maxBytes.toBytes.toInt)
-    LogLine(time, stream, new String(bytes, StandardCharsets.UTF_8))
-  }
-}
-
-protected[core] object LogLine extends DefaultJsonProtocol {
-  implicit val serdes = jsonFormat3(LogLine.apply)
-}
-
-protected[core] object DockerActionLogDriver {
-  // The action proxies inserts this line in the logs at the end of each activation for stdout/stderr
-  val LOG_ACTIVATION_SENTINEL = "XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX"
-}
-
-protected[core] trait DockerActionLogDriver {
-
-  /**
-   * Given the JSON driver's raw output of a docker container, convert it into our own
-   * JSON format. If asked, check for sentinel markers (which are not included in the output).
-   *
-   * Only parses and returns so much logs to fit into the LogLimit passed.
-   *
-   * @param logMsgs raw String read from a JSON log-driver written file
-   * @param requireSentinel determines if the processor should wait for a sentinel to appear
-   * @param limit the limit to apply to the log size
-   *
-   * @return Tuple containing (isComplete, isTruncated, logs)
-   */
-  protected def processJsonDriverLogContents(logMsgs: String, requireSentinel: Boolean, limit: ByteSize)(
-    implicit transid: TransactionId,
-    logging: Logging): (Boolean, Boolean, Vector[String]) = {
-
-    var hasOut = false
-    var hasErr = false
-    var truncated = false
-    var bytesSoFar = 0.B
-    val logLines = Buffer[String]()
-    val lines = logMsgs.lines
-
-    // read whiles bytesSoFar <= limit when requireSentinel to try and grab sentinel if they exist to indicate completion
-    while (lines.hasNext && ((requireSentinel && bytesSoFar <= limit) || bytesSoFar < limit)) {
-      // if line does not parse, there's an error in the container log driver
-      Try(lines.next().parseJson.convertTo[LogLine]) match {
-        case Success(t) =>
-          // if sentinels are expected, do not account for their size, otherwise, all bytes are accounted for
-          if (requireSentinel && t.log.trim != DockerActionLogDriver.LOG_ACTIVATION_SENTINEL || !requireSentinel) {
-            // ignore empty log lines
-            if (t.log.nonEmpty) {
-              bytesSoFar += t.log.sizeInBytes
-              if (bytesSoFar <= limit) {
-                logLines.append(t.toFormattedString)
-              } else {
-                // chop off the right most bytes that overflow
-                val chopped = t.dropRight(bytesSoFar - limit)
-                if (chopped.log.nonEmpty) {
-                  logLines.append(chopped.toFormattedString)
-                }
-                truncated = true
-              }
-            }
-          } else if (requireSentinel) {
-            // there may be more than one sentinel in stdout/stderr (as logs may leak across activations
-            // if for example there log limit was exceeded in one activation and the container was reused
-            // to run the same action again (this is considered a feature - otherwise, must drain the logs
-            // or destroy the container as if it errored)
-            if (t.stream == "stdout") {
-              hasOut = true
-            } else if (t.stream == "stderr") {
-              hasErr = true
-            }
-          }
-
-        case Failure(t) =>
-          // Drop lines that did not parse to JSON objects.
-          // However, should not happen since we are using the json log driver.
-          logging.error(this, s"log line skipped/did not parse: $t")
-      }
-    }
-
-    if (lines.hasNext) truncated = true
-    if (truncated) logLines.append(Messages.truncateLogs(limit))
-
-    ((hasOut && hasErr) || !requireSentinel, truncated, logLines.toVector)
-  }
-}
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClientWithFileAccess.scala b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClientWithFileAccess.scala
index 72e1eb9b38..1644e2e183 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClientWithFileAccess.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClientWithFileAccess.scala
@@ -18,14 +18,15 @@
 package whisk.core.containerpool.docker
 
 import java.io.File
-import java.io.FileInputStream
-import java.io.IOException
-import java.nio.ByteBuffer
 import java.nio.file.Paths
+
+import akka.stream.alpakka.file.scaladsl.FileTailSource
+import akka.stream.scaladsl.{FileIO, Source => AkkaSource}
+import akka.util.ByteString
+
 import scala.concurrent.ExecutionContext
 import scala.concurrent.Future
 import scala.concurrent.blocking
-import scala.io.Source
 import spray.json.DefaultJsonProtocol._
 import spray.json._
 import whisk.common.Logging
@@ -33,6 +34,9 @@ import whisk.common.TransactionId
 import whisk.core.containerpool.ContainerId
 import whisk.core.containerpool.ContainerAddress
 
+import scala.io.Source
+import scala.concurrent.duration.FiniteDuration
+
 class DockerClientWithFileAccess(
   dockerHost: Option[String] = None,
   containersDirectory: File = Paths.get("containers").toFile)(executionContext: ExecutionContext)(implicit log: Logging)
@@ -135,62 +139,33 @@ class DockerClientWithFileAccess(
       .map(_.fields("State").asJsObject.fields("OOMKilled").convertTo[Boolean])
       .recover { case _ => false }
 
-  // See extended trait for description
-  def rawContainerLogs(containerId: ContainerId, fromPos: Long): Future[ByteBuffer] = Future {
-    blocking { // Needed due to synchronous file operations
-      var fis: FileInputStream = null
-      try {
-        val file = containerLogFile(containerId)
-        val size = file.length
-
-        fis = new FileInputStream(file)
-        val channel = fis.getChannel().position(fromPos)
-
-        // Buffer allocation may fail if the log file is too large to hold in memory or
-        // too few space is left on the heap, respectively.
-        var remainingBytes = (size - fromPos).toInt
-        val readBuffer = ByteBuffer.allocate(remainingBytes)
-
-        while (remainingBytes > 0) {
-          val readBytes = channel.read(readBuffer)
-          if (readBytes > 0) {
-            remainingBytes -= readBytes
-          } else if (readBytes < 0) {
-            remainingBytes = 0
-          }
-        }
-
-        readBuffer
-      } catch {
-        case e: Exception =>
-          throw new IOException(s"rawContainerLogs failed on ${containerId}", e)
-      } finally {
-        if (fis != null) fis.close()
+  private val readChunkSize = 8192 // bytes
+  override def rawContainerLogs(containerId: ContainerId,
+                                fromPos: Long,
+                                pollInterval: Option[FiniteDuration]): AkkaSource[ByteString, Any] =
+    try {
+      // If there is no waiting interval, we can end the stream early by reading just what is there from file.
+      pollInterval match {
+        case Some(interval) => FileTailSource(containerLogFile(containerId).toPath, readChunkSize, fromPos, interval)
+        case None           => FileIO.fromPath(containerLogFile(containerId).toPath, readChunkSize, fromPos)
       }
+    } catch {
+      case t: Throwable => AkkaSource.failed(t)
     }
-  }
 }
 
 trait DockerApiWithFileAccess extends DockerApi {
 
   /**
-   * Obtains the container's stdout and stderr by reading the internal docker log file
-   * for the container. Said file is written by docker's JSON log driver and has
-   * a "well-known" location and name.
-   *
-   * Reads the log file from the specified position to its end. The returned ByteBuffer
-   * indicates how many bytes were actually read from the file.
-   *
-   * Attention: a ByteBuffer is allocated to keep the file from the specified position to its end
-   * fully in memory. At the moment, there is no size limit checking which can lead to
-   * out-of-memory exceptions for very large files.
-   *
-   * Deals with incomplete reads and premature end of file situations. Behavior is undefined
-   * if the log file is changed or truncated while reading.
+   * Reads logs from the json-log file written for the container and returns
+   * them as a stream of bytes.
    *
-   * @param containerId the container for which to provide logs
-   * @param fromPos position where to start reading the container's log file
-   * @return a ByteBuffer holding the read log file contents
+   * @param containerId id of the container to get the logs for
+   * @param fromPos position to start to read in the file
+   * @param pollInterval interval to poll for changes of the file
+   * @return a source emitting chunks read from the log-file
    */
-  def rawContainerLogs(containerId: ContainerId, fromPos: Long): Future[ByteBuffer]
+  def rawContainerLogs(containerId: ContainerId,
+                       fromPos: Long,
+                       pollInterval: Option[FiniteDuration]): AkkaSource[ByteString, Any]
 }
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala
index 4827eb5843..f24b35b81e 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala
@@ -17,25 +17,38 @@
 
 package whisk.core.containerpool.docker
 
-import java.nio.charset.StandardCharsets
 import java.time.Instant
+import java.util.concurrent.atomic.AtomicLong
 
 import akka.actor.ActorSystem
+import akka.stream._
+import akka.stream.scaladsl.Framing.FramingException
 import spray.json._
 
-import scala.concurrent.ExecutionContext
-import scala.concurrent.Future
+import scala.concurrent.{ExecutionContext, Future}
 import scala.concurrent.duration._
-import scala.util.Failure
 import whisk.common.Logging
 import whisk.common.TransactionId
 import whisk.core.containerpool._
 import whisk.core.entity.ActivationResponse.{ConnectionError, MemoryExhausted}
 import whisk.core.entity.ByteSize
 import whisk.core.entity.size._
+import akka.stream.scaladsl.{Framing, Source}
+import akka.stream.stage._
+import akka.util.ByteString
+import spray.json._
+import whisk.core.containerpool.logging.LogLine
+import whisk.http.Messages
 
 object DockerContainer {
 
+  /**
+   * The action proxies insert this line in the logs at the end of each activation for stdout/stderr
+   *
+   * Note: Blackbox containers might not add this sentinel, as we cannot be sure the action developer actually does this.
+   */
+  val ActivationSentinel = ByteString("XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX")
+
   /**
    * Creates a container running on a docker daemon.
    *
@@ -135,15 +148,14 @@ class DockerContainer(protected val id: ContainerId,
                                                       as: ActorSystem,
                                                       protected val ec: ExecutionContext,
                                                       protected val logging: Logging)
-    extends Container
-    with DockerActionLogDriver {
+    extends Container {
 
   /** The last read-position in the log file */
-  private var logFileOffset = 0L
+  private var logFileOffset = new AtomicLong(0)
 
   protected val waitForLogs: FiniteDuration = 2.seconds
   protected val waitForOomState: FiniteDuration = 2.seconds
-  protected val filePollInterval: FiniteDuration = 100.milliseconds
+  protected val filePollInterval: FiniteDuration = 5.milliseconds
 
   def suspend()(implicit transid: TransactionId): Future[Unit] =
     if (useRunc) { runc.pause(id) } else { docker.pause(id) }
@@ -215,40 +227,97 @@ class DockerContainer(protected val id: ContainerId,
    * the result returned from this method.
    *
    * Only parses and returns as much logs as fit in the passed log limit.
-   * Even if the log limit is exceeded, advance the starting position for the next invocation
-   * behind the bytes most recently read - but don't actively read any more until sentinel
-   * markers have been found.
    *
    * @param limit the limit to apply to the log size
    * @param waitForSentinel determines if the processor should wait for a sentinel to appear
    *
    * @return a vector of Strings with log lines in our own JSON format
    */
-  def logs(limit: ByteSize, waitForSentinel: Boolean)(implicit transid: TransactionId): Future[Vector[String]] = {
-
-    def readLogs(retries: Int): Future[Vector[String]] = {
-      docker
-        .rawContainerLogs(id, logFileOffset)
-        .flatMap { rawLogBytes =>
-          val rawLog =
-            new String(rawLogBytes.array, rawLogBytes.arrayOffset, rawLogBytes.position, StandardCharsets.UTF_8)
-          val (isComplete, isTruncated, formattedLogs) = processJsonDriverLogContents(rawLog, waitForSentinel, limit)
-
-          if (retries > 0 && !isComplete && !isTruncated) {
-            logging.info(this, s"log cursor advanced but missing sentinel, trying $retries more times")
-            akka.pattern.after(filePollInterval, as.scheduler)(readLogs(retries - 1))
+  def logs(limit: ByteSize, waitForSentinel: Boolean)(implicit transid: TransactionId): Source[ByteString, Any] = {
+    docker
+      .rawContainerLogs(id, logFileOffset.get(), if (waitForSentinel) Some(filePollInterval) else None)
+      // This stage only throws 'FramingException' so we cannot decide whether we got truncated due to a size
+      // constraint (like StreamLimitReachedException below) or due to the file being truncated itself.
+      .via(Framing.delimiter(delimiter, limit.toBytes.toInt))
+      .limitWeighted(limit.toBytes) { obj =>
+        // Adding + 1 since we know there's a newline byte being read
+        val size = obj.size + 1
+        logFileOffset.addAndGet(size)
+        size
+      }
+      .via(new CompleteAfterOccurrences(_.containsSlice(DockerContainer.ActivationSentinel), 2, waitForSentinel))
+      .recover {
+        case _: StreamLimitReachedException =>
+          // While the stream has already ended by failing the limitWeighted stage above, we inject a truncation
+          // notice downstream, which will be processed as usual. This will be the last element of the stream.
+          ByteString(LogLine(Instant.now.toString, "stderr", Messages.truncateLogs(limit)).toJson.compactPrint)
+        case _: OccurrencesNotFoundException | _: FramingException =>
+          // Stream has already ended and we insert a notice that data might be missing from the logs. While a
+          // FramingException can also mean exceeding the limits, we cannot decide which case happened so we resort
+          // to the general error message. This will be the last element of the stream.
+          ByteString(LogLine(Instant.now.toString, "stderr", Messages.logFailure).toJson.compactPrint)
+      }
+      .takeWithin(waitForLogs)
+  }
+
+  /** Delimiter used to split log-lines as written by the json-log-driver. */
+  private val delimiter = ByteString("\n")
+}
+
+/**
+ * Completes the stream once the given predicate is fulfilled by N events in the stream.
+ *
+ * '''Emits when''' an upstream element arrives and does not fulfill the predicate
+ *
+ * '''Backpressures when''' downstream backpressures
+ *
+ * '''Completes when''' upstream completes or predicate is fulfilled N times
+ *
+ * '''Cancels when''' downstream cancels
+ *
+ * '''Errors when''' stream completes, not enough occurrences have been found and errorOnNotEnough is true
+ */
+class CompleteAfterOccurrences[T](isInEvent: T => Boolean, neededOccurrences: Int, errorOnNotEnough: Boolean)
+    extends GraphStage[FlowShape[T, T]] {
+  val in = Inlet[T]("WaitForOccurances.in")
+  val out = Outlet[T]("WaitForOccurances.out")
+  override val shape = FlowShape.of(in, out)
+
+  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
+    new GraphStageLogic(shape) with InHandler with OutHandler {
+      private var occurrencesFound = 0
+
+      override def onPull(): Unit = pull(in)
+
+      override def onPush(): Unit = {
+        val element = grab(in)
+        val isOccurrence = isInEvent(element)
+
+        if (isOccurrence) occurrencesFound += 1
+
+        if (occurrencesFound >= neededOccurrences) {
+          completeStage()
+        } else {
+          if (isOccurrence) {
+            pull(in)
           } else {
-            logFileOffset += rawLogBytes.position - rawLogBytes.arrayOffset
-            Future.successful(formattedLogs)
+            push(out, element)
           }
         }
-        .andThen {
-          case Failure(e) =>
-            logging.error(this, s"Failed to obtain logs of ${id.asString}: ${e.getClass} - ${e.getMessage}")
-        }
-    }
+      }
 
-    readLogs((waitForLogs / filePollInterval).toInt)
-  }
+      override def onUpstreamFinish(): Unit = {
+        if (occurrencesFound >= neededOccurrences || !errorOnNotEnough) {
+          completeStage()
+        } else {
+          failStage(OccurrencesNotFoundException(neededOccurrences, occurrencesFound))
+        }
+      }
 
+      setHandlers(in, out, this)
+    }
 }
+
+/** Indicates that Occurrences have not been found in the stream */
+case class OccurrencesNotFoundException(neededCount: Int, actualCount: Int)
+    extends RuntimeException(s"Only found $actualCount out of $neededCount occurrences.")
diff --git a/tests/src/test/scala/common/TimingHelpers.scala b/tests/src/test/scala/common/TimingHelpers.scala
new file mode 100644
index 0000000000..d7392221f1
--- /dev/null
+++ b/tests/src/test/scala/common/TimingHelpers.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package common
+
+import java.time.Instant
+import scala.concurrent.duration._
+
+trait TimingHelpers {
+  def between(start: Instant, end: Instant): Duration =
+    Duration.fromNanos(java.time.Duration.between(start, end).toNanos)
+
+  def durationOf[A](block: => A): (Duration, A) = {
+    val start = Instant.now
+    val value = block
+    val end = Instant.now
+    (between(start, end), value)
+  }
+}
diff --git a/tests/src/test/scala/whisk/core/containerpool/docker/test/ActionLogDriverTests.scala b/tests/src/test/scala/whisk/core/containerpool/docker/test/ActionLogDriverTests.scala
deleted file mode 100644
index 6847214a14..0000000000
--- a/tests/src/test/scala/whisk/core/containerpool/docker/test/ActionLogDriverTests.scala
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package whisk.core.containerpool.docker.test
-
-import java.nio.charset.StandardCharsets
-
-import org.junit.runner.RunWith
-import org.scalatest.BeforeAndAfter
-import org.scalatest.FlatSpec
-import org.scalatest.Matchers
-import org.scalatest.junit.JUnitRunner
-
-import common.StreamLogging
-import spray.json.pimpAny
-import whisk.common.TransactionId
-import whisk.core.entity.size._
-import whisk.http.Messages
-import whisk.core.containerpool.docker.DockerActionLogDriver
-import whisk.core.containerpool.docker.LogLine
-
-@RunWith(classOf[JUnitRunner])
-class ActionLogDriverTests
-    extends FlatSpec
-    with BeforeAndAfter
-    with Matchers
-    with DockerActionLogDriver
-    with StreamLogging {
-
-  private def makeLogMsgs(lines: Seq[String], stream: String = "stdout", addSentinel: Boolean = true) = {
-    val msgs = if (addSentinel) {
-      lines.map((stream, _)) :+
-        ("stdout", s"${DockerActionLogDriver.LOG_ACTIVATION_SENTINEL}") :+
-        ("stderr", s"${DockerActionLogDriver.LOG_ACTIVATION_SENTINEL}")
-    } else {
-      lines.map((stream, _))
-    }
-
-    msgs
-      .map(p => LogLine("", p._1, p._2).toJson.compactPrint)
-      .mkString("\n")
-  }
-
-  private def makeLogLines(lines: Seq[String], stream: String = "stdout") = {
-    lines.map(LogLine("", stream, _)).filter(_.log.nonEmpty).map(_.toFormattedString).toVector
-  }
-
-  behavior of "LogLine"
-
-  it should "truncate log line" in {
-    "?".sizeInBytes shouldBe 3.B
-
-    Seq("abcdef", "? ? ?").foreach { logline =>
-      val bytes = logline.sizeInBytes
-      LogLine("", "", logline).dropRight(0.B).log shouldBe logline
-      LogLine("", "", logline).dropRight(1.B).log shouldBe {
-        val truncated = logline.getBytes(StandardCharsets.UTF_8).dropRight(1)
-        new String(truncated, StandardCharsets.UTF_8)
-      }
-    }
-  }
-
-  behavior of "ActionLogDriver"
-
-  it should "mock container log drain" in {
-    makeLogMsgs(Seq("a", "b", "c")) shouldBe {
-      raw"""|{"time":"","stream":"stdout","log":"a"}
-                  |{"time":"","stream":"stdout","log":"b"}
-                  |{"time":"","stream":"stdout","log":"c"}
-                  |{"time":"","stream":"stdout","log":"${DockerActionLogDriver.LOG_ACTIVATION_SENTINEL}"}
-                  |{"time":"","stream":"stderr","log":"${DockerActionLogDriver.LOG_ACTIVATION_SENTINEL}"}"""
-        .stripMargin('|')
-    }
-  }
-
-  it should "handle empty logs" in {
-    implicit val tid = TransactionId.testing
-    processJsonDriverLogContents("", true, 0.B) shouldBe {
-      (false, false, Vector())
-    }
-
-    processJsonDriverLogContents("", false, 0.B) shouldBe {
-      (true, false, Vector())
-    }
-  }
-
-  it should "not truncate logs within limit" in {
-    implicit val tid = TransactionId.testing
-
-    Seq((Seq("\n"), 1), (Seq("a"), 1), (Seq("?"), 3), (Seq("", "a", "?"), 4), (Seq("abc\n", "abc\n"), 8))
-      .foreach {
-        case (msgs, l) =>
-          Seq(false).foreach { sentinel =>
-            processJsonDriverLogContents(makeLogMsgs(msgs, addSentinel = sentinel), sentinel, l.B) shouldBe {
-              (true, false, makeLogLines(msgs))
-            }
-          }
-      }
-  }
-
-  it should "account for sentinels when logs are not from a sentinelled action runtime" in {
-    implicit val tid = TransactionId.testing
-
-    Seq((Seq(""), 0), (Seq("\n"), 1), (Seq("a"), 1), (Seq("?"), 3), (Seq("", "a", "?"), 4), (Seq("abc\n", "abc\n"), 8))
-      .foreach {
-        case (msgs, l) =>
-          processJsonDriverLogContents(makeLogMsgs(msgs, addSentinel = true), false, l.B) shouldBe {
-            (true, true, makeLogLines(msgs) ++ Vector(Messages.truncateLogs(l.B)))
-          }
-      }
-  }
-
-  it should "truncate logs exceeding limit" in {
-    implicit val tid = TransactionId.testing
-
-    Seq(
-      (Seq("\n"), Seq(), 0),
-      (Seq("a"), Seq(), 0),
-      (Seq("ab"), Seq("a"), 1),
-      (Seq("?"), Seq("?"), 1),
-      (Seq("?"), Seq("?"), 2),
-      (Seq("abc\n", "abc\n", "abc\n"), Seq("abc\n", "abc\n"), 8))
-      .foreach {
-        case (msgs, exp, l) =>
-          Seq(true, false).foreach { sentinel =>
-            processJsonDriverLogContents(makeLogMsgs(msgs, addSentinel = sentinel), sentinel, l.B) shouldBe {
-              (!sentinel, true, makeLogLines(exp) ++ Vector(Messages.truncateLogs(l.B)))
-            }
-          }
-      }
-  }
-}
diff --git a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerClientWithFileAccessTests.scala b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerClientWithFileAccessTests.scala
index 8b3a060850..895461d2e1 100644
--- a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerClientWithFileAccessTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerClientWithFileAccessTests.scala
@@ -18,9 +18,6 @@
 package whisk.core.containerpool.docker.test
 
 import java.io.File
-import java.io.FileWriter
-import java.io.IOException
-import java.nio.charset.StandardCharsets
 
 import scala.concurrent.Await
 import scala.concurrent.ExecutionContext
@@ -35,7 +32,6 @@ import org.scalatest.BeforeAndAfterEach
 import org.scalatest.FlatSpec
 import org.scalatest.junit.JUnitRunner
 import org.scalatest.Matchers
-import org.scalatest.fixture.{FlatSpec => FixtureFlatSpec}
 
 import common.StreamLogging
 import spray.json._
@@ -143,107 +139,3 @@ class DockerClientWithFileAccessTestsOom extends FlatSpec with Matchers with Str
     await(dc.isOomKilled(id)) shouldBe false
   }
 }
-
-/**
- * The file access tests use fixtures (org.scalatest.fixture.FlatSpec) in contrast to
- * the IP address related tests. For this reason, the file access tests are in a separate
- * test suite.
- *
- * This test uses fixtures because they provide a good way for setup and cleanup(!) in a thread-safe
- * way such that tests could be run in parallel. In particular, this test suite creates
- * a temporary file for each test and cleans it up afterwards.
- */
-@RunWith(classOf[JUnitRunner])
-class DockerClientWithFileAccessTestsLogs
-    extends FixtureFlatSpec
-    with Matchers
-    with StreamLogging
-    with BeforeAndAfterEach {
-
-  override def beforeEach = stream.reset()
-
-  implicit val transid = TransactionId.testing
-
-  behavior of "DockerClientWithFileAccess - rawContainerLogs"
-
-  /** Returns a DockerClient with mocked results */
-  def dockerClient(logFile: File) = new DockerClientWithFileAccess()(global) {
-    override def containerLogFile(containerId: ContainerId) = logFile
-  }
-
-  def await[A](f: Future[A], timeout: FiniteDuration = 500.milliseconds) = Await.result(f, timeout)
-
-  /** The fixture parameter must be of type FixtureParam. This is hard-wired in fixture suits. */
-  case class FixtureParam(file: File, writer: FileWriter, docker: DockerClientWithFileAccess) {
-    def writeLogFile(content: String) = {
-      writer.write(content)
-      writer.flush()
-    }
-  }
-
-  /** This overridden method gets control for each test and actually invokes the test. */
-  override def withFixture(test: OneArgTest) = {
-    val file = File.createTempFile(this.getClass.getName, test.name.replaceAll("[^a-zA-Z0-9.-]", "_"))
-    val writer = new FileWriter(file)
-    val docker = dockerClient(file)
-
-    val fixture = FixtureParam(file, writer, docker)
-
-    try {
-      super.withFixture(test.toNoArgTest(fixture))
-    } finally {
-      writer.close()
-      file.delete()
-    }
-  }
-
-  val containerId = ContainerId("Id")
-
-  it should "tolerate an empty log file" in { fixture =>
-    val logText = ""
-    fixture.writeLogFile(logText)
-
-    val buffer = await(fixture.docker.rawContainerLogs(containerId, fromPos = 0))
-    val logContent = new String(buffer.array, buffer.arrayOffset, buffer.position, StandardCharsets.UTF_8)
-
-    logContent shouldBe logText
-    stream should have size 0
-  }
-
-  it should "read a full log file" in { fixture =>
-    val logText = "text"
-    fixture.writeLogFile(logText)
-
-    val buffer = await(fixture.docker.rawContainerLogs(containerId, fromPos = 0))
-    val logContent = new String(buffer.array, buffer.arrayOffset, buffer.position, StandardCharsets.UTF_8)
-
-    logContent shouldBe logText
-    stream should have size 0
-  }
-
-  it should "read a log file portion" in { fixture =>
-    val logText =
-      """Hey, dude-it'z true not sad
-              |Take a thrash song and make it better
-              |Admit it! Beatallica'z under your skin!
-              |So now begin to be a shredder""".stripMargin
-    val from = 66 // start at third line...
-    val expectedText = logText.substring(from)
-
-    fixture.writeLogFile(logText)
-
-    val buffer = await(fixture.docker.rawContainerLogs(containerId, fromPos = from))
-    val logContent = new String(buffer.array, buffer.arrayOffset, buffer.position, StandardCharsets.UTF_8)
-
-    logContent shouldBe expectedText
-    stream should have size 0
-  }
-
-  it should "provide an empty result on failure" in { fixture =>
-    fixture.writer.close()
-    fixture.file.delete()
-
-    an[IOException] should be thrownBy await(fixture.docker.rawContainerLogs(containerId, fromPos = 0))
-    stream should have size 0
-  }
-}
diff --git a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala
index 8ded76e0d3..1519d0916a 100644
--- a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala
@@ -18,9 +18,13 @@
 package whisk.core.containerpool.docker.test
 
 import java.io.IOException
-import java.nio.ByteBuffer
-import java.nio.charset.StandardCharsets
 import java.time.Instant
+
+import akka.stream.ActorMaterializer
+import akka.stream.scaladsl.{Sink, Source}
+import akka.util.ByteString
+import common.TimingHelpers
+
 import scala.collection.mutable
 import scala.concurrent.Await
 import scala.concurrent.duration._
@@ -29,7 +33,8 @@ import org.junit.runner.RunWith
 import org.scalamock.scalatest.MockFactory
 import org.scalatest.BeforeAndAfterEach
 import org.scalatest.FlatSpec
-import org.scalatest.Inspectors._
+import whisk.core.containerpool.logging.{DockerLogStore, LogLine}
+
 import org.scalatest.junit.JUnitRunner
 import org.scalatest.Matchers
 import common.{StreamLogging, WskActorSystem}
@@ -45,6 +50,8 @@ import whisk.core.entity.ActivationResponse.Timeout
 import whisk.core.entity.size._
 import whisk.http.Messages
 
+import whisk.core.entity.size._
+
 /**
  * Unit tests for ContainerPool schedule
  */
@@ -55,15 +62,22 @@ class DockerContainerTests
     with MockFactory
     with StreamLogging
     with BeforeAndAfterEach
-    with WskActorSystem {
+    with WskActorSystem
+    with TimingHelpers {
 
   override def beforeEach() = {
     stream.reset()
   }
 
+  implicit val materializer: ActorMaterializer = ActorMaterializer()
+
   /** Awaits the given future, throws the exception enclosed in Failure. */
   def await[A](f: Future[A], timeout: FiniteDuration = 500.milliseconds) = Await.result[A](f, timeout)
 
+  /** Reads logs into memory and awaits them */
+  def awaitLogs(source: Source[ByteString, Any], timeout: FiniteDuration = 500.milliseconds): Vector[String] =
+    Await.result(source.via(DockerLogStore.toFormattedString).runWith(Sink.seq[String]), timeout).toVector
+
   val containerId = ContainerId("id")
 
   /**
@@ -73,7 +87,7 @@ class DockerContainerTests
   def dockerContainer(id: ContainerId = containerId, addr: ContainerAddress = ContainerAddress("ip"))(
     ccRes: Future[RunResult] =
       Future.successful(RunResult(intervalOf(1.millisecond), Right(ContainerResponse(true, "", None)))),
-    retryCount: Int = 0)(implicit docker: DockerApiWithFileAccess, runc: RuncApi): DockerContainer = {
+    awaitLogs: FiniteDuration = 2.seconds)(implicit docker: DockerApiWithFileAccess, runc: RuncApi): DockerContainer = {
 
     new DockerContainer(id, addr, true) {
       override protected def callContainer(
@@ -83,7 +97,7 @@ class DockerContainerTests
         retry: Boolean = false)(implicit transid: TransactionId): Future[RunResult] = {
         ccRes
       }
-      override protected val waitForLogs = retryCount.milliseconds
+      override protected val waitForLogs = awaitLogs
       override protected val filePollInterval = 1.millisecond
     }
   }
@@ -421,47 +435,41 @@ class DockerContainerTests
   /*
    * LOGS
    */
-  def toByteBuffer(s: String): ByteBuffer = {
-    val bb = ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8))
-    // Set position behind provided string to simulate read - this is what FileChannel.read() does
-    // Otherwise position would be 0 indicating that buffer is empty
-    bb.position(bb.capacity())
-    bb
-  }
-
-  def toRawLog(log: Seq[LogLine], appendSentinel: Boolean = true): ByteBuffer = {
+  def toRawLog(log: Seq[LogLine], appendSentinel: Boolean = true): ByteString = {
     val appendedLog = if (appendSentinel) {
       val lastTime = log.lastOption.map { case LogLine(time, _, _) => time }.getOrElse(Instant.EPOCH.toString)
       log :+
-        LogLine(lastTime, "stderr", s"${DockerActionLogDriver.LOG_ACTIVATION_SENTINEL}\n") :+
-        LogLine(lastTime, "stdout", s"${DockerActionLogDriver.LOG_ACTIVATION_SENTINEL}\n")
+        LogLine(lastTime, "stderr", s"${DockerContainer.ActivationSentinel.utf8String}\n") :+
+        LogLine(lastTime, "stdout", s"${DockerContainer.ActivationSentinel.utf8String}\n")
     } else {
       log
     }
-    toByteBuffer(appendedLog.map(_.toJson.compactPrint).mkString("", "\n", "\n"))
+    ByteString(appendedLog.map(_.toJson.compactPrint).mkString("", "\n", "\n"))
   }
 
   it should "read a simple log with sentinel" in {
     val expectedLogEntry = LogLine(Instant.EPOCH.toString, "stdout", "This is a log entry.\n")
     val rawLog = toRawLog(Seq(expectedLogEntry), appendSentinel = true)
-    val readResults = mutable.Queue(rawLog)
 
     implicit val docker = new TestDockerClient {
-      override def rawContainerLogs(containerId: ContainerId, fromPos: Long): Future[ByteBuffer] = {
-        rawContainerLogsInvocations += ((containerId, fromPos))
-        Future.successful(readResults.dequeue())
+      override def rawContainerLogs(containerId: ContainerId,
+                                    fromPos: Long,
+                                    pollInterval: Option[FiniteDuration]): Source[ByteString, Any] = {
+        rawContainerLogsInvocations += ((containerId, fromPos, pollInterval))
+        Source.single(rawLog)
       }
     }
     implicit val runc = stub[RuncApi]
 
     val container = dockerContainer(id = containerId)()
     // Read with tight limit to verify that no truncation occurs
-    val processedLogs = await(container.logs(limit = expectedLogEntry.log.sizeInBytes, waitForSentinel = true))
+    val processedLogs = awaitLogs(container.logs(limit = rawLog.length.bytes, waitForSentinel = true))
 
     docker.rawContainerLogsInvocations should have size 1
-    val (id, fromPos) = docker.rawContainerLogsInvocations(0)
+    val (id, fromPos, pollInterval) = docker.rawContainerLogsInvocations(0)
     id shouldBe containerId
     fromPos shouldBe 0
+    pollInterval shouldBe 'defined
 
     processedLogs should have size 1
     processedLogs shouldBe Vector(expectedLogEntry.toFormattedString)
@@ -470,24 +478,26 @@ class DockerContainerTests
   it should "read a simple log without sentinel" in {
     val expectedLogEntry = LogLine(Instant.EPOCH.toString, "stdout", "This is a log entry.\n")
     val rawLog = toRawLog(Seq(expectedLogEntry), appendSentinel = false)
-    val readResults = mutable.Queue(rawLog)
 
     implicit val docker = new TestDockerClient {
-      override def rawContainerLogs(containerId: ContainerId, fromPos: Long): Future[ByteBuffer] = {
-        rawContainerLogsInvocations += ((containerId, fromPos))
-        Future.successful(readResults.dequeue())
+      override def rawContainerLogs(containerId: ContainerId,
+                                    fromPos: Long,
+                                    pollInterval: Option[FiniteDuration]): Source[ByteString, Any] = {
+        rawContainerLogsInvocations += ((containerId, fromPos, pollInterval))
+        Source.single(rawLog)
       }
     }
     implicit val runc = stub[RuncApi]
 
     val container = dockerContainer(id = containerId)()
     // Read without tight limit so that the full read result is processed
-    val processedLogs = await(container.logs(limit = 1.MB, waitForSentinel = false))
+    val processedLogs = awaitLogs(container.logs(limit = 1.MB, waitForSentinel = false))
 
     docker.rawContainerLogsInvocations should have size 1
-    val (id, fromPos) = docker.rawContainerLogsInvocations(0)
+    val (id, fromPos, pollInterval) = docker.rawContainerLogsInvocations(0)
     id shouldBe containerId
     fromPos shouldBe 0
+    pollInterval should not be 'defined
 
     processedLogs should have size 1
     processedLogs shouldBe Vector(expectedLogEntry.toFormattedString)
@@ -495,22 +505,22 @@ class DockerContainerTests
 
   it should "fail log reading if error occurs during file reading" in {
     implicit val docker = new TestDockerClient {
-      override def rawContainerLogs(containerId: ContainerId, fromPos: Long): Future[ByteBuffer] = {
-        rawContainerLogsInvocations += ((containerId, fromPos))
-        Future.failed(new IOException)
+      override def rawContainerLogs(containerId: ContainerId,
+                                    fromPos: Long,
+                                    pollInterval: Option[FiniteDuration]): Source[ByteString, Any] = {
+        rawContainerLogsInvocations += ((containerId, fromPos, pollInterval))
+        Source.failed(new IOException)
       }
     }
     implicit val runc = stub[RuncApi]
 
     val container = dockerContainer()()
-    an[IOException] should be thrownBy await(container.logs(limit = 1.MB, waitForSentinel = true))
+    an[IOException] should be thrownBy awaitLogs(container.logs(limit = 1.MB, waitForSentinel = true))
 
     docker.rawContainerLogsInvocations should have size 1
-    val (id, fromPos) = docker.rawContainerLogsInvocations(0)
+    val (id, fromPos, _) = docker.rawContainerLogsInvocations(0)
     id shouldBe containerId
     fromPos shouldBe 0
-
-    exactly(1, logLines) should include(s"Failed to obtain logs of ${containerId.asString}")
   }
 
   it should "read two consecutive logs with sentinel" in {
@@ -521,23 +531,25 @@ class DockerContainerTests
     val returnValues = mutable.Queue(firstRawLog, secondRawLog)
 
     implicit val docker = new TestDockerClient {
-      override def rawContainerLogs(containerId: ContainerId, fromPos: Long): Future[ByteBuffer] = {
-        rawContainerLogsInvocations += ((containerId, fromPos))
-        Future.successful(returnValues.dequeue())
+      override def rawContainerLogs(containerId: ContainerId,
+                                    fromPos: Long,
+                                    pollInterval: Option[FiniteDuration]): Source[ByteString, Any] = {
+        rawContainerLogsInvocations += ((containerId, fromPos, pollInterval))
+        Source.single(returnValues.dequeue())
       }
     }
     implicit val runc = stub[RuncApi]
 
     val container = dockerContainer()()
     // Read without tight limit so that the full read result is processed
-    val processedFirstLog = await(container.logs(limit = 1.MB, waitForSentinel = true))
-    val processedSecondLog = await(container.logs(limit = 1.MB, waitForSentinel = true))
+    val processedFirstLog = awaitLogs(container.logs(limit = 1.MB, waitForSentinel = true))
+    val processedSecondLog = awaitLogs(container.logs(limit = 1.MB, waitForSentinel = true))
 
     docker.rawContainerLogsInvocations should have size 2
-    val (_, fromPos1) = docker.rawContainerLogsInvocations(0)
+    val (_, fromPos1, _) = docker.rawContainerLogsInvocations(0)
     fromPos1 shouldBe 0
-    val (_, fromPos2) = docker.rawContainerLogsInvocations(1)
-    fromPos2 shouldBe firstRawLog.capacity() // second read should start behind the first line
+    val (_, fromPos2, _) = docker.rawContainerLogsInvocations(1)
+    fromPos2 shouldBe firstRawLog.length // second read should start behind the first line
 
     processedFirstLog should have size 1
     processedFirstLog shouldBe Vector(firstLogEntry.toFormattedString)
@@ -545,75 +557,34 @@ class DockerContainerTests
     processedSecondLog shouldBe Vector(secondLogEntry.toFormattedString)
   }
 
-  it should "retry log reading if sentinel cannot be found in the first place" in {
-    val retries = 15
-    val expectedLog = (1 to retries).map { i =>
-      LogLine(Instant.EPOCH.plusMillis(i.toLong).toString, "stdout", s"This is log entry ${i}.\n")
-    }.toVector
-    val returnValues = mutable.Queue.empty[ByteBuffer]
-    for (i <- 0 to retries) {
-      // Sentinel only added for the last return value
-      returnValues += toRawLog(expectedLog.take(i).toSeq, appendSentinel = (i == retries))
-    }
+  it should "eventually terminate even if no sentinels can be found" in {
+
+    val expectedLog = Seq(LogLine(Instant.EPOCH.toString, "stdout", s"This is log entry.\n"))
+    val rawLog = toRawLog(expectedLog, appendSentinel = false)
 
     implicit val docker = new TestDockerClient {
-      override def rawContainerLogs(containerId: ContainerId, fromPos: Long): Future[ByteBuffer] = {
-        rawContainerLogsInvocations += ((containerId, fromPos))
-        Future.successful(returnValues.dequeue())
+      override def rawContainerLogs(containerId: ContainerId,
+                                    fromPos: Long,
+                                    pollInterval: Option[FiniteDuration]): Source[ByteString, Any] = {
+        rawContainerLogsInvocations += ((containerId, fromPos, pollInterval))
+        // "Fakes" an infinite source with only 1 entry
+        Source.tick(0.milliseconds, 10.seconds, rawLog)
       }
     }
     implicit val runc = stub[RuncApi]
 
-    val container = dockerContainer()(retryCount = retries)
+    val waitForLogs = 100.milliseconds
+    val container = dockerContainer()(awaitLogs = waitForLogs)
     // Read without tight limit so that the full read result is processed
-    val processedLog = await(container.logs(limit = 1.MB, waitForSentinel = true))
-
-    docker.rawContainerLogsInvocations should have size retries + 1
-    forAll(docker.rawContainerLogsInvocations) {
-      case (_, fromPos) => fromPos shouldBe 0
-    }
-
-    processedLog should have size expectedLog.length
-    processedLog shouldBe expectedLog.map(_.toFormattedString)
 
-    (retries to 1).foreach { i =>
-      exactly(1, logLines) should include(s"log cursor advanced but missing sentinel, trying ${i} more times")
-    }
-  }
+    val (interval, processedLog) = durationOf(awaitLogs(container.logs(limit = 1.MB, waitForSentinel = true)))
 
-  it should "provide full log if log reading retries are exhausted and no sentinel can be found" in {
-    val retries = 15
-    val expectedLog = (1 to retries).map { i =>
-      LogLine(Instant.EPOCH.plusMillis(i.toLong).toString, "stdout", s"This is log entry ${i}.\n")
-    }.toVector
-    val returnValues = mutable.Queue.empty[ByteBuffer]
-    for (i <- 0 to retries) {
-      returnValues += toRawLog(expectedLog.take(i).toSeq, appendSentinel = false)
-    }
+    interval.toMillis should (be >= waitForLogs.toMillis and be < (waitForLogs * 2).toMillis)
 
-    implicit val docker = new TestDockerClient {
-      override def rawContainerLogs(containerId: ContainerId, fromPos: Long): Future[ByteBuffer] = {
-        rawContainerLogsInvocations += ((containerId, fromPos))
-        Future.successful(returnValues.dequeue())
-      }
-    }
-    implicit val runc = stub[RuncApi]
-
-    val container = dockerContainer()(retryCount = retries)
-    // Read without tight limit so that the full read result is processed
-    val processedLog = await(container.logs(limit = 1.MB, waitForSentinel = true))
-
-    docker.rawContainerLogsInvocations should have size retries + 1
-    forAll(docker.rawContainerLogsInvocations) {
-      case (_, fromPos) => fromPos shouldBe 0
-    }
+    docker.rawContainerLogsInvocations should have size 1
 
     processedLog should have size expectedLog.length
     processedLog shouldBe expectedLog.map(_.toFormattedString)
-
-    (retries to 1).foreach { i =>
-      exactly(1, logLines) should include(s"log cursor advanced but missing sentinel, trying ${i} more times")
-    }
   }
 
   it should "truncate logs and advance reading position to end of current read" in {
@@ -630,50 +601,136 @@ class DockerContainerTests
     val thirdLogFirstEntry =
       LogLine(Instant.EPOCH.plusMillis(4L).toString, "stdout", "This is the first line in third log.\n")
 
-    val firstRawLog = toRawLog(Seq(firstLogFirstEntry, firstLogSecondEntry), appendSentinel = true)
+    val firstRawLog = toRawLog(Seq(firstLogFirstEntry, firstLogSecondEntry), appendSentinel = false)
     val secondRawLog = toRawLog(Seq(secondLogFirstEntry, secondLogSecondEntry), appendSentinel = false)
     val thirdRawLog = toRawLog(Seq(thirdLogFirstEntry), appendSentinel = true)
 
     val returnValues = mutable.Queue(firstRawLog, secondRawLog, thirdRawLog)
 
     implicit val docker = new TestDockerClient {
-      override def rawContainerLogs(containerId: ContainerId, fromPos: Long): Future[ByteBuffer] = {
-        rawContainerLogsInvocations += ((containerId, fromPos))
-        Future.successful(returnValues.dequeue())
+      override def rawContainerLogs(containerId: ContainerId,
+                                    fromPos: Long,
+                                    pollInterval: Option[FiniteDuration]): Source[ByteString, Any] = {
+        rawContainerLogsInvocations += ((containerId, fromPos, pollInterval))
+        Source.single(returnValues.dequeue())
       }
     }
     implicit val runc = stub[RuncApi]
 
     val container = dockerContainer()()
-    val processedFirstLog = await(container.logs(limit = firstLogFirstEntry.log.sizeInBytes, waitForSentinel = true))
+    val processedFirstLog = awaitLogs(container.logs(limit = (firstRawLog.length - 1).bytes, waitForSentinel = true))
     val processedSecondLog =
-      await(container.logs(limit = secondLogFirstEntry.log.take(secondLogLimit).sizeInBytes, waitForSentinel = false))
-    val processedThirdLog = await(container.logs(limit = 1.MB, waitForSentinel = true))
+      awaitLogs(container.logs(limit = (secondRawLog.length - 1).bytes, waitForSentinel = false))
+    val processedThirdLog = awaitLogs(container.logs(limit = 1.MB, waitForSentinel = true))
 
     docker.rawContainerLogsInvocations should have size 3
-    val (_, fromPos1) = docker.rawContainerLogsInvocations(0)
+    val (_, fromPos1, _) = docker.rawContainerLogsInvocations(0)
     fromPos1 shouldBe 0
-    val (_, fromPos2) = docker.rawContainerLogsInvocations(1)
-    fromPos2 shouldBe firstRawLog.capacity() // second read should start behind full content of first read
-    val (_, fromPos3) = docker.rawContainerLogsInvocations(2)
-    fromPos3 shouldBe firstRawLog.capacity() + secondRawLog
-      .capacity() // third read should start behind full content of first and second read
+    val (_, fromPos2, _) = docker.rawContainerLogsInvocations(1)
+    fromPos2 shouldBe firstRawLog.length // second read should start behind full content of first read
+    val (_, fromPos3, _) = docker.rawContainerLogsInvocations(2)
+    fromPos3 shouldBe firstRawLog.length + secondRawLog.length // third read should start behind full content of first and second read
 
     processedFirstLog should have size 2
     processedFirstLog(0) shouldBe firstLogFirstEntry.toFormattedString
-    processedFirstLog(1) should startWith(Messages.truncateLogs(firstLogFirstEntry.log.sizeInBytes))
+    // Allowing just 1 byte less than the JSON structure causes the entire line to drop
+    processedFirstLog(1) should include(Messages.truncateLogs((firstRawLog.length - 1).bytes))
 
     processedSecondLog should have size 2
-    processedSecondLog(0) shouldBe secondLogFirstEntry
-      .copy(log = secondLogFirstEntry.log.take(secondLogLimit))
-      .toFormattedString
-    processedSecondLog(1) should startWith(
-      Messages.truncateLogs(secondLogFirstEntry.log.take(secondLogLimit).sizeInBytes))
+    processedSecondLog(0) shouldBe secondLogFirstEntry.toFormattedString
+    processedSecondLog(1) should include(Messages.truncateLogs((secondRawLog.length - 1).bytes))
 
     processedThirdLog should have size 1
     processedThirdLog(0) shouldBe thirdLogFirstEntry.toFormattedString
   }
 
+  it should "not fail if the last log-line is incomplete" in {
+    val expectedLogEntry = LogLine(Instant.EPOCH.toString, "stdout", "This is a log entry.\n")
+    // "destroy" the second log entry by dropping some bytes
+    val rawLog = toRawLog(Seq(expectedLogEntry, expectedLogEntry), appendSentinel = false).dropRight(10)
+
+    implicit val docker = new TestDockerClient {
+      override def rawContainerLogs(containerId: ContainerId,
+                                    fromPos: Long,
+                                    pollInterval: Option[FiniteDuration]): Source[ByteString, Any] = {
+        rawContainerLogsInvocations += ((containerId, fromPos, pollInterval))
+        Source.single(rawLog)
+      }
+    }
+    implicit val runc = stub[RuncApi]
+
+    val container = dockerContainer(id = containerId)()
+    // Read with tight limit to verify that no truncation occurs
+    val processedLogs = awaitLogs(container.logs(limit = rawLog.length.bytes, waitForSentinel = false))
+
+    docker.rawContainerLogsInvocations should have size 1
+    val (id, fromPos, _) = docker.rawContainerLogsInvocations(0)
+    id shouldBe containerId
+    fromPos shouldBe 0
+
+    processedLogs should have size 2
+    processedLogs(0) shouldBe expectedLogEntry.toFormattedString
+    processedLogs(1) should include(Messages.logFailure)
+  }
+
+  it should "include an incomplete warning if sentinels have not been found only if we wait for sentinels" in {
+    val expectedLogEntry = LogLine(Instant.EPOCH.toString, "stdout", "This is a log entry.\n")
+    val rawLog = toRawLog(Seq(expectedLogEntry, expectedLogEntry), appendSentinel = false)
+
+    implicit val docker = new TestDockerClient {
+      override def rawContainerLogs(containerId: ContainerId,
+                                    fromPos: Long,
+                                    pollInterval: Option[FiniteDuration]): Source[ByteString, Any] = {
+        rawContainerLogsInvocations += ((containerId, fromPos, pollInterval))
+        Source.single(rawLog)
+      }
+    }
+    implicit val runc = stub[RuncApi]
+
+    val container = dockerContainer(id = containerId)()
+    // Read with tight limit to verify that no truncation occurs
+    val processedLogs = awaitLogs(container.logs(limit = rawLog.length.bytes, waitForSentinel = true))
+
+    docker.rawContainerLogsInvocations should have size 1
+    val (id, fromPos, _) = docker.rawContainerLogsInvocations(0)
+    id shouldBe containerId
+    fromPos shouldBe 0
+
+    processedLogs should have size 3
+    processedLogs(0) shouldBe expectedLogEntry.toFormattedString
+    processedLogs(1) shouldBe expectedLogEntry.toFormattedString
+    processedLogs(2) should include(Messages.logFailure)
+
+    val processedLogsFalse = awaitLogs(container.logs(limit = rawLog.length.bytes, waitForSentinel = false))
+    processedLogsFalse should have size 2
+    processedLogsFalse(0) shouldBe expectedLogEntry.toFormattedString
+    processedLogsFalse(1) shouldBe expectedLogEntry.toFormattedString
+  }
+
+  it should "strip sentinel lines if it waits or doesn't wait for them" in {
+    val expectedLogEntry = LogLine(Instant.EPOCH.toString, "stdout", "This is a log entry.\n")
+    val rawLog = toRawLog(Seq(expectedLogEntry), appendSentinel = true)
+
+    implicit val docker = new TestDockerClient {
+      override def rawContainerLogs(containerId: ContainerId,
+                                    fromPos: Long,
+                                    pollInterval: Option[FiniteDuration]): Source[ByteString, Any] = {
+        rawContainerLogsInvocations += ((containerId, fromPos, pollInterval))
+        Source.single(rawLog)
+      }
+    }
+    implicit val runc = stub[RuncApi]
+
+    val container = dockerContainer(id = containerId)()
+    val processedLogs = awaitLogs(container.logs(limit = 1.MB, waitForSentinel = true))
+    processedLogs should have size 1
+    processedLogs(0) shouldBe expectedLogEntry.toFormattedString
+
+    val processedLogsFalse = awaitLogs(container.logs(limit = 1.MB, waitForSentinel = false))
+    processedLogsFalse should have size 1
+    processedLogsFalse(0) shouldBe expectedLogEntry.toFormattedString
+  }
+
   class TestDockerClient extends DockerApiWithFileAccess {
     var runs = mutable.Buffer.empty[(String, Seq[String])]
     var inspects = mutable.Buffer.empty[(ContainerId, String)]
@@ -681,7 +738,7 @@ class DockerContainerTests
     var unpauses = mutable.Buffer.empty[ContainerId]
     var rms = mutable.Buffer.empty[ContainerId]
     var pulls = mutable.Buffer.empty[String]
-    var rawContainerLogsInvocations = mutable.Buffer.empty[(ContainerId, Long)]
+    var rawContainerLogsInvocations = mutable.Buffer.empty[(ContainerId, Long, Option[FiniteDuration])]
 
     def run(image: String, args: Seq[String] = Seq.empty[String])(
       implicit transid: TransactionId): Future[ContainerId] = {
@@ -720,9 +777,11 @@ class DockerContainerTests
 
     override def isOomKilled(id: ContainerId)(implicit transid: TransactionId): Future[Boolean] = ???
 
-    def rawContainerLogs(containerId: ContainerId, fromPos: Long): Future[ByteBuffer] = {
-      rawContainerLogsInvocations += ((containerId, fromPos))
-      Future.successful(ByteBuffer.wrap(Array[Byte]()))
+    override def rawContainerLogs(containerId: ContainerId,
+                                  fromPos: Long,
+                                  pollInterval: Option[FiniteDuration]): Source[ByteString, Any] = {
+      rawContainerLogsInvocations += ((containerId, fromPos, pollInterval))
+      Source.single(ByteString.empty)
     }
   }
 }
diff --git a/tests/src/test/scala/whisk/core/containerpool/logging/test/DockerLogStoreTests.scala b/tests/src/test/scala/whisk/core/containerpool/logging/test/DockerLogStoreTests.scala
new file mode 100644
index 0000000000..d6de3cc0ec
--- /dev/null
+++ b/tests/src/test/scala/whisk/core/containerpool/logging/test/DockerLogStoreTests.scala
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.containerpool.logging.test
+
+import common.{StreamLogging, WskActorSystem}
+import org.scalatest.{FlatSpec, Matchers}
+import whisk.core.containerpool.logging.{DockerLogStoreProvider, LogCollectingException, LogLine}
+import whisk.core.entity.ExecManifest.{ImageName, RuntimeManifest}
+import whisk.core.entity._
+import java.time.Instant
+
+import akka.stream.scaladsl.Source
+import akka.util.ByteString
+import spray.json._
+import whisk.common.{Logging, TransactionId}
+import whisk.core.containerpool.{Container, ContainerAddress, ContainerId}
+import whisk.http.Messages
+
+import scala.concurrent.{Await, ExecutionContext, Future}
+import scala.concurrent.duration._
+
+class DockerLogStoreTests extends FlatSpec with Matchers with WskActorSystem with StreamLogging {
+  def await[T](future: Future[T]) = Await.result(future, 1.minute)
+
+  val user = Identity(Subject(), EntityName("testSpace"), AuthKey(), Set())
+  val exec = CodeExecAsString(RuntimeManifest("actionKind", ImageName("testImage")), "testCode", None)
+  val action = ExecutableWhiskAction(user.namespace.toPath, EntityName("actionName"), exec)
+  val activation =
+    WhiskActivation(user.namespace.toPath, action.name, user.subject, ActivationId(), Instant.EPOCH, Instant.EPOCH)
+
+  def toByteString(logs: List[LogLine]) = logs.map(_.toJson.compactPrint).map(ByteString.apply)
+
+  val tid = TransactionId.testing
+
+  behavior of "DockerLogStore"
+
+  it should "read logs into a sequence and parse them into the specified format" in {
+    val store = DockerLogStoreProvider.logStore(actorSystem)
+
+    val logs = List(
+      LogLine(Instant.now.toString, "stdout", "this is a log"),
+      LogLine(Instant.now.toString, "stdout", "this is a log too"))
+    val container = new TestContainer(Source(toByteString(logs)))
+
+    await(store.collectLogs(tid, container, action)) shouldBe ActivationLogs(logs.map(_.toFormattedString).toVector)
+  }
+
+  it should "report an error if the logs contain an 'official' notice of such" in {
+    val store = DockerLogStoreProvider.logStore(actorSystem)
+
+    val logs = List(
+      LogLine(Instant.now.toString, "stdout", "this is a log"),
+      LogLine(Instant.now.toString, "stderr", Messages.logFailure))
+    val container = new TestContainer(Source(toByteString(logs)))
+
+    val ex = the[LogCollectingException] thrownBy await(store.collectLogs(tid, container, action))
+    ex.partialLogs shouldBe ActivationLogs(logs.map(_.toFormattedString).toVector)
+  }
+
+  it should "report an error if logs have been truncated" in {
+    val store = DockerLogStoreProvider.logStore(actorSystem)
+
+    val logs = List(
+      LogLine(Instant.now.toString, "stdout", "this is a log"),
+      LogLine(Instant.now.toString, "stderr", Messages.truncateLogs(action.limits.logs.asMegaBytes)))
+    val container = new TestContainer(Source(toByteString(logs)))
+
+    val ex = the[LogCollectingException] thrownBy await(store.collectLogs(tid, container, action))
+    ex.partialLogs shouldBe ActivationLogs(logs.map(_.toFormattedString).toVector)
+  }
+
+  class TestContainer(lines: Source[ByteString, Any],
+                      val id: ContainerId = ContainerId("test"),
+                      val addr: ContainerAddress = ContainerAddress("test", 1234))(implicit val ec: ExecutionContext,
+                                                                                   val logging: Logging)
+      extends Container {
+    def suspend()(implicit transid: TransactionId): Future[Unit] = ???
+    def resume()(implicit transid: TransactionId): Future[Unit] = ???
+
+    def logs(limit: ByteSize, waitForSentinel: Boolean)(implicit transid: TransactionId) = lines
+  }
+}
diff --git a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
index d3484e9767..8da8aeac11 100644
--- a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
@@ -18,11 +18,11 @@
 package whisk.core.containerpool.test
 
 import java.time.Instant
+
 import scala.concurrent.Future
 import scala.concurrent.Promise
 import scala.concurrent.duration._
 import org.junit.runner.RunWith
-import org.scalamock.scalatest.MockFactory
 import org.scalatest.BeforeAndAfterAll
 import org.scalatest.FlatSpecLike
 import org.scalatest.Matchers
@@ -33,10 +33,13 @@ import akka.actor.FSM
 import akka.actor.FSM.CurrentState
 import akka.actor.FSM.SubscribeTransitionCallBack
 import akka.actor.FSM.Transition
+import akka.stream.scaladsl.Source
 import akka.testkit.ImplicitSender
 import akka.testkit.TestKit
+import akka.util.ByteString
 import common.LoggedFunction
 import common.StreamLogging
+
 import scala.concurrent.ExecutionContext
 import spray.json._
 import spray.json.DefaultJsonProtocol._
@@ -44,11 +47,12 @@ import whisk.common.Logging
 import whisk.common.TransactionId
 import whisk.core.connector.ActivationMessage
 import whisk.core.containerpool._
-import whisk.core.containerpool.logging.DockerLogStore
+import whisk.core.containerpool.logging.LogCollectingException
 import whisk.core.entity._
 import whisk.core.entity.ExecManifest.RuntimeManifest
 import whisk.core.entity.ExecManifest.ImageName
 import whisk.core.entity.size._
+import whisk.http.Messages
 
 @RunWith(classOf[JUnitRunner])
 class ContainerProxyTests
@@ -57,7 +61,6 @@ class ContainerProxyTests
     with FlatSpecLike
     with Matchers
     with BeforeAndAfterAll
-    with MockFactory
     with StreamLogging {
 
   override def afterAll = TestKit.shutdownActorSystem(system)
@@ -147,9 +150,14 @@ class ContainerProxyTests
       response
   }
 
-  val store = stubFunction[TransactionId, WhiskActivation, Future[Any]]
+  def createCollector(response: Future[ActivationLogs] = Future.successful(ActivationLogs(Vector.empty))) =
+    LoggedFunction { (transid: TransactionId, container: Container, action: ExecutableWhiskAction) =>
+      response
+    }
 
-  val collectLogs = new DockerLogStore(system).collectLogs _
+  def createStore = LoggedFunction { (transid: TransactionId, activation: WhiskActivation) =>
+    Future.successful(())
+  }
 
   behavior of "ContainerProxy"
 
@@ -159,9 +167,11 @@ class ContainerProxyTests
   it should "create a container given a Start message" in within(timeout) {
     val container = new TestContainer
     val factory = createFactory(Future.successful(container))
+    val store = createStore
 
     val machine =
-      childActorOf(ContainerProxy.props(factory, createAcker, store, collectLogs, InstanceId(0), pauseGrace = timeout))
+      childActorOf(
+        ContainerProxy.props(factory, createAcker, store, createCollector(), InstanceId(0), pauseGrace = timeout))
     registerCallback(machine)
     preWarm(machine)
 
@@ -177,9 +187,11 @@ class ContainerProxyTests
     val container = new TestContainer
     val factory = createFactory(Future.successful(container))
     val acker = createAcker
+    val store = createStore
+    val collector = createCollector()
 
     val machine =
-      childActorOf(ContainerProxy.props(factory, acker, store, collectLogs, InstanceId(0), pauseGrace = timeout))
+      childActorOf(ContainerProxy.props(factory, acker, store, collector, InstanceId(0), pauseGrace = timeout))
     registerCallback(machine)
 
     preWarm(machine)
@@ -198,11 +210,11 @@ class ContainerProxyTests
       factory.calls should have size 1
       container.initializeCount shouldBe 1
       container.runCount shouldBe 1
-      container.logsCount shouldBe 1
+      collector.calls should have size 1
       container.suspendCount shouldBe 1
       container.destroyCount shouldBe 1
       acker.calls should have size 1
-      store.verify(message.transid, *)
+      store.calls should have size 1
     }
   }
 
@@ -210,9 +222,11 @@ class ContainerProxyTests
     val container = new TestContainer
     val factory = createFactory(Future.successful(container))
     val acker = createAcker
+    val store = createStore
+    val collector = createCollector()
 
     val machine =
-      childActorOf(ContainerProxy.props(factory, acker, store, collectLogs, InstanceId(0), pauseGrace = timeout))
+      childActorOf(ContainerProxy.props(factory, acker, store, collector, InstanceId(0), pauseGrace = timeout))
     registerCallback(machine)
     preWarm(machine)
 
@@ -224,10 +238,10 @@ class ContainerProxyTests
       factory.calls should have size 1
       container.initializeCount shouldBe 1
       container.runCount shouldBe 2
-      container.logsCount shouldBe 2
+      collector.calls should have size 2
       container.suspendCount shouldBe 0
       acker.calls should have size 2
-      store.verify(message.transid, *).repeat(2)
+      store.calls should have size 2
     }
   }
 
@@ -235,9 +249,11 @@ class ContainerProxyTests
     val container = new TestContainer
     val factory = createFactory(Future.successful(container))
     val acker = createAcker
+    val store = createStore
+    val collector = createCollector()
 
     val machine =
-      childActorOf(ContainerProxy.props(factory, acker, store, collectLogs, InstanceId(0), pauseGrace = timeout))
+      childActorOf(ContainerProxy.props(factory, acker, store, collector, InstanceId(0), pauseGrace = timeout))
     registerCallback(machine)
     preWarm(machine)
 
@@ -250,11 +266,11 @@ class ContainerProxyTests
       factory.calls should have size 1
       container.initializeCount shouldBe 1
       container.runCount shouldBe 2
-      container.logsCount shouldBe 2
+      collector.calls should have size 2
       container.suspendCount shouldBe 1
       container.resumeCount shouldBe 1
       acker.calls should have size 2
-      store.verify(message.transid, *).repeat(2)
+      store.calls should have size 2
     }
   }
 
@@ -262,9 +278,11 @@ class ContainerProxyTests
     val container = new TestContainer
     val factory = createFactory(Future.successful(container))
     val acker = createAcker
+    val store = createStore
+    val collector = createCollector()
 
     val machine =
-      childActorOf(ContainerProxy.props(factory, acker, store, collectLogs, InstanceId(0), pauseGrace = timeout))
+      childActorOf(ContainerProxy.props(factory, acker, store, collector, InstanceId(0), pauseGrace = timeout))
     registerCallback(machine)
     run(machine, Uninitialized)
 
@@ -272,9 +290,9 @@ class ContainerProxyTests
       factory.calls should have size 1
       container.initializeCount shouldBe 1
       container.runCount shouldBe 1
-      container.logsCount shouldBe 1
+      collector.calls should have size 1
       acker.calls should have size 1
-      store.verify(message.transid, *).repeat(1)
+      store.calls should have size 1
     }
   }
 
@@ -285,9 +303,11 @@ class ContainerProxyTests
     val container = new TestContainer
     val factory = createFactory(Future.failed(new Exception()))
     val acker = createAcker
+    val store = createStore
+    val collector = createCollector()
 
     val machine =
-      childActorOf(ContainerProxy.props(factory, acker, store, collectLogs, InstanceId(0), pauseGrace = timeout))
+      childActorOf(ContainerProxy.props(factory, acker, store, collector, InstanceId(0), pauseGrace = timeout))
     registerCallback(machine)
     machine ! Run(action, message)
     expectMsg(Transition(machine, Uninitialized, Running))
@@ -297,11 +317,11 @@ class ContainerProxyTests
       factory.calls should have size 1
       container.initializeCount shouldBe 0
       container.runCount shouldBe 0
-      container.logsCount shouldBe 0 // gather no logs
+      collector.calls should have size 0 // gather no logs
       container.destroyCount shouldBe 0 // no destroying possible as no container could be obtained
       acker.calls should have size 1
       acker.calls(0)._2.response should be a 'whiskError
-      store.verify(message.transid, *).repeat(1)
+      store.calls should have size 1
     }
   }
 
@@ -315,9 +335,11 @@ class ContainerProxyTests
     }
     val factory = createFactory(Future.successful(container))
     val acker = createAcker
+    val store = createStore
+    val collector = createCollector()
 
     val machine =
-      childActorOf(ContainerProxy.props(factory, acker, store, collectLogs, InstanceId(0), pauseGrace = timeout))
+      childActorOf(ContainerProxy.props(factory, acker, store, collector, InstanceId(0), pauseGrace = timeout))
     registerCallback(machine)
     machine ! Run(action, message)
     expectMsg(Transition(machine, Uninitialized, Running))
@@ -328,10 +350,10 @@ class ContainerProxyTests
       factory.calls should have size 1
       container.initializeCount shouldBe 1
       container.runCount shouldBe 0 // should not run the action
-      container.logsCount shouldBe 1
+      collector.calls should have size 1
       container.destroyCount shouldBe 1
       acker.calls(0)._2.response shouldBe ActivationResponse.applicationError("boom")
-      store.verify(message.transid, *).repeat(1)
+      store.calls should have size 1
     }
   }
 
@@ -345,9 +367,11 @@ class ContainerProxyTests
     }
     val factory = createFactory(Future.successful(container))
     val acker = createAcker
+    val store = createStore
+    val collector = createCollector()
 
     val machine =
-      childActorOf(ContainerProxy.props(factory, acker, store, collectLogs, InstanceId(0), pauseGrace = timeout))
+      childActorOf(ContainerProxy.props(factory, acker, store, collector, InstanceId(0), pauseGrace = timeout))
     registerCallback(machine)
     machine ! Run(action, message)
     expectMsg(Transition(machine, Uninitialized, Running))
@@ -358,10 +382,69 @@ class ContainerProxyTests
       factory.calls should have size 1
       container.initializeCount shouldBe 1
       container.runCount shouldBe 1
-      container.logsCount shouldBe 1
+      collector.calls should have size 1
       container.destroyCount shouldBe 1
       acker.calls(0)._2.response shouldBe ActivationResponse.applicationError("boom")
-      store.verify(message.transid, *).repeat(1)
+      store.calls should have size 1
+    }
+  }
+
+  it should "complete the transaction and destroy the container if log reading failed" in {
+    val container = new TestContainer
+    val factory = createFactory(Future.successful(container))
+    val acker = createAcker
+    val store = createStore
+
+    val partialLogs = Vector("this log line made it", Messages.logFailure)
+    val collector =
+      createCollector(Future.failed(LogCollectingException(ActivationLogs(partialLogs))))
+
+    val machine =
+      childActorOf(ContainerProxy.props(factory, acker, store, collector, InstanceId(0), pauseGrace = timeout))
+    registerCallback(machine)
+    machine ! Run(action, message)
+    expectMsg(Transition(machine, Uninitialized, Running))
+    expectMsg(ContainerRemoved) // The message is sent as soon as the container decides to destroy itself
+    expectMsg(Transition(machine, Running, Removing))
+
+    awaitAssert {
+      factory.calls should have size 1
+      container.initializeCount shouldBe 1
+      container.runCount shouldBe 1
+      collector.calls should have size 1
+      container.destroyCount shouldBe 1
+      acker.calls should have size 1
+      acker.calls(0)._2.response shouldBe ActivationResponse.success()
+      store.calls should have size 1
+      store.calls(0)._2.logs shouldBe ActivationLogs(partialLogs)
+    }
+  }
+
+  it should "complete the transaction and destroy the container if log reading failed terminally" in {
+    val container = new TestContainer
+    val factory = createFactory(Future.successful(container))
+    val acker = createAcker
+    val store = createStore
+    val collector = createCollector(Future.failed(new Exception))
+
+    val machine =
+      childActorOf(ContainerProxy.props(factory, acker, store, collector, InstanceId(0), pauseGrace = timeout))
+    registerCallback(machine)
+    machine ! Run(action, message)
+    expectMsg(Transition(machine, Uninitialized, Running))
+    expectMsg(ContainerRemoved) // The message is sent as soon as the container decides to destroy itself
+    expectMsg(Transition(machine, Running, Removing))
+
+    awaitAssert {
+      factory.calls should have size 1
+      container.initializeCount shouldBe 1
+      container.runCount shouldBe 1
+      collector.calls should have size 1
+      container.destroyCount shouldBe 1
+      acker.calls should have size 1
+      acker.calls(0)._2.response shouldBe ActivationResponse.success()
+      store.calls should have size 1
+      store.calls(0)._2.logs shouldBe ActivationLogs(Vector(Messages.logFailure))
     }
   }
 
@@ -374,9 +457,10 @@ class ContainerProxyTests
     }
     val factory = createFactory(Future.successful(container))
     val acker = createAcker
+    val store = createStore
 
     val machine =
-      childActorOf(ContainerProxy.props(factory, acker, store, collectLogs, InstanceId(0), pauseGrace = timeout))
+      childActorOf(ContainerProxy.props(factory, acker, store, createCollector(), InstanceId(0), pauseGrace = timeout))
     registerCallback(machine)
     run(machine, Uninitialized) // first run an activation
     timeout(machine) // times out Ready state so container suspends
@@ -407,9 +491,10 @@ class ContainerProxyTests
     }
     val factory = createFactory(Future.successful(container))
     val acker = createAcker
+    val store = createStore
 
     val machine =
-      childActorOf(ContainerProxy.props(factory, acker, store, collectLogs, InstanceId(0), pauseGrace = timeout))
+      childActorOf(ContainerProxy.props(factory, acker, store, createCollector(), InstanceId(0), pauseGrace = timeout))
     registerCallback(machine)
     run(machine, Uninitialized)
     timeout(machine) // times out Ready state so container suspends
@@ -440,9 +525,11 @@ class ContainerProxyTests
     }
     val factory = createFactory(Future.successful(container))
     val acker = createAcker
+    val store = createStore
+    val collector = createCollector()
 
     val machine =
-      childActorOf(ContainerProxy.props(factory, acker, store, collectLogs, InstanceId(0), pauseGrace = timeout))
+      childActorOf(ContainerProxy.props(factory, acker, store, collector, InstanceId(0), pauseGrace = timeout))
     registerCallback(machine)
 
     // Start running the action
@@ -465,11 +552,11 @@ class ContainerProxyTests
       factory.calls should have size 1
       container.initializeCount shouldBe 1
       container.runCount shouldBe 1
-      container.logsCount shouldBe 1
+      collector.calls should have size 1
       container.suspendCount shouldBe 0 // skips pausing the container
       container.destroyCount shouldBe 1
       acker.calls should have size 1
-      store.verify(message.transid, *).repeat(1)
+      store.calls should have size 1
     }
   }
 
@@ -488,9 +575,11 @@ class ContainerProxyTests
     }
     val factory = createFactory(Future.successful(container))
     val acker = createAcker
+    val store = createStore
+    val collector = createCollector()
 
     val machine =
-      childActorOf(ContainerProxy.props(factory, acker, store, collectLogs, InstanceId(0), pauseGrace = timeout))
+      childActorOf(ContainerProxy.props(factory, acker, store, collector, InstanceId(0), pauseGrace = timeout))
     registerCallback(machine)
     run(machine, Uninitialized)
     timeout(machine)
@@ -511,12 +600,12 @@ class ContainerProxyTests
       factory.calls should have size 1
       container.initializeCount shouldBe 1
       container.runCount shouldBe 1
-      container.logsCount shouldBe 1
+      collector.calls should have size 1
       container.suspendCount shouldBe 1
       container.resumeCount shouldBe 1
       container.destroyCount shouldBe 1
       acker.calls should have size 1
-      store.verify(message.transid, *).repeat(1)
+      store.calls should have size 1
     }
   }
 
@@ -570,9 +659,6 @@ class ContainerProxyTests
 
       Future.successful((Interval.zero, ActivationResponse.success()))
     }
-    def logs(limit: ByteSize, waitForSentinel: Boolean)(implicit transid: TransactionId): Future[Vector[String]] = {
-      logsCount += 1
-      Future.successful(Vector("helloTest"))
-    }
+    def logs(limit: ByteSize, waitForSentinel: Boolean)(implicit transid: TransactionId): Source[ByteString, Any] = ???
   }
 }
diff --git a/tests/src/test/scala/whisk/core/limits/ActionLimitsTests.scala b/tests/src/test/scala/whisk/core/limits/ActionLimitsTests.scala
index dab3da5f55..eaa288b5a8 100644
--- a/tests/src/test/scala/whisk/core/limits/ActionLimitsTests.scala
+++ b/tests/src/test/scala/whisk/core/limits/ActionLimitsTests.scala
@@ -42,7 +42,6 @@ import whisk.core.entity.ActivationEntityLimit
 import whisk.core.entity.ActivationResponse
 import whisk.core.entity.Exec
 import whisk.core.entity.size._
-import whisk.core.entity.size.SizeString
 import whisk.http.Messages
 
 @RunWith(classOf[JUnitRunner])
@@ -115,12 +114,7 @@ class ActionLimitsTests extends TestHelpers with WskTestHelpers {
       val run = wsk.action.invoke(name, Map("payload" -> attemptedSize.toBytes.toJson))
       withActivation(wsk.activation, run, totalWait = 120 seconds) { response =>
         val lines = response.logs.get
-        lines.last shouldBe Messages.truncateLogs(allowedSize)
-        (lines.length - 1) shouldBe (allowedSize.toBytes / bytesPerLine)
-        // dropping 39 characters (timestamp + stream name)
-        // then reform total string adding back newlines
-        val actual = lines.dropRight(1).map(_.drop(39)).mkString("", "\n", "\n").sizeInBytes.toBytes
-        actual shouldBe allowedSize.toBytes
+        lines.last should include(Messages.truncateLogs(allowedSize))
       }
   }
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services