Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/09/02 19:41:32 UTC

[GitHub] [pulsar] gaoran10 commented on a change in pull request #11091: [Transaction] Transaction pending ack lazy init.

gaoran10 commented on a change in pull request #11091:
URL: https://github.com/apache/pulsar/pull/11091#discussion_r701340295



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckReplyCallBack.java
##########
@@ -44,16 +44,19 @@ public MLPendingAckReplyCallBack(PendingAckHandleImpl pendingAckHandle) {
 
     @Override
     public void replayComplete() {
-        log.info("Topic name : [{}], SubName : [{}] pending ack state reply success!",
-                pendingAckHandle.getTopicName(), pendingAckHandle.getSubName());
-
-        if (pendingAckHandle.changeToReadyState()) {
-            pendingAckHandle.completeHandleFuture();
+        synchronized (pendingAckHandle) {
             log.info("Topic name : [{}], SubName : [{}] pending ack state reply success!",
                     pendingAckHandle.getTopicName(), pendingAckHandle.getSubName());
-        } else {
-            log.error("Topic name : [{}], SubName : [{}] pending ack state reply fail!",
-                    pendingAckHandle.getTopicName(), pendingAckHandle.getSubName());
+
+            if (pendingAckHandle.changeToReadyState()) {
+                pendingAckHandle.completeHandleFuture();
+                pendingAckHandle.handleCacheRequest();
+                log.info("Topic name : [{}], SubName : [{}] pending ack state reply success!",
+                        pendingAckHandle.getTopicName(), pendingAckHandle.getSubName());
+            } else {
+                log.error("Topic name : [{}], SubName : [{}] pending ack state reply fail!",

Review comment:
       If the pending ack handle replay fails, how do we replay again?

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
##########
@@ -101,36 +103,96 @@
 
     private final PersistentSubscription persistentSubscription;
 
-    private final CompletableFuture<PendingAckStore> pendingAckStoreFuture;
+    private CompletableFuture<PendingAckStore> pendingAckStoreFuture;
 
     private final CompletableFuture<PendingAckHandle> pendingAckHandleCompletableFuture = new CompletableFuture<>();
 
+    private final TransactionPendingAckStoreProvider pendingAckStoreProvider;
+
+    private final BlockingQueue<Runnable> acceptQueue = new LinkedBlockingDeque<>();
+
     public PendingAckHandleImpl(PersistentSubscription persistentSubscription) {
         super(State.None);
         this.topicName = persistentSubscription.getTopicName();
         this.subName = persistentSubscription.getName();
         this.persistentSubscription = persistentSubscription;
 
-        TransactionPendingAckStoreProvider pendingAckStoreProvider =
-                ((PersistentTopic) this.persistentSubscription.getTopic())
+        this.pendingAckStoreProvider = ((PersistentTopic) this.persistentSubscription.getTopic())
                         .getBrokerService().getPulsar().getTransactionPendingAckStoreProvider();
-        this.pendingAckStoreFuture =
-                pendingAckStoreProvider.newPendingAckStore(persistentSubscription);
-
-        this.pendingAckStoreFuture.thenAccept(pendingAckStore -> {
-            changeToInitializingState();
-            pendingAckStore.replayAsync(this,
-                    ((PersistentTopic) persistentSubscription.getTopic()).getBrokerService()
-                            .getPulsar().getTransactionReplayExecutor());
-        }).exceptionally(e -> {
-            log.error("PendingAckHandleImpl init fail! TopicName : {}, SubName: {}", topicName, subName, e);
-            return null;
+        pendingAckStoreProvider.checkInitializedBefore(persistentSubscription).thenAccept(init -> {
+            if (init) {
+                initPendingAckStore();
+            } else {
+                completeHandleFuture();
+            }
         });
     }
 
+    private void initPendingAckStore() {
+        if (changeToInitializingState()) {
+            synchronized (PendingAckHandleImpl.this) {

Review comment:
       I'm not sure why we need this lock. It seems that only one thread will perform the replay operation, because only one thread can change the state successfully.
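
To illustrate the point about a single winning thread, here is a minimal, self-contained sketch. It assumes the state change behaves like a compare-and-set; `SingleWinnerInit`, `tryInit` and `race` are hypothetical names for illustration and are not part of the Pulsar code. The lock may still be needed if the transition has to be coordinated with something else (for example `close()` or the request queue), which this sketch does not model.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for the handle's state machine; not the Pulsar class.
class SingleWinnerInit {
    enum State { NONE, INITIALIZING, READY }

    private final AtomicReference<State> state = new AtomicReference<>(State.NONE);
    private final AtomicInteger replayStarts = new AtomicInteger();

    // Only the thread whose compare-and-set succeeds starts the replay.
    boolean tryInit() {
        if (state.compareAndSet(State.NONE, State.INITIALIZING)) {
            replayStarts.incrementAndGet(); // stands in for starting the replay
            return true;
        }
        return false;
    }

    int replayStarts() {
        return replayStarts.get();
    }

    // Races `threads` callers against one handle and returns how many replays started.
    static int race(int threads) {
        SingleWinnerInit handle = new SingleWinnerInit();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        CountDownLatch start = new CountDownLatch(1);
        for (int i = 0; i < threads; i++) {
            pool.execute(() -> {
                try {
                    start.await(); // release all workers at the same moment
                    handle.tryInit();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        start.countDown();
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return handle.replayStarts();
    }

    public static void main(String[] args) {
        System.out.println("replays started: " + race(8));
    }
}
```

However many threads call `tryInit()`, only one of them observes `NONE` and moves the state forward, so the replay is started once without an extra lock around the transition itself.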

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
##########
@@ -300,11 +402,36 @@ public PendingAckHandleImpl(PersistentSubscription persistentSubscription) {
         return completableFuture;
     }
 
+    private void addCommitTxnRequest(TxnID txnId, Map<String, Long> properties, long lowWaterMark,
+                                    CompletableFuture<Void> completableFuture) {
+        acceptQueue.add(() -> commitTxn(txnId, properties, lowWaterMark, true).thenAccept(v ->
+                completableFuture.complete(null)).exceptionally(e -> {
+            completableFuture.completeExceptionally(e);
+            return null;
+        }));
+    }
+
     @Override
     public synchronized CompletableFuture<Void> commitTxn(TxnID txnID, Map<String, Long> properties,
-                                                          long lowWaterMark) {
+                                                          long lowWaterMark, boolean isInCacheRequest) {

Review comment:
       It seems that the param `isInCacheRequest` isn't used in this method. Do we need to add it?

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckReplyCallBack.java
##########
@@ -44,16 +44,19 @@ public MLPendingAckReplyCallBack(PendingAckHandleImpl pendingAckHandle) {
 
     @Override
     public void replayComplete() {
-        log.info("Topic name : [{}], SubName : [{}] pending ack state reply success!",
-                pendingAckHandle.getTopicName(), pendingAckHandle.getSubName());
-
-        if (pendingAckHandle.changeToReadyState()) {
-            pendingAckHandle.completeHandleFuture();
+        synchronized (pendingAckHandle) {
             log.info("Topic name : [{}], SubName : [{}] pending ack state reply success!",
                     pendingAckHandle.getTopicName(), pendingAckHandle.getSubName());
-        } else {
-            log.error("Topic name : [{}], SubName : [{}] pending ack state reply fail!",
-                    pendingAckHandle.getTopicName(), pendingAckHandle.getSubName());
+
+            if (pendingAckHandle.changeToReadyState()) {
+                pendingAckHandle.completeHandleFuture();
+                pendingAckHandle.handleCacheRequest();
+                log.info("Topic name : [{}], SubName : [{}] pending ack state reply success!",

Review comment:
       This log statement appears to duplicate the one above.

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
##########
@@ -732,7 +888,14 @@ public TransactionInPendingAckStats getTransactionInPendingAckStats(TxnID txnID)
 
     @Override
     public CompletableFuture<Void> close() {
-        return this.pendingAckStoreFuture.thenAccept(PendingAckStore::closeAsync);
+        changeToCloseState();
+        synchronized (PendingAckHandleImpl.this) {

Review comment:
       Do we need this lock?

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
##########
@@ -101,36 +103,96 @@
 
     private final PersistentSubscription persistentSubscription;
 
-    private final CompletableFuture<PendingAckStore> pendingAckStoreFuture;
+    private CompletableFuture<PendingAckStore> pendingAckStoreFuture;
 
     private final CompletableFuture<PendingAckHandle> pendingAckHandleCompletableFuture = new CompletableFuture<>();
 
+    private final TransactionPendingAckStoreProvider pendingAckStoreProvider;
+
+    private final BlockingQueue<Runnable> acceptQueue = new LinkedBlockingDeque<>();
+
     public PendingAckHandleImpl(PersistentSubscription persistentSubscription) {
         super(State.None);
         this.topicName = persistentSubscription.getTopicName();
         this.subName = persistentSubscription.getName();
         this.persistentSubscription = persistentSubscription;
 
-        TransactionPendingAckStoreProvider pendingAckStoreProvider =
-                ((PersistentTopic) this.persistentSubscription.getTopic())
+        this.pendingAckStoreProvider = ((PersistentTopic) this.persistentSubscription.getTopic())
                         .getBrokerService().getPulsar().getTransactionPendingAckStoreProvider();
-        this.pendingAckStoreFuture =
-                pendingAckStoreProvider.newPendingAckStore(persistentSubscription);
-
-        this.pendingAckStoreFuture.thenAccept(pendingAckStore -> {
-            changeToInitializingState();
-            pendingAckStore.replayAsync(this,
-                    ((PersistentTopic) persistentSubscription.getTopic()).getBrokerService()
-                            .getPulsar().getTransactionReplayExecutor());
-        }).exceptionally(e -> {
-            log.error("PendingAckHandleImpl init fail! TopicName : {}, SubName: {}", topicName, subName, e);
-            return null;
+        pendingAckStoreProvider.checkInitializedBefore(persistentSubscription).thenAccept(init -> {
+            if (init) {
+                initPendingAckStore();
+            } else {
+                completeHandleFuture();
+            }
         });
     }
 
+    private void initPendingAckStore() {
+        if (changeToInitializingState()) {
+            synchronized (PendingAckHandleImpl.this) {
+                if (!checkIfClose()) {
+                    this.pendingAckStoreFuture =
+                            pendingAckStoreProvider.newPendingAckStore(persistentSubscription);
+                    this.pendingAckStoreFuture.thenAccept(pendingAckStore -> {
+                        pendingAckStore.replayAsync(this,
+                                ((PersistentTopic) persistentSubscription.getTopic()).getBrokerService()
+                                        .getPulsar().getTransactionReplayExecutor());
+                    }).exceptionally(e -> {
+                        acceptQueue.clear();
+                        changeToErrorState();
+                        log.error("PendingAckHandleImpl init fail! TopicName : {}, SubName: {}", topicName, subName, e);
+                        return null;
+                    });
+                }
+            }
+        }
+    }
+
+    private void addIndividualAcknowledgeMessageRequest(TxnID txnID,
+                                                        List<MutablePair<PositionImpl, Integer>> positions,
+                                                        CompletableFuture<Void> completableFuture) {
+        acceptQueue.add(() -> individualAcknowledgeMessage(txnID, positions, true).thenAccept(v ->
+                completableFuture.complete(null)).exceptionally(e -> {
+            completableFuture.completeExceptionally(e);
+            return null;
+        }));
+    }
+
     @Override
     public CompletableFuture<Void> individualAcknowledgeMessage(TxnID txnID,
-                                                                List<MutablePair<PositionImpl, Integer>> positions) {
+                                                                List<MutablePair<PositionImpl, Integer>> positions,
+                                                                boolean isInCacheRequest) {
+        if (!isInCacheRequest) {
+            if (!checkIfReady()) {
+                synchronized (PendingAckHandleImpl.this) {
+                    if (state == State.Initializing) {

Review comment:
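       This check is repeated in several methods; maybe we could extract it into a helper method.
   
   For example:
   ```
   // Returns true if the request should be cached until replay completes.
   public boolean needCacheRequest() throws ServiceUnitNotReadyException {
     if (state == State.Initializing) {
       return true;
     } else if (state == State.None) {
       initPendingAckStore();
       return true;
     } else if (checkIfReady()) {
       // Ready: handle the request directly.
       return false;
     } else {
       // Error or Close: a boolean method cannot return a failed future,
       // so throw and let the caller fail the request.
       throw new ServiceUnitNotReadyException("PendingAckHandle replay failed!");
     }
   }
   ```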
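
As a rough, self-contained illustration of how such a helper could be shared by the ack, commit and abort paths, here is a simplified model. Everything in it is an assumption made for the sketch: `CachingHandleSketch`, `submit`, the `gates` queue and the use of `IllegalStateException` are hypothetical and do not reflect the actual `PendingAckHandleImpl` API.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical, simplified model of the handle; names are illustrative only.
class CachingHandleSketch {
    enum State { NONE, INITIALIZING, READY, CLOSE }

    private State state = State.NONE;
    // One gate per cached request; completing a gate releases that request.
    private final Queue<CompletableFuture<Void>> gates = new ArrayDeque<>();

    // Single place for the state check that would otherwise be repeated per method.
    private boolean needCacheRequest() {
        switch (state) {
            case INITIALIZING:
                return true;
            case NONE:
                state = State.INITIALIZING; // stands in for initPendingAckStore()
                return true;
            case READY:
                return false;
            default:
                throw new IllegalStateException("handle is in state " + state);
        }
    }

    // Runs the operation now if ready, otherwise defers it until replay completes.
    synchronized CompletableFuture<Void> submit(Supplier<CompletableFuture<Void>> op) {
        boolean cache;
        try {
            cache = needCacheRequest();
        } catch (IllegalStateException e) {
            CompletableFuture<Void> failed = new CompletableFuture<>();
            failed.completeExceptionally(e);
            return failed;
        }
        if (!cache) {
            return op.get();
        }
        CompletableFuture<Void> gate = new CompletableFuture<>();
        gates.add(gate);
        return gate.thenCompose(v -> op.get());
    }

    // Called when replay finishes: become READY and release every cached request.
    synchronized void replayComplete() {
        if (state != State.INITIALIZING) {
            return;
        }
        state = State.READY;
        CompletableFuture<Void> gate;
        while ((gate = gates.poll()) != null) {
            gate.complete(null);
        }
    }

    // Closing fails cached requests instead of silently dropping them.
    synchronized void close() {
        state = State.CLOSE;
        CompletableFuture<Void> gate;
        while ((gate = gates.poll()) != null) {
            gate.completeExceptionally(new IllegalStateException("handle closed"));
        }
    }
}
```

With this shape each public operation reduces to one `submit(...)` call, and the state check lives in one method.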

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
##########
@@ -101,36 +103,96 @@
 
     private final PersistentSubscription persistentSubscription;
 
-    private final CompletableFuture<PendingAckStore> pendingAckStoreFuture;
+    private CompletableFuture<PendingAckStore> pendingAckStoreFuture;
 
     private final CompletableFuture<PendingAckHandle> pendingAckHandleCompletableFuture = new CompletableFuture<>();
 
+    private final TransactionPendingAckStoreProvider pendingAckStoreProvider;
+
+    private final BlockingQueue<Runnable> acceptQueue = new LinkedBlockingDeque<>();
+
     public PendingAckHandleImpl(PersistentSubscription persistentSubscription) {
         super(State.None);
         this.topicName = persistentSubscription.getTopicName();
         this.subName = persistentSubscription.getName();
         this.persistentSubscription = persistentSubscription;
 
-        TransactionPendingAckStoreProvider pendingAckStoreProvider =
-                ((PersistentTopic) this.persistentSubscription.getTopic())
+        this.pendingAckStoreProvider = ((PersistentTopic) this.persistentSubscription.getTopic())
                         .getBrokerService().getPulsar().getTransactionPendingAckStoreProvider();
-        this.pendingAckStoreFuture =
-                pendingAckStoreProvider.newPendingAckStore(persistentSubscription);
-
-        this.pendingAckStoreFuture.thenAccept(pendingAckStore -> {
-            changeToInitializingState();
-            pendingAckStore.replayAsync(this,
-                    ((PersistentTopic) persistentSubscription.getTopic()).getBrokerService()
-                            .getPulsar().getTransactionReplayExecutor());
-        }).exceptionally(e -> {
-            log.error("PendingAckHandleImpl init fail! TopicName : {}, SubName: {}", topicName, subName, e);
-            return null;
+        pendingAckStoreProvider.checkInitializedBefore(persistentSubscription).thenAccept(init -> {
+            if (init) {
+                initPendingAckStore();
+            } else {
+                completeHandleFuture();
+            }
         });
     }
 
+    private void initPendingAckStore() {
+        if (changeToInitializingState()) {
+            synchronized (PendingAckHandleImpl.this) {
+                if (!checkIfClose()) {
+                    this.pendingAckStoreFuture =
+                            pendingAckStoreProvider.newPendingAckStore(persistentSubscription);
+                    this.pendingAckStoreFuture.thenAccept(pendingAckStore -> {
+                        pendingAckStore.replayAsync(this,
+                                ((PersistentTopic) persistentSubscription.getTopic()).getBrokerService()
+                                        .getPulsar().getTransactionReplayExecutor());
+                    }).exceptionally(e -> {
+                        acceptQueue.clear();
+                        changeToErrorState();
+                        log.error("PendingAckHandleImpl init fail! TopicName : {}, SubName: {}", topicName, subName, e);
+                        return null;
+                    });
+                }
+            }
+        }
+    }
+
+    private void addIndividualAcknowledgeMessageRequest(TxnID txnID,
+                                                        List<MutablePair<PositionImpl, Integer>> positions,
+                                                        CompletableFuture<Void> completableFuture) {
+        acceptQueue.add(() -> individualAcknowledgeMessage(txnID, positions, true).thenAccept(v ->
+                completableFuture.complete(null)).exceptionally(e -> {
+            completableFuture.completeExceptionally(e);
+            return null;
+        }));
+    }
+
     @Override
     public CompletableFuture<Void> individualAcknowledgeMessage(TxnID txnID,
-                                                                List<MutablePair<PositionImpl, Integer>> positions) {
+                                                                List<MutablePair<PositionImpl, Integer>> positions,
+                                                                boolean isInCacheRequest) {
+        if (!isInCacheRequest) {
+            if (!checkIfReady()) {
+                synchronized (PendingAckHandleImpl.this) {

Review comment:
       Is this lock used to make sure the state doesn't change?

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
##########
@@ -367,11 +494,38 @@ public PendingAckHandleImpl(PersistentSubscription persistentSubscription) {
         return commitFuture;
     }
 
+    private void addAbortTxnRequest(TxnID txnId, Consumer consumer, long lowWaterMark,
+                                    CompletableFuture<Void> completableFuture) {
+        acceptQueue.add(() -> abortTxn(txnId, consumer, lowWaterMark, true).thenAccept(v ->
+                completableFuture.complete(null)).exceptionally(e -> {
+            completableFuture.completeExceptionally(e);
+            return null;
+        }));
+    }
+
     @Override
-    public synchronized CompletableFuture<Void> abortTxn(TxnID txnId, Consumer consumer, long lowWaterMark) {
+    public synchronized CompletableFuture<Void> abortTxn(TxnID txnId, Consumer consumer,
+                                                         long lowWaterMark, boolean isInCacheRequest) {

Review comment:
       It seems that the param `isInCacheRequest` isn't used in this method. Do we need to add it?

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
##########
@@ -300,11 +402,36 @@ public PendingAckHandleImpl(PersistentSubscription persistentSubscription) {
         return completableFuture;
     }
 
+    private void addCommitTxnRequest(TxnID txnId, Map<String, Long> properties, long lowWaterMark,
+                                    CompletableFuture<Void> completableFuture) {
+        acceptQueue.add(() -> commitTxn(txnId, properties, lowWaterMark, true).thenAccept(v ->
+                completableFuture.complete(null)).exceptionally(e -> {
+            completableFuture.completeExceptionally(e);
+            return null;
+        }));
+    }
+
     @Override
     public synchronized CompletableFuture<Void> commitTxn(TxnID txnID, Map<String, Long> properties,
-                                                          long lowWaterMark) {
+                                                          long lowWaterMark, boolean isInCacheRequest) {
         if (!checkIfReady()) {
-            return FutureUtil.failedFuture(new ServiceUnitNotReadyException("PendingAckHandle not replay complete!"));
+            synchronized (PendingAckHandleImpl.this) {
+                if (state == State.Initializing) {
+                    CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+                    addCommitTxnRequest(txnID, properties, lowWaterMark, completableFuture);
+                    return completableFuture;
+                } else if (state == State.None) {
+                    CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+                    addCommitTxnRequest(txnID, properties, lowWaterMark, completableFuture);
+                    initPendingAckStore();
+                    return completableFuture;
+                } else if (checkIfReady()) {
+
+                } else {
+                    return FutureUtil.failedFuture(
+                            new ServiceUnitNotReadyException("PendingAckHandle not replay complete!"));

Review comment:
       Is this error message right? At this point the state appears to be `Error` or `Close`, not "replay not complete".
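
One possible way to make the message match the state is sketched below. `NotReadyMessages` and `describe` are hypothetical names, the `State` values mirror the ones mentioned in this review, and the "closed" wording is only a suggestion.

```java
// Hypothetical helper; not part of the Pulsar code base.
class NotReadyMessages {
    enum State { None, Initializing, Ready, Error, Close }

    // Picks a failure message that reflects why the handle cannot serve the request.
    static String describe(State state) {
        switch (state) {
            case Error:
                return "PendingAckHandle replay failed!";
            case Close:
                return "PendingAckHandle is closed!"; // suggested wording, an assumption
            default:
                return "PendingAckHandle not replay complete!";
        }
    }
}
```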



