Posted to commits@kafka.apache.org by ce...@apache.org on 2023/02/08 19:59:48 UTC

[kafka] branch 3.4 updated: KAFKA-14455: Kafka Connect create and update REST APIs should surface failures while writing to the config topic (#12984)

This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch 3.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.4 by this push:
     new e35ade9fb9d KAFKA-14455: Kafka Connect create and update REST APIs should surface failures while writing to the config topic (#12984)
e35ade9fb9d is described below

commit e35ade9fb9d34f75b102ea2371db4d137866d464
Author: Yash Mayya <ya...@gmail.com>
AuthorDate: Thu Feb 2 21:33:38 2023 +0530

    KAFKA-14455: Kafka Connect create and update REST APIs should surface failures while writing to the config topic (#12984)
    
    Reviewers: Greg Harris <gr...@aiven.io>, Chris Egerton <ch...@aiven.io>
---
 .../java/org/apache/kafka/common/utils/Timer.java  |  10 +-
 .../runtime/distributed/DistributedHerder.java     |   1 -
 .../connect/storage/KafkaConfigBackingStore.java   | 165 ++++++++++++++++-----
 .../apache/kafka/connect/util/KafkaBasedLog.java   |  26 +++-
 .../kafka/connect/runtime/AbstractHerderTest.java  |  26 ++++
 .../runtime/distributed/DistributedHerderTest.java |  53 +++++++
 .../storage/KafkaConfigBackingStoreTest.java       | 127 ++++++++++++++--
 .../util/clusters/EmbeddedConnectCluster.java      |   2 +-
 8 files changed, 353 insertions(+), 57 deletions(-)
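
In short: the config-topic writes that back the connector create/update REST calls are now awaited, so producer failures and timeouts surface to the caller as a ConnectException instead of being silently dropped. A condensed sketch of the pattern the patch applies in KafkaConfigBackingStore (simplified; the key and value variables are placeholders, while the Timer and KafkaBasedLog calls mirror the diff below):

    Timer timer = time.timer(READ_WRITE_TOTAL_TIMEOUT_MS);
    try {
        // send() now returns a Future, so producer-side errors are observable
        Future<RecordMetadata> sent = configLog.send(key, serializedConfig);
        sent.get(timer.remainingMs(), TimeUnit.MILLISECONDS);
        timer.update();
        // read to the end of the config topic with whatever budget remains
        configLog.readToEnd().get(timer.remainingMs(), TimeUnit.MILLISECONDS);
    } catch (InterruptedException | ExecutionException | TimeoutException e) {
        throw new ConnectException("Error writing connector configuration to Kafka", e);
    }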

diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Timer.java b/clients/src/main/java/org/apache/kafka/common/utils/Timer.java
index 98b09a38d36..d60eea25710 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/Timer.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Timer.java
@@ -26,7 +26,7 @@ package org.apache.kafka.common.utils;
  * This class also ensures monotonic updates to the timer even if the underlying clock is subject
  * to non-monotonic behavior. For example, the remaining time returned by {@link #remainingMs()} is
  * guaranteed to decrease monotonically until it hits zero.
- *
+ * <p>
  * Note that it is up to the caller to ensure progress of the timer using one of the
  * {@link #update()} methods or {@link #sleep(long)}. The timer will cache the current time and
  * return it indefinitely until the timer has been updated. This allows the caller to limit
@@ -34,14 +34,14 @@ package org.apache.kafka.common.utils;
  * waiting a request sent through the {@link org.apache.kafka.clients.NetworkClient} should call
  * {@link #update()} following each blocking call to
  * {@link org.apache.kafka.clients.NetworkClient#poll(long, long)}.
- *
+ * <p>
  * A typical usage might look something like this:
  *
  * <pre>
  *     Time time = Time.SYSTEM;
  *     Timer timer = time.timer(500);
  *
- *     while (!conditionSatisfied() && timer.notExpired) {
+ *     while (!conditionSatisfied() && timer.notExpired()) {
  *         client.poll(timer.remainingMs(), timer.currentTimeMs());
  *         timer.update();
  *     }
@@ -137,7 +137,7 @@ public class Timer {
     /**
      * Update the cached current time to a specific value. In some contexts, the caller may already
      * have an accurate time, so this avoids unnecessary calls to system time.
-     *
+     * <p>
      * Note that if the updated current time is smaller than the cached time, then the update
      * is ignored.
      *
@@ -161,7 +161,7 @@ public class Timer {
     /**
      * Get the current time in milliseconds. This will return the same cached value until the timer
      * has been updated using one of the {@link #update()} methods or {@link #sleep(long)} is used.
-     *
+     * <p>
      * Note that the value returned is guaranteed to increase monotonically even if the underlying
      * {@link Time} implementation goes backwards. Effectively, the timer will just wait for the
      * time to catch up.
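
The <p> cleanups above are cosmetic, but Timer itself is central to this patch: it caches the clock, so remainingMs() stays fixed until update() (or sleep()) is called, which lets several blocking calls be charged against a single budget. A tiny illustrative sketch, using the MockTime that the new unit tests also rely on (variable names are placeholders):

    MockTime time = new MockTime();
    Timer timer = time.timer(30_000L);
    time.sleep(10_000L);                  // advances only the underlying clock
    long before = timer.remainingMs();    // still 30_000; the timer has not observed the new time
    timer.update();
    long after = timer.remainingMs();     // now 20_000
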
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index 0f45be163cc..d2b7f75f0fd 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -1063,7 +1063,6 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
                             // Note that we use the updated connector config despite the fact that we don't have an updated
                             // snapshot yet. The existing task info should still be accurate.
                             ConnectorInfo info = new ConnectorInfo(connName, config, configState.tasks(connName),
-                                // validateConnectorConfig have checked the existence of CONNECTOR_CLASS_CONFIG
                                 connectorType(config));
                             callback.onCompletion(null, new Created<>(!exists, info));
                             return null;
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
index bceb73e1715..e9f87697308 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
@@ -24,6 +24,7 @@ import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.config.TopicConfig;
@@ -33,6 +34,7 @@ import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaAndValue;
@@ -40,11 +42,13 @@ import org.apache.kafka.connect.data.SchemaBuilder;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.errors.DataException;
+import org.apache.kafka.connect.runtime.Herder;
 import org.apache.kafka.connect.runtime.RestartRequest;
 import org.apache.kafka.connect.runtime.SessionKey;
 import org.apache.kafka.connect.runtime.TargetState;
 import org.apache.kafka.connect.runtime.WorkerConfig;
 import org.apache.kafka.connect.runtime.WorkerConfigTransformer;
+import org.apache.kafka.connect.runtime.distributed.ConnectProtocolCompatibility;
 import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
 import org.apache.kafka.connect.util.Callback;
 import org.apache.kafka.connect.util.ConnectUtils;
@@ -70,6 +74,7 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.function.Supplier;
@@ -79,26 +84,45 @@ import java.util.function.Supplier;
  * Provides persistent storage of Kafka Connect connector configurations in a Kafka topic.
  * </p>
  * <p>
- * This class manages both connector and task configurations. It tracks three types of configuration entries:
+ * This class manages both connector and task configurations, among various other configurations. It tracks seven types
+ * of records:
  * <p/>
- * 1. Connector config: map of string -> string configurations passed to the Connector class, with support for
+ * <ol>
+ * <li> Connector config: map of string -> string configurations passed to the Connector class, with support for
  * expanding this format if necessary. (Kafka key: connector-[connector-id]).
  * These configs are *not* ephemeral. They represent the source of truth. If the entire Connect
- * cluster goes down, this is all that is really needed to recover.
- * 2. Task configs: map of string -> string configurations passed to the Task class, with support for expanding
+ * cluster goes down, this is all that is really needed to recover. </li>
+ * <li> Task configs: map of string -> string configurations passed to the Task class, with support for expanding
  * this format if necessary. (Kafka key: task-[connector-id]-[task-id]).
- * These configs are ephemeral; they are stored here to a) disseminate them to all workers while
- * ensuring agreement and b) to allow faster cluster/worker recovery since the common case
- * of recovery (restoring a connector) will simply result in the same configuration as before
- * the failure.
- * 3. Task commit "configs": records indicating that previous task config entries should be committed and all task
+ * These configs are ephemeral; they are stored here to
+ * <ul>
+ * <li> disseminate them to all workers while ensuring agreement </li>
+ * <li> to allow faster cluster/worker recovery since the common case of recovery (restoring a connector) will simply
+ * result in the same task configuration as before the failure. </li>
+ * </ul>
+ * </li>
+ * <li> Task commit "configs": records indicating that previous task config entries should be committed and all task
  * configs for a connector can be applied. (Kafka key: commit-[connector-id].
  * This config has two effects. First, it records the number of tasks the connector is currently
  * running (and can therefore increase/decrease parallelism). Second, because each task config
  * is stored separately but they need to be applied together to ensure each partition is assigned
  * to a single task, this record also indicates that task configs for the specified connector
- * can be "applied" or "committed".
- * </p>
+ * can be "applied" or "committed". </li>
+ * <li> Connector target states: records indicating the {@link TargetState} for a connector </li>
+ * <li> {@link RestartRequest Restart requests}: records representing requests to restart a connector and / or its
+ * tasks. See <a href="https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181308623">KIP-745</a> for more
+ * details.</li>
+ * <li> Task count records: an integer value that tracks the number of task producers (for source connectors) that
+ * will have to be fenced out if a connector is reconfigured before bringing up any tasks with the new set of task
+ * configurations. This is required for exactly-once support for source connectors, see
+ * <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-618%3A+Exactly-Once+Support+for+Source+Connectors">KIP-618</a>
+ * for more details.</li>
+ * <li> Session key records: A {@link SessionKey} generated by the leader of the cluster when
+ * {@link ConnectProtocolCompatibility#SESSIONED} is the Connect protocol being used by the cluster. This session key
+ * is used to verify internal REST requests between workers in the cluster. See
+ * <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-507%3A+Securing+Internal+Connect+REST+Endpoints">KIP-507</a>
+ * for more details.</li>
+ * </ol>
  * <p>
  * This configuration is expected to be stored in a *single partition* and *compacted* topic. Using a single partition
  * ensures we can enforce ordering on messages, allowing Kafka to be used as a write ahead log. Compaction allows
@@ -148,7 +172,7 @@ import java.util.function.Supplier;
  * Because we can encounter these inconsistencies and addressing them requires support from the rest of the system
  * (resolving the task configuration inconsistencies requires support from the connector instance to regenerate updated
  * configs), this class exposes not only the current set of configs, but also which connectors have inconsistent data.
- * This allows users of this class (i.e., Herder implementations) to take action to resolve any inconsistencies. These
+ * This allows users of this class (i.e., {@link Herder} implementations) to take action to resolve any inconsistencies. These
  * inconsistencies should be rare (as described above, due to compaction combined with leader failures in the middle
  * of updating task configurations).
  * </p>
@@ -241,7 +265,8 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
             .field(ONLY_FAILED_FIELD_NAME, Schema.BOOLEAN_SCHEMA)
             .build();
 
-    private static final long READ_TO_END_TIMEOUT_MS = 30000;
+    // Visible for testing
+    static final long READ_WRITE_TOTAL_TIMEOUT_MS = 30000;
 
     private final Object lock;
     private final Converter converter;
@@ -289,6 +314,7 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
     private final boolean usesFencableWriter;
     private volatile Producer<String, byte[]> fencableProducer;
     private final Map<String, Object> fencableProducerProps;
+    private final Time time;
 
     @Deprecated
     public KafkaConfigBackingStore(Converter converter, DistributedConfig config, WorkerConfigTransformer configTransformer) {
@@ -296,6 +322,10 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
     }
 
     public KafkaConfigBackingStore(Converter converter, DistributedConfig config, WorkerConfigTransformer configTransformer, Supplier<TopicAdmin> adminSupplier, String clientIdBase) {
+        this(converter, config, configTransformer, adminSupplier, clientIdBase, Time.SYSTEM);
+    }
+
+    KafkaConfigBackingStore(Converter converter, DistributedConfig config, WorkerConfigTransformer configTransformer, Supplier<TopicAdmin> adminSupplier, String clientIdBase, Time time) {
         this.lock = new Object();
         this.started = false;
         this.converter = converter;
@@ -320,6 +350,7 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
 
         configLog = setupAndCreateKafkaBasedLog(this.topic, config);
         this.configTransformer = configTransformer;
+        this.time = time;
     }
 
     @Override
@@ -469,8 +500,9 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
         connectConfig.put("properties", properties);
         byte[] serializedConfig = converter.fromConnectData(topic, CONNECTOR_CONFIGURATION_V0, connectConfig);
         try {
-            sendPrivileged(CONNECTOR_KEY(connector), serializedConfig);
-            configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+            Timer timer = time.timer(READ_WRITE_TOTAL_TIMEOUT_MS);
+            sendPrivileged(CONNECTOR_KEY(connector), serializedConfig, timer);
+            configLog.readToEnd().get(timer.remainingMs(), TimeUnit.MILLISECONDS);
         } catch (InterruptedException | ExecutionException | TimeoutException e) {
             log.error("Failed to write connector configuration to Kafka: ", e);
             throw new ConnectException("Error writing connector configuration to Kafka", e);
@@ -490,9 +522,14 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
     public void removeConnectorConfig(String connector) {
         log.debug("Removing connector configuration for connector '{}'", connector);
         try {
-            sendPrivileged(CONNECTOR_KEY(connector), null);
-            sendPrivileged(TARGET_STATE_KEY(connector), null);
-            configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+            Timer timer = time.timer(READ_WRITE_TOTAL_TIMEOUT_MS);
+            List<ProducerKeyValue> keyValues = Arrays.asList(
+                    new ProducerKeyValue(CONNECTOR_KEY(connector), null),
+                    new ProducerKeyValue(TARGET_STATE_KEY(connector), null)
+            );
+            sendPrivileged(keyValues, timer);
+
+            configLog.readToEnd().get(timer.remainingMs(), TimeUnit.MILLISECONDS);
         } catch (InterruptedException | ExecutionException | TimeoutException e) {
             log.error("Failed to remove connector configuration from Kafka: ", e);
             throw new ConnectException("Error removing connector configuration from Kafka", e);
@@ -521,10 +558,12 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
      */
     @Override
     public void putTaskConfigs(String connector, List<Map<String, String>> configs) {
+        Timer timer = time.timer(READ_WRITE_TOTAL_TIMEOUT_MS);
         // Make sure we're at the end of the log. We should be the only writer, but we want to make sure we don't have
         // any outstanding lagging data to consume.
         try {
-            configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+            configLog.readToEnd().get(timer.remainingMs(), TimeUnit.MILLISECONDS);
+            timer.update();
         } catch (InterruptedException | ExecutionException | TimeoutException e) {
             log.error("Failed to write root configuration to Kafka: ", e);
             throw new ConnectException("Error writing root configuration to Kafka", e);
@@ -532,34 +571,44 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
 
         int taskCount = configs.size();
 
-        // Start sending all the individual updates
+        // Send all the individual updates
         int index = 0;
+        List<ProducerKeyValue> keyValues = new ArrayList<>();
         for (Map<String, String> taskConfig: configs) {
             Struct connectConfig = new Struct(TASK_CONFIGURATION_V0);
             connectConfig.put("properties", taskConfig);
             byte[] serializedConfig = converter.fromConnectData(topic, TASK_CONFIGURATION_V0, connectConfig);
             log.debug("Writing configuration for connector '{}' task {}", connector, index);
             ConnectorTaskId connectorTaskId = new ConnectorTaskId(connector, index);
-            sendPrivileged(TASK_KEY(connectorTaskId), serializedConfig);
+            keyValues.add(new ProducerKeyValue(TASK_KEY(connectorTaskId), serializedConfig));
             index++;
         }
 
+        try {
+            sendPrivileged(keyValues, timer);
+        } catch (ExecutionException | InterruptedException | TimeoutException e) {
+            log.error("Failed to write task configurations to Kafka", e);
+            throw new ConnectException("Error writing task configurations to Kafka", e);
+        }
+
         // Finally, send the commit to update the number of tasks and apply the new configs, then wait until we read to
         // the end of the log
         try {
             // Read to end to ensure all the task configs have been written
             if (taskCount > 0) {
-                configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+                configLog.readToEnd().get(timer.remainingMs(), TimeUnit.MILLISECONDS);
+                timer.update();
             }
             // Write the commit message
             Struct connectConfig = new Struct(CONNECTOR_TASKS_COMMIT_V0);
             connectConfig.put("tasks", taskCount);
             byte[] serializedConfig = converter.fromConnectData(topic, CONNECTOR_TASKS_COMMIT_V0, connectConfig);
             log.debug("Writing commit for connector '{}' with {} tasks.", connector, taskCount);
-            sendPrivileged(COMMIT_TASKS_KEY(connector), serializedConfig);
+
+            sendPrivileged(COMMIT_TASKS_KEY(connector), serializedConfig, timer);
 
             // Read to end to ensure all the commit messages have been written
-            configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+            configLog.readToEnd().get(timer.remainingMs(), TimeUnit.MILLISECONDS);
         } catch (InterruptedException | ExecutionException | TimeoutException e) {
             log.error("Failed to write root configuration to Kafka: ", e);
             throw new ConnectException("Error writing root configuration to Kafka", e);
@@ -587,7 +636,12 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
         connectTargetState.put("state", state.name());
         byte[] serializedTargetState = converter.fromConnectData(topic, TARGET_STATE_V0, connectTargetState);
         log.debug("Writing target state {} for connector {}", state, connector);
-        configLog.send(TARGET_STATE_KEY(connector), serializedTargetState);
+        try {
+            configLog.send(TARGET_STATE_KEY(connector), serializedTargetState).get(READ_WRITE_TOTAL_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException | ExecutionException | TimeoutException e) {
+            log.error("Failed to write target state to Kafka", e);
+            throw new ConnectException("Error writing target state to Kafka", e);
+        }
     }
 
     /**
@@ -608,8 +662,9 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
         byte[] serializedTaskCountRecord = converter.fromConnectData(topic, TASK_COUNT_RECORD_V0, taskCountRecord);
         log.debug("Writing task count record {} for connector {}", taskCount, connector);
         try {
-            sendPrivileged(TASK_COUNT_RECORD_KEY(connector), serializedTaskCountRecord);
-            configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+            Timer timer = time.timer(READ_WRITE_TOTAL_TIMEOUT_MS);
+            sendPrivileged(TASK_COUNT_RECORD_KEY(connector), serializedTaskCountRecord, timer);
+            configLog.readToEnd().get(timer.remainingMs(), TimeUnit.MILLISECONDS);
         } catch (InterruptedException | ExecutionException | TimeoutException e) {
             log.error("Failed to write task count record with {} tasks for connector {} to Kafka: ", taskCount, connector, e);
             throw new ConnectException("Error writing task count record to Kafka", e);
@@ -635,8 +690,9 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
         sessionKeyStruct.put("creation-timestamp", sessionKey.creationTimestamp());
         byte[] serializedSessionKey = converter.fromConnectData(topic, SESSION_KEY_V0, sessionKeyStruct);
         try {
-            sendPrivileged(SESSION_KEY_KEY, serializedSessionKey);
-            configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+            Timer timer = time.timer(READ_WRITE_TOTAL_TIMEOUT_MS);
+            sendPrivileged(SESSION_KEY_KEY, serializedSessionKey, timer);
+            configLog.readToEnd().get(timer.remainingMs(), TimeUnit.MILLISECONDS);
         } catch (InterruptedException | ExecutionException | TimeoutException e) {
             log.error("Failed to write session key to Kafka: ", e);
             throw new ConnectException("Error writing session key to Kafka", e);
@@ -658,8 +714,9 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
         value.put(ONLY_FAILED_FIELD_NAME, restartRequest.onlyFailed());
         byte[] serializedValue = converter.fromConnectData(topic, value.schema(), value);
         try {
-            sendPrivileged(key, serializedValue);
-            configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+            Timer timer = time.timer(READ_WRITE_TOTAL_TIMEOUT_MS);
+            sendPrivileged(key, serializedValue, timer);
+            configLog.readToEnd().get(timer.remainingMs(), TimeUnit.MILLISECONDS);
         } catch (InterruptedException | ExecutionException | TimeoutException e) {
             log.error("Failed to write {} to Kafka: ", restartRequest, e);
             throw new ConnectException("Error writing " + restartRequest + " to Kafka", e);
@@ -711,9 +768,36 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
         return createKafkaBasedLog(topic, producerProps, consumerProps, new ConsumeCallback(), topicDescription, adminSupplier);
     }
 
-    private void sendPrivileged(String key, byte[] value) {
+    /**
+     * Send a single record to the config topic synchronously. Note that {@link #claimWritePrivileges()} must be
+     * successfully invoked before calling this method if this store is configured to use a fencable writer.
+     * @param key the record key
+     * @param value the record value
+     * @param timer Timer bounding how long this method can block. The timer is updated before the method returns.
+     */
+    private void sendPrivileged(String key, byte[] value, Timer timer) throws ExecutionException, InterruptedException, TimeoutException {
+        sendPrivileged(Collections.singletonList(new ProducerKeyValue(key, value)), timer);
+    }
+
+    /**
+     * Send one or more records to the config topic synchronously. Note that {@link #claimWritePrivileges()} must be
+     * successfully invoked before calling this method if this store is configured to use a fencable writer.
+     * @param keyValues the list of producer record key/value pairs
+     * @param timer Timer bounding how long this method can block. The timer is updated before the method returns.
+     */
+    private void sendPrivileged(List<ProducerKeyValue> keyValues, Timer timer) throws ExecutionException, InterruptedException, TimeoutException {
         if (!usesFencableWriter) {
-            configLog.send(key, value);
+            List<Future<RecordMetadata>> producerFutures = new ArrayList<>();
+            keyValues.forEach(
+                    keyValue -> producerFutures.add(configLog.send(keyValue.key, keyValue.value))
+            );
+
+            timer.update();
+            for (Future<RecordMetadata> future : producerFutures) {
+                future.get(timer.remainingMs(), TimeUnit.MILLISECONDS);
+                timer.update();
+            }
+
             return;
         }
 
@@ -723,8 +807,11 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
 
         try {
             fencableProducer.beginTransaction();
-            fencableProducer.send(new ProducerRecord<>(topic, key, value));
+            keyValues.forEach(
+                    keyValue -> fencableProducer.send(new ProducerRecord<>(topic, keyValue.key, keyValue.value))
+            );
             fencableProducer.commitTransaction();
+            timer.update();
         } catch (Exception e) {
             log.warn("Failed to perform fencable send to config topic", e);
             relinquishWritePrivileges();
@@ -732,6 +819,16 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
         }
     }
 
+    private static class ProducerKeyValue {
+        final String key;
+        final byte[] value;
+
+        ProducerKeyValue(String key, byte[] value) {
+            this.key = key;
+            this.value = value;
+        }
+    }
+
     private void relinquishWritePrivileges() {
         if (fencableProducer != null) {
             Utils.closeQuietly(() -> fencableProducer.close(Duration.ZERO), "fencable producer for config topic");
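
For multi-record writes, such as the two tombstones written when a connector is removed, the batched sendPrivileged overload above dispatches all sends first and then waits on each returned future with whatever budget the shared Timer has left, before the final read-to-end. Roughly (the connector name and local variables are placeholders; the key prefixes follow the class Javadoc above):

    List<Future<RecordMetadata>> futures = new ArrayList<>();
    futures.add(configLog.send("connector-my-connector", null));     // tombstone the connector config
    futures.add(configLog.send("target-state-my-connector", null));  // tombstone the target state
    timer.update();
    for (Future<RecordMetadata> f : futures) {
        f.get(timer.remainingMs(), TimeUnit.MILLISECONDS);           // the budget shrinks after each wait
        timer.update();
    }
    configLog.readToEnd().get(timer.remainingMs(), TimeUnit.MILLISECONDS);
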
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
index cde63b3f833..899b42dd877 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
@@ -25,6 +25,7 @@ import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.PartitionInfo;
@@ -342,12 +343,29 @@ public class KafkaBasedLog<K, V> {
         return future;
     }
 
-    public void send(K key, V value) {
-        send(key, value, null);
+    /**
+     * Send a record asynchronously to the configured {@link #topic} without using a producer callback.
+     * @param key the key for the {@link ProducerRecord}
+     * @param value the value for the {@link ProducerRecord}
+     *
+     * @return the future from the call to {@link Producer#send}. {@link Future#get} can be called on this returned
+     *         future if synchronous behavior is desired.
+     */
+    public Future<RecordMetadata> send(K key, V value) {
+        return send(key, value, null);
     }
 
-    public void send(K key, V value, org.apache.kafka.clients.producer.Callback callback) {
-        producer.orElseThrow(() ->
+    /**
+     * Send a record asynchronously to the configured {@link #topic}
+     * @param key the key for the {@link ProducerRecord}
+     * @param value the value for the {@link ProducerRecord}
+     * @param callback the callback to invoke after completion; can be null if no callback is desired
+     *
+     * @return the future from the call to {@link Producer#send}. {@link Future#get} can be called on this returned
+     *         future if synchronous behavior is desired.
+     */
+    public Future<RecordMetadata> send(K key, V value, org.apache.kafka.clients.producer.Callback callback) {
+        return producer.orElseThrow(() ->
                 new IllegalStateException("This KafkaBasedLog was created in read-only mode and does not support write operations")
         ).send(new ProducerRecord<>(topic, key, value), callback);
     }
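
With send() now returning the producer future, existing fire-and-forget callers can keep ignoring the return value, while callers that want synchronous behavior simply block on it. An illustrative use (the log, key, and value are placeholders):

    Future<RecordMetadata> future = log.send("some-key", serializedValue);
    // asynchronous callers ignore the future or use the callback overload;
    // synchronous callers block, and producer errors surface as ExecutionException
    RecordMetadata metadata = future.get(30, TimeUnit.SECONDS);
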
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
index eddabba8012..a50d4dfe898 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
@@ -248,6 +248,32 @@ public class AbstractHerderTest {
         assertEquals(ConnectorType.SOURCE, info.type());
     }
 
+    @Test
+    public void testPauseConnector() {
+        AbstractHerder herder = mock(AbstractHerder.class, withSettings()
+                .useConstructor(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
+                .defaultAnswer(CALLS_REAL_METHODS));
+
+        when(configStore.contains(CONN1)).thenReturn(true);
+
+        herder.pauseConnector(CONN1);
+
+        verify(configStore).putTargetState(CONN1, TargetState.PAUSED);
+    }
+
+    @Test
+    public void testResumeConnector() {
+        AbstractHerder herder = mock(AbstractHerder.class, withSettings()
+                .useConstructor(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
+                .defaultAnswer(CALLS_REAL_METHODS));
+
+        when(configStore.contains(CONN1)).thenReturn(true);
+
+        herder.resumeConnector(CONN1);
+
+        verify(configStore).putTargetState(CONN1, TargetState.STARTED);
+    }
+
     @Test
     public void testConnectorInfoMissingPlugin() {
         AbstractHerder herder = mock(AbstractHerder.class, withSettings()
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
index 1d79d59aa33..ec45e37c58b 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
@@ -739,6 +739,58 @@ public class DistributedHerderTest {
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testCreateConnectorConfigBackingStoreError() {
+        EasyMock.expect(member.memberId()).andStubReturn("leader");
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
+        expectRebalance(1, Collections.emptyList(), Collections.emptyList(), true);
+        expectConfigRefreshAndSnapshot(SNAPSHOT);
+
+        member.wakeup();
+        PowerMock.expectLastCall();
+
+        // mock the actual validation since its asynchronous nature is difficult to test and should
+        // be covered sufficiently by the unit tests for the AbstractHerder class
+        Capture<Callback<ConfigInfos>> validateCallback = newCapture();
+        herder.validateConnectorConfig(EasyMock.eq(CONN2_CONFIG), capture(validateCallback));
+        PowerMock.expectLastCall().andAnswer(() -> {
+            validateCallback.getValue().onCompletion(null, CONN2_CONFIG_INFOS);
+            return null;
+        });
+
+        configBackingStore.putConnectorConfig(CONN2, CONN2_CONFIG);
+        PowerMock.expectLastCall().andThrow(new ConnectException("Error writing connector configuration to Kafka"));
+
+        // verify that the exception from config backing store write is propagated via the callback
+        putConnectorCallback.onCompletion(EasyMock.anyObject(ConnectException.class), EasyMock.isNull());
+        PowerMock.expectLastCall();
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+        // These will occur just before/during the second tick
+        member.wakeup();
+        PowerMock.expectLastCall();
+        member.ensureActive();
+        PowerMock.expectLastCall();
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        herder.putConnectorConfig(CONN2, CONN2_CONFIG, false, putConnectorCallback);
+        // First tick runs the initial herder request, which issues an asynchronous request for
+        // connector validation
+        herder.tick();
+
+        // Once that validation is complete, another request is added to the herder request queue
+        // for actually performing the config write; this tick is for that request
+        herder.tick();
+
+        time.sleep(1000L);
+        assertStatistics(3, 1, 100, 1000L);
+
+        PowerMock.verifyAll();
+    }
+
     @Test
     public void testCreateConnectorFailedValidation() throws Exception {
         EasyMock.expect(member.memberId()).andStubReturn("leader");
@@ -2596,6 +2648,7 @@ public class DistributedHerderTest {
         });
         member.wakeup();
         PowerMock.expectLastCall();
+
         configBackingStore.putConnectorConfig(CONN1, CONN1_CONFIG_UPDATED);
         PowerMock.expectLastCall().andAnswer(() -> {
             // Simulate response to writing config + waiting until end of log to be read
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
index 2197d7f7a09..8fb3bf30165 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
@@ -22,15 +22,19 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.connect.data.Field;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.runtime.RestartRequest;
 import org.apache.kafka.connect.runtime.TargetState;
 import org.apache.kafka.connect.runtime.WorkerConfig;
@@ -62,6 +66,8 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 
@@ -72,6 +78,7 @@ import static org.apache.kafka.clients.producer.ProducerConfig.TRANSACTIONAL_ID_
 import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.GROUP_ID_CONFIG;
 import static org.apache.kafka.connect.storage.KafkaConfigBackingStore.INCLUDE_TASKS_FIELD_NAME;
 import static org.apache.kafka.connect.storage.KafkaConfigBackingStore.ONLY_FAILED_FIELD_NAME;
+import static org.apache.kafka.connect.storage.KafkaConfigBackingStore.READ_WRITE_TOTAL_TIMEOUT_MS;
 import static org.apache.kafka.connect.storage.KafkaConfigBackingStore.RESTART_KEY;
 import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG;
 import static org.junit.Assert.assertEquals;
@@ -170,14 +177,17 @@ public class KafkaConfigBackingStoreTest {
     KafkaBasedLog<String, byte[]> storeLog;
     @Mock
     Producer<String, byte[]> fencableProducer;
+    @Mock
+    Future<RecordMetadata> producerFuture;
     private KafkaConfigBackingStore configStorage;
 
-    private Capture<String> capturedTopic = EasyMock.newCapture();
-    private Capture<Map<String, Object>> capturedProducerProps = EasyMock.newCapture();
-    private Capture<Map<String, Object>> capturedConsumerProps = EasyMock.newCapture();
-    private Capture<Supplier<TopicAdmin>> capturedAdminSupplier = EasyMock.newCapture();
-    private Capture<NewTopic> capturedNewTopic = EasyMock.newCapture();
-    private Capture<Callback<ConsumerRecord<String, byte[]>>> capturedConsumedCallback = EasyMock.newCapture();
+    private final Capture<String> capturedTopic = EasyMock.newCapture();
+    private final Capture<Map<String, Object>> capturedProducerProps = EasyMock.newCapture();
+    private final Capture<Map<String, Object>> capturedConsumerProps = EasyMock.newCapture();
+    private final Capture<Supplier<TopicAdmin>> capturedAdminSupplier = EasyMock.newCapture();
+    private final Capture<NewTopic> capturedNewTopic = EasyMock.newCapture();
+    private final Capture<Callback<ConsumerRecord<String, byte[]>>> capturedConsumedCallback = EasyMock.newCapture();
+    private final MockTime time = new MockTime();
 
     private long logOffset = 0;
 
@@ -193,7 +203,7 @@ public class KafkaConfigBackingStoreTest {
         configStorage = PowerMock.createPartialMock(
                 KafkaConfigBackingStore.class,
                 new String[]{"createKafkaBasedLog", "createFencableProducer"},
-                converter, config, null, null, CLIENT_ID_BASE);
+                converter, config, null, null, CLIENT_ID_BASE, time);
         Whitebox.setInternalState(configStorage, "configLog", storeLog);
         configStorage.setUpdateListener(configUpdateListener);
         // The mock must be reset and re-mocked for the remainder of the test.
@@ -326,6 +336,92 @@ public class KafkaConfigBackingStoreTest {
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testPutConnectorConfigProducerError() throws Exception {
+        expectConfigure();
+        expectStart(Collections.emptyList(), Collections.emptyMap());
+        expectPartitionCount(1);
+
+        expectConvert(KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0, CONNECTOR_CONFIG_STRUCTS.get(0), CONFIGS_SERIALIZED.get(0));
+
+        storeLog.send(EasyMock.anyObject(), EasyMock.anyObject());
+        EasyMock.expectLastCall().andReturn(producerFuture);
+
+        producerFuture.get(EasyMock.anyLong(), EasyMock.anyObject());
+        EasyMock.expectLastCall().andThrow(new ExecutionException(new TopicAuthorizationException(Collections.singleton("test"))));
+
+        expectStop();
+
+        PowerMock.replayAll();
+
+        configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
+        configStorage.start();
+
+        // Verify initial state
+        ClusterConfigState configState = configStorage.snapshot();
+        assertEquals(-1, configState.offset());
+        assertEquals(0, configState.connectors().size());
+
+        // verify that the producer exception from KafkaBasedLog::send is propagated
+        ConnectException e = assertThrows(ConnectException.class, () -> configStorage.putConnectorConfig(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(0)));
+        assertTrue(e.getMessage().contains("Error writing connector configuration to Kafka"));
+        configStorage.stop();
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testRemoveConnectorConfigSlowProducer() throws Exception {
+        expectConfigure();
+        expectStart(Collections.emptyList(), Collections.emptyMap());
+        expectPartitionCount(1);
+
+        @SuppressWarnings("unchecked")
+        Future<RecordMetadata> connectorConfigProducerFuture = PowerMock.createMock(Future.class);
+        // tombstone for the connector config
+        storeLog.send(EasyMock.anyObject(), EasyMock.isNull());
+        EasyMock.expectLastCall().andReturn(connectorConfigProducerFuture);
+
+        @SuppressWarnings("unchecked")
+        Future<RecordMetadata> targetStateProducerFuture = PowerMock.createMock(Future.class);
+        // tombstone for the connector target state
+        storeLog.send(EasyMock.anyObject(), EasyMock.isNull());
+        EasyMock.expectLastCall().andReturn(targetStateProducerFuture);
+
+        connectorConfigProducerFuture.get(EasyMock.eq(READ_WRITE_TOTAL_TIMEOUT_MS), EasyMock.anyObject());
+        EasyMock.expectLastCall().andAnswer(() -> {
+            time.sleep(READ_WRITE_TOTAL_TIMEOUT_MS - 1000);
+            return null;
+        });
+
+        // the future get timeout is expected to be reduced according to how long the previous Future::get took
+        targetStateProducerFuture.get(EasyMock.eq(1000L), EasyMock.anyObject());
+        EasyMock.expectLastCall().andAnswer(() -> {
+            time.sleep(1000);
+            return null;
+        });
+
+        @SuppressWarnings("unchecked")
+        Future<Void> future = PowerMock.createMock(Future.class);
+        EasyMock.expect(storeLog.readToEnd()).andAnswer(() -> future);
+
+        // the Future::get calls on the previous two producer futures exhausted the overall timeout; so expect the
+        // timeout on the log read future to be 0
+        EasyMock.expect(future.get(EasyMock.eq(0L), EasyMock.anyObject())).andReturn(null);
+
+        expectStop();
+
+        PowerMock.replayAll();
+
+        configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
+        configStorage.start();
+
+        configStorage.removeConnectorConfig("test-connector");
+        configStorage.stop();
+
+        PowerMock.verifyAll();
+    }
+
     @Test
     public void testWritePrivileges() throws Exception {
         // With exactly.once.source.support = preparing (or also, "enabled"), we need to use a transactional producer
@@ -365,7 +461,9 @@ public class KafkaConfigBackingStoreTest {
         // In the meantime, write a target state (which doesn't require write privileges)
         expectConvert(KafkaConfigBackingStore.TARGET_STATE_V0, TARGET_STATE_PAUSED, CONFIGS_SERIALIZED.get(1));
         storeLog.send("target-state-" + CONNECTOR_IDS.get(1), CONFIGS_SERIALIZED.get(1));
-        PowerMock.expectLastCall();
+        EasyMock.expectLastCall().andReturn(producerFuture);
+        producerFuture.get(EasyMock.anyLong(), EasyMock.anyObject());
+        EasyMock.expectLastCall().andReturn(null);
 
         // Reclaim write privileges
         expectFencableProducer();
@@ -1497,13 +1595,18 @@ public class KafkaConfigBackingStoreTest {
     // from the log. Validate the data that is captured when the conversion is performed matches the specified data
     // (by checking a single field's value)
     private void expectConvertWriteRead(final String configKey, final Schema valueSchema, final byte[] serialized,
-                                        final String dataFieldName, final Object dataFieldValue) {
+                                        final String dataFieldName, final Object dataFieldValue) throws Exception {
         final Capture<Struct> capturedRecord = EasyMock.newCapture();
         if (serialized != null)
             EasyMock.expect(converter.fromConnectData(EasyMock.eq(TOPIC), EasyMock.eq(valueSchema), EasyMock.capture(capturedRecord)))
                     .andReturn(serialized);
+
         storeLog.send(EasyMock.eq(configKey), EasyMock.aryEq(serialized));
-        PowerMock.expectLastCall();
+        EasyMock.expectLastCall().andReturn(producerFuture);
+
+        producerFuture.get(EasyMock.anyLong(), EasyMock.anyObject());
+        EasyMock.expectLastCall().andReturn(null);
+
         EasyMock.expect(converter.toConnectData(EasyMock.eq(TOPIC), EasyMock.aryEq(serialized)))
                 .andAnswer(() -> {
                     if (dataFieldName != null)
@@ -1536,7 +1639,7 @@ public class KafkaConfigBackingStoreTest {
                 });
     }
 
-    private void expectConnectorRemoval(String configKey, String targetStateKey) {
+    private void expectConnectorRemoval(String configKey, String targetStateKey) throws Exception {
         expectConvertWriteRead(configKey, KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0, null, null, null);
         expectConvertWriteRead(targetStateKey, KafkaConfigBackingStore.TARGET_STATE_V0, null, null, null);
 
@@ -1547,7 +1650,7 @@ public class KafkaConfigBackingStoreTest {
     }
 
     private void expectConvertWriteAndRead(final String configKey, final Schema valueSchema, final byte[] serialized,
-                                           final String dataFieldName, final Object dataFieldValue) {
+                                           final String dataFieldName, final Object dataFieldValue) throws Exception {
         expectConvertWriteRead(configKey, valueSchema, serialized, dataFieldName, dataFieldValue);
         LinkedHashMap<String, byte[]> recordsToRead = new LinkedHashMap<>();
         recordsToRead.put(configKey, serialized);
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
index 652f4ff4dd7..9b91b55e92f 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
@@ -261,7 +261,7 @@ public class EmbeddedConnectCluster {
         putIfAbsent(workerProps, OFFSET_STORAGE_REPLICATION_FACTOR_CONFIG, internalTopicsReplFactor);
         putIfAbsent(workerProps, CONFIG_TOPIC_CONFIG, "connect-config-topic-" + connectClusterName);
         putIfAbsent(workerProps, CONFIG_STORAGE_REPLICATION_FACTOR_CONFIG, internalTopicsReplFactor);
-        putIfAbsent(workerProps, STATUS_STORAGE_TOPIC_CONFIG, "connect-storage-topic-" + connectClusterName);
+        putIfAbsent(workerProps, STATUS_STORAGE_TOPIC_CONFIG, "connect-status-topic-" + connectClusterName);
         putIfAbsent(workerProps, STATUS_STORAGE_REPLICATION_FACTOR_CONFIG, internalTopicsReplFactor);
         putIfAbsent(workerProps, KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
         putIfAbsent(workerProps, VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
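
End-to-end, this is what the new DistributedHerderTest case above asserts: the ConnectException raised by the config backing store is handed to the herder's callback rather than the request silently timing out. In caller terms this is roughly (a sketch of the pattern Connect's REST layer uses; the connector name, config map, and timeout are placeholders):

    FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>();
    herder.putConnectorConfig("my-connector", configMap, false, cb);
    try {
        cb.get(90, TimeUnit.SECONDS);
    } catch (ExecutionException e) {
        // e.getCause() is the ConnectException("Error writing connector configuration to Kafka"),
        // which the REST layer can translate into an error response for the client
    }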