You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/08/04 07:35:55 UTC
[pulsar] branch branch-2.8 updated: [Branch-2.8][Cherry-pick] Fix compaction subscription acknowledge Marker msg issue. (#16918)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.8 by this push:
new f6015a4e400 [Branch-2.8][Cherry-pick] Fix compaction subscription acknowledge Marker msg issue. (#16918)
f6015a4e400 is described below
commit f6015a4e40094a5bcfe18be96ad4e6d5aa5dc10c
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Thu Aug 4 15:35:47 2022 +0800
[Branch-2.8][Cherry-pick] Fix compaction subscription acknowledge Marker msg issue. (#16918)
---
.../broker/service/AbstractBaseDispatcher.java | 16 +++--
.../apache/pulsar/compaction/CompactionTest.java | 70 ++++++++++++++++++++++
2 files changed, 82 insertions(+), 4 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
index 3646ae67f23..1195263fbf0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
@@ -23,15 +23,18 @@ import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import io.netty.buffer.ByteBuf;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
+import org.apache.pulsar.broker.service.persistent.CompactorSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
@@ -128,13 +131,13 @@ public abstract class AbstractBaseDispatcher implements Dispatcher {
if (!isReplayRead && msgMetadata != null && msgMetadata.hasTxnidMostBits()
&& msgMetadata.hasTxnidLeastBits()) {
if (Markers.isTxnMarker(msgMetadata)) {
+ individualAcknowledgeMessageIfNeeded(entry.getPosition(), Collections.emptyMap());
entries.set(i, null);
entry.release();
continue;
} else if (((PersistentTopic) subscription.getTopic())
.isTxnAborted(new TxnID(msgMetadata.getTxnidMostBits(), msgMetadata.getTxnidLeastBits()))) {
- subscription.acknowledgeMessage(Collections.singletonList(entry.getPosition()), AckType.Individual,
- Collections.emptyMap());
+ individualAcknowledgeMessageIfNeeded(entry.getPosition(), Collections.emptyMap());
entries.set(i, null);
entry.release();
continue;
@@ -149,8 +152,7 @@ public abstract class AbstractBaseDispatcher implements Dispatcher {
entries.set(i, null);
entry.release();
- subscription.acknowledgeMessage(Collections.singletonList(pos), AckType.Individual,
- Collections.emptyMap());
+ individualAcknowledgeMessageIfNeeded(pos, Collections.emptyMap());
continue;
} else if (msgMetadata.hasDeliverAtTime()
&& trackDelayedDelivery(entry.getLedgerId(), entry.getEntryId(), msgMetadata)) {
@@ -186,6 +188,12 @@ public abstract class AbstractBaseDispatcher implements Dispatcher {
sendMessageInfo.setTotalChunkedMessages(totalChunkedMessages);
}
+ private void individualAcknowledgeMessageIfNeeded(Position position, Map<String, Long> properties) {
+ if (!(subscription instanceof CompactorSubscription)) {
+ subscription.acknowledgeMessage(Collections.singletonList(position), AckType.Individual, properties);
+ }
+ }
+
/**
* Determine whether the number of consumers on the subscription reaches the threshold.
* @return
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
index c9e0d95f7ff..622e6e6add7 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
@@ -28,6 +28,7 @@ import static org.testng.Assert.assertTrue;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
+import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
@@ -43,11 +44,18 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import io.netty.buffer.ByteBuf;
+import lombok.Cleanup;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.api.OpenBuilder;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerInfo;
+import org.apache.bookkeeper.mledger.Position;
import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Consumer;
@@ -61,13 +69,17 @@ import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
+import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.ClusterDataImpl;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.protocol.Markers;
import org.apache.pulsar.common.util.FutureUtil;
+import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
@@ -1656,4 +1668,62 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
assertNull(none);
}
}
+
+ @Test(timeOut = 60000)
+ public void testCompactionWithMarker() throws Exception {
+ String namespace = "my-property/use/my-ns";
+ final TopicName dest = TopicName.get(
+ BrokerTestUtil.newUniqueName("persistent://" + namespace + "/testWriteMarker"));
+ @Cleanup
+ Consumer<byte[]> consumer = pulsarClient.newConsumer()
+ .topic(dest.toString())
+ .subscriptionName("test-compaction-sub")
+ .subscriptionType(SubscriptionType.Exclusive)
+ .readCompacted(true)
+ .subscriptionInitialPosition(SubscriptionInitialPosition.Latest)
+ .subscribe();
+ @Cleanup
+ Producer<byte[]> producer = pulsarClient.newProducer()
+ .topic(dest.toString())
+ .enableBatching(false)
+ .messageRoutingMode(MessageRoutingMode.SinglePartition)
+ .create();
+ producer.send("msg-1".getBytes(StandardCharsets.UTF_8));
+ Optional<Topic> topic = pulsar.getBrokerService().getTopic(dest.toString(), true).join();
+ Assert.assertTrue(topic.isPresent());
+ PersistentTopic persistentTopic = (PersistentTopic) topic.get();
+ Random random = new Random();
+ for (int i = 0; i < 100; i++) {
+ int rad = random.nextInt(3);
+ ByteBuf marker;
+ if (rad == 0) {
+ marker = Markers.newTxnCommitMarker(-1L, 0, i);
+ } else if (rad == 1) {
+ marker = Markers.newTxnAbortMarker(-1L, 0, i);
+ } else {
+ marker = Markers.newReplicatedSubscriptionsSnapshotRequest(UUID.randomUUID().toString(), "r1");
+ }
+ persistentTopic.getManagedLedger().asyncAddEntry(marker, new AsyncCallbacks.AddEntryCallback() {
+ @Override
+ public void addComplete(Position position, ByteBuf entryData, Object ctx) {
+ //
+ }
+
+ @Override
+ public void addFailed(ManagedLedgerException exception, Object ctx) {
+ //
+ }
+ }, null);
+ marker.release();
+ }
+ producer.send("msg-2".getBytes(StandardCharsets.UTF_8));
+ admin.topics().triggerCompaction(dest.toString());
+ Awaitility.await()
+ .atMost(50, TimeUnit.SECONDS)
+ .pollInterval(1, TimeUnit.SECONDS)
+ .untilAsserted(() -> {
+ long ledgerId = admin.topics().getInternalStats(dest.toString()).compactedLedger.ledgerId;
+ Assert.assertNotEquals(ledgerId, -1L);
+ });
+ }
}