You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/11/18 06:27:25 UTC

[pulsar] 02/15: ### Motivation (#12698)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit d040942938b318cf34595503abe421f09ca2adaf
Author: lipenghui <pe...@apache.org>
AuthorDate: Wed Nov 10 15:58:51 2021 +0800

    ### Motivation (#12698)
    
    Fix lost compaction data due to compaction properties missed during reset-cursor.
    
    1. The compaction reader will seek to the earliest position to read data from the topic, but the compaction properties missed during the cursor reset, this will lead to the inited compaction subscribe without compaction horizon, so the compaction reader will skip the last compacted data. It will only happen when init the compaction subscription, so can introduced by the loadbalance or topic unloading manually.
    
    2. Advance the cursor should also keep the properties, otherwise, the properties will lost during the cursor trimming.
    
    ### Changes
    
    1. Keep the properties for resetting the cursor while the cursor is for data compaction.
    2. Copy the properties to the new mark delete entry while advance the cursor, this is triggered byt the managed ledger internal, so it's not only for compacted topic, the internal task should not loss the properties when trimming the cursor.
    
    ### Tests
    
    New tests added to make sure the compaction will not loss data during topic unloading and the reader can read all the compacted data after the compaction task complete
    
    (cherry picked from commit 98e2c66a7b2d5fd42641527cd9ad6c3b497d65c7)
---
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 10 +++-
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  2 +-
 .../broker/intercept/CounterBrokerInterceptor.java |  8 +--
 .../pulsar/compaction/CompactedTopicTest.java      | 70 +++++++++++++++++++++-
 4 files changed, 82 insertions(+), 8 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 8e794df..b622e55 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -194,6 +194,7 @@ public class ManagedCursorImpl implements ManagedCursor {
     private long entriesReadCount;
     private long entriesReadSize;
     private int individualDeletedMessagesSerializedSize;
+    private static final String COMPACTION_CURSOR_NAME = "__compaction";
 
     class MarkDeleteEntry {
         final PositionImpl newPosition;
@@ -1068,7 +1069,8 @@ public class ManagedCursorImpl implements ManagedCursor {
                                 Range.closedOpen(markDeletePosition, newMarkDeletePosition)));
                     }
                     markDeletePosition = newMarkDeletePosition;
