You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/12/13 15:40:51 UTC
[kafka] branch trunk updated: KAFKA-7655 Metadata spamming requests from Kafka Streams under some circumstances, potential DOS (#5929)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new dffce6e KAFKA-7655 Metadata spamming requests from Kafka Streams under some circumstances, potential DOS (#5929)
dffce6e is described below
commit dffce6e7ae047a21fb40ec2c27d3d1f562a511a3
Author: Pasquale Vazzana <pa...@gmail.com>
AuthorDate: Thu Dec 13 15:40:39 2018 +0000
KAFKA-7655 Metadata spamming requests from Kafka Streams under some circumstances, potential DOS (#5929)
Reviewers: Matthias J. Sax <ma...@confluent.io>, Guozhang Wang <gu...@confluent.io>
---
.../org/apache/kafka/streams/StreamsConfig.java | 1 +
.../processor/internals/InternalTopicManager.java | 37 +++++++++++++++++-----
.../apache/kafka/streams/StreamsConfigTest.java | 5 ++-
3 files changed, 34 insertions(+), 9 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 90523d2..1aabf54 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -1000,6 +1000,7 @@ public class StreamsConfig extends AbstractConfig {
// add admin retries configs for creating topics
final AdminClientConfig adminClientDefaultConfig = new AdminClientConfig(getClientPropsWithPrefix(ADMIN_CLIENT_PREFIX, AdminClientConfig.configNames()));
consumerProps.put(adminClientPrefix(AdminClientConfig.RETRIES_CONFIG), adminClientDefaultConfig.getInt(AdminClientConfig.RETRIES_CONFIG));
+ consumerProps.put(adminClientPrefix(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG), adminClientDefaultConfig.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG));
// verify that producer batch config is no larger than segment size, then add topic configs required for creating topics
final Map<String, Object> topicProps = originalsWithPrefix(TOPIC_PREFIX, false);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
index 6159ee2..7e35126 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
@@ -57,6 +57,7 @@ public class InternalTopicManager {
private final AdminClient adminClient;
private final int retries;
+ private final long retryBackOffMs;
public InternalTopicManager(final AdminClient adminClient,
final StreamsConfig streamsConfig) {
@@ -67,7 +68,9 @@ public class InternalTopicManager {
replicationFactor = streamsConfig.getInt(StreamsConfig.REPLICATION_FACTOR_CONFIG).shortValue();
windowChangeLogAdditionalRetention = streamsConfig.getLong(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG);
- retries = new InternalAdminClientConfig(streamsConfig.getAdminConfigs("dummy")).getInt(AdminClientConfig.RETRIES_CONFIG);
+ final InternalAdminClientConfig dummyAdmin = new InternalAdminClientConfig(streamsConfig.getAdminConfigs("dummy"));
+ retries = dummyAdmin.getInt(AdminClientConfig.RETRIES_CONFIG);
+ retryBackOffMs = dummyAdmin.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG);
log.debug("Configs:" + Utils.NL,
"\t{} = {}" + Utils.NL,
@@ -115,17 +118,22 @@ public class InternalTopicManager {
// TODO: KAFKA-6928. should not need retries in the outer caller as it will be retried internally in admin client
int remainingRetries = retries;
+ boolean retryBackOff = false;
boolean retry;
do {
retry = false;
final CreateTopicsResult createTopicsResult = adminClient.createTopics(newTopics);
- final Set<String> createTopicNames = new HashSet<>();
+ final Set<String> createdTopicNames = new HashSet<>();
for (final Map.Entry<String, KafkaFuture<Void>> createTopicResult : createTopicsResult.values().entrySet()) {
try {
+ if (retryBackOff) {
+ retryBackOff = false;
+ Thread.sleep(retryBackOffMs);
+ }
createTopicResult.getValue().get();
- createTopicNames.add(createTopicResult.getKey());
+ createdTopicNames.add(createTopicResult.getKey());
} catch (final ExecutionException couldNotCreateTopic) {
final Throwable cause = couldNotCreateTopic.getCause();
final String topicName = createTopicResult.getKey();
@@ -135,10 +143,23 @@ public class InternalTopicManager {
log.debug("Could not get number of partitions for topic {} due to timeout. " +
"Will try again (remaining retries {}).", topicName, remainingRetries - 1);
} else if (cause instanceof TopicExistsException) {
- createTopicNames.add(createTopicResult.getKey());
- log.info("Topic {} exist already: {}",
- topicName,
- couldNotCreateTopic.toString());
+ // This topic didn't exist earlier, it might be marked for deletion or it might differ
+ // from the desired setup. It needs re-validation.
+ final Map<String, Integer> existingTopicPartition = getNumPartitions(Collections.singleton(topicName));
+
+ if (existingTopicPartition.containsKey(topicName)
+ && validateTopicPartitions(Collections.singleton(topics.get(topicName)), existingTopicPartition).isEmpty()) {
+ createdTopicNames.add(createTopicResult.getKey());
+ log.info("Topic {} exists already and has the right number of partitions: {}",
+ topicName,
+ couldNotCreateTopic.toString());
+ } else {
+ retry = true;
+ retryBackOff = true;
+ log.info("Could not create topic {}. Topic is probably marked for deletion (number of partitions is unknown).\n" +
+ "Will retry to create this topic in {} ms (to let broker finish async delete operation first).\n" +
+ "Error message was: {}", topicName, retryBackOffMs, couldNotCreateTopic.toString());
+ }
} else {
throw new StreamsException(String.format("Could not create topic %s.", topicName),
couldNotCreateTopic);
@@ -151,7 +172,7 @@ public class InternalTopicManager {
}
if (retry) {
- newTopics.removeIf(newTopic -> createTopicNames.contains(newTopic.name()));
+ newTopics.removeIf(newTopic -> createdTopicNames.contains(newTopic.name()));
continue;
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index 7b34615..569867d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -129,9 +129,11 @@ public class StreamsConfigTest {
}
@Test
- public void consumerConfigMustUseAdminClientConfigForRetries() {
+ public void consumerConfigShouldContainAdminClientConfigsForRetriesAndRetryBackOffMsWithAdminPrefix() {
props.put(StreamsConfig.adminClientPrefix(StreamsConfig.RETRIES_CONFIG), 20);
+ props.put(StreamsConfig.adminClientPrefix(StreamsConfig.RETRY_BACKOFF_MS_CONFIG), 200L);
props.put(StreamsConfig.RETRIES_CONFIG, 10);
+ props.put(StreamsConfig.RETRY_BACKOFF_MS_CONFIG, 100L);
final StreamsConfig streamsConfig = new StreamsConfig(props);
final String groupId = "example-application";
@@ -139,6 +141,7 @@ public class StreamsConfigTest {
final Map<String, Object> returnedProps = streamsConfig.getMainConsumerConfigs(groupId, clientId);
assertEquals(20, returnedProps.get(StreamsConfig.adminClientPrefix(StreamsConfig.RETRIES_CONFIG)));
+ assertEquals(200L, returnedProps.get(StreamsConfig.adminClientPrefix(StreamsConfig.RETRY_BACKOFF_MS_CONFIG)));
}
@Test