You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/09/28 07:06:12 UTC

[GitHub] [pulsar] liangyepianzhou opened a new pull request #12219: Transaction buffer take snapshot max read position

liangyepianzhou opened a new pull request #12219:
URL: https://github.com/apache/pulsar/pull/12219


   ### Motivation
   In the previous implementation of transactionBuffer, if no transaction committed or aborted, TransactionBuffer will not take a snapshot. Even if it is a timed task, it will judge whether there has been a transaction committed or aborted. If no, the timer will  be skipped.
   **This will have two disadvantages:**
   * If there is no snapshot , maxReadPosition needs to be restored from the earliest when the broker restarts
   * If there is no snapShot,  transaction data offload will not get an accurate reference
   
   ### Modifications
   We hope that every modification of maxReadPosition will modify changeMaxReadPositionAndAddAbortTimes too, so that takeSnapshotByTimeout can save the latest modification of maxReadPosition in time.
   
   Since it always starts at takeSnapshot and the initial value of maxReadPosition is earliest, when persistentTopic get the snapshot,  don't need to wait for TransactionBufferReady, it can return maxReadPosition directly. Because we know that this must not be a value that will cause an error.
   ### Verifying this change
   
   - [ ] Make sure that the change passes the CI checks.
   
   This change added tests :
   Added unit test testTakeSnapshot() in TransactionTest.
   
   ### Does this pull request potentially affect one of the following parts:
   
   *If `yes` was chosen, please highlight the changes*
   
     - Dependencies (does it add or upgrade a dependency): (yes / no)
     - The public API: (yes / no)
     - The schema: (yes / no / don't know)
     - The default values of configurations: (yes / no)
     - The wire protocol: (yes / no)
     - The rest endpoints: (yes / no)
     - The admin cli options: (yes / no)
     - Anything that affects deployment: (yes / no / don't know)
   
   ### Documentation
   
   Check the box below and label this PR (if you have committer privilege).
   
   Need to update docs? 
   
   - [ ] doc-required 
     
     (If you need help on updating docs, create a doc issue)
     
   - [ ] no-need-doc 
     
     (Please explain why)
     
   - [ ] doc 
     
     (If this PR contains doc changes)
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] liangyepianzhou commented on a change in pull request #12219: [Transaction] Transaction buffer take snapshot max read position

Posted by GitBox <gi...@apache.org>.
liangyepianzhou commented on a change in pull request #12219:
URL: https://github.com/apache/pulsar/pull/12219#discussion_r728018227



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBufferState.java
##########
@@ -32,7 +32,8 @@
         None,
         Initializing,
         Ready,
-        Close
+        Close,
+        Unused

Review comment:
       Thx, I have adopted your suggestion. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] liangyepianzhou commented on pull request #12219: Transaction buffer take snapshot max read position

Posted by GitBox <gi...@apache.org>.
liangyepianzhou commented on pull request #12219:
URL: https://github.com/apache/pulsar/pull/12219#issuecomment-933147104


   > We'd better store the snapshot into system topic instead of add properties into managedLedger.
   
   Thank you for your reminder. What do you think of this?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] liangyepianzhou commented on pull request #12219: [Transaction] Transaction buffer take snapshot max read position

Posted by GitBox <gi...@apache.org>.
liangyepianzhou commented on pull request #12219:
URL: https://github.com/apache/pulsar/pull/12219#issuecomment-940614914


   /pulsarbot run-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] liangyepianzhou commented on a change in pull request #12219: [Transaction] Transaction buffer take snapshot max read position

Posted by GitBox <gi...@apache.org>.
liangyepianzhou commented on a change in pull request #12219:
URL: https://github.com/apache/pulsar/pull/12219#discussion_r728019063



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBufferState.java
##########
@@ -32,7 +32,8 @@
         None,
         Initializing,
         Ready,
-        Close
+        Close,
+        Unused

Review comment:
       Which will be submitted together in this update




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] congbobo184 commented on a change in pull request #12219: [Transaction] Transaction buffer take snapshot max read position

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on a change in pull request #12219:
URL: https://github.com/apache/pulsar/pull/12219#discussion_r726770905



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBufferState.java
##########
@@ -49,10 +50,22 @@ protected boolean changeToReadyState() {
         return (STATE_UPDATER.compareAndSet(this, State.Initializing, State.Ready));
     }
 
