Posted to commits@kafka.apache.org by ca...@apache.org on 2022/06/23 14:57:07 UTC

[kafka] branch trunk updated: KAFKA-13987: Isolate REST request timeout changes in Connect integration tests (#12291)

This is an automated email from the ASF dual-hosted git repository.

cadonna pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d00b7875e0 KAFKA-13987: Isolate REST request timeout changes in Connect integration tests (#12291)
d00b7875e0 is described below

commit d00b7875e0ad7e005b9b54d39f47269c71e95fbe
Author: Chris Egerton <ch...@confluent.io>
AuthorDate: Thu Jun 23 10:56:53 2022 -0400

    KAFKA-13987: Isolate REST request timeout changes in Connect integration tests (#12291)
    
    This isolates the artificial reductions in the Connect REST request timeout. Specifically, they now take place only in the tests that need them (instead of in any tests that happen to run after the reduction has taken place and before it has been reset), and they are performed only for the requests that are expected to time out, after which the timeout is immediately reset. This should help reduce spurious test failures (especially in slow environments like Jenkins) for all Co [...]
    
    Reviewers: Bruno Cadonna <ca...@apache.org>
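    
    A condensed sketch of the per-request pattern, adapted from the BlockingConnectorTest
    changes in the diff below; the `connect` EmbeddedConnectCluster field and the
    REDUCED_REST_REQUEST_TIMEOUT constant are assumed to be defined by the test class:
    
        private void assertRequestTimesOut(String requestDescription, ThrowingRunnable request) {
            // Reduce the REST request timeout only for the request that is expected to block
            connect.requestTimeout(REDUCED_REST_REQUEST_TIMEOUT);
            ConnectRestException exception = assertThrows(
                    "Should have failed to " + requestDescription,
                    ConnectRestException.class, request
            );
            // The blocked request should surface as a 500 from the worker
            assertEquals(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), exception.statusCode());
            // Restore the default immediately so later requests in the same test are unaffected
            connect.requestTimeout(ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
        }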
---
 .../org/apache/kafka/connect/runtime/Connect.java  |  9 +----
 .../org/apache/kafka/connect/runtime/Worker.java   |  4 +-
 .../kafka/connect/runtime/rest/RestServer.java     | 26 +++++++++---
 .../runtime/rest/resources/ConnectResource.java    | 40 ++++++++++++++++++
 .../rest/resources/ConnectorPluginsResource.java   | 11 ++++-
 .../runtime/rest/resources/ConnectorsResource.java | 26 +++++-------
 .../runtime/rest/resources/LoggingResource.java    |  7 +++-
 .../runtime/rest/resources/RootResource.java       |  7 +++-
 .../connect/integration/BlockingConnectorTest.java | 47 ++++++++++++++--------
 .../util/clusters/EmbeddedConnectCluster.java      | 10 +++++
 .../kafka/connect/util/clusters/WorkerHandle.java  | 16 ++++++--
 11 files changed, 148 insertions(+), 55 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Connect.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Connect.java
index 80eef0369a..e5ab246c0b 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Connect.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Connect.java
@@ -21,7 +21,6 @@ import org.apache.kafka.connect.runtime.rest.RestServer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.net.URI;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -89,12 +88,8 @@ public class Connect {
     }
 
     // Visible for testing
-    public URI restUrl() {
-        return rest.serverUrl();
-    }
-
-    public URI adminUrl() {
-        return rest.adminUrl();
+    public RestServer rest() {
+        return rest;
     }
 
     private class ShutdownHook extends Thread {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index 01c0389cae..16e48d8f17 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -42,7 +42,7 @@ import org.apache.kafka.connect.json.JsonConverter;
 import org.apache.kafka.connect.json.JsonConverterConfig;
 import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
 import org.apache.kafka.connect.runtime.isolation.LoaderSwap;
-import org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource;
+import org.apache.kafka.connect.runtime.rest.resources.ConnectResource;
 import org.apache.kafka.connect.storage.ClusterConfigState;
 import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
 import org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter;
@@ -735,7 +735,7 @@ public class Worker {
                             .map(this::taskTransactionalId)
                             .collect(Collectors.toList());
                     FenceProducersOptions fencingOptions = new FenceProducersOptions()
-                            .timeoutMs((int) ConnectorsResource.REQUEST_TIMEOUT_MS);
+                            .timeoutMs((int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
                     return admin.fenceProducers(transactionalIds, fencingOptions).all().whenComplete((ignored, error) -> {
                         if (error != null)
                             log.debug("Finished fencing out {} task producers for source connector {}", numTasks, connName);
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
index ab18419efc..3c89ddb55f 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
@@ -28,6 +28,7 @@ import org.apache.kafka.connect.runtime.WorkerConfig;
 import org.apache.kafka.connect.runtime.health.ConnectClusterDetailsImpl;
 import org.apache.kafka.connect.runtime.health.ConnectClusterStateImpl;
 import org.apache.kafka.connect.runtime.rest.errors.ConnectExceptionMapper;
+import org.apache.kafka.connect.runtime.rest.resources.ConnectResource;
 import org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource;
 import org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource;
 import org.apache.kafka.connect.runtime.rest.resources.LoggingResource;
@@ -60,6 +61,7 @@ import javax.ws.rs.core.UriBuilder;
 import java.io.IOException;
 import java.net.URI;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.List;
@@ -88,6 +90,7 @@ public class RestServer {
     private final ContextHandlerCollection handlers;
     private final Server jettyServer;
 
+    private Collection<ConnectResource> resources;
     private List<ConnectRestExtension> connectRestExtensions = Collections.emptyList();
 
     /**
@@ -210,9 +213,11 @@ public class RestServer {
         ResourceConfig resourceConfig = new ResourceConfig();
         resourceConfig.register(new JacksonJsonProvider());
 
-        resourceConfig.register(new RootResource(herder));
-        resourceConfig.register(new ConnectorsResource(herder, config));
-        resourceConfig.register(new ConnectorPluginsResource(herder));
+        this.resources = new ArrayList<>();
+        resources.add(new RootResource(herder));
+        resources.add(new ConnectorsResource(herder, config));
+        resources.add(new ConnectorPluginsResource(herder));
+        resources.forEach(resourceConfig::register);
 
         resourceConfig.register(ConnectExceptionMapper.class);
         resourceConfig.property(ServerProperties.WADL_FEATURE_DISABLE, true);
@@ -224,14 +229,18 @@ public class RestServer {
         if (adminListeners == null) {
             log.info("Adding admin resources to main listener");
             adminResourceConfig = resourceConfig;
-            adminResourceConfig.register(new LoggingResource());
+            LoggingResource loggingResource = new LoggingResource();
+            this.resources.add(loggingResource);
+            adminResourceConfig.register(loggingResource);
         } else if (adminListeners.size() > 0) {
             // TODO: we need to check if these listeners are same as 'listeners'
             // TODO: the following code assumes that they are different
             log.info("Adding admin resources to admin listener");
             adminResourceConfig = new ResourceConfig();
             adminResourceConfig.register(new JacksonJsonProvider());
-            adminResourceConfig.register(new LoggingResource());
+            LoggingResource loggingResource = new LoggingResource();
+            this.resources.add(loggingResource);
+            adminResourceConfig.register(loggingResource);
             adminResourceConfig.register(ConnectExceptionMapper.class);
         } else {
             log.info("Skipping adding admin resources");
@@ -385,6 +394,11 @@ public class RestServer {
         return builder.build();
     }
 
+    // For testing only
+    public void requestTimeout(long requestTimeoutMs) {
+        this.resources.forEach(resource -> resource.requestTimeout(requestTimeoutMs));
+    }
+
     String determineAdvertisedProtocol() {
         String advertisedSecurityProtocol = config.getString(WorkerConfig.REST_ADVERTISED_LISTENER_CONFIG);
         if (advertisedSecurityProtocol == null) {
@@ -432,7 +446,7 @@ public class RestServer {
             config.getList(WorkerConfig.REST_EXTENSION_CLASSES_CONFIG),
             config, ConnectRestExtension.class);
 
-        long herderRequestTimeoutMs = ConnectorsResource.REQUEST_TIMEOUT_MS;
+        long herderRequestTimeoutMs = ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS;
 
         Integer rebalanceTimeoutMs = config.getRebalanceTimeout();
 
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectResource.java
new file mode 100644
index 0000000000..49d61a727a
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectResource.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.rest.resources;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This interface defines shared logic for all Connect REST resources.
+ */
+public interface ConnectResource {
+
+    // TODO: This should not be so long. However, due to potentially long rebalances that may have to wait a full
+    // session timeout to complete, during which we cannot serve some requests. Ideally we could reduce this, but
+    // we need to consider all possible scenarios this could fail. It might be ok to fail with a timeout in rare cases,
+    // but currently a worker simply leaving the group can take this long as well.
+    long DEFAULT_REST_REQUEST_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(90);
+
+    /**
+     * Set how long the resource will await the completion of each request before returning a 500 error.
+     * If the resource does not perform any operations that can be expected to block under reasonable
+     * circumstances, this can be implemented as a no-op.
+     * @param requestTimeoutMs the new timeout in milliseconds; must be positive
+     */
+    void requestTimeout(long requestTimeoutMs);
+
+}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
index 269d4471a5..05b8375183 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
@@ -61,11 +61,12 @@ import java.util.stream.Collectors;
 @Path("/connector-plugins")
 @Produces(MediaType.APPLICATION_JSON)
 @Consumes(MediaType.APPLICATION_JSON)
-public class ConnectorPluginsResource {
+public class ConnectorPluginsResource implements ConnectResource {
 
     private static final String ALIAS_SUFFIX = "Connector";
     private final Herder herder;
     private final List<PluginInfo> connectorPlugins;
+    private long requestTimeoutMs;
 
     static final List<Class<? extends SinkConnector>> SINK_CONNECTOR_EXCLUDES = Arrays.asList(
             VerifiableSinkConnector.class,
@@ -86,6 +87,7 @@ public class ConnectorPluginsResource {
     public ConnectorPluginsResource(Herder herder) {
         this.herder = herder;
         this.connectorPlugins = new ArrayList<>();
+        this.requestTimeoutMs = DEFAULT_REST_REQUEST_TIMEOUT_MS;
 
         // TODO: improve once plugins are allowed to be added/removed during runtime.
         addConnectorPlugins(herder.plugins().sinkConnectors(), SINK_CONNECTOR_EXCLUDES);
@@ -103,6 +105,11 @@ public class ConnectorPluginsResource {
                 .forEach(connectorPlugins::add);
     }
 
+    @Override
+    public void requestTimeout(long requestTimeoutMs) {
+        this.requestTimeoutMs = requestTimeoutMs;
+    }
+
     @PUT
     @Path("/{pluginName}/config/validate")
     @Operation(summary = "Validate the provided configuration against the configuration definition for the specified pluginName")
@@ -124,7 +131,7 @@ public class ConnectorPluginsResource {
         herder.validateConnectorConfig(connectorConfig, validationCallback, false);
 
         try {
-            return validationCallback.get(ConnectorsResource.REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+            return validationCallback.get(requestTimeoutMs, TimeUnit.MILLISECONDS);
         } catch (TimeoutException e) {
             // This timeout is for the operation itself. None of the timeout error codes are relevant, so internal server
             // error is the best option
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
index 91dd4800c8..1d14f506c8 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
@@ -76,22 +76,14 @@ import static org.apache.kafka.connect.runtime.WorkerConfig.TOPIC_TRACKING_ENABL
 @Path("/connectors")
 @Produces(MediaType.APPLICATION_JSON)
 @Consumes(MediaType.APPLICATION_JSON)
-public class ConnectorsResource {
+public class ConnectorsResource implements ConnectResource {
     private static final Logger log = LoggerFactory.getLogger(ConnectorsResource.class);
     private static final TypeReference<List<Map<String, String>>> TASK_CONFIGS_TYPE =
         new TypeReference<List<Map<String, String>>>() { };
 
-    // TODO: This should not be so long. However, due to potentially long rebalances that may have to wait a full
-    // session timeout to complete, during which we cannot serve some requests. Ideally we could reduce this, but
-    // we need to consider all possible scenarios this could fail. It might be ok to fail with a timeout in rare cases,
-    // but currently a worker simply leaving the group can take this long as well.
-    public static final long REQUEST_TIMEOUT_MS = 90 * 1000;
-    // Mutable for integration testing; otherwise, some tests would take at least REQUEST_TIMEOUT_MS
-    // to run
-    private static long requestTimeoutMs = REQUEST_TIMEOUT_MS;
-
     private final Herder herder;
     private final WorkerConfig config;
+    private long requestTimeoutMs;
     @javax.ws.rs.core.Context
     private ServletContext context;
     private final boolean isTopicTrackingDisabled;
@@ -102,15 +94,15 @@ public class ConnectorsResource {
         this.config = config;
         isTopicTrackingDisabled = !config.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG);
         isTopicTrackingResetDisabled = !config.getBoolean(TOPIC_TRACKING_ALLOW_RESET_CONFIG);
+        this.requestTimeoutMs = DEFAULT_REST_REQUEST_TIMEOUT_MS;
     }
 
-    // For testing purposes only
-    public static void setRequestTimeout(long requestTimeoutMs) {
-        ConnectorsResource.requestTimeoutMs = requestTimeoutMs;
-    }
-
-    public static void resetRequestTimeout() {
-        ConnectorsResource.requestTimeoutMs = REQUEST_TIMEOUT_MS;
+    @Override
+    public void requestTimeout(long requestTimeoutMs) {
+        if (requestTimeoutMs < 1) {
+            throw new IllegalArgumentException("REST request timeout must be positive");
+        }
+        this.requestTimeoutMs = requestTimeoutMs;
     }
 
     @GET
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/LoggingResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/LoggingResource.java
index cab9e4a576..008842b572 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/LoggingResource.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/LoggingResource.java
@@ -46,13 +46,18 @@ import java.util.TreeMap;
 @Path("/admin/loggers")
 @Produces(MediaType.APPLICATION_JSON)
 @Consumes(MediaType.APPLICATION_JSON)
-public class LoggingResource {
+public class LoggingResource implements ConnectResource {
 
     /**
      * Log4j uses "root" (case insensitive) as name of the root logger.
      */
     private static final String ROOT_LOGGER_NAME = "root";
 
+    @Override
+    public void requestTimeout(long requestTimeoutMs) {
+        // No-op
+    }
+
     /**
      * List the current loggers that have their levels explicitly set and their log levels.
      *
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/RootResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/RootResource.java
index be0c2811d5..fe09e26903 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/RootResource.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/RootResource.java
@@ -27,7 +27,7 @@ import javax.ws.rs.core.MediaType;
 
 @Path("/")
 @Produces(MediaType.APPLICATION_JSON)
-public class RootResource {
+public class RootResource implements ConnectResource {
 
     private final Herder herder;
 
@@ -35,6 +35,11 @@ public class RootResource {
         this.herder = herder;
     }
 
+    @Override
+    public void requestTimeout(long requestTimeoutMs) {
+        // No-op
+    }
+
     @GET
     @Path("/")
     @Operation(summary = "Get details about this Connect worker and the id of the Kafka cluster it is connected to")
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java
index 74b089892b..ebb604b2a5 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java
@@ -27,7 +27,7 @@ import org.apache.kafka.connect.connector.ConnectorContext;
 import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.runtime.Worker;
 import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
-import org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource;
+import org.apache.kafka.connect.runtime.rest.resources.ConnectResource;
 import org.apache.kafka.connect.sink.SinkConnector;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.apache.kafka.connect.sink.SinkTask;
@@ -37,15 +37,16 @@ import org.apache.kafka.connect.source.SourceRecord;
 import org.apache.kafka.connect.source.SourceTask;
 import org.apache.kafka.connect.source.SourceTaskContext;
 import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
-import org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions;
 import org.apache.kafka.test.IntegrationTest;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.function.ThrowingRunnable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.ws.rs.core.Response;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -62,8 +63,9 @@ import java.util.stream.IntStream;
 import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
 import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
 import static org.apache.kafka.connect.runtime.SinkConnectorConfig.TOPICS_CONFIG;
-import static org.apache.kafka.test.TestUtils.waitForCondition;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Tests situations during which certain connector operations, such as start, validation,
@@ -81,7 +83,7 @@ public class BlockingConnectorTest {
     private static final int NUM_RECORDS_PRODUCED = 100;
     private static final long CONNECTOR_BLOCK_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(60);
     private static final long RECORD_TRANSFER_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(60);
-    private static final long REST_REQUEST_TIMEOUT = Worker.CONNECTOR_GRACEFUL_SHUTDOWN_TIMEOUT_MS * 2;
+    private static final long REDUCED_REST_REQUEST_TIMEOUT = Worker.CONNECTOR_GRACEFUL_SHUTDOWN_TIMEOUT_MS * 2;
 
     private static final String CONNECTOR_INITIALIZE = "Connector::initialize";
     private static final String CONNECTOR_INITIALIZE_WITH_TASK_CONFIGS = "Connector::initializeWithTaskConfigs";
@@ -115,8 +117,6 @@ public class BlockingConnectorTest {
 
     @Before
     public void setup() throws Exception {
-        // Artificially reduce the REST request timeout so that these don't take forever
-        ConnectorsResource.setRequestTimeout(REST_REQUEST_TIMEOUT);
         // build a Connect cluster backed by Kafka and Zk
         connect = new EmbeddedConnectCluster.Builder()
                 .name("connect-cluster")
@@ -129,13 +129,9 @@ public class BlockingConnectorTest {
         // start the clusters
         connect.start();
 
-        // wait for the Connect REST API to become available. necessary because of the reduced REST
-        // request timeout; otherwise, we may get an unexpected 500 with our first real REST request
-        // if the worker is still getting on its feet.
-        waitForCondition(
-            () -> connect.requestGet(connect.endpointForResource("connectors/nonexistent")).getStatus() == 404,
-            EmbeddedConnectClusterAssertions.WORKER_SETUP_DURATION_MS,
-            "Worker did not complete startup in time"
+        connect.assertions().assertAtLeastNumWorkersAreUp(
+                NUM_WORKERS,
+                "Initial group of workers did not start in time"
         );
     }
 
@@ -143,14 +139,13 @@ public class BlockingConnectorTest {
     public void close() {
         // stop all Connect, Kafka and Zk threads.
         connect.stop();
-        ConnectorsResource.resetRequestTimeout();
         Block.resetBlockLatch();
     }
 
     @Test
     public void testBlockInConnectorValidate() throws Exception {
         log.info("Starting test testBlockInConnectorValidate");
-        assertThrows(ConnectRestException.class, () -> createConnectorWithBlock(ValidateBlockingConnector.class, CONNECTOR_VALIDATE));
+        assertRequestTimesOut("create connector that blocks during validation", () -> createConnectorWithBlock(ValidateBlockingConnector.class, CONNECTOR_VALIDATE));
         // Will NOT assert that connector has failed, since the request should fail before it's even created
 
         // Connector should already be blocked so this should return immediately, but check just to
@@ -164,7 +159,7 @@ public class BlockingConnectorTest {
     @Test
     public void testBlockInConnectorConfig() throws Exception {
         log.info("Starting test testBlockInConnectorConfig");
-        assertThrows(ConnectRestException.class, () -> createConnectorWithBlock(ConfigBlockingConnector.class, CONNECTOR_CONFIG));
+        assertRequestTimesOut("create connector that blocks while getting config", () -> createConnectorWithBlock(ConfigBlockingConnector.class, CONNECTOR_CONFIG));
         // Will NOT assert that connector has failed, since the request should fail before it's even created
 
         // Connector should already be blocked so this should return immediately, but check just to
@@ -334,6 +329,26 @@ public class BlockingConnectorTest {
         normalConnectorHandle.awaitCommits(RECORD_TRANSFER_TIMEOUT_MS);
     }
 
+    private void assertRequestTimesOut(String requestDescription, ThrowingRunnable request) {
+        // Artificially reduce the REST request timeout so that these don't take 90 seconds
+        connect.requestTimeout(REDUCED_REST_REQUEST_TIMEOUT);
+        ConnectRestException exception = assertThrows(
+                "Should have failed to " + requestDescription,
+                ConnectRestException.class, request
+        );
+        assertEquals(
+                "Should have gotten 500 error from trying to " + requestDescription,
+                Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), exception.statusCode()
+        );
+        assertTrue(
+                "Should have gotten timeout message from trying to " + requestDescription
+                        + "; instead, message was: " + exception.getMessage(),
+                exception.getMessage().contains("Request timed out")
+        );
+        // Reset the REST request timeout so that other requests aren't impacted
+        connect.requestTimeout(ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
+    }
+
     private static class Block {
         private static CountDownLatch blockLatch;
 
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
index adcde378bb..ccbf2c495d 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
@@ -219,6 +219,16 @@ public class EmbeddedConnectCluster {
         }
     }
 
+    /**
+     * Set a new timeout for REST requests to each worker in the cluster. Useful if a request
+     * is expected to block, since the time spent awaiting that request can be reduced
+     * and test runtime bloat can be avoided.
+     * @param requestTimeoutMs the new timeout in milliseconds; must be positive
+     */
+    public void requestTimeout(long requestTimeoutMs) {
+        connectCluster.forEach(worker -> worker.requestTimeout(requestTimeoutMs));
+    }
+
     /**
      * Determine whether the Connect cluster has any workers running.
      *
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/WorkerHandle.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/WorkerHandle.java
index 4d947940c5..936363b496 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/WorkerHandle.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/WorkerHandle.java
@@ -81,7 +81,7 @@ public class WorkerHandle {
      * @return the worker's url
      */
     public URI url() {
-        return worker.restUrl();
+        return worker.rest().serverUrl();
     }
 
     /**
@@ -90,14 +90,24 @@ public class WorkerHandle {
      * @return the worker's admin url
      */
     public URI adminUrl() {
-        return worker.adminUrl();
+        return worker.rest().adminUrl();
+    }
+
+    /**
+     * Set a new timeout for REST requests to the worker. Useful if a request is expected
+     * to block, since the time spent awaiting that request can be reduced and test runtime
+     * bloat can be avoided.
+     * @param requestTimeoutMs the new timeout in milliseconds; must be positive
+     */
+    public void requestTimeout(long requestTimeoutMs) {
+        worker.rest().requestTimeout(requestTimeoutMs);
     }
 
     @Override
     public String toString() {
         return "WorkerHandle{" +
                 "workerName='" + workerName + '\'' +
-                "workerURL='" + worker.restUrl() + '\'' +
+                "workerURL='" + worker.rest().serverUrl() + '\'' +
                 '}';
     }