You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@openwhisk.apache.org by GitBox <gi...@apache.org> on 2018/01/23 17:19:59 UTC

[GitHub] rabbah closed pull request #3209: Make payload limit configurable.

rabbah closed pull request #3209: Make payload limit configurable.
URL: https://github.com/apache/incubator-openwhisk/pull/3209
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/ansible/roles/controller/tasks/deploy.yml b/ansible/roles/controller/tasks/deploy.yml
index 671aa171b3..e67343850a 100644
--- a/ansible/roles/controller/tasks/deploy.yml
+++ b/ansible/roles/controller/tasks/deploy.yml
@@ -122,6 +122,8 @@
       "CONFIG_whisk_memory_max": "{{ limit_action_memory_max | default() }}"
       "CONFIG_whisk_memory_std": "{{ limit_action_memory_std | default() }}"
 
+      "CONFIG_whisk_activation_payload_max": "{{ limit_activation_payload | default() }}"
+
       "RUNTIMES_MANIFEST": "{{ runtimesManifest | to_json }}"
       "CONTROLLER_LOCALBOOKKEEPING": "{{ controller.localBookkeeping }}"
       "AKKA_CLUSTER_SEED_NODES": "{{seed_nodes_list | join(' ') }}"
diff --git a/ansible/roles/invoker/tasks/deploy.yml b/ansible/roles/invoker/tasks/deploy.yml
index f274adcf7c..c215e395a8 100644
--- a/ansible/roles/invoker/tasks/deploy.yml
+++ b/ansible/roles/invoker/tasks/deploy.yml
@@ -187,6 +187,7 @@
         -e CONFIG_whisk_memory_min='{{ limit_action_memory_min | default() }}'
         -e CONFIG_whisk_memory_max='{{ limit_action_memory_max | default() }}'
         -e CONFIG_whisk_memory_std='{{ limit_action_memory_std | default() }}'
+        -e CONFIG_whisk_activation_payload_max='{{ limit_activation_payload | default() }}'
         -v /sys/fs/cgroup:/sys/fs/cgroup
         -v /run/runc:/run/runc
         -v {{ whisk_logs_dir }}/invoker{{ groups['invokers'].index(inventory_hostname) }}:/logs
