You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2023/02/23 07:48:14 UTC

[kafka] branch 3.4 updated: KAFKA-14128: Kafka Streams does not handle TimeoutException (#13161)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 3.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.4 by this push:
     new 96e1e41f93f KAFKA-14128: Kafka Streams does not handle TimeoutException (#13161)
96e1e41f93f is described below

commit 96e1e41f93f84cec99338aa68477a0a686f92284
Author: Lucia Cerchie <lu...@gmail.com>
AuthorDate: Wed Feb 22 23:51:51 2023 -0700

    KAFKA-14128: Kafka Streams does not handle TimeoutException (#13161)
    
    Kafka Streams is supposed to handle TimeoutException during internal topic creation gracefully. This PR fixes the exception handling code to avoid crashing on a TimeoutException returned by the admin client.
    
    Reviewer: Matthias J. Sax <ma...@confluent.io>, Colin Patrick McCabe <cm...@apache.org>, Alexandre Dupriez (@Hangleton)
---
 .../processor/internals/InternalTopicManager.java  | 17 +++---
 .../internals/InternalTopicManagerTest.java        | 68 +++++++++++++++++++---
 2 files changed, 70 insertions(+), 15 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
index 695492122a3..288aaa6aa8c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
@@ -57,9 +57,12 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
+
 import java.util.function.BiConsumer;
 import java.util.stream.Collectors;
 
+
+
 public class InternalTopicManager {
     private final static String BUG_ERROR_MESSAGE = "This indicates a bug. " +
         "Please report at https://issues.apache.org/jira/projects/KAFKA/issues or to the dev-mailing list (https://kafka.apache.org/contact).";
@@ -466,6 +469,9 @@ public class InternalTopicManager {
                                                 topicName)
                                         );
                                     }
+                                } else if (cause instanceof TimeoutException) {
+                                    log.error("Creating topic {} timed out.\n" +
+                                            "Error message was: {}", topicName, cause.toString());
                                 } else {
                                     throw new StreamsException(
                                             String.format("Could not create topic %s.", topicName),
@@ -473,9 +479,6 @@ public class InternalTopicManager {
                                     );
                                 }
                             }
-                        } catch (final TimeoutException retriableException) {
-                            log.error("Creating topic {} timed out.\n" +
-                                    "Error message was: {}", topicName, retriableException.toString());
                         }
                     }
                 }
@@ -538,15 +541,15 @@ public class InternalTopicManager {
                     tempUnknownTopics.add(topicName);
                     log.debug("The leader of topic {} is not available.\n" +
                         "Error message was: {}", topicName, cause.toString());
+                } else if (cause instanceof TimeoutException) {
+                    tempUnknownTopics.add(topicName);
+                    log.debug("Describing topic {} (to get number of partitions) timed out.\n" +
+                            "Error message was: {}", topicName, cause.toString());
                 } else {
                     log.error("Unexpected error during topic description for {}.\n" +
                         "Error message was: {}", topicName, cause.toString());
                     throw new StreamsException(String.format("Could not create topic %s.", topicName), cause);
                 }
-            } catch (final TimeoutException retriableException) {
-                tempUnknownTopics.add(topicName);
-                log.debug("Describing topic {} (to get number of partitions) timed out.\n" +
-                    "Error message was: {}", topicName, retriableException.toString());
             }
         }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
index 9e4964b5647..c3b0093e4c2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
@@ -15,7 +15,6 @@
  * limitations under the License.
  */
 package org.apache.kafka.streams.processor.internals;
-
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.Config;
 import org.apache.kafka.clients.admin.ConfigEntry;
@@ -58,16 +57,17 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.util.Arrays;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
-
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.common.utils.Utils.mkSet;
@@ -305,6 +305,35 @@ public class InternalTopicManagerTest {
         );
     }
 
