You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@openwhisk.apache.org by GitBox <gi...@apache.org> on 2018/05/23 07:16:56 UTC
[GitHub] style95 closed pull request #3671: Activation id in header
style95 closed pull request #3671: Activation id in header
URL: https://github.com/apache/incubator-openwhisk/pull/3671
This pull request was merged from a forked repository.
Because GitHub hides the original diff of a foreign (fork) pull request
once it has been merged, the diff is reproduced below for the sake of
provenance:
diff --git a/core/controller/src/main/scala/whisk/core/controller/Actions.scala b/core/controller/src/main/scala/whisk/core/controller/Actions.scala
index 25be8fa4ee..17da20e358 100644
--- a/core/controller/src/main/scala/whisk/core/controller/Actions.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/Actions.scala
@@ -28,7 +28,6 @@ import akka.http.scaladsl.model.StatusCodes._
import akka.http.scaladsl.server.RequestContext
import akka.http.scaladsl.server.RouteResult
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
-import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport.sprayJsonMarshaller
import akka.http.scaladsl.unmarshalling._
import spray.json._
import spray.json.DefaultJsonProtocol._
@@ -257,24 +256,31 @@ trait WhiskActionsApi extends WhiskCollectionAPI with PostActionActivation with
onComplete(invokeAction(user, actionWithMergedParams, payload, waitForResponse, cause = None)) {
case Success(Left(activationId)) =>
// non-blocking invoke or blocking invoke which got queued instead
- complete(Accepted, activationId.toJsObject)
+
+ respondWithActivationIdHeader(activationId) {
+ // note that if header defined a content-type, it will be ignored
+ // since the type must be compatible with the data response
+ complete(Accepted, activationId.toJsObject)
+ }
+
case Success(Right(activation)) =>
val response = if (result) activation.resultAsJson else activation.toExtendedJson
-
- if (activation.response.isSuccess) {
- complete(OK, response)
- } else if (activation.response.isApplicationError) {
- // actions that result is ApplicationError status are considered a 'success'
- // and will have an 'error' property in the result - the HTTP status is OK
- // and clients must check the response status if it exists
- // NOTE: response status will not exist in the JSON object if ?result == true
- // and instead clients must check if 'error' is in the JSON
- // PRESERVING OLD BEHAVIOR and will address defect in separate change
- complete(BadGateway, response)
- } else if (activation.response.isContainerError) {
- complete(BadGateway, response)
- } else {
- complete(InternalServerError, response)
+ respondWithActivationIdHeader(activation) {
+ if (activation.response.isSuccess) {
+ complete(OK, response)
+ } else if (activation.response.isApplicationError) {
+ // actions that result is ApplicationError status are considered a 'success'
+ // and will have an 'error' property in the result - the HTTP status is OK
+ // and clients must check the response status if it exists
+ // NOTE: response status will not exist in the JSON object if ?result == true
+ // and instead clients must check if 'error' is in the JSON
+ // PRESERVING OLD BEHAVIOR and will address defect in separate change
+ complete(BadGateway, response)
+ } else if (activation.response.isContainerError) {
+ complete(BadGateway, response)
+ } else {
+ complete(InternalServerError, response)
+ }
}
case Failure(t: RecordTooLargeException) =>
logging.debug(this, s"[POST] action payload was too large")
diff --git a/core/controller/src/main/scala/whisk/core/controller/Entities.scala b/core/controller/src/main/scala/whisk/core/controller/Entities.scala
index cd9a72a08d..f9f0ccc3bc 100644
--- a/core/controller/src/main/scala/whisk/core/controller/Entities.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/Entities.scala
@@ -20,9 +20,9 @@ package whisk.core.controller
import scala.concurrent.Future
import scala.language.postfixOps
import scala.util.Try
-
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import akka.http.scaladsl.model.StatusCodes.RequestEntityTooLarge
+import akka.http.scaladsl.model.headers.RawHeader
import akka.http.scaladsl.server.Directive0
import akka.http.scaladsl.server.Directives
import akka.http.scaladsl.server.RequestContext
@@ -65,6 +65,16 @@ protected[controller] trait ValidateRequestSize extends Directives {
protected val fieldDescriptionForSizeError = "Request"
}
+protected trait CustomHeaders extends Directives {
+
+ /** Add activation ID in headers */
+ protected def respondWithActivationIdHeader(activation: WhiskActivation): Directive0 =
+ respondWithActivationIdHeader(activation.activationId)
+
+ protected def respondWithActivationIdHeader(activationId: ActivationId): Directive0 =
+ respondWithHeader(RawHeader("X-OpenWhisk-Activation-Id", activationId.asString))
+}
+
/** A trait implementing the basic operations on WhiskEntities in support of the various APIs. */
trait WhiskCollectionAPI
extends Directives
@@ -72,7 +82,8 @@ trait WhiskCollectionAPI
with AuthorizedRouteProvider
with ValidateRequestSize
with ReadOps
- with WriteOps {
+ with WriteOps
+ with CustomHeaders {
/** The core collections require backend services to be injected in this trait. */
services: WhiskServices =>
diff --git a/core/controller/src/main/scala/whisk/core/controller/Triggers.scala b/core/controller/src/main/scala/whisk/core/controller/Triggers.scala
index b7f0438a62..a170bab0a7 100644
--- a/core/controller/src/main/scala/whisk/core/controller/Triggers.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/Triggers.scala
@@ -172,7 +172,9 @@ trait WhiskTriggersApi extends WhiskCollectionAPI {
case Failure(t) =>
logging.error(this, s"[POST] storing trigger activation $triggerActivationId failed: ${t.getMessage}")
}
- complete(Accepted, triggerActivationId.toJsObject)
+ respondWithActivationIdHeader(triggerActivationId) {
+ complete(Accepted, triggerActivationId.toJsObject)
+ }
} else {
logging
.debug(
diff --git a/core/controller/src/main/scala/whisk/core/controller/WebActions.scala b/core/controller/src/main/scala/whisk/core/controller/WebActions.scala
index d74815c293..4509931e6d 100644
--- a/core/controller/src/main/scala/whisk/core/controller/WebActions.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/WebActions.scala
@@ -348,7 +348,7 @@ protected[core] object WhiskWebActionsApi extends Directives {
headers.filter(_.lowercaseName != `Content-Type`.lowercaseName)
}
-trait WhiskWebActionsApi extends Directives with ValidateRequestSize with PostActionActivation {
+trait WhiskWebActionsApi extends Directives with ValidateRequestSize with PostActionActivation with CustomHeaders {
services: WhiskServices =>
/** API path invocation path for posting activations directly through the host. */
@@ -634,37 +634,40 @@ trait WhiskWebActionsApi extends Directives with ValidateRequestSize with PostAc
responseType: MediaExtension)(implicit transid: TransactionId) = {
onComplete(queuedActivation) {
case Success(Right(activation)) =>
- val result = activation.resultAsJson
+ respondWithActivationIdHeader(activation) {
+ val result = activation.resultAsJson
+
+ if (activation.response.isSuccess || activation.response.isApplicationError) {
+ val resultPath = if (activation.response.isSuccess) {
+ projectResultField
+ } else {
+ // the activation produced an error response: therefore ignore
+ // the requested projection and unwrap the error instead
+ // and attempt to handle it per the desired response type (extension)
+ List(ActivationResponse.ERROR_FIELD)
+ }
- if (activation.response.isSuccess || activation.response.isApplicationError) {
- val resultPath = if (activation.response.isSuccess) {
- projectResultField
+ val result = getFieldPath(activation.resultAsJson, resultPath)
+ result match {
+ case Some(projection) =>
+ val marshaler = Future(responseType.transcoder(projection, transid, webApiDirectives))
+ onComplete(marshaler) {
+ case Success(done) => done // all transcoders terminate the connection
+ case Failure(t) => terminate(InternalServerError)
+ }
+ case _ => terminate(NotFound, Messages.propertyNotFound)
+ }
} else {
- // the activation produced an error response: therefore ignore
- // the requested projection and unwrap the error instead
- // and attempt to handle it per the desired response type (extension)
- List(ActivationResponse.ERROR_FIELD)
+ terminate(BadRequest, Messages.errorProcessingRequest)
}
-
- val result = getFieldPath(activation.resultAsJson, resultPath)
- result match {
- case Some(projection) =>
- val marshaler = Future(responseType.transcoder(projection, transid, webApiDirectives))
- onComplete(marshaler) {
- case Success(done) => done // all transcoders terminate the connection
- case Failure(t) => terminate(InternalServerError)
- }
- case _ => terminate(NotFound, Messages.propertyNotFound)
- }
- } else {
- terminate(BadRequest, Messages.errorProcessingRequest)
}
-
case Success(Left(activationId)) =>
// blocking invoke which got queued instead
// this should not happen, instead it should be a blocking invoke timeout
logging.debug(this, "activation waiting period expired")
- terminate(Accepted, Messages.responseNotReady)
+ respondWithActivationIdHeader(activationId) {
+ terminate(Accepted, Messages.responseNotReady)
+ }
case Failure(t: RejectRequest) => terminate(t.code, t.message)
diff --git a/tests/src/test/scala/whisk/core/controller/test/ActionsApiTests.scala b/tests/src/test/scala/whisk/core/controller/test/ActionsApiTests.scala
index 609e773534..2470318308 100644
--- a/tests/src/test/scala/whisk/core/controller/test/ActionsApiTests.scala
+++ b/tests/src/test/scala/whisk/core/controller/test/ActionsApiTests.scala
@@ -68,6 +68,7 @@ class ActionsApiTests extends ControllerTestCommon with WhiskActionsApi {
def aname() = MakeName.next("action_tests")
val actionLimit = Exec.sizeLimit
val parametersLimit = Parameters.sizeLimit
+ val ActivationIdHeader = "x-openwhisk-activation-id"
//// GET /actions
it should "return empty list when no actions exist" in {
@@ -1157,6 +1158,7 @@ class ActionsApiTests extends ControllerTestCommon with WhiskActionsApi {
// repeat invoke, get only result back
Post(s"$collectionPath/${action.name}?blocking=true&result=true") ~> Route.seal(routes(creds)) ~> check {
+ headers
status should be(OK)
val response = responseAs[JsObject]
response should be(activation.resultAsJson)
@@ -1236,6 +1238,74 @@ class ActionsApiTests extends ControllerTestCommon with WhiskActionsApi {
}
}
+ it should "invoke a nonblocking action and have activationId header in the response" in {
+ implicit val tid = transid()
+ val action = WhiskAction(namespace, aname(), jsDefault("??"))
+ put(entityStore, action)
+ Post(s"$collectionPath/${action.name}") ~> Route.seal(routes(creds)) ~> check {
+ status should be(Accepted)
+ headers.exists(_.is(ActivationIdHeader)) should be(true)
+ }
+ }
+
+ it should "invoke a blocking action and have activationId header in the response" in {
+ implicit val tid = transid()
+ val action = WhiskAction(namespace, aname(), jsDefault("??"))
+ val activation = WhiskActivation(
+ action.namespace,
+ action.name,
+ creds.subject,
+ activationIdFactory.make(),
+ start = Instant.now,
+ end = Instant.now,
+ response = ActivationResponse.success(Some(JsObject("test" -> "yes".toJson))))
+ put(entityStore, action)
+
+ try {
+ // do not store the activation in the db, instead register it as the response to generate on active ack
+ loadBalancer.whiskActivationStub = Some((1.milliseconds, activation))
+
+ Post(s"$collectionPath/${action.name}?blocking=true") ~> Route.seal(routes(creds)) ~> check {
+ status should be(OK)
+ headers.exists(_.is(ActivationIdHeader)) should be(true)
+ }
+
+ // repeat invoke, get only result back
+ Post(s"$collectionPath/${action.name}?blocking=true&result=true") ~> Route.seal(routes(creds)) ~> check {
+ status should be(OK)
+ headers.exists(_.is(ActivationIdHeader)) should be(true)
+ }
+ } finally {
+ loadBalancer.whiskActivationStub = None
+ }
+ }
+
+ it should "invoke a problematic action and have activationId in error response" in {
+ implicit val tid = transid()
+ val action = WhiskAction(namespace, aname(), jsDefault("??"))
+ val activation = WhiskActivation(
+ action.namespace,
+ action.name,
+ creds.subject,
+ activationIdFactory.make(),
+ start = Instant.now,
+ end = Instant.now,
+ response = ActivationResponse.whiskError("test"))
+ put(entityStore, action)
+ // storing the activation in the db will allow the db polling to retrieve it
+ // the test harness makes sure the activation id observed by the test matches
+ // the one generated by the api handler
+ put(activationStore, activation)
+ try {
+ Post(s"$collectionPath/${action.name}?blocking=true") ~> Route.seal(routes(creds)) ~> check {
+ status should be(InternalServerError)
+ headers.exists(_.is(ActivationIdHeader)) should be(true)
+ }
+ } finally {
+ deleteActivation(activation.docid)
+ }
+ }
+
it should "ensure WhiskActionMetadata is used to invoke an action" in {
implicit val tid = transid()
val action = WhiskAction(namespace, aname(), jsDefault("??"))
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services