You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2020/05/29 01:01:45 UTC
[kafka] branch trunk updated: MINOR: remove unnecessary timeout for
admin request (#8738)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 36ca33f MINOR: remove unnecessary timeout for admin request (#8738)
36ca33f is described below
commit 36ca33f9cf1902399afe7494f88c6bbd16022b56
Author: A. Sophie Blee-Goldman <so...@confluent.io>
AuthorDate: Thu May 28 18:01:01 2020 -0700
MINOR: remove unnecessary timeout for admin request (#8738)
Reviewers: John Roesler <jo...@confluent.io>, Matthias J. Sax <ma...@confluent.io>
---
.../org/apache/kafka/streams/KafkaStreams.java | 5 ++--
.../kafka/streams/kstream/CogroupedKStream.java | 2 +-
.../org/apache/kafka/streams/kstream/KStream.java | 2 +-
.../streams/processor/internals/ClientUtils.java | 30 ++++++++++------------
.../processor/internals/InternalTopicManager.java | 13 +++-------
.../streams/processor/internals/StreamThread.java | 1 +
.../internals/StreamsPartitionAssignor.java | 5 +---
.../assignment/AssignorConfiguration.java | 5 ----
.../integration/TaskAssignorIntegrationTest.java | 8 ------
.../processor/internals/ClientUtilsTest.java | 22 ++++++++--------
.../internals/StreamsPartitionAssignorTest.java | 12 ---------
11 files changed, 34 insertions(+), 71 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 7a7e4df..263bf9a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -92,7 +92,7 @@ import java.util.concurrent.TimeUnit;
import static org.apache.kafka.streams.StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG;
import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
-import static org.apache.kafka.streams.processor.internals.ClientUtils.fetchEndOffsetsWithoutTimeout;
+import static org.apache.kafka.streams.processor.internals.ClientUtils.fetchEndOffsets;
/**
* A Kafka client that allows for performing continuous computation on input coming from one or more input topics and
@@ -1230,6 +1230,7 @@ public class KafkaStreams implements AutoCloseable {
* of invocation to once every few seconds.
*
* @return map of store names to another map of partition to {@link LagInfo}s
+ * @throws StreamsException if the admin client request throws exception
*/
public Map<String, Map<Integer, LagInfo>> allLocalStorePartitionLags() {
final Map<String, Map<Integer, LagInfo>> localStorePartitionLags = new TreeMap<>();
@@ -1246,7 +1247,7 @@ public class KafkaStreams implements AutoCloseable {
}
log.debug("Current changelog positions: {}", allChangelogPositions);
- final Map<TopicPartition, ListOffsetsResultInfo> allEndOffsets = fetchEndOffsetsWithoutTimeout(allPartitions, adminClient);
+ final Map<TopicPartition, ListOffsetsResultInfo> allEndOffsets = fetchEndOffsets(allPartitions, adminClient);
log.debug("Current end offsets :{}", allEndOffsets);
for (final Map.Entry<TopicPartition, ListOffsetsResultInfo> entry : allEndOffsets.entrySet()) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/CogroupedKStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/CogroupedKStream.java
index aba2f6b..59e2fe7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/CogroupedKStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/CogroupedKStream.java
@@ -45,7 +45,7 @@ public interface CogroupedKStream<K, VOut> {
* streams of this {@code CogroupedKStream}.
* If this is not the case, you would need to call {@link KStream#repartition(Repartitioned)} before
* {@link KStream#groupByKey() grouping} the {@link KStream} and specify the "correct" number of
- * partitions via {@link Repartitioned) parameter.
+ * partitions via {@link Repartitioned} parameter.
* <p>
* The specified {@link Aggregator} is applied in the actual {@link #aggregate(Initializer) aggregation} step for
* each input record and computes a new aggregate using the current aggregate (or for the very first record per key
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index 8f99c9d..bcc911c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -3324,7 +3324,7 @@ public interface KStream<K, V> {
* }
* }</pre>
* Even if any upstream operation was key-changing, no auto-repartition is triggered.
- * If repartitioning is required, a call to {@link #repartition() should be performed before
+ * If repartitioning is required, a call to {@link #repartition()} should be performed before
* {@code transformValues()}.
* <p>
* Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning.
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ClientUtils.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ClientUtils.java
index 33aee64..613565d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ClientUtils.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ClientUtils.java
@@ -17,6 +17,7 @@
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.Consumer;
@@ -24,25 +25,29 @@ import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.TaskId;
-import java.time.Duration;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ClientUtils {
-
private static final Logger LOG = LoggerFactory.getLogger(ClientUtils.class);
+ public static final class QuietAdminClientConfig extends AdminClientConfig {
+ QuietAdminClientConfig(final StreamsConfig streamsConfig) {
+ // If you just want to look up admin configs, you don't care about the clientId
+ super(streamsConfig.getAdminConfigs("dummy"), false);
+ }
+ }
+
// currently admin client is shared among all threads
public static String getSharedAdminClientId(final String clientId) {
return clientId + "-admin";
@@ -90,25 +95,16 @@ public class ClientUtils {
return result;
}
- public static Map<TopicPartition, ListOffsetsResultInfo> fetchEndOffsetsWithoutTimeout(final Collection<TopicPartition> partitions,
- final Admin adminClient) {
- return fetchEndOffsets(partitions, adminClient, null);
- }
-
public static Map<TopicPartition, ListOffsetsResultInfo> fetchEndOffsets(final Collection<TopicPartition> partitions,
- final Admin adminClient,
- final Duration timeout) {
+ final Admin adminClient) {
final Map<TopicPartition, ListOffsetsResultInfo> endOffsets;
try {
final KafkaFuture<Map<TopicPartition, ListOffsetsResultInfo>> future = adminClient.listOffsets(
partitions.stream().collect(Collectors.toMap(Function.identity(), tp -> OffsetSpec.latest())))
.all();
- if (timeout == null) {
- endOffsets = future.get();
- } else {
- endOffsets = future.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
- }
- } catch (final TimeoutException | RuntimeException | InterruptedException | ExecutionException e) {
+ endOffsets = future.get();
+
+ } catch (final RuntimeException | InterruptedException | ExecutionException e) {
LOG.warn("listOffsets request failed.", e);
throw new StreamsException("Unable to obtain end offsets from kafka", e);
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
index a05aa48..dbbdb22 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
@@ -30,6 +30,7 @@ import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.processor.internals.ClientUtils.QuietAdminClientConfig;
import org.slf4j.Logger;
import java.util.HashMap;
@@ -44,12 +45,6 @@ public class InternalTopicManager {
private final static String INTERRUPTED_ERROR_MESSAGE = "Thread got interrupted. This indicates a bug. " +
"Please report at https://issues.apache.org/jira/projects/KAFKA or dev-mailing list (https://kafka.apache.org/contact).";
- private static final class InternalAdminClientConfig extends AdminClientConfig {
- private InternalAdminClientConfig(final Map<?, ?> props) {
- super(props, false);
- }
- }
-
private final Logger log;
private final long windowChangeLogAdditionalRetention;
private final Map<String, String> defaultTopicConfigs = new HashMap<>();
@@ -68,9 +63,9 @@ public class InternalTopicManager {
replicationFactor = streamsConfig.getInt(StreamsConfig.REPLICATION_FACTOR_CONFIG).shortValue();
windowChangeLogAdditionalRetention = streamsConfig.getLong(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG);
- final InternalAdminClientConfig dummyAdmin = new InternalAdminClientConfig(streamsConfig.getAdminConfigs("dummy"));
- retries = dummyAdmin.getInt(AdminClientConfig.RETRIES_CONFIG);
- retryBackOffMs = dummyAdmin.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG);
+ final QuietAdminClientConfig adminConfigs = new QuietAdminClientConfig(streamsConfig);
+ retries = adminConfigs.getInt(AdminClientConfig.RETRIES_CONFIG);
+ retryBackOffMs = adminConfigs.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG);
log.debug("Configs:" + Utils.NL +
"\t{} = {}" + Utils.NL +
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index fb73a66..6e7a3aa 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -551,6 +551,7 @@ public class StreamThread extends Thread {
if (nextProbingRebalanceMs.get() < time.milliseconds()) {
log.info("Triggering the followup rebalance scheduled for {} ms.", nextProbingRebalanceMs.get());
mainConsumer.enforceRebalance();
+ nextProbingRebalanceMs.set(Long.MAX_VALUE);
}
} catch (final TaskCorruptedException e) {
log.warn("Detected the states of tasks " + e.corruptedTaskWithChangelogs() + " are corrupted. " +
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index 7e207d2..a6fbdfb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -48,7 +48,6 @@ import org.apache.kafka.streams.state.HostInfo;
import org.slf4j.Logger;
import java.nio.ByteBuffer;
-import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -168,7 +167,6 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
protected int usedSubscriptionMetadataVersion = LATEST_SUPPORTED_VERSION;
private Admin adminClient;
- private int adminClientTimeout;
private InternalTopicManager internalTopicManager;
private CopartitionedTopicsEnforcer copartitionedTopicsEnforcer;
private RebalanceProtocol rebalanceProtocol;
@@ -200,7 +198,6 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
partitionGrouper = assignorConfiguration.partitionGrouper();
userEndPoint = assignorConfiguration.userEndPoint();
adminClient = assignorConfiguration.adminClient();
- adminClientTimeout = assignorConfiguration.adminClientTimeout();
internalTopicManager = assignorConfiguration.internalTopicManager();
copartitionedTopicsEnforcer = assignorConfiguration.copartitionedTopicsEnforcer();
rebalanceProtocol = assignorConfiguration.rebalanceProtocol();
@@ -773,7 +770,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
allNewlyCreatedChangelogPartitions.removeAll(allPreexistingChangelogPartitions);
final Map<TopicPartition, ListOffsetsResultInfo> endOffsets =
- fetchEndOffsets(allPreexistingChangelogPartitions, adminClient, Duration.ofMillis(adminClientTimeout));
+ fetchEndOffsets(allPreexistingChangelogPartitions, adminClient);
allTaskEndOffsetSums = computeEndOffsetSumsByTask(endOffsets, changelogsByStatefulTask, allNewlyCreatedChangelogPartitions);
fetchEndOffsetsSuccessful = true;
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java
index 84604ee..2d510a6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java
@@ -18,7 +18,6 @@ package org.apache.kafka.streams.processor.internals.assignment;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.Admin;
-import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.RebalanceProtocol;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.config.ConfigDef;
@@ -296,10 +295,6 @@ public final class AssignorConfiguration {
return adminClient;
}
- public int adminClientTimeout() {
- return streamsConfig.getInt(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG);
- }
-
public InternalTopicManager internalTopicManager() {
return new InternalTopicManager(adminClient, streamsConfig);
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/TaskAssignorIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/TaskAssignorIntegrationTest.java
index 38e391d..4d3c947 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/TaskAssignorIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/TaskAssignorIntegrationTest.java
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.integration;
-import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.streams.KafkaStreams;
@@ -95,7 +94,6 @@ public class TaskAssignorIntegrationTest {
mkEntry(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, "6"),
mkEntry(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, "7"),
mkEntry(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG, "480000"),
- mkEntry(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 9),
mkEntry(StreamsConfig.InternalConfig.ASSIGNMENT_LISTENER, configuredAssignmentListener),
mkEntry(StreamsConfig.InternalConfig.INTERNAL_TASK_ASSIGNOR_CLASS, MyTaskAssignor.class.getName())
)
@@ -130,11 +128,6 @@ public class TaskAssignorIntegrationTest {
final AssignorConfiguration.AssignmentListener actualAssignmentListener =
(AssignorConfiguration.AssignmentListener) assignmentListenerField.get(streamsPartitionAssignor);
- final Field adminClientTimeoutField = StreamsPartitionAssignor.class.getDeclaredField("adminClientTimeout");
- adminClientTimeoutField.setAccessible(true);
- final int adminClientTimeout =
- (int) adminClientTimeoutField.get(streamsPartitionAssignor);
-
final Field taskAssignorSupplierField = StreamsPartitionAssignor.class.getDeclaredField("taskAssignorSupplier");
taskAssignorSupplierField.setAccessible(true);
final Supplier<TaskAssignor> taskAssignorSupplier =
@@ -146,7 +139,6 @@ public class TaskAssignorIntegrationTest {
assertThat(configs.maxWarmupReplicas, is(7));
assertThat(configs.probingRebalanceIntervalMs, is(480000L));
assertThat(actualAssignmentListener, sameInstance(configuredAssignmentListener));
- assertThat(adminClientTimeout, is(9));
assertThat(taskAssignor, instanceOf(MyTaskAssignor.class));
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ClientUtilsTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ClientUtilsTest.java
index 96b6b62..6d9d8df 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ClientUtilsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ClientUtilsTest.java
@@ -18,22 +18,19 @@ package org.apache.kafka.streams.processor.internals;
import static java.util.Collections.emptyList;
import static org.apache.kafka.streams.processor.internals.ClientUtils.fetchEndOffsets;
-import static org.apache.kafka.streams.processor.internals.ClientUtils.fetchEndOffsetsWithoutTimeout;
import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.verify;
import static org.junit.Assert.assertThrows;
-import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.streams.errors.StreamsException;
import org.easymock.EasyMock;
import org.junit.Test;
@@ -45,12 +42,12 @@ public class ClientUtilsTest {
final Admin adminClient = EasyMock.createMock(AdminClient.class);
EasyMock.expect(adminClient.listOffsets(EasyMock.anyObject())).andThrow(new RuntimeException());
replay(adminClient);
- assertThrows(StreamsException.class, () -> fetchEndOffsetsWithoutTimeout(emptyList(), adminClient));
+ assertThrows(StreamsException.class, () -> fetchEndOffsets(emptyList(), adminClient));
verify(adminClient);
}
@Test
- public void fetchEndOffsetsShouldRethrowInterruptedExceptionAsStreamsException() throws InterruptedException, ExecutionException {
+ public void fetchEndOffsetsShouldRethrowInterruptedExceptionAsStreamsException() throws Exception {
final Admin adminClient = EasyMock.createMock(AdminClient.class);
final ListOffsetsResult result = EasyMock.createNiceMock(ListOffsetsResult.class);
final KafkaFuture<Map<TopicPartition, ListOffsetsResultInfo>> allFuture = EasyMock.createMock(KafkaFuture.class);
@@ -60,12 +57,12 @@ public class ClientUtilsTest {
EasyMock.expect(allFuture.get()).andThrow(new InterruptedException());
replay(adminClient, result, allFuture);
- assertThrows(StreamsException.class, () -> fetchEndOffsetsWithoutTimeout(emptyList(), adminClient));
+ assertThrows(StreamsException.class, () -> fetchEndOffsets(emptyList(), adminClient));
verify(adminClient);
}
@Test
- public void fetchEndOffsetsShouldRethrowExecutionExceptionAsStreamsException() throws InterruptedException, ExecutionException {
+ public void fetchEndOffsetsShouldRethrowExecutionExceptionAsStreamsException() throws Exception {
final Admin adminClient = EasyMock.createMock(AdminClient.class);
final ListOffsetsResult result = EasyMock.createNiceMock(ListOffsetsResult.class);
final KafkaFuture<Map<TopicPartition, ListOffsetsResultInfo>> allFuture = EasyMock.createMock(KafkaFuture.class);
@@ -75,22 +72,23 @@ public class ClientUtilsTest {
EasyMock.expect(allFuture.get()).andThrow(new ExecutionException(new RuntimeException()));
replay(adminClient, result, allFuture);
- assertThrows(StreamsException.class, () -> fetchEndOffsetsWithoutTimeout(emptyList(), adminClient));
+ assertThrows(StreamsException.class, () -> fetchEndOffsets(emptyList(), adminClient));
verify(adminClient);
}
@Test
- public void fetchEndOffsetsWithTimeoutShouldRethrowTimeoutExceptionAsStreamsException() throws InterruptedException, ExecutionException, TimeoutException {
+ public void fetchEndOffsetsWithTimeoutShouldRethrowTimeoutExceptionAsStreamsException() throws Exception {
final Admin adminClient = EasyMock.createMock(AdminClient.class);
final ListOffsetsResult result = EasyMock.createNiceMock(ListOffsetsResult.class);
final KafkaFuture<Map<TopicPartition, ListOffsetsResultInfo>> allFuture = EasyMock.createMock(KafkaFuture.class);
EasyMock.expect(adminClient.listOffsets(EasyMock.anyObject())).andStubReturn(result);
EasyMock.expect(result.all()).andStubReturn(allFuture);
- EasyMock.expect(allFuture.get(1L, TimeUnit.MILLISECONDS)).andThrow(new TimeoutException());
+ EasyMock.expect(allFuture.get()).andThrow(new TimeoutException());
replay(adminClient, result, allFuture);
- assertThrows(StreamsException.class, () -> fetchEndOffsets(emptyList(), adminClient, Duration.ofMillis(1)));
+ assertThrows(StreamsException.class, () -> fetchEndOffsets(emptyList(), adminClient));
verify(adminClient);
}
+
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
index 82edeb0..350a598 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
@@ -18,7 +18,6 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClient;
-import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Assignment;
@@ -1791,17 +1790,6 @@ public class StreamsPartitionAssignorTest {
}
@Test
- public void shouldSetAdminClientTimeout() {
- createDefaultMockTaskManager();
-
- final Map<String, Object> props = configProps();
- props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 2 * 60 * 1000);
- final AssignorConfiguration assignorConfiguration = new AssignorConfiguration(props);
-
- assertThat(assignorConfiguration.adminClientTimeout(), is(2 * 60 * 1000));
- }
-
- @Test
public void shouldGetNextProbingRebalanceMs() {
nextScheduledRebalanceMs.set(5 * 60 * 1000L);