You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rh...@apache.org on 2023/05/09 16:12:47 UTC
[kafka] branch 3.3 updated: KAFKA-14974: Restore backward compatibility in KafkaBasedLog (#13688)
This is an automated email from the ASF dual-hosted git repository.
rhauch pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.3 by this push:
new 9c3ca5afe2d KAFKA-14974: Restore backward compatibility in KafkaBasedLog (#13688)
9c3ca5afe2d is described below
commit 9c3ca5afe2dbcef16d1d30caa436369d9fd9db6a
Author: Yash Mayya <ya...@gmail.com>
AuthorDate: Tue May 9 17:58:45 2023 +0530
KAFKA-14974: Restore backward compatibility in KafkaBasedLog (#13688)
`KafkaBasedLog` is a widely used utility class that provides a generic implementation of a shared, compacted log of records in a Kafka topic. It isn't in Connect's public API, but has been used outside of Connect and we try to preserve backward compatibility whenever possible. KAFKA-14455 modified the two overloaded void `KafkaBasedLog::send` methods to return a `Future`. While this change is source compatible, it isn't binary compatible. We can restore backward compatibility simply by renaming the new `Future`-returning methods to `sendWithReceipt` and reinstating the older void `send` methods as thin delegates to them.
This refactoring changes no functionality other than restoring the older methods.
Reviewers: Randall Hauch <rh...@gmail.com>
---
.../connect/storage/KafkaConfigBackingStore.java | 4 +--
.../apache/kafka/connect/util/KafkaBasedLog.java | 37 +++++++++++++++++++---
.../storage/KafkaConfigBackingStoreTest.java | 10 +++---
.../storage/KafkaOffsetBackingStoreTest.java | 14 ++++----
4 files changed, 47 insertions(+), 18 deletions(-)
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
index ac1f4ad7829..8f3b38c2d26 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
@@ -631,7 +631,7 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
byte[] serializedTargetState = converter.fromConnectData(topic, TARGET_STATE_V0, connectTargetState);
log.debug("Writing target state {} for connector {}", state, connector);
try {
- configLog.send(TARGET_STATE_KEY(connector), serializedTargetState).get(READ_WRITE_TOTAL_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+ configLog.sendWithReceipt(TARGET_STATE_KEY(connector), serializedTargetState).get(READ_WRITE_TOTAL_TIMEOUT_MS, TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
log.error("Failed to write target state to Kafka", e);
throw new ConnectException("Error writing target state to Kafka", e);
@@ -780,7 +780,7 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
if (!usesFencableWriter) {
List<Future<RecordMetadata>> producerFutures = new ArrayList<>();
keyValues.forEach(
- keyValue -> producerFutures.add(configLog.send(keyValue.key, keyValue.value))
+ keyValue -> producerFutures.add(configLog.sendWithReceipt(keyValue.key, keyValue.value))
);
timer.update();
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
index 431ae871ce9..a05a05dffe7 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
@@ -77,6 +77,10 @@ import java.util.function.Supplier;
* calling class keeps track of state based on the log and only writes to it when consume callbacks are invoked
* and only reads it in {@link #readToEnd(Callback)} callbacks then no additional synchronization will be required.
* </p>
+ * <p>
+ * This is a useful utility that has been used outside of Connect. This isn't in Connect's public API,
+ * but we've tried to maintain the method signatures and backward compatibility since early Kafka versions.
+ * </p>
*/
public class KafkaBasedLog<K, V> {
private static final Logger log = LoggerFactory.getLogger(KafkaBasedLog.class);
@@ -351,6 +355,31 @@ public class KafkaBasedLog<K, V> {
return future;
}
+ /**
+ * Send a record asynchronously to the configured {@link #topic} without using a producer callback.
+ * <p>
+ * This method exists for backward compatibility reasons and delegates to the newer
+ * {@link #sendWithReceipt(Object, Object)} method that returns a future.
+ * @param key the key for the {@link ProducerRecord}
+ * @param value the value for the {@link ProducerRecord}
+ */
+ public void send(K key, V value) {
+ sendWithReceipt(key, value);
+ }
+
+ /**
+ * Send a record asynchronously to the configured {@link #topic}.
+ * <p>
+ * This method exists for backward compatibility reasons and delegates to the newer
+ * {@link #sendWithReceipt(Object, Object, org.apache.kafka.clients.producer.Callback)} method that returns a future.
+ * @param key the key for the {@link ProducerRecord}
+ * @param value the value for the {@link ProducerRecord}
+ * @param callback the callback to invoke after completion; can be null if no callback is desired
+ */
+ public void send(K key, V value, org.apache.kafka.clients.producer.Callback callback) {
+ sendWithReceipt(key, value, callback);
+ }
+
/**
* Send a record asynchronously to the configured {@link #topic} without using a producer callback.
* @param key the key for the {@link ProducerRecord}
@@ -359,12 +388,12 @@ public class KafkaBasedLog<K, V> {
* @return the future from the call to {@link Producer#send}. {@link Future#get} can be called on this returned
* future if synchronous behavior is desired.
*/
- public Future<RecordMetadata> send(K key, V value) {
- return send(key, value, null);
+ public Future<RecordMetadata> sendWithReceipt(K key, V value) {
+ return sendWithReceipt(key, value, null);
}
/**
- * Send a record asynchronously to the configured {@link #topic}
+ * Send a record asynchronously to the configured {@link #topic}.
* @param key the key for the {@link ProducerRecord}
* @param value the value for the {@link ProducerRecord}
* @param callback the callback to invoke after completion; can be null if no callback is desired
@@ -372,7 +401,7 @@ public class KafkaBasedLog<K, V> {
* @return the future from the call to {@link Producer#send}. {@link Future#get} can be called on this returned
* future if synchronous behavior is desired.
*/
- public Future<RecordMetadata> send(K key, V value, org.apache.kafka.clients.producer.Callback callback) {
+ public Future<RecordMetadata> sendWithReceipt(K key, V value, org.apache.kafka.clients.producer.Callback callback) {
return producer.orElseThrow(() ->
new IllegalStateException("This KafkaBasedLog was created in read-only mode and does not support write operations")
).send(new ProducerRecord<>(topic, key, value), callback);
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
index 0eb45eb4971..d66b5c62b7d 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
@@ -334,7 +334,7 @@ public class KafkaConfigBackingStoreTest {
expectConvert(KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0, CONNECTOR_CONFIG_STRUCTS.get(0), CONFIGS_SERIALIZED.get(0));
- storeLog.send(EasyMock.anyObject(), EasyMock.anyObject());
+ storeLog.sendWithReceipt(EasyMock.anyObject(), EasyMock.anyObject());
EasyMock.expectLastCall().andReturn(producerFuture);
producerFuture.get(EasyMock.anyLong(), EasyMock.anyObject());
@@ -369,13 +369,13 @@ public class KafkaConfigBackingStoreTest {
@SuppressWarnings("unchecked")
Future<RecordMetadata> connectorConfigProducerFuture = PowerMock.createMock(Future.class);
// tombstone for the connector config
- storeLog.send(EasyMock.anyObject(), EasyMock.isNull());
+ storeLog.sendWithReceipt(EasyMock.anyObject(), EasyMock.isNull());
EasyMock.expectLastCall().andReturn(connectorConfigProducerFuture);
@SuppressWarnings("unchecked")
Future<RecordMetadata> targetStateProducerFuture = PowerMock.createMock(Future.class);
// tombstone for the connector target state
- storeLog.send(EasyMock.anyObject(), EasyMock.isNull());
+ storeLog.sendWithReceipt(EasyMock.anyObject(), EasyMock.isNull());
EasyMock.expectLastCall().andReturn(targetStateProducerFuture);
connectorConfigProducerFuture.get(EasyMock.eq(READ_WRITE_TOTAL_TIMEOUT_MS), EasyMock.anyObject());
@@ -452,7 +452,7 @@ public class KafkaConfigBackingStoreTest {
// In the meantime, write a target state (which doesn't require write privileges)
expectConvert(KafkaConfigBackingStore.TARGET_STATE_V0, TARGET_STATE_PAUSED, CONFIGS_SERIALIZED.get(1));
- storeLog.send("target-state-" + CONNECTOR_IDS.get(1), CONFIGS_SERIALIZED.get(1));
+ storeLog.sendWithReceipt("target-state-" + CONNECTOR_IDS.get(1), CONFIGS_SERIALIZED.get(1));
EasyMock.expectLastCall().andReturn(producerFuture);
producerFuture.get(EasyMock.anyLong(), EasyMock.anyObject());
EasyMock.expectLastCall().andReturn(null);
@@ -1583,7 +1583,7 @@ public class KafkaConfigBackingStoreTest {
EasyMock.expect(converter.fromConnectData(EasyMock.eq(TOPIC), EasyMock.eq(valueSchema), EasyMock.capture(capturedRecord)))
.andReturn(serialized);
- storeLog.send(EasyMock.eq(configKey), EasyMock.aryEq(serialized));
+ storeLog.sendWithReceipt(EasyMock.eq(configKey), EasyMock.aryEq(serialized));
EasyMock.expectLastCall().andReturn(producerFuture);
producerFuture.get(EasyMock.anyLong(), EasyMock.anyObject());
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java
index e6927ab62cc..701351cb676 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java
@@ -193,9 +193,9 @@ public class KafkaOffsetBackingStoreTest {
// Set offsets
Capture<org.apache.kafka.clients.producer.Callback> callback0 = EasyMock.newCapture();
- EasyMock.expect(storeLog.send(EasyMock.aryEq(TP0_KEY.array()), EasyMock.aryEq(TP0_VALUE.array()), EasyMock.capture(callback0))).andReturn(null);
+ EasyMock.expect(storeLog.sendWithReceipt(EasyMock.aryEq(TP0_KEY.array()), EasyMock.aryEq(TP0_VALUE.array()), EasyMock.capture(callback0))).andReturn(null);
Capture<org.apache.kafka.clients.producer.Callback> callback1 = EasyMock.newCapture();
- EasyMock.expect(storeLog.send(EasyMock.aryEq(TP1_KEY.array()), EasyMock.aryEq(TP1_VALUE.array()), EasyMock.capture(callback1))).andReturn(null);
+ EasyMock.expect(storeLog.sendWithReceipt(EasyMock.aryEq(TP1_KEY.array()), EasyMock.aryEq(TP1_VALUE.array()), EasyMock.capture(callback1))).andReturn(null);
// Second get() should get the produced data and return the new values
final Capture<Callback<Void>> secondGetReadToEndCallback = EasyMock.newCapture();
@@ -274,9 +274,9 @@ public class KafkaOffsetBackingStoreTest {
// Set offsets
Capture<org.apache.kafka.clients.producer.Callback> callback0 = EasyMock.newCapture();
- EasyMock.expect(storeLog.send(EasyMock.isNull(byte[].class), EasyMock.aryEq(TP0_VALUE.array()), EasyMock.capture(callback0))).andReturn(null);
+ EasyMock.expect(storeLog.sendWithReceipt(EasyMock.isNull(byte[].class), EasyMock.aryEq(TP0_VALUE.array()), EasyMock.capture(callback0))).andReturn(null);
Capture<org.apache.kafka.clients.producer.Callback> callback1 = EasyMock.newCapture();
- EasyMock.expect(storeLog.send(EasyMock.aryEq(TP1_KEY.array()), EasyMock.isNull(byte[].class), EasyMock.capture(callback1))).andReturn(null);
+ EasyMock.expect(storeLog.sendWithReceipt(EasyMock.aryEq(TP1_KEY.array()), EasyMock.isNull(byte[].class), EasyMock.capture(callback1))).andReturn(null);
PowerMock.expectLastCall();
// Second get() should get the produced data and return the new values
@@ -334,11 +334,11 @@ public class KafkaOffsetBackingStoreTest {
// Set offsets
Capture<org.apache.kafka.clients.producer.Callback> callback0 = EasyMock.newCapture();
- EasyMock.expect(storeLog.send(EasyMock.aryEq(TP0_KEY.array()), EasyMock.aryEq(TP0_VALUE.array()), EasyMock.capture(callback0))).andReturn(null);
+ EasyMock.expect(storeLog.sendWithReceipt(EasyMock.aryEq(TP0_KEY.array()), EasyMock.aryEq(TP0_VALUE.array()), EasyMock.capture(callback0))).andReturn(null);
Capture<org.apache.kafka.clients.producer.Callback> callback1 = EasyMock.newCapture();
- EasyMock.expect(storeLog.send(EasyMock.aryEq(TP1_KEY.array()), EasyMock.aryEq(TP1_VALUE.array()), EasyMock.capture(callback1))).andReturn(null);
+ EasyMock.expect(storeLog.sendWithReceipt(EasyMock.aryEq(TP1_KEY.array()), EasyMock.aryEq(TP1_VALUE.array()), EasyMock.capture(callback1))).andReturn(null);
Capture<org.apache.kafka.clients.producer.Callback> callback2 = EasyMock.newCapture();
- EasyMock.expect(storeLog.send(EasyMock.aryEq(TP2_KEY.array()), EasyMock.aryEq(TP2_VALUE.array()), EasyMock.capture(callback2))).andReturn(null);
+ EasyMock.expect(storeLog.sendWithReceipt(EasyMock.aryEq(TP2_KEY.array()), EasyMock.aryEq(TP2_VALUE.array()), EasyMock.capture(callback2))).andReturn(null);
expectClusterId();