Posted to jira@kafka.apache.org by "yashmayya (via GitHub)" <gi...@apache.org> on 2023/02/01 11:22:15 UTC

[GitHub] [kafka] yashmayya commented on a diff in pull request #12984: KAFKA-14455: Kafka Connect create and update REST APIs should surface failures while writing to the config topic

yashmayya commented on code in PR #12984:
URL: https://github.com/apache/kafka/pull/12984#discussion_r1093090686


##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java:
##########
@@ -711,9 +752,32 @@ KafkaBasedLog<String, byte[]> setupAndCreateKafkaBasedLog(String topic, final Wo
         return createKafkaBasedLog(topic, producerProps, consumerProps, new ConsumeCallback(), topicDescription, adminSupplier);
     }
 
-    private void sendPrivileged(String key, byte[] value) {
+    /**
+     * Send a single record to the config topic synchronously. Note that {@link #claimWritePrivileges()} must be
+     * successfully invoked before calling this method if this store is configured to use a fencable writer.
+     * @param key the record key
+     * @param value the record value
+     */
+    private void sendPrivileged(String key, byte[] value) throws ExecutionException, InterruptedException, TimeoutException {
+        sendPrivileged(Collections.singletonList(new ProducerKeyValue(key, value)));
+    }
+
+    /**
+     * Send one or more records to the config topic synchronously. Note that {@link #claimWritePrivileges()} must be
+     * successfully invoked before calling this method if this store is configured to use a fencable writer.
+     * @param keyValues the list of producer record key/value pairs
+     */
+    private void sendPrivileged(List<ProducerKeyValue> keyValues) throws ExecutionException, InterruptedException, TimeoutException {
         if (!usesFencableWriter) {
-            configLog.send(key, value);
+            List<Future<RecordMetadata>> producerFutures = new ArrayList<>();
+            keyValues.forEach(
+                    keyValue -> producerFutures.add(configLog.send(keyValue.key, keyValue.value))
+            );
+
+            for (Future<RecordMetadata> future : producerFutures) {
+                future.get(READ_WRITE_TIMEOUT_MS, TimeUnit.MILLISECONDS);

Review Comment:
   > Considering this is all taking place on the herder's tick thread, we should probably care about the difference.
   
   Makes sense.
   
   > We might be able to use the [Timer class](https://github.com/apache/kafka/blob/eb7f490159c924ca0f21394d58366c257998f52e/clients/src/main/java/org/apache/kafka/common/utils/Timer.java) to simplify some of this logic.
   
   Thanks, that's a great suggestion, and it looks like a perfect fit for this use case. One thing I'd like to call out: the existing `putTaskConfigs` implementation performed three reads to the end of the log, each with its own 30-second timeout, so it could block for up to 90 seconds in total. With the latest changes pushed to this PR, a single 30-second timeout now covers all of the reads and writes done by `putTaskConfigs`. I believe this is the right thing to do, but I wanted to call it out explicitly to make sure we're on the same page!
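
To illustrate the difference between a per-call timeout and one overall timeout, here is a minimal, self-contained sketch of a shared deadline across several futures. It is not the code in this PR: the class name `SharedDeadlineSketch` and the method `awaitAll` are hypothetical, and a plain `System.nanoTime()` deadline stands in for Kafka's `Timer` class so that the example needs nothing beyond the JDK.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical illustration only; not part of KafkaConfigBackingStore.
class SharedDeadlineSketch {

    /**
     * Wait for every future, but bound the total wait by totalTimeoutMs
     * rather than giving each future its own full timeout.
     */
    static <T> void awaitAll(List<? extends Future<T>> futures, long totalTimeoutMs)
            throws ExecutionException, InterruptedException, TimeoutException {
        // Compute one deadline up front; every wait below draws from the same budget.
        long deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(totalTimeoutMs);
        for (Future<T> future : futures) {
            // Only the time left until the deadline is available to this future.
            // Clamping to zero still lets already-completed futures return normally.
            long remainingNanos = Math.max(0L, deadlineNanos - System.nanoTime());
            future.get(remainingNanos, TimeUnit.NANOSECONDS);
        }
    }

    public static void main(String[] args) throws Exception {
        CompletableFuture<String> done = CompletableFuture.completedFuture("ok");
        CompletableFuture<String> neverCompletes = new CompletableFuture<>();

        // All futures are already complete, so this returns without waiting.
        awaitAll(List.of(done, done), 1_000L);

        try {
            // The second future never completes, so the shared budget runs out.
            awaitAll(List.of(done, neverCompletes), 50L);
        } catch (TimeoutException e) {
            System.out.println("Timed out within the shared budget");
        }
    }
}
```

With a fixed per-call timeout, as in the `future.get(READ_WRITE_TIMEOUT_MS, TimeUnit.MILLISECONDS)` loop above, N pending futures could block for up to N times the timeout; with a shared deadline the worst case stays at roughly one timeout regardless of N.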


