Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2020/06/01 15:56:03 UTC

[GitHub] [ignite] Mmuzaf opened a new pull request #7881: IGNITE-10959: Continuous query pending buffer limit

Mmuzaf opened a new pull request #7881:
URL: https://github.com/apache/ignite/pull/7881


   Thank you for submitting a pull request to Apache Ignite.
   
   In order to streamline the review of the contribution 
   we ask you to ensure the following steps have been taken:
   
   ### The Contribution Checklist
   - [ ] There is a single JIRA ticket related to the pull request. 
   - [ ] The web-link to the pull request is attached to the JIRA ticket.
   - [ ] The JIRA ticket has the _Patch Available_ state.
   - [ ] The pull request body describes changes that have been made. 
   The description explains _WHAT_ was changed and _WHY_, rather than _HOW_.
   - [ ] The pull request title is treated as the final commit message. 
   The following pattern must be used: `IGNITE-12407: Add Cluster API support to Java thin client`
   - [ ] A reviewer has been mentioned through the JIRA comments 
   (see [the Maintainers list](https://cwiki.apache.org/confluence/display/IGNITE/How+to+Contribute#HowtoContribute-ReviewProcessandMaintainers)) 
   - [ ] The pull request has been checked by the Teamcity Bot and 
   the `green visa` attached to the JIRA ticket (see [TC.Bot: Check PR](https://mtcga.gridgain.com/prs.html))
   
   ### Notes
   - [How to Contribute](https://cwiki.apache.org/confluence/display/IGNITE/How+to+Contribute)
   - [Coding abbreviation rules](https://cwiki.apache.org/confluence/display/IGNITE/Abbreviation+Rules)
   - [Coding Guidelines](https://cwiki.apache.org/confluence/display/IGNITE/Coding+Guidelines)
   - [Apache Ignite Teamcity Bot](https://cwiki.apache.org/confluence/display/IGNITE/Apache+Ignite+Teamcity+Bot)
   
   If you need any help, please email dev@ignite.apache.org or ask for advice in the _#ignite_ channel on http://asf.slack.com.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] alex-plekhanov commented on a change in pull request #7881: IGNITE-10959: Continuous query pending buffer limit

Posted by GitBox <gi...@apache.org>.
alex-plekhanov commented on a change in pull request #7881:
URL: https://github.com/apache/ignite/pull/7881#discussion_r451094673



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
##########
@@ -26,40 +26,69 @@
 import java.util.TreeMap;
 import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.util.GridAtomicLong;
+import org.apache.ignite.internal.util.typedef.internal.LT;
 import org.apache.ignite.util.deque.FastSizeDeque;
 import org.jetbrains.annotations.Nullable;
 
 /**
  *
  */
 public class CacheContinuousQueryEventBuffer {
-    /** */
+    /** Maximum size of buffer for pending events. Default value is {@code 10_000}. */
+    public static final int MAX_PENDING_BUFF_SIZE =
+        IgniteSystemProperties.getInteger("IGNITE_CONTINUOUS_QUERY_PENDING_BUFF_SIZE", 10_000);
+
+    /** Batch buffer size. */
     private static final int BUF_SIZE =
         IgniteSystemProperties.getInteger("IGNITE_CONTINUOUS_QUERY_SERVER_BUFFER_SIZE", 1000);
 
     /** */
     private static final Object RETRY = new Object();
 
-    /** */
-    protected final int part;
+    /** Continuous query category logger. */
+    private final IgniteLogger log;
 
     /** */
-    private AtomicReference<Batch> curBatch = new AtomicReference<>();
+    private final int part;
 
-    /** */
-    private FastSizeDeque<CacheContinuousQueryEntry> backupQ = new FastSizeDeque<>(new ConcurrentLinkedDeque<>());
+    /** Batch of entries currently being collected to send to the remote. */
+    private final AtomicReference<Batch> curBatch = new AtomicReference<>();
 
-    /** */
-    private ConcurrentSkipListMap<Long, CacheContinuousQueryEntry> pending = new ConcurrentSkipListMap<>();
+    /** Queue for keeping backup entries which partition counter less the counter processing by current batch. */
+    private final FastSizeDeque<CacheContinuousQueryEntry> backupQ = new FastSizeDeque<>(new ConcurrentLinkedDeque<>());
+
+    /** Entries which are waiting for being processed. */
+    private final ConcurrentSkipListMap<Long, CacheContinuousQueryEntry> pending = new ConcurrentSkipListMap<>();
+
+    /**
+     * The size method of the pending ConcurrentSkipListMap is not a constant-time operation. Since each

Review comment:
       The `size()` method is an almost constant-time operation: there is a long adder inside, and usually it has only one cell (at most as many cells as there are CPUs). With low contention on the `pending` map, a call to `pending.size()` is relatively cheap. With very high contention on the `pending` map, I think `AtomicInteger.incrementAndGet()` can become a bottleneck here.
   So I'm not sure about this optimization; perhaps it's better to use `pending.size()`.
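
   To make the trade-off in this comment concrete, here is a minimal sketch (not the Ignite code) of the two options: a separate `AtomicInteger` kept next to the map, and the map's own `size()`. The class `PendingSizeSketch` and its methods are hypothetical, and `String` values stand in for `CacheContinuousQueryEntry`. How expensive `size()` really is depends on the `ConcurrentSkipListMap` implementation in the JDK being used, so it is worth measuring before choosing.

```java
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch, not Ignite code: two ways to track the size of a pending map. */
public class PendingSizeSketch {
    /** Pending entries keyed by update counter (String is a placeholder value type). */
    private final ConcurrentSkipListMap<Long, String> pending = new ConcurrentSkipListMap<>();

    /** Side counter maintained next to the map, as in the patch under review. */
    private final AtomicInteger pendingSize = new AtomicInteger();

    /** Adds an entry; the side counter grows only when the key was not present before. */
    public void put(long cntr, String entry) {
        if (pending.put(cntr, entry) == null)
            pendingSize.incrementAndGet();
    }

    /** Removes an entry; the side counter shrinks only when something was actually removed. */
    public void remove(long cntr) {
        if (pending.remove(cntr) != null)
            pendingSize.decrementAndGet();
    }

    /** Size according to the side counter: one extra atomic update per put/remove. */
    public int trackedSize() {
        return pendingSize.get();
    }

    /** Size according to the map itself: no extra bookkeeping, cost depends on the JDK. */
    public int mapSize() {
        return pending.size();
    }
}
```

   Note that the side counter has to be kept consistent by hand on every code path that touches the map, which is an additional reason to prefer `pending.size()` if it turns out to be cheap enough.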







[GitHub] [ignite] Mmuzaf commented on a change in pull request #7881: IGNITE-10959: Continuous query pending buffer limit

Posted by GitBox <gi...@apache.org>.
Mmuzaf commented on a change in pull request #7881:
URL: https://github.com/apache/ignite/pull/7881#discussion_r443685211



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
##########
@@ -172,21 +210,50 @@ private Object process0(long cntr, CacheContinuousQueryEntry entry, boolean back
                 if (res == RETRY)
                     continue;
             }
-            else
+            else {
+                if (batch.endCntr < ackedUpdCntr)
+                    batch.tryRollOver(entry.topologyVersion());
+
+                if (pendingSize.get() > MAX_PENDING_BUFF_SIZE) {
+                    LT.warn(log, "Buffer for pending events reached max of its size " +
+                        "[cacheId=" + entry.cacheId() + ", maxSize=" + MAX_PENDING_BUFF_SIZE +
+                        ", partId=" + entry.partition() + ']');
+
+                    // Remove first BUFF_SIZE keys.
+                    int keysToRemove = BUF_SIZE;
+
+                    Iterator<Map.Entry<Long, CacheContinuousQueryEntry>> iter = pending.entrySet().iterator();
+
+                    res = new ArrayList<>();
+
+                    while (iter.hasNext() && keysToRemove > 0) {
+                        // Collecting results ignore backup flag due to batch may not be switched.
+                        res = addResult(res, iter.next().getValue(), false);
+
+                        iter.remove();
+                        pendingSize.decrementAndGet();
+
+                        keysToRemove--;
+                    }
+                }
+
+                pendingSize.incrementAndGet();
                 pending.put(cntr, entry);
+            }
 
             break;
         }
 
         Batch batch0 = curBatch.get();
 
+        // Batch has been changed on entry processing to the new one.
         if (batch0 != batch) {

Review comment:
       I'll change this under IGNITE-13126.







[GitHub] [ignite] Mmuzaf commented on a change in pull request #7881: IGNITE-10959: Continuous query pending buffer limit

Posted by GitBox <gi...@apache.org>.
Mmuzaf commented on a change in pull request #7881:
URL: https://github.com/apache/ignite/pull/7881#discussion_r451390459



##########
File path: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBufferLimitTest.java
##########
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.cache.configuration.FactoryBuilder;
+import javax.cache.event.CacheEntryEvent;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cache.query.CacheQueryEntryEvent;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheIdMessage;
+import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor;
+import org.apache.ignite.internal.util.GridAtomicLong;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.systemview.view.ContinuousQueryView;
+import org.apache.ignite.spi.systemview.view.SystemView;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.internal.TestRecordingCommunicationSpi.spi;
+import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryEventBuffer.MAX_PENDING_BUFF_SIZE;
+import static org.apache.ignite.internal.processors.continuous.GridContinuousProcessor.CQ_SYS_VIEW;
+import static org.apache.ignite.testframework.GridTestUtils.getFieldValue;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+
+/**
+ *
+ */
+@RunWith(Parameterized.class)
+public class CacheContinuousQueryBufferLimitTest extends GridCommonAbstractTest {
+    /** Cache partitions count. */
+    private static final int PARTS = 1;
+
+    /** Total number of cache keys. */
+    private static final int TOTAL_KEYS = 1024;
+
+    /** Timeout to wait for pending buffer overflow. */
+    private static final long OVERFLOW_TIMEOUT_MS = 10_000L;
+
+    /** Default remote no-op filter. */
+    private static final CacheEntryEventSerializableFilter<Integer, Integer> RMT_FILTER = e -> true;
+
+    /** Counter of cache messages being send. */
+    private final AtomicInteger msgCntr = new AtomicInteger();
+
+    /** Cache mode. */
+    @Parameterized.Parameter(0)
+    public CacheMode cacheMode;
+
+    /** Cache atomicity mode. */
+    @Parameterized.Parameter(1)
+    public CacheAtomicityMode atomicityMode;
+
+    /** @return Test parameters. */
+    @Parameterized.Parameters(name = "cacheMode={0}, atomicityMode={1}")
+    public static Collection<?> parameters() {
+        return Arrays.asList(new Object[][] {
+            {REPLICATED, ATOMIC},
+            {REPLICATED, TRANSACTIONAL},
+            {PARTITIONED, ATOMIC},
+            {PARTITIONED, TRANSACTIONAL}
+        });
+    }
+
+    /**
+     * Local pending limit for this test is less than MAX_PENDING_BUFF_SIZE,
+     * so pending entries must be cleaned prior to reaching it.
+     *
+     * @throws Exception If fails.
+     */
+    @Test
+    public void testContinuousQueryBatchSwitchOnAck() throws Exception {
+        doTestContinuousQueryPendingBufferLimit((n, msg) ->
+            msg instanceof GridCacheIdMessage && msgCntr.getAndIncrement() == 10, MAX_PENDING_BUFF_SIZE / 10);
+    }
+
+    /**
+     * The test blocks switching current CacheContinuousQueryEventBuffer.Batch to the new one, so
+     * pending entries will be processed (dropped on backups and send to the client on primaries)
+     * when the MAX_PENDING_BUFF_SIZE is reached.
+     *
+     * @throws Exception If fails.
+     */
+    @Test
+    public void testContinuousQueryPendingBufferLimit() throws Exception {
+        doTestContinuousQueryPendingBufferLimit((n, msg) ->
+            (msg instanceof GridCacheIdMessage && msgCntr.getAndIncrement() == 10) ||
+                msg instanceof CacheContinuousQueryBatchAck, (int)(MAX_PENDING_BUFF_SIZE * 1.1));
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testPendingSendToClientOnLimitReached() throws Exception {
+        AtomicInteger keys = new AtomicInteger();
+        AtomicReference<String> err = new AtomicReference<>();
+
+        IgniteEx srv = startGrids(2);
+        IgniteEx clnt = startClientGrid();
+
+        CacheEntryEventSerializableFilter<Integer, Integer> filter = new CacheEntryEventSerializableFilter<Integer,Integer>() {

Review comment:
       This `filter` is used twice: as the remote filter condition, and in the local listener to check that every event accepted by the client passes the filter (when the buffer overflows, entries are sent from the server to the client).
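
   The double use of the filter can be sketched without any Ignite or JCache types. In the sketch below `java.util.function.Predicate` is only a stand-in for `CacheEntryEventSerializableFilter`, and `FilterReuseSketch` and `verifyDelivered` are hypothetical names: the same predicate that decides what may be delivered is re-applied on the receiving side, and the first violation is recorded in the same way the test records it in `err`.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;

/** Hypothetical sketch, not Ignite code: one predicate used as a filter and as a check. */
public class FilterReuseSketch {
    /** Stand-in for the remote filter from the test: accept even keys only. */
    static final Predicate<Integer> EVEN_KEYS = key -> key % 2 == 0;

    /**
     * Re-applies the filter to keys that reached the listener.
     *
     * @return Message for the first key that should have been filtered out, or {@code null}.
     */
    static String verifyDelivered(List<Integer> deliveredKeys) {
        AtomicReference<String> err = new AtomicReference<>();

        for (Integer key : deliveredKeys) {
            // Only the first violation is kept, mirroring err.compareAndSet(null, ...) in the test.
            if (!EVEN_KEYS.test(key))
                err.compareAndSet(null, "Key must be filtered [key=" + key + ']');
        }

        return err.get();
    }
}
```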







[GitHub] [ignite] Mmuzaf commented on a change in pull request #7881: IGNITE-10959: Continuous query pending buffer limit

Posted by GitBox <gi...@apache.org>.
Mmuzaf commented on a change in pull request #7881:
URL: https://github.com/apache/ignite/pull/7881#discussion_r451491459



##########
File path: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBufferLimitTest.java
##########
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.cache.configuration.FactoryBuilder;
+import javax.cache.event.CacheEntryEvent;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cache.query.CacheQueryEntryEvent;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheIdMessage;
+import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor;
+import org.apache.ignite.internal.util.GridAtomicLong;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.systemview.view.ContinuousQueryView;
+import org.apache.ignite.spi.systemview.view.SystemView;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.internal.TestRecordingCommunicationSpi.spi;
+import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryEventBuffer.MAX_PENDING_BUFF_SIZE;
+import static org.apache.ignite.internal.processors.continuous.GridContinuousProcessor.CQ_SYS_VIEW;
+import static org.apache.ignite.testframework.GridTestUtils.getFieldValue;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+
+/**
+ *
+ */
+@RunWith(Parameterized.class)
+public class CacheContinuousQueryBufferLimitTest extends GridCommonAbstractTest {
+    /** Cache partitions count. */
+    private static final int PARTS = 1;
+
+    /** Total number of cache keys. */
+    private static final int TOTAL_KEYS = 1024;
+
+    /** Timeout to wait for pending buffer overflow. */
+    private static final long OVERFLOW_TIMEOUT_MS = 10_000L;
+
+    /** Default remote no-op filter. */
+    private static final CacheEntryEventSerializableFilter<Integer, Integer> RMT_FILTER = e -> true;
+
+    /** Counter of cache messages being send. */
+    private final AtomicInteger msgCntr = new AtomicInteger();
+
+    /** Cache mode. */
+    @Parameterized.Parameter(0)
+    public CacheMode cacheMode;
+
+    /** Cache atomicity mode. */
+    @Parameterized.Parameter(1)
+    public CacheAtomicityMode atomicityMode;
+
+    /** @return Test parameters. */
+    @Parameterized.Parameters(name = "cacheMode={0}, atomicityMode={1}")
+    public static Collection<?> parameters() {
+        return Arrays.asList(new Object[][] {
+            {REPLICATED, ATOMIC},
+            {REPLICATED, TRANSACTIONAL},
+            {PARTITIONED, ATOMIC},
+            {PARTITIONED, TRANSACTIONAL}
+        });
+    }
+
+    /**
+     * Local pending limit for this test is less than MAX_PENDING_BUFF_SIZE,
+     * so pending entries must be cleaned prior to reaching it.
+     *
+     * @throws Exception If fails.
+     */
+    @Test
+    public void testContinuousQueryBatchSwitchOnAck() throws Exception {
+        doTestContinuousQueryPendingBufferLimit((n, msg) ->
+            msg instanceof GridCacheIdMessage && msgCntr.getAndIncrement() == 10, MAX_PENDING_BUFF_SIZE / 10);
+    }
+
+    /**
+     * The test blocks switching current CacheContinuousQueryEventBuffer.Batch to the new one, so
+     * pending entries will be processed (dropped on backups and send to the client on primaries)
+     * when the MAX_PENDING_BUFF_SIZE is reached.
+     *
+     * @throws Exception If fails.
+     */
+    @Test
+    public void testContinuousQueryPendingBufferLimit() throws Exception {
+        doTestContinuousQueryPendingBufferLimit((n, msg) ->
+            (msg instanceof GridCacheIdMessage && msgCntr.getAndIncrement() == 10) ||
+                msg instanceof CacheContinuousQueryBatchAck, (int)(MAX_PENDING_BUFF_SIZE * 1.1));
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testPendingSendToClientOnLimitReached() throws Exception {
+        AtomicInteger keys = new AtomicInteger();
+        AtomicReference<String> err = new AtomicReference<>();
+
+        IgniteEx srv = startGrids(2);
+        IgniteEx clnt = startClientGrid();
+
+        CacheEntryEventSerializableFilter<Integer, Integer> filter = new CacheEntryEventSerializableFilter<Integer,Integer>() {
+            @Override public boolean evaluate(CacheEntryEvent<? extends Integer,? extends Integer> evt) {
+                return evt.getKey() % 2 == 0;
+            }
+        };
+
+        ContinuousQuery<Integer, Integer> cq = new ContinuousQuery<>();
+        cq.setRemoteFilterFactory(FactoryBuilder.factoryOf(filter));
+        cq.setLocalListener((events) -> events.forEach(e -> {
+            if (!filter.evaluate(e))
+                err.compareAndSet(null, "Key must be filtered [e=" + e + ']');
+        }));
+        cq.setLocal(false);
+
+        spi(srv).blockMessages((nodeId, msg) -> msg instanceof GridCacheIdMessage && msgCntr.getAndIncrement() == 10);
+        spi(clnt).blockMessages((nodeId, msg) -> msg instanceof CacheContinuousQueryBatchAck);
+
+        IgniteInternalFuture<?> loadFut = null;
+
+        try (QueryCursor<?> qry = clnt.cache(DEFAULT_CACHE_NAME).query(cq)) {
+            awaitPartitionMapExchange();
+
+            loadFut = GridTestUtils.runMultiThreadedAsync(() -> {
+                while (!Thread.currentThread().isInterrupted())
+                    clnt.cache(DEFAULT_CACHE_NAME).put(keys.incrementAndGet(), 0);
+            }, 3, "cq-put-");
+
+            ConcurrentMap<Long, CacheContinuousQueryEntry> pending =
+                getContinuousQueryPendingBuffer(grid(0), CU.cacheId(DEFAULT_CACHE_NAME), 0);
+
+            waitForCondition(() -> pending.size() > MAX_PENDING_BUFF_SIZE, 15_000);
+
+            // Check all entries greater than limit filtered correctly.
+            waitForCondition(() -> keys.get() > MAX_PENDING_BUFF_SIZE * 2, 15_000);

Review comment:
       I've fixed the comment. It means that all entries with a counter greater than the exceeded limit must be filtered correctly.







[GitHub] [ignite] Mmuzaf commented on a change in pull request #7881: IGNITE-10959: Continuous query pending buffer limit

Posted by GitBox <gi...@apache.org>.
Mmuzaf commented on a change in pull request #7881:
URL: https://github.com/apache/ignite/pull/7881#discussion_r450149629



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
##########
@@ -172,21 +204,51 @@ private Object process0(long cntr, CacheContinuousQueryEntry entry, boolean back
                 if (res == RETRY)
                     continue;
             }
-            else
+            else {
+                if (batch.endCntr < ackedUpdCntr.get())
+                    batch.tryRollOver(entry.topologyVersion());
+
+                if (pendingSize.get() > MAX_PENDING_BUFF_SIZE) {
+                    LT.warn(log, "Buffer for pending events reached max of its size " +
+                        "[cacheId=" + entry.cacheId() + ", maxSize=" + MAX_PENDING_BUFF_SIZE +
+                        ", partId=" + entry.partition() + ']');
+
+                    // Remove first BUFF_SIZE keys.
+                    int keysToRemove = BUF_SIZE;
+
+                    Iterator<Map.Entry<Long, CacheContinuousQueryEntry>> iter = pending.entrySet().iterator();
+
+                    while (iter.hasNext() && keysToRemove > 0) {
+                        CacheContinuousQueryEntry entry0 = iter.next().getValue();
+
+                        // Discard messages on backup and send to client if primary.
+                        if (!backup)
+                            res = addResult(res, entry0, !backup);

Review comment:
       Fixed.
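
   For readers following the hunk above, the overflow handling can be sketched in isolation. This is a simplified illustration under stated assumptions, not the Ignite implementation: `PendingOverflowSketch`, `maxPending` and `drainSize` are hypothetical names (standing in for `MAX_PENDING_BUFF_SIZE` and `BUF_SIZE`), `String` replaces `CacheContinuousQueryEntry`, and batching, logging and acknowledgements are left out. When the pending map is over the limit, the entries with the lowest counters are drained; a primary collects them for sending to the client, a backup just discards them.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;

/** Hypothetical sketch, not Ignite code: bounded pending buffer that drains its oldest entries. */
public class PendingOverflowSketch {
    /** Pending entries ordered by update counter. */
    private final ConcurrentSkipListMap<Long, String> pending = new ConcurrentSkipListMap<>();

    /** Limit after which the buffer is drained (assumed analogue of MAX_PENDING_BUFF_SIZE). */
    private final int maxPending;

    /** Number of entries drained per overflow (assumed analogue of BUF_SIZE). */
    private final int drainSize;

    public PendingOverflowSketch(int maxPending, int drainSize) {
        this.maxPending = maxPending;
        this.drainSize = drainSize;
    }

    /**
     * Adds an entry to the pending buffer, draining the oldest entries first on overflow.
     *
     * @return Drained entries to send to the client; always empty on a backup.
     */
    public List<String> add(long cntr, String entry, boolean backup) {
        List<String> res = new ArrayList<>();

        if (pending.size() > maxPending) {
            int keysToRemove = drainSize;

            Iterator<Map.Entry<Long, String>> iter = pending.entrySet().iterator();

            while (iter.hasNext() && keysToRemove > 0) {
                String entry0 = iter.next().getValue();

                // Discard on backup, collect for the client on primary.
                if (!backup)
                    res.add(entry0);

                iter.remove();

                keysToRemove--;
            }
        }

        pending.put(cntr, entry);

        return res;
    }

    /** @return Current number of pending entries. */
    public int pendingCount() {
        return pending.size();
    }
}
```

   With `maxPending = 3` and `drainSize = 2`, the fifth `add` on a primary returns the two entries with the lowest counters and leaves three entries pending; on a backup the same call returns an empty list.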

##########
File path: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBufferLimitTest.java
##########
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import javax.cache.configuration.FactoryBuilder;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cache.query.CacheQueryEntryEvent;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheIdMessage;
+import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.systemview.view.ContinuousQueryView;
+import org.apache.ignite.spi.systemview.view.SystemView;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.internal.TestRecordingCommunicationSpi.spi;
+import static org.apache.ignite.internal.processors.continuous.GridContinuousProcessor.CQ_SYS_VIEW;
+import static org.apache.ignite.testframework.GridTestUtils.getFieldValue;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+
+/**
+ *
+ */
+@RunWith(Parameterized.class)
+public class CacheContinuousQueryBufferLimitTest extends GridCommonAbstractTest {
+    /** Cache partitions count. */
+    private static final int PARTS = 1;
+
+    /** Total number of cache keys. */
+    private static final int TOTAL_KEYS = 1024;
+
+    /** Number of pending entries.  */
+    private static final int PENDING_LIMIT = 1100;
+
+    /** Timeout to wait for pending buffer overflow. */
+    private static final long OVERFLOW_TIMEOUT_MS = 15_000L;
+
+    /** Default remote no-op filter. */
+    private static final CacheEntryEventSerializableFilter<Integer, Integer> RMT_FILTER = e -> true;
+
+    /** Counter of cache messages being send. */
+    private final AtomicInteger msgCntr = new AtomicInteger();
+
+    /** Cache mode. */
+    @Parameterized.Parameter(0)
+    public CacheMode cacheMode;
+
+    /** Cache atomicity mode. */
+    @Parameterized.Parameter(1)
+    public CacheAtomicityMode atomicityMode;
+
+    /** @return Test parameters. */
+    @Parameterized.Parameters(name = "cacheMode={0}, atomicityMode={1}")
+    public static Collection<?> parameters() {
+        return Arrays.asList(new Object[][] {
+            {REPLICATED, ATOMIC},
+            {REPLICATED, TRANSACTIONAL},
+            {PARTITIONED, ATOMIC},
+            {PARTITIONED, TRANSACTIONAL}
+        });
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testContinuousQueryBatchSwitchOnAck() throws Exception {
+        doTestContinuousQueryPendingBufferLimit((n, msg) ->
+            msg instanceof GridCacheIdMessage && msgCntr.getAndIncrement() == 10);
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    @WithSystemProperty(key = "IGNITE_CONTINUOUS_QUERY_PENDING_BUFF_SIZE", value = "1000")
+    public void testContinuousQueryPendingBufferLimit() throws Exception {
+        doTestContinuousQueryPendingBufferLimit((n, msg) ->
+            (msg instanceof GridCacheIdMessage && msgCntr.getAndIncrement() == 10) ||
+                msg instanceof CacheContinuousQueryBatchAck);
+    }
+
+

Review comment:
       Fixed.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] Mmuzaf commented on a change in pull request #7881: IGNITE-10959: Continuous query pending buffer limit

Posted by GitBox <gi...@apache.org>.
Mmuzaf commented on a change in pull request #7881:
URL: https://github.com/apache/ignite/pull/7881#discussion_r450218454



##########
File path: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBufferLimitTest.java
##########
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import javax.cache.configuration.FactoryBuilder;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cache.query.CacheQueryEntryEvent;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheIdMessage;
+import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.systemview.view.ContinuousQueryView;
+import org.apache.ignite.spi.systemview.view.SystemView;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.internal.TestRecordingCommunicationSpi.spi;
+import static org.apache.ignite.internal.processors.continuous.GridContinuousProcessor.CQ_SYS_VIEW;
+import static org.apache.ignite.testframework.GridTestUtils.getFieldValue;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+
+/**
+ *
+ */
+@RunWith(Parameterized.class)
+public class CacheContinuousQueryBufferLimitTest extends GridCommonAbstractTest {
+    /** Cache partitions count. */
+    private static final int PARTS = 1;
+
+    /** Total number of cache keys. */
+    private static final int TOTAL_KEYS = 1024;
+
+    /** Number of pending entries.  */
+    private static final int PENDING_LIMIT = 1100;
+
+    /** Timeout to wait for pending buffer overflow. */
+    private static final long OVERFLOW_TIMEOUT_MS = 15_000L;
+
+    /** Default remote no-op filter. */
+    private static final CacheEntryEventSerializableFilter<Integer, Integer> RMT_FILTER = e -> true;
+
+    /** Counter of cache messages being sent. */
+    private final AtomicInteger msgCntr = new AtomicInteger();
+
+    /** Cache mode. */
+    @Parameterized.Parameter(0)
+    public CacheMode cacheMode;
+
+    /** Cache atomicity mode. */
+    @Parameterized.Parameter(1)
+    public CacheAtomicityMode atomicityMode;
+
+    /** @return Test parameters. */
+    @Parameterized.Parameters(name = "cacheMode={0}, atomicityMode={1}")
+    public static Collection<?> parameters() {
+        return Arrays.asList(new Object[][] {
+            {REPLICATED, ATOMIC},
+            {REPLICATED, TRANSACTIONAL},
+            {PARTITIONED, ATOMIC},
+            {PARTITIONED, TRANSACTIONAL}
+        });
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testContinuousQueryBatchSwitchOnAck() throws Exception {
+        doTestContinuousQueryPendingBufferLimit((n, msg) ->
+            msg instanceof GridCacheIdMessage && msgCntr.getAndIncrement() == 10);
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    @WithSystemProperty(key = "IGNITE_CONTINUOUS_QUERY_PENDING_BUFF_SIZE", value = "1000")
+    public void testContinuousQueryPendingBufferLimit() throws Exception {
+        doTestContinuousQueryPendingBufferLimit((n, msg) ->
+            (msg instanceof GridCacheIdMessage && msgCntr.getAndIncrement() == 10) ||
+                msg instanceof CacheContinuousQueryBatchAck);
+    }
+
+
+    /** {@inheritDoc} */
+    @Override public IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        return super.getConfiguration(igniteInstanceName)
+            .setCommunicationSpi(new TestRecordingCommunicationSpi())
+            .setCacheConfiguration(new CacheConfiguration<>(DEFAULT_CACHE_NAME)
+                .setAtomicityMode(atomicityMode)
+                .setCacheMode(cacheMode)
+                .setBackups(1)
+                .setAffinity(new RendezvousAffinityFunction(false, PARTS)));
+    }
+
+    /** */
+    @Before
+    public void resetMessageCounter() {
+        msgCntr.set(0);
+    }
+
+    /** */
+    @After
+    public void stopAllInstances() {
+        stopAllGrids();
+    }
+
+    /**
+     * @param locBlockPred Block predicate on local node to emulate message delivery issues.
+     * @throws Exception If fails.
+     */
+    private void doTestContinuousQueryPendingBufferLimit(
+        IgniteBiPredicate<ClusterNode, Message> locBlockPred
+    ) throws Exception
+    {
+        ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+        IgniteEx locIgnite = startGrid(0);
+        IgniteEx rmtIgnite = startGrid(1);
+
+        IgniteCache<Integer, Integer> cache = locIgnite.cache(DEFAULT_CACHE_NAME);
+        CacheConfiguration<Integer, Integer> ccfg = cache.getConfiguration(CacheConfiguration.class);
+
+        for (int i = 0; i < TOTAL_KEYS; i++)
+            cache.put(i, i);
+
+        assertEquals(PARTS, ccfg.getAffinity().partitions());
+
+        AtomicLong lastAcked = new AtomicLong();
+
+        ContinuousQuery<Integer, Integer> cq = new ContinuousQuery<>();
+        cq.setRemoteFilterFactory(FactoryBuilder.factoryOf(RMT_FILTER));
+        cq.setLocalListener((events) ->
+            events.forEach(e ->
+                lastAcked.getAndUpdate(c ->
+                    Math.max(c, ((CacheQueryEntryEvent<?, ?>)e).getPartitionUpdateCounter()))));
+        cq.setLocal(false);
+
+        IgniteInternalFuture<?> updFut = null;
+
+        try (QueryCursor<?> qry = locIgnite.cache(DEFAULT_CACHE_NAME).query(cq)) {
+            awaitPartitionMapExchange();
+
+            for (int j = 0; j < TOTAL_KEYS; j++)
+                putX2Value(cache, rnd.nextInt(TOTAL_KEYS));
+
+            SystemView<ContinuousQueryView> rmtQryView = rmtIgnite.context().systemView().view(CQ_SYS_VIEW);
+            assertEquals(1, rmtQryView.size());
+
+            UUID routineId = rmtQryView.iterator().next().routineId();
+
+            // Partition Id, Update Counter, Continuous Entry.
+            ConcurrentMap<Long, CacheContinuousQueryEntry> pending =
+                getContinuousQueryPendingBuffer(rmtIgnite, routineId, CU.cacheId(DEFAULT_CACHE_NAME), 0);
+
+            spi(locIgnite).blockMessages(locBlockPred);
+
+            updFut = GridTestUtils.runMultiThreadedAsync(() -> {
+                while (!Thread.currentThread().isInterrupted()) {
+                    putX2Value(cache, rnd.nextInt(TOTAL_KEYS));

Review comment:
       Fixed.

##########
File path: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBufferLimitTest.java
##########
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import javax.cache.configuration.FactoryBuilder;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cache.query.CacheQueryEntryEvent;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheIdMessage;
+import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.systemview.view.ContinuousQueryView;
+import org.apache.ignite.spi.systemview.view.SystemView;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.internal.TestRecordingCommunicationSpi.spi;
+import static org.apache.ignite.internal.processors.continuous.GridContinuousProcessor.CQ_SYS_VIEW;
+import static org.apache.ignite.testframework.GridTestUtils.getFieldValue;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+
+/**
+ *
+ */
+@RunWith(Parameterized.class)
+public class CacheContinuousQueryBufferLimitTest extends GridCommonAbstractTest {
+    /** Cache partitions count. */
+    private static final int PARTS = 1;
+
+    /** Total number of cache keys. */
+    private static final int TOTAL_KEYS = 1024;
+
+    /** Number of pending entries.  */
+    private static final int PENDING_LIMIT = 1100;
+
+    /** Timeout to wait for pending buffer overflow. */
+    private static final long OVERFLOW_TIMEOUT_MS = 15_000L;
+
+    /** Default remote no-op filter. */
+    private static final CacheEntryEventSerializableFilter<Integer, Integer> RMT_FILTER = e -> true;
+
+    /** Counter of cache messages being sent. */
+    private final AtomicInteger msgCntr = new AtomicInteger();
+
+    /** Cache mode. */
+    @Parameterized.Parameter(0)
+    public CacheMode cacheMode;
+
+    /** Cache atomicity mode. */
+    @Parameterized.Parameter(1)
+    public CacheAtomicityMode atomicityMode;
+
+    /** @return Test parameters. */
+    @Parameterized.Parameters(name = "cacheMode={0}, atomicityMode={1}")
+    public static Collection<?> parameters() {
+        return Arrays.asList(new Object[][] {
+            {REPLICATED, ATOMIC},
+            {REPLICATED, TRANSACTIONAL},
+            {PARTITIONED, ATOMIC},
+            {PARTITIONED, TRANSACTIONAL}
+        });
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testContinuousQueryBatchSwitchOnAck() throws Exception {
+        doTestContinuousQueryPendingBufferLimit((n, msg) ->
+            msg instanceof GridCacheIdMessage && msgCntr.getAndIncrement() == 10);
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    @WithSystemProperty(key = "IGNITE_CONTINUOUS_QUERY_PENDING_BUFF_SIZE", value = "1000")

Review comment:
       Fixed.







[GitHub] [ignite] Mmuzaf merged pull request #7881: IGNITE-10959: Continuous query pending buffer limit

Posted by GitBox <gi...@apache.org>.
Mmuzaf merged pull request #7881:
URL: https://github.com/apache/ignite/pull/7881


   





[GitHub] [ignite] Mmuzaf commented on a change in pull request #7881: IGNITE-10959: Continuous query pending buffer limit

Posted by GitBox <gi...@apache.org>.
Mmuzaf commented on a change in pull request #7881:
URL: https://github.com/apache/ignite/pull/7881#discussion_r451493608



##########
File path: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBufferLimitTest.java
##########
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.cache.configuration.FactoryBuilder;
+import javax.cache.event.CacheEntryEvent;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cache.query.CacheQueryEntryEvent;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheIdMessage;
+import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor;
+import org.apache.ignite.internal.util.GridAtomicLong;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.systemview.view.ContinuousQueryView;
+import org.apache.ignite.spi.systemview.view.SystemView;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.internal.TestRecordingCommunicationSpi.spi;
+import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryEventBuffer.MAX_PENDING_BUFF_SIZE;
+import static org.apache.ignite.internal.processors.continuous.GridContinuousProcessor.CQ_SYS_VIEW;
+import static org.apache.ignite.testframework.GridTestUtils.getFieldValue;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+
+/**
+ *
+ */
+@RunWith(Parameterized.class)
+public class CacheContinuousQueryBufferLimitTest extends GridCommonAbstractTest {
+    /** Cache partitions count. */
+    private static final int PARTS = 1;
+
+    /** Total number of cache keys. */
+    private static final int TOTAL_KEYS = 1024;
+
+    /** Timeout to wait for pending buffer overflow. */
+    private static final long OVERFLOW_TIMEOUT_MS = 10_000L;
+
+    /** Default remote no-op filter. */
+    private static final CacheEntryEventSerializableFilter<Integer, Integer> RMT_FILTER = e -> true;
+
+    /** Counter of cache messages being sent. */
+    private final AtomicInteger msgCntr = new AtomicInteger();
+
+    /** Cache mode. */
+    @Parameterized.Parameter(0)
+    public CacheMode cacheMode;
+
+    /** Cache atomicity mode. */
+    @Parameterized.Parameter(1)
+    public CacheAtomicityMode atomicityMode;
+
+    /** @return Test parameters. */
+    @Parameterized.Parameters(name = "cacheMode={0}, atomicityMode={1}")
+    public static Collection<?> parameters() {
+        return Arrays.asList(new Object[][] {
+            {REPLICATED, ATOMIC},
+            {REPLICATED, TRANSACTIONAL},
+            {PARTITIONED, ATOMIC},
+            {PARTITIONED, TRANSACTIONAL}
+        });
+    }
+
+    /**
+     * Local pending limit for this test is less than MAX_PENDING_BUFF_SIZE,
+     * so pending entries must be cleaned prior to reaching it.
+     *
+     * @throws Exception If fails.
+     */
+    @Test
+    public void testContinuousQueryBatchSwitchOnAck() throws Exception {
+        doTestContinuousQueryPendingBufferLimit((n, msg) ->
+            msg instanceof GridCacheIdMessage && msgCntr.getAndIncrement() == 10, MAX_PENDING_BUFF_SIZE / 10);
+    }
+
+    /**
+     * The test blocks switching current CacheContinuousQueryEventBuffer.Batch to the new one, so
+     * pending entries will be processed (dropped on backups and sent to the client on primaries)
+     * when the MAX_PENDING_BUFF_SIZE is reached.
+     *
+     * @throws Exception If fails.
+     */
+    @Test
+    public void testContinuousQueryPendingBufferLimit() throws Exception {
+        doTestContinuousQueryPendingBufferLimit((n, msg) ->
+            (msg instanceof GridCacheIdMessage && msgCntr.getAndIncrement() == 10) ||
+                msg instanceof CacheContinuousQueryBatchAck, (int)(MAX_PENDING_BUFF_SIZE * 1.1));
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testPendingSendToClientOnLimitReached() throws Exception {
+        AtomicInteger keys = new AtomicInteger();
+        AtomicReference<String> err = new AtomicReference<>();
+
+        IgniteEx srv = startGrids(2);
+        IgniteEx clnt = startClientGrid();
+
+        CacheEntryEventSerializableFilter<Integer, Integer> filter = new CacheEntryEventSerializableFilter<Integer,Integer>() {
+            @Override public boolean evaluate(CacheEntryEvent<? extends Integer,? extends Integer> evt) {
+                return evt.getKey() % 2 == 0;
+            }
+        };
+
+        ContinuousQuery<Integer, Integer> cq = new ContinuousQuery<>();
+        cq.setRemoteFilterFactory(FactoryBuilder.factoryOf(filter));
+        cq.setLocalListener((events) -> events.forEach(e -> {
+            if (!filter.evaluate(e))
+                err.compareAndSet(null, "Key must be filtered [e=" + e + ']');
+        }));
+        cq.setLocal(false);
+
+        spi(srv).blockMessages((nodeId, msg) -> msg instanceof GridCacheIdMessage && msgCntr.getAndIncrement() == 10);
+        spi(clnt).blockMessages((nodeId, msg) -> msg instanceof CacheContinuousQueryBatchAck);
+
+        IgniteInternalFuture<?> loadFut = null;
+
+        try (QueryCursor<?> qry = clnt.cache(DEFAULT_CACHE_NAME).query(cq)) {
+            awaitPartitionMapExchange();
+
+            loadFut = GridTestUtils.runMultiThreadedAsync(() -> {
+                while (!Thread.currentThread().isInterrupted())
+                    clnt.cache(DEFAULT_CACHE_NAME).put(keys.incrementAndGet(), 0);
+            }, 3, "cq-put-");
+
+            ConcurrentMap<Long, CacheContinuousQueryEntry> pending =
+                getContinuousQueryPendingBuffer(grid(0), CU.cacheId(DEFAULT_CACHE_NAME), 0);
+
+            waitForCondition(() -> pending.size() > MAX_PENDING_BUFF_SIZE, 15_000);

Review comment:
       The full `15_000` ms timeout elapses only if an error occurs: `waitForCondition` returns as soon as the expected number of inserted entries is reached. The other issues are fixed as you suggested privately.
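
       To illustrate the point about the timeout, here is a minimal sketch of a polling wait helper. It is not the implementation of Ignite's `GridTestUtils.waitForCondition`; the class name `WaitForConditionSketch`, the method `waitFor`, and the poll interval are hypothetical and chosen only for illustration. It shows that a wait of this kind returns as soon as the condition holds and uses up the whole timeout only when the condition is never met.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

/** Hypothetical helper, for illustration only (not Ignite code). */
public class WaitForConditionSketch {
    /**
     * Polls the condition until it holds or the timeout elapses.
     *
     * @param cond Condition to check.
     * @param timeoutMs Maximum time to wait, in milliseconds.
     * @param pollMs Pause between checks, in milliseconds.
     * @return {@code true} if the condition was met, {@code false} on timeout or interrupt.
     */
    public static boolean waitFor(BooleanSupplier cond, long timeoutMs, long pollMs) {
        long deadline = System.nanoTime() + timeoutMs * 1_000_000L;

        while (true) {
            // Early exit: the rest of the timeout is not consumed once the condition holds.
            if (cond.getAsBoolean())
                return true;

            long remainingMs = (deadline - System.nanoTime()) / 1_000_000L;

            if (remainingMs <= 0)
                return false;

            try {
                Thread.sleep(Math.min(pollMs, remainingMs));
            }
            catch (InterruptedException e) {
                // Restore the interrupt flag and report the wait as unsuccessful.
                Thread.currentThread().interrupt();

                return false;
            }
        }
    }

    public static void main(String[] args) {
        AtomicInteger pending = new AtomicInteger();

        // Stand-in for the load threads that fill the pending buffer.
        Thread loader = new Thread(() -> {
            for (int i = 0; i < 100; i++)
                pending.incrementAndGet();
        });

        loader.start();

        long start = System.nanoTime();
        boolean reached = waitFor(() -> pending.get() >= 100, 15_000, 10);
        long elapsedMs = (System.nanoTime() - start) / 1_000_000L;

        System.out.println("reached=" + reached + ", elapsedMs=" + elapsedMs);
    }
}
```

       In the happy path the call returns well before the 15-second limit, so a generous timeout only slows the test down when it is already failing.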







[GitHub] [ignite] alex-plekhanov commented on a change in pull request #7881: IGNITE-10959: Continuous query pending buffer limit

Posted by GitBox <gi...@apache.org>.
alex-plekhanov commented on a change in pull request #7881:
URL: https://github.com/apache/ignite/pull/7881#discussion_r451034133



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
##########
@@ -99,6 +99,10 @@
     static final int LSNR_MAX_BUF_SIZE =
         IgniteSystemProperties.getInteger("IGNITE_CONTINUOUS_QUERY_LISTENER_MAX_BUFFER_SIZE", 10_000);
 
+    /** Maximum size of buffer for pending events. Default value is {@code 10_000}. */
+    public final int maxPendingBuffSize =

Review comment:
       Why do we need this field? It's not used.

##########
File path: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBufferLimitTest.java
##########
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.cache.configuration.FactoryBuilder;
+import javax.cache.event.CacheEntryEvent;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cache.query.CacheQueryEntryEvent;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheIdMessage;
+import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor;
+import org.apache.ignite.internal.util.GridAtomicLong;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.systemview.view.ContinuousQueryView;
+import org.apache.ignite.spi.systemview.view.SystemView;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.internal.TestRecordingCommunicationSpi.spi;
+import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryEventBuffer.MAX_PENDING_BUFF_SIZE;
+import static org.apache.ignite.internal.processors.continuous.GridContinuousProcessor.CQ_SYS_VIEW;
+import static org.apache.ignite.testframework.GridTestUtils.getFieldValue;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+
+/**
+ *
+ */
+@RunWith(Parameterized.class)
+public class CacheContinuousQueryBufferLimitTest extends GridCommonAbstractTest {
+    /** Cache partitions count. */
+    private static final int PARTS = 1;
+
+    /** Total number of cache keys. */
+    private static final int TOTAL_KEYS = 1024;
+
+    /** Timeout to wait for pending buffer overflow. */
+    private static final long OVERFLOW_TIMEOUT_MS = 10_000L;
+
+    /** Default remote no-op filter. */
+    private static final CacheEntryEventSerializableFilter<Integer, Integer> RMT_FILTER = e -> true;
+
+    /** Counter of cache messages being sent. */
+    private final AtomicInteger msgCntr = new AtomicInteger();
+
+    /** Cache mode. */
+    @Parameterized.Parameter(0)
+    public CacheMode cacheMode;
+
+    /** Cache atomicity mode. */
+    @Parameterized.Parameter(1)
+    public CacheAtomicityMode atomicityMode;
+
+    /** @return Test parameters. */
+    @Parameterized.Parameters(name = "cacheMode={0}, atomicityMode={1}")
+    public static Collection<?> parameters() {
+        return Arrays.asList(new Object[][] {
+            {REPLICATED, ATOMIC},
+            {REPLICATED, TRANSACTIONAL},
+            {PARTITIONED, ATOMIC},
+            {PARTITIONED, TRANSACTIONAL}
+        });
+    }
+
+    /**
+     * Local pending limit for this test is less than MAX_PENDING_BUFF_SIZE,
+     * so pending entries must be cleaned prior to reaching it.
+     *
+     * @throws Exception If fails.
+     */
+    @Test
+    public void testContinuousQueryBatchSwitchOnAck() throws Exception {
+        doTestContinuousQueryPendingBufferLimit((n, msg) ->
+            msg instanceof GridCacheIdMessage && msgCntr.getAndIncrement() == 10, MAX_PENDING_BUFF_SIZE / 10);
+    }
+
+    /**
+     * The test blocks switching current CacheContinuousQueryEventBuffer.Batch to the new one, so
+     * pending entries will be processed (dropped on backups and send to the client on primaries)
+     * when the MAX_PENDING_BUFF_SIZE is reached.
+     *
+     * @throws Exception If fails.
+     */
+    @Test
+    public void testContinuousQueryPendingBufferLimit() throws Exception {
+        doTestContinuousQueryPendingBufferLimit((n, msg) ->
+            (msg instanceof GridCacheIdMessage && msgCntr.getAndIncrement() == 10) ||
+                msg instanceof CacheContinuousQueryBatchAck, (int)(MAX_PENDING_BUFF_SIZE * 1.1));
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testPendingSendToClientOnLimitReached() throws Exception {
+        AtomicInteger keys = new AtomicInteger();
+        AtomicReference<String> err = new AtomicReference<>();
+
+        IgniteEx srv = startGrids(2);
+        IgniteEx clnt = startClientGrid();
+
+        CacheEntryEventSerializableFilter<Integer, Integer> filter = new CacheEntryEventSerializableFilter<Integer,Integer>() {
+            @Override public boolean evaluate(CacheEntryEvent<? extends Integer,? extends Integer> evt) {
+                return evt.getKey() % 2 == 0;
+            }
+        };
+
+        ContinuousQuery<Integer, Integer> cq = new ContinuousQuery<>();
+        cq.setRemoteFilterFactory(FactoryBuilder.factoryOf(filter));
+        cq.setLocalListener((events) -> events.forEach(e -> {
+            if (!filter.evaluate(e))
+                err.compareAndSet(null, "Key must be filtered [e=" + e + ']');
+        }));
+        cq.setLocal(false);
+
+        spi(srv).blockMessages((nodeId, msg) -> msg instanceof GridCacheIdMessage && msgCntr.getAndIncrement() == 10);
+        spi(clnt).blockMessages((nodeId, msg) -> msg instanceof CacheContinuousQueryBatchAck);
+
+        IgniteInternalFuture<?> loadFut = null;
+
+        try (QueryCursor<?> qry = clnt.cache(DEFAULT_CACHE_NAME).query(cq)) {
+            awaitPartitionMapExchange();
+
+            loadFut = GridTestUtils.runMultiThreadedAsync(() -> {
+                while (!Thread.currentThread().isInterrupted())
+                    clnt.cache(DEFAULT_CACHE_NAME).put(keys.incrementAndGet(), 0);
+            }, 3, "cq-put-");
+
+            ConcurrentMap<Long, CacheContinuousQueryEntry> pending =
+                getContinuousQueryPendingBuffer(grid(0), CU.cacheId(DEFAULT_CACHE_NAME), 0);
+
+            waitForCondition(() -> pending.size() > MAX_PENDING_BUFF_SIZE, 15_000);
+
+            // Check all entries greater than limit filtered correctly.
+            waitForCondition(() -> keys.get() > MAX_PENDING_BUFF_SIZE * 2, 15_000);

Review comment:
       I can't match comment and condition. What do you mean here?

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
##########
@@ -172,21 +204,51 @@ private Object process0(long cntr, CacheContinuousQueryEntry entry, boolean back
                 if (res == RETRY)
                     continue;
             }
-            else
+            else {
+                if (batch.endCntr < ackedUpdCntr.get())

Review comment:
       You should move this condition out of here, or at least continue the loop after `tryRollOver`. 
   For example, if the acked counter is 100 and `cntr == 101`, the entry should not go to the pending entries map; it should go to the current batch, since the batch has been rolled over. If you skip this entry in the current batch, you will never switch this batch successfully, because the first entry is missing from the batch and `lastProc` will never increment.
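A minimal single-threaded sketch of the "continue the loop after rollover" idea, not the actual `CacheContinuousQueryEventBuffer` code. The class `RollOverSketch`, its fields, and the batch size of 100 are hypothetical and exist only to show the control flow: once the batch is rolled over, the same entry is retried against the new batch instead of falling through to the pending map.

```java
import java.util.Arrays;
import java.util.TreeMap;

/** Hypothetical, simplified model of a batched event buffer (not the Ignite class). */
class RollOverSketch {
    /** Assumed batch size, for illustration only. */
    static final int BATCH_SIZE = 100;

    /** First update counter covered by the current batch. */
    long startCntr = 0;

    /** Highest update counter acknowledged so far. */
    long ackedCntr;

    /** Entries that do not fit into the current batch yet. */
    final TreeMap<Long, String> pending = new TreeMap<>();

    /** Slots of the current batch. */
    final String[] batch = new String[BATCH_SIZE];

    /** @return Last update counter covered by the current batch. */
    long endCntr() {
        return startCntr + BATCH_SIZE - 1;
    }

    /** @return {@code true} if the entry was stored in the current batch. */
    boolean process(long cntr, String entry) {
        for (;;) {
            if (cntr < startCntr)
                return false; // Stale entry, already covered by an older batch.

            if (cntr <= endCntr()) {
                batch[(int)(cntr - startCntr)] = entry;

                return true;
            }

            if (endCntr() < ackedCntr) {
                // The whole batch is acked: roll over, then retry the same entry
                // against the new batch instead of parking it in 'pending'.
                startCntr = ackedCntr + 1;
                Arrays.fill(batch, null);

                continue;
            }

            pending.put(cntr, entry);

            return false;
        }
    }
}
```

With `ackedCntr = 100`, calling `process(101, ...)` rolls the batch over and stores the entry in the first slot of the new batch, so the pending map stays empty.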

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
##########
@@ -154,6 +185,7 @@ private Object process0(long cntr, CacheContinuousQueryEntry entry, boolean back
         Object res = null;
 
         for (;;) {
+            // Set batch only if batch is null (first attempt).
             batch = initBatch(entry.topologyVersion(), backup);
 
             if (batch == null || cntr < batch.startCntr) {

Review comment:
       We can skip `backupQ.add()` if the acked counter is already greater than the current counter.
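A small sketch of that check, under assumptions: `BackupQueueSketch`, `addBackup` and the use of plain `Long` counters are hypothetical stand-ins, and whether a counter equal to the acked one should also be skipped depends on the real buffer's semantics.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical model of a backup queue guarded by the acked counter. */
class BackupQueueSketch {
    /** Highest update counter acknowledged so far. */
    final AtomicLong ackedCntr = new AtomicLong();

    /** Counters of entries kept for a possible failover. */
    final Deque<Long> backupQ = new ArrayDeque<>();

    /** @return {@code true} if the counter was queued, {@code false} if it was skipped. */
    boolean addBackup(long cntr) {
        // The entry is already acknowledged, so keeping a backup copy is pointless.
        if (ackedCntr.get() > cntr)
            return false;

        backupQ.add(cntr);

        return true;
    }
}
```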

##########
File path: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBufferLimitTest.java
##########
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.cache.configuration.FactoryBuilder;
+import javax.cache.event.CacheEntryEvent;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cache.query.CacheQueryEntryEvent;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheIdMessage;
+import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor;
+import org.apache.ignite.internal.util.GridAtomicLong;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.systemview.view.ContinuousQueryView;
+import org.apache.ignite.spi.systemview.view.SystemView;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.internal.TestRecordingCommunicationSpi.spi;
+import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryEventBuffer.MAX_PENDING_BUFF_SIZE;
+import static org.apache.ignite.internal.processors.continuous.GridContinuousProcessor.CQ_SYS_VIEW;
+import static org.apache.ignite.testframework.GridTestUtils.getFieldValue;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+
+/**
+ *
+ */
+@RunWith(Parameterized.class)
+public class CacheContinuousQueryBufferLimitTest extends GridCommonAbstractTest {
+    /** Cache partitions count. */
+    private static final int PARTS = 1;
+
+    /** Total number of cache keys. */
+    private static final int TOTAL_KEYS = 1024;
+
+    /** Timeout to wait for pending buffer overflow. */
+    private static final long OVERFLOW_TIMEOUT_MS = 10_000L;
+
+    /** Default remote no-op filter. */
+    private static final CacheEntryEventSerializableFilter<Integer, Integer> RMT_FILTER = e -> true;
+
+    /** Counter of cache messages being send. */
+    private final AtomicInteger msgCntr = new AtomicInteger();
+
+    /** Cache mode. */
+    @Parameterized.Parameter(0)
+    public CacheMode cacheMode;
+
+    /** Cache atomicity mode. */
+    @Parameterized.Parameter(1)
+    public CacheAtomicityMode atomicityMode;
+
+    /** @return Test parameters. */
+    @Parameterized.Parameters(name = "cacheMode={0}, atomicityMode={1}")
+    public static Collection<?> parameters() {
+        return Arrays.asList(new Object[][] {
+            {REPLICATED, ATOMIC},
+            {REPLICATED, TRANSACTIONAL},
+            {PARTITIONED, ATOMIC},
+            {PARTITIONED, TRANSACTIONAL}
+        });
+    }
+
+    /**
+     * Local pending limit for this test is less than MAX_PENDING_BUFF_SIZE,
+     * so pending entries must be cleaned prior to reaching it.
+     *
+     * @throws Exception If fails.
+     */
+    @Test
+    public void testContinuousQueryBatchSwitchOnAck() throws Exception {
+        doTestContinuousQueryPendingBufferLimit((n, msg) ->
+            msg instanceof GridCacheIdMessage && msgCntr.getAndIncrement() == 10, MAX_PENDING_BUFF_SIZE / 10);
+    }
+
+    /**
+     * The test blocks switching current CacheContinuousQueryEventBuffer.Batch to the new one, so
+     * pending entries will be processed (dropped on backups and send to the client on primaries)
+     * when the MAX_PENDING_BUFF_SIZE is reached.
+     *
+     * @throws Exception If fails.
+     */
+    @Test
+    public void testContinuousQueryPendingBufferLimit() throws Exception {
+        doTestContinuousQueryPendingBufferLimit((n, msg) ->
+            (msg instanceof GridCacheIdMessage && msgCntr.getAndIncrement() == 10) ||
+                msg instanceof CacheContinuousQueryBatchAck, (int)(MAX_PENDING_BUFF_SIZE * 1.1));
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testPendingSendToClientOnLimitReached() throws Exception {
+        AtomicInteger keys = new AtomicInteger();
+        AtomicReference<String> err = new AtomicReference<>();
+
+        IgniteEx srv = startGrids(2);
+        IgniteEx clnt = startClientGrid();
+
+        CacheEntryEventSerializableFilter<Integer, Integer> filter = new CacheEntryEventSerializableFilter<Integer,Integer>() {
+            @Override public boolean evaluate(CacheEntryEvent<? extends Integer,? extends Integer> evt) {
+                return evt.getKey() % 2 == 0;
+            }
+        };
+
+        ContinuousQuery<Integer, Integer> cq = new ContinuousQuery<>();
+        cq.setRemoteFilterFactory(FactoryBuilder.factoryOf(filter));
+        cq.setLocalListener((events) -> events.forEach(e -> {
+            if (!filter.evaluate(e))
+                err.compareAndSet(null, "Key must be filtered [e=" + e + ']');
+        }));
+        cq.setLocal(false);
+
+        spi(srv).blockMessages((nodeId, msg) -> msg instanceof GridCacheIdMessage && msgCntr.getAndIncrement() == 10);
+        spi(clnt).blockMessages((nodeId, msg) -> msg instanceof CacheContinuousQueryBatchAck);
+
+        IgniteInternalFuture<?> loadFut = null;
+
+        try (QueryCursor<?> qry = clnt.cache(DEFAULT_CACHE_NAME).query(cq)) {
+            awaitPartitionMapExchange();
+
+            loadFut = GridTestUtils.runMultiThreadedAsync(() -> {
+                while (!Thread.currentThread().isInterrupted())
+                    clnt.cache(DEFAULT_CACHE_NAME).put(keys.incrementAndGet(), 0);
+            }, 3, "cq-put-");
+
+            ConcurrentMap<Long, CacheContinuousQueryEntry> pending =
+                getContinuousQueryPendingBuffer(grid(0), CU.cacheId(DEFAULT_CACHE_NAME), 0);
+
+            waitForCondition(() -> pending.size() > MAX_PENDING_BUFF_SIZE, 15_000);

Review comment:
       We should assertFalse here. 
   
   But personally I don't like the idea of such big timeouts. I ran the new tests and they take about 3 minutes, and this time will be about the same on any hardware because of the timeouts.
   Can we rewrite the tests to use a key count limit instead of timeouts? I think `MAX_PENDING_BUFF_SIZE * 3` will be enough for this test. Instead of "wait for condition", you can replace the "pending" field with your own map that raises an error flag, for example, on put when size() is more than `MAX_PENDING_BUFF_SIZE`. The same goes for the other tests.
   
   Also, in the current implementation there is no synchronization between `pendingCurrSize.get() > MAX_PENDING_BUFF_SIZE` and `pendingCurrSize.incrementAndGet()`, so due to races you can sometimes get values above the limit (but no more than limit + thread count). 
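One possible shape for such a map, as a rough sketch: `LimitCheckingMap` is a hypothetical test helper, not an existing Ignite class, and how it gets swapped into the buffer's "pending" field is left out here.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical test helper: a map that remembers whether it ever grew past a limit. */
class LimitCheckingMap<K, V> extends ConcurrentHashMap<K, V> {
    /** Maximum allowed number of entries. */
    private final int limit;

    /** Set once the size has been observed above the limit. */
    private final AtomicBoolean overflowed = new AtomicBoolean();

    LimitCheckingMap(int limit) {
        this.limit = limit;
    }

    @Override public V put(K key, V val) {
        V prev = super.put(key, val);

        checkSize();

        return prev;
    }

    @Override public V putIfAbsent(K key, V val) {
        V prev = super.putIfAbsent(key, val);

        checkSize();

        return prev;
    }

    /** Raises the flag if the map is larger than the limit right now. */
    private void checkSize() {
        if (size() > limit)
            overflowed.set(true);
    }

    /** @return {@code true} if the limit was ever exceeded. */
    boolean overflowed() {
        return overflowed.get();
    }
}
```

A test could then put a fixed number of keys and finish with `assertFalse(map.overflowed())`, with no timeout involved. Given the unsynchronized check described above, the limit passed to the map would probably need some slack, for example the limit plus the number of writer threads.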

##########
File path: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBufferLimitTest.java
##########
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.cache.configuration.FactoryBuilder;
+import javax.cache.event.CacheEntryEvent;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cache.query.CacheQueryEntryEvent;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheIdMessage;
+import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor;
+import org.apache.ignite.internal.util.GridAtomicLong;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.systemview.view.ContinuousQueryView;
+import org.apache.ignite.spi.systemview.view.SystemView;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.internal.TestRecordingCommunicationSpi.spi;
+import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryEventBuffer.MAX_PENDING_BUFF_SIZE;
+import static org.apache.ignite.internal.processors.continuous.GridContinuousProcessor.CQ_SYS_VIEW;
+import static org.apache.ignite.testframework.GridTestUtils.getFieldValue;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+
+/**
+ *
+ */
+@RunWith(Parameterized.class)
+public class CacheContinuousQueryBufferLimitTest extends GridCommonAbstractTest {
+    /** Cache partitions count. */
+    private static final int PARTS = 1;
+
+    /** Total number of cache keys. */
+    private static final int TOTAL_KEYS = 1024;
+
+    /** Timeout to wait for pending buffer overflow. */
+    private static final long OVERFLOW_TIMEOUT_MS = 10_000L;
+
+    /** Default remote no-op filter. */
+    private static final CacheEntryEventSerializableFilter<Integer, Integer> RMT_FILTER = e -> true;
+
+    /** Counter of cache messages being send. */
+    private final AtomicInteger msgCntr = new AtomicInteger();
+
+    /** Cache mode. */
+    @Parameterized.Parameter(0)
+    public CacheMode cacheMode;
+
+    /** Cache atomicity mode. */
+    @Parameterized.Parameter(1)
+    public CacheAtomicityMode atomicityMode;
+
+    /** @return Test parameters. */
+    @Parameterized.Parameters(name = "cacheMode={0}, atomicityMode={1}")
+    public static Collection<?> parameters() {
+        return Arrays.asList(new Object[][] {
+            {REPLICATED, ATOMIC},
+            {REPLICATED, TRANSACTIONAL},
+            {PARTITIONED, ATOMIC},
+            {PARTITIONED, TRANSACTIONAL}
+        });
+    }
+
+    /**
+     * Local pending limit for this test is less than MAX_PENDING_BUFF_SIZE,
+     * so pending entries must be cleaned prior to reaching it.
+     *
+     * @throws Exception If fails.
+     */
+    @Test
+    public void testContinuousQueryBatchSwitchOnAck() throws Exception {
+        doTestContinuousQueryPendingBufferLimit((n, msg) ->
+            msg instanceof GridCacheIdMessage && msgCntr.getAndIncrement() == 10, MAX_PENDING_BUFF_SIZE / 10);
+    }
+
+    /**
+     * The test blocks switching current CacheContinuousQueryEventBuffer.Batch to the new one, so
+     * pending entries will be processed (dropped on backups and send to the client on primaries)
+     * when the MAX_PENDING_BUFF_SIZE is reached.
+     *
+     * @throws Exception If fails.
+     */
+    @Test
+    public void testContinuousQueryPendingBufferLimit() throws Exception {
+        doTestContinuousQueryPendingBufferLimit((n, msg) ->
+            (msg instanceof GridCacheIdMessage && msgCntr.getAndIncrement() == 10) ||
+                msg instanceof CacheContinuousQueryBatchAck, (int)(MAX_PENDING_BUFF_SIZE * 1.1));
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testPendingSendToClientOnLimitReached() throws Exception {
+        AtomicInteger keys = new AtomicInteger();
+        AtomicReference<String> err = new AtomicReference<>();
+
+        IgniteEx srv = startGrids(2);
+        IgniteEx clnt = startClientGrid();
+
+        CacheEntryEventSerializableFilter<Integer, Integer> filter = new CacheEntryEventSerializableFilter<Integer,Integer>() {

Review comment:
       Can be simplified with lambda `evt -> evt.getKey() % 2 == 0`

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
##########
@@ -172,21 +204,51 @@ private Object process0(long cntr, CacheContinuousQueryEntry entry, boolean back
                 if (res == RETRY)
                     continue;
             }
-            else
+            else {
+                if (batch.endCntr < ackedUpdCntr.get())
+                    batch.tryRollOver(entry.topologyVersion());
+
+                if (pendingCurrSize.get() > MAX_PENDING_BUFF_SIZE) {

Review comment:
       You should use some synchronization here: only one thread at a time should be able to remove pending entries. Otherwise, two threads can iterate concurrently over the same entry, send a duplicate to the client, call `remove`, and decrement the size twice (although only one entry is removed).
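One way to get that guarantee is a compare-and-set flag, sketched below. This is an illustration under assumptions, not the fix that went into the PR: `PendingFlushSketch`, `tryFlush` and the `String` payload are hypothetical, and a lock with `tryLock()` would serve equally well.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

/** Hypothetical model of a pending buffer drained by at most one thread at a time. */
class PendingFlushSketch {
    /** Pending entries ordered by update counter. */
    final ConcurrentSkipListMap<Long, String> pending = new ConcurrentSkipListMap<>();

    /** Current number of pending entries. */
    final AtomicInteger pendingSize = new AtomicInteger();

    /** Guard flag: {@code true} while some thread is draining the buffer. */
    private final AtomicBoolean flushing = new AtomicBoolean();

    /** Adds an entry and counts it only if the counter was not present before. */
    void add(long cntr, String entry) {
        if (pending.put(cntr, entry) == null)
            pendingSize.incrementAndGet();
    }

    /** @return Number of entries passed to the sink, or 0 if another drain is in progress. */
    int tryFlush(Consumer<String> sink) {
        // Only the thread that wins the CAS is allowed to drain.
        if (!flushing.compareAndSet(false, true))
            return 0;

        try {
            int cnt = 0;
            Map.Entry<Long, String> first;

            // pollFirstEntry() removes atomically, so each entry is sent and counted once.
            while ((first = pending.pollFirstEntry()) != null) {
                pendingSize.decrementAndGet();
                sink.accept(first.getValue());
                cnt++;
            }

            return cnt;
        }
        finally {
            flushing.set(false);
        }
    }
}
```

Threads that lose the race simply skip the drain and carry on, so writers are never blocked behind the flush.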




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] Mmuzaf commented on a change in pull request #7881: IGNITE-10959: Continuous query pending buffer limit

Posted by GitBox <gi...@apache.org>.
Mmuzaf commented on a change in pull request #7881:
URL: https://github.com/apache/ignite/pull/7881#discussion_r451388565



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
##########
@@ -172,21 +204,51 @@ private Object process0(long cntr, CacheContinuousQueryEntry entry, boolean back
                 if (res == RETRY)
                     continue;
             }
-            else
+            else {
+                if (batch.endCntr < ackedUpdCntr.get())

Review comment:
       Agree, `RETRY` added.







[GitHub] [ignite] Mmuzaf commented on a change in pull request #7881: IGNITE-10959: Continuous query pending buffer limit

Posted by GitBox <gi...@apache.org>.
Mmuzaf commented on a change in pull request #7881:
URL: https://github.com/apache/ignite/pull/7881#discussion_r451494192



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
##########
@@ -154,6 +185,7 @@ private Object process0(long cntr, CacheContinuousQueryEntry entry, boolean back
         Object res = null;
 
         for (;;) {
+            // Set batch only if batch is null (first attempt).
             batch = initBatch(entry.topologyVersion(), backup);
 
             if (batch == null || cntr < batch.startCntr) {

Review comment:
       I'll fix it under - https://issues.apache.org/jira/browse/IGNITE-13126







[GitHub] [ignite] Mmuzaf commented on a change in pull request #7881: IGNITE-10959: Continuous query pending buffer limit

Posted by GitBox <gi...@apache.org>.
Mmuzaf commented on a change in pull request #7881:
URL: https://github.com/apache/ignite/pull/7881#discussion_r443679969



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
##########
@@ -154,6 +191,7 @@ private Object process0(long cntr, CacheContinuousQueryEntry entry, boolean back
         Object res = null;
 
         for (;;) {
+            // Set bach only if batch is null (first attempt).

Review comment:
       Fixed







[GitHub] [ignite] Mmuzaf commented on a change in pull request #7881: IGNITE-10959: Continuous query pending buffer limit

Posted by GitBox <gi...@apache.org>.
Mmuzaf commented on a change in pull request #7881:
URL: https://github.com/apache/ignite/pull/7881#discussion_r451390459



##########
File path: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBufferLimitTest.java
##########
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.cache.configuration.FactoryBuilder;
+import javax.cache.event.CacheEntryEvent;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cache.query.CacheQueryEntryEvent;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheIdMessage;
+import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor;
+import org.apache.ignite.internal.util.GridAtomicLong;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.systemview.view.ContinuousQueryView;
+import org.apache.ignite.spi.systemview.view.SystemView;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.internal.TestRecordingCommunicationSpi.spi;
+import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryEventBuffer.MAX_PENDING_BUFF_SIZE;
+import static org.apache.ignite.internal.processors.continuous.GridContinuousProcessor.CQ_SYS_VIEW;
+import static org.apache.ignite.testframework.GridTestUtils.getFieldValue;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+
+/**
+ *
+ */
+@RunWith(Parameterized.class)
+public class CacheContinuousQueryBufferLimitTest extends GridCommonAbstractTest {
+    /** Cache partitions count. */
+    private static final int PARTS = 1;
+
+    /** Total number of cache keys. */
+    private static final int TOTAL_KEYS = 1024;
+
+    /** Timeout to wait for pending buffer overflow. */
+    private static final long OVERFLOW_TIMEOUT_MS = 10_000L;
+
+    /** Default remote no-op filter. */
+    private static final CacheEntryEventSerializableFilter<Integer, Integer> RMT_FILTER = e -> true;
+
+    /** Counter of cache messages being send. */
+    private final AtomicInteger msgCntr = new AtomicInteger();
+
+    /** Cache mode. */
+    @Parameterized.Parameter(0)
+    public CacheMode cacheMode;
+
+    /** Cache atomicity mode. */
+    @Parameterized.Parameter(1)
+    public CacheAtomicityMode atomicityMode;
+
+    /** @return Test parameters. */
+    @Parameterized.Parameters(name = "cacheMode={0}, atomicityMode={1}")
+    public static Collection<?> parameters() {
+        return Arrays.asList(new Object[][] {
+            {REPLICATED, ATOMIC},
+            {REPLICATED, TRANSACTIONAL},
+            {PARTITIONED, ATOMIC},
+            {PARTITIONED, TRANSACTIONAL}
+        });
+    }
+
+    /**
+     * Local pending limit for this test is less than MAX_PENDING_BUFF_SIZE,
+     * so pending entries must be cleaned prior to reaching it.
+     *
+     * @throws Exception If fails.
+     */
+    @Test
+    public void testContinuousQueryBatchSwitchOnAck() throws Exception {
+        doTestContinuousQueryPendingBufferLimit((n, msg) ->
+            msg instanceof GridCacheIdMessage && msgCntr.getAndIncrement() == 10, MAX_PENDING_BUFF_SIZE / 10);
+    }
+
+    /**
+     * The test blocks switching current CacheContinuousQueryEventBuffer.Batch to the new one, so
+     * pending entries will be processed (dropped on backups and send to the client on primaries)
+     * when the MAX_PENDING_BUFF_SIZE is reached.
+     *
+     * @throws Exception If fails.
+     */
+    @Test
+    public void testContinuousQueryPendingBufferLimit() throws Exception {
+        doTestContinuousQueryPendingBufferLimit((n, msg) ->
+            (msg instanceof GridCacheIdMessage && msgCntr.getAndIncrement() == 10) ||
+                msg instanceof CacheContinuousQueryBatchAck, (int)(MAX_PENDING_BUFF_SIZE * 1.1));
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testPendingSendToClientOnLimitReached() throws Exception {
+        AtomicInteger keys = new AtomicInteger();
+        AtomicReference<String> err = new AtomicReference<>();
+
+        IgniteEx srv = startGrids(2);
+        IgniteEx clnt = startClientGrid();
+
+        CacheEntryEventSerializableFilter<Integer, Integer> filter = new CacheEntryEventSerializableFilter<Integer,Integer>() {

Review comment:
       This `filter` is used twice: as the remote filter condition, and to check that all events accepted by the client have been filtered (on buffer overflow we send entries from the server to the client). So I don't think it can be simplified.







[GitHub] [ignite] Mmuzaf commented on a change in pull request #7881: IGNITE-10959: Continuous query pending buffer limit

Posted by GitBox <gi...@apache.org>.
Mmuzaf commented on a change in pull request #7881:
URL: https://github.com/apache/ignite/pull/7881#discussion_r451669851



##########
File path: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBufferLimitTest.java
##########
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.cache.configuration.FactoryBuilder;
+import javax.cache.event.CacheEntryEvent;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cache.query.CacheQueryEntryEvent;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheIdMessage;
+import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor;
+import org.apache.ignite.internal.util.GridAtomicLong;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.systemview.view.ContinuousQueryView;
+import org.apache.ignite.spi.systemview.view.SystemView;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.internal.TestRecordingCommunicationSpi.spi;
+import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryEventBuffer.MAX_PENDING_BUFF_SIZE;
+import static org.apache.ignite.internal.processors.continuous.GridContinuousProcessor.CQ_SYS_VIEW;
+import static org.apache.ignite.testframework.GridTestUtils.getFieldValue;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+
+/**
+ *
+ */
+@RunWith(Parameterized.class)
+public class CacheContinuousQueryBufferLimitTest extends GridCommonAbstractTest {
+    /** Cache partitions count. */
+    private static final int PARTS = 1;
+
+    /** Total number of cache keys. */
+    private static final int TOTAL_KEYS = 1024;
+
+    /** Timeout to wait for pending buffer overflow. */
+    private static final long OVERFLOW_TIMEOUT_MS = 10_000L;
+
+    /** Default remote no-op filter. */
+    private static final CacheEntryEventSerializableFilter<Integer, Integer> RMT_FILTER = e -> true;
+
+    /** Counter of cache messages being sent. */
+    private final AtomicInteger msgCntr = new AtomicInteger();
+
+    /** Cache mode. */
+    @Parameterized.Parameter(0)
+    public CacheMode cacheMode;
+
+    /** Cache atomicity mode. */
+    @Parameterized.Parameter(1)
+    public CacheAtomicityMode atomicityMode;
+
+    /** @return Test parameters. */
+    @Parameterized.Parameters(name = "cacheMode={0}, atomicityMode={1}")
+    public static Collection<?> parameters() {
+        return Arrays.asList(new Object[][] {
+            {REPLICATED, ATOMIC},
+            {REPLICATED, TRANSACTIONAL},
+            {PARTITIONED, ATOMIC},
+            {PARTITIONED, TRANSACTIONAL}
+        });
+    }
+
+    /**
+     * Local pending limit for this test is less than MAX_PENDING_BUFF_SIZE,
+     * so pending entries must be cleaned prior to reaching it.
+     *
+     * @throws Exception If fails.
+     */
+    @Test
+    public void testContinuousQueryBatchSwitchOnAck() throws Exception {
+        doTestContinuousQueryPendingBufferLimit((n, msg) ->
+            msg instanceof GridCacheIdMessage && msgCntr.getAndIncrement() == 10, MAX_PENDING_BUFF_SIZE / 10);
+    }
+
+    /**
+     * The test blocks switching current CacheContinuousQueryEventBuffer.Batch to the new one, so
+     * pending entries will be processed (dropped on backups and sent to the client on primaries)
+     * when the MAX_PENDING_BUFF_SIZE is reached.
+     *
+     * @throws Exception If fails.
+     */
+    @Test
+    public void testContinuousQueryPendingBufferLimit() throws Exception {
+        doTestContinuousQueryPendingBufferLimit((n, msg) ->
+            (msg instanceof GridCacheIdMessage && msgCntr.getAndIncrement() == 10) ||
+                msg instanceof CacheContinuousQueryBatchAck, (int)(MAX_PENDING_BUFF_SIZE * 1.1));
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testPendingSendToClientOnLimitReached() throws Exception {
+        AtomicInteger keys = new AtomicInteger();
+        AtomicReference<String> err = new AtomicReference<>();
+
+        IgniteEx srv = startGrids(2);
+        IgniteEx clnt = startClientGrid();
+
+        CacheEntryEventSerializableFilter<Integer, Integer> filter = new CacheEntryEventSerializableFilter<Integer,Integer>() {

Review comment:
       Fixed.







[GitHub] [ignite] Mmuzaf commented on a change in pull request #7881: IGNITE-10959: Continuous query pending buffer limit

Posted by GitBox <gi...@apache.org>.
Mmuzaf commented on a change in pull request #7881:
URL: https://github.com/apache/ignite/pull/7881#discussion_r451388265



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
##########
@@ -26,40 +26,69 @@
 import java.util.TreeMap;
 import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.util.GridAtomicLong;
+import org.apache.ignite.internal.util.typedef.internal.LT;
 import org.apache.ignite.util.deque.FastSizeDeque;
 import org.jetbrains.annotations.Nullable;
 
 /**
  *
  */
 public class CacheContinuousQueryEventBuffer {
-    /** */
+    /** Maximum size of buffer for pending events. Default value is {@code 10_000}. */
+    public static final int MAX_PENDING_BUFF_SIZE =
+        IgniteSystemProperties.getInteger("IGNITE_CONTINUOUS_QUERY_PENDING_BUFF_SIZE", 10_000);
+
+    /** Batch buffer size. */
     private static final int BUF_SIZE =
         IgniteSystemProperties.getInteger("IGNITE_CONTINUOUS_QUERY_SERVER_BUFFER_SIZE", 1000);
 
     /** */
     private static final Object RETRY = new Object();
 
-    /** */
-    protected final int part;
+    /** Continuous query category logger. */
+    private final IgniteLogger log;
 
     /** */
-    private AtomicReference<Batch> curBatch = new AtomicReference<>();
+    private final int part;
 
-    /** */
-    private FastSizeDeque<CacheContinuousQueryEntry> backupQ = new FastSizeDeque<>(new ConcurrentLinkedDeque<>());
+    /** Batch of entries currently being collected to send to the remote. */
+    private final AtomicReference<Batch> curBatch = new AtomicReference<>();
 
-    /** */
-    private ConcurrentSkipListMap<Long, CacheContinuousQueryEntry> pending = new ConcurrentSkipListMap<>();
+    /** Queue for keeping backup entries whose partition counter is less than the counter being processed by the current batch. */
+    private final FastSizeDeque<CacheContinuousQueryEntry> backupQ = new FastSizeDeque<>(new ConcurrentLinkedDeque<>());
+
+    /** Entries waiting to be processed. */
+    private final ConcurrentSkipListMap<Long, CacheContinuousQueryEntry> pending = new ConcurrentSkipListMap<>();
+
+    /**
+     * The size method of the pending ConcurrentSkipListMap is not a constant-time operation. Since each

Review comment:
       Discussed privately. JDK 8 and JDK 11 have different `size()` implementations, so for the main use case, when the buffer overflows, it's better to use an AtomicInteger plus synchronization on `pending` to eliminate all potential issues.
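
To illustrate the point about `size()`: the `ConcurrentSkipListMap` Javadoc notes that `size()` is not a constant-time operation, so a separate counter is one way to keep the limit check cheap. The following is a minimal sketch under that assumption, not the code from this PR; `BoundedPendingBuffer`, `add`, and `size` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical illustration; this is not Ignite's CacheContinuousQueryEventBuffer. */
class BoundedPendingBuffer<V> {
    /** Maximum number of pending entries before the buffer is drained. */
    private final int maxSize;

    /** Pending entries ordered by update counter. */
    private final ConcurrentSkipListMap<Long, V> pending = new ConcurrentSkipListMap<>();

    /** Entry count tracked separately so the limit check does not traverse the map. */
    private final AtomicInteger pendingSize = new AtomicInteger();

    BoundedPendingBuffer(int maxSize) {
        this.maxSize = maxSize;
    }

    /**
     * Adds an entry. Returns the drained entries in counter order if the limit
     * was exceeded by this call, or an empty list otherwise.
     */
    List<V> add(long cntr, V val) {
        if (pending.putIfAbsent(cntr, val) != null)
            return Collections.emptyList(); // Duplicate counter: nothing was added.

        if (pendingSize.incrementAndGet() <= maxSize)
            return Collections.emptyList();

        // Limit exceeded: drain under a lock so only one thread empties the map at a time.
        synchronized (pending) {
            List<V> drained = new ArrayList<>();
            Map.Entry<Long, V> e;

            while ((e = pending.pollFirstEntry()) != null) {
                drained.add(e.getValue());
                pendingSize.decrementAndGet();
            }

            return drained;
        }
    }

    /** Returns the tracked number of pending entries in constant time. */
    int size() {
        return pendingSize.get();
    }
}
```

In this sketch the counter can briefly disagree with the map while another thread is between `putIfAbsent` and `incrementAndGet`, which seems acceptable for a soft limit check but would need more care if an exact bound were required.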







[GitHub] [ignite] Mmuzaf commented on a change in pull request #7881: IGNITE-10959: Continuous query pending buffer limit

Posted by GitBox <gi...@apache.org>.
Mmuzaf commented on a change in pull request #7881:
URL: https://github.com/apache/ignite/pull/7881#discussion_r450162322



##########
File path: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBufferLimitTest.java
##########
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import javax.cache.configuration.FactoryBuilder;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cache.query.CacheQueryEntryEvent;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheIdMessage;
+import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.systemview.view.ContinuousQueryView;
+import org.apache.ignite.spi.systemview.view.SystemView;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.internal.TestRecordingCommunicationSpi.spi;
+import static org.apache.ignite.internal.processors.continuous.GridContinuousProcessor.CQ_SYS_VIEW;
+import static org.apache.ignite.testframework.GridTestUtils.getFieldValue;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+
+/**
+ *
+ */
+@RunWith(Parameterized.class)
+public class CacheContinuousQueryBufferLimitTest extends GridCommonAbstractTest {
+    /** Cache partitions count. */
+    private static final int PARTS = 1;
+
+    /** Total number of cache keys. */
+    private static final int TOTAL_KEYS = 1024;
+
+    /** Number of pending entries.  */
+    private static final int PENDING_LIMIT = 1100;
+
+    /** Timeout to wait for pending buffer overflow. */
+    private static final long OVERFLOW_TIMEOUT_MS = 15_000L;
+
+    /** Default remote no-op filter. */
+    private static final CacheEntryEventSerializableFilter<Integer, Integer> RMT_FILTER = e -> true;
+
+    /** Counter of cache messages being sent. */
+    private final AtomicInteger msgCntr = new AtomicInteger();
+
+    /** Cache mode. */
+    @Parameterized.Parameter(0)
+    public CacheMode cacheMode;
+
+    /** Cache atomicity mode. */
+    @Parameterized.Parameter(1)
+    public CacheAtomicityMode atomicityMode;
+
+    /** @return Test parameters. */
+    @Parameterized.Parameters(name = "cacheMode={0}, atomicityMode={1}")
+    public static Collection<?> parameters() {
+        return Arrays.asList(new Object[][] {
+            {REPLICATED, ATOMIC},
+            {REPLICATED, TRANSACTIONAL},
+            {PARTITIONED, ATOMIC},
+            {PARTITIONED, TRANSACTIONAL}
+        });
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testContinuousQueryBatchSwitchOnAck() throws Exception {
+        doTestContinuousQueryPendingBufferLimit((n, msg) ->
+            msg instanceof GridCacheIdMessage && msgCntr.getAndIncrement() == 10);
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    @WithSystemProperty(key = "IGNITE_CONTINUOUS_QUERY_PENDING_BUFF_SIZE", value = "1000")
+    public void testContinuousQueryPendingBufferLimit() throws Exception {
+        doTestContinuousQueryPendingBufferLimit((n, msg) ->
+            (msg instanceof GridCacheIdMessage && msgCntr.getAndIncrement() == 10) ||
+                msg instanceof CacheContinuousQueryBatchAck);
+    }
+
+
+    /** {@inheritDoc} */
+    @Override public IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        return super.getConfiguration(igniteInstanceName)
+            .setCommunicationSpi(new TestRecordingCommunicationSpi())
+            .setCacheConfiguration(new CacheConfiguration<>(DEFAULT_CACHE_NAME)
+                .setAtomicityMode(atomicityMode)
+                .setCacheMode(cacheMode)
+                .setBackups(1)
+                .setAffinity(new RendezvousAffinityFunction(false, PARTS)));
+    }
+
+    /** */
+    @Before
+    public void resetMessageCounter() {
+        msgCntr.set(0);
+    }
+
+    /** */
+    @After
+    public void stopAllInstances() {
+        stopAllGrids();
+    }
+
+    /**
+     * @param locBlockPred Block predicate on local node to emulate message delivery issues.
+     * @throws Exception If fails.
+     */
+    private void doTestContinuousQueryPendingBufferLimit(
+        IgniteBiPredicate<ClusterNode, Message> locBlockPred
+    ) throws Exception
+    {
+        ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+        IgniteEx locIgnite = startGrid(0);
+        IgniteEx rmtIgnite = startGrid(1);
+
+        IgniteCache<Integer, Integer> cache = locIgnite.cache(DEFAULT_CACHE_NAME);
+        CacheConfiguration<Integer, Integer> ccfg = cache.getConfiguration(CacheConfiguration.class);
+
+        for (int i = 0; i < TOTAL_KEYS; i++)
+            cache.put(i, i);
+
+        assertEquals(PARTS, ccfg.getAffinity().partitions());
+
+        AtomicLong lastAcked = new AtomicLong();
+
+        ContinuousQuery<Integer, Integer> cq = new ContinuousQuery<>();
+        cq.setRemoteFilterFactory(FactoryBuilder.factoryOf(RMT_FILTER));
+        cq.setLocalListener((events) ->
+            events.forEach(e ->
+                lastAcked.getAndUpdate(c ->
+                    Math.max(c, ((CacheQueryEntryEvent<?, ?>)e).getPartitionUpdateCounter()))));
+        cq.setLocal(false);
+
+        IgniteInternalFuture<?> updFut = null;
+
+        try (QueryCursor<?> qry = locIgnite.cache(DEFAULT_CACHE_NAME).query(cq)) {
+            awaitPartitionMapExchange();
+
+            for (int j = 0; j < TOTAL_KEYS; j++)
+                putX2Value(cache, rnd.nextInt(TOTAL_KEYS));
+
+            SystemView<ContinuousQueryView> rmtQryView = rmtIgnite.context().systemView().view(CQ_SYS_VIEW);
+            assertEquals(1, rmtQryView.size());
+
+            UUID routineId = rmtQryView.iterator().next().routineId();
+
+            // Partition Id, Update Counter, Continuous Entry.
+            ConcurrentMap<Long, CacheContinuousQueryEntry> pending =
+                getContinuousQueryPendingBuffer(rmtIgnite, routineId, CU.cacheId(DEFAULT_CACHE_NAME), 0);
+
+            spi(locIgnite).blockMessages(locBlockPred);
+
+            updFut = GridTestUtils.runMultiThreadedAsync(() -> {
+                while (!Thread.currentThread().isInterrupted()) {
+                    putX2Value(cache, rnd.nextInt(TOTAL_KEYS));
+                }
+            }, 3, "cq-put-");
+
+            assertNotNull("Partition remote buffers must be inited", pending);
+
+            log.warning("Waiting for pending buffer being overflowed within " + OVERFLOW_TIMEOUT_MS + " ms.");
+
+            boolean await = waitForCondition(() -> pending.size() > PENDING_LIMIT, OVERFLOW_TIMEOUT_MS);

Review comment:
       Fixed.

##########
File path: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBufferLimitTest.java
##########
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import javax.cache.configuration.FactoryBuilder;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cache.query.CacheQueryEntryEvent;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheIdMessage;
+import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.systemview.view.ContinuousQueryView;
+import org.apache.ignite.spi.systemview.view.SystemView;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.internal.TestRecordingCommunicationSpi.spi;
+import static org.apache.ignite.internal.processors.continuous.GridContinuousProcessor.CQ_SYS_VIEW;
+import static org.apache.ignite.testframework.GridTestUtils.getFieldValue;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+
+/**
+ *
+ */
+@RunWith(Parameterized.class)
+public class CacheContinuousQueryBufferLimitTest extends GridCommonAbstractTest {
+    /** Cache partitions count. */
+    private static final int PARTS = 1;
+
+    /** Total number of cache keys. */
+    private static final int TOTAL_KEYS = 1024;
+
+    /** Number of pending entries.  */
+    private static final int PENDING_LIMIT = 1100;
+
+    /** Timeout to wait for pending buffer overflow. */
+    private static final long OVERFLOW_TIMEOUT_MS = 15_000L;
+
+    /** Default remote no-op filter. */
+    private static final CacheEntryEventSerializableFilter<Integer, Integer> RMT_FILTER = e -> true;
+
+    /** Counter of cache messages being sent. */
+    private final AtomicInteger msgCntr = new AtomicInteger();
+
+    /** Cache mode. */
+    @Parameterized.Parameter(0)
+    public CacheMode cacheMode;
+
+    /** Cache atomicity mode. */
+    @Parameterized.Parameter(1)
+    public CacheAtomicityMode atomicityMode;
+
+    /** @return Test parameters. */
+    @Parameterized.Parameters(name = "cacheMode={0}, atomicityMode={1}")
+    public static Collection<?> parameters() {
+        return Arrays.asList(new Object[][] {
+            {REPLICATED, ATOMIC},
+            {REPLICATED, TRANSACTIONAL},
+            {PARTITIONED, ATOMIC},
+            {PARTITIONED, TRANSACTIONAL}
+        });
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testContinuousQueryBatchSwitchOnAck() throws Exception {
+        doTestContinuousQueryPendingBufferLimit((n, msg) ->
+            msg instanceof GridCacheIdMessage && msgCntr.getAndIncrement() == 10);
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    @WithSystemProperty(key = "IGNITE_CONTINUOUS_QUERY_PENDING_BUFF_SIZE", value = "1000")
+    public void testContinuousQueryPendingBufferLimit() throws Exception {
+        doTestContinuousQueryPendingBufferLimit((n, msg) ->
+            (msg instanceof GridCacheIdMessage && msgCntr.getAndIncrement() == 10) ||
+                msg instanceof CacheContinuousQueryBatchAck);
+    }
+
+
+    /** {@inheritDoc} */
+    @Override public IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        return super.getConfiguration(igniteInstanceName)
+            .setCommunicationSpi(new TestRecordingCommunicationSpi())
+            .setCacheConfiguration(new CacheConfiguration<>(DEFAULT_CACHE_NAME)
+                .setAtomicityMode(atomicityMode)
+                .setCacheMode(cacheMode)
+                .setBackups(1)
+                .setAffinity(new RendezvousAffinityFunction(false, PARTS)));
+    }
+
+    /** */
+    @Before
+    public void resetMessageCounter() {
+        msgCntr.set(0);
+    }
+
+    /** */
+    @After
+    public void stopAllInstances() {
+        stopAllGrids();
+    }
+
+    /**
+     * @param locBlockPred Block predicate on local node to emulate message delivery issues.
+     * @throws Exception If fails.
+     */
+    private void doTestContinuousQueryPendingBufferLimit(
+        IgniteBiPredicate<ClusterNode, Message> locBlockPred
+    ) throws Exception
+    {
+        ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+        IgniteEx locIgnite = startGrid(0);
+        IgniteEx rmtIgnite = startGrid(1);
+
+        IgniteCache<Integer, Integer> cache = locIgnite.cache(DEFAULT_CACHE_NAME);
+        CacheConfiguration<Integer, Integer> ccfg = cache.getConfiguration(CacheConfiguration.class);
+
+        for (int i = 0; i < TOTAL_KEYS; i++)
+            cache.put(i, i);
+
+        assertEquals(PARTS, ccfg.getAffinity().partitions());
+
+        AtomicLong lastAcked = new AtomicLong();
+
+        ContinuousQuery<Integer, Integer> cq = new ContinuousQuery<>();
+        cq.setRemoteFilterFactory(FactoryBuilder.factoryOf(RMT_FILTER));
+        cq.setLocalListener((events) ->
+            events.forEach(e ->
+                lastAcked.getAndUpdate(c ->

Review comment:
       Fixed.







[GitHub] [ignite] Mmuzaf commented on a change in pull request #7881: IGNITE-10959: Continuous query pending buffer limit

Posted by GitBox <gi...@apache.org>.
Mmuzaf commented on a change in pull request #7881:
URL: https://github.com/apache/ignite/pull/7881#discussion_r443681796



##########
File path: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/AbstractCacheContinuousQueryBufferLimitTest.java
##########
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.util.UUID;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import javax.cache.configuration.FactoryBuilder;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cache.query.CacheQueryEntryEvent;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheIdMessage;
+import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.systemview.view.ContinuousQueryView;
+import org.apache.ignite.spi.systemview.view.SystemView;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.ignite.internal.TestRecordingCommunicationSpi.spi;
+import static org.apache.ignite.internal.processors.continuous.GridContinuousProcessor.CQ_SYS_VIEW;
+import static org.apache.ignite.testframework.GridTestUtils.getFieldValue;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+
+/**
+ *
+ */
+public abstract class AbstractCacheContinuousQueryBufferLimitTest extends GridCommonAbstractTest {

Review comment:
       Fixed.







[GitHub] [ignite] Mmuzaf commented on a change in pull request #7881: IGNITE-10959: Continuous query pending buffer limit

Posted by GitBox <gi...@apache.org>.
Mmuzaf commented on a change in pull request #7881:
URL: https://github.com/apache/ignite/pull/7881#discussion_r450162185



##########
File path: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBufferLimitTest.java
##########
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import javax.cache.configuration.FactoryBuilder;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cache.query.CacheQueryEntryEvent;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheIdMessage;
+import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.systemview.view.ContinuousQueryView;
+import org.apache.ignite.spi.systemview.view.SystemView;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.internal.TestRecordingCommunicationSpi.spi;
+import static org.apache.ignite.internal.processors.continuous.GridContinuousProcessor.CQ_SYS_VIEW;
+import static org.apache.ignite.testframework.GridTestUtils.getFieldValue;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+
+/**
+ *
+ */
+@RunWith(Parameterized.class)
+public class CacheContinuousQueryBufferLimitTest extends GridCommonAbstractTest {
+    /** Cache partitions count. */
+    private static final int PARTS = 1;
+
+    /** Total number of cache keys. */
+    private static final int TOTAL_KEYS = 1024;
+
+    /** Number of pending entries.  */
+    private static final int PENDING_LIMIT = 1100;
+
+    /** Timeout to wait for pending buffer overflow. */
+    private static final long OVERFLOW_TIMEOUT_MS = 15_000L;
+
+    /** Default remote no-op filter. */
+    private static final CacheEntryEventSerializableFilter<Integer, Integer> RMT_FILTER = e -> true;
+
+    /** Counter of cache messages being sent. */
+    private final AtomicInteger msgCntr = new AtomicInteger();
+
+    /** Cache mode. */
+    @Parameterized.Parameter(0)
+    public CacheMode cacheMode;
+
+    /** Cache atomicity mode. */
+    @Parameterized.Parameter(1)
+    public CacheAtomicityMode atomicityMode;
+
+    /** @return Test parameters. */
+    @Parameterized.Parameters(name = "cacheMode={0}, atomicityMode={1}")
+    public static Collection<?> parameters() {
+        return Arrays.asList(new Object[][] {
+            {REPLICATED, ATOMIC},
+            {REPLICATED, TRANSACTIONAL},
+            {PARTITIONED, ATOMIC},
+            {PARTITIONED, TRANSACTIONAL}
+        });
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testContinuousQueryBatchSwitchOnAck() throws Exception {
+        doTestContinuousQueryPendingBufferLimit((n, msg) ->
+            msg instanceof GridCacheIdMessage && msgCntr.getAndIncrement() == 10);
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    @WithSystemProperty(key = "IGNITE_CONTINUOUS_QUERY_PENDING_BUFF_SIZE", value = "1000")
+    public void testContinuousQueryPendingBufferLimit() throws Exception {
+        doTestContinuousQueryPendingBufferLimit((n, msg) ->
+            (msg instanceof GridCacheIdMessage && msgCntr.getAndIncrement() == 10) ||
+                msg instanceof CacheContinuousQueryBatchAck);
+    }
+
+
+    /** {@inheritDoc} */
+    @Override public IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        return super.getConfiguration(igniteInstanceName)
+            .setCommunicationSpi(new TestRecordingCommunicationSpi())
+            .setCacheConfiguration(new CacheConfiguration<>(DEFAULT_CACHE_NAME)
+                .setAtomicityMode(atomicityMode)
+                .setCacheMode(cacheMode)
+                .setBackups(1)
+                .setAffinity(new RendezvousAffinityFunction(false, PARTS)));
+    }
+
+    /** */
+    @Before
+    public void resetMessageCounter() {
+        msgCntr.set(0);
+    }
+
+    /** */
+    @After
+    public void stopAllInstances() {
+        stopAllGrids();
+    }
+
+    /**
+     * @param locBlockPred Block predicate on local node to emulate message delivery issues.
+     * @throws Exception If fails.
+     */
+    private void doTestContinuousQueryPendingBufferLimit(
+        IgniteBiPredicate<ClusterNode, Message> locBlockPred
+    ) throws Exception
+    {
+        ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+        IgniteEx locIgnite = startGrid(0);
+        IgniteEx rmtIgnite = startGrid(1);
+
+        IgniteCache<Integer, Integer> cache = locIgnite.cache(DEFAULT_CACHE_NAME);
+        CacheConfiguration<Integer, Integer> ccfg = cache.getConfiguration(CacheConfiguration.class);
+
+        for (int i = 0; i < TOTAL_KEYS; i++)
+            cache.put(i, i);
+
+        assertEquals(PARTS, ccfg.getAffinity().partitions());
+
+        AtomicLong lastAcked = new AtomicLong();
+
+        ContinuousQuery<Integer, Integer> cq = new ContinuousQuery<>();
+        cq.setRemoteFilterFactory(FactoryBuilder.factoryOf(RMT_FILTER));
+        cq.setLocalListener((events) ->
+            events.forEach(e ->
+                lastAcked.getAndUpdate(c ->

Review comment:
       Fixed.

##########
File path: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBufferLimitTest.java
##########
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import javax.cache.configuration.FactoryBuilder;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cache.query.CacheQueryEntryEvent;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheIdMessage;
+import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.systemview.view.ContinuousQueryView;
+import org.apache.ignite.spi.systemview.view.SystemView;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.internal.TestRecordingCommunicationSpi.spi;
+import static org.apache.ignite.internal.processors.continuous.GridContinuousProcessor.CQ_SYS_VIEW;
+import static org.apache.ignite.testframework.GridTestUtils.getFieldValue;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+
+/**
+ *
+ */
+@RunWith(Parameterized.class)
+public class CacheContinuousQueryBufferLimitTest extends GridCommonAbstractTest {
+    /** Cache partitions count. */
+    private static final int PARTS = 1;
+
+    /** Total number of cache keys. */
+    private static final int TOTAL_KEYS = 1024;
+
+    /** Number of pending entries.  */
+    private static final int PENDING_LIMIT = 1100;
+
+    /** Timeout to wait for pending buffer overflow. */
+    private static final long OVERFLOW_TIMEOUT_MS = 15_000L;
+
+    /** Default remote no-op filter. */
+    private static final CacheEntryEventSerializableFilter<Integer, Integer> RMT_FILTER = e -> true;
+
+    /** Counter of cache messages being sent. */
+    private final AtomicInteger msgCntr = new AtomicInteger();
+
+    /** Cache mode. */
+    @Parameterized.Parameter(0)
+    public CacheMode cacheMode;
+
+    /** Cache atomicity mode. */
+    @Parameterized.Parameter(1)
+    public CacheAtomicityMode atomicityMode;
+
+    /** @return Test parameters. */
+    @Parameterized.Parameters(name = "cacheMode={0}, atomicityMode={1}")
+    public static Collection<?> parameters() {
+        return Arrays.asList(new Object[][] {
+            {REPLICATED, ATOMIC},
+            {REPLICATED, TRANSACTIONAL},
+            {PARTITIONED, ATOMIC},
+            {PARTITIONED, TRANSACTIONAL}
+        });
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testContinuousQueryBatchSwitchOnAck() throws Exception {
+        doTestContinuousQueryPendingBufferLimit((n, msg) ->
+            msg instanceof GridCacheIdMessage && msgCntr.getAndIncrement() == 10);
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    @WithSystemProperty(key = "IGNITE_CONTINUOUS_QUERY_PENDING_BUFF_SIZE", value = "1000")
+    public void testContinuousQueryPendingBufferLimit() throws Exception {
+        doTestContinuousQueryPendingBufferLimit((n, msg) ->
+            (msg instanceof GridCacheIdMessage && msgCntr.getAndIncrement() == 10) ||
+                msg instanceof CacheContinuousQueryBatchAck);
+    }
+
+
+    /** {@inheritDoc} */
+    @Override public IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        return super.getConfiguration(igniteInstanceName)
+            .setCommunicationSpi(new TestRecordingCommunicationSpi())
+            .setCacheConfiguration(new CacheConfiguration<>(DEFAULT_CACHE_NAME)
+                .setAtomicityMode(atomicityMode)
+                .setCacheMode(cacheMode)
+                .setBackups(1)
+                .setAffinity(new RendezvousAffinityFunction(false, PARTS)));
+    }
+
+    /** */
+    @Before
+    public void resetMessageCounter() {
+        msgCntr.set(0);
+    }
+
+    /** */
+    @After
+    public void stopAllInstances() {
+        stopAllGrids();
+    }
+
+    /**
+     * @param locBlockPred Block predicate on local node to emulate message delivery issues.
+     * @throws Exception If fails.
+     */
+    private void doTestContinuousQueryPendingBufferLimit(
+        IgniteBiPredicate<ClusterNode, Message> locBlockPred
+    ) throws Exception
+    {
+        ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+        IgniteEx locIgnite = startGrid(0);
+        IgniteEx rmtIgnite = startGrid(1);
+
+        IgniteCache<Integer, Integer> cache = locIgnite.cache(DEFAULT_CACHE_NAME);
+        CacheConfiguration<Integer, Integer> ccfg = cache.getConfiguration(CacheConfiguration.class);
+
+        for (int i = 0; i < TOTAL_KEYS; i++)
+            cache.put(i, i);
+
+        assertEquals(PARTS, ccfg.getAffinity().partitions());
+
+        AtomicLong lastAcked = new AtomicLong();
+
+        ContinuousQuery<Integer, Integer> cq = new ContinuousQuery<>();
+        cq.setRemoteFilterFactory(FactoryBuilder.factoryOf(RMT_FILTER));
+        cq.setLocalListener((events) ->
+            events.forEach(e ->
+                lastAcked.getAndUpdate(c ->
+                    Math.max(c, ((CacheQueryEntryEvent<?, ?>)e).getPartitionUpdateCounter()))));
+        cq.setLocal(false);
+
+        IgniteInternalFuture<?> updFut = null;
+
+        try (QueryCursor<?> qry = locIgnite.cache(DEFAULT_CACHE_NAME).query(cq)) {
+            awaitPartitionMapExchange();
+
+            for (int j = 0; j < TOTAL_KEYS; j++)
+                putX2Value(cache, rnd.nextInt(TOTAL_KEYS));
+
+            SystemView<ContinuousQueryView> rmtQryView = rmtIgnite.context().systemView().view(CQ_SYS_VIEW);
+            assertEquals(1, rmtQryView.size());
+
+            UUID routineId = rmtQryView.iterator().next().routineId();
+
+            // Partition Id, Update Counter, Continuous Entry.
+            ConcurrentMap<Long, CacheContinuousQueryEntry> pending =
+                getContinuousQueryPendingBuffer(rmtIgnite, routineId, CU.cacheId(DEFAULT_CACHE_NAME), 0);
+
+            spi(locIgnite).blockMessages(locBlockPred);
+
+            updFut = GridTestUtils.runMultiThreadedAsync(() -> {
+                while (!Thread.currentThread().isInterrupted()) {
+                    putX2Value(cache, rnd.nextInt(TOTAL_KEYS));
+                }
+            }, 3, "cq-put-");
+
+            assertNotNull("Partition remote buffers must be inited", pending);
+
+            log.warning("Waiting for pending buffer being overflowed within " + OVERFLOW_TIMEOUT_MS + " ms.");
+
+            boolean await = waitForCondition(() -> pending.size() > PENDING_LIMIT, OVERFLOW_TIMEOUT_MS);
+
+            assertFalse("Pending buffer exceeded the limit even though entries have been acked " +
+                    "[lastAcked=" + lastAcked + ", pending=" + S.compact(pending.keySet(), i -> i + 1) + ']',
+                await);
+        }
+        finally {
+            spi(locIgnite).stopBlock();
+
+            if (updFut != null)
+                updFut.cancel();
+        }
+    }
+
+    /**
+     * @param cache Ignite cache.
+     * @param key Key to change.
+     */
+    private static void putX2Value(IgniteCache<Integer, Integer> cache, int key) {

Review comment:
       Fixed.







[GitHub] [ignite] alex-plekhanov commented on a change in pull request #7881: IGNITE-10959: Continuous query pending buffer limit

Posted by GitBox <gi...@apache.org>.
alex-plekhanov commented on a change in pull request #7881:
URL: https://github.com/apache/ignite/pull/7881#discussion_r451628509



##########
File path: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBufferLimitTest.java
##########
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.cache.configuration.FactoryBuilder;
+import javax.cache.event.CacheEntryEvent;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cache.query.CacheQueryEntryEvent;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheIdMessage;
+import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor;
+import org.apache.ignite.internal.util.GridAtomicLong;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.systemview.view.ContinuousQueryView;
+import org.apache.ignite.spi.systemview.view.SystemView;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.internal.TestRecordingCommunicationSpi.spi;
+import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryEventBuffer.MAX_PENDING_BUFF_SIZE;
+import static org.apache.ignite.internal.processors.continuous.GridContinuousProcessor.CQ_SYS_VIEW;
+import static org.apache.ignite.testframework.GridTestUtils.getFieldValue;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+
+/**
+ *
+ */
+@RunWith(Parameterized.class)
+public class CacheContinuousQueryBufferLimitTest extends GridCommonAbstractTest {
+    /** Cache partitions count. */
+    private static final int PARTS = 1;
+
+    /** Total number of cache keys. */
+    private static final int TOTAL_KEYS = 1024;
+
+    /** Timeout to wait for pending buffer overflow. */
+    private static final long OVERFLOW_TIMEOUT_MS = 10_000L;
+
+    /** Default remote no-op filter. */
+    private static final CacheEntryEventSerializableFilter<Integer, Integer> RMT_FILTER = e -> true;
+
+    /** Counter of cache messages being sent. */
+    private final AtomicInteger msgCntr = new AtomicInteger();
+
+    /** Cache mode. */
+    @Parameterized.Parameter(0)
+    public CacheMode cacheMode;
+
+    /** Cache atomicity mode. */
+    @Parameterized.Parameter(1)
+    public CacheAtomicityMode atomicityMode;
+
+    /** @return Test parameters. */
+    @Parameterized.Parameters(name = "cacheMode={0}, atomicityMode={1}")
+    public static Collection<?> parameters() {
+        return Arrays.asList(new Object[][] {
+            {REPLICATED, ATOMIC},
+            {REPLICATED, TRANSACTIONAL},
+            {PARTITIONED, ATOMIC},
+            {PARTITIONED, TRANSACTIONAL}
+        });
+    }
+
+    /**
+     * Local pending limit for this test is less than MAX_PENDING_BUFF_SIZE,
+     * so pending entries must be cleaned prior to reaching it.
+     *
+     * @throws Exception If fails.
+     */
+    @Test
+    public void testContinuousQueryBatchSwitchOnAck() throws Exception {
+        doTestContinuousQueryPendingBufferLimit((n, msg) ->
+            msg instanceof GridCacheIdMessage && msgCntr.getAndIncrement() == 10, MAX_PENDING_BUFF_SIZE / 10);
+    }
+
+    /**
+     * The test blocks switching current CacheContinuousQueryEventBuffer.Batch to the new one, so
+     * pending entries will be processed (dropped on backups and sent to the client on primaries)
+     * when the MAX_PENDING_BUFF_SIZE is reached.
+     *
+     * @throws Exception If fails.
+     */
+    @Test
+    public void testContinuousQueryPendingBufferLimit() throws Exception {
+        doTestContinuousQueryPendingBufferLimit((n, msg) ->
+            (msg instanceof GridCacheIdMessage && msgCntr.getAndIncrement() == 10) ||
+                msg instanceof CacheContinuousQueryBatchAck, (int)(MAX_PENDING_BUFF_SIZE * 1.1));
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testPendingSendToClientOnLimitReached() throws Exception {
+        AtomicInteger keys = new AtomicInteger();
+        AtomicReference<String> err = new AtomicReference<>();
+
+        IgniteEx srv = startGrids(2);
+        IgniteEx clnt = startClientGrid();
+
+        CacheEntryEventSerializableFilter<Integer, Integer> filter = new CacheEntryEventSerializableFilter<Integer,Integer>() {

Review comment:
       What I meant here is that this filter can be simplified to a lambda:
   `CacheEntryEventSerializableFilter<Integer, Integer> filter = evt -> evt.getKey() % 2 == 0;`
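
       For illustration only, a minimal sketch of why the two forms are interchangeable. It does not depend on Ignite: `Event` and `SerializableFilter` are hypothetical stand-ins for `CacheEntryEvent` and `CacheEntryEventSerializableFilter`, and the body of the anonymous class is an assumption, since the quoted line is cut off before it.

```java
import java.io.Serializable;

public class FilterLambdaSketch {
    /** Hypothetical stand-in for javax.cache.event.CacheEntryEvent; only getKey() is modelled. */
    interface Event<K> {
        K getKey();
    }

    /** Hypothetical stand-in for CacheEntryEventSerializableFilter: one abstract method plus Serializable. */
    interface SerializableFilter<K> extends Serializable {
        boolean evaluate(Event<? extends K> evt);
    }

    /** Anonymous-class form; the body is assumed to match the reviewer's suggestion. */
    static final SerializableFilter<Integer> ANONYMOUS = new SerializableFilter<Integer>() {
        @Override public boolean evaluate(Event<? extends Integer> evt) {
            return evt.getKey() % 2 == 0;
        }
    };

    /** Lambda form, as suggested in the review comment. */
    static final SerializableFilter<Integer> LAMBDA = evt -> evt.getKey() % 2 == 0;

    /** Applies the filter to a synthetic event that carries only the given key. */
    static boolean accepts(SerializableFilter<Integer> filter, int key) {
        return filter.evaluate(() -> key);
    }

    public static void main(String[] args) {
        // Both filters should give the same answer for every key.
        for (int key = 0; key < 4; key++)
            System.out.println(key + ": " + accepts(ANONYMOUS, key) + " / " + accepts(LAMBDA, key));
    }
}
```

       Because the interface has a single abstract method, the lambda is accepted wherever the anonymous class was, and it stays serializable because the target type extends `Serializable`. The test in this PR already relies on the same property in `RMT_FILTER = e -> true`.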







[GitHub] [ignite] alex-plekhanov commented on a change in pull request #7881: IGNITE-10959: Continuous query pending buffer limit

Posted by GitBox <gi...@apache.org>.
alex-plekhanov commented on a change in pull request #7881:
URL: https://github.com/apache/ignite/pull/7881#discussion_r445375443



##########
File path: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBufferLimitTest.java
##########
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import javax.cache.configuration.FactoryBuilder;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cache.query.CacheQueryEntryEvent;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheIdMessage;
+import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.systemview.view.ContinuousQueryView;
+import org.apache.ignite.spi.systemview.view.SystemView;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.internal.TestRecordingCommunicationSpi.spi;
+import static org.apache.ignite.internal.processors.continuous.GridContinuousProcessor.CQ_SYS_VIEW;
+import static org.apache.ignite.testframework.GridTestUtils.getFieldValue;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+
+/**
+ *
+ */
+@RunWith(Parameterized.class)
+public class CacheContinuousQueryBufferLimitTest extends GridCommonAbstractTest {
+    /** Cache partitions count. */
+    private static final int PARTS = 1;
+
+    /** Total number of cache keys. */
+    private static final int TOTAL_KEYS = 1024;
+
+    /** Number of pending entries.  */
+    private static final int PENDING_LIMIT = 1100;
+
+    /** Timeout to wait for pending buffer overflow. */
+    private static final long OVERFLOW_TIMEOUT_MS = 15_000L;
+
+    /** Default remote no-op filter. */
+    private static final CacheEntryEventSerializableFilter<Integer, Integer> RMT_FILTER = e -> true;
+
+    /** Counter of cache messages being sent. */
+    private final AtomicInteger msgCntr = new AtomicInteger();
+
+    /** Cache mode. */
+    @Parameterized.Parameter(0)
+    public CacheMode cacheMode;
+
+    /** Cache atomicity mode. */
+    @Parameterized.Parameter(1)
+    public CacheAtomicityMode atomicityMode;
+
+    /** @return Test parameters. */
+    @Parameterized.Parameters(name = "cacheMode={0}, atomicityMode={1}")
+    public static Collection<?> parameters() {
+        return Arrays.asList(new Object[][] {
+            {REPLICATED, ATOMIC},
+            {REPLICATED, TRANSACTIONAL},
+            {PARTITIONED, ATOMIC},
+            {PARTITIONED, TRANSACTIONAL}
+        });
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testContinuousQueryBatchSwitchOnAck() throws Exception {
+        doTestContinuousQueryPendingBufferLimit((n, msg) ->
+            msg instanceof GridCacheIdMessage && msgCntr.getAndIncrement() == 10);
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    @WithSystemProperty(key = "IGNITE_CONTINUOUS_QUERY_PENDING_BUFF_SIZE", value = "1000")
+    public void testContinuousQueryPendingBufferLimit() throws Exception {
+        doTestContinuousQueryPendingBufferLimit((n, msg) ->
+            (msg instanceof GridCacheIdMessage && msgCntr.getAndIncrement() == 10) ||
+                msg instanceof CacheContinuousQueryBatchAck);
+    }
+
+
+    /** {@inheritDoc} */
+    @Override public IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        return super.getConfiguration(igniteInstanceName)
+            .setCommunicationSpi(new TestRecordingCommunicationSpi())
+            .setCacheConfiguration(new CacheConfiguration<>(DEFAULT_CACHE_NAME)
+                .setAtomicityMode(atomicityMode)
+                .setCacheMode(cacheMode)
+                .setBackups(1)
+                .setAffinity(new RendezvousAffinityFunction(false, PARTS)));
+    }
+
+    /** */
+    @Before
+    public void resetMessageCounter() {
+        msgCntr.set(0);
+    }
+
+    /** */
+    @After
+    public void stopAllInstances() {
+        stopAllGrids();
+    }
+
+    /**
+     * @param locBlockPred Block predicate on local node to emulate message delivery issues.
+     * @throws Exception If fails.
+     */
+    private void doTestContinuousQueryPendingBufferLimit(
+        IgniteBiPredicate<ClusterNode, Message> locBlockPred
+    ) throws Exception
+    {
+        ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+        IgniteEx locIgnite = startGrid(0);
+        IgniteEx rmtIgnite = startGrid(1);
+
+        IgniteCache<Integer, Integer> cache = locIgnite.cache(DEFAULT_CACHE_NAME);
+        CacheConfiguration<Integer, Integer> ccfg = cache.getConfiguration(CacheConfiguration.class);
+
+        for (int i = 0; i < TOTAL_KEYS; i++)
+            cache.put(i, i);
+
+        assertEquals(PARTS, ccfg.getAffinity().partitions());
+
+        AtomicLong lastAcked = new AtomicLong();
+
+        ContinuousQuery<Integer, Integer> cq = new ContinuousQuery<>();
+        cq.setRemoteFilterFactory(FactoryBuilder.factoryOf(RMT_FILTER));
+        cq.setLocalListener((events) ->
+            events.forEach(e ->
+                lastAcked.getAndUpdate(c ->
+                    Math.max(c, ((CacheQueryEntryEvent<?, ?>)e).getPartitionUpdateCounter()))));
+        cq.setLocal(false);
+
+        IgniteInternalFuture<?> updFut = null;
+
+        try (QueryCursor<?> qry = locIgnite.cache(DEFAULT_CACHE_NAME).query(cq)) {
+            awaitPartitionMapExchange();
+
+            for (int j = 0; j < TOTAL_KEYS; j++)
+                putX2Value(cache, rnd.nextInt(TOTAL_KEYS));
+
+            SystemView<ContinuousQueryView> rmtQryView = rmtIgnite.context().systemView().view(CQ_SYS_VIEW);
+            assertEquals(1, rmtQryView.size());
+
+            UUID routineId = rmtQryView.iterator().next().routineId();
+
+            // Partition Id, Update Counter, Continuous Entry.
+            ConcurrentMap<Long, CacheContinuousQueryEntry> pending =
+                getContinuousQueryPendingBuffer(rmtIgnite, routineId, CU.cacheId(DEFAULT_CACHE_NAME), 0);
+
+            spi(locIgnite).blockMessages(locBlockPred);
+
+            updFut = GridTestUtils.runMultiThreadedAsync(() -> {
+                while (!Thread.currentThread().isInterrupted()) {
+                    putX2Value(cache, rnd.nextInt(TOTAL_KEYS));

Review comment:
       Braces '{ }' are not allowed around a one-line statement.

##########
File path: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBufferLimitTest.java
##########
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import javax.cache.configuration.FactoryBuilder;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cache.query.CacheQueryEntryEvent;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheIdMessage;
+import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.systemview.view.ContinuousQueryView;
+import org.apache.ignite.spi.systemview.view.SystemView;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.internal.TestRecordingCommunicationSpi.spi;
+import static org.apache.ignite.internal.processors.continuous.GridContinuousProcessor.CQ_SYS_VIEW;
+import static org.apache.ignite.testframework.GridTestUtils.getFieldValue;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+
+/**
+ *
+ */
+@RunWith(Parameterized.class)
+public class CacheContinuousQueryBufferLimitTest extends GridCommonAbstractTest {
+    /** Cache partitions count. */
+    private static final int PARTS = 1;
+
+    /** Total number of cache keys. */
+    private static final int TOTAL_KEYS = 1024;
+
+    /** Number of pending entries.  */
+    private static final int PENDING_LIMIT = 1100;
+
+    /** Timeout to wait for pending buffer overflow. */
+    private static final long OVERFLOW_TIMEOUT_MS = 15_000L;
+
+    /** Default remote no-op filter. */
+    private static final CacheEntryEventSerializableFilter<Integer, Integer> RMT_FILTER = e -> true;
+
+    /** Counter of cache messages being send. */
+    private final AtomicInteger msgCntr = new AtomicInteger();
+
+    /** Cache mode. */
+    @Parameterized.Parameter(0)
+    public CacheMode cacheMode;
+
+    /** Cache atomicity mode. */
+    @Parameterized.Parameter(1)
+    public CacheAtomicityMode atomicityMode;
+
+    /** @return Test parameters. */
+    @Parameterized.Parameters(name = "cacheMode={0}, atomicityMode={1}")
+    public static Collection<?> parameters() {
+        return Arrays.asList(new Object[][] {
+            {REPLICATED, ATOMIC},
+            {REPLICATED, TRANSACTIONAL},
+            {PARTITIONED, ATOMIC},
+            {PARTITIONED, TRANSACTIONAL}
+        });
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testContinuousQueryBatchSwitchOnAck() throws Exception {
+        doTestContinuousQueryPendingBufferLimit((n, msg) ->
+            msg instanceof GridCacheIdMessage && msgCntr.getAndIncrement() == 10);
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    @WithSystemProperty(key = "IGNITE_CONTINUOUS_QUERY_PENDING_BUFF_SIZE", value = "1000")
+    public void testContinuousQueryPendingBufferLimit() throws Exception {
+        doTestContinuousQueryPendingBufferLimit((n, msg) ->
+            (msg instanceof GridCacheIdMessage && msgCntr.getAndIncrement() == 10) ||
+                msg instanceof CacheContinuousQueryBatchAck);
+    }
+
+

Review comment:
       Redundant blank line.

##########
File path: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBufferLimitTest.java
##########
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import javax.cache.configuration.FactoryBuilder;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cache.query.CacheQueryEntryEvent;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheIdMessage;
+import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.systemview.view.ContinuousQueryView;
+import org.apache.ignite.spi.systemview.view.SystemView;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.internal.TestRecordingCommunicationSpi.spi;
+import static org.apache.ignite.internal.processors.continuous.GridContinuousProcessor.CQ_SYS_VIEW;
+import static org.apache.ignite.testframework.GridTestUtils.getFieldValue;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+
+/**
+ *
+ */
+@RunWith(Parameterized.class)
+public class CacheContinuousQueryBufferLimitTest extends GridCommonAbstractTest {
+    /** Cache partitions count. */
+    private static final int PARTS = 1;
+
+    /** Total number of cache keys. */
+    private static final int TOTAL_KEYS = 1024;
+
+    /** Number of pending entries.  */
+    private static final int PENDING_LIMIT = 1100;
+
+    /** Timeout to wait for pending buffer overflow. */
+    private static final long OVERFLOW_TIMEOUT_MS = 15_000L;
+
+    /** Default remote no-op filter. */
+    private static final CacheEntryEventSerializableFilter<Integer, Integer> RMT_FILTER = e -> true;
+
+    /** Counter of cache messages being send. */
+    private final AtomicInteger msgCntr = new AtomicInteger();
+
+    /** Cache mode. */
+    @Parameterized.Parameter(0)
+    public CacheMode cacheMode;
+
+    /** Cache atomicity mode. */
+    @Parameterized.Parameter(1)
+    public CacheAtomicityMode atomicityMode;
+
+    /** @return Test parameters. */
+    @Parameterized.Parameters(name = "cacheMode={0}, atomicityMode={1}")
+    public static Collection<?> parameters() {
+        return Arrays.asList(new Object[][] {
+            {REPLICATED, ATOMIC},
+            {REPLICATED, TRANSACTIONAL},
+            {PARTITIONED, ATOMIC},
+            {PARTITIONED, TRANSACTIONAL}
+        });
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testContinuousQueryBatchSwitchOnAck() throws Exception {
+        doTestContinuousQueryPendingBufferLimit((n, msg) ->
+            msg instanceof GridCacheIdMessage && msgCntr.getAndIncrement() == 10);
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    @WithSystemProperty(key = "IGNITE_CONTINUOUS_QUERY_PENDING_BUFF_SIZE", value = "1000")
+    public void testContinuousQueryPendingBufferLimit() throws Exception {
+        doTestContinuousQueryPendingBufferLimit((n, msg) ->
+            (msg instanceof GridCacheIdMessage && msgCntr.getAndIncrement() == 10) ||
+                msg instanceof CacheContinuousQueryBatchAck);
+    }
+
+
+    /** {@inheritDoc} */
+    @Override public IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        return super.getConfiguration(igniteInstanceName)
+            .setCommunicationSpi(new TestRecordingCommunicationSpi())
+            .setCacheConfiguration(new CacheConfiguration<>(DEFAULT_CACHE_NAME)
+                .setAtomicityMode(atomicityMode)
+                .setCacheMode(cacheMode)
+                .setBackups(1)
+                .setAffinity(new RendezvousAffinityFunction(false, PARTS)));
+    }
+
+    /** */
+    @Before
+    public void resetMessageCounter() {
+        msgCntr.set(0);
+    }
+
+    /** */
+    @After
+    public void stopAllInstances() {
+        stopAllGrids();
+    }
+
+    /**
+     * @param locBlockPred Block predicate on local node to emulate message delivery issues.
+     * @throws Exception If fails.
+     */
+    private void doTestContinuousQueryPendingBufferLimit(
+        IgniteBiPredicate<ClusterNode, Message> locBlockPred
+    ) throws Exception
+    {
+        ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+        IgniteEx locIgnite = startGrid(0);
+        IgniteEx rmtIgnite = startGrid(1);
+
+        IgniteCache<Integer, Integer> cache = locIgnite.cache(DEFAULT_CACHE_NAME);
+        CacheConfiguration<Integer, Integer> ccfg = cache.getConfiguration(CacheConfiguration.class);
+
+        for (int i = 0; i < TOTAL_KEYS; i++)
+            cache.put(i, i);
+
+        assertEquals(PARTS, ccfg.getAffinity().partitions());
+
+        AtomicLong lastAcked = new AtomicLong();
+
+        ContinuousQuery<Integer, Integer> cq = new ContinuousQuery<>();
+        cq.setRemoteFilterFactory(FactoryBuilder.factoryOf(RMT_FILTER));
+        cq.setLocalListener((events) ->
+            events.forEach(e ->
+                lastAcked.getAndUpdate(c ->
+                    Math.max(c, ((CacheQueryEntryEvent<?, ?>)e).getPartitionUpdateCounter()))));
+        cq.setLocal(false);
+
+        IgniteInternalFuture<?> updFut = null;
+
+        try (QueryCursor<?> qry = locIgnite.cache(DEFAULT_CACHE_NAME).query(cq)) {
+            awaitPartitionMapExchange();
+
+            for (int j = 0; j < TOTAL_KEYS; j++)
+                putX2Value(cache, rnd.nextInt(TOTAL_KEYS));
+
+            SystemView<ContinuousQueryView> rmtQryView = rmtIgnite.context().systemView().view(CQ_SYS_VIEW);
+            assertEquals(1, rmtQryView.size());
+
+            UUID routineId = rmtQryView.iterator().next().routineId();
+
+            // Partition Id, Update Counter, Continuous Entry.
+            ConcurrentMap<Long, CacheContinuousQueryEntry> pending =
+                getContinuousQueryPendingBuffer(rmtIgnite, routineId, CU.cacheId(DEFAULT_CACHE_NAME), 0);
+
+            spi(locIgnite).blockMessages(locBlockPred);
+
+            updFut = GridTestUtils.runMultiThreadedAsync(() -> {
+                while (!Thread.currentThread().isInterrupted()) {
+                    putX2Value(cache, rnd.nextInt(TOTAL_KEYS));
+                }
+            }, 3, "cq-put-");
+
+            assertNotNull("Partition remote buffers must be inited", pending);
+
+            log.warning("Waiting for pending buffer being overflowed within " + OVERFLOW_TIMEOUT_MS + " ms.");
+
+            boolean await = waitForCondition(() -> pending.size() > PENDING_LIMIT, OVERFLOW_TIMEOUT_MS);
+
+            assertFalse("Pending buffer exceeded the limit despite entries have been acked " +
+                    "[lastAcked=" + lastAcked + ", pending=" + S.compact(pending.keySet(), i -> i + 1) + ']',
+                await);
+        }
+        finally {
+            spi(locIgnite).stopBlock();
+
+            if (updFut != null)
+                updFut.cancel();
+        }
+    }
+
+    /**
+     * @param cache Ignite cache.
+     * @param key Key to change.
+     */
+    private static void putX2Value(IgniteCache<Integer, Integer> cache, int key) {

Review comment:
       Why not just put a random key with a random value? We don't check these values after the test anyway.
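
       A minimal sketch of that suggestion, assuming a plain `Map` as a stand-in for `IgniteCache` so the snippet runs on its own. `RandomPutSketch`, `putRandom` and `KEY_BOUND` are hypothetical names and are not part of the PR:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;

/** Sketch only: a Map stands in for IgniteCache; all names here are hypothetical. */
class RandomPutSketch {
    /** Upper bound (exclusive) for generated keys, mirroring TOTAL_KEYS in the test. */
    static final int KEY_BOUND = 1024;

    /** Puts a random value under a random key and returns the key that was used. */
    static int putRandom(Map<Integer, Integer> cache) {
        // Obtain the generator on the calling thread instead of sharing one instance.
        ThreadLocalRandom rnd = ThreadLocalRandom.current();

        int key = rnd.nextInt(KEY_BOUND);

        cache.put(key, rnd.nextInt());

        return key;
    }

    public static void main(String[] args) {
        Map<Integer, Integer> cache = new ConcurrentHashMap<>();

        for (int i = 0; i < 10_000; i++)
            putRandom(cache);

        System.out.println("Distinct keys written: " + cache.size());
    }
}
```

       Since the test never reads the values back, the read-modify-write in `putX2Value` could probably be replaced by a single put like this.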

##########
File path: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBufferLimitTest.java
##########
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import javax.cache.configuration.FactoryBuilder;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cache.query.CacheQueryEntryEvent;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheIdMessage;
+import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.systemview.view.ContinuousQueryView;
+import org.apache.ignite.spi.systemview.view.SystemView;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.internal.TestRecordingCommunicationSpi.spi;
+import static org.apache.ignite.internal.processors.continuous.GridContinuousProcessor.CQ_SYS_VIEW;
+import static org.apache.ignite.testframework.GridTestUtils.getFieldValue;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+
+/**
+ *
+ */
+@RunWith(Parameterized.class)
+public class CacheContinuousQueryBufferLimitTest extends GridCommonAbstractTest {
+    /** Cache partitions count. */
+    private static final int PARTS = 1;
+
+    /** Total number of cache keys. */
+    private static final int TOTAL_KEYS = 1024;
+
+    /** Number of pending entries.  */
+    private static final int PENDING_LIMIT = 1100;
+
+    /** Timeout to wait for pending buffer overflow. */
+    private static final long OVERFLOW_TIMEOUT_MS = 15_000L;
+
+    /** Default remote no-op filter. */
+    private static final CacheEntryEventSerializableFilter<Integer, Integer> RMT_FILTER = e -> true;
+
+    /** Counter of cache messages being send. */
+    private final AtomicInteger msgCntr = new AtomicInteger();
+
+    /** Cache mode. */
+    @Parameterized.Parameter(0)
+    public CacheMode cacheMode;
+
+    /** Cache atomicity mode. */
+    @Parameterized.Parameter(1)
+    public CacheAtomicityMode atomicityMode;
+
+    /** @return Test parameters. */
+    @Parameterized.Parameters(name = "cacheMode={0}, atomicityMode={1}")
+    public static Collection<?> parameters() {
+        return Arrays.asList(new Object[][] {
+            {REPLICATED, ATOMIC},
+            {REPLICATED, TRANSACTIONAL},
+            {PARTITIONED, ATOMIC},
+            {PARTITIONED, TRANSACTIONAL}
+        });
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testContinuousQueryBatchSwitchOnAck() throws Exception {
+        doTestContinuousQueryPendingBufferLimit((n, msg) ->
+            msg instanceof GridCacheIdMessage && msgCntr.getAndIncrement() == 10);
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    @WithSystemProperty(key = "IGNITE_CONTINUOUS_QUERY_PENDING_BUFF_SIZE", value = "1000")
+    public void testContinuousQueryPendingBufferLimit() throws Exception {
+        doTestContinuousQueryPendingBufferLimit((n, msg) ->
+            (msg instanceof GridCacheIdMessage && msgCntr.getAndIncrement() == 10) ||
+                msg instanceof CacheContinuousQueryBatchAck);
+    }
+
+
+    /** {@inheritDoc} */
+    @Override public IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        return super.getConfiguration(igniteInstanceName)
+            .setCommunicationSpi(new TestRecordingCommunicationSpi())
+            .setCacheConfiguration(new CacheConfiguration<>(DEFAULT_CACHE_NAME)
+                .setAtomicityMode(atomicityMode)
+                .setCacheMode(cacheMode)
+                .setBackups(1)
+                .setAffinity(new RendezvousAffinityFunction(false, PARTS)));
+    }
+
+    /** */
+    @Before
+    public void resetMessageCounter() {
+        msgCntr.set(0);
+    }
+
+    /** */
+    @After
+    public void stopAllInstances() {
+        stopAllGrids();
+    }
+
+    /**
+     * @param locBlockPred Block predicate on local node to emulate message delivery issues.
+     * @throws Exception If fails.
+     */
+    private void doTestContinuousQueryPendingBufferLimit(
+        IgniteBiPredicate<ClusterNode, Message> locBlockPred
+    ) throws Exception
+    {
+        ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+        IgniteEx locIgnite = startGrid(0);
+        IgniteEx rmtIgnite = startGrid(1);
+
+        IgniteCache<Integer, Integer> cache = locIgnite.cache(DEFAULT_CACHE_NAME);
+        CacheConfiguration<Integer, Integer> ccfg = cache.getConfiguration(CacheConfiguration.class);
+
+        for (int i = 0; i < TOTAL_KEYS; i++)
+            cache.put(i, i);
+
+        assertEquals(PARTS, ccfg.getAffinity().partitions());
+
+        AtomicLong lastAcked = new AtomicLong();
+
+        ContinuousQuery<Integer, Integer> cq = new ContinuousQuery<>();
+        cq.setRemoteFilterFactory(FactoryBuilder.factoryOf(RMT_FILTER));
+        cq.setLocalListener((events) ->
+            events.forEach(e ->
+                lastAcked.getAndUpdate(c ->
+                    Math.max(c, ((CacheQueryEntryEvent<?, ?>)e).getPartitionUpdateCounter()))));
+        cq.setLocal(false);
+
+        IgniteInternalFuture<?> updFut = null;
+
+        try (QueryCursor<?> qry = locIgnite.cache(DEFAULT_CACHE_NAME).query(cq)) {
+            awaitPartitionMapExchange();
+
+            for (int j = 0; j < TOTAL_KEYS; j++)
+                putX2Value(cache, rnd.nextInt(TOTAL_KEYS));
+
+            SystemView<ContinuousQueryView> rmtQryView = rmtIgnite.context().systemView().view(CQ_SYS_VIEW);
+            assertEquals(1, rmtQryView.size());
+
+            UUID routineId = rmtQryView.iterator().next().routineId();
+
+            // Partition Id, Update Counter, Continuous Entry.
+            ConcurrentMap<Long, CacheContinuousQueryEntry> pending =
+                getContinuousQueryPendingBuffer(rmtIgnite, routineId, CU.cacheId(DEFAULT_CACHE_NAME), 0);
+
+            spi(locIgnite).blockMessages(locBlockPred);
+
+            updFut = GridTestUtils.runMultiThreadedAsync(() -> {
+                while (!Thread.currentThread().isInterrupted()) {
+                    putX2Value(cache, rnd.nextInt(TOTAL_KEYS));
+                }
+            }, 3, "cq-put-");
+
+            assertNotNull("Partition remote buffers must be inited", pending);
+
+            log.warning("Waiting for pending buffer being overflowed within " + OVERFLOW_TIMEOUT_MS + " ms.");
+
+            boolean await = waitForCondition(() -> pending.size() > PENDING_LIMIT, OVERFLOW_TIMEOUT_MS);

Review comment:
       I think we wait too long here. With a 15-second timeout, that is about 2.5 additional minutes across all tests.
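
       A rough back-of-the-envelope check, assuming `waitForCondition` polls until the timeout expires whenever the condition stays false. That is the passing path here, because the test asserts `assertFalse(await)`. With 2 test methods and 4 parameter combinations, the pure waiting time comes to 120 seconds, before grid start and stop are counted. `WaitBudgetSketch` and `totalWaitMs` are hypothetical names:

```java
/** Sketch only: estimates the time the parameterized test spends waiting. Names are hypothetical. */
class WaitBudgetSketch {
    /** Total wait in milliseconds when every run waits for the full timeout. */
    static long totalWaitMs(int testMethods, int paramCombinations, long timeoutMs) {
        return (long)testMethods * paramCombinations * timeoutMs;
    }

    public static void main(String[] args) {
        // 2 @Test methods, 4 {cacheMode, atomicityMode} pairs, OVERFLOW_TIMEOUT_MS = 15_000.
        long total = totalWaitMs(2, 4, 15_000L);

        System.out.println("Total wait: " + total / 1000 + " s");
    }
}
```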

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
##########
@@ -172,21 +204,51 @@ private Object process0(long cntr, CacheContinuousQueryEntry entry, boolean back
                 if (res == RETRY)
                     continue;
             }
-            else
+            else {
+                if (batch.endCntr < ackedUpdCntr.get())
+                    batch.tryRollOver(entry.topologyVersion());
+
+                if (pendingSize.get() > MAX_PENDING_BUFF_SIZE) {
+                    LT.warn(log, "Buffer for pending events reached max of its size " +
+                        "[cacheId=" + entry.cacheId() + ", maxSize=" + MAX_PENDING_BUFF_SIZE +
+                        ", partId=" + entry.partition() + ']');
+
+                    // Remove first BUFF_SIZE keys.
+                    int keysToRemove = BUF_SIZE;
+
+                    Iterator<Map.Entry<Long, CacheContinuousQueryEntry>> iter = pending.entrySet().iterator();
+
+                    while (iter.hasNext() && keysToRemove > 0) {
+                        CacheContinuousQueryEntry entry0 = iter.next().getValue();
+
+                        // Discard messages on backup and send to client if primary.
+                        if (!backup)
+                            res = addResult(res, entry0, !backup);

Review comment:
       Why do we use `!backup` here? In this case, entries will go to backupQ on the primary node or fail with an assert.
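
       To illustrate the concern with a toy model: inside `if (!backup)` the expression `!backup` is always `true`. The sketch below is not Ignite's `addResult`. It assumes, as the comment implies, that the last argument is a backup flag that routes the entry to the backup queue. All class and method names are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch only: a toy model of flag-based routing; not the Ignite implementation. */
class BackupFlagSketch {
    /** Entries that would be sent to the listener. */
    final List<String> sent = new ArrayList<>();

    /** Entries that would be kept in the backup queue. */
    final List<String> backupQ = new ArrayList<>();

    /** Hypothetical stand-in for addResult: routes by the backup flag. */
    void addResult(String entry, boolean backup) {
        if (backup)
            backupQ.add(entry);
        else
            sent.add(entry);
    }

    /** Mirrors the reviewed snippet: passes !backup inside an if (!backup) branch. */
    void flushAsInPatch(String entry, boolean backup) {
        if (!backup)
            addResult(entry, !backup); // !backup is always true here.
    }

    /** Passes the flag through unchanged. */
    void flushWithPlainFlag(String entry, boolean backup) {
        if (!backup)
            addResult(entry, backup);
    }

    public static void main(String[] args) {
        BackupFlagSketch patch = new BackupFlagSketch();
        patch.flushAsInPatch("e1", false);

        BackupFlagSketch plain = new BackupFlagSketch();
        plain.flushWithPlainFlag("e1", false);

        System.out.println("patch: sent=" + patch.sent + ", backupQ=" + patch.backupQ);
        System.out.println("plain: sent=" + plain.sent + ", backupQ=" + plain.backupQ);
    }
}
```

       Under this assumption, the patched form puts the primary node's entry into the backup queue, while passing the flag through unchanged sends it to the listener.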

##########
File path: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBufferLimitTest.java
##########
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import javax.cache.configuration.FactoryBuilder;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cache.query.CacheQueryEntryEvent;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheIdMessage;
+import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.systemview.view.ContinuousQueryView;
+import org.apache.ignite.spi.systemview.view.SystemView;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.internal.TestRecordingCommunicationSpi.spi;
+import static org.apache.ignite.internal.processors.continuous.GridContinuousProcessor.CQ_SYS_VIEW;
+import static org.apache.ignite.testframework.GridTestUtils.getFieldValue;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+
+/**
+ *
+ */
+@RunWith(Parameterized.class)
+public class CacheContinuousQueryBufferLimitTest extends GridCommonAbstractTest {
+    /** Cache partitions count. */
+    private static final int PARTS = 1;
+
+    /** Total number of cache keys. */
+    private static final int TOTAL_KEYS = 1024;
+
+    /** Number of pending entries.  */
+    private static final int PENDING_LIMIT = 1100;
+
+    /** Timeout to wait for pending buffer overflow. */
+    private static final long OVERFLOW_TIMEOUT_MS = 15_000L;
+
+    /** Default remote no-op filter. */
+    private static final CacheEntryEventSerializableFilter<Integer, Integer> RMT_FILTER = e -> true;
+
+    /** Counter of cache messages being send. */
+    private final AtomicInteger msgCntr = new AtomicInteger();
+
+    /** Cache mode. */
+    @Parameterized.Parameter(0)
+    public CacheMode cacheMode;
+
+    /** Cache atomicity mode. */
+    @Parameterized.Parameter(1)
+    public CacheAtomicityMode atomicityMode;
+
+    /** @return Test parameters. */
+    @Parameterized.Parameters(name = "cacheMode={0}, atomicityMode={1}")
+    public static Collection<?> parameters() {
+        return Arrays.asList(new Object[][] {
+            {REPLICATED, ATOMIC},
+            {REPLICATED, TRANSACTIONAL},
+            {PARTITIONED, ATOMIC},
+            {PARTITIONED, TRANSACTIONAL}
+        });
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testContinuousQueryBatchSwitchOnAck() throws Exception {
+        doTestContinuousQueryPendingBufferLimit((n, msg) ->
+            msg instanceof GridCacheIdMessage && msgCntr.getAndIncrement() == 10);
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    @WithSystemProperty(key = "IGNITE_CONTINUOUS_QUERY_PENDING_BUFF_SIZE", value = "1000")

Review comment:
       This is dangerous because the `MAX_PENDING_BUFF_SIZE` constant is static. If the `CacheContinuousQueryEventBuffer` class is first loaded by this test, other continuous query tests (in the same JVM) can fail. If the class was already loaded and used before this test, the property will have no effect and this test case can fail.
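
       The class-loading concern can be illustrated with a minimal sketch. The names `StaticPropertyDemo`, `Buffer`, `MAX_SIZE` and the property `DEMO_MAX_PENDING_BUFF_SIZE` are hypothetical stand-ins, not Ignite code; the sketch only assumes that the constant is a static field initialized from a system property.

```java
public class StaticPropertyDemo {
    /** Hypothetical stand-in for a class that caches a system property in a static constant. */
    static class Buffer {
        // Evaluated exactly once, when the JVM initializes the Buffer class.
        static final int MAX_SIZE = Integer.getInteger("DEMO_MAX_PENDING_BUFF_SIZE", 10_000);
    }

    public static void main(String[] args) {
        System.setProperty("DEMO_MAX_PENDING_BUFF_SIZE", "1000");

        // The first access triggers class initialization, so the value 1000 is captured here.
        int first = Buffer.MAX_SIZE;

        // Clearing (or changing) the property afterwards does not affect the constant.
        System.clearProperty("DEMO_MAX_PENDING_BUFF_SIZE");

        int second = Buffer.MAX_SIZE;

        System.out.println(first + " " + second);
    }
}
```

       Run as is, this should print `1000 1000`: whichever code touches the class first fixes the value for the rest of the JVM lifetime, which is why a per-test system property on a static constant is order-dependent.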

##########
File path: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBufferLimitTest.java
##########
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import javax.cache.configuration.FactoryBuilder;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cache.query.CacheQueryEntryEvent;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheIdMessage;
+import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.systemview.view.ContinuousQueryView;
+import org.apache.ignite.spi.systemview.view.SystemView;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.internal.TestRecordingCommunicationSpi.spi;
+import static org.apache.ignite.internal.processors.continuous.GridContinuousProcessor.CQ_SYS_VIEW;
+import static org.apache.ignite.testframework.GridTestUtils.getFieldValue;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+
+/**
+ *
+ */
+@RunWith(Parameterized.class)
+public class CacheContinuousQueryBufferLimitTest extends GridCommonAbstractTest {
+    /** Cache partitions count. */
+    private static final int PARTS = 1;
+
+    /** Total number of cache keys. */
+    private static final int TOTAL_KEYS = 1024;
+
+    /** Number of pending entries.  */
+    private static final int PENDING_LIMIT = 1100;
+
+    /** Timeout to wait for pending buffer overflow. */
+    private static final long OVERFLOW_TIMEOUT_MS = 15_000L;
+
+    /** Default remote no-op filter. */
+    private static final CacheEntryEventSerializableFilter<Integer, Integer> RMT_FILTER = e -> true;
+
+    /** Counter of cache messages being send. */
+    private final AtomicInteger msgCntr = new AtomicInteger();
+
+    /** Cache mode. */
+    @Parameterized.Parameter(0)
+    public CacheMode cacheMode;
+
+    /** Cache atomicity mode. */
+    @Parameterized.Parameter(1)
+    public CacheAtomicityMode atomicityMode;
+
+    /** @return Test parameters. */
+    @Parameterized.Parameters(name = "cacheMode={0}, atomicityMode={1}")
+    public static Collection<?> parameters() {
+        return Arrays.asList(new Object[][] {
+            {REPLICATED, ATOMIC},
+            {REPLICATED, TRANSACTIONAL},
+            {PARTITIONED, ATOMIC},
+            {PARTITIONED, TRANSACTIONAL}
+        });
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testContinuousQueryBatchSwitchOnAck() throws Exception {
+        doTestContinuousQueryPendingBufferLimit((n, msg) ->
+            msg instanceof GridCacheIdMessage && msgCntr.getAndIncrement() == 10);
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    @WithSystemProperty(key = "IGNITE_CONTINUOUS_QUERY_PENDING_BUFF_SIZE", value = "1000")
+    public void testContinuousQueryPendingBufferLimit() throws Exception {
+        doTestContinuousQueryPendingBufferLimit((n, msg) ->
+            (msg instanceof GridCacheIdMessage && msgCntr.getAndIncrement() == 10) ||
+                msg instanceof CacheContinuousQueryBatchAck);
+    }
+
+
+    /** {@inheritDoc} */
+    @Override public IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        return super.getConfiguration(igniteInstanceName)
+            .setCommunicationSpi(new TestRecordingCommunicationSpi())
+            .setCacheConfiguration(new CacheConfiguration<>(DEFAULT_CACHE_NAME)
+                .setAtomicityMode(atomicityMode)
+                .setCacheMode(cacheMode)
+                .setBackups(1)
+                .setAffinity(new RendezvousAffinityFunction(false, PARTS)));
+    }
+
+    /** */
+    @Before
+    public void resetMessageCounter() {
+        msgCntr.set(0);
+    }
+
+    /** */
+    @After
+    public void stopAllInstances() {
+        stopAllGrids();
+    }
+
+    /**
+     * @param locBlockPred Block predicate on local node to emulate message delivery issues.
+     * @throws Exception If fails.
+     */
+    private void doTestContinuousQueryPendingBufferLimit(
+        IgniteBiPredicate<ClusterNode, Message> locBlockPred
+    ) throws Exception
+    {
+        ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+        IgniteEx locIgnite = startGrid(0);
+        IgniteEx rmtIgnite = startGrid(1);
+
+        IgniteCache<Integer, Integer> cache = locIgnite.cache(DEFAULT_CACHE_NAME);
+        CacheConfiguration<Integer, Integer> ccfg = cache.getConfiguration(CacheConfiguration.class);
+
+        for (int i = 0; i < TOTAL_KEYS; i++)
+            cache.put(i, i);
+
+        assertEquals(PARTS, ccfg.getAffinity().partitions());
+
+        AtomicLong lastAcked = new AtomicLong();
+
+        ContinuousQuery<Integer, Integer> cq = new ContinuousQuery<>();
+        cq.setRemoteFilterFactory(FactoryBuilder.factoryOf(RMT_FILTER));
+        cq.setLocalListener((events) ->
+            events.forEach(e ->
+                lastAcked.getAndUpdate(c ->

Review comment:
       Could `GridAtomicLong.setIfGreater` be used here?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] Mmuzaf commented on a change in pull request #7881: IGNITE-10959: Continuous query pending buffer limit

Posted by GitBox <gi...@apache.org>.
Mmuzaf commented on a change in pull request #7881:
URL: https://github.com/apache/ignite/pull/7881#discussion_r443681723



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
##########
@@ -74,6 +106,11 @@ void cleanupBackupQueue(Long updateCntr) {
             if (backupEntry.updateCounter() <= updateCntr)
                 it.remove();
         }
+
+        long val = ackedUpdCntr;
+
+        while (val < updateCntr && !ACKED_UPDATER.compareAndSet(this, val, updateCntr))

Review comment:
       Fixed.







[GitHub] [ignite] alex-plekhanov commented on a change in pull request #7881: IGNITE-10959: Continuous query pending buffer limit

Posted by GitBox <gi...@apache.org>.
alex-plekhanov commented on a change in pull request #7881:
URL: https://github.com/apache/ignite/pull/7881#discussion_r442017767



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
##########
@@ -463,26 +533,47 @@ private CacheContinuousQueryEntry filteredEntry(long cntr, long filtered) {
 
                     lastProc = pos;
 
-                    if (pos == entries.length - 1) {
-                        Arrays.fill(entries, null);
+                    if (pos == entries.length - 1)
+                        rollOver(startCntr + BUF_SIZE, filtered, entry.topologyVersion());
+                }
+                else if (endCntr < ackedUpdCntr0)
+                    rollOver(ackedUpdCntr0 + 1, 0, entry.topologyVersion());
 
-                        Batch nextBatch = new Batch(this.startCntr + BUF_SIZE,
-                            filtered,
-                            entries,
-                            entry.topologyVersion());
+                return res;
+            }
+        }
 
-                        entries = null;
+        /**
+         * @param topVer Topology version of current processing entry.
+         */
+        private synchronized void tryRollOver(AffinityTopologyVersion topVer) {
+            if (entries == null)
+                return;
 
-                        assert curBatch.get() == this;
+            long ackedUpdCntr0 = ackedUpdCntr;
 
-                        curBatch.set(nextBatch);
-                    }
-                }
-                else
-                    return res;
-            }
+            if (endCntr < ackedUpdCntr0)

Review comment:
       You should also check `this` against `curBatch`: `curBatch` can be changed concurrently, and in that case the assertion in `rollOver` will fail.

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
##########
@@ -172,21 +210,50 @@ private Object process0(long cntr, CacheContinuousQueryEntry entry, boolean back
                 if (res == RETRY)
                     continue;
             }
-            else
+            else {
+                if (batch.endCntr < ackedUpdCntr)
+                    batch.tryRollOver(entry.topologyVersion());
+
+                if (pendingSize.get() > MAX_PENDING_BUFF_SIZE) {
+                    LT.warn(log, "Buffer for pending events reached max of its size " +
+                        "[cacheId=" + entry.cacheId() + ", maxSize=" + MAX_PENDING_BUFF_SIZE +
+                        ", partId=" + entry.partition() + ']');
+
+                    // Remove first BUFF_SIZE keys.
+                    int keysToRemove = BUF_SIZE;
+
+                    Iterator<Map.Entry<Long, CacheContinuousQueryEntry>> iter = pending.entrySet().iterator();
+
+                    res = new ArrayList<>();
+
+                    while (iter.hasNext() && keysToRemove > 0) {
+                        // Collecting results ignore backup flag due to batch may not be switched.
+                        res = addResult(res, iter.next().getValue(), false);
+
+                        iter.remove();
+                        pendingSize.decrementAndGet();
+
+                        keysToRemove--;
+                    }
+                }
+
+                pendingSize.incrementAndGet();
                 pending.put(cntr, entry);
+            }
 
             break;
         }
 
         Batch batch0 = curBatch.get();
 
+        // Batch has been changed on entry processing to the new one.
         if (batch0 != batch) {

Review comment:
       This can be simplified to a single `while` loop, without the `if`.







[GitHub] [ignite] Mmuzaf commented on a change in pull request #7881: IGNITE-10959: Continuous query pending buffer limit

Posted by GitBox <gi...@apache.org>.
Mmuzaf commented on a change in pull request #7881:
URL: https://github.com/apache/ignite/pull/7881#discussion_r444110959



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
##########
@@ -463,26 +533,47 @@ private CacheContinuousQueryEntry filteredEntry(long cntr, long filtered) {
 
                     lastProc = pos;
 
-                    if (pos == entries.length - 1) {
-                        Arrays.fill(entries, null);
+                    if (pos == entries.length - 1)
+                        rollOver(startCntr + BUF_SIZE, filtered, entry.topologyVersion());
+                }
+                else if (endCntr < ackedUpdCntr0)
+                    rollOver(ackedUpdCntr0 + 1, 0, entry.topologyVersion());
 
-                        Batch nextBatch = new Batch(this.startCntr + BUF_SIZE,
-                            filtered,
-                            entries,
-                            entry.topologyVersion());
+                return res;
+            }
+        }
 
-                        entries = null;
+        /**
+         * @param topVer Topology version of current processing entry.
+         */
+        private synchronized void tryRollOver(AffinityTopologyVersion topVer) {
+            if (entries == null)
+                return;
 
-                        assert curBatch.get() == this;
+            long ackedUpdCntr0 = ackedUpdCntr;
 
-                        curBatch.set(nextBatch);
-                    }
-                }
-                else
-                    return res;
-            }
+            if (endCntr < ackedUpdCntr0)

Review comment:
       I think it's enough to check `entries == null` inside the block synchronized on the `this` monitor; this guarantees that `batch` will not be changed concurrently.
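
       A rough sketch of that argument, using a simplified and hypothetical model (`RollOverGuardDemo`, `Buffer` and this `Batch` are illustrative, not the actual Ignite classes). It assumes the current batch is only ever replaced inside a method synchronized on the batch itself, and that `entries` is set to `null` in that same critical section.

```java
import java.util.concurrent.atomic.AtomicReference;

public class RollOverGuardDemo {
    /** Hypothetical, simplified buffer that holds a reference to its current batch. */
    static class Buffer {
        final AtomicReference<Batch> curBatch = new AtomicReference<>();

        Buffer() {
            curBatch.set(new Batch(this, 0));
        }
    }

    /** Hypothetical, simplified batch. */
    static class Batch {
        final Buffer buf;
        final long startCntr;

        // Non-null while this batch is current; set to null under the monitor when it is retired.
        private Object[] entries = new Object[4];

        Batch(Buffer buf, long startCntr) {
            this.buf = buf;
            this.startCntr = startCntr;
        }

        /** @return true if this call retired the batch and installed the next one. */
        synchronized boolean tryRollOver(long nextStartCntr) {
            if (entries == null)
                return false; // Already retired by another thread; nothing to do.

            entries = null;

            // Under the stated assumption, a batch with non-null entries is still the current one.
            if (buf.curBatch.get() != this)
                throw new IllegalStateException("Batch is not current");

            buf.curBatch.set(new Batch(buf, nextStartCntr));

            return true;
        }
    }

    public static void main(String[] args) {
        Buffer buf = new Buffer();
        Batch first = buf.curBatch.get();

        System.out.println(first.tryRollOver(100)); // First caller retires the batch.
        System.out.println(first.tryRollOver(200)); // Later callers see entries == null and back off.
        System.out.println(buf.curBatch.get().startCntr);
    }
}
```

       The guarantee holds only as long as every path that replaces the current batch goes through the same monitor; if some other code path could swap `curBatch` without nulling `entries`, the explicit comparison against `curBatch` suggested by the reviewer would still be needed.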







[GitHub] [ignite] Mmuzaf commented on a change in pull request #7881: IGNITE-10959: Continuous query pending buffer limit

Posted by GitBox <gi...@apache.org>.
Mmuzaf commented on a change in pull request #7881:
URL: https://github.com/apache/ignite/pull/7881#discussion_r451391072



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
##########
@@ -172,21 +204,51 @@ private Object process0(long cntr, CacheContinuousQueryEntry entry, boolean back
                 if (res == RETRY)
                     continue;
             }
-            else
+            else {
+                if (batch.endCntr < ackedUpdCntr.get())
+                    batch.tryRollOver(entry.topologyVersion());
+
+                if (pendingCurrSize.get() > MAX_PENDING_BUFF_SIZE) {

Review comment:
       Agreed, synchronization on `pending` added.







[GitHub] [ignite] Mmuzaf commented on a change in pull request #7881: IGNITE-10959: Continuous query pending buffer limit

Posted by GitBox <gi...@apache.org>.
Mmuzaf commented on a change in pull request #7881:
URL: https://github.com/apache/ignite/pull/7881#discussion_r451388955



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
##########
@@ -99,6 +99,10 @@
     static final int LSNR_MAX_BUF_SIZE =
         IgniteSystemProperties.getInteger("IGNITE_CONTINUOUS_QUERY_LISTENER_MAX_BUFFER_SIZE", 10_000);
 
+    /** Maximum size of buffer for pending events. Default value is {@code 10_000}. */
+    public final int maxPendingBuffSize =

Review comment:
       Removed.







[GitHub] [ignite] alex-plekhanov commented on a change in pull request #7881: IGNITE-10959: Continuous query pending buffer limit

Posted by GitBox <gi...@apache.org>.
alex-plekhanov commented on a change in pull request #7881:
URL: https://github.com/apache/ignite/pull/7881#discussion_r440629742



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
##########
@@ -154,6 +191,7 @@ private Object process0(long cntr, CacheContinuousQueryEntry entry, boolean back
         Object res = null;
 
         for (;;) {
+            // Set bach only if batch is null (first attempt).

Review comment:
       bach -> batch

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
##########
@@ -74,6 +106,11 @@ void cleanupBackupQueue(Long updateCntr) {
             if (backupEntry.updateCounter() <= updateCntr)
                 it.remove();
         }
+
+        long val = ackedUpdCntr;
+
+        while (val < updateCntr && !ACKED_UPDATER.compareAndSet(this, val, updateCntr))

Review comment:
       Use `GridAtomicLong.setIfGreater()` here
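
       For context, a set-if-greater update is a small compare-and-set loop that only ever moves a counter forward. The sketch below uses the JDK `AtomicLong`; the helper `setIfGreater` in `MonotonicCounterDemo` is a hypothetical illustration of the pattern and is not the Ignite implementation.

```java
import java.util.concurrent.atomic.AtomicLong;

public class MonotonicCounterDemo {
    /**
     * Hypothetical helper: raises the counter to {@code update} only if it is greater
     * than the current value.
     *
     * @return true if the counter was raised by this call.
     */
    static boolean setIfGreater(AtomicLong cntr, long update) {
        for (;;) {
            long cur = cntr.get();

            if (update <= cur)
                return false; // The counter is already at or past the requested value.

            if (cntr.compareAndSet(cur, update))
                return true; // No concurrent change between the read and the write.

            // Another thread changed the value in between; re-read and retry.
        }
    }

    public static void main(String[] args) {
        AtomicLong acked = new AtomicLong(5);

        System.out.println(setIfGreater(acked, 3) + " " + acked.get());
        System.out.println(setIfGreater(acked, 9) + " " + acked.get());
    }
}
```

       Wrapping the loop in one helper keeps the re-read on CAS failure in a single place, which is easy to get wrong when the loop is written inline with a field updater.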

##########
File path: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/AbstractCacheContinuousQueryBufferLimitTest.java
##########
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.util.UUID;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import javax.cache.configuration.FactoryBuilder;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cache.query.CacheQueryEntryEvent;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheIdMessage;
+import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.systemview.view.ContinuousQueryView;
+import org.apache.ignite.spi.systemview.view.SystemView;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.ignite.internal.TestRecordingCommunicationSpi.spi;
+import static org.apache.ignite.internal.processors.continuous.GridContinuousProcessor.CQ_SYS_VIEW;
+import static org.apache.ignite.testframework.GridTestUtils.getFieldValue;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+
+/**
+ *
+ */
+public abstract class AbstractCacheContinuousQueryBufferLimitTest extends GridCommonAbstractTest {

Review comment:
       Let's use the `Parameterized` runner instead of class inheritance.



