You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/05/31 02:04:21 UTC

kafka git commit: KAFKA-5324; AdminClient: add close with timeout, fix some timeout bugs

Repository: kafka
Updated Branches:
  refs/heads/trunk c060c4828 -> 3250cc767


KAFKA-5324; AdminClient: add close with timeout, fix some timeout bugs

Author: Colin P. Mccabe <cm...@confluent.io>

Reviewers: Ismael Juma <is...@juma.me.uk>

Closes #3141 from cmccabe/KAFKA-5324


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3250cc76
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3250cc76
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3250cc76

Branch: refs/heads/trunk
Commit: 3250cc767e8ef79d0160312a8b605535d3002851
Parents: c060c48
Author: Colin P. Mccabe <cm...@confluent.io>
Authored: Wed May 31 03:02:26 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Wed May 31 03:02:52 2017 +0100

----------------------------------------------------------------------
 .../apache/kafka/clients/admin/AdminClient.java |  29 ++-
 .../kafka/clients/admin/KafkaAdminClient.java   | 240 +++++++++++++++----
 .../clients/admin/MockKafkaAdminClientEnv.java  |   2 +-
 .../api/KafkaAdminClientIntegrationTest.scala   |  59 ++++-
 4 files changed, 273 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/3250cc76/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
index 4cfc174..96b8ebb 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.annotation.InterfaceStability;
 import java.util.Collection;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.TimeUnit;
 
 /**
  * The public interface for the {@link KafkaAdminClient}, which supports managing and inspecting topics,
@@ -36,8 +37,8 @@ public abstract class AdminClient implements AutoCloseable {
     /**
      * Create a new AdminClient with the given configuration.
      *
-     * @param props         The configuration.
-     * @return              The new KafkaAdminClient.
+     * @param props The configuration.
+     * @return The new KafkaAdminClient.
      */
     public static AdminClient create(Properties props) {
         return KafkaAdminClient.createInternal(new AdminClientConfig(props));
@@ -46,8 +47,8 @@ public abstract class AdminClient implements AutoCloseable {
     /**
      * Create a new AdminClient with the given configuration.
      *
-     * @param conf          The configuration.
-     * @return              The new KafkaAdminClient.
+     * @param conf The configuration.
+     * @return The new KafkaAdminClient.
      */
     public static AdminClient create(Map<String, Object> conf) {
         return KafkaAdminClient.createInternal(new AdminClientConfig(conf));
@@ -55,8 +56,26 @@ public abstract class AdminClient implements AutoCloseable {
 
     /**
      * Close the AdminClient and release all associated resources.
+     *
+     * See {@link AdminClient#close(long, TimeUnit)}
+     */
+    @Override
+    public void close() {
+        close(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Close the AdminClient and release all associated resources.
+     *
+     * The close operation has a grace period during which current operations will be allowed to
+     * complete, specified by the given duration and time unit.
+     * New operations will not be accepted during the grace period.  Once the grace period is over,
+     * all operations that have not yet been completed will be aborted with a TimeoutException.
+     *
+     * @param duration  The duration to use for the wait time.
+     * @param unit      The time unit to use for the wait time.
      */
-    public abstract void close();
+    public abstract void close(long duration, TimeUnit unit);
 
     /**
      * Create a batch of new topics with the default options.

http://git-wip-us.apache.org/repos/asf/kafka/blob/3250cc76/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 355dc9c..98fc3f3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -97,6 +97,7 @@ import java.util.Map;
 import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 import static org.apache.kafka.common.utils.Utils.closeQuietly;
 
@@ -124,6 +125,11 @@ public class KafkaAdminClient extends AdminClient {
     private static final String JMX_PREFIX = "kafka.admin.client";
 
     /**
+     * An invalid shutdown time which indicates that a shutdown has not yet been performed.
+     */
+    private static final long INVALID_SHUTDOWN_TIME = -1;
+
+    /**
      * The default timeout to use for an operation.
      */
     private final int defaultTimeoutMs;
@@ -164,9 +170,10 @@ public class KafkaAdminClient extends AdminClient {
     private final Thread thread;
 
     /**
-     * True if this client is closed.
+     * During a close operation, this is the time at which we will time out all pending operations
+     * and force the RPC thread to exit.  If the admin client is not closing, this will be INVALID_SHUTDOWN_TIME.
      */
-    private volatile boolean closed = false;
+    private final AtomicLong hardShutdownTimeMs = new AtomicLong(INVALID_SHUTDOWN_TIME);
 
     /**
      * Get or create a list value from a map.
@@ -289,12 +296,12 @@ public class KafkaAdminClient extends AdminClient {
                 selector,
                 metadata,
                 clientId,
-                100,
+                1,
                 config.getLong(AdminClientConfig.RECONNECT_BACKOFF_MS_CONFIG),
                 config.getLong(AdminClientConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG),
                 config.getInt(AdminClientConfig.SEND_BUFFER_CONFIG),
                 config.getInt(AdminClientConfig.RECEIVE_BUFFER_CONFIG),
-                config.getInt(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG),
+                (int) TimeUnit.HOURS.toMillis(1),
                 time,
                 true,
                 apiVersions);
@@ -309,7 +316,7 @@ public class KafkaAdminClient extends AdminClient {
         }
     }
 
-    static KafkaAdminClient create(AdminClientConfig config, KafkaClient client, Metadata metadata) {
+    static KafkaAdminClient createInternal(AdminClientConfig config, KafkaClient client, Metadata metadata) {
         Metrics metrics = null;
         Time time = Time.SYSTEM;
         String clientId = generateClientId(config);
@@ -343,9 +350,33 @@ public class KafkaAdminClient extends AdminClient {
     }
 
     @Override
-    public void close() {
-        closed = true;
-        client.wakeup(); // Wake the thread, if it is blocked inside poll().
+    public void close(long duration, TimeUnit unit) {
+        long waitTimeMs = unit.toMillis(duration);
+        waitTimeMs = Math.min(TimeUnit.DAYS.toMillis(365), waitTimeMs); // Limit the timeout to a year.
+        long now = time.milliseconds();
+        long newHardShutdownTimeMs = now + waitTimeMs;
+        long prev = INVALID_SHUTDOWN_TIME;
+        while (true) {
+            if (hardShutdownTimeMs.compareAndSet(prev, newHardShutdownTimeMs)) {
+                if (prev == INVALID_SHUTDOWN_TIME) {
+                    log.debug("{}: initiating close operation.", clientId);
+                } else {
+                    log.debug("{}: moving hard shutdown time forward.", clientId);
+                }
+                client.wakeup(); // Wake the thread, if it is blocked inside poll().
+                break;
+            }
+            prev = hardShutdownTimeMs.get();
+            if (prev < newHardShutdownTimeMs) {
+                log.debug("{}: hard shutdown time is already earlier than requested.", clientId);
+                newHardShutdownTimeMs = prev;
+                break;
+            }
+        }
+        if (log.isDebugEnabled()) {
+            long deltaMs = Math.max(0, newHardShutdownTimeMs - time.milliseconds());
+            log.debug("{}: waiting for the I/O thread to exit. Hard shutdown in {} ms.", clientId, deltaMs);
+        }
         try {
             // Wait for the thread to be joined.
             thread.join();
@@ -439,7 +470,7 @@ public class KafkaAdminClient extends AdminClient {
             if ((throwable instanceof UnsupportedVersionException) &&
                      handleUnsupportedVersionException((UnsupportedVersionException) throwable)) {
                 log.trace("{} attempting protocol downgrade.", this);
-                runnable.call(this, now);
+                runnable.enqueue(this, now);
                 return;
             }
             tries++;
@@ -474,7 +505,7 @@ public class KafkaAdminClient extends AdminClient {
                 log.debug("{} failed: {}.  Beginning retry #{}",
                     this, prettyPrintException(throwable), tries);
             }
-            runnable.call(this, now);
+            runnable.enqueue(this, now);
         }
 
         /**
@@ -523,6 +554,7 @@ public class KafkaAdminClient extends AdminClient {
     private final class AdminClientRunnable implements Runnable {
         /**
          * Pending calls.  Protected by the object monitor.
+         * This will be null only if the thread has shut down.
          */
         private List<Call> newCalls = new LinkedList<>();
 
@@ -554,47 +586,96 @@ public class KafkaAdminClient extends AdminClient {
             return null;
         }
 
-        /**
-         * Time out a list of calls.
-         *
-         * @param now       The current time in milliseconds.
-         * @param calls     The collection of calls.  Must be sorted from oldest to newest.
-         */
-        private int timeoutCalls(long now, Collection<Call> calls) {
-            int numTimedOut = 0;
-            for (Iterator<Call> iter = calls.iterator(); iter.hasNext(); ) {
-                Call call = iter.next();
-                if (calcTimeoutMsRemainingAsInt(now, call.deadlineMs) < 0) {
-                    call.fail(now, new TimeoutException());
-                    iter.remove();
-                    numTimedOut++;
+        private class TimeoutProcessor {
+            /**
+             * The current time in milliseconds.
+             */
+            private final long now;
+
+            /**
+             * The number of milliseconds until the next timeout.
+             */
+            private int nextTimeoutMs;
+
+            /**
+             * Create a new timeout processor.
+             *
+             * @param now           The current time in milliseconds since the epoch.
+             */
+            TimeoutProcessor(long now) {
+                this.now = now;
+                this.nextTimeoutMs = Integer.MAX_VALUE;
+            }
+
+            /**
+             * Check for calls which have timed out.
+             * Timed out calls will be removed and failed.
+             * The remaining milliseconds until the next timeout will be updated.
+             *
+             * @param calls         The collection of calls.
+             * @param msg           The error message for the TimeoutException used to fail each timed out call.
+             * @return              The number of calls which were timed out.
+             */
+            int handleTimeouts(Collection<Call> calls, String msg) {
+                int numTimedOut = 0;
+                for (Iterator<Call> iter = calls.iterator(); iter.hasNext(); ) {
+                    Call call = iter.next();
+                    int remainingMs = calcTimeoutMsRemainingAsInt(now, call.deadlineMs);
+                    if (remainingMs < 0) {
+                        call.fail(now, new TimeoutException(msg));
+                        iter.remove();
+                        numTimedOut++;
+                    } else {
+                        nextTimeoutMs = Math.min(nextTimeoutMs, remainingMs);
+                    }
                 }
+                return numTimedOut;
+            }
+
+            /**
+             * Check whether a call should be timed out.
+             * The remaining milliseconds until the next timeout will be updated.
+             *
+             * @param call      The call.
+             *
+             * @return          True if the call should be timed out.
+             */
+            boolean callHasExpired(Call call) {
+                int remainingMs = calcTimeoutMsRemainingAsInt(now, call.deadlineMs);
+                if (remainingMs < 0)
+                    return true;
+                nextTimeoutMs = Math.min(nextTimeoutMs, remainingMs);
+                return false;
+            }
+
+            int nextTimeoutMs() {
+                return nextTimeoutMs;
             }
-            return numTimedOut;
         }
 
         /**
          * Time out the elements in the newCalls list which are expired.
          *
-         * @param now       The current time in milliseconds.
+         * @param processor     The timeout processor.
          */
-        private synchronized void timeoutNewCalls(long now) {
-            int numTimedOut = timeoutCalls(now, newCalls);
-            if (numTimedOut > 0) {
+        private synchronized void timeoutNewCalls(TimeoutProcessor processor) {
+            int numTimedOut = processor.handleTimeouts(newCalls,
+                    "Timed out waiting for a node assignment.");
+            if (numTimedOut > 0)
                 log.debug("{}: timed out {} new calls.", clientId, numTimedOut);
-            }
         }
 
         /**
          * Time out calls which have been assigned to nodes.
          *
-         * @param now           The current time in milliseconds.
+         * @param processor     The timeout processor.
          * @param callsToSend   A map of nodes to the calls they need to handle.
          */
-        private void timeoutCallsToSend(long now, Map<Node, List<Call>> callsToSend) {
+        private void timeoutCallsToSend(TimeoutProcessor processor, Map<Node, List<Call>> callsToSend) {
             int numTimedOut = 0;
             for (List<Call> callList : callsToSend.values()) {
-                numTimedOut += timeoutCalls(now, callList);
+                numTimedOut += processor.handleTimeouts(callList,
+                    "Timed out waiting to send the call.");
             }
             if (numTimedOut > 0)
                 log.debug("{}: timed out {} call(s) with assigned nodes.", clientId, numTimedOut);
@@ -698,10 +779,10 @@ public class KafkaAdminClient extends AdminClient {
          * even be in the process of being processed by the remote server.  At the moment, our only option
          * to time them out is to close the entire connection.
          *
-         * @param now                   The current time in milliseconds.
-         * @param callsInFlight         A map of nodes to the calls they have in flight.
+         * @param processor         The timeout processor.
+         * @param callsInFlight     A map of nodes to the calls they have in flight.
          */
-        private void timeoutCallsInFlight(long now, Map<String, List<Call>> callsInFlight) {
+        private void timeoutCallsInFlight(TimeoutProcessor processor, Map<String, List<Call>> callsInFlight) {
             int numTimedOut = 0;
             for (Map.Entry<String, List<Call>> entry : callsInFlight.entrySet()) {
                 List<Call> contexts = entry.getValue();
@@ -711,7 +792,7 @@ public class KafkaAdminClient extends AdminClient {
                 // We assume that the first element in the list is the earliest.  So it should be the
                 // only one we need to check the timeout for.
                 Call call = contexts.get(0);
-                if (calcTimeoutMsRemainingAsInt(now, call.deadlineMs) < 0) {
+                if (processor.callHasExpired(call)) {
                     log.debug("{}: Closing connection to {} to time out {}", clientId, nodeId, call);
                     client.close(nodeId);
                     numTimedOut++;
@@ -773,6 +854,22 @@ public class KafkaAdminClient extends AdminClient {
             }
         }
 
+        private synchronized boolean threadShouldExit(long now, long curHardShutdownTimeMs,
+                Map<Node, List<Call>> callsToSend, Map<Integer, Call> correlationIdToCalls) {
+            if (newCalls.isEmpty() && callsToSend.isEmpty() && correlationIdToCalls.isEmpty()) {
+                log.trace("{}: all work has been completed, and the I/O thread is now " +
+                    "exiting.", clientId);
+                return true;
+            }
+            if (now > curHardShutdownTimeMs) {
+                log.info("{}: forcing a hard I/O thread shutdown.  Requests in progress will " +
+                    "be aborted.", clientId);
+                return true;
+            }
+            log.debug("{}: hard shutdown in {} ms.", clientId, curHardShutdownTimeMs - now);
+            return false;
+        }
+
         @Override
         public void run() {
             /**
@@ -798,18 +895,25 @@ public class KafkaAdminClient extends AdminClient {
             long now = time.milliseconds();
             log.trace("{} thread starting", clientId);
             while (true) {
-                // Check if the AdminClient is shutting down.
-                if (closed)
+                // Check if the AdminClient thread should shut down.
+                long curHardShutdownTimeMs = hardShutdownTimeMs.get();
+                if ((curHardShutdownTimeMs != INVALID_SHUTDOWN_TIME) &&
+                        threadShouldExit(now, curHardShutdownTimeMs, callsToSend, correlationIdToCalls))
                     break;
 
                 // Handle timeouts.
-                timeoutNewCalls(now);
-                timeoutCallsToSend(now, callsToSend);
-                timeoutCallsInFlight(now, callsInFlight);
+                TimeoutProcessor timeoutProcessor = new TimeoutProcessor(now);
+                timeoutNewCalls(timeoutProcessor);
+                timeoutCallsToSend(timeoutProcessor, callsToSend);
+                timeoutCallsInFlight(timeoutProcessor, callsInFlight);
+
+                long pollTimeout = Math.min(1200000, timeoutProcessor.nextTimeoutMs());
+                if (curHardShutdownTimeMs != INVALID_SHUTDOWN_TIME) {
+                    pollTimeout = Math.min(pollTimeout, curHardShutdownTimeMs - now);
+                }
 
                 // Handle new calls and metadata update requests.
                 prevMetadataVersion = checkMetadataReady(prevMetadataVersion);
-                long pollTimeout = 1200000;
                 if (prevMetadataVersion == null) {
                     chooseNodesForNewCalls(now, callsToSend);
                     pollTimeout = Math.min(pollTimeout,
@@ -826,10 +930,14 @@ public class KafkaAdminClient extends AdminClient {
                 handleResponses(now, responses, callsInFlight, correlationIdToCalls);
             }
             int numTimedOut = 0;
+            TimeoutProcessor timeoutProcessor = new TimeoutProcessor(Long.MAX_VALUE);
             synchronized (this) {
-                numTimedOut += timeoutCalls(Long.MAX_VALUE, newCalls);
+                numTimedOut += timeoutProcessor.handleTimeouts(newCalls,
+                        "The AdminClient thread has exited.");
+                newCalls = null;
             }
-            numTimedOut += timeoutCalls(Long.MAX_VALUE, correlationIdToCalls.values());
+            numTimedOut += timeoutProcessor.handleTimeouts(correlationIdToCalls.values(),
+                    "The AdminClient thread has exited.");
             if (numTimedOut > 0) {
                 log.debug("{}: timed out {} remaining operations.", clientId, numTimedOut);
             }
@@ -838,15 +946,51 @@ public class KafkaAdminClient extends AdminClient {
             log.debug("{}: exiting AdminClientRunnable thread.", clientId);
         }
 
-        void call(Call call, long now) {
+        /**
+         * Queue a call for sending.
+         *
+         * If the AdminClient thread has exited, this will fail.  Otherwise, it will succeed (even
+         * if the AdminClient is shutting down.)  This function should be called when retrying an
+         * existing call.
+         *
+         * @param call      The new call object.
+         * @param now       The current time in milliseconds.
+         */
+        void enqueue(Call call, long now) {
             if (log.isDebugEnabled()) {
                 log.debug("{}: queueing {} with a timeout {} ms from now.",
                     clientId, call, call.deadlineMs - now);
             }
+            boolean accepted = false;
             synchronized (this) {
-                newCalls.add(call);
+                if (newCalls != null) {
+                    newCalls.add(call);
+                    accepted = true;
+                }
+            }
+            if (accepted) {
+                client.wakeup(); // wake the thread if it is in poll()
+            } else {
+                log.debug("{}: the AdminClient thread has exited.  Timing out {}.", clientId, call);
+                call.fail(Long.MAX_VALUE, new TimeoutException("The AdminClient thread has exited."));
+            }
+        }
+
+        /**
+         * Initiate a new call.
+         *
+         * This will fail if the AdminClient is scheduled to shut down.
+         *
+         * @param call      The new call object.
+         * @param now       The current time in milliseconds.
+         */
+        void call(Call call, long now) {
+            if (hardShutdownTimeMs.get() != INVALID_SHUTDOWN_TIME) {
+                log.debug("{}: the AdminClient is not accepting new calls.  Timing out {}.", clientId, call);
+                call.fail(Long.MAX_VALUE, new TimeoutException("The AdminClient thread is not accepting new calls."));
+            } else {
+                enqueue(call, now);
             }
-            client.wakeup();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/3250cc76/clients/src/test/java/org/apache/kafka/clients/admin/MockKafkaAdminClientEnv.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockKafkaAdminClientEnv.java b/clients/src/test/java/org/apache/kafka/clients/admin/MockKafkaAdminClientEnv.java
index 1fa0249..ba7b528 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/MockKafkaAdminClientEnv.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockKafkaAdminClientEnv.java
@@ -52,7 +52,7 @@ public class MockKafkaAdminClientEnv implements AutoCloseable {
         this.metadata = new Metadata(adminClientConfig.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG),
                 adminClientConfig.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG));
         this.mockClient = new MockClient(Time.SYSTEM, this.metadata);
-        this.client = KafkaAdminClient.create(adminClientConfig, mockClient, metadata);
+        this.client = KafkaAdminClient.createInternal(adminClientConfig, mockClient, metadata);
     }
 
     public Cluster cluster() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/3250cc76/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala
index 81f5c27..c52594b 100644
--- a/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala
@@ -18,9 +18,9 @@ package kafka.api
 
 import java.util
 import java.util.{Collections, Properties}
-import java.util.concurrent.ExecutionException
+import java.util.concurrent.{ExecutionException, TimeUnit}
 
-import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.common.utils.{Time, Utils}
 import kafka.integration.KafkaServerTestHarness
 import kafka.log.LogConfig
 import kafka.server.{Defaults, KafkaConfig}
@@ -28,7 +28,7 @@ import org.apache.kafka.clients.admin._
 import kafka.utils.{Logging, TestUtils}
 import org.apache.kafka.clients.admin.NewTopic
 import org.apache.kafka.common.KafkaFuture
-import org.apache.kafka.common.errors.{InvalidRequestException, SecurityDisabledException, TopicExistsException}
+import org.apache.kafka.common.errors.{InvalidRequestException, SecurityDisabledException, TimeoutException, TopicExistsException}
 import org.apache.kafka.common.protocol.ApiKeys
 import org.junit.{After, Before, Rule, Test}
 import org.apache.kafka.common.requests.MetadataResponse
@@ -381,6 +381,59 @@ class KafkaAdminClientIntegrationTest extends KafkaServerTestHarness with Loggin
     client.close()
   }
 
+  /**
+    * Test closing the AdminClient with a generous timeout.  Calls in progress should be completed,
+    * since they can be done within the timeout.  New calls should receive timeouts.
+    */
+  @Test
+  def testDelayedClose(): Unit = {
+    client = AdminClient.create(createConfig())
+    val topics = Seq("mytopic", "mytopic2")
+    val newTopics = topics.map(new NewTopic(_, 1, 1))
+    val future = client.createTopics(newTopics.asJava, new CreateTopicsOptions().validateOnly(true)).all()
+    client.close(2, TimeUnit.HOURS)
+    val future2 = client.createTopics(newTopics.asJava, new CreateTopicsOptions().validateOnly(true)).all()
+    assertFutureExceptionTypeEquals(future2, classOf[TimeoutException])
+    future.get
+    client.close(30, TimeUnit.MINUTES) // multiple close-with-timeout should have no effect
+  }
+
+  /**
+    * Test closing the AdminClient with a timeout of 0, when there are calls with extremely long
+    * timeouts in progress.  The calls should be aborted after the hard shutdown timeout elapses.
+    */
+  @Test
+  def testForceClose(): Unit = {
+    val config = createConfig()
+    config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:22")
+    client = AdminClient.create(config)
+    // Because the bootstrap servers are set up incorrectly, this call will not complete, but must be
+    // cancelled by the close operation.
+    val future = client.createTopics(Seq("mytopic", "mytopic2").map(new NewTopic(_, 1, 1)).asJava,
+      new CreateTopicsOptions().timeoutMs(900000)).all()
+    client.close(0, TimeUnit.MILLISECONDS)
+    assertFutureExceptionTypeEquals(future, classOf[TimeoutException])
+  }
+
+  /**
+    * Check that a call with a timeout does not complete before the minimum timeout has elapsed,
+    * even when the default request timeout is shorter.
+    */
+  @Test
+  def testMinimumRequestTimeouts(): Unit = {
+    val config = createConfig()
+    config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:22")
+    config.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "0")
+    client = AdminClient.create(config)
+    val startTimeMs = Time.SYSTEM.milliseconds()
+    val future = client.createTopics(Seq("mytopic", "mytopic2").map(new NewTopic(_, 1, 1)).asJava,
+      new CreateTopicsOptions().timeoutMs(2)).all()
+    assertFutureExceptionTypeEquals(future, classOf[TimeoutException])
+    val endTimeMs = Time.SYSTEM.milliseconds()
+    assertTrue("Expected the timeout to take at least one millisecond.", endTimeMs > startTimeMs);
+    client.close()
+  }
+
   override def generateConfigs() = {
     val cfgs = TestUtils.createBrokerConfigs(brokerCount, zkConnect, interBrokerSecurityProtocol = Some(securityProtocol),
       trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties)