You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by cs...@apache.org on 2018/02/11 14:53:14 UTC

[incubator-openwhisk] branch master updated: Refactor some bits of the triggers API. (#3256)

This is an automated email from the ASF dual-hosted git repository.

csantanapr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 863858a  Refactor some bits of the triggers API. (#3256)
863858a is described below

commit 863858a50e28d3ebf23b588683f6495933baa16a
Author: Markus Thömmes <ma...@me.com>
AuthorDate: Sun Feb 11 15:53:12 2018 +0100

    Refactor some bits of the triggers API. (#3256)
---
 .../scala/whisk/core/controller/Triggers.scala     | 164 ++++++++-------------
 1 file changed, 63 insertions(+), 101 deletions(-)

diff --git a/core/controller/src/main/scala/whisk/core/controller/Triggers.scala b/core/controller/src/main/scala/whisk/core/controller/Triggers.scala
index f50fdb1..207e4f5 100644
--- a/core/controller/src/main/scala/whisk/core/controller/Triggers.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/Triggers.scala
@@ -21,13 +21,12 @@ import java.time.{Clock, Instant}
 
 import scala.collection.immutable.Map
 import scala.concurrent.Future
-import scala.util.{Failure, Success}
-
+import scala.util.{Failure, Try}
 import akka.actor.ActorSystem
 import akka.http.scaladsl.Http
 import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
 import akka.http.scaladsl.model.HttpMethods.POST
-import akka.http.scaladsl.model.StatusCodes.{Accepted, BadRequest, InternalServerError, OK}
+import akka.http.scaladsl.model.StatusCodes.{Accepted, BadRequest, InternalServerError, OK, ServerError}
 import akka.http.scaladsl.model.Uri.Path
 import akka.http.scaladsl.model.headers.{Authorization, BasicHttpCredentials}
 import akka.http.scaladsl.model._
@@ -129,35 +128,23 @@ trait WhiskTriggersApi extends WhiskCollectionAPI {
 
           if (activeRules.nonEmpty) {
             val args: JsObject = trigger.parameters.merge(payload).getOrElse(JsObject())
-            val actionLogList: Iterable[Future[JsObject]] = activateRules(user, args, activeRules)
-
-            // For each of the action activation results, generate a log message to attach to the trigger activation
-            Future
-              .sequence(actionLogList)
-              .map(_.map(_.compactPrint))
-              .onComplete {
-                case Success(triggerLogs) =>
-                  val triggerActivationDoc = triggerActivation.withLogs(ActivationLogs(triggerLogs.toVector))
-                  logging
-                    .debug(
-                      this,
-                      s"[POST] trigger activated, writing activation record to datastore: $triggerActivationId")
-                  WhiskActivation.put(activationStore, triggerActivationDoc) recover {
-                    case t =>
-                      logging
-                        .error(this, s"[POST] storing trigger activation $triggerActivationId failed: ${t.getMessage}")
-                  }
-                case Failure(e) =>
+
+            activateRules(user, args, activeRules)
+              .map(results => triggerActivation.withLogs(ActivationLogs(results.map(_.toJson.compactPrint).toVector)))
+              .recover {
+                case e =>
                   logging.error(this, s"Failed to write action activation results to trigger activation: $e")
-                  logging
-                    .info(
-                      this,
-                      s"[POST] trigger activated, writing activation record to datastore: $triggerActivationId")
-                  WhiskActivation.put(activationStore, triggerActivation) recover {
-                    case t =>
-                      logging
-                        .error(this, s"[POST] storing trigger activation $triggerActivationId failed: ${t.getMessage}")
-                  }
+                  triggerActivation
+              }
+              .map { activation =>
+                logging.debug(
+                  this,
+                  s"[POST] trigger activated, writing activation record to datastore: $triggerActivationId")
+                WhiskActivation.put(activationStore, activation)
+              }
+              .andThen {
+                case Failure(t) =>
+                  logging.error(this, s"[POST] storing trigger activation $triggerActivationId failed: ${t.getMessage}")
               }
           }
 
@@ -305,59 +292,56 @@ trait WhiskTriggersApi extends WhiskCollectionAPI {
   private def activateRules(user: Identity,
                             args: JsObject,
                             rulesToActivate: Map[FullyQualifiedEntityName, ReducedRule])(
-    implicit transid: TransactionId): Iterable[Future[JsObject]] = {
-    rulesToActivate.map {
+    implicit transid: TransactionId): Future[Iterable[RuleActivationResult]] = {
+    val ruleResults = rulesToActivate.map {
       case (ruleName, rule) =>
         // Invoke the action. Retain action results for inclusion in the trigger activation record
-        val actionActivationResult: Future[JsObject] = postActivation(user, rule, args)
+        postActivation(user, rule, args)
           .flatMap { response =>
             response.status match {
               case OK | Accepted =>
                 Unmarshal(response.entity).to[JsObject].map { activationResponse =>
-                  val activationId: JsValue = activationResponse.fields("activationId")
+                  val activationId = activationResponse.fields("activationId").convertTo[ActivationId]
                   logging.debug(this, s"trigger-fired action '${rule.action}' invoked with activation $activationId")
-                  ruleResult(ActivationResponse.Success, ruleName, rule.action, Some(activationId))
+                  RuleActivationResult(ActivationResponse.Success, ruleName, rule.action, Right(activationId))
                 }
 
-              // all proper controller responses are JSON objects that deserialize to an ErrorResponse instance
-              case code if (response.entity.contentType == ContentTypes.`application/json`) =>
-                Unmarshal(response.entity).to[ErrorResponse].map { e =>
-                  val statusCode =
-                    if (code != InternalServerError) {
-                      logging
-                        .debug(
-                          this,
-                          s"trigger-fired action '${rule.action}' failed to invoke with ${e.error}, ${e.code}")
-                      ActivationResponse.ApplicationError
-                    } else {
+              case code =>
+                Unmarshal(response.entity).to[String].map { error =>
+                  val failureType = code match {
+                    case _: ServerError => ActivationResponse.WhiskError // all 500s are to be considered whisk errors
+                    case _              => ActivationResponse.ApplicationError
+                  }
+                  val errorMessage: String = Try(error.parseJson.convertTo[ErrorResponse])
+                    .map { e =>
+                      def logMsg = s"trigger-fired action '${rule.action}' failed to invoke with ${e.error}, ${e.code}"
+                      if (failureType == ActivationResponse.ApplicationError) logging.debug(this, logMsg)
+                      else logging.error(this, logMsg)
+
+                      e.error
+                    }
+                    .getOrElse {
                       logging
-                        .error(
-                          this,
-                          s"trigger-fired action '${rule.action}' failed to invoke with ${e.error}, ${e.code}")
-                      ActivationResponse.WhiskError
+                        .error(this, s"trigger-fired action '${rule.action}' failed to invoke with status code $code")
+                      InternalServerError.defaultMessage
                     }
-                  ruleResult(statusCode, ruleName, rule.action, errorMsg = Some(e.error))
-                }
 
-              case code =>
-                logging.error(this, s"trigger-fired action '${rule.action}' failed to invoke with status code $code")
-                Unmarshal(response.entity).to[String].map { error =>
-                  ruleResult(ActivationResponse.WhiskError, ruleName, rule.action, errorMsg = Some(error))
+                  RuleActivationResult(failureType, ruleName, rule.action, Left(errorMessage))
                 }
             }
           }
           .recover {
             case t =>
               logging.error(this, s"trigger-fired action '${rule.action}' failed to invoke with $t")
-              ruleResult(
+              RuleActivationResult(
                 ActivationResponse.WhiskError,
                 ruleName,
                 rule.action,
-                errorMsg = Some(InternalServerError.defaultMessage))
+                Left(InternalServerError.defaultMessage))
           }
-
-        actionActivationResult
     }
+
+    Future.sequence(ruleResults)
   }
 
   /**
@@ -372,58 +356,36 @@ trait WhiskTriggersApi extends WhiskCollectionAPI {
     // Build the url to invoke an action mapped to the rule
     val actionUrl = baseControllerPath / rule.action.path.root.asString / "actions"
 
-    val actionPath = {
-      rule.action.path.relativePath.map { pkg =>
-        (Path.SingleSlash + pkg.namespace) / rule.action.name.asString
-      } getOrElse {
-        Path.SingleSlash + rule.action.name.asString
-      }
-    }.toString
+    val actionPath = rule.action.path.relativePath
+      .map(pkg => Path / pkg.namespace / rule.action.name.asString)
+      .getOrElse(Path / rule.action.name.asString)
 
     val request = HttpRequest(
       method = POST,
-      uri = url.withPath(actionUrl + actionPath),
+      uri = url.withPath(actionUrl ++ actionPath),
       headers = List(Authorization(BasicHttpCredentials(user.authkey.uuid.asString, user.authkey.key.asString))),
       entity = HttpEntity(MediaTypes.`application/json`, args.compactPrint))
 
     Http().singleRequest(request)
   }
 
-  /**
-   * Create JSON object containing the pertinent rule activation details.
-   * {
-   *   "rule": "my-rule",
-   *   "action": "my-action",
-   *   "statusCode": 0,
-   *   "status": "success",
-   *   "activationId": "...",                              // either this field, ...
-   *   "error": "The requested resource does not exist."   // ... or this field will be present
-   * }
-   *
-   * @param statusCode one of ActivationResponse values
-   * @param ruleName the name of the rule that was activated
-   * @param actionName the name of the action activated by the rule
-   * @param actionActivationId the activation id, if there is one
-   * @param errorMsg the rror messages otherwise
-   * @return JsObject as formatted above
-   */
-  private def ruleResult(statusCode: Int,
-                         ruleName: FullyQualifiedEntityName,
-                         actionName: FullyQualifiedEntityName,
-                         actionActivationId: Option[JsValue] = None,
-                         errorMsg: Option[String] = None): JsObject = {
-    JsObject(
-      Map(
-        "rule" -> JsString(ruleName.asString),
-        "action" -> JsString(actionName.asString),
-        "statusCode" -> JsNumber(statusCode),
-        "success" -> JsBoolean(statusCode == ActivationResponse.Success)) ++
-        actionActivationId.map("activationId" -> _.toJson) ++
-        errorMsg.map("error" -> JsString(_)))
+  /** Contains the result of invoking a rule */
+  case class RuleActivationResult(statusCode: Int,
+                                  ruleName: FullyQualifiedEntityName,
+                                  actionName: FullyQualifiedEntityName,
+                                  response: Either[String, ActivationId]) {
+    def toJson: JsObject =
+      JsObject(
+        Map(
+          "rule" -> ruleName.asString.toJson,
+          "action" -> actionName.asString.toJson,
+          "statusCode" -> statusCode.toJson,
+          "success" -> (statusCode == ActivationResponse.Success).toJson,
+          response.fold("error" -> _.toJson, "activationId" -> _.toJson)))
   }
 
   /** Common base bath for the controller, used by internal action activation mechanism. */
-  private val baseControllerPath = Path("/api/v1/namespaces")
+  private val baseControllerPath = Path / "api" / "v1" / "namespaces"
 
   /** Custom unmarshaller for query parameters "limit" for "list" operations. */
   private implicit val stringToListLimit: Unmarshaller[String, ListLimit] = RestApiCommons.stringToListLimit(collection)

-- 
To stop receiving notification emails like this one, please contact
csantanapr@apache.org.