+    @Test
+    public void shouldThrowTimeoutExceptionIfGetNumPartitionsHasTopicDescriptionTimeout() { // Calls getNumPartitions twice, each time after asking the mock admin client to time out one request.
+        mockAdminClient.timeoutNextRequest(1); // presumably makes the next admin request fail with a timeout -- confirm against MockAdminClient
+
+        final InternalTopicManager internalTopicManager =
+                new InternalTopicManager(time, mockAdminClient, new StreamsConfig(config));
+        try {
+            final Set<String> topic1set = new HashSet<String>(Arrays.asList(topic1));
+            final Set<String> topic2set = new HashSet<String>(Arrays.asList(topic2));
+
+            internalTopicManager.getNumPartitions(topic1set, topic2set); // NOTE(review): no fail() follows, so the test also passes when nothing is thrown
+
+        } catch (final TimeoutException expected) { // NOTE(review): the describe path in this same commit appears to record timed-out topics instead of throwing, so this branch may never run -- confirm
+            assertEquals(TimeoutException.class, expected.getCause().getClass()); // NOTE(review): getCause() is dereferenced without a null check; throws NullPointerException if the exception has no cause
+        }
+
+        mockAdminClient.timeoutNextRequest(1); // second round: same arrangement, reusing the same InternalTopicManager instance
+
+        try {
+            final Set<String> topic1set = new HashSet<String>(Arrays.asList(topic1));
+            final Set<String> topic2set = new HashSet<String>(Arrays.asList(topic2));
+
+            internalTopicManager.getNumPartitions(topic1set, topic2set); // same call as the first round; again not followed by fail()
+
+        } catch (final TimeoutException expected) {
+            assertEquals(TimeoutException.class, expected.getCause().getClass()); // same assertion as the first round, with the same unchecked getCause()
+        }
+    }
+
     @Test
     public void shouldThrowWhenCreateTopicsThrowsUnexpectedException() {
         final AdminClient admin = EasyMock.createNiceMock(AdminClient.class);
@@ -764,18 +793,24 @@ public class InternalTopicManagerTest {
 
     @Test
     public void shouldExhaustRetriesOnTimeoutExceptionForMakeReady() {
-        mockAdminClient.timeoutNextRequest(1);
+        mockAdminClient.timeoutNextRequest(5);
+
+        final InternalTopicManager topicManager = new InternalTopicManager(
+                new AutoAdvanceMockTime(time),
+                mockAdminClient,
+                new StreamsConfig(config)
+        );
 
         final InternalTopicConfig internalTopicConfig = new RepartitionTopicConfig(topic1, Collections.emptyMap());
         internalTopicConfig.setNumberOfPartitions(1);
         try {
-            internalTopicManager.makeReady(Collections.singletonMap(topic1, internalTopicConfig));
-            fail("Should have thrown StreamsException.");
-        } catch (final StreamsException expected) {
-            assertEquals(TimeoutException.class, expected.getCause().getClass());
+            topicManager.makeReady(Collections.singletonMap(topic1, internalTopicConfig));
+            fail("Should have thrown TimeoutException.");
+        } catch (final TimeoutException expected) {
+            assertThat(expected.getMessage(), is("Could not create topics within 50 milliseconds. " +
+                    "This can happen if the Kafka cluster is temporarily not available."));
         }
     }
-
     @Test
     public void shouldLogWhenTopicNotFoundAndNotThrowException() {
         mockAdminClient.addTopic(
@@ -1731,6 +1766,21 @@ public class InternalTopicManagerTest {
         return internalTopicConfig;
     }
 
+    private static class AutoAdvanceMockTime extends MockTime { // Test clock that wraps another MockTime and calls sleep(10) on it after every milliseconds() read.
+        private final MockTime time; // the wrapped clock; only milliseconds() is redirected to it, other inherited MockTime methods are not overridden here
+
+        private AutoAdvanceMockTime(final MockTime time) {
+            this.time = time;
+        }
+
+        @Override
+        public long milliseconds() {
+            final long ms = time.milliseconds(); // read the wrapped clock before touching it
+            time.sleep(10L); // presumably advances the wrapped mock clock by 10 ms without blocking -- confirm against MockTime
+            return ms; // callers get the reading taken before the sleep(10) call
+        }
+    }
+
     private static class MockCreateTopicsResult extends CreateTopicsResult {
         MockCreateTopicsResult(final Map<String, KafkaFuture<TopicMetadataAndConfig>> futures) {
             super(futures);
@@ -1754,4 +1804,6 @@ public class InternalTopicManagerTest {
             super(futures);
         }
     }
+
+
 }