You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by mm...@apache.org on 2020/07/10 17:06:38 UTC

[ignite] branch master updated: IGNITE-10959: Add continuous query pending buffer limit (#7881)

This is an automated email from the ASF dual-hosted git repository.

mmuzaf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new e730185  IGNITE-10959: Add continuous query pending buffer limit (#7881)
e730185 is described below

commit e730185565a75fa338496814f0eaec1d1315d7c9
Author: Maxim Muzafarov <mm...@apache.org>
AuthorDate: Fri Jul 10 20:06:11 2020 +0300

    IGNITE-10959: Add continuous query pending buffer limit (#7881)
---
 .../CacheContinuousQueryEventBuffer.java           | 167 ++++++++---
 .../continuous/CacheContinuousQueryHandler.java    |   4 +-
 .../CacheContinuousQueryBufferLimitTest.java       | 334 +++++++++++++++++++++
 .../apache/ignite/testframework/GridTestUtils.java |  20 +-
 .../testsuites/IgniteCacheQuerySelfTestSuite3.java |   4 +-
 5 files changed, 485 insertions(+), 44 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
index 4b45d72..5807ccf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
@@ -26,9 +26,13 @@ import java.util.Map;
 import java.util.TreeMap;
 import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.util.GridAtomicLong;
+import org.apache.ignite.internal.util.typedef.internal.LT;
 import org.apache.ignite.util.deque.FastSizeDeque;
 import org.jetbrains.annotations.Nullable;
 
@@ -36,30 +40,55 @@ import org.jetbrains.annotations.Nullable;
  *
  */
 public class CacheContinuousQueryEventBuffer {
-    /** */
+    /** Maximum size of buffer for pending events. Default value is {@code 10_000}. */
+    public static final int MAX_PENDING_BUFF_SIZE =
+        IgniteSystemProperties.getInteger("IGNITE_CONTINUOUS_QUERY_PENDING_BUFF_SIZE", 10_000);
+
+    /** Batch buffer size. */
     private static final int BUF_SIZE =
         IgniteSystemProperties.getInteger("IGNITE_CONTINUOUS_QUERY_SERVER_BUFFER_SIZE", 1000);
 
     /** */
     private static final Object RETRY = new Object();
 
-    /** */
-    protected final int part;
+    /** Continuous query category logger. */
+    private final IgniteLogger log;
 
     /** */
-    private AtomicReference<Batch> curBatch = new AtomicReference<>();
+    private final int part;
 
-    /** */
-    private FastSizeDeque<CacheContinuousQueryEntry> backupQ = new FastSizeDeque<>(new ConcurrentLinkedDeque<>());
+    /** Batch of entries currently being collected to send to the remote. */
+    private final AtomicReference<Batch> curBatch = new AtomicReference<>();
 
-    /** */
-    private ConcurrentSkipListMap<Long, CacheContinuousQueryEntry> pending = new ConcurrentSkipListMap<>();
+    /** Queue of backup entries whose partition counter is less than the counter processed by the current batch. */
+    private final FastSizeDeque<CacheContinuousQueryEntry> backupQ = new FastSizeDeque<>(new ConcurrentLinkedDeque<>());
+
+    /** Entries which are waiting for being processed. */
+    private final ConcurrentSkipListMap<Long, CacheContinuousQueryEntry> pending = new ConcurrentSkipListMap<>();
+
+    /**
+     * The size method of the pending ConcurrentSkipListMap is not a constant-time operation. Since each
+     * entry is processed under the GridCacheMapEntry lock, it's necessary to maintain the size of the map explicitly.
+     */
+    private final AtomicInteger pendingCurrSize = new AtomicInteger();
+
+    /** Last seen ack partition counter tracked by the CQ handler partition recovery queue. */
+    private final GridAtomicLong ackedUpdCntr = new GridAtomicLong(0);
 
     /**
      * @param part Partition number.
+     * @param log Continuous query category logger.
      */
-    CacheContinuousQueryEventBuffer(int part) {
+    CacheContinuousQueryEventBuffer(int part, IgniteLogger log) {
         this.part = part;
+        this.log = log;
+    }
+
+    /**
+     * @param part Partition number.
+     */
+    CacheContinuousQueryEventBuffer(int part) {
+        this(part, null);
     }
 
     /**
@@ -74,6 +103,8 @@ public class CacheContinuousQueryEventBuffer {
             if (backupEntry.updateCounter() <= updateCntr)
                 it.remove();
         }
+
+        ackedUpdCntr.setIfGreater(updateCntr);
     }
 
     /**
@@ -154,6 +185,7 @@ public class CacheContinuousQueryEventBuffer {
         Object res = null;
 
         for (;;) {
+            // Set batch only if batch is null (first attempt).
             batch = initBatch(entry.topologyVersion(), backup);
 
             if (batch == null || cntr < batch.startCntr) {
@@ -172,21 +204,55 @@ public class CacheContinuousQueryEventBuffer {
                 if (res == RETRY)
                     continue;
             }
-            else
+            else {
+                if (batch.endCntr < ackedUpdCntr.get() && batch.tryRollOver(entry.topologyVersion()) == RETRY)
+                    continue;
+
+                pendingCurrSize.incrementAndGet();
                 pending.put(cntr, entry);
 
+                if (pendingCurrSize.get() > MAX_PENDING_BUFF_SIZE) {
+                    synchronized (pending) {
+                        if (pendingCurrSize.get() <= MAX_PENDING_BUFF_SIZE)
+                            break;
+
+                        LT.warn(log, "Buffer for pending events reached max of its size " +
+                            "[cacheId=" + entry.cacheId() + ", maxSize=" + MAX_PENDING_BUFF_SIZE +
+                            ", partId=" + entry.partition() + ']');
+
+                        // Remove first BUF_SIZE keys.
+                        int keysToRemove = BUF_SIZE;
+
+                        Iterator<Map.Entry<Long, CacheContinuousQueryEntry>> iter = pending.entrySet().iterator();
+
+                        while (iter.hasNext() && keysToRemove > 0) {
+                            CacheContinuousQueryEntry entry0 = iter.next().getValue();
+
+                            // Discard messages on backup and send to client if primary.
+                            if (!backup)
+                                res = addResult(res, entry0.copyWithDataReset(), backup);
+
+                            iter.remove();
+                            pendingCurrSize.decrementAndGet();
+                            keysToRemove--;
+                        }
+                    }
+                }
+            }
+
             break;
         }
 
         Batch batch0 = curBatch.get();
 
+        // The current batch has been switched to a new one while the entry was being processed.
         if (batch0 != batch) {
             do {
                 batch = batch0;
 
                 res = processPending(res, batch, backup);
 
-                batch0 = initBatch(entry.topologyVersion(), backup);
+                batch0 = curBatch.get();
             }
             while (batch != batch0);
         }
@@ -230,22 +296,28 @@ public class CacheContinuousQueryEventBuffer {
      * @return New result.
      */
     @Nullable private Object processPending(@Nullable Object res, Batch batch, boolean backup) {
-        if (pending.floorKey(batch.endCntr) != null) {
+        if (pending.floorKey(batch.endCntr) == null)
+            return res;
+
+        synchronized (pending) {
             for (Map.Entry<Long, CacheContinuousQueryEntry> p : pending.headMap(batch.endCntr, true).entrySet()) {
                 long cntr = p.getKey();
 
                 assert cntr <= batch.endCntr;
 
-                if (pending.remove(p.getKey()) != null) {
-                    if (cntr < batch.startCntr)
-                        res = addResult(res, p.getValue(), backup);
-                    else
-                        res = batch.processEntry0(res, p.getKey(), p.getValue(), backup);
-                }
+                if (pending.remove(cntr) == null)
+                    continue;
+
+                if (cntr < batch.startCntr)
+                    res = addResult(res, p.getValue(), backup);
+                else
+                    res = batch.processEntry0(res, p.getKey(), p.getValue(), backup);
+
+                pendingCurrSize.decrementAndGet();
             }
-        }
 
-        return res;
+            return res;
+        }
     }
 
     /**
@@ -439,6 +511,7 @@ public class CacheContinuousQueryEventBuffer {
                 entries[pos] = entry;
 
                 int next = lastProc + 1;
+                long ackedUpdCntr0 = ackedUpdCntr.get();
 
                 if (next == pos) {
                     for (int i = next; i < entries.length; i++) {
@@ -463,26 +536,52 @@ public class CacheContinuousQueryEventBuffer {
 
                     lastProc = pos;
 
-                    if (pos == entries.length - 1) {
-                        Arrays.fill(entries, null);
+                    if (pos == entries.length - 1)
+                        rollOver(startCntr + BUF_SIZE, filtered, entry.topologyVersion());
+                }
+                else if (endCntr < ackedUpdCntr0)
+                    rollOver(ackedUpdCntr0 + 1, 0, entry.topologyVersion());
+
+                return res;
+            }
+        }
 
-                        Batch nextBatch = new Batch(this.startCntr + BUF_SIZE,
-                            filtered,
-                            entries,
-                            entry.topologyVersion());
+        /**
+         * @param topVer Topology version of current processing entry.
+         */
+        private synchronized Object tryRollOver(AffinityTopologyVersion topVer) {
+            if (entries == null)
+                return RETRY;
 
-                        entries = null;
+            long ackedUpdCntr0 = ackedUpdCntr.get();
 
-                        assert curBatch.get() == this;
+            if (endCntr < ackedUpdCntr0) {
+                rollOver(ackedUpdCntr0 + 1, 0, topVer);
 
-                        curBatch.set(nextBatch);
-                    }
-                }
-                else
-                    return res;
+                return RETRY;
             }
 
-            return res;
+            return null;
+        }
+
+        /**
+         * @param startCntr Start batch position.
+         * @param filtered Number of filtered entries prior start position.
+         * @param topVer Next topology version based on cache entry.
+         */
+        private void rollOver(long startCntr, long filtered, AffinityTopologyVersion topVer) {
+            Arrays.fill(entries, null);
+
+            Batch nextBatch = new Batch(startCntr,
+                filtered,
+                entries,
+                topVer);
+
+            entries = null;
+
+            boolean changed = curBatch.compareAndSet(this, nextBatch);
+
+            assert changed;
         }
     }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index cc672c6..8a54a1d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -1236,11 +1236,11 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
      * @param part Partition.
      * @return Event buffer.
      */
-    private CacheContinuousQueryEventBuffer partitionBuffer(final GridCacheContext cctx, int part) {
+    CacheContinuousQueryEventBuffer partitionBuffer(final GridCacheContext cctx, int part) {
         CacheContinuousQueryEventBuffer buf = entryBufs.get(part);
 
         if (buf == null) {
-            buf = new CacheContinuousQueryEventBuffer(part) {
+            buf = new CacheContinuousQueryEventBuffer(part, ctx.log(CU.CONTINUOUS_QRY_LOG_CATEGORY)) {
                 @Override protected long currentPartitionCounter(boolean backup) {
                     GridDhtLocalPartition locPart = cctx.topology().localPartition(part, null, false);
 
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBufferLimitTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBufferLimitTest.java
new file mode 100644
index 0000000..9c6845f
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBufferLimitTest.java
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.cache.configuration.FactoryBuilder;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cache.query.CacheQueryEntryEvent;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateRequest;
+import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor;
+import org.apache.ignite.internal.util.GridAtomicLong;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.systemview.view.ContinuousQueryView;
+import org.apache.ignite.spi.systemview.view.SystemView;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.internal.TestRecordingCommunicationSpi.spi;
+import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryEventBuffer.MAX_PENDING_BUFF_SIZE;
+import static org.apache.ignite.internal.processors.continuous.GridContinuousProcessor.CQ_SYS_VIEW;
+import static org.apache.ignite.testframework.GridTestUtils.getFieldValue;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+
+/**
+ *
+ */
+@RunWith(Parameterized.class)
+public class CacheContinuousQueryBufferLimitTest extends GridCommonAbstractTest {
+    /** Cache partitions count. */
+    private static final int PARTS = 1;
+
+    /** Total number of cache keys. */
+    private static final int TOTAL_KEYS = 1024;
+
+    /** Maximum number of keys processed by CQ when checking for buffer overflow. */
+    private static final long OVERFLOW_KEYS_COUNT = MAX_PENDING_BUFF_SIZE * 3;
+
+    /** Default remote no-op filter. */
+    private static final CacheEntryEventSerializableFilter<Integer, Integer> RMT_FILTER = e -> true;
+
+    /** Counter of cache messages being sent. */
+    private final AtomicInteger msgCntr = new AtomicInteger();
+
+    /** Cache mode. */
+    @Parameterized.Parameter(0)
+    public CacheMode cacheMode;
+
+    /** Cache atomicity mode. */
+    @Parameterized.Parameter(1)
+    public CacheAtomicityMode atomicityMode;
+
+    /** @return Test parameters. */
+    @Parameterized.Parameters(name = "cacheMode={0}, atomicityMode={1}")
+    public static Collection<?> parameters() {
+        return Arrays.asList(new Object[][] {
+            {REPLICATED, ATOMIC},
+            {PARTITIONED, ATOMIC},
+            {REPLICATED, TRANSACTIONAL},
+            {PARTITIONED, TRANSACTIONAL}
+        });
+    }
+
+    /**
+     * Local pending limit for this test is less than MAX_PENDING_BUFF_SIZE,
+     * so pending entries must be cleaned prior to reaching it.
+     *
+     * @throws Exception If fails.
+     */
+    @Test
+    public void testContinuousQueryBatchSwitchOnAck() throws Exception {
+        doTestContinuousQueryPendingBufferLimit((n, msg) ->
+            cachePutOperationRequestMessage(msg) && msgCntr.getAndIncrement() == 10, MAX_PENDING_BUFF_SIZE / 10);
+    }
+
+    /**
+     * The test blocks switching current CacheContinuousQueryEventBuffer.Batch to the new one, so
+     * pending entries will be processed (dropped on backups and sent to the client on primaries)
+     * when the MAX_PENDING_BUFF_SIZE is reached.
+     *
+     * @throws Exception If fails.
+     */
+    @Test
+    public void testContinuousQueryPendingBufferLimit() throws Exception {
+        doTestContinuousQueryPendingBufferLimit((n, msg) ->
+            (cachePutOperationRequestMessage(msg) && msgCntr.getAndIncrement() == 10) ||
+                msg instanceof CacheContinuousQueryBatchAck, (int)(MAX_PENDING_BUFF_SIZE * 1.2));
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testPendingSendToClientOnLimitReached() throws Exception {
+        AtomicInteger keys = new AtomicInteger();
+        AtomicReference<String> err = new AtomicReference<>();
+
+        IgniteEx srv = startGrids(2);
+        IgniteEx clnt = startClientGrid();
+
+        IgniteCache<Integer, Integer> cache = clnt.cache(DEFAULT_CACHE_NAME);
+        CacheEntryEventSerializableFilter<Integer, Integer> filter = evt -> evt.getKey() % 2 == 0;
+
+        ContinuousQuery<Integer, Integer> cq = new ContinuousQuery<>();
+        cq.setRemoteFilterFactory(FactoryBuilder.factoryOf(filter));
+        cq.setLocalListener((events) -> events.forEach(e -> {
+            if (!filter.evaluate(e))
+                err.compareAndSet(null, "Key must be filtered [e=" + e + ']');
+        }));
+        cq.setLocal(false);
+
+        spi(srv).blockMessages((nodeId, msg) -> (cachePutOperationRequestMessage(msg) && msgCntr.getAndIncrement() == 7) ||
+            msg instanceof CacheContinuousQueryBatchAck);
+
+        IgniteInternalFuture<?> loadFut = null;
+
+        try (QueryCursor<?> qry = cache.query(cq)) {
+            awaitPartitionMapExchange();
+
+            loadFut = GridTestUtils.runMultiThreadedAsync(() -> {
+                while (!Thread.currentThread().isInterrupted())
+                    cache.put(keys.incrementAndGet(), 0);
+            }, 6, "cq-put-");
+
+            // Entries are checked by CacheEntryUpdatedListener which has been set to CQ. Check that all entries
+            // greater than the pending limit are filtered correctly (entries are sent to client on buffer overflow).
+            boolean await = waitForCondition(() -> keys.get() > OVERFLOW_KEYS_COUNT, 30_000);
+
+            assertTrue("Number of keys to put must reach the limit [keys=" + keys.get() +
+                ", limit=" + OVERFLOW_KEYS_COUNT + ']', await);
+        }
+        finally {
+            TestRecordingCommunicationSpi.stopBlockAll();
+
+            if (loadFut != null)
+                loadFut.cancel();
+        }
+
+        if (err.get() != null)
+            throw new Exception(err.get());
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        return super.getConfiguration(igniteInstanceName)
+            .setCommunicationSpi(new TestRecordingCommunicationSpi())
+            .setCacheConfiguration(new CacheConfiguration<>(DEFAULT_CACHE_NAME)
+                .setAtomicityMode(atomicityMode)
+                .setCacheMode(cacheMode)
+                .setBackups(1)
+                .setAffinity(new RendezvousAffinityFunction(false, PARTS)));
+    }
+
+    /** */
+    @Before
+    public void resetMessageCounter() {
+        msgCntr.set(0);
+    }
+
+    /** */
+    @After
+    public void stopAllInstances() {
+        stopAllGrids();
+    }
+
+    /**
+     * @param locBlockPred Block predicate on local node to emulate message delivery issues.
+     * @param pendingLimit Test limit of pending entries.
+     * @throws Exception If fails.
+     */
+    private void doTestContinuousQueryPendingBufferLimit(
+        IgniteBiPredicate<ClusterNode, Message> locBlockPred,
+        int pendingLimit
+    ) throws Exception
+    {
+        AtomicInteger keys = new AtomicInteger();
+
+        IgniteEx locIgnite = startGrid(0);
+        IgniteEx rmtIgnite = startGrid(1);
+
+        IgniteCache<Integer, Integer> cache = locIgnite.cache(DEFAULT_CACHE_NAME);
+        CacheConfiguration<Integer, Integer> ccfg = cache.getConfiguration(CacheConfiguration.class);
+
+        for (int i = 0; i < TOTAL_KEYS; i++)
+            cache.put(i, i);
+
+        assertEquals(PARTS, ccfg.getAffinity().partitions());
+
+        GridAtomicLong lastAcked = new GridAtomicLong();
+
+        ContinuousQuery<Integer, Integer> cq = new ContinuousQuery<>();
+        cq.setRemoteFilterFactory(FactoryBuilder.factoryOf(RMT_FILTER));
+        cq.setLocalListener((events) ->
+            events.forEach(e ->
+                lastAcked.setIfGreater(((CacheQueryEntryEvent<?, ?>)e).getPartitionUpdateCounter())));
+        cq.setLocal(false);
+
+        IgniteInternalFuture<?> updFut = null;
+
+        try (QueryCursor<?> qry = locIgnite.cache(DEFAULT_CACHE_NAME).query(cq)) {
+            awaitPartitionMapExchange();
+
+            // Partition Id, Update Counter, Continuous Entry.
+            ConcurrentMap<Long, CacheContinuousQueryEntry> pending =
+                getContinuousQueryPendingBuffer(rmtIgnite, CU.cacheId(DEFAULT_CACHE_NAME), 0);
+
+            spi(locIgnite).blockMessages(locBlockPred);
+
+            updFut = GridTestUtils.runMultiThreadedAsync(() -> {
+                while (keys.get() <= OVERFLOW_KEYS_COUNT)
+                    cache.put(keys.incrementAndGet(), 0);
+            }, 3, "cq-put-");
+
+            assertNotNull("Partition remote buffers must be inited", pending);
+
+            log.warning("Waiting for pending buffer being overflowed within " + OVERFLOW_KEYS_COUNT +
+                " number of keys.");
+
+            boolean await = waitForCondition(() -> pending.size() > pendingLimit, () -> keys.get() <= OVERFLOW_KEYS_COUNT);
+
+            assertFalse("Pending buffer exceeded the limit despite entries have been acked " +
+                    "[lastAcked=" + lastAcked + ", pending=" + S.compact(pending.keySet(), i -> i + 1) + ']',
+                await);
+        }
+        finally {
+            spi(locIgnite).stopBlock();
+
+            if (updFut != null)
+                updFut.cancel();
+        }
+    }
+
+    /**
+     * @param msg Cache message.
+     * @return {@code true} if message is initial for cache operation.
+     */
+    private boolean cachePutOperationRequestMessage(Message msg) {
+        switch (atomicityMode) {
+            case ATOMIC:
+                return msg instanceof GridDhtAtomicUpdateRequest;
+
+            case TRANSACTIONAL:
+                return msg instanceof GridDhtTxPrepareRequest;
+
+            default:
+                throw new IgniteException("Unsupported atomicity mode: " + atomicityMode);
+        }
+    }
+
+    /**
+     * @param ignite Ignite remote instance.
+     * @param routineId Routine id.
+     * @return Registered handler.
+     */
+    private static <K, V> CacheContinuousQueryHandler<K, V> getRemoteContinuousQueryHandler(
+        IgniteEx ignite,
+        UUID routineId
+    ) {
+        GridContinuousProcessor contProc = ignite.context().continuous();
+
+        ConcurrentMap<UUID, GridContinuousProcessor.RemoteRoutineInfo> rmtInfos =
+            getFieldValue(contProc, GridContinuousProcessor.class, "rmtInfos");
+
+        return rmtInfos.get(routineId) == null ?
+            null : (CacheContinuousQueryHandler<K, V>)rmtInfos.get(routineId).handler();
+    }
+
+    /**
+     * @param ignite Ignite remote instance.
+     * @param cacheId Cache id.
+     * @param partId Partition id.
+     * @return Map of pending entries.
+     */
+    private static ConcurrentMap<Long, CacheContinuousQueryEntry> getContinuousQueryPendingBuffer(
+        IgniteEx ignite,
+        int cacheId,
+        int partId
+    ) {
+        SystemView<ContinuousQueryView> rmtQryView = ignite.context().systemView().view(CQ_SYS_VIEW);
+        assertEquals(1, rmtQryView.size());
+
+        UUID routineId = rmtQryView.iterator().next().routineId();
+
+        CacheContinuousQueryHandler<?, ?> hnd = getRemoteContinuousQueryHandler(ignite, routineId);
+        GridCacheContext<?, ?> cctx = ignite.context().cache().context().cacheContext(cacheId);
+        CacheContinuousQueryEventBuffer buff = hnd.partitionBuffer(cctx, partId);
+
+        return getFieldValue(buff, CacheContinuousQueryEventBuffer.class, "pending");
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
index 58f0a11..43f5fb3 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
@@ -64,6 +64,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BooleanSupplier;
 import javax.cache.CacheException;
 import javax.cache.configuration.Factory;
 import javax.management.Attribute;
@@ -1903,19 +1904,24 @@ public final class GridTestUtils {
      * @throws org.apache.ignite.internal.IgniteInterruptedCheckedException If interrupted.
      */
     public static boolean waitForCondition(GridAbsPredicate cond, long timeout) throws IgniteInterruptedCheckedException {
-        long curTime = U.currentTimeMillis();
-        long endTime = curTime + timeout;
+        long endTime = U.currentTimeMillis() + timeout;
+        long endTime0 = endTime < 0 ? Long.MAX_VALUE : endTime;
 
-        if (endTime < 0)
-            endTime = Long.MAX_VALUE;
+        return waitForCondition(cond, () -> U.currentTimeMillis() < endTime0);
+    }
 
-        while (curTime < endTime) {
+    /**
+     * @param cond Condition to wait for.
+     * @param wait Wait predicate.
+     * @return {@code true} if condition was achieved, {@code false} otherwise.
+     * @throws IgniteInterruptedCheckedException If interrupted.
+     */
+    public static boolean waitForCondition(GridAbsPredicate cond, BooleanSupplier wait) throws IgniteInterruptedCheckedException {
+        while (wait.getAsBoolean()) {
             if (cond.apply())
                 return true;
 
             U.sleep(DFLT_BUSYWAIT_SLEEP_INTERVAL);
-
-            curTime = U.currentTimeMillis();
         }
 
         return false;
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java
index 26edb91..0505a14 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.testsuites;
 
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousBatchForceServerModeAckTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryBufferLimitTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryConcurrentPartitionUpdateTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryCounterPartitionedAtomicTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryCounterPartitionedTxTest;
@@ -47,7 +48,8 @@ import org.junit.runners.Suite;
  */
 @RunWith(Suite.class)
 @Suite.SuiteClasses({
-    // Continuous queries 1.
+    CacheContinuousQueryBufferLimitTest.class,
+
     GridCacheContinuousQueryNodesFilteringTest.class,
     GridCacheContinuousQueryPartitionTxOneNodeTest.class,
     CacheContinuousWithTransformerReplicatedSelfTest.class,