Posted to issues@openwhisk.apache.org by GitBox <gi...@apache.org> on 2018/05/24 19:44:05 UTC

[GitHub] tysonnorris closed pull request #3687: Concurrency limit per action

URL: https://github.com/apache/incubator-openwhisk/pull/3687
 
This is a PR merged from a forked repository. Because GitHub hides the original diff once a forked pull request is merged, the full diff is reproduced below for the sake of provenance:

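For orientation before the diff: the user-visible change is a new, optional "concurrency" field in an action's limits, validated against the whisk.concurrency-limit configuration block added below. The following is a minimal, illustrative sketch of the entity-level API (not part of the PR); it assumes the classes introduced in this diff and must live under the whisk.core package, since the limit constructors are protected[core]:

    package whisk.core.entity.example

    import whisk.core.entity._

    // Illustrative only: how an action's limits carry the new per-container concurrency cap.
    object ConcurrencyLimitSketch {
      // ConcurrencyLimit(n) is validated against whisk.concurrency-limit (min 1, max 500 by default).
      val limits: ActionLimits = ActionLimits(
        timeout = TimeLimit(),              // default duration limit
        memory = MemoryLimit(),             // default memory limit
        logs = LogLimit(),                  // default log size limit
        concurrency = ConcurrencyLimit(10)) // allow up to 10 concurrent activations per warm container
    }
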
diff --git a/.travis.yml b/.travis.yml
index e37432e21a..0d6f7e9e1b 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -64,10 +64,14 @@ jobs:
       env: DESCRIPTION="System Tests"
     - script:
         - ./performance/preparation/deploy.sh
-        - TERM=dumb ./performance/wrk_tests/latency.sh "https://172.17.0.1:10001" "$(cat ansible/files/auth.guest)" 2m
-        - TERM=dumb ./performance/wrk_tests/throughput.sh "https://172.17.0.1:10001" "$(cat ansible/files/auth.guest)" 4 2 2m
+        - TERM=dumb ./performance/wrk_tests/latency.sh "https://172.17.0.1:10001" "$(cat ansible/files/auth.guest)" ./performance/preparation/actions/noop.js 2m
+        - TERM=dumb ./performance/wrk_tests/latency.sh "https://172.17.0.1:10001" "$(cat ansible/files/auth.guest)" ./performance/preparation/actions/async.js 2m
+        - TERM=dumb ./performance/wrk_tests/throughput.sh "https://172.17.0.1:10001" "$(cat ansible/files/auth.guest)" ./performance/preparation/actions/noop.js 4 2 2m
+        - TERM=dumb ./performance/wrk_tests/throughput.sh "https://172.17.0.1:10001" "$(cat ansible/files/auth.guest)" ./performance/preparation/actions/async.js 4 2 2m
         - OPENWHISK_HOST="172.17.0.1" CONNECTIONS="100" REQUESTS_PER_SEC="1" ./gradlew gatlingRun-ApiV1Simulation
         - OPENWHISK_HOST="172.17.0.1" MEAN_RESPONSE_TIME="1000" API_KEY="$(cat ansible/files/auth.guest)" EXCLUDED_KINDS="python:default,java:default,swift:default" ./gradlew gatlingRun-LatencySimulation
         - OPENWHISK_HOST="172.17.0.1" API_KEY="$(cat ansible/files/auth.guest)" CONNECTIONS="100" REQUESTS_PER_SEC="1" ./gradlew gatlingRun-BlockingInvokeOneActionSimulation
+        - OPENWHISK_HOST="172.17.0.1" API_KEY="$(cat ansible/files/auth.guest)" CONNECTIONS="100" REQUESTS_PER_SEC="1" ASYNC="true" ./gradlew gatlingRun-BlockingInvokeOneActionSimulation
+        - ./tools/travis/checkAndUploadLogs.sh perf
       env:
         - DESCRIPTION="Execute wrk-performance test suite."
diff --git a/ansible/roles/invoker/tasks/deploy.yml b/ansible/roles/invoker/tasks/deploy.yml
index 1d50e84bc2..5c8f284d85 100644
--- a/ansible/roles/invoker/tasks/deploy.yml
+++ b/ansible/roles/invoker/tasks/deploy.yml
@@ -209,6 +209,8 @@
       "CONFIG_whisk_timeLimit_std": "{{ limit_action_time_std | default() }}"
       "CONFIG_whisk_activation_payload_max": "{{ limit_activation_payload | default() }}"
       "CONFIG_whisk_transactions_header": "{{ transactions.header }}"
+      "CONFIG_whisk_containerPool_maxConcurrent": "{{ invoker.maxConcurrent }}"
+
 
 - name: extend invoker env
   set_fact:
diff --git a/common/scala/src/main/resources/application.conf b/common/scala/src/main/resources/application.conf
index c61a60c546..aa653320f4 100644
--- a/common/scala/src/main/resources/application.conf
+++ b/common/scala/src/main/resources/application.conf
@@ -175,6 +175,13 @@ whisk {
         std = 10 m
     }
 
+    # action concurrency-limit configuration
+    concurrency-limit {
+        min = 1
+        max = 500
+        std = 1
+    }
+
     mesos {
         master-url = "http://localhost:5050" //your mesos master
         master-public-url = "http://localhost:5050" // if mesos-link-log-message == true, this link will be included with the static log message (may or may not be different from master-url)
diff --git a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
index 0c937ae372..2660167939 100644
--- a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
+++ b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
@@ -218,6 +218,7 @@ object ConfigKeys {
   val memory = "whisk.memory"
   val timeLimit = "whisk.time-limit"
   val logLimit = "whisk.log-limit"
+  val concurrencyLimit = "whisk.concurrency-limit"
   val activation = "whisk.activation"
   val activationPayload = s"$activation.payload"
   val userEvents = "whisk.user-events"
diff --git a/common/scala/src/main/scala/whisk/core/containerpool/Container.scala b/common/scala/src/main/scala/whisk/core/containerpool/Container.scala
index 49c692b086..dab42d419e 100644
--- a/common/scala/src/main/scala/whisk/core/containerpool/Container.scala
+++ b/common/scala/src/main/scala/whisk/core/containerpool/Container.scala
@@ -79,7 +79,8 @@ trait Container {
   }
 
   /** Initializes code in the container. */
-  def initialize(initializer: JsObject, timeout: FiniteDuration)(implicit transid: TransactionId): Future[Interval] = {
+  def initialize(initializer: JsObject, timeout: FiniteDuration, maxConcurrent: Int)(
+    implicit transid: TransactionId): Future[Interval] = {
     val start = transid.started(
       this,
       LoggingMarkers.INVOKER_ACTIVATION_INIT,
@@ -87,7 +88,7 @@ trait Container {
       logLevel = InfoLevel)
 
     val body = JsObject("value" -> initializer)
-    callContainer("/init", body, timeout, retry = true)
+    callContainer("/init", body, timeout, maxConcurrent, retry = true)
       .andThen { // never fails
         case Success(r: RunResult) =>
           transid.finished(
@@ -117,7 +118,7 @@ trait Container {
   }
 
   /** Runs code in the container. */
-  def run(parameters: JsObject, environment: JsObject, timeout: FiniteDuration)(
+  def run(parameters: JsObject, environment: JsObject, timeout: FiniteDuration, maxConcurrent: Int)(
     implicit transid: TransactionId): Future[(Interval, ActivationResponse)] = {
     val actionName = environment.fields.get("action_name").map(_.convertTo[String]).getOrElse("")
     val start =
@@ -129,7 +130,7 @@ trait Container {
 
     val parameterWrapper = JsObject("value" -> parameters)
     val body = JsObject(parameterWrapper.fields ++ environment.fields)
-    callContainer("/run", body, timeout, retry = false)
+    callContainer("/run", body, timeout, maxConcurrent, retry = false)
       .andThen { // never fails
         case Success(r: RunResult) =>
           transid.finished(
@@ -162,11 +163,14 @@ trait Container {
    * @param timeout timeout of the request
    * @param retry whether or not to retry the request
    */
-  protected def callContainer(path: String, body: JsObject, timeout: FiniteDuration, retry: Boolean = false)(
-    implicit transid: TransactionId): Future[RunResult] = {
+  protected def callContainer(path: String,
+                              body: JsObject,
+                              timeout: FiniteDuration,
+                              maxConcurrent: Int,
+                              retry: Boolean = false)(implicit transid: TransactionId): Future[RunResult] = {
     val started = Instant.now()
     val http = httpConnection.getOrElse {
-      val conn = new HttpUtils(s"${addr.host}:${addr.port}", timeout, 1.MB)
+      val conn = new HttpUtils(s"${addr.host}:${addr.port}", timeout, 1.MB, maxConcurrent)
       httpConnection = Some(conn)
       conn
     }
diff --git a/common/scala/src/main/scala/whisk/core/containerpool/ContainerFactory.scala b/common/scala/src/main/scala/whisk/core/containerpool/ContainerFactory.scala
index 35d3b8bb0c..f5d1542ced 100644
--- a/common/scala/src/main/scala/whisk/core/containerpool/ContainerFactory.scala
+++ b/common/scala/src/main/scala/whisk/core/containerpool/ContainerFactory.scala
@@ -31,7 +31,8 @@ case class ContainerArgsConfig(network: String,
                                dnsServers: Seq[String] = Seq.empty,
                                extraArgs: Map[String, Set[String]] = Map.empty)
 
-case class ContainerPoolConfig(numCore: Int, coreShare: Int) {
+case class ContainerPoolConfig(numCore: Int, coreShare: Int, concurrentPeekFactor: Double = 0.5) {
+  require(concurrentPeekFactor > 0 && concurrentPeekFactor <= 1.0, "concurrentPeekFactor must be > 0 and <= 1.0")
 
   /**
    * The total number of containers is simply the number of cores dilated by the cpu sharing.
diff --git a/common/scala/src/main/scala/whisk/core/containerpool/HttpUtils.scala b/common/scala/src/main/scala/whisk/core/containerpool/HttpUtils.scala
index e0fd37ff41..9c7ee8e344 100644
--- a/common/scala/src/main/scala/whisk/core/containerpool/HttpUtils.scala
+++ b/common/scala/src/main/scala/whisk/core/containerpool/HttpUtils.scala
@@ -18,13 +18,11 @@
 package whisk.core.containerpool
 
 import java.nio.charset.StandardCharsets
-
 import scala.concurrent.duration.DurationInt
 import scala.concurrent.duration.FiniteDuration
 import scala.util.Failure
 import scala.util.Success
 import scala.util.Try
-
 import org.apache.commons.io.IOUtils
 import org.apache.http.HttpHeaders
 import org.apache.http.client.config.RequestConfig
@@ -34,7 +32,7 @@ import org.apache.http.client.utils.URIBuilder
 import org.apache.http.conn.HttpHostConnectException
 import org.apache.http.entity.StringEntity
 import org.apache.http.impl.client.HttpClientBuilder
-
+import org.apache.http.impl.conn.PoolingHttpClientConnectionManager
 import spray.json._
 import whisk.core.entity.ActivationResponse._
 import whisk.core.entity.ByteSize
@@ -52,7 +50,7 @@ import whisk.core.entity.size.SizeLong
  * @param timeout the timeout in msecs to wait for a response
  * @param maxResponse the maximum size in bytes the connection will accept
  */
-protected[core] class HttpUtils(hostname: String, timeout: FiniteDuration, maxResponse: ByteSize) {
+protected[core] class HttpUtils(hostname: String, timeout: FiniteDuration, maxResponse: ByteSize, maxConcurrent: Int) {
 
   def close() = Try(connection.close())
 
@@ -76,11 +74,12 @@ protected[core] class HttpUtils(hostname: String, timeout: FiniteDuration, maxRe
     request.addHeader(HttpHeaders.ACCEPT, "application/json")
     request.setEntity(entity)
 
-    execute(request, timeout.toMillis.toInt, retry)
+    execute(request, timeout.toMillis.toInt, maxConcurrent, retry)
   }
 
   private def execute(request: HttpRequestBase,
                       timeoutMsec: Integer,
+                      maxConcurrent: Int,
                       retry: Boolean): Either[ContainerHttpError, ContainerResponse] = {
     Try(connection.execute(request)).map { response =>
       val containerResponse = Option(response.getEntity)
@@ -111,7 +110,7 @@ protected[core] class HttpUtils(hostname: String, timeout: FiniteDuration, maxRe
         if (timeoutMsec > 0) {
           Thread sleep 100
           val newTimeout = timeoutMsec - 100
-          execute(request, newTimeout, retry)
+          execute(request, newTimeout, maxConcurrent, retry)
         } else {
           Left(Timeout())
         }
@@ -131,8 +130,15 @@ protected[core] class HttpUtils(hostname: String, timeout: FiniteDuration, maxRe
     .setSocketTimeout(timeout.toMillis.toInt)
     .build
 
+  // Use PoolingHttpClientConnectionManager so that concurrent activation processing (if enabled) will reuse connections
+  val cm = new PoolingHttpClientConnectionManager
+  // Increase default max connections per route (default is 2)
+  cm.setDefaultMaxPerRoute(maxConcurrent)
+  // Increase max total connections (default is 20)
+  cm.setMaxTotal(maxConcurrent)
   private val connection = HttpClientBuilder.create
     .setDefaultRequestConfig(httpconfig)
+    .setConnectionManager(if (maxConcurrent > 1) cm else null) //set the Pooling connection manager IFF maxConcurrent > 1
     .useSystemProperties()
     .disableAutomaticRetries()
     .build
@@ -142,7 +148,7 @@ object HttpUtils {
 
   /** A helper method to post one single request to a connection. Used for container tests. */
   def post(host: String, port: Int, endPoint: String, content: JsValue): (Int, Option[JsObject]) = {
-    val connection = new HttpUtils(s"$host:$port", 90.seconds, 1.MB)
+    val connection = new HttpUtils(s"$host:$port", 90.seconds, 1.MB, 1)
     val response = connection.post(endPoint, content, retry = true)
     connection.close()
     response match {
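
As a standalone illustration of the HttpUtils change above (plain Apache HttpClient, outside of OpenWhisk; the helper name is made up): the default connection manager allows only two connections per route, so concurrent activations against the same container would serialize without the pooled manager.

    import org.apache.http.impl.client.{CloseableHttpClient, HttpClientBuilder}
    import org.apache.http.impl.conn.PoolingHttpClientConnectionManager

    // Build a client that can keep maxConcurrent connections open to a single container.
    def pooledClient(maxConcurrent: Int): CloseableHttpClient = {
      val cm = new PoolingHttpClientConnectionManager
      cm.setDefaultMaxPerRoute(maxConcurrent) // per-route default is 2
      cm.setMaxTotal(maxConcurrent)           // overall default is 20
      HttpClientBuilder.create
        .setConnectionManager(cm)
        .disableAutomaticRetries()
        .build()
    }
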
diff --git a/common/scala/src/main/scala/whisk/core/entity/ConcurrencyLimit.scala b/common/scala/src/main/scala/whisk/core/entity/ConcurrencyLimit.scala
new file mode 100644
index 0000000000..717afd72c4
--- /dev/null
+++ b/common/scala/src/main/scala/whisk/core/entity/ConcurrencyLimit.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.entity
+
+import scala.util.Failure
+import scala.util.Success
+import scala.util.Try
+
+import spray.json._
+import whisk.core.ConfigKeys
+import pureconfig._
+
+case class ConcurrencyLimitConfig(min: Int, max: Int, std: Int)
+
+/**
+ * ConcurrencyLimit encapsulates allowed concurrency in a single container for an action. The limit must be within a
+ * permissible range (by default [1, 500]).
+ *
+ * It is a value type (hence == is .equals, immutable and cannot be assigned null).
+ * The constructor is private so that argument requirements are checked and normalized
+ * before creating a new instance.
+ *
+ * @param maxConcurrent the max number of concurrent activations in a single container
+ */
+protected[entity] class ConcurrencyLimit private (val maxConcurrent: Int) extends AnyVal
+
+protected[core] object ConcurrencyLimit extends ArgNormalizer[ConcurrencyLimit] {
+  private val concurrencyConfig = loadConfigOrThrow[ConcurrencyLimitConfig](ConfigKeys.concurrencyLimit)
+
+  protected[core] val minConcurrent: Int = concurrencyConfig.min
+  protected[core] val maxConcurrent: Int = concurrencyConfig.max
+  protected[core] val stdConcurrent: Int = concurrencyConfig.std
+
+  /** Gets ConcurrencyLimit with default value */
+  protected[core] def apply(): ConcurrencyLimit = ConcurrencyLimit(stdConcurrent)
+
+  /**
+   * Creates ConcurrencyLimit for limit, iff limit is within permissible range.
+   *
+   * @param concurrency the limit, must be within permissible range
+   * @return ConcurrencyLimit with limit set
+   * @throws IllegalArgumentException if limit does not conform to requirements
+   */
+  @throws[IllegalArgumentException]
+  protected[core] def apply(concurrency: Int): ConcurrencyLimit = {
+    require(concurrency >= minConcurrent, s"concurrency $concurrency below allowed threshold of $minConcurrent")
+    require(concurrency <= maxConcurrent, s"concurrency $concurrency exceeds allowed threshold of $maxConcurrent")
+    new ConcurrencyLimit(concurrency)
+  }
+
+  override protected[core] implicit val serdes = new RootJsonFormat[ConcurrencyLimit] {
+    def write(m: ConcurrencyLimit) = JsNumber(m.maxConcurrent)
+
+    def read(value: JsValue) = {
+      Try {
+        val JsNumber(c) = value
+        require(c.isWhole(), "concurrency limit must be whole number")
+
+        ConcurrencyLimit(c.toInt)
+      } match {
+        case Success(limit)                       => limit
+        case Failure(e: IllegalArgumentException) => deserializationError(e.getMessage, e)
+        case Failure(e: Throwable)                => deserializationError("concurrency limit malformed", e)
+      }
+    }
+  }
+}
diff --git a/common/scala/src/main/scala/whisk/core/entity/Limits.scala b/common/scala/src/main/scala/whisk/core/entity/Limits.scala
index 5937df7c74..723671a285 100644
--- a/common/scala/src/main/scala/whisk/core/entity/Limits.scala
+++ b/common/scala/src/main/scala/whisk/core/entity/Limits.scala
@@ -38,16 +38,19 @@ protected[entity] abstract class Limits {
  * {
  *   timeout: maximum duration in msecs an action is allowed to consume in [100 msecs, 5 minutes],
  *   memory: maximum memory in megabytes an action is allowed to consume within system limit, default [128 MB, 512 MB],
- *   logs: maximum logs line in megabytes an action is allowed to generate [10 MB]
+ *   logs: maximum logs line in megabytes an action is allowed to generate [10 MB],
+ *   concurrency: maximum number of concurrently processed activations per container [1, 200]
  * }
  *
  * @param timeout the duration in milliseconds, assured to be non-null because it is a value
  * @param memory the memory limit in megabytes, assured to be non-null because it is a value
  * @param logs the limit for logs written by the container and stored in the activation record, assured to be non-null because it is a value
+ * @param concurrency the limit on concurrently processed activations per container, assured to be non-null because it is a value
  */
 protected[core] case class ActionLimits(timeout: TimeLimit = TimeLimit(),
                                         memory: MemoryLimit = MemoryLimit(),
-                                        logs: LogLimit = LogLimit())
+                                        logs: LogLimit = LogLimit(),
+                                        concurrency: ConcurrencyLimit = ConcurrencyLimit())
     extends Limits {
   override protected[entity] def toJson = ActionLimits.serdes.write(this)
 }
@@ -62,7 +65,7 @@ protected[core] case class TriggerLimits protected[core] () extends Limits {
 protected[core] object ActionLimits extends ArgNormalizer[ActionLimits] with DefaultJsonProtocol {
 
   override protected[core] implicit val serdes = new RootJsonFormat[ActionLimits] {
-    val helper = jsonFormat3(ActionLimits.apply)
+    val helper = jsonFormat4(ActionLimits.apply)
 
     def read(value: JsValue) = {
       val obj = Try {
@@ -72,8 +75,9 @@ protected[core] object ActionLimits extends ArgNormalizer[ActionLimits] with Def
       val time = TimeLimit.serdes.read(obj.get("timeout") getOrElse deserializationError("'timeout' is missing"))
       val memory = MemoryLimit.serdes.read(obj.get("memory") getOrElse deserializationError("'memory' is missing"))
       val logs = obj.get("logs") map { LogLimit.serdes.read(_) } getOrElse LogLimit()
+      val concurrency = obj.get("concurrency") map { ConcurrencyLimit.serdes.read(_) } getOrElse ConcurrencyLimit()
 
-      ActionLimits(time, memory, logs)
+      ActionLimits(time, memory, logs, concurrency)
     }
 
     def write(a: ActionLimits) = helper.write(a)
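
Putting the two limit classes together, a REPL-style sketch of the wire format after this change (illustrative only; it assumes whisk.core visibility for the protected serdes and the default limit configuration, with results shown as comments):

    import spray.json._
    import whisk.core.entity._

    // Reading a limits record that predates this change (no "concurrency" field) still works:
    // the read side falls back to ConcurrencyLimit(), i.e. the configured std value (1 by default).
    val oldRecord = """{"timeout":60000,"memory":256,"logs":10}""".parseJson
    val limits    = ActionLimits.serdes.read(oldRecord)
    limits.concurrency.maxConcurrent   // 1

    // Writing always emits the new field:
    ActionLimits.serdes.write(limits)  // {"timeout":60000,"memory":256,"logs":10,"concurrency":1}
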
diff --git a/common/scala/src/main/scala/whisk/core/entity/WhiskAction.scala b/common/scala/src/main/scala/whisk/core/entity/WhiskAction.scala
index 1e380c2e88..da8dcffed8 100644
--- a/common/scala/src/main/scala/whisk/core/entity/WhiskAction.scala
+++ b/common/scala/src/main/scala/whisk/core/entity/WhiskAction.scala
@@ -37,7 +37,10 @@ import whisk.core.entity.types.EntityStore
  * ActionLimitsOption mirrors ActionLimits but makes both the timeout and memory
  * limit optional so that it is convenient to override just one limit at a time.
  */
-case class ActionLimitsOption(timeout: Option[TimeLimit], memory: Option[MemoryLimit], logs: Option[LogLimit])
+case class ActionLimitsOption(timeout: Option[TimeLimit],
+                              memory: Option[MemoryLimit],
+                              logs: Option[LogLimit],
+                              concurrency: Option[ConcurrencyLimit])
 
 /**
  * WhiskActionPut is a restricted WhiskAction view that eschews properties
@@ -538,7 +541,7 @@ object WhiskActionMetaData
 }
 
 object ActionLimitsOption extends DefaultJsonProtocol {
-  implicit val serdes = jsonFormat3(ActionLimitsOption.apply)
+  implicit val serdes = jsonFormat4(ActionLimitsOption.apply)
 }
 
 object WhiskActionPut extends DefaultJsonProtocol {
diff --git a/core/controller/src/main/scala/whisk/core/controller/Actions.scala b/core/controller/src/main/scala/whisk/core/controller/Actions.scala
index 25be8fa4ee..3f6a873bb6 100644
--- a/core/controller/src/main/scala/whisk/core/controller/Actions.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/Actions.scala
@@ -382,7 +382,11 @@ trait WhiskActionsApi extends WhiskCollectionAPI with PostActionActivation with
     implicit transid: TransactionId) = {
     val exec = content.exec.get
     val limits = content.limits map { l =>
-      ActionLimits(l.timeout getOrElse TimeLimit(), l.memory getOrElse MemoryLimit(), l.logs getOrElse LogLimit())
+      ActionLimits(
+        l.timeout getOrElse TimeLimit(),
+        l.memory getOrElse MemoryLimit(),
+        l.logs getOrElse LogLimit(),
+        l.concurrency getOrElse ConcurrencyLimit())
     } getOrElse ActionLimits()
     // This is temporary while we are making sequencing directly supported in the controller.
     // The parameter override allows this to work with Pipecode.code. Any parameters other
diff --git a/core/invoker/src/main/resources/application.conf b/core/invoker/src/main/resources/application.conf
index ebd45e5838..c0baae5a82 100644
--- a/core/invoker/src/main/resources/application.conf
+++ b/core/invoker/src/main/resources/application.conf
@@ -29,6 +29,7 @@ whisk {
   container-pool {
     num-core: 4      # used for computing --cpushares, and max number of containers allowed
     core-share: 2    # used for computing --cpushares, and max number of containers allowed
+    concurrent-schedule-factor: 0.5 #factor used to limit message peeking: 0 < factor <= 1.0 - larger number improves concurrent processing, but increases risk of message loss during invoker crash
   }
 
   kubernetes {
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala
index 183556942e..f36203504d 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala
@@ -18,18 +18,14 @@
 package whisk.core.containerpool
 
 import scala.collection.immutable
-
 import whisk.common.{AkkaLogging, LoggingMarkers, TransactionId}
-
 import akka.actor.{Actor, ActorRef, ActorRefFactory, Props}
-
 import whisk.core.entity.ByteSize
 import whisk.core.entity.CodeExec
 import whisk.core.entity.EntityName
 import whisk.core.entity.ExecutableWhiskAction
 import whisk.core.entity.size._
 import whisk.core.connector.MessageFeed
-
 import scala.concurrent.duration._
 
 sealed trait WorkerState
@@ -77,15 +73,16 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
     }
   }
 
-  def logContainerStart(r: Run, containerState: String): Unit = {
+  def logContainerStart(r: Run, containerState: String, activeActivations: Int): Unit = {
     val namespaceName = r.msg.user.namespace.name
     val actionName = r.action.name.name
+    val maxConcurrent = r.action.limits.concurrency.maxConcurrent
     val activationId = r.msg.activationId.toString
 
     r.msg.transid.mark(
       this,
       LoggingMarkers.INVOKER_CONTAINER_START(containerState),
-      s"containerStart containerState: $containerState action: $actionName namespace: $namespaceName activationId: $activationId",
+      s"containerStart containerState: $containerState ($activeActivations of max $maxConcurrent) action: $actionName namespace: $namespaceName activationId: $activationId",
       akka.event.Logging.InfoLevel)
   }
 
@@ -102,16 +99,16 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
         ContainerPool
           .schedule(r.action, r.msg.user.namespace, freePool)
           .map(container => {
-            (container, "warm")
+            (container, "warm", container._2.activeActivationCount)
           })
           .orElse {
             if (busyPool.size + freePool.size < poolConfig.maxActiveContainers) {
               takePrewarmContainer(r.action)
                 .map(container => {
-                  (container, "prewarmed")
+                  (container, "prewarmed", container._2.activeActivationCount)
                 })
                 .orElse {
-                  Some(createContainer(), "cold")
+                  Some(createContainer(), "cold", 0)
                 }
             } else None
           }
@@ -121,21 +118,29 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
               removeContainer(toDelete)
               takePrewarmContainer(r.action)
                 .map(container => {
-                  (container, "recreated")
+                  (container, "recreated", container._2.activeActivationCount)
                 })
                 .getOrElse {
-                  (createContainer(), "recreated")
+                  (createContainer(), "recreated", 0)
                 }
             }
           }
       } else None
 
       createdContainer match {
-        case Some(((actor, data), containerState)) =>
-          busyPool = busyPool + (actor -> data)
-          freePool = freePool - actor
+        case Some(((actor, data), containerState, activeActivations)) =>
+          //only move to busyPool if max reached
+          if (data.activeActivationCount + 1 >= r.action.limits.concurrency.maxConcurrent) {
+            if (r.action.limits.concurrency.maxConcurrent > 1) {
+              logging.info(
+                this,
+                s"container for ${r.action} is now busy with ${data.activeActivationCount + 1} activations")
+            }
+            busyPool = busyPool + (actor -> data)
+            freePool = freePool - actor
+          }
           actor ! r // forwards the run request to the container
-          logContainerStart(r, containerState)
+          logContainerStart(r, containerState, activeActivations)
         case None =>
           // this can also happen if createContainer fails to start a new container, or
           // if a job is rescheduled but the container it was allocated to has not yet destroyed itself
@@ -156,10 +161,33 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
 
     // Container is free to take more work
     case NeedWork(data: WarmedData) =>
-      freePool = freePool + (sender() -> data)
-      busyPool.get(sender()).foreach { _ =>
-        busyPool = busyPool - sender()
-        feed ! MessageFeed.Processed
+      feed ! MessageFeed.Processed
+      if (data.activeActivationCount < data.action.limits.concurrency.maxConcurrent) {
+        //remove from busy pool (may already not be there), put back into free pool (to update activation counts)
+        freePool = freePool + (sender() -> data)
+        if (busyPool.contains(sender())) {
+          busyPool = busyPool - sender()
+          if (data.action.limits.concurrency.maxConcurrent > 1) {
+            logging.info(
+              this,
+              s"container for ${data.action} is no longer busy with ${data.activeActivationCount} activations")
+          }
+        }
+      } else {
+        //update freePool IFF it was previously PreWarmedData (it is still free, but now has WarmedData)
+        //otherwise update busyPool to reflect the updated activation counts
+        freePool.get(sender()) match {
+          case Some(_: PreWarmedData) =>
+            freePool = freePool + (sender() -> data)
+          case None =>
+            if (data.action.limits.concurrency.maxConcurrent > 1) {
+              logging.info(
+                this,
+                s"container for ${data.action} is now busy with ${data.activeActivationCount} activations")
+            }
+            busyPool = busyPool + (sender() -> data)
+          case _ => //was free+WarmedData - do nothing
+        }
       }
 
     // Container is prewarmed and ready to take work
@@ -169,11 +197,9 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
     // Container got removed
     case ContainerRemoved =>
       freePool = freePool - sender()
-      busyPool.get(sender()).foreach { _ =>
-        busyPool = busyPool - sender()
-        // container was busy, so there is capacity to accept another job request
-        feed ! MessageFeed.Processed
-      }
+      busyPool = busyPool - sender()
+      // container was busy, so there is capacity to accept another job request
+      feed ! MessageFeed.Processed
 
     // This message is received for one of these reasons:
     // 1. Container errored while resuming a warm container, could not process the job, and sent the job back
@@ -210,8 +236,8 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
       val memory = action.limits.memory.megabytes.MB
       prewarmedPool
         .find {
-          case (_, PreWarmedData(_, `kind`, `memory`)) => true
-          case _                                       => false
+          case (_, PreWarmedData(_, `kind`, `memory`, _)) => true
+          case _                                          => false
         }
         .map {
           case (ref, data) =>
@@ -254,8 +280,10 @@ object ContainerPool {
                                            invocationNamespace: EntityName,
                                            idles: Map[A, ContainerData]): Option[(A, ContainerData)] = {
     idles.find {
-      case (_, WarmedData(_, `invocationNamespace`, `action`, _)) => true
-      case _                                                      => false
+      case c @ (_, WarmedData(_, `invocationNamespace`, `action`, _, _))
+          if c._2.activeActivationCount < action.limits.concurrency.maxConcurrent =>
+        true
+      case _ => false
     }
   }
 
@@ -269,8 +297,10 @@ object ContainerPool {
    * @return a container to be removed iff found
    */
   protected[containerpool] def remove[A](pool: Map[A, ContainerData]): Option[A] = {
+    // Try to find a Free container that does NOT have any active activations AND is initialized with any OTHER action
     val freeContainers = pool.collect {
-      case (ref, w: WarmedData) => ref -> w
+      case (ref, w: WarmedData) if w.activeActivationCount == 0 =>
+        ref -> w
     }
 
     if (freeContainers.nonEmpty) {
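
The scheduling change above boils down to one predicate: a container only moves from the free pool to the busy pool once the next activation would reach the action's concurrency limit. A toy illustration of that predicate (not the actual actor code):

    // Returns true when the container should be treated as busy after taking one more activation.
    def busyAfterScheduling(activeActivations: Int, maxConcurrent: Int): Boolean =
      activeActivations + 1 >= maxConcurrent

    busyAfterScheduling(0, maxConcurrent = 1)   // true  -> previous behavior: one activation per container
    busyAfterScheduling(3, maxConcurrent = 10)  // false -> container stays in the free pool, can take more work
    busyAfterScheduling(9, maxConcurrent = 10)  // true  -> the tenth activation fills the container
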
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
index b75ad72a7e..676e21f765 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
@@ -18,7 +18,6 @@
 package whisk.core.containerpool
 
 import java.time.Instant
-
 import scala.concurrent.Future
 import scala.concurrent.duration._
 import scala.util.Success
@@ -53,14 +52,24 @@ case object Paused extends ContainerState
 case object Removing extends ContainerState
 
 // Data
-sealed abstract class ContainerData(val lastUsed: Instant)
+sealed abstract class ContainerData(val lastUsed: Instant, val activeActivationCount: Int = 0)
 case class NoData() extends ContainerData(Instant.EPOCH)
-case class PreWarmedData(container: Container, kind: String, memoryLimit: ByteSize) extends ContainerData(Instant.EPOCH)
+case class PreWarmedData(container: Container,
+                         kind: String,
+                         memoryLimit: ByteSize,
+                         override val activeActivationCount: Int = 0)
+    extends ContainerData(Instant.EPOCH)
 case class WarmedData(container: Container,
                       invocationNamespace: EntityName,
                       action: ExecutableWhiskAction,
-                      override val lastUsed: Instant)
-    extends ContainerData(lastUsed)
+                      override val lastUsed: Instant,
+                      override val activeActivationCount: Int = 0)
+    extends ContainerData(lastUsed) {
+  def incrementActive: WarmedData =
+    WarmedData(container, invocationNamespace, action, Instant.now, activeActivationCount + 1)
+  def decrementActive: WarmedData =
+    WarmedData(container, invocationNamespace, action, Instant.now, activeActivationCount - 1)
+}
 
 // Events received by the actor
 case class Start(exec: CodeExec[_], memoryLimit: ByteSize)
@@ -146,7 +155,7 @@ class ContainerProxy(
           case Success(container) =>
             // the container is ready to accept an activation; register it as PreWarmed; this
             // normalizes the life cycle for containers and their cleanup when activations fail
-            self ! PreWarmedData(container, job.action.exec.kind, job.action.limits.memory.megabytes.MB)
+            self ! PreWarmedData(container, job.action.exec.kind, job.action.limits.memory.megabytes.MB, 1)
 
           case Failure(t) =>
             // the container did not come up cleanly, so disambiguate the failure mode and then cleanup
@@ -194,8 +203,7 @@ class ContainerProxy(
       initializeAndRun(data.container, job)
         .map(_ => WarmedData(data.container, job.msg.user.namespace, job.action, Instant.now))
         .pipeTo(self)
-
-      goto(Running)
+      goto(Running) using PreWarmedData(data.container, data.kind, data.memoryLimit, 1)
 
     case Event(Remove, data: PreWarmedData) => destroyContainer(data.container)
   }
@@ -205,10 +213,34 @@ class ContainerProxy(
     // and we keep it in case we need to destroy it.
     case Event(data: PreWarmedData, _) => stay using data
 
-    // Run was successful
-    case Event(data: WarmedData, _) =>
+    // Init was successful
+    case Event(data: WarmedData, _: PreWarmedData) =>
+      //in case concurrency supported, multiple runs can begin as soon as init is complete
       context.parent ! NeedWork(data)
-      goto(Ready) using data
+      stay using data
+
+    // Run was successful
+    case Event(_: WarmedData, s: WarmedData) =>
+      val newData = s.decrementActive
+
+      context.parent ! NeedWork(newData)
+
+      if (newData.activeActivationCount > 0) {
+        stay using newData
+      } else {
+        goto(Ready) using newData
+      }
+
+    case Event(job: Run, data: WarmedData)
+        if stateData.activeActivationCount < data.action.limits.concurrency.maxConcurrent && !rescheduleJob => //if there was a delay, or a failure on resume, skip the run
+
+      implicit val transid = job.msg.transid
+      val newData = data.incrementActive
+
+      initializeAndRun(data.container, job)
+        .map(_ => WarmedData(data.container, job.msg.user.namespace, job.action, Instant.now))
+        .pipeTo(self)
+      stay() using newData
 
     // Failed after /init (the first run failed)
     case Event(_: FailureMessage, data: PreWarmedData) => destroyContainer(data.container)
@@ -227,11 +259,13 @@ class ContainerProxy(
   when(Ready, stateTimeout = pauseGrace) {
     case Event(job: Run, data: WarmedData) =>
       implicit val transid = job.msg.transid
+      val newData = data.incrementActive
+
       initializeAndRun(data.container, job)
         .map(_ => WarmedData(data.container, job.msg.user.namespace, job.action, Instant.now))
         .pipeTo(self)
 
-      goto(Running)
+      goto(Running) using newData
 
     // pause grace timed out
     case Event(StateTimeout, data: WarmedData) =>
@@ -250,6 +284,8 @@ class ContainerProxy(
   when(Paused, stateTimeout = unusedTimeout) {
     case Event(job: Run, data: WarmedData) =>
       implicit val transid = job.msg.transid
+      val newData = data.incrementActive
+
       data.container
         .resume()
         .andThen {
@@ -264,7 +300,7 @@ class ContainerProxy(
         .map(_ => WarmedData(data.container, job.msg.user.namespace, job.action, Instant.now))
         .pipeTo(self)
 
-      goto(Running)
+      goto(Running) using newData
 
     // container is reclaimed by the pool or it has become too old
     case Event(StateTimeout | Remove, data: WarmedData) =>
@@ -341,12 +377,20 @@ class ContainerProxy(
 
     // Only initialize iff we haven't yet warmed the container
     val initialize = stateData match {
-      case data: WarmedData => Future.successful(None)
-      case _                => container.initialize(job.action.containerInitializer, actionTimeout).map(Some(_))
+      case data: WarmedData =>
+        Future.successful(None)
+      case _ =>
+        container
+          .initialize(job.action.containerInitializer, actionTimeout, job.action.limits.concurrency.maxConcurrent)
+          .map(Some(_))
     }
 
     val activation: Future[WhiskActivation] = initialize
       .flatMap { initInterval =>
+        //immediately setup warmedData for use (before first execution) so that concurrent actions can use it asap
+        if (!initInterval.isEmpty) {
+          self ! WarmedData(container, job.msg.user.namespace, job.action, Instant.now, 1)
+        }
         val parameters = job.msg.content getOrElse JsObject()
 
         val environment = JsObject(
@@ -358,13 +402,15 @@ class ContainerProxy(
           // but potentially under-estimates actual deadline
           "deadline" -> (Instant.now.toEpochMilli + actionTimeout.toMillis).toString.toJson)
 
-        container.run(parameters, environment, actionTimeout)(job.msg.transid).map {
-          case (runInterval, response) =>
-            val initRunInterval = initInterval
-              .map(i => Interval(runInterval.start.minusMillis(i.duration.toMillis), runInterval.end))
-              .getOrElse(runInterval)
-            ContainerProxy.constructWhiskActivation(job, initInterval, initRunInterval, response)
-        }
+        container
+          .run(parameters, environment, actionTimeout, job.action.limits.concurrency.maxConcurrent)(job.msg.transid)
+          .map {
+            case (runInterval, response) =>
+              val initRunInterval = initInterval
+                .map(i => Interval(runInterval.start.minusMillis(i.duration.toMillis), runInterval.end))
+                .getOrElse(runInterval)
+              ContainerProxy.constructWhiskActivation(job, initInterval, initRunInterval, response)
+          }
       }
       .recover {
         case InitializationError(interval, response) =>
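
The Running-state changes above hinge on WarmedData now carrying an activeActivationCount that is incremented when a Run starts and decremented when it completes; the proxy only returns to Ready once the count drops to zero. A toy model of that accounting (illustrative only, not the FSM itself):

    final case class Warmed(activeActivationCount: Int = 0) {
      def incrementActive: Warmed = copy(activeActivationCount = activeActivationCount + 1)
      def decrementActive: Warmed = copy(activeActivationCount = activeActivationCount - 1)
    }

    val twoInFlight = Warmed().incrementActive.incrementActive // count == 2, proxy stays in Running
    val oneDone     = twoInFlight.decrementActive              // count == 1, still Running
    val allDone     = oneDone.decrementActive                  // count == 0, proxy goes back to Ready
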
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala
index 5c959de4f2..5f14357b2b 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala
@@ -186,11 +186,18 @@ class DockerContainer(protected val id: ContainerId,
     }
   }
 
-  override protected def callContainer(path: String, body: JsObject, timeout: FiniteDuration, retry: Boolean = false)(
-    implicit transid: TransactionId): Future[RunResult] = {
+  override protected def callContainer(path: String,
+                                       body: JsObject,
+                                       timeout: FiniteDuration,
+                                       maxConcurrent: Int,
+                                       retry: Boolean = false)(implicit transid: TransactionId): Future[RunResult] = {
     val started = Instant.now()
     val http = httpConnection.getOrElse {
-      val conn = new HttpUtils(s"${addr.host}:${addr.port}", timeout, ActivationEntityLimit.MAX_ACTIVATION_ENTITY_LIMIT)
+      val conn = new HttpUtils(
+        s"${addr.host}:${addr.port}",
+        timeout,
+        ActivationEntityLimit.MAX_ACTIVATION_ENTITY_LIMIT,
+        maxConcurrent)
       httpConnection = Some(conn)
       conn
     }
diff --git a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
index b132dd815f..0015805331 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
+++ b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
@@ -46,7 +46,8 @@ class InvokerReactive(
   config: WhiskConfig,
   instance: InstanceId,
   producer: MessageProducer,
-  poolConfig: ContainerPoolConfig = loadConfigOrThrow[ContainerPoolConfig](ConfigKeys.containerPool))(
+  poolConfig: ContainerPoolConfig = loadConfigOrThrow[ContainerPoolConfig](ConfigKeys.containerPool),
+  limitsConfig: ConcurrencyLimitConfig = loadConfigOrThrow[ConcurrencyLimitConfig](ConfigKeys.concurrencyLimit))(
   implicit actorSystem: ActorSystem,
   logging: Logging) {
 
@@ -94,19 +95,21 @@ class InvokerReactive(
     }
   }
 
+  private val maximumContainers = poolConfig.maxActiveContainers
+
   /** Initialize message consumers */
   private val topic = s"invoker${instance.toInt}"
-  private val maximumContainers = poolConfig.maxActiveContainers
   private val msgProvider = SpiLoader.get[MessagingProvider]
-  private val consumer = msgProvider.getConsumer(
-    config,
-    topic,
-    topic,
-    maximumContainers,
-    maxPollInterval = TimeLimit.MAX_DURATION + 1.minute)
+
+  //number of peeked messages - increasing the concurrentScheduleFactor improves concurrent usage, but adds risk for message loss in case of crash
+  private val maxPeek =
+    math.max(maximumContainers, (maximumContainers * limitsConfig.max * poolConfig.concurrentPeekFactor).toInt)
+
+  private val consumer =
+    msgProvider.getConsumer(config, topic, topic, maxPeek, maxPollInterval = TimeLimit.MAX_DURATION + 1.minute)
 
   private val activationFeed = actorSystem.actorOf(Props {
-    new MessageFeed("activation", logging, consumer, maximumContainers, 500.milliseconds, processActivationMessage)
+    new MessageFeed("activation", logging, consumer, maxPeek, 500.milliseconds, processActivationMessage)
   })
 
   /** Sends an active-ack. */
@@ -180,7 +183,8 @@ class InvokerReactive(
     .get
 
   private val pool = actorSystem.actorOf(
-    ContainerPool.props(childFactory, poolConfig, activationFeed, Some(PrewarmingConfig(2, prewarmExec, 256.MB))))
+    ContainerPool
+      .props(childFactory, poolConfig, activationFeed, Some(PrewarmingConfig(2, prewarmExec, 256.MB))))
 
   /** Is called when an ActivationMessage is read from Kafka */
   def processActivationMessage(bytes: Array[Byte]): Future[Unit] = {
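
To make the maxPeek sizing above concrete, a back-of-the-envelope with the defaults visible in this diff (num-core = 4 and core-share = 2, which per ContainerPoolConfig's comment yields 8 maximum containers; concurrency-limit.max = 500; concurrentPeekFactor = 0.5):

    val maximumContainers    = 4 * 2   // num-core * core-share, per ContainerPoolConfig's comment
    val concurrencyLimitMax  = 500     // whisk.concurrency-limit.max
    val concurrentPeekFactor = 0.5     // ContainerPoolConfig.concurrentPeekFactor, 0.5 by default

    val maxPeek =
      math.max(maximumContainers, (maximumContainers * concurrencyLimitMax * concurrentPeekFactor).toInt)
    // maxPeek == 2000: the feed may peek far more messages than there are containers,
    // because each container can now process multiple activations concurrently.
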
diff --git a/performance/gatling_tests/src/gatling/resources/data/nodeJSAsyncAction.js b/performance/gatling_tests/src/gatling/resources/data/nodeJSAsyncAction.js
new file mode 100644
index 0000000000..b75cdb92a2
--- /dev/null
+++ b/performance/gatling_tests/src/gatling/resources/data/nodeJSAsyncAction.js
@@ -0,0 +1,12 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more contributor
+// license agreements; and to You under the Apache License, Version 2.0.
+
+function main(params) {
+    var greeting = "Hello" + (params.text || "stranger") + "!";
+    console.log(greeting);
+    return new Promise(function (resolve, reject) {
+        setTimeout(function () {
+            resolve({payload: greeting});
+        }, 175);
+    })
+}
diff --git a/performance/gatling_tests/src/gatling/scala/BlockingInvokeOneActionSimulation.scala b/performance/gatling_tests/src/gatling/scala/BlockingInvokeOneActionSimulation.scala
index 79f2751dbf..f3c2be4f8c 100644
--- a/performance/gatling_tests/src/gatling/scala/BlockingInvokeOneActionSimulation.scala
+++ b/performance/gatling_tests/src/gatling/scala/BlockingInvokeOneActionSimulation.scala
@@ -43,17 +43,21 @@ class BlockingInvokeOneActionSimulation extends Simulation {
   // Generate the OpenWhiskProtocol
   val openWhiskProtocol: OpenWhiskProtocolBuilder = openWhisk.apiHost(host)
 
+  // Specify async
+  val async = sys.env.getOrElse("ASYNC", "false").toBoolean
+
   val actionName = "testActionForBlockingInvokeOneAction"
+  val actionfile = if (async) "nodeJSAsyncAction.js" else "nodeJSAction.js"
 
   // Define scenario
-  val test: ScenarioBuilder = scenario("Invoke one action blocking")
+  val test: ScenarioBuilder = scenario(s"Invoke one ${if (async) "async" else "sync"} action blocking")
     .doIf(_.userId == 1) {
       exec(
         openWhisk("Create action")
           .authenticate(uuid, key)
           .action(actionName)
           .create(FileUtils
-            .readFileToString(Resource.body("nodeJSAction.js").get.file, StandardCharsets.UTF_8)))
+            .readFileToString(Resource.body(actionfile).get.file, StandardCharsets.UTF_8)))
     }
     .rendezVous(connections)
     .during(5.seconds) {
diff --git a/performance/preparation/actions/async.js b/performance/preparation/actions/async.js
new file mode 100644
index 0000000000..3091025b2e
--- /dev/null
+++ b/performance/preparation/actions/async.js
@@ -0,0 +1,7 @@
+function main() {
+  return new Promise(function (resolve, reject) {
+    setTimeout(function () {
+      resolve({done: true});
+    }, 175);
+  })
+}
\ No newline at end of file
diff --git a/performance/preparation/actions/noop.js b/performance/preparation/actions/noop.js
new file mode 100644
index 0000000000..e3a962c4ad
--- /dev/null
+++ b/performance/preparation/actions/noop.js
@@ -0,0 +1,3 @@
+function main() {
+  return {};
+}
\ No newline at end of file
diff --git a/performance/preparation/create.sh b/performance/preparation/create.sh
index a7249fe418..61ed3e3b24 100755
--- a/performance/preparation/create.sh
+++ b/performance/preparation/create.sh
@@ -23,10 +23,23 @@ host=$1
 credentials=$2
 # Name of the action to create and test.
 action=$3
+# Path to action src
+action_src=$4
+# Concurrency setting
+action_concurrency=${5:-1}
+
+# jq will json encode the src (need to strip leading/trailing quotes that jq adds)
+#action_code=$(jq -cs . "$action_src" | sed 's/^.\(.*\).$/\1/')
+action_code=$(cat "$action_src")
+
+# setup the action json to create the action
+action_json='{"namespace":"_","name":"'"$action"'","exec":{"kind":"nodejs:default","code":""},"limits":{"concurrency":'"$action_concurrency"'}}'
+action_json=$(echo  "$action_json" | jq -c --arg code "$action_code" '.exec.code=($code)')
+
 
 # create a noop action
 echo "Creating action $action"
-curl -k -u "$credentials" "$host/api/v1/namespaces/_/actions/$action" -XPUT -d '{"namespace":"_","name":"test","exec":{"kind":"nodejs:default","code":"function main(){return {};}"}}' -H "Content-Type: application/json"
+curl -k -u "$credentials" "$host/api/v1/namespaces/_/actions/$action" -XPUT -d "$action_json" -H "Content-Type: application/json"
 
 # run the noop action
 echo "Running $action once to assert an intact system"
diff --git a/performance/wrk_tests/latency.sh b/performance/wrk_tests/latency.sh
index 5031253d08..214c6e81a7 100755
--- a/performance/wrk_tests/latency.sh
+++ b/performance/wrk_tests/latency.sh
@@ -22,7 +22,9 @@ currentDir="$(cd "$(dirname "$0")"; pwd)"
 host=$1
 # Credentials to use for the test. USER:PASS format.
 credentials=$2
+# Path to action src
+action_src=$3
 # How long to run the test
-duration=${3:-30s}
+duration=${4:-30s}
 
-$currentDir/throughput.sh $host $credentials 1 1 $duration
+$currentDir/throughput.sh $host $credentials  $action_src 1 1 $duration
diff --git a/performance/wrk_tests/throughput.sh b/performance/wrk_tests/throughput.sh
index 5629b9284e..fb9ab5e631 100755
--- a/performance/wrk_tests/throughput.sh
+++ b/performance/wrk_tests/throughput.sh
@@ -22,17 +22,23 @@ currentDir="$(cd "$(dirname "$0")"; pwd)"
 host=$1
 # Credentials to use for the test. USER:PASS format.
 credentials=$2
+# Path to action src
+action_src=$3
 # concurrency level of the throughput test: How many requests should
 # open in parallel.
-concurrency=$3
+concurrency=$4
+# Action concurrency setting (how many concurrent activations does action allow?)
+action_concurrency=${5:-1}
 # How many threads to utilize, directly correlates to the number
 # of CPU cores
-threads=${4:-4}
+threads=${6:-4}
 # How long to run the test
-duration=${5:-30s}
+duration=${7:-30s}
 
-action="noopThroughput"
-"$currentDir/../preparation/create.sh" "$host" "$credentials" "$action"
+# Use the filename (without extension) of the action_src as the name of the action
+action="$(basename $action_src | cut -f 1 -d '.')_$action_concurrency"
+
+"$currentDir/../preparation/create.sh" "$host" "$credentials" "$action" "$action_src" "$action_concurrency"
 
 # run throughput tests
 encodedAuth=$(echo "$credentials" | tr -d '\n' | base64 | tr -d '\n')
@@ -41,6 +47,7 @@ docker run --pid=host --userns=host --rm -v "$currentDir":/data williamyeh/wrk \
   --connections "$concurrency" \
   --duration "$duration" \
   --header "Authorization: basic $encodedAuth" \
+  --header "X-Request-ID: throughput-$action" \
   "$host/api/v1/namespaces/_/actions/$action?blocking=true" \
   --latency \
   --timeout 10s \
diff --git a/tests/src/test/scala/common/BaseWsk.scala b/tests/src/test/scala/common/BaseWsk.scala
index 8a1611ae1e..9d15875801 100644
--- a/tests/src/test/scala/common/BaseWsk.scala
+++ b/tests/src/test/scala/common/BaseWsk.scala
@@ -199,6 +199,7 @@ trait BaseAction extends BaseRunWsk with BaseDeleteFromCollection with BaseListO
              timeout: Option[Duration] = None,
              memory: Option[ByteSize] = None,
              logsize: Option[ByteSize] = None,
+             concurrency: Option[Int] = None,
              shared: Option[Boolean] = None,
              update: Boolean = false,
              web: Option[String] = None,
diff --git a/tests/src/test/scala/common/Wsk.scala b/tests/src/test/scala/common/Wsk.scala
index f7f03f6bbb..5ed0c71677 100644
--- a/tests/src/test/scala/common/Wsk.scala
+++ b/tests/src/test/scala/common/Wsk.scala
@@ -246,6 +246,7 @@ class WskAction()
     timeout: Option[Duration] = None,
     memory: Option[ByteSize] = None,
     logsize: Option[ByteSize] = None,
+    concurrency: Option[Int] = None,
     shared: Option[Boolean] = None,
     update: Boolean = false,
     web: Option[String] = None,
diff --git a/tests/src/test/scala/common/rest/WskRest.scala b/tests/src/test/scala/common/rest/WskRest.scala
index 395047efc4..18ccd144b8 100644
--- a/tests/src/test/scala/common/rest/WskRest.scala
+++ b/tests/src/test/scala/common/rest/WskRest.scala
@@ -291,6 +291,7 @@ class WskRestAction
     timeout: Option[Duration] = None,
     memory: Option[ByteSize] = None,
     logsize: Option[ByteSize] = None,
+    concurrency: Option[Int] = None,
     shared: Option[Boolean] = None,
     update: Boolean = false,
     web: Option[String] = None,
@@ -437,6 +438,10 @@ class WskRestAction
       memory map { m =>
         Map("memory" -> m.toMB.toJson)
       } getOrElse Map[String, JsValue]()
+    } ++ {
+      concurrency map { c =>
+        Map("concurrency" -> c.toInt.toJson)
+      } getOrElse Map[String, JsValue]()
     }
 
     val bodyContent =
diff --git a/tests/src/test/scala/whisk/core/containerpool/docker/test/ContainerConnectionTests.scala b/tests/src/test/scala/whisk/core/containerpool/docker/test/ContainerConnectionTests.scala
index 0e0867a25d..d5385c27df 100644
--- a/tests/src/test/scala/whisk/core/containerpool/docker/test/ContainerConnectionTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/docker/test/ContainerConnectionTests.scala
@@ -85,7 +85,7 @@ class ContainerConnectionTests extends FlatSpec with Matchers with BeforeAndAfte
 
   it should "not wait longer than set timeout" in {
     val timeout = 5.seconds
-    val connection = new HttpUtils(hostWithPort, timeout, 1.B)
+    val connection = new HttpUtils(hostWithPort, timeout, 1.B, 1)
     testHang = timeout * 2
     val start = Instant.now()
     val result = connection.post("/init", JsObject(), retry = true)
@@ -98,7 +98,7 @@ class ContainerConnectionTests extends FlatSpec with Matchers with BeforeAndAfte
 
   it should "handle empty entity response" in {
     val timeout = 5.seconds
-    val connection = new HttpUtils(hostWithPort, timeout, 1.B)
+    val connection = new HttpUtils(hostWithPort, timeout, 1.B, 1)
     testStatusCode = 204
     val result = connection.post("/init", JsObject(), retry = true)
     result shouldBe Left(NoResponseReceived())
@@ -106,7 +106,7 @@ class ContainerConnectionTests extends FlatSpec with Matchers with BeforeAndAfte
 
   it should "not truncate responses within limit" in {
     val timeout = 1.minute.toMillis
-    val connection = new HttpUtils(hostWithPort, timeout.millis, 50.B)
+    val connection = new HttpUtils(hostWithPort, timeout.millis, 50.B, 1)
     Seq(true, false).foreach { code =>
       Seq(null, "", "abc", """{"a":"B"}""", """["a", "b"]""").foreach { r =>
         testStatusCode = if (code) 200 else 500
@@ -123,7 +123,7 @@ class ContainerConnectionTests extends FlatSpec with Matchers with BeforeAndAfte
     val timeout = 1.minute.toMillis
     val limit = 1.B
     val excess = limit + 1.B
-    val connection = new HttpUtils(hostWithPort, timeout.millis, limit)
+    val connection = new HttpUtils(hostWithPort, timeout.millis, limit, 1)
     Seq(true, false).foreach { code =>
       Seq("abc", """{"a":"B"}""", """["a", "b"]""").foreach { r =>
         testStatusCode = if (code) 200 else 500
diff --git a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala
index 25bc0c980c..5447d77abb 100644
--- a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala
@@ -111,6 +111,7 @@ class DockerContainerTests
         path: String,
         body: JsObject,
         timeout: FiniteDuration,
+        concurrent: Int,
         retry: Boolean = false)(implicit transid: TransactionId): Future[RunResult] = {
         ccRes
       }
@@ -362,7 +363,7 @@ class DockerContainerTests
       Future.successful(RunResult(interval, Right(ContainerResponse(true, "", None))))
     }
 
-    val initInterval = container.initialize(JsObject(), initTimeout)
+    val initInterval = container.initialize(JsObject(), initTimeout, 1)
     await(initInterval, initTimeout) shouldBe interval
 
     // assert the starting log is there
@@ -386,7 +387,7 @@ class DockerContainerTests
       Future.successful(RunResult(interval, Left(Timeout())))
     }
 
-    val init = container.initialize(JsObject(), initTimeout)
+    val init = container.initialize(JsObject(), initTimeout, 1)
 
     val error = the[InitializationError] thrownBy await(init, initTimeout)
     error.interval shouldBe interval
@@ -413,7 +414,7 @@ class DockerContainerTests
       Future.successful(RunResult(interval, Right(ContainerResponse(true, result.compactPrint, None))))
     }
 
-    val runResult = container.run(JsObject(), JsObject(), 1.second)
+    val runResult = container.run(JsObject(), JsObject(), 1.second, 1)
     await(runResult) shouldBe (interval, ActivationResponse.success(Some(result)))
 
     // assert the starting log is there
@@ -437,7 +438,7 @@ class DockerContainerTests
       Future.successful(RunResult(interval, Left(Timeout())))
     }
 
-    val runResult = container.run(JsObject(), JsObject(), runTimeout)
+    val runResult = container.run(JsObject(), JsObject(), runTimeout, 1)
     await(runResult) shouldBe (interval, ActivationResponse.applicationError(
       Messages.timedoutActivation(runTimeout, false)))
 
diff --git a/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala b/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala
index fc3ba6805f..61d1fab3cd 100644
--- a/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala
@@ -108,6 +108,7 @@ class KubernetesContainerTests
         path: String,
         body: JsObject,
         timeout: FiniteDuration,
+        concurrent: Int,
         retry: Boolean = false)(implicit transid: TransactionId): Future[RunResult] = {
         ccRes
       }
@@ -215,7 +216,7 @@ class KubernetesContainerTests
       Future.successful(RunResult(interval, Right(ContainerResponse(true, "", None))))
     }
 
-    val initInterval = container.initialize(JsObject(), initTimeout)
+    val initInterval = container.initialize(JsObject(), initTimeout, 1)
     await(initInterval, initTimeout) shouldBe interval
 
     // assert the starting log is there
@@ -238,7 +239,7 @@ class KubernetesContainerTests
       Future.successful(RunResult(interval, Left(Timeout())))
     }
 
-    val init = container.initialize(JsObject(), initTimeout)
+    val init = container.initialize(JsObject(), initTimeout, 1)
 
     val error = the[InitializationError] thrownBy await(init, initTimeout)
     error.interval shouldBe interval
@@ -264,7 +265,7 @@ class KubernetesContainerTests
       Future.successful(RunResult(interval, Right(ContainerResponse(true, result.compactPrint, None))))
     }
 
-    val runResult = container.run(JsObject(), JsObject(), 1.second)
+    val runResult = container.run(JsObject(), JsObject(), 1.second, 1)
     await(runResult) shouldBe (interval, ActivationResponse.success(Some(result)))
 
     // assert the starting log is there
@@ -287,7 +288,7 @@ class KubernetesContainerTests
       Future.successful(RunResult(interval, Left(Timeout())))
     }
 
-    val runResult = container.run(JsObject(), JsObject(), runTimeout)
+    val runResult = container.run(JsObject(), JsObject(), runTimeout, 1)
     await(runResult) shouldBe (interval, ActivationResponse.applicationError(
       Messages.timedoutActivation(runTimeout, false)))
 
diff --git a/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala b/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala
index b61e6f8db0..d8f87b54f5 100644
--- a/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala
@@ -330,14 +330,15 @@ class ContainerPoolObjectTests extends FlatSpec with Matchers with MockFactory {
   val differentNamespace = EntityName("differentNamespace")
 
   /** Helper to create a new action from String representations */
-  def createAction(namespace: String = "actionNS", name: String = "actionName") =
-    ExecutableWhiskAction(EntityPath(namespace), EntityName(name), actionExec)
+  def createAction(namespace: String = "actionNS", name: String = "actionName", limits: ActionLimits = ActionLimits()) =
+    ExecutableWhiskAction(EntityPath(namespace), EntityName(name), actionExec, limits = limits)
 
   /** Helper to create WarmedData with sensible defaults */
   def warmedData(action: ExecutableWhiskAction = createAction(),
                  namespace: String = standardNamespace.asString,
-                 lastUsed: Instant = Instant.now) =
-    WarmedData(stub[Container], EntityName(namespace), action, lastUsed)
+                 lastUsed: Instant = Instant.now,
+                 active: Int = 0) =
+    WarmedData(stub[Container], EntityName(namespace), action, lastUsed, active)
 
   /** Helper to create PreWarmedData with sensible defaults */
   def preWarmedData(kind: String = "anyKind") = PreWarmedData(stub[Container], kind, 256.MB)
@@ -411,6 +412,24 @@ class ContainerPoolObjectTests extends FlatSpec with Matchers with MockFactory {
     ContainerPool.schedule(differentAction, data.invocationNamespace, pool) shouldBe None
   }
 
+  it should "not use a container when active activation count >= maxconcurrent" in {
+    val maxConcurrent = 25
+
+    val data = warmedData(
+      active = maxConcurrent,
+      action = createAction(limits = ActionLimits(concurrency = ConcurrencyLimit(maxConcurrent))))
+    val pool = Map('warm -> data)
+    ContainerPool.schedule(data.action, data.invocationNamespace, pool) shouldBe None
+
+    val data2 = warmedData(
+      active = maxConcurrent - 1,
+      action = createAction(limits = ActionLimits(concurrency = ConcurrencyLimit(maxConcurrent))))
+    val pool2 = Map('warm -> data2)
+
+    ContainerPool.schedule(data2.action, data2.invocationNamespace, pool2) shouldBe Some('warm, data2)
+
+  }
+
   behavior of "ContainerPool remove()"
 
   it should "not provide a container if pool is empty" in {
@@ -438,4 +457,16 @@ class ContainerPoolObjectTests extends FlatSpec with Matchers with MockFactory {
 
     ContainerPool.remove(pool) shouldBe Some('oldest)
   }
+
+  it should "provide oldest container (excluding concurrently busy) from busy pool with multiple containers" in {
+    val commonNamespace = differentNamespace.asString
+    val first = warmedData(namespace = commonNamespace, lastUsed = Instant.ofEpochMilli(1), active = 0)
+    val second = warmedData(namespace = commonNamespace, lastUsed = Instant.ofEpochMilli(2), active = 0)
+    val oldest = warmedData(namespace = commonNamespace, lastUsed = Instant.ofEpochMilli(0), active = 3)
+
+    var pool = Map('first -> first, 'second -> second, 'oldest -> oldest)
+    ContainerPool.remove(pool) shouldBe Some('first)
+    pool = pool - 'first
+    ContainerPool.remove(pool) shouldBe Some('second)
+  }
 }
diff --git a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
index 205834ed40..f66a6a2934 100644
--- a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
@@ -18,7 +18,6 @@
 package whisk.core.containerpool.test
 
 import java.time.Instant
-
 import akka.actor.FSM.{CurrentState, SubscribeTransitionCallBack, Transition}
 import akka.actor.{ActorRef, ActorSystem, FSM}
 import akka.stream.scaladsl.Source
@@ -38,9 +37,9 @@ import whisk.core.entity.ExecManifest.{ImageName, RuntimeManifest}
 import whisk.core.entity._
 import whisk.core.entity.size._
 import whisk.http.Messages
-
 import scala.concurrent.duration._
 import scala.concurrent.{ExecutionContext, Future, Promise}
+import scala.util.Try
 
 @RunWith(classOf[JUnitRunner])
 class ContainerProxyTests
@@ -65,6 +65,11 @@ class ContainerProxyTests
 
   val invocationNamespace = EntityName("invocationSpace")
   val action = ExecutableWhiskAction(EntityPath("actionSpace"), EntityName("actionName"), exec)
+  val concurrentAction = ExecutableWhiskAction(
+    EntityPath("actionSpace"),
+    EntityName("actionName"),
+    exec,
+    limits = ActionLimits(concurrency = ConcurrencyLimit(20)))
 
   // create a transaction id to set the start time and control queue time
   val messageTransId = TransactionId(TransactionId.testing.meta.id)
@@ -110,23 +115,26 @@ class ContainerProxyTests
   }
 
   /** Run the common action on the state-machine, assumes good cases */
-  def run(machine: ActorRef, currentState: ContainerState) = {
+  def run(machine: ActorRef, currentState: ContainerState, expectInit: Boolean = true) = {
     machine ! Run(action, message)
     expectMsg(Transition(machine, currentState, Running))
-    expectWarmed(invocationNamespace.name, action)
+    if (expectInit) {
+      expectWarmed(invocationNamespace.name, action, 1)
+    }
+    expectWarmed(invocationNamespace.name, action, 0)
     expectMsg(Transition(machine, Running, Ready))
   }
 
   /** Expect a NeedWork message with prewarmed data */
   def expectPreWarmed(kind: String) = expectMsgPF() {
-    case NeedWork(PreWarmedData(_, kind, memoryLimit)) => true
+    case NeedWork(PreWarmedData(_, kind, memoryLimit, _)) => true
   }
 
   /** Expect a NeedWork message with warmed data */
-  def expectWarmed(namespace: String, action: ExecutableWhiskAction) = {
+  def expectWarmed(namespace: String, action: ExecutableWhiskAction, count: Int) = {
     val test = EntityName(namespace)
     expectMsgPF() {
-      case NeedWork(WarmedData(_, `test`, `action`, _)) => true
+      case a @ NeedWork(WarmedData(_, `test`, `action`, _, _)) if a.data.activeActivationCount == count => true
     }
   }
 
@@ -212,7 +220,7 @@ class ContainerProxyTests
     registerCallback(machine)
 
     preWarm(machine)
-    run(machine, Started)
+    run(machine, Started, true)
 
     // Timeout causes the container to pause
     timeout(machine)
@@ -251,7 +259,7 @@ class ContainerProxyTests
 
     run(machine, Started)
     // Note that there are no intermediate state changes
-    run(machine, Ready)
+    run(machine, Ready, false)
 
     awaitAssert {
       factory.calls should have size 1
@@ -300,7 +308,7 @@ class ContainerProxyTests
     run(machine, Started)
     timeout(machine)
     expectPause(machine)
-    run(machine, Paused)
+    run(machine, Paused, false)
 
     awaitAssert {
       factory.calls should have size 1
@@ -334,7 +342,7 @@ class ContainerProxyTests
         ContainerProxy
           .props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
     registerCallback(machine)
-    run(machine, Uninitialized)
+    run(machine, Uninitialized, true)
 
     awaitAssert {
       factory.calls should have size 1
@@ -370,7 +378,8 @@ class ContainerProxyTests
 
     machine ! Run(noLogsAction, message)
     expectMsg(Transition(machine, Uninitialized, Running))
-    expectWarmed(invocationNamespace.name, noLogsAction)
+    expectWarmed(invocationNamespace.name, noLogsAction, 1)
+    expectWarmed(invocationNamespace.name, noLogsAction, 0)
     expectMsg(Transition(machine, Running, Ready))
 
     awaitAssert {
@@ -383,6 +392,97 @@ class ContainerProxyTests
     }
   }
 
+  //This tests concurrency from the ContainerPool perspective - where multiple Run messages may be sent to ContainerProxy
+  //without waiting for the completion of the previous Run message (signaled by NeedWork message)
+  //Multiple messages can only be handled after Warming.
+  it should "stay in Running state if others are still running" in within(timeout) {
+    val initPromise = Promise[Interval]()
+    val runPromises = Seq(
+      Promise[(Interval, ActivationResponse)](),
+      Promise[(Interval, ActivationResponse)](),
+      Promise[(Interval, ActivationResponse)](),
+      Promise[(Interval, ActivationResponse)]())
+    val container = new TestContainer(Some(initPromise), Some(runPromises))
+    val factory = createFactory(Future.successful(container))
+    val acker = createAcker(concurrentAction)
+    val store = createStore
+    val collector = createCollector()
+
+    val machine =
+      childActorOf(
+        ContainerProxy
+          .props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
+    registerCallback(machine)
+    preWarm(machine) //ends in Started state
+
+    machine ! Run(concurrentAction, message) //first in Started state
+    machine ! Run(concurrentAction, message) //second in Started or Running state
+
+    //first message goes from Started -> Running -> Ready, with 2 NeedWork messages (1 for init, 1 for run)
+    //second message will be delayed until we get to Running state with WarmedData
+    //   (and will produce 1 NeedWork message after run)
+    expectMsg(Transition(machine, Started, Running))
+
+    //complete the init
+    initPromise complete Try(initInterval)
+    expectWarmed(invocationNamespace.name, concurrentAction, 1) //when init completes
+
+    //complete the first run
+    runPromises(0) complete Try(runInterval, ActivationResponse.success())
+    expectWarmed(invocationNamespace.name, concurrentAction, 0) //when first completes (count is 0 since stashed messages are not counted)
+    expectMsg(Transition(machine, Running, Ready)) //wait for first to complete to skip the delay step, which can only reliably be tested single-threaded
+    expectMsg(Transition(machine, Ready, Running)) //when second starts (after delay...)
+
+    //complete the second run
+    runPromises(1) complete Try(runInterval, ActivationResponse.success())
+    expectWarmed(invocationNamespace.name, concurrentAction, 0) //when second completes
+
+    //go back to ready after first and second runs are complete
+    expectMsg(Transition(machine, Running, Ready))
+
+    machine ! Run(concurrentAction, message) //third in Ready state
+    machine ! Run(concurrentAction, message) //fourth in Ready state
+
+    //third message will go from Ready -> Running -> Ready (after fourth run)
+    expectMsg(Transition(machine, Ready, Running))
+
+    //complete the third run
+    runPromises(2) complete Try(runInterval, ActivationResponse.success())
+    expectWarmed(invocationNamespace.name, concurrentAction, 1) //when third completes (stays in running)
+
+    //complete the fourth run
+    runPromises(3) complete Try(runInterval, ActivationResponse.success())
+    expectWarmed(invocationNamespace.name, concurrentAction, 0) //when fourth completes
+
+    //back to ready
+    expectMsg(Transition(machine, Running, Ready))
+
+    //timeout + pause after getting back to Ready
+    timeout(machine)
+    expectMsg(Transition(machine, Ready, Pausing))
+    expectMsg(Transition(machine, Pausing, Paused))
+
+    awaitAssert {
+      factory.calls should have size 1
+      container.initializeCount shouldBe 1
+      container.runCount shouldBe 4
+      collector.calls should have size 4
+      container.suspendCount shouldBe 1
+      container.resumeCount shouldBe 0
+      acker.calls should have size 4
+      store.calls should have size 4
+      acker
+        .calls(0)
+        ._2
+        .annotations
+        .get(WhiskActivation.initTimeAnnotation)
+        .get
+        .convertTo[Int] shouldBe initInterval.duration.toMillis
+      acker.calls(1)._2.annotations.get(WhiskActivation.initTimeAnnotation) shouldBe empty
+    }
+
+  }
+
   /*
    * ERROR CASES
    */
@@ -419,7 +519,8 @@ class ContainerProxyTests
   it should "complete the transaction and destroy the container on a failed init" in within(timeout) {
     val container = new TestContainer {
       override def initialize(initializer: JsObject,
-                              timeout: FiniteDuration)(implicit transid: TransactionId): Future[Interval] = {
+                              timeout: FiniteDuration,
+                              concurrent: Int)(implicit transid: TransactionId): Future[Interval] = {
         initializeCount += 1
         Future.failed(InitializationError(initInterval, ActivationResponse.applicationError("boom")))
       }
@@ -458,7 +559,7 @@ class ContainerProxyTests
 
   it should "complete the transaction and destroy the container on a failed run" in within(timeout) {
     val container = new TestContainer {
-      override def run(parameters: JsObject, environment: JsObject, timeout: FiniteDuration)(
+      override def run(parameters: JsObject, environment: JsObject, timeout: FiniteDuration, concurrent: Int)(
         implicit transid: TransactionId): Future[(Interval, ActivationResponse)] = {
         runCount += 1
         Future.successful((initInterval, ActivationResponse.applicationError("boom")))
@@ -476,6 +577,7 @@ class ContainerProxyTests
     registerCallback(machine)
     machine ! Run(action, message)
     expectMsg(Transition(machine, Uninitialized, Running))
+    expectWarmed(invocationNamespace.name, action, 1)
     expectMsg(ContainerRemoved) // The message is sent as soon as the container decides to destroy itself
     expectMsg(Transition(machine, Running, Removing))
 
@@ -507,6 +609,7 @@ class ContainerProxyTests
     registerCallback(machine)
     machine ! Run(action, message)
     expectMsg(Transition(machine, Uninitialized, Running))
+    expectWarmed(invocationNamespace.name, action, 1)
     expectMsg(ContainerRemoved) // The message is sent as soon as the container decides to destroy itself
     expectMsg(Transition(machine, Running, Removing))
 
@@ -537,6 +640,7 @@ class ContainerProxyTests
     registerCallback(machine)
     machine ! Run(action, message)
     expectMsg(Transition(machine, Uninitialized, Running))
+    expectWarmed(invocationNamespace.name, action, 1)
     expectMsg(ContainerRemoved) // The message is sent as soon as the container decides to destroy itself
     expectMsg(Transition(machine, Running, Removing))
 
@@ -627,7 +731,8 @@ class ContainerProxyTests
     val initPromise = Promise[Interval]
     val container = new TestContainer {
       override def initialize(initializer: JsObject,
-                              timeout: FiniteDuration)(implicit transid: TransactionId): Future[Interval] = {
+                              timeout: FiniteDuration,
+                              concurrent: Int)(implicit transid: TransactionId): Future[Interval] = {
         initializeCount += 1
         initPromise.future
       }
@@ -652,7 +757,8 @@ class ContainerProxyTests
 
     // Finish /init, note that /run and log-collecting happens nonetheless
     initPromise.success(Interval.zero)
-    expectWarmed(invocationNamespace.name, action)
+    expectWarmed(invocationNamespace.name, action, 1)
+    expectWarmed(invocationNamespace.name, action, 0)
     expectMsg(Transition(machine, Running, Ready))
 
     // Remove the container after the transaction finished
@@ -725,7 +831,9 @@ class ContainerProxyTests
   /**
    * Implements all the good cases of a perfect run to facilitate error case overriding.
    */
-  class TestContainer extends Container {
+  class TestContainer(initPromise: Option[Promise[Interval]] = None,
+                      runPromises: Option[Seq[Promise[(Interval, ActivationResponse)]]] = None)
+      extends Container {
     protected val id = ContainerId("testcontainer")
     protected val addr = ContainerAddress("0.0.0.0")
     protected implicit val logging: Logging = log
@@ -749,14 +857,18 @@ class ContainerProxyTests
       destroyCount += 1
       super.destroy()
     }
-    override def initialize(initializer: JsObject, timeout: FiniteDuration)(
+    override def initialize(initializer: JsObject, timeout: FiniteDuration, concurrent: Int)(
       implicit transid: TransactionId): Future[Interval] = {
       initializeCount += 1
       initializer shouldBe action.containerInitializer
       timeout shouldBe action.limits.timeout.duration
-      Future.successful(initInterval)
+
+      initPromise match {
+        case Some(promise) => promise.future
+        case None          => Future.successful(initInterval)
+      }
     }
-    override def run(parameters: JsObject, environment: JsObject, timeout: FiniteDuration)(
+    override def run(parameters: JsObject, environment: JsObject, timeout: FiniteDuration, concurrent: Int)(
       implicit transid: TransactionId): Future[(Interval, ActivationResponse)] = {
       runCount += 1
       environment.fields("api_key") shouldBe message.user.authkey.toJson
@@ -770,7 +882,12 @@ class ContainerProxyTests
       // a freshly computed deadline, as they get computed slightly after each other
       deadline should (be <= maxDeadline and be >= Instant.now)
 
-      Future.successful((runInterval, ActivationResponse.success()))
+      runPromises match {
+        case Some(promises) =>
+          promises(runCount - 1).future
+        case None =>
+          Future.successful((runInterval, ActivationResponse.success()))
+      }
     }
     def logs(limit: ByteSize, waitForSentinel: Boolean)(implicit transid: TransactionId): Source[ByteString, Any] = ???
   }
diff --git a/tests/src/test/scala/whisk/core/controller/test/ActionsApiTests.scala b/tests/src/test/scala/whisk/core/controller/test/ActionsApiTests.scala
index 609e773534..c9c8951d69 100644
--- a/tests/src/test/scala/whisk/core/controller/test/ActionsApiTests.scala
+++ b/tests/src/test/scala/whisk/core/controller/test/ActionsApiTests.scala
@@ -670,7 +670,12 @@ class ActionsApiTests extends ControllerTestCommon with WhiskActionsApi {
     val content = WhiskActionPut(
       Some(action.exec),
       Some(action.parameters),
-      Some(ActionLimitsOption(Some(action.limits.timeout), Some(action.limits.memory), Some(action.limits.logs))))
+      Some(
+        ActionLimitsOption(
+          Some(action.limits.timeout),
+          Some(action.limits.memory),
+          Some(action.limits.logs),
+          Some(action.limits.concurrency))))
     Put(s"$collectionPath/${action.name}", content) ~> Route.seal(routes(creds)) ~> check {
       deleteAction(action.docid)
       status should be(OK)
@@ -699,7 +704,12 @@ class ActionsApiTests extends ControllerTestCommon with WhiskActionsApi {
         val content = WhiskActionPut(
           Some(action.exec),
           Some(action.parameters),
-          Some(ActionLimitsOption(Some(action.limits.timeout), Some(action.limits.memory), Some(action.limits.logs))))
+          Some(
+            ActionLimitsOption(
+              Some(action.limits.timeout),
+              Some(action.limits.memory),
+              Some(action.limits.logs),
+              Some(action.limits.concurrency))))
 
         // first request invalidates any previous entries and caches new result
         Put(s"$collectionPath/${action.name}", content) ~> Route.seal(routes(creds)(transid())) ~> check {
@@ -785,7 +795,12 @@ class ActionsApiTests extends ControllerTestCommon with WhiskActionsApi {
     val content = WhiskActionPut(
       Some(action.exec),
       Some(action.parameters),
-      Some(ActionLimitsOption(Some(action.limits.timeout), Some(action.limits.memory), Some(action.limits.logs))))
+      Some(
+        ActionLimitsOption(
+          Some(action.limits.timeout),
+          Some(action.limits.memory),
+          Some(action.limits.logs),
+          Some(action.limits.concurrency))))
     val name = action.name
     val cacheKey = s"${CacheKey(action)}".replace("(", "\\(").replace(")", "\\)")
     val expectedPutLog = Seq(
@@ -863,7 +878,12 @@ class ActionsApiTests extends ControllerTestCommon with WhiskActionsApi {
     val content = WhiskActionPut(
       Some(action.exec),
       Some(action.parameters),
-      Some(ActionLimitsOption(Some(action.limits.timeout), Some(action.limits.memory), Some(action.limits.logs))))
+      Some(
+        ActionLimitsOption(
+          Some(action.limits.timeout),
+          Some(action.limits.memory),
+          Some(action.limits.logs),
+          Some(action.limits.concurrency))))
     val name = action.name
     val cacheKey = s"${CacheKey(action)}".replace("(", "\\(").replace(")", "\\)")
     val expectedGetLog = Seq(
@@ -914,7 +934,12 @@ class ActionsApiTests extends ControllerTestCommon with WhiskActionsApi {
     val content = WhiskActionPut(
       Some(action.exec),
       Some(action.parameters),
-      Some(ActionLimitsOption(Some(action.limits.timeout), Some(action.limits.memory), Some(action.limits.logs))))
+      Some(
+        ActionLimitsOption(
+          Some(action.limits.timeout),
+          Some(action.limits.memory),
+          Some(action.limits.logs),
+          Some(action.limits.concurrency))))
     val name = action.name
     val cacheKey = s"${CacheKey(action)}".replace("(", "\\(").replace(")", "\\)")
     val expectedPutLog = Seq(
diff --git a/tests/src/test/scala/whisk/core/entity/test/SchemaTests.scala b/tests/src/test/scala/whisk/core/entity/test/SchemaTests.scala
index 6ee2b24d64..d52d55717d 100644
--- a/tests/src/test/scala/whisk/core/entity/test/SchemaTests.scala
+++ b/tests/src/test/scala/whisk/core/entity/test/SchemaTests.scala
@@ -681,11 +681,13 @@ class SchemaTests extends FlatSpec with BeforeAndAfter with ExecHelpers with Mat
       JsObject(
         "timeout" -> TimeLimit.STD_DURATION.toMillis.toInt.toJson,
         "memory" -> MemoryLimit.stdMemory.toMB.toInt.toJson,
-        "logs" -> LogLimit.stdLogSize.toMB.toInt.toJson),
+        "logs" -> LogLimit.stdLogSize.toMB.toInt.toJson,
+        "concurrency" -> ConcurrencyLimit.stdConcurrent.toInt.toJson),
       JsObject(
         "timeout" -> TimeLimit.STD_DURATION.toMillis.toInt.toJson,
         "memory" -> MemoryLimit.stdMemory.toMB.toInt.toJson,
         "logs" -> LogLimit.stdLogSize.toMB.toInt.toJson,
+        "concurrency" -> ConcurrencyLimit.stdConcurrent.toInt.toJson,
         "foo" -> "bar".toJson),
       JsObject(
         "timeout" -> TimeLimit.STD_DURATION.toMillis.toInt.toJson,
@@ -759,6 +761,11 @@ class SchemaTests extends FlatSpec with BeforeAndAfter with ExecHelpers with Mat
       TimeLimit(),
       MemoryLimit(),
       LogLimit(LogLimit.minLogSize - 1.B))
+    an[IllegalArgumentException] should be thrownBy ActionLimits(
+      TimeLimit(),
+      MemoryLimit(),
+      LogLimit(),
+      ConcurrencyLimit(ConcurrencyLimit.minConcurrent - 1))
 
     an[IllegalArgumentException] should be thrownBy ActionLimits(
       TimeLimit(TimeLimit.MAX_DURATION + 1.millisecond),
@@ -772,6 +779,11 @@ class SchemaTests extends FlatSpec with BeforeAndAfter with ExecHelpers with Mat
       TimeLimit(),
       MemoryLimit(),
       LogLimit(LogLimit.maxLogSize + 1.B))
+    an[IllegalArgumentException] should be thrownBy ActionLimits(
+      TimeLimit(),
+      MemoryLimit(),
+      LogLimit(),
+      ConcurrencyLimit(ConcurrencyLimit.maxConcurrent + 1))
   }
 
   it should "parse activation id as uuid" in {
diff --git a/tests/src/test/scala/whisk/core/limits/ActionLimitsTests.scala b/tests/src/test/scala/whisk/core/limits/ActionLimitsTests.scala
index aa3caf8bee..4a6d420935 100644
--- a/tests/src/test/scala/whisk/core/limits/ActionLimitsTests.scala
+++ b/tests/src/test/scala/whisk/core/limits/ActionLimitsTests.scala
@@ -22,7 +22,6 @@ import akka.http.scaladsl.model.StatusCodes.BadGateway
 import java.io.File
 import java.io.PrintWriter
 import java.time.Instant
-
 import scala.concurrent.duration.{Duration, DurationInt}
 import scala.language.postfixOps
 import org.junit.runner.RunWith
@@ -37,6 +36,7 @@ import common.WskProps
 import common.WskTestHelpers
 import spray.json._
 import spray.json.DefaultJsonProtocol._
+import whisk.core.entity.ConcurrencyLimit
 import whisk.core.entity.{ActivationEntityLimit, ActivationResponse, ByteSize, Exec, LogLimit, MemoryLimit, TimeLimit}
 import whisk.core.entity.size._
 import whisk.http.Messages
@@ -64,14 +64,16 @@ class ActionLimitsTests extends TestHelpers with WskTestHelpers {
    * @param timeout the action timeout limit to be set in test
    * @param memory the action memory size limit to be set in test
    * @param logs the action log size limit to be set in test
+   * @param concurrency the action concurrency limit to be set in test
    * @param ec the expected exit code when creating the action
    */
   sealed case class PermutationTestParameter(timeout: Option[Duration] = None,
                                              memory: Option[ByteSize] = None,
                                              logs: Option[ByteSize] = None,
+                                             concurrency: Option[Int] = None,
                                              ec: Int = SUCCESS_EXIT) {
     override def toString: String =
-      s"timeout: ${toTimeoutString}, memory: ${toMemoryString}, logsize: ${toLogsString}"
+      s"timeout: ${toTimeoutString}, memory: ${toMemoryString}, logsize: ${toLogsString}, concurrency: ${toConcurrencyString}"
 
     val toTimeoutString = timeout match {
       case None                                    => "None"
@@ -102,7 +104,15 @@ class ActionLimitsTests extends TestHelpers with WskTestHelpers {
       case Some(l) if (l > LogLimit.maxLogSize) => s"${l} (> max)"
       case Some(l)                              => s"${l} (allowed)"
     }
-
+    val toConcurrencyString = concurrency match {
+      case None                                            => "None"
+      case Some(ConcurrencyLimit.minConcurrent)            => s"${ConcurrencyLimit.minConcurrent} (= min)"
+      case Some(ConcurrencyLimit.stdConcurrent)            => s"${ConcurrencyLimit.stdConcurrent} (= std)"
+      case Some(ConcurrencyLimit.maxConcurrent)            => s"${ConcurrencyLimit.maxConcurrent} (= max)"
+      case Some(c) if (c < ConcurrencyLimit.minConcurrent) => s"${c} (< min)"
+      case Some(c) if (c > ConcurrencyLimit.maxConcurrent) => s"${c} (> max)"
+      case Some(c)                                         => s"${c} (allowed)"
+    }
     val toExpectedResultString: String = if (ec == SUCCESS_EXIT) "allow" else "reject"
   }
 
@@ -111,20 +121,23 @@ class ActionLimitsTests extends TestHelpers with WskTestHelpers {
       time <- Seq(None, Some(TimeLimit.MIN_DURATION), Some(TimeLimit.MAX_DURATION))
       mem <- Seq(None, Some(MemoryLimit.minMemory), Some(MemoryLimit.maxMemory))
       log <- Seq(None, Some(LogLimit.minLogSize), Some(LogLimit.maxLogSize))
-    } yield PermutationTestParameter(time, mem, log)
+      concurrency <- Seq(None, Some(ConcurrencyLimit.minConcurrent), Some(ConcurrencyLimit.maxConcurrent))
+    } yield PermutationTestParameter(time, mem, log, concurrency)
   } ++
     // Add variations for negative tests
     Seq(
-      PermutationTestParameter(Some(0.milliseconds), None, None, BAD_REQUEST), // timeout that is lower than allowed
-      PermutationTestParameter(Some(TimeLimit.MAX_DURATION.plus(1 second)), None, None, BAD_REQUEST), // timeout that is slightly higher than allowed
-      PermutationTestParameter(Some(TimeLimit.MAX_DURATION * 10), None, None, BAD_REQUEST), // timeout that is much higher than allowed
-      PermutationTestParameter(None, Some(0.MB), None, BAD_REQUEST), // memory limit that is lower than allowed
-      PermutationTestParameter(None, Some(MemoryLimit.maxMemory + 1.MB), None, BAD_REQUEST), // memory limit that is slightly higher than allowed
-      PermutationTestParameter(None, Some((MemoryLimit.maxMemory.toMB * 5).MB), None, BAD_REQUEST), // memory limit that is much higher than allowed
-      PermutationTestParameter(None, None, Some((LogLimit.maxLogSize.toMB * 5).MB), BAD_REQUEST)) // log size limit that is much higher than allowed
+      PermutationTestParameter(Some(0.milliseconds), None, None, None, BAD_REQUEST), // timeout that is lower than allowed
+      PermutationTestParameter(Some(TimeLimit.MAX_DURATION.plus(1 second)), None, None, None, BAD_REQUEST), // timeout that is slightly higher than allowed
+      PermutationTestParameter(Some(TimeLimit.MAX_DURATION * 10), None, None, None, BAD_REQUEST), // timeout that is much higher than allowed
+      PermutationTestParameter(None, Some(0.MB), None, None, BAD_REQUEST), // memory limit that is lower than allowed
+      PermutationTestParameter(None, None, None, Some(0), BAD_REQUEST), // concurrency limit that is lower than allowed
+      PermutationTestParameter(None, Some(MemoryLimit.maxMemory + 1.MB), None, None, BAD_REQUEST), // memory limit that is slightly higher than allowed
+      PermutationTestParameter(None, Some((MemoryLimit.maxMemory.toMB * 5).MB), None, None, BAD_REQUEST), // memory limit that is much higher than allowed
+      PermutationTestParameter(None, None, Some((LogLimit.maxLogSize.toMB * 5).MB), None, BAD_REQUEST), // log size limit that is much higher than allowed
+      PermutationTestParameter(None, None, None, Some(Int.MaxValue), BAD_REQUEST)) // concurrency limit that is much higher than allowed
 
   /**
-   * Integration test to verify that valid timeout, memory and log size limits are accepted
+   * Integration test to verify that valid timeout, memory, log size, and concurrency limits are accepted
    * when creating an action while any invalid limit is rejected.
    *
    * At the first sight, this test looks like a typical unit test that should not be performed
@@ -141,7 +154,8 @@ class ActionLimitsTests extends TestHelpers with WskTestHelpers {
       val limits = JsObject(
         "timeout" -> parm.timeout.getOrElse(TimeLimit.STD_DURATION).toMillis.toJson,
         "memory" -> parm.memory.getOrElse(MemoryLimit.stdMemory).toMB.toInt.toJson,
-        "logs" -> parm.logs.getOrElse(LogLimit.stdLogSize).toMB.toInt.toJson)
+        "logs" -> parm.logs.getOrElse(LogLimit.stdLogSize).toMB.toInt.toJson,
+        "concurrency" -> parm.concurrency.getOrElse(ConcurrencyLimit.stdConcurrent).toJson)
 
       val name = "ActionLimitTests-" + Instant.now.toEpochMilli
       val createResult = assetHelper.withCleaner(wsk.action, name, confirmDelete = (parm.ec == SUCCESS_EXIT)) {
@@ -152,6 +166,7 @@ class ActionLimitsTests extends TestHelpers with WskTestHelpers {
             logsize = parm.logs,
             memory = parm.memory,
             timeout = parm.timeout,
+            concurrency = parm.concurrency,
             expectedExitCode = DONTCARE_EXIT)
           withClue(s"Unexpected result when creating action '${name}':\n${result.toString}\nFailed assertion:") {
             result.exitCode should be(parm.ec)
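
For anyone reading the limits hunk in ActionLimitsTests above: the new per-action
concurrency limit is simply a fourth numeric field next to timeout, memory, and
log size in the action's limits object. A minimal sketch of building that payload
with spray-json follows; the values are illustrative only, not the project's
actual defaults (those come from ConcurrencyLimit.stdConcurrent and friends):

    import spray.json._
    import spray.json.DefaultJsonProtocol._

    // Illustrative limits payload; units follow the tests above:
    // timeout in milliseconds, memory and logs in MB, concurrency as the
    // number of simultaneous activations allowed in one warm container.
    val limits = JsObject(
      "timeout" -> 60000.toJson,
      "memory" -> 256.toJson,
      "logs" -> 10.toJson,
      "concurrency" -> 1.toJson)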


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services