You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by nt...@apache.org on 2016/01/18 18:42:20 UTC
ignite git commit: IGNITE-2384 Fixed "Notification missed in
continuous query".
Repository: ignite
Updated Branches:
refs/heads/master 9a996332d -> 252ba877e
IGNITE-2384 Fixed "Notification missed in continuous query".
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/252ba877
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/252ba877
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/252ba877
Branch: refs/heads/master
Commit: 252ba877e2e8b357ca326ce23d8fcc83dd0138bd
Parents: 9a99633
Author: nikolay_tikhonov <nt...@gridgain.com>
Authored: Mon Jan 18 20:41:53 2016 +0300
Committer: nikolay_tikhonov <nt...@gridgain.com>
Committed: Mon Jan 18 20:42:16 2016 +0300
----------------------------------------------------------------------
.../internal/GridEventConsumeHandler.java | 3 +-
.../internal/GridMessageListenHandler.java | 6 +-
.../managers/discovery/CustomEventListener.java | 4 +-
.../discovery/GridDiscoveryManager.java | 8 +-
.../continuous/CacheContinuousQueryHandler.java | 36 +--
.../continuous/GridContinuousHandler.java | 4 +-
.../continuous/GridContinuousProcessor.java | 15 +-
...ContinuousQueryFailoverAbstractSelfTest.java | 80 ++++++
.../CacheContinuousQueryLostPartitionTest.java | 256 +++++++++++++++++++
.../IgniteBinaryCacheQueryTestSuite.java | 2 +
10 files changed, 383 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/252ba877/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
index be35ba4..69af6cd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
@@ -35,6 +35,7 @@ import org.apache.ignite.internal.managers.deployment.GridDeployment;
import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo;
import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
@@ -135,7 +136,7 @@ class GridEventConsumeHandler implements GridContinuousHandler {
}
/** {@inheritDoc} */
- @Override public void updateCounters(Map<Integer, Long> cntrs) {
+ @Override public void updateCounters(AffinityTopologyVersion topVer, Map<Integer, Long> cntrs) {
// No-op.
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/252ba877/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
index 2edfda5..13aeb54 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
@@ -27,6 +27,7 @@ import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.managers.deployment.GridDeployment;
import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.continuous.GridContinuousBatch;
import org.apache.ignite.internal.processors.continuous.GridContinuousBatchAdapter;
import org.apache.ignite.internal.processors.continuous.GridContinuousHandler;
@@ -108,12 +109,13 @@ public class GridMessageListenHandler implements GridContinuousHandler {
}
/** {@inheritDoc} */
- @Override public void updateCounters(Map<Integer, Long> cntrs) {
+ @Override public void updateCounters(AffinityTopologyVersion topVer, Map<Integer, Long> cntrs) {
// No-op.
}
/** {@inheritDoc} */
- @Override public RegisterStatus register(UUID nodeId, UUID routineId, final GridKernalContext ctx) throws IgniteCheckedException {
+ @Override public RegisterStatus register(UUID nodeId, UUID routineId, final GridKernalContext ctx)
+ throws IgniteCheckedException {
ctx.io().addUserMessageListener(topic, pred);
return RegisterStatus.REGISTERED;
http://git-wip-us.apache.org/repos/asf/ignite/blob/252ba877/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomEventListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomEventListener.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomEventListener.java
index ab143fb..21fd842 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomEventListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomEventListener.java
@@ -18,14 +18,16 @@
package org.apache.ignite.internal.managers.discovery;
import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
/**
* Listener interface.
*/
public interface CustomEventListener<T extends DiscoveryCustomMessage> {
/**
+ * @param topVer Current topology version.
* @param snd Sender.
* @param msg Message.
*/
- public void onCustomEvent(ClusterNode snd, T msg);
+ public void onCustomEvent(AffinityTopologyVersion topVer, ClusterNode snd, T msg);
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/252ba877/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 29e85dd..42f9b6d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -507,14 +507,18 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
verChanged = true;
}
+ }
+
+ nextTopVer = new AffinityTopologyVersion(topVer, minorTopVer);
+ if (type == DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT) {
for (Class cls = customMsg.getClass(); cls != null; cls = cls.getSuperclass()) {
List<CustomEventListener<DiscoveryCustomMessage>> list = customEvtLsnrs.get(cls);
if (list != null) {
for (CustomEventListener<DiscoveryCustomMessage> lsnr : list) {
try {
- lsnr.onCustomEvent(node, customMsg);
+ lsnr.onCustomEvent(nextTopVer, node, customMsg);
}
catch (Exception e) {
U.error(log, "Failed to notify direct custom event listener: " + customMsg, e);
@@ -524,8 +528,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
}
}
- nextTopVer = new AffinityTopologyVersion(topVer, minorTopVer);
-
// Put topology snapshot into discovery history.
// There is no race possible between history maintenance and concurrent discovery
// event notifications, since SPI notifies manager about all events from this listener.
http://git-wip-us.apache.org/repos/asf/ignite/blob/252ba877/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index ad86d65..fa54a6b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -148,6 +148,9 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
/** */
private Map<Integer, Long> initUpdCntrs;
+ /** */
+ private AffinityTopologyVersion initTopVer;
+
/**
* Required by {@link Externalizable}.
*/
@@ -170,6 +173,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
* @param skipPrimaryCheck Whether to skip primary check for REPLICATED cache.
* @param taskHash Task name hash code.
* @param locCache {@code True} if local cache.
+ * @param keepBinary Keep binary flag.
*/
public CacheContinuousQueryHandler(
String cacheName,
@@ -238,7 +242,8 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
}
/** {@inheritDoc} */
- @Override public void updateCounters(Map<Integer, Long> cntrs) {
+ @Override public void updateCounters(AffinityTopologyVersion topVer, Map<Integer, Long> cntrs) {
+ this.initTopVer = topVer;
this.initUpdCntrs = cntrs;
}
@@ -382,9 +387,12 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
}
else {
if (!internal) {
- entry.markBackup();
+ // Skip init query and expire entries.
+ if (entry.updateCounter() != -1L) {
+ entry.markBackup();
- backupQueue.add(entry);
+ backupQueue.add(entry);
+ }
}
}
}
@@ -483,7 +491,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
e = buf.skipEntry(e);
- if (e != null)
+ if (e != null && !ctx.localNodeId().equals(nodeId))
ctx.continuous().addNotification(nodeId, routineId, e, topic, sync, true);
}
catch (ClusterTopologyCheckedException ex) {
@@ -642,15 +650,14 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
return F.asList(e);
}
- // Initial query entry or evicted entry.
- // This events should be fired immediately.
- if (e.updateCounter() == -1)
+ // Initial query entry or evicted entry. These events should be fired immediately.
+ if (e.updateCounter() == -1L)
return F.asList(e);
PartitionRecovery rec = rcvs.get(e.partition());
if (rec == null) {
- rec = new PartitionRecovery(ctx.log(getClass()), cacheContext(ctx).topology().topologyVersion(),
+ rec = new PartitionRecovery(ctx.log(getClass()), initTopVer,
initUpdCntrs == null ? null : initUpdCntrs.get(e.partition()));
PartitionRecovery oldRec = rcvs.putIfAbsent(e.partition(), rec);
@@ -724,6 +731,8 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
* @param initCntr Update counters.
*/
public PartitionRecovery(IgniteLogger log, AffinityTopologyVersion topVer, @Nullable Long initCntr) {
+ assert topVer.topologyVersion() > 0 : topVer;
+
this.log = log;
if (initCntr != null) {
@@ -745,17 +754,8 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
List<CacheContinuousQueryEntry> entries;
synchronized (pendingEvts) {
- // Received first event.
- if (curTop == AffinityTopologyVersion.NONE) {
- lastFiredEvt = entry.updateCounter();
-
- curTop = entry.topologyVersion();
-
- return F.asList(entry);
- }
-
if (curTop.compareTo(entry.topologyVersion()) < 0) {
- if (entry.updateCounter() == 1 && !entry.isBackup()) {
+ if (entry.updateCounter() == 1L && !entry.isBackup()) {
entries = new ArrayList<>(pendingEvts.size());
for (CacheContinuousQueryEntry evt : pendingEvts.values()) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/252ba877/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
index 900835a..8cd30a8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
@@ -23,6 +23,7 @@ import java.util.Map;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.jetbrains.annotations.Nullable;
/**
@@ -154,6 +155,7 @@ public interface GridContinuousHandler extends Externalizable, Cloneable {
/**
* @param cntrs Init state for partition counters.
+ * @param topVer Topology version.
*/
- public void updateCounters(Map<Integer, Long> cntrs);
+ public void updateCounters(AffinityTopologyVersion topVer, Map<Integer, Long> cntrs);
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/252ba877/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index cb028f3..7c7e3e3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -52,6 +52,7 @@ import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean;
import org.apache.ignite.internal.managers.discovery.CustomEventListener;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.processors.GridProcessorAdapter;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryHandler;
@@ -191,7 +192,8 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
ctx.discovery().setCustomEventListener(StartRoutineDiscoveryMessage.class,
new CustomEventListener<StartRoutineDiscoveryMessage>() {
- @Override public void onCustomEvent(ClusterNode snd,
+ @Override public void onCustomEvent(AffinityTopologyVersion topVer,
+ ClusterNode snd,
StartRoutineDiscoveryMessage msg) {
if (!snd.id().equals(ctx.localNodeId()) && !ctx.isStopping())
processStartRequest(snd, msg);
@@ -200,7 +202,8 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
ctx.discovery().setCustomEventListener(StartRoutineAckDiscoveryMessage.class,
new CustomEventListener<StartRoutineAckDiscoveryMessage>() {
- @Override public void onCustomEvent(ClusterNode snd,
+ @Override public void onCustomEvent(AffinityTopologyVersion topVer,
+ ClusterNode snd,
StartRoutineAckDiscoveryMessage msg) {
StartFuture fut = startFuts.remove(msg.routineId());
@@ -228,7 +231,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
}
}
- routine.handler().updateCounters(msg.updateCounters());
+ routine.handler().updateCounters(topVer, msg.updateCounters());
}
fut.onRemoteRegistered();
@@ -246,7 +249,8 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
ctx.discovery().setCustomEventListener(StopRoutineDiscoveryMessage.class,
new CustomEventListener<StopRoutineDiscoveryMessage>() {
- @Override public void onCustomEvent(ClusterNode snd,
+ @Override public void onCustomEvent(AffinityTopologyVersion topVer,
+ ClusterNode snd,
StopRoutineDiscoveryMessage msg) {
if (!snd.id().equals(ctx.localNodeId())) {
UUID routineId = msg.routineId();
@@ -265,7 +269,8 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
ctx.discovery().setCustomEventListener(StopRoutineAckDiscoveryMessage.class,
new CustomEventListener<StopRoutineAckDiscoveryMessage>() {
- @Override public void onCustomEvent(ClusterNode snd,
+ @Override public void onCustomEvent(AffinityTopologyVersion topVer,
+ ClusterNode snd,
StopRoutineAckDiscoveryMessage msg) {
StopFuture fut = stopFuts.remove(msg.routineId());
http://git-wip-us.apache.org/repos/asf/ignite/blob/252ba877/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
index 283da80..5de3d0f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
@@ -41,6 +41,9 @@ import javax.cache.CacheException;
import javax.cache.event.CacheEntryEvent;
import javax.cache.event.CacheEntryListenerException;
import javax.cache.event.CacheEntryUpdatedListener;
+import javax.cache.expiry.Duration;
+import javax.cache.expiry.ExpiryPolicy;
+import javax.cache.expiry.TouchedExpiryPolicy;
import javax.cache.processor.EntryProcessorException;
import javax.cache.processor.MutableEntry;
import org.apache.ignite.Ignite;
@@ -90,11 +93,13 @@ import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.transactions.Transaction;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY;
import static org.apache.ignite.cache.CacheMode.REPLICATED;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
/**
*
@@ -1278,6 +1283,81 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
/**
* @throws Exception If failed.
*/
+ public void testBackupQueueEvict() throws Exception {
+ startGridsMultiThreaded(2);
+
+ client = true;
+
+ Ignite qryClient = startGrid(2);
+
+ CacheEventListener1 lsnr = new CacheEventListener1(false);
+
+ ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+
+ qry.setLocalListener(lsnr);
+
+ QueryCursor<?> cur = qryClient.cache(null).query(qry);
+
+ final Collection<Object> backupQueue = backupQueue(ignite(0));
+
+ assertEquals(0, backupQueue.size());
+
+ long ttl = 100;
+
+ final ExpiryPolicy expiry = new TouchedExpiryPolicy(new Duration(MILLISECONDS, ttl));
+
+ final IgniteCache<Object, Object> cache0 = ignite(2).cache(null).withExpiryPolicy(expiry);
+
+ final List<Integer> keys = primaryKeys(ignite(1).cache(null), BACKUP_ACK_THRESHOLD);
+
+ CountDownLatch latch = new CountDownLatch(keys.size());
+
+ lsnr.latch = latch;
+
+ for (Integer key : keys) {
+ log.info("Put: " + key);
+
+ cache0.put(key, key);
+ }
+
+ GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ return backupQueue.isEmpty();
+ }
+ }, 2000);
+
+ assertTrue("Backup queue is not cleared: " + backupQueue, backupQueue.size() < BACKUP_ACK_THRESHOLD);
+
+ boolean wait = waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ return cache0.localPeek(keys.get(0)) == null;
+ }
+ }, ttl + 1000);
+
+ assertTrue("Entry evicted.", wait);
+
+ GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ return backupQueue.isEmpty();
+ }
+ }, 2000);
+
+ assertTrue("Backup queue is not cleared: " + backupQueue, backupQueue.size() < BACKUP_ACK_THRESHOLD);
+
+ if (backupQueue.size() != 0) {
+ for (Object o : backupQueue) {
+ CacheContinuousQueryEntry e = (CacheContinuousQueryEntry)o;
+
+ assertNotSame("Evicted entry added to backup queue.", -1L, e.updateCounter());
+ }
+ }
+
+ cur.close();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
public void testBackupQueueCleanupServerQuery() throws Exception {
Ignite qryClient = startGridsMultiThreaded(2);
http://git-wip-us.apache.org/repos/asf/ignite/blob/252ba877/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartitionTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartitionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartitionTest.java
new file mode 100644
index 0000000..30613a4
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartitionTest.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.io.Serializable;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.cache.configuration.MutableCacheEntryListenerConfiguration;
+import javax.cache.event.CacheEntryCreatedListener;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryExpiredListener;
+import javax.cache.event.CacheEntryRemovedListener;
+import javax.cache.event.CacheEntryUpdatedListener;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.typedef.PA;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static javax.cache.configuration.FactoryBuilder.factoryOf;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheRebalanceMode.SYNC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
+
+/**
+ * Test from https://issues.apache.org/jira/browse/IGNITE-2384.
+ */
+public class CacheContinuousQueryLostPartitionTest extends GridCommonAbstractTest {
+ /** */
+ static public TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+ /** Cache name. */
+ public static final String CACHE_NAME = "test_cache";
+
+ /** Cache name. */
+ public static final String TX_CACHE_NAME = "tx_test_cache";
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ startGridsMultiThreaded(2);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTxEvent() throws Exception {
+ testEvent(TX_CACHE_NAME, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testAtomicEvent() throws Exception {
+ testEvent(CACHE_NAME, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTxClientEvent() throws Exception {
+ testEvent(TX_CACHE_NAME, true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testAtomicClientEvent() throws Exception {
+ testEvent(CACHE_NAME, true);
+ }
+
+ /**
+ * @param cacheName Cache name.
+ * @throws Exception If failed.
+ */
+ public void testEvent(String cacheName, boolean client) throws Exception {
+ IgniteCache<Integer, String> cache1 = grid(0).getOrCreateCache(cacheName);
+
+ final AllEventListener<Integer, String> lsnr1 = registerCacheListener(cache1);
+
+ IgniteCache<Integer, String> cache2 = grid(1).getOrCreateCache(cacheName);
+
+ Integer key = primaryKey(cache1);
+
+ cache1.put(key, "1");
+
+ // Note the issue is only reproducible if the second registration is done right
+ // here, after the first put() above.
+ AllEventListener<Integer, String> lsnr20;
+
+ if (client) {
+ IgniteCache<Integer, String> clnCache = startGrid(3).getOrCreateCache(cacheName);
+
+ lsnr20 = registerCacheListener(clnCache);
+ }
+ else
+ lsnr20 = registerCacheListener(cache2);
+
+ final AllEventListener<Integer, String> lsnr2 = lsnr20;
+
+ assert GridTestUtils.waitForCondition(new PA() {
+ @Override public boolean apply() {
+ return lsnr1.createdCnt.get() == 1;
+ }
+ }, 2000L) : "Unexpected number of events: " + lsnr1.createdCnt.get();
+
+ // Sanity check.
+ assert GridTestUtils.waitForCondition(new PA() {
+ @Override public boolean apply() {
+ return lsnr2.createdCnt.get() == 0;
+ }
+ }, 2000L) : "Expected no create events, but got: " + lsnr2.createdCnt.get();
+
+ // node2 now becomes the primary for the key.
+ grid(0).close();
+
+ cache2.put(key, "2");
+
+ // Sanity check.
+ assert GridTestUtils.waitForCondition(new PA() {
+ @Override public boolean apply() {
+ return lsnr1.createdCnt.get() == 1;
+ }
+ }, 2000L) : "Expected no change here, but got: " + lsnr1.createdCnt.get();
+
+ // Sanity check.
+ assert GridTestUtils.waitForCondition(new PA() {
+ @Override public boolean apply() {
+ return lsnr2.updatedCnt.get() == 0;
+ }
+ }, 2000L) : "Expected no update events, but got: " + lsnr2.updatedCnt.get();
+
+ System.out.println(">>>>> " + lsnr2.createdCnt.get());
+
+ // This assertion fails: 0 events get delivered.
+ assert GridTestUtils.waitForCondition(new PA() {
+ @Override public boolean apply() {
+ return lsnr2.createdCnt.get() == 1;
+ }
+ }, 2000L) : "Expected a single event due to '2', but got: " + lsnr2.createdCnt.get();
+ }
+
+ /**
+ * @param cache Cache.
+ * @return Event listener.
+ */
+ private AllEventListener<Integer, String> registerCacheListener(IgniteCache<Integer, String> cache) {
+ AllEventListener<Integer, String> lsnr = new AllEventListener<>();
+
+ cache.registerCacheEntryListener(
+ new MutableCacheEntryListenerConfiguration<>(factoryOf(lsnr), null, true, false));
+
+ return lsnr;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String name) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(name);
+
+ TcpDiscoverySpi spi = new TcpDiscoverySpi();
+
+ spi.setIpFinder(ipFinder);
+
+ cfg.setDiscoverySpi(spi);
+ cfg.setCacheConfiguration(cache(TX_CACHE_NAME), cache(CACHE_NAME));
+
+ if (name.endsWith("3"))
+ cfg.setClientMode(true);
+
+ return cfg;
+ }
+
+ /**
+ * @param cacheName Cache name.
+ * @return Cache configuration.
+ */
+ protected CacheConfiguration<Integer, String> cache(String cacheName) {
+ CacheConfiguration<Integer, String> cfg = new CacheConfiguration<>(cacheName);
+
+ cfg.setCacheMode(PARTITIONED);
+
+ if (cacheName.equals(CACHE_NAME))
+ cfg.setAtomicityMode(ATOMIC);
+ else
+ cfg.setAtomicityMode(TRANSACTIONAL);
+
+ cfg.setRebalanceMode(SYNC);
+ cfg.setWriteSynchronizationMode(PRIMARY_SYNC);
+ cfg.setBackups(0);
+
+ return cfg;
+ }
+
+ /**
+ * Event listener.
+ */
+ public static class AllEventListener<K, V> implements CacheEntryCreatedListener<K, V>,
+ CacheEntryUpdatedListener<K, V>, CacheEntryRemovedListener<K, V>, CacheEntryExpiredListener<K, V>,
+ Serializable {
+ /** */
+ final AtomicInteger createdCnt = new AtomicInteger();
+
+ /** */
+ final AtomicInteger updatedCnt = new AtomicInteger();
+
+ /** {@inheritDoc} */
+ @Override public void onCreated(Iterable<CacheEntryEvent<? extends K, ? extends V>> evts) {
+ createdCnt.incrementAndGet();
+
+ System.out.printf("onCreate: %s. \n", evts);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onExpired(Iterable<CacheEntryEvent<? extends K, ? extends V>> evts) {
+ System.out.printf("onExpired: %s. \n", evts);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onRemoved(Iterable<CacheEntryEvent<? extends K, ? extends V>> evts) {
+ System.out.printf("onRemoved: %s. \n", evts);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onUpdated(Iterable<CacheEntryEvent<? extends K, ? extends V>> evts) {
+ updatedCnt.incrementAndGet();
+
+ System.out.printf("onUpdated: %s.", evts);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/252ba877/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java
index eddfcf4..3bab1f9 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java
@@ -75,6 +75,7 @@ import org.apache.ignite.internal.processors.cache.distributed.replicated.Ignite
import org.apache.ignite.internal.processors.cache.local.IgniteCacheLocalAtomicQuerySelfTest;
import org.apache.ignite.internal.processors.cache.local.IgniteCacheLocalFieldsQuerySelfTest;
import org.apache.ignite.internal.processors.cache.local.IgniteCacheLocalQuerySelfTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryLostPartitionTest;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverAtomicPrimaryWriteOrderSelfTest;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverAtomicReplicatedSelfTest;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverTxReplicatedSelfTest;
@@ -200,6 +201,7 @@ public class IgniteBinaryCacheQueryTestSuite extends TestSuite {
suite.addTestSuite(CacheContinuousQueryFailoverAtomicReplicatedSelfTest.class);
suite.addTestSuite(CacheContinuousQueryFailoverTxSelfTest.class);
suite.addTestSuite(CacheContinuousQueryFailoverTxReplicatedSelfTest.class);
+ suite.addTestSuite(CacheContinuousQueryLostPartitionTest.class);
// Reduce fields queries.
suite.addTestSuite(GridCacheReduceFieldsQueryLocalSelfTest.class);