Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2020/11/30 14:23:35 UTC

[GitHub] [pulsar] congbobo184 opened a new pull request #8750: [Transaction] Transaction timeout implementation.

congbobo184 opened a new pull request #8750:
URL: https://github.com/apache/pulsar/pull/8750


   ## Motivation
   Handle transaction timeouts.
   
   When an abort or commit finishes, the transaction status should also be updated in the transaction coordinator.
   ## Implementation
   1. Add the transaction timeout tracker factory.
   2. Add the transaction timeout tracker.
   3. Use HashedWheelTimer to implement the tracker.
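
The tracker idea can be sketched roughly as follows. This is only an illustration under stated assumptions: the class and method names (`TimeoutTrackerSketch`, `track`, `complete`) are hypothetical and are not taken from the pull request, and a `ScheduledExecutorService` from the JDK stands in for Netty's `HashedWheelTimer` so that the sketch has no external dependencies.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.LongConsumer;

// Hypothetical sketch: fires a callback for every transaction that is still
// pending when its timeout expires. A ScheduledExecutorService is used here
// in place of Netty's HashedWheelTimer (assumption, for self-containment).
class TimeoutTrackerSketch implements AutoCloseable {
    private final ScheduledExecutorService timer =
            Executors.newSingleThreadScheduledExecutor(runnable -> {
                Thread thread = new Thread(runnable, "txn-timeout-tracker-sketch");
                thread.setDaemon(true);
                return thread;
            });
    // Ids of transactions that have neither finished nor timed out yet.
    private final Set<Long> pending = ConcurrentHashMap.newKeySet();
    private final LongConsumer onTimeout;

    TimeoutTrackerSketch(LongConsumer onTimeout) {
        this.onTimeout = onTimeout;
    }

    // Registers the transaction first, then schedules its timeout check.
    void track(long txnId, long timeout, TimeUnit unit) {
        pending.add(txnId);
        timer.schedule(() -> {
            // Only the caller that removes the id may act on it.
            if (pending.remove(txnId)) {
                onTimeout.accept(txnId);
            }
        }, timeout, unit);
    }

    // Returns true if the transaction ended before its timeout fired.
    boolean complete(long txnId) {
        return pending.remove(txnId);
    }

    @Override
    public void close() {
        timer.shutdownNow();
    }
}
```

In this sketch the timeout callback would be the place to abort the transaction. Removing the id from the pending set decides the race between a normal end and a timeout, so only one of the two paths acts on a given transaction.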
   ### Verifying this change
   Tests are added for this change.
   
   Does this pull request potentially affect one of the following parts:
   If yes was chosen, please highlight the changes
   
   Dependencies (does it add or upgrade a dependency): (no)
   The public API: (no)
   The schema: (no)
   The default values of configurations: (no)
   The wire protocol: (no)
   The rest endpoints: (no)
   The admin cli options: (no)
   Anything that affects deployment: (no)
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] congbobo184 commented on pull request #8750: [Transaction] Transaction timeout implementation.

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on pull request #8750:
URL: https://github.com/apache/pulsar/pull/8750#issuecomment-736920910


   /pulsarbot run-failure-checks





[GitHub] [pulsar] congbobo184 commented on a change in pull request #8750: [Transaction] Transaction timeout implementation.

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on a change in pull request #8750:
URL: https://github.com/apache/pulsar/pull/8750#discussion_r534243188



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
##########
@@ -18,32 +18,126 @@
  */
 package org.apache.pulsar.broker.transaction.buffer.impl;
 
+import static org.apache.pulsar.common.protocol.Markers.isTxnMarker;
 import io.netty.buffer.ByteBuf;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
 import org.apache.pulsar.broker.transaction.buffer.TransactionBufferReader;
+import org.apache.pulsar.broker.transaction.buffer.TransactionBufferReplayCallback;
 import org.apache.pulsar.broker.transaction.buffer.TransactionMeta;
 import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
+import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
 import org.apache.pulsar.common.api.proto.PulsarMarkers;
+import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.protocol.Markers;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
+import org.jctools.queues.MessagePassingQueue;
+import org.jctools.queues.SpscArrayQueue;
 
 /**
  * Transaction buffer based on normal persistent topic.
  */
 @Slf4j
-public class TopicTransactionBuffer implements TransactionBuffer {
+public class TopicTransactionBuffer extends TopicTransactionBufferState implements TransactionBuffer {
 
     private final PersistentTopic topic;
 
-    public TopicTransactionBuffer(PersistentTopic topic) {
+    private final SpscArrayQueue<Entry> entryQueue;
+
+    private final ConcurrentOpenHashMap<TxnID, ConcurrentOpenHashSet<PositionImpl>> txnBufferCache = new ConcurrentOpenHashMap<>();
+
+    //this is for transaction buffer replay start position
+    //this will be stored in managed ledger properties and every 10000 will sync to zk by default.
+    private final ConcurrentSkipListSet<PositionImpl> positionsSort = new ConcurrentSkipListSet<>();
+
+    private final ManagedCursor cursor;
+
+    //this is for replay
+    private final PositionImpl lastConfirmedEntry;
+    //this if for replay
+    private PositionImpl currentLoadPosition;
+
+    private final AtomicInteger countToSyncPosition = new AtomicInteger(0);
+
+    private final static String TXN_ON_GOING_POSITION_SUFFIX = "-txnOnGoingPosition";
+
+    private final String txnOnGoingPositionName;
+
+    //TODO this can config
+    private int defaultCountToSyncPosition = 10000;
+
+    public TopicTransactionBuffer(PersistentTopic topic) throws ManagedLedgerException {
+        super(State.None);
+        this.entryQueue = new SpscArrayQueue<>(2000);
         this.topic = topic;
+        ManagedLedger managedLedger = topic.getManagedLedger();
+        this.lastConfirmedEntry = (PositionImpl) managedLedger.getLastConfirmedEntry();
+        this.txnOnGoingPositionName = topic.getName() + TXN_ON_GOING_POSITION_SUFFIX;
+        String positionString = managedLedger.getProperties().get(txnOnGoingPositionName);
+        if (positionString == null) {
+            this.currentLoadPosition = PositionImpl.earliest;
+        } else {
+            PositionImpl position = PositionImpl.earliest;
+            try {
+                position = PositionImpl.convertStringToPosition(positionString);
+            } catch (Exception e) {
+                log.error("Topic : [{}] transaction buffer get replay start position error!", topic.getName());
+            }
+            this.currentLoadPosition = position;
+        }
+        this.cursor = managedLedger.newNonDurableCursor(currentLoadPosition);
+
+        new Thread(() -> new TopicTransactionBufferReplayer(new TransactionBufferReplayCallback() {
+
+            @Override
+            public void replayComplete() {
+                if (!changeToReadyState()) {
+                    log.error("Managed ledger transaction metadata store change state error when replay complete");
+                }
+            }
+
+            @Override
+            public void handleMetadataEntry(Position position, MessageMetadata messageMetadata) {
+                if (!messageMetadata.hasTxnidMostBits() || !messageMetadata.hasTxnidLeastBits()) {
+                    return;
+                }
+                TxnID txnID = new TxnID(messageMetadata.getTxnidMostBits(),
+                        messageMetadata.getTxnidLeastBits());
+                if (isTxnMarker(messageMetadata)) {
+                    ConcurrentOpenHashSet<PositionImpl> positions = txnBufferCache.remove(txnID);
+                    positionsSort.removeAll(positions.values());
+                } else {
+                    ConcurrentOpenHashSet<PositionImpl> positions =
+                            txnBufferCache.computeIfAbsent(txnID, (v) -> new ConcurrentOpenHashSet<>());
+                    positions.add((PositionImpl) position);
+                    positionsSort.add((PositionImpl) position);
+                }
+            }
+        }).start()).start();

Review comment:
       It is a good idea, I will change it. :)







[GitHub] [pulsar] congbobo184 removed a comment on pull request #8750: [Transaction] Transaction timeout implementation.

Posted by GitBox <gi...@apache.org>.
congbobo184 removed a comment on pull request #8750:
URL: https://github.com/apache/pulsar/pull/8750#issuecomment-736919215


   /pulsarbot run-failure-checks





[GitHub] [pulsar] eolivelli commented on a change in pull request #8750: [Transaction] Transaction timeout implementation.

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #8750:
URL: https://github.com/apache/pulsar/pull/8750#discussion_r533946688



##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/PositionImpl.java
##########
@@ -129,6 +129,20 @@ public boolean equals(Object obj) {
         return false;
     }
 
+    public static PositionImpl convertStringToPosition(String positionString) {
+        if (positionString == null) {
+            throw new NullPointerException();
+        } else {
+            String[] strings = positionString.split(":");
+            if (strings.length != 2) {
+                throw new IndexOutOfBoundsException();
+            }
+            long ledgerId = Long.parseLong(strings[0]);
+            long entryId = Long.parseLong(strings[1]);

Review comment:
       Can you please catch "NumberFormatException" and rethrow it with a meaningful error message?

##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/PositionImpl.java
##########
@@ -129,6 +129,20 @@ public boolean equals(Object obj) {
         return false;
     }
 
+    public static PositionImpl convertStringToPosition(String positionString) {
+        if (positionString == null) {
+            throw new NullPointerException();
+        } else {
+            String[] strings = positionString.split(":");
+            if (strings.length != 2) {
+                throw new IndexOutOfBoundsException();

Review comment:
       Can you add a message, like "invalid position " + positionString?
   It will ease debugging issues in the future.
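
The two suggestions on this method (a message that names the invalid input, and a wrapped `NumberFormatException`) could look roughly like the sketch below. `PositionSketch` and `fromString` are hypothetical stand-ins for `PositionImpl` and `convertStringToPosition`, and the switch to `IllegalArgumentException` is an assumption of this sketch, not something the thread decided.

```java
// Hypothetical stand-in for PositionImpl, reduced to the two parsed fields.
final class PositionSketch {
    final long ledgerId;
    final long entryId;

    PositionSketch(long ledgerId, long entryId) {
        this.ledgerId = ledgerId;
        this.entryId = entryId;
    }

    // Parses "ledgerId:entryId" and names the offending input on failure.
    static PositionSketch fromString(String positionString) {
        if (positionString == null) {
            throw new NullPointerException("positionString must not be null");
        }
        String[] parts = positionString.split(":");
        if (parts.length != 2) {
            throw new IllegalArgumentException(
                    "Invalid position " + positionString + ": expected ledgerId:entryId");
        }
        try {
            return new PositionSketch(Long.parseLong(parts[0]), Long.parseLong(parts[1]));
        } catch (NumberFormatException e) {
            // Keep the original exception as the cause for debugging.
            throw new IllegalArgumentException(
                    "Invalid position " + positionString + ": ledgerId and entryId must be numbers", e);
        }
    }
}
```

With this shape, a caller that logs the exception sees the raw property value that failed to parse, which is the debugging aid the review asks for.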

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
##########
@@ -18,32 +18,126 @@
  */
 package org.apache.pulsar.broker.transaction.buffer.impl;
 
+import static org.apache.pulsar.common.protocol.Markers.isTxnMarker;
 import io.netty.buffer.ByteBuf;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
 import org.apache.pulsar.broker.transaction.buffer.TransactionBufferReader;
+import org.apache.pulsar.broker.transaction.buffer.TransactionBufferReplayCallback;
 import org.apache.pulsar.broker.transaction.buffer.TransactionMeta;
 import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
+import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
 import org.apache.pulsar.common.api.proto.PulsarMarkers;
+import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.protocol.Markers;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
+import org.jctools.queues.MessagePassingQueue;
+import org.jctools.queues.SpscArrayQueue;
 
 /**
  * Transaction buffer based on normal persistent topic.
  */
 @Slf4j
-public class TopicTransactionBuffer implements TransactionBuffer {
+public class TopicTransactionBuffer extends TopicTransactionBufferState implements TransactionBuffer {
 
     private final PersistentTopic topic;
 
-    public TopicTransactionBuffer(PersistentTopic topic) {
+    private final SpscArrayQueue<Entry> entryQueue;
+
+    private final ConcurrentOpenHashMap<TxnID, ConcurrentOpenHashSet<PositionImpl>> txnBufferCache = new ConcurrentOpenHashMap<>();
+
+    //this is for transaction buffer replay start position
+    //this will be stored in managed ledger properties and every 10000 will sync to zk by default.
+    private final ConcurrentSkipListSet<PositionImpl> positionsSort = new ConcurrentSkipListSet<>();
+
+    private final ManagedCursor cursor;
+
+    //this is for replay
+    private final PositionImpl lastConfirmedEntry;
+    //this if for replay
+    private PositionImpl currentLoadPosition;
+
+    private final AtomicInteger countToSyncPosition = new AtomicInteger(0);
+
+    private final static String TXN_ON_GOING_POSITION_SUFFIX = "-txnOnGoingPosition";
+
+    private final String txnOnGoingPositionName;
+
+    //TODO this can config
+    private int defaultCountToSyncPosition = 10000;
+
+    public TopicTransactionBuffer(PersistentTopic topic) throws ManagedLedgerException {
+        super(State.None);
+        this.entryQueue = new SpscArrayQueue<>(2000);
         this.topic = topic;
+        ManagedLedger managedLedger = topic.getManagedLedger();
+        this.lastConfirmedEntry = (PositionImpl) managedLedger.getLastConfirmedEntry();
+        this.txnOnGoingPositionName = topic.getName() + TXN_ON_GOING_POSITION_SUFFIX;
+        String positionString = managedLedger.getProperties().get(txnOnGoingPositionName);
+        if (positionString == null) {
+            this.currentLoadPosition = PositionImpl.earliest;
+        } else {
+            PositionImpl position = PositionImpl.earliest;
+            try {
+                position = PositionImpl.convertStringToPosition(positionString);
+            } catch (Exception e) {
+                log.error("Topic : [{}] transaction buffer get replay start position error!", topic.getName());
+            }
+            this.currentLoadPosition = position;
+        }
+        this.cursor = managedLedger.newNonDurableCursor(currentLoadPosition);
+
+        new Thread(() -> new TopicTransactionBufferReplayer(new TransactionBufferReplayCallback() {

Review comment:
       Can we give a name to this thread?
   It should probably be marked as a "daemon" thread.
   We should also ensure that the thread ends when we are shutting down this TopicTransactionBuffer;
   otherwise we will have a zombie thread that still retains references to this object and possibly corrupts its status.
   
   One question: isn't it too heavyweight to start a thread for each topic?
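
One possible shape for these points (named daemon threads, an explicit shutdown, and no dedicated thread per topic) is a small shared executor. The sketch below is an assumption-laden illustration: `ReplayExecutorSketch`, `submitReplay`, the thread-name prefix, and the pool size of 2 are all hypothetical and do not come from the pull request.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: a small pool shared by all topics, instead of one
// ad hoc thread per TopicTransactionBuffer.
class ReplayExecutorSketch implements AutoCloseable {
    private final AtomicInteger threadCounter = new AtomicInteger();
    private final ExecutorService executor = Executors.newFixedThreadPool(2, runnable -> {
        // Named threads are easy to find in thread dumps.
        Thread thread = new Thread(runnable, "txn-buffer-replay-" + threadCounter.incrementAndGet());
        // Daemon threads do not keep the JVM alive during shutdown.
        thread.setDaemon(true);
        return thread;
    });

    // Queues a replay task; the returned Future can be cancelled when the topic closes.
    Future<?> submitReplay(Runnable replayTask) {
        return executor.submit(replayTask);
    }

    // Interrupts running replays so that no thread outlives its owner.
    @Override
    public void close() {
        executor.shutdownNow();
    }
}
```

A buffer that keeps the returned `Future` could cancel its own replay when it is closed, while the owner of the pool calls `close()` once at broker shutdown.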
   

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
##########
@@ -243,10 +241,13 @@ public void removeTransactionMetadataStore(TransactionCoordinatorID tcId) {
             });
 
             List<MessageId> messageIdList = new ArrayList<>();
-            for (MessageIdData messageIdData : messageIdDataList) {
-                messageIdList.add(new MessageIdImpl(
-                        messageIdData.getLedgerId(), messageIdData.getEntryId(), messageIdData.getPartition()));
-                messageIdData.recycle();
+            //TODO when pending ack buffer finish this logic can remove

Review comment:
       Can we link an issue for this TODO?

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
##########
@@ -266,20 +267,36 @@ public void removeTransactionMetadataStore(TransactionCoordinatorID tcId) {
                 }
                 completableFutureList.add(actionFuture);
             });
+            FutureUtil.waitForAll(completableFutureList).whenComplete((ignored, waitThrowable) -> {
+                if (waitThrowable != null) {
+                    resultFuture.completeExceptionally(waitThrowable);
+                    endTxnInTransactionBuffer(txnID, txnAction, messageIdDataList);
+                    return;
+                }
+                resultFuture.complete(null);
+                TxnStatus newStatus;
+                TxnStatus expectedStatus;
+                if (txnAction == TxnAction.COMMIT_VALUE) {
+                    newStatus = TxnStatus.COMMITTED;
+                    expectedStatus = TxnStatus.COMMITTING;
+                } else {
+                    newStatus = TxnStatus.ABORTED;
+                    expectedStatus = TxnStatus.ABORTING;
+                }
+                //TODO find a better way to handle this failure when update transaction sstatus
+                finalityEndTransaction(txnID, newStatus, expectedStatus);
+            });
+        });
+        return resultFuture;
+    }
 
-            try {
-                FutureUtil.waitForAll(completableFutureList).whenComplete((ignored, waitThrowable) -> {
-                    if (waitThrowable != null) {
-                        resultFuture.completeExceptionally(waitThrowable);
-                        return;
-                    }
-                    resultFuture.complete(null);
-                });
-            } catch (Exception e) {
-                resultFuture.completeExceptionally(e);
+    private void finalityEndTransaction(TxnID txnID, TxnStatus newStatus, TxnStatus expectedStatus) {

Review comment:
       What about renaming "finalityEndTransaction" to "completeEndTransaction"?
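
The hunk above moves a transaction to its final status only from an expected intermediate status (COMMITTING to COMMITTED, ABORTING to ABORTED). A minimal sketch of that guard, assuming an in-memory status holder, is shown below; `TxnStatusSketch` and `updateStatus` are hypothetical names, and the real metadata store is not modelled.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of an expected-status transition on a single transaction.
class TxnStatusSketch {
    enum Status { OPEN, COMMITTING, COMMITTED, ABORTING, ABORTED }

    private final AtomicReference<Status> status = new AtomicReference<>(Status.OPEN);

    // Applies newStatus only if the current status equals expectedStatus.
    boolean updateStatus(Status newStatus, Status expectedStatus) {
        return status.compareAndSet(expectedStatus, newStatus);
    }

    Status current() {
        return status.get();
    }
}
```

A `false` return value marks the failure case that the TODO in the hunk refers to: the status was not what the caller expected, so the final update did not happen.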

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
##########
@@ -18,32 +18,126 @@
  */
 package org.apache.pulsar.broker.transaction.buffer.impl;
 
+import static org.apache.pulsar.common.protocol.Markers.isTxnMarker;
 import io.netty.buffer.ByteBuf;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
 import org.apache.pulsar.broker.transaction.buffer.TransactionBufferReader;
+import org.apache.pulsar.broker.transaction.buffer.TransactionBufferReplayCallback;
 import org.apache.pulsar.broker.transaction.buffer.TransactionMeta;
 import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
+import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
 import org.apache.pulsar.common.api.proto.PulsarMarkers;
+import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.protocol.Markers;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
+import org.jctools.queues.MessagePassingQueue;
+import org.jctools.queues.SpscArrayQueue;
 
 /**
  * Transaction buffer based on normal persistent topic.
  */
 @Slf4j
-public class TopicTransactionBuffer implements TransactionBuffer {
+public class TopicTransactionBuffer extends TopicTransactionBufferState implements TransactionBuffer {
 
     private final PersistentTopic topic;
 
-    public TopicTransactionBuffer(PersistentTopic topic) {
+    private final SpscArrayQueue<Entry> entryQueue;
+
+    private final ConcurrentOpenHashMap<TxnID, ConcurrentOpenHashSet<PositionImpl>> txnBufferCache = new ConcurrentOpenHashMap<>();
+
+    //this is for transaction buffer replay start position
+    //this will be stored in managed ledger properties and every 10000 will sync to zk by default.
+    private final ConcurrentSkipListSet<PositionImpl> positionsSort = new ConcurrentSkipListSet<>();
+
+    private final ManagedCursor cursor;
+
+    //this is for replay
+    private final PositionImpl lastConfirmedEntry;
+    //this if for replay
+    private PositionImpl currentLoadPosition;
+
+    private final AtomicInteger countToSyncPosition = new AtomicInteger(0);
+
+    private final static String TXN_ON_GOING_POSITION_SUFFIX = "-txnOnGoingPosition";
+
+    private final String txnOnGoingPositionName;
+
+    //TODO this can config
+    private int defaultCountToSyncPosition = 10000;
+
+    public TopicTransactionBuffer(PersistentTopic topic) throws ManagedLedgerException {
+        super(State.None);
+        this.entryQueue = new SpscArrayQueue<>(2000);
         this.topic = topic;
+        ManagedLedger managedLedger = topic.getManagedLedger();
+        this.lastConfirmedEntry = (PositionImpl) managedLedger.getLastConfirmedEntry();
+        this.txnOnGoingPositionName = topic.getName() + TXN_ON_GOING_POSITION_SUFFIX;
+        String positionString = managedLedger.getProperties().get(txnOnGoingPositionName);
+        if (positionString == null) {
+            this.currentLoadPosition = PositionImpl.earliest;
+        } else {
+            PositionImpl position = PositionImpl.earliest;
+            try {
+                position = PositionImpl.convertStringToPosition(positionString);
+            } catch (Exception e) {
+                log.error("Topic : [{}] transaction buffer get replay start position error!", topic.getName());
+            }
+            this.currentLoadPosition = position;
+        }
+        this.cursor = managedLedger.newNonDurableCursor(currentLoadPosition);
+
+        new Thread(() -> new TopicTransactionBufferReplayer(new TransactionBufferReplayCallback() {
+
+            @Override
+            public void replayComplete() {
+                if (!changeToReadyState()) {
+                    log.error("Managed ledger transaction metadata store change state error when replay complete");
+                }
+            }
+
+            @Override
+            public void handleMetadataEntry(Position position, MessageMetadata messageMetadata) {
+                if (!messageMetadata.hasTxnidMostBits() || !messageMetadata.hasTxnidLeastBits()) {
+                    return;
+                }
+                TxnID txnID = new TxnID(messageMetadata.getTxnidMostBits(),
+                        messageMetadata.getTxnidLeastBits());
+                if (isTxnMarker(messageMetadata)) {
+                    ConcurrentOpenHashSet<PositionImpl> positions = txnBufferCache.remove(txnID);
+                    positionsSort.removeAll(positions.values());
+                } else {
+                    ConcurrentOpenHashSet<PositionImpl> positions =
+                            txnBufferCache.computeIfAbsent(txnID, (v) -> new ConcurrentOpenHashSet<>());
+                    positions.add((PositionImpl) position);
+                    positionsSort.add((PositionImpl) position);
+                }
+            }
+        }).start()).start();

Review comment:
       We should not start a Thread in a constructor, especially when the Thread refers to the object that is being created. We could have bad memory leaks if we do not properly release all of the resources.
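
One common way to avoid starting work inside a constructor is a static factory that first finishes construction and only then hands the object to an executor. The sketch below assumes this approach; `ReplayingBufferSketch`, `create`, and the `State` values are hypothetical, and the replay body is a placeholder.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

// Hypothetical sketch: construction and asynchronous start are separate steps.
class ReplayingBufferSketch {
    enum State { NONE, INITIALIZING, READY }

    private volatile State state = State.NONE;

    // The constructor only assigns fields; it starts no thread.
    private ReplayingBufferSketch() {
    }

    // Builds the object completely, then schedules the replay on the given executor.
    static CompletableFuture<ReplayingBufferSketch> create(Executor executor) {
        ReplayingBufferSketch buffer = new ReplayingBufferSketch();
        buffer.state = State.INITIALIZING;
        return CompletableFuture.runAsync(buffer::replay, executor)
                .thenApply(ignored -> buffer);
    }

    // Placeholder for reading entries; a real replay would run here.
    private void replay() {
        state = State.READY;
    }

    State getState() {
        return state;
    }
}
```

Because the executor is passed in, the caller controls which threads run the replay and when they are shut down, and the `this` reference never escapes from a half-built object.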







[GitHub] [pulsar] congbobo184 commented on a change in pull request #8750: [Transaction] Transaction timeout implementation.

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on a change in pull request #8750:
URL: https://github.com/apache/pulsar/pull/8750#discussion_r534241703



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
##########
@@ -18,32 +18,126 @@
  */
 package org.apache.pulsar.broker.transaction.buffer.impl;
 
+import static org.apache.pulsar.common.protocol.Markers.isTxnMarker;
 import io.netty.buffer.ByteBuf;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
 import org.apache.pulsar.broker.transaction.buffer.TransactionBufferReader;
+import org.apache.pulsar.broker.transaction.buffer.TransactionBufferReplayCallback;
 import org.apache.pulsar.broker.transaction.buffer.TransactionMeta;
 import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
+import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
 import org.apache.pulsar.common.api.proto.PulsarMarkers;
+import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.protocol.Markers;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
+import org.jctools.queues.MessagePassingQueue;
+import org.jctools.queues.SpscArrayQueue;
 
 /**
  * Transaction buffer based on normal persistent topic.
  */
 @Slf4j
-public class TopicTransactionBuffer implements TransactionBuffer {
+public class TopicTransactionBuffer extends TopicTransactionBufferState implements TransactionBuffer {
 
     private final PersistentTopic topic;
 
-    public TopicTransactionBuffer(PersistentTopic topic) {
+    private final SpscArrayQueue<Entry> entryQueue;
+
+    private final ConcurrentOpenHashMap<TxnID, ConcurrentOpenHashSet<PositionImpl>> txnBufferCache = new ConcurrentOpenHashMap<>();
+
+    //this is for transaction buffer replay start position
+    //this will be stored in managed ledger properties and every 10000 will sync to zk by default.
+    private final ConcurrentSkipListSet<PositionImpl> positionsSort = new ConcurrentSkipListSet<>();
+
+    private final ManagedCursor cursor;
+
+    //this is for replay
+    private final PositionImpl lastConfirmedEntry;
+    //this if for replay
+    private PositionImpl currentLoadPosition;
+
+    private final AtomicInteger countToSyncPosition = new AtomicInteger(0);
+
+    private final static String TXN_ON_GOING_POSITION_SUFFIX = "-txnOnGoingPosition";
+
+    private final String txnOnGoingPositionName;
+
+    //TODO this can config
+    private int defaultCountToSyncPosition = 10000;
+
+    public TopicTransactionBuffer(PersistentTopic topic) throws ManagedLedgerException {
+        super(State.None);
+        this.entryQueue = new SpscArrayQueue<>(2000);
         this.topic = topic;
+        ManagedLedger managedLedger = topic.getManagedLedger();
+        this.lastConfirmedEntry = (PositionImpl) managedLedger.getLastConfirmedEntry();
+        this.txnOnGoingPositionName = topic.getName() + TXN_ON_GOING_POSITION_SUFFIX;
+        String positionString = managedLedger.getProperties().get(txnOnGoingPositionName);
+        if (positionString == null) {
+            this.currentLoadPosition = PositionImpl.earliest;
+        } else {
+            PositionImpl position = PositionImpl.earliest;
+            try {
+                position = PositionImpl.convertStringToPosition(positionString);
+            } catch (Exception e) {
+                log.error("Topic : [{}] transaction buffer get replay start position error!", topic.getName());
+            }
+            this.currentLoadPosition = position;
+        }
+        this.cursor = managedLedger.newNonDurableCursor(currentLoadPosition);
+
+        new Thread(() -> new TopicTransactionBufferReplayer(new TransactionBufferReplayCallback() {

Review comment:
       Yes, you are right. We should control when the thread starts and stops.
   
   ```
   one question isn't it too heavyweight to start a thread per each topic ?
   ```
   We initialize TopicTransactionBuffer asynchronously. Do you have a better way to do it?







[GitHub] [pulsar] congbobo184 commented on pull request #8750: [Transaction] Transaction timeout implementation.

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on pull request #8750:
URL: https://github.com/apache/pulsar/pull/8750#issuecomment-736920332


   /pulsarbot run-failure-checks





[GitHub] [pulsar] congbobo184 commented on pull request #8750: [Transaction] Transaction timeout implementation.

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on pull request #8750:
URL: https://github.com/apache/pulsar/pull/8750#issuecomment-737291775


   @eolivelli thanks for your comment :)





[GitHub] [pulsar] congbobo184 commented on a change in pull request #8750: [Transaction] Transaction timeout implementation.

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on a change in pull request #8750:
URL: https://github.com/apache/pulsar/pull/8750#discussion_r534214172



##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/PositionImpl.java
##########
@@ -129,6 +129,20 @@ public boolean equals(Object obj) {
         return false;
     }
 
+    public static PositionImpl convertStringToPosition(String positionString) {
+        if (positionString == null) {
+            throw new NullPointerException();
+        } else {
+            String[] strings = positionString.split(":");
+            if (strings.length != 2) {
+                throw new IndexOutOfBoundsException();

Review comment:
       Sure, it is a good idea!







[GitHub] [pulsar] congbobo184 closed pull request #8750: [Transaction] Transaction timeout implementation.

Posted by GitBox <gi...@apache.org>.
congbobo184 closed pull request #8750:
URL: https://github.com/apache/pulsar/pull/8750


   





[GitHub] [pulsar] congbobo184 commented on pull request #8750: [Transaction] Transaction timeout implementation.

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on pull request #8750:
URL: https://github.com/apache/pulsar/pull/8750#issuecomment-736938662


   /pulsarbot run-failure-checks





[GitHub] [pulsar] congbobo184 commented on pull request #8750: [Transaction] Transaction timeout implementation.

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on pull request #8750:
URL: https://github.com/apache/pulsar/pull/8750#issuecomment-736919215


   /pulsarbot run-failure-checks





[GitHub] [pulsar] congbobo184 removed a comment on pull request #8750: [Transaction] Transaction timeout implementation.

Posted by GitBox <gi...@apache.org>.
congbobo184 removed a comment on pull request #8750:
URL: https://github.com/apache/pulsar/pull/8750#issuecomment-736920332


   /pulsarbot run-failure-checks

