You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/12/09 10:50:16 UTC

[GitHub] [pulsar] congbobo184 opened a new pull request #13209: [Transaction] Fix transaction sequenceId generate error.

congbobo184 opened a new pull request #13209:
URL: https://github.com/apache/pulsar/pull/13209


   ## Motivation
   now, we recover transaction sequenceId from lastConfirmEntry or managedLedger properties. But update managedLedger properties is not a Synchronize op, so it will recover error sequenceId. 
   
   ### Modifications
   use ManagedLedgerInterceptor will fix this problem.
   1. use interceptor.onUpdateManagedLedgerInfo, when ledger roll over, it will update current sequenceId to managedLedger properties.
   2. use interceptor.onManagedLedgerPropertiesInitialize, first we will recover sequenceId from managedLedger properties.
   3. use interceptor.onManagedLedgerLastLedgerInitialize, when tc recover and has a effective lastConfirmEntry, we will change the sequenceID after interceptor.onManagedLedgerPropertiesInitialize
   4. If a new TC, it can't recover from interceptor.onManagedLedgerPropertiesInitialize and interceptor.onManagedLedgerLastLedgerInitialize, it will use the initial sequenceId -1
   ### Verifying this change
   Add the tests for it
   
   Does this pull request potentially affect one of the following parts:
   If yes was chosen, please highlight the changes
   
   Dependencies (does it add or upgrade a dependency): (no)
   The public API: (no)
   The schema: (no)
   The default values of configurations: (no)
   The wire protocol: (no)
   The rest endpoints: (no)
   The admin cli options: (no)
   Anything that affects deployment: (no)
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui commented on a change in pull request #13209: [Transaction] Fix transaction sequenceId generate error.

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on a change in pull request #13209:
URL: https://github.com/apache/pulsar/pull/13209#discussion_r766489831



##########
File path: pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogInterceptor.java
##########
@@ -18,46 +18,87 @@
  */
 package org.apache.pulsar.transaction.coordinator.impl;
 
+import io.netty.buffer.ByteBuf;
+import lombok.Getter;
 import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.client.api.LedgerEntry;
 import org.apache.bookkeeper.mledger.impl.OpAddEntry;
 import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
