You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2020/06/16 17:30:20 UTC

[kafka] branch 2.6 updated: KAFKA-10169: Error message when transitioning to Aborting / AbortableError / FatalError (#8880)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.6 by this push:
     new 564ac46  KAFKA-10169: Error message when transitioning to Aborting / AbortableError / FatalError (#8880)
564ac46 is described below

commit 564ac4624cf50518e3fc69b5232240874f745f79
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Tue Jun 16 10:28:36 2020 -0700

    KAFKA-10169: Error message when transitioning to Aborting / AbortableError / FatalError (#8880)
    
    Reviewers: John Roesler <vv...@apache.org>
---
 .../src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java  | 1 +
 .../org/apache/kafka/clients/producer/internals/TransactionManager.java | 2 ++
 .../apache/kafka/streams/processor/internals/RecordCollectorImpl.java   | 2 +-
 3 files changed, 4 insertions(+), 1 deletion(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 375ada5..e53d02b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -743,6 +743,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
     public void abortTransaction() throws ProducerFencedException {
         throwIfNoTransactionManager();
         throwIfProducerClosed();
+        log.info("Aborting incomplete transaction");
         TransactionalRequestResult result = transactionManager.beginAbort();
         sender.wakeup();
         result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS);
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index 5f1a1f4..e3ce1d1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -478,10 +478,12 @@ public class TransactionManager {
             return;
         }
 
+        log.info("Transiting to abortable error state due to {}", exception.toString());
         transitionTo(State.ABORTABLE_ERROR, exception);
     }
 
     synchronized void transitionToFatalError(RuntimeException exception) {
+        log.info("Transiting to fatal error state due to {}", exception.toString());
         transitionTo(State.FATAL_ERROR, exception);
 
         if (pendingResult != null) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
index 124e6f6..180aae2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
@@ -251,7 +251,7 @@ public class RecordCollectorImpl implements RecordCollector {
      */
     @Override
     public void close() {
-        log.debug("Closing record collector");
+        log.info("Closing record collector");
 
         if (eosEnabled) {
             streamsProducer.abortTransaction();