-                    lastMarkDeleteEntry = new MarkDeleteEntry(newMarkDeletePosition, Collections.emptyMap(),
+                    lastMarkDeleteEntry = new MarkDeleteEntry(newMarkDeletePosition, isCompactionCursor() ?
+                            getProperties() : Collections.emptyMap(),
                             null, null);
                     individualDeletedMessages.clear();
                     if (config.isDeletionAtBatchIndexLevelEnabled() && batchDeletedIndexes != null) {
@@ -1118,7 +1120,7 @@ public class ManagedCursorImpl implements ManagedCursor {
 
         };
 
-        internalAsyncMarkDelete(newPosition, Collections.emptyMap(), new MarkDeleteCallback() {
+        internalAsyncMarkDelete(newPosition, isCompactionCursor() ? getProperties() : Collections.emptyMap(), new MarkDeleteCallback() {
             @Override
             public void markDeleteComplete(Object ctx) {
                 finalCallback.operationComplete();
@@ -3072,5 +3074,9 @@ public class ManagedCursorImpl implements ManagedCursor {
         return isReadPositionOnTail || isReadPositionChanged;
     }
 
+    private boolean isCompactionCursor() {
+        return COMPACTION_CURSOR_NAME.equals(name);
+    }
+
     private static final Logger log = LoggerFactory.getLogger(ManagedCursorImpl.class);
 }
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index fa62ebe..731c6d8 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -2545,7 +2545,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
             if (highestPositionToDelete.compareTo((PositionImpl) cursor.getMarkDeletedPosition()) > 0
                     && highestPositionToDelete.compareTo((PositionImpl) cursor.getManagedLedger().getLastConfirmedEntry()) <= 0
                     && !(!cursor.isDurable() && cursor instanceof NonDurableCursorImpl && ((NonDurableCursorImpl) cursor).isReadCompacted())) {
-                cursor.asyncMarkDelete(highestPositionToDelete, new MarkDeleteCallback() {
+                cursor.asyncMarkDelete(highestPositionToDelete, cursor.getProperties(), new MarkDeleteCallback() {
                     @Override
                     public void markDeleteComplete(Object ctx) {
                     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java
index dc51c3d..1462cfa 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java
@@ -56,14 +56,14 @@ public class CounterBrokerInterceptor implements BrokerInterceptor {
                                   Entry entry,
                                   long[] ackSet,
                                   MessageMetadata msgMetadata) {
-        log.info("Send message to topic {}, subscription {}",
+        log.debug("Send message to topic {}, subscription {}",
             subscription.getTopic(), subscription.getName());
         beforeSendCount++;
     }
 
     @Override
     public void onPulsarCommand(BaseCommand command, ServerCnx cnx) {
-        log.info("[{}] On [{}] Pulsar command", count, command.getType().name());
+        log.debug("[{}] On [{}] Pulsar command", count, command.getType().name());
         count ++;
     }
 
@@ -75,13 +75,13 @@ public class CounterBrokerInterceptor implements BrokerInterceptor {
     @Override
     public void onWebserviceRequest(ServletRequest request) {
         count ++;
-        log.info("[{}] On [{}] Webservice request", count, ((HttpServletRequest)request).getRequestURL().toString());
+        log.debug("[{}] On [{}] Webservice request", count, ((HttpServletRequest)request).getRequestURL().toString());
     }
 
     @Override
     public void onWebserviceResponse(ServletRequest request, ServletResponse response) {
         count ++;
-        log.info("[{}] On [{}] Webservice response {}", count, ((HttpServletRequest)request).getRequestURL().toString(), response);
+        log.debug("[{}] On [{}] Webservice response {}", count, ((HttpServletRequest)request).getRequestURL().toString(), response);
         if (response instanceof Response) {
             Response res = (Response) response;
             responseList.add(new ResponseEvent(res.getHttpChannel().getRequest().getRequestURI(), res.getStatus()));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
index cbe7372..69d66d9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
@@ -33,6 +33,7 @@ import java.util.Optional;
 import java.util.Random;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.IntStream;
 
@@ -577,9 +578,76 @@ public class CompactedTopicTest extends MockedPulsarServiceBaseTest {
         // The reader should read all 600 keys
         int received = 0;
         while (reader.hasMessageAvailable()) {
-            System.out.println(reader.readNext().getKey());
+            reader.readNext();
             received++;
         }
         Assert.assertEquals(received, keys * 3);
+        reader.close();
+        producer.close();
+    }
+
+    @Test(timeOut = 120000)
+    public void testCompactionWithTopicUnloading() throws Exception {
+        String topic = "persistent://my-property/use/my-ns/testCompactionWithTopicUnloading-" +
+                UUID.randomUUID();
+        final int numMessages = 2000;
+        final int keys = 500;
+        final String msg = "Test";
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topic)
+                .blockIfQueueFull(true)
+                .maxPendingMessages(numMessages)
+                .enableBatching(false)
+                .create();
+        CompletableFuture<MessageId> lastMessage = null;
+        for (int i = 0; i < numMessages; ++i) {
+            lastMessage = producer.newMessage().key(i % keys + "").value(msg).sendAsync();
+        }
+        producer.flush();
+        lastMessage.join();
+        admin.topics().triggerCompaction(topic);
+        Awaitility.await().pollInterval(5, TimeUnit.SECONDS).untilAsserted(() -> {
+            PersistentTopicInternalStats stats = admin.topics().getInternalStats(topic);
+            Assert.assertNotEquals(stats.compactedLedger.ledgerId, -1);
+            Assert.assertEquals(stats.compactedLedger.entries, keys);
+            Assert.assertEquals(admin.topics().getStats(topic)
+                    .getSubscriptions().get(COMPACTION_SUBSCRIPTION).getConsumers().size(), 0);
+        });
+
+        admin.topics().unload(topic);
+        for (int i = 0; i < numMessages; ++i) {
+            lastMessage = producer.newMessage().key((i % keys + keys) + "").value(msg).sendAsync();
+        }
+        producer.flush();
+        lastMessage.join();
+        admin.topics().triggerCompaction(topic);
+        Thread.sleep(100);
+        admin.topics().unload(topic);
+        admin.topics().triggerCompaction(topic);
+        Awaitility.await().pollInterval(3, TimeUnit.SECONDS).atMost(30, TimeUnit.SECONDS).untilAsserted(() -> {
+            PersistentTopicInternalStats stats = admin.topics().getInternalStats(topic);
+            Assert.assertNotEquals(stats.compactedLedger.ledgerId, -1);
+            Assert.assertEquals(stats.compactedLedger.entries, keys * 2);
+            Assert.assertEquals(admin.topics().getStats(topic)
+                    .getSubscriptions().get(COMPACTION_SUBSCRIPTION).getConsumers().size(), 0);
+        });
+
+        // Start a new reader to reading messages
+        Reader<String> reader = pulsarClient.newReader(Schema.STRING)
+                .topic(topic)
+                .startMessageId(MessageId.earliest)
+                .readCompacted(true)
+                .receiverQueueSize(10)
+                .create();
+
+        // The reader should read all 600 keys
+        int received = 0;
+        while (reader.hasMessageAvailable()) {
+            reader.readNext();
+            received++;
+        }
+        Assert.assertEquals(received, keys * 2);
+        reader.close();
+        producer.close();
     }
 }