You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@openwhisk.apache.org by GitBox <gi...@apache.org> on 2019/01/09 19:04:10 UTC

[GitHub] dubee closed pull request #4088: Make activation polling for blocking invocations configurable

dubee closed pull request #4088: Make activation polling for blocking invocations configurable
URL: https://github.com/apache/incubator-openwhisk/pull/4088
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/ansible/roles/controller/tasks/deploy.yml b/ansible/roles/controller/tasks/deploy.yml
index 0fb2ebbdf1..b1b3a0ea97 100644
--- a/ansible/roles/controller/tasks/deploy.yml
+++ b/ansible/roles/controller/tasks/deploy.yml
@@ -245,6 +245,8 @@
 
       "CONFIG_whisk_transactions_header": "{{ transactions.header }}"
 
+      "CONFIG_whisk_controller_activation_pollingFromDb": "{{ controller_activation_pollingFromDb | default(true) }}"
+
 - name: merge extra env variables
   set_fact:
     env: "{{ env | combine(controller.extraEnv) }}"
diff --git a/common/scala/src/main/resources/application.conf b/common/scala/src/main/resources/application.conf
index 40c98a70c2..7179978252 100644
--- a/common/scala/src/main/resources/application.conf
+++ b/common/scala/src/main/resources/application.conf
@@ -301,6 +301,12 @@ whisk {
         #   sample-rate = "0.01" // sample 1% of requests by default
         #}
     }
