Posted to commits@kafka.apache.org by ce...@apache.org on 2023/02/08 20:20:32 UTC

[kafka] branch 3.3 updated: KAFKA-14455: Kafka Connect create and update REST APIs should surface failures while writing to the config topic (#12984)

This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.3 by this push:
     new 2aeceff25a4 KAFKA-14455: Kafka Connect create and update REST APIs should surface failures while writing to the config topic (#12984)
2aeceff25a4 is described below

commit 2aeceff25a4a6a0581adf779b77cd94838b4e224
Author: Yash Mayya <ya...@gmail.com>
AuthorDate: Thu Feb 2 21:33:38 2023 +0530

    KAFKA-14455: Kafka Connect create and update REST APIs should surface failures while writing to the config topic (#12984)
    
    Reviewers: Greg Harris <gr...@aiven.io>, Chris Egerton <ch...@aiven.io>
---
 .../java/org/apache/kafka/common/utils/Timer.java  |  10 +-
 .../connect/storage/KafkaConfigBackingStore.java   | 165 ++++++++++++++++-----
 .../apache/kafka/connect/util/KafkaBasedLog.java   |  26 +++-
 .../runtime/distributed/DistributedHerderTest.java |  52 +++++++
 .../storage/KafkaConfigBackingStoreTest.java       | 127 ++++++++++++++--
 .../storage/KafkaOffsetBackingStoreTest.java       |  20 +--
 .../util/clusters/EmbeddedConnectCluster.java      |   2 +-
 7 files changed, 333 insertions(+), 69 deletions(-)
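
The heart of the change: each REST-facing write to the config topic is now awaited synchronously under a single shared deadline before the follow-up read-to-end, so producer failures (for example, an authorization failure on the config topic) propagate back to the caller instead of being dropped. A minimal sketch of that pattern, assuming a KafkaBasedLog<String, byte[]> like the one used in the diff below; writeThenReadToEnd and the literal 30-second budget are illustrative, mirroring READ_WRITE_TOTAL_TIMEOUT_MS in the patch:

    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.kafka.common.utils.Time;
    import org.apache.kafka.common.utils.Timer;
    import org.apache.kafka.connect.errors.ConnectException;
    import org.apache.kafka.connect.util.KafkaBasedLog;

    static void writeThenReadToEnd(KafkaBasedLog<String, byte[]> configLog, String key, byte[] value) {
        Timer timer = Time.SYSTEM.timer(30_000L);
        try {
            // send() now returns the producer future, so the write itself can be awaited
            Future<RecordMetadata> sendFuture = configLog.send(key, value);
            sendFuture.get(timer.remainingMs(), TimeUnit.MILLISECONDS);
            // charge the time already spent against the shared budget
            timer.update();
            configLog.readToEnd().get(timer.remainingMs(), TimeUnit.MILLISECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            throw new ConnectException("Error writing record to the config topic", e);
        }
    }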

diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Timer.java b/clients/src/main/java/org/apache/kafka/common/utils/Timer.java
index 98b09a38d36..d60eea25710 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/Timer.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Timer.java
@@ -26,7 +26,7 @@ package org.apache.kafka.common.utils;
  * This class also ensures monotonic updates to the timer even if the underlying clock is subject
  * to non-monotonic behavior. For example, the remaining time returned by {@link #remainingMs()} is
  * guaranteed to decrease monotonically until it hits zero.
- *
+ * <p>
  * Note that it is up to the caller to ensure progress of the timer using one of the
  * {@link #update()} methods or {@link #sleep(long)}. The timer will cache the current time and
  * return it indefinitely until the timer has been updated. This allows the caller to limit
@@ -34,14 +34,14 @@ package org.apache.kafka.common.utils;
  * waiting a request sent through the {@link org.apache.kafka.clients.NetworkClient} should call
  * {@link #update()} following each blocking call to
  * {@link org.apache.kafka.clients.NetworkClient#poll(long, long)}.
- *
+ * <p>
  * A typical usage might look something like this:
  *
  * <pre>
  *     Time time = Time.SYSTEM;
  *     Timer timer = time.timer(500);
  *
- *     while (!conditionSatisfied() && timer.notExpired) {
+ *     while (!conditionSatisfied() && timer.notExpired()) {
  *         client.poll(timer.remainingMs(), timer.currentTimeMs());
  *         timer.update();
  *     }
@@ -137,7 +137,7 @@ public class Timer {
     /**
      * Update the cached current time to a specific value. In some contexts, the caller may already
      * have an accurate time, so this avoids unnecessary calls to system time.
-     *
+     * <p>
      * Note that if the updated current time is smaller than the cached time, then the update
      * is ignored.
      *
@@ -161,7 +161,7 @@ public class Timer {
     /**
      * Get the current time in milliseconds. This will return the same cached value until the timer
      * has been updated using one of the {@link #update()} methods or {@link #sleep(long)} is used.
-     *
+     * <p>
      * Note that the value returned is guaranteed to increase monotonically even if the underlying
      * {@link Time} implementation goes backwards. Effectively, the timer will just wait for the
      * time to catch up.
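
The rest of the patch leans on Timer's deadline-carrying behavior: each blocking step calls update() and then hands remainingMs() to the next step, so one 30-second budget covers every producer future plus the final read-to-end. A small self-contained sketch of that mechanism, using MockTime from the clients test utilities for determinism (the class name is illustrative):

    import org.apache.kafka.common.utils.MockTime;
    import org.apache.kafka.common.utils.Timer;

    public final class TimerBudgetExample {
        public static void main(String[] args) {
            MockTime time = new MockTime();        // deterministic clock
            Timer timer = time.timer(30_000L);     // one shared 30-second budget

            time.sleep(29_000L);                   // pretend the first blocking call took 29 seconds
            timer.update();                        // fold the elapsed time into the timer
            System.out.println(timer.remainingMs()); // prints 1000: the next call gets at most 1 second
        }
    }
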
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
index 76c626964e6..ac1f4ad7829 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
@@ -23,6 +23,7 @@ import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.config.TopicConfig;
@@ -32,6 +33,7 @@ import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaAndValue;
@@ -39,11 +41,13 @@ import org.apache.kafka.connect.data.SchemaBuilder;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.errors.DataException;
+import org.apache.kafka.connect.runtime.Herder;
 import org.apache.kafka.connect.runtime.RestartRequest;
 import org.apache.kafka.connect.runtime.SessionKey;
 import org.apache.kafka.connect.runtime.TargetState;
 import org.apache.kafka.connect.runtime.WorkerConfig;
 import org.apache.kafka.connect.runtime.WorkerConfigTransformer;
+import org.apache.kafka.connect.runtime.distributed.ConnectProtocolCompatibility;
 import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
 import org.apache.kafka.connect.util.Callback;
 import org.apache.kafka.connect.util.ConnectUtils;
@@ -68,6 +72,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.function.Supplier;
@@ -77,26 +82,45 @@ import java.util.function.Supplier;
  * Provides persistent storage of Kafka Connect connector configurations in a Kafka topic.
  * </p>
  * <p>
- * This class manages both connector and task configurations. It tracks three types of configuration entries:
+ * This class manages both connector and task configurations, among various other configurations. It tracks seven types
+ * of records:
  * <p/>
- * 1. Connector config: map of string -> string configurations passed to the Connector class, with support for
+ * <ol>
+ * <li> Connector config: map of string -> string configurations passed to the Connector class, with support for
  * expanding this format if necessary. (Kafka key: connector-[connector-id]).
  * These configs are *not* ephemeral. They represent the source of truth. If the entire Connect
- * cluster goes down, this is all that is really needed to recover.
- * 2. Task configs: map of string -> string configurations passed to the Task class, with support for expanding
+ * cluster goes down, this is all that is really needed to recover. </li>
+ * <li> Task configs: map of string -> string configurations passed to the Task class, with support for expanding
  * this format if necessary. (Kafka key: task-[connector-id]-[task-id]).
- * These configs are ephemeral; they are stored here to a) disseminate them to all workers while
- * ensuring agreement and b) to allow faster cluster/worker recovery since the common case
- * of recovery (restoring a connector) will simply result in the same configuration as before
- * the failure.
- * 3. Task commit "configs": records indicating that previous task config entries should be committed and all task
+ * These configs are ephemeral; they are stored here to
+ * <ul>
+ * <li> disseminate them to all workers while ensuring agreement </li>
+ * <li> to allow faster cluster/worker recovery since the common case of recovery (restoring a connector) will simply
+ * result in the same task configuration as before the failure. </li>
+ * </ul>
+ * </li>
+ * <li> Task commit "configs": records indicating that previous task config entries should be committed and all task
  * configs for a connector can be applied. (Kafka key: commit-[connector-id].
  * This config has two effects. First, it records the number of tasks the connector is currently
  * running (and can therefore increase/decrease parallelism). Second, because each task config
  * is stored separately but they need to be applied together to ensure each partition is assigned
  * to a single task, this record also indicates that task configs for the specified connector
- * can be "applied" or "committed".
- * </p>
+ * can be "applied" or "committed". </li>
+ * <li> Connector target states: records indicating the {@link TargetState} for a connector </li>
+ * <li> {@link RestartRequest Restart requests}: records representing requests to restart a connector and / or its
+ * tasks. See <a href="https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181308623">KIP-745</a> for more
+ * details.</li>
+ * <li> Task count records: an integer value that tracks the number of task producers (for source connectors) that
+ * will have to be fenced out if a connector is reconfigured before bringing up any tasks with the new set of task
+ * configurations. This is required for exactly-once support for source connectors, see
+ * <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-618%3A+Exactly-Once+Support+for+Source+Connectors">KIP-618</a>
+ * for more details.</li>
+ * <li> Session key records: A {@link SessionKey} generated by the leader of the cluster when
+ * {@link ConnectProtocolCompatibility#SESSIONED} is the Connect protocol being used by the cluster. This session key
+ * is used to verify internal REST requests between workers in the cluster. See
+ * <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-507%3A+Securing+Internal+Connect+REST+Endpoints">KIP-507</a>
+ * for more details.</li>
+ * </ol>
  * <p>
  * This configuration is expected to be stored in a *single partition* and *compacted* topic. Using a single partition
  * ensures we can enforce ordering on messages, allowing Kafka to be used as a write ahead log. Compaction allows
@@ -146,7 +170,7 @@ import java.util.function.Supplier;
  * Because we can encounter these inconsistencies and addressing them requires support from the rest of the system
  * (resolving the task configuration inconsistencies requires support from the connector instance to regenerate updated
  * configs), this class exposes not only the current set of configs, but also which connectors have inconsistent data.
- * This allows users of this class (i.e., Herder implementations) to take action to resolve any inconsistencies. These
+ * This allows users of this class (i.e., {@link Herder} implementations) to take action to resolve any inconsistencies. These
  * inconsistencies should be rare (as described above, due to compaction combined with leader failures in the middle
  * of updating task configurations).
  * </p>
@@ -239,7 +263,8 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
             .field(ONLY_FAILED_FIELD_NAME, Schema.BOOLEAN_SCHEMA)
             .build();
 
-    private static final long READ_TO_END_TIMEOUT_MS = 30000;
+    // Visible for testing
+    static final long READ_WRITE_TOTAL_TIMEOUT_MS = 30000;
 
     private final Object lock;
     private final Converter converter;
@@ -285,6 +310,7 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
     private final boolean usesFencableWriter;
     private volatile Producer<String, byte[]> fencableProducer;
     private final Map<String, Object> fencableProducerProps;
+    private final Time time;
 
     @Deprecated
     public KafkaConfigBackingStore(Converter converter, DistributedConfig config, WorkerConfigTransformer configTransformer) {
@@ -292,6 +318,10 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
     }
 
     public KafkaConfigBackingStore(Converter converter, DistributedConfig config, WorkerConfigTransformer configTransformer, Supplier<TopicAdmin> adminSupplier) {
+        this(converter, config, configTransformer, adminSupplier, Time.SYSTEM);
+    }
+
+    KafkaConfigBackingStore(Converter converter, DistributedConfig config, WorkerConfigTransformer configTransformer, Supplier<TopicAdmin> adminSupplier, Time time) {
         this.lock = new Object();
         this.started = false;
         this.converter = converter;
@@ -315,6 +345,7 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
 
         configLog = setupAndCreateKafkaBasedLog(this.topic, config);
         this.configTransformer = configTransformer;
+        this.time = time;
     }
 
     @Override
@@ -463,8 +494,9 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
         connectConfig.put("properties", properties);
         byte[] serializedConfig = converter.fromConnectData(topic, CONNECTOR_CONFIGURATION_V0, connectConfig);
         try {
-            sendPrivileged(CONNECTOR_KEY(connector), serializedConfig);
-            configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+            Timer timer = time.timer(READ_WRITE_TOTAL_TIMEOUT_MS);
+            sendPrivileged(CONNECTOR_KEY(connector), serializedConfig, timer);
+            configLog.readToEnd().get(timer.remainingMs(), TimeUnit.MILLISECONDS);
         } catch (InterruptedException | ExecutionException | TimeoutException e) {
             log.error("Failed to write connector configuration to Kafka: ", e);
             throw new ConnectException("Error writing connector configuration to Kafka", e);
@@ -484,9 +516,14 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
     public void removeConnectorConfig(String connector) {
         log.debug("Removing connector configuration for connector '{}'", connector);
         try {
-            sendPrivileged(CONNECTOR_KEY(connector), null);
-            sendPrivileged(TARGET_STATE_KEY(connector), null);
-            configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+            Timer timer = time.timer(READ_WRITE_TOTAL_TIMEOUT_MS);
+            List<ProducerKeyValue> keyValues = Arrays.asList(
+                    new ProducerKeyValue(CONNECTOR_KEY(connector), null),
+                    new ProducerKeyValue(TARGET_STATE_KEY(connector), null)
+            );
+            sendPrivileged(keyValues, timer);
+
+            configLog.readToEnd().get(timer.remainingMs(), TimeUnit.MILLISECONDS);
         } catch (InterruptedException | ExecutionException | TimeoutException e) {
             log.error("Failed to remove connector configuration from Kafka: ", e);
             throw new ConnectException("Error removing connector configuration from Kafka", e);
@@ -515,10 +552,12 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
      */
     @Override
     public void putTaskConfigs(String connector, List<Map<String, String>> configs) {
+        Timer timer = time.timer(READ_WRITE_TOTAL_TIMEOUT_MS);
         // Make sure we're at the end of the log. We should be the only writer, but we want to make sure we don't have
         // any outstanding lagging data to consume.
         try {
-            configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+            configLog.readToEnd().get(timer.remainingMs(), TimeUnit.MILLISECONDS);
+            timer.update();
         } catch (InterruptedException | ExecutionException | TimeoutException e) {
             log.error("Failed to write root configuration to Kafka: ", e);
             throw new ConnectException("Error writing root configuration to Kafka", e);
@@ -526,34 +565,44 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
 
         int taskCount = configs.size();
 
-        // Start sending all the individual updates
+        // Send all the individual updates
         int index = 0;
+        List<ProducerKeyValue> keyValues = new ArrayList<>();
         for (Map<String, String> taskConfig: configs) {
             Struct connectConfig = new Struct(TASK_CONFIGURATION_V0);
             connectConfig.put("properties", taskConfig);
             byte[] serializedConfig = converter.fromConnectData(topic, TASK_CONFIGURATION_V0, connectConfig);
             log.debug("Writing configuration for connector '{}' task {}", connector, index);
             ConnectorTaskId connectorTaskId = new ConnectorTaskId(connector, index);
-            sendPrivileged(TASK_KEY(connectorTaskId), serializedConfig);
+            keyValues.add(new ProducerKeyValue(TASK_KEY(connectorTaskId), serializedConfig));
             index++;
         }
 
+        try {
+            sendPrivileged(keyValues, timer);
+        } catch (ExecutionException | InterruptedException | TimeoutException e) {
+            log.error("Failed to write task configurations to Kafka", e);
+            throw new ConnectException("Error writing task configurations to Kafka", e);
+        }
+
         // Finally, send the commit to update the number of tasks and apply the new configs, then wait until we read to
         // the end of the log
         try {
             // Read to end to ensure all the task configs have been written
             if (taskCount > 0) {
-                configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+                configLog.readToEnd().get(timer.remainingMs(), TimeUnit.MILLISECONDS);
+                timer.update();
             }
             // Write the commit message
             Struct connectConfig = new Struct(CONNECTOR_TASKS_COMMIT_V0);
             connectConfig.put("tasks", taskCount);
             byte[] serializedConfig = converter.fromConnectData(topic, CONNECTOR_TASKS_COMMIT_V0, connectConfig);
             log.debug("Writing commit for connector '{}' with {} tasks.", connector, taskCount);
-            sendPrivileged(COMMIT_TASKS_KEY(connector), serializedConfig);
+
+            sendPrivileged(COMMIT_TASKS_KEY(connector), serializedConfig, timer);
 
             // Read to end to ensure all the commit messages have been written
-            configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+            configLog.readToEnd().get(timer.remainingMs(), TimeUnit.MILLISECONDS);
         } catch (InterruptedException | ExecutionException | TimeoutException e) {
             log.error("Failed to write root configuration to Kafka: ", e);
             throw new ConnectException("Error writing root configuration to Kafka", e);
@@ -581,7 +630,12 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
         connectTargetState.put("state", state.name());
         byte[] serializedTargetState = converter.fromConnectData(topic, TARGET_STATE_V0, connectTargetState);
         log.debug("Writing target state {} for connector {}", state, connector);
-        configLog.send(TARGET_STATE_KEY(connector), serializedTargetState);
+        try {
+            configLog.send(TARGET_STATE_KEY(connector), serializedTargetState).get(READ_WRITE_TOTAL_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException | ExecutionException | TimeoutException e) {
+            log.error("Failed to write target state to Kafka", e);
+            throw new ConnectException("Error writing target state to Kafka", e);
+        }
     }
 
     /**
@@ -602,8 +656,9 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
         byte[] serializedTaskCountRecord = converter.fromConnectData(topic, TASK_COUNT_RECORD_V0, taskCountRecord);
         log.debug("Writing task count record {} for connector {}", taskCount, connector);
         try {
-            sendPrivileged(TASK_COUNT_RECORD_KEY(connector), serializedTaskCountRecord);
-            configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+            Timer timer = time.timer(READ_WRITE_TOTAL_TIMEOUT_MS);
+            sendPrivileged(TASK_COUNT_RECORD_KEY(connector), serializedTaskCountRecord, timer);
+            configLog.readToEnd().get(timer.remainingMs(), TimeUnit.MILLISECONDS);
         } catch (InterruptedException | ExecutionException | TimeoutException e) {
             log.error("Failed to write task count record with {} tasks for connector {} to Kafka: ", taskCount, connector, e);
             throw new ConnectException("Error writing task count record to Kafka", e);
@@ -629,8 +684,9 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
         sessionKeyStruct.put("creation-timestamp", sessionKey.creationTimestamp());
         byte[] serializedSessionKey = converter.fromConnectData(topic, SESSION_KEY_V0, sessionKeyStruct);
         try {
-            sendPrivileged(SESSION_KEY_KEY, serializedSessionKey);
-            configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+            Timer timer = time.timer(READ_WRITE_TOTAL_TIMEOUT_MS);
+            sendPrivileged(SESSION_KEY_KEY, serializedSessionKey, timer);
+            configLog.readToEnd().get(timer.remainingMs(), TimeUnit.MILLISECONDS);
         } catch (InterruptedException | ExecutionException | TimeoutException e) {
             log.error("Failed to write session key to Kafka: ", e);
             throw new ConnectException("Error writing session key to Kafka", e);
@@ -652,8 +708,9 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
         value.put(ONLY_FAILED_FIELD_NAME, restartRequest.onlyFailed());
         byte[] serializedValue = converter.fromConnectData(topic, value.schema(), value);
         try {
-            sendPrivileged(key, serializedValue);
-            configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+            Timer timer = time.timer(READ_WRITE_TOTAL_TIMEOUT_MS);
+            sendPrivileged(key, serializedValue, timer);
+            configLog.readToEnd().get(timer.remainingMs(), TimeUnit.MILLISECONDS);
         } catch (InterruptedException | ExecutionException | TimeoutException e) {
             log.error("Failed to write {} to Kafka: ", restartRequest, e);
             throw new ConnectException("Error writing " + restartRequest + " to Kafka", e);
@@ -702,9 +759,36 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
         return createKafkaBasedLog(topic, producerProps, consumerProps, new ConsumeCallback(), topicDescription, adminSupplier);
     }
 
-    private void sendPrivileged(String key, byte[] value) {
+    /**
+     * Send a single record to the config topic synchronously. Note that {@link #claimWritePrivileges()} must be
+     * successfully invoked before calling this method if this store is configured to use a fencable writer.
+     * @param key the record key
+     * @param value the record value
+     * @param timer Timer bounding how long this method can block. The timer is updated before the method returns.
+     */
+    private void sendPrivileged(String key, byte[] value, Timer timer) throws ExecutionException, InterruptedException, TimeoutException {
+        sendPrivileged(Collections.singletonList(new ProducerKeyValue(key, value)), timer);
+    }
+
+    /**
+     * Send one or more records to the config topic synchronously. Note that {@link #claimWritePrivileges()} must be
+     * successfully invoked before calling this method if this store is configured to use a fencable writer.
+     * @param keyValues the list of producer record key/value pairs
+     * @param timer Timer bounding how long this method can block. The timer is updated before the method returns.
+     */
+    private void sendPrivileged(List<ProducerKeyValue> keyValues, Timer timer) throws ExecutionException, InterruptedException, TimeoutException {
         if (!usesFencableWriter) {
-            configLog.send(key, value);
+            List<Future<RecordMetadata>> producerFutures = new ArrayList<>();
+            keyValues.forEach(
+                    keyValue -> producerFutures.add(configLog.send(keyValue.key, keyValue.value))
+            );
+
+            timer.update();
+            for (Future<RecordMetadata> future : producerFutures) {
+                future.get(timer.remainingMs(), TimeUnit.MILLISECONDS);
+                timer.update();
+            }
+
             return;
         }
 
@@ -714,8 +798,11 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
 
         try {
             fencableProducer.beginTransaction();
-            fencableProducer.send(new ProducerRecord<>(topic, key, value));
+            keyValues.forEach(
+                    keyValue -> fencableProducer.send(new ProducerRecord<>(topic, keyValue.key, keyValue.value))
+            );
             fencableProducer.commitTransaction();
+            timer.update();
         } catch (Exception e) {
             log.warn("Failed to perform fencable send to config topic", e);
             relinquishWritePrivileges();
@@ -723,6 +810,16 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
         }
     }
 
+    private static class ProducerKeyValue {
+        final String key;
+        final byte[] value;
+
+        ProducerKeyValue(String key, byte[] value) {
+            this.key = key;
+            this.value = value;
+        }
+    }
+
     private void relinquishWritePrivileges() {
         if (fencableProducer != null) {
             Utils.closeQuietly(() -> fencableProducer.close(Duration.ZERO), "fencable producer for config topic");
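
A note on ordering in the non-transactional branch of sendPrivileged above: all records are handed to the producer before any future is awaited, so they can be batched into as few requests as possible, and every wait draws from the same Timer. Isolated as a hypothetical helper for readability (awaitAll is not a method added by this commit):

    import java.util.List;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.kafka.common.utils.Timer;

    static void awaitAll(List<Future<RecordMetadata>> producerFutures, Timer timer)
            throws ExecutionException, InterruptedException, TimeoutException {
        timer.update();
        for (Future<RecordMetadata> future : producerFutures) {
            // each wait only gets whatever is left of the shared budget
            future.get(timer.remainingMs(), TimeUnit.MILLISECONDS);
            timer.update();
        }
    }
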
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
index cde63b3f833..899b42dd877 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
@@ -25,6 +25,7 @@ import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.PartitionInfo;
@@ -342,12 +343,29 @@ public class KafkaBasedLog<K, V> {
         return future;
     }
 
-    public void send(K key, V value) {
-        send(key, value, null);
+    /**
+     * Send a record asynchronously to the configured {@link #topic} without using a producer callback.
+     * @param key the key for the {@link ProducerRecord}
+     * @param value the value for the {@link ProducerRecord}
+     *
+     * @return the future from the call to {@link Producer#send}. {@link Future#get} can be called on this returned
+     *         future if synchronous behavior is desired.
+     */
+    public Future<RecordMetadata> send(K key, V value) {
+        return send(key, value, null);
     }
 
-    public void send(K key, V value, org.apache.kafka.clients.producer.Callback callback) {
-        producer.orElseThrow(() ->
+    /**
+     * Send a record asynchronously to the configured {@link #topic}
+     * @param key the key for the {@link ProducerRecord}
+     * @param value the value for the {@link ProducerRecord}
+     * @param callback the callback to invoke after completion; can be null if no callback is desired
+     *
+     * @return the future from the call to {@link Producer#send}. {@link Future#get} can be called on this returned
+     *         future if synchronous behavior is desired.
+     */
+    public Future<RecordMetadata> send(K key, V value, org.apache.kafka.clients.producer.Callback callback) {
+        return producer.orElseThrow(() ->
                 new IllegalStateException("This KafkaBasedLog was created in read-only mode and does not support write operations")
         ).send(new ProducerRecord<>(topic, key, value), callback);
     }
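
With send() now returning the producer future, callers that need synchronous semantics can block with a timeout and handle write failures directly, while fire-and-forget callers can simply ignore the return value. A hedged usage sketch; log, the key, serializedConfig, and the 30-second timeout are placeholders, not values from this commit:

    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.kafka.connect.errors.ConnectException;

    Future<RecordMetadata> future = log.send("connector-example", serializedConfig);
    try {
        RecordMetadata metadata = future.get(30, TimeUnit.SECONDS);
        // the write is durable; metadata.offset() gives its position in the topic
    } catch (ExecutionException | TimeoutException | InterruptedException e) {
        // surface the failure to the caller instead of silently dropping the record
        throw new ConnectException("Failed to write record to the log's topic", e);
    }
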
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
index f2985cc8a23..886163f2348 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
@@ -762,6 +762,58 @@ public class DistributedHerderTest extends ThreadedTest {
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testCreateConnectorConfigBackingStoreError() {
+        EasyMock.expect(member.memberId()).andStubReturn("leader");
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
+        expectRebalance(1, Collections.emptyList(), Collections.emptyList(), true);
+        expectConfigRefreshAndSnapshot(SNAPSHOT);
+
+        member.wakeup();
+        PowerMock.expectLastCall();
+
+        // mock the actual validation since its asynchronous nature is difficult to test and should
+        // be covered sufficiently by the unit tests for the AbstractHerder class
+        Capture<Callback<ConfigInfos>> validateCallback = newCapture();
+        herder.validateConnectorConfig(EasyMock.eq(CONN2_CONFIG), capture(validateCallback));
+        PowerMock.expectLastCall().andAnswer(() -> {
+            validateCallback.getValue().onCompletion(null, CONN2_CONFIG_INFOS);
+            return null;
+        });
+
+        configBackingStore.putConnectorConfig(CONN2, CONN2_CONFIG);
+        PowerMock.expectLastCall().andThrow(new ConnectException("Error writing connector configuration to Kafka"));
+
+        // verify that the exception from config backing store write is propagated via the callback
+        putConnectorCallback.onCompletion(EasyMock.anyObject(ConnectException.class), EasyMock.isNull());
+        PowerMock.expectLastCall();
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+        // These will occur just before/during the second tick
+        member.wakeup();
+        PowerMock.expectLastCall();
+        member.ensureActive();
+        PowerMock.expectLastCall();
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        herder.putConnectorConfig(CONN2, CONN2_CONFIG, false, putConnectorCallback);
+        // First tick runs the initial herder request, which issues an asynchronous request for
+        // connector validation
+        herder.tick();
+
+        // Once that validation is complete, another request is added to the herder request queue
+        // for actually performing the config write; this tick is for that request
+        herder.tick();
+
+        time.sleep(1000L);
+        assertStatistics(3, 1, 100, 1000L);
+
+        PowerMock.verifyAll();
+    }
+
     @Test
     public void testCreateConnectorFailedValidation() throws Exception {
         EasyMock.expect(member.memberId()).andStubReturn("leader");
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
index b374f8f5d2f..0eb45eb4971 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
@@ -22,15 +22,19 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.connect.data.Field;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.runtime.RestartRequest;
 import org.apache.kafka.connect.runtime.TargetState;
 import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
@@ -62,6 +66,8 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 
@@ -71,6 +77,7 @@ import static org.apache.kafka.clients.producer.ProducerConfig.TRANSACTIONAL_ID_
 import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.GROUP_ID_CONFIG;
 import static org.apache.kafka.connect.storage.KafkaConfigBackingStore.INCLUDE_TASKS_FIELD_NAME;
 import static org.apache.kafka.connect.storage.KafkaConfigBackingStore.ONLY_FAILED_FIELD_NAME;
+import static org.apache.kafka.connect.storage.KafkaConfigBackingStore.READ_WRITE_TOTAL_TIMEOUT_MS;
 import static org.apache.kafka.connect.storage.KafkaConfigBackingStore.RESTART_KEY;
 import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG;
 import static org.junit.Assert.assertEquals;
@@ -168,14 +175,17 @@ public class KafkaConfigBackingStoreTest {
     KafkaBasedLog<String, byte[]> storeLog;
     @Mock
     Producer<String, byte[]> fencableProducer;
+    @Mock
+    Future<RecordMetadata> producerFuture;
     private KafkaConfigBackingStore configStorage;
 
-    private Capture<String> capturedTopic = EasyMock.newCapture();
-    private Capture<Map<String, Object>> capturedProducerProps = EasyMock.newCapture();
-    private Capture<Map<String, Object>> capturedConsumerProps = EasyMock.newCapture();
-    private Capture<Supplier<TopicAdmin>> capturedAdminSupplier = EasyMock.newCapture();
-    private Capture<NewTopic> capturedNewTopic = EasyMock.newCapture();
-    private Capture<Callback<ConsumerRecord<String, byte[]>>> capturedConsumedCallback = EasyMock.newCapture();
+    private final Capture<String> capturedTopic = EasyMock.newCapture();
+    private final Capture<Map<String, Object>> capturedProducerProps = EasyMock.newCapture();
+    private final Capture<Map<String, Object>> capturedConsumerProps = EasyMock.newCapture();
+    private final Capture<Supplier<TopicAdmin>> capturedAdminSupplier = EasyMock.newCapture();
+    private final Capture<NewTopic> capturedNewTopic = EasyMock.newCapture();
+    private final Capture<Callback<ConsumerRecord<String, byte[]>>> capturedConsumedCallback = EasyMock.newCapture();
+    private final MockTime time = new MockTime();
 
     private long logOffset = 0;
 
@@ -183,7 +193,7 @@ public class KafkaConfigBackingStoreTest {
         configStorage = PowerMock.createPartialMock(
                 KafkaConfigBackingStore.class,
                 new String[]{"createKafkaBasedLog", "createFencableProducer"},
-                converter, config, null);
+                converter, config, null, null, time);
         Whitebox.setInternalState(configStorage, "configLog", storeLog);
         configStorage.setUpdateListener(configUpdateListener);
     }
@@ -316,6 +326,92 @@ public class KafkaConfigBackingStoreTest {
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testPutConnectorConfigProducerError() throws Exception {
+        expectConfigure();
+        expectStart(Collections.emptyList(), Collections.emptyMap());
+        expectPartitionCount(1);
+
+        expectConvert(KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0, CONNECTOR_CONFIG_STRUCTS.get(0), CONFIGS_SERIALIZED.get(0));
+
+        storeLog.send(EasyMock.anyObject(), EasyMock.anyObject());
+        EasyMock.expectLastCall().andReturn(producerFuture);
+
+        producerFuture.get(EasyMock.anyLong(), EasyMock.anyObject());
+        EasyMock.expectLastCall().andThrow(new ExecutionException(new TopicAuthorizationException(Collections.singleton("test"))));
+
+        expectStop();
+
+        PowerMock.replayAll();
+
+        configStorage.setupAndCreateKafkaBasedLog(TOPIC, DEFAULT_DISTRIBUTED_CONFIG);
+        configStorage.start();
+
+        // Verify initial state
+        ClusterConfigState configState = configStorage.snapshot();
+        assertEquals(-1, configState.offset());
+        assertEquals(0, configState.connectors().size());
+
+        // verify that the producer exception from KafkaBasedLog::send is propagated
+        ConnectException e = assertThrows(ConnectException.class, () -> configStorage.putConnectorConfig(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(0)));
+        assertTrue(e.getMessage().contains("Error writing connector configuration to Kafka"));
+        configStorage.stop();
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testRemoveConnectorConfigSlowProducer() throws Exception {
+        expectConfigure();
+        expectStart(Collections.emptyList(), Collections.emptyMap());
+        expectPartitionCount(1);
+
+        @SuppressWarnings("unchecked")
+        Future<RecordMetadata> connectorConfigProducerFuture = PowerMock.createMock(Future.class);
+        // tombstone for the connector config
+        storeLog.send(EasyMock.anyObject(), EasyMock.isNull());
+        EasyMock.expectLastCall().andReturn(connectorConfigProducerFuture);
+
+        @SuppressWarnings("unchecked")
+        Future<RecordMetadata> targetStateProducerFuture = PowerMock.createMock(Future.class);
+        // tombstone for the connector target state
+        storeLog.send(EasyMock.anyObject(), EasyMock.isNull());
+        EasyMock.expectLastCall().andReturn(targetStateProducerFuture);
+
+        connectorConfigProducerFuture.get(EasyMock.eq(READ_WRITE_TOTAL_TIMEOUT_MS), EasyMock.anyObject());
+        EasyMock.expectLastCall().andAnswer(() -> {
+            time.sleep(READ_WRITE_TOTAL_TIMEOUT_MS - 1000);
+            return null;
+        });
+
+        // the future get timeout is expected to be reduced according to how long the previous Future::get took
+        targetStateProducerFuture.get(EasyMock.eq(1000L), EasyMock.anyObject());
+        EasyMock.expectLastCall().andAnswer(() -> {
+            time.sleep(1000);
+            return null;
+        });
+
+        @SuppressWarnings("unchecked")
+        Future<Void> future = PowerMock.createMock(Future.class);
+        EasyMock.expect(storeLog.readToEnd()).andAnswer(() -> future);
+
+        // the Future::get calls on the previous two producer futures exhausted the overall timeout; so expect the
+        // timeout on the log read future to be 0
+        EasyMock.expect(future.get(EasyMock.eq(0L), EasyMock.anyObject())).andReturn(null);
+
+        expectStop();
+
+        PowerMock.replayAll();
+
+        configStorage.setupAndCreateKafkaBasedLog(TOPIC, DEFAULT_DISTRIBUTED_CONFIG);
+        configStorage.start();
+
+        configStorage.removeConnectorConfig("test-connector");
+        configStorage.stop();
+
+        PowerMock.verifyAll();
+    }
+
     @Test
     public void testWritePrivileges() throws Exception {
         // With exactly.once.source.support = preparing (or also, "enabled"), we need to use a transactional producer
@@ -357,7 +453,9 @@ public class KafkaConfigBackingStoreTest {
         // In the meantime, write a target state (which doesn't require write privileges)
         expectConvert(KafkaConfigBackingStore.TARGET_STATE_V0, TARGET_STATE_PAUSED, CONFIGS_SERIALIZED.get(1));
         storeLog.send("target-state-" + CONNECTOR_IDS.get(1), CONFIGS_SERIALIZED.get(1));
-        PowerMock.expectLastCall();
+        EasyMock.expectLastCall().andReturn(producerFuture);
+        producerFuture.get(EasyMock.anyLong(), EasyMock.anyObject());
+        EasyMock.expectLastCall().andReturn(null);
 
         // Reclaim write privileges
         expectFencableProducer();
@@ -1479,13 +1577,18 @@ public class KafkaConfigBackingStoreTest {
     // from the log. Validate the data that is captured when the conversion is performed matches the specified data
     // (by checking a single field's value)
     private void expectConvertWriteRead(final String configKey, final Schema valueSchema, final byte[] serialized,
-                                        final String dataFieldName, final Object dataFieldValue) {
+                                        final String dataFieldName, final Object dataFieldValue) throws Exception {
         final Capture<Struct> capturedRecord = EasyMock.newCapture();
         if (serialized != null)
             EasyMock.expect(converter.fromConnectData(EasyMock.eq(TOPIC), EasyMock.eq(valueSchema), EasyMock.capture(capturedRecord)))
                     .andReturn(serialized);
+
         storeLog.send(EasyMock.eq(configKey), EasyMock.aryEq(serialized));
-        PowerMock.expectLastCall();
+        EasyMock.expectLastCall().andReturn(producerFuture);
+
+        producerFuture.get(EasyMock.anyLong(), EasyMock.anyObject());
+        EasyMock.expectLastCall().andReturn(null);
+
         EasyMock.expect(converter.toConnectData(EasyMock.eq(TOPIC), EasyMock.aryEq(serialized)))
                 .andAnswer(() -> {
                     if (dataFieldName != null)
@@ -1518,7 +1621,7 @@ public class KafkaConfigBackingStoreTest {
                 });
     }
 
-    private void expectConnectorRemoval(String configKey, String targetStateKey) {
+    private void expectConnectorRemoval(String configKey, String targetStateKey) throws Exception {
         expectConvertWriteRead(configKey, KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0, null, null, null);
         expectConvertWriteRead(targetStateKey, KafkaConfigBackingStore.TARGET_STATE_V0, null, null, null);
 
@@ -1529,7 +1632,7 @@ public class KafkaConfigBackingStoreTest {
     }
 
     private void expectConvertWriteAndRead(final String configKey, final Schema valueSchema, final byte[] serialized,
-                                           final String dataFieldName, final Object dataFieldValue) {
+                                           final String dataFieldName, final Object dataFieldValue) throws Exception {
         expectConvertWriteRead(configKey, valueSchema, serialized, dataFieldName, dataFieldValue);
         LinkedHashMap<String, byte[]> recordsToRead = new LinkedHashMap<>();
         recordsToRead.put(configKey, serialized);
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java
index cf11230f3d2..e6927ab62cc 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java
@@ -193,11 +193,9 @@ public class KafkaOffsetBackingStoreTest {
 
         // Set offsets
         Capture<org.apache.kafka.clients.producer.Callback> callback0 = EasyMock.newCapture();
-        storeLog.send(EasyMock.aryEq(TP0_KEY.array()), EasyMock.aryEq(TP0_VALUE.array()), EasyMock.capture(callback0));
-        PowerMock.expectLastCall();
+        EasyMock.expect(storeLog.send(EasyMock.aryEq(TP0_KEY.array()), EasyMock.aryEq(TP0_VALUE.array()), EasyMock.capture(callback0))).andReturn(null);
         Capture<org.apache.kafka.clients.producer.Callback> callback1 = EasyMock.newCapture();
-        storeLog.send(EasyMock.aryEq(TP1_KEY.array()), EasyMock.aryEq(TP1_VALUE.array()), EasyMock.capture(callback1));
-        PowerMock.expectLastCall();
+        EasyMock.expect(storeLog.send(EasyMock.aryEq(TP1_KEY.array()), EasyMock.aryEq(TP1_VALUE.array()), EasyMock.capture(callback1))).andReturn(null);
 
         // Second get() should get the produced data and return the new values
         final Capture<Callback<Void>> secondGetReadToEndCallback = EasyMock.newCapture();
@@ -276,10 +274,9 @@ public class KafkaOffsetBackingStoreTest {
 
         // Set offsets
         Capture<org.apache.kafka.clients.producer.Callback> callback0 = EasyMock.newCapture();
-        storeLog.send(EasyMock.isNull(byte[].class), EasyMock.aryEq(TP0_VALUE.array()), EasyMock.capture(callback0));
-        PowerMock.expectLastCall();
+        EasyMock.expect(storeLog.send(EasyMock.isNull(byte[].class), EasyMock.aryEq(TP0_VALUE.array()), EasyMock.capture(callback0))).andReturn(null);
         Capture<org.apache.kafka.clients.producer.Callback> callback1 = EasyMock.newCapture();
-        storeLog.send(EasyMock.aryEq(TP1_KEY.array()), EasyMock.isNull(byte[].class), EasyMock.capture(callback1));
+        EasyMock.expect(storeLog.send(EasyMock.aryEq(TP1_KEY.array()), EasyMock.isNull(byte[].class), EasyMock.capture(callback1))).andReturn(null);
         PowerMock.expectLastCall();
 
         // Second get() should get the produced data and return the new values
@@ -337,14 +334,11 @@ public class KafkaOffsetBackingStoreTest {
 
         // Set offsets
         Capture<org.apache.kafka.clients.producer.Callback> callback0 = EasyMock.newCapture();
-        storeLog.send(EasyMock.aryEq(TP0_KEY.array()), EasyMock.aryEq(TP0_VALUE.array()), EasyMock.capture(callback0));
-        PowerMock.expectLastCall();
+        EasyMock.expect(storeLog.send(EasyMock.aryEq(TP0_KEY.array()), EasyMock.aryEq(TP0_VALUE.array()), EasyMock.capture(callback0))).andReturn(null);
         Capture<org.apache.kafka.clients.producer.Callback> callback1 = EasyMock.newCapture();
-        storeLog.send(EasyMock.aryEq(TP1_KEY.array()), EasyMock.aryEq(TP1_VALUE.array()), EasyMock.capture(callback1));
-        PowerMock.expectLastCall();
+        EasyMock.expect(storeLog.send(EasyMock.aryEq(TP1_KEY.array()), EasyMock.aryEq(TP1_VALUE.array()), EasyMock.capture(callback1))).andReturn(null);
         Capture<org.apache.kafka.clients.producer.Callback> callback2 = EasyMock.newCapture();
-        storeLog.send(EasyMock.aryEq(TP2_KEY.array()), EasyMock.aryEq(TP2_VALUE.array()), EasyMock.capture(callback2));
-        PowerMock.expectLastCall();
+        EasyMock.expect(storeLog.send(EasyMock.aryEq(TP2_KEY.array()), EasyMock.aryEq(TP2_VALUE.array()), EasyMock.capture(callback2))).andReturn(null);
 
         expectClusterId();
 
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
index ccbf2c495d6..4396f7dbb50 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
@@ -260,7 +260,7 @@ public class EmbeddedConnectCluster {
         putIfAbsent(workerProps, OFFSET_STORAGE_REPLICATION_FACTOR_CONFIG, internalTopicsReplFactor);
         putIfAbsent(workerProps, CONFIG_TOPIC_CONFIG, "connect-config-topic-" + connectClusterName);
         putIfAbsent(workerProps, CONFIG_STORAGE_REPLICATION_FACTOR_CONFIG, internalTopicsReplFactor);
-        putIfAbsent(workerProps, STATUS_STORAGE_TOPIC_CONFIG, "connect-storage-topic-" + connectClusterName);
+        putIfAbsent(workerProps, STATUS_STORAGE_TOPIC_CONFIG, "connect-status-topic-" + connectClusterName);
         putIfAbsent(workerProps, STATUS_STORAGE_REPLICATION_FACTOR_CONFIG, internalTopicsReplFactor);
         putIfAbsent(workerProps, KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
         putIfAbsent(workerProps, VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");