Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2020/07/13 10:01:59 UTC

[GitHub] [ignite] Mmuzaf opened a new pull request #8028: IGNITE-13126: Reduce continuous query heap usage

Mmuzaf opened a new pull request #8028:
URL: https://github.com/apache/ignite/pull/8028


   Thank you for submitting the pull request to Apache Ignite.
   
   In order to streamline the review of the contribution 
   we ask you to ensure the following steps have been taken:
   
   ### The Contribution Checklist
   - [ ] There is a single JIRA ticket related to the pull request. 
   - [ ] The web-link to the pull request is attached to the JIRA ticket.
   - [ ] The JIRA ticket has the _Patch Available_ state.
   - [ ] The pull request body describes changes that have been made. 
   The description explains _WHAT_ was changed and _WHY_, rather than _HOW_.
   - [ ] The pull request title is treated as the final commit message. 
   The following pattern must be used: `IGNITE-XXXX Change summary`, where `XXXX` is the number of the JIRA issue.
   - [ ] A reviewer has been mentioned through the JIRA comments 
   (see [the Maintainers list](https://cwiki.apache.org/confluence/display/IGNITE/How+to+Contribute#HowtoContribute-ReviewProcessandMaintainers)) 
   - [ ] The pull request has been checked by the Teamcity Bot and 
   the `green visa` attached to the JIRA ticket (see [TC.Bot: Check PR](https://mtcga.gridgain.com/prs.html))
   
   ### Notes
   - [How to Contribute](https://cwiki.apache.org/confluence/display/IGNITE/How+to+Contribute)
   - [Coding abbreviation rules](https://cwiki.apache.org/confluence/display/IGNITE/Abbreviation+Rules)
   - [Coding Guidelines](https://cwiki.apache.org/confluence/display/IGNITE/Coding+Guidelines)
   - [Apache Ignite Teamcity Bot](https://cwiki.apache.org/confluence/display/IGNITE/Apache+Ignite+Teamcity+Bot)
   
   If you need any help, please email dev@ignite.apache.org or ask for advice in the _#ignite_ channel on http://asf.slack.com.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] NSAmelchev commented on a change in pull request #8028: IGNITE-13126: Reduce continuous query heap usage

Posted by GitBox <gi...@apache.org>.
NSAmelchev commented on a change in pull request #8028:
URL: https://github.com/apache/ignite/pull/8028#discussion_r487096887



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
##########
@@ -376,34 +338,32 @@ private Batch initBatch(AffinityTopologyVersion topVer, boolean backup) {
         /** */
         private CacheContinuousQueryEntry[] entries;
 
-        /** */
-        private final AffinityTopologyVersion topVer;
-
         /**
          * @param filtered Number of filtered events before this batch.
          * @param entries Entries array.
-         * @param topVer Current event topology version.
          * @param startCntr Start counter.
          */
-        Batch(long startCntr, long filtered, CacheContinuousQueryEntry[] entries, AffinityTopologyVersion topVer) {
+        Batch(long startCntr, long filtered, CacheContinuousQueryEntry[] entries) {
             assert startCntr >= 0;
             assert filtered >= 0;
 
             this.startCntr = startCntr;
             this.filtered = filtered;
             this.entries = entries;
-            this.topVer = topVer;
 
             endCntr = startCntr + BUF_SIZE - 1;
         }
 
         /**
-         * @param res Current entries.
-         * @return Entries to send as part of backup queue.
+         * @param filteredFactory Factory which produces filtered entries.
+         * @return Map of collected entries.
          */
-        @Nullable synchronized TreeMap<Long, CacheContinuousQueryEntry> flushCurrentEntries(
-            @Nullable TreeMap<Long, CacheContinuousQueryEntry> res) {
-            if (entries == null)
+        synchronized Map<Long, CacheContinuousQueryEntry> flushCurrentEntries(
+            BiFunction<Long, Long, CacheContinuousQueryEntry> filteredFactory
+        ) {
+            Map<Long, CacheContinuousQueryEntry> res = new HashMap<>();

Review comment:
       An empty map can be returned if `entries == null || filteredFactory == null`
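       A minimal sketch of the early return suggested above. `BatchSketch` is a hypothetical stand-in for the PR's `Batch` class, entries are plain strings rather than `CacheContinuousQueryEntry`, and the way `filteredFactory` fills empty slots is an assumption made only to keep the example self-contained; it is not taken from the PR.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

/** Hypothetical stand-in for the PR's Batch class; entries are plain strings here. */
class BatchSketch {
    /** Buffered entries; null once the batch has been flushed. */
    private String[] entries;

    /** First update counter covered by this batch. */
    private final long startCntr;

    BatchSketch(long startCntr, String[] entries) {
        this.startCntr = startCntr;
        this.entries = entries;
    }

    synchronized Map<Long, String> flushCurrentEntries(BiFunction<Long, Long, String> filteredFactory) {
        // Early exit from the review comment: nothing to collect, so no HashMap is allocated.
        if (entries == null || filteredFactory == null)
            return Collections.emptyMap();

        Map<Long, String> res = new HashMap<>();

        for (int i = 0; i < entries.length; i++) {
            long cntr = startCntr + i;

            // Assumption for illustration only: empty slots are filled by the factory.
            String e = entries[i] != null ? entries[i] : filteredFactory.apply(cntr, 0L);

            if (e != null)
                res.put(cntr, e);
        }

        // Assumption: a flushed batch no longer holds its entries.
        entries = null;

        return res;
    }

    public static void main(String[] args) {
        BatchSketch batch = new BatchSketch(10L, new String[] {"a", null, "c"});

        System.out.println(batch.flushCurrentEntries((cntr, filtered) -> "filtered@" + cntr));
    }
}
```

       One thing to check before adopting this: `Collections.emptyMap()` is immutable, so it only fits if no caller adds to the returned map.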







[GitHub] [ignite] NSAmelchev commented on a change in pull request #8028: IGNITE-13126: Reduce continuous query heap usage

Posted by GitBox <gi...@apache.org>.
NSAmelchev commented on a change in pull request #8028:
URL: https://github.com/apache/ignite/pull/8028#discussion_r487003898



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
##########
@@ -76,80 +78,58 @@
     private final GridAtomicLong ackedUpdCntr = new GridAtomicLong(0);
 
     /**
-     * @param part Partition number.
+     * @param currPartCntr Current partition counter.
      * @param log Continuous query category logger.
      */
-    CacheContinuousQueryEventBuffer(int part, IgniteLogger log) {
-        this.part = part;
+    CacheContinuousQueryEventBuffer(LongUnaryOperator currPartCntr, IgniteLogger log) {
+        this.currPartCntr = currPartCntr;
         this.log = log;
     }
 
     /**
-     * @param part Partition number.
+     * @param log Continuous query category logger.
      */
-    CacheContinuousQueryEventBuffer(int part) {
-        this(part, null);
+    CacheContinuousQueryEventBuffer(IgniteLogger log) {
+        this((backup) -> 0, log);
     }
 
     /**
      * @param updateCntr Acknowledged counter.
      */
-    void cleanupBackupQueue(Long updateCntr) {
-        Iterator<CacheContinuousQueryEntry> it = backupQ.iterator();
-
-        while (it.hasNext()) {
-            CacheContinuousQueryEntry backupEntry = it.next();
-
-            if (backupEntry.updateCounter() <= updateCntr)
-                it.remove();
-        }
-
+    void cleanupOnAck(long updateCntr) {
+        backupQ.removeIf(backupEntry -> backupEntry.updateCounter() <= updateCntr);

Review comment:
       Add line break, please
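       For context, the hunk above replaces an explicit iterator loop with `Collection.removeIf`. The sketch below shows the same cleanup in isolation. `BackupQueueSketch` is a hypothetical class, the queue holds bare update counters instead of `CacheContinuousQueryEntry` objects, and the use of `ConcurrentLinkedDeque` is an assumption about the queue type.

```java
import java.util.Deque;
import java.util.concurrent.ConcurrentLinkedDeque;

/** Hypothetical backup queue that stores only update counters. */
class BackupQueueSketch {
    /** Assumption: the real backupQ is some concurrent collection; a deque is used here. */
    private final Deque<Long> backupQ = new ConcurrentLinkedDeque<>();

    void add(long updateCntr) {
        backupQ.add(updateCntr);
    }

    /** Drops every queued counter that the acknowledged counter already covers. */
    void cleanupOnAck(long updateCntr) {
        // Equivalent to iterating and calling Iterator.remove() on matching elements.
        backupQ.removeIf(cntr -> cntr <= updateCntr);
    }

    int size() {
        return backupQ.size();
    }

    public static void main(String[] args) {
        BackupQueueSketch q = new BackupQueueSketch();

        for (long c = 1; c <= 5; c++)
            q.add(c);

        q.cleanupOnAck(3);

        System.out.println(q.size());
    }
}
```

       Taking a primitive `long` instead of `Long`, as the new signature does, also rules out a null argument being unboxed inside the predicate.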

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryPartitionRecovery.java
##########
@@ -155,8 +155,8 @@ void resetTopologyCache() {
                     if (!entry.isFiltered())
                         entries.add(new CacheContinuousQueryEvent<K, V>(cache, cctx, entry));
 
-                    if (log.isDebugEnabled())
-                        log.debug("Partition was lost [lastFiredEvt=" + lastFiredEvt +
+                    if (log.isInfoEnabled())

Review comment:
       I suggest reverting this change. It's not related to the issue.

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
##########
@@ -376,34 +338,32 @@ private Batch initBatch(AffinityTopologyVersion topVer, boolean backup) {
         /** */
         private CacheContinuousQueryEntry[] entries;
 
-        /** */
-        private final AffinityTopologyVersion topVer;
-
         /**
          * @param filtered Number of filtered events before this batch.
          * @param entries Entries array.
-         * @param topVer Current event topology version.
          * @param startCntr Start counter.
          */
-        Batch(long startCntr, long filtered, CacheContinuousQueryEntry[] entries, AffinityTopologyVersion topVer) {
+        Batch(long startCntr, long filtered, CacheContinuousQueryEntry[] entries) {
             assert startCntr >= 0;
             assert filtered >= 0;
 
             this.startCntr = startCntr;
             this.filtered = filtered;
             this.entries = entries;
-            this.topVer = topVer;
 
             endCntr = startCntr + BUF_SIZE - 1;
         }
 
         /**
-         * @param res Current entries.
-         * @return Entries to send as part of backup queue.
+         * @param filteredFactory Factory which produces filtered entries.
+         * @return Map of collected entries.
          */
-        @Nullable synchronized TreeMap<Long, CacheContinuousQueryEntry> flushCurrentEntries(
-            @Nullable TreeMap<Long, CacheContinuousQueryEntry> res) {
-            if (entries == null)
+        synchronized Map<Long, CacheContinuousQueryEntry> flushCurrentEntries(
+            BiFunction<Long, Long, CacheContinuousQueryEntry> filteredFactory
+        ) {
+            Map<Long, CacheContinuousQueryEntry> res = new HashMap<>();

Review comment:
       An empty map can be returned if `entries == null || filteredFactory == null`

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
##########
@@ -376,34 +338,32 @@ private Batch initBatch(AffinityTopologyVersion topVer, boolean backup) {
         /** */
         private CacheContinuousQueryEntry[] entries;
 
-        /** */
-        private final AffinityTopologyVersion topVer;
-
         /**
          * @param filtered Number of filtered events before this batch.
          * @param entries Entries array.
-         * @param topVer Current event topology version.
          * @param startCntr Start counter.
          */
-        Batch(long startCntr, long filtered, CacheContinuousQueryEntry[] entries, AffinityTopologyVersion topVer) {
+        Batch(long startCntr, long filtered, CacheContinuousQueryEntry[] entries) {
             assert startCntr >= 0;
             assert filtered >= 0;
 
             this.startCntr = startCntr;
             this.filtered = filtered;
             this.entries = entries;
-            this.topVer = topVer;
 
             endCntr = startCntr + BUF_SIZE - 1;
         }
 
         /**
-         * @param res Current entries.
-         * @return Entries to send as part of backup queue.
+         * @param filteredFactory Factory which produces filtered entries.
+         * @return Map of collected entries.
          */
-        @Nullable synchronized TreeMap<Long, CacheContinuousQueryEntry> flushCurrentEntries(
-            @Nullable TreeMap<Long, CacheContinuousQueryEntry> res) {
-            if (entries == null)
+        synchronized Map<Long, CacheContinuousQueryEntry> flushCurrentEntries(
+            BiFunction<Long, Long, CacheContinuousQueryEntry> filteredFactory
+        ) {
+            Map<Long, CacheContinuousQueryEntry> res = new HashMap<>();

Review comment:
       Can we estimate map size?
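       One possible way to act on this question, sketched under assumptions: if the number of entries a batch can yield is bounded (the diff suggests a batch spans `BUF_SIZE` counters), the map can be presized so it never rehashes while being filled. `MapSizingSketch` and its helpers are hypothetical names, not part of the PR.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper for presizing a HashMap to an expected number of entries. */
class MapSizingSketch {
    /** Default HashMap load factor as documented in the JDK. */
    private static final double LOAD_FACTOR = 0.75;

    /** Initial capacity that lets a HashMap hold {@code expected} entries without resizing. */
    static int capacityFor(int expected) {
        return (int) Math.ceil(expected / LOAD_FACTOR);
    }

    /** Creates an empty map sized for {@code expected} entries. */
    static <V> Map<Long, V> newPresizedMap(int expected) {
        return new HashMap<>(capacityFor(expected));
    }

    public static void main(String[] args) {
        // Assumption: 1000 stands in for the batch size; the real BUF_SIZE value is not shown in the diff.
        Map<Long, String> res = newPresizedMap(1000);

        res.put(1L, "entry");

        System.out.println(res.size());
    }
}
```

       Passing the expected entry count directly to `new HashMap<>(n)` is not enough, because the map resizes once it exceeds `capacity * 0.75` entries; dividing by the load factor accounts for that.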

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
##########
@@ -76,80 +78,58 @@
     private final GridAtomicLong ackedUpdCntr = new GridAtomicLong(0);
 
     /**
-     * @param part Partition number.
+     * @param currPartCntr Current partition counter.
      * @param log Continuous query category logger.
      */
-    CacheContinuousQueryEventBuffer(int part, IgniteLogger log) {
-        this.part = part;
+    CacheContinuousQueryEventBuffer(LongUnaryOperator currPartCntr, IgniteLogger log) {
+        this.currPartCntr = currPartCntr;
         this.log = log;
     }
 
     /**
-     * @param part Partition number.
+     * @param log Continuous query category logger.
      */
-    CacheContinuousQueryEventBuffer(int part) {
-        this(part, null);
+    CacheContinuousQueryEventBuffer(IgniteLogger log) {
+        this((backup) -> 0, log);
     }
 
     /**
      * @param updateCntr Acknowledged counter.
      */
-    void cleanupBackupQueue(Long updateCntr) {
-        Iterator<CacheContinuousQueryEntry> it = backupQ.iterator();
-
-        while (it.hasNext()) {
-            CacheContinuousQueryEntry backupEntry = it.next();
-
-            if (backupEntry.updateCounter() <= updateCntr)
-                it.remove();
-        }
-
+    void cleanupOnAck(long updateCntr) {
+        backupQ.removeIf(backupEntry -> backupEntry.updateCounter() <= updateCntr);

Review comment:
       Add line break, please

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryPartitionRecovery.java
##########
@@ -155,8 +155,8 @@ void resetTopologyCache() {
                     if (!entry.isFiltered())
                         entries.add(new CacheContinuousQueryEvent<K, V>(cache, cctx, entry));
 
-                    if (log.isDebugEnabled())
-                        log.debug("Partition was lost [lastFiredEvt=" + lastFiredEvt +
+                    if (log.isInfoEnabled())

Review comment:
       I suggest to revert this change. It's not related to the issue.

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
##########
@@ -376,34 +338,32 @@ private Batch initBatch(AffinityTopologyVersion topVer, boolean backup) {
         /** */
         private CacheContinuousQueryEntry[] entries;
 
-        /** */
-        private final AffinityTopologyVersion topVer;
-
         /**
          * @param filtered Number of filtered events before this batch.
          * @param entries Entries array.
-         * @param topVer Current event topology version.
          * @param startCntr Start counter.
          */
-        Batch(long startCntr, long filtered, CacheContinuousQueryEntry[] entries, AffinityTopologyVersion topVer) {
+        Batch(long startCntr, long filtered, CacheContinuousQueryEntry[] entries) {
             assert startCntr >= 0;
             assert filtered >= 0;
 
             this.startCntr = startCntr;
             this.filtered = filtered;
             this.entries = entries;
-            this.topVer = topVer;
 
             endCntr = startCntr + BUF_SIZE - 1;
         }
 
         /**
-         * @param res Current entries.
-         * @return Entries to send as part of backup queue.
+         * @param filteredFactory Factory which produces filtered entries.
+         * @return Map of collected entries.
          */
-        @Nullable synchronized TreeMap<Long, CacheContinuousQueryEntry> flushCurrentEntries(
-            @Nullable TreeMap<Long, CacheContinuousQueryEntry> res) {
-            if (entries == null)
+        synchronized Map<Long, CacheContinuousQueryEntry> flushCurrentEntries(
+            BiFunction<Long, Long, CacheContinuousQueryEntry> filteredFactory
+        ) {
+            Map<Long, CacheContinuousQueryEntry> res = new HashMap<>();

Review comment:
       An empty map can be returned if `entries == null || filteredFactory == null`
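
A minimal sketch of the early return suggested here, not the actual patch. The class `FlushEarlyReturnSketch`, the `String` entries, and the way `filteredFactory` fills empty slots are all hypothetical simplifications chosen for illustration; only the null checks mirror the comment.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

/** Hypothetical stand-in for the Batch class; entries are plain strings here. */
final class FlushEarlyReturnSketch {
    /**
     * @param entries Entries array, may be null once the batch has been flushed.
     * @param startCntr Counter of the first slot.
     * @param filteredFactory Factory for placeholder entries, may be null.
     * @return Collected entries keyed by counter, or a shared empty map.
     */
    static Map<Long, String> flush(
        String[] entries,
        long startCntr,
        BiFunction<Long, Long, String> filteredFactory
    ) {
        // Nothing to collect: return the shared immutable empty map, no allocation.
        if (entries == null || filteredFactory == null)
            return Collections.emptyMap();

        Map<Long, String> res = new HashMap<>();

        for (int i = 0; i < entries.length; i++) {
            long cntr = startCntr + i;

            // Assumption for this sketch only: an empty slot gets a factory-made placeholder.
            res.put(cntr, entries[i] != null ? entries[i] : filteredFactory.apply(cntr, 0L));
        }

        return res;
    }
}
```

One trade-off to keep in mind: `Collections.emptyMap()` is immutable, so this only works if callers never add to the returned map.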

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
##########
@@ -376,34 +338,32 @@ private Batch initBatch(AffinityTopologyVersion topVer, boolean backup) {
         /** */
         private CacheContinuousQueryEntry[] entries;
 
-        /** */
-        private final AffinityTopologyVersion topVer;
-
         /**
          * @param filtered Number of filtered events before this batch.
          * @param entries Entries array.
-         * @param topVer Current event topology version.
          * @param startCntr Start counter.
          */
-        Batch(long startCntr, long filtered, CacheContinuousQueryEntry[] entries, AffinityTopologyVersion topVer) {
+        Batch(long startCntr, long filtered, CacheContinuousQueryEntry[] entries) {
             assert startCntr >= 0;
             assert filtered >= 0;
 
             this.startCntr = startCntr;
             this.filtered = filtered;
             this.entries = entries;
-            this.topVer = topVer;
 
             endCntr = startCntr + BUF_SIZE - 1;
         }
 
         /**
-         * @param res Current entries.
-         * @return Entries to send as part of backup queue.
+         * @param filteredFactory Factory which produces filtered entries.
+         * @return Map of collected entries.
          */
-        @Nullable synchronized TreeMap<Long, CacheContinuousQueryEntry> flushCurrentEntries(
-            @Nullable TreeMap<Long, CacheContinuousQueryEntry> res) {
-            if (entries == null)
+        synchronized Map<Long, CacheContinuousQueryEntry> flushCurrentEntries(
+            BiFunction<Long, Long, CacheContinuousQueryEntry> filteredFactory
+        ) {
+            Map<Long, CacheContinuousQueryEntry> res = new HashMap<>();

Review comment:
       Can we estimate map size?
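
One way to answer this, sketched under assumptions: if the number of collected entries is bounded by the length of the entries array, the `HashMap` can be pre-sized so that it never rehashes while being filled. `PresizedMapSketch`, `capacityFor`, and `collect` are hypothetical names, not part of the patch.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper showing how a HashMap could be pre-sized for a known upper bound. */
final class PresizedMapSketch {
    /**
     * @param expectedSize Upper bound on the number of mappings.
     * @return Initial capacity that avoids resizing at the default 0.75 load factor.
     */
    static int capacityFor(int expectedSize) {
        return (int)(expectedSize / 0.75f) + 1;
    }

    /**
     * @param entries Entries array, null slots are skipped.
     * @param startCntr Counter of the first slot.
     * @return Non-null entries keyed by counter.
     */
    static <V> Map<Long, V> collect(V[] entries, long startCntr) {
        // The array length is the upper bound on the result size.
        Map<Long, V> res = new HashMap<>(capacityFor(entries.length));

        for (int i = 0; i < entries.length; i++) {
            if (entries[i] != null)
                res.put(startCntr + i, entries[i]);
        }

        return res;
    }
}
```

If batches are usually sparse, sizing for the full array length may over-allocate, so the bound is worth choosing from what is actually known at the call site.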




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] NSAmelchev commented on a change in pull request #8028: IGNITE-13126: Reduce continuous query heap usage

Posted by GitBox <gi...@apache.org>.
NSAmelchev commented on a change in pull request #8028:
URL: https://github.com/apache/ignite/pull/8028#discussion_r487003898



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
##########
@@ -76,80 +78,58 @@
     private final GridAtomicLong ackedUpdCntr = new GridAtomicLong(0);
 
     /**
-     * @param part Partition number.
+     * @param currPartCntr Current partition counter.
      * @param log Continuous query category logger.
      */
-    CacheContinuousQueryEventBuffer(int part, IgniteLogger log) {
-        this.part = part;
+    CacheContinuousQueryEventBuffer(LongUnaryOperator currPartCntr, IgniteLogger log) {
+        this.currPartCntr = currPartCntr;
         this.log = log;
     }
 
     /**
-     * @param part Partition number.
+     * @param log Continuous query category logger.
      */
-    CacheContinuousQueryEventBuffer(int part) {
-        this(part, null);
+    CacheContinuousQueryEventBuffer(IgniteLogger log) {
+        this((backup) -> 0, log);
     }
 
     /**
      * @param updateCntr Acknowledged counter.
      */
-    void cleanupBackupQueue(Long updateCntr) {
-        Iterator<CacheContinuousQueryEntry> it = backupQ.iterator();
-
-        while (it.hasNext()) {
-            CacheContinuousQueryEntry backupEntry = it.next();
-
-            if (backupEntry.updateCounter() <= updateCntr)
-                it.remove();
-        }
-
+    void cleanupOnAck(long updateCntr) {
+        backupQ.removeIf(backupEntry -> backupEntry.updateCounter() <= updateCntr);

Review comment:
       Add line break, please
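
For context, the hunk above swaps an explicit iterator loop for `Collection.removeIf` and changes the parameter from `Long` to `long`, which avoids unboxing on every comparison. The sketch below shows that the two forms remove the same elements. `Entry` is a hypothetical stand-in for `CacheContinuousQueryEntry`, and the use of `ConcurrentLinkedDeque` for `backupQ` is an assumption, since the field's type is not visible in this diff.

```java
import java.util.Deque;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedDeque;

/** Hypothetical illustration of the cleanup-on-ack change. */
final class CleanupOnAckSketch {
    /** Stand-in for CacheContinuousQueryEntry; only the update counter matters here. */
    static final class Entry {
        private final long updateCntr;

        Entry(long updateCntr) {
            this.updateCntr = updateCntr;
        }

        long updateCounter() {
            return updateCntr;
        }
    }

    /** Old form: walk the queue and remove acknowledged entries one by one. */
    static void cleanupWithIterator(Deque<Entry> backupQ, long ackedCntr) {
        Iterator<Entry> it = backupQ.iterator();

        while (it.hasNext()) {
            if (it.next().updateCounter() <= ackedCntr)
                it.remove();
        }
    }

    /** New form: same predicate, expressed through Collection.removeIf. */
    static void cleanupWithRemoveIf(Deque<Entry> backupQ, long ackedCntr) {
        backupQ.removeIf(e -> e.updateCounter() <= ackedCntr);
    }

    /** Builds a queue with one entry per given counter. */
    static Deque<Entry> queueOf(long... cntrs) {
        Deque<Entry> q = new ConcurrentLinkedDeque<>();

        for (long c : cntrs)
            q.add(new Entry(c));

        return q;
    }

    public static void main(String[] args) {
        Deque<Entry> a = queueOf(1, 2, 3, 4, 5);
        Deque<Entry> b = queueOf(1, 2, 3, 4, 5);

        cleanupWithIterator(a, 3);
        cleanupWithRemoveIf(b, 3);

        System.out.println(a.size() + " " + b.size());
    }
}
```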

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryPartitionRecovery.java
##########
@@ -155,8 +155,8 @@ void resetTopologyCache() {
                     if (!entry.isFiltered())
                         entries.add(new CacheContinuousQueryEvent<K, V>(cache, cctx, entry));
 
-                    if (log.isDebugEnabled())
-                        log.debug("Partition was lost [lastFiredEvt=" + lastFiredEvt +
+                    if (log.isInfoEnabled())

Review comment:
       I suggest reverting this change. It's not related to the issue.

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
##########
@@ -376,34 +338,32 @@ private Batch initBatch(AffinityTopologyVersion topVer, boolean backup) {
         /** */
         private CacheContinuousQueryEntry[] entries;
 
-        /** */
-        private final AffinityTopologyVersion topVer;
-
         /**
          * @param filtered Number of filtered events before this batch.
          * @param entries Entries array.
-         * @param topVer Current event topology version.
          * @param startCntr Start counter.
          */
-        Batch(long startCntr, long filtered, CacheContinuousQueryEntry[] entries, AffinityTopologyVersion topVer) {
+        Batch(long startCntr, long filtered, CacheContinuousQueryEntry[] entries) {
             assert startCntr >= 0;
             assert filtered >= 0;
 
             this.startCntr = startCntr;
             this.filtered = filtered;
             this.entries = entries;
-            this.topVer = topVer;
 
             endCntr = startCntr + BUF_SIZE - 1;
         }
 
         /**
-         * @param res Current entries.
-         * @return Entries to send as part of backup queue.
+         * @param filteredFactory Factory which produces filtered entries.
+         * @return Map of collected entries.
          */
-        @Nullable synchronized TreeMap<Long, CacheContinuousQueryEntry> flushCurrentEntries(
-            @Nullable TreeMap<Long, CacheContinuousQueryEntry> res) {
-            if (entries == null)
+        synchronized Map<Long, CacheContinuousQueryEntry> flushCurrentEntries(
+            BiFunction<Long, Long, CacheContinuousQueryEntry> filteredFactory
+        ) {
+            Map<Long, CacheContinuousQueryEntry> res = new HashMap<>();

Review comment:
       An empty map can be returned if `entries == null || filteredFactory == null`

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
##########
@@ -376,34 +338,32 @@ private Batch initBatch(AffinityTopologyVersion topVer, boolean backup) {
         /** */
         private CacheContinuousQueryEntry[] entries;
 
-        /** */
-        private final AffinityTopologyVersion topVer;
-
         /**
          * @param filtered Number of filtered events before this batch.
          * @param entries Entries array.
-         * @param topVer Current event topology version.
          * @param startCntr Start counter.
          */
-        Batch(long startCntr, long filtered, CacheContinuousQueryEntry[] entries, AffinityTopologyVersion topVer) {
+        Batch(long startCntr, long filtered, CacheContinuousQueryEntry[] entries) {
             assert startCntr >= 0;
             assert filtered >= 0;
 
             this.startCntr = startCntr;
             this.filtered = filtered;
             this.entries = entries;
-            this.topVer = topVer;
 
             endCntr = startCntr + BUF_SIZE - 1;
         }
 
         /**
-         * @param res Current entries.
-         * @return Entries to send as part of backup queue.
+         * @param filteredFactory Factory which produces filtered entries.
+         * @return Map of collected entries.
          */
-        @Nullable synchronized TreeMap<Long, CacheContinuousQueryEntry> flushCurrentEntries(
-            @Nullable TreeMap<Long, CacheContinuousQueryEntry> res) {
-            if (entries == null)
+        synchronized Map<Long, CacheContinuousQueryEntry> flushCurrentEntries(
+            BiFunction<Long, Long, CacheContinuousQueryEntry> filteredFactory
+        ) {
+            Map<Long, CacheContinuousQueryEntry> res = new HashMap<>();

Review comment:
       Can we estimate map size?







[GitHub] [ignite] Mmuzaf commented on a change in pull request #8028: IGNITE-13126: Reduce continuous query heap usage

Posted by GitBox <gi...@apache.org>.
Mmuzaf commented on a change in pull request #8028:
URL: https://github.com/apache/ignite/pull/8028#discussion_r487793262



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
##########
@@ -76,80 +78,58 @@
     private final GridAtomicLong ackedUpdCntr = new GridAtomicLong(0);
 
     /**
-     * @param part Partition number.
+     * @param currPartCntr Current partition counter.
      * @param log Continuous query category logger.
      */
-    CacheContinuousQueryEventBuffer(int part, IgniteLogger log) {
-        this.part = part;
+    CacheContinuousQueryEventBuffer(LongUnaryOperator currPartCntr, IgniteLogger log) {
+        this.currPartCntr = currPartCntr;
         this.log = log;
     }
 
     /**
-     * @param part Partition number.
+     * @param log Continuous query category logger.
      */
-    CacheContinuousQueryEventBuffer(int part) {
-        this(part, null);
+    CacheContinuousQueryEventBuffer(IgniteLogger log) {
+        this((backup) -> 0, log);
     }
 
     /**
      * @param updateCntr Acknowledged counter.
      */
-    void cleanupBackupQueue(Long updateCntr) {
-        Iterator<CacheContinuousQueryEntry> it = backupQ.iterator();
-
-        while (it.hasNext()) {
-            CacheContinuousQueryEntry backupEntry = it.next();
-
-            if (backupEntry.updateCounter() <= updateCntr)
-                it.remove();
-        }
-
+    void cleanupOnAck(long updateCntr) {
+        backupQ.removeIf(backupEntry -> backupEntry.updateCounter() <= updateCntr);

Review comment:
       Fixed

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryPartitionRecovery.java
##########
@@ -155,8 +155,8 @@ void resetTopologyCache() {
                     if (!entry.isFiltered())
                         entries.add(new CacheContinuousQueryEvent<K, V>(cache, cctx, entry));
 
-                    if (log.isDebugEnabled())
-                        log.debug("Partition was lost [lastFiredEvt=" + lastFiredEvt +
+                    if (log.isInfoEnabled())

Review comment:
       Reverted.

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
##########
@@ -376,34 +338,32 @@ private Batch initBatch(AffinityTopologyVersion topVer, boolean backup) {
         /** */
         private CacheContinuousQueryEntry[] entries;
 
-        /** */
-        private final AffinityTopologyVersion topVer;
-
         /**
          * @param filtered Number of filtered events before this batch.
          * @param entries Entries array.
-         * @param topVer Current event topology version.
          * @param startCntr Start counter.
          */
-        Batch(long startCntr, long filtered, CacheContinuousQueryEntry[] entries, AffinityTopologyVersion topVer) {
+        Batch(long startCntr, long filtered, CacheContinuousQueryEntry[] entries) {
             assert startCntr >= 0;
             assert filtered >= 0;
 
             this.startCntr = startCntr;
             this.filtered = filtered;
             this.entries = entries;
-            this.topVer = topVer;
 
             endCntr = startCntr + BUF_SIZE - 1;
         }
 
         /**
-         * @param res Current entries.
-         * @return Entries to send as part of backup queue.
+         * @param filteredFactory Factory which produces filtered entries.
+         * @return Map of collected entries.
          */
-        @Nullable synchronized TreeMap<Long, CacheContinuousQueryEntry> flushCurrentEntries(
-            @Nullable TreeMap<Long, CacheContinuousQueryEntry> res) {
-            if (entries == null)
+        synchronized Map<Long, CacheContinuousQueryEntry> flushCurrentEntries(
+            BiFunction<Long, Long, CacheContinuousQueryEntry> filteredFactory
+        ) {
+            Map<Long, CacheContinuousQueryEntry> res = new HashMap<>();

Review comment:
       Agreed, fixed.







[GitHub] [ignite] NSAmelchev commented on a change in pull request #8028: IGNITE-13126: Reduce continuous query heap usage

Posted by GitBox <gi...@apache.org>.
NSAmelchev commented on a change in pull request #8028:
URL: https://github.com/apache/ignite/pull/8028#discussion_r487003898



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
##########
@@ -76,80 +78,58 @@
     private final GridAtomicLong ackedUpdCntr = new GridAtomicLong(0);
 
     /**
-     * @param part Partition number.
+     * @param currPartCntr Current partition counter.
      * @param log Continuous query category logger.
      */
-    CacheContinuousQueryEventBuffer(int part, IgniteLogger log) {
-        this.part = part;
+    CacheContinuousQueryEventBuffer(LongUnaryOperator currPartCntr, IgniteLogger log) {
+        this.currPartCntr = currPartCntr;
         this.log = log;
     }
 
     /**
-     * @param part Partition number.
+     * @param log Continuous query category logger.
      */
-    CacheContinuousQueryEventBuffer(int part) {
-        this(part, null);
+    CacheContinuousQueryEventBuffer(IgniteLogger log) {
+        this((backup) -> 0, log);
     }
 
     /**
      * @param updateCntr Acknowledged counter.
      */
-    void cleanupBackupQueue(Long updateCntr) {
-        Iterator<CacheContinuousQueryEntry> it = backupQ.iterator();
-
-        while (it.hasNext()) {
-            CacheContinuousQueryEntry backupEntry = it.next();
-
-            if (backupEntry.updateCounter() <= updateCntr)
-                it.remove();
-        }
-
+    void cleanupOnAck(long updateCntr) {
+        backupQ.removeIf(backupEntry -> backupEntry.updateCounter() <= updateCntr);

Review comment:
       Please add a line break.
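
For readers following the hunk above: the diff swaps an explicit iterator loop for `Collection.removeIf`. The sketch below is only an illustration of that equivalence, not code from the pull request. `CleanupOnAckSketch` and its methods are hypothetical names, and plain `Long` update counters stand in for `CacheContinuousQueryEntry`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;

/** Hypothetical illustration; not part of Apache Ignite. */
final class CleanupOnAckSketch {
    /** Old style: walk the queue and remove acknowledged counters one by one. */
    static void cleanupWithIterator(Collection<Long> backupQ, long updateCntr) {
        Iterator<Long> it = backupQ.iterator();

        while (it.hasNext()) {
            if (it.next() <= updateCntr)
                it.remove();
        }
    }

    /** New style: the same predicate expressed through Collection.removeIf. */
    static void cleanupWithRemoveIf(Collection<Long> backupQ, long updateCntr) {
        backupQ.removeIf(cntr -> cntr <= updateCntr);
    }

    public static void main(String[] args) {
        List<Long> oldStyle = new ArrayList<>(Arrays.asList(1L, 2L, 3L, 4L, 5L));
        List<Long> newStyle = new ArrayList<>(oldStyle);

        cleanupWithIterator(oldStyle, 3L);
        cleanupWithRemoveIf(newStyle, 3L);

        // Both lists should now hold only the counters above the acknowledged one.
        System.out.println(oldStyle.equals(newStyle));
    }
}
```

Both variants remove every element whose counter is less than or equal to the acknowledged counter; whether the removal is atomic depends on the concrete collection used for the backup queue, which this sketch does not model.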

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryPartitionRecovery.java
##########
@@ -155,8 +155,8 @@ void resetTopologyCache() {
                     if (!entry.isFiltered())
                         entries.add(new CacheContinuousQueryEvent<K, V>(cache, cctx, entry));
 
-                    if (log.isDebugEnabled())
-                        log.debug("Partition was lost [lastFiredEvt=" + lastFiredEvt +
+                    if (log.isInfoEnabled())

Review comment:
       I suggest reverting this change. It's not related to the issue.







[GitHub] [ignite] Mmuzaf merged pull request #8028: IGNITE-13126: Reduce continuous query heap usage

Posted by GitBox <gi...@apache.org>.
Mmuzaf merged pull request #8028:
URL: https://github.com/apache/ignite/pull/8028


   





[GitHub] [ignite] NSAmelchev commented on a change in pull request #8028: IGNITE-13126: Reduce continuous query heap usage

Posted by GitBox <gi...@apache.org>.
NSAmelchev commented on a change in pull request #8028:
URL: https://github.com/apache/ignite/pull/8028#discussion_r487098959



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
##########
@@ -376,34 +338,32 @@ private Batch initBatch(AffinityTopologyVersion topVer, boolean backup) {
         /** */
         private CacheContinuousQueryEntry[] entries;
 
-        /** */
-        private final AffinityTopologyVersion topVer;
-
         /**
          * @param filtered Number of filtered events before this batch.
          * @param entries Entries array.
-         * @param topVer Current event topology version.
          * @param startCntr Start counter.
          */
-        Batch(long startCntr, long filtered, CacheContinuousQueryEntry[] entries, AffinityTopologyVersion topVer) {
+        Batch(long startCntr, long filtered, CacheContinuousQueryEntry[] entries) {
             assert startCntr >= 0;
             assert filtered >= 0;
 
             this.startCntr = startCntr;
             this.filtered = filtered;
             this.entries = entries;
-            this.topVer = topVer;
 
             endCntr = startCntr + BUF_SIZE - 1;
         }
 
         /**
-         * @param res Current entries.
-         * @return Entries to send as part of backup queue.
+         * @param filteredFactory Factory which produces filtered entries.
+         * @return Map of collected entries.
          */
-        @Nullable synchronized TreeMap<Long, CacheContinuousQueryEntry> flushCurrentEntries(
-            @Nullable TreeMap<Long, CacheContinuousQueryEntry> res) {
-            if (entries == null)
+        synchronized Map<Long, CacheContinuousQueryEntry> flushCurrentEntries(
+            BiFunction<Long, Long, CacheContinuousQueryEntry> filteredFactory
+        ) {
+            Map<Long, CacheContinuousQueryEntry> res = new HashMap<>();

Review comment:
       Can we estimate map size?
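
One possible way to act on this question, sketched under assumptions: if the number of collected entries is bounded by the length of the batch's entries array, the `HashMap` can be created with a capacity derived from that bound and the default load factor of 0.75, so it does not resize while being filled. `PresizedMapSketch`, `capacityFor` and `collect` are hypothetical names, and `String` values stand in for `CacheContinuousQueryEntry`; this is not the change that was applied in the pull request.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical illustration; not part of Apache Ignite. */
final class PresizedMapSketch {
    /** Default load factor used by java.util.HashMap. */
    private static final double LOAD_FACTOR = 0.75d;

    /**
     * @param expectedSize Upper bound on the number of entries to be stored.
     * @return Initial capacity that keeps expectedSize entries below the resize threshold.
     */
    static int capacityFor(int expectedSize) {
        if (expectedSize < 0)
            throw new IllegalArgumentException("expectedSize must be non-negative: " + expectedSize);

        return (int)Math.ceil(expectedSize / LOAD_FACTOR);
    }

    /**
     * Collects non-null slots into a map keyed by counter (assumed layout: slot i holds startCntr + i).
     *
     * @param entries Batch slots, null where no entry has arrived.
     * @param startCntr Counter of the first slot.
     * @return Map of counter to entry.
     */
    static Map<Long, String> collect(String[] entries, long startCntr) {
        // The array length is the worst-case size, so the map never needs to grow.
        Map<Long, String> res = new HashMap<>(capacityFor(entries.length));

        for (int i = 0; i < entries.length; i++) {
            if (entries[i] != null)
                res.put(startCntr + i, entries[i]);
        }

        return res;
    }

    public static void main(String[] args) {
        System.out.println(collect(new String[] {"a", null, "c"}, 10L));
    }
}
```

The trade-off is that a sparsely filled batch allocates a larger table than it needs, so the estimate is only worthwhile when batches are usually close to full.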



