You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by an...@apache.org on 2016/08/11 03:33:29 UTC
[17/19] ignite git commit: IGNITE-3272 Fixed "Memory consumption in
ContinuousQueryHandler". This closes #930.
IGNITE-3272 Fixed "Memory consumption in ContinuousQueryHandler". This closes #930.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ff3e00ca
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ff3e00ca
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ff3e00ca
Branch: refs/heads/master
Commit: ff3e00caa892a7399622711b620fcb4dcfbbfb56
Parents: 151dfa7
Author: nikolay_tikhonov <nt...@gridgain.com>
Authored: Wed Aug 10 16:21:52 2016 +0300
Committer: nikolay_tikhonov <nt...@gridgain.com>
Committed: Wed Aug 10 16:21:52 2016 +0300
----------------------------------------------------------------------
.../internal/GridEventConsumeHandler.java | 5 +
.../internal/GridMessageListenHandler.java | 5 +
.../continuous/CacheContinuousQueryEntry.java | 16 +-
.../continuous/CacheContinuousQueryHandler.java | 85 ++++++---
.../continuous/GridContinuousHandler.java | 5 +
.../continuous/GridContinuousProcessor.java | 11 +-
.../continuous/GridContinuousQueryBatch.java | 47 +++++
...niteCacheContinuousQueryBackupQueueTest.java | 184 ++++++++++++++++++-
8 files changed, 318 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/ff3e00ca/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
index 19bf1a7..b4b1e58 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
@@ -402,6 +402,11 @@ class GridEventConsumeHandler implements GridContinuousHandler {
}
/** {@inheritDoc} */
+ @Override public void onNodeLeft() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
@Nullable @Override public Object orderedTopic() {
return null;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ff3e00ca/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
index 0ac6877..2b8041d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
@@ -224,6 +224,11 @@ public class GridMessageListenHandler implements GridContinuousHandler {
}
/** {@inheritDoc} */
+ @Override public void onNodeLeft() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
@Override public void writeExternal(ObjectOutput out) throws IOException {
out.writeBoolean(depEnabled);
http://git-wip-us.apache.org/repos/asf/ignite/blob/ff3e00ca/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
index 74f930a..366a1e0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
@@ -200,6 +200,20 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
}
/**
+ * @param topVer Topology version.
+ */
+ void topologyVersion(AffinityTopologyVersion topVer) {
+ this.topVer = topVer;
+ }
+
+ /**
+ * @return Number of events this entry stands for: the entry itself plus any filtered events.
+ */
+ public int size() {
+ return filteredEvts != null ? filteredEvts.size() + 1 : 1;
+ }
+
+ /**
* @return If entry filtered then will return light-weight <i><b>new entry</b></i> without values and key
* (avoid to huge memory consumption), otherwise {@code this}.
*/
@@ -208,7 +222,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
return this;
CacheContinuousQueryEntry e =
- new CacheContinuousQueryEntry(cacheId, evtType, null, null, null, keepBinary, part, updateCntr, topVer);
+ new CacheContinuousQueryEntry(cacheId, null, null, null, null, keepBinary, part, updateCntr, null);
e.flags = flags;
http://git-wip-us.apache.org/repos/asf/ignite/blob/ff3e00ca/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 5012569..7b3b47b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -64,8 +64,8 @@ import org.apache.ignite.internal.processors.cache.query.CacheQueryType;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager.JCacheQueryLocalListener;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager.JCacheQueryRemoteFilter;
import org.apache.ignite.internal.processors.continuous.GridContinuousBatch;
-import org.apache.ignite.internal.processors.continuous.GridContinuousBatchAdapter;
import org.apache.ignite.internal.processors.continuous.GridContinuousHandler;
+import org.apache.ignite.internal.processors.continuous.GridContinuousQueryBatch;
import org.apache.ignite.internal.processors.platform.cache.query.PlatformContinuousQueryFilter;
import org.apache.ignite.internal.util.GridConcurrentSkipListSet;
import org.apache.ignite.internal.util.GridLongList;
@@ -132,7 +132,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
private transient boolean skipPrimaryCheck;
/** Backup queue. */
- private transient Collection<CacheContinuousQueryEntry> backupQueue;
+ private transient volatile Collection<CacheContinuousQueryEntry> backupQueue;
/** */
private boolean locCache;
@@ -430,33 +430,48 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
}
@Override public void cleanupBackupQueue(Map<Integer, Long> updateCntrs) {
- Iterator<CacheContinuousQueryEntry> it = backupQueue.iterator();
+ Collection<CacheContinuousQueryEntry> backupQueue0 = backupQueue;
- while (it.hasNext()) {
- CacheContinuousQueryEntry backupEntry = it.next();
+ if (backupQueue0 != null) {
+ Iterator<CacheContinuousQueryEntry> it = backupQueue0.iterator();
- Long updateCntr = updateCntrs.get(backupEntry.partition());
+ while (it.hasNext()) {
+ CacheContinuousQueryEntry backupEntry = it.next();
- if (updateCntr != null && backupEntry.updateCounter() <= updateCntr)
- it.remove();
+ Long updateCntr = updateCntrs.get(backupEntry.partition());
+
+ if (updateCntr != null && backupEntry.updateCounter() <= updateCntr)
+ it.remove();
+ }
}
}
@Override public void flushBackupQueue(GridKernalContext ctx, AffinityTopologyVersion topVer) {
- if (backupQueue.isEmpty())
+ Collection<CacheContinuousQueryEntry> backupQueue0 = backupQueue;
+
+ if (backupQueue0 == null)
return;
try {
- GridCacheContext<K, V> cctx = cacheContext(ctx);
+ ClusterNode nodeId0 = ctx.discovery().node(nodeId);
- for (CacheContinuousQueryEntry e : backupQueue) {
- if (!e.isFiltered())
- prepareEntry(cctx, nodeId, e);
- }
+ if (nodeId0 != null) {
+ GridCacheContext<K, V> cctx = cacheContext(ctx);
- ctx.continuous().addBackupNotification(nodeId, routineId, backupQueue, topic);
+ for (CacheContinuousQueryEntry e : backupQueue0) {
+ if (!e.isFiltered())
+ prepareEntry(cctx, nodeId, e);
+
+ e.topologyVersion(topVer);
+ }
+
+ ctx.continuous().addBackupNotification(nodeId, routineId, backupQueue0, topic);
+ }
+ else
+ // Node that started the CQ left the topology; no need to keep the backup queue.
+ backupQueue = null;
- backupQueue.clear();
+ backupQueue0.clear();
}
catch (IgniteCheckedException e) {
U.error(ctx.log(getClass()), "Failed to send backup event notification to node: " + nodeId, e);
@@ -479,9 +494,13 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
}
@Override public void onPartitionEvicted(int part) {
- for (Iterator<CacheContinuousQueryEntry> it = backupQueue.iterator(); it.hasNext();) {
- if (it.next().partition() == part)
- it.remove();
+ Collection<CacheContinuousQueryEntry> backupQueue0 = backupQueue;
+
+ if (backupQueue0 != null) {
+ for (Iterator<CacheContinuousQueryEntry> it = backupQueue0.iterator(); it.hasNext(); ) {
+ if (it.next().partition() == part)
+ it.remove();
+ }
}
}
@@ -740,7 +759,10 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
if (!primary && !internal && entry.updateCounter() != -1L /* Skip init query and expire entries */) {
entry.markBackup();
- backupQueue.add(entry.forBackupQueue());
+ Collection<CacheContinuousQueryEntry> backupQueue0 = backupQueue;
+
+ if (backupQueue0 != null)
+ backupQueue0.add(entry.forBackupQueue());
}
return notify;
@@ -765,12 +787,11 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
if (!locCache) {
Collection<CacheEntryEvent<? extends K, ? extends V>> evts = handleEvent(ctx, entry);
- if (!evts.isEmpty()) {
+ if (!evts.isEmpty())
locLsnr.onUpdated(evts);
- if (!internal && !skipPrimaryCheck)
- sendBackupAcknowledge(ackBuf.onAcknowledged(entry), routineId, ctx);
- }
+ if (!internal && !skipPrimaryCheck)
+ sendBackupAcknowledge(ackBuf.onAcknowledged(entry), routineId, ctx);
}
else {
if (!entry.isFiltered())
@@ -931,7 +952,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
* @param topVer Topology version.
* @param initCntr Update counters.
*/
- public PartitionRecovery(IgniteLogger log, AffinityTopologyVersion topVer, @Nullable Long initCntr) {
+ PartitionRecovery(IgniteLogger log, AffinityTopologyVersion topVer, @Nullable Long initCntr) {
this.log = log;
if (initCntr != null) {
@@ -1176,6 +1197,14 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
}
/** {@inheritDoc} */
+ @Override public void onNodeLeft() {
+ Collection<CacheContinuousQueryEntry> backupQueue0 = backupQueue;
+
+ if (backupQueue0 != null)
+ backupQueue = null;
+ }
+
+ /** {@inheritDoc} */
@Override public void p2pMarshal(GridKernalContext ctx) throws IgniteCheckedException {
assert ctx != null;
assert ctx.config().isPeerClassLoadingEnabled();
@@ -1196,7 +1225,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
/** {@inheritDoc} */
@Override public GridContinuousBatch createBatch() {
- return new GridContinuousBatchAdapter();
+ return new GridContinuousQueryBatch();
}
/** {@inheritDoc} */
@@ -1345,7 +1374,9 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
@SuppressWarnings("unchecked")
@Nullable synchronized IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>>
onAcknowledged(GridContinuousBatch batch) {
- size += batch.size();
+ assert batch instanceof GridContinuousQueryBatch;
+
+ size += ((GridContinuousQueryBatch)batch).entriesCount();
Collection<CacheContinuousQueryEntry> entries = (Collection)batch.collect();
http://git-wip-us.apache.org/repos/asf/ignite/blob/ff3e00ca/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
index 46e87af..c90746d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
@@ -116,6 +116,11 @@ public interface GridContinuousHandler extends Externalizable, Cloneable {
public void onBatchAcknowledged(UUID routineId, GridContinuousBatch batch, GridKernalContext ctx);
/**
+ * Invoked when the node that started the routine leaves the topology.
+ */
+ public void onNodeLeft();
+
+ /**
* @return Topic for ordered notifications. If {@code null}, notifications
* will be sent in non-ordered messages.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/ff3e00ca/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index fce48c4..5f61051 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -175,8 +175,13 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
UUID routineId = e.getKey();
RemoteRoutineInfo info = e.getValue();
- if (info.autoUnsubscribe && nodeId.equals(info.nodeId))
- unregisterRemote(routineId);
+ if (nodeId.equals(info.nodeId)) {
+ if (info.autoUnsubscribe)
+ unregisterRemote(routineId);
+
+ if (info.hnd.isQuery())
+ info.hnd.onNodeLeft();
+ }
}
for (Map.Entry<IgniteUuid, SyncMessageAckFuture> e : syncMsgFuts.entrySet()) {
@@ -865,6 +870,8 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
try {
sendNotification(nodeId, routineId, futId, F.asList(obj), null, msg, null);
+
+ info.hnd.onBatchAcknowledged(routineId, info.add(obj), ctx);
}
catch (IgniteCheckedException e) {
syncMsgFuts.remove(futId);
http://git-wip-us.apache.org/repos/asf/ignite/blob/ff3e00ca/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousQueryBatch.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousQueryBatch.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousQueryBatch.java
new file mode 100644
index 0000000..c5d854b
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousQueryBatch.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.continuous;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryEntry;
+
+/**
+ * Continuous query batch that also counts its entries, including filtered ones.
+ */
+public class GridContinuousQueryBatch extends GridContinuousBatchAdapter {
+ /** Total number of entries added to the batch, including filtered entries. */
+ private final AtomicInteger size = new AtomicInteger();
+
+ /** {@inheritDoc} */
+ @Override public void add(Object obj) {
+ assert obj != null;
+ assert obj instanceof CacheContinuousQueryEntry;
+
+ super.add(obj);
+
+ size.addAndGet(((CacheContinuousQueryEntry)obj).size());
+ }
+
+ /**
+ * @return Total number of entries in the batch, counting filtered entries as well.
+ */
+ public int entriesCount() {
+ return size.get();
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ff3e00ca/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryBackupQueueTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryBackupQueueTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryBackupQueueTest.java
index aea1954..d823409 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryBackupQueueTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryBackupQueueTest.java
@@ -18,18 +18,29 @@
package org.apache.ignite.internal.processors.cache.query.continuous;
import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import javax.cache.configuration.Factory;
import javax.cache.event.CacheEntryEvent;
import javax.cache.event.CacheEntryEventFilter;
import javax.cache.event.CacheEntryUpdatedListener;
+import org.apache.ignite.Ignite;
import org.apache.ignite.cache.query.ContinuousQuery;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.processors.continuous.GridContinuousHandler;
+import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
@@ -46,14 +57,27 @@ public class IgniteCacheContinuousQueryBackupQueueTest extends GridCommonAbstrac
/** Keys count. */
private static final int KEYS_COUNT = 1024;
+ /** CQ count. */
+ private static final int QUERY_COUNT = 20;
+
/** Grid count. */
private static final int GRID_COUNT = 2;
+ /** */
+ private static boolean client = false;
+
+ /** */
+ private static String CACHE_NAME = "test-cache";
+
+ /** */
+ private static final int BACKUP_ACK_THRESHOLD = 100;
+
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(gridName);
- CacheConfiguration ccfg = new CacheConfiguration();
+ CacheConfiguration ccfg = new CacheConfiguration(CACHE_NAME);
+
ccfg.setCacheMode(PARTITIONED);
ccfg.setAtomicityMode(ATOMIC);
ccfg.setWriteSynchronizationMode(FULL_SYNC);
@@ -61,42 +85,51 @@ public class IgniteCacheContinuousQueryBackupQueueTest extends GridCommonAbstrac
cfg.setCacheConfiguration(ccfg);
+ cfg.setClientMode(client);
+
((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
return cfg;
}
/** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ startGridsMultiThreaded(GRID_COUNT);
+ }
+
+ /** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
super.afterTest();
stopAllGrids();
+
+ client = false;
}
/** {@inheritDoc} */
@Override protected long getTestTimeout() {
- return TimeUnit.MINUTES.toMillis(2);
+ return TimeUnit.MINUTES.toMillis(10);
}
/**
* @throws Exception If failed.
*/
public void testBackupQueue() throws Exception {
- startGridsMultiThreaded(GRID_COUNT);
-
final CacheEventListener lsnr = new CacheEventListener();
ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
qry.setLocalListener(lsnr);
- qry.setRemoteFilterFactory(new FilterFactory());
+ qry.setRemoteFilterFactory(new AlwaysFalseFilterFactory());
- try (QueryCursor<?> ignore = grid(0).cache(null).query(qry)) {
+ try (QueryCursor<?> ignore = grid(0).cache(CACHE_NAME).query(qry)) {
for (int i = 0; i < KEYS_COUNT; i++) {
log.info("Put key: " + i);
for (int j = 0; j < 100; j++)
- grid(i % GRID_COUNT).cache(null).put(i, new byte[1024 * 50]);
+ grid(j % GRID_COUNT).cache(CACHE_NAME).put(i, new byte[1024 * 50]);
}
log.info("Finish.");
@@ -104,19 +137,150 @@ public class IgniteCacheContinuousQueryBackupQueueTest extends GridCommonAbstrac
}
/**
+ * @throws Exception If failed.
+ */
+ public void testManyQueryBackupQueue() throws Exception {
+ List<QueryCursor> qryCursors = new ArrayList<>();
+
+ for (int i = 0; i < QUERY_COUNT; i++) {
+ ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+
+ qry.setLocalListener(new CacheEventListener());
+ qry.setRemoteFilterFactory(new AlwaysFalseFilterFactory());
+
+ qryCursors.add(grid(0).cache(CACHE_NAME).query(qry));
+ }
+
+ for (int i = 0; i < KEYS_COUNT; i++) {
+ log.info("Put key: " + i);
+
+ for (int j = 0; j < 150; j++)
+ grid(ThreadLocalRandom.current().nextInt(GRID_COUNT)).cache(CACHE_NAME).put(i, new byte[1024 * 50]);
+ }
+
+ int size = backupQueueSize();
+
+ assertTrue(size > 0);
+ assertTrue(size <= BACKUP_ACK_THRESHOLD * QUERY_COUNT * /* partition count */1024);
+
+ for (QueryCursor qry : qryCursors)
+ qry.close();
+ }
+
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testBackupQueueAutoUnsubscribeFalse() throws Exception {
+ try {
+ client = true;
+
+ Ignite client = startGrid(GRID_COUNT);
+
+ awaitPartitionMapExchange();
+
+ List<QueryCursor> qryCursors = new ArrayList<>();
+
+ for (int i = 0; i < QUERY_COUNT; i++) {
+ ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+
+ qry.setLocalListener(new CacheEventListener());
+ qry.setRemoteFilterFactory(new AlwaysFalseFilterFactory());
+ qry.setAutoUnsubscribe(false);
+
+ qryCursors.add(client.cache(CACHE_NAME).query(qry));
+ }
+
+ for (int i = 0; i < KEYS_COUNT; i++) {
+ log.info("Put key: " + i);
+
+ grid(i % GRID_COUNT).cache(CACHE_NAME).put(i, new byte[1024 * 50]);
+ }
+
+ int size = backupQueueSize();
+
+ assertTrue(size > 0);
+ assertTrue(size <= BACKUP_ACK_THRESHOLD * QUERY_COUNT * /* partition count */1024);
+
+ stopGrid(GRID_COUNT);
+
+ awaitPartitionMapExchange();
+
+ for (int i = 0; i < KEYS_COUNT; i++) {
+ log.info("Put key: " + i);
+
+ grid(i % GRID_COUNT).cache(CACHE_NAME).put(i, new byte[1024 * 50]);
+ }
+
+ size = backupQueueSize();
+
+ assertEquals(-1, size);
+ }
+ finally {
+ stopGrid(GRID_COUNT);
+ }
+ }
+
+ /**
+ * @return Total size of all backup queues, or {@code -1} if no backup queue exists.
+ */
+ private int backupQueueSize() {
+ int backupQueueSize = -1; // Sentinel: no backup queue found yet.
+
+ for (int i = 0; i < GRID_COUNT; i++) {
+ for (Collection<Object> backQueue : backupQueues(grid(i)))
+ backupQueueSize = Math.max(backupQueueSize, 0) + backQueue.size();
+ }
+
+ return backupQueueSize;
+ }
+
+ /**
+ * @param ignite Ignite node whose continuous routines are inspected.
+ * @return Non-null backup queues of the continuous query handlers registered for {@code CACHE_NAME}.
+ */
+ private List<Collection<Object>> backupQueues(Ignite ignite) {
+ GridContinuousProcessor proc = ((IgniteKernal)ignite).context().continuous();
+
+ Map<Object, Object> infos = new HashMap<>();
+
+ Map<Object, Object> rmtInfos = GridTestUtils.getFieldValue(proc, "rmtInfos");
+ Map<Object, Object> locInfos = GridTestUtils.getFieldValue(proc, "locInfos");
+
+ infos.putAll(rmtInfos);
+ infos.putAll(locInfos);
+
+ List<Collection<Object>> backupQueues = new ArrayList<>();
+
+ for (Object info : infos.values()) {
+ GridContinuousHandler hnd = GridTestUtils.getFieldValue(info, "hnd");
+
+ if (hnd.isQuery() && hnd.cacheName().equals(CACHE_NAME)) {
+ Collection<Object> q = GridTestUtils.getFieldValue(hnd,
+ CacheContinuousQueryHandler.class, "backupQueue");
+
+ if (q != null)
+ backupQueues.add(q);
+ }
+ }
+
+ return backupQueues;
+ }
+
+ /**
*
*/
- private static class FilterFactory implements Factory<CacheEntryEventFilter<Object, Object>> {
+ private static class AlwaysFalseFilterFactory implements Factory<CacheEntryEventFilter<Object, Object>> {
/** {@inheritDoc} */
@Override public CacheEntryEventFilter<Object, Object> create() {
- return new CacheEventFilter();
+ return new AlwaysFalseFilter();
}
}
/**
*
*/
- private static class CacheEventFilter implements CacheEntryEventFilter<Object, Object>, Serializable {
+ private static class AlwaysFalseFilter implements CacheEntryEventFilter<Object, Object>, Serializable {
/** {@inheritDoc} */
@Override public boolean evaluate(CacheEntryEvent<?, ?> evt) {
return false;