diff --git a/common/scala/src/main/resources/application.conf b/common/scala/src/main/resources/application.conf
index 0288106c77..97b1e63a30 100644
--- a/common/scala/src/main/resources/application.conf
+++ b/common/scala/src/main/resources/application.conf
@@ -53,6 +53,12 @@ whisk {
     # kafka related configuration
     kafka {
         replication-factor = 1
+
+        producer {
+            acks = 1
+            max-request-size = ${whisk.activation.payload.max}
+        }
+
         topics {
             cache-invalidation {
                 segment-bytes   =  536870912
@@ -70,9 +76,10 @@ whisk {
                 retention-ms    = 3600000
             }
             invoker {
-                segment-bytes   =  536870912
-                retention-bytes = 1073741824
-                retention-ms    = 172800000
+                segment-bytes     =  536870912
+                retention-bytes   = 1073741824
+                retention-ms      =  172800000
+                max-message-bytes = ${whisk.activation.payload.max}
             }
         }
     }
@@ -88,6 +95,12 @@ whisk {
         stride = ${?CONTROLLER_INSTANCES}
     }
 
+    activation {
+        payload {
+            max = 1048576 // 5 m not possible because cross-referenced to kafka configurations
+        }
+    }
+
     # action memory configuration
     memory {
         min = 128 m
diff --git a/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala b/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala
index 7fd82c507f..ac148cd554 100644
--- a/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala
+++ b/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala
@@ -40,15 +40,6 @@ import pureconfig._
 
 case class KafkaConfig(replicationFactor: Short)
 
-case class TopicConfig(segmentBytes: Long, retentionBytes: Long, retentionMs: Long) {
-  def toMap: Map[String, String] = {
-    Map(
-      "retention.bytes" -> retentionBytes.toString,
-      "retention.ms" -> retentionMs.toString,
-      "segment.bytes" -> segmentBytes.toString)
-  }
-}
-
 /**
  * A Kafka based implementation of MessagingProvider
  */
@@ -62,12 +53,13 @@ object KafkaMessagingProvider extends MessagingProvider {
 
   def ensureTopic(config: WhiskConfig, topic: String, topicConfig: String)(implicit logging: Logging): Boolean = {
     val kc = loadConfigOrThrow[KafkaConfig](ConfigKeys.kafka)
-    val tc = loadConfigOrThrow[TopicConfig](ConfigKeys.kafkaTopics + s".$topicConfig")
+    val tc = KafkaConfiguration.configMapToKafkaConfig(
+      loadConfigOrThrow[Map[String, String]](ConfigKeys.kafkaTopics + s".$topicConfig"))
     val props = new Properties
     props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, config.kafkaHosts)
     val client = AdminClient.create(props)
     val numPartitions = 1
-    val nt = new NewTopic(topic, numPartitions, kc.replicationFactor).configs(tc.toMap.asJava)
+    val nt = new NewTopic(topic, numPartitions, kc.replicationFactor).configs(tc.asJava)
     val results = client.createTopics(List(nt).asJava)
     try {
       results.values().get(topic).get()
@@ -85,3 +77,11 @@ object KafkaMessagingProvider extends MessagingProvider {
     }
   }
 }
+
+object KafkaConfiguration {
+  def configToKafkaKey(configKey: String) = configKey.replace("-", ".")
+
+  def configMapToKafkaConfig(configMap: Map[String, String]) = configMap.map {
+    case (key, value) => configToKafkaKey(key) -> value
+  }
+}
diff --git a/common/scala/src/main/scala/whisk/connector/kafka/KafkaProducerConnector.scala b/common/scala/src/main/scala/whisk/connector/kafka/KafkaProducerConnector.scala
index 18b38249bd..47dce3a2bb 100644
--- a/common/scala/src/main/scala/whisk/connector/kafka/KafkaProducerConnector.scala
+++ b/common/scala/src/main/scala/whisk/connector/kafka/KafkaProducerConnector.scala
@@ -36,6 +36,8 @@ import whisk.common.Logging
 import whisk.core.connector.Message
 import whisk.core.connector.MessageProducer
 import whisk.core.entity.UUIDs
+import pureconfig._
+import whisk.core.ConfigKeys
 
 class KafkaProducerConnector(kafkahosts: String,
                              implicit val executionContext: ExecutionContext,
@@ -86,7 +88,11 @@ class KafkaProducerConnector(kafkahosts: String,
   private def getProps: Properties = {
     val props = new Properties
     props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkahosts)
-    props.put(ProducerConfig.ACKS_CONFIG, 1.toString)
+
+    // Load additional config from the config files and add them here.
+    val config =
+      KafkaConfiguration.configMapToKafkaConfig(loadConfigOrThrow[Map[String, String]](ConfigKeys.kafkaProducer))
+    config.foreach { case (key, value) => props.put(key, value) }
     props
   }
 
diff --git a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
index 61b7f2a7cd..b306cdbe22 100644
--- a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
+++ b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
@@ -233,9 +233,12 @@ object ConfigKeys {
   val loadbalancer = "whisk.loadbalancer"
 
   val kafka = "whisk.kafka"
+  val kafkaProducer = s"$kafka.producer"
   val kafkaTopics = s"$kafka.topics"
 
   val memory = "whisk.memory"
+  val activation = "whisk.activation"
+  val activationPayload = s"$activation.payload"
 
   val db = "whisk.db"
 
diff --git a/common/scala/src/main/scala/whisk/core/entity/ActivationEntityLimit.scala b/common/scala/src/main/scala/whisk/core/entity/ActivationEntityLimit.scala
index 5c149db476..4332a3bb34 100644
--- a/common/scala/src/main/scala/whisk/core/entity/ActivationEntityLimit.scala
+++ b/common/scala/src/main/scala/whisk/core/entity/ActivationEntityLimit.scala
@@ -17,7 +17,11 @@
 
 package whisk.core.entity
 
-import whisk.core.entity.size.SizeInt
+import pureconfig._
+import whisk.core.ConfigKeys
+import whisk.core.entity.size.SizeLong
+
+case class ActivationEntityLimitConf(max: ByteSize)
 
 /**
  * ActivationEntityLimit defines the limits on the input/output payloads for actions
@@ -25,5 +29,8 @@ import whisk.core.entity.size.SizeInt
  * parameters for triggers.
  */
 protected[core] object ActivationEntityLimit {
-  protected[core] val MAX_ACTIVATION_ENTITY_LIMIT = 1.MB
+  private implicit val pureconfigLongReader: ConfigReader[ByteSize] = ConfigReader[Long].map(_.bytes)
+  private val config = loadConfigOrThrow[ActivationEntityLimitConf](ConfigKeys.activationPayload)
+
+  protected[core] val MAX_ACTIVATION_ENTITY_LIMIT: ByteSize = config.max
 }
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala
index a4c3418b09..265a45031d 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala
@@ -31,7 +31,7 @@ import whisk.common.Logging
 import whisk.common.TransactionId
 import whisk.core.containerpool._
 import whisk.core.entity.ActivationResponse.{ConnectionError, MemoryExhausted}
-import whisk.core.entity.ByteSize
+import whisk.core.entity.{ActivationEntityLimit, ByteSize}
 import whisk.core.entity.size._
 import akka.stream.scaladsl.{Framing, Source}
 import akka.stream.stage._
@@ -189,7 +189,7 @@ class DockerContainer(protected val id: ContainerId,
     implicit transid: TransactionId): Future[RunResult] = {
     val started = Instant.now()
     val http = httpConnection.getOrElse {
-      val conn = new HttpUtils(s"${addr.host}:${addr.port}", timeout, 1.MB)
+      val conn = new HttpUtils(s"${addr.host}:${addr.port}", timeout, ActivationEntityLimit.MAX_ACTIVATION_ENTITY_LIMIT)
       httpConnection = Some(conn)
       conn
     }
diff --git a/tests/src/test/scala/whisk/core/limits/ActionLimitsTests.scala b/tests/src/test/scala/whisk/core/limits/ActionLimitsTests.scala
index b98b711e65..5c77f30ec9 100644
--- a/tests/src/test/scala/whisk/core/limits/ActionLimitsTests.scala
+++ b/tests/src/test/scala/whisk/core/limits/ActionLimitsTests.scala
@@ -118,6 +118,28 @@ class ActionLimitsTests extends TestHelpers with WskTestHelpers {
       }
   }
 
+  it should s"successfully invoke an action with a payload close to the limit (${ActivationEntityLimit.MAX_ACTIVATION_ENTITY_LIMIT.toMB} MB)" in withAssetCleaner(
+    wskprops) { (wp, assetHelper) =>
+    val name = "TestActionCausingJustInBoundaryResult"
+    assetHelper.withCleaner(wsk.action, name) {
+      val actionName = TestUtils.getTestActionFilename("echo.js")
+      (action, _) =>
+        action.create(name, Some(actionName), timeout = Some(15.seconds))
+    }
+
+    val allowedSize = ActivationEntityLimit.MAX_ACTIVATION_ENTITY_LIMIT.toBytes
+
+    // Needs some bytes grace since activation message is not only the payload.
+    val args = Map("p" -> ("a" * (allowedSize - 700).toInt).toJson)
+    val rr = wsk.action.invoke(name, args, blocking = true, expectedExitCode = TestUtils.SUCCESS_EXIT)
+    val activation = wsk.parseJsonString(rr.respData).convertTo[ActivationResult]
+
+    activation.response.success shouldBe true
+
+    // The payload is echoed and thus the backchannel supports the limit as well.
+    activation.response.result shouldBe Some(args.toJson)
+  }
+
   Seq(true, false).foreach { blocking =>
     it should s"succeed but truncate result, if result exceeds its limit (blocking: $blocking)" in withAssetCleaner(
       wskprops) { (wp, assetHelper) =>


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services