You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/09/02 15:21:06 UTC

[GitHub] [pulsar] congbobo184 commented on a change in pull request #11091: [Transaction] Transaction pending ack lazy init.

congbobo184 commented on a change in pull request #11091:
URL: https://github.com/apache/pulsar/pull/11091#discussion_r701173839



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
##########
@@ -101,36 +103,96 @@
 
     private final PersistentSubscription persistentSubscription;
 
-    private final CompletableFuture<PendingAckStore> pendingAckStoreFuture;
+    private CompletableFuture<PendingAckStore> pendingAckStoreFuture;
 
     private final CompletableFuture<PendingAckHandle> pendingAckHandleCompletableFuture = new CompletableFuture<>();
 
+    private final TransactionPendingAckStoreProvider pendingAckStoreProvider;
+
+    private final BlockingQueue<Runnable> acceptQueue = new LinkedBlockingDeque<>();
+
     public PendingAckHandleImpl(PersistentSubscription persistentSubscription) {
         super(State.None);
         this.topicName = persistentSubscription.getTopicName();
         this.subName = persistentSubscription.getName();
         this.persistentSubscription = persistentSubscription;
 
-        TransactionPendingAckStoreProvider pendingAckStoreProvider =
-                ((PersistentTopic) this.persistentSubscription.getTopic())
+        this.pendingAckStoreProvider = ((PersistentTopic) this.persistentSubscription.getTopic())
                         .getBrokerService().getPulsar().getTransactionPendingAckStoreProvider();
-        this.pendingAckStoreFuture =
-                pendingAckStoreProvider.newPendingAckStore(persistentSubscription);
-
-        this.pendingAckStoreFuture.thenAccept(pendingAckStore -> {
-            changeToInitializingState();
-            pendingAckStore.replayAsync(this,
-                    ((PersistentTopic) persistentSubscription.getTopic()).getBrokerService()
-                            .getPulsar().getTransactionReplayExecutor());
-        }).exceptionally(e -> {
-            log.error("PendingAckHandleImpl init fail! TopicName : {}, SubName: {}", topicName, subName, e);
-            return null;
+        pendingAckStoreProvider.checkInitializedBefore(persistentSubscription).thenAccept(init -> {
+            if (init) {
+                initPendingAckStore();
+            } else {
+                completeHandleFuture();
+            }
         });
     }
 
+    private void initPendingAckStore() {
+        if (changeToInitializingState()) {
+            synchronized (PendingAckHandleImpl.this) {
+                if (!checkIfClose()) {
+                    this.pendingAckStoreFuture =
+                            pendingAckStoreProvider.newPendingAckStore(persistentSubscription);
+                    this.pendingAckStoreFuture.thenAccept(pendingAckStore -> {
+                        pendingAckStore.replayAsync(this,
+                                ((PersistentTopic) persistentSubscription.getTopic()).getBrokerService()
+                                        .getPulsar().getTransactionReplayExecutor());
+                    }).exceptionally(e -> {
+                        acceptQueue.clear();
+                        changeToErrorState();
+                        log.error("PendingAckHandleImpl init fail! TopicName : {}, SubName: {}", topicName, subName, e);
+                        return null;
+                    });
+                }
+            }
+        }
+    }
+
+    private void addIndividualAcknowledgeMessageRequest(TxnID txnID,
+                                                        List<MutablePair<PositionImpl, Integer>> positions,
+                                                        CompletableFuture<Void> completableFuture) {
+        acceptQueue.add(() -> individualAcknowledgeMessage(txnID, positions, true).thenAccept(v ->
+                completableFuture.complete(null)).exceptionally(e -> {
+            completableFuture.completeExceptionally(e);
+            return null;
+        }));
+    }
+
     @Override
     public CompletableFuture<Void> individualAcknowledgeMessage(TxnID txnID,
-                                                                List<MutablePair<PositionImpl, Integer>> positions) {
+                                                                List<MutablePair<PositionImpl, Integer>> positions,
+                                                                boolean isInCacheRequest) {
+
+        if (!checkIfReady() && !isInCacheRequest) {
+            synchronized (PendingAckHandleImpl.this) {
+                if (state == State.Initializing) {
+                    CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+                    addIndividualAcknowledgeMessageRequest(txnID, positions, completableFuture);
+                    return completableFuture;
+                } else if (state == State.None) {
+                    CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+                    addIndividualAcknowledgeMessageRequest(txnID, positions, completableFuture);
+                    initPendingAckStore();
+                    return completableFuture;
+                } else if (checkIfReady()) {
+
+                } else {
+                    return FutureUtil.failedFuture(
+                            new ServiceUnitNotReadyException("PendingAckHandle not replay complete!"));
+                }
+            }
+        }
+
+        if (!acceptQueue.isEmpty() && !isInCacheRequest) {
+            synchronized (PendingAckHandleImpl.this) {
+                if (!acceptQueue.isEmpty()) {
+                    CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+                    addIndividualAcknowledgeMessageRequest(txnID, positions, completableFuture);

Review comment:
       We need to make sure that acks are processed in order.

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckReplyCallBack.java
##########
@@ -44,16 +44,19 @@ public MLPendingAckReplyCallBack(PendingAckHandleImpl pendingAckHandle) {
 
     @Override
     public void replayComplete() {
-        log.info("Topic name : [{}], SubName : [{}] pending ack state reply success!",
-                pendingAckHandle.getTopicName(), pendingAckHandle.getSubName());
-
-        if (pendingAckHandle.changeToReadyState()) {
-            pendingAckHandle.completeHandleFuture();
+        synchronized (pendingAckHandle) {
             log.info("Topic name : [{}], SubName : [{}] pending ack state reply success!",
                     pendingAckHandle.getTopicName(), pendingAckHandle.getSubName());
-        } else {
-            log.error("Topic name : [{}], SubName : [{}] pending ack state reply fail!",
-                    pendingAckHandle.getTopicName(), pendingAckHandle.getSubName());
+
+            if (pendingAckHandle.changeToReadyState()) {
+                pendingAckHandle.completeHandleFuture();
+                pendingAckHandle.handleCacheRequest();

Review comment:
       To preserve the ordering of acks, I think we should still acquire this lock here.

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
##########
@@ -101,36 +103,96 @@
 
     private final PersistentSubscription persistentSubscription;
 
-    private final CompletableFuture<PendingAckStore> pendingAckStoreFuture;
+    private CompletableFuture<PendingAckStore> pendingAckStoreFuture;
 
     private final CompletableFuture<PendingAckHandle> pendingAckHandleCompletableFuture = new CompletableFuture<>();
 
+    private final TransactionPendingAckStoreProvider pendingAckStoreProvider;
+
+    private final BlockingQueue<Runnable> acceptQueue = new LinkedBlockingDeque<>();
+
     public PendingAckHandleImpl(PersistentSubscription persistentSubscription) {
         super(State.None);
         this.topicName = persistentSubscription.getTopicName();
         this.subName = persistentSubscription.getName();
         this.persistentSubscription = persistentSubscription;
 
-        TransactionPendingAckStoreProvider pendingAckStoreProvider =
-                ((PersistentTopic) this.persistentSubscription.getTopic())
+        this.pendingAckStoreProvider = ((PersistentTopic) this.persistentSubscription.getTopic())
                         .getBrokerService().getPulsar().getTransactionPendingAckStoreProvider();
-        this.pendingAckStoreFuture =
-                pendingAckStoreProvider.newPendingAckStore(persistentSubscription);
-
-        this.pendingAckStoreFuture.thenAccept(pendingAckStore -> {
-            changeToInitializingState();
-            pendingAckStore.replayAsync(this,
-                    ((PersistentTopic) persistentSubscription.getTopic()).getBrokerService()
-                            .getPulsar().getTransactionReplayExecutor());
-        }).exceptionally(e -> {
-            log.error("PendingAckHandleImpl init fail! TopicName : {}, SubName: {}", topicName, subName, e);
-            return null;
+        pendingAckStoreProvider.checkInitializedBefore(persistentSubscription).thenAccept(init -> {
+            if (init) {
+                initPendingAckStore();
+            } else {
+                completeHandleFuture();
+            }
         });
     }
 
+    private void initPendingAckStore() {
+        if (changeToInitializingState()) {
+            synchronized (PendingAckHandleImpl.this) {
+                if (!checkIfClose()) {
+                    this.pendingAckStoreFuture =
+                            pendingAckStoreProvider.newPendingAckStore(persistentSubscription);
+                    this.pendingAckStoreFuture.thenAccept(pendingAckStore -> {
+                        pendingAckStore.replayAsync(this,
+                                ((PersistentTopic) persistentSubscription.getTopic()).getBrokerService()
+                                        .getPulsar().getTransactionReplayExecutor());
+                    }).exceptionally(e -> {
+                        acceptQueue.clear();
+                        changeToErrorState();
+                        log.error("PendingAckHandleImpl init fail! TopicName : {}, SubName: {}", topicName, subName, e);
+                        return null;
+                    });
+                }
+            }
+        }
+    }
+
+    private void addIndividualAcknowledgeMessageRequest(TxnID txnID,
+                                                        List<MutablePair<PositionImpl, Integer>> positions,
+                                                        CompletableFuture<Void> completableFuture) {
+        acceptQueue.add(() -> individualAcknowledgeMessage(txnID, positions, true).thenAccept(v ->
+                completableFuture.complete(null)).exceptionally(e -> {
+            completableFuture.completeExceptionally(e);
+            return null;
+        }));
+    }
+
     @Override
     public CompletableFuture<Void> individualAcknowledgeMessage(TxnID txnID,
-                                                                List<MutablePair<PositionImpl, Integer>> positions) {
+                                                                List<MutablePair<PositionImpl, Integer>> positions,
+                                                                boolean isInCacheRequest) {
+
+        if (!checkIfReady() && !isInCacheRequest) {
+            synchronized (PendingAckHandleImpl.this) {
+                if (state == State.Initializing) {
+                    CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+                    addIndividualAcknowledgeMessageRequest(txnID, positions, completableFuture);
+                    return completableFuture;
+                } else if (state == State.None) {
+                    CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+                    addIndividualAcknowledgeMessageRequest(txnID, positions, completableFuture);
+                    initPendingAckStore();
+                    return completableFuture;
+                } else if (checkIfReady()) {
+

Review comment:
       When the handle is already in the ready state, we should let execution fall through and continue with the code below.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org