You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/09/18 10:09:55 UTC

[GitHub] sijie closed pull request #2591: Fix: Compaction with last deleted keys not completing compaction

sijie closed pull request #2591: Fix: Compaction with last deleted keys not completing compaction
URL: https://github.com/apache/incubator-pulsar/pull/2591
 
 
   

This pull request was merged from a forked repository.
Because GitHub hides the original diff of a foreign (forked) pull
request once it has been merged, the diff is reproduced below for
the sake of provenance:

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
index cc3f710249..425e04921b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
@@ -76,7 +76,7 @@ public TwoPhaseCompactor(ServiceConfiguration conf,
     @Override
     protected CompletableFuture<Long> doCompaction(RawReader reader, BookKeeper bk) {
         return phaseOne(reader).thenCompose(
-                (r) -> phaseTwo(reader, r.from, r.to, r.latestForKey, bk));
+                (r) -> phaseTwo(reader, r.from, r.to, r.lastReadId, r.latestForKey, bk));
     }
 
     private CompletableFuture<PhaseOneResult> phaseOne(RawReader reader) {
@@ -90,7 +90,8 @@ public TwoPhaseCompactor(ServiceConfiguration conf,
                     } else {
                         log.info("Commencing phase one of compaction for {}, reading to {}",
                                  reader.getTopic(), lastMessageId);
-                        phaseOneLoop(reader, Optional.empty(), lastMessageId, latestForKey, loopPromise);
+                        phaseOneLoop(reader, Optional.empty(), Optional.empty(), lastMessageId, latestForKey,
+                                loopPromise);
                     }
                 });
         return loopPromise;
