You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/03/07 18:25:29 UTC
[incubator-pulsar] branch master updated: CompactedTopic should
seek to position of cursor, not next position (#1336)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new dad679a CompactedTopic should seek to position of cursor, not next position (#1336)
dad679a is described below
commit dad679ae1392bef4be67c654bcb7d19de79be266
Author: Ivan Kelly <iv...@apache.org>
AuthorDate: Wed Mar 7 19:25:26 2018 +0100
CompactedTopic should seek to position of cursor, not next position (#1336)
* CompactedTopic should seek to position of cursor, not next position
When finding the first ledger entry to read from while reading from a
compacted topic ledger, we should find the position represented by the
cursor read position, not the position after it, as has been done
until now.
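This bug was due to a misunderstanding of how the read position
works. It was assumed to work the same way as the mark-delete position,
which is not the case: the read position is itself the next entry to be
read, so the search must return the entry at that position rather than
the one after it.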
* Fix clash on message cleanup
---
.../pulsar/compaction/CompactedTopicImpl.java | 6 +--
.../pulsar/compaction/CompactedTopicTest.java | 27 ++++++------
.../apache/pulsar/compaction/CompactionTest.java | 48 ++++++++++++++++++++++
3 files changed, 63 insertions(+), 18 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
index e70381e..b1378b6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
@@ -133,11 +133,11 @@ public class CompactedTopicImpl implements CompactedTopic {
CompletableFuture.allOf(startEntry, middleEntry, endEntry).thenRun(
() -> {
- if (comparePositionAndMessageId(p, startEntry.join()) < 0) {
+ if (comparePositionAndMessageId(p, startEntry.join()) <= 0) {
promise.complete(start);
- } else if (comparePositionAndMessageId(p, middleEntry.join()) < 0) {
+ } else if (comparePositionAndMessageId(p, middleEntry.join()) <= 0) {
findStartPointLoop(p, start, midpoint, promise, cache);
- } else if (comparePositionAndMessageId(p, endEntry.join()) < 0) {
+ } else if (comparePositionAndMessageId(p, endEntry.join()) <= 0) {
findStartPointLoop(p, midpoint + 1, end, promise, cache);
} else {
promise.complete(NEWER_THAN_COMPACTED);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
index eb549fa..69d9cfb 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
@@ -52,7 +52,7 @@ import io.netty.buffer.Unpooled;
import lombok.Cleanup;
public class CompactedTopicTest extends MockedPulsarServiceBaseTest {
- private static final ByteBuf emptyBuffer = Unpooled.buffer(0);
+ private final Random r = new Random(0);
@BeforeMethod
@Override
@@ -77,9 +77,8 @@ public class CompactedTopicTest extends MockedPulsarServiceBaseTest {
* entries in the ledger, and a list of gaps, and the entry which should be returned after the gap.
*/
private Triple<Long, List<Pair<MessageIdData,Long>>, List<Pair<MessageIdData,Long>>>
- buildCompactedLedger(BookKeeper bk, int seed, int count)
+ buildCompactedLedger(BookKeeper bk, int count)
throws Exception {
- Random r = new Random(seed);
LedgerHandle lh = bk.createLedger(1, 1,
Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE,
Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD);
@@ -112,10 +111,12 @@ public class CompactedTopicTest extends MockedPulsarServiceBaseTest {
.setEntryId(entryIds.addAndGet(delta + 1)).build();
@Cleanup
- RawMessage m = new RawMessageImpl(id, emptyBuffer);
+ RawMessage m = new RawMessageImpl(id, Unpooled.EMPTY_BUFFER);
CompletableFuture<Void> f = new CompletableFuture<>();
- lh.asyncAddEntry(m.serialize(),
+ ByteBuf buffer = m.serialize();
+
+ lh.asyncAddEntry(buffer,
(rc, ledger, eid, ctx) -> {
if (rc != BKException.Code.OK) {
f.completeExceptionally(BKException.create(rc));
@@ -125,6 +126,7 @@ public class CompactedTopicTest extends MockedPulsarServiceBaseTest {
f.complete(null);
}
}, null);
+ buffer.release();
return f;
}).toArray(CompletableFuture[]::new)).get();
lh.close();
@@ -138,7 +140,7 @@ public class CompactedTopicTest extends MockedPulsarServiceBaseTest {
this.conf, null);
Triple<Long, List<Pair<MessageIdData, Long>>, List<Pair<MessageIdData, Long>>> compactedLedgerData
- = buildCompactedLedger(bk, 0, 500);
+ = buildCompactedLedger(bk, 500);
List<Pair<MessageIdData, Long>> positions = compactedLedgerData.getMiddle();
List<Pair<MessageIdData, Long>> idsInGaps = compactedLedgerData.getRight();
@@ -170,19 +172,14 @@ public class CompactedTopicTest extends MockedPulsarServiceBaseTest {
Long.valueOf(CompactedTopicImpl.NEWER_THAN_COMPACTED));
// shuffle to make cache work hard
- Collections.shuffle(positions);
- Collections.shuffle(idsInGaps);
+ Collections.shuffle(positions, r);
+ Collections.shuffle(idsInGaps, r);
// Check ids we know are in compacted ledger
for (Pair<MessageIdData, Long> p : positions) {
PositionImpl pos = new PositionImpl(p.getLeft().getLedgerId(), p.getLeft().getEntryId());
- if (p.equals(lastPosition)) {
- Assert.assertEquals(CompactedTopicImpl.findStartPoint(pos, lastEntryId, cache).get(),
- Long.valueOf(CompactedTopicImpl.NEWER_THAN_COMPACTED));
- } else {
- Assert.assertEquals(CompactedTopicImpl.findStartPoint(pos, lastEntryId, cache).get(),
- Long.valueOf(p.getRight() + 1));
- }
+ Long got = CompactedTopicImpl.findStartPoint(pos, lastEntryId, cache).get();
+ Assert.assertEquals(got, Long.valueOf(p.getRight()));
}
// Check ids we know are in the gaps of the compacted ledger
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
index 6a175b2..7a22bb1 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
@@ -294,4 +294,52 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
Assert.assertEquals(m.getData(), "content0".getBytes());
}
}
+
+ @Test
+ public void testFirstMessageRetained() throws Exception {
+ String topic = "persistent://my-property/use/my-ns/my-topic1";
+
+ // subscribe before sending anything, so that we get all messages
+ pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
+ .readCompacted(true).subscribe().close();
+
+ try (Producer producer = pulsarClient.createProducer(topic)) {
+ producer.sendAsync(MessageBuilder.create()
+ .setKey("key1")
+ .setContent("my-message-1".getBytes()).build());
+ producer.sendAsync(MessageBuilder.create()
+ .setKey("key2")
+ .setContent("my-message-2".getBytes()).build());
+ producer.sendAsync(MessageBuilder.create()
+ .setKey("key2")
+ .setContent("my-message-3".getBytes()).build()).get();
+ }
+
+ // Read messages before compaction to get ids
+ List<Message> messages = new ArrayList<>();
+ try (Consumer consumer = pulsarClient.newConsumer().topic(topic)
+ .subscriptionName("sub1").readCompacted(true).subscribe()) {
+ messages.add(consumer.receive());
+ messages.add(consumer.receive());
+ messages.add(consumer.receive());
+ }
+
+ // compact the topic
+ Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
+ compactor.compact(topic).get();
+
+ // Check that messages after compaction have same ids
+ try (Consumer consumer = pulsarClient.newConsumer().topic(topic)
+ .subscriptionName("sub1").readCompacted(true).subscribe()){
+ Message message1 = consumer.receive();
+ Assert.assertEquals(message1.getKey(), "key1");
+ Assert.assertEquals(new String(message1.getData()), "my-message-1");
+ Assert.assertEquals(message1.getMessageId(), messages.get(0).getMessageId());
+
+ Message message2 = consumer.receive();
+ Assert.assertEquals(message2.getKey(), "key2");
+ Assert.assertEquals(new String(message2.getData()), "my-message-3");
+ Assert.assertEquals(message2.getMessageId(), messages.get(2).getMessageId());
+ }
+ }
}
--
To stop receiving notification emails like this one, please contact
mmerli@apache.org.