You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@openwhisk.apache.org by GitBox <gi...@apache.org> on 2018/02/23 19:42:52 UTC
[GitHub] markusthoemmes closed pull request #3330: Update Log Store Fetch
markusthoemmes closed pull request #3330: Update Log Store Fetch
URL: https://github.com/apache/incubator-openwhisk/pull/3330
This is a PR merged from a forked repository.
Because this is a foreign pull request (from a fork), GitHub hides the
original diff once the pull request is merged, so the diff is reproduced
below for the sake of provenance:
diff --git a/common/scala/src/main/scala/whisk/core/containerpool/logging/DockerToActivationFileLogStore.scala b/common/scala/src/main/scala/whisk/core/containerpool/logging/DockerToActivationFileLogStore.scala
index f9ec413c38..4be36a730a 100644
--- a/common/scala/src/main/scala/whisk/core/containerpool/logging/DockerToActivationFileLogStore.scala
+++ b/common/scala/src/main/scala/whisk/core/containerpool/logging/DockerToActivationFileLogStore.scala
@@ -18,6 +18,7 @@
package whisk.core.containerpool.logging
import java.nio.file.{Path, Paths}
+import java.time.Instant
import akka.NotUsed
import akka.actor.ActorSystem
@@ -25,15 +26,15 @@ import akka.stream.alpakka.file.scaladsl.LogRotatorSink
import akka.stream.{Graph, SinkShape, UniformFanOutShape}
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Keep, MergeHub, Sink, Source}
import akka.util.ByteString
+
import whisk.common.TransactionId
import whisk.core.containerpool.Container
import whisk.core.entity.{ActivationLogs, ExecutableWhiskAction, Identity, WhiskActivation}
import whisk.core.entity.size._
+import whisk.http.Messages
+
import spray.json._
import spray.json.DefaultJsonProtocol._
-import java.time.Instant
-
-import whisk.http.Messages
import scala.concurrent.Future
diff --git a/common/scala/src/main/scala/whisk/core/containerpool/logging/DockerToActivationLogStore.scala b/common/scala/src/main/scala/whisk/core/containerpool/logging/DockerToActivationLogStore.scala
index 153aa59c67..b4e3983af4 100644
--- a/common/scala/src/main/scala/whisk/core/containerpool/logging/DockerToActivationLogStore.scala
+++ b/common/scala/src/main/scala/whisk/core/containerpool/logging/DockerToActivationLogStore.scala
@@ -19,18 +19,21 @@ package whisk.core.containerpool.logging
import akka.NotUsed
import akka.actor.ActorSystem
+import akka.http.scaladsl.model.HttpRequest
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Flow
import akka.util.ByteString
+
import whisk.common.TransactionId
import whisk.core.containerpool.Container
import whisk.core.entity.{ActivationLogs, ExecutableWhiskAction, Identity, WhiskActivation}
-import spray.json._
import whisk.http.Messages
import scala.concurrent.{ExecutionContext, Future}
+import spray.json._
+
/**
* Represents a single log line as read from a docker log
*/
@@ -64,7 +67,8 @@ class DockerToActivationLogStore(system: ActorSystem) extends LogStore {
override val containerParameters = Map("--log-driver" -> Set("json-file"))
/* As logs are already part of the activation record, just return that bit of it */
- override def fetchLogs(activation: WhiskActivation): Future[ActivationLogs] = Future.successful(activation.logs)
+ override def fetchLogs(user: Identity, activation: WhiskActivation, request: HttpRequest): Future[ActivationLogs] =
+ Future.successful(activation.logs)
override def collectLogs(transid: TransactionId,
user: Identity,
diff --git a/common/scala/src/main/scala/whisk/core/containerpool/logging/LogDriverLogStore.scala b/common/scala/src/main/scala/whisk/core/containerpool/logging/LogDriverLogStore.scala
index 465fb2532f..ea1576cc12 100644
--- a/common/scala/src/main/scala/whisk/core/containerpool/logging/LogDriverLogStore.scala
+++ b/common/scala/src/main/scala/whisk/core/containerpool/logging/LogDriverLogStore.scala
@@ -18,10 +18,12 @@
package whisk.core.containerpool.logging
import akka.actor.ActorSystem
+import akka.http.scaladsl.model.HttpRequest
+
import whisk.core.entity.Identity
import whisk.common.TransactionId
import whisk.core.containerpool.Container
-import whisk.core.entity.{ActivationLogs, ExecutableWhiskAction, WhiskActivation}
+import whisk.core.entity.{ActivationLogs, ExecutableWhiskAction, Identity, WhiskActivation}
import scala.concurrent.Future
@@ -47,7 +49,7 @@ class LogDriverLogStore(actorSystem: ActorSystem) extends LogStore {
/** no logs exposed to API/CLI using only the LogDriverLogStore; use an extended version,
* e.g. the SplunkLogStore to expose logs from some external source */
- def fetchLogs(activation: WhiskActivation): Future[ActivationLogs] =
+ def fetchLogs(user: Identity, activation: WhiskActivation, request: HttpRequest): Future[ActivationLogs] =
Future.successful(ActivationLogs(Vector("Logs are not available.")))
}
diff --git a/common/scala/src/main/scala/whisk/core/containerpool/logging/LogStore.scala b/common/scala/src/main/scala/whisk/core/containerpool/logging/LogStore.scala
index 335eed5d3e..28c5b9e93f 100644
--- a/common/scala/src/main/scala/whisk/core/containerpool/logging/LogStore.scala
+++ b/common/scala/src/main/scala/whisk/core/containerpool/logging/LogStore.scala
@@ -18,6 +18,8 @@
package whisk.core.containerpool.logging
import akka.actor.ActorSystem
+import akka.http.scaladsl.model.HttpRequest
+
import whisk.common.TransactionId
import whisk.core.containerpool.Container
import whisk.core.entity.{ActivationLogs, ExecutableWhiskAction, Identity, WhiskActivation}
@@ -74,7 +76,7 @@ trait LogStore {
* @param activation activation to fetch the logs for
* @return the relevant logs
*/
- def fetchLogs(activation: WhiskActivation): Future[ActivationLogs]
+ def fetchLogs(user: Identity, activation: WhiskActivation, request: HttpRequest): Future[ActivationLogs]
}
trait LogStoreProvider extends Spi {
diff --git a/common/scala/src/main/scala/whisk/core/containerpool/logging/SplunkLogStore.scala b/common/scala/src/main/scala/whisk/core/containerpool/logging/SplunkLogStore.scala
index 596b776131..694fc9a3f1 100644
--- a/common/scala/src/main/scala/whisk/core/containerpool/logging/SplunkLogStore.scala
+++ b/common/scala/src/main/scala/whisk/core/containerpool/logging/SplunkLogStore.scala
@@ -36,18 +36,24 @@ import akka.stream.scaladsl.Flow
import akka.stream.scaladsl.Keep
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source
+
import com.typesafe.sslconfig.akka.AkkaSSLConfig
+
import pureconfig._
+
import scala.concurrent.Future
import scala.concurrent.Promise
import scala.util.Failure
import scala.util.Success
import scala.util.Try
+
import spray.json._
+
import whisk.common.AkkaLogging
import whisk.core.ConfigKeys
import whisk.core.entity.ActivationLogs
import whisk.core.entity.WhiskActivation
+import whisk.core.entity.Identity
case class SplunkLogStoreConfig(host: String,
port: Int,
@@ -92,7 +98,7 @@ class SplunkLogStore(
Http().createClientHttpsContext(AkkaSSLConfig().mapSettings(s => s.withLoose(s.loose.withDisableSNI(true))))
else Http().defaultClientHttpsContext)
- override def fetchLogs(activation: WhiskActivation): Future[ActivationLogs] = {
+ override def fetchLogs(user: Identity, activation: WhiskActivation, request: HttpRequest): Future[ActivationLogs] = {
//example curl request:
// curl -u username:password -k https://splunkhost:port/services/search/jobs -d exec_mode=oneshot -d output_mode=json -d "search=search index=\"someindex\" | spath=activation_id | search activation_id=a930e5ae4ad4455c8f2505d665aad282 | table log_message" -d "earliest_time=2017-08-29T12:00:00" -d "latest_time=2017-10-29T12:00:00"
diff --git a/core/controller/src/main/scala/whisk/core/controller/Activations.scala b/core/controller/src/main/scala/whisk/core/controller/Activations.scala
index f7ea7b827f..6cc4ef73c4 100644
--- a/core/controller/src/main/scala/whisk/core/controller/Activations.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/Activations.scala
@@ -26,6 +26,7 @@ import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport.sprayJsonMarsha
import akka.http.scaladsl.model.StatusCodes.BadRequest
import akka.http.scaladsl.server.Directives
import akka.http.scaladsl.unmarshalling._
+
import spray.json._
import spray.json.DefaultJsonProtocol.RootJsObjectFormat
import whisk.common.TransactionId
@@ -120,7 +121,7 @@ trait WhiskActivationsApi extends Directives with AuthenticatedRouteProvider wit
resource.entity match {
case Some(ActivationId(id)) =>
op match {
- case READ => fetch(resource.namespace, id)
+ case READ => fetch(user, resource.namespace, id)
case _ => reject // should not get here
}
case None =>
@@ -201,7 +202,8 @@ trait WhiskActivationsApi extends Directives with AuthenticatedRouteProvider wit
* - 404 Not Found
* - 500 Internal Server Error
*/
- private def fetch(namespace: EntityPath, activationId: ActivationId)(implicit transid: TransactionId) = {
+ private def fetch(user: Identity, namespace: EntityPath, activationId: ActivationId)(
+ implicit transid: TransactionId) = {
val docid = DocId(WhiskEntity.qualifiedName(namespace, activationId))
pathEndOrSingleSlash {
getEntity(
@@ -211,7 +213,7 @@ trait WhiskActivationsApi extends Directives with AuthenticatedRouteProvider wit
postProcess = Some((activation: WhiskActivation) => complete(activation.toExtendedJson)))
} ~ (pathPrefix(resultPath) & pathEnd) { fetchResponse(docid) } ~
- (pathPrefix(logsPath) & pathEnd) { fetchLogs(docid) }
+ (pathPrefix(logsPath) & pathEnd) { fetchLogs(user, docid) }
}
/**
@@ -238,11 +240,13 @@ trait WhiskActivationsApi extends Directives with AuthenticatedRouteProvider wit
* - 404 Not Found
* - 500 Internal Server Error
*/
- private def fetchLogs(docid: DocId)(implicit transid: TransactionId) = {
- getEntityAndProject(
- WhiskActivation,
- activationStore,
- docid,
- (activation: WhiskActivation) => logStore.fetchLogs(activation).map(_.toJsonObject))
+ private def fetchLogs(user: Identity, docid: DocId)(implicit transid: TransactionId) = {
+ extractRequest { request =>
+ getEntityAndProject(
+ WhiskActivation,
+ activationStore,
+ docid,
+ (activation: WhiskActivation) => logStore.fetchLogs(user, activation, request).map(_.toJsonObject))
+ }
}
}
diff --git a/tests/src/test/scala/whisk/core/containerpool/logging/SplunkLogStoreTests.scala b/tests/src/test/scala/whisk/core/containerpool/logging/SplunkLogStoreTests.scala
index f25a49c06c..5c125795d9 100644
--- a/tests/src/test/scala/whisk/core/containerpool/logging/SplunkLogStoreTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/logging/SplunkLogStoreTests.scala
@@ -26,30 +26,39 @@ import akka.http.scaladsl.model.HttpEntity
import akka.http.scaladsl.model.HttpRequest
import akka.http.scaladsl.model.HttpResponse
import akka.http.scaladsl.model.StatusCodes
+import akka.http.scaladsl.model.HttpMethods.POST
+import akka.http.scaladsl.model.headers.RawHeader
+import akka.http.scaladsl.model.MediaTypes
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.stream.ActorMaterializer
import akka.stream.StreamTcpException
import akka.stream.scaladsl.Flow
import akka.testkit.TestKit
+
import common.StreamLogging
+
import java.time.ZonedDateTime
+
+import pureconfig.error.ConfigReaderException
+
import org.junit.runner.RunWith
import org.scalatest.Matchers
import org.scalatest.concurrent.PatienceConfiguration.Timeout
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.junit.JUnitRunner
-import scala.util.Failure
-import whisk.core.entity.ActivationLogs
import org.scalatest.FlatSpecLike
-import pureconfig.error.ConfigReaderException
+
import scala.concurrent.Await
import scala.concurrent.Promise
import scala.concurrent.duration._
import scala.util.Success
import scala.util.Try
+import scala.util.Failure
+
import spray.json.JsNumber
import spray.json.JsObject
import spray.json._
+
import whisk.core.entity.ActionLimits
import whisk.core.entity.ActivationId
import whisk.core.entity.ActivationResponse
@@ -62,6 +71,9 @@ import whisk.core.entity.Subject
import whisk.core.entity.TimeLimit
import whisk.core.entity.WhiskActivation
import whisk.core.entity.size._
+import whisk.core.entity.AuthKey
+import whisk.core.entity.Identity
+import whisk.core.entity.ActivationLogs
@RunWith(classOf[JUnitRunner])
class SplunkLogStoreTests
@@ -85,6 +97,12 @@ class SplunkLogStoreTests
val startTime = "2007-12-03T10:15:30Z"
val endTime = "2007-12-03T10:15:45Z"
val endTimePlus5 = "2007-12-03T10:15:50Z" //queried end time range is endTime+5
+ val user = Identity(Subject(), EntityName("testSpace"), AuthKey(), Set())
+ val request = HttpRequest(
+ method = POST,
+ uri = "https://some.url",
+ headers = List(RawHeader("key", "value")),
+ entity = HttpEntity(MediaTypes.`application/json`, JsObject().compactPrint))
val activation = WhiskActivation(
namespace = EntityPath("ns"),
@@ -155,14 +173,14 @@ class SplunkLogStoreTests
it should "find logs based on activation timestamps" in {
//use the a flow that asserts the request structure and provides a response in the expected format
val splunkStore = new SplunkLogStore(system, Some(testFlow), testConfig)
- val result = Await.result(splunkStore.fetchLogs(activation), 1.second)
+ val result = Await.result(splunkStore.fetchLogs(user, activation, request), 1.second)
result shouldBe ActivationLogs(Vector("some log message", "some other log message"))
}
it should "fail to connect to bogus host" in {
//use the default http flow with the default bogus-host config
val splunkStore = new SplunkLogStore(system, splunkConfig = testConfig)
- val result = splunkStore.fetchLogs(activation)
+ val result = splunkStore.fetchLogs(user, activation, request)
whenReady(result.failed, Timeout(1.second)) { ex =>
ex shouldBe an[StreamTcpException]
}
@@ -170,7 +188,7 @@ class SplunkLogStoreTests
it should "display an error if API cannot be reached" in {
//use a flow that generates a 500 response
val splunkStore = new SplunkLogStore(system, Some(failFlow), testConfig)
- val result = splunkStore.fetchLogs(activation)
+ val result = splunkStore.fetchLogs(user, activation, request)
whenReady(result.failed, Timeout(1.second)) { ex =>
ex shouldBe an[RuntimeException]
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services