You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/05/31 02:04:21 UTC
kafka git commit: KAFKA-5324;
AdminClient: add close with timeout, fix some timeout bugs
Repository: kafka
Updated Branches:
refs/heads/trunk c060c4828 -> 3250cc767
KAFKA-5324; AdminClient: add close with timeout, fix some timeout bugs
Author: Colin P. Mccabe <cm...@confluent.io>
Reviewers: Ismael Juma <is...@juma.me.uk>
Closes #3141 from cmccabe/KAFKA-5324
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3250cc76
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3250cc76
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3250cc76
Branch: refs/heads/trunk
Commit: 3250cc767e8ef79d0160312a8b605535d3002851
Parents: c060c48
Author: Colin P. Mccabe <cm...@confluent.io>
Authored: Wed May 31 03:02:26 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Wed May 31 03:02:52 2017 +0100
----------------------------------------------------------------------
.../apache/kafka/clients/admin/AdminClient.java | 29 ++-
.../kafka/clients/admin/KafkaAdminClient.java | 240 +++++++++++++++----
.../clients/admin/MockKafkaAdminClientEnv.java | 2 +-
.../api/KafkaAdminClientIntegrationTest.scala | 59 ++++-
4 files changed, 273 insertions(+), 57 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/3250cc76/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
index 4cfc174..96b8ebb 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.annotation.InterfaceStability;
import java.util.Collection;
import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.TimeUnit;
/**
* The public interface for the {@link KafkaAdminClient}, which supports managing and inspecting topics,
@@ -36,8 +37,8 @@ public abstract class AdminClient implements AutoCloseable {
/**
* Create a new AdminClient with the given configuration.
*
- * @param props The configuration.
- * @return The new KafkaAdminClient.
+ * @param props The configuration.
+ * @return The new KafkaAdminClient.
*/
public static AdminClient create(Properties props) {
return KafkaAdminClient.createInternal(new AdminClientConfig(props));
@@ -46,8 +47,8 @@ public abstract class AdminClient implements AutoCloseable {
/**
* Create a new AdminClient with the given configuration.
*
- * @param conf The configuration.
- * @return The new KafkaAdminClient.
+ * @param conf The configuration.
+ * @return The new KafkaAdminClient.
*/
public static AdminClient create(Map<String, Object> conf) {
return KafkaAdminClient.createInternal(new AdminClientConfig(conf));
@@ -55,8 +56,26 @@ public abstract class AdminClient implements AutoCloseable {
/**
* Close the AdminClient and release all associated resources.
+ *
+ * See {@link AdminClient#close(long, TimeUnit)}
+ */
+ @Override
+ public void close() {
+ close(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * Close the AdminClient and release all associated resources.
+ *
+ * The close operation has a grace period during which current operations will be allowed to
+ * complete, specified by the given duration and time unit.
+ * New operations will not be accepted during the grace period. Once the grace period is over,
+ * all operations that have not yet been completed will be aborted with a TimeoutException.
+ *
+ * @param duration The duration to use for the wait time.
+ * @param unit The time unit to use for the wait time.
*/
- public abstract void close();
+ public abstract void close(long duration, TimeUnit unit);
/**
* Create a batch of new topics with the default options.
http://git-wip-us.apache.org/repos/asf/kafka/blob/3250cc76/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 355dc9c..98fc3f3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -97,6 +97,7 @@ import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import static org.apache.kafka.common.utils.Utils.closeQuietly;
@@ -124,6 +125,11 @@ public class KafkaAdminClient extends AdminClient {
private static final String JMX_PREFIX = "kafka.admin.client";
/**
+ * An invalid shutdown time which indicates that a shutdown has not yet been performed.
+ */
+ private static final long INVALID_SHUTDOWN_TIME = -1;
+
+ /**
* The default timeout to use for an operation.
*/
private final int defaultTimeoutMs;
@@ -164,9 +170,10 @@ public class KafkaAdminClient extends AdminClient {
private final Thread thread;
/**
- * True if this client is closed.
+ * During a close operation, this is the time at which we will time out all pending operations
+ * and force the RPC thread to exit. If the admin client is not closing, this will be 0.
*/
- private volatile boolean closed = false;
+ private final AtomicLong hardShutdownTimeMs = new AtomicLong(INVALID_SHUTDOWN_TIME);
/**
* Get or create a list value from a map.
@@ -289,12 +296,12 @@ public class KafkaAdminClient extends AdminClient {
selector,
metadata,
clientId,
- 100,
+ 1,
config.getLong(AdminClientConfig.RECONNECT_BACKOFF_MS_CONFIG),
config.getLong(AdminClientConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG),
config.getInt(AdminClientConfig.SEND_BUFFER_CONFIG),
config.getInt(AdminClientConfig.RECEIVE_BUFFER_CONFIG),
- config.getInt(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG),
+ (int) TimeUnit.HOURS.toMillis(1),
time,
true,
apiVersions);
@@ -309,7 +316,7 @@ public class KafkaAdminClient extends AdminClient {
}
}
- static KafkaAdminClient create(AdminClientConfig config, KafkaClient client, Metadata metadata) {
+ static KafkaAdminClient createInternal(AdminClientConfig config, KafkaClient client, Metadata metadata) {
Metrics metrics = null;
Time time = Time.SYSTEM;
String clientId = generateClientId(config);
@@ -343,9 +350,33 @@ public class KafkaAdminClient extends AdminClient {
}
@Override
- public void close() {
- closed = true;
- client.wakeup(); // Wake the thread, if it is blocked inside poll().
+ public void close(long duration, TimeUnit unit) {
+ long waitTimeMs = unit.toMillis(duration);
+ waitTimeMs = Math.min(TimeUnit.DAYS.toMillis(365), waitTimeMs); // Limit the timeout to a year.
+ long now = time.milliseconds();
+ long newHardShutdownTimeMs = now + waitTimeMs;
+ long prev = INVALID_SHUTDOWN_TIME;
+ while (true) {
+ if (hardShutdownTimeMs.compareAndSet(prev, newHardShutdownTimeMs)) {
+ if (prev == INVALID_SHUTDOWN_TIME) {
+ log.debug("{}: initiating close operation.", clientId);
+ } else {
+ log.debug("{}: moving hard shutdown time forward.", clientId);
+ }
+ client.wakeup(); // Wake the thread, if it is blocked inside poll().
+ break;
+ }
+ prev = hardShutdownTimeMs.get();
+ if (prev < newHardShutdownTimeMs) {
+ log.debug("{}: hard shutdown time is already earlier than requested.", clientId);
+ newHardShutdownTimeMs = prev;
+ break;
+ }
+ }
+ if (log.isDebugEnabled()) {
+ long deltaMs = Math.max(0, newHardShutdownTimeMs - time.milliseconds());
+ log.debug("{}: waiting for the I/O thread to exit. Hard shutdown in {} ms.", clientId, deltaMs);
+ }
try {
// Wait for the thread to be joined.
thread.join();
@@ -439,7 +470,7 @@ public class KafkaAdminClient extends AdminClient {
if ((throwable instanceof UnsupportedVersionException) &&
handleUnsupportedVersionException((UnsupportedVersionException) throwable)) {
log.trace("{} attempting protocol downgrade.", this);
- runnable.call(this, now);
+ runnable.enqueue(this, now);
return;
}
tries++;
@@ -474,7 +505,7 @@ public class KafkaAdminClient extends AdminClient {
log.debug("{} failed: {}. Beginning retry #{}",
this, prettyPrintException(throwable), tries);
}
- runnable.call(this, now);
+ runnable.enqueue(this, now);
}
/**
@@ -523,6 +554,7 @@ public class KafkaAdminClient extends AdminClient {
private final class AdminClientRunnable implements Runnable {
/**
* Pending calls. Protected by the object monitor.
+ * This will be null only if the thread has shut down.
*/
private List<Call> newCalls = new LinkedList<>();
@@ -554,47 +586,96 @@ public class KafkaAdminClient extends AdminClient {
return null;
}
- /**
- * Time out a list of calls.
- *
- * @param now The current time in milliseconds.
- * @param calls The collection of calls. Must be sorted from oldest to newest.
- */
- private int timeoutCalls(long now, Collection<Call> calls) {
- int numTimedOut = 0;
- for (Iterator<Call> iter = calls.iterator(); iter.hasNext(); ) {
- Call call = iter.next();
- if (calcTimeoutMsRemainingAsInt(now, call.deadlineMs) < 0) {
- call.fail(now, new TimeoutException());
- iter.remove();
- numTimedOut++;
+ private class TimeoutProcessor {
+ /**
+ * The current time in milliseconds.
+ */
+ private final long now;
+
+ /**
+ * The number of milliseconds until the next timeout.
+ */
+ private int nextTimeoutMs;
+
+ /**
+ * Create a new timeout processor.
+ *
+ * @param now The current time in milliseconds since the epoch.
+ */
+ TimeoutProcessor(long now) {
+ this.now = now;
+ this.nextTimeoutMs = Integer.MAX_VALUE;
+ }
+
+ /**
+ * Check for calls which have timed out.
+ * Timed out calls will be removed and failed.
+ * The remaining milliseconds until the next timeout will be updated.
+ *
+ * @param calls The collection of calls.
+ *
+ * @return The number of calls which were timed out.
+ */
+ int handleTimeouts(Collection<Call> calls, String msg) {
+ int numTimedOut = 0;
+ for (Iterator<Call> iter = calls.iterator(); iter.hasNext(); ) {
+ Call call = iter.next();
+ int remainingMs = calcTimeoutMsRemainingAsInt(now, call.deadlineMs);
+ if (remainingMs < 0) {
+ call.fail(now, new TimeoutException(msg));
+ iter.remove();
+ numTimedOut++;
+ } else {
+ nextTimeoutMs = Math.min(nextTimeoutMs, remainingMs);
+ }
}
+ return numTimedOut;
+ }
+
+ /**
+ * Check whether a call should be timed out.
+ * The remaining milliseconds until the next timeout will be updated.
+ *
+ * @param call The call.
+ *
+ * @return True if the call should be timed out.
+ */
+ boolean callHasExpired(Call call) {
+ int remainingMs = calcTimeoutMsRemainingAsInt(now, call.deadlineMs);
+ if (remainingMs < 0)
+ return true;
+ nextTimeoutMs = Math.min(nextTimeoutMs, remainingMs);
+ return false;
+ }
+
+ int nextTimeoutMs() {
+ return nextTimeoutMs;
}
- return numTimedOut;
}
/**
* Time out the elements in the newCalls list which are expired.
*
- * @param now The current time in milliseconds.
+ * @param processor The timeout processor.
*/
- private synchronized void timeoutNewCalls(long now) {
- int numTimedOut = timeoutCalls(now, newCalls);
- if (numTimedOut > 0) {
+ private synchronized void timeoutNewCalls(TimeoutProcessor processor) {
+ int numTimedOut = processor.handleTimeouts(newCalls,
+ "Timed out waiting for a node assignment.");
+ if (numTimedOut > 0)
log.debug("{}: timed out {} new calls.", clientId, numTimedOut);
- }
}
/**
* Time out calls which have been assigned to nodes.
*
- * @param now The current time in milliseconds.
+ * @param processor The timeout processor.
* @param callsToSend A map of nodes to the calls they need to handle.
*/
- private void timeoutCallsToSend(long now, Map<Node, List<Call>> callsToSend) {
+ private void timeoutCallsToSend(TimeoutProcessor processor, Map<Node, List<Call>> callsToSend) {
int numTimedOut = 0;
for (List<Call> callList : callsToSend.values()) {
- numTimedOut += timeoutCalls(now, callList);
+ numTimedOut += processor.handleTimeouts(callList,
+ "Timed out waiting to send the call.");
}
if (numTimedOut > 0)
log.debug("{}: timed out {} call(s) with assigned nodes.", clientId, numTimedOut);
@@ -698,10 +779,10 @@ public class KafkaAdminClient extends AdminClient {
* even be in the process of being processed by the remote server. At the moment, our only option
* to time them out is to close the entire connection.
*
- * @param now The current time in milliseconds.
- * @param callsInFlight A map of nodes to the calls they have in flight.
+ * @param processor The timeout processor.
+ * @param callsInFlight A map of nodes to the calls they have in flight.
*/
- private void timeoutCallsInFlight(long now, Map<String, List<Call>> callsInFlight) {
+ private void timeoutCallsInFlight(TimeoutProcessor processor, Map<String, List<Call>> callsInFlight) {
int numTimedOut = 0;
for (Map.Entry<String, List<Call>> entry : callsInFlight.entrySet()) {
List<Call> contexts = entry.getValue();
@@ -711,7 +792,7 @@ public class KafkaAdminClient extends AdminClient {
// We assume that the first element in the list is the earliest. So it should be the
// only one we need to check the timeout for.
Call call = contexts.get(0);
- if (calcTimeoutMsRemainingAsInt(now, call.deadlineMs) < 0) {
+ if (processor.callHasExpired(call)) {
log.debug("{}: Closing connection to {} to time out {}", clientId, nodeId, call);
client.close(nodeId);
numTimedOut++;
@@ -773,6 +854,22 @@ public class KafkaAdminClient extends AdminClient {
}
}
+ private synchronized boolean threadShouldExit(long now, long curHardShutdownTimeMs,
+ Map<Node, List<Call>> callsToSend, Map<Integer, Call> correlationIdToCalls) {
+ if (newCalls.isEmpty() && callsToSend.isEmpty() && correlationIdToCalls.isEmpty()) {
+ log.trace("{}: all work has been completed, and the I/O thread is now " +
+ "exiting.", clientId);
+ return true;
+ }
+ if (now > curHardShutdownTimeMs) {
+ log.info("{}: forcing a hard I/O thread shutdown. Requests in progress will " +
+ "be aborted.", clientId);
+ return true;
+ }
+ log.debug("{}: hard shutdown in {} ms.", clientId, curHardShutdownTimeMs - now);
+ return false;
+ }
+
@Override
public void run() {
/**
@@ -798,18 +895,25 @@ public class KafkaAdminClient extends AdminClient {
long now = time.milliseconds();
log.trace("{} thread starting", clientId);
while (true) {
- // Check if the AdminClient is shutting down.
- if (closed)
+ // Check if the AdminClient thread should shut down.
+ long curHardShutdownTimeMs = hardShutdownTimeMs.get();
+ if ((curHardShutdownTimeMs != INVALID_SHUTDOWN_TIME) &&
+ threadShouldExit(now, curHardShutdownTimeMs, callsToSend, correlationIdToCalls))
break;
// Handle timeouts.
- timeoutNewCalls(now);
- timeoutCallsToSend(now, callsToSend);
- timeoutCallsInFlight(now, callsInFlight);
+ TimeoutProcessor timeoutProcessor = new TimeoutProcessor(now);
+ timeoutNewCalls(timeoutProcessor);
+ timeoutCallsToSend(timeoutProcessor, callsToSend);
+ timeoutCallsInFlight(timeoutProcessor, callsInFlight);
+
+ long pollTimeout = Math.min(1200000, timeoutProcessor.nextTimeoutMs());
+ if (curHardShutdownTimeMs != INVALID_SHUTDOWN_TIME) {
+ pollTimeout = Math.min(pollTimeout, curHardShutdownTimeMs - now);
+ }
// Handle new calls and metadata update requests.
prevMetadataVersion = checkMetadataReady(prevMetadataVersion);
- long pollTimeout = 1200000;
if (prevMetadataVersion == null) {
chooseNodesForNewCalls(now, callsToSend);
pollTimeout = Math.min(pollTimeout,
@@ -826,10 +930,14 @@ public class KafkaAdminClient extends AdminClient {
handleResponses(now, responses, callsInFlight, correlationIdToCalls);
}
int numTimedOut = 0;
+ TimeoutProcessor timeoutProcessor = new TimeoutProcessor(Long.MAX_VALUE);
synchronized (this) {
- numTimedOut += timeoutCalls(Long.MAX_VALUE, newCalls);
+ numTimedOut += timeoutProcessor.handleTimeouts(newCalls,
+ "The AdminClient thread has exited.");
+ newCalls = null;
}
- numTimedOut += timeoutCalls(Long.MAX_VALUE, correlationIdToCalls.values());
+ numTimedOut += timeoutProcessor.handleTimeouts(correlationIdToCalls.values(),
+ "The AdminClient thread has exited.");
if (numTimedOut > 0) {
log.debug("{}: timed out {} remaining operations.", clientId, numTimedOut);
}
@@ -838,15 +946,51 @@ public class KafkaAdminClient extends AdminClient {
log.debug("{}: exiting AdminClientRunnable thread.", clientId);
}
- void call(Call call, long now) {
+ /**
+ * Queue a call for sending.
+ *
+ * If the AdminClient thread has exited, this will fail. Otherwise, it will succeed (even
+ * if the AdminClient is shutting down.) This function should called when retrying an
+ * existing call.
+ *
+ * @param call The new call object.
+ * @param now The current time in milliseconds.
+ */
+ void enqueue(Call call, long now) {
if (log.isDebugEnabled()) {
log.debug("{}: queueing {} with a timeout {} ms from now.",
clientId, call, call.deadlineMs - now);
}
+ boolean accepted = false;
synchronized (this) {
- newCalls.add(call);
+ if (newCalls != null) {
+ newCalls.add(call);
+ accepted = true;
+ }
+ }
+ if (accepted) {
+ client.wakeup(); // wake the thread if it is in poll()
+ } else {
+ log.debug("{}: the AdminClient thread has exited. Timing out {}.", clientId, call);
+ call.fail(Long.MAX_VALUE, new TimeoutException("The AdminClient thread has exited."));
+ }
+ }
+
+ /**
+ * Initiate a new call.
+ *
+ * This will fail if the AdminClient is scheduled to shut down.
+ *
+ * @param call The new call object.
+ * @param now The current time in milliseconds.
+ */
+ void call(Call call, long now) {
+ if (hardShutdownTimeMs.get() != INVALID_SHUTDOWN_TIME) {
+ log.debug("{}: the AdminClient is not accepting new calls. Timing out {}.", clientId, call);
+ call.fail(Long.MAX_VALUE, new TimeoutException("The AdminClient thread is not accepting new calls."));
+ } else {
+ enqueue(call, now);
}
- client.wakeup();
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/3250cc76/clients/src/test/java/org/apache/kafka/clients/admin/MockKafkaAdminClientEnv.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockKafkaAdminClientEnv.java b/clients/src/test/java/org/apache/kafka/clients/admin/MockKafkaAdminClientEnv.java
index 1fa0249..ba7b528 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/MockKafkaAdminClientEnv.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockKafkaAdminClientEnv.java
@@ -52,7 +52,7 @@ public class MockKafkaAdminClientEnv implements AutoCloseable {
this.metadata = new Metadata(adminClientConfig.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG),
adminClientConfig.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG));
this.mockClient = new MockClient(Time.SYSTEM, this.metadata);
- this.client = KafkaAdminClient.create(adminClientConfig, mockClient, metadata);
+ this.client = KafkaAdminClient.createInternal(adminClientConfig, mockClient, metadata);
}
public Cluster cluster() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/3250cc76/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala
index 81f5c27..c52594b 100644
--- a/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala
@@ -18,9 +18,9 @@ package kafka.api
import java.util
import java.util.{Collections, Properties}
-import java.util.concurrent.ExecutionException
+import java.util.concurrent.{ExecutionException, TimeUnit}
-import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.common.utils.{Time, Utils}
import kafka.integration.KafkaServerTestHarness
import kafka.log.LogConfig
import kafka.server.{Defaults, KafkaConfig}
@@ -28,7 +28,7 @@ import org.apache.kafka.clients.admin._
import kafka.utils.{Logging, TestUtils}
import org.apache.kafka.clients.admin.NewTopic
import org.apache.kafka.common.KafkaFuture
-import org.apache.kafka.common.errors.{InvalidRequestException, SecurityDisabledException, TopicExistsException}
+import org.apache.kafka.common.errors.{InvalidRequestException, SecurityDisabledException, TimeoutException, TopicExistsException}
import org.apache.kafka.common.protocol.ApiKeys
import org.junit.{After, Before, Rule, Test}
import org.apache.kafka.common.requests.MetadataResponse
@@ -381,6 +381,59 @@ class KafkaAdminClientIntegrationTest extends KafkaServerTestHarness with Loggin
client.close()
}
+ /**
+ * Test closing the AdminClient with a generous timeout. Calls in progress should be completed,
+ * since they can be done within the timeout. New calls should receive timeouts.
+ */
+ @Test
+ def testDelayedClose(): Unit = {
+ client = AdminClient.create(createConfig())
+ val topics = Seq("mytopic", "mytopic2")
+ val newTopics = topics.map(new NewTopic(_, 1, 1))
+ val future = client.createTopics(newTopics.asJava, new CreateTopicsOptions().validateOnly(true)).all()
+ client.close(2, TimeUnit.HOURS)
+ val future2 = client.createTopics(newTopics.asJava, new CreateTopicsOptions().validateOnly(true)).all()
+ assertFutureExceptionTypeEquals(future2, classOf[TimeoutException])
+ future.get
+ client.close(30, TimeUnit.MINUTES) // multiple close-with-timeout should have no effect
+ }
+
+ /**
+ * Test closing the AdminClient with a timeout of 0, when there are calls with extremely long
+ * timeouts in progress. The calls should be aborted after the hard shutdown timeout elapses.
+ */
+ @Test
+ def testForceClose(): Unit = {
+ val config = createConfig()
+ config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:22")
+ client = AdminClient.create(config)
+ // Because the bootstrap servers are set up incorrectly, this call will not complete, but must be
+ // cancelled by the close operation.
+ val future = client.createTopics(Seq("mytopic", "mytopic2").map(new NewTopic(_, 1, 1)).asJava,
+ new CreateTopicsOptions().timeoutMs(900000)).all()
+ client.close(0, TimeUnit.MILLISECONDS)
+ assertFutureExceptionTypeEquals(future, classOf[TimeoutException])
+ }
+
+ /**
+ * Check that a call with a timeout does not complete before the minimum timeout has elapsed,
+ * even when the default request timeout is shorter.
+ */
+ @Test
+ def testMinimumRequestTimeouts(): Unit = {
+ val config = createConfig()
+ config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:22")
+ config.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "0")
+ client = AdminClient.create(config)
+ val startTimeMs = Time.SYSTEM.milliseconds()
+ val future = client.createTopics(Seq("mytopic", "mytopic2").map(new NewTopic(_, 1, 1)).asJava,
+ new CreateTopicsOptions().timeoutMs(2)).all()
+ assertFutureExceptionTypeEquals(future, classOf[TimeoutException])
+ val endTimeMs = Time.SYSTEM.milliseconds()
+ assertTrue("Expected the timeout to take at least one millisecond.", endTimeMs > startTimeMs);
+ client.close()
+ }
+
override def generateConfigs() = {
val cfgs = TestUtils.createBrokerConfigs(brokerCount, zkConnect, interBrokerSecurityProtocol = Some(securityProtocol),
trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties)