Posted to commits@openwhisk.apache.org by ra...@apache.org on 2018/01/23 17:19:58 UTC

[incubator-openwhisk] branch master updated: Make payload limit configurable. (#3209)

This is an automated email from the ASF dual-hosted git repository.

rabbah pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new a891e06  Make payload limit configurable. (#3209)
a891e06 is described below

commit a891e067157848103df56c92daa3a143666425b5
Author: Markus Thömmes <ma...@me.com>
AuthorDate: Tue Jan 23 18:19:55 2018 +0100

    Make payload limit configurable. (#3209)
    
    Lifts the hardcoded activation payload size limit to a deployment parameter.
---
 ansible/roles/controller/tasks/deploy.yml          |  2 ++
 ansible/roles/invoker/tasks/deploy.yml             |  1 +
 common/scala/src/main/resources/application.conf   | 19 ++++++++++++++++---
 .../connector/kafka/KafkaMessagingProvider.scala   | 22 +++++++++++-----------
 .../connector/kafka/KafkaProducerConnector.scala   |  8 +++++++-
 .../src/main/scala/whisk/core/WhiskConfig.scala    |  3 +++
 .../whisk/core/entity/ActivationEntityLimit.scala  | 11 +++++++++--
 .../containerpool/docker/DockerContainer.scala     |  4 ++--
 .../whisk/core/limits/ActionLimitsTests.scala      | 22 ++++++++++++++++++++++
 9 files changed, 73 insertions(+), 19 deletions(-)
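
Deployment-wise, the new ansible variable limit_activation_payload flows into
the controller and invoker containers as the environment variable
CONFIG_whisk_activation_payload_max, which overrides the HOCON key
whisk.activation.payload.max. A minimal sketch of the final override step,
assuming (this part is not shown in the diff) that the container start scripts
translate the CONFIG_ variable into the matching JVM system property:

    import com.typesafe.config.ConfigFactory

    // Assumed mapping: CONFIG_whisk_activation_payload_max arrives as the
    // system property whisk.activation.payload.max before config is loaded.
    System.setProperty("whisk.activation.payload.max", "2097152")
    ConfigFactory.invalidateCaches()

    // ConfigFactory.load() layers system properties over application.conf,
    // so the deployment value wins over the 1048576 default.
    ConfigFactory.load().getLong("whisk.activation.payload.max") // 2097152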

diff --git a/ansible/roles/controller/tasks/deploy.yml b/ansible/roles/controller/tasks/deploy.yml
index 671aa17..e673438 100644
--- a/ansible/roles/controller/tasks/deploy.yml
+++ b/ansible/roles/controller/tasks/deploy.yml
@@ -122,6 +122,8 @@
       "CONFIG_whisk_memory_max": "{{ limit_action_memory_max | default() }}"
       "CONFIG_whisk_memory_std": "{{ limit_action_memory_std | default() }}"
 
+      "CONFIG_whisk_activation_payload_max": "{{ limit_activation_payload | default() }}"
+
       "RUNTIMES_MANIFEST": "{{ runtimesManifest | to_json }}"
       "CONTROLLER_LOCALBOOKKEEPING": "{{ controller.localBookkeeping }}"
       "AKKA_CLUSTER_SEED_NODES": "{{seed_nodes_list | join(' ') }}"
diff --git a/ansible/roles/invoker/tasks/deploy.yml b/ansible/roles/invoker/tasks/deploy.yml
index f274adc..c215e39 100644
--- a/ansible/roles/invoker/tasks/deploy.yml
+++ b/ansible/roles/invoker/tasks/deploy.yml
@@ -187,6 +187,7 @@
         -e CONFIG_whisk_memory_min='{{ limit_action_memory_min | default() }}'
         -e CONFIG_whisk_memory_max='{{ limit_action_memory_max | default() }}'
         -e CONFIG_whisk_memory_std='{{ limit_action_memory_std | default() }}'
+        -e CONFIG_whisk_activation_payload_max='{{ limit_activation_payload | default() }}'
         -v /sys/fs/cgroup:/sys/fs/cgroup
         -v /run/runc:/run/runc
         -v {{ whisk_logs_dir }}/invoker{{ groups['invokers'].index(inventory_hostname) }}:/logs
diff --git a/common/scala/src/main/resources/application.conf b/common/scala/src/main/resources/application.conf
index 0288106..97b1e63 100644
--- a/common/scala/src/main/resources/application.conf
+++ b/common/scala/src/main/resources/application.conf
@@ -53,6 +53,12 @@ whisk {
     # kafka related configuration
     kafka {
         replication-factor = 1
+
+        producer {
+            acks = 1
+            max-request-size = ${whisk.activation.payload.max}
+        }
+
         topics {
             cache-invalidation {
                 segment-bytes   =  536870912
@@ -70,9 +76,10 @@ whisk {
                 retention-ms    = 3600000
             }
             invoker {
-                segment-bytes   =  536870912
-                retention-bytes = 1073741824
-                retention-ms    = 172800000
+                segment-bytes     =  536870912
+                retention-bytes   = 1073741824
+                retention-ms      =  172800000
+                max-message-bytes = ${whisk.activation.payload.max}
             }
         }
     }
@@ -88,6 +95,12 @@ whisk {
         stride = ${?CONTROLLER_INSTANCES}
     }
 
+    activation {
+        payload {
+            max = 1048576 // bytes; HOCON size syntax like "5 m" cannot be used because the raw value is cross-referenced by the kafka configurations above
+        }
+    }
+
     # action memory configuration
     memory {
         min = 128 m
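
The ${whisk.activation.payload.max} references above are ordinary HOCON
substitutions, so the producer's max-request-size and the invoker topic's
max-message-bytes always track the payload limit; there is a single knob to
turn. A small sketch of how the substitution resolves, using Typesafe Config
directly:

    import com.typesafe.config.ConfigFactory

    val conf = ConfigFactory.parseString("""
      whisk.activation.payload.max = 1048576
      whisk.kafka.producer.max-request-size = ${whisk.activation.payload.max}
    """).resolve()

    conf.getLong("whisk.kafka.producer.max-request-size") // 1048576

Note that Kafka's max.request.size counts the whole record, not just the user
payload, which is why the new test further down leaves some grace bytes below
the limit.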
diff --git a/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala b/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala
index 7fd82c5..ac148cd 100644
--- a/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala
+++ b/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala
@@ -40,15 +40,6 @@ import pureconfig._
 
 case class KafkaConfig(replicationFactor: Short)
 
-case class TopicConfig(segmentBytes: Long, retentionBytes: Long, retentionMs: Long) {
-  def toMap: Map[String, String] = {
-    Map(
-      "retention.bytes" -> retentionBytes.toString,
-      "retention.ms" -> retentionMs.toString,
-      "segment.bytes" -> segmentBytes.toString)
-  }
-}
-
 /**
  * A Kafka based implementation of MessagingProvider
  */
@@ -62,12 +53,13 @@ object KafkaMessagingProvider extends MessagingProvider {
 
   def ensureTopic(config: WhiskConfig, topic: String, topicConfig: String)(implicit logging: Logging): Boolean = {
     val kc = loadConfigOrThrow[KafkaConfig](ConfigKeys.kafka)
-    val tc = loadConfigOrThrow[TopicConfig](ConfigKeys.kafkaTopics + s".$topicConfig")
+    val tc = KafkaConfiguration.configMapToKafkaConfig(
+      loadConfigOrThrow[Map[String, String]](ConfigKeys.kafkaTopics + s".$topicConfig"))
     val props = new Properties
     props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, config.kafkaHosts)
     val client = AdminClient.create(props)
     val numPartitions = 1
-    val nt = new NewTopic(topic, numPartitions, kc.replicationFactor).configs(tc.toMap.asJava)
+    val nt = new NewTopic(topic, numPartitions, kc.replicationFactor).configs(tc.asJava)
     val results = client.createTopics(List(nt).asJava)
     try {
       results.values().get(topic).get()
@@ -85,3 +77,11 @@ object KafkaMessagingProvider extends MessagingProvider {
     }
   }
 }
+
+object KafkaConfiguration {
+  def configToKafkaKey(configKey: String) = configKey.replace("-", ".")
+
+  def configMapToKafkaConfig(configMap: Map[String, String]) = configMap.map {
+    case (key, value) => configToKafkaKey(key) -> value
+  }
+}
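
The new KafkaConfiguration helper exists because HOCON keys conventionally use
dashes while Kafka setting names use dots. Loading the topic block as a plain
Map[String, String] and renaming the keys means additional Kafka topic
settings can now be added in application.conf without touching this file. A
usage sketch:

    import whisk.connector.kafka.KafkaConfiguration

    KafkaConfiguration.configToKafkaKey("max-message-bytes")
    // "max.message.bytes"

    KafkaConfiguration.configMapToKafkaConfig(
      Map("retention-ms" -> "172800000", "segment-bytes" -> "536870912"))
    // Map("retention.ms" -> "172800000", "segment.bytes" -> "536870912")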
diff --git a/common/scala/src/main/scala/whisk/connector/kafka/KafkaProducerConnector.scala b/common/scala/src/main/scala/whisk/connector/kafka/KafkaProducerConnector.scala
index 18b3824..47dce3a 100644
--- a/common/scala/src/main/scala/whisk/connector/kafka/KafkaProducerConnector.scala
+++ b/common/scala/src/main/scala/whisk/connector/kafka/KafkaProducerConnector.scala
@@ -36,6 +36,8 @@ import whisk.common.Logging
 import whisk.core.connector.Message
 import whisk.core.connector.MessageProducer
 import whisk.core.entity.UUIDs
+import pureconfig._
+import whisk.core.ConfigKeys
 
 class KafkaProducerConnector(kafkahosts: String,
                              implicit val executionContext: ExecutionContext,
@@ -86,7 +88,11 @@ class KafkaProducerConnector(kafkahosts: String,
   private def getProps: Properties = {
     val props = new Properties
     props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkahosts)
-    props.put(ProducerConfig.ACKS_CONFIG, 1.toString)
+
+    // Load additional config from the config files and add them here.
+    val config =
+      KafkaConfiguration.configMapToKafkaConfig(loadConfigOrThrow[Map[String, String]](ConfigKeys.kafkaProducer))
+    config.foreach { case (key, value) => props.put(key, value) }
     props
   }
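
With this change the producer no longer hardcodes acks=1; every entry under
whisk.kafka.producer is loaded as a string map, translated to dotted keys and
copied into the Properties object, so further producer settings can be added
purely in configuration. The dashed HOCON keys line up with Kafka's own
constants:

    import org.apache.kafka.clients.producer.ProducerConfig

    // The keys produced from the HOCON block above match Kafka's constants:
    ProducerConfig.ACKS_CONFIG             // "acks"
    ProducerConfig.MAX_REQUEST_SIZE_CONFIG // "max.request.size"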
 
diff --git a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
index 61b7f2a..b306cdb 100644
--- a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
+++ b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
@@ -233,9 +233,12 @@ object ConfigKeys {
   val loadbalancer = "whisk.loadbalancer"
 
   val kafka = "whisk.kafka"
+  val kafkaProducer = s"$kafka.producer"
   val kafkaTopics = s"$kafka.topics"
 
   val memory = "whisk.memory"
+  val activation = "whisk.activation"
+  val activationPayload = s"$activation.payload"
 
   val db = "whisk.db"
 
diff --git a/common/scala/src/main/scala/whisk/core/entity/ActivationEntityLimit.scala b/common/scala/src/main/scala/whisk/core/entity/ActivationEntityLimit.scala
index 5c149db..4332a3b 100644
--- a/common/scala/src/main/scala/whisk/core/entity/ActivationEntityLimit.scala
+++ b/common/scala/src/main/scala/whisk/core/entity/ActivationEntityLimit.scala
@@ -17,7 +17,11 @@
 
 package whisk.core.entity
 
-import whisk.core.entity.size.SizeInt
+import pureconfig._
+import whisk.core.ConfigKeys
+import whisk.core.entity.size.SizeLong
+
+case class ActivationEntityLimitConf(max: ByteSize)
 
 /**
  * ActivationEntityLimit defines the limits on the input/output payloads for actions
@@ -25,5 +29,8 @@ import whisk.core.entity.size.SizeInt
  * parameters for triggers.
  */
 protected[core] object ActivationEntityLimit {
-  protected[core] val MAX_ACTIVATION_ENTITY_LIMIT = 1.MB
+  private implicit val pureconfigLongReader: ConfigReader[ByteSize] = ConfigReader[Long].map(_.bytes)
+  private val config = loadConfigOrThrow[ActivationEntityLimitConf](ConfigKeys.activationPayload)
+
+  protected[core] val MAX_ACTIVATION_ENTITY_LIMIT: ByteSize = config.max
 }
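
The implicit ConfigReader is needed because pureconfig has no built-in reader
for whisk's ByteSize: the configured value is read as a raw Long count of
bytes and lifted via SizeLong's .bytes. A minimal sketch of what the reader
yields for the 1048576 default:

    import whisk.core.entity.size.SizeLong

    val limit = 1048576L.bytes // what ConfigReader[Long].map(_.bytes) produces
    limit.toMB                 // 1
    limit.toBytes              // 1048576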
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala
index a4c3418..265a450 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala
@@ -31,7 +31,7 @@ import whisk.common.Logging
 import whisk.common.TransactionId
 import whisk.core.containerpool._
 import whisk.core.entity.ActivationResponse.{ConnectionError, MemoryExhausted}
-import whisk.core.entity.ByteSize
+import whisk.core.entity.{ActivationEntityLimit, ByteSize}
 import whisk.core.entity.size._
 import akka.stream.scaladsl.{Framing, Source}
 import akka.stream.stage._
@@ -189,7 +189,7 @@ class DockerContainer(protected val id: ContainerId,
     implicit transid: TransactionId): Future[RunResult] = {
     val started = Instant.now()
     val http = httpConnection.getOrElse {
-      val conn = new HttpUtils(s"${addr.host}:${addr.port}", timeout, 1.MB)
+      val conn = new HttpUtils(s"${addr.host}:${addr.port}", timeout, ActivationEntityLimit.MAX_ACTIVATION_ENTITY_LIMIT)
       httpConnection = Some(conn)
       conn
     }
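
The practical effect for the invoker is that the cap on the size of an
action's HTTP response now follows the configured limit rather than the old
hardcoded constant. With the 1048576-byte default the two agree, assuming
whisk's ByteSize uses binary units (which the default value and the test's
toMB output suggest):

    import whisk.core.entity.ActivationEntityLimit
    import whisk.core.entity.size._

    // With the default configuration the new cap equals the old constant.
    ActivationEntityLimit.MAX_ACTIVATION_ENTITY_LIMIT == 1.MB // true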
diff --git a/tests/src/test/scala/whisk/core/limits/ActionLimitsTests.scala b/tests/src/test/scala/whisk/core/limits/ActionLimitsTests.scala
index b98b711..5c77f30 100644
--- a/tests/src/test/scala/whisk/core/limits/ActionLimitsTests.scala
+++ b/tests/src/test/scala/whisk/core/limits/ActionLimitsTests.scala
@@ -118,6 +118,28 @@ class ActionLimitsTests extends TestHelpers with WskTestHelpers {
       }
   }
 
+  it should s"successfully invoke an action with a payload close to the limit (${ActivationEntityLimit.MAX_ACTIVATION_ENTITY_LIMIT.toMB} MB)" in withAssetCleaner(
+    wskprops) { (wp, assetHelper) =>
+    val name = "TestActionCausingJustInBoundaryResult"
+    assetHelper.withCleaner(wsk.action, name) {
+      val actionName = TestUtils.getTestActionFilename("echo.js")
+      (action, _) =>
+        action.create(name, Some(actionName), timeout = Some(15.seconds))
+    }
+
+    val allowedSize = ActivationEntityLimit.MAX_ACTIVATION_ENTITY_LIMIT.toBytes
+
+    // Leave a few bytes of grace, since the activation message contains more than just the payload.
+    val args = Map("p" -> ("a" * (allowedSize - 700).toInt).toJson)
+    val rr = wsk.action.invoke(name, args, blocking = true, expectedExitCode = TestUtils.SUCCESS_EXIT)
+    val activation = wsk.parseJsonString(rr.respData).convertTo[ActivationResult]
+
+    activation.response.success shouldBe true
+
+    // The payload is echoed back unchanged, so the response channel must support the limit as well.
+    activation.response.result shouldBe Some(args.toJson)
+  }
+
   Seq(true, false).foreach { blocking =>
     it should s"succeed but truncate result, if result exceeds its limit (blocking: $blocking)" in withAssetCleaner(
       wskprops) { (wp, assetHelper) =>
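
The 700-byte grace in the new test is deliberate: the activation message that
travels over Kafka wraps the user payload in an envelope (activation id,
subject and so on), so a payload of exactly the limit would push the record
past max.request.size. The sizing, spelled out (values from the diff; the 700
is an empirical margin, not a derived constant):

    val allowedSize = 1048576L                     // MAX_ACTIVATION_ENTITY_LIMIT.toBytes
    val payload = "a" * (allowedSize - 700).toInt  // 1047876 characters
    payload.length                                 // just under the 1048576 limit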
