Posted to jira@kafka.apache.org by "mumrah (via GitHub)" <gi...@apache.org> on 2023/03/28 14:53:04 UTC

[GitHub] [kafka] mumrah commented on a diff in pull request #13462: KAFKA-14857: Fix some MetadataLoader bugs

mumrah commented on code in PR #13462:
URL: https://github.com/apache/kafka/pull/13462#discussion_r1150744092


##########
metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java:
##########
@@ -212,38 +216,67 @@ private MetadataLoader(
         this.uninitializedPublishers = new LinkedHashMap<>();
         this.publishers = new LinkedHashMap<>();
         this.image = MetadataImage.EMPTY;
-        this.eventQueue = new KafkaEventQueue(time, logContext,
+        this.eventQueue = new KafkaEventQueue(Time.SYSTEM, logContext,
                 threadNamePrefix + "metadata-loader-",
                 new ShutdownEvent());
     }
 
-    private boolean stillNeedToCatchUp(long offset) {
+    private boolean stillNeedToCatchUp(String where, long offset) {
         if (!catchingUp) {
-            log.trace("We are not in the initial catching up state.");
+            log.trace("{}: we are not in the initial catching up state.", where);
             return false;
         }
         OptionalLong highWaterMark = highWaterMarkAccessor.get();
         if (!highWaterMark.isPresent()) {
-            log.info("The loader is still catching up because we still don't know the high " +
-                    "water mark yet.");
+            log.info("{}: the loader is still catching up because we still don't know the high " +
+                    "water mark yet.", where);
             return true;
         }
         if (highWaterMark.getAsLong() > offset) {
-            log.info("The loader is still catching up because we have loaded up to offset " +
-                    offset + ", but the high water mark is " + highWaterMark.getAsLong());
+            log.info("{}: The loader is still catching up because we have loaded up to offset " +
+                    offset + ", but the high water mark is {}", where, highWaterMark.getAsLong());
             return true;
         }
-        log.info("The loader finished catch up to the current high water mark of " +
-                highWaterMark.getAsLong());
+        log.info("{}: The loader finished catching up to the current high water mark of {}",
+                where, highWaterMark.getAsLong());
         catchingUp = true;
         return false;
     }
 
-    private void maybeInitializeNewPublishers() {
+    /**
+     * Schedule an event to initialize the new publishers that are present in the system.
+     *
+     * @param delayNs   The minimum time in nanoseconds we should wait. If there is already an
+     *                  initialization event scheduled, we will either move its deadline forward
+     *                  in time or leave it unchanged.
+     */
+    void scheduleInitializeNewPublishers(long delayNs) {
+        eventQueue.scheduleDeferred(INITIALIZE_NEW_PUBLISHERS,
+            new EventQueue.EarliestDeadlineFunction(Time.SYSTEM.nanoseconds() + delayNs),
+            () -> {
+                try {
+                    initializeNewPublishers();
+                } catch (Throwable e) {
+                    faultHandler.handleFault("Unhandled error initializing new publishers", e);
+                }
+            });
+    }
+
+    void initializeNewPublishers() {
         if (uninitializedPublishers.isEmpty()) {
-            log.trace("There are no uninitialized publishers to initialize.");
+            log.debug("InitializeNewPublishers: nothing to do.");
+            return;
+        }
+        if (stillNeedToCatchUp("initializeNewPublishers", image.highestOffsetAndEpoch().offset())) {
+            // Reschedule the initialization for later.
+            log.debug("InitializeNewPublishers: unable to initialize new publisher(s) {} " +
+                            "because we are still catching up with quorum metadata. Rescheduling.",
+                    uninitializedPublisherNames());
+            scheduleInitializeNewPublishers(TimeUnit.MILLISECONDS.toNanos(1));

Review Comment:
   The high-watermark accessor reads the high watermark from RaftClient. Is there a case where we would still be waiting on fetches from RaftClient before we have enough of the log to be caught up? I'm wondering if 1 ms is too tight a retry delay for cases where we're waiting on network requests.
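   
   For illustration only, a minimal sketch of what backing off the reschedule delay could look like (the class name, field names, and the 500 ms cap here are assumptions for the sketch, not existing code in this PR):
   
   ```java
   import java.util.concurrent.TimeUnit;
   
   // Hypothetical helper: doubles the retry delay each time we reschedule while
   // still catching up, so a slow RaftClient fetch doesn't turn into a tight 1 ms loop.
   public class PublisherInitBackoff {
       private static final long INITIAL_RETRY_NS = TimeUnit.MILLISECONDS.toNanos(1);
       private static final long MAX_RETRY_NS = TimeUnit.MILLISECONDS.toNanos(500);
   
       private long nextRetryNs = INITIAL_RETRY_NS;
   
       // Delay to pass to the next reschedule call; doubles up to the cap.
       long nextDelayNs() {
           long delay = nextRetryNs;
           nextRetryNs = Math.min(nextRetryNs * 2, MAX_RETRY_NS);
           return delay;
       }
   
       // Reset once the loader has caught up, so the next catch-up starts at 1 ms again.
       void reset() {
           nextRetryNs = INITIAL_RETRY_NS;
       }
   }
   ```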



##########
metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java:
##########
@@ -256,23 +289,26 @@ private void maybeInitializeNewPublishers() {
                 image.provenance(),
                 time.nanoseconds() - startNs);
         for (Iterator<MetadataPublisher> iter = uninitializedPublishers.values().iterator();
-                iter.hasNext(); ) {
+                 iter.hasNext(); ) {

Review Comment:
   nit: is this the right indentation?


