You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/08/02 12:55:28 UTC
[pulsar] branch master updated: [Transaction][buffer] Add basic
operation of transaction (#4738)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new e7195eb [Transaction][buffer] Add basic operation of transaction (#4738)
e7195eb is described below
commit e7195ebf5c0bcc7a3dda1446e78f2284e27d74bf
Author: Yong Zhang <zh...@gmail.com>
AuthorDate: Fri Aug 2 20:55:22 2019 +0800
[Transaction][buffer] Add basic operation of transaction (#4738)
*Modifications*
Add the primary transaction operations and persist every action.
Describe the modifications you've done.
- add commit operation
- add abort operation
- add openreader operation
---
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 2 +-
.../mledger/impl/ReadOnlyManagedLedgerImpl.java | 2 +-
.../broker/service/persistent/PersistentTopic.java | 2 +-
pulsar-transaction/buffer/pom.xml | 44 ++
.../transaction/buffer/TransactionBuffer.java | 14 +-
.../transaction/buffer/TransactionCursor.java | 73 ++
.../pulsar/transaction/buffer/TransactionMeta.java | 58 ++
.../NoTxnsCommittedAtLedgerException.java} | 35 +-
.../buffer/impl/InMemTransactionBuffer.java | 50 +-
.../buffer/impl/PersistentTransactionBuffer.java | 268 ++++++++
.../impl/PersistentTransactionBufferReader.java | 134 ++++
.../buffer/impl/TransactionCursorImpl.java | 126 ++++
.../buffer/impl/TransactionMetaImpl.java | 163 +++++
.../impl/PersistentTransactionBufferTest.java | 732 +++++++++++++++++++++
.../buffer/impl/TransactionBufferTest.java | 2 +-
15 files changed, 1663 insertions(+), 42 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 78d1816..0555936 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -1465,7 +1465,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
}
}
- void asyncReadEntry(PositionImpl position, ReadEntryCallback callback, Object ctx) {
+ public void asyncReadEntry(PositionImpl position, ReadEntryCallback callback, Object ctx) {
LedgerHandle currentLedger = this.currentLedger;
if (log.isDebugEnabled()) {
log.debug("[{}] Reading entry ledger {}: {}", name, position.getLedgerId(), position.getEntryId());
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyManagedLedgerImpl.java
index 36376b8..9721b15 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyManagedLedgerImpl.java
@@ -142,7 +142,7 @@ public class ReadOnlyManagedLedgerImpl extends ManagedLedgerImpl {
}
@Override
- void asyncReadEntry(PositionImpl position, AsyncCallbacks.ReadEntryCallback callback, Object ctx) {
+ public void asyncReadEntry(PositionImpl position, AsyncCallbacks.ReadEntryCallback callback, Object ctx) {
this.getLedgerHandle(position.getLedgerId()).thenAccept((ledger) -> {
asyncReadEntry(ledger, position, callback, ctx);
}).exceptionally((ex) -> {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 2a51378..a14e296 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -125,7 +125,7 @@ import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCallback {
// Managed ledger associated with the topic
- private final ManagedLedger ledger;
+ protected final ManagedLedger ledger;
// Subscriptions to this topic
private final ConcurrentOpenHashMap<String, PersistentSubscription> subscriptions;
diff --git a/pulsar-transaction/buffer/pom.xml b/pulsar-transaction/buffer/pom.xml
index b13cf43..9605ec1 100644
--- a/pulsar-transaction/buffer/pom.xml
+++ b/pulsar-transaction/buffer/pom.xml
@@ -52,6 +52,50 @@
<artifactId>pulsar-common</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar-broker</artifactId>
+ <version>${project.version}</version>
+ <scope>compile</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>managed-ledger-original</artifactId>
+ <version>${project.parent.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>managed-ledger-original</artifactId>
+ <version>${project.parent.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
</dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.xolstice.maven.plugins</groupId>
+ <artifactId>protobuf-maven-plugin</artifactId>
+ <version>${protobuf-maven-plugin.version}</version>
+ <configuration>
+ <protocArtifact>com.google.protobuf:protoc:${protoc3.version}:exe:${os.detected.classifier}</protocArtifact>
+ <checkStaleness>true</checkStaleness>
+ </configuration>
+ <executions>
+ <execution>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>compile</goal>
+ <goal>test-compile</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
</project>
\ No newline at end of file
diff --git a/pulsar-transaction/buffer/src/main/java/org/apache/pulsar/transaction/buffer/TransactionBuffer.java b/pulsar-transaction/buffer/src/main/java/org/apache/pulsar/transaction/buffer/TransactionBuffer.java
index 7d0bcc4..f106f1c 100644
--- a/pulsar-transaction/buffer/src/main/java/org/apache/pulsar/transaction/buffer/TransactionBuffer.java
+++ b/pulsar-transaction/buffer/src/main/java/org/apache/pulsar/transaction/buffer/TransactionBuffer.java
@@ -45,7 +45,7 @@ import org.apache.pulsar.transaction.impl.common.TxnID;
* commits the buffer again.
*/
@Beta
-public interface TransactionBuffer extends AutoCloseable {
+public interface TransactionBuffer {
/**
* Return the metadata of a transaction in the buffer.
@@ -95,9 +95,7 @@ public interface TransactionBuffer extends AutoCloseable {
* @throws org.apache.pulsar.transaction.buffer.exceptions.TransactionNotFoundException if the transaction
* is not in the buffer.
*/
- CompletableFuture<Void> commitTxn(TxnID txnID,
- long committedAtLedgerId,
- long committedAtEntryId);
+ CompletableFuture<Void> commitTxn(TxnID txnID, long committedAtLedgerId, long committedAtEntryId);
/**
* Abort the transaction and all the entries of this transaction will
@@ -123,9 +121,9 @@ public interface TransactionBuffer extends AutoCloseable {
CompletableFuture<Void> purgeTxns(List<Long> dataLedgers);
/**
- * {@inheritDoc}
+ * Close the buffer asynchronously.
+ *
+ * @return
*/
- @Override
- void close();
-
+ CompletableFuture<Void> closeAsync();
}
diff --git a/pulsar-transaction/buffer/src/main/java/org/apache/pulsar/transaction/buffer/TransactionCursor.java b/pulsar-transaction/buffer/src/main/java/org/apache/pulsar/transaction/buffer/TransactionCursor.java
new file mode 100644
index 0000000..a7dc070
--- /dev/null
+++ b/pulsar-transaction/buffer/src/main/java/org/apache/pulsar/transaction/buffer/TransactionCursor.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.buffer;
+
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.pulsar.transaction.impl.common.TxnID;
+
+/**
+ * The transaction Cursor maintains the index of all transactions.
+ *
+ * <p>Every operation returns a {@link CompletableFuture} that carries its outcome.
+ */
+public interface TransactionCursor {
+ /**
+ * Get the specified transaction meta.
+ *
+ * @param txnID the id of the transaction to look up
+ * @param createIfNotExist whether a meta should be created when none is indexed for {@code txnID}
+ * (NOTE(review): presumably the future fails when this is {@code false} and the transaction is
+ * unknown; confirm against the implementation)
+ * @return a future yielding the {@link TransactionMeta} of the transaction
+ */
+ CompletableFuture<TransactionMeta> getTxnMeta(TxnID txnID, boolean createIfNotExist);
+
+ /**
+ * Commit transaction.
+ *
+ * @param committedLedgerId the ledger which txn committed at.
+ * @param committedEntryId the entry which txn committed at.
+ * @param txnID the id which txn committed.
+ * @param position the commit position at transaction log.
+ * @return a future tracking the completion of the commit
+ */
+ CompletableFuture<Void> commitTxn(long committedLedgerId, long committedEntryId, TxnID txnID, Position position);
+
+ /**
+ * Abort transaction.
+ *
+ * @param txnID aborted transaction id.
+ * @return a future tracking the completion of the abort
+ */
+ CompletableFuture<Void> abortTxn(TxnID txnID);
+
+ /**
+ * Get all the transaction id on the specified ledger.
+ *
+ * @param ledgerId the transaction committed ledger id
+ * @return a future yielding the ids of the transactions committed at {@code ledgerId}
+ */
+ CompletableFuture<Set<TxnID>> getAllTxnsCommittedAtLedger(long ledgerId);
+
+ /**
+ * Remove the transactions on the specified ledger.
+ *
+ * @param ledgerId the id of the ledger whose committed transactions are removed
+ * @return a future tracking the completion of the removal
+ */
+ CompletableFuture<Void> removeTxnsCommittedAtLedger(long ledgerId);
+}
diff --git a/pulsar-transaction/buffer/src/main/java/org/apache/pulsar/transaction/buffer/TransactionMeta.java b/pulsar-transaction/buffer/src/main/java/org/apache/pulsar/transaction/buffer/TransactionMeta.java
index 8771c2b..2b929d2 100644
--- a/pulsar-transaction/buffer/src/main/java/org/apache/pulsar/transaction/buffer/TransactionMeta.java
+++ b/pulsar-transaction/buffer/src/main/java/org/apache/pulsar/transaction/buffer/TransactionMeta.java
@@ -19,6 +19,9 @@
package org.apache.pulsar.transaction.buffer;
import com.google.common.annotations.Beta;
+import java.util.SortedMap;
+import java.util.concurrent.CompletableFuture;
+import org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.transaction.impl.common.TxnID;
import org.apache.pulsar.transaction.impl.common.TxnStatus;
@@ -49,4 +52,59 @@ public interface TransactionMeta {
*/
int numEntries();
+ /**
+ * Return the committed ledger id at data ledger.
+ *
+ * @return the committed ledger id
+ */
+ long committedAtLedgerId();
+
+ /**
+ * Return the committed entry id at data ledger.
+ *
+ * @return the committed entry id
+ */
+ long committedAtEntryId();
+
+ /**
+ * Return the last sequence id.
+ *
+ * @return the last sequence id
+ */
+ long lastSequenceId();
+
+ /**
+ * Read the entries from start sequence id.
+ *
+ * @param num the number of entries to read
+ * @param startSequenceId the sequence id the read starts from
+ * (NOTE(review): whether this bound is inclusive is not visible here; confirm in the implementation)
+ * @return a future yielding the entries read, as a sorted map from sequence id to position
+ */
+ CompletableFuture<SortedMap<Long, Position>> readEntries(int num, long startSequenceId);
+
+ /**
+ * Add transaction entry into the transaction.
+ *
+ * @param sequenceId the message sequence id
+ * @param position the position of transaction log
+ * @return a future tracking the completion of the append
+ */
+ CompletableFuture<Void> appendEntry(long sequenceId, Position position);
+
+ /**
+ * Mark the transaction is committed.
+ *
+ * @param committedAtLedgerId the ledger id, at the data ledger, the transaction is committed at
+ * @param committedAtEntryId the entry id, at the data ledger, the transaction is committed at
+ * @return a future yielding a {@link TransactionMeta} (presumably this meta after the status change)
+ */
+ CompletableFuture<TransactionMeta> commitTxn(long committedAtLedgerId, long committedAtEntryId);
+
+ /**
+ * Mark the transaction is aborted.
+ *
+ * @return a future yielding a {@link TransactionMeta} (presumably this meta after the status change)
+ */
+ CompletableFuture<TransactionMeta> abortTxn();
+
}
diff --git a/pulsar-transaction/buffer/src/main/java/org/apache/pulsar/transaction/buffer/TransactionMeta.java b/pulsar-transaction/buffer/src/main/java/org/apache/pulsar/transaction/buffer/exceptions/NoTxnsCommittedAtLedgerException.java
similarity index 53%
copy from pulsar-transaction/buffer/src/main/java/org/apache/pulsar/transaction/buffer/TransactionMeta.java
copy to pulsar-transaction/buffer/src/main/java/org/apache/pulsar/transaction/buffer/exceptions/NoTxnsCommittedAtLedgerException.java
index 8771c2b..74f950a 100644
--- a/pulsar-transaction/buffer/src/main/java/org/apache/pulsar/transaction/buffer/TransactionMeta.java
+++ b/pulsar-transaction/buffer/src/main/java/org/apache/pulsar/transaction/buffer/exceptions/NoTxnsCommittedAtLedgerException.java
@@ -16,37 +16,16 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.transaction.buffer;
-
-import com.google.common.annotations.Beta;
-import org.apache.pulsar.transaction.impl.common.TxnID;
-import org.apache.pulsar.transaction.impl.common.TxnStatus;
+package org.apache.pulsar.transaction.buffer.exceptions;
/**
- * The metadata for the transaction in the transaction buffer.
+ * Exception is thrown when no transactions found committed at a given ledger.
*/
-@Beta
-public interface TransactionMeta {
-
- /**
- * Returns the transaction id.
- *
- * @return the transaction id
- */
- TxnID id();
-
- /**
- * Return the status of the transaction.
- *
- * @return the status of the transaction
- */
- TxnStatus status();
+public class NoTxnsCommittedAtLedgerException extends TransactionBufferException {
- /**
- * Return the number of entries appended to the transaction.
- *
- * @return the number of entries
- */
- int numEntries();
+ private static final long serialVersionUID = 0L;
+ public NoTxnsCommittedAtLedgerException(String message) {
+ super(message);
+ }
}
diff --git a/pulsar-transaction/buffer/src/main/java/org/apache/pulsar/transaction/buffer/impl/InMemTransactionBuffer.java b/pulsar-transaction/buffer/src/main/java/org/apache/pulsar/transaction/buffer/impl/InMemTransactionBuffer.java
index 93a3e2e..1ce8234 100644
--- a/pulsar-transaction/buffer/src/main/java/org/apache/pulsar/transaction/buffer/impl/InMemTransactionBuffer.java
+++ b/pulsar-transaction/buffer/src/main/java/org/apache/pulsar/transaction/buffer/impl/InMemTransactionBuffer.java
@@ -30,6 +30,8 @@ import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.transaction.buffer.TransactionBuffer;
import org.apache.pulsar.transaction.buffer.TransactionBufferReader;
import org.apache.pulsar.transaction.buffer.TransactionMeta;
@@ -79,6 +81,49 @@ class InMemTransactionBuffer implements TransactionBuffer {
}
}
+ /** Returns the {@code committedAtLedgerId} value currently held by this transaction buffer. */
+ @Override
+ public long committedAtLedgerId() {
+ return committedAtLedgerId;
+ }
+
+ /** Returns the {@code committedAtEntryId} value currently held by this transaction buffer. */
+ @Override
+ public long committedAtEntryId() {
+ return committedAtEntryId;
+ }
+
+ /**
+ * Returns the last key of {@code entries}.
+ *
+ * <p>NOTE(review): {@code entries} is declared outside this hunk. If it is a {@code SortedMap},
+ * {@code lastKey()} throws {@code NoSuchElementException} while the map is empty; confirm callers
+ * never ask for the last sequence id before the first append.
+ */
+ @Override
+ public long lastSequenceId() {
+ return entries.lastKey();
+ }
+
+ /**
+ * Not supported by this implementation: both arguments are ignored and the returned future is
+ * always failed with an {@link UnsupportedOperationException}.
+ */
+ @Override
+ public CompletableFuture<SortedMap<Long, Position>> readEntries(int num, long startSequenceId) {
+ return FutureUtil.failedFuture(new UnsupportedOperationException());
+ }
+
+ /**
+ * Not supported by this implementation: both arguments are ignored and the returned future is
+ * always failed with an {@link UnsupportedOperationException}.
+ */
+ @Override
+ public CompletableFuture<Void> appendEntry(long sequenceId, Position position) {
+ return FutureUtil.failedFuture(new UnsupportedOperationException());
+ }
+
+ /**
+  * Future-returning wrapper around {@code commitAt}.
+  *
+  * @param committedAtLedgerId forwarded to {@code commitAt}
+  * @param committedAtEntryId forwarded to {@code commitAt}
+  * @return a completed future holding the result of {@code commitAt}, or a failed future carrying the
+  *         {@code UnexpectedTxnStatusException} it threw
+  */
+ @Override
+ public CompletableFuture<TransactionMeta> commitTxn(long committedAtLedgerId, long committedAtEntryId) {
+     final TransactionMeta committed;
+     try {
+         committed = commitAt(committedAtLedgerId, committedAtEntryId);
+     } catch (UnexpectedTxnStatusException statusError) {
+         return FutureUtil.failedFuture(statusError);
+     }
+     return CompletableFuture.completedFuture(committed);
+ }
+
+ /**
+  * Future-returning wrapper around {@code abort}.
+  *
+  * @return a completed future holding the result of {@code abort}, or a failed future carrying the
+  *         {@code UnexpectedTxnStatusException} it threw
+  */
+ @Override
+ public CompletableFuture<TransactionMeta> abortTxn() {
+     final TransactionMeta aborted;
+     try {
+         aborted = abort();
+     } catch (UnexpectedTxnStatusException statusError) {
+         return FutureUtil.failedFuture(statusError);
+     }
+     return CompletableFuture.completedFuture(aborted);
+ }
+
synchronized TxnBuffer abort() throws UnexpectedTxnStatusException {
if (TxnStatus.OPEN != status) {
throw new UnexpectedTxnStatusException(txnid, TxnStatus.OPEN, status);
@@ -150,7 +195,6 @@ class InMemTransactionBuffer implements TransactionBuffer {
final ConcurrentMap<TxnID, TxnBuffer> buffers;
final Map<Long, Set<TxnID>> txnIndex;
-
public InMemTransactionBuffer() {
this.buffers = new ConcurrentHashMap<>();
this.txnIndex = new HashMap<>();
@@ -285,7 +329,9 @@ class InMemTransactionBuffer implements TransactionBuffer {
}
@Override
- public void close() {
+ public CompletableFuture<Void> closeAsync() {
buffers.values().forEach(TxnBuffer::close);
+ return CompletableFuture.completedFuture(null);
}
+
}
diff --git a/pulsar-transaction/buffer/src/main/java/org/apache/pulsar/transaction/buffer/impl/PersistentTransactionBuffer.java b/pulsar-transaction/buffer/src/main/java/org/apache/pulsar/transaction/buffer/impl/PersistentTransactionBuffer.java
new file mode 100644
index 0000000..f0513cd
--- /dev/null
+++ b/pulsar-transaction/buffer/src/main/java/org/apache/pulsar/transaction/buffer/impl/PersistentTransactionBuffer.java
@@ -0,0 +1,268 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.buffer.impl;
+
+import io.netty.buffer.ByteBuf;
+import java.util.List;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import lombok.Builder;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.BrokerServiceException;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.common.api.proto.PulsarMarkers.MessageIdData;
+import org.apache.pulsar.common.protocol.Markers;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.transaction.buffer.TransactionBuffer;
+import org.apache.pulsar.transaction.buffer.TransactionBufferReader;
+import org.apache.pulsar.transaction.buffer.TransactionCursor;
+import org.apache.pulsar.transaction.buffer.TransactionMeta;
+import org.apache.pulsar.transaction.buffer.exceptions.TransactionNotSealedException;
+import org.apache.pulsar.transaction.impl.common.TxnID;
+
+/**
+ * A persistent transaction buffer implementation.
+ *
+ * <p>The buffer is itself a {@link PersistentTopic}: transaction payloads and the commit/abort markers are
+ * published through the topic, and the positions they are stored at are indexed in a
+ * {@link TransactionCursor}.
+ */
+@Slf4j
+public class PersistentTransactionBuffer extends PersistentTopic implements TransactionBuffer {
+
+    // Index of the transactions known to this buffer.
+    private final TransactionCursor txnCursor;
+    // Non-durable cursor, opened at the earliest position, used when purging transaction entries.
+    private final ManagedCursor retentionCursor;
+
+    /**
+     * Publish context that reports the outcome of a publish through a future: the future receives the
+     * position of the published entry, or the exception the publish failed with.
+     */
+    static class TxnCtx implements PublishContext {
+        private final long sequenceId;
+        private final CompletableFuture<Position> completableFuture;
+        private final String producerName;
+
+        TxnCtx(String producerName, long sequenceId, CompletableFuture<Position> future) {
+            this.sequenceId = sequenceId;
+            this.completableFuture = future;
+            this.producerName = producerName;
+        }
+
+        @Override
+        public String getProducerName() {
+            return this.producerName;
+        }
+
+        @Override
+        public long getSequenceId() {
+            return this.sequenceId;
+        }
+
+        @Override
+        public void completed(Exception e, long ledgerId, long entryId) {
+            if (e != null) {
+                completableFuture.completeExceptionally(e);
+            } else {
+                completableFuture.complete(PositionImpl.get(ledgerId, entryId));
+            }
+        }
+    }
+
+    /**
+     * Create the buffer on top of the given managed ledger.
+     *
+     * @param topic passed to the {@link PersistentTopic} constructor
+     * @param ledger passed to the {@link PersistentTopic} constructor; also used to open the retention cursor
+     * @param brokerService passed to the {@link PersistentTopic} constructor
+     * @throws BrokerServiceException.NamingException declared by the super constructor call
+     * @throws ManagedLedgerException if the retention cursor cannot be opened
+     */
+    public PersistentTransactionBuffer(String topic, ManagedLedger ledger, BrokerService brokerService)
+            throws BrokerServiceException.NamingException, ManagedLedgerException {
+        super(topic, ledger, brokerService);
+        this.txnCursor = new TransactionCursorImpl();
+        this.retentionCursor = ledger.newNonDurableCursor(PositionImpl.earliest);
+    }
+
+    /**
+     * Look up the meta of a transaction without creating it.
+     */
+    @Override
+    public CompletableFuture<TransactionMeta> getTransactionMeta(TxnID txnID) {
+        return txnCursor.getTxnMeta(txnID, false);
+    }
+
+    /**
+     * Publish {@code buffer} and then record the resulting position under {@code sequenceId} in the meta of
+     * {@code txnId}; the meta is requested with {@code createIfNotExist = true}.
+     */
+    @Override
+    public CompletableFuture<Void> appendBufferToTxn(TxnID txnId, long sequenceId, ByteBuf buffer) {
+        return publishMessage(txnId, buffer, sequenceId)
+                .thenCompose(position -> appendBuffer(txnId, position, sequenceId));
+    }
+
+    // Records an already published position in the transaction meta.
+    private CompletableFuture<Void> appendBuffer(TxnID txnID, Position position, long sequenceId) {
+        return txnCursor.getTxnMeta(txnID, true).thenCompose(meta -> meta.appendEntry(sequenceId, position));
+    }
+
+    /**
+     * Open a reader over the entries of an existing transaction.
+     *
+     * <p>NOTE(review): {@code startSequenceId} is not forwarded to the reader, which always starts from its
+     * own default sequence id. Honouring it needs a change to {@link PersistentTransactionBufferReader}.
+     *
+     * @return a future yielding the reader; it fails with {@link TransactionNotSealedException} when the
+     *         reader refuses the transaction meta
+     */
+    @Override
+    public CompletableFuture<TransactionBufferReader> openTransactionBufferReader(TxnID txnID, long startSequenceId) {
+        return txnCursor.getTxnMeta(txnID, false).thenCompose(this::createNewReader);
+    }
+
+    // Wraps the reader construction so that its checked exception surfaces as a failed future.
+    private CompletableFuture<TransactionBufferReader> createNewReader(TransactionMeta meta) {
+        try {
+            return CompletableFuture.completedFuture(new PersistentTransactionBufferReader(meta, ledger));
+        } catch (TransactionNotSealedException e) {
+            return FutureUtil.failedFuture(e);
+        }
+    }
+
+    /**
+     * A serialized commit/abort marker together with the sequence id it is published under.
+     */
+    @Builder
+    static final class Marker {
+        long sequenceId;
+        ByteBuf marker;
+    }
+
+    /**
+     * Commit a transaction: publish a commit marker, then record the commit in the transaction cursor
+     * together with the position the marker was stored at.
+     */
+    @Override
+    public CompletableFuture<Void> commitTxn(TxnID txnID, long committedAtLedgerId, long committedAtEntryId) {
+        return txnCursor.getTxnMeta(txnID, false)
+                .thenApply(meta -> createCommitMarker(meta, committedAtLedgerId, committedAtEntryId))
+                .thenCompose(marker -> publishMessage(txnID, marker.marker, marker.sequenceId))
+                .thenCompose(position -> txnCursor.commitTxn(committedAtLedgerId, committedAtEntryId, txnID,
+                        position));
+    }
+
+    // Builds the commit marker; it takes the sequence id right after the last one of the transaction.
+    private Marker createCommitMarker(TransactionMeta meta, long committedAtLedgerId, long committedAtEntryId) {
+        if (log.isDebugEnabled()) {
+            log.debug("Transaction {} create a commit marker", meta.id());
+        }
+        long sequenceId = meta.lastSequenceId() + 1;
+        MessageIdData messageIdData = MessageIdData.newBuilder()
+                .setLedgerId(committedAtLedgerId)
+                .setEntryId(committedAtEntryId)
+                .build();
+        ByteBuf commitMarker = Markers.newTxnCommitMarker(sequenceId, meta.id().getMostSigBits(),
+                meta.id().getLeastSigBits(), messageIdData);
+        return Marker.builder().sequenceId(sequenceId).marker(commitMarker).build();
+    }
+
+    /**
+     * Abort a transaction: publish an abort marker, then record the abort in the transaction cursor.
+     */
+    @Override
+    public CompletableFuture<Void> abortTxn(TxnID txnID) {
+        return txnCursor.getTxnMeta(txnID, false)
+                .thenApply(this::createAbortMarker)
+                .thenCompose(marker -> publishMessage(txnID, marker.marker, marker.sequenceId))
+                .thenCompose(position -> txnCursor.abortTxn(txnID));
+    }
+
+    // Builds the abort marker; it takes the sequence id right after the last one of the transaction.
+    private Marker createAbortMarker(TransactionMeta meta) {
+        if (log.isDebugEnabled()) {
+            log.debug("Transaction {} create a abort marker", meta.id());
+        }
+        long sequenceId = meta.lastSequenceId() + 1;
+        ByteBuf abortMarker = Markers.newTxnAbortMarker(sequenceId, meta.id().getMostSigBits(),
+                meta.id().getLeastSigBits());
+        return Marker.builder().sequenceId(sequenceId).marker(abortMarker).build();
+    }
+
+    // Publishes msg with the transaction id as producer name and exposes the outcome as a future.
+    private CompletableFuture<Position> publishMessage(TxnID txnID, ByteBuf msg, long sequenceId) {
+        CompletableFuture<Position> publishFuture = new CompletableFuture<>();
+        publishMessage(msg, new TxnCtx(txnID.toString(), sequenceId, publishFuture));
+        return publishFuture;
+    }
+
+    /**
+     * Purge the transactions committed at the given data ledgers: first delete the entries of every such
+     * transaction, then drop the ledgers from the transaction cursor.
+     */
+    @Override
+    public CompletableFuture<Void> purgeTxns(List<Long> dataLedgers) {
+        if (log.isDebugEnabled()) {
+            log.debug("Begin to purge the ledgers {}", dataLedgers);
+        }
+
+        List<CompletableFuture<Void>> futures = dataLedgers.stream()
+                .map(dataLedger -> cleanTxnsOnLedger(dataLedger))
+                .collect(Collectors.toList());
+        return FutureUtil.waitForAll(futures).thenCompose(v -> removeCommittedLedgerFromIndex(dataLedgers));
+    }
+
+    // Removes every given ledger from the committed-ledger index of the transaction cursor.
+    private CompletableFuture<Void> removeCommittedLedgerFromIndex(List<Long> dataLedgers) {
+        List<CompletableFuture<Void>> removeFutures = dataLedgers.stream()
+                .map(dataLedger -> txnCursor.removeTxnsCommittedAtLedger(dataLedger))
+                .collect(Collectors.toList());
+        return FutureUtil.waitForAll(removeFutures);
+    }
+
+    // Deletes the entries of all transactions committed at the given data ledger.
+    private CompletableFuture<Void> cleanTxnsOnLedger(long dataledger) {
+        if (log.isDebugEnabled()) {
+            log.debug("Start to clean ledger {}", dataledger);
+        }
+        return txnCursor.getAllTxnsCommittedAtLedger(dataledger).thenCompose(this::deleteTxns);
+    }
+
+    // Deletes the entries of each listed transaction and waits for all deletions.
+    private CompletableFuture<Void> deleteTxns(Set<TxnID> txnIDS) {
+        if (log.isDebugEnabled()) {
+            log.debug("Start delete txns {} under ledger", txnIDS);
+        }
+        List<CompletableFuture<Void>> futures = txnIDS.stream()
+                .map(this::deleteTxn)
+                .collect(Collectors.toList());
+        return FutureUtil.waitForAll(futures);
+    }
+
+    // Reads numEntries() positions of the transaction, starting from sequence id -1, and deletes them.
+    private CompletableFuture<Void> deleteTxn(TxnID txnID) {
+        if (log.isDebugEnabled()) {
+            log.debug("Start to delete txn {} entries", txnID);
+        }
+        return txnCursor.getTxnMeta(txnID, false)
+                .thenCompose(meta -> meta.readEntries(meta.numEntries(), -1L))
+                .thenCompose(entriesMap -> deleteEntries(entriesMap, txnID));
+    }
+
+    // Deletes every position in the map and waits for all deletions.
+    private CompletableFuture<Void> deleteEntries(SortedMap<Long, Position> entriesMap, TxnID txnID) {
+        if (log.isDebugEnabled()) {
+            log.debug("Delete entries {}", entriesMap);
+        }
+        List<CompletableFuture<Void>> deleteFutures = entriesMap.values().stream()
+                .map(position -> asyncDeletePosition(position, txnID))
+                .collect(Collectors.toList());
+
+        return FutureUtil.waitForAll(deleteFutures);
+    }
+
+    // Adapts the callback-based mark-delete of the retention cursor to a future.
+    // NOTE(review): this uses asyncMarkDelete rather than a per-entry delete; confirm that this is the
+    // intended cursor operation when entries of several transactions are interleaved.
+    private CompletableFuture<Void> asyncDeletePosition(Position position, TxnID txnID) {
+        if (log.isDebugEnabled()) {
+            log.debug("Ready to delete position {} for txn {}", position, txnID);
+        }
+        CompletableFuture<Void> deleteFuture = new CompletableFuture<>();
+        retentionCursor.asyncMarkDelete(position, new AsyncCallbacks.MarkDeleteCallback() {
+            @Override
+            public void markDeleteComplete(Object ctx) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Success delete transaction `{}` entry on position {}", txnID, position);
+                }
+                deleteFuture.complete(null);
+            }
+
+            @Override
+            public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
+                log.error("Failed delete transaction `{}` entry on position {}", txnID, position, exception);
+                deleteFuture.completeExceptionally(exception);
+            }
+        }, null);
+
+        return deleteFuture;
+    }
+
+    /**
+     * Not implemented yet: always returns a future failed with {@link UnsupportedOperationException}.
+     */
+    @Override
+    public CompletableFuture<Void> closeAsync() {
+        return FutureUtil.failedFuture(new UnsupportedOperationException());
+    }
+}
diff --git a/pulsar-transaction/buffer/src/main/java/org/apache/pulsar/transaction/buffer/impl/PersistentTransactionBufferReader.java b/pulsar-transaction/buffer/src/main/java/org/apache/pulsar/transaction/buffer/impl/PersistentTransactionBufferReader.java
new file mode 100644
index 0000000..4806c9e
--- /dev/null
+++ b/pulsar-transaction/buffer/src/main/java/org/apache/pulsar/transaction/buffer/impl/PersistentTransactionBufferReader.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.buffer.impl;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.transaction.buffer.TransactionBufferReader;
+import org.apache.pulsar.transaction.buffer.TransactionEntry;
+import org.apache.pulsar.transaction.buffer.TransactionMeta;
+import org.apache.pulsar.transaction.buffer.exceptions.TransactionNotSealedException;
+import org.apache.pulsar.transaction.impl.common.TxnStatus;
+
+/**
+ * A persistent transaction buffer reader implementation.
+ *
+ * <p>It asks the {@link TransactionMeta} for the positions of the next entries and reads those positions
+ * from the managed ledger.
+ */
+@Slf4j
+public class PersistentTransactionBufferReader implements TransactionBufferReader {
+
+    static final long DEFAULT_START_SEQUENCE_ID = -1L;
+
+    private final ManagedLedger ledger;
+    private final TransactionMeta meta;
+    // Last key of the most recent non-empty batch; passed to meta.readEntries as the start of the next read.
+    private volatile long currentSequenceId = DEFAULT_START_SEQUENCE_ID;
+
+    /**
+     * @param meta the transaction to read
+     * @param ledger the managed ledger the entries are read from; {@link #readEntry(Position)} casts it to
+     *               {@link ManagedLedgerImpl}
+     * @throws TransactionNotSealedException if the transaction is still {@link TxnStatus#OPEN}
+     */
+    PersistentTransactionBufferReader(TransactionMeta meta, ManagedLedger ledger)
+            throws TransactionNotSealedException {
+        if (TxnStatus.OPEN == meta.status()) {
+            throw new TransactionNotSealedException("Transaction `" + meta.id() + "` is not sealed yet");
+        }
+        this.meta = meta;
+        this.ledger = ledger;
+    }
+
+    /**
+     * Read the next batch of entries.
+     *
+     * @param numEntries the number of entries requested from the transaction meta
+     * @return a future yielding the entries read, ordered by sequence id; the list is empty when the meta
+     *         returned no positions
+     */
+    @Override
+    public CompletableFuture<List<TransactionEntry>> readNext(int numEntries) {
+        return meta.readEntries(numEntries, currentSequenceId)
+                .thenCompose(this::readEntry)
+                .thenApply(entries -> entries.stream()
+                        .sorted(Comparator.comparingLong(TransactionEntry::sequenceId))
+                        .collect(Collectors.toList()));
+    }
+
+    // Reads every position of the batch and advances currentSequenceId once all reads succeeded.
+    private CompletableFuture<List<TransactionEntry>> readEntry(SortedMap<Long, Position> entries) {
+        if (entries.isEmpty()) {
+            // SortedMap.lastKey() throws on an empty map; inside the completion callback below that would
+            // leave the returned future incomplete forever. Nothing was read, so keep the current sequence id.
+            List<TransactionEntry> none = new ArrayList<>();
+            return CompletableFuture.completedFuture(none);
+        }
+
+        CompletableFuture<List<TransactionEntry>> readFuture = new CompletableFuture<>();
+        List<TransactionEntry> txnEntries = new ArrayList<>(entries.size());
+        List<CompletableFuture<Void>> futures = new ArrayList<>(entries.size());
+
+        for (Map.Entry<Long, Position> longPositionEntry : entries.entrySet()) {
+            CompletableFuture<Void> tmpFuture = new CompletableFuture<>();
+            readEntry(longPositionEntry.getValue()).whenComplete((entry, throwable) -> {
+                if (null != throwable) {
+                    tmpFuture.completeExceptionally(throwable);
+                } else {
+                    TransactionEntry txnEntry = new TransactionEntryImpl(meta.id(), longPositionEntry.getKey(),
+                            entry.getDataBuffer(),
+                            meta.committedAtLedgerId(),
+                            meta.committedAtEntryId());
+                    // Read callbacks may complete on different threads; the list itself is not thread-safe.
+                    synchronized (txnEntries) {
+                        txnEntries.add(txnEntry);
+                    }
+                    tmpFuture.complete(null);
+                }
+            });
+            futures.add(tmpFuture);
+        }
+
+        FutureUtil.waitForAll(futures).whenComplete((ignore, error) -> {
+            if (error != null) {
+                readFuture.completeExceptionally(error);
+            } else {
+                currentSequenceId = entries.lastKey();
+                readFuture.complete(txnEntries);
+            }
+        });
+
+        return readFuture;
+    }
+
+    // Adapts the callback-based single-entry read of the managed ledger to a future.
+    private CompletableFuture<Entry> readEntry(Position position) {
+        CompletableFuture<Entry> readFuture = new CompletableFuture<>();
+
+        ManagedLedgerImpl readLedger = (ManagedLedgerImpl) ledger;
+
+        readLedger.asyncReadEntry((PositionImpl) position, new AsyncCallbacks.ReadEntryCallback() {
+            @Override
+            public void readEntryComplete(Entry entry, Object ctx) {
+                readFuture.complete(entry);
+            }
+
+            @Override
+            public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
+                readFuture.completeExceptionally(exception);
+            }
+        }, null);
+
+        return readFuture;
+    }
+
+    /**
+     * Only logs that the reader was closed.
+     */
+    @Override
+    public void close() {
+        log.info("Txn {} reader closed.", meta.id());
+    }
+}
diff --git a/pulsar-transaction/buffer/src/main/java/org/apache/pulsar/transaction/buffer/impl/TransactionCursorImpl.java b/pulsar-transaction/buffer/src/main/java/org/apache/pulsar/transaction/buffer/impl/TransactionCursorImpl.java
new file mode 100644
index 0000000..65431fa
--- /dev/null
+++ b/pulsar-transaction/buffer/src/main/java/org/apache/pulsar/transaction/buffer/impl/TransactionCursorImpl.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.buffer.impl;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.pulsar.transaction.buffer.TransactionCursor;
+import org.apache.pulsar.transaction.buffer.TransactionMeta;
+import org.apache.pulsar.transaction.buffer.exceptions.NoTxnsCommittedAtLedgerException;
+import org.apache.pulsar.transaction.buffer.exceptions.TransactionNotFoundException;
+import org.apache.pulsar.transaction.impl.common.TxnID;
+
+/**
+ * In-memory {@link TransactionCursor} that indexes transaction metadata by transaction id
+ * and records which transactions were committed at which ledger.
+ *
+ * <p>{@code txnIndex} is a concurrent map. {@code committedLedgerTxnIndex} is a plain
+ * {@link TreeMap} holding plain {@link HashSet}s, so every access to it is performed while
+ * synchronizing on the map itself.
+ */
+public class TransactionCursorImpl implements TransactionCursor {
+
+
+ private final ConcurrentMap<TxnID, TransactionMetaImpl> txnIndex;
+ private final Map<Long, Set<TxnID>> committedLedgerTxnIndex;
+
+ TransactionCursorImpl() {
+ this.txnIndex = new ConcurrentHashMap<>();
+ this.committedLedgerTxnIndex = new TreeMap<>();
+ }
+
+ /**
+ * Looks up the metadata of a transaction, optionally creating it.
+ *
+ * @param txnID the transaction id
+ * @param createIfNotExist when {@code true}, a new metadata object is registered
+ * atomically if none exists yet
+ * @return a future holding the metadata, or failed with
+ * {@link TransactionNotFoundException} when the transaction is unknown and
+ * {@code createIfNotExist} is {@code false}
+ */
+ @Override
+ public CompletableFuture<TransactionMeta> getTxnMeta(TxnID txnID, boolean createIfNotExist) {
+ // computeIfAbsent gives the same "first writer wins" result as get + putIfAbsent.
+ TransactionMeta meta = createIfNotExist
+ ? txnIndex.computeIfAbsent(txnID, TransactionMetaImpl::new)
+ : txnIndex.get(txnID);
+
+ if (null == meta) {
+ CompletableFuture<TransactionMeta> notFound = new CompletableFuture<>();
+ notFound.completeExceptionally(
+ new TransactionNotFoundException("Transaction `" + txnID + "` doesn't exist"));
+ return notFound;
+ }
+
+ return CompletableFuture.completedFuture(meta);
+ }
+
+ /**
+ * Marks an existing transaction as committed and indexes it under the ledger it was
+ * committed at.
+ *
+ * @param committedLedgerId the ledger id recorded as the commit point
+ * @param committedEntryId the entry id recorded as the commit point
+ * @param txnID the transaction to commit
+ * @param position not used by this implementation
+ * @return a future that fails if the transaction does not exist or its metadata
+ * rejects the commit
+ */
+ @Override
+ public CompletableFuture<Void> commitTxn(long committedLedgerId, long committedEntryId, TxnID txnID,
+ Position position) {
+ return getTxnMeta(txnID, false)
+ .thenCompose(meta -> meta.commitTxn(committedLedgerId, committedEntryId))
+ .thenAccept(meta -> addTxnToCommittedIndex(txnID, committedLedgerId));
+ }
+
+ /** Records {@code txnID} in the set of transactions committed at the given ledger. */
+ private void addTxnToCommittedIndex(TxnID txnID, long committedAtLedgerId) {
+ synchronized (committedLedgerTxnIndex) {
+ committedLedgerTxnIndex.computeIfAbsent(committedAtLedgerId, ledgerId -> new HashSet<>()).add(txnID);
+ }
+ }
+
+ /**
+ * Marks an existing transaction as aborted.
+ *
+ * @param txnID the transaction to abort
+ * @return a future that fails if the transaction does not exist or its metadata
+ * rejects the abort
+ */
+ @Override
+ public CompletableFuture<Void> abortTxn(TxnID txnID) {
+ return getTxnMeta(txnID, false)
+ .thenCompose(meta -> meta.abortTxn())
+ .thenApply(meta -> null);
+ }
+
+ /**
+ * Returns the ids of all transactions committed at the given ledger.
+ *
+ * @param ledgerId the ledger id to look up
+ * @return a future holding a snapshot of the transaction ids, or failed with
+ * {@link NoTxnsCommittedAtLedgerException} when nothing is indexed for the ledger
+ */
+ public CompletableFuture<Set<TxnID>> getAllTxnsCommittedAtLedger(long ledgerId) {
+ CompletableFuture<Set<TxnID>> getFuture = new CompletableFuture<>();
+
+ // The index is a plain TreeMap of HashSets: read it under the same lock writers use,
+ // and hand out a copy so callers never iterate a set that a commit is mutating.
+ synchronized (committedLedgerTxnIndex) {
+ Set<TxnID> txnIDS = committedLedgerTxnIndex.get(ledgerId);
+ if (null == txnIDS) {
+ getFuture.completeExceptionally(new NoTxnsCommittedAtLedgerException(
+ "Transaction committed ledger id `" + ledgerId + "` doesn't exist"));
+ } else {
+ getFuture.complete(new HashSet<>(txnIDS));
+ }
+ }
+
+ return getFuture;
+ }
+
+ /**
+ * Drops the committed-ledger index entry for {@code ledgerId} and removes the metadata
+ * of every transaction that was indexed under it.
+ *
+ * @param ledgerId the ledger whose committed transactions are removed
+ * @return a future failed with {@link NoTxnsCommittedAtLedgerException} when nothing
+ * is indexed for the ledger
+ */
+ @Override
+ public CompletableFuture<Void> removeTxnsCommittedAtLedger(long ledgerId) {
+ CompletableFuture<Void> removeFuture = new CompletableFuture<>();
+
+ synchronized (committedLedgerTxnIndex) {
+ Set<TxnID> txnIDS = committedLedgerTxnIndex.remove(ledgerId);
+ if (null == txnIDS) {
+ removeFuture.completeExceptionally(new NoTxnsCommittedAtLedgerException(
+ "Transaction committed ledger id `" + ledgerId + "` doesn't exist"));
+ } else {
+ txnIDS.forEach(txnIndex::remove);
+ removeFuture.complete(null);
+ }
+ }
+
+ return removeFuture;
+ }
+}
diff --git a/pulsar-transaction/buffer/src/main/java/org/apache/pulsar/transaction/buffer/impl/TransactionMetaImpl.java b/pulsar-transaction/buffer/src/main/java/org/apache/pulsar/transaction/buffer/impl/TransactionMetaImpl.java
new file mode 100644
index 0000000..9325d04
--- /dev/null
+++ b/pulsar-transaction/buffer/src/main/java/org/apache/pulsar/transaction/buffer/impl/TransactionMetaImpl.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.buffer.impl;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.pulsar.transaction.buffer.TransactionMeta;
+import org.apache.pulsar.transaction.buffer.exceptions.EndOfTransactionException;
+import org.apache.pulsar.transaction.buffer.exceptions.TransactionSealedException;
+import org.apache.pulsar.transaction.buffer.exceptions.UnexpectedTxnStatusException;
+import org.apache.pulsar.transaction.impl.common.TxnID;
+import org.apache.pulsar.transaction.impl.common.TxnStatus;
+
+/**
+ * In-memory {@link TransactionMeta}: the status of one transaction, the positions of the
+ * entries appended to it keyed by sequence id, and the ledger/entry id it was committed at.
+ *
+ * <p>Locking: {@code txnStatus} and the commit coordinates are guarded by {@code this};
+ * the {@code entries} map is guarded by synchronizing on the map itself. When both locks
+ * are needed, {@code this} is taken first.
+ */
+public class TransactionMetaImpl implements TransactionMeta {
+
+ private final TxnID txnID;
+ private final SortedMap<Long, Position> entries;
+ private TxnStatus txnStatus;
+ private long committedAtLedgerId = -1L;
+ private long committedAtEntryId = -1L;
+
+ TransactionMetaImpl(TxnID txnID) {
+ this.txnID = txnID;
+ this.entries = new TreeMap<>();
+ this.txnStatus = TxnStatus.OPEN;
+ }
+
+ @Override
+ public TxnID id() {
+ return this.txnID;
+ }
+
+ @Override
+ public synchronized TxnStatus status() {
+ return this.txnStatus;
+ }
+
+ @Override
+ public int numEntries() {
+ synchronized (entries) {
+ return entries.size();
+ }
+ }
+
+ /**
+ * Exposes the live entry map (not a copy) for tests. Callers that read it while
+ * entries are being appended must synchronize on the returned map.
+ */
+ @VisibleForTesting
+ public SortedMap<Long, Position> getEntries() {
+ return entries;
+ }
+
+ /** Returns the ledger id recorded by {@code commitTxn}, or {@code -1} before a commit. */
+ @Override
+ public synchronized long committedAtLedgerId() {
+ return committedAtLedgerId;
+ }
+
+ /** Returns the entry id recorded by {@code commitTxn}, or {@code -1} before a commit. */
+ @Override
+ public synchronized long committedAtEntryId() {
+ return committedAtEntryId;
+ }
+
+ /**
+ * Returns the highest sequence id appended so far.
+ *
+ * @throws java.util.NoSuchElementException if no entry has been appended
+ */
+ @Override
+ public long lastSequenceId() {
+ synchronized (entries) {
+ return entries.lastKey();
+ }
+ }
+
+ /**
+ * Returns a copy of the entry positions starting at {@code startSequenceId}.
+ *
+ * @param num maximum number of entries to return; a non-positive value returns every
+ * remaining entry (the behaviour this method had before {@code num} was honoured)
+ * @param startSequenceId first sequence id to return, inclusive; passing
+ * {@code PersistentTransactionBufferReader.DEFAULT_START_SEQUENCE_ID} reads from
+ * the first entry
+ * @return a future holding the selected entries in sequence-id order, or failed with
+ * {@link EndOfTransactionException} when there is no entry at or after the start
+ */
+ @Override
+ public CompletableFuture<SortedMap<Long, Position>> readEntries(int num, long startSequenceId) {
+ SortedMap<Long, Position> result = new TreeMap<>();
+
+ // Copy under the same lock appendEntry uses so iteration never races with a put.
+ synchronized (entries) {
+ SortedMap<Long, Position> readEntries = entries;
+ if (startSequenceId != PersistentTransactionBufferReader.DEFAULT_START_SEQUENCE_ID) {
+ readEntries = entries.tailMap(startSequenceId);
+ }
+
+ for (Map.Entry<Long, Position> longPositionEntry : readEntries.entrySet()) {
+ if (num > 0 && result.size() >= num) {
+ break;
+ }
+ result.put(longPositionEntry.getKey(), longPositionEntry.getValue());
+ }
+ }
+
+ if (result.isEmpty()) {
+ CompletableFuture<SortedMap<Long, Position>> readFuture = new CompletableFuture<>();
+ readFuture.completeExceptionally(
+ new EndOfTransactionException("No more entries found in transaction `" + txnID + "`"));
+ return readFuture;
+ }
+
+ return CompletableFuture.completedFuture(result);
+ }
+
+ /**
+ * Records the position of an entry appended to this transaction.
+ *
+ * @param sequenceId the sequence id of the entry
+ * @param position where the entry was stored
+ * @return a completed future, or one failed with {@link TransactionSealedException}
+ * when the transaction is no longer open
+ */
+ @Override
+ public CompletableFuture<Void> appendEntry(long sequenceId, Position position) {
+ synchronized (this) {
+ if (TxnStatus.OPEN != txnStatus) {
+ CompletableFuture<Void> appendFuture = new CompletableFuture<>();
+ appendFuture.completeExceptionally(
+ new TransactionSealedException("Transaction `" + txnID + "` is " + "already sealed"));
+ return appendFuture;
+ }
+ // Insert while still holding the status lock, so a concurrent commit/abort cannot
+ // seal the transaction between the status check and the insert.
+ synchronized (this.entries) {
+ this.entries.put(sequenceId, position);
+ }
+ }
+ return CompletableFuture.completedFuture(null);
+ }
+
+ /**
+ * Moves an open transaction to {@code COMMITTED} and records where it was committed.
+ *
+ * @param committedAtLedgerId the ledger id of the commit point
+ * @param committedAtEntryId the entry id of the commit point
+ * @return a future holding this metadata, or failed with
+ * {@link UnexpectedTxnStatusException} when the transaction is not open
+ */
+ @Override
+ public synchronized CompletableFuture<TransactionMeta> commitTxn(long committedAtLedgerId,
+ long committedAtEntryId) {
+ CompletableFuture<TransactionMeta> commitFuture = new CompletableFuture<>();
+ if (!checkOpened(txnID, commitFuture)) {
+ return commitFuture;
+ }
+
+ this.committedAtLedgerId = committedAtLedgerId;
+ this.committedAtEntryId = committedAtEntryId;
+ this.txnStatus = TxnStatus.COMMITTED;
+ commitFuture.complete(this);
+ return commitFuture;
+ }
+
+ /**
+ * Moves an open transaction to {@code ABORTED}.
+ *
+ * @return a future holding this metadata, or failed with
+ * {@link UnexpectedTxnStatusException} when the transaction is not open
+ */
+ @Override
+ public synchronized CompletableFuture<TransactionMeta> abortTxn() {
+ CompletableFuture<TransactionMeta> abortFuture = new CompletableFuture<>();
+ if (!checkOpened(txnID, abortFuture)) {
+ return abortFuture;
+ }
+
+ this.txnStatus = TxnStatus.ABORTED;
+ abortFuture.complete(this);
+
+ return abortFuture;
+ }
+
+ /**
+ * Fails {@code future} with {@link UnexpectedTxnStatusException} unless the transaction
+ * is open. Callers hold the lock on {@code this}.
+ *
+ * @return {@code true} when the transaction is open
+ */
+ private boolean checkOpened(TxnID txnID, CompletableFuture<TransactionMeta> future) {
+ if (TxnStatus.OPEN != txnStatus) {
+ future.completeExceptionally(new UnexpectedTxnStatusException(txnID, TxnStatus.OPEN, txnStatus));
+ return false;
+ }
+ return true;
+ }
+}
diff --git a/pulsar-transaction/buffer/src/test/java/org/apache/pulsar/transaction/buffer/impl/PersistentTransactionBufferTest.java b/pulsar-transaction/buffer/src/test/java/org/apache/pulsar/transaction/buffer/impl/PersistentTransactionBufferTest.java
new file mode 100644
index 0000000..f77f50c
--- /dev/null
+++ b/pulsar-transaction/buffer/src/test/java/org/apache/pulsar/transaction/buffer/impl/PersistentTransactionBufferTest.java
@@ -0,0 +1,732 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.buffer.impl;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.mockito.ArgumentMatchers.endsWith;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.matches;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.MoreExecutors;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufUtil;
+import io.netty.buffer.Unpooled;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.SortedMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
+import lombok.Cleanup;
+import org.apache.bookkeeper.client.PulsarMockBookKeeper;
+import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
+import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
+import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback;
+import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteLedgerCallback;
+import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback;
+import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenCursorCallback;
+import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenLedgerCallback;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
+import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.cache.ConfigurationCacheService;
+import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
+import org.apache.pulsar.broker.namespace.NamespaceService;
+import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.BrokerServiceException;
+import org.apache.pulsar.broker.service.ServerCnx;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
+import org.apache.pulsar.common.naming.NamespaceBundle;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.common.protocol.Markers;
+import org.apache.pulsar.compaction.Compactor;
+import org.apache.pulsar.transaction.buffer.TransactionBufferReader;
+import org.apache.pulsar.transaction.buffer.TransactionEntry;
+import org.apache.pulsar.transaction.buffer.TransactionMeta;
+import org.apache.pulsar.transaction.buffer.exceptions.EndOfTransactionException;
+import org.apache.pulsar.transaction.buffer.exceptions.NoTxnsCommittedAtLedgerException;
+import org.apache.pulsar.transaction.buffer.exceptions.TransactionBufferException;
+import org.apache.pulsar.transaction.buffer.exceptions.TransactionNotFoundException;
+import org.apache.pulsar.transaction.buffer.exceptions.TransactionNotSealedException;
+import org.apache.pulsar.transaction.buffer.exceptions.UnexpectedTxnStatusException;
+import org.apache.pulsar.transaction.impl.common.TxnID;
+import org.apache.pulsar.transaction.impl.common.TxnStatus;
+import org.apache.pulsar.zookeeper.ZooKeeperCache;
+import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
+import org.apache.pulsar.zookeeper.ZookeeperClientFactoryImpl;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.MockZooKeeper;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.txn.Txn;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class PersistentTransactionBufferTest extends MockedBookKeeperTestCase {
+ private PulsarService pulsar;
+ private BrokerService brokerService;
+ private ManagedLedgerFactory mlFactoryMock;
+ private ServerCnx serverCnx;
+ private ManagedLedger ledgerMock;
+ private ManagedCursor cursorMock;
+ private ConfigurationCacheService configCacheService;
+
+ final String successTopicName = "persistent://prop/use/ns-abc/successTopic_txn";
+ private static final Logger log = LoggerFactory.getLogger(PersistentTransactionBufferTest.class);
+
+ /**
+ * Builds the mocked broker environment before each test: a spied {@code PulsarService}
+ * whose collaborators (managed-ledger factory, ZooKeeper, BookKeeper, caches, namespace
+ * service) are replaced by mocks, then installs the managed-ledger callback stubs.
+ */
+ @BeforeMethod
+ public void setup() throws Exception {
+ ServiceConfiguration svcConfig = spy(new ServiceConfiguration());
+ pulsar = spy(new PulsarService(svcConfig));
+ doReturn(svcConfig).when(pulsar).getConfiguration();
+ doReturn(mock(Compactor.class)).when(pulsar).getCompactor();
+
+ mlFactoryMock = mock(ManagedLedgerFactory.class);
+ doReturn(mlFactoryMock).when(pulsar).getManagedLedgerFactory();
+
+ ZooKeeper mockZk = createMockZooKeeper();
+ doReturn(mockZk).when(pulsar).getZkClient();
+ doReturn(createMockBookKeeper(mockZk, pulsar.getOrderedExecutor().chooseThread(0)))
+ .when(pulsar).getBookKeeperClient();
+
+ ZooKeeperCache cache = mock(ZooKeeperCache.class);
+ doReturn(30).when(cache).getZkOperationTimeoutSeconds();
+ doReturn(cache).when(pulsar).getLocalZkCache();
+
+ configCacheService = mock(ConfigurationCacheService.class);
+ @SuppressWarnings("unchecked")
+ ZooKeeperDataCache<Policies> zkDataCache = mock(ZooKeeperDataCache.class);
+ doReturn(zkDataCache).when(configCacheService).policiesCache();
+ doReturn(configCacheService).when(pulsar).getConfigurationCache();
+ doReturn(Optional.empty()).when(zkDataCache).get(anyString());
+
+ LocalZooKeeperCacheService zkCache = mock(LocalZooKeeperCacheService.class);
+ doReturn(CompletableFuture.completedFuture(Optional.empty())).when(zkDataCache).getAsync(any());
+ doReturn(zkDataCache).when(zkCache).policiesCache();
+ // NOTE(review): getConfigurationCache() was already stubbed with the same value a few
+ // lines above; this second stubbing looks redundant.
+ doReturn(configCacheService).when(pulsar).getConfigurationCache();
+ doReturn(zkCache).when(pulsar).getLocalZkCacheService();
+
+ brokerService = spy(new BrokerService(pulsar));
+ doReturn(brokerService).when(pulsar).getBrokerService();
+
+ serverCnx = spy(new ServerCnx(pulsar));
+ doReturn(true).when(serverCnx).isActive();
+ doReturn(true).when(serverCnx).isWritable();
+ doReturn(new InetSocketAddress("localhost", 1234)).when(serverCnx).clientAddress();
+
+ NamespaceService nsSvc = mock(NamespaceService.class);
+ doReturn(nsSvc).when(pulsar).getNamespaceService();
+ doReturn(true).when(nsSvc).isServiceUnitOwned(any(NamespaceBundle.class));
+ doReturn(true).when(nsSvc).isServiceUnitActive(any(TopicName.class));
+
+ setupMLAsyncCallbackMocks();
+ }
+
+ /**
+ * Creates a {@link MockZooKeeper} running on a direct executor and pre-populates it
+ * with one available-bookie node and the {@code /ledgers/LAYOUT} node.
+ *
+ * @return the populated mock ZooKeeper
+ */
+ public static MockZooKeeper createMockZooKeeper() throws Exception {
+ final MockZooKeeper mockZk = MockZooKeeper.newInstance(MoreExecutors.newDirectExecutorService());
+ final List<ACL> noAcls = new ArrayList<>(0);
+
+ final String bookiePath = "/ledgers/available/192.168.1.1:" + 5000;
+ ZkUtils.createFullPathOptimistic(mockZk, bookiePath,
+ "".getBytes(ZookeeperClientFactoryImpl.ENCODING_SCHEME), noAcls, CreateMode.PERSISTENT);
+
+ final String layoutPath = "/ledgers/LAYOUT";
+ mockZk.create(layoutPath, "1\nflat:1".getBytes(ZookeeperClientFactoryImpl.ENCODING_SCHEME), noAcls,
+ CreateMode.PERSISTENT);
+
+ return mockZk;
+ }
+
+ /**
+ * Wraps a new {@link NonClosableMockBookKeeper} in a Mockito spy.
+ *
+ * @param zookeeper the ZooKeeper handle passed to the mock BookKeeper
+ * @param executor the executor passed to the mock BookKeeper
+ * @return the spied mock BookKeeper
+ */
+ public static NonClosableMockBookKeeper createMockBookKeeper(ZooKeeper zookeeper,
+ ExecutorService executor) throws Exception {
+ final NonClosableMockBookKeeper bookKeeper = new NonClosableMockBookKeeper(zookeeper, executor);
+ return spy(bookKeeper);
+ }
+
+ /**
+ * A {@link PulsarMockBookKeeper} whose {@code close()} and {@code shutdown()} do
+ * nothing; the real shutdown is only reachable through {@link #reallyShutdown()}.
+ */
+ public static class NonClosableMockBookKeeper extends PulsarMockBookKeeper {
+
+ public NonClosableMockBookKeeper(ZooKeeper zk, ExecutorService executor) throws Exception {
+ super(zk, executor);
+ }
+
+ @Override
+ public void close() {
+ // no-op
+ }
+
+ @Override
+ public void shutdown() {
+ // no-op
+ }
+
+ /** Performs the superclass shutdown that {@link #shutdown()} deliberately skips. */
+ public void reallyShutdown() {
+ super.shutdown();
+ }
+ }
+
+ /**
+ * Creates the managed-ledger and cursor mocks and stubs their asynchronous methods so
+ * that each one immediately invokes the success (or, for names matching
+ * {@code .*fail.*}, the failure) callback. Finally creates the {@code buffer} under test.
+ */
+ @SuppressWarnings("unchecked")
+ void setupMLAsyncCallbackMocks()
+ throws BrokerServiceException.NamingException, ManagedLedgerException, InterruptedException {
+ ledgerMock = mock(ManagedLedger.class);
+ cursorMock = mock(ManagedCursor.class);
+ final CompletableFuture<Void> closeFuture = new CompletableFuture<>();
+
+ doReturn(new ArrayList<Object>()).when(ledgerMock).getCursors();
+ doReturn("mockCursor").when(cursorMock).getName();
+ // NOTE(review): asyncClose is stubbed with a freshly created CloseCallback instance
+ // rather than an argument matcher, so presumably this stub only applies to a call
+ // that passes an equal callback - confirm it is ever triggered.
+ doAnswer(new Answer<Object>() {
+ @Override
+ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+ return closeFuture.complete(null);
+ }
+ })
+
+ .when(cursorMock).asyncClose(new CloseCallback() {
+
+ @Override
+ public void closeComplete(Object ctx) {
+ log.info("[{}] Successfully closed cursor ledger", "mockCursor");
+ closeFuture.complete(null);
+ }
+
+ @Override
+ public void closeFailed(ManagedLedgerException exception, Object ctx) {
+ // isFenced.set(false);
+
+ log.error("Error closing cursor for subscription", exception);
+ closeFuture.completeExceptionally(new BrokerServiceException.PersistenceException(exception));
+ }
+ }, null);
+
+ // call openLedgerComplete with ledgerMock on ML factory asyncOpen
+ doAnswer(new Answer<Object>() {
+ @Override
+ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+ ((OpenLedgerCallback) invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock, null);
+ return null;
+ }
+ }).when(mlFactoryMock)
+ .asyncOpen(matches(".*success.*"), any(ManagedLedgerConfig.class), any(OpenLedgerCallback.class), anyObject());
+
+ // call openLedgerFailed on ML factory asyncOpen
+ doAnswer(new Answer<Object>() {
+ @Override
+ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+ ((OpenLedgerCallback) invocationOnMock.getArguments()[2])
+ .openLedgerFailed(new ManagedLedgerException("Managed ledger failure"), null);
+ return null;
+ }
+ }).when(mlFactoryMock)
+ .asyncOpen(matches(".*fail.*"), any(ManagedLedgerConfig.class), any(OpenLedgerCallback.class), anyObject());
+
+ // call addComplete on ledger asyncAddEntry
+ doAnswer(new Answer<Object>() {
+ @Override
+ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+ ((AddEntryCallback) invocationOnMock.getArguments()[1])
+ .addComplete(new PositionImpl(1, 1), invocationOnMock.getArguments()[2]);
+ return null;
+ }
+ }).when(ledgerMock).asyncAddEntry(any(ByteBuf.class), any(AddEntryCallback.class), anyObject());
+
+ // call openCursorComplete on cursor asyncOpen
+ doAnswer(new Answer<Object>() {
+ @Override
+ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+ ((OpenCursorCallback) invocationOnMock.getArguments()[2]).openCursorComplete(cursorMock, null);
+ return null;
+ }
+ }).when(ledgerMock)
+ .asyncOpenCursor(matches(".*success.*"), any(InitialPosition.class), any(OpenCursorCallback.class), anyObject());
+
+ // same as above for the asyncOpenCursor overload that also takes a Map argument;
+ // the callback is therefore argument index 3 instead of 2
+ doAnswer(new Answer<Object>() {
+ @Override
+ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+ ((OpenCursorCallback) invocationOnMock.getArguments()[3]).openCursorComplete(cursorMock, null);
+ return null;
+ }
+ }).when(ledgerMock).asyncOpenCursor(matches(".*success.*"), any(InitialPosition.class), any(Map.class),
+ any(OpenCursorCallback.class), anyObject());
+
+ // call deleteLedgerComplete on ledger asyncDelete
+ doAnswer(new Answer<Object>() {
+ @Override
+ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+ ((DeleteLedgerCallback) invocationOnMock.getArguments()[0]).deleteLedgerComplete(null);
+ return null;
+ }
+ }).when(ledgerMock).asyncDelete(any(DeleteLedgerCallback.class), anyObject());
+
+ // call deleteCursorComplete on ledger asyncDeleteCursor
+ doAnswer(new Answer<Object>() {
+ @Override
+ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+ ((DeleteCursorCallback) invocationOnMock.getArguments()[1]).deleteCursorComplete(null);
+ return null;
+ }
+ }).when(ledgerMock).asyncDeleteCursor(matches(".*success.*"), any(DeleteCursorCallback.class), anyObject());
+
+ // call markDeleteComplete on cursor asyncMarkDelete, echoing back the ctx argument
+ doAnswer((invokactionOnMock) -> {
+ ((MarkDeleteCallback) invokactionOnMock.getArguments()[2])
+ .markDeleteComplete(invokactionOnMock.getArguments()[3]);
+ return null;
+ }).when(cursorMock).asyncMarkDelete(anyObject(), anyObject(), any(MarkDeleteCallback.class), anyObject());
+
+ // The buffer under test is backed by a ledger opened from `factory`, not by ledgerMock
+ // (presumably `factory` is inherited from MockedBookKeeperTestCase - confirm).
+ this.buffer = new PersistentTransactionBuffer(successTopicName, factory.open("hello"), brokerService);
+ }
+
+ /**
+ * Clears the broker's topics and closes the broker service and the Pulsar service
+ * after each test. A failure to close Pulsar is logged and then rethrown.
+ */
+ @AfterMethod
+ public void teardown() throws Exception {
+ brokerService.getTopics().clear();
+ brokerService.close(); //to clear pulsarStats
+ try {
+ pulsar.close();
+ } catch (Exception e) {
+ log.warn("Failed to close pulsar service", e);
+ throw e;
+ }
+ }
+
+ private final TxnID txnID = new TxnID(1234L, 5678L);
+ private PersistentTransactionBuffer buffer;
+
+ /**
+ * Looking up the metadata of a transaction that was never created must fail with
+ * {@link TransactionNotFoundException}.
+ */
+ @Test
+ public void testGetANonExistTxn() throws InterruptedException {
+ try {
+ buffer.getTransactionMeta(txnID).get();
+ // Without this the test would pass silently when no exception is thrown.
+ fail("Should fail to get the metadata of a transaction that doesn't exist");
+ } catch (ExecutionException e) {
+ assertTrue(e.getCause() instanceof TransactionNotFoundException);
+ }
+ }
+
+ /**
+ * Opening a reader on a transaction that was never created must fail with
+ * {@link TransactionNotFoundException}.
+ */
+ @Test
+ public void testOpenReaderOnNonExistentTxn() throws InterruptedException{
+ try {
+ buffer.openTransactionBufferReader(txnID, 0L).get();
+ // Without this the test would pass silently when no exception is thrown.
+ fail("Should fail to open a reader on a transaction that doesn't exist");
+ } catch (ExecutionException e) {
+ assertTrue(e.getCause() instanceof TransactionNotFoundException);
+ }
+ }
+
+ /**
+ * A reader cannot be opened on a transaction that is still {@code OPEN}: the attempt
+ * must fail with {@link TransactionNotSealedException}.
+ */
+ @Test
+ public void testOpenReadOnAnOpenTxn() throws InterruptedException {
+ final int numEntries = 10;
+ appendEntries(buffer, txnID, numEntries, 0L);
+ TransactionMeta meta = null;
+ try {
+ meta = buffer.getTransactionMeta(txnID).get();
+ } catch (ExecutionException e) {
+ fail("Should not failed at here");
+ }
+ assertEquals(txnID, meta.id());
+ assertEquals(TxnStatus.OPEN, meta.status());
+
+ try {
+ buffer.openTransactionBufferReader(txnID, 0L).get();
+ fail("Should failed");
+ } catch (ExecutionException e) {
+ assertTrue(e.getCause() instanceof TransactionNotSealedException);
+ }
+ }
+
+ /**
+ * After a transaction is committed, a reader can be opened on it and returns the
+ * appended entries; a further read is expected to end with
+ * {@link EndOfTransactionException}.
+ */
+ @Test
+ public void testOpenReaderOnCommittedTxn() throws ExecutionException, InterruptedException {
+ final int numEntries = 10;
+ appendEntries(buffer, txnID, numEntries, 0L);
+ TransactionMeta meta = buffer.getTransactionMeta(txnID).get();
+ assertEquals(txnID, meta.id());
+ assertEquals(TxnStatus.OPEN, meta.status());
+
+ buffer.commitTxn(txnID, 22L, 33L).get();
+
+ meta = buffer.getTransactionMeta(txnID).get();
+ assertEquals(txnID, meta.id());
+ assertEquals(TxnStatus.COMMITTED, meta.status());
+
+ try (TransactionBufferReader reader = buffer.openTransactionBufferReader(txnID, 0L).get()) {
+ List<TransactionEntry> entries = reader.readNext(numEntries).get();
+ verifyAndReleaseEntries(entries, txnID, 0L, numEntries);
+
+ // NOTE(review): there is no fail() after this read and the catch below covers the
+ // whole try block, so the test also passes when this read succeeds - confirm
+ // whether an EndOfTransactionException is really required here.
+ reader.readNext(1).get();
+
+ } catch (ExecutionException ee) {
+ assertTrue(ee.getCause() instanceof EndOfTransactionException);
+ }
+
+ }
+
+ /**
+ * Committing a transaction that was never created must fail with
+ * {@link TransactionNotFoundException}.
+ */
+ @Test
+ public void testCommitNonExistentTxn() throws ExecutionException, InterruptedException {
+ try {
+ buffer.commitTxn(txnID, 22L, 33L).get();
+ // Same pattern as testAbortNonExistentTxn: a silent success must fail the test.
+ fail("Should fail to commit a transaction if it doesn't exist");
+ } catch (ExecutionException ee) {
+ assertTrue(ee.getCause() instanceof TransactionNotFoundException);
+ }
+ }
+
+ /**
+ * Committing an open transaction moves its status from {@code OPEN} to
+ * {@code COMMITTED}.
+ */
+ @Test
+ public void testCommitTxn() throws Exception {
+ final int entryCount = 10;
+ appendEntries(buffer, txnID, entryCount, 0L);
+
+ TransactionMeta beforeCommit = buffer.getTransactionMeta(txnID).get();
+ assertEquals(txnID, beforeCommit.id());
+ assertEquals(beforeCommit.status(), TxnStatus.OPEN);
+
+ buffer.commitTxn(txnID, 22L, 33L).get();
+
+ TransactionMeta afterCommit = buffer.getTransactionMeta(txnID).get();
+ assertEquals(txnID, afterCommit.id());
+ assertEquals(afterCommit.status(), TxnStatus.COMMITTED);
+ }
+
+ /**
+ * Only the first commit of a transaction takes effect: every later commit must fail
+ * with {@link UnexpectedTxnStatusException} and the commit point recorded by the first
+ * commit must stay unchanged.
+ */
+ @Test
+ public void testCommitTxnMultiTimes() throws ExecutionException, InterruptedException {
+ final int numEntries = 10;
+ appendEntries(buffer, txnID, numEntries, 0L);
+ TransactionMeta meta = buffer.getTransactionMeta(txnID).get();
+
+ assertEquals(txnID, meta.id());
+ assertEquals(meta.status(), TxnStatus.OPEN);
+
+ buffer.commitTxn(txnID, 22L, 33L).get();
+
+ // Check each repeated commit on its own: in a shared try block the second commit
+ // would never run once the first one throws, and a silent success went unnoticed.
+ try {
+ buffer.commitTxn(txnID, 23L, 34L).get();
+ fail("Should fail to commit a transaction that is already committed");
+ } catch (ExecutionException e) {
+ assertTrue(e.getCause() instanceof UnexpectedTxnStatusException);
+ }
+ try {
+ buffer.commitTxn(txnID, 24L, 34L).get();
+ fail("Should fail to commit a transaction that is already committed");
+ } catch (ExecutionException e) {
+ assertTrue(e.getCause() instanceof UnexpectedTxnStatusException);
+ }
+
+ meta = buffer.getTransactionMeta(txnID).get();
+
+ assertEquals(txnID, meta.id());
+ assertEquals(meta.status(), TxnStatus.COMMITTED);
+ assertEquals(meta.committedAtLedgerId(), 22L);
+ assertEquals(meta.committedAtEntryId(), 33L);
+ assertEquals(meta.numEntries(), numEntries);
+ }
+
+ /**
+ * Aborting a transaction that was never created must fail with
+ * {@link TransactionNotFoundException}.
+ */
+ @Test
+ public void testAbortNonExistentTxn() throws Exception {
+ try {
+ buffer.abortTxn(txnID).get();
+ fail("Should fail to abort a transaction if it doesn't exist");
+ } catch (ExecutionException ee) {
+ assertTrue(ee.getCause() instanceof TransactionNotFoundException);
+ }
+ }
+
+ /**
+ * Aborting an already committed transaction must fail with
+ * {@link UnexpectedTxnStatusException} and leave the transaction {@code COMMITTED}.
+ */
+ @Test
+ public void testAbortCommittedTxn() throws Exception {
+ final int entryCount = 10;
+ appendEntries(buffer, txnID, entryCount, 0L);
+
+ TransactionMeta opened = buffer.getTransactionMeta(txnID).get();
+ assertEquals(txnID, opened.id());
+ assertEquals(TxnStatus.OPEN, opened.status());
+
+ buffer.commitTxn(txnID, 22L, 33L).get();
+
+ TransactionMeta committed = buffer.getTransactionMeta(txnID).get();
+ assertEquals(txnID, committed.id());
+ assertEquals(TxnStatus.COMMITTED, committed.status());
+
+ try {
+ buffer.abortTxn(txnID).get();
+ fail("Should fail to abort a committed transaction");
+ } catch (ExecutionException abortFailure) {
+ assertTrue(abortFailure.getCause() instanceof UnexpectedTxnStatusException);
+ }
+
+ TransactionMeta afterAbortAttempt = buffer.getTransactionMeta(txnID).get();
+ assertEquals(txnID, afterAbortAttempt.id());
+ assertEquals(TxnStatus.COMMITTED, afterAbortAttempt.status());
+ }
+
+ /**
+ * Aborting an open transaction succeeds; afterwards {@code verifyTxnNotExist} is used
+ * to check that the transaction can no longer be found.
+ */
+ @Test
+ public void testAbortTxn() throws Exception {
+ final int entryCount = 10;
+ appendEntries(buffer, txnID, entryCount, 0L);
+
+ TransactionMeta opened = buffer.getTransactionMeta(txnID).get();
+ assertEquals(txnID, opened.id());
+ assertEquals(TxnStatus.OPEN, opened.status());
+
+ buffer.abortTxn(txnID).get();
+
+ verifyTxnNotExist(txnID);
+ }
+
+ /**
+ * Purging a ledger removes only the transactions committed at that ledger: an open
+ * transaction and a transaction committed at another ledger stay available, and
+ * purging a ledger with no committed transactions leaves everything unchanged.
+ */
+ @Test
+ public void testPurgeTxns() throws Exception {
+ final int numEntries = 10;
+ // txnId1 stays open
+ TxnID txnId1 = new TxnID(1234L, 2345L);
+ appendEntries(buffer, txnId1, numEntries, 0L);
+ TransactionMeta meta1 = buffer.getTransactionMeta(txnId1).get();
+ assertEquals(txnId1, meta1.id());
+ assertEquals(TxnStatus.OPEN, meta1.status());
+
+ // txnId2 is committed at ledger 22
+ TxnID txnId2 = new TxnID(1234L, 3456L);
+ appendEntries(buffer, txnId2, numEntries, 0L);
+ buffer.commitTxn(txnId2, 22L, 0L).get();
+ TransactionMeta meta2 = buffer.getTransactionMeta(txnId2).get();
+ assertEquals(txnId2, meta2.id());
+ assertEquals(TxnStatus.COMMITTED, meta2.status());
+
+ // txnId3 is committed at ledger 23
+ TxnID txnId3 = new TxnID(1234L, 4567L);
+ appendEntries(buffer, txnId3, numEntries, 0L);
+ buffer.commitTxn(txnId3, 23L, 0L).get();
+ TransactionMeta meta3 = buffer.getTransactionMeta(txnId3).get();
+ assertEquals(txnId3, meta3.id());
+ assertEquals(TxnStatus.COMMITTED, meta3.status());
+
+ buffer.purgeTxns(Lists.newArrayList(Long.valueOf(22L))).get();
+
+ verifyTxnNotExist(txnId2);
+
+ meta1 = buffer.getTransactionMeta(txnId1).get();
+ assertEquals(txnId1, meta1.id());
+ assertEquals(TxnStatus.OPEN, meta1.status());
+
+ meta3 = buffer.getTransactionMeta(txnId3).get();
+ assertEquals(txnId3, meta3.id());
+ assertEquals(TxnStatus.COMMITTED, meta3.status());
+
+ // purge a non exist ledger.
+ // NOTE(review): no fail() follows the purge, so this block also passes when the
+ // purge succeeds - confirm whether a failure is required here.
+ try {
+ buffer.purgeTxns(Lists.newArrayList(Long.valueOf(1L))).get();
+ } catch (ExecutionException e) {
+ assertTrue(e.getCause() instanceof NoTxnsCommittedAtLedgerException);
+ }
+
+ verifyTxnNotExist(txnId2);
+
+ meta1 = buffer.getTransactionMeta(txnId1).get();
+ assertEquals(txnId1, meta1.id());
+ assertEquals(TxnStatus.OPEN, meta1.status());
+
+ meta3 = buffer.getTransactionMeta(txnId3).get();
+ assertEquals(txnId3, meta3.id());
+ assertEquals(TxnStatus.COMMITTED, meta3.status());
+ }
+
+ /**
+ * Entries appended to a transaction are recorded in its metadata and can be verified
+ * against the ledger, both before and after the transaction is committed.
+ */
+ @Test
+ public void testAppendEntry() throws ExecutionException, InterruptedException, ManagedLedgerException,
+ BrokerServiceException.NamingException {
+ final ManagedLedger testLedger = factory.open("test_ledger");
+ final PersistentTransactionBuffer appendBuffer =
+ new PersistentTransactionBuffer(successTopicName, testLedger, brokerService);
+ final int entryCount = 10;
+ final TxnID localTxnId = new TxnID(1111L, 2222L);
+
+ List<ByteBuf> appended = appendEntries(appendBuffer, localTxnId, entryCount, 0L);
+ // Snapshot of the appended buffers, used for the verification after the commit.
+ List<ByteBuf> appendedCopy = new ArrayList<>(appended);
+
+ TransactionMetaImpl txnMeta = (TransactionMetaImpl) appendBuffer.getTransactionMeta(localTxnId).get();
+ assertEquals(txnMeta.id(), localTxnId);
+ assertEquals(entryCount, txnMeta.numEntries());
+ assertEquals(txnMeta.status(), TxnStatus.OPEN);
+ verifyEntries(testLedger, appended, txnMeta.getEntries());
+
+ appendBuffer.commitTxn(localTxnId, 22L, 33L).get();
+
+ txnMeta = (TransactionMetaImpl) appendBuffer.getTransactionMeta(localTxnId).get();
+ assertEquals(txnMeta.id(), localTxnId);
+ assertEquals(txnMeta.numEntries(), entryCount);
+ assertEquals(txnMeta.status(), TxnStatus.COMMITTED);
+ verifyEntries(testLedger, appendedCopy, txnMeta.getEntries());
+ }
+
+ /**
+  * Commits a transaction and checks that the ledger's last confirmed entry is a transaction
+  * commit marker.
+  */
+ @Test
+ public void testCommitMarker() throws Exception {
+     final int numEntries = 10;
+     final ManagedLedger ledger = factory.open("test_commit_ledger");
+     final PersistentTransactionBuffer commitBuffer =
+             new PersistentTransactionBuffer(successTopicName, ledger, brokerService);
+
+     // txnID is not declared in this method; presumably a field of the test class -- confirm.
+     final List<ByteBuf> written = appendEntries(commitBuffer, txnID, numEntries, 0L);
+     final TransactionMetaImpl meta = (TransactionMetaImpl) commitBuffer.getTransactionMeta(txnID).get();
+
+     assertEquals(meta.id(), txnID);
+     assertEquals(meta.numEntries(), numEntries);
+     assertEquals(meta.status(), TxnStatus.OPEN);
+     verifyEntries(ledger, written, meta.getEntries());
+
+     commitBuffer.commitTxn(txnID, 22L, 33L).get();
+
+     // The metadata object fetched before the commit is inspected again, without re-fetching it.
+     assertEquals(meta.id(), txnID);
+     assertEquals(meta.numEntries(), numEntries);
+     assertEquals(meta.status(), TxnStatus.COMMITTED);
+
+     final ManagedCursor cursor = ledger.newNonDurableCursor(PositionImpl.earliest);
+     final Entry lastEntry = getEntry(cursor, ledger.getLastConfirmedEntry());
+     assertTrue(Markers.isTxnCommitMarker(
+             Commands.parseMessageMetadata(lastEntry.getDataBuffer())));
+ }
+
+ /**
+  * Aborts a transaction and checks that the ledger's last confirmed entry is a transaction
+  * abort marker.
+  */
+ @Test
+ public void testAbortMarker() throws Exception {
+     final int numEntries = 10;
+     final ManagedLedger ledger = factory.open("test_abort_ledger");
+     final PersistentTransactionBuffer abortBuffer =
+             new PersistentTransactionBuffer(successTopicName, ledger, brokerService);
+
+     // txnID is not declared in this method; presumably a field of the test class -- confirm.
+     final List<ByteBuf> written = appendEntries(abortBuffer, txnID, numEntries, 0L);
+     final TransactionMetaImpl meta = (TransactionMetaImpl) abortBuffer.getTransactionMeta(txnID).get();
+
+     assertEquals(meta.id(), txnID);
+     assertEquals(meta.numEntries(), numEntries);
+     assertEquals(meta.status(), TxnStatus.OPEN);
+     verifyEntries(ledger, written, meta.getEntries());
+
+     abortBuffer.abortTxn(txnID).get();
+
+     // The metadata object fetched before the abort is inspected again, without re-fetching it.
+     assertEquals(meta.id(), txnID);
+     assertEquals(meta.numEntries(), numEntries);
+     assertEquals(meta.status(), TxnStatus.ABORTED);
+
+     final ManagedCursor cursor = ledger.newNonDurableCursor(PositionImpl.earliest);
+     final Entry lastEntry = getEntry(cursor, ledger.getLastConfirmedEntry());
+     assertTrue(Markers.isTxnAbortMarker(
+             Commands.parseMessageMetadata(lastEntry.getDataBuffer())));
+ }
+
+ /**
+  * Reads back, through a new non-durable cursor, the entry at every position recorded for a
+  * transaction and asserts that its data buffer is one of the expected buffers.
+  *
+  * <p>Each matched buffer is removed from {@code appendEntries}, so the list is consumed by this call.
+  *
+  * @param ledger        the managed ledger to read from
+  * @param appendEntries the expected payloads; mutated (matched elements are removed)
+  * @param addedEntries  the positions to read, visited in the map's key order
+  * @throws ManagedLedgerException if seeking or reading the cursor fails
+  * @throws InterruptedException   if the read is interrupted
+  */
+ private void verifyEntries(ManagedLedger ledger, List<ByteBuf> appendEntries,
+         SortedMap<Long, Position> addedEntries)
+         throws ManagedLedgerException, InterruptedException {
+     final ManagedCursor reader = ledger.newNonDurableCursor(PositionImpl.earliest);
+     assertNotNull(reader);
+     for (Position recorded : addedEntries.values()) {
+         assertTrue(appendEntries.remove(getEntry(reader, recorded).getDataBuffer()));
+     }
+ }
+
+ /**
+  * Seeks the cursor to the given position and reads exactly one entry from there.
+  *
+  * @param cursor   the cursor to reposition and read from; must not be null
+  * @param position the position to seek to before reading
+  * @return the single entry that was read
+  * @throws ManagedLedgerException if the read fails
+  * @throws InterruptedException   if the read is interrupted
+  */
+ private Entry getEntry(ManagedCursor cursor, Position position)
+         throws ManagedLedgerException, InterruptedException {
+     assertNotNull(cursor);
+     cursor.seek(position);
+     final List<Entry> batch = cursor.readEntries(1);
+     assertEquals(batch.size(), 1);
+     return batch.get(0);
+ }
+
+ /**
+  * Appends {@code numEntries} messages to a transaction, then appends another {@code numEntries}
+  * messages that reuse the same sequence ids, and checks that the transaction metadata and the
+  * ledger still report {@code numEntries} entries.
+  */
+ @Test
+ public void testNoDeduplicateMessage()
+         throws ManagedLedgerException, InterruptedException, BrokerServiceException.NamingException,
+         ExecutionException {
+     ManagedLedger ledger = factory.open("test_deduplicate");
+     PersistentTransactionBuffer newBuffer = new PersistentTransactionBuffer(successTopicName, ledger,
+             brokerService);
+     final int numEntries = 10;
+
+     TxnID txnID = new TxnID(1234L, 5678L);
+     List<ByteBuf> appendEntries = appendEntries(newBuffer, txnID, numEntries, 0L);
+     TransactionMetaImpl meta = (TransactionMetaImpl) newBuffer.getTransactionMeta(txnID).get();
+
+     assertEquals(meta.id(), txnID);
+     assertEquals(meta.status(), TxnStatus.OPEN);
+     assertEquals(meta.numEntries(), appendEntries.size());
+
+     verifyEntries(ledger, appendEntries, meta.getEntries());
+
+     // append new message with same sequenceId
+     // NOTE(review): unlike appendEntries(), the returned futures are not awaited here, so the
+     // assertions below may run before these appends finish -- confirm whether that is intended.
+     for (int i = 0; i < numEntries; i++) {
+         long sequenceId = i;
+         ByteBuf data = Unpooled.copiedBuffer("message-deduplicate-" + sequenceId, UTF_8);
+         newBuffer.appendBufferToTxn(txnID, sequenceId, data);
+     }
+
+     TransactionMetaImpl meta1 = (TransactionMetaImpl) newBuffer.getTransactionMeta(txnID).get();
+
+     assertEquals(meta1.id(), txnID);
+     assertEquals(meta1.numEntries(), numEntries);
+     // Check the freshly fetched metadata, not the instance obtained before the second append.
+     assertEquals(meta1.status(), TxnStatus.OPEN);
+
+     // read all entries in new buffer
+     ManagedCursor read = ledger.newNonDurableCursor(PositionImpl.earliest);
+     List<Entry> allEntries = read.readEntries(100);
+     List<ByteBuf> allMsg = allEntries.stream().map(entry -> entry.getDataBuffer()).collect(Collectors.toList());
+
+     assertEquals(allEntries.size(), numEntries);
+     verifyEntries(ledger, allMsg, meta1.getEntries());
+ }
+
+ /**
+  * Asserts that looking up the given transaction in {@code buffer} fails with a
+  * {@link TransactionNotFoundException}.
+  *
+  * @param txnID the transaction that must not be known to the buffer
+  * @throws AssertionError if the lookup succeeds or fails with a different cause
+  * @throws Exception      if waiting for the lookup fails for any other reason
+  */
+ private void verifyTxnNotExist(TxnID txnID) throws Exception {
+     try {
+         buffer.getTransactionMeta(txnID).get();
+     } catch (ExecutionException ee) {
+         assertTrue(ee.getCause() instanceof TransactionNotFoundException);
+         return;
+     }
+     // Without this, a lookup that unexpectedly succeeds would let the check pass silently.
+     throw new AssertionError("Expected TransactionNotFoundException for transaction " + txnID);
+ }
+
+ /**
+  * Appends {@code numEntries} messages to the given transaction, waiting for each append to
+  * complete before issuing the next one.
+  *
+  * <p>The message for a sequence id is the UTF-8 text {@code "message-" + sequenceId}.
+  *
+  * @param writeBuffer     the transaction buffer to append to
+  * @param id              the transaction to append to
+  * @param numEntries      how many messages to append
+  * @param startSequenceId the sequence id of the first message; later ones increase by one
+  * @return independent copies of the appended payloads, in append order
+  */
+ private List<ByteBuf> appendEntries(PersistentTransactionBuffer writeBuffer, TxnID id, int numEntries,
+         long startSequenceId) {
+     final List<ByteBuf> expected = new ArrayList<>();
+     for (int offset = 0; offset < numEntries; offset++) {
+         final long sequenceId = startSequenceId + offset;
+         final String payload = "message-" + sequenceId;
+         writeBuffer.appendBufferToTxn(id, sequenceId, Unpooled.copiedBuffer(payload, UTF_8)).join();
+         // Keep a separate buffer for the caller instead of sharing the one handed to the writer.
+         expected.add(Unpooled.copiedBuffer(payload, UTF_8));
+     }
+     return expected;
+ }
+
+ /**
+  * Checks a list of transaction entries and closes each one after it has been checked.
+  *
+  * <p>Every entry must report the commit position (ledger 22, entry 33), the given transaction id,
+  * consecutive sequence ids starting at {@code startSequenceId}, and the UTF-8 payload
+  * {@code "message-" + sequenceId}, which is the format written by {@code appendEntries}.
+  *
+  * @param txnEntries       the entries to verify
+  * @param txnID            the transaction every entry must belong to
+  * @param startSequenceId  the sequence id expected for the first entry
+  * @param numEntriesToRead the exact number of entries expected in {@code txnEntries}
+  */
+ private void verifyAndReleaseEntries(List<TransactionEntry> txnEntries,
+         TxnID txnID,
+         long startSequenceId,
+         int numEntriesToRead) {
+     assertEquals(txnEntries.size(), numEntriesToRead);
+     for (int i = 0; i < numEntriesToRead; i++) {
+         final long expectedSequenceId = startSequenceId + i;
+         try (TransactionEntry txnEntry = txnEntries.get(i)) {
+             assertEquals(txnEntry.committedAtLedgerId(), 22L);
+             assertEquals(txnEntry.committedAtEntryId(), 33L);
+             assertEquals(txnEntry.txnId(), txnID);
+             assertEquals(txnEntry.sequenceId(), expectedSequenceId);
+             // The payload is derived from the sequence id, not the loop index, so a non-zero
+             // startSequenceId is checked correctly.
+             assertEquals(new String(
+                     ByteBufUtil.getBytes(txnEntry.getEntryBuffer()),
+                     UTF_8
+             ), "message-" + expectedSequenceId);
+         }
+     }
+ }
+
+}
diff --git a/pulsar-transaction/buffer/src/test/java/org/apache/pulsar/transaction/buffer/impl/TransactionBufferTest.java b/pulsar-transaction/buffer/src/test/java/org/apache/pulsar/transaction/buffer/impl/TransactionBufferTest.java
index b5bda24..4902e2f 100644
--- a/pulsar-transaction/buffer/src/test/java/org/apache/pulsar/transaction/buffer/impl/TransactionBufferTest.java
+++ b/pulsar-transaction/buffer/src/test/java/org/apache/pulsar/transaction/buffer/impl/TransactionBufferTest.java
@@ -74,7 +74,7 @@ public class TransactionBufferTest {
@AfterMethod
public void teardown() throws Exception {
- this.buffer.close();
+ this.buffer.closeAsync();
}
@Test