You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@openwhisk.apache.org by GitBox <gi...@apache.org> on 2018/05/23 07:16:56 UTC

[GitHub] style95 closed pull request #3671: Activation id in header

style95 closed pull request #3671: Activation id in header
URL: https://github.com/apache/incubator-openwhisk/pull/3671
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/controller/src/main/scala/whisk/core/controller/Actions.scala b/core/controller/src/main/scala/whisk/core/controller/Actions.scala
index 25be8fa4ee..17da20e358 100644
--- a/core/controller/src/main/scala/whisk/core/controller/Actions.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/Actions.scala
@@ -28,7 +28,6 @@ import akka.http.scaladsl.model.StatusCodes._
 import akka.http.scaladsl.server.RequestContext
 import akka.http.scaladsl.server.RouteResult
 import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
-import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport.sprayJsonMarshaller
 import akka.http.scaladsl.unmarshalling._
 import spray.json._
 import spray.json.DefaultJsonProtocol._
@@ -257,24 +256,31 @@ trait WhiskActionsApi extends WhiskCollectionAPI with PostActionActivation with
     onComplete(invokeAction(user, actionWithMergedParams, payload, waitForResponse, cause = None)) {
       case Success(Left(activationId)) =>
         // non-blocking invoke or blocking invoke which got queued instead
-        complete(Accepted, activationId.toJsObject)
+
+        respondWithActivationIdHeader(activationId) {
+          // note that if header defined a content-type, it will be ignored
+          // since the type must be compatible with the data response
+          complete(Accepted, activationId.toJsObject)
+        }
+
       case Success(Right(activation)) =>
         val response = if (result) activation.resultAsJson else activation.toExtendedJson
-
-        if (activation.response.isSuccess) {
-          complete(OK, response)
-        } else if (activation.response.isApplicationError) {
-          // actions that result is ApplicationError status are considered a 'success'
-          // and will have an 'error' property in the result - the HTTP status is OK
-          // and clients must check the response status if it exists
-          // NOTE: response status will not exist in the JSON object if ?result == true
-          // and instead clients must check if 'error' is in the JSON
-          // PRESERVING OLD BEHAVIOR and will address defect in separate change
-          complete(BadGateway, response)
-        } else if (activation.response.isContainerError) {
-          complete(BadGateway, response)
-        } else {
-          complete(InternalServerError, response)
+        respondWithActivationIdHeader(activation) {
+          if (activation.response.isSuccess) {
+            complete(OK, response)
+          } else if (activation.response.isApplicationError) {
+            // actions that result is ApplicationError status are considered a 'success'
+            // and will have an 'error' property in the result - the HTTP status is OK
+            // and clients must check the response status if it exists
+            // NOTE: response status will not exist in the JSON object if ?result == true
+            // and instead clients must check if 'error' is in the JSON
+            // PRESERVING OLD BEHAVIOR and will address defect in separate change
+            complete(BadGateway, response)
+          } else if (activation.response.isContainerError) {
+            complete(BadGateway, response)
+          } else {
+            complete(InternalServerError, response)
+          }
         }
       case Failure(t: RecordTooLargeException) =>
         logging.debug(this, s"[POST] action payload was too large")
diff --git a/core/controller/src/main/scala/whisk/core/controller/Entities.scala b/core/controller/src/main/scala/whisk/core/controller/Entities.scala
index cd9a72a08d..f9f0ccc3bc 100644
--- a/core/controller/src/main/scala/whisk/core/controller/Entities.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/Entities.scala
@@ -20,9 +20,9 @@ package whisk.core.controller
 import scala.concurrent.Future
 import scala.language.postfixOps
 import scala.util.Try
-
 import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
 import akka.http.scaladsl.model.StatusCodes.RequestEntityTooLarge
+import akka.http.scaladsl.model.headers.RawHeader
 import akka.http.scaladsl.server.Directive0
 import akka.http.scaladsl.server.Directives
 import akka.http.scaladsl.server.RequestContext
@@ -65,6 +65,16 @@ protected[controller] trait ValidateRequestSize extends Directives {
   protected val fieldDescriptionForSizeError = "Request"
 }
 
+protected trait CustomHeaders extends Directives {
+
+  /** Add activation ID in headers */
+  protected def respondWithActivationIdHeader(activation: WhiskActivation): Directive0 =
+    respondWithActivationIdHeader(activation.activationId)
+
+  protected def respondWithActivationIdHeader(activationId: ActivationId): Directive0 =
+    respondWithHeader(RawHeader("X-OpenWhisk-Activation-Id", activationId.asString))
+}
+
 /** A trait implementing the basic operations on WhiskEntities in support of the various APIs. */
 trait WhiskCollectionAPI
     extends Directives
@@ -72,7 +82,8 @@ trait WhiskCollectionAPI
     with AuthorizedRouteProvider
     with ValidateRequestSize
     with ReadOps
-    with WriteOps {
+    with WriteOps
+    with CustomHeaders {
 
   /** The core collections require backend services to be injected in this trait. */
   services: WhiskServices =>
diff --git a/core/controller/src/main/scala/whisk/core/controller/Triggers.scala b/core/controller/src/main/scala/whisk/core/controller/Triggers.scala
index b7f0438a62..a170bab0a7 100644
--- a/core/controller/src/main/scala/whisk/core/controller/Triggers.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/Triggers.scala
@@ -172,7 +172,9 @@ trait WhiskTriggersApi extends WhiskCollectionAPI {
                 case Failure(t) =>
                   logging.error(this, s"[POST] storing trigger activation $triggerActivationId failed: ${t.getMessage}")
               }
-            complete(Accepted, triggerActivationId.toJsObject)
+            respondWithActivationIdHeader(triggerActivationId) {
+              complete(Accepted, triggerActivationId.toJsObject)
+            }
           } else {
             logging
               .debug(
diff --git a/core/controller/src/main/scala/whisk/core/controller/WebActions.scala b/core/controller/src/main/scala/whisk/core/controller/WebActions.scala
index d74815c293..4509931e6d 100644
--- a/core/controller/src/main/scala/whisk/core/controller/WebActions.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/WebActions.scala
@@ -348,7 +348,7 @@ protected[core] object WhiskWebActionsApi extends Directives {
     headers.filter(_.lowercaseName != `Content-Type`.lowercaseName)
 }
 
-trait WhiskWebActionsApi extends Directives with ValidateRequestSize with PostActionActivation {
+trait WhiskWebActionsApi extends Directives with ValidateRequestSize with PostActionActivation with CustomHeaders {
   services: WhiskServices =>
 
   /** API path invocation path for posting activations directly through the host. */
@@ -634,37 +634,40 @@ trait WhiskWebActionsApi extends Directives with ValidateRequestSize with PostAc
                               responseType: MediaExtension)(implicit transid: TransactionId) = {
     onComplete(queuedActivation) {
       case Success(Right(activation)) =>
-        val result = activation.resultAsJson
+        respondWithActivationIdHeader(activation) {
+          val result = activation.resultAsJson
+
+          if (activation.response.isSuccess || activation.response.isApplicationError) {
+            val resultPath = if (activation.response.isSuccess) {
+              projectResultField
+            } else {
+              // the activation produced an error response: therefore ignore
+              // the requested projection and unwrap the error instead
+              // and attempt to handle it per the desired response type (extension)
+              List(ActivationResponse.ERROR_FIELD)
+            }
 
-        if (activation.response.isSuccess || activation.response.isApplicationError) {
-          val resultPath = if (activation.response.isSuccess) {
-            projectResultField
+            val result = getFieldPath(activation.resultAsJson, resultPath)
+            result match {
+              case Some(projection) =>
+                val marshaler = Future(responseType.transcoder(projection, transid, webApiDirectives))
+                onComplete(marshaler) {
+                  case Success(done) => done // all transcoders terminate the connection
+                  case Failure(t)    => terminate(InternalServerError)
+                }
+              case _ => terminate(NotFound, Messages.propertyNotFound)
+            }
           } else {
-            // the activation produced an error response: therefore ignore
-            // the requested projection and unwrap the error instead
-            // and attempt to handle it per the desired response type (extension)
-            List(ActivationResponse.ERROR_FIELD)
+            terminate(BadRequest, Messages.errorProcessingRequest)
           }
-
-          val result = getFieldPath(activation.resultAsJson, resultPath)
-          result match {
-            case Some(projection) =>
-              val marshaler = Future(responseType.transcoder(projection, transid, webApiDirectives))
-              onComplete(marshaler) {
-                case Success(done) => done // all transcoders terminate the connection
-                case Failure(t)    => terminate(InternalServerError)
-              }
-            case _ => terminate(NotFound, Messages.propertyNotFound)
-          }
-        } else {
-          terminate(BadRequest, Messages.errorProcessingRequest)
         }
-
       case Success(Left(activationId)) =>
         // blocking invoke which got queued instead
         // this should not happen, instead it should be a blocking invoke timeout
         logging.debug(this, "activation waiting period expired")
-        terminate(Accepted, Messages.responseNotReady)
+        respondWithActivationIdHeader(activationId) {
+          terminate(Accepted, Messages.responseNotReady)
+        }
 
       case Failure(t: RejectRequest) => terminate(t.code, t.message)
 
diff --git a/tests/src/test/scala/whisk/core/controller/test/ActionsApiTests.scala b/tests/src/test/scala/whisk/core/controller/test/ActionsApiTests.scala
index 609e773534..2470318308 100644
--- a/tests/src/test/scala/whisk/core/controller/test/ActionsApiTests.scala
+++ b/tests/src/test/scala/whisk/core/controller/test/ActionsApiTests.scala
@@ -68,6 +68,7 @@ class ActionsApiTests extends ControllerTestCommon with WhiskActionsApi {
   def aname() = MakeName.next("action_tests")
   val actionLimit = Exec.sizeLimit
   val parametersLimit = Parameters.sizeLimit
+  val ActivationIdHeader = "x-openwhisk-activation-id"
 
   //// GET /actions
   it should "return empty list when no actions exist" in {
@@ -1157,6 +1158,7 @@ class ActionsApiTests extends ControllerTestCommon with WhiskActionsApi {
 
       // repeat invoke, get only result back
       Post(s"$collectionPath/${action.name}?blocking=true&result=true") ~> Route.seal(routes(creds)) ~> check {
+        headers
         status should be(OK)
         val response = responseAs[JsObject]
         response should be(activation.resultAsJson)
@@ -1236,6 +1238,74 @@ class ActionsApiTests extends ControllerTestCommon with WhiskActionsApi {
     }
   }
 
+  it should "invoke a nonblocking action and have activationId header in the response" in {
+    implicit val tid = transid()
+    val action = WhiskAction(namespace, aname(), jsDefault("??"))
+    put(entityStore, action)
+    Post(s"$collectionPath/${action.name}") ~> Route.seal(routes(creds)) ~> check {
+      status should be(Accepted)
+      headers.exists(_.is(ActivationIdHeader)) should be(true)
+    }
+  }
+
+  it should "invoke a blocking action and have activationId header in the response" in {
+    implicit val tid = transid()
+    val action = WhiskAction(namespace, aname(), jsDefault("??"))
+    val activation = WhiskActivation(
+      action.namespace,
+      action.name,
+      creds.subject,
+      activationIdFactory.make(),
+      start = Instant.now,
+      end = Instant.now,
+      response = ActivationResponse.success(Some(JsObject("test" -> "yes".toJson))))
+    put(entityStore, action)
+
+    try {
+      // do not store the activation in the db, instead register it as the response to generate on active ack
+      loadBalancer.whiskActivationStub = Some((1.milliseconds, activation))
+
+      Post(s"$collectionPath/${action.name}?blocking=true") ~> Route.seal(routes(creds)) ~> check {
+        status should be(OK)
+        headers.exists(_.is(ActivationIdHeader)) should be(true)
+      }
+
+      // repeat invoke, get only result back
+      Post(s"$collectionPath/${action.name}?blocking=true&result=true") ~> Route.seal(routes(creds)) ~> check {
+        status should be(OK)
+        headers.exists(_.is(ActivationIdHeader)) should be(true)
+      }
+    } finally {
+      loadBalancer.whiskActivationStub = None
+    }
+  }
+
+  it should "invoke a problematic action and have activationId in error response" in {
+    implicit val tid = transid()
+    val action = WhiskAction(namespace, aname(), jsDefault("??"))
+    val activation = WhiskActivation(
+      action.namespace,
+      action.name,
+      creds.subject,
+      activationIdFactory.make(),
+      start = Instant.now,
+      end = Instant.now,
+      response = ActivationResponse.whiskError("test"))
+    put(entityStore, action)
+    // storing the activation in the db will allow the db polling to retrieve it
+    // the test harness makes sure the activaiton id observed by the test matches
+    // the one generated by the api handler
+    put(activationStore, activation)
+    try {
+      Post(s"$collectionPath/${action.name}?blocking=true") ~> Route.seal(routes(creds)) ~> check {
+        status should be(InternalServerError)
+        headers.exists(_.is(ActivationIdHeader)) should be(true)
+      }
+    } finally {
+      deleteActivation(activation.docid)
+    }
+  }
+
   it should "ensure WhiskActionMetadata is used to invoke an action" in {
     implicit val tid = transid()
     val action = WhiskAction(namespace, aname(), jsDefault("??"))


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services