You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/09/17 08:15:53 UTC

[GitHub] ivankelly commented on a change in pull request #2591: Fix: Compaction with last deleted keys not completing compaction

ivankelly commented on a change in pull request #2591: Fix: Compaction with last deleted keys not completing compaction
URL: https://github.com/apache/incubator-pulsar/pull/2591#discussion_r217978740
 
 

 ##########
 File path: pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
 ##########
 @@ -153,40 +163,42 @@ private void scheduleTimeout(CompletableFuture<RawMessage> future) {
         });
     }
 
-    private CompletableFuture<Long> phaseTwo(RawReader reader, MessageId from, MessageId to,
-                                             Map<String,MessageId> latestForKey, BookKeeper bk) {
+    private CompletableFuture<Long> phaseTwo(RawReader reader, MessageId from, MessageId to, MessageId lastReadId,
+            Map<String, MessageId> latestForKey, BookKeeper bk) {
         Map<String, byte[]> metadata = ImmutableMap.of("compactedTopic", reader.getTopic().getBytes(UTF_8),
-                                                       "compactedTo", to.toByteArray());
+                "compactedTo", to.toByteArray());
         return createLedger(bk, metadata).thenCompose((ledger) -> {
-                log.info("Commencing phase two of compaction for {}, from {} to {}, compacting {} keys to ledger {}",
-                         reader.getTopic(), from, to, latestForKey.size(), ledger.getId());
-                return phaseTwoSeekThenLoop(reader, from, to, latestForKey, bk, ledger);
-            });
+            log.info("Commencing phase two of compaction for {}, from {} to {}, compacting {} keys to ledger {}",
+                    reader.getTopic(), from, to, latestForKey.size(), ledger.getId());
+            return phaseTwoSeekThenLoop(reader, from, to, lastReadId, latestForKey, bk, ledger);
+        });
     }
 
     private CompletableFuture<Long> phaseTwoSeekThenLoop(RawReader reader, MessageId from, MessageId to,
-                                                         Map<String, MessageId> latestForKey,
-                                                         BookKeeper bk, LedgerHandle ledger) {
+            MessageId lastReadId, Map<String, MessageId> latestForKey, BookKeeper bk, LedgerHandle ledger) {
         CompletableFuture<Long> promise = new CompletableFuture<>();
 
-        reader.seekAsync(from).thenCompose((v) -> {
-                Semaphore outstanding = new Semaphore(MAX_OUTSTANDING);
-                CompletableFuture<Void> loopPromise = new CompletableFuture<Void>();
-                phaseTwoLoop(reader, to, latestForKey, ledger, outstanding, loopPromise);
-                return loopPromise;
-            }).thenCompose((v) -> closeLedger(ledger))
-            .thenCompose((v) -> reader.acknowledgeCumulativeAsync(
-                                 to, ImmutableMap.of(COMPACTED_TOPIC_LEDGER_PROPERTY, ledger.getId())))
-            .whenComplete((res, exception) -> {
+        boolean emptyCompactedLedger = to == null;
 
 Review comment:
   Add a test that will trigger this code path (i.e. the case where `to == null` and `emptyCompactedLedger` is true).

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services