You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2016/10/21 14:28:09 UTC

incubator-eagle git commit: Minor: Expose several async producer config

Repository: incubator-eagle
Updated Branches:
  refs/heads/master 281aa52b0 -> 3980dcb4c


Minor: Expose several async producer config

Author: Zhao, Qingwen <qi...@apache.org>

Closes #549 from qingwen220/kafkaProducer.


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/3980dcb4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/3980dcb4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/3980dcb4

Branch: refs/heads/master
Commit: 3980dcb4c34f024e8247747a48f5966438415c82
Parents: 281aa52
Author: Zhao, Qingwen <qi...@apache.org>
Authored: Fri Oct 21 22:27:55 2016 +0800
Committer: Hao Chen <ha...@apache.org>
Committed: Fri Oct 21 22:27:55 2016 +0800

----------------------------------------------------------------------
 .../apache/eagle/app/sink/KafkaStreamSink.java  | 10 ++++++
 .../eagle/app/sink/KafkaStreamSinkConfig.java   | 36 ++++++++++++++++++++
 ...ecurity.auditlog.HdfsAuditLogAppProvider.xml | 25 ++++++++++++++
 .../src/main/resources/application.conf         |  4 +++
 4 files changed, 75 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3980dcb4/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSink.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSink.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSink.java
index 2ac4779..e2a4b70 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSink.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSink.java
@@ -49,6 +49,11 @@ public class KafkaStreamSink extends StormStreamSink<KafkaStreamSinkConfig> {
         properties.put("metadata.broker.list", config.getBrokerList());
         properties.put("serializer.class", config.getSerializerClass());
         properties.put("key.serializer.class", config.getKeySerializerClass());
+        // newly added properties for the async producer
+        properties.put("producer.type", config.getProducerType());
+        properties.put("batch.num.messages", config.getNumBatchMessages());
+        properties.put("request.required.acks", config.getRequestRequiredAcks());
+        properties.put("queue.buffering.max.ms", config.getMaxQueueBufferMs());
         ProducerConfig producerConfig = new ProducerConfig(properties);
         producer = new Producer(producerConfig);
     }
@@ -97,6 +102,11 @@ public class KafkaStreamSink extends StormStreamSink<KafkaStreamSinkConfig> {
             desc.setBrokerList(config.getString("dataSinkConfig.brokerList"));
             desc.setSerializerClass(config.getString("dataSinkConfig.serializerClass"));
             desc.setKeySerializerClass(config.getString("dataSinkConfig.keySerializerClass"));
+            // newly added properties for the async producer
+            desc.setNumBatchMessages(config.getString("dataSinkConfig.numBatchMessages"));
+            desc.setProducerType(config.getString("dataSinkConfig.producerType"));
+            desc.setMaxQueueBufferMs(config.getString("dataSinkConfig.maxQueueBufferMs"));
+            desc.setRequestRequiredAcks(config.getString("dataSinkConfig.requestRequiredAcks"));
             return desc;
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3980dcb4/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSinkConfig.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSinkConfig.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSinkConfig.java
index 9d6a0ab..d5479df 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSinkConfig.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSinkConfig.java
@@ -23,6 +23,10 @@ public class KafkaStreamSinkConfig implements StreamSinkConfig {
     private String brokerList;
     private String serializerClass;
     private String keySerializerClass;
+    private String numBatchMessages;
+    private String maxQueueBufferMs;
+    private String producerType;
+    private String requestRequiredAcks;
 
     public String getTopicId() {
         return topicId;
@@ -56,6 +60,38 @@ public class KafkaStreamSinkConfig implements StreamSinkConfig {
         this.keySerializerClass = keySerializerClass;
     }
 
+    public String getNumBatchMessages() {
+        return numBatchMessages;
+    }
+
+    public void setNumBatchMessages(String numBatchMessages) {
+        this.numBatchMessages = numBatchMessages;
+    }
+
+    public String getMaxQueueBufferMs() {
+        return maxQueueBufferMs;
+    }
+
+    public void setMaxQueueBufferMs(String maxQueueBufferMs) {
+        this.maxQueueBufferMs = maxQueueBufferMs;
+    }
+
+    public String getProducerType() {
+        return producerType;
+    }
+
+    public void setProducerType(String producerType) {
+        this.producerType = producerType;
+    }
+
+    public String getRequestRequiredAcks() {
+        return requestRequiredAcks;
+    }
+
+    public void setRequestRequiredAcks(String requestRequiredAcks) {
+        this.requestRequiredAcks = requestRequiredAcks;
+    }
+
     @Override
     public String getType() {
         return "KAFKA";

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3980dcb4/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml
index 9f10fdc..2c02a8f 100644
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml
@@ -149,6 +149,31 @@
             <description>serializer class Kafka message key</description>
         </property>
 
+        <property>
+            <name>dataSinkConfig.producerType</name>
+            <displayName>dataSinkConfig.producerType</displayName>
+            <value>async</value>
+            <description>whether the messages are sent asynchronously in a background thread</description>
+        </property>
+        <property>
+            <name>dataSinkConfig.numBatchMessages</name>
+            <displayName>dataSinkConfig.numBatchMessages</displayName>
+            <value>4096</value>
+            <description>number of messages to send in one batch when using async mode</description>
+        </property>
+        <property>
+            <name>dataSinkConfig.maxQueueBufferMs</name>
+            <displayName>dataSinkConfig.maxQueueBufferMs</displayName>
+            <value>5000</value>
+            <description>maximum time in milliseconds to buffer data when using async mode</description>
+        </property>
+        <property>
+            <name>dataSinkConfig.requestRequiredAcks</name>
+            <displayName>dataSinkConfig.requestRequiredAcks</displayName>
+            <value>0</value>
+            <description>controls when a produce request is considered complete: 0 = no acknowledgement, 1 = leader acknowledgement, -1 = all in-sync replicas</description>
+        </property>
+
         <!-- web app related configurations -->
         <property>
             <name>fs.defaultFS</name>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3980dcb4/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/application.conf b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/application.conf
index c8bbcb1..be357ad 100644
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/application.conf
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/application.conf
@@ -41,5 +41,9 @@
     "brokerList" : "server.eagle.apache.org:6667",
     "serializerClass" : "kafka.serializer.StringEncoder",
     "keySerializerClass" : "kafka.serializer.StringEncoder"
+    "producerType" : "async",
+    "numBatchMessages" : "4096",
+    "maxQueueBufferMs" : "5000",
+    "requestRequiredAcks" : "0"
   }
 }