You are viewing a plain text version of this content. The canonical link for it is here.
Posted to gitbox@activemq.apache.org by GitBox <gi...@apache.org> on 2021/09/08 17:35:42 UTC

[GitHub] [activemq-artemis] clebertsuconic commented on a change in pull request #3728: ARTEMIS-3464 Missing ACKs on Page and Mirror

clebertsuconic commented on a change in pull request #3728:
URL: https://github.com/apache/activemq-artemis/pull/3728#discussion_r704634791



##########
File path: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
##########
@@ -112,6 +112,141 @@ public AtomicInteger getScheduledCleanupCount() {
 
    // Each CursorIterator will record their current PageReader in this map
    private final ConcurrentLongHashMap<PageReader> pageReaders = new ConcurrentLongHashMap<>();
+   private final AtomicInteger scheduledScanCount = new AtomicInteger(0);
+
+   private final LinkedList<PageScan> scanList = new LinkedList();
+
+   private static class PageScan {
+      final Comparable<PagedReference> scanFunction;
+      final Runnable found;
+      final Runnable notfound;
+
+      public Comparable<PagedReference> getScanFunction() {
+         return scanFunction;
+      }
+
+      public Runnable getFound() {
+         return found;
+      }
+
+      public Runnable getNotfound() {
+         return notfound;
+      }
+
+      PageScan(Comparable<PagedReference> scanFunction, Runnable found, Runnable notfound) {
+         this.scanFunction = scanFunction;
+         this.found = found;
+         this.notfound = notfound;
+      }
+   }
+
+   @Override
+   public void addScanAck(Comparable<PagedReference> scanFunction, Runnable found, Runnable notfound) {
+      PageScan scan = new PageScan(scanFunction, found, notfound);
+      synchronized (scanList) {
+         scanList.add(scan);
+      }
+   }
+
+   @Override
+   public void performScanAck() {
+      // we should only have a max of 2 scheduled tasks
+      // one that's might still be currently running, and another one lined up
+      // no need for more than that
+      if (scheduledScanCount.incrementAndGet() < 2) {
+         executor.execute(this::actualScanAck);
+      } else {
+         scheduledScanCount.decrementAndGet();
+      }
+   }
+
+   private void actualScanAck() {
+      try {
+         PageScan[] localScanList;
+         synchronized (scanList) {
+            if (scanList.size() == 0) {
+               return;
+            }
+            localScanList = scanList.stream().toArray(i -> new PageScan[i]);
+            scanList.clear();
+         }
+
+         LinkedList<Runnable> afterCommitList = new LinkedList<>();
+         TransactionImpl tx = new TransactionImpl(store);
+         tx.addOperation(new TransactionOperationAbstract() {
+            @Override
+            public void afterCommit(Transaction tx) {
+               for (Runnable r : afterCommitList) {
+                  try {
+                     r.run();
+                  } catch (Throwable e) {
+                     logger.warn(e.getMessage(), e);
+                  }
+               }
+            }
+         });
+         PageIterator iterator = this.iterator(true);
+         boolean hasNext = iterator.hasNext();
+         System.out.println("hasNext = " + hasNext);

Review comment:
       note to myself: removing system.out




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscribe@activemq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org