You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/11/15 21:23:53 UTC
[kafka] branch trunk updated: MINOR: Code cleanup in
StreamsResetter (#5891)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 9346660 MINOR: Code cleanup in StreamsResetter (#5891)
9346660 is described below
commit 93466602d80c0c8dba7544964f48feb49589b0b5
Author: Matthias J. Sax <mj...@apache.org>
AuthorDate: Thu Nov 15 13:23:39 2018 -0800
MINOR: Code cleanup in StreamsResetter (#5891)
Reviewers: Bill Bejeck <bi...@confluent.io>, John Roesler <jo...@confluent.io>, Guozhang Wang <wa...@gmail.com>
---
.../main/scala/kafka/tools/StreamsResetter.java | 49 +++++++++++++---------
1 file changed, 29 insertions(+), 20 deletions(-)
diff --git a/core/src/main/scala/kafka/tools/StreamsResetter.java b/core/src/main/scala/kafka/tools/StreamsResetter.java
index 09d3b9e..967901c 100644
--- a/core/src/main/scala/kafka/tools/StreamsResetter.java
+++ b/core/src/main/scala/kafka/tools/StreamsResetter.java
@@ -23,11 +23,11 @@ import joptsimple.OptionSpec;
import joptsimple.OptionSpecBuilder;
import kafka.utils.CommandLineUtils;
import org.apache.kafka.clients.admin.AdminClient;
-import org.apache.kafka.clients.admin.MemberDescription;
import org.apache.kafka.clients.admin.DeleteTopicsResult;
-import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.DescribeConsumerGroupsOptions;
import org.apache.kafka.clients.admin.DescribeConsumerGroupsResult;
+import org.apache.kafka.clients.admin.KafkaAdminClient;
+import org.apache.kafka.clients.admin.MemberDescription;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -39,15 +39,14 @@ import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
-import javax.xml.datatype.DatatypeConfigurationException;
import javax.xml.datatype.DatatypeFactory;
import javax.xml.datatype.Duration;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
-import java.util.Arrays;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
@@ -111,7 +110,7 @@ public class StreamsResetter {
public int run(final String[] args,
final Properties config) {
- int exitCode = EXIT_CODE_SUCCESS;
+ int exitCode;
KafkaAdminClient kafkaAdminClient = null;
@@ -157,10 +156,10 @@ public class StreamsResetter {
private void validateNoActiveConsumers(final String groupId,
final AdminClient adminClient) throws ExecutionException, InterruptedException {
- final DescribeConsumerGroupsResult describeResult = adminClient.describeConsumerGroups(Arrays.asList(groupId),
+ final DescribeConsumerGroupsResult describeResult = adminClient.describeConsumerGroups(Collections.singleton(groupId),
(new DescribeConsumerGroupsOptions()).timeoutMs(10 * 1000));
final List<MemberDescription> members =
- new ArrayList<MemberDescription>(describeResult.describedGroups().get(groupId).get().members());
+ new ArrayList<>(describeResult.describedGroups().get(groupId).get().members());
if (!members.isEmpty()) {
throw new IllegalStateException("Consumer group '" + groupId + "' is still active "
+ "and has following members: " + members + ". "
@@ -216,7 +215,7 @@ public class StreamsResetter {
executeOption = optionParser.accepts("execute", "Execute the command.");
dryRunOption = optionParser.accepts("dry-run", "Display the actions that would be performed without executing the reset commands.");
- // TODO: deprecated in 1.0; can be removed eventually
+ // TODO: deprecated in 1.0; can be removed eventually: https://issues.apache.org/jira/browse/KAFKA-7606
optionParser.accepts("zookeeper", "Zookeeper option is deprecated by bootstrap.servers, as the reset tool would no longer access Zookeeper directly.");
try {
@@ -230,7 +229,7 @@ public class StreamsResetter {
CommandLineUtils.printUsageAndDie(optionParser, "Only one of --dry-run and --execute can be specified");
}
- scala.collection.immutable.HashSet<OptionSpec<?>> allScenarioOptions = new scala.collection.immutable.HashSet<>();
+ final scala.collection.immutable.HashSet<OptionSpec<?>> allScenarioOptions = new scala.collection.immutable.HashSet<>();
allScenarioOptions.$plus(toOffsetOption);
allScenarioOptions.$plus(toDatetimeOption);
allScenarioOptions.$plus(byDurationOption);
@@ -315,7 +314,7 @@ public class StreamsResetter {
config.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
try (final KafkaConsumer<byte[], byte[]> client = new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
- Collection<TopicPartition> partitions = topicsToSubscribe.stream().map(client::partitionsFor)
+ final Collection<TopicPartition> partitions = topicsToSubscribe.stream().map(client::partitionsFor)
.flatMap(Collection::stream)
.map(info -> new TopicPartition(info.topic(), info.partition()))
.collect(Collectors.toList());
@@ -407,7 +406,9 @@ public class StreamsResetter {
}
// visible for testing
- public void resetOffsetsFromResetPlan(Consumer<byte[], byte[]> client, Set<TopicPartition> inputTopicPartitions, Map<TopicPartition, Long> topicPartitionsAndOffset) {
+ public void resetOffsetsFromResetPlan(final Consumer<byte[], byte[]> client,
+ final Set<TopicPartition> inputTopicPartitions,
+ final Map<TopicPartition, Long> topicPartitionsAndOffset) {
final Map<TopicPartition, Long> endOffsets = client.endOffsets(inputTopicPartitions);
final Map<TopicPartition, Long> beginningOffsets = client.beginningOffsets(inputTopicPartitions);
@@ -419,12 +420,14 @@ public class StreamsResetter {
}
}
- private Map<TopicPartition, Long> getTopicPartitionOffsetFromResetPlan(String resetPlanPath) throws IOException, ParseException {
+ private Map<TopicPartition, Long> getTopicPartitionOffsetFromResetPlan(final String resetPlanPath) throws IOException, ParseException {
final String resetPlanCsv = Utils.readFileAsString(resetPlanPath);
return parseResetPlan(resetPlanCsv);
}
- private void resetByDuration(Consumer<byte[], byte[]> client, Set<TopicPartition> inputTopicPartitions, Duration duration) throws DatatypeConfigurationException {
+ private void resetByDuration(final Consumer<byte[], byte[]> client,
+ final Set<TopicPartition> inputTopicPartitions,
+ final Duration duration) {
final Date now = new Date();
duration.negate().addTo(now);
final long timestamp = now.getTime();
@@ -441,7 +444,9 @@ public class StreamsResetter {
}
}
- private void resetToDatetime(Consumer<byte[], byte[]> client, Set<TopicPartition> inputTopicPartitions, Long timestamp) {
+ private void resetToDatetime(final Consumer<byte[], byte[]> client,
+ final Set<TopicPartition> inputTopicPartitions,
+ final Long timestamp) {
final Map<TopicPartition, Long> topicPartitionsAndTimes = new HashMap<>(inputTopicPartitions.size());
for (final TopicPartition topicPartition : inputTopicPartitions) {
topicPartitionsAndTimes.put(topicPartition, timestamp);
@@ -455,7 +460,9 @@ public class StreamsResetter {
}
// visible for testing
- public void shiftOffsetsBy(Consumer<byte[], byte[]> client, Set<TopicPartition> inputTopicPartitions, long shiftBy) {
+ public void shiftOffsetsBy(final Consumer<byte[], byte[]> client,
+ final Set<TopicPartition> inputTopicPartitions,
+ final long shiftBy) {
final Map<TopicPartition, Long> endOffsets = client.endOffsets(inputTopicPartitions);
final Map<TopicPartition, Long> beginningOffsets = client.beginningOffsets(inputTopicPartitions);
@@ -475,7 +482,9 @@ public class StreamsResetter {
}
// visible for testing
- public void resetOffsetsTo(Consumer<byte[], byte[]> client, Set<TopicPartition> inputTopicPartitions, Long offset) {
+ public void resetOffsetsTo(final Consumer<byte[], byte[]> client,
+ final Set<TopicPartition> inputTopicPartitions,
+ final Long offset) {
final Map<TopicPartition, Long> endOffsets = client.endOffsets(inputTopicPartitions);
final Map<TopicPartition, Long> beginningOffsets = client.beginningOffsets(inputTopicPartitions);
@@ -511,7 +520,7 @@ public class StreamsResetter {
try {
final Date date = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX").parse(timestamp);
return date.getTime();
- } catch (ParseException e) {
+ } catch (final ParseException e) {
final Date date = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX").parse(timestamp);
return date.getTime();
}
@@ -574,7 +583,7 @@ public class StreamsResetter {
private void maybeDeleteInternalTopics(final KafkaAdminClient adminClient, final boolean dryRun) {
System.out.println("Deleting all internal/auto-created topics for application " + options.valueOf(applicationIdOption));
- List<String> topicsToDelete = new ArrayList<>();
+ final List<String> topicsToDelete = new ArrayList<>();
for (final String listing : allTopics) {
if (isInternalTopic(listing)) {
if (!dryRun) {
@@ -600,7 +609,7 @@ public class StreamsResetter {
for (final Map.Entry<String, KafkaFuture<Void>> entry : results.entrySet()) {
try {
entry.getValue().get(30, TimeUnit.SECONDS);
- } catch (Exception e) {
+ } catch (final Exception e) {
System.err.println("ERROR: deleting topic " + entry.getKey());
e.printStackTrace(System.err);
hasDeleteErrors = true;
@@ -617,7 +626,7 @@ public class StreamsResetter {
&& (topicName.endsWith("-changelog") || topicName.endsWith("-repartition"));
}
- private void printHelp(OptionParser parser) throws IOException {
+ private void printHelp(final OptionParser parser) throws IOException {
System.err.println("The Streams Reset Tool allows you to quickly reset an application in order to reprocess "
+ "its data from scratch.\n"
+ "* This tool resets offsets of input topics to the earliest available offset and it skips to the end of "