You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@openwhisk.apache.org by gi...@git.apache.org on 2017/08/16 14:02:23 UTC

[GitHub] markusthoemmes commented on a change in pull request #2602: Remove old invoker code and refactor accordingly.

markusthoemmes commented on a change in pull request #2602: Remove old invoker code and refactor accordingly.
URL: https://github.com/apache/incubator-openwhisk/pull/2602#discussion_r133458130
 
 

 ##########
 File path: core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
 ##########
 @@ -147,70 +156,74 @@ class InvokerReactive(
 
     val pool = actorSystem.actorOf(ContainerPool.props(
         childFactory,
-        OldContainerPool.getDefaultMaxActive(config),
-        OldContainerPool.getDefaultMaxActive(config),
+        maximumContainers,
+        maximumContainers,
         activationFeed,
         Some(PrewarmingConfig(2, prewarmExec, 256.MB))))
 
     /** Is called when an ActivationMessage is read from Kafka */
-    override def onMessage(msg: ActivationMessage)(implicit transid: TransactionId): Future[Unit] = {
-        require(msg != null, "message undefined")
-        require(msg.action.version.isDefined, "action version undefined")
-
-        val start = transid.started(this, LoggingMarkers.INVOKER_ACTIVATION)
-        val namespace = msg.action.path
-        val name = msg.action.name
-        val actionid = FullyQualifiedEntityName(namespace, name).toDocId.asDocInfo(msg.revision)
-        val tran = Transaction(msg)
-        val subject = msg.user.subject
-
-        logging.info(this, s"${actionid.id} $subject ${msg.activationId}")
-
-        // caching is enabled since actions have revision id and an updated
-        // action will not hit in the cache due to change in the revision id;
-        // if the doc revision is missing, then bypass cache
-        if (actionid.rev == DocRevision.empty) {
-            logging.warn(this, s"revision was not provided for ${actionid.id}")
-        }
-        WhiskAction.get(entityStore, actionid.id, actionid.rev, fromCache = actionid.rev != DocRevision.empty).flatMap { action =>
-            action.toExecutableWhiskAction match {
-                case Some(executable) =>
-                    pool ! Run(executable, msg)
-                    Future.successful(())
-                case None =>
-                    logging.error(this, s"non-executable action reached the invoker ${action.fullyQualifiedName(false)}")
-                    Future.failed(new IllegalStateException())
-            }
-        }.recover {
-            case t =>
-                // If the action cannot be found, the user has concurrently deleted it,
-                // making this an application error. All other errors are considered system
-                // errors and should cause the invoker to be considered unhealthy.
-                val response = t match {
-                    case _: NoDocumentException => ActivationResponse.applicationError(Messages.actionRemovedWhileInvoking)
-                    case _                      => ActivationResponse.whiskError(Messages.actionRemovedWhileInvoking)
+    def processActivationMessage(bytes: Array[Byte]): Future[Unit] = {
+        Future(ActivationMessage.parse(new String(bytes, StandardCharsets.UTF_8)))
+            .flatMap(Future.fromTry(_))
+            .filter(_.action.version.isDefined)
+            .flatMap { msg =>
+                implicit val transid = msg.transid
+
+                val start = transid.started(this, LoggingMarkers.INVOKER_ACTIVATION)
+                val namespace = msg.action.path
+                val name = msg.action.name
+                val actionid = FullyQualifiedEntityName(namespace, name).toDocId.asDocInfo(msg.revision)
+                val subject = msg.user.subject
+
+                logging.info(this, s"${actionid.id} $subject ${msg.activationId}")
+
+                // caching is enabled since actions have revision id and an updated
+                // action will not hit in the cache due to change in the revision id;
+                // if the doc revision is missing, then bypass cache
+                if (actionid.rev == DocRevision.empty) {
+                    logging.warn(this, s"revision was not provided for ${actionid.id}")
                 }
-                val interval = Interval.zero
-                val causedBy = if (msg.causedBySequence) Parameters("causedBy", "sequence".toJson) else Parameters()
-                val activation = WhiskActivation(
-                    activationId = msg.activationId,
-                    namespace = msg.activationNamespace,
-                    subject = msg.user.subject,
-                    cause = msg.cause,
-                    name = msg.action.name,
-                    version = msg.action.version.getOrElse(SemVer()),
-                    start = interval.start,
-                    end = interval.end,
-                    duration = Some(interval.duration.toMillis),
-                    response = response,
-                    annotations = {
-                        Parameters("path", msg.action.toString.toJson) ++ causedBy
-                    })
-
-                activationFeed ! MessageFeed.Processed
-                ack(msg.transid, activation, msg.rootControllerIndex)
-                store(msg.transid, activation)
-        }
+
+                WhiskAction.get(entityStore, actionid.id, actionid.rev, fromCache = actionid.rev != DocRevision.empty).flatMap { action =>
+                    action.toExecutableWhiskAction match {
+                        case Some(executable) =>
+                            pool ! Run(executable, msg)
+                            Future.successful(())
+                        case None =>
+                            logging.error(this, s"non-executable action reached the invoker ${action.fullyQualifiedName(false)}")
+                            Future.failed(new IllegalStateException())
 
 Review comment:
   We could do that for good measure. Note, though, that the message wouldn't be displayed anywhere (at least not today).
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services