Posted to jira@kafka.apache.org by "C0urante (via GitHub)" <gi...@apache.org> on 2023/02/01 16:26:19 UTC

[GitHub] [kafka] C0urante commented on a diff in pull request #12984: KAFKA-14455: Kafka Connect create and update REST APIs should surface failures while writing to the config topic

C0urante commented on code in PR #12984:
URL: https://github.com/apache/kafka/pull/12984#discussion_r1093433981


##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java:
##########
@@ -711,9 +768,35 @@ KafkaBasedLog<String, byte[]> setupAndCreateKafkaBasedLog(String topic, final Wo
         return createKafkaBasedLog(topic, producerProps, consumerProps, new ConsumeCallback(), topicDescription, adminSupplier);
     }
 
-    private void sendPrivileged(String key, byte[] value) {
+    /**
+     * Send a single record to the config topic synchronously. Note that {@link #claimWritePrivileges()} must be
+     * successfully invoked before calling this method if this store is configured to use a fencable writer.
+     * @param key the record key
+     * @param value the record value
+     * @param timer Timer bounding how long this method can block. The timer is updated before the method returns.
+     */
+    private void sendPrivileged(String key, byte[] value, Timer timer) throws ExecutionException, InterruptedException, TimeoutException {
+        sendPrivileged(Collections.singletonList(new ProducerKeyValue(key, value)), timer);
+    }
+
+    /**
+     * Send one or more records to the config topic synchronously. Note that {@link #claimWritePrivileges()} must be
+     * successfully invoked before calling this method if this store is configured to use a fencable writer.
+     * @param keyValues the list of producer record key/value pairs
+     * @param timer Timer bounding how long this method can block. The timer is updated before the method returns.

Review Comment:
   Thank you for taking care to call out that the timer is updated before returning; it's an important detail.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java:
##########
@@ -711,9 +768,35 @@ KafkaBasedLog<String, byte[]> setupAndCreateKafkaBasedLog(String topic, final Wo
         return createKafkaBasedLog(topic, producerProps, consumerProps, new ConsumeCallback(), topicDescription, adminSupplier);
     }
 
-    private void sendPrivileged(String key, byte[] value) {
+    /**
+     * Send a single record to the config topic synchronously. Note that {@link #claimWritePrivileges()} must be
+     * successfully invoked before calling this method if this store is configured to use a fencable writer.
+     * @param key the record key
+     * @param value the record value
+     * @param timer Timer bounding how long this method can block. The timer is updated before the method returns.
+     */
+    private void sendPrivileged(String key, byte[] value, Timer timer) throws ExecutionException, InterruptedException, TimeoutException {
+        sendPrivileged(Collections.singletonList(new ProducerKeyValue(key, value)), timer);
+    }
+
+    /**
+     * Send one or more records to the config topic synchronously. Note that {@link #claimWritePrivileges()} must be
+     * successfully invoked before calling this method if this store is configured to use a fencable writer.
+     * @param keyValues the list of producer record key/value pairs
+     * @param timer Timer bounding how long this method can block. The timer is updated before the method returns.
+     */
+    private void sendPrivileged(List<ProducerKeyValue> keyValues, Timer timer) throws ExecutionException, InterruptedException, TimeoutException {
         if (!usesFencableWriter) {
-            configLog.send(key, value);
+            List<Future<RecordMetadata>> producerFutures = new ArrayList<>();
+            keyValues.forEach(
+                    keyValue -> producerFutures.add(configLog.send(keyValue.key, keyValue.value))
+            );
+
+            for (Future<RecordMetadata> future : producerFutures) {

Review Comment:
   We should update the timer before waiting on the first future, since in some rare cases `configLog::send` may actually block for a bit:
   
   ```suggestion
               timer.update();
               for (Future<RecordMetadata> future : producerFutures) {
   ```
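
A minimal, dependency-free sketch of why the placement of that update matters. `CachedTimer` here is a hypothetical stand-in that only mimics the cached-time behaviour described in the `Timer` Javadoc (it is not the Kafka class), and `awaitAll` is an illustrative helper, not code from this PR. It assumes the futures were created after the timer was, so any time spent creating them is only accounted for once `update()` is called.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.LongSupplier;

final class TimerUpdateSketch {
    private TimerUpdateSketch() { }

    // Hypothetical stand-in for a timer that caches the current time until update() is called.
    static final class CachedTimer {
        private final LongSupplier clockMs;
        private final long deadlineMs;
        private long cachedNowMs;

        CachedTimer(LongSupplier clockMs, long timeoutMs) {
            this.clockMs = clockMs;
            this.cachedNowMs = clockMs.getAsLong();
            this.deadlineMs = cachedNowMs + timeoutMs;
        }

        // Re-read the clock; until this is called, remainingMs() keeps returning a stale value.
        void update() {
            cachedNowMs = clockMs.getAsLong();
        }

        long remainingMs() {
            return Math.max(0L, deadlineMs - cachedNowMs);
        }
    }

    // Wait for every future, never blocking past the timer's deadline.
    static <T> List<T> awaitAll(List<Future<T>> futures, CachedTimer timer)
            throws ExecutionException, InterruptedException, TimeoutException {
        // Refresh first: creating the futures (e.g. the send calls) may itself have blocked.
        timer.update();
        List<T> results = new ArrayList<>();
        for (Future<T> future : futures) {
            results.add(future.get(timer.remainingMs(), TimeUnit.MILLISECONDS));
            // Refresh again so the next wait only gets what is left of the budget.
            timer.update();
        }
        return results;
    }
}
```

Without the initial `update()`, the first `get` would be given the full original timeout even if part of it had already been spent before the loop started.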



##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java:
##########
@@ -723,12 +806,26 @@ private void sendPrivileged(String key, byte[] value) {
 
         try {
             fencableProducer.beginTransaction();
-            fencableProducer.send(new ProducerRecord<>(topic, key, value));
+            keyValues.forEach(
+                    keyValue -> fencableProducer.send(new ProducerRecord<>(topic, keyValue.key, keyValue.value))
+            );
             fencableProducer.commitTransaction();
         } catch (Exception e) {
             log.warn("Failed to perform fencable send to config topic", e);
             relinquishWritePrivileges();
             throw new PrivilegedWriteException("Failed to perform fencable send to config topic", e);
+        } finally {
+            timer.update();
+        }

Review Comment:
   I'm not sure this belongs in a `finally` block, especially since that introduces inconsistencies in how this method updates the timer depending on whether EOS source support is enabled. IMO it's fine to add it inside the `try` block, after the call to `fencableProducer::commitTransaction`.
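
To illustrate the difference between the two placements, here is a small sketch. Everything in it is hypothetical: `CountingTimer` stands in for the real timer and only counts `update()` calls, and a plain `Runnable` stands in for the begin/send/commit sequence. It is one reading of the comment above, not the code in this PR.

```java
final class TimerPlacementSketch {
    private TimerPlacementSketch() { }

    // Hypothetical stand-in for the timer; it only records how often update() was called.
    static final class CountingTimer {
        int updates;

        void update() {
            updates++;
        }
    }

    // Update inside the try block: skipped whenever the transaction throws.
    static void sendUpdatingInTry(Runnable transaction, CountingTimer timer) {
        try {
            transaction.run();
            timer.update();
        } catch (RuntimeException e) {
            throw new IllegalStateException("Failed to perform fencable send", e);
        }
    }

    // Update inside finally: runs on success and on failure alike.
    static void sendUpdatingInFinally(Runnable transaction, CountingTimer timer) {
        try {
            transaction.run();
        } catch (RuntimeException e) {
            throw new IllegalStateException("Failed to perform fencable send", e);
        } finally {
            timer.update();
        }
    }
}
```

With a transaction that throws, the first variant leaves the timer untouched while the second still updates it, so callers would see different timer state after a failure depending on which code path ran.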



##########
clients/src/main/java/org/apache/kafka/common/utils/Timer.java:
##########
@@ -161,7 +161,7 @@ public long remainingMs() {
     /**
      * Get the current time in milliseconds. This will return the same cached value until the timer
      * has been updated using one of the {@link #update()} methods or {@link #sleep(long)} is used.
-     *
+     * <p>

Review Comment:
   In the future, please save nonessential improvements like this for a dedicated PR; they add noise to the diff and make things harder to review.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java:
##########
@@ -712,8 +733,16 @@ KafkaBasedLog<String, byte[]> setupAndCreateKafkaBasedLog(String topic, final Wo
     }
 
     private void sendPrivileged(String key, byte[] value) {
+        sendPrivileged(key, value, null);
+    }
+
+    private void sendPrivileged(String key, byte[] value, Callback<Void> callback) {
         if (!usesFencableWriter) {
-            configLog.send(key, value);

Review Comment:
   (Resolving this convo as it's taking up way too much real estate in the GitHub UI)



##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java:
##########
@@ -711,9 +752,32 @@ KafkaBasedLog<String, byte[]> setupAndCreateKafkaBasedLog(String topic, final Wo
         return createKafkaBasedLog(topic, producerProps, consumerProps, new ConsumeCallback(), topicDescription, adminSupplier);
     }
 
-    private void sendPrivileged(String key, byte[] value) {
+    /**
+     * Send a single record to the config topic synchronously. Note that {@link #claimWritePrivileges()} must be
+     * successfully invoked before calling this method if this store is configured to use a fencable writer.
+     * @param key the record key
+     * @param value the record value
+     */
+    private void sendPrivileged(String key, byte[] value) throws ExecutionException, InterruptedException, TimeoutException {
+        sendPrivileged(Collections.singletonList(new ProducerKeyValue(key, value)));
+    }
+
+    /**
+     * Send one or more records to the config topic synchronously. Note that {@link #claimWritePrivileges()} must be
+     * successfully invoked before calling this method if this store is configured to use a fencable writer.
+     * @param keyValues the list of producer record key/value pairs
+     */
+    private void sendPrivileged(List<ProducerKeyValue> keyValues) throws ExecutionException, InterruptedException, TimeoutException {
         if (!usesFencableWriter) {
-            configLog.send(key, value);
+            List<Future<RecordMetadata>> producerFutures = new ArrayList<>();
+            keyValues.forEach(
+                    keyValue -> producerFutures.add(configLog.send(keyValue.key, keyValue.value))
+            );
+
+            for (Future<RecordMetadata> future : producerFutures) {
+                future.get(READ_WRITE_TIMEOUT_MS, TimeUnit.MILLISECONDS);

Review Comment:
   Yeah, that's fine. The less time we spend blocking the tick thread, the better.