+
+    controller {
+        activation {
+            polling-from-db: true
+        }
+    }
 }
 #placeholder for test overrides so that tests can override defaults in application.conf (todo: move all defaults to reference.conf)
 test {
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
index 8dd78107ab..8061b2ad08 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
@@ -241,4 +241,6 @@ object ConfigKeys {
   val query = "whisk.query-limit"
   val execSizeLimit = "whisk.exec-size-limit"
 
+  val controller = s"whisk.controller"
+  val controllerActivation = s"$controller.activation"
 }
diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/actions/PrimitiveActions.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/actions/PrimitiveActions.scala
index 1d2ca0c1b5..b5084423c1 100644
--- a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/actions/PrimitiveActions.scala
+++ b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/actions/PrimitiveActions.scala
@@ -22,6 +22,7 @@ import java.time.{Clock, Instant}
 import akka.actor.ActorSystem
 import akka.event.Logging.InfoLevel
 import spray.json._
+
 import org.apache.openwhisk.common.tracing.WhiskTracerProvider
 import org.apache.openwhisk.common.{Logging, LoggingMarkers, TransactionId, UserEvents}
 import org.apache.openwhisk.core.connector.{ActivationMessage, EventMessage, MessagingProvider}
@@ -35,6 +36,7 @@ import org.apache.openwhisk.core.entity.types.EntityStore
 import org.apache.openwhisk.http.Messages._
 import org.apache.openwhisk.spi.SpiLoader
 import org.apache.openwhisk.utils.ExecutionContextFactory.FutureExtensions
+import org.apache.openwhisk.core.ConfigKeys
 
 import scala.collection.mutable.Buffer
 import scala.concurrent.duration._
@@ -42,6 +44,8 @@ import scala.concurrent.{ExecutionContext, Future, Promise}
 import scala.language.postfixOps
 import scala.util.{Failure, Success}
 
+import pureconfig.loadConfigOrThrow
+
 protected[actions] trait PrimitiveActions {
   /** The core collections require backend services to be injected in this trait. */
   services: WhiskServices =>
@@ -593,11 +597,15 @@ protected[actions] trait PrimitiveActions {
     //    in case of an incomplete active-ack (record too large for example).
     activeAckResponse.foreach {
       case Right(activation) => result.trySuccess(Right(activation))
-      case _                 => pollActivation(docid, context, result, i => 1.seconds + (2.seconds * i), maxRetries = 4)
+      case _ if (controllerActivationConfig.pollingFromDb) =>
+        pollActivation(docid, context, result, i => 1.seconds + (2.seconds * i), maxRetries = 4)
+      case _ =>
     }
 
-    // 2. Poll the database slowly in case the active-ack never arrives
-    pollActivation(docid, context, result, _ => 15.seconds)
+    if (controllerActivationConfig.pollingFromDb) {
+      // 2. Poll the database slowly in case the active-ack never arrives
+      pollActivation(docid, context, result, _ => 15.seconds)
+    }
 
     // 3. Timeout forces a fallback to activationId
     val timeout = actorSystem.scheduler.scheduleOnce(totalWaitTime)(result.trySuccess(Left(activationId)))
@@ -644,4 +652,10 @@ protected[actions] trait PrimitiveActions {
 
   /** Max atomic action count allowed for sequences */
   private lazy val actionSequenceLimit = whiskConfig.actionSequenceLimit.toInt
+
+  private val controllerActivationConfig =
+    loadConfigOrThrow[ControllerActivationConfig](ConfigKeys.controllerActivation)
+
 }
+
+case class ControllerActivationConfig(pollingFromDb: Boolean)
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ActionsApiTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ActionsApiTests.scala
index ce8a1bf73c..b69536b2b9 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ActionsApiTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ActionsApiTests.scala
@@ -36,9 +36,12 @@ import org.apache.openwhisk.core.entitlement.Collection
 import org.apache.openwhisk.http.ErrorResponse
 import org.apache.openwhisk.http.Messages
 import org.apache.openwhisk.core.database.UserContext
+import org.apache.openwhisk.core.ConfigKeys
+import org.apache.openwhisk.core.controller.actions.ControllerActivationConfig
 import akka.http.scaladsl.model.headers.RawHeader
 import org.apache.commons.lang3.StringUtils
 import org.apache.openwhisk.core.entity.Attachments.Inline
+import pureconfig.loadConfigOrThrow
 
 /**
  * Tests Actions API.
@@ -65,6 +68,7 @@ class ActionsApiTests extends ControllerTestCommon with WhiskActionsApi {
   def aname() = MakeName.next("action_tests")
   val actionLimit = Exec.sizeLimit
   val parametersLimit = Parameters.sizeLimit
+  val controllerActivationConfig = loadConfigOrThrow[ControllerActivationConfig](ConfigKeys.controllerActivation)
 
   //// GET /actions
   it should "return empty list when no actions exist" in {
@@ -1360,39 +1364,41 @@ class ActionsApiTests extends ControllerTestCommon with WhiskActionsApi {
     }
   }
 
-  it should "invoke an action, blocking and retrieve result via db polling" in {
-    implicit val tid = transid()
-    val action = WhiskAction(namespace, aname(), jsDefault("??"))
-    val activation = WhiskActivation(
-      action.namespace,
-      action.name,
-      creds.subject,
-      activationIdFactory.make(),
-      start = Instant.now,
-      end = Instant.now,
-      response = ActivationResponse.success(Some(JsObject("test" -> "yes".toJson))),
-      logs = ActivationLogs(Vector("first line", "second line")))
-    put(entityStore, action)
-    // storing the activation in the db will allow the db polling to retrieve it
-    // the test harness makes sure the activation id observed by the test matches
-    // the one generated by the api handler
-    storeActivation(activation, context)
-    try {
-      Post(s"$collectionPath/${action.name}?blocking=true") ~> Route.seal(routes(creds)) ~> check {
-        status should be(OK)
-        val response = responseAs[JsObject]
-        response should be(activation.withoutLogs.toExtendedJson)
-      }
+  if (controllerActivationConfig.pollingFromDb) {
+    it should "invoke an action, blocking and retrieve result via db polling" in {
+      implicit val tid = transid()
+      val action = WhiskAction(namespace, aname(), jsDefault("??"))
+      val activation = WhiskActivation(
+        action.namespace,
+        action.name,
+        creds.subject,
+        activationIdFactory.make(),
+        start = Instant.now,
+        end = Instant.now,
+        response = ActivationResponse.success(Some(JsObject("test" -> "yes".toJson))),
+        logs = ActivationLogs(Vector("first line", "second line")))
+      put(entityStore, action)
+      // storing the activation in the db will allow the db polling to retrieve it
+      // the test harness makes sure the activation id observed by the test matches
+      // the one generated by the api handler
+      storeActivation(activation, context)
+      try {
+        Post(s"$collectionPath/${action.name}?blocking=true") ~> Route.seal(routes(creds)) ~> check {
+          status should be(OK)
+          val response = responseAs[JsObject]
+          response should be(activation.withoutLogs.toExtendedJson)
+        }
 
-      // repeat invoke, get only result back
-      Post(s"$collectionPath/${action.name}?blocking=true&result=true") ~> Route.seal(routes(creds)) ~> check {
-        status should be(OK)
-        val response = responseAs[JsObject]
-        response should be(activation.resultAsJson)
-        headers should contain(RawHeader(ActivationIdHeader, activation.activationId.asString))
+        // repeat invoke, get only result back
+        Post(s"$collectionPath/${action.name}?blocking=true&result=true") ~> Route.seal(routes(creds)) ~> check {
+          status should be(OK)
+          val response = responseAs[JsObject]
+          response should be(activation.resultAsJson)
+          headers should contain(RawHeader(ActivationIdHeader, activation.activationId.asString))
+        }
+      } finally {
+        deleteActivation(ActivationId(activation.docid.asString), context)
       }
-    } finally {
-      deleteActivation(ActivationId(activation.docid.asString), context)
     }
   }
 
@@ -1477,31 +1483,33 @@ class ActionsApiTests extends ControllerTestCommon with WhiskActionsApi {
     }
   }
 
-  it should "invoke a blocking action and return error response when activation fails" in {
-    implicit val tid = transid()
-    val action = WhiskAction(namespace, aname(), jsDefault("??"))
-    val activation = WhiskActivation(
-      action.namespace,
-      action.name,
-      creds.subject,
-      activationIdFactory.make(),
-      start = Instant.now,
-      end = Instant.now,
-      response = ActivationResponse.whiskError("test"))
-    put(entityStore, action)
-    // storing the activation in the db will allow the db polling to retrieve it
-    // the test harness makes sure the activation id observed by the test matches
-    // the one generated by the api handler
-    storeActivation(activation, context)
-    try {
-      Post(s"$collectionPath/${action.name}?blocking=true") ~> Route.seal(routes(creds)) ~> check {
-        status should be(InternalServerError)
-        val response = responseAs[JsObject]
-        response should be(activation.withoutLogs.toExtendedJson)
-        headers should contain(RawHeader(ActivationIdHeader, response.fields("activationId").convertTo[String]))
+  if (controllerActivationConfig.pollingFromDb) {
+    it should "invoke a blocking action and return error response when activation fails" in {
+      implicit val tid = transid()
+      val action = WhiskAction(namespace, aname(), jsDefault("??"))
+      val activation = WhiskActivation(
+        action.namespace,
+        action.name,
+        creds.subject,
+        activationIdFactory.make(),
+        start = Instant.now,
+        end = Instant.now,
+        response = ActivationResponse.whiskError("test"))
+      put(entityStore, action)
+      // storing the activation in the db will allow the db polling to retrieve it
+      // the test harness makes sure the activation id observed by the test matches
+      // the one generated by the api handler
+      storeActivation(activation, context)
+      try {
+        Post(s"$collectionPath/${action.name}?blocking=true") ~> Route.seal(routes(creds)) ~> check {
+          status should be(InternalServerError)
+          val response = responseAs[JsObject]
+          response should be(activation.withoutLogs.toExtendedJson)
+          headers should contain(RawHeader(ActivationIdHeader, response.fields("activationId").convertTo[String]))
+        }
+      } finally {
+        deleteActivation(ActivationId(activation.docid.asString), context)
       }
-    } finally {
-      deleteActivation(ActivationId(activation.docid.asString), context)
     }
   }
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services