You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@openwhisk.apache.org by GitBox <gi...@apache.org> on 2017/12/01 18:50:59 UTC

[GitHub] starpit closed pull request #3044: add WhiskActivation.Outcome, hiding Either[ActivationId, WhiskActivation] impl

starpit closed pull request #3044: add WhiskActivation.Outcome, hiding Either[ActivationId, WhiskActivation] impl
URL: https://github.com/apache/incubator-openwhisk/pull/3044
 
 
   

This is a PR merged from a forked repository (a foreign pull request).
As GitHub hides the original diff of such a pull request on merge
(it won't show otherwise due to GitHub magic), the diff is supplied
below for the sake of provenance:

diff --git a/common/scala/src/main/scala/whisk/core/connector/Message.scala b/common/scala/src/main/scala/whisk/core/connector/Message.scala
index 3059f6b051..61356755f0 100644
--- a/common/scala/src/main/scala/whisk/core/connector/Message.scala
+++ b/common/scala/src/main/scala/whisk/core/connector/Message.scala
@@ -92,7 +92,7 @@ object ActivationMessage extends DefaultJsonProtocol {
  * The whisk activation field will have its logs stripped.
  */
 case class CompletionMessage(override val transid: TransactionId,
-                             response: Either[ActivationId, WhiskActivation],
+                             response: WhiskActivation.Outcome,
                              invoker: InstanceId)
     extends Message {
 
diff --git a/common/scala/src/main/scala/whisk/core/entity/WhiskActivation.scala b/common/scala/src/main/scala/whisk/core/entity/WhiskActivation.scala
index 1e04b11dd3..90121c100b 100644
--- a/common/scala/src/main/scala/whisk/core/entity/WhiskActivation.scala
+++ b/common/scala/src/main/scala/whisk/core/entity/WhiskActivation.scala
@@ -123,6 +123,13 @@ object WhiskActivation
     with WhiskEntityQueries[WhiskActivation]
     with DefaultJsonProtocol {
 
+  /**
+   * The outcome of an activation as reported by the invoker: normally a
+   * Right(WhiskActivation), but possibly only a Left(ActivationId),
+   * e.g. in the case of RecordTooLargeException; see InvokerReactive.
+   */
+  type Outcome = Either[ActivationId, WhiskActivation]
+
   private implicit val instantSerdes = new RootJsonFormat[Instant] {
     def write(t: Instant) = t.toEpochMilli.toJson
 
diff --git a/core/controller/src/main/scala/whisk/core/controller/WebActions.scala b/core/controller/src/main/scala/whisk/core/controller/WebActions.scala
index 8cfb1e935e..e4844c8fdb 100644
--- a/core/controller/src/main/scala/whisk/core/controller/WebActions.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/WebActions.scala
@@ -624,7 +624,7 @@ trait WhiskWebActionsApi extends Directives with ValidateRequestSize with PostAc
     completeRequest(queuedActivation, projectResultField(context, responseType), responseType)
   }
 
-  private def completeRequest(queuedActivation: Future[Either[ActivationId, WhiskActivation]],
+  private def completeRequest(queuedActivation: Future[WhiskActivation.Outcome],
                               projectResultField: => List[String],
                               responseType: MediaExtension)(implicit transid: TransactionId) = {
     onComplete(queuedActivation) {
diff --git a/core/controller/src/main/scala/whisk/core/controller/actions/PostActionActivation.scala b/core/controller/src/main/scala/whisk/core/controller/actions/PostActionActivation.scala
index 5548229ca8..c83d040839 100644
--- a/core/controller/src/main/scala/whisk/core/controller/actions/PostActionActivation.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/actions/PostActionActivation.scala
@@ -49,7 +49,7 @@ protected[core] trait PostActionActivation extends PrimitiveActions with Sequenc
     action: WhiskActionMetaData,
     payload: Option[JsObject],
     waitForResponse: Option[FiniteDuration],
-    cause: Option[ActivationId])(implicit transid: TransactionId): Future[Either[ActivationId, WhiskActivation]] = {
+    cause: Option[ActivationId])(implicit transid: TransactionId): Future[WhiskActivation.Outcome] = {
     action.toExecutableWhiskAction match {
       // this is a topmost sequence
       case None =>
diff --git a/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala b/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala
index f604e63b33..a248476404 100644
--- a/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala
@@ -95,7 +95,7 @@ protected[actions] trait PrimitiveActions {
     action: ExecutableWhiskActionMetaData,
     payload: Option[JsObject],
     waitForResponse: Option[FiniteDuration],
-    cause: Option[ActivationId])(implicit transid: TransactionId): Future[Either[ActivationId, WhiskActivation]] = {
+    cause: Option[ActivationId])(implicit transid: TransactionId): Future[WhiskActivation.Outcome] = {
 
     // merge package parameters with action (action parameters supersede), then merge in payload
     val args = action.parameters merge payload
@@ -152,8 +152,8 @@ protected[actions] trait PrimitiveActions {
   private def waitForActivationResponse(user: Identity,
                                         activationId: ActivationId,
                                         totalWaitTime: FiniteDuration,
-                                        activeAckResponse: Future[Either[ActivationId, WhiskActivation]])(
-    implicit transid: TransactionId): Future[Either[ActivationId, WhiskActivation]] = {
+                                        activeAckResponse: Future[WhiskActivation.Outcome])(
+    implicit transid: TransactionId): Future[WhiskActivation.Outcome] = {
     // this is the promise which active ack or db polling will try to complete via:
     // 1. active ack response, or
     // 2. failing active ack (due to active ack timeout), fall over to db polling
@@ -207,11 +207,10 @@ protected[actions] object ActivationFinisher {
    */
   private val datastorePreemptivePolling = Seq(1.second, 3.seconds, 5.seconds, 7.seconds)
 
-  def props(activationLookup: ActivationLookup)(
-    implicit transid: TransactionId,
-    actorSystem: ActorSystem,
-    executionContext: ExecutionContext,
-    logging: Logging): (Future[Either[ActivationId, WhiskActivation]], ActorRef) = {
+  def props(activationLookup: ActivationLookup)(implicit transid: TransactionId,
+                                                actorSystem: ActorSystem,
+                                                executionContext: ExecutionContext,
+                                                logging: Logging): (Future[WhiskActivation.Outcome], ActorRef) = {
 
     val (p, _, f) = props(activationLookup, datastorePollPeriodForActivation, datastorePreemptivePolling)
     (p.future, f) // hides the polling actor
@@ -227,10 +226,10 @@ protected[actions] object ActivationFinisher {
     implicit transid: TransactionId,
     actorSystem: ActorSystem,
     executionContext: ExecutionContext,
-    logging: Logging): (Promise[Either[ActivationId, WhiskActivation]], ActorRef, ActorRef) = {
+    logging: Logging): (Promise[WhiskActivation.Outcome], ActorRef, ActorRef) = {
 
     // this is strictly completed by the finishing actor
-    val promise = Promise[Either[ActivationId, WhiskActivation]]
+    val promise = Promise[WhiskActivation.Outcome]
     val dbpoller = poller(slowPoll, promise, activationLookup)
     val finisher = Props(new ActivationFinisher(dbpoller, fastPolls, promise))
 
@@ -247,11 +246,10 @@ protected[actions] object ActivationFinisher {
    */
   private class ActivationFinisher(poller: ActorRef, // the activation poller
                                    fastPollPeriods: Seq[FiniteDuration],
-                                   promise: Promise[Either[ActivationId, WhiskActivation]])(
-    implicit transid: TransactionId,
-    actorSystem: ActorSystem,
-    executionContext: ExecutionContext,
-    logging: Logging)
+                                   promise: Promise[WhiskActivation.Outcome])(implicit transid: TransactionId,
+                                                                              actorSystem: ActorSystem,
+                                                                              executionContext: ExecutionContext,
+                                                                              logging: Logging)
       extends Actor {
 
     // when the future completes, self-destruct
@@ -290,7 +288,7 @@ protected[actions] object ActivationFinisher {
    * It is a factory method to facilitate testing.
    */
   private def poller(slowPollPeriod: FiniteDuration,
-                     promise: Promise[Either[ActivationId, WhiskActivation]],
+                     promise: Promise[WhiskActivation.Outcome],
                      activationLookup: ActivationLookup)(implicit transid: TransactionId,
                                                          actorSystem: ActorSystem,
                                                          executionContext: ExecutionContext,
diff --git a/core/controller/src/main/scala/whisk/core/controller/actions/SequenceActions.scala b/core/controller/src/main/scala/whisk/core/controller/actions/SequenceActions.scala
index 8df2033ad5..d8a6207c16 100644
--- a/core/controller/src/main/scala/whisk/core/controller/actions/SequenceActions.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/actions/SequenceActions.scala
@@ -66,7 +66,7 @@ protected[actions] trait SequenceActions {
     action: WhiskActionMetaData,
     payload: Option[JsObject],
     waitForResponse: Option[FiniteDuration],
-    cause: Option[ActivationId])(implicit transid: TransactionId): Future[Either[ActivationId, WhiskActivation]]
+    cause: Option[ActivationId])(implicit transid: TransactionId): Future[WhiskActivation.Outcome]
 
   /**
    * Executes a sequence by invoking in a blocking fashion each of its components.
@@ -90,7 +90,7 @@ protected[actions] trait SequenceActions {
     waitForOutermostResponse: Option[FiniteDuration],
     cause: Option[ActivationId],
     topmost: Boolean,
-    atomicActionsCount: Int)(implicit transid: TransactionId): Future[(Either[ActivationId, WhiskActivation], Int)] = {
+    atomicActionsCount: Int)(implicit transid: TransactionId): Future[(WhiskActivation.Outcome, Int)] = {
     require(action.exec.kind == Exec.SEQUENCE, "this method requires an action sequence")
 
     // create new activation id that corresponds to the sequence
@@ -98,7 +98,7 @@ protected[actions] trait SequenceActions {
     logging.info(this, s"invoking sequence $action topmost $topmost activationid '$seqActivationId'")
 
     val start = Instant.now(Clock.systemUTC())
-    val futureSeqResult: Future[(Either[ActivationId, WhiskActivation], Int)] = {
+    val futureSeqResult: Future[(WhiskActivation.Outcome, Int)] = {
       // even though the result of completeSequenceActivation is Right[WhiskActivation],
       // use a more general type for futureSeqResult in case a blocking invoke takes
       // longer than expected and we must return Left[ActivationId] instead
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerData.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerData.scala
index 1866d2dcba..fec334c5e8 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerData.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerData.scala
@@ -24,7 +24,7 @@ import scala.concurrent.{Future, Promise}
 case class ActivationEntry(id: ActivationId,
                            namespaceId: UUID,
                            invokerName: InstanceId,
-                           promise: Promise[Either[ActivationId, WhiskActivation]])
+                           promise: Promise[WhiskActivation.Outcome])
 trait LoadBalancerData {
 
   /** Get the number of activations across all namespaces. */
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala
index be3bb0b09e..d8a4cf5225 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala
@@ -82,7 +82,7 @@ trait LoadBalancer {
    *         plus a grace period (see activeAckTimeoutGrace).
    */
   def publish(action: ExecutableWhiskActionMetaData, msg: ActivationMessage)(
-    implicit transid: TransactionId): Future[Future[Either[ActivationId, WhiskActivation]]]
+    implicit transid: TransactionId): Future[Future[WhiskActivation.Outcome]]
 
 }
 
@@ -118,7 +118,7 @@ class LoadBalancerService(config: WhiskConfig, instance: InstanceId, entityStore
   override def totalActiveActivations = loadBalancerData.totalActivationCount
 
   override def publish(action: ExecutableWhiskActionMetaData, msg: ActivationMessage)(
-    implicit transid: TransactionId): Future[Future[Either[ActivationId, WhiskActivation]]] = {
+    implicit transid: TransactionId): Future[Future[WhiskActivation.Outcome]] = {
     chooseInvoker(msg.user, action).flatMap { invokerName =>
       val entry = setupActivation(action, msg.activationId, msg.user.uuid, invokerName, transid)
       sendActivationToInvoker(messageProducer, msg, invokerName).map { _ =>
@@ -139,7 +139,7 @@ class LoadBalancerService(config: WhiskConfig, instance: InstanceId, entityStore
    *
    * @param msg is the kafka message payload as Json
    */
-  private def processCompletion(response: Either[ActivationId, WhiskActivation],
+  private def processCompletion(response: WhiskActivation.Outcome,
                                 tid: TransactionId,
                                 forced: Boolean,
                                 invoker: InstanceId): Unit = {
@@ -195,7 +195,7 @@ class LoadBalancerService(config: WhiskConfig, instance: InstanceId, entityStore
         processCompletion(Left(activationId), transid, forced = true, invoker = invokerName)
       }
 
-      ActivationEntry(activationId, namespaceId, invokerName, Promise[Either[ActivationId, WhiskActivation]]())
+      ActivationEntry(activationId, namespaceId, invokerName, Promise[WhiskActivation.Outcome]())
     })
   }
 
diff --git a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
index 51246c8f11..b44118f092 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
+++ b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
@@ -113,7 +113,7 @@ class InvokerReactive(config: WhiskConfig, instance: InstanceId, producer: Messa
              controllerInstance: InstanceId) => {
     implicit val transid = tid
 
-    def send(res: Either[ActivationId, WhiskActivation], recovery: Boolean = false) = {
+    def send(res: WhiskActivation.Outcome, recovery: Boolean = false) = {
       val msg = CompletionMessage(transid, res, instance)
 
       producer.send(s"completed${controllerInstance.toInt}", msg).andThen {
diff --git a/tests/src/test/scala/whisk/core/controller/test/ControllerTestCommon.scala b/tests/src/test/scala/whisk/core/controller/test/ControllerTestCommon.scala
index e7af616a91..83ab92c39e 100644
--- a/tests/src/test/scala/whisk/core/controller/test/ControllerTestCommon.scala
+++ b/tests/src/test/scala/whisk/core/controller/test/ControllerTestCommon.scala
@@ -185,7 +185,7 @@ class DegenerateLoadBalancerService(config: WhiskConfig)(implicit ec: ExecutionC
   override def activeActivationsFor(namespace: UUID) = Future.successful(0)
 
   override def publish(action: ExecutableWhiskActionMetaData, msg: ActivationMessage)(
-    implicit transid: TransactionId): Future[Future[Either[ActivationId, WhiskActivation]]] =
+    implicit transid: TransactionId): Future[Future[WhiskActivation.Outcome]] =
     Future.successful {
       whiskActivationStub map {
         case (timeout, activation) =>
diff --git a/tests/src/test/scala/whisk/core/controller/test/WebActionsApiTests.scala b/tests/src/test/scala/whisk/core/controller/test/WebActionsApiTests.scala
index 9f5acaded5..2aee558249 100644
--- a/tests/src/test/scala/whisk/core/controller/test/WebActionsApiTests.scala
+++ b/tests/src/test/scala/whisk/core/controller/test/WebActionsApiTests.scala
@@ -245,7 +245,7 @@ trait WebActionsApiBaseTests extends ControllerTestCommon with BeforeAndAfterEac
     action: WhiskActionMetaData,
     payload: Option[JsObject],
     waitForResponse: Option[FiniteDuration],
-    cause: Option[ActivationId])(implicit transid: TransactionId): Future[Either[ActivationId, WhiskActivation]] = {
+    cause: Option[ActivationId])(implicit transid: TransactionId): Future[WhiskActivation.Outcome] = {
     invocationCount = invocationCount + 1
 
     if (failActivation == 0) {
diff --git a/tests/src/test/scala/whisk/core/loadBalancer/test/LoadBalancerDataTests.scala b/tests/src/test/scala/whisk/core/loadBalancer/test/LoadBalancerDataTests.scala
index af511021de..c4b3359acd 100644
--- a/tests/src/test/scala/whisk/core/loadBalancer/test/LoadBalancerDataTests.scala
+++ b/tests/src/test/scala/whisk/core/loadBalancer/test/LoadBalancerDataTests.scala
@@ -31,7 +31,7 @@ import scala.concurrent.duration._
 
 class LoadBalancerDataTests extends FlatSpec with Matchers with StreamLogging {
 
-  val activationIdPromise = Promise[Either[ActivationId, WhiskActivation]]()
+  val activationIdPromise = Promise[WhiskActivation.Outcome]()
   val firstEntry: ActivationEntry = ActivationEntry(ActivationId(), UUID(), InstanceId(0), activationIdPromise)
   val secondEntry: ActivationEntry = ActivationEntry(ActivationId(), UUID(), InstanceId(1), activationIdPromise)
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services