You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2020/03/27 09:50:39 UTC

[GitHub] [druid] yuanlihan opened a new pull request #9573: Fix some flaws of KafkaEmitter

yuanlihan opened a new pull request #9573: Fix some flaws of KafkaEmitter
URL: https://github.com/apache/druid/pull/9573
 
 
   
   ### Description
   There are some flaws in `KafkaEmitter`:
   1. annoying error logs(see [KAFKA-4854](https://issues.apache.org/jira/browse/KAFKA-4854)), like this 
   `2020-03-27T09:53:02,057 ERROR [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.producer.internals.RecordBatch - Error executing user-provided callback on message for topic-partition 'druid_metrics_test-0'
   java.lang.NullPointerException`
   2. the `start` method is called twice by the `LifecycleStart` framework
   3. redundant periodic scheduling of send tasks
   
   
   <hr>
   
   This PR has:
   - [x] been self-reviewed.
   - [x] been tested in a test Druid cluster.
   
   <hr>
   
   ##### Key changed class in this PR
    * `KafkaEmitter `
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] himanshug commented on a change in pull request #9573: Fix some flaws of KafkaEmitter

Posted by GitBox <gi...@apache.org>.
himanshug commented on a change in pull request #9573: Fix some flaws of KafkaEmitter
URL: https://github.com/apache/druid/pull/9573#discussion_r403854106
 
 

 ##########
 File path: extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java
 ##########
 @@ -130,21 +120,21 @@ public void start()
 
   private void sendMetricToKafka()
   {
-    sendToKafka(config.getMetricTopic(), metricQueue);
+    sendToKafka(config.getMetricTopic(), metricQueue, setProducerCallback(metricLost));
   }
 
   private void sendAlertToKafka()
   {
-    sendToKafka(config.getAlertTopic(), alertQueue);
+    sendToKafka(config.getAlertTopic(), alertQueue, setProducerCallback(alertLost));
   }
 
-  private void sendToKafka(final String topic, MemoryBoundLinkedBlockingQueue<String> recordQueue)
+  private void sendToKafka(final String topic, MemoryBoundLinkedBlockingQueue<String> recordQueue, Callback callback)
   {
     ObjectContainer<String> objectToSend;
     try {
       while (true) {
         objectToSend = recordQueue.take();
-        producer.send(new ProducerRecord<>(topic, objectToSend.getData()), producerCallback);
+        producer.send(new ProducerRecord<>(topic, objectToSend.getData()), callback);
       }
     }
     catch (InterruptedException e) {
 
 Review comment:
   ```suggestion
       catch (Throwable e) {
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] himanshug commented on a change in pull request #9573: Fix some flaws of KafkaEmitter

Posted by GitBox <gi...@apache.org>.
himanshug commented on a change in pull request #9573: Fix some flaws of KafkaEmitter
URL: https://github.com/apache/druid/pull/9573#discussion_r405966642
 
 

 ##########
 File path: extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java
 ##########
 @@ -130,24 +120,24 @@ public void start()
 
   private void sendMetricToKafka()
   {
-    sendToKafka(config.getMetricTopic(), metricQueue);
+    sendToKafka(config.getMetricTopic(), metricQueue, setProducerCallback(metricLost));
   }
 
   private void sendAlertToKafka()
   {
-    sendToKafka(config.getAlertTopic(), alertQueue);
+    sendToKafka(config.getAlertTopic(), alertQueue, setProducerCallback(alertLost));
   }
 
-  private void sendToKafka(final String topic, MemoryBoundLinkedBlockingQueue<String> recordQueue)
+  private void sendToKafka(final String topic, MemoryBoundLinkedBlockingQueue<String> recordQueue, Callback callback)
   {
     ObjectContainer<String> objectToSend;
     try {
       while (true) {
         objectToSend = recordQueue.take();
-        producer.send(new ProducerRecord<>(topic, objectToSend.getData()), producerCallback);
+        producer.send(new ProducerRecord<>(topic, objectToSend.getData()), callback);
       }
     }
-    catch (InterruptedException e) {
+    catch (Throwable e) {
       log.warn(e, "Failed to take record from queue!");
 
 Review comment:
   sounds reasonable, however I would still make slight log messaage change just in case.
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] himanshug merged pull request #9573: Fix some flaws of KafkaEmitter

Posted by GitBox <gi...@apache.org>.
himanshug merged pull request #9573: Fix some flaws of KafkaEmitter
URL: https://github.com/apache/druid/pull/9573
 
 
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] yuanlihan commented on a change in pull request #9573: Fix some flaws of KafkaEmitter

Posted by GitBox <gi...@apache.org>.
yuanlihan commented on a change in pull request #9573: Fix some flaws of KafkaEmitter
URL: https://github.com/apache/druid/pull/9573#discussion_r405903401
 
 

 ##########
 File path: extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java
 ##########
 @@ -130,24 +120,24 @@ public void start()
 
   private void sendMetricToKafka()
   {
-    sendToKafka(config.getMetricTopic(), metricQueue);
+    sendToKafka(config.getMetricTopic(), metricQueue, setProducerCallback(metricLost));
   }
 
   private void sendAlertToKafka()
   {
-    sendToKafka(config.getAlertTopic(), alertQueue);
+    sendToKafka(config.getAlertTopic(), alertQueue, setProducerCallback(alertLost));
   }
 
-  private void sendToKafka(final String topic, MemoryBoundLinkedBlockingQueue<String> recordQueue)
+  private void sendToKafka(final String topic, MemoryBoundLinkedBlockingQueue<String> recordQueue, Callback callback)
   {
     ObjectContainer<String> objectToSend;
     try {
       while (true) {
         objectToSend = recordQueue.take();
-        producer.send(new ProducerRecord<>(topic, objectToSend.getData()), producerCallback);
+        producer.send(new ProducerRecord<>(topic, objectToSend.getData()), callback);
       }
     }
-    catch (InterruptedException e) {
+    catch (Throwable e) {
       log.warn(e, "Failed to take record from queue!");
 
 Review comment:
   @himanshug, thanks for the review. As the potential exceptions from the producer are handled in the `callback`, so in my opinion, currently the `while(true) { ... }` block is already supposed to raised the `InterruptedException` only.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] himanshug commented on a change in pull request #9573: Fix some flaws of KafkaEmitter

Posted by GitBox <gi...@apache.org>.
himanshug commented on a change in pull request #9573: Fix some flaws of KafkaEmitter
URL: https://github.com/apache/druid/pull/9573#discussion_r405966622
 
 

 ##########
 File path: extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java
 ##########
 @@ -130,24 +120,24 @@ public void start()
 
   private void sendMetricToKafka()
   {
-    sendToKafka(config.getMetricTopic(), metricQueue);
+    sendToKafka(config.getMetricTopic(), metricQueue, setProducerCallback(metricLost));
   }
 
   private void sendAlertToKafka()
   {
-    sendToKafka(config.getAlertTopic(), alertQueue);
+    sendToKafka(config.getAlertTopic(), alertQueue, setProducerCallback(alertLost));
   }
 
-  private void sendToKafka(final String topic, MemoryBoundLinkedBlockingQueue<String> recordQueue)
+  private void sendToKafka(final String topic, MemoryBoundLinkedBlockingQueue<String> recordQueue, Callback callback)
   {
     ObjectContainer<String> objectToSend;
     try {
       while (true) {
         objectToSend = recordQueue.take();
-        producer.send(new ProducerRecord<>(topic, objectToSend.getData()), producerCallback);
+        producer.send(new ProducerRecord<>(topic, objectToSend.getData()), callback);
       }
     }
-    catch (InterruptedException e) {
+    catch (Throwable e) {
       log.warn(e, "Failed to take record from queue!");
 
 Review comment:
   ```suggestion
         log.warn(e, "Exception while getting record from queue or producer send, Events would not be emitted anymore.");
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] yuanlihan commented on a change in pull request #9573: Fix some flaws of KafkaEmitter

Posted by GitBox <gi...@apache.org>.
yuanlihan commented on a change in pull request #9573: Fix some flaws of KafkaEmitter
URL: https://github.com/apache/druid/pull/9573#discussion_r406565172
 
 

 ##########
 File path: extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java
 ##########
 @@ -130,24 +120,24 @@ public void start()
 
   private void sendMetricToKafka()
   {
-    sendToKafka(config.getMetricTopic(), metricQueue);
+    sendToKafka(config.getMetricTopic(), metricQueue, setProducerCallback(metricLost));
   }
 
   private void sendAlertToKafka()
   {
-    sendToKafka(config.getAlertTopic(), alertQueue);
+    sendToKafka(config.getAlertTopic(), alertQueue, setProducerCallback(alertLost));
   }
 
-  private void sendToKafka(final String topic, MemoryBoundLinkedBlockingQueue<String> recordQueue)
+  private void sendToKafka(final String topic, MemoryBoundLinkedBlockingQueue<String> recordQueue, Callback callback)
   {
     ObjectContainer<String> objectToSend;
     try {
       while (true) {
         objectToSend = recordQueue.take();
-        producer.send(new ProducerRecord<>(topic, objectToSend.getData()), producerCallback);
+        producer.send(new ProducerRecord<>(topic, objectToSend.getData()), callback);
       }
     }
-    catch (InterruptedException e) {
+    catch (Throwable e) {
       log.warn(e, "Failed to take record from queue!");
 
 Review comment:
   thanks @himanshug, it's more clear now.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] himanshug commented on a change in pull request #9573: Fix some flaws of KafkaEmitter

Posted by GitBox <gi...@apache.org>.
himanshug commented on a change in pull request #9573: Fix some flaws of KafkaEmitter
URL: https://github.com/apache/druid/pull/9573#discussion_r405729389
 
 

 ##########
 File path: extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java
 ##########
 @@ -130,24 +120,24 @@ public void start()
 
   private void sendMetricToKafka()
   {
-    sendToKafka(config.getMetricTopic(), metricQueue);
+    sendToKafka(config.getMetricTopic(), metricQueue, setProducerCallback(metricLost));
   }
 
   private void sendAlertToKafka()
   {
-    sendToKafka(config.getAlertTopic(), alertQueue);
+    sendToKafka(config.getAlertTopic(), alertQueue, setProducerCallback(alertLost));
   }
 
-  private void sendToKafka(final String topic, MemoryBoundLinkedBlockingQueue<String> recordQueue)
+  private void sendToKafka(final String topic, MemoryBoundLinkedBlockingQueue<String> recordQueue, Callback callback)
   {
     ObjectContainer<String> objectToSend;
     try {
       while (true) {
         objectToSend = recordQueue.take();
-        producer.send(new ProducerRecord<>(topic, objectToSend.getData()), producerCallback);
+        producer.send(new ProducerRecord<>(topic, objectToSend.getData()), callback);
       }
     }
-    catch (InterruptedException e) {
+    catch (Throwable e) {
       log.warn(e, "Failed to take record from queue!");
 
 Review comment:
   on any  exception  from the block inside `while(true) { ... }` would lead to event emission getting stopped. Does it make sense to have a try-catch inside there to ignore exceptions except for `InterruptedException` so that emitting stops only on thread interrupt which would happen when druid is shutting down.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org