You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/01/18 08:10:20 UTC

[4/4] ignite git commit: IGNITE-2384

IGNITE-2384


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/43e7ae12
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/43e7ae12
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/43e7ae12

Branch: refs/heads/ignite-2384
Commit: 43e7ae12fbf97d4107be04418733c65745d16d87
Parents: e135ebc
Author: sboikov <sb...@gridgain.com>
Authored: Mon Jan 18 09:27:28 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Jan 18 10:06:12 2016 +0300

----------------------------------------------------------------------
 .../managers/discovery/CustomEventListener.java |   4 +-
 .../discovery/GridDiscoveryManager.java         |   8 +-
 .../continuous/CacheContinuousQueryHandler.java |  12 +-
 .../continuous/GridContinuousProcessor.java     |  13 +-
 .../CacheContinuousIssueSelfTest.java           | 224 -------------------
 .../CacheContinuousIssueTxSelfTest.java         |  34 ---
 .../CacheContinuousQueryLostPartitionTest.java  | 203 +++++++++++++++++
 ...CacheContinuousQueryLostPartitionTxTest.java |  36 +++
 .../IgniteBinaryCacheQueryTestSuite.java        |   8 +-
 9 files changed, 266 insertions(+), 276 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/43e7ae12/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomEventListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomEventListener.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomEventListener.java
index ab143fb..21fd842 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomEventListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomEventListener.java
@@ -18,14 +18,16 @@
 package org.apache.ignite.internal.managers.discovery;
 
 import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 
 /**
  * Listener interface.
  */
 public interface CustomEventListener<T extends DiscoveryCustomMessage> {
     /**
+     * @param topVer Current topology version.
      * @param snd Sender.
      * @param msg Message.
      */
-    public void onCustomEvent(ClusterNode snd, T msg);
+    public void onCustomEvent(AffinityTopologyVersion topVer, ClusterNode snd, T msg);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/43e7ae12/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 29e85dd..42f9b6d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -507,14 +507,18 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
 
                         verChanged = true;
                     }
+                }
+
+                nextTopVer = new AffinityTopologyVersion(topVer, minorTopVer);
 
