You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by dg...@apache.org on 2018/03/16 16:47:53 UTC

[incubator-openwhisk] branch master updated: Fixes Kubernetes logging issues from #3439 re: activation logs (#3440)

This is an automated email from the ASF dual-hosted git repository.

dgrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new e049eca  Fixes Kubernetes logging issues from #3439 re: activation logs (#3440)
e049eca is described below

commit e049eca4ce90bc9e283d8a3f7bd2f01d5f16df0e
Author: Brendan McAdams <br...@bytes.codes>
AuthorDate: Fri Mar 16 09:47:49 2018 -0700

    Fixes Kubernetes logging issues from #3439 re: activation logs (#3440)
    
    - Fixed an issue where an empty result from the K8S log HTTP endpoint could propagate an error downstream.
    - Moved the last-entry timestamp update before the CompleteAfterOccurrences stage.
      When it sat after that stage, it never saw the activation marker (sentinel) lines,
      so the last timestamp we saved came only from regular log lines, and we kept
      re-fetching the previous activation's lines.
    - Occasionally Kubernetes did not have the invoker logs ready when requested, and
      the stream would simply stop; the code now retries. If no results come back
      from Kubernetes, it keeps retrying until they do.
    
    Switched the K8S logging graph stage to a timer-based one.
    - With this, added a retry scheduler that waits 100 ms before re-fetching whenever the logs come back empty.
---
 .../kubernetes/KubernetesClient.scala              | 55 +++++++++++++++++-----
 .../kubernetes/KubernetesContainer.scala           |  9 ++--
 2 files changed, 49 insertions(+), 15 deletions(-)

diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesClient.scala b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesClient.scala
index 4cfc813..9a19571 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesClient.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesClient.scala
@@ -46,7 +46,7 @@ import whisk.core.containerpool.docker.ProcessRunner
 import whisk.core.entity.ByteSize
 import whisk.core.entity.size._
 
-import scala.concurrent.duration.Duration
+import scala.concurrent.duration._
 import scala.concurrent.ExecutionContext
 import scala.concurrent.Future
 import scala.concurrent.blocking
@@ -186,7 +186,7 @@ class KubernetesClient(
 
     Source
       .fromGraph(new KubernetesRestLogSourceStage(container.id, sinceTime, waitForSentinel))
-      .log("foobar")
+      .log("kubernetesLogs")
 
   }
 
