You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/12/13 15:40:51 UTC

[kafka] branch trunk updated: KAFKA-7655 Metadata spamming requests from Kafka Streams under some circumstances, potential DOS (#5929)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new dffce6e  KAFKA-7655 Metadata spamming requests from Kafka Streams under some circumstances, potential DOS (#5929)
dffce6e is described below

commit dffce6e7ae047a21fb40ec2c27d3d1f562a511a3
Author: Pasquale Vazzana <pa...@gmail.com>
AuthorDate: Thu Dec 13 15:40:39 2018 +0000

    KAFKA-7655 Metadata spamming requests from Kafka Streams under some circumstances, potential DOS (#5929)
    
    Reviewers: Matthias J. Sax <ma...@confluent.io>, Guozhang Wang <gu...@confluent.io>
---
 .../org/apache/kafka/streams/StreamsConfig.java    |  1 +
 .../processor/internals/InternalTopicManager.java  | 37 +++++++++++++++++-----
 .../apache/kafka/streams/StreamsConfigTest.java    |  5 ++-
 3 files changed, 34 insertions(+), 9 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 90523d2..1aabf54 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -1000,6 +1000,7 @@ public class StreamsConfig extends AbstractConfig {
         // add admin retries configs for creating topics
         final AdminClientConfig adminClientDefaultConfig = new AdminClientConfig(getClientPropsWithPrefix(ADMIN_CLIENT_PREFIX, AdminClientConfig.configNames()));
         consumerProps.put(adminClientPrefix(AdminClientConfig.RETRIES_CONFIG), adminClientDefaultConfig.getInt(AdminClientConfig.RETRIES_CONFIG));
+        consumerProps.put(adminClientPrefix(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG), adminClientDefaultConfig.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG));
 
         // verify that producer batch config is no larger than segment size, then add topic configs required for creating topics
         final Map<String, Object> topicProps = originalsWithPrefix(TOPIC_PREFIX, false);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
index 6159ee2..7e35126 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
@@ -57,6 +57,7 @@ public class InternalTopicManager {
     private final AdminClient adminClient;
 
     private final int retries;
+    private final long retryBackOffMs;
 
     public InternalTopicManager(final AdminClient adminClient,
                                 final StreamsConfig streamsConfig) {
@@ -67,7 +68,9 @@ public class InternalTopicManager {
 
         replicationFactor = streamsConfig.getInt(StreamsConfig.REPLICATION_FACTOR_CONFIG).shortValue();
         windowChangeLogAdditionalRetention = streamsConfig.getLong(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG);
-        retries = new InternalAdminClientConfig(streamsConfig.getAdminConfigs("dummy")).getInt(AdminClientConfig.RETRIES_CONFIG);
+        final InternalAdminClientConfig dummyAdmin = new InternalAdminClientConfig(streamsConfig.getAdminConfigs("dummy"));
+        retries = dummyAdmin.getInt(AdminClientConfig.RETRIES_CONFIG);
+        retryBackOffMs = dummyAdmin.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG);
 
         log.debug("Configs:" + Utils.NL,
             "\t{} = {}" + Utils.NL,
@@ -115,17 +118,22 @@ public class InternalTopicManager {
 
             // TODO: KAFKA-6928. should not need retries in the outer caller as it will be retried internally in admin client
             int remainingRetries = retries;
+            boolean retryBackOff = false;
             boolean retry;
             do {
                 retry = false;
 
                 final CreateTopicsResult createTopicsResult = adminClient.createTopics(newTopics);
 
-                final Set<String> createTopicNames = new HashSet<>();
+                final Set<String> createdTopicNames = new HashSet<>();
                 for (final Map.Entry<String, KafkaFuture<Void>> createTopicResult : createTopicsResult.values().entrySet()) {
                     try {
+                        if (retryBackOff) {
+                            retryBackOff = false;
+                            Thread.sleep(retryBackOffMs);
+                        }
                         createTopicResult.getValue().get();
-                        createTopicNames.add(createTopicResult.getKey());
+                        createdTopicNames.add(createTopicResult.getKey());
                     } catch (final ExecutionException couldNotCreateTopic) {
                         final Throwable cause = couldNotCreateTopic.getCause();
                         final String topicName = createTopicResult.getKey();
@@ -135,10 +143,23 @@ public class InternalTopicManager {
                             log.debug("Could not get number of partitions for topic {} due to timeout. " +
                                 "Will try again (remaining retries {}).", topicName, remainingRetries - 1);
                         } else if (cause instanceof TopicExistsException) {
-                            createTopicNames.add(createTopicResult.getKey());
-                            log.info("Topic {} exist already: {}",
-                                topicName,
-                                couldNotCreateTopic.toString());
+                            // This topic didn't exist earlier; it might be marked for deletion or it might differ
+                            // from the desired setup, so it needs re-validation.
+                            final Map<String, Integer> existingTopicPartition = getNumPartitions(Collections.singleton(topicName));
+
+                            if (existingTopicPartition.containsKey(topicName)
+                                    && validateTopicPartitions(Collections.singleton(topics.get(topicName)), existingTopicPartition).isEmpty()) {
+                                createdTopicNames.add(createTopicResult.getKey());
+                                log.info("Topic {} exists already and has the right number of partitions: {}",
+                                        topicName,
+                                        couldNotCreateTopic.toString());
+                            } else {
+                                retry = true;
+                                retryBackOff = true;
+                                log.info("Could not create topic {}. Topic is probably marked for deletion (number of partitions is unknown).\n" +
+                                        "Will retry to create this topic in {} ms (to let broker finish async delete operation first).\n" +
+                                        "Error message was: {}", topicName, retryBackOffMs, couldNotCreateTopic.toString());
+                            }
                         } else {
                             throw new StreamsException(String.format("Could not create topic %s.", topicName),
                                 couldNotCreateTopic);
@@ -151,7 +172,7 @@ public class InternalTopicManager {
                 }
 
                 if (retry) {
-                    newTopics.removeIf(newTopic -> createTopicNames.contains(newTopic.name()));
+                    newTopics.removeIf(newTopic -> createdTopicNames.contains(newTopic.name()));
 
                     continue;
                 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index 7b34615..569867d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -129,9 +129,11 @@ public class StreamsConfigTest {
     }
 
     @Test
-    public void consumerConfigMustUseAdminClientConfigForRetries() {
+    public void consumerConfigShouldContainAdminClientConfigsForRetriesAndRetryBackOffMsWithAdminPrefix() {
         props.put(StreamsConfig.adminClientPrefix(StreamsConfig.RETRIES_CONFIG), 20);
+        props.put(StreamsConfig.adminClientPrefix(StreamsConfig.RETRY_BACKOFF_MS_CONFIG), 200L);
         props.put(StreamsConfig.RETRIES_CONFIG, 10);
+        props.put(StreamsConfig.RETRY_BACKOFF_MS_CONFIG, 100L);
         final StreamsConfig streamsConfig = new StreamsConfig(props);
 
         final String groupId = "example-application";
@@ -139,6 +141,7 @@ public class StreamsConfigTest {
         final Map<String, Object> returnedProps = streamsConfig.getMainConsumerConfigs(groupId, clientId);
 
         assertEquals(20, returnedProps.get(StreamsConfig.adminClientPrefix(StreamsConfig.RETRIES_CONFIG)));
+        assertEquals(200L, returnedProps.get(StreamsConfig.adminClientPrefix(StreamsConfig.RETRY_BACKOFF_MS_CONFIG)));
     }
 
     @Test