You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rh...@apache.org on 2021/02/19 20:44:52 UTC

[kafka] branch trunk updated: KAFKA-12343: Handle exceptions better in TopicAdmin, including UnsupportedVersionException (#10158)

This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1c31176  KAFKA-12343: Handle exceptions better in TopicAdmin, including UnsupportedVersionException (#10158)
1c31176 is described below

commit 1c31176ae1ef49071b0d3e717e5f3037492b5b30
Author: Randall Hauch <rh...@gmail.com>
AuthorDate: Fri Feb 19 14:43:32 2021 -0600

    KAFKA-12343: Handle exceptions better in TopicAdmin, including UnsupportedVersionException (#10158)
    
    Refactored the KafkaBasedLog logic that reads end offsets into a separate method to make it easier to test. Also changed the TopicAdmin.endOffsets method to throw UnsupportedVersionException, LeaderNotAvailableException, and TimeoutException directly rather than wrapping them in ConnectException or RetriableException, to better conform with the consumer's endOffsets method and with how KafkaBasedLog retries those exceptions.
    
    Added new tests to verify various scenarios and errors.
    
    Author: Randall Hauch <rh...@gmail.com>
    Reviewers: Konstantine Karantasis <ko...@confluent.io>, Chia-Ping Tsai <ch...@gmail.com>
---
 .../apache/kafka/connect/util/KafkaBasedLog.java   | 65 +++++++++-------
 .../org/apache/kafka/connect/util/TopicAdmin.java  | 16 ++--
 .../kafka/connect/util/KafkaBasedLogTest.java      | 89 +++++++++++++++++++++-
 .../apache/kafka/connect/util/TopicAdminTest.java  | 12 ++-
 4 files changed, 140 insertions(+), 42 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
index 6a2a787..6e2350f 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
@@ -28,7 +28,9 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.RetriableException;
 import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.connect.errors.ConnectException;
@@ -318,32 +320,8 @@ public class KafkaBasedLog<K, V> {
     }
 
     private void readToLogEnd() {
-        log.trace("Reading to end of offset log");
-
         Set<TopicPartition> assignment = consumer.assignment();
-        Map<TopicPartition, Long> endOffsets;
-        // Note that we'd prefer to not use the consumer to find the end offsets for the assigned topic partitions.
-        // That is because it's possible that the consumer is already blocked waiting for new records to appear, when
-        // the consumer is already at the end. In such cases, using 'consumer.endOffsets(...)' will block until at least
-        // one more record becomes available, meaning we can't even check whether we're at the end offset.
-        // Since all we're trying to do here is get the end offset, we should use the supplied admin client
-        // (if available)
-        // (which prevents 'consumer.endOffsets(...)'
-        // from
-
-        // Deprecated constructors do not provide an admin supplier, so the admin is potentially null.
-        if (admin != null) {
-            // Use the admin client to immediately find the end offsets for the assigned topic partitions.
-            // Unlike using the consumer
-            endOffsets = admin.endOffsets(assignment);
-        } else {
-            // The admin may be null if older deprecated constructor is used, though AK Connect currently always provides an admin client.
-            // Using the consumer is not ideal, because when the topic has low volume, the 'poll(...)' method called from the
-            // work thread may have blocked the consumer while waiting for more records (even when there are none).
-            // In such cases, this call to the consumer to simply find the end offsets will block even though we might already be
-            // at the end offset.
-            endOffsets = consumer.endOffsets(assignment);
-        }
+        Map<TopicPartition, Long> endOffsets = readEndOffsets(assignment);
         log.trace("Reading to end of log offsets {}", endOffsets);
 
         while (!endOffsets.isEmpty()) {
@@ -366,6 +344,37 @@ public class KafkaBasedLog<K, V> {
         }
     }
 
+    // Visible for testing
+    Map<TopicPartition, Long> readEndOffsets(Set<TopicPartition> assignment) {
+        log.trace("Reading to end of offset log");
+
+        // Note that we'd prefer to not use the consumer to find the end offsets for the assigned topic partitions.
+        // That is because it's possible that the consumer is already blocked waiting for new records to appear, when
+        // the consumer is already at the end. In such cases, using 'consumer.endOffsets(...)' will block until at least
+        // one more record becomes available, meaning we can't even check whether we're at the end offset.
+        // Since all we're trying to do here is get the end offset, we should use the supplied admin client
+        // (if available) to obtain the end offsets for the given topic partitions.
+
+        // Deprecated constructors do not provide an admin supplier, so the admin is potentially null.
+        if (admin != null) {
+            // Use the admin client to immediately find the end offsets for the assigned topic partitions.
+            // Unlike using the consumer
+            try {
+                return admin.endOffsets(assignment);
+            } catch (UnsupportedVersionException e) {
+                // This may happen with really old brokers that don't support the auto topic creation
+                // field in metadata requests
+                log.debug("Reading to end of log offsets with consumer since admin client is unsupported: {}", e.getMessage());
+                // Forget the reference to the admin so that we won't even try to use the admin the next time this method is called
+                admin = null;
+                // continue and let the consumer handle the read
+            }
+            // Other errors, like timeouts and retriable exceptions are intentionally propagated
+        }
+        // The admin may be null if older deprecated constructor is used or if the admin client is using a broker that doesn't
+        // support getting the end offsets (e.g., 0.10.x). In such cases, we should use the consumer, which is not ideal (see above).
+        return consumer.endOffsets(assignment);
+    }
 
     private class WorkThread extends Thread {
         public WorkThread() {
@@ -390,7 +399,11 @@ public class KafkaBasedLog<K, V> {
                             log.trace("Finished read to end log for topic {}", topic);
                         } catch (TimeoutException e) {
                             log.warn("Timeout while reading log to end for topic '{}'. Retrying automatically. " +
-                                "This may occur when brokers are unavailable or unreachable. Reason: {}", topic, e.getMessage());
+                                     "This may occur when brokers are unavailable or unreachable. Reason: {}", topic, e.getMessage());
+                            continue;
+                        } catch (RetriableException | org.apache.kafka.connect.errors.RetriableException e) {
+                            log.warn("Retriable error while reading log to end for topic '{}'. Retrying automatically. " +
+                                     "Reason: {}", topic, e.getMessage());
                             continue;
                         } catch (WakeupException e) {
                             // Either received another get() call and need to retry reading to end of log or stop() was
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
index 9a7907b..9661c69 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
@@ -651,8 +651,12 @@ public class TopicAdmin implements AutoCloseable {
      * @param partitions the topic partitions
      * @return the map of offset for each topic partition, or an empty map if the supplied partitions
      *         are null or empty
-     * @throws RetriableException if a retriable error occurs, the operation takes too long, or the
-     *         thread is interrupted while attempting to perform this operation
+     * @throws UnsupportedVersionException if the admin client cannot read end offsets
+     * @throws TimeoutException if the offset metadata could not be fetched before the amount of time allocated
+     *         by {@code request.timeout.ms} expires, and this call can be retried
+     * @throws LeaderNotAvailableException if the leader was not available and this call can be retried
+     * @throws RetriableException if a retriable error occurs, or the thread is interrupted while attempting 
+     *         to perform this operation
      * @throws ConnectException if a non retriable error occurs
      */
     public Map<TopicPartition, Long> endOffsets(Set<TopicPartition> partitions) {
@@ -677,13 +681,15 @@ public class TopicAdmin implements AutoCloseable {
                     // Should theoretically never happen, because this method is the same as what the consumer uses and therefore
                     // should exist in the broker since before the admin client was added
                     String msg = String.format("API to get the get the end offsets for topic '%s' is unsupported on brokers at %s", topic, bootstrapServers());
-                    throw new ConnectException(msg, e);
+                    throw new UnsupportedVersionException(msg, e);
                 } else if (cause instanceof TimeoutException) {
                     String msg = String.format("Timed out while waiting to get end offsets for topic '%s' on brokers at %s", topic, bootstrapServers());
-                    throw new RetriableException(msg, e);
+                    throw new TimeoutException(msg, e);
                 } else if (cause instanceof LeaderNotAvailableException) {
                     String msg = String.format("Unable to get end offsets during leader election for topic '%s' on brokers at %s", topic, bootstrapServers());
-                    throw new RetriableException(msg, e);
+                    throw new LeaderNotAvailableException(msg, e);
+                } else if (cause instanceof org.apache.kafka.common.errors.RetriableException) {
+                    throw (org.apache.kafka.common.errors.RetriableException) cause;
                 } else {
                     String msg = String.format("Error while getting end offsets for topic '%s' on brokers at %s", topic, bootstrapServers());
                     throw new ConnectException(msg, e);
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
index 15bf8ca..e36f2a9 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
@@ -31,6 +31,7 @@ import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.LeaderNotAvailableException;
 import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.TimestampType;
@@ -61,11 +62,13 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 
 @RunWith(PowerMockRunner.class)
@@ -117,6 +120,8 @@ public class KafkaBasedLogTest {
     @Mock
     private KafkaProducer<String, String> producer;
     private MockConsumer<String, String> consumer;
+    @Mock
+    private TopicAdmin admin;
 
     private Map<TopicPartition, List<ConsumerRecord<String, String>>> consumedRecords = new HashMap<>();
     private Callback<ConsumerRecord<String, String>> consumedCallback = (error, record) -> {
@@ -463,15 +468,91 @@ public class KafkaBasedLogTest {
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testReadEndOffsetsUsingAdmin() throws Exception {
+        // Create a log that uses the admin supplier
+        setupWithAdmin();
+        expectProducerAndConsumerCreate();
+
+        Set<TopicPartition> tps = new HashSet<>(Arrays.asList(TP0, TP1));
+        Map<TopicPartition, Long> endOffsets = new HashMap<>();
+        endOffsets.put(TP0, 0L);
+        endOffsets.put(TP1, 0L);
+        admin.endOffsets(EasyMock.eq(tps));
+        PowerMock.expectLastCall().andReturn(endOffsets).times(2);
+
+        PowerMock.replayAll();
+
+        store.start();
+        assertEquals(endOffsets, store.readEndOffsets(tps));
+    }
+
+    @Test
+    public void testReadEndOffsetsUsingAdminThatFailsWithUnsupported() throws Exception {
+        // Create a log that uses the admin supplier
+        setupWithAdmin();
+        expectProducerAndConsumerCreate();
+
+        Set<TopicPartition> tps = new HashSet<>(Arrays.asList(TP0, TP1));
+        // Getting end offsets using the admin client should fail with unsupported version
+        admin.endOffsets(EasyMock.eq(tps));
+        PowerMock.expectLastCall().andThrow(new UnsupportedVersionException("too old"));
+
+        // Falls back to the consumer
+        Map<TopicPartition, Long> endOffsets = new HashMap<>();
+        endOffsets.put(TP0, 0L);
+        endOffsets.put(TP1, 0L);
+        consumer.updateEndOffsets(endOffsets);
+
+        PowerMock.replayAll();
+
+        store.start();
+        assertEquals(endOffsets, store.readEndOffsets(tps));
+    }
+
+    @Test
+    public void testReadEndOffsetsUsingAdminThatFailsWithRetriable() throws Exception {
+        // Create a log that uses the admin supplier
+        setupWithAdmin();
+        expectProducerAndConsumerCreate();
+
+        Set<TopicPartition> tps = new HashSet<>(Arrays.asList(TP0, TP1));
+        Map<TopicPartition, Long> endOffsets = new HashMap<>();
+        endOffsets.put(TP0, 0L);
+        endOffsets.put(TP1, 0L);
+        // Getting end offsets upon startup should work fine
+        admin.endOffsets(EasyMock.eq(tps));
+        PowerMock.expectLastCall().andReturn(endOffsets).times(1);
+        // Getting end offsets using the admin client should fail with leader not available
+        admin.endOffsets(EasyMock.eq(tps));
+        PowerMock.expectLastCall().andThrow(new LeaderNotAvailableException("retry"));
+
+        PowerMock.replayAll();
+
+        store.start();
+        assertThrows(LeaderNotAvailableException.class, () -> store.readEndOffsets(tps));
+    }
+
+    @SuppressWarnings("unchecked")
+    private void setupWithAdmin() {
+        Supplier<TopicAdmin> adminSupplier = () -> admin;
+        java.util.function.Consumer<TopicAdmin> initializer = admin -> { };
+        store = PowerMock.createPartialMock(KafkaBasedLog.class, new String[]{"createConsumer", "createProducer"},
+                TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, adminSupplier, consumedCallback, time, initializer);
+    }
+
+    private void expectProducerAndConsumerCreate() throws Exception {
+        PowerMock.expectPrivate(store, "createProducer")
+                 .andReturn(producer);
+        PowerMock.expectPrivate(store, "createConsumer")
+                 .andReturn(consumer);
+    }
 
     private void expectStart() throws Exception {
         initializer.run();
         EasyMock.expectLastCall().times(1);
 
-        PowerMock.expectPrivate(store, "createProducer")
-                .andReturn(producer);
-        PowerMock.expectPrivate(store, "createConsumer")
-                .andReturn(consumer);
+        expectProducerAndConsumerCreate();
     }
 
     private void expectStop() {
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
index 9ba0b1d..edd9891 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
@@ -33,6 +33,7 @@ import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.config.TopicConfig;
 import org.apache.kafka.common.errors.ClusterAuthorizationException;
+import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.message.CreateTopicsResponseData;
@@ -50,7 +51,6 @@ import org.apache.kafka.common.requests.ListOffsetsResponse;
 import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.connect.errors.ConnectException;
-import org.apache.kafka.connect.errors.RetriableException;
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -485,7 +485,7 @@ public class TopicAdminTest {
     }
 
     @Test
-    public void endOffsetsShouldFailWithNonRetriableWhenVersionUnsupportedErrorOccurs() {
+    public void endOffsetsShouldFailWithUnsupportedVersionWhenVersionUnsupportedErrorOccurs() {
         String topicName = "myTopic";
         TopicPartition tp1 = new TopicPartition(topicName, 0);
         Set<TopicPartition> tps = Collections.singleton(tp1);
@@ -496,15 +496,14 @@ public class TopicAdminTest {
             env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
             env.kafkaClient().prepareResponse(listOffsetsResultWithUnsupportedVersion(tp1, offset));
             TopicAdmin admin = new TopicAdmin(null, env.adminClient());
-            ConnectException e = assertThrows(ConnectException.class, () -> {
+            UnsupportedVersionException e = assertThrows(UnsupportedVersionException.class, () -> {
                 admin.endOffsets(tps);
             });
-            assertTrue(e.getMessage().contains("is unsupported on brokers"));
         }
     }
 
     @Test
-    public void endOffsetsShouldFailWithRetriableWhenTimeoutErrorOccurs() {
+    public void endOffsetsShouldFailWithTimeoutExceptionWhenTimeoutErrorOccurs() {
         String topicName = "myTopic";
         TopicPartition tp1 = new TopicPartition(topicName, 0);
         Set<TopicPartition> tps = Collections.singleton(tp1);
@@ -515,10 +514,9 @@ public class TopicAdminTest {
             env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
             env.kafkaClient().prepareResponse(listOffsetsResultWithTimeout(tp1, offset));
             TopicAdmin admin = new TopicAdmin(null, env.adminClient());
-            RetriableException e = assertThrows(RetriableException.class, () -> {
+            TimeoutException e = assertThrows(TimeoutException.class, () -> {
                 admin.endOffsets(tps);
             });
-            assertTrue(e.getMessage().contains("Timed out while waiting"));
         }
     }