You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@openwhisk.apache.org by gi...@git.apache.org on 2017/08/16 14:15:55 UTC

[GitHub] rabbah commented on a change in pull request #2602: Remove old invoker code and refactor accordingly.

rabbah commented on a change in pull request #2602: Remove old invoker code and refactor accordingly.
URL: https://github.com/apache/incubator-openwhisk/pull/2602#discussion_r133461959
 
 

 ##########
 File path: core/invoker/src/main/scala/whisk/core/dispatcher/Dispatcher.scala
 ##########
 @@ -1,159 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package whisk.core.dispatcher
-
-import java.nio.charset.StandardCharsets
-
-import scala.collection.concurrent.TrieMap
-import scala.concurrent.Future
-import scala.concurrent.duration.FiniteDuration
-import scala.util.Failure
-import scala.util.Success
-
-import akka.actor.ActorSystem
-import akka.actor.Props
-import akka.actor.actorRef2Scala
-import whisk.common.Counter
-import whisk.common.Logging
-import whisk.core.connector.ActivationMessage
-import whisk.core.connector.MessageConsumer
-import whisk.core.connector.MessageFeed
-
-/**
- * Creates a dispatcher that pulls messages from the message pub/sub connector.
- * This is currently used by invoker only. It may be removed in the future and
- * its functionality merged directly with the invoker. The current model allows
- * for different message types to be received by more than one consumer in the
- * same process (via handler registration).
- *
- * @param verbosity level for logging
- * @param consumer the consumer providing messages
- * @param pollDuration the long poll duration (max duration to wait for new messages)
- * @param maxPipelineDepth the maximum number of messages allowed in the queue (even >=2)
- * @param actorSystem the actor system used to create the message feed actor
- */
-@throws[IllegalArgumentException]
-class Dispatcher(
-    consumer: MessageConsumer,
-    pollDuration: FiniteDuration,
-    maxPipelineDepth: Int,
-    actorSystem: ActorSystem)(
-        implicit logging: Logging)
-    extends Registrar {
-
-    // create activation request feed but do not start it, until the invoker is registered
-    val activationFeed = actorSystem.actorOf(Props(new MessageFeed("activation", logging, consumer, maxPipelineDepth, pollDuration, process, autoStart = false)))
-
-    def start() = activationFeed ! MessageFeed.Ready
-    def stop() = consumer.close()
-
-    /**
-     * Consumes activation messages from the bus using a streaming consumer
-     * interface. Each message is a JSON object serialization of ActivationMessage.
-     *
-     * For every message that is received, process it with all attached handlers.
-     * A handler is registered via addHandler and unregistered via removeHandler.
-     * There is typically only one handler.
-     */
-    def process(bytes: Array[Byte]): Future[Unit] = Future {
-        val raw = new String(bytes, StandardCharsets.UTF_8)
-        ActivationMessage.parse(raw) match {
-            case Success(m) =>
-                handlers foreach {
-                    case (name, handler) => handleMessage(handler, m)
-                }
-            case Failure(t) => logging.info(this, errorMsg(raw, t))
-        }
-    }
-
-    private def handleMessage(handler: MessageHandler, msg: ActivationMessage): Unit = {
-        implicit val tid = msg.transid
-
-        Future {
-            val count = counter.next()
-            logging.debug(this, s"activeCount = $count while handling ${handler.name}")
-            handler.onMessage(msg) // returns a future which is flat-mapped via identity to hang onComplete
-        } flatMap (identity) onComplete {
-            case Success(a) => logging.debug(this, s"activeCount = ${counter.prev()} after handling ${handler.name}")
-            case Failure(t) => logging.error(this, s"activeCount = ${counter.prev()} ${errorMsg(handler, t)}")
 
 Review comment:
   and related to the IllegalStateException below, we no longer log the failed future from the invoker.
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services