You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ra...@apache.org on 2018/01/23 17:19:58 UTC
[incubator-openwhisk] branch master updated: Make payload limit
configurable. (#3209)
This is an automated email from the ASF dual-hosted git repository.
rabbah pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new a891e06 Make payload limit configurable. (#3209)
a891e06 is described below
commit a891e067157848103df56c92daa3a143666425b5
Author: Markus Thömmes <ma...@me.com>
AuthorDate: Tue Jan 23 18:19:55 2018 +0100
Make payload limit configurable. (#3209)
Lifts hardcoded activation payload and size to a deployment parameter.
---
ansible/roles/controller/tasks/deploy.yml | 2 ++
ansible/roles/invoker/tasks/deploy.yml | 1 +
common/scala/src/main/resources/application.conf | 19 ++++++++++++++++---
.../connector/kafka/KafkaMessagingProvider.scala | 22 +++++++++++-----------
.../connector/kafka/KafkaProducerConnector.scala | 8 +++++++-
.../src/main/scala/whisk/core/WhiskConfig.scala | 3 +++
.../whisk/core/entity/ActivationEntityLimit.scala | 11 +++++++++--
.../containerpool/docker/DockerContainer.scala | 4 ++--
.../whisk/core/limits/ActionLimitsTests.scala | 22 ++++++++++++++++++++++
9 files changed, 73 insertions(+), 19 deletions(-)
diff --git a/ansible/roles/controller/tasks/deploy.yml b/ansible/roles/controller/tasks/deploy.yml
index 671aa17..e673438 100644
--- a/ansible/roles/controller/tasks/deploy.yml
+++ b/ansible/roles/controller/tasks/deploy.yml
@@ -122,6 +122,8 @@
"CONFIG_whisk_memory_max": "{{ limit_action_memory_max | default() }}"
"CONFIG_whisk_memory_std": "{{ limit_action_memory_std | default() }}"
+ "CONFIG_whisk_activation_payload_max": "{{ limit_activation_payload | default() }}"
+
"RUNTIMES_MANIFEST": "{{ runtimesManifest | to_json }}"
"CONTROLLER_LOCALBOOKKEEPING": "{{ controller.localBookkeeping }}"
"AKKA_CLUSTER_SEED_NODES": "{{seed_nodes_list | join(' ') }}"
diff --git a/ansible/roles/invoker/tasks/deploy.yml b/ansible/roles/invoker/tasks/deploy.yml
index f274adc..c215e39 100644
--- a/ansible/roles/invoker/tasks/deploy.yml
+++ b/ansible/roles/invoker/tasks/deploy.yml
@@ -187,6 +187,7 @@
-e CONFIG_whisk_memory_min='{{ limit_action_memory_min | default() }}'
-e CONFIG_whisk_memory_max='{{ limit_action_memory_max | default() }}'
-e CONFIG_whisk_memory_std='{{ limit_action_memory_std | default() }}'
+ -e CONFIG_whisk_activation_payload_max='{{ limit_activation_payload | default() }}'
-v /sys/fs/cgroup:/sys/fs/cgroup
-v /run/runc:/run/runc
-v {{ whisk_logs_dir }}/invoker{{ groups['invokers'].index(inventory_hostname) }}:/logs
diff --git a/common/scala/src/main/resources/application.conf b/common/scala/src/main/resources/application.conf
index 0288106..97b1e63 100644
--- a/common/scala/src/main/resources/application.conf
+++ b/common/scala/src/main/resources/application.conf
@@ -53,6 +53,12 @@ whisk {
# kafka related configuration
kafka {
replication-factor = 1
+
+ producer {
+ acks = 1
+ max-request-size = ${whisk.activation.payload.max}
+ }
+
topics {
cache-invalidation {
segment-bytes = 536870912
@@ -70,9 +76,10 @@ whisk {
retention-ms = 3600000
}
invoker {
- segment-bytes = 536870912
- retention-bytes = 1073741824
- retention-ms = 172800000
+ segment-bytes = 536870912
+ retention-bytes = 1073741824
+ retention-ms = 172800000
+ max-message-bytes = ${whisk.activation.payload.max}
}
}
}
@@ -88,6 +95,12 @@ whisk {
stride = ${?CONTROLLER_INSTANCES}
}
+ activation {
+ payload {
+ max = 1048576 // 5 m not possible because cross-referenced to kafka configurations
+ }
+ }
+
# action memory configuration
memory {
min = 128 m
diff --git a/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala b/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala
index 7fd82c5..ac148cd 100644
--- a/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala
+++ b/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala
@@ -40,15 +40,6 @@ import pureconfig._
case class KafkaConfig(replicationFactor: Short)
-case class TopicConfig(segmentBytes: Long, retentionBytes: Long, retentionMs: Long) {
- def toMap: Map[String, String] = {
- Map(
- "retention.bytes" -> retentionBytes.toString,
- "retention.ms" -> retentionMs.toString,
- "segment.bytes" -> segmentBytes.toString)
- }
-}
-
/**
* A Kafka based implementation of MessagingProvider
*/
@@ -62,12 +53,13 @@ object KafkaMessagingProvider extends MessagingProvider {
def ensureTopic(config: WhiskConfig, topic: String, topicConfig: String)(implicit logging: Logging): Boolean = {
val kc = loadConfigOrThrow[KafkaConfig](ConfigKeys.kafka)
- val tc = loadConfigOrThrow[TopicConfig](ConfigKeys.kafkaTopics + s".$topicConfig")
+ val tc = KafkaConfiguration.configMapToKafkaConfig(
+ loadConfigOrThrow[Map[String, String]](ConfigKeys.kafkaTopics + s".$topicConfig"))
val props = new Properties
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, config.kafkaHosts)
val client = AdminClient.create(props)
val numPartitions = 1
- val nt = new NewTopic(topic, numPartitions, kc.replicationFactor).configs(tc.toMap.asJava)
+ val nt = new NewTopic(topic, numPartitions, kc.replicationFactor).configs(tc.asJava)
val results = client.createTopics(List(nt).asJava)
try {
results.values().get(topic).get()
@@ -85,3 +77,11 @@ object KafkaMessagingProvider extends MessagingProvider {
}
}
}
+
+object KafkaConfiguration {
+ def configToKafkaKey(configKey: String) = configKey.replace("-", ".")
+
+ def configMapToKafkaConfig(configMap: Map[String, String]) = configMap.map {
+ case (key, value) => configToKafkaKey(key) -> value
+ }
+}
diff --git a/common/scala/src/main/scala/whisk/connector/kafka/KafkaProducerConnector.scala b/common/scala/src/main/scala/whisk/connector/kafka/KafkaProducerConnector.scala
index 18b3824..47dce3a 100644
--- a/common/scala/src/main/scala/whisk/connector/kafka/KafkaProducerConnector.scala
+++ b/common/scala/src/main/scala/whisk/connector/kafka/KafkaProducerConnector.scala
@@ -36,6 +36,8 @@ import whisk.common.Logging
import whisk.core.connector.Message
import whisk.core.connector.MessageProducer
import whisk.core.entity.UUIDs
+import pureconfig._
+import whisk.core.ConfigKeys
class KafkaProducerConnector(kafkahosts: String,
implicit val executionContext: ExecutionContext,
@@ -86,7 +88,11 @@ class KafkaProducerConnector(kafkahosts: String,
private def getProps: Properties = {
val props = new Properties
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkahosts)
- props.put(ProducerConfig.ACKS_CONFIG, 1.toString)
+
+ // Load additional config from the config files and add them here.
+ val config =
+ KafkaConfiguration.configMapToKafkaConfig(loadConfigOrThrow[Map[String, String]](ConfigKeys.kafkaProducer))
+ config.foreach { case (key, value) => props.put(key, value) }
props
}
diff --git a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
index 61b7f2a..b306cdb 100644
--- a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
+++ b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
@@ -233,9 +233,12 @@ object ConfigKeys {
val loadbalancer = "whisk.loadbalancer"
val kafka = "whisk.kafka"
+ val kafkaProducer = s"$kafka.producer"
val kafkaTopics = s"$kafka.topics"
val memory = "whisk.memory"
+ val activation = "whisk.activation"
+ val activationPayload = s"$activation.payload"
val db = "whisk.db"
diff --git a/common/scala/src/main/scala/whisk/core/entity/ActivationEntityLimit.scala b/common/scala/src/main/scala/whisk/core/entity/ActivationEntityLimit.scala
index 5c149db..4332a3b 100644
--- a/common/scala/src/main/scala/whisk/core/entity/ActivationEntityLimit.scala
+++ b/common/scala/src/main/scala/whisk/core/entity/ActivationEntityLimit.scala
@@ -17,7 +17,11 @@
package whisk.core.entity
-import whisk.core.entity.size.SizeInt
+import pureconfig._
+import whisk.core.ConfigKeys
+import whisk.core.entity.size.SizeLong
+
+case class ActivationEntityLimitConf(max: ByteSize)
/**
* ActivationEntityLimit defines the limits on the input/output payloads for actions
@@ -25,5 +29,8 @@ import whisk.core.entity.size.SizeInt
* parameters for triggers.
*/
protected[core] object ActivationEntityLimit {
- protected[core] val MAX_ACTIVATION_ENTITY_LIMIT = 1.MB
+ private implicit val pureconfigLongReader: ConfigReader[ByteSize] = ConfigReader[Long].map(_.bytes)
+ private val config = loadConfigOrThrow[ActivationEntityLimitConf](ConfigKeys.activationPayload)
+
+ protected[core] val MAX_ACTIVATION_ENTITY_LIMIT: ByteSize = config.max
}
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala
index a4c3418..265a450 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala
@@ -31,7 +31,7 @@ import whisk.common.Logging
import whisk.common.TransactionId
import whisk.core.containerpool._
import whisk.core.entity.ActivationResponse.{ConnectionError, MemoryExhausted}
-import whisk.core.entity.ByteSize
+import whisk.core.entity.{ActivationEntityLimit, ByteSize}
import whisk.core.entity.size._
import akka.stream.scaladsl.{Framing, Source}
import akka.stream.stage._
@@ -189,7 +189,7 @@ class DockerContainer(protected val id: ContainerId,
implicit transid: TransactionId): Future[RunResult] = {
val started = Instant.now()
val http = httpConnection.getOrElse {
- val conn = new HttpUtils(s"${addr.host}:${addr.port}", timeout, 1.MB)
+ val conn = new HttpUtils(s"${addr.host}:${addr.port}", timeout, ActivationEntityLimit.MAX_ACTIVATION_ENTITY_LIMIT)
httpConnection = Some(conn)
conn
}
diff --git a/tests/src/test/scala/whisk/core/limits/ActionLimitsTests.scala b/tests/src/test/scala/whisk/core/limits/ActionLimitsTests.scala
index b98b711..5c77f30 100644
--- a/tests/src/test/scala/whisk/core/limits/ActionLimitsTests.scala
+++ b/tests/src/test/scala/whisk/core/limits/ActionLimitsTests.scala
@@ -118,6 +118,28 @@ class ActionLimitsTests extends TestHelpers with WskTestHelpers {
}
}
+ it should s"successfully invoke an action with a payload close to the limit (${ActivationEntityLimit.MAX_ACTIVATION_ENTITY_LIMIT.toMB} MB)" in withAssetCleaner(
+ wskprops) { (wp, assetHelper) =>
+ val name = "TestActionCausingJustInBoundaryResult"
+ assetHelper.withCleaner(wsk.action, name) {
+ val actionName = TestUtils.getTestActionFilename("echo.js")
+ (action, _) =>
+ action.create(name, Some(actionName), timeout = Some(15.seconds))
+ }
+
+ val allowedSize = ActivationEntityLimit.MAX_ACTIVATION_ENTITY_LIMIT.toBytes
+
+ // Needs some bytes grace since activation message is not only the payload.
+ val args = Map("p" -> ("a" * (allowedSize - 700).toInt).toJson)
+ val rr = wsk.action.invoke(name, args, blocking = true, expectedExitCode = TestUtils.SUCCESS_EXIT)
+ val activation = wsk.parseJsonString(rr.respData).convertTo[ActivationResult]
+
+ activation.response.success shouldBe true
+
+ // The payload is echoed and thus the backchannel supports the limit as well.
+ activation.response.result shouldBe Some(args.toJson)
+ }
+
Seq(true, false).foreach { blocking =>
it should s"succeed but truncate result, if result exceeds its limit (blocking: $blocking)" in withAssetCleaner(
wskprops) { (wp, assetHelper) =>
--
To stop receiving notification emails like this one, please contact
rabbah@apache.org.