+    protected boolean changeToUnUsedState() {
+        return (STATE_UPDATER.compareAndSet(this, State.Initializing, State.Unused));
+    }
+
+    protected boolean changeBackToUnUsedState() {

Review comment:
       the same as changeToUnUsedState

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
##########
@@ -163,6 +175,37 @@ public void recoverExceptionally(Exception e) {
     @Override
     public CompletableFuture<Position> appendBufferToTxn(TxnID txnId, long sequenceId, ByteBuf buffer) {
         CompletableFuture<Position> completableFuture = new CompletableFuture<>();
+        if (checkIfReady()){
+            addTxnEntry(completableFuture, txnId, buffer);
+        } else {
+            if (checkIfUnused()){
+                if (changeToInitializingStateFromUnused()){
+                    buffer.retain();
+                    takeSnapshot().thenAccept(ignore -> {
+                        addTxnEntry(completableFuture, txnId, buffer);
+                        buffer.release();
+                        changeToReadyState();
+                    }).exceptionally(exception -> {
+                        changeBackToUnUsedState();
+                        buffer.release();
+                        log.error("Fail to takeSnapshot before adding the first message with transaction", exception);
+                        completableFuture.completeExceptionally(exception);
+                        return null;
+                    });
+                } else {
+                    completableFuture.completeExceptionally(new TransactionBufferInitialUseException("Fail to change "
+                            + "TransactionBufferState from Unused to Ready When the first message with transaction "
+                            + "was sent"));
+                }
+            } else {
+                completableFuture.completeExceptionally(new TransactionBufferStatusException(this.topic.getName(),
+                        State.Unused, getState()));

Review comment:
       except state is ready.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] hangc0276 commented on pull request #12219: Transaction buffer take snapshot max read position

Posted by GitBox <gi...@apache.org>.
hangc0276 commented on pull request #12219:
URL: https://github.com/apache/pulsar/pull/12219#issuecomment-932238825


   We'd better store the snapshot into system topic instead of add properties into managedLedger.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] liangyepianzhou commented on pull request #12219: [Transaction] Transaction buffer take snapshot max read position

Posted by GitBox <gi...@apache.org>.
liangyepianzhou commented on pull request #12219:
URL: https://github.com/apache/pulsar/pull/12219#issuecomment-939429578


   /pulsarbot run-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] liangyepianzhou commented on a change in pull request #12219: [Transaction] Transaction buffer take snapshot max read position

Posted by GitBox <gi...@apache.org>.
liangyepianzhou commented on a change in pull request #12219:
URL: https://github.com/apache/pulsar/pull/12219#discussion_r728010473



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
##########
@@ -2981,12 +2981,16 @@ public void publishTxnMessage(TxnID txnID, ByteBuf headersAndPayload, PublishCon
                                     // Message has been successfully persisted
                                     messageDeduplication.recordMessagePersisted(publishContext,
                                             (PositionImpl) position);
-                                    publishContext.completed(null, ((PositionImpl) position).getLedgerId(),
-                                            ((PositionImpl) position).getEntryId());
+                                    publishContext.completed(null, position.getLedgerId(),
+                                            position.getEntryId());
 
                                     decrementPendingWriteOpsAndCheck();
                                 })
                                 .exceptionally(throwable -> {
+                                    throwable = throwable.getCause();
+                                    if (!(throwable instanceof ManagedLedgerException)){
+                                        throwable = new ManagedLedgerException(throwable);
+                                    }

Review comment:
       The methoda of ddFailed() needs to accept ManagedLedgerException as a parameter, and transaction will return exceptions that are not ManagedLedgerException in two places. So unified packaging here




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] liangyepianzhou commented on pull request #12219: [Transaction] Transaction buffer take snapshot max read position

Posted by GitBox <gi...@apache.org>.
liangyepianzhou commented on pull request #12219:
URL: https://github.com/apache/pulsar/pull/12219#issuecomment-942342432


   /pulsarbot run-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] congbobo184 commented on a change in pull request #12219: Transaction buffer take snapshot max read position

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on a change in pull request #12219:
URL: https://github.com/apache/pulsar/pull/12219#discussion_r725422949



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
##########
@@ -327,20 +367,25 @@ private void takeSnapshot() {
                 });
                 snapshot.setAborts(list);
             }
-            writer.writeAsync(snapshot).thenAccept((messageId) -> {
-                this.lastSnapshotTimestamps = System.currentTimeMillis();
-                if (log.isDebugEnabled()) {
-                    log.debug("[{}]Transaction buffer take snapshot success! "
-                            + "messageId : {}", topic.getName(), messageId);
+
+            CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+            writer.writeAsync(snapshot).whenComplete((v, e) -> {
+                if (e != null) {
+                    completableFuture.completeExceptionally(e);
+                    log.warn("[{}]Transaction buffer take snapshot fail! ", topic.getName(), e);
+                } else {
+                    this.lastSnapshotTimestamps = System.currentTimeMillis();
+                    if (log.isDebugEnabled()) {
+                        log.debug("[{}]Transaction buffer take snapshot success! "
+                                + "messageId : {}", topic.getName(), v);
+                    }
+
+                    completableFuture.complete(null);
                 }
-            }).exceptionally(e -> {
-                log.warn("[{}]Transaction buffer take snapshot fail! ", topic.getName(), e);
-                return null;
             });
+            return completableFuture;
         });
-    }
-
-    private void clearAbortedTransactions() {
+    }    private void clearAbortedTransactions() {

Review comment:
       format error

##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
##########
@@ -246,4 +254,53 @@ public void testSubscriptionRecreateTopic()
 
     }
 
