You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text copy.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2020/09/02 09:08:52 UTC

[incubator-streampipes] branch dev updated: [STREAMPIPES-130] Add max.request.size to Kafka producer config

This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git


The following commit(s) were added to refs/heads/dev by this push:
     new cccdfb4  [STREAMPIPES-130] Add max.request.size to Kafka producer config
cccdfb4 is described below

commit cccdfb405a821d0b9f08a04f63b6a7476f71a40b
Author: Dominik Riemer <ri...@fzi.de>
AuthorDate: Wed Sep 2 11:06:09 2020 +0200

    [STREAMPIPES-130] Add max.request.size to Kafka producer config
---
 .../messaging/kafka/config/ProducerConfigFactory.java  |  3 +++
 .../model/grounding/KafkaTransportProtocol.java        | 18 ++++++++++++++++++
 .../org/apache/streampipes/vocabulary/StreamPipes.java |  2 ++
 3 files changed, 23 insertions(+)

diff --git a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/ProducerConfigFactory.java b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/ProducerConfigFactory.java
index e954759..5d18650 100644
--- a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/ProducerConfigFactory.java
+++ b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/ProducerConfigFactory.java
@@ -29,6 +29,7 @@ public class ProducerConfigFactory extends AbstractConfigFactory {
   private static final Integer BATCH_SIZE_CONFIG_DEFAULT = 1638400;
   private static final Integer LINGER_MS_DEFAULT = 20;
   private static final Integer BUFFER_MEMORY_CONFIG_DEFAULT = 33554432;
+  private static final Integer MAX_REQUEST_SIZE_CONFIG_DEFAULT = 5000012;
 
   private static final String KEY_SERIALIZER_DEFAULT = "org.apache.kafka.common.serialization" +
           ".StringSerializer";
@@ -51,6 +52,8 @@ public class ProducerConfigFactory extends AbstractConfigFactory {
             getConfigOrDefault(protocol::getBatchSize, BATCH_SIZE_CONFIG_DEFAULT));
     props.put(ProducerConfig.LINGER_MS_CONFIG,
             getConfigOrDefault(protocol::getLingerMs, LINGER_MS_DEFAULT));
+    props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, getConfigOrDefault(protocol::getMaxRequestSize,
+            MAX_REQUEST_SIZE_CONFIG_DEFAULT));
     props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, BUFFER_MEMORY_CONFIG_DEFAULT);
     props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KEY_SERIALIZER_DEFAULT);
     props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, VALUE_SERIALIZER_DEFAULT);
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/grounding/KafkaTransportProtocol.java b/streampipes-model/src/main/java/org/apache/streampipes/model/grounding/KafkaTransportProtocol.java
index 0f56e8f..9e91127 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/grounding/KafkaTransportProtocol.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/grounding/KafkaTransportProtocol.java
@@ -45,6 +45,9 @@ public class KafkaTransportProtocol extends TransportProtocol {
   @RdfProperty(StreamPipes.KAFKA_MESSAGE_MAX_BYTES)
   private String messageMaxBytes;
 
+  @RdfProperty(StreamPipes.KAFKA_MAX_REQUEST_SIZE)
+  private String maxRequestSize;
+
   @RdfProperty(StreamPipes.KAFKA_ACKS)
   private String acks;
 
@@ -76,6 +79,13 @@ public class KafkaTransportProtocol extends TransportProtocol {
     this.kafkaPort = other.getKafkaPort();
     this.zookeeperHost = other.getZookeeperHost();
     this.zookeeperPort = other.getZookeeperPort();
+    this.acks = other.getAcks();
+    this.batchSize = other.getBatchSize();
+    this.groupId = other.getGroupId();
+    this.lingerMs = other.getLingerMs();
+    this.maxRequestSize = other.getMaxRequestSize();
+    this.messageMaxBytes = other.getMessageMaxBytes();
+    this.offset = other.getOffset();
   }
 
   public KafkaTransportProtocol(String kafkaHost, Integer kafkaPort, WildcardTopicDefinition wildcardTopicDefinition) {
@@ -164,4 +174,12 @@ public class KafkaTransportProtocol extends TransportProtocol {
   public void setGroupId(String groupId) {
     this.groupId = groupId;
   }
+
+  public String getMaxRequestSize() {
+    return maxRequestSize;
+  }
+
+  public void setMaxRequestSize(String maxRequestSize) {
+    this.maxRequestSize = maxRequestSize;
+  }
 }
diff --git a/streampipes-vocabulary/src/main/java/org/apache/streampipes/vocabulary/StreamPipes.java b/streampipes-vocabulary/src/main/java/org/apache/streampipes/vocabulary/StreamPipes.java
index fb65a75..8b78f48 100644
--- a/streampipes-vocabulary/src/main/java/org/apache/streampipes/vocabulary/StreamPipes.java
+++ b/streampipes-vocabulary/src/main/java/org/apache/streampipes/vocabulary/StreamPipes.java
@@ -162,6 +162,7 @@ public class StreamPipes {
   public static final String KAFKA_BATCH_SIZE = NS + "kafkaBatchSize";
   public static final String KAFKA_OFFSET = NS + "kafkaOffset";
   public static final String KAFKA_GROUP_ID = NS + "kafkaGroupId";
+  public static final String KAFKA_MAX_REQUEST_SIZE = NS + "kafkaMaxRequestSize";
 
   public static final String ERROR_TOPIC = NS + "errorTopic";
   public static final String STATS_TOPIC = NS + "statsTopic";
@@ -396,4 +397,5 @@ public class StreamPipes {
   public static final String HAS_CODE_TEMPLATE = NS + "hasCodeTemplate";
   public static final String USER_DEFINED_OUTPUT_STRATEGY = NS + "UserDefinedOutputStrategy";
   public static final String PE_CONFIGURED = NS + "isPeConfigured" ;
+
 }