You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/09/18 10:09:57 UTC
[incubator-pulsar] branch master updated: Fix: Compaction with last
deleted keys not completing compaction (#2591)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 8d50617 Fix: Compaction with last deleted keys not completing compaction (#2591)
8d50617 is described below
commit 8d5061779e7a869e6e5981d2010e671ed0afa322
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Tue Sep 18 03:09:50 2018 -0700
Fix: Compaction with last deleted keys not completing compaction (#2591)
### Motivation
Right now, topic compaction ignores message-ids with an empty payload. However, if the last message in the ledger has an empty payload, the compactor never completes the compaction: because it ignores that last message, it never completes the result-future, so the caller never sees a completed result.
### Modifications
- Compactor calculates the `from` and `to` positions for the compacted ledger according to the last non-deleted active key.
- Compactor handles tail-deleted keys from the ledger and completes the compaction process gracefully.
### Result
The compactor can now successfully compact a ledger whose last message is also a deleted key.
---
.../pulsar/compaction/TwoPhaseCompactor.java | 90 +++++++++++++---------
.../apache/pulsar/compaction/CompactionTest.java | 56 ++++++++++++++
2 files changed, 111 insertions(+), 35 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
index cc3f710..425e049 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
@@ -76,7 +76,7 @@ public class TwoPhaseCompactor extends Compactor {
@Override
protected CompletableFuture<Long> doCompaction(RawReader reader, BookKeeper bk) {
return phaseOne(reader).thenCompose(
- (r) -> phaseTwo(reader, r.from, r.to, r.latestForKey, bk));
+ (r) -> phaseTwo(reader, r.from, r.to, r.lastReadId, r.latestForKey, bk));
}
private CompletableFuture<PhaseOneResult> phaseOne(RawReader reader) {
@@ -90,7 +90,8 @@ public class TwoPhaseCompactor extends Compactor {
} else {
log.info("Commencing phase one of compaction for {}, reading to {}",
reader.getTopic(), lastMessageId);
- phaseOneLoop(reader, Optional.empty(), lastMessageId, latestForKey, loopPromise);
+ phaseOneLoop(reader, Optional.empty(), Optional.empty(), lastMessageId, latestForKey,
+ loopPromise);
}
});
return loopPromise;
@@ -98,6 +99,7 @@ public class TwoPhaseCompactor extends Compactor {
private void phaseOneLoop(RawReader reader,
Optional<MessageId> firstMessageId,
+ Optional<MessageId> toMessageId,
MessageId lastMessageId,
Map<String,MessageId> latestForKey,
CompletableFuture<PhaseOneResult> loopPromise) {
@@ -114,6 +116,7 @@ public class TwoPhaseCompactor extends Compactor {
return;
}
MessageId id = m.getMessageId();
+ boolean deletedMessage = false;
if (RawBatchConverter.isReadableBatch(m)) {
try {
RawBatchConverter.extractIdsAndKeys(m)
@@ -125,16 +128,23 @@ public class TwoPhaseCompactor extends Compactor {
} else {
Pair<String,Integer> keyAndSize = extractKeyAndSize(m);
if (keyAndSize != null) {
- latestForKey.put(keyAndSize.getLeft(), id);
+ if(keyAndSize.getRight() > 0) {
+ latestForKey.put(keyAndSize.getLeft(), id);
+ } else {
+ deletedMessage = true;
+ latestForKey.remove(keyAndSize.getLeft());
+ }
}
}
+ MessageId first = firstMessageId.orElse(deletedMessage ? null : id);
+ MessageId to = deletedMessage ? toMessageId.orElse(null) : id;
if (id.compareTo(lastMessageId) == 0) {
- loopPromise.complete(new PhaseOneResult(firstMessageId.orElse(id),
- id, latestForKey));
+ loopPromise.complete(new PhaseOneResult(first, to, lastMessageId, latestForKey));
} else {
phaseOneLoop(reader,
- Optional.of(firstMessageId.orElse(id)),
+ Optional.ofNullable(first),
+ Optional.ofNullable(to),
lastMessageId,
latestForKey, loopPromise);
}
@@ -153,40 +163,38 @@ public class TwoPhaseCompactor extends Compactor {
});
}
- private CompletableFuture<Long> phaseTwo(RawReader reader, MessageId from, MessageId to,
- Map<String,MessageId> latestForKey, BookKeeper bk) {
+ private CompletableFuture<Long> phaseTwo(RawReader reader, MessageId from, MessageId to, MessageId lastReadId,
+ Map<String, MessageId> latestForKey, BookKeeper bk) {
Map<String, byte[]> metadata = ImmutableMap.of("compactedTopic", reader.getTopic().getBytes(UTF_8),
- "compactedTo", to.toByteArray());
+ "compactedTo", to.toByteArray());
return createLedger(bk, metadata).thenCompose((ledger) -> {
- log.info("Commencing phase two of compaction for {}, from {} to {}, compacting {} keys to ledger {}",
- reader.getTopic(), from, to, latestForKey.size(), ledger.getId());
- return phaseTwoSeekThenLoop(reader, from, to, latestForKey, bk, ledger);
- });
+ log.info("Commencing phase two of compaction for {}, from {} to {}, compacting {} keys to ledger {}",
+ reader.getTopic(), from, to, latestForKey.size(), ledger.getId());
+ return phaseTwoSeekThenLoop(reader, from, to, lastReadId, latestForKey, bk, ledger);
+ });
}
private CompletableFuture<Long> phaseTwoSeekThenLoop(RawReader reader, MessageId from, MessageId to,
- Map<String, MessageId> latestForKey,
- BookKeeper bk, LedgerHandle ledger) {
+ MessageId lastReadId, Map<String, MessageId> latestForKey, BookKeeper bk, LedgerHandle ledger) {
CompletableFuture<Long> promise = new CompletableFuture<>();
reader.seekAsync(from).thenCompose((v) -> {
- Semaphore outstanding = new Semaphore(MAX_OUTSTANDING);
- CompletableFuture<Void> loopPromise = new CompletableFuture<Void>();
- phaseTwoLoop(reader, to, latestForKey, ledger, outstanding, loopPromise);
- return loopPromise;
- }).thenCompose((v) -> closeLedger(ledger))
- .thenCompose((v) -> reader.acknowledgeCumulativeAsync(
- to, ImmutableMap.of(COMPACTED_TOPIC_LEDGER_PROPERTY, ledger.getId())))
- .whenComplete((res, exception) -> {
+ Semaphore outstanding = new Semaphore(MAX_OUTSTANDING);
+ CompletableFuture<Void> loopPromise = new CompletableFuture<Void>();
+ phaseTwoLoop(reader, to, latestForKey, ledger, outstanding, loopPromise);
+ return loopPromise;
+ }).thenCompose((v) -> closeLedger(ledger))
+ .thenCompose((v) -> reader.acknowledgeCumulativeAsync(lastReadId,
+ ImmutableMap.of(COMPACTED_TOPIC_LEDGER_PROPERTY, ledger.getId())))
+ .whenComplete((res, exception) -> {
if (exception != null) {
- deleteLedger(bk, ledger)
- .whenComplete((res2, exception2) -> {
- if (exception2 != null) {
- log.warn("Cleanup of ledger {} for failed", ledger, exception2);
- }
- // complete with original exception
- promise.completeExceptionally(exception);
- });
+ deleteLedger(bk, ledger).whenComplete((res2, exception2) -> {
+ if (exception2 != null) {
+ log.warn("Cleanup of ledger {} for failed", ledger, exception2);
+ }
+ // complete with original exception
+ promise.completeExceptionally(exception);
+ });
} else {
promise.complete(ledger.getId());
}
@@ -217,13 +225,23 @@ public class TwoPhaseCompactor extends Compactor {
}
} else {
Pair<String,Integer> keyAndSize = extractKeyAndSize(m);
+ MessageId msg;
if (keyAndSize == null) { // pass through messages without a key
messageToAdd = Optional.of(m);
- } else if (latestForKey.get(keyAndSize.getLeft()).equals(id)
- && keyAndSize.getRight() > 0) {
+ } else if ((msg = latestForKey.get(keyAndSize.getLeft())) != null
+ && msg.equals(id)) { // consider message only if present into latestForKey map
+ if (keyAndSize.getRight() <= 0) {
+ promise.completeExceptionally(new IllegalArgumentException(
+ "Compaction phase found empty record from sorted key-map"));
+ }
messageToAdd = Optional.of(m);
} else {
m.close();
+ // Reached to last-id and phase-one found it deleted-message while iterating on ledger so, not
+ // present under latestForKey. Complete the compaction.
+ if (to.equals(id)) {
+ promise.complete(null);
+ }
}
}
@@ -330,12 +348,14 @@ public class TwoPhaseCompactor extends Compactor {
private static class PhaseOneResult {
final MessageId from;
- final MessageId to;
+ final MessageId to; // last undeleted messageId
+ final MessageId lastReadId; // last read messageId
final Map<String,MessageId> latestForKey;
- PhaseOneResult(MessageId from, MessageId to, Map<String,MessageId> latestForKey) {
+ PhaseOneResult(MessageId from, MessageId to, MessageId lastReadId, Map<String,MessageId> latestForKey) {
this.from = from;
this.to = to;
+ this.lastReadId = lastReadId;
this.latestForKey = latestForKey;
}
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
index 95a10fa..32c93b4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
@@ -60,6 +60,9 @@ import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.testng.Assert;
+
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -1212,4 +1215,57 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
Assert.assertEquals(new String(message5.getData()), "my-message-4");
}
}
+
+ @Test(timeOut = 20000)
+ public void testCompactionWithLastDeletedKey() throws Exception {
+ String topic = "persistent://my-property/use/my-ns/my-topic1";
+
+ Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).enableBatching(false)
+ .messageRoutingMode(MessageRoutingMode.SinglePartition).create();
+
+ pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close();
+
+ producer.newMessage().key("1").value("1".getBytes()).send();
+ producer.newMessage().key("2").value("2".getBytes()).send();
+ producer.newMessage().key("3").value("3".getBytes()).send();
+ producer.newMessage().key("1").value("".getBytes()).send();
+ producer.newMessage().key("2").value("".getBytes()).send();
+
+ Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
+ compactor.compact(topic).get();
+
+ Set<String> expected = Sets.newHashSet("3");
+ // consumer with readCompacted enabled only get compacted entries
+ try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
+ .readCompacted(true).subscribe()) {
+ Message<byte[]> m = consumer.receive(2, TimeUnit.SECONDS);
+ assertTrue(expected.remove(m.getKey()));
+ }
+ }
+
+ @Test(timeOut = 20000)
+ public void testEmptyCompactionLedger() throws Exception {
+ String topic = "persistent://my-property/use/my-ns/my-topic1";
+
+ Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).enableBatching(false)
+ .messageRoutingMode(MessageRoutingMode.SinglePartition).create();
+
+ pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close();
+
+ producer.newMessage().key("1").value("1".getBytes()).send();
+ producer.newMessage().key("2").value("2".getBytes()).send();
+ producer.newMessage().key("1").value("".getBytes()).send();
+ producer.newMessage().key("2").value("".getBytes()).send();
+
+ Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
+ compactor.compact(topic).get();
+
+ // consumer with readCompacted enabled only get compacted entries
+ try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
+ .readCompacted(true).subscribe()) {
+ Message<byte[]> m = consumer.receive(2, TimeUnit.SECONDS);
+ assertNull(m);
+ }
+ }
+
}