You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/09/08 15:48:00 UTC

[GitHub] [kafka] ccding commented on a change in pull request #11033: KAFKA-12988 Asynchronous API support for RemoteLogMetadataManager add/update methods.

ccding commented on a change in pull request #11033:
URL: https://github.com/apache/kafka/pull/11033#discussion_r704546511



##########
File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ProducerManager.java
##########
@@ -50,37 +51,38 @@ public ProducerManager(TopicBasedRemoteLogMetadataManagerConfig rlmmConfig,
         topicPartitioner = rlmmTopicPartitioner;
     }
 
-    public RecordMetadata publishMessage(RemoteLogMetadata remoteLogMetadata) throws KafkaException {
+    public CompletableFuture<RecordMetadata> publishMessage(RemoteLogMetadata remoteLogMetadata) {
+        CompletableFuture<RecordMetadata> future = new CompletableFuture<>();
+
         TopicIdPartition topicIdPartition = remoteLogMetadata.topicIdPartition();
         int metadataPartitionNum = topicPartitioner.metadataPartition(topicIdPartition);
         log.debug("Publishing metadata message of partition:[{}] into metadata topic partition:[{}] with payload: [{}]",
-                topicIdPartition, metadataPartitionNum, remoteLogMetadata);
+                  topicIdPartition, metadataPartitionNum, remoteLogMetadata);
         if (metadataPartitionNum >= rlmmConfig.metadataTopicPartitionsCount()) {
             // This should never occur as long as metadata partitions always remain the same.
             throw new KafkaException("Chosen partition no " + metadataPartitionNum +
                                              " must be less than the partition count: " + rlmmConfig.metadataTopicPartitionsCount());
         }
 
-        ProducerCallback callback = new ProducerCallback();
         try {
+            Callback callback = new Callback() {
+                @Override
+                public void onCompletion(RecordMetadata metadata,
+                                         Exception exception) {
+                    if (exception != null) {
+                        future.completeExceptionally(exception);
+                    } else {
+                        future.complete(metadata);
+                    }
+                }
+            };
             producer.send(new ProducerRecord<>(rlmmConfig.remoteLogMetadataTopicName(), metadataPartitionNum, null,
-                    serde.serialize(remoteLogMetadata)), callback).get();
-        } catch (KafkaException e) {
-            throw e;
-        } catch (Exception e) {
-            throw new KafkaException("Exception occurred while publishing message for topicIdPartition: " + topicIdPartition, e);
+                                               serde.serialize(remoteLogMetadata)), callback);
+        } catch (Exception ex) {
+            future.completeExceptionally(ex);
         }

Review comment:
       do we want to remove printing the topic id in the exception?

##########
File path: storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManager.java
##########
@@ -62,16 +63,17 @@
      * @param remoteLogSegmentMetadata metadata about the remote log segment.
      * @throws RemoteStorageException   if there are any storage related errors occurred.
      * @throws IllegalArgumentException if the given metadata instance does not have the state as {@link RemoteLogSegmentState#COPY_SEGMENT_STARTED}
+     * @return a CompletableFuture which will complete once this operation is finished.
      */
-    void addRemoteLogSegmentMetadata(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException;
+    CompletableFuture<Void> addRemoteLogSegmentMetadata(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException;
 
     /**
-     * This method is used to update the {@link RemoteLogSegmentMetadata}. Currently, it allows to update with the new
+     * This method is used to update the {@link RemoteLogSegmentMetadata} asynchronously. Currently, it allows to update with the new
      * state based on the life cycle of the segment. It can go through the below state transitions.
      * <p>
      * <pre>
      * +---------------------+            +----------------------+
-     * |COPY_SEGMENT_STARTED |-----------&gt;|COPY_SEGMENT_FINISHED |
+     * |COPY_SEGMENT_STARTED |----------->|COPY_SEGMENT_FINISHED |

Review comment:
       Can you verify if this could result in a correct HTML doc?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org