You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2020/09/02 09:08:52 UTC
[incubator-streampipes] branch dev updated: [STREAMPIPES-130] Add max.request.size to Kafka producer config
This is an automated email from the ASF dual-hosted git repository.
riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
The following commit(s) were added to refs/heads/dev by this push:
new cccdfb4 [STREAMPIPES-130] Add max.request.size to Kafka producer config
cccdfb4 is described below
commit cccdfb405a821d0b9f08a04f63b6a7476f71a40b
Author: Dominik Riemer <ri...@fzi.de>
AuthorDate: Wed Sep 2 11:06:09 2020 +0200
[STREAMPIPES-130] Add max.request.size to Kafka producer config
---
.../messaging/kafka/config/ProducerConfigFactory.java | 3 +++
.../model/grounding/KafkaTransportProtocol.java | 18 ++++++++++++++++++
.../org/apache/streampipes/vocabulary/StreamPipes.java | 2 ++
3 files changed, 23 insertions(+)
diff --git a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/ProducerConfigFactory.java b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/ProducerConfigFactory.java
index e954759..5d18650 100644
--- a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/ProducerConfigFactory.java
+++ b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/ProducerConfigFactory.java
@@ -29,6 +29,7 @@ public class ProducerConfigFactory extends AbstractConfigFactory {
private static final Integer BATCH_SIZE_CONFIG_DEFAULT = 1638400;
private static final Integer LINGER_MS_DEFAULT = 20;
private static final Integer BUFFER_MEMORY_CONFIG_DEFAULT = 33554432;
+ private static final Integer MAX_REQUEST_SIZE_CONFIG_DEFAULT = 5000012;
private static final String KEY_SERIALIZER_DEFAULT = "org.apache.kafka.common.serialization" +
".StringSerializer";
@@ -51,6 +52,8 @@ public class ProducerConfigFactory extends AbstractConfigFactory {
getConfigOrDefault(protocol::getBatchSize, BATCH_SIZE_CONFIG_DEFAULT));
props.put(ProducerConfig.LINGER_MS_CONFIG,
getConfigOrDefault(protocol::getLingerMs, LINGER_MS_DEFAULT));
+ props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, getConfigOrDefault(protocol::getMaxRequestSize,
+ MAX_REQUEST_SIZE_CONFIG_DEFAULT));
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, BUFFER_MEMORY_CONFIG_DEFAULT);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KEY_SERIALIZER_DEFAULT);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, VALUE_SERIALIZER_DEFAULT);
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/grounding/KafkaTransportProtocol.java b/streampipes-model/src/main/java/org/apache/streampipes/model/grounding/KafkaTransportProtocol.java
index 0f56e8f..9e91127 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/grounding/KafkaTransportProtocol.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/grounding/KafkaTransportProtocol.java
@@ -45,6 +45,9 @@ public class KafkaTransportProtocol extends TransportProtocol {
@RdfProperty(StreamPipes.KAFKA_MESSAGE_MAX_BYTES)
private String messageMaxBytes;
+ @RdfProperty(StreamPipes.KAFKA_MAX_REQUEST_SIZE)
+ private String maxRequestSize;
+
@RdfProperty(StreamPipes.KAFKA_ACKS)
private String acks;
@@ -76,6 +79,13 @@ public class KafkaTransportProtocol extends TransportProtocol {
this.kafkaPort = other.getKafkaPort();
this.zookeeperHost = other.getZookeeperHost();
this.zookeeperPort = other.getZookeeperPort();
+ this.acks = other.getAcks();
+ this.batchSize = other.getBatchSize();
+ this.groupId = other.getGroupId();
+ this.lingerMs = other.getLingerMs();
+ this.maxRequestSize = other.getMaxRequestSize();
+ this.messageMaxBytes = other.getMessageMaxBytes();
+ this.offset = other.getOffset();
}
public KafkaTransportProtocol(String kafkaHost, Integer kafkaPort, WildcardTopicDefinition wildcardTopicDefinition) {
@@ -164,4 +174,12 @@ public class KafkaTransportProtocol extends TransportProtocol {
public void setGroupId(String groupId) {
this.groupId = groupId;
}
+
+ public String getMaxRequestSize() {
+ return maxRequestSize;
+ }
+
+ public void setMaxRequestSize(String maxRequestSize) {
+ this.maxRequestSize = maxRequestSize;
+ }
}
diff --git a/streampipes-vocabulary/src/main/java/org/apache/streampipes/vocabulary/StreamPipes.java b/streampipes-vocabulary/src/main/java/org/apache/streampipes/vocabulary/StreamPipes.java
index fb65a75..8b78f48 100644
--- a/streampipes-vocabulary/src/main/java/org/apache/streampipes/vocabulary/StreamPipes.java
+++ b/streampipes-vocabulary/src/main/java/org/apache/streampipes/vocabulary/StreamPipes.java
@@ -162,6 +162,7 @@ public class StreamPipes {
public static final String KAFKA_BATCH_SIZE = NS + "kafkaBatchSize";
public static final String KAFKA_OFFSET = NS + "kafkaOffset";
public static final String KAFKA_GROUP_ID = NS + "kafkaGroupId";
+ public static final String KAFKA_MAX_REQUEST_SIZE = NS + "kafkaMaxRequestSize";
public static final String ERROR_TOPIC = NS + "errorTopic";
public static final String STATS_TOPIC = NS + "statsTopic";
@@ -396,4 +397,5 @@ public class StreamPipes {
public static final String HAS_CODE_TEMPLATE = NS + "hasCodeTemplate";
public static final String USER_DEFINED_OUTPUT_STRATEGY = NS + "UserDefinedOutputStrategy";
public static final String PE_CONFIGURED = NS + "isPeConfigured" ;
+
}