+                if (type == DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT) {
                     for (Class cls = customMsg.getClass(); cls != null; cls = cls.getSuperclass()) {
                         List<CustomEventListener<DiscoveryCustomMessage>> list = customEvtLsnrs.get(cls);
 
                         if (list != null) {
                             for (CustomEventListener<DiscoveryCustomMessage> lsnr : list) {
                                 try {
-                                    lsnr.onCustomEvent(node, customMsg);
+                                    lsnr.onCustomEvent(nextTopVer, node, customMsg);
                                 }
                                 catch (Exception e) {
                                     U.error(log, "Failed to notify direct custom event listener: " + customMsg, e);
@@ -524,8 +528,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
                     }
                 }
 
-                nextTopVer = new AffinityTopologyVersion(topVer, minorTopVer);
-
                 // Put topology snapshot into discovery history.
                 // There is no race possible between history maintenance and concurrent discovery
                 // event notifications, since SPI notifies manager about all events from this listener.

http://git-wip-us.apache.org/repos/asf/ignite/blob/43e7ae12/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 1753b26..97e4a99 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -173,6 +173,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
      * @param skipPrimaryCheck Whether to skip primary check for REPLICATED cache.
      * @param taskHash Task name hash code.
      * @param locCache {@code True} if local cache.
+     * @param keepBinary Keep binary flag.
      */
     public CacheContinuousQueryHandler(
         String cacheName,
@@ -242,8 +243,8 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
 
     /** {@inheritDoc} */
     @Override public void updateCounters(AffinityTopologyVersion topVer, Map<Integer, Long> cntrs) {
-        this.initUpdCntrs = cntrs;
         this.initTopVer = topVer;
+        this.initUpdCntrs = cntrs;
     }
 
     /** {@inheritDoc} */
@@ -389,7 +390,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
                             entry.markBackup();
 
                             // Skip init query and expire entries.
-                            if (entry.updateCounter() != -1)
+                            if (entry.updateCounter() != -1L)
                                 backupQueue.add(entry);
                         }
                     }
@@ -648,9 +649,8 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
                 return F.asList(e);
         }
 
-        // Initial query entry or evicted entry.
-        // This events should be fired immediately.
-        if (e.updateCounter() == -1)
+        // Initial query entry or evicted entry. These events should be fired immediately.
+        if (e.updateCounter() == -1L)
             return F.asList(e);
 
         PartitionRecovery rec = rcvs.get(e.partition());
@@ -761,7 +761,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
                 }
 
                 if (curTop.compareTo(entry.topologyVersion()) < 0) {
-                    if (entry.updateCounter() == 1 && !entry.isBackup()) {
+                    if (entry.updateCounter() == 1L && !entry.isBackup()) {
                         entries = new ArrayList<>(pendingEvts.size());
 
                         for (CacheContinuousQueryEntry evt : pendingEvts.values()) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/43e7ae12/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index fb8855d..7c7e3e3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -52,6 +52,7 @@ import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean;
 import org.apache.ignite.internal.managers.discovery.CustomEventListener;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.processors.GridProcessorAdapter;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryHandler;
@@ -191,7 +192,8 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
 
         ctx.discovery().setCustomEventListener(StartRoutineDiscoveryMessage.class,
             new CustomEventListener<StartRoutineDiscoveryMessage>() {
-                @Override public void onCustomEvent(ClusterNode snd,
+                @Override public void onCustomEvent(AffinityTopologyVersion topVer,
+                    ClusterNode snd,
                     StartRoutineDiscoveryMessage msg) {
                     if (!snd.id().equals(ctx.localNodeId()) && !ctx.isStopping())
                         processStartRequest(snd, msg);
@@ -200,7 +202,8 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
 
         ctx.discovery().setCustomEventListener(StartRoutineAckDiscoveryMessage.class,
             new CustomEventListener<StartRoutineAckDiscoveryMessage>() {
-                @Override public void onCustomEvent(ClusterNode snd,
+                @Override public void onCustomEvent(AffinityTopologyVersion topVer,
+                    ClusterNode snd,
                     StartRoutineAckDiscoveryMessage msg) {
                     StartFuture fut = startFuts.remove(msg.routineId());
 
@@ -246,7 +249,8 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
 
         ctx.discovery().setCustomEventListener(StopRoutineDiscoveryMessage.class,
             new CustomEventListener<StopRoutineDiscoveryMessage>() {
-                @Override public void onCustomEvent(ClusterNode snd,
+                @Override public void onCustomEvent(AffinityTopologyVersion topVer,
+                    ClusterNode snd,
                     StopRoutineDiscoveryMessage msg) {
                     if (!snd.id().equals(ctx.localNodeId())) {
                         UUID routineId = msg.routineId();
@@ -265,7 +269,8 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
 
         ctx.discovery().setCustomEventListener(StopRoutineAckDiscoveryMessage.class,
             new CustomEventListener<StopRoutineAckDiscoveryMessage>() {
-                @Override public void onCustomEvent(ClusterNode snd,
+                @Override public void onCustomEvent(AffinityTopologyVersion topVer,
+                    ClusterNode snd,
                     StopRoutineAckDiscoveryMessage msg) {
                     StopFuture fut = stopFuts.remove(msg.routineId());
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/43e7ae12/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousIssueSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousIssueSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousIssueSelfTest.java
deleted file mode 100644
index e1000e4..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousIssueSelfTest.java
+++ /dev/null
@@ -1,224 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.query.continuous;
-
-import java.io.Serializable;
-import java.util.concurrent.atomic.AtomicInteger;
-import javax.cache.configuration.MutableCacheEntryListenerConfiguration;
-import javax.cache.event.CacheEntryCreatedListener;
-import javax.cache.event.CacheEntryEvent;
-import javax.cache.event.CacheEntryExpiredListener;
-import javax.cache.event.CacheEntryRemovedListener;
-import javax.cache.event.CacheEntryUpdatedListener;
-import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteCache;
-import org.apache.ignite.cache.CacheAtomicityMode;
-import org.apache.ignite.cache.CacheMode;
-import org.apache.ignite.cache.CacheRebalanceMode;
-import org.apache.ignite.cache.CacheWriteSynchronizationMode;
-import org.apache.ignite.cache.affinity.Affinity;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.internal.util.typedef.PA;
-import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
-import org.apache.ignite.testframework.GridTestUtils;
-import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-
-import static javax.cache.configuration.FactoryBuilder.factoryOf;
-
-/**
- * Test from https://issues.apache.org/jira/browse/IGNITE-2384.
- */
-public class CacheContinuousIssueSelfTest extends GridCommonAbstractTest {
-    /** */
-    static public TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
-
-    /** Cache name. */
-    public static final String CACHE_NAME = "test_cache";
-
-    /** {@inheritDoc} */
-    @Override protected void beforeTest() throws Exception {
-        super.beforeTest();
-
-        startGridsMultiThreaded(2);
-
-        awaitPartitionMapExchange();
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void afterTest() throws Exception {
-        stopAllGrids();
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testEvent() throws Exception {
-        IgniteCache<Integer, String> cache1 = grid(0).getOrCreateCache(CACHE_NAME);
-
-        final AllEventListener<Integer, String> lsnr1 = registerCacheListener(cache1);
-
-        IgniteCache<Integer, String> cache2 = grid(1).getOrCreateCache(CACHE_NAME);
-
-        int key = affinityKeyForNode(1, grid(0));
-        cache1.put(key, "vodka");
-
-        // Note the issue is only reproducible if the second registration is done right
-        // here, after the first put() above.
-        final AllEventListener<Integer, String> lsnr2 = registerCacheListener(cache2);
-
-        assert GridTestUtils.waitForCondition(new PA() {
-            @Override public boolean apply() {
-                return lsnr1.createdCount.get() == 1;
-            }
-        }, 2000L) : "Unexpected number of events: " + lsnr1.createdCount.get();
-
-        // Sanity check.
-        assert GridTestUtils.waitForCondition(new PA() {
-            @Override public boolean apply() {
-                return lsnr2.createdCount.get() == 0;
-            }
-        }, 2000L) : "Expected no create events, but got: " + lsnr2.createdCount.get();
-
-        // node2 now becomes the primary for the key.
-        grid(0).close();
-
-        awaitPartitionMapExchange();
-
-        cache2.put(key, "peevo");
-
-        // Sanity check.
-        assert GridTestUtils.waitForCondition(new PA() {
-            @Override public boolean apply() {
-                return lsnr1.createdCount.get() == 1;
-            }
-        }, 2000L) : "Expected no change here, but got: " + lsnr1.createdCount.get();
-
-        // Sanity check.
-        assert GridTestUtils.waitForCondition(new PA() {
-            @Override public boolean apply() {
-                return lsnr2.updatedCount.get() == 0;
-            }
-        }, 2000L) : "Expected no update events, but got: " + lsnr2.updatedCount.get();
-
-        System.out.println(">>>>> " + lsnr2.createdCount.get());
-
-        // This assertion fails: 0 events get delivered.
-        assert GridTestUtils.waitForCondition(new PA() {
-            @Override public boolean apply() {
-                return lsnr2.createdCount.get() == 1;
-            }
-        }, 2000L) : "Expected a single event due to 'peevo', but got: " + lsnr2.createdCount.get();
-    }
-
-    /**
-     * @param cache Cache.
-     * @return Event listener.
-     */
-    private AllEventListener<Integer, String> registerCacheListener(
-        IgniteCache<Integer, String> cache) {
-        AllEventListener<Integer, String> lsnr = new AllEventListener<>();
-        cache.registerCacheEntryListener(
-            new MutableCacheEntryListenerConfiguration<>(factoryOf(lsnr), null, true, false));
-        return lsnr;
-    }
-
-    /**
-     * @param startValue Start value.
-     * @param node Ignite node.
-     * @return Primary key.
-     */
-    private int affinityKeyForNode(int startValue, Ignite node) {
-        Affinity<Integer> affinity = node.affinity(CACHE_NAME);
-
-        ClusterNode localNode = node.cluster().localNode();
-
-        int key;
-
-        for (key = startValue + 1; !affinity.isPrimary(localNode, key); key++);
-
-        return key;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected IgniteConfiguration getConfiguration() throws Exception {
-        IgniteConfiguration cfg = super.getConfiguration();
-
-        TcpDiscoverySpi spi = new TcpDiscoverySpi();
-
-        spi.setIpFinder(ipFinder);
-
-        cfg.setDiscoverySpi(spi);
-        cfg.setCacheConfiguration(cache());
-
-        return cfg;
-    }
-
-    /**
-     * @return Cache configuration.
-     */
-    protected CacheConfiguration<Integer, String> cache() {
-        CacheConfiguration<Integer, String> cfg = new CacheConfiguration<>(CACHE_NAME);
-
-        cfg.setCacheMode(CacheMode.PARTITIONED);
-        cfg.setAtomicityMode(CacheAtomicityMode.ATOMIC);
-        cfg.setRebalanceMode(CacheRebalanceMode.SYNC);
-        cfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.PRIMARY_SYNC);
-        cfg.setStartSize(1024);
-
-        return cfg;
-    }
-
-    /**
-     * Event listener.
-     */
-    public static class AllEventListener<K, V> implements CacheEntryCreatedListener<K, V>,
-        CacheEntryUpdatedListener<K, V>, CacheEntryRemovedListener<K, V>, CacheEntryExpiredListener<K, V>,
-        Serializable {
-        /** */
-        final AtomicInteger createdCount = new AtomicInteger();
-
-        /** */
-        final AtomicInteger updatedCount = new AtomicInteger();
-
-        /** {@inheritDoc} */
-        @Override public void onCreated(Iterable<CacheEntryEvent<? extends K, ? extends V>> evts) {
-            createdCount.incrementAndGet();
-            System.out.printf("onCreate: %s. \n", evts);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void onExpired(Iterable<CacheEntryEvent<? extends K, ? extends V>> evts) {
-            System.out.printf("onExpired: %s. \n", evts);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void onRemoved(Iterable<CacheEntryEvent<? extends K, ? extends V>> evts) {
-            System.out.printf("onRemoved: %s. \n", evts);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void onUpdated(Iterable<CacheEntryEvent<? extends K, ? extends V>> evts) {
-            updatedCount.incrementAndGet();
-            System.out.printf("onUpdated: %s.", evts);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/43e7ae12/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousIssueTxSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousIssueTxSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousIssueTxSelfTest.java
deleted file mode 100644
index 0323b23..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousIssueTxSelfTest.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.query.continuous;
-
-import org.apache.ignite.cache.CacheAtomicityMode;
-import org.apache.ignite.configuration.CacheConfiguration;
-
-/**
- *
- */
-public class CacheContinuousIssueTxSelfTest extends CacheContinuousIssueSelfTest {
-    @Override protected CacheConfiguration<Integer, String> cache() {
-        CacheConfiguration<Integer, String> ccfg = super.cache();
-
-        ccfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
-
-        return ccfg;
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/43e7ae12/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartitionTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartitionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartitionTest.java
new file mode 100644
index 0000000..632a7a3
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartitionTest.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.io.Serializable;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.cache.configuration.MutableCacheEntryListenerConfiguration;
+import javax.cache.event.CacheEntryCreatedListener;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryExpiredListener;
+import javax.cache.event.CacheEntryRemovedListener;
+import javax.cache.event.CacheEntryUpdatedListener;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.typedef.PA;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static javax.cache.configuration.FactoryBuilder.factoryOf;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheRebalanceMode.SYNC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
+
+/**
+ * Test from https://issues.apache.org/jira/browse/IGNITE-2384.
+ */
+public class CacheContinuousQueryLostPartitionTest extends GridCommonAbstractTest {
+    /** */
+    static public TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** Cache name. */
+    public static final String CACHE_NAME = "test_cache";
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        startGridsMultiThreaded(2);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testEvent() throws Exception {
+        IgniteCache<Integer, String> cache1 = grid(0).getOrCreateCache(CACHE_NAME);
+
+        final AllEventListener<Integer, String> lsnr1 = registerCacheListener(cache1);
+
+        IgniteCache<Integer, String> cache2 = grid(1).getOrCreateCache(CACHE_NAME);
+
+        Integer key = primaryKey(cache1);
+
+        cache1.put(key, "1");
+
+        // Note the issue is only reproducible if the second registration is done right
+        // here, after the first put() above.
+        final AllEventListener<Integer, String> lsnr2 = registerCacheListener(cache2);
+
+        assert GridTestUtils.waitForCondition(new PA() {
+            @Override public boolean apply() {
+                return lsnr1.createdCnt.get() == 1;
+            }
+        }, 2000L) : "Unexpected number of events: " + lsnr1.createdCnt.get();
+
+        // Sanity check.
+        assert GridTestUtils.waitForCondition(new PA() {
+            @Override public boolean apply() {
+                return lsnr2.createdCnt.get() == 0;
+            }
+        }, 2000L) : "Expected no create events, but got: " + lsnr2.createdCnt.get();
+
+        // node2 now becomes the primary for the key.
+        grid(0).close();
+
+        cache2.put(key, "2");
+
+        // Sanity check.
+        assert GridTestUtils.waitForCondition(new PA() {
+            @Override public boolean apply() {
+                return lsnr1.createdCnt.get() == 1;
+            }
+        }, 2000L) : "Expected no change here, but got: " + lsnr1.createdCnt.get();
+
+        // Sanity check.
+        assert GridTestUtils.waitForCondition(new PA() {
+            @Override public boolean apply() {
+                return lsnr2.updatedCnt.get() == 0;
+            }
+        }, 2000L) : "Expected no update events, but got: " + lsnr2.updatedCnt.get();
+
+        System.out.println(">>>>> " + lsnr2.createdCnt.get());
+
+        // This assertion fails: 0 events get delivered.
+        assert GridTestUtils.waitForCondition(new PA() {
+            @Override public boolean apply() {
+                return lsnr2.createdCnt.get() == 1;
+            }
+        }, 2000L) : "Expected a single event due to '2', but got: " + lsnr2.createdCnt.get();
+    }
+
+    /**
+     * @param cache Cache.
+     * @return Event listener.
+     */
+    private AllEventListener<Integer, String> registerCacheListener(
+        IgniteCache<Integer, String> cache) {
+        AllEventListener<Integer, String> lsnr = new AllEventListener<>();
+        cache.registerCacheEntryListener(
+            new MutableCacheEntryListenerConfiguration<>(factoryOf(lsnr), null, true, false));
+        return lsnr;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration() throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration();
+
+        TcpDiscoverySpi spi = new TcpDiscoverySpi();
+
+        spi.setIpFinder(ipFinder);
+
+        cfg.setDiscoverySpi(spi);
+        cfg.setCacheConfiguration(cache());
+
+        return cfg;
+    }
+
+    /**
+     * @return Cache configuration.
+     */
+    protected CacheConfiguration<Integer, String> cache() {
+        CacheConfiguration<Integer, String> cfg = new CacheConfiguration<>(CACHE_NAME);
+
+        cfg.setCacheMode(PARTITIONED);
+        cfg.setAtomicityMode(ATOMIC);
+        cfg.setRebalanceMode(SYNC);
+        cfg.setWriteSynchronizationMode(PRIMARY_SYNC);
+        cfg.setBackups(0);
+
+        return cfg;
+    }
+
+    /**
+     * Event listener.
+     */
+    public static class AllEventListener<K, V> implements CacheEntryCreatedListener<K, V>,
+        CacheEntryUpdatedListener<K, V>, CacheEntryRemovedListener<K, V>, CacheEntryExpiredListener<K, V>,
+        Serializable {
+        /** */
+        final AtomicInteger createdCnt = new AtomicInteger();
+
+        /** */
+        final AtomicInteger updatedCnt = new AtomicInteger();
+
+        /** {@inheritDoc} */
+        @Override public void onCreated(Iterable<CacheEntryEvent<? extends K, ? extends V>> evts) {
+            createdCnt.incrementAndGet();
+
+            System.out.printf("onCreate: %s. \n", evts);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onExpired(Iterable<CacheEntryEvent<? extends K, ? extends V>> evts) {
+            System.out.printf("onExpired: %s. \n", evts);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onRemoved(Iterable<CacheEntryEvent<? extends K, ? extends V>> evts) {
+            System.out.printf("onRemoved: %s. \n", evts);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onUpdated(Iterable<CacheEntryEvent<? extends K, ? extends V>> evts) {
+            updatedCnt.incrementAndGet();
+
+            System.out.printf("onUpdated: %s.", evts);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/43e7ae12/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartitionTxTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartitionTxTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartitionTxTest.java
new file mode 100644
index 0000000..bd72dc2
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartitionTxTest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import org.apache.ignite.configuration.CacheConfiguration;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+
+/**
+ *
+ */
+public class CacheContinuousQueryLostPartitionTxTest extends CacheContinuousQueryLostPartitionTest {
+    /** {@inheritDoc} */
+    @Override protected CacheConfiguration<Integer, String> cache() {
+        CacheConfiguration<Integer, String> ccfg = super.cache();
+
+        ccfg.setAtomicityMode(TRANSACTIONAL);
+
+        return ccfg;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/43e7ae12/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java
index 2fd8dd5..d101493 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java
@@ -75,8 +75,8 @@ import org.apache.ignite.internal.processors.cache.distributed.replicated.Ignite
 import org.apache.ignite.internal.processors.cache.local.IgniteCacheLocalAtomicQuerySelfTest;
 import org.apache.ignite.internal.processors.cache.local.IgniteCacheLocalFieldsQuerySelfTest;
 import org.apache.ignite.internal.processors.cache.local.IgniteCacheLocalQuerySelfTest;
-import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousIssueSelfTest;
-import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousIssueTxSelfTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryLostPartitionTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryLostPartitionTxTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverAtomicPrimaryWriteOrderSelfTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverAtomicReplicatedSelfTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverTxReplicatedSelfTest;
@@ -202,8 +202,8 @@ public class IgniteBinaryCacheQueryTestSuite extends TestSuite {
         suite.addTestSuite(CacheContinuousQueryFailoverAtomicReplicatedSelfTest.class);
         suite.addTestSuite(CacheContinuousQueryFailoverTxSelfTest.class);
         suite.addTestSuite(CacheContinuousQueryFailoverTxReplicatedSelfTest.class);
-        suite.addTestSuite(CacheContinuousIssueSelfTest.class);
-        suite.addTestSuite(CacheContinuousIssueTxSelfTest.class);
+        suite.addTestSuite(CacheContinuousQueryLostPartitionTest.class);
+        suite.addTestSuite(CacheContinuousQueryLostPartitionTxTest.class);
 
         // Reduce fields queries.
         suite.addTestSuite(GridCacheReduceFieldsQueryLocalSelfTest.class);