You are viewing a plain text version of this content. The canonical link for it is here.
Posted to gitbox@activemq.apache.org by "rvais (via GitHub)" <gi...@apache.org> on 2023/05/15 18:16:15 UTC

[GitHub] [activemq-artemis] rvais commented on a diff in pull request #4472: ARTEMIS-4278 Incorrect Paging Counters with Prepared Transactions

rvais commented on code in PR #4472:
URL: https://github.com/apache/activemq-artemis/pull/4472#discussion_r1194106917


##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCounterRebuildManager.java:
##########
@@ -241,28 +247,64 @@ public void rebuild() throws Exception {
                if (logger.isTraceEnabled()) {
                   logger.trace("reading message for rebuild cursor on address={}, pg={}, messageNR={}, routedQueues={}, message={}, queueLIst={}", pgStore.getAddress(), msg.getPageNumber(), msg.getMessageNumber(), routedQueues, msg, routedQueues);
                }
+
+               PageTransactionInfo txInfo = null;
+
+               if (msg.getTransactionID() > 0) {
+                  txInfo = transactions.get(msg.getTransactionID());
+               }
+
+               Transaction preparedTX = txInfo == null ? null : txInfo.getPreparedTransaction();
+
+               if (logger.isTraceEnabled()) {
+                  if (logger.isTraceEnabled()) {
+                     logger.trace("lookup on {}, tx={}, preparedTX={}", msg.getTransactionID(), txInfo, preparedTX);
+                  }
+               }
+
                for (long queueID : routedQueues) {
                   boolean ok = !isACK(queueID, msg.getPageNumber(), msg.getMessageNumber());
 
-                  boolean txOK = msg.getTransactionID() <= 0 || transactions == null || transactions.contains(msg.getTransactionID());
+                  // if the pageTransaction is in prepare state, we have to increment the counter after the commit
+                  // notice that there is a check if the commit is done in afterCommit
+                  if (preparedTX != null) {
+                     PageSubscription subscription = pgStore.getCursorProvider().getSubscription(queueID);
+                     preparedTX.addOperation(new TransactionOperationAbstract() {
+                        @Override
+                        public void afterCommit(Transaction tx) {
+                           // We use the pagingManager executor here, in case the commit happened while the rebuild manager is working
+                           // on that case the increment will wait any pending tasks on that executor to finish before this executor takes effect

Review Comment:
   Typo. I believe there should be "in that case", but since it's in comment it is not that important.



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCounterRebuildManager.java:
##########
@@ -241,28 +247,64 @@ public void rebuild() throws Exception {
                if (logger.isTraceEnabled()) {
                   logger.trace("reading message for rebuild cursor on address={}, pg={}, messageNR={}, routedQueues={}, message={}, queueLIst={}", pgStore.getAddress(), msg.getPageNumber(), msg.getMessageNumber(), routedQueues, msg, routedQueues);
                }
+
+               PageTransactionInfo txInfo = null;
+
+               if (msg.getTransactionID() > 0) {
+                  txInfo = transactions.get(msg.getTransactionID());
+               }
+
+               Transaction preparedTX = txInfo == null ? null : txInfo.getPreparedTransaction();
+
+               if (logger.isTraceEnabled()) {
+                  if (logger.isTraceEnabled()) {
+                     logger.trace("lookup on {}, tx={}, preparedTX={}", msg.getTransactionID(), txInfo, preparedTX);
+                  }
+               }
+
                for (long queueID : routedQueues) {
                   boolean ok = !isACK(queueID, msg.getPageNumber(), msg.getMessageNumber());
 
-                  boolean txOK = msg.getTransactionID() <= 0 || transactions == null || transactions.contains(msg.getTransactionID());
+                  // if the pageTransaction is in prepare state, we have to increment the counter after the commit
+                  // notice that there is a check if the commit is done in afterCommit
+                  if (preparedTX != null) {
+                     PageSubscription subscription = pgStore.getCursorProvider().getSubscription(queueID);
+                     preparedTX.addOperation(new TransactionOperationAbstract() {
+                        @Override
+                        public void afterCommit(Transaction tx) {
+                           // We use the pagingManager executor here, in case the commit happened while the rebuild manager is working
+                           // on that case the increment will wait any pending tasks on that executor to finish before this executor takes effect
+                           pagingManager.execute(() -> {
+                              try {
+                                 subscription.getCounter().increment(null, 1, msg.getStoredSize());
+                              } catch (Exception e) {
+                                 logger.warn(e.getMessage(), e);
+                              }
+                           });
+                        }
+                     });
 
-                  if (!txOK) {
-                     logger.debug("TX is not ok for {}", msg);
-                  }
+                  } else {
+                     boolean txOK = msg.getTransactionID() <= 0 || transactions == null || txInfo != null;

Review Comment:
   "txOK" is a bit confusing to me personally in given context.



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCounterRebuildManager.java:
##########
@@ -241,28 +247,64 @@ public void rebuild() throws Exception {
                if (logger.isTraceEnabled()) {
                   logger.trace("reading message for rebuild cursor on address={}, pg={}, messageNR={}, routedQueues={}, message={}, queueLIst={}", pgStore.getAddress(), msg.getPageNumber(), msg.getMessageNumber(), routedQueues, msg, routedQueues);
                }
+
+               PageTransactionInfo txInfo = null;
+
+               if (msg.getTransactionID() > 0) {
+                  txInfo = transactions.get(msg.getTransactionID());
+               }
+
+               Transaction preparedTX = txInfo == null ? null : txInfo.getPreparedTransaction();
+
+               if (logger.isTraceEnabled()) {

Review Comment:
   There are two exactly the same conditions "logger.isTraceEnabled()" nested together.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscribe@activemq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org