You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@openwhisk.apache.org by GitBox <gi...@apache.org> on 2018/02/23 19:42:52 UTC

[GitHub] markusthoemmes closed pull request #3330: Update Log Store Fetch

markusthoemmes closed pull request #3330: Update Log Store Fetch
URL: https://github.com/apache/incubator-openwhisk/pull/3330
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request was opened from a fork, the diff is reproduced
below (GitHub would otherwise not display it once the request is merged):

diff --git a/common/scala/src/main/scala/whisk/core/containerpool/logging/DockerToActivationFileLogStore.scala b/common/scala/src/main/scala/whisk/core/containerpool/logging/DockerToActivationFileLogStore.scala
index f9ec413c38..4be36a730a 100644
--- a/common/scala/src/main/scala/whisk/core/containerpool/logging/DockerToActivationFileLogStore.scala
+++ b/common/scala/src/main/scala/whisk/core/containerpool/logging/DockerToActivationFileLogStore.scala
@@ -18,6 +18,7 @@
 package whisk.core.containerpool.logging
 
 import java.nio.file.{Path, Paths}
+import java.time.Instant
 
 import akka.NotUsed
 import akka.actor.ActorSystem
@@ -25,15 +26,15 @@ import akka.stream.alpakka.file.scaladsl.LogRotatorSink
 import akka.stream.{Graph, SinkShape, UniformFanOutShape}
 import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Keep, MergeHub, Sink, Source}
 import akka.util.ByteString
+
 import whisk.common.TransactionId
 import whisk.core.containerpool.Container
 import whisk.core.entity.{ActivationLogs, ExecutableWhiskAction, Identity, WhiskActivation}
 import whisk.core.entity.size._
+import whisk.http.Messages
+
 import spray.json._
 import spray.json.DefaultJsonProtocol._
-import java.time.Instant
-
-import whisk.http.Messages
 
 import scala.concurrent.Future
 
diff --git a/common/scala/src/main/scala/whisk/core/containerpool/logging/DockerToActivationLogStore.scala b/common/scala/src/main/scala/whisk/core/containerpool/logging/DockerToActivationLogStore.scala
index 153aa59c67..b4e3983af4 100644
--- a/common/scala/src/main/scala/whisk/core/containerpool/logging/DockerToActivationLogStore.scala
+++ b/common/scala/src/main/scala/whisk/core/containerpool/logging/DockerToActivationLogStore.scala
@@ -19,18 +19,21 @@ package whisk.core.containerpool.logging
 
 import akka.NotUsed
 import akka.actor.ActorSystem
+import akka.http.scaladsl.model.HttpRequest
 import akka.stream.ActorMaterializer
 import akka.stream.scaladsl.Sink
 import akka.stream.scaladsl.Flow
 import akka.util.ByteString
+
 import whisk.common.TransactionId
 import whisk.core.containerpool.Container
 import whisk.core.entity.{ActivationLogs, ExecutableWhiskAction, Identity, WhiskActivation}
-import spray.json._
 import whisk.http.Messages
 
 import scala.concurrent.{ExecutionContext, Future}
 
+import spray.json._
+
 /**
  * Represents a single log line as read from a docker log
  */
