You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/09/09 08:16:36 UTC

[pulsar] 03/09: [Transaction] add method to clear up transaction buffer snapshot (#11934)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 2cd8d47b60645339f643d7ba53741ef9cd324645
Author: ran <ga...@126.com>
AuthorDate: Tue Sep 7 14:11:05 2021 +0800

    [Transaction] add method to clear up transaction buffer snapshot (#11934)
    
    
    (cherry picked from commit d86db3f4ec4fb6bd04216a123cde2fee5c43f9d9)
---
 .../broker/service/persistent/PersistentTopic.java |  5 +-
 .../pulsar/broker/systopic/SystemTopicClient.java  | 19 ++++++
 .../TransactionBufferSystemTopicClient.java        | 16 +++++
 .../transaction/buffer/TransactionBuffer.java      |  7 +++
 .../buffer/impl/InMemTransactionBuffer.java        |  5 ++
 .../buffer/impl/TopicTransactionBuffer.java        |  9 +++
 .../buffer/impl/TransactionBufferDisable.java      |  5 ++
 .../TopicTransactionBufferRecoverTest.java         | 72 +++++++++++++++++++++-
 8 files changed, 136 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index a4db0bc..e165916 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -47,6 +47,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.LongAdder;
 import java.util.function.BiFunction;
 import java.util.stream.Collectors;
+import lombok.Getter;
 import org.apache.bookkeeper.client.api.LedgerMetadata;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
@@ -218,6 +219,7 @@ public class PersistentTopic extends AbstractTopic
 
     // this future is for publish txn message in order.
     private volatile CompletableFuture<Void> transactionCompletableFuture;
+    @Getter
     protected final TransactionBuffer transactionBuffer;
 
     private final LongAdder bytesOutFromRemovedSubscriptions = new LongAdder();
@@ -1108,7 +1110,8 @@ public class PersistentTopic extends AbstractTopic
                     CompletableFuture<SchemaVersion> deleteSchemaFuture =
                             deleteSchema ? deleteSchema() : CompletableFuture.completedFuture(null);
 
-                    deleteSchemaFuture.thenAccept(__ -> deleteTopicPolicies()).whenComplete((v, ex) -> {
+                    deleteSchemaFuture.thenAccept(__ -> deleteTopicPolicies())
+                            .thenCompose(__ -> transactionBuffer.clearSnapshot()).whenComplete((v, ex) -> {
                         if (ex != null) {
                             log.error("[{}] Error deleting topic", topic, ex);
                             unfenceTopicToResume();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/SystemTopicClient.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/SystemTopicClient.java
index 3f5a0a9..ceb1df6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/SystemTopicClient.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/SystemTopicClient.java
@@ -104,6 +104,25 @@ public interface SystemTopicClient<T> {
         CompletableFuture<MessageId> writeAsync(T t);
 
         /**
+         * Delete event in the system topic; this default always throws UnsupportedOperationException.
+         * @param t pulsar event
+         * @return message id
+         * @throws PulsarClientException if an implementation fails to send the delete message
+         */
+        default MessageId delete(T t) throws PulsarClientException {
+            throw new UnsupportedOperationException("Unsupported operation");
+        }
+
+        /**
+         * Asynchronously delete an event in the system topic; the default throws UnsupportedOperationException directly.
+         * @param t pulsar event
+         * @return message id future
+         */
+        default CompletableFuture<MessageId> deleteAsync(T t) {
+            throw new UnsupportedOperationException("Unsupported operation");
+        }
+
+        /**
          * Close the system topic writer.
          */
         void close() throws IOException;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TransactionBufferSystemTopicClient.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TransactionBufferSystemTopicClient.java
index 81b7096..807bb9d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TransactionBufferSystemTopicClient.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TransactionBufferSystemTopicClient.java
@@ -105,6 +105,22 @@ public class TransactionBufferSystemTopicClient extends SystemTopicClientBase<Tr
         }
 
         @Override
+        public MessageId delete(TransactionBufferSnapshot transactionBufferSnapshot) throws PulsarClientException {
+            return producer.newMessage()
+                    .key(transactionBufferSnapshot.getTopicName()) // keyed by the snapshot's topic name
+                    .value(null) // null payload: the delete marker for that key
+                    .send();
+        }
+
+        @Override
+        public CompletableFuture<MessageId> deleteAsync(TransactionBufferSnapshot transactionBufferSnapshot) {
+            return producer.newMessage()
+                    .key(transactionBufferSnapshot.getTopicName()) // keyed by the snapshot's topic name
+                    .value(null) // null payload: the delete marker for that key
+                    .sendAsync();
+        }
+
+        @Override
         public void close() throws IOException {
             this.producer.close();
             transactionBufferSystemTopicClient.removeWriter(this);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java
index c2f6006..6ffc218 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java
@@ -125,6 +125,13 @@ public interface TransactionBuffer {
     CompletableFuture<Void> purgeTxns(List<Long> dataLedgers);
 
     /**
+     * Clear up the snapshot of the TransactionBuffer.
+     *
+     * @return a future that completes once the clear-up operation has finished.
+     */
+    CompletableFuture<Void> clearSnapshot();
+
+    /**
      * Close the buffer asynchronously.
      *
      * @return
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
index 43ed06f..213c7d0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
@@ -346,6 +346,11 @@ class InMemTransactionBuffer implements TransactionBuffer {
     }
 
     @Override
+    public CompletableFuture<Void> clearSnapshot() {
+        return CompletableFuture.completedFuture(null); // no-op: completes immediately, nothing is deleted
+    }
+
+    @Override
     public CompletableFuture<Void> closeAsync() {
         buffers.values().forEach(TxnBuffer::close);
         return CompletableFuture.completedFuture(null);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
index c33c404..220b432 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
@@ -372,6 +372,15 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
     }
 
     @Override
+    public CompletableFuture<Void> clearSnapshot() {
+        return this.takeSnapshotWriter.thenCompose(writer -> {
+            TransactionBufferSnapshot snapshot = new TransactionBufferSnapshot();
+            snapshot.setTopicName(topic.getName()); // only the topic name is set on the delete request
+            return writer.deleteAsync(snapshot);
+        }).thenApply(__ -> null); // drop the MessageId; callers only observe completion or failure
+    }
+
+    @Override
     public CompletableFuture<Void> closeAsync() {
         changeToCloseState();
         return this.takeSnapshotWriter.thenCompose(SystemTopicClient.Writer::closeAsync);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
index 4b50e55..ff18924 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
@@ -69,6 +69,11 @@ public class TransactionBufferDisable implements TransactionBuffer {
     }
 
     @Override
+    public CompletableFuture<Void> clearSnapshot() {
+        return CompletableFuture.completedFuture(null); // no-op: completes immediately, nothing is deleted
+    }
+
+    @Override
     public CompletableFuture<Void> closeAsync() {
         return CompletableFuture.completedFuture(null);
     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
index d79d3c8..956b86e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
@@ -22,6 +22,7 @@ import com.google.common.collect.Sets;
 
 import java.io.IOException;
 import java.lang.reflect.Field;
+import java.lang.reflect.Method;
 import java.util.NavigableMap;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
@@ -33,9 +34,11 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats;
 import org.apache.commons.collections4.map.LinkedMap;
+import org.apache.commons.lang3.RandomUtils;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
 import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer;
 import org.apache.pulsar.broker.transaction.buffer.matadata.TransactionBufferSnapshot;
 import org.apache.pulsar.client.api.Consumer;
@@ -50,11 +53,11 @@ import org.apache.pulsar.client.api.transaction.Transaction;
 import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.transaction.TransactionImpl;
+import org.apache.pulsar.common.events.EventType;
 import org.apache.pulsar.common.events.EventsTopicNames;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
-import org.apache.pulsar.common.policies.data.ClusterDataImpl;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.awaitility.Awaitility;
@@ -403,4 +406,71 @@ public class TopicTransactionBufferRecoverTest extends TransactionTestBase {
         }
         assertTrue(exist);
     }
+
+    @Test
+    public void clearTransactionBufferSnapshotTest() throws Exception {
+        String topic = NAMESPACE1 + "/tb-snapshot-delete-" + RandomUtils.nextInt();
+
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient
+                .newProducer()
+                .topic(topic)
+                .sendTimeout(0, TimeUnit.SECONDS)
+                .create();
+
+        Transaction txn = pulsarClient.newTransaction()
+                .withTransactionTimeout(5, TimeUnit.SECONDS)
+                .build().get();
+        producer.newMessage(txn).value("test".getBytes()).sendAsync();
+        producer.newMessage(txn).value("test".getBytes()).sendAsync();
+        txn.commit().get();
+
+        // Take a snapshot by invoking TopicTransactionBuffer#takeSnapshot via reflection.
+        PersistentTopic originalTopic = (PersistentTopic) getPulsarServiceList().get(0)
+                .getBrokerService().getTopic(TopicName.get(topic).toString(), false).get().get();
+        TopicTransactionBuffer topicTransactionBuffer = (TopicTransactionBuffer) originalTopic.getTransactionBuffer();
+        Method takeSnapshotMethod = TopicTransactionBuffer.class.getDeclaredMethod("takeSnapshot");
+        takeSnapshotMethod.setAccessible(true);
+        takeSnapshotMethod.invoke(topicTransactionBuffer);
+
+        TopicName transactionBufferTopicName =
+                NamespaceEventsSystemTopicFactory.getSystemTopicName(
+                        TopicName.get(topic).getNamespaceObject(), EventType.TRANSACTION_BUFFER_SNAPSHOT);
+        PersistentTopic snapshotTopic = (PersistentTopic) getPulsarServiceList().get(0)
+                .getBrokerService().getTopic(transactionBufferTopicName.toString(), false).get().get();
+        Field field = PersistentTopic.class.getDeclaredField("currentCompaction");
+        field.setAccessible(true);
+
+        // Each check compacts the snapshot topic first: a snapshot must exist before the delete and none after.
+        checkSnapshotCount(transactionBufferTopicName, true, snapshotTopic, field);
+        admin.topics().delete(topic, true);
+        checkSnapshotCount(transactionBufferTopicName, false, snapshotTopic, field);
+    }
+
+    /**
+     * Compacts {@code persistentTopic} (the snapshot system topic), waits for the compaction future held in
+     * {@code field}, then reads {@code topicName} compacted and asserts snapshots exist iff {@code hasSnapshot}.
+     */
+    private void checkSnapshotCount(TopicName topicName, boolean hasSnapshot,
+                                    PersistentTopic persistentTopic, Field field) throws Exception {
+        persistentTopic.triggerCompaction();
+        CompletableFuture<Long> compactionFuture = (CompletableFuture<Long>) field.get(persistentTopic);
+        Awaitility.await().untilAsserted(() -> assertTrue(compactionFuture.isDone()));
+
+        // @Cleanup closes the reader even when readNext throws or the assertion below fails.
+        @Cleanup
+        Reader<TransactionBufferSnapshot> reader = pulsarClient.newReader(Schema.AVRO(TransactionBufferSnapshot.class))
+                .readCompacted(true)
+                .startMessageId(MessageId.earliest)
+                .startMessageIdInclusive()
+                .topic(topicName.toString())
+                .create();
+
+        int count = 0;
+        while (reader.readNext(2, TimeUnit.SECONDS) != null) {
+            count++;
+        }
+        assertTrue(hasSnapshot ? count > 0 : count == 0);
+    }
+
 }