You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/04/18 01:51:00 UTC

[jira] [Commented] (KAFKA-4228) Sender thread death leaves KafkaProducer in a bad state

    [ https://issues.apache.org/jira/browse/KAFKA-4228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16441757#comment-16441757 ] 

ASF GitHub Bot commented on KAFKA-4228:
---------------------------------------

radai-rosenblatt closed pull request #1930: KAFKA-4228 - make producer close on sender thread death, make consumer shutdown on failure to rebalance, and make MM die on any of the above.
URL: https://github.com/apache/kafka/pull/1930
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 3efc7b5cb69..e4367997282 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -323,6 +323,17 @@ private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serial
                     this.requestTimeoutMs);
             String ioThreadName = "kafka-producer-network-thread" + (clientId.length() > 0 ? " | " + clientId : "");
             this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
+            this.ioThread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
+                @Override
+                public void uncaughtException(Thread t, Throwable e) {
+                    try {
+                        log.error("Thread " + t.getName() + " died due to uncaught exception", e);
+                    } finally {
+                        close(0, TimeUnit.MILLISECONDS); //cant wait to properly flush, because the thread doing the flushing just died
+                    }
+
+                }
+            });
             this.ioThread.start();
 
             this.errors = this.metrics.sensor("errors");
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 6471dad78df..0ab5521d66a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -126,41 +126,51 @@ public Sender(KafkaClient client,
      * The main run loop for the sender thread
      */
     public void run() {
-        log.debug("Starting Kafka producer I/O thread.");
-
-        // main loop, runs until close is called
-        while (running) {
-            try {
-                run(time.milliseconds());
-            } catch (Exception e) {
-                log.error("Uncaught error in kafka producer I/O thread: ", e);
+        boolean gracefulShutdown = false;
+        try {
+            log.debug("Starting Kafka producer I/O thread.");
+
+            // main loop, runs until close is called
+            while (running) {
+                try {
+                    run(time.milliseconds());
+                } catch (Exception e) {
+                    log.error("Uncaught error in kafka producer I/O thread: ", e);
+                }
             }
-        }
 
-        log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");
+            log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");
 
-        // okay we stopped accepting requests but there may still be
-        // requests in the accumulator or waiting for acknowledgment,
-        // wait until these are completed.
-        while (!forceClose && (this.accumulator.hasUnsent() || this.client.inFlightRequestCount() > 0)) {
+            // okay we stopped accepting requests but there may still be
+            // requests in the accumulator or waiting for acknowledgment,
+            // wait until these are completed.
+            while (!forceClose && (this.accumulator.hasUnsent() || this.client.inFlightRequestCount() > 0)) {
+                try {
+                    run(time.milliseconds());
+                } catch (Exception e) {
+                    log.error("Uncaught error in kafka producer I/O thread: ", e);
+                }
+            }
+            if (forceClose) {
+                // We need to fail all the incomplete batches and wake up the threads waiting on
+                // the futures.
+                this.accumulator.abortIncompleteBatches();
+            }
             try {
-                run(time.milliseconds());
+                this.client.close();
             } catch (Exception e) {
-                log.error("Uncaught error in kafka producer I/O thread: ", e);
+                log.error("Failed to close network client", e);
             }
-        }
-        if (forceClose) {
-            // We need to fail all the incomplete batches and wake up the threads waiting on
-            // the futures.
-            this.accumulator.abortIncompleteBatches();
-        }
-        try {
-            this.client.close();
-        } catch (Exception e) {
-            log.error("Failed to close network client", e);
-        }
 
-        log.debug("Shutdown of Kafka producer I/O thread has completed.");
+            log.debug("Shutdown of Kafka producer I/O thread has completed.");
+            gracefulShutdown = true;
+        } finally {
+            //make sure to clean up any pending batches. this is a nop on graceful shutdown
+            if (!gracefulShutdown) {
+                forceClose();
+                this.accumulator.abortIncompleteBatches();
+            }
+        }
     }
 
     /**
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index f776578f6d4..f9ee2b633e0 100755
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -588,7 +588,9 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
             if (doRebalance)
               syncedRebalance
           } catch {
-            case t: Throwable => error("error during syncedRebalance", t)
+            case t: Throwable =>
+              error("Error during syncedRebalance", t)
+              shutdown()
           }
         }
         info("stopping watcher executor thread for consumer " + consumerIdString)
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index 17b8f0be6cf..7122c2136a5 100755
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -434,6 +434,10 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
               records.foreach(producer.send)
               maybeFlushAndCommitOffsets()
             }
+            if (!mirrorMakerConsumer.hasData && !shuttingDown && !exitingOnSendFailure) {
+              //consumer has closed (due to error)
+              throw new IllegalStateException("Consumer has shut down")
+            }
           } catch {
             case cte: ConsumerTimeoutException =>
               trace("Caught ConsumerTimeoutException, continue iteration.")


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Sender thread death leaves KafkaProducer in a bad state
> -------------------------------------------------------
>
>                 Key: KAFKA-4228
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4228
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 0.10.0.1
>            Reporter: radai rosenblatt
>            Priority: Major
>
> a KafkaProducer's Sender thread may die:
> {noformat}
> 2016/09/28 00:28:01.065 ERROR [KafkaThread] [kafka-producer-network-thread | mm_ei-lca1_uniform] [kafka-mirror-maker] [] Uncaught exception in kafka-producer-network-thread | mm_ei-lca1_uni
> java.lang.OutOfMemoryError: Java heap space
>        at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57) ~[?:1.8.0_40]
>        at java.nio.ByteBuffer.allocate(ByteBuffer.java:335) ~[?:1.8.0_40]
>        at org.apache.kafka.common.requests.RequestSend.serialize(RequestSend.java:35) ~[kafka-clients-0.9.0.666.jar:?]
>        at org.apache.kafka.common.requests.RequestSend.<init>(RequestSend.java:29) ~[kafka-clients-0.9.0.666.jar:?]
>        at org.apache.kafka.clients.producer.internals.Sender.produceRequest(Sender.java:355) ~[kafka-clients-0.9.0.666.jar:?]
>        at org.apache.kafka.clients.producer.internals.Sender.createProduceRequests(Sender.java:337) ~[kafka-clients-0.9.0.666.jar:?]
>        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:211) ~[kafka-clients-0.9.0.666.jar:?]
>        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:134) ~[kafka-clients-0.9.0.666.jar:?]
>        at java.lang.Thread.run(Thread.java:745) [?:1.8.0_40]
> {noformat}
> which leaves the producer in a bad state. in this state, a call to flush(), for example, will hang indefinitely as the sender thread is not around to flush batches but theyve not been aborted.
> even worse, this can happen in MirrorMaker just before a rebalance, at which point MM will just block indefinitely during a rebalance (in beforeReleasingPartitions()).
> a rebalance participant hung in such a way will cause rebalance to fail for the rest of the participants, at which point ZKRebalancerListener.watcherExecutorThread() dies to an exception (cannot rebalance after X attempts) but the consumer that ran the thread will remain live. the end result is a bunch of zombie mirror makers and orphan topic partitions.
> a dead sender thread should result in closing the producer.
> a consumer failing to rebalance should shut down.
> any issue with the producer or consumer should cause mirror-maker death.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)