You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ma...@apache.org on 2018/04/03 07:18:14 UTC

[incubator-openwhisk] branch master updated: Add ElasticSearchLogStore. (#3421)

This is an automated email from the ASF dual-hosted git repository.

markusthoemmes pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new cb60132  Add ElasticSearchLogStore. (#3421)
cb60132 is described below

commit cb601327568a54721b3ccc5cff8b881e993eb388
Author: James Dubee <jw...@us.ibm.com>
AuthorDate: Tue Apr 3 03:18:11 2018 -0400

    Add ElasticSearchLogStore. (#3421)
    
    Provides a log store that allows logs to be fetched from ElasticSearch.
---
 .../src/main/scala/whisk/core/WhiskConfig.scala    |   1 +
 .../logging/ElasticSearchLogStore.scala            | 121 +++++++++++
 .../logging/ElasticSearchRestClient.scala          | 178 ++++++++++++++++
 .../main/scala/whisk/http/PoolingRestClient.scala  |  18 +-
 .../scala/whisk/core/controller/ApiUtils.scala     |   8 +-
 .../logging/ElasticSearchLogStoreTests.scala       | 211 +++++++++++++++++++
 .../logging/ElasticSearchRestClientTests.scala     | 223 +++++++++++++++++++++
 7 files changed, 755 insertions(+), 5 deletions(-)

diff --git a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
index 7e61480..9b4dd98 100644
--- a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
+++ b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
@@ -247,6 +247,7 @@ object ConfigKeys {
 
   val logStore = "whisk.logstore"
   val splunk = s"$logStore.splunk"
+  val elasticSearch = s"$logStore.elasticsearch"
 
   val mesos = "whisk.mesos"
 }
diff --git a/common/scala/src/main/scala/whisk/core/containerpool/logging/ElasticSearchLogStore.scala b/common/scala/src/main/scala/whisk/core/containerpool/logging/ElasticSearchLogStore.scala
new file mode 100644
index 0000000..26f0cd3
--- /dev/null
+++ b/common/scala/src/main/scala/whisk/core/containerpool/logging/ElasticSearchLogStore.scala
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.containerpool.logging
+
+import java.nio.file.{Path, Paths}
+
+import akka.actor.ActorSystem
+import akka.stream.scaladsl.Flow
+import akka.http.scaladsl.model._
+
+import whisk.core.entity.{ActivationLogs, Identity, WhiskActivation}
+import whisk.core.containerpool.logging.ElasticSearchJsonProtocol._
+import whisk.core.ConfigKeys
+
+import scala.concurrent.{Future, Promise}
+import scala.util.Try
+
+import spray.json._
+
+import pureconfig._
+
+case class ElasticSearchLogFieldConfig(userLogs: String,
+                                       message: String,
+                                       activationId: String,
+                                       stream: String,
+                                       time: String)
+
+case class ElasticSearchLogStoreConfig(protocol: String,
+                                       host: String,
+                                       port: Int,
+                                       path: String,
+                                       logSchema: ElasticSearchLogFieldConfig,
+                                       requiredHeaders: Seq[String] = Seq.empty)
+
+/**
+ * ElasticSearch based implementation of a DockerToActivationFileLogStore. When using the JSON log driver, docker writes
+ * stdout/stderr to JSON formatted files. Those files can be processed by a backend service asynchronously to store
+ * user logs in ElasticSearch. This log store allows user logs then to be fetched from ElasticSearch.
+ */
+class ElasticSearchLogStore(
+  system: ActorSystem,
+  httpFlow: Option[Flow[(HttpRequest, Promise[HttpResponse]), (Try[HttpResponse], Promise[HttpResponse]), Any]] = None,
+  destinationDirectory: Path = Paths.get("logs"),
+  elasticSearchConfig: ElasticSearchLogStoreConfig =
+    loadConfigOrThrow[ElasticSearchLogStoreConfig](ConfigKeys.elasticSearch))
+    extends DockerToActivationFileLogStore(system, destinationDirectory) {
+
+  // Schema of resultant logs from ES
+  case class UserLogEntry(message: String, stream: String, time: String) {
+    def toFormattedString = s"${time} ${stream}: ${message.stripLineEnd}"
+  }
+
+  object UserLogEntry extends DefaultJsonProtocol {
+    implicit val serdes =
+      jsonFormat(
+        UserLogEntry.apply,
+        elasticSearchConfig.logSchema.message,
+        elasticSearchConfig.logSchema.stream,
+        elasticSearchConfig.logSchema.time)
+  }
+
+  implicit val actorSystem = system
+
+  private val esClient = new ElasticSearchRestClient(
+    elasticSearchConfig.protocol,
+    elasticSearchConfig.host,
+    elasticSearchConfig.port,
+    httpFlow)
+
+  private def transcribeLogs(queryResult: EsSearchResult): ActivationLogs =
+    ActivationLogs(queryResult.hits.hits.map(_.source.convertTo[UserLogEntry].toFormattedString))
+
+  private def extractRequiredHeaders(headers: Seq[HttpHeader]) =
+    headers.filter(h => elasticSearchConfig.requiredHeaders.contains(h.lowercaseName)).toList
+
+  private def generatePayload(activation: WhiskActivation) = {
+    val logQuery =
+      s"_type: ${elasticSearchConfig.logSchema.userLogs} AND ${elasticSearchConfig.logSchema.activationId}: ${activation.activationId}"
+    val queryString = EsQueryString(logQuery)
+    val queryOrder = EsQueryOrder(elasticSearchConfig.logSchema.time, EsOrderAsc)
+
+    EsQuery(queryString, Some(queryOrder))
+  }
+
+  private def generatePath(user: Identity) = elasticSearchConfig.path.format(user.uuid.asString)
+
+  override def fetchLogs(user: Identity, activation: WhiskActivation, request: HttpRequest): Future[ActivationLogs] = {
+    val headers = extractRequiredHeaders(request.headers)
+
+    // Return logs from ElasticSearch, or return logs from activation if required headers are not present
+    if (headers.length == elasticSearchConfig.requiredHeaders.length) {
+      esClient.search[EsSearchResult](generatePath(user), generatePayload(activation), headers).flatMap {
+        case Right(queryResult) =>
+          Future.successful(transcribeLogs(queryResult))
+        case Left(code) =>
+          Future.failed(new RuntimeException(s"Status code '$code' was returned from log store"))
+      }
+    } else {
+      Future.successful(activation.logs)
+    }
+  }
+}
+
+object ElasticSearchLogStoreProvider extends LogStoreProvider {
+  override def logStore(actorSystem: ActorSystem): LogStore = new ElasticSearchLogStore(actorSystem)
+}
diff --git a/common/scala/src/main/scala/whisk/core/containerpool/logging/ElasticSearchRestClient.scala b/common/scala/src/main/scala/whisk/core/containerpool/logging/ElasticSearchRestClient.scala
new file mode 100644
index 0000000..2f34c26
--- /dev/null
+++ b/common/scala/src/main/scala/whisk/core/containerpool/logging/ElasticSearchRestClient.scala
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.containerpool.logging
+
+import scala.concurrent.Future
+import scala.util.{Either, Try}
+
+import akka.actor.ActorSystem
+import akka.http.scaladsl.model._
+import akka.http.scaladsl.model.HttpMethods.{GET, POST}
+import akka.http.scaladsl.model.headers.Accept
+import akka.stream.scaladsl.Flow
+
+import scala.concurrent.Promise
+import scala.util.Try
+
+import spray.json._
+
+import whisk.http.PoolingRestClient
+
+trait EsQueryMethod
+trait EsOrder
+trait EsRange
+trait EsAgg
+trait EsMatch
+
+// Schema of ES query operators
+case object EsOrderAsc extends EsOrder { override def toString = "asc" }
+case object EsOrderDesc extends EsOrder { override def toString = "desc" }
+case object EsRangeGte extends EsRange { override def toString = "gte" }
+case object EsRangeGt extends EsRange { override def toString = "gt" }
+case object EsRangeLte extends EsRange { override def toString = "lte" }
+case object EsRangeLt extends EsRange { override def toString = "lt" }
+case object EsAggMax extends EsAgg { override def toString = "max" }
+case object EsAggMin extends EsAgg { override def toString = "min" }
+case object EsMatchPhrase extends EsMatch { override def toString = "phrase" }
+case object EsMatchPhrasePrefix extends EsMatch { override def toString = "phrase_prefix" }
+
+// Schema of ES queries
+case class EsQueryAggs(aggField: String, agg: EsAgg, field: String)
+case class EsQueryRange(key: String, range: EsRange, value: String)
+case class EsQueryBoolMatch(key: String, value: String)
+case class EsQueryOrder(field: String, kind: EsOrder)
+case class EsQuerySize(size: Integer)
+case class EsQueryAll() extends EsQueryMethod
+case class EsQueryMust(matches: Vector[EsQueryBoolMatch], range: Option[EsQueryRange] = None) extends EsQueryMethod
+case class EsQueryMatch(field: String, value: String, matchType: Option[EsMatch] = None) extends EsQueryMethod
+case class EsQueryTerm(key: String, value: String) extends EsQueryMethod
+case class EsQueryString(queryString: String) extends EsQueryMethod
+case class EsQuery(query: EsQueryMethod,
+                   sort: Option[EsQueryOrder] = None,
+                   size: Option[EsQuerySize] = None,
+                   aggs: Option[EsQueryAggs] = None)
+
+// Schema of ES query results
+case class EsSearchHit(source: JsObject)
+case class EsSearchHits(hits: Vector[EsSearchHit])
+case class EsSearchResult(hits: EsSearchHits)
+
+object ElasticSearchJsonProtocol extends DefaultJsonProtocol {
+
+  implicit object EsQueryMatchJsonFormat extends RootJsonFormat[EsQueryMatch] {
+    def read(query: JsValue) = ???
+    def write(query: EsQueryMatch) = {
+      val matchQuery = Map("query" -> query.value.toJson) ++ query.matchType.map(m => "type" -> m.toString.toJson)
+      JsObject("match" -> JsObject(query.field -> matchQuery.toJson))
+    }
+  }
+
+  implicit object EsQueryTermJsonFormat extends RootJsonFormat[EsQueryTerm] {
+    def read(query: JsValue) = ???
+    def write(query: EsQueryTerm) = JsObject("term" -> JsObject(query.key -> query.value.toJson))
+  }
+
+  implicit object EsQueryStringJsonFormat extends RootJsonFormat[EsQueryString] {
+    def read(query: JsValue) = ???
+    def write(query: EsQueryString) =
+      JsObject("query_string" -> JsObject("query" -> query.queryString.toJson))
+  }
+
+  implicit object EsQueryRangeJsonFormat extends RootJsonFormat[EsQueryRange] {
+    def read(query: JsValue) = ???
+    def write(query: EsQueryRange) =
+      JsObject("range" -> JsObject(query.key -> JsObject(query.range.toString -> query.value.toJson)))
+  }
+
+  implicit object EsQueryBoolMatchJsonFormat extends RootJsonFormat[EsQueryBoolMatch] {
+    def read(query: JsValue) = ???
+    def write(query: EsQueryBoolMatch) = JsObject("match" -> JsObject(query.key -> query.value.toJson))
+  }
+
+  implicit object EsQueryMustJsonFormat extends RootJsonFormat[EsQueryMust] {
+    def read(query: JsValue) = ???
+    def write(query: EsQueryMust) = {
+      val boolQuery = Map("must" -> query.matches.toJson) ++ query.range.map(r => "filter" -> r.toJson)
+      JsObject("bool" -> boolQuery.toJson)
+    }
+  }
+
+  implicit object EsQueryOrderJsonFormat extends RootJsonFormat[EsQueryOrder] {
+    def read(query: JsValue) = ???
+    def write(query: EsQueryOrder) =
+      JsArray(JsObject(query.field -> JsObject("order" -> query.kind.toString.toJson)))
+  }
+
+  implicit object EsQuerySizeJsonFormat extends RootJsonFormat[EsQuerySize] {
+    def read(query: JsValue) = ???
+    def write(query: EsQuerySize) = JsNumber(query.size)
+  }
+
+  implicit object EsQueryAggsJsonFormat extends RootJsonFormat[EsQueryAggs] {
+    def read(query: JsValue) = ???
+    def write(query: EsQueryAggs) =
+      JsObject(query.aggField -> JsObject(query.agg.toString -> JsObject("field" -> query.field.toJson)))
+  }
+
+  implicit object EsQueryAllJsonFormat extends RootJsonFormat[EsQueryAll] {
+    def read(query: JsValue) = ???
+    def write(query: EsQueryAll) = JsObject("match_all" -> JsObject())
+  }
+
+  implicit object EsQueryMethod extends RootJsonFormat[EsQueryMethod] {
+    def read(query: JsValue) = ???
+    def write(method: EsQueryMethod) = method match {
+      case queryTerm: EsQueryTerm     => queryTerm.toJson
+      case queryString: EsQueryString => queryString.toJson
+      case queryMatch: EsQueryMatch   => queryMatch.toJson
+      case queryMust: EsQueryMust     => queryMust.toJson
+      case queryAll: EsQueryAll       => queryAll.toJson
+    }
+  }
+
+  implicit val esQueryFormat = jsonFormat4(EsQuery.apply)
+  implicit val esSearchHitFormat = jsonFormat(EsSearchHit.apply _, "_source")
+  implicit val esSearchHitsFormat = jsonFormat1(EsSearchHits.apply)
+  implicit val esSearchResultFormat = jsonFormat1(EsSearchResult.apply)
+}
+
+class ElasticSearchRestClient(
+  protocol: String,
+  host: String,
+  port: Int,
+  httpFlow: Option[Flow[(HttpRequest, Promise[HttpResponse]), (Try[HttpResponse], Promise[HttpResponse]), Any]] = None)(
+  implicit system: ActorSystem)
+    extends PoolingRestClient(protocol, host, port, 16 * 1024, httpFlow) {
+
+  import ElasticSearchJsonProtocol._
+
+  private val baseHeaders: List[HttpHeader] = List(Accept(MediaTypes.`application/json`))
+
+  def info(headers: List[HttpHeader] = List.empty): Future[Either[StatusCode, JsObject]] = {
+    requestJson[JsObject](mkRequest(GET, Uri./, baseHeaders ++ headers))
+  }
+
+  def index(index: String, headers: List[HttpHeader] = List.empty): Future[Either[StatusCode, JsObject]] = {
+    requestJson[JsObject](mkRequest(GET, Uri(index), baseHeaders ++ headers))
+  }
+
+  def search[T: RootJsonReader](index: String,
+                                payload: EsQuery = EsQuery(EsQueryAll()),
+                                headers: List[HttpHeader] = List.empty): Future[Either[StatusCode, T]] =
+    requestJson[T](mkJsonRequest(POST, Uri(index), payload.toJson.asJsObject, baseHeaders ++ headers))
+}
diff --git a/common/scala/src/main/scala/whisk/http/PoolingRestClient.scala b/common/scala/src/main/scala/whisk/http/PoolingRestClient.scala
index ef09770..0da6630 100644
--- a/common/scala/src/main/scala/whisk/http/PoolingRestClient.scala
+++ b/common/scala/src/main/scala/whisk/http/PoolingRestClient.scala
@@ -20,6 +20,7 @@ package whisk.http
 import scala.concurrent.Future
 import scala.concurrent.Promise
 import scala.util.{Failure, Success}
+import scala.util.Try
 
 import akka.actor.ActorSystem
 import akka.http.scaladsl.Http
@@ -32,6 +33,7 @@ import akka.stream.ActorMaterializer
 import akka.stream.OverflowStrategy
 import akka.stream.QueueOfferResult
 import akka.stream.scaladsl._
+import akka.stream.scaladsl.Flow
 
 import spray.json._
 
@@ -42,7 +44,13 @@ import spray.json._
  *  on each request because it doesn't need to look up the pool corresponding
  *  to the host. It is also easier to add an extra queueing mechanism.
  */
-class PoolingRestClient(protocol: String, host: String, port: Int, queueSize: Int)(implicit system: ActorSystem) {
+class PoolingRestClient(
+  protocol: String,
+  host: String,
+  port: Int,
+  queueSize: Int,
+  httpFlow: Option[Flow[(HttpRequest, Promise[HttpResponse]), (Try[HttpResponse], Promise[HttpResponse]), Any]] = None)(
+  implicit system: ActorSystem) {
   require(protocol == "http" || protocol == "https", "Protocol must be one of { http, https }.")
 
   implicit val context = system.dispatcher
@@ -55,6 +63,10 @@ class PoolingRestClient(protocol: String, host: String, port: Int, queueSize: In
     Http().cachedHostConnectionPoolHttps[Promise[HttpResponse]](host = host, port = port)
   }
 
+  private val defaultHttpFlow = pool.mapMaterializedValue { x =>
+    poolPromise.success(x); x
+  }
+
   private val poolPromise = Promise[HostConnectionPool]
 
   // Additional queue in case all connections are busy. Should hardly ever be
@@ -62,9 +74,7 @@ class PoolingRestClient(protocol: String, host: String, port: Int, queueSize: In
   // asynchronous requests in a very short period of time.
   private val requestQueue = Source
     .queue(queueSize, OverflowStrategy.dropNew)
-    .via(pool.mapMaterializedValue { x =>
-      poolPromise.success(x); x
-    })
+    .via(httpFlow.getOrElse(defaultHttpFlow))
     .toMat(Sink.foreach({
       case ((Success(response), p)) => p.success(response)
       case ((Failure(error), p))    => p.failure(error)
diff --git a/core/controller/src/main/scala/whisk/core/controller/ApiUtils.scala b/core/controller/src/main/scala/whisk/core/controller/ApiUtils.scala
index ea74d72..ed52693 100644
--- a/core/controller/src/main/scala/whisk/core/controller/ApiUtils.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/ApiUtils.scala
@@ -184,7 +184,13 @@ trait ReadOps extends Directives {
     onComplete(factory.get(datastore, docid)) {
       case Success(entity) =>
         logging.debug(this, s"[PROJECT] entity success")
-        complete(OK, project(entity))
+
+        onComplete(project(entity)) {
+          case Success(response: JsObject) => complete(OK, response)
+          case Failure(t: Throwable) =>
+            logging.error(this, s"[PROJECT] projection failed: ${t.getMessage}")
+            terminate(InternalServerError, t.getMessage)
+        }
       case Failure(t: NoDocumentException) =>
         logging.debug(this, s"[PROJECT] entity does not exist")
         terminate(NotFound)
diff --git a/tests/src/test/scala/whisk/core/containerpool/logging/ElasticSearchLogStoreTests.scala b/tests/src/test/scala/whisk/core/containerpool/logging/ElasticSearchLogStoreTests.scala
new file mode 100644
index 0000000..13ca7dc
--- /dev/null
+++ b/tests/src/test/scala/whisk/core/containerpool/logging/ElasticSearchLogStoreTests.scala
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.containerpool.logging
+
+import java.time.ZonedDateTime
+
+import akka.NotUsed
+import akka.actor.ActorSystem
+import akka.http.scaladsl.model._
+import akka.http.scaladsl.model.HttpMethods.{GET, POST}
+import akka.http.scaladsl.model.headers.{Accept, RawHeader}
+import akka.stream.scaladsl.Flow
+import akka.stream.{ActorMaterializer, StreamTcpException}
+import akka.testkit.TestKit
+
+import common.StreamLogging
+
+import org.junit.runner.RunWith
+import org.scalatest.{FlatSpecLike, Matchers}
+import org.scalatest.concurrent.ScalaFutures
+import org.scalatest.junit.JUnitRunner
+
+import pureconfig.error.ConfigReaderException
+
+import spray.json._
+
+import whisk.core.entity._
+import whisk.core.entity.size._
+
+import scala.concurrent.duration._
+import scala.concurrent.{Await, ExecutionContext, Future, Promise}
+import scala.util.{Success, Try}
+
+@RunWith(classOf[JUnitRunner])
+class ElasticSearchLogStoreTests
+    extends TestKit(ActorSystem("ElasticSearchLogStore"))
+    with FlatSpecLike
+    with Matchers
+    with ScalaFutures
+    with StreamLogging {
+
+  implicit val ec: ExecutionContext = system.dispatcher
+  implicit val materializer: ActorMaterializer = ActorMaterializer()
+
+  private val user = Identity(Subject(), EntityName("testSpace"), AuthKey(), Set())
+  private val activationId = ActivationId.generate()
+
+  private val defaultLogSchema =
+    ElasticSearchLogFieldConfig("user_logs", "message", "activationId_str", "stream_str", "time_date")
+  private val defaultConfig =
+    ElasticSearchLogStoreConfig("https", "host", 443, "/whisk_user_logs/_search", defaultLogSchema)
+  private val defaultConfigRequiredHeaders =
+    ElasticSearchLogStoreConfig(
+      "https",
+      "host",
+      443,
+      "/whisk_user_logs/_search",
+      defaultLogSchema,
+      Seq("x-auth-token", "x-auth-project-id"))
+
+  private val defaultHttpResponse = HttpResponse(
+    StatusCodes.OK,
+    entity = HttpEntity(
+      ContentTypes.`application/json`,
+      s"""{"took":799,"timed_out":false,"_shards":{"total":204,"successful":204,"failed":0},"hits":{"total":2,"max_score":null,"hits":[{"_index":"logstash-2018.03.05.02","_type":"user_logs","_id":"1c00007f-ecb9-4083-8d2e-4d5e2849621f","_score":null,"_source":{"time_date":"2018-03-05T02:10:38.196689522Z","accountId":null,"message":"some log stuff\\n","type":"user_logs","event_uuid":"1c00007f-ecb9-4083-8d2e-4d5e2849621f","activationId_str":"$activationId","action_str":"user@email.com/logs" [...]
+  private val defaultPayload = JsObject(
+    "query" -> JsObject(
+      "query_string" -> JsObject("query" -> JsString(
+        s"_type: ${defaultConfig.logSchema.userLogs} AND ${defaultConfig.logSchema.activationId}: $activationId"))),
+    "sort" -> JsArray(JsObject(defaultConfig.logSchema.time -> JsObject("order" -> JsString("asc"))))).compactPrint
+  private val defaultHttpRequest = HttpRequest(
+    POST,
+    Uri(s"/whisk_user_logs/_search"),
+    List(Accept(MediaTypes.`application/json`)),
+    HttpEntity(ContentTypes.`application/json`, defaultPayload))
+  private val defaultLogStoreHttpRequest =
+    HttpRequest(method = GET, uri = "https://some.url", entity = HttpEntity.Empty)
+
+  private val expectedLogs = ActivationLogs(
+    Vector("2018-03-05T02:10:38.196689522Z stdout: some log stuff", "2018-03-05T02:10:38.196754258Z stdout: more logs"))
+
+  private val activation = WhiskActivation(
+    namespace = EntityPath("namespace"),
+    name = EntityName("name"),
+    Subject(),
+    activationId = activationId,
+    start = ZonedDateTime.now.toInstant,
+    end = ZonedDateTime.now.toInstant,
+    response = ActivationResponse.success(Some(JsObject("res" -> JsNumber(1)))),
+    logs = expectedLogs,
+    annotations = Parameters("limits", ActionLimits(TimeLimit(1.second), MemoryLimit(128.MB), LogLimit(1.MB)).toJson))
+
+  private def testFlow(httpResponse: HttpResponse = HttpResponse(), httpRequest: HttpRequest = HttpRequest())
+    : Flow[(HttpRequest, Promise[HttpResponse]), (Try[HttpResponse], Promise[HttpResponse]), NotUsed] =
+    Flow[(HttpRequest, Promise[HttpResponse])]
+      .mapAsyncUnordered(1) {
+        case (request, userContext) =>
+          request shouldBe httpRequest
+          Future.successful((Success(httpResponse), userContext))
+      }
+
+  private def await[T](awaitable: Future[T], timeout: FiniteDuration = 10.seconds) = Await.result(awaitable, timeout)
+
+  behavior of "ElasticSearch Log Store"
+
+  it should "fail when loading out of box configs since whisk.logstore.elasticsearch does not exist" in {
+    a[ConfigReaderException[_]] should be thrownBy new ElasticSearchLogStore(system)
+  }
+
+  it should "get user logs from ElasticSearch when there are no required headers needed" in {
+    val esLogStore =
+      new ElasticSearchLogStore(
+        system,
+        Some(testFlow(defaultHttpResponse, defaultHttpRequest)),
+        elasticSearchConfig = defaultConfig)
+
+    await(esLogStore.fetchLogs(user, activation.withoutLogs, defaultLogStoreHttpRequest)) shouldBe expectedLogs
+  }
+
+  it should "get logs from supplied activation record when required headers are not present" in {
+    val esLogStore =
+      new ElasticSearchLogStore(
+        system,
+        Some(testFlow(defaultHttpResponse, defaultHttpRequest)),
+        elasticSearchConfig = defaultConfigRequiredHeaders)
+
+    await(esLogStore.fetchLogs(user, activation, defaultLogStoreHttpRequest)) shouldBe expectedLogs
+  }
+
+  it should "get user logs from ElasticSearch when required headers are needed" in {
+    val authToken = "token"
+    val authProjectId = "projectId"
+    val httpRequest = HttpRequest(
+      POST,
+      Uri(s"/whisk_user_logs/_search"),
+      List(
+        Accept(MediaTypes.`application/json`),
+        RawHeader("x-auth-token", authToken),
+        RawHeader("x-auth-project-id", authProjectId)),
+      HttpEntity(ContentTypes.`application/json`, defaultPayload))
+    val esLogStore =
+      new ElasticSearchLogStore(
+        system,
+        Some(testFlow(defaultHttpResponse, httpRequest)),
+        elasticSearchConfig = defaultConfigRequiredHeaders)
+    val requiredHeadersHttpRequest = HttpRequest(
+      uri = "https://some.url",
+      headers = List(RawHeader("x-auth-token", authToken), RawHeader("x-auth-project-id", authProjectId)),
+      entity = HttpEntity.Empty)
+
+    await(esLogStore.fetchLogs(user, activation.withoutLogs, requiredHeadersHttpRequest)) shouldBe expectedLogs
+  }
+
+  it should "dynamically replace $UUID in request path" in {
+    val dynamicPathConfig =
+      ElasticSearchLogStoreConfig("https", "host", 443, "/elasticsearch/logstash-%s*/_search", defaultLogSchema)
+    val httpRequest = HttpRequest(
+      POST,
+      Uri(s"/elasticsearch/logstash-${user.uuid.asString}*/_search"),
+      List(Accept(MediaTypes.`application/json`)),
+      HttpEntity(ContentTypes.`application/json`, defaultPayload))
+    val esLogStore = new ElasticSearchLogStore(
+      system,
+      Some(testFlow(defaultHttpResponse, httpRequest)),
+      elasticSearchConfig = dynamicPathConfig)
+
+    await(esLogStore.fetchLogs(user, activation.withoutLogs, defaultLogStoreHttpRequest)) shouldBe expectedLogs
+  }
+
+  it should "fail to connect to invalid host" in {
+    val esLogStore = new ElasticSearchLogStore(system, elasticSearchConfig = defaultConfig)
+
+    a[StreamTcpException] should be thrownBy await(esLogStore.fetchLogs(user, activation, defaultLogStoreHttpRequest))
+  }
+
+  it should "forward errors from ElasticSearch" in {
+    val httpResponse = HttpResponse(StatusCodes.InternalServerError)
+    val esLogStore =
+      new ElasticSearchLogStore(
+        system,
+        Some(testFlow(httpResponse, defaultHttpRequest)),
+        elasticSearchConfig = defaultConfig)
+
+    a[RuntimeException] should be thrownBy await(esLogStore.fetchLogs(user, activation, defaultLogStoreHttpRequest))
+  }
+
+  it should "error when configuration protocol is invalid" in {
+    val invalidHostConfig =
+      ElasticSearchLogStoreConfig("protocol", "host", 443, "/whisk_user_logs", defaultLogSchema, Seq.empty)
+
+    a[IllegalArgumentException] should be thrownBy new ElasticSearchLogStore(
+      system,
+      elasticSearchConfig = invalidHostConfig)
+  }
+
+}
diff --git a/tests/src/test/scala/whisk/core/containerpool/logging/ElasticSearchRestClientTests.scala b/tests/src/test/scala/whisk/core/containerpool/logging/ElasticSearchRestClientTests.scala
new file mode 100644
index 0000000..74eccdf
--- /dev/null
+++ b/tests/src/test/scala/whisk/core/containerpool/logging/ElasticSearchRestClientTests.scala
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.containerpool.logging
+
+import spray.json._
+
+import org.junit.runner.RunWith
+import org.scalatest.{FlatSpecLike, Matchers}
+import org.scalatest.concurrent.ScalaFutures
+import org.scalatest.junit.JUnitRunner
+
+import akka.NotUsed
+import akka.actor.ActorSystem
+import akka.http.scaladsl.model._
+import akka.http.scaladsl.model.headers.Accept
+import akka.stream.scaladsl.Flow
+import akka.stream.ActorMaterializer
+import akka.testkit.TestKit
+import akka.http.scaladsl.model.HttpMethods.POST
+
+import common.StreamLogging
+
+import whisk.core.containerpool.logging.ElasticSearchJsonProtocol._
+
+import scala.concurrent.duration._
+import scala.concurrent.{Await, ExecutionContext, Future, Promise}
+import scala.util.{Success, Try}
+
+@RunWith(classOf[JUnitRunner])
+class ElasticSearchRestClientTests
+    extends TestKit(ActorSystem("ElasticSearchRestClient"))
+    with FlatSpecLike
+    with Matchers
+    with ScalaFutures
+    with StreamLogging {
+
+  implicit val ec: ExecutionContext = system.dispatcher
+  implicit val materializer: ActorMaterializer = ActorMaterializer()
+
+  private val defaultResponseSource =
+    """{"stream":"stdout","activationId":"197d60b33137424ebd60b33137d24ea3","action":"guest/someAction","@version":"1","@timestamp":"2018-03-27T15:48:09.112Z","type":"user_logs","tenant":"19bc46b1-71f6-4ed5-8c54-816aa4f8c502","message":"namespace     : user@email.com\n","time_date":"2018-03-27T15:48:08.716152793Z"}"""
+  private val defaultResponse =
+    s"""{"took":2,"timed_out":false,"_shards":{"total":5,"successful":5,"failed":0},"hits":{"total":1375,"max_score":1.0,"hits":[{"_index":"whisk_user_logs","_type":"user_logs","_id":"AWJoJSwAMGbzgxiD1jr9","_score":1.0,"_source":$defaultResponseSource}]}}"""
+  private val defaultHttpResponse = HttpResponse(entity = HttpEntity(ContentTypes.`application/json`, defaultResponse))
+  private val defaultHttpRequest = HttpRequest(
+    POST,
+    headers = List(Accept(MediaTypes.`application/json`)),
+    entity = HttpEntity(ContentTypes.`application/json`, EsQuery(EsQueryAll()).toJson.toString))
+
+  private def testFlow(httpResponse: HttpResponse = HttpResponse(), httpRequest: HttpRequest = HttpRequest())
+    : Flow[(HttpRequest, Promise[HttpResponse]), (Try[HttpResponse], Promise[HttpResponse]), NotUsed] =
+    Flow[(HttpRequest, Promise[HttpResponse])]
+      .mapAsyncUnordered(1) {
+        case (request, userContext) =>
+          request shouldBe httpRequest
+          Future.successful((Success(httpResponse), userContext))
+      }
+
+  private def await[T](awaitable: Future[T], timeout: FiniteDuration = 10.seconds) = Await.result(awaitable, timeout)
+
+  behavior of "ElasticSearch Rest Client"
+
+  it should "construct a query with must" in {
+    val queryTerms = Vector(EsQueryBoolMatch("someKey1", "someValue1"), EsQueryBoolMatch("someKey2", "someValue2"))
+    val queryMust = EsQueryMust(queryTerms)
+
+    EsQuery(queryMust).toJson shouldBe JsObject(
+      "query" ->
+        JsObject(
+          "bool" ->
+            JsObject(
+              "must" ->
+                JsArray(
+                  JsObject("match" -> JsObject("someKey1" -> JsString("someValue1"))),
+                  JsObject("match" -> JsObject("someKey2" -> JsString("someValue2")))))))
+
+    // Test must with ranges
+    Seq((EsRangeGte, "gte"), (EsRangeGt, "gt"), (EsRangeLte, "lte"), (EsRangeLt, "lt")).foreach {
+      case (rangeArg, rangeValue) =>
+        val queryRange = EsQueryRange("someKey", rangeArg, "someValue")
+        val queryTerms = Vector(EsQueryBoolMatch("someKey1", "someValue1"), EsQueryBoolMatch("someKey2", "someValue2"))
+        val queryMust = EsQueryMust(queryTerms, Some(queryRange))
+
+        EsQuery(queryMust).toJson shouldBe JsObject(
+          "query" ->
+            JsObject(
+              "bool" ->
+                JsObject(
+                  "must" ->
+                    JsArray(
+                      JsObject("match" -> JsObject("someKey1" -> JsString("someValue1"))),
+                      JsObject("match" -> JsObject("someKey2" -> JsString("someValue2")))),
+                  "filter" ->
+                    JsObject("range" ->
+                      JsObject("someKey" ->
+                        JsObject(rangeValue -> "someValue".toJson))))))
+    }
+  }
+
+  it should "construct a query with aggregations" in {
+    Seq((EsAggMax, "max"), (EsAggMin, "min")).foreach {
+      case (aggArg, aggValue) =>
+        val queryAgg = EsQueryAggs("someAgg", aggArg, "someField")
+
+        EsQuery(EsQueryAll(), aggs = Some(queryAgg)).toJson shouldBe JsObject(
+          "query" -> JsObject("match_all" -> JsObject()),
+          "aggs" -> JsObject("someAgg" -> JsObject(aggValue -> JsObject("field" -> "someField".toJson))))
+    }
+  }
+
+  it should "construct a query with match" in {
+    val queryMatch = EsQueryMatch("someField", "someValue")
+
+    EsQuery(queryMatch).toJson shouldBe JsObject(
+      "query" -> JsObject("match" -> JsObject("someField" -> JsObject("query" -> "someValue".toJson))))
+
+    // Test match with types
+    Seq((EsMatchPhrase, "phrase"), (EsMatchPhrasePrefix, "phrase_prefix")).foreach {
+      case (typeArg, typeValue) =>
+        val queryMatch = EsQueryMatch("someField", "someValue", Some(typeArg))
+
+        EsQuery(queryMatch).toJson shouldBe JsObject(
+          "query" -> JsObject(
+            "match" -> JsObject("someField" -> JsObject("query" -> "someValue".toJson, "type" -> typeValue.toJson))))
+    }
+  }
+
+  it should "construct a query with term" in {
+    val queryTerm = EsQueryTerm("user", "someUser")
+
+    EsQuery(queryTerm).toJson shouldBe JsObject("query" -> JsObject("term" -> JsObject("user" -> JsString("someUser"))))
+  }
+
+  it should "construct a query with query string" in {
+    val queryString = EsQueryString("_type: someType")
+
+    EsQuery(queryString).toJson shouldBe JsObject(
+      "query" -> JsObject("query_string" -> JsObject("query" -> JsString("_type: someType"))))
+  }
+
+  it should "construct a query with order" in {
+    Seq((EsOrderAsc, "asc"), (EsOrderDesc, "desc")).foreach {
+      case (orderArg, orderValue) =>
+        val queryOrder = EsQueryOrder("someField", orderArg)
+
+        EsQuery(EsQueryAll(), Some(queryOrder)).toJson shouldBe JsObject(
+          "query" -> JsObject("match_all" -> JsObject()),
+          "sort" -> JsArray(JsObject("someField" -> JsObject("order" -> orderValue.toJson))))
+    }
+  }
+
+  it should "construct query with size" in {
+    val querySize = EsQuerySize(1)
+
+    EsQuery(EsQueryAll(), size = Some(querySize)).toJson shouldBe JsObject(
+      "query" -> JsObject("match_all" -> JsObject()),
+      "size" -> JsNumber(1))
+  }
+
+  it should "error when search response does not match expected type" in {
+    val esClient = new ElasticSearchRestClient("https", "host", 443, Some(testFlow(httpRequest = defaultHttpRequest)))
+
+    a[RuntimeException] should be thrownBy await(esClient.search[JsObject]("/"))
+  }
+
+  it should "parse search response into EsSearchResult" in {
+    val esClient =
+      new ElasticSearchRestClient("https", "host", 443, Some(testFlow(defaultHttpResponse, defaultHttpRequest)))
+    val response = await(esClient.search[EsSearchResult]("/"))
+
+    response shouldBe 'right
+    response.right.get.hits.hits should have size 1
+    response.right.get.hits.hits(0).source shouldBe defaultResponseSource.parseJson.asJsObject
+  }
+
+  it should "return status code when HTTP error occurs" in {
+    val httpResponse = HttpResponse(StatusCodes.InternalServerError)
+    val esClient = new ElasticSearchRestClient("https", "host", 443, Some(testFlow(httpResponse, defaultHttpRequest)))
+    val response = await(esClient.search[JsObject]("/"))
+
+    response shouldBe 'left
+    response.left.get shouldBe StatusCodes.InternalServerError
+  }
+
+  it should "perform info request" in {
+    val responseBody = s"""{"cluster_name" : "elasticsearch"}"""
+    val httpRequest = HttpRequest(headers = List(Accept(MediaTypes.`application/json`)))
+    val httpResponse = HttpResponse(StatusCodes.OK, entity = HttpEntity(ContentTypes.`application/json`, responseBody))
+    val esClient =
+      new ElasticSearchRestClient("https", "host", 443, Some(testFlow(httpResponse, httpRequest)))
+    val response = await(esClient.info())
+
+    response shouldBe 'right
+    response.right.get shouldBe responseBody.parseJson.asJsObject
+  }
+
+  it should "perform index request" in {
+    val responseBody = s"""{"some_index" : {}}"""
+    val httpRequest = HttpRequest(uri = Uri("some_index"), headers = List(Accept(MediaTypes.`application/json`)))
+    val httpResponse = HttpResponse(StatusCodes.OK, entity = HttpEntity(ContentTypes.`application/json`, responseBody))
+    val esClient =
+      new ElasticSearchRestClient("https", "host", 443, Some(testFlow(httpResponse, httpRequest)))
+    val response = await(esClient.index("some_index"))
+
+    response shouldBe 'right
+    response.right.get shouldBe responseBody.parseJson.asJsObject
+  }
+}

-- 
To stop receiving notification emails like this one, please contact
markusthoemmes@apache.org.