@@ -258,6 +258,12 @@ object KubernetesRestLogSourceStage {
 
   import KubernetesClient.{formatK8STimestamp, parseK8STimestamp}
 
+  val retryDelay = 100.milliseconds // wait before re-fetching after an empty K8S log response
+
+  sealed trait K8SRestLogTimingEvent // timer keys used with the stage's scheduleOnce/onTimer
+
+  case object K8SRestLogRetry extends K8SRestLogTimingEvent // key for the delayed log-fetch retry
+
   def constructPath(namespace: String, containerId: String): Path =
     Path / "api" / "v1" / "namespaces" / namespace / "pods" / containerId / "log"
 
@@ -321,7 +327,7 @@ final class KubernetesRestLogSourceStage(id: ContainerId, sinceTime: Option[Inst
   override protected def initialAttributes: Attributes = Attributes.name("KubernetesHttpLogSource")
 
   override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
-    new GraphStageLogicWithLogging(shape) { logic =>
+    new TimerGraphStageLogicWithLogging(shape) { logic =>
 
       private val queue = mutable.Queue.empty[TypedLogLine]
       private var lastTimestamp = sinceTime
@@ -331,7 +337,7 @@ final class KubernetesRestLogSourceStage(id: ContainerId, sinceTime: Option[Inst
           val path = constructPath(kubeRestClient.getNamespace, id.asString)
           val query = constructQuery(lastTimestamp, waitForSentinel)
 
-          log.debug("* Fetching K8S HTTP Logs w/ Path: {} Query: {}", path, query)
+          log.debug("*** Fetching K8S HTTP Logs w/ Path: {} Query: {}", path, query)
 
           val url = Uri(kubeRestClient.getMasterUrl.toString)
             .withPath(path)
@@ -354,11 +360,18 @@ final class KubernetesRestLogSourceStage(id: ContainerId, sinceTime: Option[Inst
       }
 
       val emitCallback: AsyncCallback[Seq[TypedLogLine]] = getAsyncCallback[Seq[TypedLogLine]] {
-        case firstLine +: restOfLines if isAvailable(out) =>
-          pushLine(firstLine)
-          queue ++= restOfLines
-        case lines =>
-          queue ++= lines
+        case lines @ firstLine +: restOfLines => // at least one line was fetched
+          if (isAvailable(out)) {
+            log.debug("* Lines Available & output ready; pushing {} (remaining: {})", firstLine, restOfLines)
+            pushLine(firstLine)
+            queue ++= restOfLines
+          } else { // downstream hasn't pulled yet; onPull drains the queue later
+            log.debug("* Output isn't ready; queueing lines: {}", lines)
+            queue ++= lines
+          }
+        case Nil => // stable-identifier pattern compared with ==, so any empty Seq matches (e.g. logs not ready yet)
+          log.debug("* Empty lines returned.")
+          retryLogs() // delayed re-fetch instead of letting the stream stall
       }
 
       class LogFetchCallback extends Callback {
@@ -369,9 +382,12 @@ final class KubernetesRestLogSourceStage(id: ContainerId, sinceTime: Option[Inst
           try {
             val lines = readLines(response.body.source, lastTimestamp)
 
+            log.debug("* Read & decoded lines for K8S HTTP: {}", lines)
+
             response.body.source.close()
 
             lines.lastOption.foreach { line =>
+              log.debug("* Updating lastTimestamp (sinceTime) to {}", Option(line.time))
               lastTimestamp = Option(line.time)
             }
 
@@ -394,12 +410,29 @@ final class KubernetesRestLogSourceStage(id: ContainerId, sinceTime: Option[Inst
         new OutHandler {
           override def onPull(): Unit = {
             // if we still have lines queued up, return those; else make a new HTTP read.
-            if (queue.nonEmpty)
+            if (queue.nonEmpty) { // queue holds lines that were fetched but not yet pushed downstream
+              log.debug("* onPull, nonEmpty queue... pushing line")
               pushLine(queue.dequeue())
-            else
+            } else { // nothing buffered: start a new HTTP read
+              log.debug("* onPull, empty queue... fetching logs")
               fetchLogs()
+            }
           }
         })
+
+      def retryLogs(): Unit = { // called from emitCallback when a fetch returns no lines
+        // Pause for retryDelay before re-fetching (onTimer calls fetchLogs) so we don't thrash Kubernetes w/ HTTP requests
+        log.debug("* Scheduling a retry of log fetch in {}", retryDelay)
+        scheduleOnce(K8SRestLogRetry, retryDelay)
+      }
+
+      override protected def onTimer(timerKey: Any): Unit = timerKey match { // invoked when a timer scheduled via scheduleOnce fires
+        case K8SRestLogRetry => // scheduled by retryLogs() after an empty fetch
+          log.debug("* Timer trigger for log fetch retry")
+          fetchLogs()
+        case x => // unexpected key: log a warning and otherwise ignore it
+          log.warning("* Got a timer trigger with an unknown key: {}", x)
+      }
     }
 }
 
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainer.scala b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainer.scala
index 11ebbbf..9f0049c 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainer.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainer.scala
@@ -147,6 +147,10 @@ class KubernetesContainer(protected[core] val id: ContainerId,
         // Adding + 1 since we know there's a newline byte being read
         obj.jsonSize.toLong + 1
       }
+      .map { line =>
+        lastTimestamp.set(Option(line.time))
+        line
+      }
       .via(new CompleteAfterOccurrences(_.log == stringSentinel, 2, waitForSentinel))
       .recover {
         case _: StreamLimitReachedException =>
@@ -160,9 +164,6 @@ class KubernetesContainer(protected[core] val id: ContainerId,
           TypedLogLine(Instant.now, "stderr", Messages.logFailure)
       }
       .takeWithin(waitForLogs)
-      .map { line =>
-        lastTimestamp.set(Some(line.time))
-        line.toByteString
-      }
+      .map { _.toByteString }
   }
 }

-- 
To stop receiving notification emails like this one, please contact
dgrove@apache.org.