You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ma...@apache.org on 2018/02/13 18:57:31 UTC
[incubator-openwhisk] branch master updated: Adding implementation
of a Splunk-backed LogStore (#2957)
This is an automated email from the ASF dual-hosted git repository.
markusthoemmes pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 6f1d1a1 Adding implementation of a Splunk-backed LogStore (#2957)
6f1d1a1 is described below
commit 6f1d1a18bd53c288a1bad9de740f664ea595b4e5
Author: tysonnorris <ty...@gmail.com>
AuthorDate: Tue Feb 13 10:57:28 2018 -0800
Adding implementation of a Splunk-backed LogStore (#2957)
Co-authored-by: mcdan <me...@danmcweeney.com>
---
.../src/main/scala/whisk/core/WhiskConfig.scala | 3 +
.../containerpool/logging/LogDriverLogStore.scala | 56 +++++++
.../containerpool/logging/SplunkLogStore.scala | 160 +++++++++++++++++++
.../logging/LogDriverLogStoreTests.scala | 38 +++++
.../logging/SplunkLogStoreTests.scala | 177 +++++++++++++++++++++
5 files changed, 434 insertions(+)
diff --git a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
index 5d3fada..c538d6e 100644
--- a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
+++ b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
@@ -258,4 +258,7 @@ object ConfigKeys {
val transactions = "whisk.transactions"
val stride = s"$transactions.stride"
+
+ val logStore = "whisk.logstore"
+ val splunk = s"$logStore.splunk"
}
diff --git a/common/scala/src/main/scala/whisk/core/containerpool/logging/LogDriverLogStore.scala b/common/scala/src/main/scala/whisk/core/containerpool/logging/LogDriverLogStore.scala
new file mode 100644
index 0000000..465fb25
--- /dev/null
+++ b/common/scala/src/main/scala/whisk/core/containerpool/logging/LogDriverLogStore.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.containerpool.logging
+
+import akka.actor.ActorSystem
+import whisk.core.entity.Identity
+import whisk.common.TransactionId
+import whisk.core.containerpool.Container
+import whisk.core.entity.{ActivationLogs, ExecutableWhiskAction, WhiskActivation}
+
+import scala.concurrent.Future
+
+/**
+ * A LogStore for deployments in which a docker log driver ships container logs to an external store.
+ *
+ * Neither collecting logs from the container nor reading them back from the external store is implemented here.
+ * This SPI requires ContainerArgs.extraArgs to be used to indicate where the logs are shipped,
+ * see https://docs.docker.com/config/containers/logging/configure/#configure-the-logging-driver-for-a-container
+ *
+ * Extended versions can customize fetching, e.g. from ELK or Splunk etc.
+ *
+ * @param actorSystem the actor system the store is created with (not referenced by this base implementation)
+ */
+class LogDriverLogStore(actorSystem: ActorSystem) extends LogStore {
+
+  /** Contributes no parameters itself: --log-driver and --log-opt flags are indicated via ContainerArgsConfig.extraArgs. */
+  override def containerParameters = Map()
+
+  /**
+   * Collects nothing, because no logs are collected when using docker log drivers
+   * (see DockerLogStore for json-file exception).
+   *
+   * @return an already completed future holding empty ActivationLogs
+   */
+  def collectLogs(transid: TransactionId,
+                  user: Identity,
+                  activation: WhiskActivation,
+                  container: Container,
+                  action: ExecutableWhiskAction): Future[ActivationLogs] = {
+    val nothingCollected = ActivationLogs()
+    Future.successful(nothingCollected)
+  }
+
+  /**
+   * No logs are exposed to API/CLI using only the LogDriverLogStore; use an extended version,
+   * e.g. the SplunkLogStore, to expose logs from some external source.
+   *
+   * @return an already completed future holding a single placeholder log line
+   */
+  def fetchLogs(activation: WhiskActivation): Future[ActivationLogs] = {
+    val placeholder = Vector("Logs are not available.")
+    Future.successful(ActivationLogs(placeholder))
+  }
+}
+
+/** LogStoreProvider that creates a new LogDriverLogStore on every call. */
+object LogDriverLogStoreProvider extends LogStoreProvider {
+  override def logStore(actorSystem: ActorSystem) = {
+    new LogDriverLogStore(actorSystem)
+  }
+}
diff --git a/common/scala/src/main/scala/whisk/core/containerpool/logging/SplunkLogStore.scala b/common/scala/src/main/scala/whisk/core/containerpool/logging/SplunkLogStore.scala
new file mode 100644
index 0000000..596b776
--- /dev/null
+++ b/common/scala/src/main/scala/whisk/core/containerpool/logging/SplunkLogStore.scala
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.containerpool.logging
+
+import akka.actor.ActorSystem
+import akka.http.scaladsl.Http
+import akka.http.scaladsl.client.RequestBuilding.Post
+import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
+import akka.http.scaladsl.model.FormData
+import akka.http.scaladsl.model.HttpRequest
+import akka.http.scaladsl.model.HttpResponse
+import akka.http.scaladsl.model.Uri
+import akka.http.scaladsl.model.Uri.Path
+import akka.http.scaladsl.model.headers.Authorization
+import akka.http.scaladsl.model.headers.BasicHttpCredentials
+import akka.http.scaladsl.unmarshalling.Unmarshal
+import akka.stream.ActorMaterializer
+import akka.stream.OverflowStrategy
+import akka.stream.QueueOfferResult
+import akka.stream.scaladsl.Flow
+import akka.stream.scaladsl.Keep
+import akka.stream.scaladsl.Sink
+import akka.stream.scaladsl.Source
+import com.typesafe.sslconfig.akka.AkkaSSLConfig
+import pureconfig._
+import scala.concurrent.Future
+import scala.concurrent.Promise
+import scala.util.Failure
+import scala.util.Success
+import scala.util.Try
+import spray.json._
+import whisk.common.AkkaLogging
+import whisk.core.ConfigKeys
+import whisk.core.entity.ActivationLogs
+import whisk.core.entity.WhiskActivation
+
+/**
+ * Settings used by SplunkLogStore to reach the Splunk REST API and to build the search query.
+ *
+ * @param host host of the Splunk API the HTTPS connection pool connects to
+ * @param port port of the Splunk API
+ * @param username user name sent as HTTP basic credentials
+ * @param password password sent as HTTP basic credentials
+ * @param index Splunk index that is searched
+ * @param logMessageField name of the result field that holds the log line
+ * @param activationIdField name of the event field that is matched against the activation id
+ * @param disableSNI when true, the HTTPS context is created with SNI disabled
+ */
+case class SplunkLogStoreConfig(host: String,
+                                port: Int,
+                                username: String,
+                                password: String,
+                                index: String,
+                                logMessageField: String,
+                                activationIdField: String,
+                                disableSNI: Boolean)
+
+/** The part of a Splunk search response that is read: the list of result objects. */
+case class SplunkResponse(results: Vector[JsObject])
+
+/** spray-json format used to unmarshal a SplunkResponse from the Splunk API response entity. */
+object SplunkResponseJsonProtocol extends DefaultJsonProtocol {
+  implicit val orderFormat: RootJsonFormat[SplunkResponse] = jsonFormat1(SplunkResponse)
+}
+
+/**
+ * A Splunk based impl of LogDriverLogStore. Logs are routed to splunk via docker log driver, and retrieved via
+ * Splunk REST API.
+ *
+ * @param actorSystem actor system used for the HTTP client, for stream materialization and for logging
+ * @param httpFlow Optional Flow to use for HttpRequest handling (to enable stream based tests); when not given,
+ *                 a cached HTTPS host connection pool to the configured Splunk host and port is used
+ * @param splunkConfig Splunk connection and query settings; by default loaded from ConfigKeys.splunk, which throws
+ *                     if that configuration cannot be loaded
+ */
+class SplunkLogStore(
+  actorSystem: ActorSystem,
+  httpFlow: Option[Flow[(HttpRequest, Promise[HttpResponse]), (Try[HttpResponse], Promise[HttpResponse]), Any]] = None,
+  splunkConfig: SplunkLogStoreConfig = loadConfigOrThrow[SplunkLogStoreConfig](ConfigKeys.splunk))
+    extends LogDriverLogStore(actorSystem) {
+  implicit val as = actorSystem
+  implicit val ec = as.dispatcher
+  implicit val materializer = ActorMaterializer()
+  private val logging = new AkkaLogging(actorSystem.log)
+
+  private val splunkApi = Path / "services" / "search" / "jobs" //see http://docs.splunk.com/Documentation/Splunk/6.6.3/RESTREF/RESTsearch#search.2Fjobs
+
+  import SplunkResponseJsonProtocol._
+
+  /** Size of the request queue buffer; requests offered while the buffer is full are dropped (see queueRequest). */
+  val maxPendingRequests = 500
+
+  /** Flow used when no httpFlow is passed in: a cached HTTPS connection pool, optionally with SNI disabled. */
+  val defaultHttpFlow = Http().cachedHostConnectionPoolHttps[Promise[HttpResponse]](
+    host = splunkConfig.host,
+    port = splunkConfig.port,
+    connectionContext =
+      if (splunkConfig.disableSNI)
+        Http().createClientHttpsContext(AkkaSSLConfig().mapSettings(s => s.withLoose(s.loose.withDisableSNI(true))))
+      else Http().defaultClientHttpsContext)
+
+  /**
+   * Fetches the log lines of an activation with a oneshot Splunk search.
+   *
+   * The search is restricted to the time range from the activation start to 5 seconds after the activation end.
+   *
+   * @param activation the activation whose logs are requested
+   * @return the log lines in chronological order; the future fails if the request cannot be queued or sent, if
+   *         Splunk answers with a non-success status, or if the response cannot be converted
+   */
+  override def fetchLogs(activation: WhiskActivation): Future[ActivationLogs] = {
+
+    //example curl request:
+    // curl -u username:password -k https://splunkhost:port/services/search/jobs -d exec_mode=oneshot -d output_mode=json -d "search=search index=\"someindex\"| spath activation_id| search activation_id=a930e5ae4ad4455c8f2505d665aad282| table log_message| reverse" -d "earliest_time=2017-08-29T12:00:00" -d "latest_time=2017-10-29T12:00:00"
+    //example response:
+    // {"preview":false,"init_offset":0,"messages":[],"fields":[{"name":"log_message"}],"results":[{"log_message":"some log message"}], "highlighted":{}}
+    //note: splunk returns results in reverse-chronological order, therefore we include "| reverse" to cause results to arrive in chronological order
+    val search =
+      s"""search index="${splunkConfig.index}"| spath ${splunkConfig.activationIdField}| search ${splunkConfig.activationIdField}=${activation.activationId.toString}| table ${splunkConfig.logMessageField}| reverse"""
+
+    val entity = FormData(
+      Map(
+        "exec_mode" -> "oneshot",
+        "search" -> search,
+        "output_mode" -> "json",
+        "earliest_time" -> activation.start.toString, //assume that activation start/end are UTC zone, and splunk events are the same
+        "latest_time" -> activation.end
+          .plusSeconds(5) //add 5s to avoid a timerange of 0 on short-lived activations
+          .toString)).toEntity
+
+    logging.debug(this, "sending request")
+    queueRequest(
+      Post(Uri(path = splunkApi))
+        .withEntity(entity)
+        .withHeaders(List(Authorization(BasicHttpCredentials(splunkConfig.username, splunkConfig.password)))))
+      .flatMap(response => {
+        logging.debug(this, s"splunk API response ${response}")
+        if (response.status.isSuccess) {
+          Unmarshal(response.entity)
+            .to[SplunkResponse]
+            .map(r => {
+              ActivationLogs(
+                r.results
+                  .map(_.fields(splunkConfig.logMessageField).convertTo[String]))
+            })
+        } else {
+          //drain the unread entity so the pooled connection is released, then report the status to the caller
+          response.discardEntityBytes()
+          Future.failed(new RuntimeException(s"Splunk API request failed with status ${response.status}"))
+        }
+      })
+  }
+
+  //based on http://doc.akka.io/docs/akka-http/10.0.6/scala/http/client-side/host-level.html
+  /**
+   * Queue feeding requests into the http flow. Every element pairs a request with the promise that is completed
+   * with the response, or failed with the error, that the flow emits for that request.
+   */
+  val queue =
+    Source
+      .queue[(HttpRequest, Promise[HttpResponse])](maxPendingRequests, OverflowStrategy.dropNew)
+      .via(httpFlow.getOrElse(defaultHttpFlow))
+      .toMat(Sink.foreach({
+        case ((Success(resp), p)) => p.success(resp)
+        case ((Failure(e), p))    => p.failure(e)
+      }))(Keep.left)
+      .run()
+
+  /**
+   * Offers a request to the queue.
+   *
+   * @param request the request to send
+   * @return the response once the flow has produced it; a failed future if the request was dropped because the
+   *         queue is full, if offering failed, or if the queue was closed
+   */
+  def queueRequest(request: HttpRequest): Future[HttpResponse] = {
+    val responsePromise = Promise[HttpResponse]()
+    queue.offer(request -> responsePromise).flatMap {
+      case QueueOfferResult.Enqueued => responsePromise.future
+      case QueueOfferResult.Dropped =>
+        Future.failed(new RuntimeException("Splunk API Client Queue overflowed. Try again later."))
+      case QueueOfferResult.Failure(ex) => Future.failed(ex)
+      case QueueOfferResult.QueueClosed =>
+        Future.failed(
+          new RuntimeException(
+            "Splunk API Client Queue was closed (pool shut down) while running the request. Try again later."))
+    }
+  }
+}
+
+/** LogStoreProvider that creates a new SplunkLogStore (default http flow, configuration loaded on creation) on every call. */
+object SplunkLogStoreProvider extends LogStoreProvider {
+  override def logStore(actorSystem: ActorSystem) = {
+    new SplunkLogStore(actorSystem)
+  }
+}
diff --git a/tests/src/test/scala/whisk/core/containerpool/logging/LogDriverLogStoreTests.scala b/tests/src/test/scala/whisk/core/containerpool/logging/LogDriverLogStoreTests.scala
new file mode 100644
index 0000000..b87e423
--- /dev/null
+++ b/tests/src/test/scala/whisk/core/containerpool/logging/LogDriverLogStoreTests.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.containerpool.logging
+
+import akka.actor.ActorSystem
+import akka.testkit.TestKit
+import org.scalatest.FlatSpecLike
+import org.scalatest.Matchers
+import whisk.core.containerpool.ContainerArgsConfig
+
+/** Tests for LogDriverLogStore. */
+class LogDriverLogStoreTests extends TestKit(ActorSystem("LogDriverLogStore")) with FlatSpecLike with Matchers {
+
+  // sample container args carrying docker log driver flags
+  // NOTE(review): not referenced by the test in this class
+  val testConfig = ContainerArgsConfig(
+    network = "network",
+    extraArgs =
+      Map("log-driver" -> Set("fluentd"), "log-opt" -> Set("fluentd-address=localhost:24225", "tag=OW_CONTAINER")))
+
+  behavior of "LogDriver LogStore"
+
+  it should "set the container parameters from the config" in {
+    // the store itself is expected to contribute no container parameters
+    val store = new LogDriverLogStore(system)
+    val parameters = store.containerParameters
+    parameters shouldBe Map()
+  }
+}
diff --git a/tests/src/test/scala/whisk/core/containerpool/logging/SplunkLogStoreTests.scala b/tests/src/test/scala/whisk/core/containerpool/logging/SplunkLogStoreTests.scala
new file mode 100644
index 0000000..08faa47
--- /dev/null
+++ b/tests/src/test/scala/whisk/core/containerpool/logging/SplunkLogStoreTests.scala
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.containerpool.logging
+
+import akka.NotUsed
+import akka.actor.ActorSystem
+import akka.http.javadsl.model.headers.Authorization
+import akka.http.scaladsl.model.ContentTypes
+import akka.http.scaladsl.model.FormData
+import akka.http.scaladsl.model.HttpEntity
+import akka.http.scaladsl.model.HttpRequest
+import akka.http.scaladsl.model.HttpResponse
+import akka.http.scaladsl.model.StatusCodes
+import akka.http.scaladsl.unmarshalling.Unmarshal
+import akka.stream.ActorMaterializer
+import akka.stream.StreamTcpException
+import akka.stream.scaladsl.Flow
+import akka.testkit.TestKit
+import common.StreamLogging
+import java.time.ZonedDateTime
+import org.scalatest.Matchers
+import org.scalatest.concurrent.PatienceConfiguration.Timeout
+import org.scalatest.concurrent.ScalaFutures
+import scala.util.Failure
+import whisk.core.entity.ActivationLogs
+import org.scalatest.FlatSpecLike
+import pureconfig.error.ConfigReaderException
+import scala.concurrent.Await
+import scala.concurrent.Promise
+import scala.concurrent.duration._
+import scala.util.Success
+import scala.util.Try
+import spray.json.JsNumber
+import spray.json.JsObject
+import spray.json._
+import whisk.core.entity.ActionLimits
+import whisk.core.entity.ActivationId
+import whisk.core.entity.ActivationResponse
+import whisk.core.entity.EntityName
+import whisk.core.entity.EntityPath
+import whisk.core.entity.LogLimit
+import whisk.core.entity.MemoryLimit
+import whisk.core.entity.Parameters
+import whisk.core.entity.Subject
+import whisk.core.entity.TimeLimit
+import whisk.core.entity.WhiskActivation
+import whisk.core.entity.size._
+
+/**
+ * Tests for SplunkLogStore. Requests are served by stub flows instead of a real Splunk instance, except for the
+ * test that uses the default http flow against a bogus host.
+ */
+class SplunkLogStoreTests
+    extends TestKit(ActorSystem("SplunkLogStore"))
+    with FlatSpecLike
+    with Matchers
+    with ScalaFutures
+    with StreamLogging {
+  val testConfig = SplunkLogStoreConfig(
+    "splunk-host",
+    8080,
+    "splunk-user",
+    "splunk-pass",
+    "splunk-index",
+    "log_message",
+    "activation_id",
+    false)
+
+  behavior of "Splunk LogStore"
+
+  val startTime = "2007-12-03T10:15:30Z"
+  val endTime = "2007-12-03T10:15:45Z"
+  val endTimePlus5 = "2007-12-03T10:15:50Z" //queried end time range is endTime+5
+
+  val activation = WhiskActivation(
+    namespace = EntityPath("ns"),
+    name = EntityName("a"),
+    Subject(),
+    activationId = ActivationId(),
+    start = ZonedDateTime.parse(startTime).toInstant,
+    end = ZonedDateTime.parse(endTime).toInstant,
+    response = ActivationResponse.success(Some(JsObject("res" -> JsNumber(1)))),
+    annotations = Parameters("limits", ActionLimits(TimeLimit(1.second), MemoryLimit(128.MB), LogLimit(1.MB)).toJson),
+    duration = Some(123))
+
+  implicit val ec = system.dispatcher
+  implicit val materializer = ActorMaterializer()
+
+  /**
+   * Stub flow that asserts the structure of the request and answers with a canned response holding two log lines.
+   * A failed assertion is handed back as a Failure, so it fails the future returned by fetchLogs.
+   */
+  val testFlow: Flow[(HttpRequest, Promise[HttpResponse]), (Try[HttpResponse], Promise[HttpResponse]), NotUsed] =
+    Flow[(HttpRequest, Promise[HttpResponse])]
+      .mapAsyncUnordered(1) {
+        case (request, userContext) =>
+          //we use cachedHostConnectionPoolHttps so won't get the host+port with the request
+          Unmarshal(request.entity)
+            .to[FormData]
+            .map { form =>
+              val earliestTime = form.fields.get("earliest_time")
+              val latestTime = form.fields.get("latest_time")
+              val outputMode = form.fields.get("output_mode")
+              val search = form.fields.get("search")
+              val execMode = form.fields.get("exec_mode")
+
+              request.uri.path.toString() shouldBe "/services/search/jobs"
+              request.headers shouldBe List(Authorization.basic(testConfig.username, testConfig.password))
+              earliestTime shouldBe Some(startTime)
+              latestTime shouldBe Some(endTimePlus5)
+              outputMode shouldBe Some("json")
+              execMode shouldBe Some("oneshot")
+              search shouldBe Some(
+                s"""search index="${testConfig.index}"| spath ${testConfig.activationIdField}| search ${testConfig.activationIdField}=${activation.activationId.toString}| table ${testConfig.logMessageField}| reverse""")
+
+              (
+                Success(
+                  HttpResponse(
+                    StatusCodes.OK,
+                    entity = HttpEntity(
+                      ContentTypes.`application/json`,
+                      """{"preview":false,"init_offset":0,"messages":[],"fields":[{"name":"log_message"}],"results":[{"log_message":"some log message"},{"log_message":"some other log message"}], "highlighted":{}}"""))),
+                userContext)
+            }
+            .recover {
+              //propagate the error (e.g. a failed assertion above) to the caller through the promise
+              case e =>
+                (Failure(e), userContext)
+            }
+      }
+
+  /** Stub flow that answers every request with a 500 Internal Server Error. */
+  val failFlow: Flow[(HttpRequest, Promise[HttpResponse]), (Try[HttpResponse], Promise[HttpResponse]), NotUsed] =
+    Flow[(HttpRequest, Promise[HttpResponse])]
+      .map {
+        case (request, userContext) =>
+          (Success(HttpResponse(StatusCodes.InternalServerError)), userContext)
+      }
+
+  it should "fail when loading out of box configs (because whisk.logstore.splunk doesn't exist)" in {
+    assertThrows[ConfigReaderException[_]] {
+      new SplunkLogStore(system)
+    }
+  }
+
+  it should "find logs based on activation timestamps" in {
+    //use a flow that asserts the request structure and provides a response in the expected format
+    val splunkStore = new SplunkLogStore(system, Some(testFlow), testConfig)
+    val result = Await.result(splunkStore.fetchLogs(activation), 1.second)
+    result shouldBe ActivationLogs(Vector("some log message", "some other log message"))
+  }
+
+  it should "fail to connect to bogus host" in {
+    //use the default http flow with the default bogus-host config
+    val splunkStore = new SplunkLogStore(system, splunkConfig = testConfig)
+    val result = splunkStore.fetchLogs(activation)
+    whenReady(result.failed, Timeout(1.second)) { ex =>
+      ex shouldBe an[StreamTcpException]
+    }
+  }
+
+  it should "display an error if API cannot be reached" in {
+    //use a flow that generates a 500 response
+    val splunkStore = new SplunkLogStore(system, Some(failFlow), testConfig)
+    val result = splunkStore.fetchLogs(activation)
+    whenReady(result.failed, Timeout(1.second)) { ex =>
+      ex shouldBe an[RuntimeException]
+    }
+  }
+
+}
--
To stop receiving notification emails like this one, please contact
markusthoemmes@apache.org.