Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2020/07/07 19:13:01 UTC

[GitHub] [ignite] alex-plekhanov commented on a change in pull request #7881: IGNITE-10959: Continuous query pending buffer limit

alex-plekhanov commented on a change in pull request #7881:
URL: https://github.com/apache/ignite/pull/7881#discussion_r451034133



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
##########
@@ -99,6 +99,10 @@
     static final int LSNR_MAX_BUF_SIZE =
         IgniteSystemProperties.getInteger("IGNITE_CONTINUOUS_QUERY_LISTENER_MAX_BUFFER_SIZE", 10_000);
 
+    /** Maximum size of buffer for pending events. Default value is {@code 10_000}. */
+    public final int maxPendingBuffSize =

Review comment:
       Why do we need this field? It's not used.

##########
File path: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBufferLimitTest.java
##########
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.cache.configuration.FactoryBuilder;
+import javax.cache.event.CacheEntryEvent;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cache.query.CacheQueryEntryEvent;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheIdMessage;
+import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor;
+import org.apache.ignite.internal.util.GridAtomicLong;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.systemview.view.ContinuousQueryView;
+import org.apache.ignite.spi.systemview.view.SystemView;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.internal.TestRecordingCommunicationSpi.spi;
+import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryEventBuffer.MAX_PENDING_BUFF_SIZE;
+import static org.apache.ignite.internal.processors.continuous.GridContinuousProcessor.CQ_SYS_VIEW;
+import static org.apache.ignite.testframework.GridTestUtils.getFieldValue;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+
+/**
+ *
+ */
+@RunWith(Parameterized.class)
+public class CacheContinuousQueryBufferLimitTest extends GridCommonAbstractTest {
+    /** Cache partitions count. */
+    private static final int PARTS = 1;
+
+    /** Total number of cache keys. */
+    private static final int TOTAL_KEYS = 1024;
+
+    /** Timeout to wait for pending buffer overflow. */
+    private static final long OVERFLOW_TIMEOUT_MS = 10_000L;
+
+    /** Default remote no-op filter. */
+    private static final CacheEntryEventSerializableFilter<Integer, Integer> RMT_FILTER = e -> true;
+
+    /** Counter of cache messages being send. */
+    private final AtomicInteger msgCntr = new AtomicInteger();
+
+    /** Cache mode. */
+    @Parameterized.Parameter(0)
+    public CacheMode cacheMode;
+
+    /** Cache atomicity mode. */
+    @Parameterized.Parameter(1)
+    public CacheAtomicityMode atomicityMode;
+
+    /** @return Test parameters. */
+    @Parameterized.Parameters(name = "cacheMode={0}, atomicityMode={1}")
+    public static Collection<?> parameters() {
+        return Arrays.asList(new Object[][] {
+            {REPLICATED, ATOMIC},
+            {REPLICATED, TRANSACTIONAL},
+            {PARTITIONED, ATOMIC},
+            {PARTITIONED, TRANSACTIONAL}
+        });
+    }
+
+    /**
+     * Local pending limit for this test is less than MAX_PENDING_BUFF_SIZE,
+     * so pending entries must be cleaned prior to reaching it.
+     *
+     * @throws Exception If fails.
+     */
+    @Test
+    public void testContinuousQueryBatchSwitchOnAck() throws Exception {
+        doTestContinuousQueryPendingBufferLimit((n, msg) ->
+            msg instanceof GridCacheIdMessage && msgCntr.getAndIncrement() == 10, MAX_PENDING_BUFF_SIZE / 10);
+    }
+
+    /**
+     * The test blocks switching current CacheContinuousQueryEventBuffer.Batch to the new one, so
+     * pending entries will be processed (dropped on backups and send to the client on primaries)
+     * when the MAX_PENDING_BUFF_SIZE is reached.
+     *
+     * @throws Exception If fails.
+     */
+    @Test
+    public void testContinuousQueryPendingBufferLimit() throws Exception {
+        doTestContinuousQueryPendingBufferLimit((n, msg) ->
+            (msg instanceof GridCacheIdMessage && msgCntr.getAndIncrement() == 10) ||
+                msg instanceof CacheContinuousQueryBatchAck, (int)(MAX_PENDING_BUFF_SIZE * 1.1));
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testPendingSendToClientOnLimitReached() throws Exception {
+        AtomicInteger keys = new AtomicInteger();
+        AtomicReference<String> err = new AtomicReference<>();
+
+        IgniteEx srv = startGrids(2);
+        IgniteEx clnt = startClientGrid();
+
+        CacheEntryEventSerializableFilter<Integer, Integer> filter = new CacheEntryEventSerializableFilter<Integer,Integer>() {
+            @Override public boolean evaluate(CacheEntryEvent<? extends Integer,? extends Integer> evt) {
+                return evt.getKey() % 2 == 0;
+            }
+        };
+
+        ContinuousQuery<Integer, Integer> cq = new ContinuousQuery<>();
+        cq.setRemoteFilterFactory(FactoryBuilder.factoryOf(filter));
+        cq.setLocalListener((events) -> events.forEach(e -> {
+            if (!filter.evaluate(e))
+                err.compareAndSet(null, "Key must be filtered [e=" + e + ']');
+        }));
+        cq.setLocal(false);
+
+        spi(srv).blockMessages((nodeId, msg) -> msg instanceof GridCacheIdMessage && msgCntr.getAndIncrement() == 10);
+        spi(clnt).blockMessages((nodeId, msg) -> msg instanceof CacheContinuousQueryBatchAck);
+
+        IgniteInternalFuture<?> loadFut = null;
+
+        try (QueryCursor<?> qry = clnt.cache(DEFAULT_CACHE_NAME).query(cq)) {
+            awaitPartitionMapExchange();
+
+            loadFut = GridTestUtils.runMultiThreadedAsync(() -> {
+                while (!Thread.currentThread().isInterrupted())
+                    clnt.cache(DEFAULT_CACHE_NAME).put(keys.incrementAndGet(), 0);
+            }, 3, "cq-put-");
+
+            ConcurrentMap<Long, CacheContinuousQueryEntry> pending =
+                getContinuousQueryPendingBuffer(grid(0), CU.cacheId(DEFAULT_CACHE_NAME), 0);
+
+            waitForCondition(() -> pending.size() > MAX_PENDING_BUFF_SIZE, 15_000);
+
+            // Check all entries greater than limit filtered correctly.
+            waitForCondition(() -> keys.get() > MAX_PENDING_BUFF_SIZE * 2, 15_000);

Review comment:
       I can't match the comment with the condition. What do you mean here?

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
##########
@@ -172,21 +204,51 @@ private Object process0(long cntr, CacheContinuousQueryEntry entry, boolean back
                 if (res == RETRY)
                     continue;
             }
-            else
+            else {
+                if (batch.endCntr < ackedUpdCntr.get())

Review comment:
       You should move this condition out of here, or at least continue the cycle after `tryRollOver`.
   For example, if the acked counter is 100 and cntr == 101, the entry should not go to the pending entries map; instead it should go to the current batch, since the batch has been rolled over. If you skip this entry in the current batch, you will never switch this batch successfully, since the first entry is missing from the batch and `lastProc` will never increment.
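   Roughly something like this (just a sketch against the field names in this PR, not compiled):
   ```java
   else {
       if (batch.endCntr < ackedUpdCntr.get()) {
           batch.tryRollOver(entry.topologyVersion());

           // Re-evaluate the entry on the next loop iteration: with acked = 100
           // and cntr == 101 it now fits the rolled-over batch instead of going
           // to the pending entries map.
           continue;
       }

       // ... the pending-buffer handling stays here for entries that really
       // are ahead of the current batch ...
   }
   ```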

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
##########
@@ -154,6 +185,7 @@ private Object process0(long cntr, CacheContinuousQueryEntry entry, boolean back
         Object res = null;
 
         for (;;) {
+            // Set batch only if batch is null (first attempt).
             batch = initBatch(entry.topologyVersion(), backup);
 
             if (batch == null || cntr < batch.startCntr) {

Review comment:
       We can skip `backupQ.add()` if the acked counter is already greater than the current counter.
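   Something along these lines (sketch only, using the fields from this PR; not compiled):
   ```java
   if (batch == null || cntr < batch.startCntr) {
       // The client has already acknowledged this counter, so keeping the entry
       // on the backup node is pointless -- drop it instead of backupQ.add().
       if (backup && cntr <= ackedUpdCntr.get())
           return res;

       // ... existing backupQ.add(entry) path otherwise ...
   }
   ```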

##########
File path: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBufferLimitTest.java
##########
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.cache.configuration.FactoryBuilder;
+import javax.cache.event.CacheEntryEvent;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cache.query.CacheQueryEntryEvent;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheIdMessage;
+import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor;
+import org.apache.ignite.internal.util.GridAtomicLong;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.systemview.view.ContinuousQueryView;
+import org.apache.ignite.spi.systemview.view.SystemView;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.internal.TestRecordingCommunicationSpi.spi;
+import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryEventBuffer.MAX_PENDING_BUFF_SIZE;
+import static org.apache.ignite.internal.processors.continuous.GridContinuousProcessor.CQ_SYS_VIEW;
+import static org.apache.ignite.testframework.GridTestUtils.getFieldValue;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+
+/**
+ *
+ */
+@RunWith(Parameterized.class)
+public class CacheContinuousQueryBufferLimitTest extends GridCommonAbstractTest {
+    /** Cache partitions count. */
+    private static final int PARTS = 1;
+
+    /** Total number of cache keys. */
+    private static final int TOTAL_KEYS = 1024;
+
+    /** Timeout to wait for pending buffer overflow. */
+    private static final long OVERFLOW_TIMEOUT_MS = 10_000L;
+
+    /** Default remote no-op filter. */
+    private static final CacheEntryEventSerializableFilter<Integer, Integer> RMT_FILTER = e -> true;
+
+    /** Counter of cache messages being send. */
+    private final AtomicInteger msgCntr = new AtomicInteger();
+
+    /** Cache mode. */
+    @Parameterized.Parameter(0)
+    public CacheMode cacheMode;
+
+    /** Cache atomicity mode. */
+    @Parameterized.Parameter(1)
+    public CacheAtomicityMode atomicityMode;
+
+    /** @return Test parameters. */
+    @Parameterized.Parameters(name = "cacheMode={0}, atomicityMode={1}")
+    public static Collection<?> parameters() {
+        return Arrays.asList(new Object[][] {
+            {REPLICATED, ATOMIC},
+            {REPLICATED, TRANSACTIONAL},
+            {PARTITIONED, ATOMIC},
+            {PARTITIONED, TRANSACTIONAL}
+        });
+    }
+
+    /**
+     * Local pending limit for this test is less than MAX_PENDING_BUFF_SIZE,
+     * so pending entries must be cleaned prior to reaching it.
+     *
+     * @throws Exception If fails.
+     */
+    @Test
+    public void testContinuousQueryBatchSwitchOnAck() throws Exception {
+        doTestContinuousQueryPendingBufferLimit((n, msg) ->
+            msg instanceof GridCacheIdMessage && msgCntr.getAndIncrement() == 10, MAX_PENDING_BUFF_SIZE / 10);
+    }
+
+    /**
+     * The test blocks switching current CacheContinuousQueryEventBuffer.Batch to the new one, so
+     * pending entries will be processed (dropped on backups and send to the client on primaries)
+     * when the MAX_PENDING_BUFF_SIZE is reached.
+     *
+     * @throws Exception If fails.
+     */
+    @Test
+    public void testContinuousQueryPendingBufferLimit() throws Exception {
+        doTestContinuousQueryPendingBufferLimit((n, msg) ->
+            (msg instanceof GridCacheIdMessage && msgCntr.getAndIncrement() == 10) ||
+                msg instanceof CacheContinuousQueryBatchAck, (int)(MAX_PENDING_BUFF_SIZE * 1.1));
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testPendingSendToClientOnLimitReached() throws Exception {
+        AtomicInteger keys = new AtomicInteger();
+        AtomicReference<String> err = new AtomicReference<>();
+
+        IgniteEx srv = startGrids(2);
+        IgniteEx clnt = startClientGrid();
+
+        CacheEntryEventSerializableFilter<Integer, Integer> filter = new CacheEntryEventSerializableFilter<Integer,Integer>() {
+            @Override public boolean evaluate(CacheEntryEvent<? extends Integer,? extends Integer> evt) {
+                return evt.getKey() % 2 == 0;
+            }
+        };
+
+        ContinuousQuery<Integer, Integer> cq = new ContinuousQuery<>();
+        cq.setRemoteFilterFactory(FactoryBuilder.factoryOf(filter));
+        cq.setLocalListener((events) -> events.forEach(e -> {
+            if (!filter.evaluate(e))
+                err.compareAndSet(null, "Key must be filtered [e=" + e + ']');
+        }));
+        cq.setLocal(false);
+
+        spi(srv).blockMessages((nodeId, msg) -> msg instanceof GridCacheIdMessage && msgCntr.getAndIncrement() == 10);
+        spi(clnt).blockMessages((nodeId, msg) -> msg instanceof CacheContinuousQueryBatchAck);
+
+        IgniteInternalFuture<?> loadFut = null;
+
+        try (QueryCursor<?> qry = clnt.cache(DEFAULT_CACHE_NAME).query(cq)) {
+            awaitPartitionMapExchange();
+
+            loadFut = GridTestUtils.runMultiThreadedAsync(() -> {
+                while (!Thread.currentThread().isInterrupted())
+                    clnt.cache(DEFAULT_CACHE_NAME).put(keys.incrementAndGet(), 0);
+            }, 3, "cq-put-");
+
+            ConcurrentMap<Long, CacheContinuousQueryEntry> pending =
+                getContinuousQueryPendingBuffer(grid(0), CU.cacheId(DEFAULT_CACHE_NAME), 0);
+
+            waitForCondition(() -> pending.size() > MAX_PENDING_BUFF_SIZE, 15_000);

Review comment:
       We should use assertFalse here.
   
   But personally I don't like the idea of such big timeouts. I've run the new tests and they take about 3 minutes, and this time will be about the same on any hardware because of the timeouts.
   Can we rewrite the tests using a key-count limit instead of timeouts? I think `MAX_PENDING_BUFF_SIZE * 3` keys will be enough for this test. Instead of "wait for condition" you can replace the "pending" field with your own map that raises an error flag, for example, on put when size() is greater than `MAX_PENDING_BUFF_SIZE` (see the sketch below). The same applies to the other tests.
   
   Also, in the current implementation there is no synchronization between `pendingCurrSize.get() > MAX_PENDING_BUFF_SIZE` and `pendingCurrSize.incrementAndGet()`, so due to races you can sometimes get values above the limit (but no more than the limit plus the number of threads).
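   Something like this for the map (just a sketch, not compiled; `overflowed` is a hypothetical flag field in the test, and injecting the map assumes the "pending" field is a ConcurrentSkipListMap that can be replaced via GridTestUtils.setFieldValue or similar):
   ```java
   // Needs java.util.concurrent.ConcurrentSkipListMap and AtomicBoolean imports.
   AtomicBoolean overflowed = new AtomicBoolean();

   ConcurrentSkipListMap<Long, CacheContinuousQueryEntry> watchedPending =
       new ConcurrentSkipListMap<Long, CacheContinuousQueryEntry>() {
           @Override public CacheContinuousQueryEntry put(Long cntr, CacheContinuousQueryEntry e) {
               // Allow a small slack because of the pendingCurrSize race mentioned above.
               if (size() > MAX_PENDING_BUFF_SIZE + 16)
                   overflowed.set(true);

               return super.put(cntr, e);
           }
       };
   ```
   Then the test just loads `MAX_PENDING_BUFF_SIZE * 3` keys and asserts that `overflowed` stays false, with no fixed timeouts.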

##########
File path: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBufferLimitTest.java
##########
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.cache.configuration.FactoryBuilder;
+import javax.cache.event.CacheEntryEvent;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cache.query.CacheQueryEntryEvent;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheIdMessage;
+import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor;
+import org.apache.ignite.internal.util.GridAtomicLong;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.systemview.view.ContinuousQueryView;
+import org.apache.ignite.spi.systemview.view.SystemView;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.internal.TestRecordingCommunicationSpi.spi;
+import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryEventBuffer.MAX_PENDING_BUFF_SIZE;
+import static org.apache.ignite.internal.processors.continuous.GridContinuousProcessor.CQ_SYS_VIEW;
+import static org.apache.ignite.testframework.GridTestUtils.getFieldValue;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+
+/**
+ *
+ */
+@RunWith(Parameterized.class)
+public class CacheContinuousQueryBufferLimitTest extends GridCommonAbstractTest {
+    /** Cache partitions count. */
+    private static final int PARTS = 1;
+
+    /** Total number of cache keys. */
+    private static final int TOTAL_KEYS = 1024;
+
+    /** Timeout to wait for pending buffer overflow. */
+    private static final long OVERFLOW_TIMEOUT_MS = 10_000L;
+
+    /** Default remote no-op filter. */
+    private static final CacheEntryEventSerializableFilter<Integer, Integer> RMT_FILTER = e -> true;
+
+    /** Counter of cache messages being send. */
+    private final AtomicInteger msgCntr = new AtomicInteger();
+
+    /** Cache mode. */
+    @Parameterized.Parameter(0)
+    public CacheMode cacheMode;
+
+    /** Cache atomicity mode. */
+    @Parameterized.Parameter(1)
+    public CacheAtomicityMode atomicityMode;
+
+    /** @return Test parameters. */
+    @Parameterized.Parameters(name = "cacheMode={0}, atomicityMode={1}")
+    public static Collection<?> parameters() {
+        return Arrays.asList(new Object[][] {
+            {REPLICATED, ATOMIC},
+            {REPLICATED, TRANSACTIONAL},
+            {PARTITIONED, ATOMIC},
+            {PARTITIONED, TRANSACTIONAL}
+        });
+    }
+
+    /**
+     * Local pending limit for this test is less than MAX_PENDING_BUFF_SIZE,
+     * so pending entries must be cleaned prior to reaching it.
+     *
+     * @throws Exception If fails.
+     */
+    @Test
+    public void testContinuousQueryBatchSwitchOnAck() throws Exception {
+        doTestContinuousQueryPendingBufferLimit((n, msg) ->
+            msg instanceof GridCacheIdMessage && msgCntr.getAndIncrement() == 10, MAX_PENDING_BUFF_SIZE / 10);
+    }
+
+    /**
+     * The test blocks switching current CacheContinuousQueryEventBuffer.Batch to the new one, so
+     * pending entries will be processed (dropped on backups and send to the client on primaries)
+     * when the MAX_PENDING_BUFF_SIZE is reached.
+     *
+     * @throws Exception If fails.
+     */
+    @Test
+    public void testContinuousQueryPendingBufferLimit() throws Exception {
+        doTestContinuousQueryPendingBufferLimit((n, msg) ->
+            (msg instanceof GridCacheIdMessage && msgCntr.getAndIncrement() == 10) ||
+                msg instanceof CacheContinuousQueryBatchAck, (int)(MAX_PENDING_BUFF_SIZE * 1.1));
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testPendingSendToClientOnLimitReached() throws Exception {
+        AtomicInteger keys = new AtomicInteger();
+        AtomicReference<String> err = new AtomicReference<>();
+
+        IgniteEx srv = startGrids(2);
+        IgniteEx clnt = startClientGrid();
+
+        CacheEntryEventSerializableFilter<Integer, Integer> filter = new CacheEntryEventSerializableFilter<Integer,Integer>() {

Review comment:
       Can be simplified with a lambda: `evt -> evt.getKey() % 2 == 0`.
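   i.e.:
   ```java
   CacheEntryEventSerializableFilter<Integer, Integer> filter = evt -> evt.getKey() % 2 == 0;
   ```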

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
##########
@@ -172,21 +204,51 @@ private Object process0(long cntr, CacheContinuousQueryEntry entry, boolean back
                 if (res == RETRY)
                     continue;
             }
-            else
+            else {
+                if (batch.endCntr < ackedUpdCntr.get())
+                    batch.tryRollOver(entry.topologyVersion());
+
+                if (pendingCurrSize.get() > MAX_PENDING_BUFF_SIZE) {

Review comment:
       You should use some synchronization here; only one thread at a time should be able to remove pending entries. Otherwise you can iterate concurrently over the same entry, send a duplicate to the client, call `remove` and decrement the size twice (although only one entry is actually removed).
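   For example, a simple CAS guard would be enough (sketch only; `pendingDrainGuard` is a hypothetical AtomicBoolean field added just for illustration):
   ```java
   if (pendingCurrSize.get() > MAX_PENDING_BUFF_SIZE && pendingDrainGuard.compareAndSet(false, true)) {
       try {
           // Only the thread that won the CAS iterates and removes pending
           // entries, so each removed entry is sent (or dropped on backups)
           // exactly once and pendingCurrSize is decremented exactly once
           // per removed entry.
           // ... existing drain logic goes here ...
       }
       finally {
           pendingDrainGuard.set(false);
       }
   }
   ```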




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org