Posted to commits@kafka.apache.org by ce...@apache.org on 2023/02/08 20:20:32 UTC
[kafka] branch 3.3 updated: KAFKA-14455: Kafka Connect create and update REST APIs should surface failures while writing to the config topic (#12984)
This is an automated email from the ASF dual-hosted git repository.
cegerton pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.3 by this push:
new 2aeceff25a4 KAFKA-14455: Kafka Connect create and update REST APIs should surface failures while writing to the config topic (#12984)
2aeceff25a4 is described below
commit 2aeceff25a4a6a0581adf779b77cd94838b4e224
Author: Yash Mayya <ya...@gmail.com>
AuthorDate: Thu Feb 2 21:33:38 2023 +0530
KAFKA-14455: Kafka Connect create and update REST APIs should surface failures while writing to the config topic (#12984)
Reviewers: Greg Harris <gr...@aiven.io>, Chris Egerton <ch...@aiven.io>
---
.../java/org/apache/kafka/common/utils/Timer.java | 10 +-
.../connect/storage/KafkaConfigBackingStore.java | 165 ++++++++++++++++-----
.../apache/kafka/connect/util/KafkaBasedLog.java | 26 +++-
.../runtime/distributed/DistributedHerderTest.java | 52 +++++++
.../storage/KafkaConfigBackingStoreTest.java | 127 ++++++++++++++--
.../storage/KafkaOffsetBackingStoreTest.java | 20 +--
.../util/clusters/EmbeddedConnectCluster.java | 2 +-
7 files changed, 333 insertions(+), 69 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Timer.java b/clients/src/main/java/org/apache/kafka/common/utils/Timer.java
index 98b09a38d36..d60eea25710 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/Timer.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Timer.java
@@ -26,7 +26,7 @@ package org.apache.kafka.common.utils;
* This class also ensures monotonic updates to the timer even if the underlying clock is subject
* to non-monotonic behavior. For example, the remaining time returned by {@link #remainingMs()} is
* guaranteed to decrease monotonically until it hits zero.
- *
+ * <p>
* Note that it is up to the caller to ensure progress of the timer using one of the
* {@link #update()} methods or {@link #sleep(long)}. The timer will cache the current time and
* return it indefinitely until the timer has been updated. This allows the caller to limit
@@ -34,14 +34,14 @@ package org.apache.kafka.common.utils;
* waiting on a request sent through the {@link org.apache.kafka.clients.NetworkClient} should call
* {@link #update()} following each blocking call to
* {@link org.apache.kafka.clients.NetworkClient#poll(long, long)}.
- *
+ * <p>
* A typical usage might look something like this:
*
* <pre>
* Time time = Time.SYSTEM;
* Timer timer = time.timer(500);
*
- * while (!conditionSatisfied() && timer.notExpired) {
+ * while (!conditionSatisfied() && timer.notExpired()) {
* client.poll(timer.remainingMs(), timer.currentTimeMs());
* timer.update();
* }
@@ -137,7 +137,7 @@ public class Timer {
/**
* Update the cached current time to a specific value. In some contexts, the caller may already
* have an accurate time, so this avoids unnecessary calls to system time.
- *
+ * <p>
* Note that if the updated current time is smaller than the cached time, then the update
* is ignored.
*
@@ -161,7 +161,7 @@ public class Timer {
/**
* Get the current time in milliseconds. This will return the same cached value until the timer
* has been updated using one of the {@link #update()} methods or {@link #sleep(long)} is used.
- *
+ * <p>
* Note that the value returned is guaranteed to increase monotonically even if the underlying
* {@link Time} implementation goes backwards. Effectively, the timer will just wait for the
* time to catch up.
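
The corrected javadoc example above is the idiom the rest of this patch builds on: a single Timer bounds a sequence of blocking calls, and remainingMs() shrinks as update() is called. A minimal compilable sketch of that loop, with a placeholder conditionSatisfied() standing in for real work (none of these names come from the patch):

    import org.apache.kafka.common.utils.Time;
    import org.apache.kafka.common.utils.Timer;

    public class TimerLoopSketch {
        public static void main(String[] args) throws InterruptedException {
            Time time = Time.SYSTEM;
            Timer timer = time.timer(500); // overall budget of 500 ms
            while (!conditionSatisfied() && timer.notExpired()) {
                // Stand-in for a blocking call such as NetworkClient#poll
                Thread.sleep(Math.min(50L, timer.remainingMs()));
                timer.update(); // record elapsed time; remainingMs() only ever decreases
            }
        }

        private static boolean conditionSatisfied() {
            return false; // placeholder condition for the sketch
        }
    }
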
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
index 76c626964e6..ac1f4ad7829 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
@@ -23,6 +23,7 @@ import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.TopicConfig;
@@ -32,6 +33,7 @@ import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
@@ -39,11 +41,13 @@ import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.DataException;
+import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.RestartRequest;
import org.apache.kafka.connect.runtime.SessionKey;
import org.apache.kafka.connect.runtime.TargetState;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.WorkerConfigTransformer;
+import org.apache.kafka.connect.runtime.distributed.ConnectProtocolCompatibility;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.ConnectUtils;
@@ -68,6 +72,7 @@ import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
@@ -77,26 +82,45 @@ import java.util.function.Supplier;
* Provides persistent storage of Kafka Connect connector configurations in a Kafka topic.
* </p>
* <p>
- * This class manages both connector and task configurations. It tracks three types of configuration entries:
+ * This class manages both connector and task configurations, among various other configurations. It tracks seven types
+ * of records:
* <p/>
- * 1. Connector config: map of string -> string configurations passed to the Connector class, with support for
+ * <ol>
+ * <li> Connector config: map of string -> string configurations passed to the Connector class, with support for
* expanding this format if necessary. (Kafka key: connector-[connector-id]).
* These configs are *not* ephemeral. They represent the source of truth. If the entire Connect
- * cluster goes down, this is all that is really needed to recover.
- * 2. Task configs: map of string -> string configurations passed to the Task class, with support for expanding
+ * cluster goes down, this is all that is really needed to recover. </li>
+ * <li> Task configs: map of string -> string configurations passed to the Task class, with support for expanding
* this format if necessary. (Kafka key: task-[connector-id]-[task-id]).
- * These configs are ephemeral; they are stored here to a) disseminate them to all workers while
- * ensuring agreement and b) to allow faster cluster/worker recovery since the common case
- * of recovery (restoring a connector) will simply result in the same configuration as before
- * the failure.
- * 3. Task commit "configs": records indicating that previous task config entries should be committed and all task
+ * These configs are ephemeral; they are stored here to
+ * <ul>
+ * <li> disseminate them to all workers while ensuring agreement </li>
+ * <li> allow faster cluster/worker recovery since the common case of recovery (restoring a connector) will simply
+ * result in the same task configuration as before the failure. </li>
+ * </ul>
+ * </li>
+ * <li> Task commit "configs": records indicating that previous task config entries should be committed and all task
* configs for a connector can be applied. (Kafka key: commit-[connector-id]).
* This config has two effects. First, it records the number of tasks the connector is currently
* running (and can therefore increase/decrease parallelism). Second, because each task config
* is stored separately but they need to be applied together to ensure each partition is assigned
* to a single task, this record also indicates that task configs for the specified connector
- * can be "applied" or "committed".
- * </p>
+ * can be "applied" or "committed". </li>
+ * <li> Connector target states: records indicating the {@link TargetState} for a connector </li>
+ * <li> {@link RestartRequest Restart requests}: records representing requests to restart a connector and/or its
+ * tasks. See <a href="https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181308623">KIP-745</a> for more
+ * details.</li>
+ * <li> Task count records: an integer value that tracks the number of task producers (for source connectors) that
+ * will have to be fenced out if a connector is reconfigured before bringing up any tasks with the new set of task
+ * configurations. This is required for exactly-once support for source connectors, see
+ * <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-618%3A+Exactly-Once+Support+for+Source+Connectors">KIP-618</a>
+ * for more details.</li>
+ * <li> Session key records: a {@link SessionKey} generated by the leader of the cluster when
+ * {@link ConnectProtocolCompatibility#SESSIONED} is the Connect protocol being used by the cluster. This session key
+ * is used to verify internal REST requests between workers in the cluster. See
+ * <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-507%3A+Securing+Internal+Connect+REST+Endpoints">KIP-507</a>
+ * for more details.</li>
+ * </ol>
* <p>
* This configuration is expected to be stored in a *single partition* and *compacted* topic. Using a single partition
* ensures we can enforce ordering on messages, allowing Kafka to be used as a write ahead log. Compaction allows
@@ -146,7 +170,7 @@ import java.util.function.Supplier;
* Because we can encounter these inconsistencies and addressing them requires support from the rest of the system
* (resolving the task configuration inconsistencies requires support from the connector instance to regenerate updated
* configs), this class exposes not only the current set of configs, but also which connectors have inconsistent data.
- * This allows users of this class (i.e., Herder implementations) to take action to resolve any inconsistencies. These
+ * This allows users of this class (i.e., {@link Herder} implementations) to take action to resolve any inconsistencies. These
* inconsistencies should be rare (as described above, due to compaction combined with leader failures in the middle
* of updating task configurations).
* </p>
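
As the updated class javadoc notes, the config topic must be a single-partition, compacted topic: one partition gives total ordering (write-ahead-log semantics) and compaction retains the latest record per key. Connect normally creates this topic itself from config.storage.topic, but for illustration, a sketch of creating an equivalent topic with the admin client (the topic name, bootstrap address, and replication factor are assumptions):

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.NewTopic;
    import org.apache.kafka.common.config.TopicConfig;

    public class ConfigTopicSketch {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            try (Admin admin = Admin.create(props)) {
                // One partition preserves ordering; compaction keeps the latest record per key.
                NewTopic configTopic = new NewTopic("connect-configs", 1, (short) 3)
                        .configs(Collections.singletonMap(
                                TopicConfig.CLEANUP_POLICY_CONFIG,
                                TopicConfig.CLEANUP_POLICY_COMPACT));
                admin.createTopics(Collections.singleton(configTopic)).all().get();
            }
        }
    }
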
@@ -239,7 +263,8 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
.field(ONLY_FAILED_FIELD_NAME, Schema.BOOLEAN_SCHEMA)
.build();
- private static final long READ_TO_END_TIMEOUT_MS = 30000;
+ // Visible for testing
+ static final long READ_WRITE_TOTAL_TIMEOUT_MS = 30000;
private final Object lock;
private final Converter converter;
@@ -285,6 +310,7 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
private final boolean usesFencableWriter;
private volatile Producer<String, byte[]> fencableProducer;
private final Map<String, Object> fencableProducerProps;
+ private final Time time;
@Deprecated
public KafkaConfigBackingStore(Converter converter, DistributedConfig config, WorkerConfigTransformer configTransformer) {
@@ -292,6 +318,10 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
}
public KafkaConfigBackingStore(Converter converter, DistributedConfig config, WorkerConfigTransformer configTransformer, Supplier<TopicAdmin> adminSupplier) {
+ this(converter, config, configTransformer, adminSupplier, Time.SYSTEM);
+ }
+
+ KafkaConfigBackingStore(Converter converter, DistributedConfig config, WorkerConfigTransformer configTransformer, Supplier<TopicAdmin> adminSupplier, Time time) {
this.lock = new Object();
this.started = false;
this.converter = converter;
@@ -315,6 +345,7 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
configLog = setupAndCreateKafkaBasedLog(this.topic, config);
this.configTransformer = configTransformer;
+ this.time = time;
}
@Override
@@ -463,8 +494,9 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
connectConfig.put("properties", properties);
byte[] serializedConfig = converter.fromConnectData(topic, CONNECTOR_CONFIGURATION_V0, connectConfig);
try {
- sendPrivileged(CONNECTOR_KEY(connector), serializedConfig);
- configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+ Timer timer = time.timer(READ_WRITE_TOTAL_TIMEOUT_MS);
+ sendPrivileged(CONNECTOR_KEY(connector), serializedConfig, timer);
+ configLog.readToEnd().get(timer.remainingMs(), TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
log.error("Failed to write connector configuration to Kafka: ", e);
throw new ConnectException("Error writing connector configuration to Kafka", e);
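
This hunk is the heart of KAFKA-14455: previously the producer send was fire-and-forget and only the read-to-end was awaited, so a failed write could go unreported to the REST caller. Now the write and the read-to-end share a single 30-second budget, and producer failures surface as a ConnectException. Condensed into a standalone sketch (the class and method names here are illustrative, not from the patch):

    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;
    import org.apache.kafka.common.utils.Time;
    import org.apache.kafka.common.utils.Timer;
    import org.apache.kafka.connect.errors.ConnectException;
    import org.apache.kafka.connect.util.KafkaBasedLog;

    class ConfigWriteSketch {
        static final long READ_WRITE_TOTAL_TIMEOUT_MS = 30_000L;

        // One Timer bounds both blocking calls, so the write and the
        // read-to-end together can never exceed the 30-second budget.
        static void writeAndAwait(KafkaBasedLog<String, byte[]> configLog,
                                  String key, byte[] value, Time time) {
            Timer timer = time.timer(READ_WRITE_TOTAL_TIMEOUT_MS);
            try {
                // Block on the producer future so send failures surface here...
                configLog.send(key, value).get(timer.remainingMs(), TimeUnit.MILLISECONDS);
                timer.update(); // ...and charge the elapsed time against the budget
                configLog.readToEnd().get(timer.remainingMs(), TimeUnit.MILLISECONDS);
            } catch (InterruptedException | ExecutionException | TimeoutException e) {
                throw new ConnectException("Error writing connector configuration to Kafka", e);
            }
        }
    }
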
@@ -484,9 +516,14 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
public void removeConnectorConfig(String connector) {
log.debug("Removing connector configuration for connector '{}'", connector);
try {
- sendPrivileged(CONNECTOR_KEY(connector), null);
- sendPrivileged(TARGET_STATE_KEY(connector), null);
- configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+ Timer timer = time.timer(READ_WRITE_TOTAL_TIMEOUT_MS);
+ List<ProducerKeyValue> keyValues = Arrays.asList(
+ new ProducerKeyValue(CONNECTOR_KEY(connector), null),
+ new ProducerKeyValue(TARGET_STATE_KEY(connector), null)
+ );
+ sendPrivileged(keyValues, timer);
+
+ configLog.readToEnd().get(timer.remainingMs(), TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
log.error("Failed to remove connector configuration from Kafka: ", e);
throw new ConnectException("Error removing connector configuration from Kafka", e);
@@ -515,10 +552,12 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
*/
@Override
public void putTaskConfigs(String connector, List<Map<String, String>> configs) {
+ Timer timer = time.timer(READ_WRITE_TOTAL_TIMEOUT_MS);
// Make sure we're at the end of the log. We should be the only writer, but we want to make sure we don't have
// any outstanding lagging data to consume.
try {
- configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+ configLog.readToEnd().get(timer.remainingMs(), TimeUnit.MILLISECONDS);
+ timer.update();
} catch (InterruptedException | ExecutionException | TimeoutException e) {
log.error("Failed to write root configuration to Kafka: ", e);
throw new ConnectException("Error writing root configuration to Kafka", e);
@@ -526,34 +565,44 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
int taskCount = configs.size();
- // Start sending all the individual updates
+ // Send all the individual updates
int index = 0;
+ List<ProducerKeyValue> keyValues = new ArrayList<>();
for (Map<String, String> taskConfig: configs) {
Struct connectConfig = new Struct(TASK_CONFIGURATION_V0);
connectConfig.put("properties", taskConfig);
byte[] serializedConfig = converter.fromConnectData(topic, TASK_CONFIGURATION_V0, connectConfig);
log.debug("Writing configuration for connector '{}' task {}", connector, index);
ConnectorTaskId connectorTaskId = new ConnectorTaskId(connector, index);
- sendPrivileged(TASK_KEY(connectorTaskId), serializedConfig);
+ keyValues.add(new ProducerKeyValue(TASK_KEY(connectorTaskId), serializedConfig));
index++;
}
+ try {
+ sendPrivileged(keyValues, timer);
+ } catch (ExecutionException | InterruptedException | TimeoutException e) {
+ log.error("Failed to write task configurations to Kafka", e);
+ throw new ConnectException("Error writing task configurations to Kafka", e);
+ }
+
// Finally, send the commit to update the number of tasks and apply the new configs, then wait until we read to
// the end of the log
try {
// Read to end to ensure all the task configs have been written
if (taskCount > 0) {
- configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+ configLog.readToEnd().get(timer.remainingMs(), TimeUnit.MILLISECONDS);
+ timer.update();
}
// Write the commit message
Struct connectConfig = new Struct(CONNECTOR_TASKS_COMMIT_V0);
connectConfig.put("tasks", taskCount);
byte[] serializedConfig = converter.fromConnectData(topic, CONNECTOR_TASKS_COMMIT_V0, connectConfig);
log.debug("Writing commit for connector '{}' with {} tasks.", connector, taskCount);
- sendPrivileged(COMMIT_TASKS_KEY(connector), serializedConfig);
+
+ sendPrivileged(COMMIT_TASKS_KEY(connector), serializedConfig, timer);
// Read to end to ensure all the commit messages have been written
- configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+ configLog.readToEnd().get(timer.remainingMs(), TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
log.error("Failed to write root configuration to Kafka: ", e);
throw new ConnectException("Error writing root configuration to Kafka", e);
@@ -581,7 +630,12 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
connectTargetState.put("state", state.name());
byte[] serializedTargetState = converter.fromConnectData(topic, TARGET_STATE_V0, connectTargetState);
log.debug("Writing target state {} for connector {}", state, connector);
- configLog.send(TARGET_STATE_KEY(connector), serializedTargetState);
+ try {
+ configLog.send(TARGET_STATE_KEY(connector), serializedTargetState).get(READ_WRITE_TOTAL_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException | ExecutionException | TimeoutException e) {
+ log.error("Failed to write target state to Kafka", e);
+ throw new ConnectException("Error writing target state to Kafka", e);
+ }
}
/**
@@ -602,8 +656,9 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
byte[] serializedTaskCountRecord = converter.fromConnectData(topic, TASK_COUNT_RECORD_V0, taskCountRecord);
log.debug("Writing task count record {} for connector {}", taskCount, connector);
try {
- sendPrivileged(TASK_COUNT_RECORD_KEY(connector), serializedTaskCountRecord);
- configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+ Timer timer = time.timer(READ_WRITE_TOTAL_TIMEOUT_MS);
+ sendPrivileged(TASK_COUNT_RECORD_KEY(connector), serializedTaskCountRecord, timer);
+ configLog.readToEnd().get(timer.remainingMs(), TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
log.error("Failed to write task count record with {} tasks for connector {} to Kafka: ", taskCount, connector, e);
throw new ConnectException("Error writing task count record to Kafka", e);
@@ -629,8 +684,9 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
sessionKeyStruct.put("creation-timestamp", sessionKey.creationTimestamp());
byte[] serializedSessionKey = converter.fromConnectData(topic, SESSION_KEY_V0, sessionKeyStruct);
try {
- sendPrivileged(SESSION_KEY_KEY, serializedSessionKey);
- configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+ Timer timer = time.timer(READ_WRITE_TOTAL_TIMEOUT_MS);
+ sendPrivileged(SESSION_KEY_KEY, serializedSessionKey, timer);
+ configLog.readToEnd().get(timer.remainingMs(), TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
log.error("Failed to write session key to Kafka: ", e);
throw new ConnectException("Error writing session key to Kafka", e);
@@ -652,8 +708,9 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
value.put(ONLY_FAILED_FIELD_NAME, restartRequest.onlyFailed());
byte[] serializedValue = converter.fromConnectData(topic, value.schema(), value);
try {
- sendPrivileged(key, serializedValue);
- configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+ Timer timer = time.timer(READ_WRITE_TOTAL_TIMEOUT_MS);
+ sendPrivileged(key, serializedValue, timer);
+ configLog.readToEnd().get(timer.remainingMs(), TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
log.error("Failed to write {} to Kafka: ", restartRequest, e);
throw new ConnectException("Error writing " + restartRequest + " to Kafka", e);
@@ -702,9 +759,36 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
return createKafkaBasedLog(topic, producerProps, consumerProps, new ConsumeCallback(), topicDescription, adminSupplier);
}
- private void sendPrivileged(String key, byte[] value) {
+ /**
+ * Send a single record to the config topic synchronously. Note that {@link #claimWritePrivileges()} must be
+ * successfully invoked before calling this method if this store is configured to use a fencable writer.
+ * @param key the record key
+ * @param value the record value
+ * @param timer Timer bounding how long this method can block. The timer is updated before the method returns.
+ */
+ private void sendPrivileged(String key, byte[] value, Timer timer) throws ExecutionException, InterruptedException, TimeoutException {
+ sendPrivileged(Collections.singletonList(new ProducerKeyValue(key, value)), timer);
+ }
+
+ /**
+ * Send one or more records to the config topic synchronously. Note that {@link #claimWritePrivileges()} must be
+ * successfully invoked before calling this method if this store is configured to use a fencable writer.
+ * @param keyValues the list of producer record key/value pairs
+ * @param timer Timer bounding how long this method can block. The timer is updated before the method returns.
+ */
+ private void sendPrivileged(List<ProducerKeyValue> keyValues, Timer timer) throws ExecutionException, InterruptedException, TimeoutException {
if (!usesFencableWriter) {
- configLog.send(key, value);
+ List<Future<RecordMetadata>> producerFutures = new ArrayList<>();
+ keyValues.forEach(
+ keyValue -> producerFutures.add(configLog.send(keyValue.key, keyValue.value))
+ );
+
+ timer.update();
+ for (Future<RecordMetadata> future : producerFutures) {
+ future.get(timer.remainingMs(), TimeUnit.MILLISECONDS);
+ timer.update();
+ }
+
return;
}
@@ -714,8 +798,11 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
try {
fencableProducer.beginTransaction();
- fencableProducer.send(new ProducerRecord<>(topic, key, value));
+ keyValues.forEach(
+ keyValue -> fencableProducer.send(new ProducerRecord<>(topic, keyValue.key, keyValue.value))
+ );
fencableProducer.commitTransaction();
+ timer.update();
} catch (Exception e) {
log.warn("Failed to perform fencable send to config topic", e);
relinquishWritePrivileges();
@@ -723,6 +810,16 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
}
}
+ private static class ProducerKeyValue {
+ final String key;
+ final byte[] value;
+
+ ProducerKeyValue(String key, byte[] value) {
+ this.key = key;
+ this.value = value;
+ }
+ }
+
private void relinquishWritePrivileges() {
if (fencableProducer != null) {
Utils.closeQuietly(() -> fencableProducer.close(Duration.ZERO), "fencable producer for config topic");
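
When exactly-once source support is enabled, the store writes through a fencable (transactional) producer, and the batched sendPrivileged above wraps all of the records in a single transaction. A simplified sketch of that shape, assuming an already-initialized transactional producer (initTransactions called and write privileges claimed):

    import java.time.Duration;
    import java.util.List;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.connect.errors.ConnectException;

    class FencableSendSketch {
        // Either every record lands atomically or none do; on failure the
        // producer is closed immediately, mirroring relinquishWritePrivileges().
        static void transactionalSend(Producer<String, byte[]> producer,
                                      List<ProducerRecord<String, byte[]>> records) {
            try {
                producer.beginTransaction();
                records.forEach(producer::send);
                producer.commitTransaction();
            } catch (Exception e) {
                producer.close(Duration.ZERO);
                throw new ConnectException("Failed to perform fencable send to config topic", e);
            }
        }
    }
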
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
index cde63b3f833..899b42dd877 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
@@ -25,6 +25,7 @@ import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.PartitionInfo;
@@ -342,12 +343,29 @@ public class KafkaBasedLog<K, V> {
return future;
}
- public void send(K key, V value) {
- send(key, value, null);
+ /**
+ * Send a record asynchronously to the configured {@link #topic} without using a producer callback.
+ * @param key the key for the {@link ProducerRecord}
+ * @param value the value for the {@link ProducerRecord}
+ *
+ * @return the future from the call to {@link Producer#send}. {@link Future#get} can be called on this returned
+ * future if synchronous behavior is desired.
+ */
+ public Future<RecordMetadata> send(K key, V value) {
+ return send(key, value, null);
}
- public void send(K key, V value, org.apache.kafka.clients.producer.Callback callback) {
- producer.orElseThrow(() ->
+ /**
+ * Send a record asynchronously to the configured {@link #topic}
+ * @param key the key for the {@link ProducerRecord}
+ * @param value the value for the {@link ProducerRecord}
+ * @param callback the callback to invoke after completion; can be null if no callback is desired
+ *
+ * @return the future from the call to {@link Producer#send}. {@link Future#get} can be called on this returned
+ * future if synchronous behavior is desired.
+ */
+ public Future<RecordMetadata> send(K key, V value, org.apache.kafka.clients.producer.Callback callback) {
+ return producer.orElseThrow(() ->
new IllegalStateException("This KafkaBasedLog was created in read-only mode and does not support write operations")
).send(new ProducerRecord<>(topic, key, value), callback);
}
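
With send now returning the producer future, callers keep the old fire-and-forget behavior by ignoring the return value, or block on the future for synchronous error handling. A brief usage sketch with illustrative identifiers; the checked exceptions thrown by Future.get are exactly what KafkaConfigBackingStore catches above:

    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.kafka.connect.util.KafkaBasedLog;

    class SyncSendSketch {
        static void writeSync(KafkaBasedLog<String, byte[]> log, byte[] value) throws Exception {
            Future<RecordMetadata> f = log.send("connector-foo", value); // still asynchronous
            f.get(30_000L, TimeUnit.MILLISECONDS); // blocks until acked; surfaces producer errors
        }
    }
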
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
index f2985cc8a23..886163f2348 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
@@ -762,6 +762,58 @@ public class DistributedHerderTest extends ThreadedTest {
PowerMock.verifyAll();
}
+ @Test
+ public void testCreateConnectorConfigBackingStoreError() {
+ EasyMock.expect(member.memberId()).andStubReturn("leader");
+ EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
+ expectRebalance(1, Collections.emptyList(), Collections.emptyList(), true);
+ expectConfigRefreshAndSnapshot(SNAPSHOT);
+
+ member.wakeup();
+ PowerMock.expectLastCall();
+
+ // mock the actual validation since its asynchronous nature is difficult to test and should
+ // be covered sufficiently by the unit tests for the AbstractHerder class
+ Capture<Callback<ConfigInfos>> validateCallback = newCapture();
+ herder.validateConnectorConfig(EasyMock.eq(CONN2_CONFIG), capture(validateCallback));
+ PowerMock.expectLastCall().andAnswer(() -> {
+ validateCallback.getValue().onCompletion(null, CONN2_CONFIG_INFOS);
+ return null;
+ });
+
+ configBackingStore.putConnectorConfig(CONN2, CONN2_CONFIG);
+ PowerMock.expectLastCall().andThrow(new ConnectException("Error writing connector configuration to Kafka"));
+
+ // verify that the exception from config backing store write is propagated via the callback
+ putConnectorCallback.onCompletion(EasyMock.anyObject(ConnectException.class), EasyMock.isNull());
+ PowerMock.expectLastCall();
+ member.poll(EasyMock.anyInt());
+ PowerMock.expectLastCall();
+ // These will occur just before/during the second tick
+ member.wakeup();
+ PowerMock.expectLastCall();
+ member.ensureActive();
+ PowerMock.expectLastCall();
+ member.poll(EasyMock.anyInt());
+ PowerMock.expectLastCall();
+
+ PowerMock.replayAll();
+
+ herder.putConnectorConfig(CONN2, CONN2_CONFIG, false, putConnectorCallback);
+ // First tick runs the initial herder request, which issues an asynchronous request for
+ // connector validation
+ herder.tick();
+
+ // Once that validation is complete, another request is added to the herder request queue
+ // for actually performing the config write; this tick is for that request
+ herder.tick();
+
+ time.sleep(1000L);
+ assertStatistics(3, 1, 100, 1000L);
+
+ PowerMock.verifyAll();
+ }
+
@Test
public void testCreateConnectorFailedValidation() throws Exception {
EasyMock.expect(member.memberId()).andStubReturn("leader");
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
index b374f8f5d2f..0eb45eb4971 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
@@ -22,15 +22,19 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.runtime.RestartRequest;
import org.apache.kafka.connect.runtime.TargetState;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
@@ -62,6 +66,8 @@ import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
@@ -71,6 +77,7 @@ import static org.apache.kafka.clients.producer.ProducerConfig.TRANSACTIONAL_ID_
import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.GROUP_ID_CONFIG;
import static org.apache.kafka.connect.storage.KafkaConfigBackingStore.INCLUDE_TASKS_FIELD_NAME;
import static org.apache.kafka.connect.storage.KafkaConfigBackingStore.ONLY_FAILED_FIELD_NAME;
+import static org.apache.kafka.connect.storage.KafkaConfigBackingStore.READ_WRITE_TOTAL_TIMEOUT_MS;
import static org.apache.kafka.connect.storage.KafkaConfigBackingStore.RESTART_KEY;
import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG;
import static org.junit.Assert.assertEquals;
@@ -168,14 +175,17 @@ public class KafkaConfigBackingStoreTest {
KafkaBasedLog<String, byte[]> storeLog;
@Mock
Producer<String, byte[]> fencableProducer;
+ @Mock
+ Future<RecordMetadata> producerFuture;
private KafkaConfigBackingStore configStorage;
- private Capture<String> capturedTopic = EasyMock.newCapture();
- private Capture<Map<String, Object>> capturedProducerProps = EasyMock.newCapture();
- private Capture<Map<String, Object>> capturedConsumerProps = EasyMock.newCapture();
- private Capture<Supplier<TopicAdmin>> capturedAdminSupplier = EasyMock.newCapture();
- private Capture<NewTopic> capturedNewTopic = EasyMock.newCapture();
- private Capture<Callback<ConsumerRecord<String, byte[]>>> capturedConsumedCallback = EasyMock.newCapture();
+ private final Capture<String> capturedTopic = EasyMock.newCapture();
+ private final Capture<Map<String, Object>> capturedProducerProps = EasyMock.newCapture();
+ private final Capture<Map<String, Object>> capturedConsumerProps = EasyMock.newCapture();
+ private final Capture<Supplier<TopicAdmin>> capturedAdminSupplier = EasyMock.newCapture();
+ private final Capture<NewTopic> capturedNewTopic = EasyMock.newCapture();
+ private final Capture<Callback<ConsumerRecord<String, byte[]>>> capturedConsumedCallback = EasyMock.newCapture();
+ private final MockTime time = new MockTime();
private long logOffset = 0;
@@ -183,7 +193,7 @@ public class KafkaConfigBackingStoreTest {
configStorage = PowerMock.createPartialMock(
KafkaConfigBackingStore.class,
new String[]{"createKafkaBasedLog", "createFencableProducer"},
- converter, config, null);
+ converter, config, null, null, time);
Whitebox.setInternalState(configStorage, "configLog", storeLog);
configStorage.setUpdateListener(configUpdateListener);
}
@@ -316,6 +326,92 @@ public class KafkaConfigBackingStoreTest {
PowerMock.verifyAll();
}
+ @Test
+ public void testPutConnectorConfigProducerError() throws Exception {
+ expectConfigure();
+ expectStart(Collections.emptyList(), Collections.emptyMap());
+ expectPartitionCount(1);
+
+ expectConvert(KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0, CONNECTOR_CONFIG_STRUCTS.get(0), CONFIGS_SERIALIZED.get(0));
+
+ storeLog.send(EasyMock.anyObject(), EasyMock.anyObject());
+ EasyMock.expectLastCall().andReturn(producerFuture);
+
+ producerFuture.get(EasyMock.anyLong(), EasyMock.anyObject());
+ EasyMock.expectLastCall().andThrow(new ExecutionException(new TopicAuthorizationException(Collections.singleton("test"))));
+
+ expectStop();
+
+ PowerMock.replayAll();
+
+ configStorage.setupAndCreateKafkaBasedLog(TOPIC, DEFAULT_DISTRIBUTED_CONFIG);
+ configStorage.start();
+
+ // Verify initial state
+ ClusterConfigState configState = configStorage.snapshot();
+ assertEquals(-1, configState.offset());
+ assertEquals(0, configState.connectors().size());
+
+ // verify that the producer exception from KafkaBasedLog::send is propagated
+ ConnectException e = assertThrows(ConnectException.class, () -> configStorage.putConnectorConfig(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(0)));
+ assertTrue(e.getMessage().contains("Error writing connector configuration to Kafka"));
+ configStorage.stop();
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
+ public void testRemoveConnectorConfigSlowProducer() throws Exception {
+ expectConfigure();
+ expectStart(Collections.emptyList(), Collections.emptyMap());
+ expectPartitionCount(1);
+
+ @SuppressWarnings("unchecked")
+ Future<RecordMetadata> connectorConfigProducerFuture = PowerMock.createMock(Future.class);
+ // tombstone for the connector config
+ storeLog.send(EasyMock.anyObject(), EasyMock.isNull());
+ EasyMock.expectLastCall().andReturn(connectorConfigProducerFuture);
+
+ @SuppressWarnings("unchecked")
+ Future<RecordMetadata> targetStateProducerFuture = PowerMock.createMock(Future.class);
+ // tombstone for the connector target state
+ storeLog.send(EasyMock.anyObject(), EasyMock.isNull());
+ EasyMock.expectLastCall().andReturn(targetStateProducerFuture);
+
+ connectorConfigProducerFuture.get(EasyMock.eq(READ_WRITE_TOTAL_TIMEOUT_MS), EasyMock.anyObject());
+ EasyMock.expectLastCall().andAnswer(() -> {
+ time.sleep(READ_WRITE_TOTAL_TIMEOUT_MS - 1000);
+ return null;
+ });
+
+ // the future get timeout is expected to be reduced according to how long the previous Future::get took
+ targetStateProducerFuture.get(EasyMock.eq(1000L), EasyMock.anyObject());
+ EasyMock.expectLastCall().andAnswer(() -> {
+ time.sleep(1000);
+ return null;
+ });
+
+ @SuppressWarnings("unchecked")
+ Future<Void> future = PowerMock.createMock(Future.class);
+ EasyMock.expect(storeLog.readToEnd()).andAnswer(() -> future);
+
+ // the Future::get calls on the previous two producer futures exhausted the overall timeout; so expect the
+ // timeout on the log read future to be 0
+ EasyMock.expect(future.get(EasyMock.eq(0L), EasyMock.anyObject())).andReturn(null);
+
+ expectStop();
+
+ PowerMock.replayAll();
+
+ configStorage.setupAndCreateKafkaBasedLog(TOPIC, DEFAULT_DISTRIBUTED_CONFIG);
+ configStorage.start();
+
+ configStorage.removeConnectorConfig("test-connector");
+ configStorage.stop();
+
+ PowerMock.verifyAll();
+ }
+
@Test
public void testWritePrivileges() throws Exception {
// With exactly.once.source.support = preparing (or also, "enabled"), we need to use a transactional producer
@@ -357,7 +453,9 @@ public class KafkaConfigBackingStoreTest {
// In the meantime, write a target state (which doesn't require write privileges)
expectConvert(KafkaConfigBackingStore.TARGET_STATE_V0, TARGET_STATE_PAUSED, CONFIGS_SERIALIZED.get(1));
storeLog.send("target-state-" + CONNECTOR_IDS.get(1), CONFIGS_SERIALIZED.get(1));
- PowerMock.expectLastCall();
+ EasyMock.expectLastCall().andReturn(producerFuture);
+ producerFuture.get(EasyMock.anyLong(), EasyMock.anyObject());
+ EasyMock.expectLastCall().andReturn(null);
// Reclaim write privileges
expectFencableProducer();
@@ -1479,13 +1577,18 @@ public class KafkaConfigBackingStoreTest {
// from the log. Validate the data that is captured when the conversion is performed matches the specified data
// (by checking a single field's value)
private void expectConvertWriteRead(final String configKey, final Schema valueSchema, final byte[] serialized,
- final String dataFieldName, final Object dataFieldValue) {
+ final String dataFieldName, final Object dataFieldValue) throws Exception {
final Capture<Struct> capturedRecord = EasyMock.newCapture();
if (serialized != null)
EasyMock.expect(converter.fromConnectData(EasyMock.eq(TOPIC), EasyMock.eq(valueSchema), EasyMock.capture(capturedRecord)))
.andReturn(serialized);
+
storeLog.send(EasyMock.eq(configKey), EasyMock.aryEq(serialized));
- PowerMock.expectLastCall();
+ EasyMock.expectLastCall().andReturn(producerFuture);
+
+ producerFuture.get(EasyMock.anyLong(), EasyMock.anyObject());
+ EasyMock.expectLastCall().andReturn(null);
+
EasyMock.expect(converter.toConnectData(EasyMock.eq(TOPIC), EasyMock.aryEq(serialized)))
.andAnswer(() -> {
if (dataFieldName != null)
@@ -1518,7 +1621,7 @@ public class KafkaConfigBackingStoreTest {
});
}
- private void expectConnectorRemoval(String configKey, String targetStateKey) {
+ private void expectConnectorRemoval(String configKey, String targetStateKey) throws Exception {
expectConvertWriteRead(configKey, KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0, null, null, null);
expectConvertWriteRead(targetStateKey, KafkaConfigBackingStore.TARGET_STATE_V0, null, null, null);
@@ -1529,7 +1632,7 @@ public class KafkaConfigBackingStoreTest {
}
private void expectConvertWriteAndRead(final String configKey, final Schema valueSchema, final byte[] serialized,
- final String dataFieldName, final Object dataFieldValue) {
+ final String dataFieldName, final Object dataFieldValue) throws Exception {
expectConvertWriteRead(configKey, valueSchema, serialized, dataFieldName, dataFieldValue);
LinkedHashMap<String, byte[]> recordsToRead = new LinkedHashMap<>();
recordsToRead.put(configKey, serialized);
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java
index cf11230f3d2..e6927ab62cc 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java
@@ -193,11 +193,9 @@ public class KafkaOffsetBackingStoreTest {
// Set offsets
Capture<org.apache.kafka.clients.producer.Callback> callback0 = EasyMock.newCapture();
- storeLog.send(EasyMock.aryEq(TP0_KEY.array()), EasyMock.aryEq(TP0_VALUE.array()), EasyMock.capture(callback0));
- PowerMock.expectLastCall();
+ EasyMock.expect(storeLog.send(EasyMock.aryEq(TP0_KEY.array()), EasyMock.aryEq(TP0_VALUE.array()), EasyMock.capture(callback0))).andReturn(null);
Capture<org.apache.kafka.clients.producer.Callback> callback1 = EasyMock.newCapture();
- storeLog.send(EasyMock.aryEq(TP1_KEY.array()), EasyMock.aryEq(TP1_VALUE.array()), EasyMock.capture(callback1));
- PowerMock.expectLastCall();
+ EasyMock.expect(storeLog.send(EasyMock.aryEq(TP1_KEY.array()), EasyMock.aryEq(TP1_VALUE.array()), EasyMock.capture(callback1))).andReturn(null);
// Second get() should get the produced data and return the new values
final Capture<Callback<Void>> secondGetReadToEndCallback = EasyMock.newCapture();
@@ -276,10 +274,9 @@ public class KafkaOffsetBackingStoreTest {
// Set offsets
Capture<org.apache.kafka.clients.producer.Callback> callback0 = EasyMock.newCapture();
- storeLog.send(EasyMock.isNull(byte[].class), EasyMock.aryEq(TP0_VALUE.array()), EasyMock.capture(callback0));
- PowerMock.expectLastCall();
+ EasyMock.expect(storeLog.send(EasyMock.isNull(byte[].class), EasyMock.aryEq(TP0_VALUE.array()), EasyMock.capture(callback0))).andReturn(null);
Capture<org.apache.kafka.clients.producer.Callback> callback1 = EasyMock.newCapture();
- storeLog.send(EasyMock.aryEq(TP1_KEY.array()), EasyMock.isNull(byte[].class), EasyMock.capture(callback1));
+ EasyMock.expect(storeLog.send(EasyMock.aryEq(TP1_KEY.array()), EasyMock.isNull(byte[].class), EasyMock.capture(callback1))).andReturn(null);
PowerMock.expectLastCall();
// Second get() should get the produced data and return the new values
@@ -337,14 +334,11 @@ public class KafkaOffsetBackingStoreTest {
// Set offsets
Capture<org.apache.kafka.clients.producer.Callback> callback0 = EasyMock.newCapture();
- storeLog.send(EasyMock.aryEq(TP0_KEY.array()), EasyMock.aryEq(TP0_VALUE.array()), EasyMock.capture(callback0));
- PowerMock.expectLastCall();
+ EasyMock.expect(storeLog.send(EasyMock.aryEq(TP0_KEY.array()), EasyMock.aryEq(TP0_VALUE.array()), EasyMock.capture(callback0))).andReturn(null);
Capture<org.apache.kafka.clients.producer.Callback> callback1 = EasyMock.newCapture();
- storeLog.send(EasyMock.aryEq(TP1_KEY.array()), EasyMock.aryEq(TP1_VALUE.array()), EasyMock.capture(callback1));
- PowerMock.expectLastCall();
+ EasyMock.expect(storeLog.send(EasyMock.aryEq(TP1_KEY.array()), EasyMock.aryEq(TP1_VALUE.array()), EasyMock.capture(callback1))).andReturn(null);
Capture<org.apache.kafka.clients.producer.Callback> callback2 = EasyMock.newCapture();
- storeLog.send(EasyMock.aryEq(TP2_KEY.array()), EasyMock.aryEq(TP2_VALUE.array()), EasyMock.capture(callback2));
- PowerMock.expectLastCall();
+ EasyMock.expect(storeLog.send(EasyMock.aryEq(TP2_KEY.array()), EasyMock.aryEq(TP2_VALUE.array()), EasyMock.capture(callback2))).andReturn(null);
expectClusterId();
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
index ccbf2c495d6..4396f7dbb50 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
@@ -260,7 +260,7 @@ public class EmbeddedConnectCluster {
putIfAbsent(workerProps, OFFSET_STORAGE_REPLICATION_FACTOR_CONFIG, internalTopicsReplFactor);
putIfAbsent(workerProps, CONFIG_TOPIC_CONFIG, "connect-config-topic-" + connectClusterName);
putIfAbsent(workerProps, CONFIG_STORAGE_REPLICATION_FACTOR_CONFIG, internalTopicsReplFactor);
- putIfAbsent(workerProps, STATUS_STORAGE_TOPIC_CONFIG, "connect-storage-topic-" + connectClusterName);
+ putIfAbsent(workerProps, STATUS_STORAGE_TOPIC_CONFIG, "connect-status-topic-" + connectClusterName);
putIfAbsent(workerProps, STATUS_STORAGE_REPLICATION_FACTOR_CONFIG, internalTopicsReplFactor);
putIfAbsent(workerProps, KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
putIfAbsent(workerProps, VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");