You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by nt...@apache.org on 2016/03/21 19:55:30 UTC
ignite git commit: Fixed IGNITE-2791 "Continuous query listener is
not notified during concurrent key put and registration."
Repository: ignite
Updated Branches:
refs/heads/ignite-2791-merg [created] 8e13dc265
Fixed IGNITE-2791 "Continuous query listener is not notified during concurrent key put and registration."
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8e13dc26
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8e13dc26
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8e13dc26
Branch: refs/heads/ignite-2791-merg
Commit: 8e13dc265abdb2718d1ea687caf3ad6270d95390
Parents: 43ff148
Author: nikolay_tikhonov <nt...@gridgain.com>
Authored: Mon Mar 21 21:55:33 2016 +0300
Committer: nikolay_tikhonov <nt...@gridgain.com>
Committed: Mon Mar 21 21:55:33 2016 +0300
----------------------------------------------------------------------
.../internal/GridEventConsumeHandler.java | 3 +-
.../internal/GridMessageListenHandler.java | 3 +-
.../continuous/CacheContinuousQueryHandler.java | 88 +++-
.../continuous/CacheContinuousQueryManager.java | 12 +
.../continuous/GridContinuousHandler.java | 4 +-
.../continuous/GridContinuousProcessor.java | 27 +-
.../StartRoutineAckDiscoveryMessage.java | 22 +-
.../StartRoutineDiscoveryMessage.java | 22 +-
.../CacheContinuousQueryLostPartitionTest.java | 2 -
.../GridCacheContinuousQueryConcurrentTest.java | 466 +++++++++++++++++++
.../IgniteCacheQuerySelfTestSuite.java | 2 +
11 files changed, 600 insertions(+), 51 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/8e13dc26/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
index e2b1184..19bf1a7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
@@ -136,7 +136,8 @@ class GridEventConsumeHandler implements GridContinuousHandler {
}
/** {@inheritDoc} */
- @Override public void updateCounters(AffinityTopologyVersion topVer, Map<Integer, Long> cntrs) {
+ @Override public void updateCounters(AffinityTopologyVersion topVer, Map<UUID, Map<Integer, Long>> cntrsPerNode,
+ Map<Integer, Long> cntrs) {
// No-op.
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8e13dc26/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
index 402365c..0ac6877 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
@@ -125,7 +125,8 @@ public class GridMessageListenHandler implements GridContinuousHandler {
}
/** {@inheritDoc} */
- @Override public void updateCounters(AffinityTopologyVersion topVer, Map<Integer, Long> cntrs) {
+ @Override public void updateCounters(AffinityTopologyVersion topVer, Map<UUID, Map<Integer, Long>> cntrsPerNode,
+ Map<Integer, Long> cntrs) {
// No-op.
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8e13dc26/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 10fbd89..6243af7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -55,6 +55,7 @@ import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo;
import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
+import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheDeploymentManager;
import org.apache.ignite.internal.processors.cache.query.CacheQueryType;
@@ -72,6 +73,7 @@ import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgnitePredicate;
+import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentLinkedDeque8;
@@ -146,10 +148,13 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
private transient int cacheId;
/** */
- private Map<Integer, Long> initUpdCntrs;
+ private transient volatile Map<Integer, Long> initUpdCntrs;
/** */
- private AffinityTopologyVersion initTopVer;
+ private transient volatile Map<UUID, Map<Integer, Long>> initUpdCntrsPerNode;
+
+ /** */
+ private transient volatile AffinityTopologyVersion initTopVer;
/** */
private transient boolean ignoreClsNotFound;
@@ -264,9 +269,11 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
}
/** {@inheritDoc} */
- @Override public void updateCounters(AffinityTopologyVersion topVer, Map<Integer, Long> cntrs) {
- this.initTopVer = topVer;
+ @Override public void updateCounters(AffinityTopologyVersion topVer, Map<UUID, Map<Integer, Long>> cntrsPerNode,
+ Map<Integer, Long> cntrs) {
+ this.initUpdCntrsPerNode = cntrsPerNode;
this.initUpdCntrs = cntrs;
+ this.initTopVer = topVer;
}
/** {@inheritDoc} */
@@ -296,20 +303,6 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
assert !skipPrimaryCheck || loc;
- final GridCacheContext<K, V> cctx = cacheContext(ctx);
-
- if (!internal && cctx != null && initUpdCntrs != null) {
- Map<Integer, Long> map = cctx.topology().updateCounters();
-
- for (Map.Entry<Integer, Long> e : map.entrySet()) {
- Long cntr0 = initUpdCntrs.get(e.getKey());
- Long cntr1 = e.getValue();
-
- if (cntr0 == null || cntr1 > cntr0)
- initUpdCntrs.put(e.getKey(), cntr1);
- }
- }
-
CacheContinuousQueryListener<K, V> lsnr = new CacheContinuousQueryListener<K, V>() {
@Override public void onExecution() {
if (ctx.event().isRecordable(EVT_CACHE_QUERY_EXECUTED)) {
@@ -561,6 +554,20 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
entry.prepareMarshal(cctx);
}
+ /**
+ * Waits until affinity is ready for the initial topology version, then creates recovery state for every partition (no-op for local caches).
+ */
+ public void waitTopologyFuture(GridKernalContext ctx) throws IgniteCheckedException {
+ GridCacheContext<K, V> cctx = cacheContext(ctx);
+
+ if (!cctx.isLocal()) {
+ cctx.affinity().affinityReadyFuture(initTopVer).get();
+
+ for (int partId = 0, parts = cctx.affinity().partitions(); partId < parts; partId++)
+ getOrCreatePartitionRecovery(ctx, partId);
+ }
+ }
+
/** {@inheritDoc} */
@Override public void onListenerRegistered(UUID routineId, GridKernalContext ctx) {
// No-op.
@@ -668,19 +675,54 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
if (e.updateCounter() == -1L)
return F.asList(e);
- PartitionRecovery rec = rcvs.get(e.partition());
+ PartitionRecovery rec = getOrCreatePartitionRecovery(ctx, e.partition());
+
+ return rec.collectEntries(e);
+ }
+
+ /**
+ * @param ctx Kernal context.
+ * @param partId Partition id.
+ * @return Partition recovery for the partition, created and registered on first access.
+ */
+ @NotNull private PartitionRecovery getOrCreatePartitionRecovery(GridKernalContext ctx, int partId) {
+ PartitionRecovery rec = rcvs.get(partId);
if (rec == null) {
- rec = new PartitionRecovery(ctx.log(getClass()), initTopVer,
- initUpdCntrs == null ? null : initUpdCntrs.get(e.partition()));
+ Long partCntr = null;
- PartitionRecovery oldRec = rcvs.putIfAbsent(e.partition(), rec);
+ AffinityTopologyVersion initTopVer0 = initTopVer;
+
+ if (initTopVer0 != null) {
+ GridCacheAffinityManager aff = cacheContext(ctx).affinity();
+ // Read each volatile field once so the null check and the lookup use the same map.
+ Map<UUID, Map<Integer, Long>> cntrsPerNode0 = initUpdCntrsPerNode;
+ Map<Integer, Long> cntrs0 = initUpdCntrs;
+
+ if (cntrsPerNode0 != null) {
+ for (ClusterNode node : aff.nodes(partId, initTopVer0)) {
+ Map<Integer, Long> map = cntrsPerNode0.get(node.id());
+
+ if (map != null) {
+ partCntr = map.get(partId);
+
+ break;
+ }
+ }
+ }
+ else if (cntrs0 != null)
+ partCntr = cntrs0.get(partId);
+ }
+
+ rec = new PartitionRecovery(ctx.log(getClass()), initTopVer0, partCntr);
+
+ PartitionRecovery oldRec = rcvs.putIfAbsent(partId, rec);
if (oldRec != null)
rec = oldRec;
}
- return rec.collectEntries(e);
+ return rec;
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/8e13dc26/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index 353043f..869a51b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -649,6 +649,18 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
autoUnsubscribe,
pred).get();
+ try {
+ if (hnd.isQuery() && cctx.userCache())
+ hnd.waitTopologyFuture(cctx.kernalContext());
+ }
+ catch (IgniteCheckedException e) {
+ log.warning("Failed to start continuous query.", e);
+
+ cctx.kernalContext().continuous().stopRoutine(id);
+
+ throw new IgniteCheckedException("Failed to start continuous query.", e);
+ }
+
if (notifyExisting) {
final Iterator<GridCacheEntryEx> it = cctx.cache().allEntries().iterator();
http://git-wip-us.apache.org/repos/asf/ignite/blob/8e13dc26/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
index 8cd30a8..46e87af 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
@@ -154,8 +154,10 @@ public interface GridContinuousHandler extends Externalizable, Cloneable {
public String cacheName();
/**
+ * @param cntrsPerNode Initial partition update counters, keyed by node ID.
* @param cntrs Init state for partition counters.
* @param topVer Topology version.
*/
- public void updateCounters(AffinityTopologyVersion topVer, Map<Integer, Long> cntrs);
+ public void updateCounters(AffinityTopologyVersion topVer, Map<UUID, Map<Integer, Long>> cntrsPerNode,
+ Map<Integer, Long> cntrs);
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8e13dc26/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index 1776748..f2d6e1e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -45,7 +45,6 @@ import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
import org.apache.ignite.internal.IgniteDeploymentCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
-import org.apache.ignite.internal.IgniteNodeAttributes;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.managers.deployment.GridDeployment;
@@ -220,25 +219,18 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
// Update partition counters.
if (routine != null && routine.handler().isQuery()) {
+ Map<UUID, Map<Integer, Long>> cntrsPerNode = msg.updateCountersPerNode();
Map<Integer, Long> cntrs = msg.updateCounters();
GridCacheAdapter<Object, Object> interCache =
ctx.cache().internalCache(routine.handler().cacheName());
- if (interCache != null && cntrs != null && interCache.context() != null
- && !interCache.isLocal() && !CU.clientNode(ctx.grid().localNode())) {
- Map<Integer, Long> map = interCache.context().topology().updateCounters();
+ GridCacheContext cctx = interCache != null ? interCache.context() : null;
- for (Map.Entry<Integer, Long> e : map.entrySet()) {
- Long cntr0 = cntrs.get(e.getKey());
- Long cntr1 = e.getValue();
+ if (cctx != null && cntrsPerNode != null && !cctx.isLocal() && cctx.affinityNode())
+ cntrsPerNode.put(ctx.localNodeId(), cctx.topology().updateCounters());
- if (cntr0 == null || cntr1 > cntr0)
- cntrs.put(e.getKey(), cntr1);
- }
- }
-
- routine.handler().updateCounters(topVer, msg.updateCounters());
+ routine.handler().updateCounters(topVer, cntrsPerNode, cntrs);
}
fut.onRemoteRegistered();
@@ -756,7 +748,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
syncMsgFuts.put(futId, fut);
try {
- sendNotification(nodeId, routineId, futId, F.asList(obj), orderedTopic, msg, null);
+ sendNotification(nodeId, routineId, futId, F.asList(obj), null, msg, null);
}
catch (IgniteCheckedException e) {
syncMsgFuts.remove(futId);
@@ -923,11 +915,8 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
if (proc != null) {
GridCacheAdapter cache = ctx.cache().internalCache(hnd0.cacheName());
- if (cache != null && !cache.isLocal()) {
- Map<Integer, Long> cntrs = cache.context().topology().updateCounters();
-
- req.addUpdateCounters(cntrs);
- }
+ if (cache != null && !cache.isLocal())
+ req.addUpdateCounters(ctx.localNodeId(), cache.context().topology().updateCounters());
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8e13dc26/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java
index 9644372..ca34b27 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java
@@ -22,6 +22,7 @@ import java.util.Map;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.jetbrains.annotations.Nullable;
@@ -36,18 +37,28 @@ public class StartRoutineAckDiscoveryMessage extends AbstractContinuousMessage {
private final Map<UUID, IgniteCheckedException> errs;
/** */
+ @GridToStringExclude
private final Map<Integer, Long> updateCntrs;
+ /** */
+ @GridToStringExclude
+ private final Map<UUID, Map<Integer, Long>> updateCntrsPerNode;
+
/**
* @param routineId Routine id.
* @param errs Errs.
+ * @param cntrs Partition counters.
+ * @param cntrsPerNode Partition counters per node.
*/
- public StartRoutineAckDiscoveryMessage(UUID routineId, Map<UUID, IgniteCheckedException> errs,
- Map<Integer, Long> cntrs) {
+ public StartRoutineAckDiscoveryMessage(UUID routineId,
+ Map<UUID, IgniteCheckedException> errs,
+ Map<Integer, Long> cntrs,
+ Map<UUID, Map<Integer, Long>> cntrsPerNode) {
super(routineId);
this.errs = new HashMap<>(errs);
this.updateCntrs = cntrs;
+ this.updateCntrsPerNode = cntrsPerNode;
}
/** {@inheritDoc} */
@@ -63,6 +74,13 @@ public class StartRoutineAckDiscoveryMessage extends AbstractContinuousMessage {
}
/**
+ * @return Update counters for partitions per each node.
+ */
+ public Map<UUID, Map<Integer, Long>> updateCountersPerNode() {
+ return updateCntrsPerNode;
+ }
+
+ /**
* @return Errs.
*/
public Map<UUID, IgniteCheckedException> errs() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/8e13dc26/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java
index ff037d4..24eb050 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java
@@ -40,6 +40,9 @@ public class StartRoutineDiscoveryMessage extends AbstractContinuousMessage {
/** */
private Map<Integer, Long> updateCntrs;
+ /** */
+ private Map<UUID, Map<Integer, Long>> updateCntrsPerNode;
+
/** Keep binary flag. */
private boolean keepBinary;
@@ -72,7 +75,7 @@ public class StartRoutineDiscoveryMessage extends AbstractContinuousMessage {
/**
* @param cntrs Update counters.
*/
- public void addUpdateCounters(Map<Integer, Long> cntrs) {
+ private void addUpdateCounters(Map<Integer, Long> cntrs) {
if (updateCntrs == null)
updateCntrs = new HashMap<>();
@@ -86,6 +89,21 @@ public class StartRoutineDiscoveryMessage extends AbstractContinuousMessage {
}
/**
+ * @param nodeId Local node ID.
+ * @param cntrs Update counters.
+ */
+ public void addUpdateCounters(UUID nodeId, Map<Integer, Long> cntrs) {
+ addUpdateCounters(cntrs);
+
+ if (updateCntrsPerNode == null)
+ updateCntrsPerNode = new HashMap<>();
+
+ Map<Integer, Long> old = updateCntrsPerNode.put(nodeId, cntrs);
+
+ assert old == null : old;
+ }
+
+ /**
* @return Errs.
*/
public Map<UUID, IgniteCheckedException> errs() {
@@ -106,7 +124,7 @@ public class StartRoutineDiscoveryMessage extends AbstractContinuousMessage {
/** {@inheritDoc} */
@Override public DiscoveryCustomMessage ackMessage() {
- return new StartRoutineAckDiscoveryMessage(routineId, errs, updateCntrs);
+ return new StartRoutineAckDiscoveryMessage(routineId, errs, updateCntrs, updateCntrsPerNode);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/8e13dc26/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartitionTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartitionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartitionTest.java
index f4659dc..025dd80 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartitionTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartitionTest.java
@@ -140,8 +140,6 @@ public class CacheContinuousQueryLostPartitionTest extends GridCommonAbstractTes
// node2 now becomes the primary for the key.
stopGrid(0);
- awaitPartitionMapExchange();
-
cache2.put(key, "2");
// Sanity check.
http://git-wip-us.apache.org/repos/asf/ignite/blob/8e13dc26/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryConcurrentTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryConcurrentTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryConcurrentTest.java
new file mode 100644
index 0000000..29b351b
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryConcurrentTest.java
@@ -0,0 +1,466 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.cache.configuration.CacheEntryListenerConfiguration;
+import javax.cache.configuration.FactoryBuilder.SingletonFactory;
+import javax.cache.configuration.MutableCacheEntryListenerConfiguration;
+import javax.cache.event.CacheEntryCreatedListener;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryEventFilter;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.future.IgniteFinishedFutureImpl;
+import org.apache.ignite.internal.util.future.IgniteFutureImpl;
+import org.apache.ignite.internal.util.typedef.PA;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static javax.cache.configuration.FactoryBuilder.factoryOf;
+
+/**
+ *
+ */
+public class GridCacheContinuousQueryConcurrentTest extends GridCommonAbstractTest {
+ /** */
+ private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+ /** */
+ private static final int NODES = 2;
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ startGridsMultiThreaded(NODES);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+
+ super.afterTest();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ stopAllGrids();
+
+ super.afterTestsStopped();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+ cfg.setPeerClassLoadingEnabled(false);
+
+ if (gridName.endsWith(String.valueOf(NODES)))
+ cfg.setClientMode(ThreadLocalRandom.current().nextBoolean());
+
+ return cfg;
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testReplicatedTx() throws Exception {
+ testRegistration(cacheConfiguration(CacheMode.REPLICATED, CacheAtomicityMode.TRANSACTIONAL, 1));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testRestartReplicated() throws Exception {
+ testRestartRegistration(cacheConfiguration(CacheMode.REPLICATED, CacheAtomicityMode.ATOMIC, 2));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testRestartPartition() throws Exception {
+ testRestartRegistration(cacheConfiguration(CacheMode.PARTITIONED, CacheAtomicityMode.ATOMIC, 2));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testRestartPartitionTx() throws Exception {
+ testRestartRegistration(cacheConfiguration(CacheMode.PARTITIONED, CacheAtomicityMode.TRANSACTIONAL, 2));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testReplicatedAtomic() throws Exception {
+ testRegistration(cacheConfiguration(CacheMode.REPLICATED, CacheAtomicityMode.ATOMIC, 2));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPartitionTx() throws Exception {
+ testRegistration(cacheConfiguration(CacheMode.PARTITIONED, CacheAtomicityMode.TRANSACTIONAL, 2));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPartitionAtomic() throws Exception {
+ testRegistration(cacheConfiguration(CacheMode.PARTITIONED, CacheAtomicityMode.ATOMIC, 2));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testRegistration(CacheConfiguration ccfg) throws Exception {
+ ExecutorService execSrv = newSingleThreadExecutor();
+
+ try {
+ final IgniteCache<Integer, String> cache = grid(0).getOrCreateCache(ccfg);
+
+ for (int i = 0; i < 10; i++) {
+ log.info("Start iteration: " + i);
+
+ final int i0 = i;
+ final AtomicBoolean stop = new AtomicBoolean(false);
+ final CountDownLatch latch = new CountDownLatch(1);
+ final int conQryCnt = 50;
+
+ Future<List<IgniteFuture<String>>> fut = execSrv.submit(
+ new Callable<List<IgniteFuture<String>>>() {
+ @Override public List<IgniteFuture<String>> call() throws Exception {
+ int count = 0;
+ List<IgniteFuture<String>> futures = new ArrayList<>();
+
+ while (!stop.get()) {
+ futures.add(waitForKey(i0, cache, count));
+
+ if (log.isDebugEnabled())
+ log.debug("Started cont query count: " + count);
+
+ if (++count >= conQryCnt)
+ latch.countDown();
+ }
+
+ return futures;
+ }
+ });
+
+ assert U.await(latch, 1, MINUTES);
+
+ cache.put(i, "v");
+
+ stop.set(true);
+
+ List<IgniteFuture<String>> contQries = fut.get();
+
+ for (IgniteFuture<String> contQry : contQries)
+ contQry.get(2, TimeUnit.SECONDS);
+ }
+ }
+ finally {
+ execSrv.shutdownNow();
+
+ grid(0).destroyCache(ccfg.getName());
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testRestartRegistration(CacheConfiguration ccfg) throws Exception {
+ ExecutorService execSrv = newSingleThreadExecutor();
+
+ final AtomicBoolean stopRes = new AtomicBoolean(false);
+
+ IgniteInternalFuture<?> restartFut = null;
+
+ try {
+ final IgniteCache<Integer, String> cache = grid(0).getOrCreateCache(ccfg);
+
+ restartFut = GridTestUtils.runAsync(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ while (!stopRes.get()) {
+ startGrid(NODES);
+
+ assert GridTestUtils.waitForCondition(new PA() {
+ @Override public boolean apply() {
+ return grid(0).cluster().nodes().size() == NODES + 1;
+ }
+ }, 5000L);
+
+ Thread.sleep(300);
+
+ stopGrid(NODES);
+
+ assert GridTestUtils.waitForCondition(new PA() {
+ @Override public boolean apply() {
+ return grid(0).cluster().nodes().size() == NODES;
+ }
+ }, 5000L);
+
+ Thread.sleep(300);
+ }
+
+ return null;
+ }
+ });
+
+ U.sleep(100);
+
+ for (int i = 0; i < 10; i++) {
+ log.info("Start iteration: " + i);
+
+ final int i0 = i;
+ final AtomicBoolean stop = new AtomicBoolean(false);
+ final CountDownLatch latch = new CountDownLatch(1);
+ final int conQryCnt = 50;
+
+ Future<List<IgniteFuture<String>>> fut = execSrv.submit(
+ new Callable<List<IgniteFuture<String>>>() {
+ @Override public List<IgniteFuture<String>> call() throws Exception {
+ int count = 0;
+ List<IgniteFuture<String>> futures = new ArrayList<>();
+
+ while (!stop.get()) {
+ futures.add(waitForKey(i0, cache, count));
+
+ if (log.isDebugEnabled())
+ log.debug("Started cont query count: " + count);
+
+ if (++count >= conQryCnt)
+ latch.countDown();
+ }
+
+ return futures;
+ }
+ });
+
+ assert U.await(latch, 1, MINUTES);
+
+ cache.put(i, "v");
+
+ assertEquals("v", cache.get(i));
+
+ stop.set(true);
+
+ List<IgniteFuture<String>> contQries = fut.get();
+
+ for (IgniteFuture<String> contQry : contQries)
+ contQry.get(5, TimeUnit.SECONDS);
+ }
+ }
+ finally {
+ execSrv.shutdownNow();
+
+ grid(0).destroyCache(ccfg.getName());
+
+ if (restartFut != null) {
+ stopRes.set(true);
+
+ restartFut.get();
+
+ stopGrid(NODES);
+ }
+ }
+ }
+
+ /**
+ * @param key Key.
+ * @param cache Cache.
+ * @param id ID.
+ * @return Future.
+ */
+ public IgniteFuture<String> waitForKey(Integer key, final IgniteCache<Integer, String> cache, final int id) {
+ String v = cache.get(key);
+
+ // From now on, all futures will be completed immediately (since the key has been
+ // inserted).
+ if (v != null)
+ return new IgniteFinishedFutureImpl<>("immediately");
+
+ final IgniteFuture<String> promise = new IgniteFutureImpl<>(new GridFutureAdapter<String>());
+
+ final CacheEntryListenerConfiguration<Integer, String> cfg =
+ createCacheListener(key, promise, id);
+
+ promise.listen(new IgniteInClosure<IgniteFuture<String>>() {
+ @Override public void apply(IgniteFuture<String> future) {
+ GridTestUtils.runAsync(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ cache.deregisterCacheEntryListener(cfg);
+
+ return null;
+ }
+ });
+ }
+ });
+
+ // Start listening.
+ // Assumption: When the call returns, the listener is guaranteed to have been registered.
+ cache.registerCacheEntryListener(cfg);
+
+ // Now must check the cache again, to make sure that we didn't miss the key insert while we
+ // were busy setting up the cache listener.
+ // Check asynchronously.
+ IgniteCache<Integer, String> asyncCache = cache.withAsync();
+ asyncCache.get(key);
+
+ // Complete the promise if the key was inserted concurrently.
+ asyncCache.<String>future().listen(new IgniteInClosure<IgniteFuture<String>>() {
+ @Override public void apply(IgniteFuture<String> f) {
+ String value = f.get();
+
+ if (value != null) {
+ log.info("Completed by get: " + id);
+
+ (((GridFutureAdapter)((IgniteFutureImpl)promise).internalFuture())).onDone("by get");
+ }
+ }
+ });
+
+ return promise;
+ }
+
+ /**
+ * @param key Key.
+ * @param result Result.
+ * @param id Listener ID.
+ * @return Listener configuration.
+ */
+ private CacheEntryListenerConfiguration<Integer, String> createCacheListener(
+ Integer key,
+ IgniteFuture<String> result,
+ int id) {
+ return new MutableCacheEntryListenerConfiguration<>(
+ factoryOf(new CacheListener(result, id)),
+ new SingletonFactory<>(new KeyEventFilter(key, id)), false, true);
+ }
+
+
+
+ /**
+ * @param cacheMode Cache mode.
+ * @param atomicMode Atomicity mode.
+ * @param backups Backups.
+ * @return Cache configuration.
+ */
+ private CacheConfiguration<Integer, String> cacheConfiguration(CacheMode cacheMode,
+ CacheAtomicityMode atomicMode, int backups) {
+ CacheConfiguration<Integer, String> cfg = new CacheConfiguration<>("test-" + cacheMode + atomicMode + backups);
+
+ cfg.setCacheMode(cacheMode);
+ cfg.setAtomicityMode(atomicMode);
+ cfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+ cfg.setBackups(backups);
+ cfg.setReadFromBackup(false);
+
+ return cfg;
+ }
+
+ /**
+ *
+ */
+ private static class CacheListener implements CacheEntryCreatedListener<Integer, String>, Serializable {
+ /** */
+ final IgniteFuture<String> result;
+
+ /** */
+ private final int id;
+
+ /**
+ * @param result Result.
+ * @param id ID.
+ */
+ CacheListener(IgniteFuture<String> result, int id) {
+ this.result = result;
+ this.id = id;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onCreated(Iterable<CacheEntryEvent<? extends Integer, ? extends String>> evts) {
+ (((GridFutureAdapter)((IgniteFutureImpl)result).internalFuture())).onDone("by listener");
+ }
+ }
+
+ /**
+ *
+ */
+ private static class KeyEventFilter implements CacheEntryEventFilter<Integer, String>, Serializable {
+ /** */
+ private static final long serialVersionUID = 42L;
+
+ /** */
+ private final Object key;
+
+ /** */
+ private final int id;
+
+ /**
+ * @param key Key.
+ * @param id ID.
+ */
+ KeyEventFilter(Object key, int id) {
+ this.key = key;
+ this.id = id;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean evaluate(CacheEntryEvent<? extends Integer, ? extends String> e) {
+ return e.getKey().equals(key);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ return this == o || !(o == null || getClass() != o.getClass())
+ && key.equals(((KeyEventFilter) o).key);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return key.hashCode();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8e13dc26/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
index 083af1e..0aa3560 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
@@ -93,6 +93,7 @@ import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheCon
import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryAtomicOffheapValuesTest;
import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryAtomicP2PDisabledSelfTest;
import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryAtomicSelfTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryConcurrentTest;
import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryLocalAtomicSelfTest;
import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryLocalSelfTest;
import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryPartitionAtomicOneNodeTest;
@@ -228,6 +229,7 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite {
suite.addTestSuite(CacheContinuousQueryFailoverTxOffheapTieredTest.class);
suite.addTestSuite(CacheContinuousQueryRandomOperationsTest.class);
suite.addTestSuite(CacheContinuousQueryFactoryFilterTest.class);
+ suite.addTestSuite(GridCacheContinuousQueryConcurrentTest.class);
suite.addTestSuite(CacheContinuousQueryOperationP2PTest.class);
suite.addTestSuite(CacheContinuousBatchAckTest.class);
suite.addTestSuite(CacheContinuousBatchForceServerModeAckTest.class);