Posted to commits@pulsar.apache.org by "heesung-sn (via GitHub)" <gi...@apache.org> on 2023/02/17 22:47:25 UTC

[GitHub] [pulsar] heesung-sn commented on a diff in pull request #19546: [improve][broker] PIP-192 Added Disabled, Deleted, and Init states in ServiceUnitState

heesung-sn commented on code in PR #19546:
URL: https://github.com/apache/pulsar/pull/19546#discussion_r1110428037


##########
pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java:
##########
@@ -301,6 +304,34 @@ private <T> void phaseOneLoop(Reader<T> reader, CompletableFuture<PhaseOneResult
 
     }
 
+    private <T> void waitForReconnection(Reader<T> reader) {
+        long started = System.currentTimeMillis();
+
+        // initial sleep
+        try {
+            Thread.sleep(100);
+        } catch (InterruptedException e) {
+        }
+        while (!reader.isConnected()) {
+            long now = System.currentTimeMillis();
+            if (now - started > MAX_READER_RECONNECT_WAITING_TIME_IN_MILLIS) {
+                String errorMsg = String.format(
+                        "Reader has not been reconnected for %d secs. Stopping the compaction.",
+                        MAX_READER_RECONNECT_WAITING_TIME_IN_MILLIS / 1000);
+                log.error(errorMsg);
+                throw new RuntimeException(errorMsg);
+            }
+            log.warn(
+                    "Reader has not been reconnected after the cursor reset. elapsed :{} ms. Retrying "
+                            + "soon.", now - started);
+            try {
+                Thread.sleep(100);
+            } catch (InterruptedException e) {
+                log.warn("The thread got interrupted while waiting. continuing", e);
+            }
+        }
+    }
+

Review Comment:
   When `seek()` resets the cursor, the reader is temporarily disconnected.
   
   If `acknowledgeCumulativeAsync()` is then called at the end of the compaction (code below) before the reader has reconnected, it may throw an exception because the reader's state is still `Connecting`. This is most likely to happen when there is only one message to compact.
   
   ```
                   .thenCompose(v -> {
                       log.info("Acking ledger id {}", phaseOneResult.firstId);
                       return ((CompactionReaderImpl<T>) reader)
                               .acknowledgeCumulativeAsync(
                                       phaseOneResult.lastId, Map.of(COMPACTED_TOPIC_LEDGER_PROPERTY,
                                               ledger.getId()));
                   })
   ```
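
The bounded wait described above can be sketched independently of Pulsar. This is a minimal illustration, not the code in the PR: `ReconnectWaiter`, `awaitConnected`, and its parameters are hypothetical names, and the connectivity check is passed in as a plain `BooleanSupplier` (in the compactor it would presumably be something like `reader::isConnected`). Unlike the diff above, the sketch has no initial sleep, uses a monotonic clock for the deadline, and restores the interrupt flag rather than continuing to wait after an interrupt.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

/** Hypothetical helper (not part of Pulsar): polls a connectivity check until it passes or a deadline expires. */
final class ReconnectWaiter {

    private ReconnectWaiter() {
    }

    /**
     * Blocks until isConnected returns true.
     * Throws IllegalStateException if maxWaitMillis elapses first or the thread is interrupted.
     */
    static void awaitConnected(BooleanSupplier isConnected, long maxWaitMillis, long pollMillis) {
        // System.nanoTime() is monotonic, so wall-clock adjustments cannot distort the elapsed time.
        final long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(maxWaitMillis);
        while (!isConnected.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                throw new IllegalStateException(
                        "Not reconnected within " + maxWaitMillis + " ms; giving up.");
            }
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                // Restore the interrupt flag so callers can still observe the interruption.
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for reconnection", e);
            }
        }
    }
}
```

A caller would invoke `ReconnectWaiter.awaitConnected(check, maxWaitMillis, 100)` after the cursor reset and before the cumulative ack, so that the ack is only attempted once the check reports a connected state.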


