You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vk...@apache.org on 2016/02/04 01:48:27 UTC

[1/5] ignite git commit: IGNITE-2454 Fixed "Continuous query notification missed if there is only one node".

Repository: ignite
Updated Branches:
  refs/heads/ignite-2450 7e936135b -> 01b02ab12


IGNITE-2454 Fixed "Continuous query notification missed if there is only one node".


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/60c8b072
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/60c8b072
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/60c8b072

Branch: refs/heads/ignite-2450
Commit: 60c8b072dc08d92e45da2404be09031af165ae48
Parents: e6acce6
Author: Tikhonov Nikolay <ti...@gmail.com>
Authored: Wed Feb 3 13:06:14 2016 +0300
Committer: Tikhonov Nikolay <ti...@gmail.com>
Committed: Wed Feb 3 13:06:14 2016 +0300

----------------------------------------------------------------------
 .../continuous/CacheContinuousQueryHandler.java |   9 +
 ...ntinuousQueryPartitionAtomicOneNodeTest.java |  37 ++++
 ...heContinuousQueryPartitionTxOneNodeTest.java |  37 ++++
 ...tinuousQueryReplicatedAtomicOneNodeTest.java |  31 +++
 ...ontinuousQueryReplicatedOneNodeSelfTest.java | 120 ------------
 ...eContinuousQueryReplicatedTxOneNodeTest.java | 193 +++++++++++++++++++
 .../IgniteCacheQuerySelfTestSuite.java          |  10 +-
 7 files changed, 315 insertions(+), 122 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/60c8b072/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index fa54a6b..7e66ad3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -754,6 +754,15 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
             List<CacheContinuousQueryEntry> entries;
 
             synchronized (pendingEvts) {
+                // Received first event.
+                if (curTop == AffinityTopologyVersion.NONE) {
+                    lastFiredEvt = entry.updateCounter();
+
+                    curTop = entry.topologyVersion();
+
+                    return F.asList(entry);
+                }
+
                 if (curTop.compareTo(entry.topologyVersion()) < 0) {
                     if (entry.updateCounter() == 1L && !entry.isBackup()) {
                         entries = new ArrayList<>(pendingEvts.size());

http://git-wip-us.apache.org/repos/asf/ignite/blob/60c8b072/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryPartitionAtomicOneNodeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryPartitionAtomicOneNodeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryPartitionAtomicOneNodeTest.java
new file mode 100644
index 0000000..797882b
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryPartitionAtomicOneNodeTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+
+/**
+ *
+ */
+public class GridCacheContinuousQueryPartitionAtomicOneNodeTest
+    extends GridCacheContinuousQueryReplicatedTxOneNodeTest {
+    /** {@inheritDoc} */
+    @Override protected CacheAtomicityMode atomicMode() {
+        return CacheAtomicityMode.ATOMIC;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheMode cacheMode() {
+        return CacheMode.PARTITIONED;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/60c8b072/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryPartitionTxOneNodeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryPartitionTxOneNodeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryPartitionTxOneNodeTest.java
new file mode 100644
index 0000000..7dd7bd7
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryPartitionTxOneNodeTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+
+/**
+ *
+ */
+public class GridCacheContinuousQueryPartitionTxOneNodeTest
+    extends GridCacheContinuousQueryReplicatedTxOneNodeTest {
+    /** {@inheritDoc} */
+    @Override protected CacheAtomicityMode atomicMode() {
+        return CacheAtomicityMode.TRANSACTIONAL;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheMode cacheMode() {
+        return CacheMode.PARTITIONED;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/60c8b072/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryReplicatedAtomicOneNodeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryReplicatedAtomicOneNodeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryReplicatedAtomicOneNodeTest.java
new file mode 100644
index 0000000..066afa9
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryReplicatedAtomicOneNodeTest.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import org.apache.ignite.cache.CacheAtomicityMode;
+
+/**
+ *
+ */
+public class GridCacheContinuousQueryReplicatedAtomicOneNodeTest
+    extends GridCacheContinuousQueryReplicatedTxOneNodeTest {
+    /** {@inheritDoc} */
+    @Override protected CacheAtomicityMode atomicMode() {
+        return CacheAtomicityMode.ATOMIC;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/60c8b072/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryReplicatedOneNodeSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryReplicatedOneNodeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryReplicatedOneNodeSelfTest.java
deleted file mode 100644
index 8152b2a..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryReplicatedOneNodeSelfTest.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.query.continuous;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import javax.cache.event.CacheEntryEvent;
-import javax.cache.event.CacheEntryListenerException;
-import javax.cache.event.CacheEntryUpdatedListener;
-import org.apache.ignite.IgniteCache;
-import org.apache.ignite.cache.CacheMode;
-import org.apache.ignite.cache.CacheRebalanceMode;
-import org.apache.ignite.cache.CacheWriteSynchronizationMode;
-import org.apache.ignite.cache.query.ContinuousQuery;
-import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
-import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-
-/**
- * Test for replicated cache with one node.
- */
-public class GridCacheContinuousQueryReplicatedOneNodeSelfTest extends GridCommonAbstractTest {
-    /** IP finder. */
-    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
-
-    /** {@inheritDoc} */
-    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
-        IgniteConfiguration cfg = super.getConfiguration(gridName);
-
-        CacheConfiguration cacheCfg = defaultCacheConfiguration();
-
-        cacheCfg.setCacheMode(CacheMode.REPLICATED);
-        cacheCfg.setRebalanceMode(CacheRebalanceMode.SYNC);
-        cacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
-
-        cfg.setCacheConfiguration(cacheCfg);
-
-        TcpDiscoverySpi disco = new TcpDiscoverySpi();
-
-        disco.setIpFinder(IP_FINDER);
-
-        cfg.setDiscoverySpi(disco);
-
-        return cfg;
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testLocal() throws Exception {
-        doTest(true);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testDistributed() throws Exception {
-        doTest(false);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    private void doTest(boolean loc) throws Exception {
-        try {
-            IgniteCache<String, Integer> cache = startGrid(0).cache(null);
-
-            ContinuousQuery<String, Integer> qry = new ContinuousQuery<>();
-
-            final AtomicInteger cnt = new AtomicInteger();
-            final CountDownLatch latch = new CountDownLatch(10);
-
-            qry.setLocalListener(new CacheEntryUpdatedListener<String, Integer>() {
-                @Override
-                public void onUpdated(Iterable<CacheEntryEvent<? extends String, ? extends Integer>> evts)
-                        throws CacheEntryListenerException {
-                    for (CacheEntryEvent<? extends String, ? extends Integer> evt : evts) {
-                        cnt.incrementAndGet();
-                        latch.countDown();
-                    }
-                }
-            });
-
-            cache.query(qry.setLocal(loc));
-
-            startGrid(1);
-
-            awaitPartitionMapExchange();
-
-            for (int i = 0; i < 10; i++)
-                cache.put("key" + i, i);
-
-            assert latch.await(5000, TimeUnit.MILLISECONDS);
-
-            assertEquals(10, cnt.get());
-        }
-        finally {
-            stopAllGrids();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/60c8b072/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryReplicatedTxOneNodeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryReplicatedTxOneNodeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryReplicatedTxOneNodeTest.java
new file mode 100644
index 0000000..271a8d9
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryReplicatedTxOneNodeTest.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryListenerException;
+import javax.cache.event.CacheEntryUpdatedListener;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.CacheRebalanceMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Test for replicated cache with one node.
+ */
+@SuppressWarnings("Duplicates")
+public class GridCacheContinuousQueryReplicatedTxOneNodeTest extends GridCommonAbstractTest {
+    /** IP finder. */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        CacheConfiguration cacheCfg = defaultCacheConfiguration();
+
+        cacheCfg.setAtomicityMode(atomicMode());
+        cacheCfg.setCacheMode(cacheMode());
+        cacheCfg.setRebalanceMode(CacheRebalanceMode.SYNC);
+        cacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+
+        cfg.setCacheConfiguration(cacheCfg);
+
+        TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+        disco.setIpFinder(IP_FINDER);
+
+        cfg.setDiscoverySpi(disco);
+
+        return cfg;
+    }
+
+    /**
+     * @return Atomicity mode for a cache.
+     */
+    protected CacheAtomicityMode atomicMode() {
+        return CacheAtomicityMode.TRANSACTIONAL;
+    }
+
+    /**
+     * @return Cache mode.
+     */
+    protected CacheMode cacheMode() {
+        return CacheMode.REPLICATED;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testLocal() throws Exception {
+        if (cacheMode() == CacheMode.REPLICATED)
+            doTest(true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testDistributed() throws Exception {
+        doTest(false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testLocalOneNode() throws Exception {
+        doTestOneNode(true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testDistributedOneNode() throws Exception {
+        doTestOneNode(false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void doTest(boolean loc) throws Exception {
+        try {
+            IgniteCache<String, Integer> cache = startGrid(0).cache(null);
+
+            ContinuousQuery<String, Integer> qry = new ContinuousQuery<>();
+
+            final AtomicInteger cnt = new AtomicInteger();
+            final CountDownLatch latch = new CountDownLatch(10);
+
+            qry.setLocalListener(new CacheEntryUpdatedListener<String, Integer>() {
+                @Override
+                public void onUpdated(Iterable<CacheEntryEvent<? extends String, ? extends Integer>> evts)
+                        throws CacheEntryListenerException {
+                    for (CacheEntryEvent<? extends String, ? extends Integer> evt : evts) {
+                        cnt.incrementAndGet();
+                        latch.countDown();
+                    }
+                }
+            });
+
+            cache.query(qry.setLocal(loc));
+
+            startGrid(1);
+
+            awaitPartitionMapExchange();
+
+            for (int i = 0; i < 10; i++)
+                cache.put("key" + i, i);
+
+            assert latch.await(5000, TimeUnit.MILLISECONDS);
+
+            assertEquals(10, cnt.get());
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void doTestOneNode(boolean loc) throws Exception {
+        try {
+            IgniteCache<String, Integer> cache = startGrid(0).cache(null);
+
+            ContinuousQuery<String, Integer> qry = new ContinuousQuery<>();
+
+            final AtomicInteger cnt = new AtomicInteger();
+            final CountDownLatch latch = new CountDownLatch(10);
+
+            for (int i = 0; i < 10; i++)
+                cache.put("key" + i, i);
+
+            cache.clear();
+
+            qry.setLocalListener(new CacheEntryUpdatedListener<String, Integer>() {
+                @Override public void onUpdated(Iterable<CacheEntryEvent<? extends String, ? extends Integer>> evts)
+                        throws CacheEntryListenerException {
+                    for (CacheEntryEvent<? extends String, ? extends Integer> evt : evts) {
+                        cnt.incrementAndGet();
+                        latch.countDown();
+                    }
+                }
+            });
+
+            cache.query(qry.setLocal(loc));
+
+            for (int i = 0; i < 10; i++)
+                cache.put("key" + i, i);
+
+            assert latch.await(5000, TimeUnit.MILLISECONDS);
+
+            assertEquals(10, cnt.get());
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/60c8b072/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
index 4b1eafa..359cdf3 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
@@ -79,11 +79,14 @@ import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheCon
 import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryAtomicSelfTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryLocalAtomicSelfTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryLocalSelfTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryPartitionAtomicOneNodeTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryPartitionTxOneNodeTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryPartitionedOnlySelfTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryPartitionedP2PDisabledSelfTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryPartitionedSelfTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryReplicatedAtomicOneNodeTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryReplicatedAtomicSelfTest;
-import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryReplicatedOneNodeSelfTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryReplicatedTxOneNodeTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryReplicatedP2PDisabledSelfTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryReplicatedSelfTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryTxSelfTest;
@@ -181,7 +184,10 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite {
         suite.addTestSuite(GridCacheContinuousQueryAtomicSelfTest.class);
         suite.addTestSuite(GridCacheContinuousQueryAtomicNearEnabledSelfTest.class);
         suite.addTestSuite(GridCacheContinuousQueryAtomicP2PDisabledSelfTest.class);
-        suite.addTestSuite(GridCacheContinuousQueryReplicatedOneNodeSelfTest.class);
+        suite.addTestSuite(GridCacheContinuousQueryReplicatedTxOneNodeTest.class);
+        suite.addTestSuite(GridCacheContinuousQueryReplicatedAtomicOneNodeTest.class);
+        suite.addTestSuite(GridCacheContinuousQueryPartitionTxOneNodeTest.class);
+        suite.addTestSuite(GridCacheContinuousQueryPartitionAtomicOneNodeTest.class);
         suite.addTestSuite(IgniteCacheContinuousQueryClientTest.class);
         suite.addTestSuite(IgniteCacheContinuousQueryClientReconnectTest.class);
         suite.addTestSuite(IgniteCacheContinuousQueryClientTxReconnectTest.class);


[5/5] ignite git commit: Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/ignite into ignite-2450

Posted by vk...@apache.org.
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/ignite into ignite-2450


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/01b02ab1
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/01b02ab1
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/01b02ab1

Branch: refs/heads/ignite-2450
Commit: 01b02ab129416b010368ce633979854813d1b754
Parents: a3c0311 500bd3a
Author: Valentin Kulichenko <va...@gmail.com>
Authored: Wed Feb 3 16:47:57 2016 -0800
Committer: Valentin Kulichenko <va...@gmail.com>
Committed: Wed Feb 3 16:47:57 2016 -0800

----------------------------------------------------------------------
 .../org/apache/ignite/cache/CacheEntry.java     |   9 +-
 .../processors/cache/GridCacheAdapter.java      | 100 +++++++++-
 .../continuous/CacheContinuousQueryHandler.java |   9 +
 ...ntinuousQueryPartitionAtomicOneNodeTest.java |  37 ++++
 ...heContinuousQueryPartitionTxOneNodeTest.java |  37 ++++
 ...tinuousQueryReplicatedAtomicOneNodeTest.java |  31 +++
 ...ontinuousQueryReplicatedOneNodeSelfTest.java | 120 ------------
 ...eContinuousQueryReplicatedTxOneNodeTest.java | 193 +++++++++++++++++++
 .../IgniteCacheQuerySelfTestSuite.java          |  10 +-
 9 files changed, 417 insertions(+), 129 deletions(-)
----------------------------------------------------------------------



[2/5] ignite git commit: 2224 Added Javadoc about new methods to CacheEntry

Posted by vk...@apache.org.
2224 Added Javadoc about new methods to CacheEntry


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e7de923e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e7de923e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e7de923e

Branch: refs/heads/ignite-2450
Commit: e7de923e6c34bdb4e276bd4314868541087d66f1
Parents: 60c8b07
Author: Anton Vinogradov <av...@apache.org>
Authored: Wed Feb 3 13:11:10 2016 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Wed Feb 3 13:11:10 2016 +0300

----------------------------------------------------------------------
 .../src/main/java/org/apache/ignite/cache/CacheEntry.java   | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/e7de923e/modules/core/src/main/java/org/apache/ignite/cache/CacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/CacheEntry.java b/modules/core/src/main/java/org/apache/ignite/cache/CacheEntry.java
index d92f9fb..19585a3 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/CacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/CacheEntry.java
@@ -24,9 +24,9 @@ import org.apache.ignite.IgniteCache;
 
 /**
  * Cache entry that extends {@link javax.cache.Cache.Entry} by providing additional entry related information.
- *
- * To get an instance of {@code CacheEntry} use {@link javax.cache.Cache.Entry#unwrap(Class)} method by passing
- * {@code CacheEntry} class to it as the argument.
+ * <p>
+ * To get an instance of {@code CacheEntry} from {@link javax.cache.Cache.Entry} use
+ * {@link javax.cache.Cache.Entry#unwrap(Class)} method by passing {@code CacheEntry} class to it as the argument.
  * <p>
  * {@code CacheEntry} is supported only for {@link javax.cache.Cache.Entry} returned by one of the following methods:
  * <ul>
@@ -36,6 +36,9 @@ import org.apache.ignite.IgniteCache;
  * <li>{@link IgniteCache#randomEntry()}</li>
  * </ul>
  * <p>
+ * To get an instance of {@code CacheEntry} directly use {@link IgniteCache#getEntry(Object)} or
+ * {@link IgniteCache#getEntries(Set)} methods.
+ * <p>
  * {@code CacheEntry} is not supported for {@link javax.cache.Cache#iterator()} because of performance reasons.
  * {@link javax.cache.Cache#iterator()} loads entries from all the cluster nodes and to speed up the load additional
  * information, like entry's version, is ignored.


[4/5] ignite git commit: IGNITE-2450 - Fixed tests

Posted by vk...@apache.org.
IGNITE-2450 - Fixed tests


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a3c03119
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a3c03119
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a3c03119

Branch: refs/heads/ignite-2450
Commit: a3c03119c1a1ac518dd2d959dd60ace4406c89dd
Parents: 7e93613
Author: Valentin Kulichenko <va...@gmail.com>
Authored: Wed Feb 3 16:47:41 2016 -0800
Committer: Valentin Kulichenko <va...@gmail.com>
Committed: Wed Feb 3 16:47:41 2016 -0800

----------------------------------------------------------------------
 ...ynamicProxySerializationMultiJvmSelfTest.java | 19 ++++++++++++++-----
 .../junits/multijvm/IgniteNodeRunner.java        | 16 +++++++++-------
 .../junits/multijvm/IgniteProcessProxy.java      | 19 +++++++++++--------
 .../ignite/testsuites/IgniteBasicTestSuite.java  |  2 +-
 .../testsuites/IgniteBinaryBasicTestSuite.java   |  2 ++
 5 files changed, 37 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/a3c03119/modules/core/src/test/java/org/apache/ignite/marshaller/DynamicProxySerializationMultiJvmSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/marshaller/DynamicProxySerializationMultiJvmSelfTest.java b/modules/core/src/test/java/org/apache/ignite/marshaller/DynamicProxySerializationMultiJvmSelfTest.java
index c5bd892..d22aeac 100644
--- a/modules/core/src/test/java/org/apache/ignite/marshaller/DynamicProxySerializationMultiJvmSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/marshaller/DynamicProxySerializationMultiJvmSelfTest.java
@@ -20,8 +20,10 @@ package org.apache.ignite.marshaller;
 import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.Method;
 import java.lang.reflect.Proxy;
+import java.util.concurrent.Callable;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.binary.BinaryMarshaller;
 import org.apache.ignite.lang.IgniteCallable;
 import org.apache.ignite.marshaller.optimized.OptimizedMarshaller;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
@@ -31,7 +33,7 @@ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
  */
 public class DynamicProxySerializationMultiJvmSelfTest extends GridCommonAbstractTest {
     /** */
-    private static boolean optMarsh;
+    private static Callable<Marshaller> marshFactory;
 
     /** {@inheritDoc} */
     @Override protected boolean isMultiJvm() {
@@ -42,8 +44,7 @@ public class DynamicProxySerializationMultiJvmSelfTest extends GridCommonAbstrac
     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(gridName);
 
-        if (optMarsh)
-            cfg.setMarshaller(new OptimizedMarshaller(false));
+        cfg.setMarshaller(marshFactory.call());
 
         return cfg;
     }
@@ -52,7 +53,11 @@ public class DynamicProxySerializationMultiJvmSelfTest extends GridCommonAbstrac
      * @throws Exception If failed.
      */
     public void testOptimized() throws Exception {
-        optMarsh = true;
+        marshFactory = new Callable<Marshaller>() {
+            @Override public Marshaller call() throws Exception {
+                return new OptimizedMarshaller(false);
+            }
+        };
 
         doTest();
     }
@@ -61,7 +66,11 @@ public class DynamicProxySerializationMultiJvmSelfTest extends GridCommonAbstrac
      * @throws Exception If failed.
      */
     public void testBinary() throws Exception {
-        optMarsh = false;
+        marshFactory = new Callable<Marshaller>() {
+            @Override public Marshaller call() throws Exception {
+                return new BinaryMarshaller();
+            }
+        };
 
         doTest();
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/a3c03119/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteNodeRunner.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteNodeRunner.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteNodeRunner.java
index 0597eda..7d1a37d 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteNodeRunner.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteNodeRunner.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.testframework.junits.multijvm;
 
-import com.thoughtworks.xstream.XStream;
 import java.io.BufferedOutputStream;
 import java.io.BufferedReader;
 import java.io.File;
@@ -29,6 +28,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Set;
+import com.thoughtworks.xstream.XStream;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.Ignition;
@@ -100,12 +100,14 @@ public class IgniteNodeRunner {
         String fileName = IGNITE_CONFIGURATION_FILE + cfg.getNodeId();
 
         try(OutputStream out = new BufferedOutputStream(new FileOutputStream(fileName))) {
-            cfg.setMBeanServer(null);
-            cfg.setMarshaller(null);
-            cfg.setDiscoverySpi(null);
-            cfg.setGridLogger(null);
+            IgniteConfiguration cfg0 = new IgniteConfiguration(cfg);
+
+            cfg0.setMBeanServer(null);
+            cfg0.setMarshaller(null);
+            cfg0.setDiscoverySpi(null);
+            cfg0.setGridLogger(null);
 
-            new XStream().toXML(cfg, out);
+            new XStream().toXML(cfg0, out);
         }
 
         return fileName;
@@ -176,4 +178,4 @@ public class IgniteNodeRunner {
 
         return res;
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a3c03119/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java
index fed42e1..a2e0d5a 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java
@@ -19,7 +19,6 @@ package org.apache.ignite.testframework.junits.multijvm;
 
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -31,10 +30,10 @@ import org.apache.ignite.IgniteAtomicLong;
 import org.apache.ignite.IgniteAtomicReference;
 import org.apache.ignite.IgniteAtomicSequence;
 import org.apache.ignite.IgniteAtomicStamped;
+import org.apache.ignite.IgniteBinary;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCompute;
 import org.apache.ignite.IgniteCountDownLatch;
-import org.apache.ignite.IgniteSemaphore;
 import org.apache.ignite.IgniteDataStreamer;
 import org.apache.ignite.IgniteEvents;
 import org.apache.ignite.IgniteException;
@@ -42,9 +41,9 @@ import org.apache.ignite.IgniteFileSystem;
 import org.apache.ignite.IgniteIllegalStateException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.IgniteMessaging;
-import org.apache.ignite.IgniteBinary;
 import org.apache.ignite.IgniteQueue;
 import org.apache.ignite.IgniteScheduler;
+import org.apache.ignite.IgniteSemaphore;
 import org.apache.ignite.IgniteServices;
 import org.apache.ignite.IgniteSet;
 import org.apache.ignite.IgniteTransactions;
@@ -76,6 +75,7 @@ import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.lang.IgniteProductVersion;
 import org.apache.ignite.lang.IgniteRunnable;
+import org.apache.ignite.marshaller.Marshaller;
 import org.apache.ignite.plugin.IgnitePlugin;
 import org.apache.ignite.plugin.PluginNotFoundException;
 import org.apache.ignite.resources.IgniteInstanceResource;
@@ -121,14 +121,17 @@ public class IgniteProcessProxy implements IgniteEx {
 
         String cfgFileName = IgniteNodeRunner.storeToFile(cfg.setNodeId(id));
 
-        List<String> jvmArgs = U.jvmArgs();
-
         Collection<String> filteredJvmArgs = new ArrayList<>();
 
-        for (String arg : jvmArgs) {
-            if(arg.startsWith("-Xmx") || arg.startsWith("-Xms") ||
+        Marshaller marsh = cfg.getMarshaller();
+
+        if (marsh != null)
+            filteredJvmArgs.add("-D" + IgniteTestResources.MARSH_CLASS_NAME + "=" + marsh.getClass().getName());
+
+        for (String arg : U.jvmArgs()) {
+            if (arg.startsWith("-Xmx") || arg.startsWith("-Xms") ||
                 arg.startsWith("-cp") || arg.startsWith("-classpath") ||
-                arg.startsWith("-D" + IgniteTestResources.MARSH_CLASS_NAME))
+                (marsh != null && arg.startsWith("-D" + IgniteTestResources.MARSH_CLASS_NAME)))
                 filteredJvmArgs.add(arg);
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/a3c03119/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
index e26c5dd..c904ef4 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
@@ -117,7 +117,7 @@ public class IgniteBasicTestSuite extends TestSuite {
 
         suite.addTestSuite(IgniteExceptionInNioWorkerSelfTest.class);
 
-        suite.addTestSuite(DynamicProxySerializationMultiJvmSelfTest.class);
+        GridTestUtils.addTestIfNeeded(suite, DynamicProxySerializationMultiJvmSelfTest.class, ignoredTests);
 
         return suite;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/a3c03119/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBinaryBasicTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBinaryBasicTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBinaryBasicTestSuite.java
index cbb87fa..e0c06dc 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBinaryBasicTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBinaryBasicTestSuite.java
@@ -35,6 +35,7 @@ import org.apache.ignite.internal.util.IgniteUtilsSelfTest;
 import org.apache.ignite.internal.util.io.GridUnsafeDataOutputArraySizingSelfTest;
 import org.apache.ignite.internal.util.nio.GridNioSelfTest;
 import org.apache.ignite.internal.util.nio.GridNioSslSelfTest;
+import org.apache.ignite.marshaller.DynamicProxySerializationMultiJvmSelfTest;
 import org.apache.ignite.marshaller.jdk.GridJdkMarshallerSelfTest;
 import org.apache.ignite.marshaller.optimized.OptimizedMarshallerEnumSelfTest;
 import org.apache.ignite.marshaller.optimized.OptimizedMarshallerNodeFailoverTest;
@@ -84,6 +85,7 @@ public class IgniteBinaryBasicTestSuite extends TestSuite {
         ignoredTests.add(GridMessagingSelfTest.class);
         ignoredTests.add(GridVersionSelfTest.class);
         ignoredTests.add(GridDeploymentMessageCountSelfTest.class);
+        ignoredTests.add(DynamicProxySerializationMultiJvmSelfTest.class);
 
         // TODO: check and delete if pass.
         ignoredTests.add(IgniteDaemonNodeMarshallerCacheTest.class);


[3/5] ignite git commit: IGNITE-2465 - Fixed race in load cache closure - Fixes #431.

Posted by vk...@apache.org.
IGNITE-2465 - Fixed race in load cache closure - Fixes #431.

Signed-off-by: Alexey Goncharuk <al...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/500bd3ab
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/500bd3ab
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/500bd3ab

Branch: refs/heads/ignite-2450
Commit: 500bd3ab576830f8160eb66274590b7684a39599
Parents: e7de923
Author: ashutak <as...@gridgain.com>
Authored: Wed Feb 3 14:56:42 2016 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Wed Feb 3 14:56:42 2016 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheAdapter.java      | 100 ++++++++++++++++++-
 1 file changed, 96 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/500bd3ab/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 69abc54..2c3a197 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -70,6 +70,7 @@ import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.FileSystemConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.configuration.TransactionConfiguration;
+import org.apache.ignite.internal.ComputeTaskInternalFuture;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.IgniteKernal;
@@ -101,6 +102,7 @@ import org.apache.ignite.internal.transactions.IgniteTxHeuristicCheckedException
 import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
 import org.apache.ignite.internal.util.F0;
 import org.apache.ignite.internal.util.GridLeanMap;
+import org.apache.ignite.internal.util.future.GridCompoundFuture;
 import org.apache.ignite.internal.util.future.GridEmbeddedFuture;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -128,6 +130,7 @@ import org.apache.ignite.lang.IgniteCallable;
 import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgniteOutClosure;
+import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.lang.IgniteProductVersion;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.mxbean.CacheMetricsMXBean;
@@ -166,6 +169,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
     /** Maximum number of retries when topology changes. */
     public static final int MAX_RETRIES = IgniteSystemProperties.getInteger(IGNITE_CACHE_RETRIES_COUNT, 100);
 
+    /** */
+    public static final IgniteProductVersion LOAD_CACHE_JOB_SINCE = IgniteProductVersion.fromString("1.5.7");
+
     /** Deserialization stash. */
     private static final ThreadLocal<IgniteBiTuple<String, String>> stash = new ThreadLocal<IgniteBiTuple<String,
         String>>() {
@@ -3737,7 +3743,19 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
      */
     IgniteInternalFuture<?> globalLoadCacheAsync(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... args)
         throws IgniteCheckedException {
-        ClusterGroup nodes = ctx.kernalContext().grid().cluster().forCacheNodes(ctx.name());
+        ClusterGroup oldNodes = ctx.kernalContext().grid().cluster().forCacheNodes(ctx.name())
+            .forPredicate(new IgnitePredicate<ClusterNode>() {
+                @Override public boolean apply(ClusterNode node) {
+                    return node.version().compareToIgnoreTimestamp(LOAD_CACHE_JOB_SINCE) < 0;
+                }
+            });
+
+        ClusterGroup newNodes = ctx.kernalContext().grid().cluster().forCacheNodes(ctx.name())
+            .forPredicate(new IgnitePredicate<ClusterNode>() {
+                @Override public boolean apply(ClusterNode node) {
+                    return node.version().compareToIgnoreTimestamp(LOAD_CACHE_JOB_SINCE) >= 0;
+                }
+            });
 
         ctx.kernalContext().task().setThreadContext(TC_NO_FAILOVER, true);
 
@@ -3745,9 +3763,27 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
 
         ExpiryPolicy plc = opCtx != null ? opCtx.expiry() : null;
 
-        return ctx.kernalContext().closure().callAsync(BROADCAST,
-            Arrays.asList(new LoadCacheClosure<>(ctx.name(), p, args, plc)),
-            nodes.nodes());
+        GridCompoundFuture<Object, ?> fut = new GridCompoundFuture<>();
+
+        if (!F.isEmpty(oldNodes.nodes())) {
+            ComputeTaskInternalFuture oldNodesFut = ctx.kernalContext().closure().callAsync(BROADCAST,
+                Arrays.asList(new LoadCacheClosure<>(ctx.name(), p, args, plc)),
+                oldNodes.nodes());
+
+            fut.add(oldNodesFut);
+        }
+
+        if (!F.isEmpty(newNodes.nodes())) {
+            ComputeTaskInternalFuture newNodesFut = ctx.kernalContext().closure().callAsync(BROADCAST,
+                Arrays.asList(new LoadCacheJob<>(ctx.name(), ctx.affinity().affinityTopologyVersion(), p, args, plc)),
+                newNodes.nodes());
+
+            fut.add(newNodesFut);
+        }
+
+        fut.markInitialized();
+
+        return fut;
     }
 
     /**
@@ -5498,6 +5534,62 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
     }
 
     /**
+     * Internal callable for global size calculation.
+     */
+    @GridInternal
+    private static class LoadCacheJob<K, V> extends TopologyVersionAwareJob {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** */
+        private final IgniteBiPredicate<K, V> p;
+
+        /** */
+        private final Object[] args;
+
+        /** */
+        private final ExpiryPolicy plc;
+
+        /**
+         * @param cacheName Cache name.
+         * @param topVer Affinity topology version.
+         * @param p Predicate.
+         * @param args Arguments.
+         * @param plc Policy.
+         */
+        private LoadCacheJob(String cacheName, AffinityTopologyVersion topVer, IgniteBiPredicate<K, V> p, Object[] args,
+            ExpiryPolicy plc) {
+            super(cacheName, topVer);
+
+            this.p = p;
+            this.args = args;
+            this.plc = plc;
+        }
+
+        /** {@inheritDoc} */
+        @Nullable @Override public Object localExecute(@Nullable IgniteInternalCache cache) {
+            try {
+                assert cache != null : "Failed to get a cache [cacheName=" + cacheName + ", topVer=" + topVer + "]";
+
+                if (plc != null)
+                    cache = cache.withExpiryPolicy(plc);
+
+                cache.localLoadCache(p, args);
+
+                return null;
+            }
+            catch (IgniteCheckedException e) {
+                throw U.convertException(e);
+            }
+        }
+
+        /** {@inheritDoc} */
+        public String toString() {
+            return S.toString(LoadCacheJob.class, this);
+        }
+    }
+
+    /**
      * Holder for last async operation future.
      */
     protected static class FutureHolder {