+
+    public void testTakeSnapshotBeforeFirstTxnMessageSend() throws Exception{
+        String topic = "persistent://" + NAMESPACE1 + "/testSnapShot";
+        admin.topics().createNonPartitionedTopic(topic);
+        PersistentTopic persistentTopic = (PersistentTopic) getPulsarServiceList().get(0)
+                .getBrokerService().getTopic(topic, false)
+                .get().get();
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .producerName("testSnapshot").sendTimeout(0, TimeUnit.SECONDS)
+                .topic(topic).enableBatching(true)
+                .create();
+        ReaderBuilder<TransactionBufferSnapshot> readerBuilder = pulsarClient
+                .newReader(Schema.AVRO(TransactionBufferSnapshot.class))
+                .startMessageId(MessageId.latest)
+                .topic(NAMESPACE1 + "/" + EventsTopicNames.TRANSACTION_BUFFER_SNAPSHOT);
+        Reader<TransactionBufferSnapshot> reader= readerBuilder.create();
+
+        long waitSnapShotTime = getPulsarServiceList().get(0).getConfiguration()
+                .getTransactionBufferSnapshotMinTimeInMillis();
+
+        producer.newMessage(Schema.STRING).value("common message send").send();
+
+        Awaitility.await().atMost(waitSnapShotTime * 2, TimeUnit.MILLISECONDS)
+                .until(() -> {

Review comment:
       use untilAsserted

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBufferRecoverCallBack.java
##########
@@ -26,7 +26,7 @@
     /**
      * Topic transaction buffer recover complete.
      */
-    void recoverComplete();

Review comment:
       add a method named noNeedToRecover

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
##########
@@ -163,21 +173,51 @@ public void recoverExceptionally(Exception e) {
     @Override
     public CompletableFuture<Position> appendBufferToTxn(TxnID txnId, long sequenceId, ByteBuf buffer) {
         CompletableFuture<Position> completableFuture = new CompletableFuture<>();
-        topic.getManagedLedger().asyncAddEntry(buffer, new AsyncCallbacks.AddEntryCallback() {
-            @Override
-            public void addComplete(Position position, ByteBuf entryData, Object ctx) {
-                synchronized (TopicTransactionBuffer.this) {
-                    handleTransactionMessage(txnId, position);
+        if (checkIfUnused()){
+            takeSnapshot().thenAccept(ignore -> {
+                topic.getManagedLedger().asyncAddEntry(buffer, new AsyncCallbacks.AddEntryCallback() {
+                    @Override
+                    public void addComplete(Position position, ByteBuf entryData, Object ctx) {
+                        synchronized (TopicTransactionBuffer.this) {
+                            handleTransactionMessage(txnId, position);
+                            if (!changeToReadyStateAfterUsed()){
+                                log.error("Fail to change state when add message with transaction at the first time.");
+                            }
+                            timer.newTimeout(TopicTransactionBuffer.this,
+                                    takeSnapshotIntervalTime, TimeUnit.MILLISECONDS);

Review comment:
       can move to after takeSnapshot and before asyncAddEntry

##########
File path: pulsar-io/jdbc/lombok.config
##########
@@ -1,22 +0,0 @@
-#

Review comment:
       why delete this file?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] liangyepianzhou removed a comment on pull request #12219: [Transaction] Transaction buffer take snapshot max read position

Posted by GitBox <gi...@apache.org>.
liangyepianzhou removed a comment on pull request #12219:
URL: https://github.com/apache/pulsar/pull/12219#issuecomment-942342432


   /pulsarbot run-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] liangyepianzhou commented on pull request #12219: [Transaction] Transaction buffer take snapshot max read position

Posted by GitBox <gi...@apache.org>.
liangyepianzhou commented on pull request #12219:
URL: https://github.com/apache/pulsar/pull/12219#issuecomment-941123562






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] liangyepianzhou commented on a change in pull request #12219: [Transaction] Transaction buffer take snapshot max read position

Posted by GitBox <gi...@apache.org>.
liangyepianzhou commented on a change in pull request #12219:
URL: https://github.com/apache/pulsar/pull/12219#discussion_r728025076



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
##########
@@ -163,6 +174,34 @@ public void recoverExceptionally(Exception e) {
     @Override
     public CompletableFuture<Position> appendBufferToTxn(TxnID txnId, long sequenceId, ByteBuf buffer) {
         CompletableFuture<Position> completableFuture = new CompletableFuture<>();
+        if (checkIfReady()){
+            addTxnEntry(completableFuture, txnId, buffer);
+        } else {
+            if (checkIfUnused() && changeToInitializingStateFromUnused()){
+                //`PulsarDecoder` will release this buffer  in `finally` and `takeSnapshot` is asynchronous
+                buffer.retain();
+                takeSnapshot().thenAccept(ignore -> {
+                    changeToReadyState();
+                    buffer.release();
+                    timer.newTimeout(TopicTransactionBuffer.this,
+                            takeSnapshotIntervalTime, TimeUnit.MILLISECONDS);
+                    log.info("Topic {} take snapshot successfully when uses TransactionBuffer at the first time",
+                            this.topic.getName());
+                }).exceptionally(exception -> {
+                    changeToUnUsedState();
+                    buffer.release();
+                    log.error("Topic {} fail to takeSnapshot before adding the first message with transaction",
+                            this.topic.getName(), exception);
+                    return null;
+                });
+            }
+        completableFuture.completeExceptionally(new TransactionBufferStatusException(this.topic.getName(),

Review comment:
       If the state of TransactionBuffer is not Ready, we hope so




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] liangyepianzhou commented on pull request #12219: [Transaction] Transaction buffer take snapshot max read position

Posted by GitBox <gi...@apache.org>.
liangyepianzhou commented on pull request #12219:
URL: https://github.com/apache/pulsar/pull/12219#issuecomment-942407301


   /pulsarbot run-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] liangyepianzhou commented on pull request #12219: [Transaction] Transaction buffer take snapshot max read position

Posted by GitBox <gi...@apache.org>.
liangyepianzhou commented on pull request #12219:
URL: https://github.com/apache/pulsar/pull/12219#issuecomment-941123562


   /pulsarbot run-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] liangyepianzhou commented on a change in pull request #12219: [Transaction] Transaction buffer take snapshot max read position

Posted by GitBox <gi...@apache.org>.
liangyepianzhou commented on a change in pull request #12219:
URL: https://github.com/apache/pulsar/pull/12219#discussion_r728013459



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/exceptions/TransactionBufferStatusException.java
##########
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.buffer.exceptions;
+
+
+import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferState;
+
+public class TransactionBufferStatusException extends TransactionBufferException{

Review comment:
       TransactionBufferException has ten similar implementation classes. If you and @congbobo184 agree with this, I can mention another PR to do this later.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui commented on a change in pull request #12219: [Transaction] Transaction buffer take snapshot max read position

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on a change in pull request #12219:
URL: https://github.com/apache/pulsar/pull/12219#discussion_r727976752



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/exceptions/TransactionBufferStatusException.java
##########
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.buffer.exceptions;
+
+
+import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferState;
+
+public class TransactionBufferStatusException extends TransactionBufferException{

Review comment:
       It can be an inner class of TransactionBufferException?

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
##########
@@ -2981,12 +2981,16 @@ public void publishTxnMessage(TxnID txnID, ByteBuf headersAndPayload, PublishCon
                                     // Message has been successfully persisted
                                     messageDeduplication.recordMessagePersisted(publishContext,
                                             (PositionImpl) position);
-                                    publishContext.completed(null, ((PositionImpl) position).getLedgerId(),
-                                            ((PositionImpl) position).getEntryId());
+                                    publishContext.completed(null, position.getLedgerId(),
+                                            position.getEntryId());
 
                                     decrementPendingWriteOpsAndCheck();
                                 })
                                 .exceptionally(throwable -> {
+                                    throwable = throwable.getCause();
+                                    if (!(throwable instanceof ManagedLedgerException)){
+                                        throwable = new ManagedLedgerException(throwable);
+                                    }

Review comment:
       Why need to covert all exceptions to ManagedLedgerException? And it's better to add error log here, so that we can debug more easier

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBufferState.java
##########
@@ -32,7 +32,8 @@
         None,
         Initializing,
         Ready,
-        Close
+        Close,
+        Unused

Review comment:
       Does `NoSnapshot` is more meaningful here?

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
##########
@@ -398,13 +451,14 @@ public void syncMaxReadPositionForNormalPublish(PositionImpl position) {
         synchronized (TopicTransactionBuffer.this) {
             if (ongoingTxns.isEmpty()) {
                 maxReadPosition = position;
+                changeMaxReadPositionAndAddAbortTimes.incrementAndGet();
             }
         }
     }
 
     @Override
     public PositionImpl getMaxReadPosition() {
-        if (checkIfReady()) {
+        if (checkIfReady() || checkIfUnused()) {

Review comment:
       If the state is unused, we should return the latest?

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
##########
@@ -163,6 +174,34 @@ public void recoverExceptionally(Exception e) {
     @Override
     public CompletableFuture<Position> appendBufferToTxn(TxnID txnId, long sequenceId, ByteBuf buffer) {
         CompletableFuture<Position> completableFuture = new CompletableFuture<>();
+        if (checkIfReady()){

Review comment:
       ```suggestion
           if (checkIfReady()) {
   ```

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
##########
@@ -163,6 +174,34 @@ public void recoverExceptionally(Exception e) {
     @Override
     public CompletableFuture<Position> appendBufferToTxn(TxnID txnId, long sequenceId, ByteBuf buffer) {
         CompletableFuture<Position> completableFuture = new CompletableFuture<>();
+        if (checkIfReady()){
+            addTxnEntry(completableFuture, txnId, buffer);
+        } else {
+            if (checkIfUnused() && changeToInitializingStateFromUnused()){
+                //`PulsarDecoder` will release this buffer  in `finally` and `takeSnapshot` is asynchronous
+                buffer.retain();
+                takeSnapshot().thenAccept(ignore -> {
+                    changeToReadyState();
+                    buffer.release();
+                    timer.newTimeout(TopicTransactionBuffer.this,
+                            takeSnapshotIntervalTime, TimeUnit.MILLISECONDS);
+                    log.info("Topic {} take snapshot successfully when uses TransactionBuffer at the first time",
+                            this.topic.getName());
+                }).exceptionally(exception -> {
+                    changeToUnUsedState();
+                    buffer.release();
+                    log.error("Topic {} fail to takeSnapshot before adding the first message with transaction",
+                            this.topic.getName(), exception);
+                    return null;
+                });
+            }
+        completableFuture.completeExceptionally(new TransactionBufferStatusException(this.topic.getName(),

Review comment:
       If we are taking the snapshot, all the messages publish with transaction will get exception here

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
##########
@@ -102,11 +103,12 @@ public TopicTransactionBuffer(PersistentTopic topic, CompletableFuture<Void> tra
                 .getConfiguration().getTransactionBufferSnapshotMaxTransactionCount();
         this.takeSnapshotIntervalTime = topic.getBrokerService().getPulsar()
                 .getConfiguration().getTransactionBufferSnapshotMinTimeInMillis();
+        this.maxReadPosition = (PositionImpl) topic.getManagedLedger().getLastConfirmedEntry();
         this.topic.getBrokerService().getPulsar().getTransactionReplayExecutor()
                 .execute(new TopicTransactionBufferRecover(new TopicTransactionBufferRecoverCallBack() {
                     @Override
                     public void recoverComplete() {
-                        if (!changeToReadyState()) {
+                        if (!changeToReadyState()){

Review comment:
       ```suggestion
                           if (!changeToReadyState()) {
   ```

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
##########
@@ -163,6 +174,34 @@ public void recoverExceptionally(Exception e) {
     @Override
     public CompletableFuture<Position> appendBufferToTxn(TxnID txnId, long sequenceId, ByteBuf buffer) {
         CompletableFuture<Position> completableFuture = new CompletableFuture<>();
+        if (checkIfReady()){
+            addTxnEntry(completableFuture, txnId, buffer);
+        } else {
+            if (checkIfUnused() && changeToInitializingStateFromUnused()){
+                //`PulsarDecoder` will release this buffer  in `finally` and `takeSnapshot` is asynchronous
+                buffer.retain();
+                takeSnapshot().thenAccept(ignore -> {
+                    changeToReadyState();

Review comment:
       If the snapshot is created, we should add the txn entry? 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] liangyepianzhou commented on a change in pull request #12219: [Transaction] Transaction buffer take snapshot max read position

Posted by GitBox <gi...@apache.org>.
liangyepianzhou commented on a change in pull request #12219:
URL: https://github.com/apache/pulsar/pull/12219#discussion_r728019063



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBufferState.java
##########
@@ -32,7 +32,8 @@
         None,
         Initializing,
         Ready,
-        Close
+        Close,
+        Unused

Review comment:
       Which will be submitted together in this update




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] liangyepianzhou commented on a change in pull request #12219: Transaction buffer take snapshot max read position

Posted by GitBox <gi...@apache.org>.
liangyepianzhou commented on a change in pull request #12219:
URL: https://github.com/apache/pulsar/pull/12219#discussion_r724871882



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
##########
@@ -308,36 +339,36 @@ private void takeSnapshotByTimeout() {
                 takeSnapshotIntervalTime, TimeUnit.MILLISECONDS);
     }
 
-    private void takeSnapshot() {
+    private CompletableFuture<Void> takeSnapshot() {
         changeMaxReadPositionAndAddAbortTimes.set(0);
-        takeSnapshotWriter.thenAccept(writer -> {
-            TransactionBufferSnapshot snapshot = new TransactionBufferSnapshot();
-            synchronized (TopicTransactionBuffer.this) {
-                snapshot.setTopicName(topic.getName());
-                snapshot.setMaxReadPositionLedgerId(maxReadPosition.getLedgerId());
-                snapshot.setMaxReadPositionEntryId(maxReadPosition.getEntryId());
-                List<AbortTxnMetadata> list = new ArrayList<>();
-                aborts.forEach((k, v) -> {
-                    AbortTxnMetadata abortTxnMetadata = new AbortTxnMetadata();
-                    abortTxnMetadata.setTxnIdMostBits(k.getMostSigBits());
-                    abortTxnMetadata.setTxnIdLeastBits(k.getLeastSigBits());
-                    abortTxnMetadata.setLedgerId(v.getLedgerId());
-                    abortTxnMetadata.setEntryId(v.getEntryId());
-                    list.add(abortTxnMetadata);
+        return takeSnapshotWriter.thenAccept(writer -> {
+                    TransactionBufferSnapshot snapshot = new TransactionBufferSnapshot();
+                    synchronized (TopicTransactionBuffer.this) {
+                        snapshot.setTopicName(topic.getName());
+                        snapshot.setMaxReadPositionLedgerId(maxReadPosition.getLedgerId());
+                        snapshot.setMaxReadPositionEntryId(maxReadPosition.getEntryId());
+                        List<AbortTxnMetadata> list = new ArrayList<>();
+                        aborts.forEach((k, v) -> {
+                            AbortTxnMetadata abortTxnMetadata = new AbortTxnMetadata();
+                            abortTxnMetadata.setTxnIdMostBits(k.getMostSigBits());
+                            abortTxnMetadata.setTxnIdLeastBits(k.getLeastSigBits());
+                            abortTxnMetadata.setLedgerId(v.getLedgerId());
+                            abortTxnMetadata.setEntryId(v.getEntryId());
+                            list.add(abortTxnMetadata);
+                        });
+                        snapshot.setAborts(list);
+                    }
+                    writer.writeAsync(snapshot).thenAccept((messageId) -> {
+                        this.lastSnapshotTimestamps = System.currentTimeMillis();
+                        if (log.isDebugEnabled()) {
+                            log.debug("[{}]Transaction buffer take snapshot success! "
+                                    + "messageId : {}", topic.getName(), messageId);
+                        }
+                    }).exceptionally(e -> {
+                        log.warn("[{}]Transaction buffer take snapshot fail! ", topic.getName(), e);
+                        return null;
+                    }).join();

Review comment:
       THX.Changes have been made to the shortcomings




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on a change in pull request #12219: Transaction buffer take snapshot max read position

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #12219:
URL: https://github.com/apache/pulsar/pull/12219#discussion_r724863461



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
##########
@@ -308,36 +339,36 @@ private void takeSnapshotByTimeout() {
                 takeSnapshotIntervalTime, TimeUnit.MILLISECONDS);
     }
 
-    private void takeSnapshot() {
+    private CompletableFuture<Void> takeSnapshot() {
         changeMaxReadPositionAndAddAbortTimes.set(0);
-        takeSnapshotWriter.thenAccept(writer -> {
-            TransactionBufferSnapshot snapshot = new TransactionBufferSnapshot();
-            synchronized (TopicTransactionBuffer.this) {
-                snapshot.setTopicName(topic.getName());
-                snapshot.setMaxReadPositionLedgerId(maxReadPosition.getLedgerId());
-                snapshot.setMaxReadPositionEntryId(maxReadPosition.getEntryId());
-                List<AbortTxnMetadata> list = new ArrayList<>();
-                aborts.forEach((k, v) -> {
-                    AbortTxnMetadata abortTxnMetadata = new AbortTxnMetadata();
-                    abortTxnMetadata.setTxnIdMostBits(k.getMostSigBits());
-                    abortTxnMetadata.setTxnIdLeastBits(k.getLeastSigBits());
-                    abortTxnMetadata.setLedgerId(v.getLedgerId());
-                    abortTxnMetadata.setEntryId(v.getEntryId());
-                    list.add(abortTxnMetadata);
+        return takeSnapshotWriter.thenAccept(writer -> {
+                    TransactionBufferSnapshot snapshot = new TransactionBufferSnapshot();
+                    synchronized (TopicTransactionBuffer.this) {
+                        snapshot.setTopicName(topic.getName());
+                        snapshot.setMaxReadPositionLedgerId(maxReadPosition.getLedgerId());
+                        snapshot.setMaxReadPositionEntryId(maxReadPosition.getEntryId());
+                        List<AbortTxnMetadata> list = new ArrayList<>();
+                        aborts.forEach((k, v) -> {
+                            AbortTxnMetadata abortTxnMetadata = new AbortTxnMetadata();
+                            abortTxnMetadata.setTxnIdMostBits(k.getMostSigBits());
+                            abortTxnMetadata.setTxnIdLeastBits(k.getLeastSigBits());
+                            abortTxnMetadata.setLedgerId(v.getLedgerId());
+                            abortTxnMetadata.setEntryId(v.getEntryId());
+                            list.add(abortTxnMetadata);
+                        });
+                        snapshot.setAborts(list);
+                    }
+                    writer.writeAsync(snapshot).thenAccept((messageId) -> {
+                        this.lastSnapshotTimestamps = System.currentTimeMillis();
+                        if (log.isDebugEnabled()) {
+                            log.debug("[{}]Transaction buffer take snapshot success! "
+                                    + "messageId : {}", topic.getName(), messageId);
+                        }
+                    }).exceptionally(e -> {
+                        log.warn("[{}]Transaction buffer take snapshot fail! ", topic.getName(), e);
+                        return null;
+                    }).join();

Review comment:
       this is bad practice,
   we are already returning a CompletableFuture, you can use thenCombine/theCompose




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on pull request #12219: Transaction buffer take snapshot max read position

Posted by GitBox <gi...@apache.org>.
eolivelli commented on pull request #12219:
URL: https://github.com/apache/pulsar/pull/12219#issuecomment-938534390


   There are conflicts to be resolved


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] liangyepianzhou commented on a change in pull request #12219: [Transaction] Transaction buffer take snapshot max read position

Posted by GitBox <gi...@apache.org>.
liangyepianzhou commented on a change in pull request #12219:
URL: https://github.com/apache/pulsar/pull/12219#discussion_r728010473



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
##########
@@ -2981,12 +2981,16 @@ public void publishTxnMessage(TxnID txnID, ByteBuf headersAndPayload, PublishCon
                                     // Message has been successfully persisted
                                     messageDeduplication.recordMessagePersisted(publishContext,
                                             (PositionImpl) position);
-                                    publishContext.completed(null, ((PositionImpl) position).getLedgerId(),
-                                            ((PositionImpl) position).getEntryId());
+                                    publishContext.completed(null, position.getLedgerId(),
+                                            position.getEntryId());
 
                                     decrementPendingWriteOpsAndCheck();
                                 })
                                 .exceptionally(throwable -> {
+                                    throwable = throwable.getCause();
+                                    if (!(throwable instanceof ManagedLedgerException)){
+                                        throwable = new ManagedLedgerException(throwable);
+                                    }

Review comment:
       The methoda of ddFailed() needs to accept ManagedLedgerException as a parameter, and transaction will return exceptions that are not ManagedLedgerException in two places. So unified packaging here

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/exceptions/TransactionBufferStatusException.java
##########
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.buffer.exceptions;
+
+
+import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferState;
+
+public class TransactionBufferStatusException extends TransactionBufferException{

Review comment:
       TransactionBufferException has ten similar implementation classes. If you and @congbobo184 agree with this, I can mention another PR to do this later.

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
##########
@@ -398,13 +451,14 @@ public void syncMaxReadPositionForNormalPublish(PositionImpl position) {
         synchronized (TopicTransactionBuffer.this) {
             if (ongoingTxns.isEmpty()) {
                 maxReadPosition = position;
+                changeMaxReadPositionAndAddAbortTimes.incrementAndGet();
             }
         }
     }
 
     @Override
     public PositionImpl getMaxReadPosition() {
-        if (checkIfReady()) {
+        if (checkIfReady() || checkIfUnused()) {

Review comment:
       If it is unused, it will return lastConfirmed

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBufferState.java
##########
@@ -32,7 +32,8 @@
         None,
         Initializing,
         Ready,
-        Close
+        Close,
+        Unused

Review comment:
       Thx, I have adopted your suggestion. 

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBufferState.java
##########
@@ -32,7 +32,8 @@
         None,
         Initializing,
         Ready,
-        Close
+        Close,
+        Unused

Review comment:
       Which will be submitted together in this update

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
##########
@@ -163,6 +174,34 @@ public void recoverExceptionally(Exception e) {
     @Override
     public CompletableFuture<Position> appendBufferToTxn(TxnID txnId, long sequenceId, ByteBuf buffer) {
         CompletableFuture<Position> completableFuture = new CompletableFuture<>();
+        if (checkIfReady()){
+            addTxnEntry(completableFuture, txnId, buffer);
+        } else {
+            if (checkIfUnused() && changeToInitializingStateFromUnused()){
+                //`PulsarDecoder` will release this buffer  in `finally` and `takeSnapshot` is asynchronous
+                buffer.retain();
+                takeSnapshot().thenAccept(ignore -> {
+                    changeToReadyState();

Review comment:
       In order to ensure the orderliness of the messages and no ambiguity, we hope that the first transaction will directly fail, and then the client will resend the requests.

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
##########
@@ -163,6 +174,34 @@ public void recoverExceptionally(Exception e) {
     @Override
     public CompletableFuture<Position> appendBufferToTxn(TxnID txnId, long sequenceId, ByteBuf buffer) {
         CompletableFuture<Position> completableFuture = new CompletableFuture<>();
+        if (checkIfReady()){
+            addTxnEntry(completableFuture, txnId, buffer);
+        } else {
+            if (checkIfUnused() && changeToInitializingStateFromUnused()){
+                //`PulsarDecoder` will release this buffer  in `finally` and `takeSnapshot` is asynchronous
+                buffer.retain();
+                takeSnapshot().thenAccept(ignore -> {
+                    changeToReadyState();
+                    buffer.release();
+                    timer.newTimeout(TopicTransactionBuffer.this,
+                            takeSnapshotIntervalTime, TimeUnit.MILLISECONDS);
+                    log.info("Topic {} take snapshot successfully when uses TransactionBuffer at the first time",
+                            this.topic.getName());
+                }).exceptionally(exception -> {
+                    changeToUnUsedState();
+                    buffer.release();
+                    log.error("Topic {} fail to takeSnapshot before adding the first message with transaction",
+                            this.topic.getName(), exception);
+                    return null;
+                });
+            }
+        completableFuture.completeExceptionally(new TransactionBufferStatusException(this.topic.getName(),

Review comment:
       If the state of TransactionBuffer is not Ready, we hope so

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBufferState.java
##########
@@ -32,7 +32,8 @@
         None,
         Initializing,
         Ready,
-        Close
+        Close,
+        Unused

Review comment:
       Which will be submitted together in this update

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBufferState.java
##########
@@ -32,7 +32,8 @@
         None,
         Initializing,
         Ready,
-        Close
+        Close,
+        Unused

Review comment:
       Thx, I have adopted your suggestion which will be submitted together in this update.  




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui commented on a change in pull request #12219: [Transaction] Transaction buffer take snapshot max read position

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on a change in pull request #12219:
URL: https://github.com/apache/pulsar/pull/12219#discussion_r727976752



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/exceptions/TransactionBufferStatusException.java
##########
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.buffer.exceptions;
+
+
+import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferState;
+
+public class TransactionBufferStatusException extends TransactionBufferException{

Review comment:
       It can be an inner class of TransactionBufferException?

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
##########
@@ -2981,12 +2981,16 @@ public void publishTxnMessage(TxnID txnID, ByteBuf headersAndPayload, PublishCon
                                     // Message has been successfully persisted
                                     messageDeduplication.recordMessagePersisted(publishContext,
                                             (PositionImpl) position);
-                                    publishContext.completed(null, ((PositionImpl) position).getLedgerId(),
-                                            ((PositionImpl) position).getEntryId());
+                                    publishContext.completed(null, position.getLedgerId(),
+                                            position.getEntryId());
 
                                     decrementPendingWriteOpsAndCheck();
                                 })
                                 .exceptionally(throwable -> {
+                                    throwable = throwable.getCause();
+                                    if (!(throwable instanceof ManagedLedgerException)){
+                                        throwable = new ManagedLedgerException(throwable);
+                                    }

Review comment:
       Why need to covert all exceptions to ManagedLedgerException? And it's better to add error log here, so that we can debug more easier

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBufferState.java
##########
@@ -32,7 +32,8 @@
         None,
         Initializing,
         Ready,
-        Close
+        Close,
+        Unused

Review comment:
       Does `NoSnapshot` is more meaningful here?

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
##########
@@ -398,13 +451,14 @@ public void syncMaxReadPositionForNormalPublish(PositionImpl position) {
         synchronized (TopicTransactionBuffer.this) {
             if (ongoingTxns.isEmpty()) {
                 maxReadPosition = position;
+                changeMaxReadPositionAndAddAbortTimes.incrementAndGet();
             }
         }
     }
 
     @Override
     public PositionImpl getMaxReadPosition() {
-        if (checkIfReady()) {
+        if (checkIfReady() || checkIfUnused()) {

Review comment:
       If the state is unused, we should return the latest?

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
##########
@@ -163,6 +174,34 @@ public void recoverExceptionally(Exception e) {
     @Override
     public CompletableFuture<Position> appendBufferToTxn(TxnID txnId, long sequenceId, ByteBuf buffer) {
         CompletableFuture<Position> completableFuture = new CompletableFuture<>();
+        if (checkIfReady()){

Review comment:
       ```suggestion
           if (checkIfReady()) {
   ```

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
##########
@@ -163,6 +174,34 @@ public void recoverExceptionally(Exception e) {
     @Override
     public CompletableFuture<Position> appendBufferToTxn(TxnID txnId, long sequenceId, ByteBuf buffer) {
         CompletableFuture<Position> completableFuture = new CompletableFuture<>();
+        if (checkIfReady()){
+            addTxnEntry(completableFuture, txnId, buffer);
+        } else {
+            if (checkIfUnused() && changeToInitializingStateFromUnused()){
+                //`PulsarDecoder` will release this buffer  in `finally` and `takeSnapshot` is asynchronous
+                buffer.retain();
+                takeSnapshot().thenAccept(ignore -> {
+                    changeToReadyState();
+                    buffer.release();
+                    timer.newTimeout(TopicTransactionBuffer.this,
+                            takeSnapshotIntervalTime, TimeUnit.MILLISECONDS);
+                    log.info("Topic {} take snapshot successfully when uses TransactionBuffer at the first time",
+                            this.topic.getName());
+                }).exceptionally(exception -> {
+                    changeToUnUsedState();
+                    buffer.release();
+                    log.error("Topic {} fail to takeSnapshot before adding the first message with transaction",
+                            this.topic.getName(), exception);
+                    return null;
+                });
+            }
+        completableFuture.completeExceptionally(new TransactionBufferStatusException(this.topic.getName(),

Review comment:
       If we are taking the snapshot, all the messages publish with transaction will get exception here

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
##########
@@ -102,11 +103,12 @@ public TopicTransactionBuffer(PersistentTopic topic, CompletableFuture<Void> tra
                 .getConfiguration().getTransactionBufferSnapshotMaxTransactionCount();
         this.takeSnapshotIntervalTime = topic.getBrokerService().getPulsar()
                 .getConfiguration().getTransactionBufferSnapshotMinTimeInMillis();
+        this.maxReadPosition = (PositionImpl) topic.getManagedLedger().getLastConfirmedEntry();
         this.topic.getBrokerService().getPulsar().getTransactionReplayExecutor()
                 .execute(new TopicTransactionBufferRecover(new TopicTransactionBufferRecoverCallBack() {
                     @Override
                     public void recoverComplete() {
-                        if (!changeToReadyState()) {
+                        if (!changeToReadyState()){

Review comment:
       ```suggestion
                           if (!changeToReadyState()) {
   ```

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
##########
@@ -163,6 +174,34 @@ public void recoverExceptionally(Exception e) {
     @Override
     public CompletableFuture<Position> appendBufferToTxn(TxnID txnId, long sequenceId, ByteBuf buffer) {
         CompletableFuture<Position> completableFuture = new CompletableFuture<>();
+        if (checkIfReady()){
+            addTxnEntry(completableFuture, txnId, buffer);
+        } else {
+            if (checkIfUnused() && changeToInitializingStateFromUnused()){
+                //`PulsarDecoder` will release this buffer  in `finally` and `takeSnapshot` is asynchronous
+                buffer.retain();
+                takeSnapshot().thenAccept(ignore -> {
+                    changeToReadyState();

Review comment:
       If the snapshot is created, we should add the txn entry? 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] liangyepianzhou commented on a change in pull request #12219: [Transaction] Transaction buffer take snapshot max read position

Posted by GitBox <gi...@apache.org>.
liangyepianzhou commented on a change in pull request #12219:
URL: https://github.com/apache/pulsar/pull/12219#discussion_r728018227



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBufferState.java
##########
@@ -32,7 +32,8 @@
         None,
         Initializing,
         Ready,
-        Close
+        Close,
+        Unused

Review comment:
       Thx, I have adopted your suggestion which will be submitted together in this update.  




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui merged pull request #12219: [Transaction] Transaction buffer take snapshot max read position

Posted by GitBox <gi...@apache.org>.
codelipenghui merged pull request #12219:
URL: https://github.com/apache/pulsar/pull/12219


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on pull request #12219: [Transaction] Transaction buffer take snapshot max read position

Posted by GitBox <gi...@apache.org>.
eolivelli commented on pull request #12219:
URL: https://github.com/apache/pulsar/pull/12219#issuecomment-944236175


   This is a wire protocol change, let's commit it in 2.9
   
   committing as soon as CI passes


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] liangyepianzhou commented on pull request #12219: [Transaction] Transaction buffer take snapshot max read position

Posted by GitBox <gi...@apache.org>.
liangyepianzhou commented on pull request #12219:
URL: https://github.com/apache/pulsar/pull/12219#issuecomment-941024311


   /pulsarbot run-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] liangyepianzhou removed a comment on pull request #12219: [Transaction] Transaction buffer take snapshot max read position

Posted by GitBox <gi...@apache.org>.
liangyepianzhou removed a comment on pull request #12219:
URL: https://github.com/apache/pulsar/pull/12219#issuecomment-942342432


   /pulsarbot run-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] liangyepianzhou commented on a change in pull request #12219: [Transaction] Transaction buffer take snapshot max read position

Posted by GitBox <gi...@apache.org>.
liangyepianzhou commented on a change in pull request #12219:
URL: https://github.com/apache/pulsar/pull/12219#discussion_r728014751



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
##########
@@ -398,13 +451,14 @@ public void syncMaxReadPositionForNormalPublish(PositionImpl position) {
         synchronized (TopicTransactionBuffer.this) {
             if (ongoingTxns.isEmpty()) {
                 maxReadPosition = position;
+                changeMaxReadPositionAndAddAbortTimes.incrementAndGet();
             }
         }
     }
 
     @Override
     public PositionImpl getMaxReadPosition() {
-        if (checkIfReady()) {
+        if (checkIfReady() || checkIfUnused()) {

Review comment:
       If it is unused, it will return lastConfirmed




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] liangyepianzhou commented on a change in pull request #12219: [Transaction] Transaction buffer take snapshot max read position

Posted by GitBox <gi...@apache.org>.
liangyepianzhou commented on a change in pull request #12219:
URL: https://github.com/apache/pulsar/pull/12219#discussion_r728020863



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
##########
@@ -163,6 +174,34 @@ public void recoverExceptionally(Exception e) {
     @Override
     public CompletableFuture<Position> appendBufferToTxn(TxnID txnId, long sequenceId, ByteBuf buffer) {
         CompletableFuture<Position> completableFuture = new CompletableFuture<>();
+        if (checkIfReady()){
+            addTxnEntry(completableFuture, txnId, buffer);
+        } else {
+            if (checkIfUnused() && changeToInitializingStateFromUnused()){
+                //`PulsarDecoder` will release this buffer  in `finally` and `takeSnapshot` is asynchronous
+                buffer.retain();
+                takeSnapshot().thenAccept(ignore -> {
+                    changeToReadyState();

Review comment:
       In order to ensure the orderliness of the messages and no ambiguity, we hope that the first transaction will directly fail, and then the client will resend the requests.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org