@@ -98,6 +99,7 @@ public TwoPhaseCompactor(ServiceConfiguration conf,
 
     private void phaseOneLoop(RawReader reader,
                               Optional<MessageId> firstMessageId,
+                              Optional<MessageId> toMessageId,
                               MessageId lastMessageId,
                               Map<String,MessageId> latestForKey,
                               CompletableFuture<PhaseOneResult> loopPromise) {
@@ -114,6 +116,7 @@ private void phaseOneLoop(RawReader reader,
                             return;
                         }
                         MessageId id = m.getMessageId();
+                        boolean deletedMessage = false;
                         if (RawBatchConverter.isReadableBatch(m)) {
                             try {
                                 RawBatchConverter.extractIdsAndKeys(m)
@@ -125,16 +128,23 @@ private void phaseOneLoop(RawReader reader,
                         } else {
                             Pair<String,Integer> keyAndSize = extractKeyAndSize(m);
                             if (keyAndSize != null) {
-                                latestForKey.put(keyAndSize.getLeft(), id);
+                                if(keyAndSize.getRight() > 0) {
+                                    latestForKey.put(keyAndSize.getLeft(), id);    
+                                } else {
+                                    deletedMessage = true;
+                                    latestForKey.remove(keyAndSize.getLeft());
+                                }
                             }
                         }
 
+                        MessageId first = firstMessageId.orElse(deletedMessage ? null : id);
+                        MessageId to = deletedMessage ? toMessageId.orElse(null) : id;
                         if (id.compareTo(lastMessageId) == 0) {
-                            loopPromise.complete(new PhaseOneResult(firstMessageId.orElse(id),
-                                                                    id, latestForKey));
+                            loopPromise.complete(new PhaseOneResult(first, to, lastMessageId, latestForKey));
                         } else {
                             phaseOneLoop(reader,
-                                         Optional.of(firstMessageId.orElse(id)),
+                                         Optional.ofNullable(first),
+                                         Optional.ofNullable(to),
                                          lastMessageId,
                                          latestForKey, loopPromise);
                         }
@@ -153,40 +163,38 @@ private void scheduleTimeout(CompletableFuture<RawMessage> future) {
         });
     }
 
-    private CompletableFuture<Long> phaseTwo(RawReader reader, MessageId from, MessageId to,
-                                             Map<String,MessageId> latestForKey, BookKeeper bk) {
+    private CompletableFuture<Long> phaseTwo(RawReader reader, MessageId from, MessageId to, MessageId lastReadId,
+            Map<String, MessageId> latestForKey, BookKeeper bk) {
         Map<String, byte[]> metadata = ImmutableMap.of("compactedTopic", reader.getTopic().getBytes(UTF_8),
-                                                       "compactedTo", to.toByteArray());
+                "compactedTo", to.toByteArray());
         return createLedger(bk, metadata).thenCompose((ledger) -> {
-                log.info("Commencing phase two of compaction for {}, from {} to {}, compacting {} keys to ledger {}",
-                         reader.getTopic(), from, to, latestForKey.size(), ledger.getId());
-                return phaseTwoSeekThenLoop(reader, from, to, latestForKey, bk, ledger);
-            });
+            log.info("Commencing phase two of compaction for {}, from {} to {}, compacting {} keys to ledger {}",
+                    reader.getTopic(), from, to, latestForKey.size(), ledger.getId());
+            return phaseTwoSeekThenLoop(reader, from, to, lastReadId, latestForKey, bk, ledger);
+        });
     }
 
     private CompletableFuture<Long> phaseTwoSeekThenLoop(RawReader reader, MessageId from, MessageId to,
-                                                         Map<String, MessageId> latestForKey,
-                                                         BookKeeper bk, LedgerHandle ledger) {
+            MessageId lastReadId, Map<String, MessageId> latestForKey, BookKeeper bk, LedgerHandle ledger) {
         CompletableFuture<Long> promise = new CompletableFuture<>();
 
         reader.seekAsync(from).thenCompose((v) -> {
-                Semaphore outstanding = new Semaphore(MAX_OUTSTANDING);
-                CompletableFuture<Void> loopPromise = new CompletableFuture<Void>();
-                phaseTwoLoop(reader, to, latestForKey, ledger, outstanding, loopPromise);
-                return loopPromise;
-            }).thenCompose((v) -> closeLedger(ledger))
-            .thenCompose((v) -> reader.acknowledgeCumulativeAsync(
-                                 to, ImmutableMap.of(COMPACTED_TOPIC_LEDGER_PROPERTY, ledger.getId())))
-            .whenComplete((res, exception) -> {
+            Semaphore outstanding = new Semaphore(MAX_OUTSTANDING);
+            CompletableFuture<Void> loopPromise = new CompletableFuture<Void>();
+            phaseTwoLoop(reader, to, latestForKey, ledger, outstanding, loopPromise);
+            return loopPromise;
+        }).thenCompose((v) -> closeLedger(ledger))
+                .thenCompose((v) -> reader.acknowledgeCumulativeAsync(lastReadId,
+                        ImmutableMap.of(COMPACTED_TOPIC_LEDGER_PROPERTY, ledger.getId())))
+                .whenComplete((res, exception) -> {
                     if (exception != null) {
-                        deleteLedger(bk, ledger)
-                            .whenComplete((res2, exception2) -> {
-                                    if (exception2 != null) {
-                                        log.warn("Cleanup of ledger {} for failed", ledger, exception2);
-                                    }
-                                    // complete with original exception
-                                    promise.completeExceptionally(exception);
-                                });
+                        deleteLedger(bk, ledger).whenComplete((res2, exception2) -> {
+                            if (exception2 != null) {
+                                log.warn("Cleanup of ledger {} for failed", ledger, exception2);
+                            }
+                            // complete with original exception
+                            promise.completeExceptionally(exception);
+                        });
                     } else {
                         promise.complete(ledger.getId());
                     }
@@ -217,13 +225,23 @@ private void phaseTwoLoop(RawReader reader, MessageId to, Map<String, MessageId>
                         }
                     } else {
                         Pair<String,Integer> keyAndSize = extractKeyAndSize(m);
+                        MessageId msg;
                         if (keyAndSize == null) { // pass through messages without a key
                             messageToAdd = Optional.of(m);
-                        } else if (latestForKey.get(keyAndSize.getLeft()).equals(id)
-                                   && keyAndSize.getRight() > 0) {
+                        } else if ((msg = latestForKey.get(keyAndSize.getLeft())) != null 
+                                && msg.equals(id)) { // consider message only if present in the latestForKey map
+                            if (keyAndSize.getRight() <= 0) {
+                                promise.completeExceptionally(new IllegalArgumentException(
+                                        "Compaction phase found empty record from sorted key-map"));
+                            }
                             messageToAdd = Optional.of(m);
                         } else {
                             m.close();
+                            // Reached the last id, and phase one found it to be a deleted message while iterating
+                            // over the ledger, so it is not present in latestForKey. Complete the compaction.
+                            if (to.equals(id)) {
+                                promise.complete(null);
+                            }
                         }
                     }
 
@@ -330,12 +348,14 @@ private void phaseTwoLoop(RawReader reader, MessageId to, Map<String, MessageId>
 
     private static class PhaseOneResult {
         final MessageId from;
-        final MessageId to;
+        final MessageId to; // last undeleted messageId
+        final MessageId lastReadId; // last read messageId
         final Map<String,MessageId> latestForKey;
 
-        PhaseOneResult(MessageId from, MessageId to, Map<String,MessageId> latestForKey) {
+        PhaseOneResult(MessageId from, MessageId to, MessageId lastReadId, Map<String,MessageId> latestForKey) {
             this.from = from;
             this.to = to;
+            this.lastReadId = lastReadId;
             this.latestForKey = latestForKey;
         }
     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
index 95a10fab21..32c93b4626 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
@@ -60,6 +60,9 @@
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.testng.Assert;
+
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -1212,4 +1215,57 @@ public void testEmptyPayloadDeletesWhenEncrypted() throws Exception {
             Assert.assertEquals(new String(message5.getData()), "my-message-4");
         }
     }
+    
+    @Test(timeOut = 20000)
+    public void testCompactionWithLastDeletedKey() throws Exception {
+        String topic = "persistent://my-property/use/my-ns/my-topic1";
+
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).enableBatching(false)
+                .messageRoutingMode(MessageRoutingMode.SinglePartition).create();
+
+        pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close();
+
+        producer.newMessage().key("1").value("1".getBytes()).send();
+        producer.newMessage().key("2").value("2".getBytes()).send();
+        producer.newMessage().key("3").value("3".getBytes()).send();
+        producer.newMessage().key("1").value("".getBytes()).send();
+        producer.newMessage().key("2").value("".getBytes()).send();
+
+        Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
+        compactor.compact(topic).get();
+
+        Set<String> expected = Sets.newHashSet("3");
+        // a consumer with readCompacted enabled only gets compacted entries
+        try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
+                .readCompacted(true).subscribe()) {
+            Message<byte[]> m = consumer.receive(2, TimeUnit.SECONDS);
+            assertTrue(expected.remove(m.getKey()));
+        }
+    }
+    
+    @Test(timeOut = 20000)
+    public void testEmptyCompactionLedger() throws Exception {
+        String topic = "persistent://my-property/use/my-ns/my-topic1";
+
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).enableBatching(false)
+                .messageRoutingMode(MessageRoutingMode.SinglePartition).create();
+
+        pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close();
+
+        producer.newMessage().key("1").value("1".getBytes()).send();
+        producer.newMessage().key("2").value("2".getBytes()).send();
+        producer.newMessage().key("1").value("".getBytes()).send();
+        producer.newMessage().key("2").value("".getBytes()).send();
+
+        Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
+        compactor.compact(topic).get();
+
+        // a consumer with readCompacted enabled only gets compacted entries
+        try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
+                .readCompacted(true).subscribe()) {
+            Message<byte[]> m = consumer.receive(2, TimeUnit.SECONDS);
+            assertNull(m);
+        }
+    }
+
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services