You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/09/18 10:09:57 UTC

[incubator-pulsar] branch master updated: Fix: Compaction with last deleted keys not completing compaction (#2591)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 8d50617  Fix: Compaction with last deleted keys not completing compaction (#2591)
8d50617 is described below

commit 8d5061779e7a869e6e5981d2010e671ed0afa322
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Tue Sep 18 03:09:50 2018 -0700

    Fix: Compaction with last deleted keys not completing compaction (#2591)
    
    ### Motivation
    
    Currently, topic compaction skips messages with an empty payload (i.e. key deletions). However, if the last message in the ledger has an empty payload, the compactor skips it and never completes the result future, so the compaction never finishes and the caller never sees a result.
    
    ### Modifications
    
    - The compactor calculates the `from` and `to` positions for the compacted ledger based on the first and last non-deleted (active) messages.
    - The compactor handles trailing deleted keys in the ledger and completes the compaction process gracefully.
    
    ### Result
    
    The compactor can successfully compact a ledger whose last message is a deletion (empty payload).
---
 .../pulsar/compaction/TwoPhaseCompactor.java       | 90 +++++++++++++---------
 .../apache/pulsar/compaction/CompactionTest.java   | 56 ++++++++++++++
 2 files changed, 111 insertions(+), 35 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
index cc3f710..425e049 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
@@ -76,7 +76,7 @@ public class TwoPhaseCompactor extends Compactor {
     @Override
     protected CompletableFuture<Long> doCompaction(RawReader reader, BookKeeper bk) {
         return phaseOne(reader).thenCompose(
-                (r) -> phaseTwo(reader, r.from, r.to, r.latestForKey, bk));
+                (r) -> phaseTwo(reader, r.from, r.to, r.lastReadId, r.latestForKey, bk));
     }
 
     private CompletableFuture<PhaseOneResult> phaseOne(RawReader reader) {
@@ -90,7 +90,8 @@ public class TwoPhaseCompactor extends Compactor {
                     } else {
                         log.info("Commencing phase one of compaction for {}, reading to {}",
                                  reader.getTopic(), lastMessageId);
-                        phaseOneLoop(reader, Optional.empty(), lastMessageId, latestForKey, loopPromise);
+                        phaseOneLoop(reader, Optional.empty(), Optional.empty(), lastMessageId, latestForKey,
+                                loopPromise);
                     }
                 });
         return loopPromise;
@@ -98,6 +99,7 @@ public class TwoPhaseCompactor extends Compactor {
 
     private void phaseOneLoop(RawReader reader,
                               Optional<MessageId> firstMessageId,
+                              Optional<MessageId> toMessageId,
                               MessageId lastMessageId,
                               Map<String,MessageId> latestForKey,
                               CompletableFuture<PhaseOneResult> loopPromise) {
@@ -114,6 +116,7 @@ public class TwoPhaseCompactor extends Compactor {
                             return;
                         }
                         MessageId id = m.getMessageId();
+                        boolean deletedMessage = false;
                         if (RawBatchConverter.isReadableBatch(m)) {
                             try {
                                 RawBatchConverter.extractIdsAndKeys(m)
@@ -125,16 +128,23 @@ public class TwoPhaseCompactor extends Compactor {
                         } else {
                             Pair<String,Integer> keyAndSize = extractKeyAndSize(m);
                             if (keyAndSize != null) {
-                                latestForKey.put(keyAndSize.getLeft(), id);
+                                if(keyAndSize.getRight() > 0) {
+                                    latestForKey.put(keyAndSize.getLeft(), id);    
+                                } else {
+                                    deletedMessage = true;
+                                    latestForKey.remove(keyAndSize.getLeft());
+                                }
                             }
                         }
 
+                        MessageId first = firstMessageId.orElse(deletedMessage ? null : id);
+                        MessageId to = deletedMessage ? toMessageId.orElse(null) : id;
                         if (id.compareTo(lastMessageId) == 0) {
-                            loopPromise.complete(new PhaseOneResult(firstMessageId.orElse(id),
-                                                                    id, latestForKey));
+                            loopPromise.complete(new PhaseOneResult(first, to, lastMessageId, latestForKey));
                         } else {
                             phaseOneLoop(reader,
-                                         Optional.of(firstMessageId.orElse(id)),
+                                         Optional.ofNullable(first),
+                                         Optional.ofNullable(to),
                                          lastMessageId,
                                          latestForKey, loopPromise);
                         }
@@ -153,40 +163,38 @@ public class TwoPhaseCompactor extends Compactor {
         });
     }
 
-    private CompletableFuture<Long> phaseTwo(RawReader reader, MessageId from, MessageId to,
-                                             Map<String,MessageId> latestForKey, BookKeeper bk) {
+    private CompletableFuture<Long> phaseTwo(RawReader reader, MessageId from, MessageId to, MessageId lastReadId,
+            Map<String, MessageId> latestForKey, BookKeeper bk) {
         Map<String, byte[]> metadata = ImmutableMap.of("compactedTopic", reader.getTopic().getBytes(UTF_8),
-                                                       "compactedTo", to.toByteArray());
+                "compactedTo", to.toByteArray());
         return createLedger(bk, metadata).thenCompose((ledger) -> {
-                log.info("Commencing phase two of compaction for {}, from {} to {}, compacting {} keys to ledger {}",
-                         reader.getTopic(), from, to, latestForKey.size(), ledger.getId());
-                return phaseTwoSeekThenLoop(reader, from, to, latestForKey, bk, ledger);
-            });
+            log.info("Commencing phase two of compaction for {}, from {} to {}, compacting {} keys to ledger {}",
+                    reader.getTopic(), from, to, latestForKey.size(), ledger.getId());
+            return phaseTwoSeekThenLoop(reader, from, to, lastReadId, latestForKey, bk, ledger);
+        });
     }
 
     private CompletableFuture<Long> phaseTwoSeekThenLoop(RawReader reader, MessageId from, MessageId to,
-                                                         Map<String, MessageId> latestForKey,
-                                                         BookKeeper bk, LedgerHandle ledger) {
+            MessageId lastReadId, Map<String, MessageId> latestForKey, BookKeeper bk, LedgerHandle ledger) {
         CompletableFuture<Long> promise = new CompletableFuture<>();
 
         reader.seekAsync(from).thenCompose((v) -> {
-                Semaphore outstanding = new Semaphore(MAX_OUTSTANDING);
-                CompletableFuture<Void> loopPromise = new CompletableFuture<Void>();
-                phaseTwoLoop(reader, to, latestForKey, ledger, outstanding, loopPromise);
-                return loopPromise;
-            }).thenCompose((v) -> closeLedger(ledger))
-            .thenCompose((v) -> reader.acknowledgeCumulativeAsync(
-                                 to, ImmutableMap.of(COMPACTED_TOPIC_LEDGER_PROPERTY, ledger.getId())))
-            .whenComplete((res, exception) -> {
+            Semaphore outstanding = new Semaphore(MAX_OUTSTANDING);
+            CompletableFuture<Void> loopPromise = new CompletableFuture<Void>();
+            phaseTwoLoop(reader, to, latestForKey, ledger, outstanding, loopPromise);
+            return loopPromise;
+        }).thenCompose((v) -> closeLedger(ledger))
+                .thenCompose((v) -> reader.acknowledgeCumulativeAsync(lastReadId,
+                        ImmutableMap.of(COMPACTED_TOPIC_LEDGER_PROPERTY, ledger.getId())))
+                .whenComplete((res, exception) -> {
                     if (exception != null) {
-                        deleteLedger(bk, ledger)
-                            .whenComplete((res2, exception2) -> {
-                                    if (exception2 != null) {
-                                        log.warn("Cleanup of ledger {} for failed", ledger, exception2);
-                                    }
-                                    // complete with original exception
-                                    promise.completeExceptionally(exception);
-                                });
+                        deleteLedger(bk, ledger).whenComplete((res2, exception2) -> {
+                            if (exception2 != null) {
+                                log.warn("Cleanup of ledger {} for failed", ledger, exception2);
+                            }
+                            // complete with original exception
+                            promise.completeExceptionally(exception);
+                        });
                     } else {
                         promise.complete(ledger.getId());
                     }
@@ -217,13 +225,23 @@ public class TwoPhaseCompactor extends Compactor {
                         }
                     } else {
                         Pair<String,Integer> keyAndSize = extractKeyAndSize(m);
+                        MessageId msg;
                         if (keyAndSize == null) { // pass through messages without a key
                             messageToAdd = Optional.of(m);
-                        } else if (latestForKey.get(keyAndSize.getLeft()).equals(id)
-                                   && keyAndSize.getRight() > 0) {
+                        } else if ((msg = latestForKey.get(keyAndSize.getLeft())) != null 
+                                && msg.equals(id)) { // consider message only if present into latestForKey map
+                            if (keyAndSize.getRight() <= 0) {
+                                promise.completeExceptionally(new IllegalArgumentException(
+                                        "Compaction phase found empty record from sorted key-map"));
+                            }
                             messageToAdd = Optional.of(m);
                         } else {
                             m.close();
+                            // Reached to last-id and phase-one found it deleted-message while iterating on ledger so, not
+                            // present under latestForKey. Complete the compaction.
+                            if (to.equals(id)) {
+                                promise.complete(null);
+                            }
                         }
                     }
 
@@ -330,12 +348,14 @@ public class TwoPhaseCompactor extends Compactor {
 
     private static class PhaseOneResult {
         final MessageId from;
-        final MessageId to;
+        final MessageId to; // last undeleted messageId
+        final MessageId lastReadId; // last read messageId
         final Map<String,MessageId> latestForKey;
 
-        PhaseOneResult(MessageId from, MessageId to, Map<String,MessageId> latestForKey) {
+        PhaseOneResult(MessageId from, MessageId to, MessageId lastReadId, Map<String,MessageId> latestForKey) {
             this.from = from;
             this.to = to;
+            this.lastReadId = lastReadId;
             this.latestForKey = latestForKey;
         }
     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
index 95a10fa..32c93b4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
@@ -60,6 +60,9 @@ import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.testng.Assert;
+
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -1212,4 +1215,57 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
             Assert.assertEquals(new String(message5.getData()), "my-message-4");
         }
     }
+    
+    @Test(timeOut = 20000)
+    public void testCompactionWithLastDeletedKey() throws Exception {
+        String topic = "persistent://my-property/use/my-ns/my-topic1";
+
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).enableBatching(false)
+                .messageRoutingMode(MessageRoutingMode.SinglePartition).create();
+
+        pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close();
+
+        producer.newMessage().key("1").value("1".getBytes()).send();
+        producer.newMessage().key("2").value("2".getBytes()).send();
+        producer.newMessage().key("3").value("3".getBytes()).send();
+        producer.newMessage().key("1").value("".getBytes()).send();
+        producer.newMessage().key("2").value("".getBytes()).send();
+
+        Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
+        compactor.compact(topic).get();
+
+        Set<String> expected = Sets.newHashSet("3");
+        // consumer with readCompacted enabled only get compacted entries
+        try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
+                .readCompacted(true).subscribe()) {
+            Message<byte[]> m = consumer.receive(2, TimeUnit.SECONDS);
+            assertTrue(expected.remove(m.getKey()));
+        }
+    }
+    
+    @Test(timeOut = 20000)
+    public void testEmptyCompactionLedger() throws Exception {
+        String topic = "persistent://my-property/use/my-ns/my-topic1";
+
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).enableBatching(false)
+                .messageRoutingMode(MessageRoutingMode.SinglePartition).create();
+
+        pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close();
+
+        producer.newMessage().key("1").value("1".getBytes()).send();
+        producer.newMessage().key("2").value("2".getBytes()).send();
+        producer.newMessage().key("1").value("".getBytes()).send();
+        producer.newMessage().key("2").value("".getBytes()).send();
+
+        Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
+        compactor.compact(topic).get();
+
+        // consumer with readCompacted enabled only get compacted entries
+        try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
+                .readCompacted(true).subscribe()) {
+            Message<byte[]> m = consumer.receive(2, TimeUnit.SECONDS);
+            assertNull(m);
+        }
+    }
+
 }