You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "C0urante (via GitHub)" <gi...@apache.org> on 2023/01/23 21:17:35 UTC

[GitHub] [kafka] C0urante commented on a diff in pull request #12984: KAFKA-14455: Kafka Connect create and update REST APIs should surface failures while writing to the config topic

C0urante commented on code in PR #12984:
URL: https://github.com/apache/kafka/pull/12984#discussion_r1084555816


##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java:
##########
@@ -711,9 +742,9 @@ KafkaBasedLog<String, byte[]> setupAndCreateKafkaBasedLog(String topic, final Wo
         return createKafkaBasedLog(topic, producerProps, consumerProps, new ConsumeCallback(), topicDescription, adminSupplier);
     }
 
-    private void sendPrivileged(String key, byte[] value) {
+    private void sendPrivileged(String key, byte[] value) throws ExecutionException, InterruptedException {
         if (!usesFencableWriter) {
-            configLog.send(key, value);
+            configLog.send(key, value).get();

Review Comment:
   Unbounded waiting for the record to send doesn't seem very safe here. It looks like we were already trying to achieve synchronous writes by immediately performing a read-to-end after the call to `configLog::send` with a timeout of `READ_TO_END_TIMEOUT_MS`; could we use that same timeout while waiting for producer send futures to be completed (possibly after renaming to reflect the new purpose of the timeout)?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java:
##########
@@ -711,9 +742,9 @@ KafkaBasedLog<String, byte[]> setupAndCreateKafkaBasedLog(String topic, final Wo
         return createKafkaBasedLog(topic, producerProps, consumerProps, new ConsumeCallback(), topicDescription, adminSupplier);
     }
 
-    private void sendPrivileged(String key, byte[] value) {
+    private void sendPrivileged(String key, byte[] value) throws ExecutionException, InterruptedException {
         if (!usesFencableWriter) {
-            configLog.send(key, value);
+            configLog.send(key, value).get();

Review Comment:
   Also, making this synchronously await the producer future's completion has the potential to slow things down for operations that involve writing multiple records to the config topic (such as [deleting a connector](https://github.com/apache/kafka/blob/b2cb546fba03bbdc4054c7d120b0b2654c7cf34e/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java#L493-L494)).
   
   This is already the case with exactly-once support enabled since we perform each write in its own transaction, but while we're in the neighborhood, if there's an easy way to group together producer sends before awaiting the completion of any of their futures, it'd be nice. Not going to block on this, though.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java:
##########
@@ -712,8 +733,16 @@ KafkaBasedLog<String, byte[]> setupAndCreateKafkaBasedLog(String topic, final Wo
     }
 
     private void sendPrivileged(String key, byte[] value) {
+        sendPrivileged(key, value, null);
+    }
+
+    private void sendPrivileged(String key, byte[] value, Callback<Void> callback) {
         if (!usesFencableWriter) {
-            configLog.send(key, value);

Review Comment:
   Do we still see the cause of the failure to send in the REST response with the current changes? It looks like we're only going to include the generic "Failed to... to/from Kafka" message, or the privileged write failure message.
   
   I think it'd be nice to include more detail on the cause of the failure, but if that's too invasive or difficult to get quite right, it can be left as a follow-up item.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org