Posted to issues@openwhisk.apache.org by GitBox <gi...@apache.org> on 2017/11/21 12:29:45 UTC

[GitHub] sven-lange-last commented on a change in pull request #2878: Streamingly read user-logs.

URL: https://github.com/apache/incubator-openwhisk/pull/2878#discussion_r152023479
 
 

 ##########
 File path: core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala
 ##########
 @@ -224,31 +230,73 @@ class DockerContainer(protected val id: ContainerId,
    *
    * @return a vector of Strings with log lines in our own JSON format
    */
-  def logs(limit: ByteSize, waitForSentinel: Boolean)(implicit transid: TransactionId): Future[Vector[String]] = {
-
-    def readLogs(retries: Int): Future[Vector[String]] = {
-      docker
-        .rawContainerLogs(id, logFileOffset)
-        .flatMap { rawLogBytes =>
-          val rawLog =
-            new String(rawLogBytes.array, rawLogBytes.arrayOffset, rawLogBytes.position, StandardCharsets.UTF_8)
-          val (isComplete, isTruncated, formattedLogs) = processJsonDriverLogContents(rawLog, waitForSentinel, limit)
-
-          if (retries > 0 && !isComplete && !isTruncated) {
-            logging.info(this, s"log cursor advanced but missing sentinel, trying $retries more times")
-            akka.pattern.after(filePollInterval, as.scheduler)(readLogs(retries - 1))
-          } else {
-            logFileOffset += rawLogBytes.position - rawLogBytes.arrayOffset
-            Future.successful(formattedLogs)
-          }
-        }
-        .andThen {
-          case Failure(e) =>
-            logging.error(this, s"Failed to obtain logs of ${id.asString}: ${e.getClass} - ${e.getMessage}")
-        }
+  def logs(limit: ByteSize, waitForSentinel: Boolean)(implicit transid: TransactionId): Source[ByteString, Any] = {
+    val source = docker
+      .rawContainerLogs(id, logFileOffset.get(), if (waitForSentinel) Some(filePollInterval) else None)
+      .via(Framing.delimiter(delimiter, Int.MaxValue))
+      .limitWeighted(limit.toBytes) { obj =>
 
 Review comment:
   * Please add a comment explaining how we use this stage and, in particular, why we add 1 to the object's size.
   * What happens if we reach the limit in the middle of an element, i.e. a log line? Is that log line still consumed, or is it not forwarded to the next stage at all?
   * Ideally, if we read a log with a sentinel and reach the log limit in the middle of a line, we should continue consuming the log until the sentinel is reached and then set the read position for the next activation. Otherwise, the logs for the next activation will be incorrect.
   * If we read a log without a sentinel and reach the log limit in the middle of a line, that line should still be consumed in full and the read position for the next activation set accordingly. Otherwise, the first log line associated with the next activation will be broken.
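
   The behaviour requested in the last two bullets could be sketched roughly as follows. This is a plain-Scala model over an in-memory sequence of lines, not the Akka Streams stage from the diff. `LogLimitSketch`, `ReadResult`, `lineCost` and `readLogs` are hypothetical names, and charging one extra byte per line for the stripped newline delimiter is only an assumption about why the diff adds 1.

```scala
import java.nio.charset.StandardCharsets.UTF_8

// Hypothetical sketch; none of these names come from the pull request.
object LogLimitSketch {
  // forwarded: lines passed downstream; bytesConsumed: how far the read position should advance.
  final case class ReadResult(forwarded: Vector[String], truncated: Boolean, bytesConsumed: Long)

  // Assumption: each line costs its UTF-8 size plus 1 byte for the newline removed by framing.
  def lineCost(line: String): Long = line.getBytes(UTF_8).length + 1L

  def readLogs(lines: Seq[String], limitBytes: Long, sentinel: Option[String]): ReadResult = {
    var forwarded = Vector.empty[String]
    var used = 0L       // bytes charged against the limit
    var consumed = 0L   // bytes read from the log, forwarded or not
    var truncated = false
    var done = false
    val it = lines.iterator
    while (!done && it.hasNext) {
      val line = it.next()
      consumed += lineCost(line) // a line that was started is always consumed in full
      if (sentinel.contains(line)) {
        done = true // the sentinel ends this activation's logs and is not forwarded
      } else if (!truncated) {
        if (used + lineCost(line) <= limitBytes) {
          used += lineCost(line)
          forwarded :+= line
        } else {
          truncated = true
          // Without a sentinel there is nothing further to wait for, so stop here.
          if (sentinel.isEmpty) done = true
        }
      }
      // With a sentinel and truncated == true: keep consuming, but drop the lines.
    }
    ReadResult(forwarded, truncated, consumed)
  }
}
```

   With an 8-byte limit and the lines `aaaa`, `bbbb`, `cccc`, `SENTINEL`, `next`, this model forwards only `aaaa`, marks the result as truncated, and advances the read position by 24 bytes, so that `next` is the first line seen by the following activation.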
