You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2019/06/14 01:50:04 UTC

[GitHub] [pulsar] MarvinCai commented on a change in pull request #4265: [transaction][acknowledge] Introduce in-memory PENDING_ACK state in acknowledgement path

MarvinCai commented on a change in pull request #4265: [transaction][acknowledge] Introduce in-memory PENDING_ACK state in acknowledgement path
URL: https://github.com/apache/pulsar/pull/4265#discussion_r293635801
 
 

 ##########
 File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
 ##########
 @@ -723,6 +917,115 @@ void topicTerminated() {
         }
     }
 
+    /**
+     * Commit a transaction.
+     *
+     * @param txnId         {@link TxnID} to identify the transaction.
+     * @param properties    Additional user-defined properties that can be associated with a particular cursor position.
+     * @throws IllegalArgumentException if given {@link TxnID} is not found in this subscription.
+     */
+    public synchronized CompletableFuture<Void> commitTxn(TxnID txnId, Map<String,Long> properties) {
+
+        if (pendingAckMessagesMap != null && !this.pendingAckMessagesMap.containsKey(txnId)) {
+            String errorMsg = "[" + topicName + "][" + subName + "] Transaction with id:" + txnId + " not found.";
+            log.error(errorMsg);
+            throw new IllegalArgumentException(errorMsg);
+        }
+
+        CompletableFuture<Void> commitFuture = new CompletableFuture<>();
+        CompletableFuture<Void> deleteFuture = new CompletableFuture<>();
+        CompletableFuture<Void> marketDeleteFuture = new CompletableFuture<>();
+
+        MarkDeleteCallback markDeleteCallback = new MarkDeleteCallback() {
+            @Override
+            public void markDeleteComplete(Object ctx) {
+                PositionImpl pos = (PositionImpl) ctx;
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}][{}] Mark deleted messages until position {}", topicName, subName, pos);
+                }
+                marketDeleteFuture.complete(null);
+            }
+
+            @Override
+            public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}][{}] Failed to mark delete for position ", topicName, subName, ctx, exception);
+                }
+                marketDeleteFuture.completeExceptionally(exception);
+            }
+        };
+
+        DeleteCallback deleteCallback = new DeleteCallback() {
+            @Override
+            public void deleteComplete(Object position) {
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}][{}] Deleted message at {}", topicName, subName, position);
+                }
+                deleteFuture.complete(null);
+            }
+
+            @Override
+            public void deleteFailed(ManagedLedgerException exception, Object ctx) {
+                if (log.isDebugEnabled()) {
+                    log.warn("[{}][{}] Failed to delete message at {}", topicName, subName, ctx, exception);
+                }
+                deleteFuture.completeExceptionally(exception);
+            }
+        };
+
+        // It's valid to create transaction then commit without doing any operation, which will cause
+        // pendingAckMessagesMap to be null.
+        ConcurrentOpenHashSet<Position> pendingAckMessageForCurrentTxn = pendingAckMessagesMap != null ?
+                                                this.pendingAckMessagesMap.remove(txnId) : new ConcurrentOpenHashSet();
 
 Review comment:
   You're right, it's actually unnecessary to create this empty set; I will update it in the next commit.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services