You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/02/19 17:12:05 UTC

[GitHub] merlimat closed pull request #1231: Read from compacted topic ledger if available and enabled

merlimat closed pull request #1231: Read from compacted topic ledger if available and enabled
URL: https://github.com/apache/incubator-pulsar/pull/1231
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java
index 7371ebcb8..d1c6defee 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java
@@ -28,7 +28,7 @@
 import org.apache.bookkeeper.client.LedgerEntry;
 import org.apache.bookkeeper.mledger.Entry;
 
-final class EntryImpl extends AbstractReferenceCounted implements Entry, Comparable<EntryImpl>, ReferenceCounted {
+public final class EntryImpl extends AbstractReferenceCounted implements Entry, Comparable<EntryImpl>, ReferenceCounted {
 
     private static final Recycler<EntryImpl> RECYCLER = new Recycler<EntryImpl>() {
         @Override
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
index d51ea0490..7eeb94979 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
@@ -52,6 +52,10 @@ public ManagedLedgerFactory getManagedLedgerFactory() {
         return managedLedgerFactory;
     }
 
+    public BookKeeper getBookKeeperClient() {
+        return bkClient;
+    }
+
     public void close() throws IOException {
         try {
             managedLedgerFactory.shutdown();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 9a2b44a74..dee8b5c45 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -33,6 +33,7 @@
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Supplier;
 
+import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.bookkeeper.util.OrderedSafeExecutor;
 import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
@@ -542,6 +543,10 @@ public BrokerService getBrokerService() {
         return this.brokerService;
     }
 
+    public BookKeeper getBookKeeperClient() {
+        return managedLedgerClientFactory.getBookKeeperClient();
+    }
+
     public ManagedLedgerFactory getManagedLedgerFactory() {
         return managedLedgerClientFactory.getManagedLedgerFactory();
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index 4e3fd00b5..323f44952 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -297,7 +297,11 @@ protected void readMoreEntries(Consumer consumer) {
                 log.debug("[{}-{}] Schedule read of {} messages", name, consumer, messagesToRead);
             }
             havePendingRead = true;
-            cursor.asyncReadEntriesOrWait(messagesToRead, this, consumer);
+            if (consumer.readCompacted()) {
+                topic.compactedTopic.asyncReadEntriesOrWait(cursor, messagesToRead, this, consumer);
+            } else {
+                cursor.asyncReadEntriesOrWait(messagesToRead, this, consumer);
+            }
         } else {
             if (log.isDebugEnabled()) {
                 log.debug("[{}-{}] Consumer buffer is full, pause reading", name, consumer);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 20ae52726..53847ec72 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -158,7 +158,7 @@
     public static final int MESSAGE_RATE_BACKOFF_MS = 1000;
 
     private final MessageDeduplication messageDeduplication;
-    private final CompactedTopic compactedTopic;
+    final CompactedTopic compactedTopic;
 
     // Whether messages published must be encrypted or not in this topic
     private volatile boolean isEncryptionRequired = false;
@@ -206,7 +206,7 @@ public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerS
 
         this.dispatchRateLimiter = new DispatchRateLimiter(this);
 
-        this.compactedTopic = new CompactedTopicImpl();
+        this.compactedTopic = new CompactedTopicImpl(brokerService.pulsar().getBookKeeperClient());
 
         for (ManagedCursor cursor : ledger.getCursors()) {
             if (cursor.getName().startsWith(replicatorPrefix)) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java
index 284cdbfff..65e78a923 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java
@@ -18,8 +18,12 @@
  */
 package org.apache.pulsar.compaction;
 
+import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
 
 public interface CompactedTopic {
     void newCompactedLedger(Position p, long compactedLedgerId);
+    void asyncReadEntriesOrWait(ManagedCursor cursor, int numberOfEntriesToRead,
+                                ReadEntriesCallback callback, Object ctx);
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
index 0afccd10b..f36e11f33 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
@@ -22,28 +22,87 @@
 import com.github.benmanes.caffeine.cache.Caffeine;
 import com.google.common.collect.ComparisonChain;
 
+import io.netty.buffer.ByteBuf;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
 
+import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.client.LedgerEntry;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
+import org.apache.bookkeeper.mledger.impl.EntryImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.client.api.RawMessage;
 import org.apache.pulsar.client.impl.RawMessageImpl;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
 
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class CompactedTopicImpl implements CompactedTopic {
     final static long NEWER_THAN_COMPACTED = -0xfeed0fbaL;
+    final static int DEFAULT_STARTPOINT_CACHE_SIZE = 100;
+
+    private final BookKeeper bk;
+
+    private PositionImpl compactionHorizon = null;
+    private CompletableFuture<CompactedTopicContext> compactedTopicContext = null;
+
+    public CompactedTopicImpl(BookKeeper bk) {
+        this.bk = bk;
+    }
+
+    @Override
+    public void newCompactedLedger(Position p, long compactedLedgerId) {
+        synchronized (this) {
+            compactionHorizon = (PositionImpl)p;
+            compactedTopicContext = openCompactedLedger(bk, compactedLedgerId);
+        }
+    }
 
     @Override
-    public void newCompactedLedger(Position p, long compactedLedgerId) {}
+    public void asyncReadEntriesOrWait(ManagedCursor cursor, int numberOfEntriesToRead,
+                                       ReadEntriesCallback callback, Object ctx) {
+        synchronized (this) {
+            PositionImpl cursorPosition = (PositionImpl) cursor.getReadPosition();
+            if (compactionHorizon == null
+                || compactionHorizon.compareTo(cursorPosition) < 0) {
+                cursor.asyncReadEntriesOrWait(numberOfEntriesToRead, callback, ctx);
+            } else {
+                compactedTopicContext.thenCompose(
+                        (context) -> {
+                            return findStartPoint(cursorPosition, context.ledger.getLastAddConfirmed(), context.cache)
+                                .thenCompose((startPoint) -> {
+                                        if (startPoint == NEWER_THAN_COMPACTED) {
+                                            cursor.asyncReadEntriesOrWait(numberOfEntriesToRead, callback, ctx);
+                                            return CompletableFuture.completedFuture(null);
+                                        } else {
+                                            long endPoint = Math.min(context.ledger.getLastAddConfirmed(),
+                                                                     startPoint + numberOfEntriesToRead);
+                                            return readEntries(context.ledger, startPoint, endPoint)
+                                                .thenAccept((entries) -> {
+                                                        Entry lastEntry = entries.get(entries.size() - 1);
+                                                        cursor.seek(lastEntry.getPosition().getNext());
+                                                        callback.readEntriesComplete(entries, ctx);
+                                                    });
+                                        }
+                                    });
+                                })
+                    .exceptionally((exception) -> {
+                            callback.readEntriesFailed(new ManagedLedgerException(exception), ctx);
+                            return null;
+                        });
+            }
+        }
+    }
 
     static CompletableFuture<Long> findStartPoint(PositionImpl p,
                                                   long lastEntryId,
@@ -107,6 +166,60 @@ private static void findStartPointLoop(PositionImpl p, long start, long end,
         return promise;
     }
 
+    private static CompletableFuture<CompactedTopicContext> openCompactedLedger(BookKeeper bk, long id) {
+        CompletableFuture<LedgerHandle> promise = new CompletableFuture<>();
+        bk.asyncOpenLedger(id,
+                           Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE,
+                           Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD,
+                           (rc, ledger, ctx) -> {
+                               if (rc != BKException.Code.OK) {
+                                   promise.completeExceptionally(BKException.create(rc));
+                               } else {
+                                   promise.complete(ledger);
+                               }
+                           }, null);
+        return promise.thenApply((ledger) -> new CompactedTopicContext(
+                                         ledger, createCache(ledger, DEFAULT_STARTPOINT_CACHE_SIZE)));
+    }
+
+    private static CompletableFuture<List<Entry>> readEntries(LedgerHandle lh, long from, long to) {
+        CompletableFuture<Enumeration<LedgerEntry>> promise = new CompletableFuture<>();
+
+        lh.asyncReadEntries(from, to,
+                            (rc, _lh, seq, ctx) -> {
+                                if (rc != BKException.Code.OK) {
+                                    promise.completeExceptionally(BKException.create(rc));
+                                } else {
+                                    promise.complete(seq);
+                                }
+                            }, null);
+        return promise.thenApply(
+                (seq) -> {
+                    List<Entry> entries = new ArrayList<Entry>();
+                    while (seq.hasMoreElements()) {
+                        ByteBuf buf = seq.nextElement().getEntryBuffer();
+                        try (RawMessage m = RawMessageImpl.deserializeFrom(buf)) {
+                            entries.add(EntryImpl.create(m.getMessageIdData().getLedgerId(),
+                                                         m.getMessageIdData().getEntryId(),
+                                                         m.getHeadersAndPayload()));
+                        } finally {
+                            buf.release();
+                        }
+                    }
+                    return entries;
+                });
+    }
+
+    static class CompactedTopicContext {
+        final LedgerHandle ledger;
+        final AsyncLoadingCache<Long,MessageIdData> cache;
+
+        CompactedTopicContext(LedgerHandle ledger, AsyncLoadingCache<Long,MessageIdData> cache) {
+            this.ledger = ledger;
+            this.cache = cache;
+        }
+    }
+
     private static int comparePositionAndMessageId(PositionImpl p, MessageIdData m) {
         return ComparisonChain.start()
             .compare(p.getLedgerId(), m.getLedgerId())
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index 51a4c0349..e632a47d1 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -124,7 +124,7 @@ protected final void internalSetupForStatsTest() throws Exception {
 
     protected final void init() throws Exception {
         mockZookKeeper = createMockZooKeeper();
-        mockBookKeeper = new NonClosableMockBookKeeper(new ClientConfiguration(), mockZookKeeper);
+        mockBookKeeper = createMockBookKeeper(mockZookKeeper);
 
         sameThreadOrderedSafeExecutor = new SameThreadOrderedSafeExecutor();
 
@@ -207,6 +207,10 @@ public static MockZooKeeper createMockZooKeeper() throws Exception {
         return zk;
     }
 
+    public static NonClosableMockBookKeeper createMockBookKeeper(ZooKeeper zookeeper) throws Exception {
+        return new NonClosableMockBookKeeper(new ClientConfiguration(), zookeeper);
+    }
+
     // Prevent the MockBookKeeper instance from being closed when the broker is restarted within a test
     private static class NonClosableMockBookKeeper extends MockBookKeeper {
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
index a68165d5f..7212e8701 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.service;
 
+import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockBookKeeper;
 import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockZooKeeper;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyObject;
@@ -101,6 +102,7 @@ public void setup() throws Exception {
 
         ZooKeeper mockZk = createMockZooKeeper();
         doReturn(mockZk).when(pulsar).getZkClient();
+        doReturn(createMockBookKeeper(mockZk)).when(pulsar).getBookKeeperClient();
 
         configCacheService = mock(ConfigurationCacheService.class);
         @SuppressWarnings("unchecked")
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index f76bd9074..c01cc3485 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.service;
 
+import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockBookKeeper;
 import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockZooKeeper;
 import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
 import static org.mockito.Matchers.any;
@@ -142,6 +143,7 @@ public void setup() throws Exception {
 
         ZooKeeper mockZk = createMockZooKeeper();
         doReturn(mockZk).when(pulsar).getZkClient();
+        doReturn(createMockBookKeeper(mockZk)).when(pulsar).getBookKeeperClient();
 
         configCacheService = mock(ConfigurationCacheService.class);
         @SuppressWarnings("unchecked")
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index 0a5c154cc..6c82e68fa 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.service;
 
+import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockBookKeeper;
 import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockZooKeeper;
 import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
 import static org.mockito.Matchers.any;
@@ -153,6 +154,7 @@ public void setup() throws Exception {
 
         ZooKeeper mockZk = createMockZooKeeper();
         doReturn(mockZk).when(pulsar).getZkClient();
+        doReturn(createMockBookKeeper(mockZk)).when(pulsar).getBookKeeperClient();
 
         configCacheService = mock(ConfigurationCacheService.class);
         @SuppressWarnings("unchecked")
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
new file mode 100644
index 000000000..300184111
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
@@ -0,0 +1,309 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.compaction;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.PropertyAdmin;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerConfiguration;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConfiguration;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.MessageBuilder;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class CompactionTest extends MockedPulsarServiceBaseTest {
+    private static final Logger log = LoggerFactory.getLogger(CompactionTest.class);
+
+    private ScheduledExecutorService compactionScheduler;
+    private BookKeeper bk;
+
+    @BeforeMethod
+    @Override
+    public void setup() throws Exception {
+        super.internalSetup();
+
+        admin.clusters().createCluster("use",
+                new ClusterData("http://127.0.0.1:" + BROKER_WEBSERVICE_PORT));
+        admin.properties().createProperty("my-property",
+                new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), Sets.newHashSet("use")));
+        admin.namespaces().createNamespace("my-property/use/my-ns");
+
+        compactionScheduler = Executors.newSingleThreadScheduledExecutor(
+                new ThreadFactoryBuilder().setNameFormat("compaction-%d").setDaemon(true).build());
+        bk = pulsar.getBookKeeperClientFactory().create(this.conf, null);
+    }
+
+    @AfterMethod
+    @Override
+    public void cleanup() throws Exception {
+        super.internalCleanup();
+
+        compactionScheduler.shutdownNow();
+    }
+
+    @Test
+    public void testCompaction() throws Exception {
+        String topic = "persistent://my-property/use/my-ns/my-topic1";
+        final int numMessages = 20;
+        final int maxKeys = 10;
+
+        ProducerConfiguration producerConf = new ProducerConfiguration();
+        Producer producer = pulsarClient.createProducer(topic, producerConf);
+
+        Map<String, byte[]> expected = new HashMap<>();
+        List<Pair<String,byte[]>> all = new ArrayList<>();
+        Random r = new Random(0);
+
+        ConsumerConfiguration consumerConf = new ConsumerConfiguration().setReadCompacted(true);
+        pulsarClient.subscribe(topic, "sub1", consumerConf).close();
+
+        for (int j = 0; j < numMessages; j++) {
+            int keyIndex = r.nextInt(maxKeys);
+            String key = "key"+keyIndex;
+            byte[] data = ("my-message-" + key + "-" + j).getBytes();
+            producer.send(MessageBuilder.create()
+                          .setKey(key)
+                          .setContent(data).build());
+            expected.put(key, data);
+            all.add(Pair.of(key, data));
+        }
+
+        Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
+        compactor.compact(topic).get();
+
+        // consumer with readCompacted enabled only get compacted entries
+        try (Consumer consumer = pulsarClient.subscribe(topic, "sub1", consumerConf)) {
+            while (true) {
+                Message m = consumer.receive(2, TimeUnit.SECONDS);
+                Assert.assertEquals(expected.remove(m.getKey()), m.getData());
+                if (expected.isEmpty()) {
+                    break;
+                }
+            }
+            Assert.assertTrue(expected.isEmpty());
+        }
+
+        // can get full backlog if read compacted disabled
+        try (Consumer consumer = pulsarClient.subscribe(topic, "sub1", consumerConf.setReadCompacted(false))) {
+            while (true) {
+                Message m = consumer.receive(2, TimeUnit.SECONDS);
+                Pair<String,byte[]> expectedMessage = all.remove(0);
+                Assert.assertEquals(expectedMessage.getLeft(), m.getKey());
+                Assert.assertEquals(expectedMessage.getRight(), m.getData());
+                if (all.isEmpty()) {
+                    break;
+                }
+            }
+            Assert.assertTrue(all.isEmpty());
+        }
+    }
+
+    @Test
+    public void testReadCompactedBeforeCompaction() throws Exception {
+        String topic = "persistent://my-property/use/my-ns/my-topic1";
+
+        ProducerConfiguration producerConf = new ProducerConfiguration();
+        Producer producer = pulsarClient.createProducer(topic, producerConf);
+
+        ConsumerConfiguration consumerConf = new ConsumerConfiguration().setReadCompacted(true);
+        pulsarClient.subscribe(topic, "sub1", consumerConf).close();
+
+        producer.send(MessageBuilder.create().setKey("key0").setContent("content0".getBytes()).build());
+        producer.send(MessageBuilder.create().setKey("key0").setContent("content1".getBytes()).build());
+        producer.send(MessageBuilder.create().setKey("key0").setContent("content2".getBytes()).build());
+
+        try (Consumer consumer = pulsarClient.subscribe(topic, "sub1", consumerConf)) {
+            Message m = consumer.receive();
+            Assert.assertEquals(m.getKey(), "key0");
+            Assert.assertEquals(m.getData(), "content0".getBytes());
+
+            m = consumer.receive();
+            Assert.assertEquals(m.getKey(), "key0");
+            Assert.assertEquals(m.getData(), "content1".getBytes());
+
+            m = consumer.receive();
+            Assert.assertEquals(m.getKey(), "key0");
+            Assert.assertEquals(m.getData(), "content2".getBytes());
+        }
+
+        Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
+        compactor.compact(topic).get();
+
+        try (Consumer consumer = pulsarClient.subscribe(topic, "sub1", consumerConf)) {
+            Message m = consumer.receive();
+            Assert.assertEquals(m.getKey(), "key0");
+            Assert.assertEquals(m.getData(), "content2".getBytes());
+        }
+    }
+
+    @Test
+    public void testReadEntriesAfterCompaction() throws Exception {
+        String topic = "persistent://my-property/use/my-ns/my-topic1";
+
+        ProducerConfiguration producerConf = new ProducerConfiguration();
+        Producer producer = pulsarClient.createProducer(topic, producerConf);
+
+        ConsumerConfiguration consumerConf = new ConsumerConfiguration().setReadCompacted(true);
+        pulsarClient.subscribe(topic, "sub1", consumerConf).close();
+
+        producer.send(MessageBuilder.create().setKey("key0").setContent("content0".getBytes()).build());
+        producer.send(MessageBuilder.create().setKey("key0").setContent("content1".getBytes()).build());
+        producer.send(MessageBuilder.create().setKey("key0").setContent("content2".getBytes()).build());
+
+        Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
+        compactor.compact(topic).get();
+
+        producer.send(MessageBuilder.create().setKey("key0").setContent("content3".getBytes()).build());
+
+        try (Consumer consumer = pulsarClient.subscribe(topic, "sub1", consumerConf)) {
+            Message m = consumer.receive();
+            Assert.assertEquals(m.getKey(), "key0");
+            Assert.assertEquals(m.getData(), "content2".getBytes());
+
+            m = consumer.receive();
+            Assert.assertEquals(m.getKey(), "key0");
+            Assert.assertEquals(m.getData(), "content3".getBytes());
+        }
+    }
+
+    @Test
+    public void testSeekEarliestAfterCompaction() throws Exception {
+        String topic = "persistent://my-property/use/my-ns/my-topic1";
+
+        ProducerConfiguration producerConf = new ProducerConfiguration();
+        Producer producer = pulsarClient.createProducer(topic, producerConf);
+
+        ConsumerConfiguration consumerConf = new ConsumerConfiguration().setReadCompacted(true);
+
+        producer.send(MessageBuilder.create().setKey("key0").setContent("content0".getBytes()).build());
+        producer.send(MessageBuilder.create().setKey("key0").setContent("content1".getBytes()).build());
+        producer.send(MessageBuilder.create().setKey("key0").setContent("content2".getBytes()).build());
+
+        Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
+        compactor.compact(topic).get();
+
+        try (Consumer consumer = pulsarClient.subscribe(topic, "sub1", consumerConf)) {
+            consumer.seek(MessageId.earliest);
+            Message m = consumer.receive();
+            Assert.assertEquals(m.getKey(), "key0");
+            Assert.assertEquals(m.getData(), "content2".getBytes());
+        }
+
+        try (Consumer consumer = pulsarClient.subscribe(topic, "sub1", consumerConf.setReadCompacted(false))) {
+            consumer.seek(MessageId.earliest);
+
+            Message m = consumer.receive();
+            Assert.assertEquals(m.getKey(), "key0");
+            Assert.assertEquals(m.getData(), "content0".getBytes());
+
+            m = consumer.receive();
+            Assert.assertEquals(m.getKey(), "key0");
+            Assert.assertEquals(m.getData(), "content1".getBytes());
+
+            m = consumer.receive();
+            Assert.assertEquals(m.getKey(), "key0");
+            Assert.assertEquals(m.getData(), "content2".getBytes());
+        }
+    }
+
+    @Test
+    public void testBrokerRestartAfterCompaction() throws Exception {
+        String topic = "persistent://my-property/use/my-ns/my-topic1";
+
+        ProducerConfiguration producerConf = new ProducerConfiguration();
+        Producer producer = pulsarClient.createProducer(topic, producerConf);
+
+        ConsumerConfiguration consumerConf = new ConsumerConfiguration().setReadCompacted(true);
+        pulsarClient.subscribe(topic, "sub1", consumerConf).close();
+
+        producer.send(MessageBuilder.create().setKey("key0").setContent("content0".getBytes()).build());
+        producer.send(MessageBuilder.create().setKey("key0").setContent("content1".getBytes()).build());
+        producer.send(MessageBuilder.create().setKey("key0").setContent("content2".getBytes()).build());
+
+        Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
+        compactor.compact(topic).get();
+
+        try (Consumer consumer = pulsarClient.subscribe(topic, "sub1", consumerConf)) {
+            Message m = consumer.receive();
+            Assert.assertEquals(m.getKey(), "key0");
+            Assert.assertEquals(m.getData(), "content2".getBytes());
+        }
+
+        stopBroker();
+        try (Consumer consumer = pulsarClient.subscribe(topic, "sub1", consumerConf)) {
+            consumer.receive();
+            Assert.fail("Shouldn't have been able to receive anything");
+        } catch (PulsarClientException e) {
+            // correct behaviour
+        }
+        startBroker();
+
+        try (Consumer consumer = pulsarClient.subscribe(topic, "sub1", consumerConf)) {
+            Message m = consumer.receive();
+            Assert.assertEquals(m.getKey(), "key0");
+            Assert.assertEquals(m.getData(), "content2".getBytes());
+        }
+    }
+
+    @Test
+    public void testCompactEmptyTopic() throws Exception {
+        String topic = "persistent://my-property/use/my-ns/my-topic1";
+
+        ProducerConfiguration producerConf = new ProducerConfiguration();
+        Producer producer = pulsarClient.createProducer(topic, producerConf);
+
+        ConsumerConfiguration consumerConf = new ConsumerConfiguration().setReadCompacted(true);
+        pulsarClient.subscribe(topic, "sub1", consumerConf).close();
+
+        Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
+
+        producer.send(MessageBuilder.create().setKey("key0").setContent("content0".getBytes()).build());
+
+        try (Consumer consumer = pulsarClient.subscribe(topic, "sub1", consumerConf)) {
+            Message m = consumer.receive();
+            Assert.assertEquals(m.getKey(), "key0");
+            Assert.assertEquals(m.getData(), "content0".getBytes());
+        }
+    }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services