You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@openwhisk.apache.org by GitBox <gi...@apache.org> on 2018/06/11 15:08:21 UTC

[GitHub] dubee closed pull request #3567: Activation result integration with log store SPI

dubee closed pull request #3567: Activation result integration with log store SPI
URL: https://github.com/apache/incubator-openwhisk/pull/3567
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/common/scala/src/main/scala/whisk/core/containerpool/logging/DockerToActivationLogStore.scala b/common/scala/src/main/scala/whisk/core/containerpool/logging/DockerToActivationLogStore.scala
index b4e3983af4..ea6aaf8ba8 100644
--- a/common/scala/src/main/scala/whisk/core/containerpool/logging/DockerToActivationLogStore.scala
+++ b/common/scala/src/main/scala/whisk/core/containerpool/logging/DockerToActivationLogStore.scala
@@ -27,7 +27,7 @@ import akka.util.ByteString
 
 import whisk.common.TransactionId
 import whisk.core.containerpool.Container
-import whisk.core.entity.{ActivationLogs, ExecutableWhiskAction, Identity, WhiskActivation}
+import whisk.core.entity.{ActivationLogs, ActivationResponse, ExecutableWhiskAction, Identity, WhiskActivation}
 import whisk.http.Messages
 
 import scala.concurrent.{ExecutionContext, Future}
