You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rh...@apache.org on 2023/05/09 13:14:58 UTC

[kafka] branch 3.4 updated: KAFKA-14974: Restore backward compatibility in KafkaBasedLog (#13688)

This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 3.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.4 by this push:
     new 721a917b444 KAFKA-14974: Restore backward compatibility in KafkaBasedLog (#13688)
721a917b444 is described below

commit 721a917b444fb7d06790fe40c4a21c70d7790c97
Author: Yash Mayya <ya...@gmail.com>
AuthorDate: Tue May 9 17:58:45 2023 +0530

    KAFKA-14974: Restore backward compatibility in KafkaBasedLog (#13688)
    
    `KafkaBasedLog` is a widely used utility class that provides a generic implementation of a shared, compacted log of records in a Kafka topic. It isn't in Connect's public API, but has been used outside of Connect and we try to preserve backward compatibility whenever possible. KAFKA-14455 modified the two overloaded void `KafkaBasedLog::send` methods to return a `Future`. While this change is source compatible, it isn't binary compatible. We can restore backward compatibility simply by renaming the new `Future`-returning `send` methods to `sendWithReceipt` and reinstating the older void `send` methods so that they delegate to the renamed methods.
    
    This refactoring changes no functionality other than restoring the older methods.
    
    Reviewers: Randall Hauch <rh...@gmail.com>
---
 .../connect/storage/KafkaConfigBackingStore.java   |  4 +--
 .../apache/kafka/connect/util/KafkaBasedLog.java   | 37 +++++++++++++++++++---
 .../storage/KafkaConfigBackingStoreTest.java       | 10 +++---
 3 files changed, 40 insertions(+), 11 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
index e9f87697308..b5804697600 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
@@ -637,7 +637,7 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
         byte[] serializedTargetState = converter.fromConnectData(topic, TARGET_STATE_V0, connectTargetState);
         log.debug("Writing target state {} for connector {}", state, connector);
         try {
-            configLog.send(TARGET_STATE_KEY(connector), serializedTargetState).get(READ_WRITE_TOTAL_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+            configLog.sendWithReceipt(TARGET_STATE_KEY(connector), serializedTargetState).get(READ_WRITE_TOTAL_TIMEOUT_MS, TimeUnit.MILLISECONDS);
         } catch (InterruptedException | ExecutionException | TimeoutException e) {
             log.error("Failed to write target state to Kafka", e);
             throw new ConnectException("Error writing target state to Kafka", e);
@@ -789,7 +789,7 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
         if (!usesFencableWriter) {
             List<Future<RecordMetadata>> producerFutures = new ArrayList<>();
             keyValues.forEach(
-                    keyValue -> producerFutures.add(configLog.send(keyValue.key, keyValue.value))
+                    keyValue -> producerFutures.add(configLog.sendWithReceipt(keyValue.key, keyValue.value))
             );
 
             timer.update();
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
index 431ae871ce9..a05a05dffe7 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
@@ -77,6 +77,10 @@ import java.util.function.Supplier;
  *     calling class keeps track of state based on the log and only writes to it when consume callbacks are invoked
  *     and only reads it in {@link #readToEnd(Callback)} callbacks then no additional synchronization will be required.
  * </p>
+ * <p>
+ *     This is a useful utility that has been used outside of Connect. This isn't in Connect's public API,
+ *     but we've tried to maintain the method signatures and backward compatibility since early Kafka versions.
+ * </p>
  */
 public class KafkaBasedLog<K, V> {
     private static final Logger log = LoggerFactory.getLogger(KafkaBasedLog.class);
@@ -351,6 +355,31 @@ public class KafkaBasedLog<K, V> {
         return future;
     }
 
+    /**
+     * Send a record asynchronously to the configured {@link #topic} without using a producer callback.
+     * <p>
+     * This method exists for backward compatibility reasons and delegates to the newer
+     * {@link #sendWithReceipt(Object, Object)} method that returns a future.
+     * @param key the key for the {@link ProducerRecord}
+     * @param value the value for the {@link ProducerRecord}
+     */
+    public void send(K key, V value) {
+        sendWithReceipt(key, value);
+    }
+
+    /**
+     * Send a record asynchronously to the configured {@link #topic}.
+     * <p>
+     * This method exists for backward compatibility reasons and delegates to the newer
+     * {@link #sendWithReceipt(Object, Object, org.apache.kafka.clients.producer.Callback)} method that returns a future.
+     * @param key the key for the {@link ProducerRecord}
+     * @param value the value for the {@link ProducerRecord}
+     * @param callback the callback to invoke after completion; can be null if no callback is desired
+     */
+    public void send(K key, V value, org.apache.kafka.clients.producer.Callback callback) {
+        sendWithReceipt(key, value, callback);
+    }
+
     /**
      * Send a record asynchronously to the configured {@link #topic} without using a producer callback.
      * @param key the key for the {@link ProducerRecord}
@@ -359,12 +388,12 @@ public class KafkaBasedLog<K, V> {
      * @return the future from the call to {@link Producer#send}. {@link Future#get} can be called on this returned
      *         future if synchronous behavior is desired.
      */
-    public Future<RecordMetadata> send(K key, V value) {
-        return send(key, value, null);
+    public Future<RecordMetadata> sendWithReceipt(K key, V value) {
+        return sendWithReceipt(key, value, null);
     }
 
     /**
-     * Send a record asynchronously to the configured {@link #topic}
+     * Send a record asynchronously to the configured {@link #topic}.
      * @param key the key for the {@link ProducerRecord}
      * @param value the value for the {@link ProducerRecord}
      * @param callback the callback to invoke after completion; can be null if no callback is desired
@@ -372,7 +401,7 @@ public class KafkaBasedLog<K, V> {
      * @return the future from the call to {@link Producer#send}. {@link Future#get} can be called on this returned
      *         future if synchronous behavior is desired.
      */
-    public Future<RecordMetadata> send(K key, V value, org.apache.kafka.clients.producer.Callback callback) {
+    public Future<RecordMetadata> sendWithReceipt(K key, V value, org.apache.kafka.clients.producer.Callback callback) {
         return producer.orElseThrow(() ->
                 new IllegalStateException("This KafkaBasedLog was created in read-only mode and does not support write operations")
         ).send(new ProducerRecord<>(topic, key, value), callback);
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
index 8fb3bf30165..20e16c2ed75 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
@@ -344,7 +344,7 @@ public class KafkaConfigBackingStoreTest {
 
         expectConvert(KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0, CONNECTOR_CONFIG_STRUCTS.get(0), CONFIGS_SERIALIZED.get(0));
 
-        storeLog.send(EasyMock.anyObject(), EasyMock.anyObject());
+        storeLog.sendWithReceipt(EasyMock.anyObject(), EasyMock.anyObject());
         EasyMock.expectLastCall().andReturn(producerFuture);
 
         producerFuture.get(EasyMock.anyLong(), EasyMock.anyObject());
@@ -379,13 +379,13 @@ public class KafkaConfigBackingStoreTest {
         @SuppressWarnings("unchecked")
         Future<RecordMetadata> connectorConfigProducerFuture = PowerMock.createMock(Future.class);
         // tombstone for the connector config
-        storeLog.send(EasyMock.anyObject(), EasyMock.isNull());
+        storeLog.sendWithReceipt(EasyMock.anyObject(), EasyMock.isNull());
         EasyMock.expectLastCall().andReturn(connectorConfigProducerFuture);
 
         @SuppressWarnings("unchecked")
         Future<RecordMetadata> targetStateProducerFuture = PowerMock.createMock(Future.class);
         // tombstone for the connector target state
-        storeLog.send(EasyMock.anyObject(), EasyMock.isNull());
+        storeLog.sendWithReceipt(EasyMock.anyObject(), EasyMock.isNull());
         EasyMock.expectLastCall().andReturn(targetStateProducerFuture);
 
         connectorConfigProducerFuture.get(EasyMock.eq(READ_WRITE_TOTAL_TIMEOUT_MS), EasyMock.anyObject());
@@ -460,7 +460,7 @@ public class KafkaConfigBackingStoreTest {
 
         // In the meantime, write a target state (which doesn't require write privileges)
         expectConvert(KafkaConfigBackingStore.TARGET_STATE_V0, TARGET_STATE_PAUSED, CONFIGS_SERIALIZED.get(1));
-        storeLog.send("target-state-" + CONNECTOR_IDS.get(1), CONFIGS_SERIALIZED.get(1));
+        storeLog.sendWithReceipt("target-state-" + CONNECTOR_IDS.get(1), CONFIGS_SERIALIZED.get(1));
         EasyMock.expectLastCall().andReturn(producerFuture);
         producerFuture.get(EasyMock.anyLong(), EasyMock.anyObject());
         EasyMock.expectLastCall().andReturn(null);
@@ -1601,7 +1601,7 @@ public class KafkaConfigBackingStoreTest {
             EasyMock.expect(converter.fromConnectData(EasyMock.eq(TOPIC), EasyMock.eq(valueSchema), EasyMock.capture(capturedRecord)))
                     .andReturn(serialized);
 
-        storeLog.send(EasyMock.eq(configKey), EasyMock.aryEq(serialized));
+        storeLog.sendWithReceipt(EasyMock.eq(configKey), EasyMock.aryEq(serialized));
         EasyMock.expectLastCall().andReturn(producerFuture);
 
         producerFuture.get(EasyMock.anyLong(), EasyMock.anyObject());