You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by mm...@apache.org on 2020/07/10 17:06:38 UTC
[ignite] branch master updated: IGNITE-10959: Add continuous query
pending buffer limit (#7881)
This is an automated email from the ASF dual-hosted git repository.
mmuzaf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new e730185 IGNITE-10959: Add continuous query pending buffer limit (#7881)
e730185 is described below
commit e730185565a75fa338496814f0eaec1d1315d7c9
Author: Maxim Muzafarov <mm...@apache.org>
AuthorDate: Fri Jul 10 20:06:11 2020 +0300
IGNITE-10959: Add continuous query pending buffer limit (#7881)
---
.../CacheContinuousQueryEventBuffer.java | 167 ++++++++---
.../continuous/CacheContinuousQueryHandler.java | 4 +-
.../CacheContinuousQueryBufferLimitTest.java | 334 +++++++++++++++++++++
.../apache/ignite/testframework/GridTestUtils.java | 20 +-
.../testsuites/IgniteCacheQuerySelfTestSuite3.java | 4 +-
5 files changed, 485 insertions(+), 44 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
index 4b45d72..5807ccf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
@@ -26,9 +26,13 @@ import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.util.GridAtomicLong;
+import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.util.deque.FastSizeDeque;
import org.jetbrains.annotations.Nullable;
@@ -36,30 +40,55 @@ import org.jetbrains.annotations.Nullable;
*
*/
public class CacheContinuousQueryEventBuffer {
- /** */
+ /** Maximum size of buffer for pending events. Default value is {@code 10_000}. */
+ public static final int MAX_PENDING_BUFF_SIZE =
+ IgniteSystemProperties.getInteger("IGNITE_CONTINUOUS_QUERY_PENDING_BUFF_SIZE", 10_000);
+
+ /** Batch buffer size. */
private static final int BUF_SIZE =
IgniteSystemProperties.getInteger("IGNITE_CONTINUOUS_QUERY_SERVER_BUFFER_SIZE", 1000);
/** */
private static final Object RETRY = new Object();
- /** */
- protected final int part;
+ /** Continuous query category logger. */
+ private final IgniteLogger log;
/** */
- private AtomicReference<Batch> curBatch = new AtomicReference<>();
+ private final int part;
- /** */
- private FastSizeDeque<CacheContinuousQueryEntry> backupQ = new FastSizeDeque<>(new ConcurrentLinkedDeque<>());
+ /** Batch of entries currently being collected to send to the remote. */
+ private final AtomicReference<Batch> curBatch = new AtomicReference<>();
- /** */
- private ConcurrentSkipListMap<Long, CacheContinuousQueryEntry> pending = new ConcurrentSkipListMap<>();
+ /** Queue for keeping backup entries whose partition counter is less than the counter being processed by the current batch. */
+ private final FastSizeDeque<CacheContinuousQueryEntry> backupQ = new FastSizeDeque<>(new ConcurrentLinkedDeque<>());
+
+ /** Entries which are waiting for being processed. */
+ private final ConcurrentSkipListMap<Long, CacheContinuousQueryEntry> pending = new ConcurrentSkipListMap<>();
+
+ /**
+ * The size method of the pending ConcurrentSkipListMap is not a constant-time operation. Since each
+ * entry is processed under the GridCacheMapEntry lock, it's necessary to maintain the size of the map explicitly.
+ */
+ private final AtomicInteger pendingCurrSize = new AtomicInteger();
+
+ /** Last seen ack partition counter tracked by the CQ handler partition recovery queue. */
+ private final GridAtomicLong ackedUpdCntr = new GridAtomicLong(0);
/**
* @param part Partition number.
+ * @param log Continuous query category logger.
*/
- CacheContinuousQueryEventBuffer(int part) {
+ CacheContinuousQueryEventBuffer(int part, IgniteLogger log) {
this.part = part;
+ this.log = log;
+ }
+
+ /**
+ * @param part Partition number.
+ */
+ CacheContinuousQueryEventBuffer(int part) {
+ this(part, null);
}
/**
@@ -74,6 +103,8 @@ public class CacheContinuousQueryEventBuffer {
if (backupEntry.updateCounter() <= updateCntr)
it.remove();
}
+
+ ackedUpdCntr.setIfGreater(updateCntr);
}
/**
@@ -154,6 +185,7 @@ public class CacheContinuousQueryEventBuffer {
Object res = null;
for (;;) {
+ // Set batch only if batch is null (first attempt).
batch = initBatch(entry.topologyVersion(), backup);
if (batch == null || cntr < batch.startCntr) {
@@ -172,21 +204,55 @@ public class CacheContinuousQueryEventBuffer {
if (res == RETRY)
continue;
}
- else
+ else {
+ if (batch.endCntr < ackedUpdCntr.get() && batch.tryRollOver(entry.topologyVersion()) == RETRY)
+ continue;
+
+ pendingCurrSize.incrementAndGet();
pending.put(cntr, entry);
+ if (pendingCurrSize.get() > MAX_PENDING_BUFF_SIZE) {
+ synchronized (pending) {
+ if (pendingCurrSize.get() <= MAX_PENDING_BUFF_SIZE)
+ break;
+
+ LT.warn(log, "Buffer for pending events reached max of its size " +
+ "[cacheId=" + entry.cacheId() + ", maxSize=" + MAX_PENDING_BUFF_SIZE +
+ ", partId=" + entry.partition() + ']');
+
+ // Remove first BUF_SIZE keys.
+ int keysToRemove = BUF_SIZE;
+
+ Iterator<Map.Entry<Long, CacheContinuousQueryEntry>> iter = pending.entrySet().iterator();
+
+ while (iter.hasNext() && keysToRemove > 0) {
+ CacheContinuousQueryEntry entry0 = iter.next().getValue();
+
+ // Discard messages on backup and send to client if primary.
+ if (!backup)
+ res = addResult(res, entry0.copyWithDataReset(), backup);
+
+ iter.remove();
+ pendingCurrSize.decrementAndGet();
+ keysToRemove--;
+ }
+ }
+ }
+ }
+
break;
}
Batch batch0 = curBatch.get();
+ // Batch has been changed on entry processing to the new one.
if (batch0 != batch) {
do {
batch = batch0;
res = processPending(res, batch, backup);
- batch0 = initBatch(entry.topologyVersion(), backup);
+ batch0 = curBatch.get();
}
while (batch != batch0);
}
@@ -230,22 +296,28 @@ public class CacheContinuousQueryEventBuffer {
* @return New result.
*/
@Nullable private Object processPending(@Nullable Object res, Batch batch, boolean backup) {
- if (pending.floorKey(batch.endCntr) != null) {
+ if (pending.floorKey(batch.endCntr) == null)
+ return res;
+
+ synchronized (pending) {
for (Map.Entry<Long, CacheContinuousQueryEntry> p : pending.headMap(batch.endCntr, true).entrySet()) {
long cntr = p.getKey();
assert cntr <= batch.endCntr;
- if (pending.remove(p.getKey()) != null) {
- if (cntr < batch.startCntr)
- res = addResult(res, p.getValue(), backup);
- else
- res = batch.processEntry0(res, p.getKey(), p.getValue(), backup);
- }
+ if (pending.remove(cntr) == null)
+ continue;
+
+ if (cntr < batch.startCntr)
+ res = addResult(res, p.getValue(), backup);
+ else
+ res = batch.processEntry0(res, p.getKey(), p.getValue(), backup);
+
+ pendingCurrSize.decrementAndGet();
}
- }
- return res;
+ return res;
+ }
}
/**
@@ -439,6 +511,7 @@ public class CacheContinuousQueryEventBuffer {
entries[pos] = entry;
int next = lastProc + 1;
+ long ackedUpdCntr0 = ackedUpdCntr.get();
if (next == pos) {
for (int i = next; i < entries.length; i++) {
@@ -463,26 +536,52 @@ public class CacheContinuousQueryEventBuffer {
lastProc = pos;
- if (pos == entries.length - 1) {
- Arrays.fill(entries, null);
+ if (pos == entries.length - 1)
+ rollOver(startCntr + BUF_SIZE, filtered, entry.topologyVersion());
+ }
+ else if (endCntr < ackedUpdCntr0)
+ rollOver(ackedUpdCntr0 + 1, 0, entry.topologyVersion());
+
+ return res;
+ }
+ }
- Batch nextBatch = new Batch(this.startCntr + BUF_SIZE,
- filtered,
- entries,
- entry.topologyVersion());
+ /**
+ * @param topVer Topology version of current processing entry.
+ */
+ private synchronized Object tryRollOver(AffinityTopologyVersion topVer) {
+ if (entries == null)
+ return RETRY;
- entries = null;
+ long ackedUpdCntr0 = ackedUpdCntr.get();
- assert curBatch.get() == this;
+ if (endCntr < ackedUpdCntr0) {
+ rollOver(ackedUpdCntr0 + 1, 0, topVer);
- curBatch.set(nextBatch);
- }
- }
- else
- return res;
+ return RETRY;
}
- return res;
+ return null;
+ }
+
+ /**
+ * @param startCntr Start batch position.
+ * @param filtered Number of filtered entries prior start position.
+ * @param topVer Next topology version based on cache entry.
+ */
+ private void rollOver(long startCntr, long filtered, AffinityTopologyVersion topVer) {
+ Arrays.fill(entries, null);
+
+ Batch nextBatch = new Batch(startCntr,
+ filtered,
+ entries,
+ topVer);
+
+ entries = null;
+
+ boolean changed = curBatch.compareAndSet(this, nextBatch);
+
+ assert changed;
}
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index cc672c6..8a54a1d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -1236,11 +1236,11 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
* @param part Partition.
* @return Event buffer.
*/
- private CacheContinuousQueryEventBuffer partitionBuffer(final GridCacheContext cctx, int part) {
+ CacheContinuousQueryEventBuffer partitionBuffer(final GridCacheContext cctx, int part) {
CacheContinuousQueryEventBuffer buf = entryBufs.get(part);
if (buf == null) {
- buf = new CacheContinuousQueryEventBuffer(part) {
+ buf = new CacheContinuousQueryEventBuffer(part, ctx.log(CU.CONTINUOUS_QRY_LOG_CATEGORY)) {
@Override protected long currentPartitionCounter(boolean backup) {
GridDhtLocalPartition locPart = cctx.topology().localPartition(part, null, false);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBufferLimitTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBufferLimitTest.java
new file mode 100644
index 0000000..9c6845f
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBufferLimitTest.java
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.cache.configuration.FactoryBuilder;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cache.query.CacheQueryEntryEvent;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateRequest;
+import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor;
+import org.apache.ignite.internal.util.GridAtomicLong;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.systemview.view.ContinuousQueryView;
+import org.apache.ignite.spi.systemview.view.SystemView;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.internal.TestRecordingCommunicationSpi.spi;
+import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryEventBuffer.MAX_PENDING_BUFF_SIZE;
+import static org.apache.ignite.internal.processors.continuous.GridContinuousProcessor.CQ_SYS_VIEW;
+import static org.apache.ignite.testframework.GridTestUtils.getFieldValue;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+
+/**
+ *
+ */
+@RunWith(Parameterized.class)
+public class CacheContinuousQueryBufferLimitTest extends GridCommonAbstractTest {
+ /** Cache partitions count. */
+ private static final int PARTS = 1;
+
+ /** Total number of cache keys. */
+ private static final int TOTAL_KEYS = 1024;
+
+ /** Maximum of keys processed by CQ to check buffer being overflowed. */
+ private static final long OVERFLOW_KEYS_COUNT = MAX_PENDING_BUFF_SIZE * 3;
+
+ /** Default remote no-op filter. */
+ private static final CacheEntryEventSerializableFilter<Integer, Integer> RMT_FILTER = e -> true;
+
+ /** Counter of cache messages being sent. */
+ private final AtomicInteger msgCntr = new AtomicInteger();
+
+ /** Cache mode. */
+ @Parameterized.Parameter(0)
+ public CacheMode cacheMode;
+
+ /** Cache atomicity mode. */
+ @Parameterized.Parameter(1)
+ public CacheAtomicityMode atomicityMode;
+
+ /** @return Test parameters. */
+ @Parameterized.Parameters(name = "cacheMode={0}, atomicityMode={1}")
+ public static Collection<?> parameters() {
+ return Arrays.asList(new Object[][] {
+ {REPLICATED, ATOMIC},
+ {PARTITIONED, ATOMIC},
+ {REPLICATED, TRANSACTIONAL},
+ {PARTITIONED, TRANSACTIONAL}
+ });
+ }
+
+ /**
+ * Local pending limit for this test is less than MAX_PENDING_BUFF_SIZE,
+ * so pending entries must be cleaned prior to reaching it.
+ *
+ * @throws Exception If fails.
+ */
+ @Test
+ public void testContinuousQueryBatchSwitchOnAck() throws Exception {
+ doTestContinuousQueryPendingBufferLimit((n, msg) ->
+ cachePutOperationRequestMessage(msg) && msgCntr.getAndIncrement() == 10, MAX_PENDING_BUFF_SIZE / 10);
+ }
+
+ /**
+ * The test blocks switching current CacheContinuousQueryEventBuffer.Batch to the new one, so
+ * pending entries will be processed (dropped on backups and sent to the client on primaries)
+ * when the MAX_PENDING_BUFF_SIZE is reached.
+ *
+ * @throws Exception If fails.
+ */
+ @Test
+ public void testContinuousQueryPendingBufferLimit() throws Exception {
+ doTestContinuousQueryPendingBufferLimit((n, msg) ->
+ (cachePutOperationRequestMessage(msg) && msgCntr.getAndIncrement() == 10) ||
+ msg instanceof CacheContinuousQueryBatchAck, (int)(MAX_PENDING_BUFF_SIZE * 1.2));
+ }
+
+ /** @throws Exception If fails. */
+ @Test
+ public void testPendingSendToClientOnLimitReached() throws Exception {
+ AtomicInteger keys = new AtomicInteger();
+ AtomicReference<String> err = new AtomicReference<>();
+
+ IgniteEx srv = startGrids(2);
+ IgniteEx clnt = startClientGrid();
+
+ IgniteCache<Integer, Integer> cache = clnt.cache(DEFAULT_CACHE_NAME);
+ CacheEntryEventSerializableFilter<Integer, Integer> filter = evt -> evt.getKey() % 2 == 0;
+
+ ContinuousQuery<Integer, Integer> cq = new ContinuousQuery<>();
+ cq.setRemoteFilterFactory(FactoryBuilder.factoryOf(filter));
+ cq.setLocalListener((events) -> events.forEach(e -> {
+ if (!filter.evaluate(e))
+ err.compareAndSet(null, "Key must be filtered [e=" + e + ']');
+ }));
+ cq.setLocal(false);
+
+ spi(srv).blockMessages((nodeId, msg) -> (cachePutOperationRequestMessage(msg) && msgCntr.getAndIncrement() == 7) ||
+ msg instanceof CacheContinuousQueryBatchAck);
+
+ IgniteInternalFuture<?> loadFut = null;
+
+ try (QueryCursor<?> qry = cache.query(cq)) {
+ awaitPartitionMapExchange();
+
+ loadFut = GridTestUtils.runMultiThreadedAsync(() -> {
+ while (!Thread.currentThread().isInterrupted())
+ cache.put(keys.incrementAndGet(), 0);
+ }, 6, "cq-put-");
+
+ // Entries are checked by CacheEntryUpdatedListener which has been set to CQ. Check that all
+ // entries greater than pending limit filtered correctly (entries are sent to client on buffer overflow).
+ boolean await = waitForCondition(() -> keys.get() > OVERFLOW_KEYS_COUNT, 30_000);
+
+ assertTrue("Number of keys to put must reach the limit [keys=" + keys.get() +
+ ", limit=" + OVERFLOW_KEYS_COUNT + ']', await);
+ }
+ finally {
+ TestRecordingCommunicationSpi.stopBlockAll();
+
+ if (loadFut != null)
+ loadFut.cancel();
+ }
+
+ if (err.get() != null)
+ throw new Exception(err.get());
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ return super.getConfiguration(igniteInstanceName)
+ .setCommunicationSpi(new TestRecordingCommunicationSpi())
+ .setCacheConfiguration(new CacheConfiguration<>(DEFAULT_CACHE_NAME)
+ .setAtomicityMode(atomicityMode)
+ .setCacheMode(cacheMode)
+ .setBackups(1)
+ .setAffinity(new RendezvousAffinityFunction(false, PARTS)));
+ }
+
+ /** */
+ @Before
+ public void resetMessageCounter() {
+ msgCntr.set(0);
+ }
+
+ /** */
+ @After
+ public void stopAllInstances() {
+ stopAllGrids();
+ }
+
+ /**
+ * @param locBlockPred Block predicate on local node to emulate message delivery issues.
+ * @param pendingLimit Test limit of pending entries.
+ * @throws Exception If fails.
+ */
+ private void doTestContinuousQueryPendingBufferLimit(
+ IgniteBiPredicate<ClusterNode, Message> locBlockPred,
+ int pendingLimit
+ ) throws Exception
+ {
+ AtomicInteger keys = new AtomicInteger();
+
+ IgniteEx locIgnite = startGrid(0);
+ IgniteEx rmtIgnite = startGrid(1);
+
+ IgniteCache<Integer, Integer> cache = locIgnite.cache(DEFAULT_CACHE_NAME);
+ CacheConfiguration<Integer, Integer> ccfg = cache.getConfiguration(CacheConfiguration.class);
+
+ for (int i = 0; i < TOTAL_KEYS; i++)
+ cache.put(i, i);
+
+ assertEquals(PARTS, ccfg.getAffinity().partitions());
+
+ GridAtomicLong lastAcked = new GridAtomicLong();
+
+ ContinuousQuery<Integer, Integer> cq = new ContinuousQuery<>();
+ cq.setRemoteFilterFactory(FactoryBuilder.factoryOf(RMT_FILTER));
+ cq.setLocalListener((events) ->
+ events.forEach(e ->
+ lastAcked.setIfGreater(((CacheQueryEntryEvent<?, ?>)e).getPartitionUpdateCounter())));
+ cq.setLocal(false);
+
+ IgniteInternalFuture<?> updFut = null;
+
+ try (QueryCursor<?> qry = locIgnite.cache(DEFAULT_CACHE_NAME).query(cq)) {
+ awaitPartitionMapExchange();
+
+ // Pending continuous query entries of partition 0, keyed by update counter.
+ ConcurrentMap<Long, CacheContinuousQueryEntry> pending =
+ getContinuousQueryPendingBuffer(rmtIgnite, CU.cacheId(DEFAULT_CACHE_NAME), 0);
+
+ spi(locIgnite).blockMessages(locBlockPred);
+
+ updFut = GridTestUtils.runMultiThreadedAsync(() -> {
+ while (keys.get() <= OVERFLOW_KEYS_COUNT)
+ cache.put(keys.incrementAndGet(), 0);
+ }, 3, "cq-put-");
+
+ assertNotNull("Partition remote buffers must be inited", pending);
+
+ log.warning("Waiting for pending buffer being overflowed within " + OVERFLOW_KEYS_COUNT +
+ " number of keys.");
+
+ boolean await = waitForCondition(() -> pending.size() > pendingLimit, () -> keys.get() <= OVERFLOW_KEYS_COUNT);
+
+ assertFalse("Pending buffer exceeded the limit despite entries have been acked " +
+ "[lastAcked=" + lastAcked + ", pending=" + S.compact(pending.keySet(), i -> i + 1) + ']',
+ await);
+ }
+ finally {
+ spi(locIgnite).stopBlock();
+
+ if (updFut != null)
+ updFut.cancel();
+ }
+ }
+
+ /**
+ * @param msg Cache message.
+ * @return {@code true} if message is initial for cache operation.
+ */
+ private boolean cachePutOperationRequestMessage(Message msg) {
+ switch (atomicityMode) {
+ case ATOMIC:
+ return msg instanceof GridDhtAtomicUpdateRequest;
+
+ case TRANSACTIONAL:
+ return msg instanceof GridDhtTxPrepareRequest;
+
+ default:
+ throw new IgniteException("Unsupported atomicity mode: " + atomicityMode);
+ }
+ }
+
+ /**
+ * @param ignite Ignite remote instance.
+ * @param routineId Routine id.
+ * @return Registered handler.
+ */
+ private static <K, V> CacheContinuousQueryHandler<K, V> getRemoteContinuousQueryHandler(
+ IgniteEx ignite,
+ UUID routineId
+ ) {
+ GridContinuousProcessor contProc = ignite.context().continuous();
+
+ ConcurrentMap<UUID, GridContinuousProcessor.RemoteRoutineInfo> rmtInfos =
+ getFieldValue(contProc, GridContinuousProcessor.class, "rmtInfos");
+
+ return rmtInfos.get(routineId) == null ?
+ null : (CacheContinuousQueryHandler<K, V>)rmtInfos.get(routineId).handler();
+ }
+
+ /**
+ * @param ignite Ignite remote instance.
+ * @param cacheId Cache id.
+ * @param partId Partition id.
+ * @return Map of pending entries.
+ */
+ private static ConcurrentMap<Long, CacheContinuousQueryEntry> getContinuousQueryPendingBuffer(
+ IgniteEx ignite,
+ int cacheId,
+ int partId
+ ) {
+ SystemView<ContinuousQueryView> rmtQryView = ignite.context().systemView().view(CQ_SYS_VIEW);
+ assertEquals(1, rmtQryView.size());
+
+ UUID routineId = rmtQryView.iterator().next().routineId();
+
+ CacheContinuousQueryHandler<?, ?> hnd = getRemoteContinuousQueryHandler(ignite, routineId);
+ GridCacheContext<?, ?> cctx = ignite.context().cache().context().cacheContext(cacheId);
+ CacheContinuousQueryEventBuffer buff = hnd.partitionBuffer(cctx, partId);
+
+ return getFieldValue(buff, CacheContinuousQueryEventBuffer.class, "pending");
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
index 58f0a11..43f5fb3 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
@@ -64,6 +64,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BooleanSupplier;
import javax.cache.CacheException;
import javax.cache.configuration.Factory;
import javax.management.Attribute;
@@ -1903,19 +1904,24 @@ public final class GridTestUtils {
* @throws org.apache.ignite.internal.IgniteInterruptedCheckedException If interrupted.
*/
public static boolean waitForCondition(GridAbsPredicate cond, long timeout) throws IgniteInterruptedCheckedException {
- long curTime = U.currentTimeMillis();
- long endTime = curTime + timeout;
+ long endTime = U.currentTimeMillis() + timeout;
+ long endTime0 = endTime < 0 ? Long.MAX_VALUE : endTime;
- if (endTime < 0)
- endTime = Long.MAX_VALUE;
+ return waitForCondition(cond, () -> U.currentTimeMillis() < endTime0);
+ }
- while (curTime < endTime) {
+ /**
+ * @param cond Condition to wait for.
+ * @param wait Wait predicate.
+ * @return {@code true} if condition was achieved, {@code false} otherwise.
+ * @throws IgniteInterruptedCheckedException If interrupted.
+ */
+ public static boolean waitForCondition(GridAbsPredicate cond, BooleanSupplier wait) throws IgniteInterruptedCheckedException {
+ while (wait.getAsBoolean()) {
if (cond.apply())
return true;
U.sleep(DFLT_BUSYWAIT_SLEEP_INTERVAL);
-
- curTime = U.currentTimeMillis();
}
return false;
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java
index 26edb91..0505a14 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java
@@ -18,6 +18,7 @@
package org.apache.ignite.testsuites;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousBatchForceServerModeAckTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryBufferLimitTest;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryConcurrentPartitionUpdateTest;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryCounterPartitionedAtomicTest;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryCounterPartitionedTxTest;
@@ -47,7 +48,8 @@ import org.junit.runners.Suite;
*/
@RunWith(Suite.class)
@Suite.SuiteClasses({
- // Continuous queries 1.
+ CacheContinuousQueryBufferLimitTest.class,
+
GridCacheContinuousQueryNodesFilteringTest.class,
GridCacheContinuousQueryPartitionTxOneNodeTest.class,
CacheContinuousWithTransformerReplicatedSelfTest.class,