@@ -70,6 +70,12 @@ class DockerToActivationLogStore(system: ActorSystem) extends LogStore {
   override def fetchLogs(user: Identity, activation: WhiskActivation, request: HttpRequest): Future[ActivationLogs] =
     Future.successful(activation.logs)
 
+  /* As result is already part of the activation record, just return that bit of it */
+  override def fetchResponse(user: Identity,
+                             activation: WhiskActivation,
+                             request: HttpRequest): Future[ActivationResponse] =
+    Future.successful(activation.response)
+
   override def collectLogs(transid: TransactionId,
                            user: Identity,
                            activation: WhiskActivation,
diff --git a/common/scala/src/main/scala/whisk/core/containerpool/logging/ElasticSearchLogStore.scala b/common/scala/src/main/scala/whisk/core/containerpool/logging/ElasticSearchLogStore.scala
index 26f0cd3f30..2b7d04dbe8 100644
--- a/common/scala/src/main/scala/whisk/core/containerpool/logging/ElasticSearchLogStore.scala
+++ b/common/scala/src/main/scala/whisk/core/containerpool/logging/ElasticSearchLogStore.scala
@@ -23,7 +23,7 @@ import akka.actor.ActorSystem
 import akka.stream.scaladsl.Flow
 import akka.http.scaladsl.model._
 
-import whisk.core.entity.{ActivationLogs, Identity, WhiskActivation}
+import whisk.core.entity.{ActivationLogs, ActivationResponse, Identity, WhiskActivation}
 import whisk.core.containerpool.logging.ElasticSearchJsonProtocol._
 import whisk.core.ConfigKeys
 
@@ -35,6 +35,7 @@ import spray.json._
 import pureconfig._
 
 case class ElasticSearchLogFieldConfig(userLogs: String,
+                                       activationRecord: String,
                                        message: String,
                                        activationId: String,
                                        stream: String,
@@ -60,7 +61,7 @@ class ElasticSearchLogStore(
     loadConfigOrThrow[ElasticSearchLogStoreConfig](ConfigKeys.elasticSearch))
     extends DockerToActivationFileLogStore(system, destinationDirectory) {
 
-  // Schema of resultant logs from ES
+  // Schema of logs from ES
   case class UserLogEntry(message: String, stream: String, time: String) {
     def toFormattedString = s"${time} ${stream}: ${message.stripLineEnd}"
   }
@@ -74,6 +75,16 @@ class ElasticSearchLogStore(
         elasticSearchConfig.logSchema.time)
   }
 
+  // Scehma of results from ES
+  case class UserResponseEntry(message: String) {
+    def toFormattedString = message
+  }
+
+  object UserResponseEntry extends DefaultJsonProtocol {
+    implicit val serdes =
+      jsonFormat(UserResponseEntry.apply _, elasticSearchConfig.logSchema.message)
+  }
+
   implicit val actorSystem = system
 
   private val esClient = new ElasticSearchRestClient(
@@ -85,13 +96,31 @@ class ElasticSearchLogStore(
   private def transcribeLogs(queryResult: EsSearchResult): ActivationLogs =
     ActivationLogs(queryResult.hits.hits.map(_.source.convertTo[UserLogEntry].toFormattedString))
 
+  private def transcribeResponse(queryResult: EsSearchResult): ActivationResponse = {
+    val res = queryResult.hits.hits.map(_.source.convertTo[UserResponseEntry].toFormattedString)
+
+    ActivationResponse.success(Some(res(0).parseJson.asJsObject))
+  }
+
   private def extractRequiredHeaders(headers: Seq[HttpHeader]) =
     headers.filter(h => elasticSearchConfig.requiredHeaders.contains(h.lowercaseName)).toList
 
-  private def generatePayload(activation: WhiskActivation) = {
-    val logQuery =
+  private def generateLogPayload(activation: WhiskActivation) = {
+    val query =
       s"_type: ${elasticSearchConfig.logSchema.userLogs} AND ${elasticSearchConfig.logSchema.activationId}: ${activation.activationId}"
-    val queryString = EsQueryString(logQuery)
+
+    generateEsQuery(query)
+  }
+
+  private def generateResponsePayload(activation: WhiskActivation) = {
+    val query =
+      s"_type: ${elasticSearchConfig.logSchema.activationRecord} AND ${elasticSearchConfig.logSchema.activationId}: ${activation.activationId}"
+
+    generateEsQuery(query)
+  }
+
+  private def generateEsQuery(query: String) = {
+    val queryString = EsQueryString(query)
     val queryOrder = EsQueryOrder(elasticSearchConfig.logSchema.time, EsOrderAsc)
 
     EsQuery(queryString, Some(queryOrder))
@@ -104,7 +133,7 @@ class ElasticSearchLogStore(
 
     // Return logs from ElasticSearch, or return logs from activation if required headers are not present
     if (headers.length == elasticSearchConfig.requiredHeaders.length) {
-      esClient.search[EsSearchResult](generatePath(user), generatePayload(activation), headers).flatMap {
+      esClient.search[EsSearchResult](generatePath(user), generateLogPayload(activation), headers).flatMap {
         case Right(queryResult) =>
           Future.successful(transcribeLogs(queryResult))
         case Left(code) =>
@@ -114,6 +143,24 @@ class ElasticSearchLogStore(
       Future.successful(activation.logs)
     }
   }
+
+  override def fetchResponse(user: Identity,
+                             activation: WhiskActivation,
+                             request: HttpRequest): Future[ActivationResponse] = {
+    val headers = extractRequiredHeaders(request.headers)
+
+    // Return result from ElasticSearch, or return result from activation if required headers are not present
+    if (headers.length == elasticSearchConfig.requiredHeaders.length) {
+      esClient.search[EsSearchResult](generatePath(user), generateResponsePayload(activation), headers).flatMap {
+        case Right(queryResult) =>
+          Future.successful(transcribeResponse(queryResult))
+        case Left(code) =>
+          Future.failed(new RuntimeException(s"Status code '$code' was returned from log store"))
+      }
+    } else {
+      Future.successful(activation.response)
+    }
+  }
 }
 
 object ElasticSearchLogStoreProvider extends LogStoreProvider {
diff --git a/common/scala/src/main/scala/whisk/core/containerpool/logging/LogDriverLogStore.scala b/common/scala/src/main/scala/whisk/core/containerpool/logging/LogDriverLogStore.scala
index 5320d4d1d7..af25ddd75a 100644
--- a/common/scala/src/main/scala/whisk/core/containerpool/logging/LogDriverLogStore.scala
+++ b/common/scala/src/main/scala/whisk/core/containerpool/logging/LogDriverLogStore.scala
@@ -23,7 +23,7 @@ import akka.http.scaladsl.model.HttpRequest
 import whisk.core.entity.Identity
 import whisk.common.TransactionId
 import whisk.core.containerpool.Container
-import whisk.core.entity.{ActivationLogs, ExecutableWhiskAction, Identity, WhiskActivation}
+import whisk.core.entity.{ActivationLogs, ActivationResponse, ExecutableWhiskAction, Identity, WhiskActivation}
 
 import scala.concurrent.Future
 
@@ -51,6 +51,11 @@ class LogDriverLogStore(actorSystem: ActorSystem) extends LogStore {
    * e.g. the SplunkLogStore to expose logs from some external source */
   def fetchLogs(user: Identity, activation: WhiskActivation, request: HttpRequest): Future[ActivationLogs] =
     Future.successful(ActivationLogs(Vector("Logs are not available.")))
+
+  /** no result exposed to API/CLI using only the LogDriverLogStore; use an extended version,
+   * e.g. the ElasticSearchLogStore to expose logs from some external source */
+  def fetchResponse(user: Identity, activation: WhiskActivation, request: HttpRequest): Future[ActivationResponse] =
+    Future.successful(ActivationResponse.applicationError("Result is not available."))
 }
 
 object LogDriverLogStoreProvider extends LogStoreProvider {
diff --git a/common/scala/src/main/scala/whisk/core/containerpool/logging/LogStore.scala b/common/scala/src/main/scala/whisk/core/containerpool/logging/LogStore.scala
index 28c5b9e93f..6bf8f87e4e 100644
--- a/common/scala/src/main/scala/whisk/core/containerpool/logging/LogStore.scala
+++ b/common/scala/src/main/scala/whisk/core/containerpool/logging/LogStore.scala
@@ -22,7 +22,7 @@ import akka.http.scaladsl.model.HttpRequest
 
 import whisk.common.TransactionId
 import whisk.core.containerpool.Container
-import whisk.core.entity.{ActivationLogs, ExecutableWhiskAction, Identity, WhiskActivation}
+import whisk.core.entity.{ActivationLogs, ActivationResponse, ExecutableWhiskAction, Identity, WhiskActivation}
 import whisk.spi.Spi
 
 import scala.concurrent.Future
@@ -77,6 +77,16 @@ trait LogStore {
    * @return the relevant logs
    */
   def fetchLogs(user: Identity, activation: WhiskActivation, request: HttpRequest): Future[ActivationLogs]
+
+  /**
+   * Fetch relevant result for the given activation from the store.
+   *
+   * This method is called when a user requests a result via the API.
+   *
+   * @param activation activation to fetch the result for
+   * @return the relevant result
+   */
+  def fetchResponse(user: Identity, activation: WhiskActivation, request: HttpRequest): Future[ActivationResponse]
 }
 
 trait LogStoreProvider extends Spi {
diff --git a/core/controller/src/main/scala/whisk/core/controller/Activations.scala b/core/controller/src/main/scala/whisk/core/controller/Activations.scala
index 3c57d0d480..6693653989 100644
--- a/core/controller/src/main/scala/whisk/core/controller/Activations.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/Activations.scala
@@ -19,7 +19,6 @@ package whisk.core.controller
 
 import java.time.Instant
 
-import scala.concurrent.Future
 import scala.language.postfixOps
 import scala.util.{Failure, Success, Try}
 import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport.sprayJsonMarshaller
@@ -212,7 +211,7 @@ trait WhiskActivationsApi extends Directives with AuthenticatedRouteProvider wit
         docid,
         postProcess = Some((activation: WhiskActivation) => complete(activation.toExtendedJson)))
 
-    } ~ (pathPrefix(resultPath) & pathEnd) { fetchResponse(docid) } ~
+    } ~ (pathPrefix(resultPath) & pathEnd) { fetchResponse(user, docid) } ~
       (pathPrefix(logsPath) & pathEnd) { fetchLogs(user, docid) }
   }
 
@@ -224,12 +223,14 @@ trait WhiskActivationsApi extends Directives with AuthenticatedRouteProvider wit
    * - 404 Not Found
    * - 500 Internal Server Error
    */
-  private def fetchResponse(docid: DocId)(implicit transid: TransactionId) = {
-    getEntityAndProject(
-      WhiskActivation,
-      activationStore,
-      docid,
-      (activation: WhiskActivation) => Future.successful(activation.response.toExtendedJson))
+  private def fetchResponse(user: Identity, docid: DocId)(implicit transid: TransactionId) = {
+    extractRequest { request =>
+      getEntityAndProject(
+        WhiskActivation,
+        activationStore,
+        docid,
+        (activation: WhiskActivation) => logStore.fetchResponse(user, activation, request).map(_.toExtendedJson))
+    }
   }
 
   /**
diff --git a/tests/src/test/scala/whisk/core/containerpool/logging/ElasticSearchLogStoreTests.scala b/tests/src/test/scala/whisk/core/containerpool/logging/ElasticSearchLogStoreTests.scala
index 13ca7dc7a4..07a2a00f1c 100644
--- a/tests/src/test/scala/whisk/core/containerpool/logging/ElasticSearchLogStoreTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/logging/ElasticSearchLogStoreTests.scala
@@ -61,7 +61,13 @@ class ElasticSearchLogStoreTests
   private val activationId = ActivationId.generate()
 
   private val defaultLogSchema =
-    ElasticSearchLogFieldConfig("user_logs", "message", "activationId_str", "stream_str", "time_date")
+    ElasticSearchLogFieldConfig(
+      "user_logs",
+      "activation_record",
+      "message",
+      "activationId_str",
+      "stream_str",
+      "time_date")
   private val defaultConfig =
     ElasticSearchLogStoreConfig("https", "host", 443, "/whisk_user_logs/_search", defaultLogSchema)
   private val defaultConfigRequiredHeaders =
@@ -73,27 +79,46 @@ class ElasticSearchLogStoreTests
       defaultLogSchema,
       Seq("x-auth-token", "x-auth-project-id"))
 
-  private val defaultHttpResponse = HttpResponse(
+  private val defaultLogHttpResponse = HttpResponse(
     StatusCodes.OK,
     entity = HttpEntity(
       ContentTypes.`application/json`,
       s"""{"took":799,"timed_out":false,"_shards":{"total":204,"successful":204,"failed":0},"hits":{"total":2,"max_score":null,"hits":[{"_index":"logstash-2018.03.05.02","_type":"user_logs","_id":"1c00007f-ecb9-4083-8d2e-4d5e2849621f","_score":null,"_source":{"time_date":"2018-03-05T02:10:38.196689522Z","accountId":null,"message":"some log stuff\\n","type":"user_logs","event_uuid":"1c00007f-ecb9-4083-8d2e-4d5e2849621f","activationId_str":"$activationId","action_str":"user@email.com/logs","tenantId":"tenantId","logmet_cluster":"topic1-elasticsearch_1","@timestamp":"2018-03-05T02:11:37.687Z","@version":"1","stream_str":"stdout","timestamp":"2018-03-05T02:10:39.131Z"},"sort":[1520215897687]},{"_index":"logstash-2018.03.05.02","_type":"user_logs","_id":"14c2a5b7-8cad-4ec0-992e-70fab1996465","_score":null,"_source":{"time_date":"2018-03-05T02:10:38.196754258Z","accountId":null,"message":"more logs\\n","type":"user_logs","event_uuid":"14c2a5b7-8cad-4ec0-992e-70fab1996465","activationId_str":"$activationId","action_str":"user@email.com/logs","tenantId":"tenant","logmet_cluster":"topic1-elasticsearch_1","@timestamp":"2018-03-05T02:11:37.701Z","@version":"1","stream_str":"stdout","timestamp":"2018-03-05T02:10:39.131Z"},"sort":[1520215897701]}]}}"""))
-  private val defaultPayload = JsObject(
+  private val defaultResultHttpResponse = HttpResponse(
+    StatusCodes.OK,
+    entity = HttpEntity(
+      ContentTypes.`application/json`,
+      s"""{"took":1061,"timed_out":false,"_shards":{"total":432,"successful":432,"failed":0},"hits":{"total":1,"max_score":null,"hits":[{"_index":"logstash-2018.04.23.05","_type":"activation_record","_id":"cfc00874-8ab1-4e5f-af9b-0c787c5a07c1","_score":null,"_source":{"end_date":"2018-04-23T17:21:34.437Z","name_str":"logs","time_date":"2018-04-23T17:21:34.371Z","accountId":null,"status_str":"0","message":"{\\"res\\":1}","type":"activation_record","duration_int":66,"event_uuid":"cfc00874-8ab1-4e5f-af9b-0c787c5a07c1","namespace_str":"user@email.com","activationId_str":"$activationId","subject_str":"user@email.com","tenantId":"tenantId","logmet_cluster":"topic2-elasticsearch_2","@timestamp":"2018-04-23T17:21:38.911Z","@version":"1","version_str":"0.0.3","timestamp":"2018-04-23T17:21:34.821Z"},"sort":[1524504094371]}]}}"""))
+
+  private val defaultLogPayload = JsObject(
     "query" -> JsObject(
       "query_string" -> JsObject("query" -> JsString(
         s"_type: ${defaultConfig.logSchema.userLogs} AND ${defaultConfig.logSchema.activationId}: $activationId"))),
     "sort" -> JsArray(JsObject(defaultConfig.logSchema.time -> JsObject("order" -> JsString("asc"))))).compactPrint
-  private val defaultHttpRequest = HttpRequest(
+  private val defaultResultPayload = JsObject(
+    "query" -> JsObject("query_string" -> JsObject("query" -> JsString(
+      s"_type: ${defaultConfig.logSchema.activationRecord} AND ${defaultConfig.logSchema.activationId}: $activationId"))),
+    "sort" -> JsArray(JsObject(defaultConfig.logSchema.time -> JsObject("order" -> JsString("asc"))))).compactPrint
+
+  private val defaultLogHttpRequest = HttpRequest(
+    POST,
+    Uri(s"/whisk_user_logs/_search"),
+    List(Accept(MediaTypes.`application/json`)),
+    HttpEntity(ContentTypes.`application/json`, defaultLogPayload))
+  private val defaultResultHttpRequest = HttpRequest(
     POST,
     Uri(s"/whisk_user_logs/_search"),
     List(Accept(MediaTypes.`application/json`)),
-    HttpEntity(ContentTypes.`application/json`, defaultPayload))
+    HttpEntity(ContentTypes.`application/json`, defaultResultPayload))
+
   private val defaultLogStoreHttpRequest =
     HttpRequest(method = GET, uri = "https://some.url", entity = HttpEntity.Empty)
 
   private val expectedLogs = ActivationLogs(
     Vector("2018-03-05T02:10:38.196689522Z stdout: some log stuff", "2018-03-05T02:10:38.196754258Z stdout: more logs"))
 
+  private val expectedResponse = ActivationResponse.success(Some(JsObject("res" -> JsNumber(1))))
+
   private val activation = WhiskActivation(
     namespace = EntityPath("namespace"),
     name = EntityName("name"),
@@ -101,7 +126,7 @@ class ElasticSearchLogStoreTests
     activationId = activationId,
     start = ZonedDateTime.now.toInstant,
     end = ZonedDateTime.now.toInstant,
-    response = ActivationResponse.success(Some(JsObject("res" -> JsNumber(1)))),
+    response = expectedResponse,
     logs = expectedLogs,
     annotations = Parameters("limits", ActionLimits(TimeLimit(1.second), MemoryLimit(128.MB), LogLimit(1.MB)).toJson))
 
@@ -126,23 +151,43 @@ class ElasticSearchLogStoreTests
     val esLogStore =
       new ElasticSearchLogStore(
         system,
-        Some(testFlow(defaultHttpResponse, defaultHttpRequest)),
+        Some(testFlow(defaultLogHttpResponse, defaultLogHttpRequest)),
         elasticSearchConfig = defaultConfig)
 
     await(esLogStore.fetchLogs(user, activation.withoutLogs, defaultLogStoreHttpRequest)) shouldBe expectedLogs
   }
 
+  it should "get result from ElasticSearch when there are no required headers needed" in {
+    val esLogStore =
+      new ElasticSearchLogStore(
+        system,
+        Some(testFlow(defaultResultHttpResponse, defaultResultHttpRequest)),
+        elasticSearchConfig = defaultConfig)
+
+    await(esLogStore.fetchResponse(user, activation.withoutLogsOrResult, defaultLogStoreHttpRequest)) shouldBe expectedResponse
+  }
+
   it should "get logs from supplied activation record when required headers are not present" in {
     val esLogStore =
       new ElasticSearchLogStore(
         system,
-        Some(testFlow(defaultHttpResponse, defaultHttpRequest)),
+        Some(testFlow(defaultLogHttpResponse, defaultLogHttpRequest)),
         elasticSearchConfig = defaultConfigRequiredHeaders)
 
     await(esLogStore.fetchLogs(user, activation, defaultLogStoreHttpRequest)) shouldBe expectedLogs
   }
 
-  it should "get user logs from ElasticSearch when required headers are needed" in {
+  it should "get result from supplied activation record when required headers are not present" in {
+    val esLogStore =
+      new ElasticSearchLogStore(
+        system,
+        Some(testFlow(defaultResultHttpResponse, defaultResultHttpRequest)),
+        elasticSearchConfig = defaultConfigRequiredHeaders)
+
+    await(esLogStore.fetchResponse(user, activation, defaultLogStoreHttpRequest)) shouldBe expectedResponse
+  }
+
+  it should "get logs from ElasticSearch when required headers are needed" in {
     val authToken = "token"
     val authProjectId = "projectId"
     val httpRequest = HttpRequest(
@@ -152,11 +197,11 @@ class ElasticSearchLogStoreTests
         Accept(MediaTypes.`application/json`),
         RawHeader("x-auth-token", authToken),
         RawHeader("x-auth-project-id", authProjectId)),
-      HttpEntity(ContentTypes.`application/json`, defaultPayload))
+      HttpEntity(ContentTypes.`application/json`, defaultLogPayload))
     val esLogStore =
       new ElasticSearchLogStore(
         system,
-        Some(testFlow(defaultHttpResponse, httpRequest)),
+        Some(testFlow(defaultLogHttpResponse, httpRequest)),
         elasticSearchConfig = defaultConfigRequiredHeaders)
     val requiredHeadersHttpRequest = HttpRequest(
       uri = "https://some.url",
@@ -166,6 +211,30 @@ class ElasticSearchLogStoreTests
     await(esLogStore.fetchLogs(user, activation.withoutLogs, requiredHeadersHttpRequest)) shouldBe expectedLogs
   }
 
+  it should "get result from ElasticSearch when required headers are needed" in {
+    val authToken = "token"
+    val authProjectId = "projectId"
+    val httpRequest = HttpRequest(
+      POST,
+      Uri(s"/whisk_user_logs/_search"),
+      List(
+        Accept(MediaTypes.`application/json`),
+        RawHeader("x-auth-token", authToken),
+        RawHeader("x-auth-project-id", authProjectId)),
+      HttpEntity(ContentTypes.`application/json`, defaultResultPayload))
+    val esLogStore =
+      new ElasticSearchLogStore(
+        system,
+        Some(testFlow(defaultResultHttpResponse, httpRequest)),
+        elasticSearchConfig = defaultConfigRequiredHeaders)
+    val requiredHeadersHttpRequest = HttpRequest(
+      uri = "https://some.url",
+      headers = List(RawHeader("x-auth-token", authToken), RawHeader("x-auth-project-id", authProjectId)),
+      entity = HttpEntity.Empty)
+
+    await(esLogStore.fetchResponse(user, activation.withoutLogsOrResult, requiredHeadersHttpRequest)) shouldBe expectedResponse
+  }
+
   it should "dynamically replace $UUID in request path" in {
     val dynamicPathConfig =
       ElasticSearchLogStoreConfig("https", "host", 443, "/elasticsearch/logstash-%s*/_search", defaultLogSchema)
@@ -173,10 +242,10 @@ class ElasticSearchLogStoreTests
       POST,
       Uri(s"/elasticsearch/logstash-${user.uuid.asString}*/_search"),
       List(Accept(MediaTypes.`application/json`)),
-      HttpEntity(ContentTypes.`application/json`, defaultPayload))
+      HttpEntity(ContentTypes.`application/json`, defaultLogPayload))
     val esLogStore = new ElasticSearchLogStore(
       system,
-      Some(testFlow(defaultHttpResponse, httpRequest)),
+      Some(testFlow(defaultLogHttpResponse, httpRequest)),
       elasticSearchConfig = dynamicPathConfig)
 
     await(esLogStore.fetchLogs(user, activation.withoutLogs, defaultLogStoreHttpRequest)) shouldBe expectedLogs
@@ -186,19 +255,32 @@ class ElasticSearchLogStoreTests
     val esLogStore = new ElasticSearchLogStore(system, elasticSearchConfig = defaultConfig)
 
     a[StreamTcpException] should be thrownBy await(esLogStore.fetchLogs(user, activation, defaultLogStoreHttpRequest))
+    a[StreamTcpException] should be thrownBy await(
+      esLogStore.fetchResponse(user, activation, defaultLogStoreHttpRequest))
   }
 
-  it should "forward errors from ElasticSearch" in {
+  it should "forward errors from ElasticSearch log query" in {
     val httpResponse = HttpResponse(StatusCodes.InternalServerError)
     val esLogStore =
       new ElasticSearchLogStore(
         system,
-        Some(testFlow(httpResponse, defaultHttpRequest)),
+        Some(testFlow(httpResponse, defaultLogHttpRequest)),
         elasticSearchConfig = defaultConfig)
 
     a[RuntimeException] should be thrownBy await(esLogStore.fetchLogs(user, activation, defaultLogStoreHttpRequest))
   }
 
+  it should "forward errors from ElasticSearch result query" in {
+    val httpResponse = HttpResponse(StatusCodes.InternalServerError)
+    val esLogStore =
+      new ElasticSearchLogStore(
+        system,
+        Some(testFlow(httpResponse, defaultResultHttpRequest)),
+        elasticSearchConfig = defaultConfig)
+
+    a[RuntimeException] should be thrownBy await(esLogStore.fetchResponse(user, activation, defaultLogStoreHttpRequest))
+  }
+
   it should "error when configuration protocol is invalid" in {
     val invalidHostConfig =
       ElasticSearchLogStoreConfig("protocol", "host", 443, "/whisk_user_logs", defaultLogSchema, Seq.empty)


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services