You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ma...@apache.org on 2018/04/03 07:18:14 UTC
[incubator-openwhisk] branch master updated: Add
ElasticSearchLogStore. (#3421)
This is an automated email from the ASF dual-hosted git repository.
markusthoemmes pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new cb60132 Add ElasticSearchLogStore. (#3421)
cb60132 is described below
commit cb601327568a54721b3ccc5cff8b881e993eb388
Author: James Dubee <jw...@us.ibm.com>
AuthorDate: Tue Apr 3 03:18:11 2018 -0400
Add ElasticSearchLogStore. (#3421)
Provides a log store that allows logs to be fetched from ElasticSearch.
---
.../src/main/scala/whisk/core/WhiskConfig.scala | 1 +
.../logging/ElasticSearchLogStore.scala | 121 +++++++++++
.../logging/ElasticSearchRestClient.scala | 178 ++++++++++++++++
.../main/scala/whisk/http/PoolingRestClient.scala | 18 +-
.../scala/whisk/core/controller/ApiUtils.scala | 8 +-
.../logging/ElasticSearchLogStoreTests.scala | 211 +++++++++++++++++++
.../logging/ElasticSearchRestClientTests.scala | 223 +++++++++++++++++++++
7 files changed, 755 insertions(+), 5 deletions(-)
diff --git a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
index 7e61480..9b4dd98 100644
--- a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
+++ b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
@@ -247,6 +247,7 @@ object ConfigKeys {
val logStore = "whisk.logstore"
val splunk = s"$logStore.splunk"
+ val elasticSearch = s"$logStore.elasticsearch"
val mesos = "whisk.mesos"
}
diff --git a/common/scala/src/main/scala/whisk/core/containerpool/logging/ElasticSearchLogStore.scala b/common/scala/src/main/scala/whisk/core/containerpool/logging/ElasticSearchLogStore.scala
new file mode 100644
index 0000000..26f0cd3
--- /dev/null
+++ b/common/scala/src/main/scala/whisk/core/containerpool/logging/ElasticSearchLogStore.scala
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.containerpool.logging
+
+import java.nio.file.{Path, Paths}
+
+import akka.actor.ActorSystem
+import akka.stream.scaladsl.Flow
+import akka.http.scaladsl.model._
+
+import whisk.core.entity.{ActivationLogs, Identity, WhiskActivation}
+import whisk.core.containerpool.logging.ElasticSearchJsonProtocol._
+import whisk.core.ConfigKeys
+
+import scala.concurrent.{Future, Promise}
+import scala.util.Try
+
+import spray.json._
+
+import pureconfig._
+
+/**
+ * Names used to address user log documents stored in ElasticSearch.
+ *
+ * @param userLogs value matched against `_type` when querying for user logs
+ * @param message name of the document field holding the log message
+ * @param activationId name of the document field holding the activation id
+ * @param stream name of the document field holding the stream the message was written to
+ * @param time name of the document field holding the timestamp; also used as the sort key
+ */
+case class ElasticSearchLogFieldConfig(
+  userLogs: String,
+  message: String,
+  activationId: String,
+  stream: String,
+  time: String)
+
+/**
+ * Settings of the ElasticSearch backed log store.
+ *
+ * @param protocol protocol handed to the REST client
+ * @param host host handed to the REST client
+ * @param port port handed to the REST client
+ * @param path request path; it is run through `String.format` with the user's UUID as the only argument
+ * @param logSchema names of the log document fields
+ * @param requiredHeaders names of the request headers that have to be present for logs to be fetched from
+ *                        ElasticSearch; empty by default
+ */
+case class ElasticSearchLogStoreConfig(
+  protocol: String,
+  host: String,
+  port: Int,
+  path: String,
+  logSchema: ElasticSearchLogFieldConfig,
+  requiredHeaders: Seq[String] = Seq.empty)
+
+/**
+ * ElasticSearch based implementation of a DockerToActivationFileLogStore. When using the JSON log driver, docker writes
+ * stdout/stderr to JSON formatted files. Those files can be processed by a backend service asynchronously to store
+ * user logs in ElasticSearch. This log store allows user logs then to be fetched from ElasticSearch.
+ *
+ * @param system actor system, also exposed implicitly for the REST client
+ * @param httpFlow optional HTTP flow handed through to the REST client (the tests use it to stub ElasticSearch)
+ * @param destinationDirectory directory handed to DockerToActivationFileLogStore
+ * @param elasticSearchConfig connection and schema settings; read from `ConfigKeys.elasticSearch` when omitted,
+ *                            which throws if that configuration is missing
+ */
+class ElasticSearchLogStore(
+  system: ActorSystem,
+  httpFlow: Option[Flow[(HttpRequest, Promise[HttpResponse]), (Try[HttpResponse], Promise[HttpResponse]), Any]] = None,
+  destinationDirectory: Path = Paths.get("logs"),
+  elasticSearchConfig: ElasticSearchLogStoreConfig =
+    loadConfigOrThrow[ElasticSearchLogStoreConfig](ConfigKeys.elasticSearch))
+    extends DockerToActivationFileLogStore(system, destinationDirectory) {
+
+  // Schema of resultant logs from ES
+  case class UserLogEntry(message: String, stream: String, time: String) {
+    // Rendered as "<time> <stream>: <message>" with the trailing line break of the message removed
+    def toFormattedString = s"${time} ${stream}: ${message.stripLineEnd}"
+  }
+
+  object UserLogEntry extends DefaultJsonProtocol {
+    // The JSON field names are not fixed; they come from the configured log schema
+    implicit val serdes =
+      jsonFormat(
+        UserLogEntry.apply,
+        elasticSearchConfig.logSchema.message,
+        elasticSearchConfig.logSchema.stream,
+        elasticSearchConfig.logSchema.time)
+  }
+
+  implicit val actorSystem = system
+
+  private val esClient = new ElasticSearchRestClient(
+    elasticSearchConfig.protocol,
+    elasticSearchConfig.host,
+    elasticSearchConfig.port,
+    httpFlow)
+
+  // Configured header names, lower-cased once so that they compare equal to HttpHeader.lowercaseName no matter
+  // how they are capitalised in the configuration. A set also collapses names that were configured twice.
+  private val requiredHeaderNames: Set[String] =
+    elasticSearchConfig.requiredHeaders.map(_.toLowerCase(java.util.Locale.ROOT)).toSet
+
+  // Turns every hit of the search result into one formatted log line
+  private def transcribeLogs(queryResult: EsSearchResult): ActivationLogs =
+    ActivationLogs(queryResult.hits.hits.map(_.source.convertTo[UserLogEntry].toFormattedString))
+
+  // Keeps only the request headers whose name is one of the required header names
+  private def extractRequiredHeaders(headers: Seq[HttpHeader]) =
+    headers.filter(h => requiredHeaderNames.contains(h.lowercaseName)).toList
+
+  // True when every required header name occurs at least once. Comparing names instead of counts means a
+  // repeated header can neither stand in for a missing one nor cause a spurious fallback.
+  private def hasAllRequiredHeaders(headers: Seq[HttpHeader]): Boolean = {
+    val presentNames = headers.map(_.lowercaseName).toSet
+    requiredHeaderNames.forall(presentNames.contains)
+  }
+
+  // Query for the user logs of the given activation, sorted by time in ascending order
+  private def generatePayload(activation: WhiskActivation) = {
+    val logQuery =
+      s"_type: ${elasticSearchConfig.logSchema.userLogs} AND ${elasticSearchConfig.logSchema.activationId}: ${activation.activationId}"
+    val queryString = EsQueryString(logQuery)
+    val queryOrder = EsQueryOrder(elasticSearchConfig.logSchema.time, EsOrderAsc)
+
+    EsQuery(queryString, Some(queryOrder))
+  }
+
+  // The configured path may contain a %s placeholder which is replaced by the UUID of the user
+  private def generatePath(user: Identity) = elasticSearchConfig.path.format(user.uuid.asString)
+
+  /**
+   * Fetches the logs of an activation.
+   *
+   * @param user identity whose UUID is substituted into the configured request path
+   * @param activation activation to fetch the logs for
+   * @param request incoming request; its required headers are forwarded to ElasticSearch
+   * @return the logs found in ElasticSearch, or the logs stored in the activation when a required header is
+   *         missing from the request; a failed future when ElasticSearch answers with an error status code
+   */
+  override def fetchLogs(user: Identity, activation: WhiskActivation, request: HttpRequest): Future[ActivationLogs] = {
+    val headers = extractRequiredHeaders(request.headers)
+
+    // Return logs from ElasticSearch, or return logs from activation if required headers are not present
+    if (hasAllRequiredHeaders(headers)) {
+      esClient.search[EsSearchResult](generatePath(user), generatePayload(activation), headers).flatMap {
+        case Right(queryResult) =>
+          Future.successful(transcribeLogs(queryResult))
+        case Left(code) =>
+          Future.failed(new RuntimeException(s"Status code '$code' was returned from log store"))
+      }
+    } else {
+      Future.successful(activation.logs)
+    }
+  }
+}
+
+/** Provider that creates an ElasticSearchLogStore, leaving every setting but the actor system at its default. */
+object ElasticSearchLogStoreProvider extends LogStoreProvider {
+  override def logStore(actorSystem: ActorSystem): LogStore = {
+    val store = new ElasticSearchLogStore(actorSystem)
+    store
+  }
+}
diff --git a/common/scala/src/main/scala/whisk/core/containerpool/logging/ElasticSearchRestClient.scala b/common/scala/src/main/scala/whisk/core/containerpool/logging/ElasticSearchRestClient.scala
new file mode 100644
index 0000000..2f34c26
--- /dev/null
+++ b/common/scala/src/main/scala/whisk/core/containerpool/logging/ElasticSearchRestClient.scala
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.containerpool.logging
+
+import scala.concurrent.Future
+import scala.util.{Either, Try}
+
+import akka.actor.ActorSystem
+import akka.http.scaladsl.model._
+import akka.http.scaladsl.model.HttpMethods.{GET, POST}
+import akka.http.scaladsl.model.headers.Accept
+import akka.stream.scaladsl.Flow
+
+import scala.concurrent.Promise
+import scala.util.Try
+
+import spray.json._
+
+import whisk.http.PoolingRestClient
+
+// Marker traits for the building blocks of an ES query: EsQueryMethod tags the query clauses,
+// while EsOrder, EsRange, EsAgg and EsMatch tag the operator keywords.
+trait EsQueryMethod
+trait EsOrder
+trait EsRange
+trait EsAgg
+trait EsMatch
+
+// ES query operators. The string form of each object is the keyword written into the query JSON.
+case object EsOrderAsc extends EsOrder { override def toString: String = "asc" }
+case object EsOrderDesc extends EsOrder { override def toString: String = "desc" }
+case object EsRangeGte extends EsRange { override def toString: String = "gte" }
+case object EsRangeGt extends EsRange { override def toString: String = "gt" }
+case object EsRangeLte extends EsRange { override def toString: String = "lte" }
+case object EsRangeLt extends EsRange { override def toString: String = "lt" }
+case object EsAggMax extends EsAgg { override def toString: String = "max" }
+case object EsAggMin extends EsAgg { override def toString: String = "min" }
+case object EsMatchPhrase extends EsMatch { override def toString: String = "phrase" }
+case object EsMatchPhrasePrefix extends EsMatch { override def toString: String = "phrase_prefix" }
+
+// Schema of ES queries
+
+// Aggregation `agg` over `field`, written under the name `aggField`
+case class EsQueryAggs(aggField: String, agg: EsAgg, field: String)
+
+// Range condition on `key`: the operator `range` applied to `value`
+case class EsQueryRange(key: String, range: EsRange, value: String)
+
+// Single "match" condition used inside EsQueryMust
+case class EsQueryBoolMatch(key: String, value: String)
+
+// Sort order of kind `kind` on `field`
+case class EsQueryOrder(field: String, kind: EsOrder)
+
+// Value of the "size" entry of a query
+case class EsQuerySize(size: Integer)
+
+// "match_all" query
+case class EsQueryAll() extends EsQueryMethod
+
+// "bool" query: all `matches` go into "must", the optional `range` goes into "filter"
+case class EsQueryMust(matches: Vector[EsQueryBoolMatch], range: Option[EsQueryRange] = None) extends EsQueryMethod
+
+// "match" query on `field`, with an optional match type
+case class EsQueryMatch(field: String, value: String, matchType: Option[EsMatch] = None) extends EsQueryMethod
+
+// "term" query
+case class EsQueryTerm(key: String, value: String) extends EsQueryMethod
+
+// "query_string" query
+case class EsQueryString(queryString: String) extends EsQueryMethod
+
+// Complete search request body; only `query` is mandatory
+case class EsQuery(
+  query: EsQueryMethod,
+  sort: Option[EsQueryOrder] = None,
+  size: Option[EsQuerySize] = None,
+  aggs: Option[EsQueryAggs] = None)
+
+// Schema of ES query results. Only the parts of a search response that are read are modelled: the top-level
+// "hits" object, the "hits" array nested in it, and the "_source" document of every hit.
+case class EsSearchHit(source: JsObject)
+case class EsSearchHits(hits: Vector[EsSearchHit])
+case class EsSearchResult(hits: EsSearchHits)
+
+/**
+ * spray-json formats for the ES query and result types. The query formats can only be written; their `read`
+ * methods are left unimplemented and throw when called.
+ */
+object ElasticSearchJsonProtocol extends DefaultJsonProtocol {
+
+  // {"match": {<field>: {"query": <value>, "type": <matchType>}}}, "type" only when a match type is given
+  implicit object EsQueryMatchJsonFormat extends RootJsonFormat[EsQueryMatch] {
+    def read(query: JsValue) = ???
+    def write(query: EsQueryMatch) = {
+      val valuePart = Map("query" -> query.value.toJson)
+      val typePart = query.matchType.map(m => "type" -> m.toString.toJson)
+      JsObject("match" -> JsObject(query.field -> (valuePart ++ typePart).toJson))
+    }
+  }
+
+  // {"term": {<key>: <value>}}
+  implicit object EsQueryTermJsonFormat extends RootJsonFormat[EsQueryTerm] {
+    def read(query: JsValue) = ???
+    def write(query: EsQueryTerm) = {
+      val term = JsObject(query.key -> query.value.toJson)
+      JsObject("term" -> term)
+    }
+  }
+
+  // {"query_string": {"query": <queryString>}}
+  implicit object EsQueryStringJsonFormat extends RootJsonFormat[EsQueryString] {
+    def read(query: JsValue) = ???
+    def write(query: EsQueryString) = {
+      val body = JsObject("query" -> query.queryString.toJson)
+      JsObject("query_string" -> body)
+    }
+  }
+
+  // {"range": {<key>: {<range operator>: <value>}}}
+  implicit object EsQueryRangeJsonFormat extends RootJsonFormat[EsQueryRange] {
+    def read(query: JsValue) = ???
+    def write(query: EsQueryRange) = {
+      val bound = JsObject(query.range.toString -> query.value.toJson)
+      JsObject("range" -> JsObject(query.key -> bound))
+    }
+  }
+
+  // {"match": {<key>: <value>}}
+  implicit object EsQueryBoolMatchJsonFormat extends RootJsonFormat[EsQueryBoolMatch] {
+    def read(query: JsValue) = ???
+    def write(query: EsQueryBoolMatch) = {
+      val condition = JsObject(query.key -> query.value.toJson)
+      JsObject("match" -> condition)
+    }
+  }
+
+  // {"bool": {"must": [<matches>], "filter": <range>}}, "filter" only when a range is given
+  implicit object EsQueryMustJsonFormat extends RootJsonFormat[EsQueryMust] {
+    def read(query: JsValue) = ???
+    def write(query: EsQueryMust) = {
+      val mustClause = Map("must" -> query.matches.toJson)
+      val filterClause = query.range.map(r => "filter" -> r.toJson)
+      JsObject("bool" -> (mustClause ++ filterClause).toJson)
+    }
+  }
+
+  // [{<field>: {"order": <kind>}}]
+  implicit object EsQueryOrderJsonFormat extends RootJsonFormat[EsQueryOrder] {
+    def read(query: JsValue) = ???
+    def write(query: EsQueryOrder) = {
+      val direction = JsObject("order" -> query.kind.toString.toJson)
+      JsArray(JsObject(query.field -> direction))
+    }
+  }
+
+  // Plain JSON number
+  implicit object EsQuerySizeJsonFormat extends RootJsonFormat[EsQuerySize] {
+    def read(query: JsValue) = ???
+    def write(query: EsQuerySize) = JsNumber(query.size)
+  }
+
+  // {<aggField>: {<agg>: {"field": <field>}}}
+  implicit object EsQueryAggsJsonFormat extends RootJsonFormat[EsQueryAggs] {
+    def read(query: JsValue) = ???
+    def write(query: EsQueryAggs) = {
+      val target = JsObject("field" -> query.field.toJson)
+      JsObject(query.aggField -> JsObject(query.agg.toString -> target))
+    }
+  }
+
+  // {"match_all": {}}
+  implicit object EsQueryAllJsonFormat extends RootJsonFormat[EsQueryAll] {
+    def read(query: JsValue) = ???
+    def write(query: EsQueryAll) = JsObject("match_all" -> JsObject())
+  }
+
+  // Dispatches to the format of the concrete query type; any other EsQueryMethod ends in a MatchError
+  implicit object EsQueryMethod extends RootJsonFormat[EsQueryMethod] {
+    def read(query: JsValue) = ???
+    def write(method: EsQueryMethod) = method match {
+      case term: EsQueryTerm => term.toJson
+      case string: EsQueryString => string.toJson
+      case matching: EsQueryMatch => matching.toJson
+      case must: EsQueryMust => must.toJson
+      case all: EsQueryAll => all.toJson
+    }
+  }
+
+  implicit val esQueryFormat: RootJsonFormat[EsQuery] = jsonFormat4(EsQuery.apply)
+  // The document of a hit is found under "_source" in the response
+  implicit val esSearchHitFormat: RootJsonFormat[EsSearchHit] = jsonFormat(EsSearchHit.apply _, "_source")
+  implicit val esSearchHitsFormat: RootJsonFormat[EsSearchHits] = jsonFormat1(EsSearchHits.apply)
+  implicit val esSearchResultFormat: RootJsonFormat[EsSearchResult] = jsonFormat1(EsSearchResult.apply)
+}
+
+/**
+ * REST client for ElasticSearch on top of PoolingRestClient, created with a queue size of 16 * 1024.
+ * Every request carries an `Accept: application/json` header in addition to the headers given by the caller.
+ *
+ * @param protocol protocol handed to PoolingRestClient
+ * @param host host handed to PoolingRestClient
+ * @param port port handed to PoolingRestClient
+ * @param httpFlow optional HTTP flow handed to PoolingRestClient
+ */
+class ElasticSearchRestClient(
+  protocol: String,
+  host: String,
+  port: Int,
+  httpFlow: Option[Flow[(HttpRequest, Promise[HttpResponse]), (Try[HttpResponse], Promise[HttpResponse]), Any]] = None)(
+  implicit system: ActorSystem)
+    extends PoolingRestClient(protocol, host, port, 16 * 1024, httpFlow) {
+
+  import ElasticSearchJsonProtocol._
+
+  private val baseHeaders: List[HttpHeader] = List(Accept(MediaTypes.`application/json`))
+
+  // Base headers first, followed by the headers of the caller
+  private def withBaseHeaders(headers: List[HttpHeader]): List[HttpHeader] = baseHeaders ++ headers
+
+  /** Issues a GET on the root path and reads the response as a JSON object. */
+  def info(headers: List[HttpHeader] = List.empty): Future[Either[StatusCode, JsObject]] =
+    requestJson[JsObject](mkRequest(GET, Uri./, withBaseHeaders(headers)))
+
+  /** Issues a GET on the path given by `index` and reads the response as a JSON object. */
+  def index(index: String, headers: List[HttpHeader] = List.empty): Future[Either[StatusCode, JsObject]] =
+    requestJson[JsObject](mkRequest(GET, Uri(index), withBaseHeaders(headers)))
+
+  /**
+   * Posts `payload` as JSON to the path given by `index` and reads the response as a `T`.
+   * Without a payload a query matching all documents is sent.
+   */
+  def search[T: RootJsonReader](index: String,
+                                payload: EsQuery = EsQuery(EsQueryAll()),
+                                headers: List[HttpHeader] = List.empty): Future[Either[StatusCode, T]] = {
+    // Evaluated in the order of the original argument list: target first, then the body, then the headers
+    val target = Uri(index)
+    val body = payload.toJson.asJsObject
+    requestJson[T](mkJsonRequest(POST, target, body, withBaseHeaders(headers)))
+  }
+}
diff --git a/common/scala/src/main/scala/whisk/http/PoolingRestClient.scala b/common/scala/src/main/scala/whisk/http/PoolingRestClient.scala
index ef09770..0da6630 100644
--- a/common/scala/src/main/scala/whisk/http/PoolingRestClient.scala
+++ b/common/scala/src/main/scala/whisk/http/PoolingRestClient.scala
@@ -20,6 +20,7 @@ package whisk.http
import scala.concurrent.Future
import scala.concurrent.Promise
import scala.util.{Failure, Success}
+import scala.util.Try
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
@@ -32,6 +33,7 @@ import akka.stream.ActorMaterializer
import akka.stream.OverflowStrategy
import akka.stream.QueueOfferResult
import akka.stream.scaladsl._
+import akka.stream.scaladsl.Flow
import spray.json._
@@ -42,7 +44,13 @@ import spray.json._
* on each request because it doesn't need to look up the pool corresponding
* to the host. It is also easier to add an extra queueing mechanism.
*/
-class PoolingRestClient(protocol: String, host: String, port: Int, queueSize: Int)(implicit system: ActorSystem) {
+class PoolingRestClient(
+ protocol: String,
+ host: String,
+ port: Int,
+ queueSize: Int,
+ httpFlow: Option[Flow[(HttpRequest, Promise[HttpResponse]), (Try[HttpResponse], Promise[HttpResponse]), Any]] = None)(
+ implicit system: ActorSystem) {
require(protocol == "http" || protocol == "https", "Protocol must be one of { http, https }.")
implicit val context = system.dispatcher
@@ -55,6 +63,10 @@ class PoolingRestClient(protocol: String, host: String, port: Int, queueSize: In
Http().cachedHostConnectionPoolHttps[Promise[HttpResponse]](host = host, port = port)
}
+ private val defaultHttpFlow = pool.mapMaterializedValue { x =>
+ poolPromise.success(x); x
+ }
+
private val poolPromise = Promise[HostConnectionPool]
// Additional queue in case all connections are busy. Should hardly ever be
@@ -62,9 +74,7 @@ class PoolingRestClient(protocol: String, host: String, port: Int, queueSize: In
// asynchronous requests in a very short period of time.
private val requestQueue = Source
.queue(queueSize, OverflowStrategy.dropNew)
- .via(pool.mapMaterializedValue { x =>
- poolPromise.success(x); x
- })
+ .via(httpFlow.getOrElse(defaultHttpFlow))
.toMat(Sink.foreach({
case ((Success(response), p)) => p.success(response)
case ((Failure(error), p)) => p.failure(error)
diff --git a/core/controller/src/main/scala/whisk/core/controller/ApiUtils.scala b/core/controller/src/main/scala/whisk/core/controller/ApiUtils.scala
index ea74d72..ed52693 100644
--- a/core/controller/src/main/scala/whisk/core/controller/ApiUtils.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/ApiUtils.scala
@@ -184,7 +184,13 @@ trait ReadOps extends Directives {
onComplete(factory.get(datastore, docid)) {
case Success(entity) =>
logging.debug(this, s"[PROJECT] entity success")
- complete(OK, project(entity))
+
+ onComplete(project(entity)) {
+ case Success(response: JsObject) => complete(OK, response)
+ case Failure(t: Throwable) =>
+ logging.error(this, s"[PROJECT] projection failed: ${t.getMessage}")
+ terminate(InternalServerError, t.getMessage)
+ }
case Failure(t: NoDocumentException) =>
logging.debug(this, s"[PROJECT] entity does not exist")
terminate(NotFound)
diff --git a/tests/src/test/scala/whisk/core/containerpool/logging/ElasticSearchLogStoreTests.scala b/tests/src/test/scala/whisk/core/containerpool/logging/ElasticSearchLogStoreTests.scala
new file mode 100644
index 0000000..13ca7dc
--- /dev/null
+++ b/tests/src/test/scala/whisk/core/containerpool/logging/ElasticSearchLogStoreTests.scala
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.containerpool.logging
+
+import java.time.ZonedDateTime
+
+import akka.NotUsed
+import akka.actor.ActorSystem
+import akka.http.scaladsl.model._
+import akka.http.scaladsl.model.HttpMethods.{GET, POST}
+import akka.http.scaladsl.model.headers.{Accept, RawHeader}
+import akka.stream.scaladsl.Flow
+import akka.stream.{ActorMaterializer, StreamTcpException}
+import akka.testkit.TestKit
+
+import common.StreamLogging
+
+import org.junit.runner.RunWith
+import org.scalatest.{FlatSpecLike, Matchers}
+import org.scalatest.concurrent.ScalaFutures
+import org.scalatest.junit.JUnitRunner
+
+import pureconfig.error.ConfigReaderException
+
+import spray.json._
+
+import whisk.core.entity._
+import whisk.core.entity.size._
+
+import scala.concurrent.duration._
+import scala.concurrent.{Await, ExecutionContext, Future, Promise}
+import scala.util.{Success, Try}
+
+@RunWith(classOf[JUnitRunner])
+class ElasticSearchLogStoreTests
+ extends TestKit(ActorSystem("ElasticSearchLogStore"))
+ with FlatSpecLike
+ with Matchers
+ with ScalaFutures
+ with StreamLogging {
+
+ implicit val ec: ExecutionContext = system.dispatcher
+ implicit val materializer: ActorMaterializer = ActorMaterializer()
+
+ private val user = Identity(Subject(), EntityName("testSpace"), AuthKey(), Set())
+ private val activationId = ActivationId.generate()
+
+ private val defaultLogSchema =
+ ElasticSearchLogFieldConfig("user_logs", "message", "activationId_str", "stream_str", "time_date")
+ private val defaultConfig =
+ ElasticSearchLogStoreConfig("https", "host", 443, "/whisk_user_logs/_search", defaultLogSchema)
+ private val defaultConfigRequiredHeaders =
+ ElasticSearchLogStoreConfig(
+ "https",
+ "host",
+ 443,
+ "/whisk_user_logs/_search",
+ defaultLogSchema,
+ Seq("x-auth-token", "x-auth-project-id"))
+
+ private val defaultHttpResponse = HttpResponse(
+ StatusCodes.OK,
+ entity = HttpEntity(
+ ContentTypes.`application/json`,
+ s"""{"took":799,"timed_out":false,"_shards":{"total":204,"successful":204,"failed":0},"hits":{"total":2,"max_score":null,"hits":[{"_index":"logstash-2018.03.05.02","_type":"user_logs","_id":"1c00007f-ecb9-4083-8d2e-4d5e2849621f","_score":null,"_source":{"time_date":"2018-03-05T02:10:38.196689522Z","accountId":null,"message":"some log stuff\\n","type":"user_logs","event_uuid":"1c00007f-ecb9-4083-8d2e-4d5e2849621f","activationId_str":"$activationId","action_str":"user@email.com/logs" [...]
+ private val defaultPayload = JsObject(
+ "query" -> JsObject(
+ "query_string" -> JsObject("query" -> JsString(
+ s"_type: ${defaultConfig.logSchema.userLogs} AND ${defaultConfig.logSchema.activationId}: $activationId"))),
+ "sort" -> JsArray(JsObject(defaultConfig.logSchema.time -> JsObject("order" -> JsString("asc"))))).compactPrint
+ private val defaultHttpRequest = HttpRequest(
+ POST,
+ Uri(s"/whisk_user_logs/_search"),
+ List(Accept(MediaTypes.`application/json`)),
+ HttpEntity(ContentTypes.`application/json`, defaultPayload))
+ private val defaultLogStoreHttpRequest =
+ HttpRequest(method = GET, uri = "https://some.url", entity = HttpEntity.Empty)
+
+ private val expectedLogs = ActivationLogs(
+ Vector("2018-03-05T02:10:38.196689522Z stdout: some log stuff", "2018-03-05T02:10:38.196754258Z stdout: more logs"))
+
+ private val activation = WhiskActivation(
+ namespace = EntityPath("namespace"),
+ name = EntityName("name"),
+ Subject(),
+ activationId = activationId,
+ start = ZonedDateTime.now.toInstant,
+ end = ZonedDateTime.now.toInstant,
+ response = ActivationResponse.success(Some(JsObject("res" -> JsNumber(1)))),
+ logs = expectedLogs,
+ annotations = Parameters("limits", ActionLimits(TimeLimit(1.second), MemoryLimit(128.MB), LogLimit(1.MB)).toJson))
+
+ private def testFlow(httpResponse: HttpResponse = HttpResponse(), httpRequest: HttpRequest = HttpRequest())
+ : Flow[(HttpRequest, Promise[HttpResponse]), (Try[HttpResponse], Promise[HttpResponse]), NotUsed] =
+ Flow[(HttpRequest, Promise[HttpResponse])]
+ .mapAsyncUnordered(1) {
+ case (request, userContext) =>
+ request shouldBe httpRequest
+ Future.successful((Success(httpResponse), userContext))
+ }
+
+ private def await[T](awaitable: Future[T], timeout: FiniteDuration = 10.seconds) = Await.result(awaitable, timeout)
+
+ behavior of "ElasticSearch Log Store"
+
+ it should "fail when loading out of box configs since whisk.logstore.elasticsearch does not exist" in {
+ a[ConfigReaderException[_]] should be thrownBy new ElasticSearchLogStore(system)
+ }
+
+ it should "get user logs from ElasticSearch when there are no required headers needed" in {
+ val esLogStore =
+ new ElasticSearchLogStore(
+ system,
+ Some(testFlow(defaultHttpResponse, defaultHttpRequest)),
+ elasticSearchConfig = defaultConfig)
+
+ await(esLogStore.fetchLogs(user, activation.withoutLogs, defaultLogStoreHttpRequest)) shouldBe expectedLogs
+ }
+
+ it should "get logs from supplied activation record when required headers are not present" in {
+ val esLogStore =
+ new ElasticSearchLogStore(
+ system,
+ Some(testFlow(defaultHttpResponse, defaultHttpRequest)),
+ elasticSearchConfig = defaultConfigRequiredHeaders)
+
+ await(esLogStore.fetchLogs(user, activation, defaultLogStoreHttpRequest)) shouldBe expectedLogs
+ }
+
+ it should "get user logs from ElasticSearch when required headers are needed" in {
+ val authToken = "token"
+ val authProjectId = "projectId"
+ val httpRequest = HttpRequest(
+ POST,
+ Uri(s"/whisk_user_logs/_search"),
+ List(
+ Accept(MediaTypes.`application/json`),
+ RawHeader("x-auth-token", authToken),
+ RawHeader("x-auth-project-id", authProjectId)),
+ HttpEntity(ContentTypes.`application/json`, defaultPayload))
+ val esLogStore =
+ new ElasticSearchLogStore(
+ system,
+ Some(testFlow(defaultHttpResponse, httpRequest)),
+ elasticSearchConfig = defaultConfigRequiredHeaders)
+ val requiredHeadersHttpRequest = HttpRequest(
+ uri = "https://some.url",
+ headers = List(RawHeader("x-auth-token", authToken), RawHeader("x-auth-project-id", authProjectId)),
+ entity = HttpEntity.Empty)
+
+ await(esLogStore.fetchLogs(user, activation.withoutLogs, requiredHeadersHttpRequest)) shouldBe expectedLogs
+ }
+
+ it should "dynamically replace $UUID in request path" in {
+ val dynamicPathConfig =
+ ElasticSearchLogStoreConfig("https", "host", 443, "/elasticsearch/logstash-%s*/_search", defaultLogSchema)
+ val httpRequest = HttpRequest(
+ POST,
+ Uri(s"/elasticsearch/logstash-${user.uuid.asString}*/_search"),
+ List(Accept(MediaTypes.`application/json`)),
+ HttpEntity(ContentTypes.`application/json`, defaultPayload))
+ val esLogStore = new ElasticSearchLogStore(
+ system,
+ Some(testFlow(defaultHttpResponse, httpRequest)),
+ elasticSearchConfig = dynamicPathConfig)
+
+ await(esLogStore.fetchLogs(user, activation.withoutLogs, defaultLogStoreHttpRequest)) shouldBe expectedLogs
+ }
+
+ it should "fail to connect to invalid host" in {
+ val esLogStore = new ElasticSearchLogStore(system, elasticSearchConfig = defaultConfig)
+
+ a[StreamTcpException] should be thrownBy await(esLogStore.fetchLogs(user, activation, defaultLogStoreHttpRequest))
+ }
+
+ it should "forward errors from ElasticSearch" in {
+ val httpResponse = HttpResponse(StatusCodes.InternalServerError)
+ val esLogStore =
+ new ElasticSearchLogStore(
+ system,
+ Some(testFlow(httpResponse, defaultHttpRequest)),
+ elasticSearchConfig = defaultConfig)
+
+ a[RuntimeException] should be thrownBy await(esLogStore.fetchLogs(user, activation, defaultLogStoreHttpRequest))
+ }
+
+ it should "error when configuration protocol is invalid" in {
+ val invalidHostConfig =
+ ElasticSearchLogStoreConfig("protocol", "host", 443, "/whisk_user_logs", defaultLogSchema, Seq.empty)
+
+ a[IllegalArgumentException] should be thrownBy new ElasticSearchLogStore(
+ system,
+ elasticSearchConfig = invalidHostConfig)
+ }
+
+}
diff --git a/tests/src/test/scala/whisk/core/containerpool/logging/ElasticSearchRestClientTests.scala b/tests/src/test/scala/whisk/core/containerpool/logging/ElasticSearchRestClientTests.scala
new file mode 100644
index 0000000..74eccdf
--- /dev/null
+++ b/tests/src/test/scala/whisk/core/containerpool/logging/ElasticSearchRestClientTests.scala
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.containerpool.logging
+
+import spray.json._
+
+import org.junit.runner.RunWith
+import org.scalatest.{FlatSpecLike, Matchers}
+import org.scalatest.concurrent.ScalaFutures
+import org.scalatest.junit.JUnitRunner
+
+import akka.NotUsed
+import akka.actor.ActorSystem
+import akka.http.scaladsl.model._
+import akka.http.scaladsl.model.headers.Accept
+import akka.stream.scaladsl.Flow
+import akka.stream.ActorMaterializer
+import akka.testkit.TestKit
+import akka.http.scaladsl.model.HttpMethods.POST
+
+import common.StreamLogging
+
+import whisk.core.containerpool.logging.ElasticSearchJsonProtocol._
+
+import scala.concurrent.duration._
+import scala.concurrent.{Await, ExecutionContext, Future, Promise}
+import scala.util.{Success, Try}
+
+/**
+ * Tests the JSON form of the ElasticSearch query objects and the requests made by ElasticSearchRestClient, which are
+ * sent through the stubbed flow built by testFlow.
+ */
+@RunWith(classOf[JUnitRunner])
+class ElasticSearchRestClientTests
+    extends TestKit(ActorSystem("ElasticSearchRestClient"))
+    with FlatSpecLike
+    with Matchers
+    with ScalaFutures
+    with StreamLogging
+    with org.scalatest.BeforeAndAfterAll {
+
+  implicit val ec: ExecutionContext = system.dispatcher
+  implicit val materializer: ActorMaterializer = ActorMaterializer()
+
+  // Stop the actor system created in the constructor once the suite has run, so it does not outlive the tests.
+  override def afterAll(): Unit = try TestKit.shutdownActorSystem(system) finally super.afterAll()
+
+  // The `_source` document embedded in the canned search response below.
+  private val defaultResponseSource =
+    """{"stream":"stdout","activationId":"197d60b33137424ebd60b33137d24ea3","action":"guest/someAction","@version":"1","@timestamp":"2018-03-27T15:48:09.112Z","type":"user_logs","tenant":"19bc46b1-71f6-4ed5-8c54-816aa4f8c502","message":"namespace : user@email.com\n","time_date":"2018-03-27T15:48:08.716152793Z"}"""
+  private val defaultResponse =
+    s"""{"took":2,"timed_out":false,"_shards":{"total":5,"successful":5,"failed":0},"hits":{"total":1375,"max_score":1.0,"hits":[{"_index":"whisk_user_logs","_type":"user_logs","_id":"AWJoJSwAMGbzgxiD1jr9","_score":1.0,"_source":$defaultResponseSource}]}}"""
+  private val defaultHttpResponse = HttpResponse(entity = HttpEntity(ContentTypes.`application/json`, defaultResponse))
+  // Request expected for a default search: a POST whose JSON body is the match_all query.
+  private val defaultHttpRequest = HttpRequest(
+    POST,
+    headers = List(Accept(MediaTypes.`application/json`)),
+    entity = HttpEntity(ContentTypes.`application/json`, EsQuery(EsQueryAll()).toJson.toString))
+
+  /**
+   * Builds a stand-in flow that asserts each incoming request equals `httpRequest` and answers it with
+   * `httpResponse`, passing the promise paired with the request through untouched.
+   */
+  private def testFlow(httpResponse: HttpResponse = HttpResponse(), httpRequest: HttpRequest = HttpRequest())
+    : Flow[(HttpRequest, Promise[HttpResponse]), (Try[HttpResponse], Promise[HttpResponse]), NotUsed] =
+    Flow[(HttpRequest, Promise[HttpResponse])].map {
+      case (request, userContext) =>
+        request shouldBe httpRequest
+        (Success(httpResponse), userContext)
+    }
+
+  /** Blocks until `awaitable` completes (or `timeout` elapses) and returns its result, rethrowing any failure. */
+  private def await[T](awaitable: Future[T], timeout: FiniteDuration = 10.seconds) = Await.result(awaitable, timeout)
+
+  behavior of "ElasticSearch Rest Client"
+
+  it should "construct a query with must" in {
+    val queryTerms = Vector(EsQueryBoolMatch("someKey1", "someValue1"), EsQueryBoolMatch("someKey2", "someValue2"))
+    // Both assertions below expect the same serialized form of queryTerms.
+    val expectedMust = JsArray(
+      JsObject("match" -> JsObject("someKey1" -> JsString("someValue1"))),
+      JsObject("match" -> JsObject("someKey2" -> JsString("someValue2"))))
+
+    EsQuery(EsQueryMust(queryTerms)).toJson shouldBe JsObject(
+      "query" -> JsObject("bool" -> JsObject("must" -> expectedMust)))
+
+    // Test must with ranges
+    Seq((EsRangeGte, "gte"), (EsRangeGt, "gt"), (EsRangeLte, "lte"), (EsRangeLt, "lt")).foreach {
+      case (rangeArg, rangeValue) =>
+        val queryRange = EsQueryRange("someKey", rangeArg, "someValue")
+        val expectedFilter = JsObject("range" -> JsObject("someKey" -> JsObject(rangeValue -> "someValue".toJson)))
+
+        EsQuery(EsQueryMust(queryTerms, Some(queryRange))).toJson shouldBe JsObject(
+          "query" -> JsObject("bool" -> JsObject("must" -> expectedMust, "filter" -> expectedFilter)))
+    }
+  }
+
+  it should "construct a query with aggregations" in {
+    Seq((EsAggMax, "max"), (EsAggMin, "min")).foreach {
+      case (aggArg, aggValue) =>
+        val queryAgg = EsQueryAggs("someAgg", aggArg, "someField")
+
+        EsQuery(EsQueryAll(), aggs = Some(queryAgg)).toJson shouldBe JsObject(
+          "query" -> JsObject("match_all" -> JsObject()),
+          "aggs" -> JsObject("someAgg" -> JsObject(aggValue -> JsObject("field" -> "someField".toJson))))
+    }
+  }
+
+  it should "construct a query with match" in {
+    val queryMatch = EsQueryMatch("someField", "someValue")
+
+    EsQuery(queryMatch).toJson shouldBe JsObject(
+      "query" -> JsObject("match" -> JsObject("someField" -> JsObject("query" -> "someValue".toJson))))
+
+    // Test match with types
+    Seq((EsMatchPhrase, "phrase"), (EsMatchPhrasePrefix, "phrase_prefix")).foreach {
+      case (typeArg, typeValue) =>
+        val typedMatch = EsQueryMatch("someField", "someValue", Some(typeArg))
+
+        EsQuery(typedMatch).toJson shouldBe JsObject(
+          "query" -> JsObject(
+            "match" -> JsObject("someField" -> JsObject("query" -> "someValue".toJson, "type" -> typeValue.toJson))))
+    }
+  }
+
+  it should "construct a query with term" in {
+    val queryTerm = EsQueryTerm("user", "someUser")
+
+    EsQuery(queryTerm).toJson shouldBe JsObject("query" -> JsObject("term" -> JsObject("user" -> JsString("someUser"))))
+  }
+
+  it should "construct a query with query string" in {
+    val queryString = EsQueryString("_type: someType")
+
+    EsQuery(queryString).toJson shouldBe JsObject(
+      "query" -> JsObject("query_string" -> JsObject("query" -> JsString("_type: someType"))))
+  }
+
+  it should "construct a query with order" in {
+    Seq((EsOrderAsc, "asc"), (EsOrderDesc, "desc")).foreach {
+      case (orderArg, orderValue) =>
+        val queryOrder = EsQueryOrder("someField", orderArg)
+
+        EsQuery(EsQueryAll(), Some(queryOrder)).toJson shouldBe JsObject(
+          "query" -> JsObject("match_all" -> JsObject()),
+          "sort" -> JsArray(JsObject("someField" -> JsObject("order" -> orderValue.toJson))))
+    }
+  }
+
+  it should "construct query with size" in {
+    val querySize = EsQuerySize(1)
+
+    EsQuery(EsQueryAll(), size = Some(querySize)).toJson shouldBe JsObject(
+      "query" -> JsObject("match_all" -> JsObject()),
+      "size" -> JsNumber(1))
+  }
+
+  it should "error when search response does not match expected type" in {
+    // Only the expected request is supplied, so the flow answers with testFlow's default, empty HttpResponse().
+    val esClient = new ElasticSearchRestClient("https", "host", 443, Some(testFlow(httpRequest = defaultHttpRequest)))
+
+    a[RuntimeException] should be thrownBy await(esClient.search[JsObject]("/"))
+  }
+
+  it should "parse search response into EsSearchResult" in {
+    val esClient =
+      new ElasticSearchRestClient("https", "host", 443, Some(testFlow(defaultHttpResponse, defaultHttpRequest)))
+    val response = await(esClient.search[EsSearchResult]("/"))
+
+    // Unwrap with fold so that a Left fails the test while reporting the status that came back.
+    val hits = response.fold(status => fail(s"search returned $status instead of a result"), _.hits.hits)
+    hits should have size 1
+    hits(0).source shouldBe defaultResponseSource.parseJson.asJsObject
+  }
+
+  it should "return status code when HTTP error occurs" in {
+    val httpResponse = HttpResponse(StatusCodes.InternalServerError)
+    val esClient = new ElasticSearchRestClient("https", "host", 443, Some(testFlow(httpResponse, defaultHttpRequest)))
+    val response = await(esClient.search[JsObject]("/"))
+
+    // The 500 status is expected back as a Left value rather than as a failed future.
+    response shouldBe Left(StatusCodes.InternalServerError)
+  }
+
+  it should "perform info request" in {
+    val responseBody = """{"cluster_name" : "elasticsearch"}"""
+    // HttpRequest's defaults apply for everything except the JSON Accept header.
+    val httpRequest = HttpRequest(headers = List(Accept(MediaTypes.`application/json`)))
+    val httpResponse = HttpResponse(StatusCodes.OK, entity = HttpEntity(ContentTypes.`application/json`, responseBody))
+    val esClient =
+      new ElasticSearchRestClient("https", "host", 443, Some(testFlow(httpResponse, httpRequest)))
+    val response = await(esClient.info())
+
+    response shouldBe Right(responseBody.parseJson.asJsObject)
+  }
+
+  it should "perform index request" in {
+    val responseBody = """{"some_index" : {}}"""
+    // The index name is expected to appear as the request URI.
+    val httpRequest = HttpRequest(uri = Uri("some_index"), headers = List(Accept(MediaTypes.`application/json`)))
+    val httpResponse = HttpResponse(StatusCodes.OK, entity = HttpEntity(ContentTypes.`application/json`, responseBody))
+    val esClient =
+      new ElasticSearchRestClient("https", "host", 443, Some(testFlow(httpResponse, httpRequest)))
+    val response = await(esClient.index("some_index"))
+
+    response shouldBe Right(responseBody.parseJson.asJsObject)
+  }
+}
--
To stop receiving notification emails like this one, please contact
markusthoemmes@apache.org.