+import org.apache.pulsar.transaction.coordinator.proto.TransactionMetadataEntry;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * Store max sequenceID in ManagedLedger properties, in order to recover transaction log.
  */
 public class MLTransactionLogInterceptor implements ManagedLedgerInterceptor {
 
     private static final Logger log = LoggerFactory.getLogger(MLTransactionLogInterceptor.class);
+    private static final long TC_ID_NOT_USED = -1L;
     public static final String MAX_LOCAL_TXN_ID = "max_local_txn_id";
-
-    private volatile long maxLocalTxnId = -1;
+    @Getter
+    private final AtomicLong sequenceId = new AtomicLong(TC_ID_NOT_USED);

Review comment:
       We'd better return long not AtomicLong for the getter method.

##########
File path: pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogInterceptor.java
##########
@@ -18,46 +18,87 @@
  */
 package org.apache.pulsar.transaction.coordinator.impl;
 
+import io.netty.buffer.ByteBuf;
+import lombok.Getter;
 import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.client.api.LedgerEntry;
 import org.apache.bookkeeper.mledger.impl.OpAddEntry;
 import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
+import org.apache.pulsar.transaction.coordinator.proto.TransactionMetadataEntry;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * Store max sequenceID in ManagedLedger properties, in order to recover transaction log.
  */
 public class MLTransactionLogInterceptor implements ManagedLedgerInterceptor {
 
     private static final Logger log = LoggerFactory.getLogger(MLTransactionLogInterceptor.class);
+    private static final long TC_ID_NOT_USED = -1L;
     public static final String MAX_LOCAL_TXN_ID = "max_local_txn_id";
-
-    private volatile long maxLocalTxnId = -1;
+    @Getter
+    private final AtomicLong sequenceId = new AtomicLong(TC_ID_NOT_USED);
 
     @Override
     public OpAddEntry beforeAddEntry(OpAddEntry op, int numberOfMessages) {
-        return null;
+        return op;
     }
 
+    // When all of ledger have been deleted, we will generate sequenceId from managedLedger properties
     @Override
     public void onManagedLedgerPropertiesInitialize(Map<String, String> propertiesMap) {
+        if (propertiesMap == null || propertiesMap.size() == 0) {
+            return;
+        }
 
+        if (propertiesMap.containsKey(MAX_LOCAL_TXN_ID)) {
+            sequenceId.set(Long.parseLong(propertiesMap.get(MAX_LOCAL_TXN_ID)));
+        }
     }
 
+    // When we don't roll over ledger, we can init sequenceId from the getLastAddConfirmed transaction metadata entry
     @Override
-    public CompletableFuture<Void> onManagedLedgerLastLedgerInitialize(String name, LedgerHandle ledgerHandle) {
-        return CompletableFuture.completedFuture(null);
+    public CompletableFuture<Void> onManagedLedgerLastLedgerInitialize(String name, LedgerHandle lh) {
+        CompletableFuture<Void> promise = new CompletableFuture<>();
+        if (lh.getLastAddConfirmed() >= 0) {
+            lh.readAsync(lh.getLastAddConfirmed(), lh.getLastAddConfirmed()).whenComplete((entries, ex) -> {
+                if (ex != null) {
+                    log.error("[{}] Read last entry error.", name, ex);
+                    promise.completeExceptionally(ex);
+                } else {
+                    if (entries != null) {
+                        try {
+                            LedgerEntry ledgerEntry = entries.getEntry(lh.getLastAddConfirmed());

Review comment:
       The`ledgerEntry`should be closed after getting the max local txn ID.

##########
File path: pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogInterceptor.java
##########
@@ -18,46 +18,87 @@
  */
 package org.apache.pulsar.transaction.coordinator.impl;
 
+import io.netty.buffer.ByteBuf;
+import lombok.Getter;
 import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.client.api.LedgerEntry;
 import org.apache.bookkeeper.mledger.impl.OpAddEntry;
 import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
+import org.apache.pulsar.transaction.coordinator.proto.TransactionMetadataEntry;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * Store max sequenceID in ManagedLedger properties, in order to recover transaction log.
  */
 public class MLTransactionLogInterceptor implements ManagedLedgerInterceptor {
 
     private static final Logger log = LoggerFactory.getLogger(MLTransactionLogInterceptor.class);
+    private static final long TC_ID_NOT_USED = -1L;
     public static final String MAX_LOCAL_TXN_ID = "max_local_txn_id";
-
-    private volatile long maxLocalTxnId = -1;
+    @Getter
+    private final AtomicLong sequenceId = new AtomicLong(TC_ID_NOT_USED);
 
     @Override
     public OpAddEntry beforeAddEntry(OpAddEntry op, int numberOfMessages) {
-        return null;
+        return op;
     }
 
+    // When all of ledger have been deleted, we will generate sequenceId from managedLedger properties
     @Override
     public void onManagedLedgerPropertiesInitialize(Map<String, String> propertiesMap) {
+        if (propertiesMap == null || propertiesMap.size() == 0) {
+            return;
+        }
 
+        if (propertiesMap.containsKey(MAX_LOCAL_TXN_ID)) {
+            sequenceId.set(Long.parseLong(propertiesMap.get(MAX_LOCAL_TXN_ID)));
+        }
     }
 
+    // When we don't roll over ledger, we can init sequenceId from the getLastAddConfirmed transaction metadata entry
     @Override
-    public CompletableFuture<Void> onManagedLedgerLastLedgerInitialize(String name, LedgerHandle ledgerHandle) {
-        return CompletableFuture.completedFuture(null);
+    public CompletableFuture<Void> onManagedLedgerLastLedgerInitialize(String name, LedgerHandle lh) {
+        CompletableFuture<Void> promise = new CompletableFuture<>();
+        if (lh.getLastAddConfirmed() >= 0) {
+            lh.readAsync(lh.getLastAddConfirmed(), lh.getLastAddConfirmed()).whenComplete((entries, ex) -> {
+                if (ex != null) {
+                    log.error("[{}] Read last entry error.", name, ex);
+                    promise.completeExceptionally(ex);
+                } else {
+                    if (entries != null) {
+                        try {
+                            LedgerEntry ledgerEntry = entries.getEntry(lh.getLastAddConfirmed());
+                            if (ledgerEntry != null) {
+                                TransactionMetadataEntry lastConfirmEntry = new TransactionMetadataEntry();
+                                ByteBuf buffer = ledgerEntry.getEntryBuffer();

Review comment:
       The buffer should be released after getting the max local txn ID.

##########
File path: pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java
##########
@@ -42,10 +42,14 @@
                                                                  ManagedLedgerConfig managedLedgerConfig,
                                                                  TransactionTimeoutTracker timeoutTracker,
                                                                  TransactionRecoverTracker recoverTracker) {
+        MLTransactionLogInterceptor mlTransactionLogInterceptor = new MLTransactionLogInterceptor();
+        managedLedgerConfig.setManagedLedgerInterceptor(new MLTransactionLogInterceptor());
         MLTransactionLogImpl txnLog = new MLTransactionLogImpl(transactionCoordinatorId,
                 managedLedgerFactory, managedLedgerConfig);
 
+        // MLTransactionLogInterceptor will init sequenceId and update the sequenceId to managedLedger properties.
         return txnLog.initialize().thenApply(__ ->
-                new MLTransactionMetadataStore(transactionCoordinatorId, txnLog, timeoutTracker, recoverTracker));
+                new MLTransactionMetadataStore(transactionCoordinatorId, txnLog, timeoutTracker,
+                        recoverTracker, mlTransactionLogInterceptor.getSequenceId()));

Review comment:
       It's better to introduce modify sequence ID method for `mlTransactionLogInterceptor`, share AtomicLong across multiple instances is not good for maintainance




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] github-actions[bot] commented on pull request #13209: [Transaction] Fix transaction sequenceId generate error.

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #13209:
URL: https://github.com/apache/pulsar/pull/13209#issuecomment-989737114


   @congbobo184:Thanks for your contribution. For this PR, do we need to update docs?
   (The [PR template contains info about doc](https://github.com/apache/pulsar/blob/master/.github/PULL_REQUEST_TEMPLATE.md#documentation), which helps others know more about the changes. Can you provide doc-related info in this and future PR descriptions? Thanks)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] github-actions[bot] commented on pull request #13209: [Transaction] Fix transaction sequenceId generate error.

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #13209:
URL: https://github.com/apache/pulsar/pull/13209#issuecomment-989754219


   @congbobo184:Thanks for providing doc info!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] congbobo184 commented on pull request #13209: [Transaction] Fix transaction sequenceId generate error.

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on pull request #13209:
URL: https://github.com/apache/pulsar/pull/13209#issuecomment-989872114


   > is this change compatible with existing Pulsar clusters ?
   
   It will not.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org