You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/09/09 08:16:36 UTC
[pulsar] 03/09: [Transaction] add method to clear up transaction
buffer snapshot (#11934)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 2cd8d47b60645339f643d7ba53741ef9cd324645
Author: ran <ga...@126.com>
AuthorDate: Tue Sep 7 14:11:05 2021 +0800
[Transaction] add method to clear up transaction buffer snapshot (#11934)
(cherry picked from commit d86db3f4ec4fb6bd04216a123cde2fee5c43f9d9)
---
.../broker/service/persistent/PersistentTopic.java | 5 +-
.../pulsar/broker/systopic/SystemTopicClient.java | 19 ++++++
.../TransactionBufferSystemTopicClient.java | 16 +++++
.../transaction/buffer/TransactionBuffer.java | 7 +++
.../buffer/impl/InMemTransactionBuffer.java | 5 ++
.../buffer/impl/TopicTransactionBuffer.java | 9 +++
.../buffer/impl/TransactionBufferDisable.java | 5 ++
.../TopicTransactionBufferRecoverTest.java | 72 +++++++++++++++++++++-
8 files changed, 136 insertions(+), 2 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index a4db0bc..e165916 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -47,6 +47,7 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
+import lombok.Getter;
import org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
@@ -218,6 +219,7 @@ public class PersistentTopic extends AbstractTopic
// this future is for publish txn message in order.
private volatile CompletableFuture<Void> transactionCompletableFuture;
+ @Getter
protected final TransactionBuffer transactionBuffer;
private final LongAdder bytesOutFromRemovedSubscriptions = new LongAdder();
@@ -1108,7 +1110,8 @@ public class PersistentTopic extends AbstractTopic
CompletableFuture<SchemaVersion> deleteSchemaFuture =
deleteSchema ? deleteSchema() : CompletableFuture.completedFuture(null);
- deleteSchemaFuture.thenAccept(__ -> deleteTopicPolicies()).whenComplete((v, ex) -> {
+ deleteSchemaFuture.thenAccept(__ -> deleteTopicPolicies())
+ .thenCompose(__ -> transactionBuffer.clearSnapshot()).whenComplete((v, ex) -> {
if (ex != null) {
log.error("[{}] Error deleting topic", topic, ex);
unfenceTopicToResume();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/SystemTopicClient.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/SystemTopicClient.java
index 3f5a0a9..ceb1df6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/SystemTopicClient.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/SystemTopicClient.java
@@ -104,6 +104,25 @@ public interface SystemTopicClient<T> {
CompletableFuture<MessageId> writeAsync(T t);
/**
+ * Delete event in the system topic.
+ * @param t pulsar event
+ * @return message id
+ * @throws PulsarClientException exception while write event cause
+ */
+ default MessageId delete(T t) throws PulsarClientException {
+ throw new UnsupportedOperationException("Unsupported operation");
+ }
+
+ /**
+ * Async delete event in the system topic.
+ * @param t pulsar event
+ * @return message id future
+ */
+ default CompletableFuture<MessageId> deleteAsync(T t) {
+ throw new UnsupportedOperationException("Unsupported operation");
+ }
+
+ /**
* Close the system topic writer.
*/
void close() throws IOException;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TransactionBufferSystemTopicClient.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TransactionBufferSystemTopicClient.java
index 81b7096..807bb9d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TransactionBufferSystemTopicClient.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TransactionBufferSystemTopicClient.java
@@ -105,6 +105,22 @@ public class TransactionBufferSystemTopicClient extends SystemTopicClientBase<Tr
}
@Override
+ public MessageId delete(TransactionBufferSnapshot transactionBufferSnapshot) throws PulsarClientException {
+ return producer.newMessage()
+ .key(transactionBufferSnapshot.getTopicName())
+ .value(null)
+ .send();
+ }
+
+ @Override
+ public CompletableFuture<MessageId> deleteAsync(TransactionBufferSnapshot transactionBufferSnapshot) {
+ return producer.newMessage()
+ .key(transactionBufferSnapshot.getTopicName())
+ .value(null)
+ .sendAsync();
+ }
+
+ @Override
public void close() throws IOException {
this.producer.close();
transactionBufferSystemTopicClient.removeWriter(this);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java
index c2f6006..6ffc218 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java
@@ -125,6 +125,13 @@ public interface TransactionBuffer {
CompletableFuture<Void> purgeTxns(List<Long> dataLedgers);
/**
+ * Clear up the snapshot of the TransactionBuffer.
+ *
+ * @return Clear up operation result.
+ */
+ CompletableFuture<Void> clearSnapshot();
+
+ /**
* Close the buffer asynchronously.
*
* @return
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
index 43ed06f..213c7d0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
@@ -346,6 +346,11 @@ class InMemTransactionBuffer implements TransactionBuffer {
}
@Override
+ public CompletableFuture<Void> clearSnapshot() {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ @Override
public CompletableFuture<Void> closeAsync() {
buffers.values().forEach(TxnBuffer::close);
return CompletableFuture.completedFuture(null);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
index c33c404..220b432 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
@@ -372,6 +372,15 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
}
@Override
+ public CompletableFuture<Void> clearSnapshot() {
+ return this.takeSnapshotWriter.thenCompose(writer -> {
+ TransactionBufferSnapshot snapshot = new TransactionBufferSnapshot();
+ snapshot.setTopicName(topic.getName());
+ return writer.deleteAsync(snapshot);
+ }).thenCompose(__ -> CompletableFuture.completedFuture(null));
+ }
+
+ @Override
public CompletableFuture<Void> closeAsync() {
changeToCloseState();
return this.takeSnapshotWriter.thenCompose(SystemTopicClient.Writer::closeAsync);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
index 4b50e55..ff18924 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
@@ -69,6 +69,11 @@ public class TransactionBufferDisable implements TransactionBuffer {
}
@Override
+ public CompletableFuture<Void> clearSnapshot() {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ @Override
public CompletableFuture<Void> closeAsync() {
return CompletableFuture.completedFuture(null);
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
index d79d3c8..956b86e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
@@ -22,6 +22,7 @@ import com.google.common.collect.Sets;
import java.io.IOException;
import java.lang.reflect.Field;
+import java.lang.reflect.Method;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
@@ -33,9 +34,11 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.commons.collections4.map.LinkedMap;
+import org.apache.commons.lang3.RandomUtils;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer;
import org.apache.pulsar.broker.transaction.buffer.matadata.TransactionBufferSnapshot;
import org.apache.pulsar.client.api.Consumer;
@@ -50,11 +53,11 @@ import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
+import org.apache.pulsar.common.events.EventType;
import org.apache.pulsar.common.events.EventsTopicNames;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
-import org.apache.pulsar.common.policies.data.ClusterDataImpl;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.awaitility.Awaitility;
@@ -403,4 +406,71 @@ public class TopicTransactionBufferRecoverTest extends TransactionTestBase {
}
assertTrue(exist);
}
+
+ @Test
+ public void clearTransactionBufferSnapshotTest() throws Exception {
+ String topic = NAMESPACE1 + "/tb-snapshot-delete-" + RandomUtils.nextInt();
+
+ @Cleanup
+ Producer<byte[]> producer = pulsarClient
+ .newProducer()
+ .topic(topic)
+ .sendTimeout(0, TimeUnit.SECONDS)
+ .create();
+
+ Transaction txn = pulsarClient.newTransaction()
+ .withTransactionTimeout(5, TimeUnit.SECONDS)
+ .build().get();
+ producer.newMessage(txn).value("test".getBytes()).sendAsync();
+ producer.newMessage(txn).value("test".getBytes()).sendAsync();
+ txn.commit().get();
+
+ // take snapshot
+ PersistentTopic originalTopic = (PersistentTopic) getPulsarServiceList().get(0)
+ .getBrokerService().getTopic(TopicName.get(topic).toString(), false).get().get();
+ TopicTransactionBuffer topicTransactionBuffer = (TopicTransactionBuffer) originalTopic.getTransactionBuffer();
+ Method takeSnapshotMethod = TopicTransactionBuffer.class.getDeclaredMethod("takeSnapshot");
+ takeSnapshotMethod.setAccessible(true);
+ takeSnapshotMethod.invoke(topicTransactionBuffer);
+
+ TopicName transactionBufferTopicName =
+ NamespaceEventsSystemTopicFactory.getSystemTopicName(
+ TopicName.get(topic).getNamespaceObject(), EventType.TRANSACTION_BUFFER_SNAPSHOT);
+ PersistentTopic snapshotTopic = (PersistentTopic) getPulsarServiceList().get(0)
+ .getBrokerService().getTopic(transactionBufferTopicName.toString(), false).get().get();
+ Field field = PersistentTopic.class.getDeclaredField("currentCompaction");
+ field.setAccessible(true);
+
+ // Trigger compaction and make sure it is finished.
+ checkSnapshotCount(transactionBufferTopicName, true, snapshotTopic, field);
+ admin.topics().delete(topic, true);
+ checkSnapshotCount(transactionBufferTopicName, false, snapshotTopic, field);
+ }
+
+ private void checkSnapshotCount(TopicName topicName, boolean hasSnapshot,
+ PersistentTopic persistentTopic, Field field) throws Exception {
+ persistentTopic.triggerCompaction();
+ CompletableFuture<Long> compactionFuture = (CompletableFuture<Long>) field.get(persistentTopic);
+ Awaitility.await().untilAsserted(() -> assertTrue(compactionFuture.isDone()));
+
+ Reader<TransactionBufferSnapshot> reader = pulsarClient.newReader(Schema.AVRO(TransactionBufferSnapshot.class))
+ .readCompacted(true)
+ .startMessageId(MessageId.earliest)
+ .startMessageIdInclusive()
+ .topic(topicName.toString())
+ .create();
+
+ int count = 0;
+ while (true) {
+ Message<TransactionBufferSnapshot> snapshotMsg = reader.readNext(2, TimeUnit.SECONDS);
+ if (snapshotMsg != null) {
+ count++;
+ } else {
+ break;
+ }
+ }
+ assertTrue(hasSnapshot ? count > 0 : count == 0);
+ reader.close();
+ }
+
}