You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2016/10/21 14:28:09 UTC
incubator-eagle git commit: Minor: Expose several async producer
config
Repository: incubator-eagle
Updated Branches:
refs/heads/master 281aa52b0 -> 3980dcb4c
Minor: Expose several async producer config
Author: Zhao, Qingwen <qi...@apache.org>
Closes #549 from qingwen220/kafkaProducer.
Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/3980dcb4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/3980dcb4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/3980dcb4
Branch: refs/heads/master
Commit: 3980dcb4c34f024e8247747a48f5966438415c82
Parents: 281aa52
Author: Zhao, Qingwen <qi...@apache.org>
Authored: Fri Oct 21 22:27:55 2016 +0800
Committer: Hao Chen <ha...@apache.org>
Committed: Fri Oct 21 22:27:55 2016 +0800
----------------------------------------------------------------------
.../apache/eagle/app/sink/KafkaStreamSink.java | 10 ++++++
.../eagle/app/sink/KafkaStreamSinkConfig.java | 36 ++++++++++++++++++++
...ecurity.auditlog.HdfsAuditLogAppProvider.xml | 25 ++++++++++++++
.../src/main/resources/application.conf | 4 +++
4 files changed, 75 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3980dcb4/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSink.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSink.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSink.java
index 2ac4779..e2a4b70 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSink.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSink.java
@@ -49,6 +49,11 @@ public class KafkaStreamSink extends StormStreamSink<KafkaStreamSinkConfig> {
properties.put("metadata.broker.list", config.getBrokerList());
properties.put("serializer.class", config.getSerializerClass());
properties.put("key.serializer.class", config.getKeySerializerClass());
+ // new added properties for async producer
+ properties.put("producer.type", config.getProducerType());
+ properties.put("batch.num.messages", config.getNumBatchMessages());
+ properties.put("request.required.acks", config.getRequestRequiredAcks());
+ properties.put("queue.buffering.max.ms", config.getMaxQueueBufferMs());
ProducerConfig producerConfig = new ProducerConfig(properties);
producer = new Producer(producerConfig);
}
@@ -97,6 +102,11 @@ public class KafkaStreamSink extends StormStreamSink<KafkaStreamSinkConfig> {
desc.setBrokerList(config.getString("dataSinkConfig.brokerList"));
desc.setSerializerClass(config.getString("dataSinkConfig.serializerClass"));
desc.setKeySerializerClass(config.getString("dataSinkConfig.keySerializerClass"));
+ // new added properties for async producer
+ desc.setNumBatchMessages(config.getString("dataSinkConfig.numBatchMessages"));
+ desc.setProducerType(config.getString("dataSinkConfig.producerType"));
+ desc.setMaxQueueBufferMs(config.getString("dataSinkConfig.maxQueueBufferMs"));
+ desc.setRequestRequiredAcks(config.getString("dataSinkConfig.requestRequiredAcks"));
return desc;
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3980dcb4/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSinkConfig.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSinkConfig.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSinkConfig.java
index 9d6a0ab..d5479df 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSinkConfig.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSinkConfig.java
@@ -23,6 +23,10 @@ public class KafkaStreamSinkConfig implements StreamSinkConfig {
private String brokerList;
private String serializerClass;
private String keySerializerClass;
+ private String numBatchMessages;
+ private String maxQueueBufferMs;
+ private String producerType;
+ private String requestRequiredAcks;
public String getTopicId() {
return topicId;
@@ -56,6 +60,38 @@ public class KafkaStreamSinkConfig implements StreamSinkConfig {
this.keySerializerClass = keySerializerClass;
}
+ public String getNumBatchMessages() {
+ return numBatchMessages;
+ }
+
+ public void setNumBatchMessages(String numBatchMessages) {
+ this.numBatchMessages = numBatchMessages;
+ }
+
+ public String getMaxQueueBufferMs() {
+ return maxQueueBufferMs;
+ }
+
+ public void setMaxQueueBufferMs(String maxQueueBufferMs) {
+ this.maxQueueBufferMs = maxQueueBufferMs;
+ }
+
+ public String getProducerType() {
+ return producerType;
+ }
+
+ public void setProducerType(String producerType) {
+ this.producerType = producerType;
+ }
+
+ public String getRequestRequiredAcks() {
+ return requestRequiredAcks;
+ }
+
+ public void setRequestRequiredAcks(String requestRequiredAcks) {
+ this.requestRequiredAcks = requestRequiredAcks;
+ }
+
@Override
public String getType() {
return "KAFKA";
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3980dcb4/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml
index 9f10fdc..2c02a8f 100644
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml
@@ -149,6 +149,31 @@
<description>serializer class Kafka message key</description>
</property>
+ <property>
+ <name>dataSinkConfig.producerType</name>
+ <displayName>dataSinkConfig.keySerializerClass</displayName>
+ <value>async</value>
+ <description>whether the messages are sent asynchronously in a background thread</description>
+ </property>
+ <property>
+ <name>dataSinkConfig.numBatchMessages</name>
+ <displayName>dataSinkConfig.numBatchMessages</displayName>
+ <value>4096</value>
+ <description>number of messages to send in one batch when using async mode</description>
+ </property>
+ <property>
+ <name>dataSinkConfig.maxQueueBufferMs</name>
+ <displayName>dataSinkConfig.maxQueueBufferMs</displayName>
+ <value>5000</value>
+ <description>maximum time to buffer data when using async mode</description>
+ </property>
+ <property>
+ <name>dataSinkConfig.requestRequiredAcks</name>
+ <displayName>dataSinkConfig.requestRequiredAcks</displayName>
+ <value>0</value>
+ <description>value controls when a produce request is considered completed</description>
+ </property>
+
<!-- web app related configurations -->
<property>
<name>fs.defaultFS</name>
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3980dcb4/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/application.conf b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/application.conf
index c8bbcb1..be357ad 100644
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/application.conf
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/application.conf
@@ -41,5 +41,9 @@
"brokerList" : "server.eagle.apache.org:6667",
"serializerClass" : "kafka.serializer.StringEncoder",
"keySerializerClass" : "kafka.serializer.StringEncoder"
+ "producerType" : "async",
+ "numBatchMessages" : "4096",
+ "maxQueueBufferMs" : "5000",
+ "requestRequiredAcks" : "0"
}
}