You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2019/05/19 00:15:14 UTC

[kafka] branch 2.1 updated: MINOR: Log exception thrown by consumer.poll() in VerifiableConsumer (#6368)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 6acd58a  MINOR: Log exception thrown by consumer.poll() in VerifiableConsumer (#6368)
6acd58a is described below

commit 6acd58a4236f5553e689a80d1bd48cd408cb0414
Author: Bob Barrett <bo...@outlook.com>
AuthorDate: Tue Mar 5 21:12:47 2019 -0500

    MINOR: Log exception thrown by consumer.poll() in VerifiableConsumer (#6368)
    
    SecurityTest.test_client_ssl_endpoint_validation_failure is failing because it greps for 'SSLHandshakeException' in the consumer and producer log files. With the fix for KAFKA-7773, the test uses the VerifiableConsumer instead of the ConsoleConsumer, and the VerifiableConsumer does not log the exception stack trace to the service log. This patch catches exceptions in the VerifiableConsumer and logs them in order to fix the test. Tested by running the test locally.
    
    Reviewers: Ismael Juma <is...@juma.me.uk>, Jason Gustafson <ja...@confluent.io>
---
 .../src/main/java/org/apache/kafka/tools/VerifiableConsumer.java  | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
index 58f3471..e6955ba 100644
--- a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
+++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
@@ -43,6 +43,8 @@ import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.utils.Exit;
 import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -82,6 +84,8 @@ import static net.sourceforge.argparse4j.impl.Arguments.storeTrue;
  */
 public class VerifiableConsumer implements Closeable, OffsetCommitCallback, ConsumerRebalanceListener {
 
+    private static final Logger log = LoggerFactory.getLogger(VerifiableConsumer.class);
+
     private final ObjectMapper mapper = new ObjectMapper();
     private final PrintStream out;
     private final KafkaConsumer<String, String> consumer;
@@ -233,6 +237,10 @@ public class VerifiableConsumer implements Closeable, OffsetCommitCallback, Cons
             }
         } catch (WakeupException e) {
             // ignore, we are closing
+            log.trace("Caught WakeupException because consumer is shutdown, ignore and terminate.", e);
+        } catch (Throwable t) {
+            // Log the error so it goes to the service log and not stdout
+            log.error("Error during processing, terminating consumer process: ", t);
         } finally {
             consumer.close();
             printJson(new ShutdownComplete());