Posted to issues@openwhisk.apache.org by GitBox <gi...@apache.org> on 2017/12/06 12:57:27 UTC

[GitHub] cbickel closed pull request #2974: Add LogStore which stores to database and file simultaneously.

URL: https://github.com/apache/incubator-openwhisk/pull/2974
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/ansible/group_vars/all b/ansible/group_vars/all
index 0e89cc0757..7467aee7f2 100644
--- a/ansible/group_vars/all
+++ b/ansible/group_vars/all
@@ -184,6 +184,8 @@ invoker:
   docker:
     become: "{{ invoker_docker_become | default(false) }}"
 
+userLogs:
+  spi: "{{ userLogs_spi | default('whisk.core.containerpool.logging.DockerToActivationLogStoreProvider') }}"
 
 nginx:
   confdir: "{{ config_root_dir }}/nginx"
diff --git a/ansible/roles/controller/tasks/deploy.yml b/ansible/roles/controller/tasks/deploy.yml
index a0cdc72d30..6caa545536 100644
--- a/ansible/roles/controller/tasks/deploy.yml
+++ b/ansible/roles/controller/tasks/deploy.yml
@@ -94,6 +94,8 @@
 
       "CONFIG_kamon_statsd_hostname": "{{ metrics.kamon.host }}"
       "CONFIG_kamon_statsd_port": "{{ metrics.kamon.port }}"
+
+      "CONFIG_whisk_spi_LogStoreProvider": "{{ userLogs.spi }}"
     volumes:
       - "{{ whisk_logs_dir }}/controller{{ groups['controllers'].index(inventory_hostname) }}:/logs"
     ports:
diff --git a/ansible/roles/invoker/tasks/deploy.yml b/ansible/roles/invoker/tasks/deploy.yml
index 0aa0d5d451..112b37b80f 100644
--- a/ansible/roles/invoker/tasks/deploy.yml
+++ b/ansible/roles/invoker/tasks/deploy.yml
@@ -153,6 +153,7 @@
         -e METRICS_LOG='{{ metrics.log.enabled }}'
         -e CONFIG_kamon_statsd_hostname='{{ metrics.kamon.host }}'
         -e CONFIG_kamon_statsd_port='{{ metrics.kamon.port }}'
+        -e CONFIG_whisk_spi_LogStoreProvider='{{ userLogs.spi }}'
         -v /sys/fs/cgroup:/sys/fs/cgroup
         -v /run/runc:/run/runc
         -v {{ whisk_logs_dir }}/invoker{{ groups['invokers'].index(inventory_hostname) }}:/logs
diff --git a/common/scala/src/main/resources/reference.conf b/common/scala/src/main/resources/reference.conf
index bf2e694d8b..45543e5865 100644
--- a/common/scala/src/main/resources/reference.conf
+++ b/common/scala/src/main/resources/reference.conf
@@ -2,5 +2,5 @@ whisk.spi{
   ArtifactStoreProvider = whisk.core.database.CouchDbStoreProvider
   MessagingProvider = whisk.connector.kafka.KafkaMessagingProvider
   ContainerFactoryProvider = whisk.core.containerpool.docker.DockerContainerFactoryProvider
-  LogStoreProvider = whisk.core.containerpool.logging.DockerLogStoreProvider
-}
\ No newline at end of file
+  LogStoreProvider = whisk.core.containerpool.logging.DockerToActivationLogStoreProvider
+}
diff --git a/common/scala/src/main/scala/whisk/core/containerpool/logging/DockerToActivationFileLogStore.scala b/common/scala/src/main/scala/whisk/core/containerpool/logging/DockerToActivationFileLogStore.scala
new file mode 100644
index 0000000000..f31c3205b2
--- /dev/null
+++ b/common/scala/src/main/scala/whisk/core/containerpool/logging/DockerToActivationFileLogStore.scala
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.containerpool.logging
+
+import java.nio.file.{Path, Paths}
+
+import akka.NotUsed
+import akka.actor.ActorSystem
+import akka.stream.alpakka.file.scaladsl.LogRotatorSink
+import akka.stream.{Graph, SinkShape, UniformFanOutShape}
+import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Keep, MergeHub, Sink, Source}
+import akka.util.ByteString
+import whisk.common.TransactionId
+import whisk.core.containerpool.Container
+import whisk.core.entity.{ActivationLogs, ExecutableWhiskAction, Identity, WhiskActivation}
+import whisk.core.entity.size._
+import spray.json._
+import spray.json.DefaultJsonProtocol._
+import java.time.Instant
+
+import whisk.http.Messages
+
+import scala.concurrent.Future
+
+/**
+ * Docker based implementation of a LogStore.
+ *
+ * Relies on docker's implementation details with regards to the JSON log-driver. When using the JSON log-driver
+ * docker writes stdout/stderr to a JSON formatted file which is read by this store. Logs are written in the
+ * activation record itself.
+ *
+ * Additionally writes logs to a separate file which can be processed by any backend service asynchronously.
+ */
+class DockerToActivationFileLogStore(system: ActorSystem, destinationDirectory: Path = Paths.get("logs"))
+    extends DockerToActivationLogStore(system) {
+
+  /**
+   * End of an event as written to a file. Closes the json-object and also appends a newline.
+   */
+  private val eventEnd = ByteString("}\n")
+
+  private def fieldsString(fields: Map[String, JsValue]) =
+    fields
+      .map {
+        case (key, value) => s""""$key":${value.compactPrint}"""
+      }
+      .mkString(",")
+
+  /**
+   * Merges all file-writing streams into one globally buffered stream.
+   *
+   * This effectively decouples the time it takes to {@code collectLogs} from the time it takes to write the augmented
+   * logging data to a file on the disk.
+   *
+   * All lines are written to a rotating sink, which will create a new file, appended with the creation timestamp,
+   * once the defined limit is reached.
+   */
+  val bufferSize = 100.MB
+  protected val writeToFile: Sink[ByteString, _] = MergeHub
+    .source[ByteString]
+    .batchWeighted(bufferSize.toBytes, _.length, identity)(_ ++ _)
+    .to(LogRotatorSink(() => {
+      val maxSize = bufferSize.toBytes
+      var bytesRead = maxSize
+      element =>
+        {
+          val size = element.size
+          if (bytesRead + size > maxSize) {
+            bytesRead = size
+            Some(destinationDirectory.resolve(s"userlogs-${Instant.now.toEpochMilli}.log"))
+          } else {
+            bytesRead += size
+            None
+          }
+        }
+    }))
+    .run()
+
+  override def collectLogs(transid: TransactionId,
+                           user: Identity,
+                           activation: WhiskActivation,
+                           container: Container,
+                           action: ExecutableWhiskAction): Future[ActivationLogs] = {
+
+    val logs = container.logs(action.limits.logs.asMegaBytes, action.exec.sentinelledLogs)(transid)
+
+    val additionalMetadata = Map(
+      "activationId" -> activation.activationId.asString.toJson,
+      "action" -> action.fullyQualifiedName(false).asString.toJson,
+      "userId" -> user.authkey.uuid.toJson)
+
+    // Manually construct JSON fields to omit parsing the whole structure
+    val metadata = ByteString("," + fieldsString(additionalMetadata))
+
+    val toSeq = Flow[ByteString].via(DockerToActivationLogStore.toFormattedString).toMat(Sink.seq[String])(Keep.right)
+    val toFile = Flow[ByteString]
+    // As each element is a JSON-object, we know we can add the manually constructed fields to it by dropping
+    // the closing "}", adding the fields and finally add "}\n" to the end again.
+      .map(_.dropRight(1) ++ metadata ++ eventEnd)
+      // As the last element of the stream, print the activation record.
+      .concat(Source.single(ByteString(activation.toJson.compactPrint + "\n")))
+      .to(writeToFile)
+
+    val combined = OwSink.combine(toSeq, toFile)(Broadcast[ByteString](_))
+
+    logs.runWith(combined)._1.flatMap { seq =>
+      val possibleErrors = Set(Messages.logFailure, Messages.truncateLogs(action.limits.logs.asMegaBytes))
+      val errored = seq.lastOption.exists(last => possibleErrors.exists(last.contains))
+      val logs = ActivationLogs(seq.toVector)
+      if (!errored) {
+        Future.successful(logs)
+      } else {
+        Future.failed(LogCollectingException(logs))
+      }
+    }
+  }
+}
+
+object DockerToActivationFileLogStoreProvider extends LogStoreProvider {
+  override def logStore(actorSystem: ActorSystem): LogStore = new DockerToActivationFileLogStore(actorSystem)
+}
+
+object OwSink {
+
+  /**
+   * Combines two sinks into one sink using the given strategy. The materialized value is a Tuple2 of the materialized
+   * values of either sink. Code basically copied from {@code Sink.combine}
+   */
+  def combine[T, U, M1, M2](first: Sink[U, M1], second: Sink[U, M2])(
+    strategy: Int => Graph[UniformFanOutShape[T, U], NotUsed]): Sink[T, (M1, M2)] = {
+    Sink.fromGraph(GraphDSL.create(first, second)((_, _)) { implicit b => (s1, s2) =>
+      import GraphDSL.Implicits._
+      val d = b.add(strategy(2))
+
+      d ~> s1
+      d ~> s2
+
+      SinkShape(d.in)
+    })
+  }
+}
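
A minimal, self-contained sketch (not part of the PR) of the fan-out that collectLogs performs above: one container log source is broadcast to both an in-memory sink feeding the activation record and a second sink standing in for the shared, rotating file writer. It uses the OwSink.combine helper added in this file; the object name, the sample lines and the println sink are illustrative stand-ins.

  import akka.Done
  import akka.actor.ActorSystem
  import akka.stream.ActorMaterializer
  import akka.stream.scaladsl.{Broadcast, Sink, Source}
  import akka.util.ByteString
  import whisk.core.containerpool.logging.OwSink

  import scala.concurrent.Future

  object FanOutSketch extends App {
    implicit val system = ActorSystem("fan-out-sketch")
    implicit val materializer = ActorMaterializer()
    import system.dispatcher

    // Stand-in for the container's log stream.
    val lines: Source[ByteString, _] = Source(List("line1\n", "line2\n").map(ByteString(_)))

    // Sink 1: collect the lines in memory, as done for the activation record.
    val toSeq: Sink[ByteString, Future[Seq[ByteString]]] = Sink.seq[ByteString]
    // Sink 2: stand-in for the shared, rotating file writer (writeToFile above).
    val toFile: Sink[ByteString, Future[Done]] =
      Sink.foreach[ByteString](bs => println(s"file <- ${bs.utf8String.trim}"))

    // Broadcast the single source to both sinks and keep BOTH materialized values,
    // which is why OwSink.combine exists: Akka's own Sink.combine drops them.
    val (collected, written) = lines.runWith(OwSink.combine(toSeq, toFile)(Broadcast[ByteString](_)))

    collected.foreach(seq => println(s"collected ${seq.size} lines"))
    collected.zip(written).onComplete(_ => system.terminate())
  }
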
diff --git a/common/scala/src/main/scala/whisk/core/containerpool/logging/DockerLogStore.scala b/common/scala/src/main/scala/whisk/core/containerpool/logging/DockerToActivationLogStore.scala
similarity index 85%
rename from common/scala/src/main/scala/whisk/core/containerpool/logging/DockerLogStore.scala
rename to common/scala/src/main/scala/whisk/core/containerpool/logging/DockerToActivationLogStore.scala
index 64434ad027..153aa59c67 100644
--- a/common/scala/src/main/scala/whisk/core/containerpool/logging/DockerLogStore.scala
+++ b/common/scala/src/main/scala/whisk/core/containerpool/logging/DockerToActivationLogStore.scala
@@ -25,7 +25,7 @@ import akka.stream.scaladsl.Flow
 import akka.util.ByteString
 import whisk.common.TransactionId
 import whisk.core.containerpool.Container
-import whisk.core.entity.{ActivationLogs, ExecutableWhiskAction, WhiskActivation}
+import whisk.core.entity.{ActivationLogs, ExecutableWhiskAction, Identity, WhiskActivation}
 import spray.json._
 import whisk.http.Messages
 
@@ -42,7 +42,7 @@ protected[core] object LogLine extends DefaultJsonProtocol {
   implicit val serdes = jsonFormat3(LogLine.apply)
 }
 
-object DockerLogStore {
+object DockerToActivationLogStore {
 
   /** Transforms chunked JsObjects into formatted strings */
   val toFormattedString: Flow[ByteString, String, NotUsed] =
@@ -54,9 +54,9 @@ object DockerLogStore {
  *
  * Relies on docker's implementation details with regards to the JSON log-driver. When using the JSON log-driver
  * docker writes stdout/stderr to a JSON formatted file which is read by this store. Logs are written in the
- * activation record itself and thus stored in CouchDB.
+ * activation record itself.
  */
-class DockerLogStore(system: ActorSystem) extends LogStore {
+class DockerToActivationLogStore(system: ActorSystem) extends LogStore {
   implicit val ec: ExecutionContext = system.dispatcher
   implicit val mat: ActorMaterializer = ActorMaterializer()(system)
 
@@ -67,16 +67,17 @@ class DockerLogStore(system: ActorSystem) extends LogStore {
   override def fetchLogs(activation: WhiskActivation): Future[ActivationLogs] = Future.successful(activation.logs)
 
   override def collectLogs(transid: TransactionId,
+                           user: Identity,
+                           activation: WhiskActivation,
                            container: Container,
                            action: ExecutableWhiskAction): Future[ActivationLogs] = {
 
-    val possibleErrors = Set(Messages.logFailure, Messages.truncateLogs(action.limits.logs.asMegaBytes))
-
     container
       .logs(action.limits.logs.asMegaBytes, action.exec.sentinelledLogs)(transid)
-      .via(DockerLogStore.toFormattedString)
+      .via(DockerToActivationLogStore.toFormattedString)
       .runWith(Sink.seq)
       .flatMap { seq =>
+        val possibleErrors = Set(Messages.logFailure, Messages.truncateLogs(action.limits.logs.asMegaBytes))
         val errored = seq.lastOption.exists(last => possibleErrors.exists(last.contains))
         val logs = ActivationLogs(seq.toVector)
         if (!errored) {
@@ -88,6 +89,6 @@ class DockerLogStore(system: ActorSystem) extends LogStore {
   }
 }
 
-object DockerLogStoreProvider extends LogStoreProvider {
-  override def logStore(actorSystem: ActorSystem): LogStore = new DockerLogStore(actorSystem)
+object DockerToActivationLogStoreProvider extends LogStoreProvider {
+  override def logStore(actorSystem: ActorSystem): LogStore = new DockerToActivationLogStore(actorSystem)
 }
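
A small sketch (not from the PR) of what the store above consumes: docker's json-file log-driver emits one JSON object per log line, and the toFormattedString flow turns each chunk into the string that ends up in the activation record. The time/stream/log field names below are assumptions chosen to mirror that driver's format; the LogLine case class (defined in this file, outside the shown hunk) fixes the real ones.

  import akka.actor.ActorSystem
  import akka.stream.ActorMaterializer
  import akka.stream.scaladsl.{Sink, Source}
  import akka.util.ByteString
  import whisk.core.containerpool.logging.DockerToActivationLogStore

  object FormattingSketch extends App {
    implicit val system = ActorSystem("formatting-sketch")
    implicit val materializer = ActorMaterializer()
    import system.dispatcher

    // Each element is one complete JSON object, as emitted by the json-file log-driver.
    // The payloads and field names here are illustrative only.
    val raw = Source(
      List(
        """{"time":"2017-12-06T12:00:00.000Z","stream":"stdout","log":"hello"}""",
        """{"time":"2017-12-06T12:00:00.001Z","stream":"stderr","log":"world"}""")
        .map(ByteString(_)))

    val formatted = raw.via(DockerToActivationLogStore.toFormattedString).runWith(Sink.seq[String])
    formatted.foreach { lines =>
      lines.foreach(println) // the formatted strings stored in activation.logs
      system.terminate()
    }
  }
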
diff --git a/common/scala/src/main/scala/whisk/core/containerpool/logging/LogRotatorSink.scala b/common/scala/src/main/scala/whisk/core/containerpool/logging/LogRotatorSink.scala
new file mode 100644
index 0000000000..6c5681bb0e
--- /dev/null
+++ b/common/scala/src/main/scala/whisk/core/containerpool/logging/LogRotatorSink.scala
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// TO BE TAKEN OUT AFTER ALPAKKA 0.15 RELEASE
+
+/*
+ * Copyright (C) 2016-2017 Lightbend Inc. <http://www.lightbend.com>
+ */
+
+package akka.stream.alpakka.file.scaladsl
+
+import java.nio.file.{OpenOption, Path, StandardOpenOption}
+
+import akka.Done
+import akka.stream.ActorAttributes.SupervisionStrategy
+import akka.stream._
+import akka.stream.impl.fusing.MapAsync.{Holder, NotYetThere}
+import akka.stream.scaladsl.{FileIO, Sink, Source}
+import akka.stream.stage._
+import akka.util.ByteString
+
+import scala.concurrent.{ExecutionContext, Future, Promise}
+import scala.util.{Failure, Success}
+
+object LogRotatorSink {
+  def apply(functionGeneratorFunction: () => ByteString => Option[Path],
+            fileOpenOptions: Set[OpenOption] = Set(StandardOpenOption.APPEND, StandardOpenOption.CREATE))
+    : Sink[ByteString, Future[Done]] =
+    Sink.fromGraph(new LogRotatorSink(functionGeneratorFunction, fileOpenOptions))
+}
+
+final private[scaladsl] class LogRotatorSink(functionGeneratorFunction: () => ByteString => Option[Path],
+                                             fileOpenOptions: Set[OpenOption])
+    extends GraphStageWithMaterializedValue[SinkShape[ByteString], Future[Done]] {
+
+  val in = Inlet[ByteString]("FRotator.in")
+  override val shape = SinkShape.of(in)
+
+  override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Done]) = {
+    val promise = Promise[Done]()
+    val logic = new GraphStageLogic(shape) {
+      val pathGeneratorFunction: ByteString => Option[Path] = functionGeneratorFunction()
+      var sourceOut: SubSourceOutlet[ByteString] = _
+      var fileSinkCompleted: Seq[Future[IOResult]] = Seq.empty
+      val decider =
+        inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider)
+
+      def failThisStage(ex: Throwable): Unit =
+        if (!promise.isCompleted) {
+          if (sourceOut != null) {
+            sourceOut.fail(ex)
+          }
+          cancel(in)
+          promise.failure(ex)
+        }
+
+      def generatePathOrFailPeacefully(data: ByteString): Option[Path] = {
+        var ret = Option.empty[Path]
+        try {
+          ret = pathGeneratorFunction(data)
+        } catch {
+          case ex: Throwable =>
+            failThisStage(ex)
+        }
+        ret
+      }
+
+      def fileSinkFutureCallbackHandler(future: Future[IOResult])(h: Holder[IOResult]): Unit =
+        h.elem match {
+          case Success(IOResult(_, Failure(ex))) if decider(ex) == Supervision.Stop =>
+            promise.failure(ex)
+          case Success(x) if fileSinkCompleted.size == 1 && fileSinkCompleted.head == future =>
+            promise.trySuccess(Done)
+            completeStage()
+          case x: Success[IOResult] =>
+            fileSinkCompleted = fileSinkCompleted.filter(_ != future)
+          case Failure(ex) =>
+            failThisStage(ex)
+          case _ =>
+        }
+
+      //init stage where we are waiting for the first path
+      setHandler(
+        in,
+        new InHandler {
+          override def onPush(): Unit = {
+            val data = grab(in)
+            val pathO = generatePathOrFailPeacefully(data)
+            pathO.fold(if (!isClosed(in)) pull(in))(switchPath(_, data))
+          }
+
+          override def onUpstreamFinish(): Unit =
+            completeStage()
+
+          override def onUpstreamFailure(ex: Throwable): Unit =
+            failThisStage(ex)
+        })
+
+      //we must pull the first element cos we are a sink
+      override def preStart(): Unit = {
+        super.preStart()
+        pull(in)
+      }
+
+      def futureCB(newFuture: Future[IOResult]) =
+        getAsyncCallback[Holder[IOResult]](fileSinkFutureCallbackHandler(newFuture))
+
+      //we recreate the tail of the stream, and emit the data for the next req
+      def switchPath(path: Path, data: ByteString): Unit = {
+        val prevOut = Option(sourceOut)
+
+        sourceOut = new SubSourceOutlet[ByteString]("FRotatorSource")
+        sourceOut.setHandler(new OutHandler {
+          override def onPull(): Unit = {
+            sourceOut.push(data)
+            switchToNormalMode()
+          }
+        })
+        val newFuture = Source
+          .fromGraph(sourceOut.source)
+          .runWith(FileIO.toPath(path, fileOpenOptions))(interpreter.subFusingMaterializer)
+
+        fileSinkCompleted = fileSinkCompleted :+ newFuture
+
+        val holder = new Holder[IOResult](NotYetThere, futureCB(newFuture))
+
+        newFuture.onComplete(holder)(akka.dispatch.ExecutionContexts.sameThreadExecutionContext)
+
+        prevOut.foreach(_.complete())
+      }
+
+      //we change path if needed or push the grabbed data
+      def switchToNormalMode(): Unit = {
+        setHandler(
+          in,
+          new InHandler {
+            override def onPush(): Unit = {
+              val data = grab(in)
+              val pathO = generatePathOrFailPeacefully(data)
+              pathO.fold(sourceOut.push(data))(switchPath(_, data))
+            }
+
+            override def onUpstreamFinish(): Unit = {
+              implicit val executionContext: ExecutionContext =
+                akka.dispatch.ExecutionContexts.sameThreadExecutionContext
+              promise.completeWith(Future.sequence(fileSinkCompleted).map(_ => Done))
+              sourceOut.complete()
+            }
+
+            override def onUpstreamFailure(ex: Throwable): Unit =
+              failThisStage(ex)
+          })
+        sourceOut.setHandler(new OutHandler {
+          override def onPull(): Unit =
+            pull(in)
+        })
+      }
+    }
+    (logic, promise.future)
+  }
+
+}
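
A minimal usage sketch (not from the PR) of the vendored LogRotatorSink above: the outer function is invoked once per materialization and returns a per-element decision, Some(path) to start a new file or None to keep appending to the current one. The size limit and file naming below are illustrative; DockerToActivationFileLogStore uses the same pattern with a 100 MB threshold.

  import java.nio.file.{Path, Paths}
  import java.time.Instant

  import akka.actor.ActorSystem
  import akka.stream.ActorMaterializer
  import akka.stream.alpakka.file.scaladsl.LogRotatorSink
  import akka.stream.scaladsl.Source
  import akka.util.ByteString

  object RotationSketch extends App {
    implicit val system = ActorSystem("rotation-sketch")
    implicit val materializer = ActorMaterializer()
    import system.dispatcher

    val maxBytes = 10L * 1024 * 1024 // illustrative limit; the store above uses 100 MB

    // Invoked once per materialization; the returned closure decides per element.
    val rotateBySize: () => ByteString => Option[Path] = () => {
      var written = maxBytes // start over the limit so the first element opens a file
      element => {
        if (written + element.size > maxBytes) {
          written = element.size
          Some(Paths.get(s"userlogs-${Instant.now.toEpochMilli}.log"))
        } else {
          written += element.size
          None
        }
      }
    }

    val done = Source(List("a", "b", "c").map(l => ByteString(l + "\n"))).runWith(LogRotatorSink(rotateBySize))
    done.onComplete(_ => system.terminate())
  }
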
diff --git a/common/scala/src/main/scala/whisk/core/containerpool/logging/LogStore.scala b/common/scala/src/main/scala/whisk/core/containerpool/logging/LogStore.scala
index 7df2f2cad5..335eed5d3e 100644
--- a/common/scala/src/main/scala/whisk/core/containerpool/logging/LogStore.scala
+++ b/common/scala/src/main/scala/whisk/core/containerpool/logging/LogStore.scala
@@ -20,7 +20,7 @@ package whisk.core.containerpool.logging
 import akka.actor.ActorSystem
 import whisk.common.TransactionId
 import whisk.core.containerpool.Container
-import whisk.core.entity.{ActivationLogs, ExecutableWhiskAction, WhiskActivation}
+import whisk.core.entity.{ActivationLogs, ExecutableWhiskAction, Identity, WhiskActivation}
 import whisk.spi.Spi
 
 import scala.concurrent.Future
@@ -54,11 +54,17 @@ trait LogStore {
    * record in the database.
    *
    * @param transid transaction the activation ran in
+   * @param user the user who ran the activation
+   * @param activation the activation record
    * @param container container used by the activation
    * @param action action that was activated
    * @return logs for the given activation
    */
-  def collectLogs(transid: TransactionId, container: Container, action: ExecutableWhiskAction): Future[ActivationLogs]
+  def collectLogs(transid: TransactionId,
+                  user: Identity,
+                  activation: WhiskActivation,
+                  container: Container,
+                  action: ExecutableWhiskAction): Future[ActivationLogs]
 
   /**
    * Fetch relevant logs for the given activation from the store.
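
To show how the widened collectLogs signature above is meant to be consumed, here is a sketch (not part of the PR) of an alternative store. It extends the concrete DockerToActivationLogStore, so only the new method needs overriding, and a companion provider makes it selectable through the whisk.spi.LogStoreProvider setting that the ansible changes at the top of this diff expose as userLogs_spi / CONFIG_whisk_spi_LogStoreProvider. The AuditingLogStore name and behaviour are hypothetical.

  import akka.actor.ActorSystem
  import whisk.common.TransactionId
  import whisk.core.containerpool.Container
  import whisk.core.containerpool.logging.{DockerToActivationLogStore, LogStore, LogStoreProvider}
  import whisk.core.entity.{ActivationLogs, ExecutableWhiskAction, Identity, WhiskActivation}

  import scala.concurrent.Future

  // Hypothetical store: reuses the activation-record behaviour and merely has access
  // to the newly passed user and activation.
  class AuditingLogStore(system: ActorSystem) extends DockerToActivationLogStore(system) {
    override def collectLogs(transid: TransactionId,
                             user: Identity,
                             activation: WhiskActivation,
                             container: Container,
                             action: ExecutableWhiskAction): Future[ActivationLogs] = {
      // user.authkey.uuid and activation.activationId are now available per collection,
      // which is what DockerToActivationFileLogStore uses to enrich each log line.
      super.collectLogs(transid, user, activation, container, action)
    }
  }

  object AuditingLogStoreProvider extends LogStoreProvider {
    override def logStore(actorSystem: ActorSystem): LogStore = new AuditingLogStore(actorSystem)
  }

Selecting such a provider is then a matter of pointing userLogs_spi (and hence CONFIG_whisk_spi_LogStoreProvider) at its fully qualified name, just as the default above points at DockerToActivationLogStoreProvider.
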
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
index a626a7f164..beee65473d 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
@@ -89,13 +89,14 @@ case object ContainerRemoved
  * @param unusedTimeout time after which the container is automatically thrown away
  * @param pauseGrace time to wait for new work before pausing the container
  */
-class ContainerProxy(factory: (TransactionId, String, ImageName, Boolean, ByteSize) => Future[Container],
-                     sendActiveAck: (TransactionId, WhiskActivation, Boolean, InstanceId) => Future[Any],
-                     storeActivation: (TransactionId, WhiskActivation) => Future[Any],
-                     collectLogs: (TransactionId, Container, ExecutableWhiskAction) => Future[ActivationLogs],
-                     instance: InstanceId,
-                     unusedTimeout: FiniteDuration,
-                     pauseGrace: FiniteDuration)
+class ContainerProxy(
+  factory: (TransactionId, String, ImageName, Boolean, ByteSize) => Future[Container],
+  sendActiveAck: (TransactionId, WhiskActivation, Boolean, InstanceId) => Future[Any],
+  storeActivation: (TransactionId, WhiskActivation) => Future[Any],
+  collectLogs: (TransactionId, Identity, WhiskActivation, Container, ExecutableWhiskAction) => Future[ActivationLogs],
+  instance: InstanceId,
+  unusedTimeout: FiniteDuration,
+  pauseGrace: FiniteDuration)
     extends FSM[ContainerState, ContainerData]
     with Stash {
   implicit val ec = context.system.dispatcher
@@ -367,7 +368,7 @@ class ContainerProxy(factory: (TransactionId, String, ImageName, Boolean, ByteSi
     val activationWithLogs: Future[Either[ActivationLogReadingError, WhiskActivation]] = activation
       .flatMap { activation =>
         val start = tid.started(this, LoggingMarkers.INVOKER_COLLECT_LOGS)
-        collectLogs(tid, container, job.action)
+        collectLogs(tid, job.msg.user, activation, container, job.action)
           .andThen {
             case Success(_) => tid.finished(this, start)
             case Failure(t) => tid.failed(this, start, s"reading logs failed: $t")
@@ -394,13 +395,14 @@ class ContainerProxy(factory: (TransactionId, String, ImageName, Boolean, ByteSi
 }
 
 object ContainerProxy {
-  def props(factory: (TransactionId, String, ImageName, Boolean, ByteSize) => Future[Container],
-            ack: (TransactionId, WhiskActivation, Boolean, InstanceId) => Future[Any],
-            store: (TransactionId, WhiskActivation) => Future[Any],
-            collectLogs: (TransactionId, Container, ExecutableWhiskAction) => Future[ActivationLogs],
-            instance: InstanceId,
-            unusedTimeout: FiniteDuration = 10.minutes,
-            pauseGrace: FiniteDuration = 50.milliseconds) =
+  def props(
+    factory: (TransactionId, String, ImageName, Boolean, ByteSize) => Future[Container],
+    ack: (TransactionId, WhiskActivation, Boolean, InstanceId) => Future[Any],
+    store: (TransactionId, WhiskActivation) => Future[Any],
+    collectLogs: (TransactionId, Identity, WhiskActivation, Container, ExecutableWhiskAction) => Future[ActivationLogs],
+    instance: InstanceId,
+    unusedTimeout: FiniteDuration = 10.minutes,
+    pauseGrace: FiniteDuration = 50.milliseconds) =
     Props(new ContainerProxy(factory, ack, store, collectLogs, instance, unusedTimeout, pauseGrace))
 
   // Needs to be thread-safe as it's used by multiple proxies concurrently.
diff --git a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
index a835d9e555..2053e3f75c 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
+++ b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
@@ -63,6 +63,7 @@ class InvokerReactive(config: WhiskConfig, instance: InstanceId, producer: Messa
   implicit val cfg = config
 
   private val logsProvider = SpiLoader.get[LogStoreProvider].logStore(actorSystem)
+  logging.info(this, s"LogStoreProvider: ${logsProvider.getClass}")
 
   /**
    * Factory used by the ContainerProxy to physically create a new container.
diff --git a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala
index 1519d0916a..5f9898e6b2 100644
--- a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala
@@ -33,7 +33,7 @@ import org.junit.runner.RunWith
 import org.scalamock.scalatest.MockFactory
 import org.scalatest.BeforeAndAfterEach
 import org.scalatest.FlatSpec
-import whisk.core.containerpool.logging.{DockerLogStore, LogLine}
+import whisk.core.containerpool.logging.{DockerToActivationLogStore, LogLine}
 
 import org.scalatest.junit.JUnitRunner
 import org.scalatest.Matchers
@@ -76,7 +76,7 @@ class DockerContainerTests
 
   /** Reads logs into memory and awaits them */
   def awaitLogs(source: Source[ByteString, Any], timeout: FiniteDuration = 500.milliseconds): Vector[String] =
-    Await.result(source.via(DockerLogStore.toFormattedString).runWith(Sink.seq[String]), timeout).toVector
+    Await.result(source.via(DockerToActivationLogStore.toFormattedString).runWith(Sink.seq[String]), timeout).toVector
 
   val containerId = ContainerId("id")
 
diff --git a/tests/src/test/scala/whisk/core/containerpool/logging/test/DockerToActivationFileLogStoreTests.scala b/tests/src/test/scala/whisk/core/containerpool/logging/test/DockerToActivationFileLogStoreTests.scala
new file mode 100644
index 0000000000..1f5f22cb31
--- /dev/null
+++ b/tests/src/test/scala/whisk/core/containerpool/logging/test/DockerToActivationFileLogStoreTests.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.containerpool.logging.test
+
+import java.time.Instant
+
+import akka.stream.scaladsl.{Flow, Sink, Source}
+import akka.testkit.TestProbe
+import akka.util.ByteString
+import common.{StreamLogging, WskActorSystem}
+import org.scalatest.Matchers
+import spray.json._
+import whisk.common.TransactionId
+import whisk.core.containerpool.logging.{DockerToActivationFileLogStore, LogLine}
+import whisk.core.entity._
+
+/**
+ * Includes the tests for the DockerToActivationLogStore since the behavior towards the activation storage should
+ * remain exactly the same.
+ */
+class DockerToActivationFileLogStoreTests
+    extends DockerToActivationLogStoreTests
+    with Matchers
+    with WskActorSystem
+    with StreamLogging {
+
+  override def createStore() = new TestLogStoreTo(Sink.ignore)
+
+  def toLoggedEvent(line: LogLine, userId: UUID, activationId: ActivationId, actionName: FullyQualifiedEntityName) = {
+    val event = line.toJson.compactPrint
+    val concatenated =
+      s""","activationId":"${activationId.asString}","action":"${actionName.asString}","userId":"${userId.asString}""""
+
+    event.dropRight(1) ++ concatenated ++ "}\n"
+  }
+
+  behavior of "DockerCouchDbFileLogStore"
+
+  it should "read logs returned by the container,in mem and enrich + write them to the provided sink" in {
+    val logs = List(LogLine(Instant.now.toString, "stdout", "this is just a test"))
+
+    val testSource: Source[ByteString, _] = Source(logs.map(line => ByteString(line.toJson.compactPrint)))
+
+    val testActor = TestProbe()
+
+    val container = new TestContainer(testSource)
+    val store = new TestLogStoreTo(Flow[ByteString].map(_.utf8String).to(Sink.actorRef(testActor.ref, ())))
+
+    val collected = store.collectLogs(TransactionId.testing, user, activation, container, action)
+
+    await(collected) shouldBe ActivationLogs(logs.map(_.toFormattedString).toVector)
+    logs.foreach { line =>
+      testActor.expectMsg(
+        toLoggedEvent(line, user.authkey.uuid, activation.activationId, action.fullyQualifiedName(false)))
+    }
+
+    // Last message should be the full activation
+    testActor.expectMsg(activation.toJson.compactPrint + "\n")
+  }
+
+  class TestLogStoreTo(override val writeToFile: Sink[ByteString, _])
+      extends DockerToActivationFileLogStore(actorSystem)
+}
diff --git a/tests/src/test/scala/whisk/core/containerpool/logging/test/DockerLogStoreTests.scala b/tests/src/test/scala/whisk/core/containerpool/logging/test/DockerToActivationLogStoreTests.scala
similarity index 85%
rename from tests/src/test/scala/whisk/core/containerpool/logging/test/DockerLogStoreTests.scala
rename to tests/src/test/scala/whisk/core/containerpool/logging/test/DockerToActivationLogStoreTests.scala
index d6de3cc0ec..170f9cc1ad 100644
--- a/tests/src/test/scala/whisk/core/containerpool/logging/test/DockerLogStoreTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/logging/test/DockerToActivationLogStoreTests.scala
@@ -19,7 +19,7 @@ package whisk.core.containerpool.logging.test
 
 import common.{StreamLogging, WskActorSystem}
 import org.scalatest.{FlatSpec, Matchers}
-import whisk.core.containerpool.logging.{DockerLogStoreProvider, LogCollectingException, LogLine}
+import whisk.core.containerpool.logging.{DockerToActivationLogStoreProvider, LogCollectingException, LogLine}
 import whisk.core.entity.ExecManifest.{ImageName, RuntimeManifest}
 import whisk.core.entity._
 import java.time.Instant
@@ -34,7 +34,7 @@ import whisk.http.Messages
 import scala.concurrent.{Await, ExecutionContext, Future}
 import scala.concurrent.duration._
 
-class DockerLogStoreTests extends FlatSpec with Matchers with WskActorSystem with StreamLogging {
+class DockerToActivationLogStoreTests extends FlatSpec with Matchers with WskActorSystem with StreamLogging {
   def await[T](future: Future[T]) = Await.result(future, 1.minute)
 
   val user = Identity(Subject(), EntityName("testSpace"), AuthKey(), Set())
@@ -47,40 +47,43 @@ class DockerLogStoreTests extends FlatSpec with Matchers with WskActorSystem wit
 
   val tid = TransactionId.testing
 
+  def createStore() = DockerToActivationLogStoreProvider.logStore(actorSystem)
+
   behavior of "DockerLogStore"
 
   it should "read logs into a sequence and parse them into the specified format" in {
-    val store = DockerLogStoreProvider.logStore(actorSystem)
+    val store = createStore()
 
     val logs = List(
       LogLine(Instant.now.toString, "stdout", "this is a log"),
       LogLine(Instant.now.toString, "stdout", "this is a log too"))
     val container = new TestContainer(Source(toByteString(logs)))
 
-    await(store.collectLogs(tid, container, action)) shouldBe ActivationLogs(logs.map(_.toFormattedString).toVector)
+    await(store.collectLogs(tid, user, activation, container, action)) shouldBe ActivationLogs(
+      logs.map(_.toFormattedString).toVector)
   }
 
   it should "report an error if the logs contain an 'official' notice of such" in {
-    val store = DockerLogStoreProvider.logStore(actorSystem)
+    val store = createStore()
 
     val logs = List(
       LogLine(Instant.now.toString, "stdout", "this is a log"),
       LogLine(Instant.now.toString, "stderr", Messages.logFailure))
     val container = new TestContainer(Source(toByteString(logs)))
 
-    val ex = the[LogCollectingException] thrownBy await(store.collectLogs(tid, container, action))
+    val ex = the[LogCollectingException] thrownBy await(store.collectLogs(tid, user, activation, container, action))
     ex.partialLogs shouldBe ActivationLogs(logs.map(_.toFormattedString).toVector)
   }
 
   it should "report an error if logs have been truncated" in {
-    val store = DockerLogStoreProvider.logStore(actorSystem)
+    val store = createStore()
 
     val logs = List(
       LogLine(Instant.now.toString, "stdout", "this is a log"),
       LogLine(Instant.now.toString, "stderr", Messages.truncateLogs(action.limits.logs.asMegaBytes)))
     val container = new TestContainer(Source(toByteString(logs)))
 
-    val ex = the[LogCollectingException] thrownBy await(store.collectLogs(tid, container, action))
+    val ex = the[LogCollectingException] thrownBy await(store.collectLogs(tid, user, activation, container, action))
     ex.partialLogs shouldBe ActivationLogs(logs.map(_.toFormattedString).toVector)
   }
 
diff --git a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
index 8da8aeac11..6182d62e51 100644
--- a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
@@ -151,8 +151,13 @@ class ContainerProxyTests
   }
 
   def createCollector(response: Future[ActivationLogs] = Future.successful(ActivationLogs(Vector.empty))) =
-    LoggedFunction { (transid: TransactionId, container: Container, action: ExecutableWhiskAction) =>
-      response
+    LoggedFunction {
+      (transid: TransactionId,
+       user: Identity,
+       activation: WhiskActivation,
+       container: Container,
+       action: ExecutableWhiskAction) =>
+        response
     }
 
   def createStore = LoggedFunction { (transid: TransactionId, activation: WhiskActivation) =>


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services