@@ -64,7 +67,8 @@ class DockerToActivationLogStore(system: ActorSystem) extends LogStore {
   override val containerParameters = Map("--log-driver" -> Set("json-file"))
 
   /* As logs are already part of the activation record, just return that bit of it */
-  override def fetchLogs(activation: WhiskActivation): Future[ActivationLogs] = Future.successful(activation.logs)
+  override def fetchLogs(user: Identity, activation: WhiskActivation, request: HttpRequest): Future[ActivationLogs] =
+    Future.successful(activation.logs)
 
   override def collectLogs(transid: TransactionId,
                            user: Identity,
diff --git a/common/scala/src/main/scala/whisk/core/containerpool/logging/LogDriverLogStore.scala b/common/scala/src/main/scala/whisk/core/containerpool/logging/LogDriverLogStore.scala
index 465fb2532f..ea1576cc12 100644
--- a/common/scala/src/main/scala/whisk/core/containerpool/logging/LogDriverLogStore.scala
+++ b/common/scala/src/main/scala/whisk/core/containerpool/logging/LogDriverLogStore.scala
@@ -18,10 +18,12 @@
 package whisk.core.containerpool.logging
 
 import akka.actor.ActorSystem
+import akka.http.scaladsl.model.HttpRequest
+
 import whisk.core.entity.Identity
 import whisk.common.TransactionId
 import whisk.core.containerpool.Container
-import whisk.core.entity.{ActivationLogs, ExecutableWhiskAction, WhiskActivation}
+import whisk.core.entity.{ActivationLogs, ExecutableWhiskAction, Identity, WhiskActivation}
 
 import scala.concurrent.Future
 
@@ -47,7 +49,7 @@ class LogDriverLogStore(actorSystem: ActorSystem) extends LogStore {
 
   /** no logs exposed to API/CLI using only the LogDriverLogStore; use an extended version,
    * e.g. the SplunkLogStore to expose logs from some external source */
-  def fetchLogs(activation: WhiskActivation): Future[ActivationLogs] =
+  def fetchLogs(user: Identity, activation: WhiskActivation, request: HttpRequest): Future[ActivationLogs] =
     Future.successful(ActivationLogs(Vector("Logs are not available.")))
 }
 
diff --git a/common/scala/src/main/scala/whisk/core/containerpool/logging/LogStore.scala b/common/scala/src/main/scala/whisk/core/containerpool/logging/LogStore.scala
index 335eed5d3e..28c5b9e93f 100644
--- a/common/scala/src/main/scala/whisk/core/containerpool/logging/LogStore.scala
+++ b/common/scala/src/main/scala/whisk/core/containerpool/logging/LogStore.scala
@@ -18,6 +18,8 @@
 package whisk.core.containerpool.logging
 
 import akka.actor.ActorSystem
+import akka.http.scaladsl.model.HttpRequest
+
 import whisk.common.TransactionId
 import whisk.core.containerpool.Container
 import whisk.core.entity.{ActivationLogs, ExecutableWhiskAction, Identity, WhiskActivation}
@@ -74,7 +76,7 @@ trait LogStore {
    * @param activation activation to fetch the logs for
    * @return the relevant logs
    */
-  def fetchLogs(activation: WhiskActivation): Future[ActivationLogs]
+  def fetchLogs(user: Identity, activation: WhiskActivation, request: HttpRequest): Future[ActivationLogs]
 }
 
 trait LogStoreProvider extends Spi {
diff --git a/common/scala/src/main/scala/whisk/core/containerpool/logging/SplunkLogStore.scala b/common/scala/src/main/scala/whisk/core/containerpool/logging/SplunkLogStore.scala
index 596b776131..694fc9a3f1 100644
--- a/common/scala/src/main/scala/whisk/core/containerpool/logging/SplunkLogStore.scala
+++ b/common/scala/src/main/scala/whisk/core/containerpool/logging/SplunkLogStore.scala
@@ -36,18 +36,24 @@ import akka.stream.scaladsl.Flow
 import akka.stream.scaladsl.Keep
 import akka.stream.scaladsl.Sink
 import akka.stream.scaladsl.Source
+
 import com.typesafe.sslconfig.akka.AkkaSSLConfig
+
 import pureconfig._
+
 import scala.concurrent.Future
 import scala.concurrent.Promise
 import scala.util.Failure
 import scala.util.Success
 import scala.util.Try
+
 import spray.json._
+
 import whisk.common.AkkaLogging
 import whisk.core.ConfigKeys
 import whisk.core.entity.ActivationLogs
 import whisk.core.entity.WhiskActivation
+import whisk.core.entity.Identity
 
 case class SplunkLogStoreConfig(host: String,
                                 port: Int,
@@ -92,7 +98,7 @@ class SplunkLogStore(
         Http().createClientHttpsContext(AkkaSSLConfig().mapSettings(s => s.withLoose(s.loose.withDisableSNI(true))))
       else Http().defaultClientHttpsContext)
 
-  override def fetchLogs(activation: WhiskActivation): Future[ActivationLogs] = {
+  override def fetchLogs(user: Identity, activation: WhiskActivation, request: HttpRequest): Future[ActivationLogs] = {
 
     //example curl request:
     //    curl -u  username:password -k https://splunkhost:port/services/search/jobs -d exec_mode=oneshot -d output_mode=json -d "search=search index=\"someindex\" | spath=activation_id | search activation_id=a930e5ae4ad4455c8f2505d665aad282 |  table log_message" -d "earliest_time=2017-08-29T12:00:00" -d "latest_time=2017-10-29T12:00:00"
diff --git a/core/controller/src/main/scala/whisk/core/controller/Activations.scala b/core/controller/src/main/scala/whisk/core/controller/Activations.scala
index f7ea7b827f..6cc4ef73c4 100644
--- a/core/controller/src/main/scala/whisk/core/controller/Activations.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/Activations.scala
@@ -26,6 +26,7 @@ import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport.sprayJsonMarsha
 import akka.http.scaladsl.model.StatusCodes.BadRequest
 import akka.http.scaladsl.server.Directives
 import akka.http.scaladsl.unmarshalling._
+
 import spray.json._
 import spray.json.DefaultJsonProtocol.RootJsObjectFormat
 import whisk.common.TransactionId
@@ -120,7 +121,7 @@ trait WhiskActivationsApi extends Directives with AuthenticatedRouteProvider wit
     resource.entity match {
       case Some(ActivationId(id)) =>
         op match {
-          case READ => fetch(resource.namespace, id)
+          case READ => fetch(user, resource.namespace, id)
           case _    => reject // should not get here
         }
       case None =>
@@ -201,7 +202,8 @@ trait WhiskActivationsApi extends Directives with AuthenticatedRouteProvider wit
    * - 404 Not Found
    * - 500 Internal Server Error
    */
-  private def fetch(namespace: EntityPath, activationId: ActivationId)(implicit transid: TransactionId) = {
+  private def fetch(user: Identity, namespace: EntityPath, activationId: ActivationId)(
+    implicit transid: TransactionId) = {
     val docid = DocId(WhiskEntity.qualifiedName(namespace, activationId))
     pathEndOrSingleSlash {
       getEntity(
@@ -211,7 +213,7 @@ trait WhiskActivationsApi extends Directives with AuthenticatedRouteProvider wit
         postProcess = Some((activation: WhiskActivation) => complete(activation.toExtendedJson)))
 
     } ~ (pathPrefix(resultPath) & pathEnd) { fetchResponse(docid) } ~
-      (pathPrefix(logsPath) & pathEnd) { fetchLogs(docid) }
+      (pathPrefix(logsPath) & pathEnd) { fetchLogs(user, docid) }
   }
 
   /**
@@ -238,11 +240,13 @@ trait WhiskActivationsApi extends Directives with AuthenticatedRouteProvider wit
    * - 404 Not Found
    * - 500 Internal Server Error
    */
-  private def fetchLogs(docid: DocId)(implicit transid: TransactionId) = {
-    getEntityAndProject(
-      WhiskActivation,
-      activationStore,
-      docid,
-      (activation: WhiskActivation) => logStore.fetchLogs(activation).map(_.toJsonObject))
+  private def fetchLogs(user: Identity, docid: DocId)(implicit transid: TransactionId) = {
+    extractRequest { request =>
+      getEntityAndProject(
+        WhiskActivation,
+        activationStore,
+        docid,
+        (activation: WhiskActivation) => logStore.fetchLogs(user, activation, request).map(_.toJsonObject))
+    }
   }
 }
diff --git a/tests/src/test/scala/whisk/core/containerpool/logging/SplunkLogStoreTests.scala b/tests/src/test/scala/whisk/core/containerpool/logging/SplunkLogStoreTests.scala
index f25a49c06c..5c125795d9 100644
--- a/tests/src/test/scala/whisk/core/containerpool/logging/SplunkLogStoreTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/logging/SplunkLogStoreTests.scala
@@ -26,30 +26,39 @@ import akka.http.scaladsl.model.HttpEntity
 import akka.http.scaladsl.model.HttpRequest
 import akka.http.scaladsl.model.HttpResponse
 import akka.http.scaladsl.model.StatusCodes
+import akka.http.scaladsl.model.HttpMethods.POST
+import akka.http.scaladsl.model.headers.RawHeader
+import akka.http.scaladsl.model.MediaTypes
 import akka.http.scaladsl.unmarshalling.Unmarshal
 import akka.stream.ActorMaterializer
 import akka.stream.StreamTcpException
 import akka.stream.scaladsl.Flow
 import akka.testkit.TestKit
+
 import common.StreamLogging
+
 import java.time.ZonedDateTime
+
+import pureconfig.error.ConfigReaderException
+
 import org.junit.runner.RunWith
 import org.scalatest.Matchers
 import org.scalatest.concurrent.PatienceConfiguration.Timeout
 import org.scalatest.concurrent.ScalaFutures
 import org.scalatest.junit.JUnitRunner
-import scala.util.Failure
-import whisk.core.entity.ActivationLogs
 import org.scalatest.FlatSpecLike
-import pureconfig.error.ConfigReaderException
+
 import scala.concurrent.Await
 import scala.concurrent.Promise
 import scala.concurrent.duration._
 import scala.util.Success
 import scala.util.Try
+import scala.util.Failure
+
 import spray.json.JsNumber
 import spray.json.JsObject
 import spray.json._
+
 import whisk.core.entity.ActionLimits
 import whisk.core.entity.ActivationId
 import whisk.core.entity.ActivationResponse
@@ -62,6 +71,9 @@ import whisk.core.entity.Subject
 import whisk.core.entity.TimeLimit
 import whisk.core.entity.WhiskActivation
 import whisk.core.entity.size._
+import whisk.core.entity.AuthKey
+import whisk.core.entity.Identity
+import whisk.core.entity.ActivationLogs
 
 @RunWith(classOf[JUnitRunner])
 class SplunkLogStoreTests
@@ -85,6 +97,12 @@ class SplunkLogStoreTests
   val startTime = "2007-12-03T10:15:30Z"
   val endTime = "2007-12-03T10:15:45Z"
   val endTimePlus5 = "2007-12-03T10:15:50Z" //queried end time range is endTime+5
+  val user = Identity(Subject(), EntityName("testSpace"), AuthKey(), Set())
+  val request = HttpRequest(
+    method = POST,
+    uri = "https://some.url",
+    headers = List(RawHeader("key", "value")),
+    entity = HttpEntity(MediaTypes.`application/json`, JsObject().compactPrint))
 
   val activation = WhiskActivation(
     namespace = EntityPath("ns"),
@@ -155,14 +173,14 @@ class SplunkLogStoreTests
   it should "find logs based on activation timestamps" in {
     //use the a flow that asserts the request structure and provides a response in the expected format
     val splunkStore = new SplunkLogStore(system, Some(testFlow), testConfig)
-    val result = Await.result(splunkStore.fetchLogs(activation), 1.second)
+    val result = Await.result(splunkStore.fetchLogs(user, activation, request), 1.second)
     result shouldBe ActivationLogs(Vector("some log message", "some other log message"))
   }
 
   it should "fail to connect to bogus host" in {
     //use the default http flow with the default bogus-host config
     val splunkStore = new SplunkLogStore(system, splunkConfig = testConfig)
-    val result = splunkStore.fetchLogs(activation)
+    val result = splunkStore.fetchLogs(user, activation, request)
     whenReady(result.failed, Timeout(1.second)) { ex =>
       ex shouldBe an[StreamTcpException]
     }
@@ -170,7 +188,7 @@ class SplunkLogStoreTests
   it should "display an error if API cannot be reached" in {
     //use a flow that generates a 500 response
     val splunkStore = new SplunkLogStore(system, Some(failFlow), testConfig)
-    val result = splunkStore.fetchLogs(activation)
+    val result = splunkStore.fetchLogs(user, activation, request)
     whenReady(result.failed, Timeout(1.second)) { ex =>
       ex shouldBe an[RuntimeException]
     }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services