You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/07/06 03:34:28 UTC
[pulsar] branch branch-2.7 updated: [Branch-2.7][Cherry-pick] Fix the reader skips compacted data which original ledger been removed. (#16407)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.7 by this push:
new f78c63b6d06 [Branch-2.7][Cherry-pick] Fix the reader skips compacted data which original ledger been removed. (#16407)
f78c63b6d06 is described below
commit f78c63b6d06c1efb759f3f2171186cddca6bc6eb
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Wed Jul 6 11:34:20 2022 +0800
[Branch-2.7][Cherry-pick] Fix the reader skips compacted data which original ledger been removed. (#16407)
---
.../service/persistent/CompactorSubscription.java | 20 +++++++++-----------
.../org/apache/pulsar/compaction/CompactedTopic.java | 3 ++-
.../apache/pulsar/compaction/CompactedTopicImpl.java | 16 +++++++++-------
.../pulsar/broker/service/PersistentTopicTest.java | 4 ++++
.../apache/pulsar/compaction/CompactedTopicTest.java | 14 +++++++++++++-
5 files changed, 37 insertions(+), 20 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java
index 94af8063f5a..ea249ca9471 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java
@@ -22,7 +22,6 @@ import static com.google.common.base.Preconditions.checkArgument;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback;
import org.apache.bookkeeper.mledger.ManagedCursor;
@@ -36,7 +35,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class CompactorSubscription extends PersistentSubscription {
- private CompactedTopic compactedTopic;
+ private final CompactedTopic compactedTopic;
public CompactorSubscription(PersistentTopic topic, CompactedTopic compactedTopic,
String subscriptionName, ManagedCursor cursor) {
@@ -67,15 +66,17 @@ public class CompactorSubscription extends PersistentSubscription {
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Cumulative ack on compactor subscription {}", topicName, subName, position);
}
- CompletableFuture<Void> future = new CompletableFuture<>();
- cursor.asyncMarkDelete(position, properties, new MarkDeleteCallback() {
+ compactedTopic.newCompactedLedger(position, compactedLedgerId).thenAccept(previousContext -> {
+ cursor.asyncMarkDelete(position, properties, new MarkDeleteCallback() {
@Override
public void markDeleteComplete(Object ctx) {
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Mark deleted messages until position on compactor subscription {}",
- topicName, subName, position);
+ topicName, subName, position);
+ }
+ if (previousContext != null) {
+ compactedTopic.deleteCompactedLedger(previousContext.getLedger().getId());
}
- future.complete(null);
}
@Override
@@ -83,19 +84,16 @@ public class CompactorSubscription extends PersistentSubscription {
// TODO: cut consumer connection on markDeleteFailed
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Failed to mark delete for position on compactor subscription {}",
- topicName, subName, ctx, exception);
+ topicName, subName, ctx, exception);
}
}
}, null);
+ });
if (topic.getManagedLedger().isTerminated() && cursor.getNumberOfEntriesInBacklog(false) == 0) {
// Notify all consumer that the end of topic was reached
dispatcher.getConsumers().forEach(Consumer::reachedEndOfTopic);
}
-
- // Once properties have been persisted, we can notify the compacted topic to use
- // the new ledger
- future.thenAccept((v) -> compactedTopic.newCompactedLedger(position, compactedLedgerId));
}
private static final Logger log = LoggerFactory.getLogger(CompactorSubscription.class);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java
index c91931d071a..17e55eeae76 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java
@@ -27,7 +27,8 @@ import org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.broker.service.Consumer;
public interface CompactedTopic {
- CompletableFuture<?> newCompactedLedger(Position p, long compactedLedgerId);
+ CompletableFuture<CompactedTopicImpl.CompactedTopicContext> newCompactedLedger(Position p, long compactedLedgerId);
+ CompletableFuture<Void> deleteCompactedLedger(long compactedLedgerId);
void asyncReadEntriesOrWait(ManagedCursor cursor,
int numberOfEntriesToRead,
boolean isFirstRead,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
index c3af219f21b..83c9fd1d684 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
@@ -65,7 +65,7 @@ public class CompactedTopicImpl implements CompactedTopic {
}
@Override
- public CompletableFuture<?> newCompactedLedger(Position p, long compactedLedgerId) {
+ public CompletableFuture<CompactedTopicContext> newCompactedLedger(Position p, long compactedLedgerId) {
synchronized (this) {
compactionHorizon = (PositionImpl)p;
@@ -73,15 +73,16 @@ public class CompactedTopicImpl implements CompactedTopic {
compactedTopicContext = openCompactedLedger(bk, compactedLedgerId);
// delete the ledger from the old context once the new one is open
- if (previousContext != null) {
- return compactedTopicContext.thenCompose((res) -> previousContext)
- .thenCompose((res) -> tryDeleteCompactedLedger(bk, res.ledger.getId()));
- } else {
- return compactedTopicContext;
- }
+ return compactedTopicContext.thenCompose(__ ->
+ previousContext != null ? previousContext : CompletableFuture.completedFuture(null));
}
}
+ @Override
+ public CompletableFuture<Void> deleteCompactedLedger(long compactedLedgerId) {
+ return tryDeleteCompactedLedger(bk, compactedLedgerId);
+ }
+
@Override
public void asyncReadEntriesOrWait(ManagedCursor cursor,
int numberOfEntriesToRead,
@@ -323,6 +324,7 @@ public class CompactedTopicImpl implements CompactedTopic {
public synchronized Optional<Position> getCompactionHorizon() {
return Optional.ofNullable(this.compactionHorizon);
}
+
private static final Logger log = LoggerFactory.getLogger(CompactedTopicImpl.class);
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index 17093ab41d9..1d265c8a368 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -22,6 +22,7 @@ import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMo
import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockZooKeeper;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.matches;
@@ -117,6 +118,7 @@ import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream;
import org.apache.pulsar.compaction.CompactedTopic;
+import org.apache.pulsar.compaction.CompactedTopicImpl;
import org.apache.pulsar.compaction.Compactor;
import org.apache.pulsar.zookeeper.ZooKeeperCache;
import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
@@ -1509,6 +1511,8 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase {
public void testCompactorSubscription() throws Exception {
PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
CompactedTopic compactedTopic = mock(CompactedTopic.class);
+ when(compactedTopic.newCompactedLedger(any(Position.class), anyLong()))
+ .thenReturn(CompletableFuture.completedFuture(mock(CompactedTopicImpl.CompactedTopicContext.class)));
PersistentSubscription sub = new CompactorSubscription(topic, compactedTopic,
Compactor.COMPACTION_SUBSCRIPTION,
cursorMock);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
index 58be6d8936f..2252ed9a214 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
@@ -237,7 +237,19 @@ public class CompactedTopicTest extends MockedPulsarServiceBaseTest {
Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD).close();
// update the compacted topic ledger
- compactedTopic.newCompactedLedger(new PositionImpl(1,2), newCompactedLedger.getId()).get();
+ PositionImpl newHorizon = new PositionImpl(1,3);
+ compactedTopic.newCompactedLedger(newHorizon, newCompactedLedger.getId()).get();
+
+ // Make sure the old compacted ledger still exist after the new compacted ledger created.
+ bk.openLedger(oldCompactedLedger.getId(),
+ Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE,
+ Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD).close();
+
+ Assert.assertTrue(compactedTopic.getCompactedTopicContext().isPresent());
+ Assert.assertEquals(compactedTopic.getCompactedTopicContext().get().getLedger().getId(),
+ newCompactedLedger.getId());
+ Assert.assertEquals(compactedTopic.getCompactionHorizon().get(), newHorizon);
+ compactedTopic.deleteCompactedLedger(oldCompactedLedger.getId()).join();
// old ledger should be deleted, new still there
try {