Posted to commits@kafka.apache.org by rh...@apache.org on 2019/08/13 15:53:13 UTC

[kafka] branch 2.1 updated: KAFKA-7941: Catch TimeoutException in KafkaBasedLog worker thread (#6283)

This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 2bc2af6  KAFKA-7941: Catch TimeoutException in KafkaBasedLog worker thread (#6283)
2bc2af6 is described below

commit 2bc2af6653dca883eee954cf67ab81c7159df199
Author: Paul <pg...@gmail.com>
AuthorDate: Tue Aug 13 10:16:55 2019 -0500

    KAFKA-7941: Catch TimeoutException in KafkaBasedLog worker thread (#6283)
    
    When calling readToLogEnd(), the KafkaBasedLog worker thread should catch the TimeoutException that can occur when brokers are unavailable, log a warning, and retry; otherwise the exception terminates the worker thread. A simplified sketch of the resulting loop follows the attribution below.
    
    Includes an enhancement to MockConsumer that allows simulating exceptions not just when polling but also when querying for offsets, which is necessary for testing the fix; a short usage sketch follows the MockConsumer diff below.
    
    Author: Paul Whalen <pg...@gmail.com>
    Reviewers: Randall Hauch <rh...@gmail.com>, Arjun Satish <ar...@confluent.io>, Ryanne Dolan <ry...@gmail.com>
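
For context, the fix amounts to one more catch clause in the worker thread's read-to-end loop. The following is a simplified sketch of that loop after this patch, not the verbatim KafkaBasedLog code: the TimeoutException and WakeupException handling match the diff below, while the loop condition (stopRequested here) and the elided bookkeeping are illustrative assumptions.

    // Simplified sketch of KafkaBasedLog's worker-thread loop after this
    // patch; stopRequested and the elided bookkeeping are illustrative.
    while (!stopRequested) {
        try {
            readToLogEnd();
        } catch (TimeoutException e) {
            // Brokers may be unavailable or unreachable: log a warning and
            // retry, instead of letting the exception kill the worker thread.
            log.warn("Timeout while reading log to end for topic '{}'. Retrying automatically. " +
                "This may occur when brokers are unavailable or unreachable. Reason: {}", topic, e.getMessage());
            continue;
        } catch (WakeupException e) {
            // Another get() call arrived or stop() was invoked; both are
            // handled by restarting this loop.
            continue;
        }
        // ... complete outstanding read-to-end callbacks, poll for new records ...
    }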
---
 .../kafka/clients/consumer/MockConsumer.java       | 35 ++++++++--
 .../apache/kafka/connect/util/KafkaBasedLog.java   |  5 ++
 .../kafka/connect/util/KafkaBasedLogTest.java      | 76 +++++++++++++++++++++-
 .../internals/GlobalStateManagerImplTest.java      |  2 +-
 .../internals/GlobalStreamThreadTest.java          |  2 +-
 .../internals/StoreChangelogReaderTest.java        |  2 +-
 .../processor/internals/StreamThreadTest.java      |  2 +-
 7 files changed, 112 insertions(+), 12 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index dccb359..be95627 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -58,7 +58,8 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
     private final Set<TopicPartition> paused;
 
     private Map<TopicPartition, List<ConsumerRecord<K, V>>> records;
-    private KafkaException exception;
+    private KafkaException pollException;
+    private KafkaException offsetsException;
     private AtomicBoolean wakeup;
     private boolean closed;
 
@@ -71,7 +72,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
         this.beginningOffsets = new HashMap<>();
         this.endOffsets = new HashMap<>();
         this.pollTasks = new LinkedList<>();
-        this.exception = null;
+        this.pollException = null;
         this.wakeup = new AtomicBoolean(false);
         this.committed = new HashMap<>();
     }
@@ -170,9 +171,9 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
             throw new WakeupException();
         }
 
-        if (exception != null) {
-            RuntimeException exception = this.exception;
-            this.exception = null;
+        if (pollException != null) {
+            RuntimeException exception = this.pollException;
+            this.pollException = null;
             throw exception;
         }
 
@@ -213,8 +214,20 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
         recs.add(record);
     }
 
+    /**
+     * @deprecated Use {@link #setPollException(KafkaException)} instead
+     */
+    @Deprecated
     public synchronized void setException(KafkaException exception) {
-        this.exception = exception;
+        setPollException(exception);
+    }
+
+    public synchronized void setPollException(KafkaException exception) {
+        this.pollException = exception;
+    }
+
+    public synchronized void setOffsetsException(KafkaException exception) {
+        this.offsetsException = exception;
     }
 
     @Override
@@ -382,6 +395,11 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
 
     @Override
     public synchronized Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions) {
+        if (offsetsException != null) {
+            RuntimeException exception = this.offsetsException;
+            this.offsetsException = null;
+            throw exception;
+        }
         Map<TopicPartition, Long> result = new HashMap<>();
         for (TopicPartition tp : partitions) {
             Long beginningOffset = beginningOffsets.get(tp);
@@ -394,6 +412,11 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
 
     @Override
     public synchronized Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions) {
+        if (offsetsException != null) {
+            RuntimeException exception = this.offsetsException;
+            this.offsetsException = null;
+            throw exception;
+        }
         Map<TopicPartition, Long> result = new HashMap<>();
         for (TopicPartition tp : partitions) {
             Long endOffset = getEndOffset(endOffsets.get(tp));
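
The enhancement described in the commit message is visible above: beginningOffsets() and endOffsets() now rethrow a one-shot offsetsException, mirroring how poll() handles pollException. A minimal usage sketch of the new hooks follows; the topic name and surrounding scaffolding are illustrative, not part of this commit, and imports from java.util, org.apache.kafka.clients.consumer, and org.apache.kafka.common are elided.

    // A TimeoutException armed via setOffsetsException() is thrown by the
    // next endOffsets()/beginningOffsets() call and then cleared, so a
    // retrying caller such as KafkaBasedLog can make progress afterwards.
    MockConsumer<String, String> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
    TopicPartition tp = new TopicPartition("test-topic", 0);
    consumer.setOffsetsException(new TimeoutException("simulated broker timeout"));
    try {
        consumer.endOffsets(Collections.singleton(tp));
    } catch (TimeoutException e) {
        // Expected on the first attempt only.
    }
    consumer.updateEndOffsets(Collections.singletonMap(tp, 0L));
    consumer.endOffsets(Collections.singleton(tp)); // now succeeds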
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
index 9d77d21..e78276a 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
@@ -28,6 +28,7 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
@@ -312,6 +313,10 @@ public class KafkaBasedLog<K, V> {
                         try {
                             readToLogEnd();
                             log.trace("Finished read to end log for topic {}", topic);
+                        } catch (TimeoutException e) {
+                            log.warn("Timeout while reading log to end for topic '{}'. Retrying automatically. " +
+                                "This may occur when brokers are unavailable or unreachable. Reason: {}", topic, e.getMessage());
+                            continue;
                         } catch (WakeupException e) {
                             // Either received another get() call and need to retry reading to end of log or stop() was
                             // called. Both are handled by restarting this loop.
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
index 6d5efe8..1805550 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
@@ -30,6 +30,7 @@ import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.LeaderNotAvailableException;
+import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.TimestampType;
@@ -369,7 +370,7 @@ public class KafkaBasedLogTest {
     }
 
     @Test
-    public void testConsumerError() throws Exception {
+    public void testPollConsumerError() throws Exception {
         expectStart();
         expectStop();
 
@@ -387,7 +388,7 @@ public class KafkaBasedLogTest {
                 consumer.schedulePollTask(new Runnable() {
                     @Override
                     public void run() {
-                        consumer.setException(Errors.COORDINATOR_NOT_AVAILABLE.exception());
+                        consumer.setPollException(Errors.COORDINATOR_NOT_AVAILABLE.exception());
                     }
                 });
 
@@ -423,6 +424,77 @@ public class KafkaBasedLogTest {
     }
 
     @Test
+    public void testGetOffsetsConsumerErrorOnReadToEnd() throws Exception {
+        expectStart();
+
+        // Producer flushes when read to log end is called
+        producer.flush();
+        PowerMock.expectLastCall();
+
+        expectStop();
+
+        PowerMock.replayAll();
+        final CountDownLatch finishedLatch = new CountDownLatch(1);
+        Map<TopicPartition, Long> endOffsets = new HashMap<>();
+        endOffsets.put(TP0, 0L);
+        endOffsets.put(TP1, 0L);
+        consumer.updateEndOffsets(endOffsets);
+        store.start();
+        final AtomicBoolean getInvoked = new AtomicBoolean(false);
+        final FutureCallback<Void> readEndFutureCallback = new FutureCallback<>(new Callback<Void>() {
+            @Override
+            public void onCompletion(Throwable error, Void result) {
+                getInvoked.set(true);
+            }
+        });
+        consumer.schedulePollTask(new Runnable() {
+            @Override
+            public void run() {
+                // Once we're synchronized in a poll, start the read to end and schedule the exact set of poll events
+                // that should follow. This readToEnd call will immediately wake up this consumer.poll() call without
+                // returning any data.
+                Map<TopicPartition, Long> newEndOffsets = new HashMap<>();
+                newEndOffsets.put(TP0, 1L);
+                newEndOffsets.put(TP1, 1L);
+                consumer.updateEndOffsets(newEndOffsets);
+                // Set exception to occur when getting offsets to read log to end. It'll be caught in the worker thread,
+                // which will retry and eventually get the correct offsets and read log to end.
+                consumer.setOffsetsException(new TimeoutException("Failed to get offsets by times"));
+                store.readToEnd(readEndFutureCallback);
+
+                // Should keep polling until it reaches current log end offset for all partitions
+                consumer.scheduleNopPollTask();
+                consumer.scheduleNopPollTask();
+                consumer.schedulePollTask(new Runnable() {
+                    @Override
+                    public void run() {
+                        consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP0_KEY, TP0_VALUE));
+                        consumer.addRecord(new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP0_KEY, TP0_VALUE_NEW));
+                    }
+                });
+
+                consumer.schedulePollTask(new Runnable() {
+                    @Override
+                    public void run() {
+                        finishedLatch.countDown();
+                    }
+                });
+            }
+        });
+        readEndFutureCallback.get(10000, TimeUnit.MILLISECONDS);
+        assertTrue(getInvoked.get());
+        assertTrue(finishedLatch.await(10000, TimeUnit.MILLISECONDS));
+        assertEquals(CONSUMER_ASSIGNMENT, consumer.assignment());
+        assertEquals(1L, consumer.position(TP0));
+
+        store.stop();
+
+        assertFalse(Whitebox.<Thread>getInternalState(store, "thread").isAlive());
+        assertTrue(consumer.closed());
+        PowerMock.verifyAll();
+    }
+
+    @Test
     public void testProducerError() throws Exception {
         expectStart();
         TestFuture<RecordMetadata> tp0Future = new TestFuture<>();
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
index 977c193..4867fdb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
@@ -243,7 +243,7 @@ public class GlobalStateManagerImplTest {
     @Test
     public void shouldRecoverFromInvalidOffsetExceptionAndRestoreRecords() {
         initializeConsumer(2, 1, t1);
-        consumer.setException(new InvalidOffsetException("Try Again!") {
+        consumer.setPollException(new InvalidOffsetException("Try Again!") {
             public Set<TopicPartition> partitions() {
                 return Collections.singleton(t1);
             }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
index 37a6fdb..e0d3096 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
@@ -242,7 +242,7 @@ public class GlobalStreamThreadTest {
             }
         }, 10 * 1000, "Input record never consumed");
 
-        mockConsumer.setException(new InvalidOffsetException("Try Again!") {
+        mockConsumer.setPollException(new InvalidOffsetException("Try Again!") {
             @Override
             public Set<TopicPartition> partitions() {
                 return Collections.singleton(topicPartition);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
index aa6395b..729efc2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
@@ -134,7 +134,7 @@ public class StoreChangelogReaderTest {
     public void shouldRecoverFromInvalidOffsetExceptionAndFinishRestore() {
         final int messages = 10;
         setupConsumer(messages, topicPartition);
-        consumer.setException(new InvalidOffsetException("Try Again!") {
+        consumer.setPollException(new InvalidOffsetException("Try Again!") {
             @Override
             public Set<TopicPartition> partitions() {
                 return Collections.singleton(topicPartition);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index dc867dc..2bdb353 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -1407,7 +1407,7 @@ public class StreamThreadTest {
                 }
             }, "Never restore first record");
 
-            mockRestoreConsumer.setException(new InvalidOffsetException("Try Again!") {
+            mockRestoreConsumer.setPollException(new InvalidOffsetException("Try Again!") {
                 @Override
                 public Set<TopicPartition> partitions() {
                     return changelogPartitionSet;