You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vk...@apache.org on 2016/06/15 12:05:48 UTC

[01/16] ignite git commit: ignite-3261 Store information about AffinityKey class in metadata cache. (cherry picked from commit 2b64e7c)

Repository: ignite
Updated Branches:
  refs/heads/master b500d3a56 -> b742f5fc8


ignite-3261 Store information about AffinityKey class in metadata cache.
(cherry picked from commit 2b64e7c)


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/150b5c99
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/150b5c99
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/150b5c99

Branch: refs/heads/master
Commit: 150b5c99828f701f72cabf852dfbff0dca4ea0ca
Parents: fad73bf
Author: sboikov <sb...@gridgain.com>
Authored: Tue Jun 14 09:45:10 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Jun 14 10:35:43 2016 +0300

----------------------------------------------------------------------
 .../ignite/internal/binary/BinaryContext.java   | 43 ++++++++++--
 .../ignite/internal/binary/AffinityKey.java     | 69 ++++++++++++++++++++
 .../binary/GridBinaryAffinityKeySelfTest.java   | 15 +++++
 ...aultBinaryMappersBinaryMetaDataSelfTest.java | 17 +++++
 4 files changed, 137 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/150b5c99/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryContext.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryContext.java
index daf34ad..d39f9c6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryContext.java
@@ -259,7 +259,7 @@ public class BinaryContext {
         registerPredefinedType(LinkedHashMap.class, 0);
 
         // Classes with overriden default serialization flag.
-        registerPredefinedType(AffinityKey.class, 0, affinityFieldName(AffinityKey.class));
+        registerPredefinedType(AffinityKey.class, 0, affinityFieldName(AffinityKey.class), false);
 
         registerPredefinedType(GridMapEntry.class, 60);
         registerPredefinedType(IgniteBiTuple.class, 61);
@@ -562,9 +562,37 @@ public class BinaryContext {
         if (desc == null)
             desc = registerClassDescriptor(cls, deserialize);
         else if (!desc.registered()) {
-            assert desc.userType();
-
-            desc = registerUserClassDescriptor(desc);
+            if (!desc.userType()) {
+                BinaryClassDescriptor desc0 = new BinaryClassDescriptor(
+                    this,
+                    desc.describedClass(),
+                    false,
+                    desc.typeId(),
+                    desc.typeName(),
+                    desc.affFieldKeyName(),
+                    desc.mapper(),
+                    desc.initialSerializer(),
+                    false,
+                    true
+                );
+
+                if (descByCls.replace(cls, desc, desc0)) {
+                    Collection<BinarySchema> schemas =
+                        desc0.schema() != null ? Collections.singleton(desc.schema()) : null;
+
+                    BinaryMetadata meta = new BinaryMetadata(desc0.typeId(),
+                        desc0.typeName(),
+                        desc0.fieldsMeta(),
+                        desc0.affFieldKeyName(),
+                        schemas, desc0.isEnum());
+
+                    metaHnd.addMeta(desc0.typeId(), meta.wrap(this));
+
+                    return desc0;
+                }
+            }
+            else
+                desc = registerUserClassDescriptor(desc);
         }
 
         return desc;
@@ -959,15 +987,16 @@ public class BinaryContext {
      * @return GridBinaryClassDescriptor.
      */
     public BinaryClassDescriptor registerPredefinedType(Class<?> cls, int id) {
-        return registerPredefinedType(cls, id, null);
+        return registerPredefinedType(cls, id, null, true);
     }
 
     /**
      * @param cls Class.
      * @param id Type ID.
+     * @param affFieldName Affinity field name.
      * @return GridBinaryClassDescriptor.
      */
-    public BinaryClassDescriptor registerPredefinedType(Class<?> cls, int id, String affFieldName) {
+    public BinaryClassDescriptor registerPredefinedType(Class<?> cls, int id, String affFieldName, boolean registered) {
         String simpleClsName = SIMPLE_NAME_LOWER_CASE_MAPPER.typeName(cls.getName());
 
         if (id == 0)
@@ -983,7 +1012,7 @@ public class BinaryContext {
             SIMPLE_NAME_LOWER_CASE_MAPPER,
             new BinaryReflectiveSerializer(),
             false,
-            true /* registered */
+            registered /* registered */
         );
 
         predefinedTypeNames.put(simpleClsName, id);

http://git-wip-us.apache.org/repos/asf/ignite/blob/150b5c99/modules/core/src/test/java/org/apache/ignite/internal/binary/AffinityKey.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/binary/AffinityKey.java b/modules/core/src/test/java/org/apache/ignite/internal/binary/AffinityKey.java
new file mode 100644
index 0000000..1b4daee
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/binary/AffinityKey.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.binary;
+
+import org.apache.ignite.cache.affinity.AffinityKeyMapped;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ *
+ */
+public class AffinityKey {
+    /** Key. */
+    private int key;
+
+    /** Affinity key. */
+    @AffinityKeyMapped
+    private int aff;
+
+    /**
+     * @param key Key.
+     * @param aff Affinity key.
+     */
+    public AffinityKey(int key, int aff) {
+        this.key = key;
+        this.aff = aff;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object o) {
+        if (this == o)
+            return true;
+
+        if (o == null || getClass() != o.getClass())
+            return false;
+
+        AffinityKey that = (AffinityKey) o;
+
+        return key == that.key && aff == that.aff;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        int res = key;
+
+        res = 31 * res + aff;
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(AffinityKey.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/150b5c99/modules/core/src/test/java/org/apache/ignite/internal/binary/GridBinaryAffinityKeySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/binary/GridBinaryAffinityKeySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/binary/GridBinaryAffinityKeySelfTest.java
index 06406e0..2b54f6b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/binary/GridBinaryAffinityKeySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/binary/GridBinaryAffinityKeySelfTest.java
@@ -139,6 +139,8 @@ public class GridBinaryAffinityKeySelfTest extends GridCommonAbstractTest {
 
             assertEquals(i, aff.affinityKey(ignite.binary().toBinary(new TestObject(i))));
 
+            assertEquals(i, aff.affinityKey(new AffinityKey(0, i)));
+
             BinaryObjectBuilder bldr = ignite.binary().builder("TestObject2");
 
             bldr.setField("affKey", i);
@@ -162,6 +164,8 @@ public class GridBinaryAffinityKeySelfTest extends GridCommonAbstractTest {
             assertEquals(affProc.mapKeyToNode(null, i), affProc.mapKeyToNode(null, new TestObject(i)));
 
             assertEquals(affProc.mapKeyToNode(null, i), affProc.mapKeyToNode(null, cacheObj));
+
+            assertEquals(affProc.mapKeyToNode(null, new AffinityKey(0, i)), affProc.mapKeyToNode(null, i));
         }
     }
 
@@ -184,6 +188,17 @@ public class GridBinaryAffinityKeySelfTest extends GridCommonAbstractTest {
             });
 
             assertEquals(aff.mapKeyToNode(i).id(), nodeId.get());
+
+            grid(0).compute().affinityRun(null, new AffinityKey(0, i), new IgniteRunnable() {
+                @IgniteInstanceResource
+                private Ignite ignite;
+
+                @Override public void run() {
+                    nodeId.set(ignite.configuration().getNodeId());
+                }
+            });
+
+            assertEquals(aff.mapKeyToNode(i).id(), nodeId.get());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/150b5c99/modules/core/src/test/java/org/apache/ignite/internal/binary/GridDefaultBinaryMappersBinaryMetaDataSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/binary/GridDefaultBinaryMappersBinaryMetaDataSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/binary/GridDefaultBinaryMappersBinaryMetaDataSelfTest.java
index 041238f..7010463 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/binary/GridDefaultBinaryMappersBinaryMetaDataSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/binary/GridDefaultBinaryMappersBinaryMetaDataSelfTest.java
@@ -26,6 +26,7 @@ import org.apache.ignite.IgniteBinary;
 import org.apache.ignite.binary.BinaryBasicIdMapper;
 import org.apache.ignite.binary.BinaryNameMapper;
 import org.apache.ignite.binary.BinaryBasicNameMapper;
+import org.apache.ignite.cache.affinity.AffinityKey;
 import org.apache.ignite.configuration.BinaryConfiguration;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
@@ -149,6 +150,22 @@ public class GridDefaultBinaryMappersBinaryMetaDataSelfTest extends GridCommonAb
             else
                 assert false : meta.typeName();
         }
+
+        grid().cache(null).put(new AffinityKey<>(1, 1), 1);
+
+        metas = binaries().types();
+
+        assertEquals(3, metas.size());
+
+        for (BinaryType meta : metas) {
+            if (AffinityKey.class.getSimpleName().equals(meta.typeName())) {
+                assertEquals("affKey", meta.affinityKeyFieldName());
+
+                return;
+            }
+        }
+
+        fail("Failed to find metadata for AffinityKey");
     }
 
     /**


[10/16] ignite git commit: IGNITE-3305 - Fixed SYNC rebalance mode for dynamically started cache.

Posted by vk...@apache.org.
IGNITE-3305 - Fixed SYNC rebalance mode for dynamically started cache.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/84547ef2
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/84547ef2
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/84547ef2

Branch: refs/heads/master
Commit: 84547ef2a198d639ac213ecf2f51de539b3f19d0
Parents: e9a33f2
Author: Alexey Goncharuk <al...@gmail.com>
Authored: Mon Jun 13 17:52:39 2016 -0700
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Tue Jun 14 13:12:38 2016 -0700

----------------------------------------------------------------------
 .../processors/cache/GridCacheProcessor.java    |  24 ++--
 .../IgniteCacheSyncRebalanceModeSelfTest.java   | 114 +++++++++++++++++++
 .../testsuites/IgniteCacheTestSuite5.java       |   2 +
 3 files changed, 131 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/84547ef2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 37c3cf1..9451254 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -731,9 +731,9 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
     @Override public void onKernalStart() throws IgniteCheckedException {
-        try {
-            ClusterNode locNode = ctx.discovery().localNode();
+        ClusterNode locNode = ctx.discovery().localNode();
 
+        try {
             if (!ctx.config().isDaemon() && !getBoolean(IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK)) {
                 for (ClusterNode n : ctx.discovery().remoteNodes()) {
                     if (n.attribute(ATTR_CONSISTENCY_CHECK_SKIPPED))
@@ -822,15 +822,21 @@ public class GridCacheProcessor extends GridProcessorAdapter {
             ctx.cacheObjects().onUtilityCacheStarted();
 
         // Wait for caches in SYNC preload mode.
-        for (CacheConfiguration cfg : ctx.config().getCacheConfiguration()) {
-            GridCacheAdapter cache = caches.get(maskNull(cfg.getName()));
+        for (DynamicCacheDescriptor desc : registeredCaches.values()) {
+            CacheConfiguration cfg = desc.cacheConfiguration();
 
-            if (cache != null) {
-                if (cfg.getRebalanceMode() == SYNC) {
-                    CacheMode cacheMode = cfg.getCacheMode();
+            IgnitePredicate filter = cfg.getNodeFilter();
+
+            if (desc.locallyConfigured() || desc.receivedOnDiscovery() && CU.affinityNode(locNode, filter)) {
+                GridCacheAdapter cache = caches.get(maskNull(cfg.getName()));
 
-                    if (cacheMode == REPLICATED || (cacheMode == PARTITIONED && cfg.getRebalanceDelay() >= 0))
-                        cache.preloader().syncFuture().get();
+                if (cache != null) {
+                    if (cfg.getRebalanceMode() == SYNC) {
+                        CacheMode cacheMode = cfg.getCacheMode();
+
+                        if (cacheMode == REPLICATED || (cacheMode == PARTITIONED && cfg.getRebalanceDelay() >= 0))
+                            cache.preloader().syncFuture().get();
+                    }
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/84547ef2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheSyncRebalanceModeSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheSyncRebalanceModeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheSyncRebalanceModeSelfTest.java
new file mode 100644
index 0000000..9b0637e
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheSyncRebalanceModeSelfTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.replicated;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.CachePeekMode;
+import org.apache.ignite.cache.CacheRebalanceMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ *
+ */
+public class IgniteCacheSyncRebalanceModeSelfTest extends GridCommonAbstractTest {
+    /** Entry count. */
+    public static final int CNT = 100_000;
+    public static final String STATIC_CACHE_NAME = "static";
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        CacheConfiguration ccfg = new CacheConfiguration(STATIC_CACHE_NAME);
+
+        ccfg.setCacheMode(CacheMode.REPLICATED);
+        ccfg.setRebalanceMode(CacheRebalanceMode.SYNC);
+
+        cfg.setCacheConfiguration(ccfg);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    public void testStaticCache() throws Exception {
+        IgniteEx ignite = startGrid(0);
+
+        IgniteCache<Object, Object> cache = ignite.cache(STATIC_CACHE_NAME);
+
+        try (IgniteDataStreamer<Object, Object> streamer = ignite.dataStreamer(STATIC_CACHE_NAME)) {
+            streamer.allowOverwrite(true);
+
+            for (int i = 0; i < 100_000; i++)
+                streamer.addData(i, i);
+        }
+
+        assertEquals(CNT, cache.localSize());
+
+        Ignite ignite2 = startGrid(1);
+
+        assertEquals(CNT, ignite2.cache(STATIC_CACHE_NAME).localSize(CachePeekMode.PRIMARY, CachePeekMode.BACKUP));
+
+        for (int i = 0; i < CNT; i++)
+            assertEquals(i, ignite2.cache(STATIC_CACHE_NAME).localPeek(i));
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    public void testDynamicCache() throws Exception {
+        IgniteEx ignite = startGrid(0);
+
+        String cacheName = "dynamic";
+
+        CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>(cacheName);
+
+        ccfg.setCacheMode(CacheMode.REPLICATED);
+        ccfg.setRebalanceMode(CacheRebalanceMode.SYNC);
+
+        IgniteCache<Object, Object> cache = ignite.createCache(ccfg);
+
+        try (IgniteDataStreamer<Object, Object> streamer = ignite.dataStreamer(cacheName)) {
+            streamer.allowOverwrite(true);
+
+            for (int i = 0; i < 100_000; i++)
+                streamer.addData(i, i);
+        }
+
+        assertEquals(CNT, cache.localSize());
+
+        Ignite ignite2 = startGrid(1);
+
+        assertEquals(CNT, ignite2.cache(cacheName).localSize(CachePeekMode.PRIMARY, CachePeekMode.BACKUP));
+
+        for (int i = 0; i < CNT; i++)
+            assertEquals(i, ignite2.cache(cacheName).localPeek(i));
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/84547ef2/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java
index a263c0d..41e0ed1 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java
@@ -28,6 +28,7 @@ import org.apache.ignite.internal.processors.cache.IgniteCacheStoreCollectionTes
 import org.apache.ignite.internal.processors.cache.distributed.CacheLateAffinityAssignmentFairAffinityTest;
 import org.apache.ignite.internal.processors.cache.distributed.CacheLateAffinityAssignmentNodeJoinValidationTest;
 import org.apache.ignite.internal.processors.cache.distributed.CacheLateAffinityAssignmentTest;
+import org.apache.ignite.internal.processors.cache.distributed.replicated.IgniteCacheSyncRebalanceModeSelfTest;
 import org.apache.ignite.internal.processors.cache.store.IgniteCacheWriteBehindNoUpdateSelfTest;
 
 /**
@@ -52,6 +53,7 @@ public class IgniteCacheTestSuite5 extends TestSuite {
         suite.addTestSuite(CacheLateAffinityAssignmentFairAffinityTest.class);
         suite.addTestSuite(CacheLateAffinityAssignmentNodeJoinValidationTest.class);
         suite.addTestSuite(EntryVersionConsistencyReadThroughTest.class);
+        suite.addTestSuite(IgniteCacheSyncRebalanceModeSelfTest.class);
 
         suite.addTest(IgniteCacheReadThroughEvictionsVariationsSuite.suite());
 


[03/16] ignite git commit: ignite-114 Load value from store for cache 'invoke' (cherry picked from commit e10ffef)

Posted by vk...@apache.org.
ignite-114 Load value from store for cache 'invoke'
(cherry picked from commit e10ffef)


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e90c9c88
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e90c9c88
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e90c9c88

Branch: refs/heads/master
Commit: e90c9c8889235e780bb9f37ed0b0ca7a8d30136d
Parents: 150b5c9
Author: sboikov <sb...@gridgain.com>
Authored: Tue Jun 14 10:05:20 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Jun 14 10:42:27 2016 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheAdapter.java      |  34 +-
 .../processors/cache/GridCacheEntryEx.java      |   9 +-
 .../processors/cache/GridCacheMapEntry.java     | 100 +++--
 .../processors/cache/GridCacheUtils.java        |   3 +
 .../distributed/dht/GridDhtCacheAdapter.java    |  10 +-
 .../distributed/dht/GridDhtLockFuture.java      |  18 +-
 .../dht/GridDhtTransactionalCacheAdapter.java   |   2 -
 .../distributed/dht/GridDhtTxPrepareFuture.java |  19 +-
 .../dht/GridPartitionedGetFuture.java           |   2 -
 .../dht/GridPartitionedSingleGetFuture.java     |   2 -
 .../dht/atomic/GridDhtAtomicCache.java          |   8 -
 .../dht/colocated/GridDhtColocatedCache.java    |   4 +-
 .../dht/preloader/GridDhtForceKeysFuture.java   |   3 +-
 .../dht/preloader/GridDhtPartitionDemander.java |   3 +-
 .../distributed/near/GridNearGetFuture.java     |   4 -
 .../cache/distributed/near/GridNearTxLocal.java |   7 +-
 .../local/atomic/GridLocalAtomicCache.java      |   8 -
 .../cache/transactions/IgniteTxAdapter.java     |   2 -
 .../cache/transactions/IgniteTxEntry.java       |  24 +-
 .../cache/transactions/IgniteTxHandler.java     |  46 +++
 .../transactions/IgniteTxLocalAdapter.java      |  34 +-
 .../datastreamer/DataStreamerImpl.java          |   3 +-
 .../processors/cache/GridCacheTestEntryEx.java  |   5 +-
 .../cache/IgniteCacheAbstractTest.java          |   2 +-
 ...gniteCacheInvokeReadThroughAbstractTest.java | 382 +++++++++++++++++++
 ...iteCacheInvokeReadThroughSingleNodeTest.java | 106 +++++
 .../cache/IgniteCacheInvokeReadThroughTest.java | 182 +++++----
 .../IgniteCacheReadThroughStoreCallTest.java    | 288 ++++++++++++++
 .../IgniteCacheLoaderWriterAbstractTest.java    |  10 +
 .../testsuites/IgniteCacheTestSuite4.java       |   4 +
 30 files changed, 1110 insertions(+), 214 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/e90c9c88/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 0b3b2da..2212926 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -3531,8 +3531,14 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
         GridCacheEntryEx entry = entryEx(key, false);
 
         try {
-            entry.initialValue(cacheVal, ver, ttl, CU.EXPIRE_TIME_CALCULATE, false, topVer,
-                replicate ? DR_LOAD : DR_NONE);
+            entry.initialValue(cacheVal,
+                ver,
+                ttl,
+                CU.EXPIRE_TIME_CALCULATE,
+                false,
+                topVer,
+                replicate ? DR_LOAD : DR_NONE,
+                true);
         }
         catch (IgniteCheckedException e) {
             throw new IgniteException("Failed to put cache value: " + entry, e);
@@ -4908,19 +4914,17 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
         boolean deserializeBinary)
         throws IgniteCheckedException, GridCacheEntryRemovedException {
         CacheObject val = entry.innerGet(
-            null,
-            null,
-            false,
-            false,
-            false,
-            true,
-            false,
-            false,
-            false,
-            null,
-            null,
-            null,
-            null,
+            /*ver*/null,
+            /*tx*/null,
+            /*swap*/false,
+            /*readThrough*/false,
+            /*metrics*/false,
+            /*evt*/false,
+            /*tmp*/false,
+            /*subjId*/null,
+            /*transformClo*/null,
+            /*taskName*/null,
+            /*expiryPlc*/null,
             !deserializeBinary);
 
         if (val == null)

http://git-wip-us.apache.org/repos/asf/ignite/blob/e90c9c88/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
index 646e6bc..616854f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
@@ -279,9 +279,6 @@ public interface GridCacheEntryEx {
      * @param tx Ongoing transaction (possibly null).
      * @param readSwap Flag indicating whether to check swap memory.
      * @param readThrough Flag indicating whether to read through.
-     * @param failFast If {@code true}, then throw {@link GridCacheFilterFailedException} if
-     *      filter didn't pass.
-     * @param unmarshal Unmarshal flag.
      * @param updateMetrics If {@code true} then metrics should be updated.
      * @param evt Flag to signal event notification.
      * @param tmp If {@code true} can return temporary instance which is valid while entry lock is held,
@@ -300,8 +297,6 @@ public interface GridCacheEntryEx {
         @Nullable IgniteInternalTx tx,
         boolean readSwap,
         boolean readThrough,
-        boolean failFast,
-        boolean unmarshal,
         boolean updateMetrics,
         boolean evt,
         boolean tmp,
@@ -676,6 +671,7 @@ public interface GridCacheEntryEx {
      * @param preload Flag indicating whether entry is being preloaded.
      * @param topVer Topology version.
      * @param drType DR type.
+     * @param fromStore {@code True} if value was loaded from store.
      * @return {@code True} if initial value was set.
      * @throws IgniteCheckedException In case of error.
      * @throws GridCacheEntryRemovedException If entry was removed.
@@ -686,7 +682,8 @@ public interface GridCacheEntryEx {
         long expireTime,
         boolean preload,
         AffinityTopologyVersion topVer,
-        GridDrType drType) throws IgniteCheckedException, GridCacheEntryRemovedException;
+        GridDrType drType,
+        boolean fromStore) throws IgniteCheckedException, GridCacheEntryRemovedException;
 
     /**
      * Sets new value if current version is <tt>0</tt> using swap entry data.

http://git-wip-us.apache.org/repos/asf/ignite/blob/e90c9c88/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 12bd556..6d321bd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -750,8 +750,6 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
         @Nullable IgniteInternalTx tx,
         boolean readSwap,
         boolean readThrough,
-        boolean failFast,
-        boolean unmarshal,
         boolean updateMetrics,
         boolean evt,
         boolean tmp,
@@ -766,7 +764,6 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
             readSwap,
             readThrough,
             evt,
-            unmarshal,
             updateMetrics,
             tmp,
             subjId,
@@ -796,7 +793,6 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
             readSwap,
             false,
             evt,
-            unmarshal,
             updateMetrics,
             false,
             subjId,
@@ -815,7 +811,6 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
         boolean readSwap,
         boolean readThrough,
         boolean evt,
-        boolean unmarshal,
         boolean updateMetrics,
         boolean tmp,
         UUID subjId,
@@ -1987,6 +1982,51 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
             if (isStartVersion())
                 unswap(retval, false);
 
+            // Prepare old value.
+            oldVal = needVal ? rawGetOrUnmarshalUnlocked(!retval && !isOffHeapValuesOnly()) : val;
+
+            // Possibly read value from store.
+            boolean readFromStore = false;
+
+            Object old0 = null;
+
+            if (readThrough && needVal && oldVal == null && (cctx.readThrough() &&
+                (op == GridCacheOperation.TRANSFORM || cctx.loadPreviousValue()))) {
+                old0 = readThrough(null, key, false, subjId, taskName);
+
+                oldVal = cctx.toCacheObject(old0);
+
+                readFromStore = true;
+
+                // Detach value before index update.
+                oldVal = cctx.kernalContext().cacheObjects().prepareForCache(oldVal, cctx);
+
+                // Calculate initial TTL and expire time.
+                long initTtl;
+                long initExpireTime;
+
+                if (expiryPlc != null && oldVal != null) {
+                    IgniteBiTuple<Long, Long> initTtlAndExpireTime = initialTtlAndExpireTime(expiryPlc);
+
+                    initTtl = initTtlAndExpireTime.get1();
+                    initExpireTime = initTtlAndExpireTime.get2();
+                }
+                else {
+                    initTtl = CU.TTL_ETERNAL;
+                    initExpireTime = CU.EXPIRE_TIME_ETERNAL;
+                }
+
+                if (oldVal != null)
+                    updateIndex(oldVal, initExpireTime, ver, null);
+                else
+                    clearIndex(null);
+
+                update(oldVal, initExpireTime, initTtl, ver, true);
+
+                if (deletedUnlocked() && oldVal != null && !isInternal())
+                    deletedUnlocked(false);
+            }
+
             Object transformClo = null;
 
             // Request-level conflict resolution is needed, i.e. we do not know who will win in advance.
@@ -2194,51 +2234,6 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                         "Invalid version for inner update [isNew=" + isNew() + ", entry=" + this + ", newVer=" + newVer + ']';
             }
 
-            // Prepare old value and value bytes.
-            oldVal = needVal ? rawGetOrUnmarshalUnlocked(!retval && !isOffHeapValuesOnly()) : val;
-
-            // Possibly read value from store.
-            boolean readFromStore = false;
-
-            Object old0 = null;
-
-            if (readThrough && needVal && oldVal == null && (cctx.readThrough() &&
-                (op == GridCacheOperation.TRANSFORM || cctx.loadPreviousValue()))) {
-                old0 = readThrough(null, key, false, subjId, taskName);
-
-                oldVal = cctx.toCacheObject(old0);
-
-                readFromStore = true;
-
-                // Detach value before index update.
-                oldVal = cctx.kernalContext().cacheObjects().prepareForCache(oldVal, cctx);
-
-                // Calculate initial TTL and expire time.
-                long initTtl;
-                long initExpireTime;
-
-                if (expiryPlc != null && oldVal != null) {
-                    IgniteBiTuple<Long, Long> initTtlAndExpireTime = initialTtlAndExpireTime(expiryPlc);
-
-                    initTtl = initTtlAndExpireTime.get1();
-                    initExpireTime = initTtlAndExpireTime.get2();
-                }
-                else {
-                    initTtl = CU.TTL_ETERNAL;
-                    initExpireTime = CU.EXPIRE_TIME_ETERNAL;
-                }
-
-                if (oldVal != null)
-                    updateIndex(oldVal, initExpireTime, ver, null);
-                else
-                    clearIndex(null);
-
-                update(oldVal, initExpireTime, initTtl, ver, true);
-
-                if (deletedUnlocked() && oldVal != null && !isInternal())
-                    deletedUnlocked(false);
-            }
-
             // Apply metrics.
             if (metrics && cctx.cache().configuration().isStatisticsEnabled() && needVal) {
                 // PutIfAbsent methods mustn't update hit/miss statistics
@@ -3417,7 +3412,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
         long expireTime,
         boolean preload,
         AffinityTopologyVersion topVer,
-        GridDrType drType)
+        GridDrType drType,
+        boolean fromStore)
         throws IgniteCheckedException, GridCacheEntryRemovedException {
         synchronized (this) {
             checkObsolete();
@@ -3470,7 +3466,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                     cctx.dataStructures().onEntryUpdated(key, false, true);
                 }
 
-                if (cctx.store().isLocal()) {
+                if (!fromStore && cctx.store().isLocal()) {
                     if (val != null)
                         cctx.store().put(null, key, val, ver);
                 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/e90c9c88/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index e7010f5..feaa618 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -149,6 +149,9 @@ public class GridCacheUtils {
     /** Keep serialized flag. */
     public static final int KEEP_BINARY_FLAG_MASK = 0x2;
 
+    /** Flag indicating that old value for 'invoke' operation was non null on primary node. */
+    public static final int OLD_VAL_ON_PRIMARY = 0x4;
+
     /** Empty predicate array. */
     private static final IgnitePredicate[] EMPTY = new IgnitePredicate[0];
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/e90c9c88/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index 2ab6303..14468eb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -546,8 +546,14 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
 
                     entry = entryEx(key, false);
 
-                    entry.initialValue(cacheVal, ver, ttl, CU.EXPIRE_TIME_CALCULATE, false, topVer,
-                        replicate ? DR_LOAD : DR_NONE);
+                    entry.initialValue(cacheVal,
+                        ver,
+                        ttl,
+                        CU.EXPIRE_TIME_CALCULATE,
+                        false,
+                        topVer,
+                        replicate ? DR_LOAD : DR_NONE,
+                        false);
                 }
                 catch (IgniteCheckedException e) {
                     throw new IgniteException("Failed to put cache value: " + entry, e);

http://git-wip-us.apache.org/repos/asf/ignite/blob/e90c9c88/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
index b52376c..38a7e19 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
@@ -1048,7 +1048,14 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
                             try {
                                 CacheObject val0 = cctx.toCacheObject(val);
 
-                                entry0.initialValue(val0, ver, 0, 0, false, topVer, GridDrType.DR_LOAD);
+                                entry0.initialValue(val0,
+                                    ver,
+                                    0,
+                                    0,
+                                    false,
+                                    topVer,
+                                    GridDrType.DR_LOAD,
+                                    true);
                             }
                             catch (GridCacheEntryRemovedException e) {
                                 assert false : "Should not get removed exception while holding lock on entry " +
@@ -1205,8 +1212,13 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
                     try {
                         GridCacheEntryEx entry = cctx.cache().entryEx(info.key(), topVer);
 
-                        if (entry.initialValue(info.value(), info.version(), info.ttl(),
-                            info.expireTime(), true, topVer, replicate ? DR_PRELOAD : DR_NONE)) {
+                        if (entry.initialValue(info.value(),
+                            info.version(),
+                            info.ttl(),
+                            info.expireTime(),
+                            true, topVer,
+                            replicate ? DR_PRELOAD : DR_NONE,
+                            false)) {
                             if (rec && !entry.isInternal())
                                 cctx.events().addEvent(entry.partition(), entry.key(), cctx.localNodeId(),
                                     (IgniteUuid)null, null, EVT_CACHE_REBALANCE_OBJECT_LOADED, info.value(), true, null,

http://git-wip-us.apache.org/repos/asf/ignite/blob/e90c9c88/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
index 0a99621..d5927bf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
@@ -1184,8 +1184,6 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
                                         tx,
                                         /*swap*/true,
                                         /*read-through*/false,
-                                        /*fail-fast.*/false,
-                                        /*unmarshal*/false,
                                         /*update-metrics*/true,
                                         /*event notification*/req.returnValue(i),
                                         /*temporary*/false,

http://git-wip-us.apache.org/repos/asf/ignite/blob/e90c9c88/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index e1baabb..746dbb9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -349,9 +349,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
                 if (hasFilters || retVal || txEntry.op() == DELETE || txEntry.op() == TRANSFORM) {
                     cached.unswap(retVal);
 
-                    boolean readThrough = (retVal || hasFilters) &&
-                        cacheCtx.config().isLoadPreviousValue() &&
-                        !txEntry.skipStore();
+                    boolean readThrough = !txEntry.skipStore() &&
+                        (txEntry.op() == TRANSFORM || ((retVal || hasFilters) && cacheCtx.config().isLoadPreviousValue()));
 
                     boolean evt = retVal || txEntry.op() == TRANSFORM;
 
@@ -367,8 +366,6 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
                         tx,
                         /*swap*/true,
                         readThrough,
-                        /*fail fast*/false,
-                        /*unmarshal*/true,
                         /*metrics*/retVal,
                         /*event*/evt,
                         /*tmp*/false,
@@ -392,6 +389,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
 
                             boolean modified = false;
 
+                            txEntry.oldValueOnPrimary(val != null);
+
                             for (T2<EntryProcessor<Object, Object, Object>, Object[]> t : txEntry.entryProcessors()) {
                                  CacheInvokeEntry<Object, Object> invokeEntry = new CacheInvokeEntry<>(key, val,
                                      txEntry.cached().version(), keepBinary, txEntry.cached());
@@ -1573,8 +1572,14 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
                         GridDrType drType = cacheCtx.isDrEnabled() ? GridDrType.DR_PRELOAD : GridDrType.DR_NONE;
 
                         try {
-                            if (entry.initialValue(info.value(), info.version(),
-                                info.ttl(), info.expireTime(), true, topVer, drType)) {
+                            if (entry.initialValue(info.value(),
+                                info.version(),
+                                info.ttl(),
+                                info.expireTime(),
+                                true,
+                                topVer,
+                                drType,
+                                false)) {
                                 if (rec && !entry.isInternal())
                                     cacheCtx.events().addEvent(entry.partition(), entry.key(), cctx.localNodeId(),
                                         (IgniteUuid)null, null, EVT_CACHE_REBALANCE_OBJECT_LOADED, info.value(), true, null,

http://git-wip-us.apache.org/repos/asf/ignite/blob/e90c9c88/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
index 9561ad8..2e22d9e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
@@ -473,8 +473,6 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
                             null,
                             /*swap*/true,
                             /*read-through*/false,
-                            /*fail-fast*/true,
-                            /*unmarshal*/true,
                             /**update-metrics*/false,
                             /*event*/!skipVals,
                             /*temporary*/false,

http://git-wip-us.apache.org/repos/asf/ignite/blob/e90c9c88/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
index fd59f48..aeb7eba 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
@@ -397,8 +397,6 @@ public class GridPartitionedSingleGetFuture extends GridFutureAdapter<Object> im
                             null,
                             /*swap*/true,
                             /*read-through*/false,
-                            /*fail-fast*/true,
-                            /*unmarshal*/true,
                             /**update-metrics*/false,
                             /*event*/!skipVals,
                             /*temporary*/false,

http://git-wip-us.apache.org/repos/asf/ignite/blob/e90c9c88/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 7460495..9291153 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -1371,8 +1371,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                                     null,
                                     /*swap*/true,
                                     /*read-through*/false,
-                                    /*fail-fast*/true,
-                                    /*unmarshal*/true,
                                     /**update-metrics*/false,
                                     /*event*/!skipVals,
                                     /*temporary*/false,
@@ -1848,8 +1846,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         null,
                         /*read swap*/true,
                         /*read through*/true,
-                        /*fail fast*/false,
-                        /*unmarshal*/true,
                         /*metrics*/true,
                         /*event*/true,
                         /*temporary*/true,
@@ -2008,8 +2004,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                              null,
                             /*read swap*/true,
                             /*read through*/ctx.loadPreviousValue(),
-                            /*fail fast*/false,
-                            /*unmarshal*/true,
                             /*metrics*/true,
                             /*event*/true,
                             /*temporary*/true,
@@ -2055,8 +2049,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                             null,
                             /*read swap*/true,
                             /*read through*/ctx.loadPreviousValue(),
-                            /*fail fast*/false,
-                            /*unmarshal*/true,
                             /*metrics*/true,
                             /*event*/true,
                             /*temporary*/true,

http://git-wip-us.apache.org/repos/asf/ignite/blob/e90c9c88/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index d0850e7..3376510 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@ -508,8 +508,6 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
                                     null,
                                     /*swap*/true,
                                     /*read-through*/false,
-                                    /*fail-fast*/true,
-                                    /*unmarshal*/true,
                                     /**update-metrics*/false,
                                     /*event*/!skipVals,
                                     /*temporary*/false,
@@ -1054,7 +1052,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
 
             IgniteInternalFuture<GridCacheReturn> txFut = tx.lockAllAsync(cacheCtx,
                 keys,
-                tx.implicit(),
+                retval,
                 txRead,
                 accessTtl,
                 skipStore,

http://git-wip-us.apache.org/repos/asf/ignite/blob/e90c9c88/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
index 4da1f38..7cbd77c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
@@ -552,7 +552,8 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec
                             info.expireTime(),
                             true,
                             topVer,
-                            replicate ? DR_PRELOAD : DR_NONE
+                            replicate ? DR_PRELOAD : DR_NONE,
+                            false
                         )) {
                             if (rec && !entry.isInternal())
                                 cctx.events().addEvent(entry.partition(), entry.key(), cctx.localNodeId(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/e90c9c88/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 821d2e4..57d5229 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -688,7 +688,8 @@ public class GridDhtPartitionDemander {
                         entry.expireTime(),
                         true,
                         topVer,
-                        cctx.isDrEnabled() ? DR_PRELOAD : DR_NONE
+                        cctx.isDrEnabled() ? DR_PRELOAD : DR_NONE,
+                        false
                     )) {
                         cctx.evicts().touch(cached, topVer); // Start tracking.
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/e90c9c88/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
index 7d29381..290c08e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
@@ -463,8 +463,6 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
                             tx,
                             /*swap*/false,
                             /*read-through*/false,
-                            /*fail-fast*/true,
-                            /*unmarshal*/true,
                             /*metrics*/true,
                             /*events*/!skipVals,
                             /*temporary*/false,
@@ -605,8 +603,6 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
                             tx,
                             /*swap*/true,
                             /*read-through*/false,
-                            /*fail-fast*/true,
-                            /*unmarshal*/true,
                             /*update-metrics*/false,
                             /*events*/!nearRead && !skipVals,
                             /*temporary*/false,

http://git-wip-us.apache.org/repos/asf/ignite/blob/e90c9c88/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index 7500d8f..db736a5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -1147,16 +1147,17 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
     /**
      * @param cacheCtx Cache context.
      * @param keys Keys.
-     * @param implicit Implicit flag.
+     * @param retval Return value flag.
      * @param read Read flag.
      * @param accessTtl Access ttl.
      * @param <K> Key type.
      * @param skipStore Skip store flag.
+     * @param keepBinary Keep binary flag.
      * @return Future with respond.
      */
     public <K> IgniteInternalFuture<GridCacheReturn> lockAllAsync(GridCacheContext cacheCtx,
         final Collection<? extends K> keys,
-        boolean implicit,
+        boolean retval,
         boolean read,
         long accessTtl,
         boolean skipStore,
@@ -1190,7 +1191,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
             this,
             isInvalidate(),
             read,
-            /*retval*/false,
+            retval,
             isolation,
             accessTtl,
             CU.empty0(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/e90c9c88/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
index e1d12b6..ac08f8f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
@@ -550,8 +550,6 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> {
                                 null,
                                 /*swap*/swapOrOffheap,
                                 /*read-through*/false,
-                                /*fail-fast*/false,
-                                /*unmarshal*/true,
                                 /**update-metrics*/true,
                                 /**event*/!skipVals,
                                 /**temporary*/false,
@@ -1111,8 +1109,6 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> {
                             null,
                             /*swap*/true,
                             /*read-through*/true,
-                            /*fail-fast*/false,
-                            /*unmarshal*/true,
                             /**update-metrics*/true,
                             /**event*/true,
                             /**temporary*/true,
@@ -1235,8 +1231,6 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> {
                                 null,
                                 /*swap*/true,
                                 /*read-through*/ctx.loadPreviousValue(),
-                                /*fail-fast*/false,
-                                /*unmarshal*/true,
                                 /**update-metrics*/true,
                                 /**event*/true,
                                 /**temporary*/true,
@@ -1272,8 +1266,6 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> {
                                 null,
                                 /*swap*/true,
                                 /*read-through*/ctx.loadPreviousValue(),
-                                /*fail-fast*/false,
-                                /*unmarshal*/true,
                                 /**update-metrics*/true,
                                 /**event*/true,
                                 /**temporary*/true,

http://git-wip-us.apache.org/repos/asf/ignite/blob/e90c9c88/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index 86a1989..ece013e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -1493,8 +1493,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
                     this,
                     /*swap*/false,
                     /*read through*/false,
-                    /*fail fast*/true,
-                    /*unmarshal*/true,
                     /*metrics*/metrics,
                     /*event*/recordEvt,
                     /*temporary*/true,

http://git-wip-us.apache.org/repos/asf/ignite/blob/e90c9c88/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
index 8d50e2b..87b2525 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
@@ -37,6 +37,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
 import org.apache.ignite.internal.processors.cache.GridCacheOperation;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.GridCacheUtils;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.IgniteExternalizableExpiryPolicy;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -57,6 +58,7 @@ import org.jetbrains.annotations.Nullable;
 import static org.apache.ignite.internal.processors.cache.GridCacheOperation.READ;
 import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
 import static org.apache.ignite.internal.processors.cache.GridCacheUtils.KEEP_BINARY_FLAG_MASK;
+import static org.apache.ignite.internal.processors.cache.GridCacheUtils.OLD_VAL_ON_PRIMARY;
 import static org.apache.ignite.internal.processors.cache.GridCacheUtils.SKIP_STORE_FLAG_MASK;
 
 /**
@@ -192,9 +194,11 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
     private byte[] expiryPlcBytes;
 
     /**
-     * Additional flags.
-     * GridCacheUtils.SKIP_STORE_FLAG_MASK - for skipStore flag value.
-     * GridCacheUtils.KEEP_BINARY_FLAG_MASK - for withKeepBinary flag.
+     * Additional flags:
+     * <ul>
+     * <li>{@link GridCacheUtils#SKIP_STORE_FLAG_MASK} - for skipStore flag value.</li>
+     * <li>{@link GridCacheUtils#KEEP_BINARY_FLAG_MASK} - for withKeepBinary flag.</li>
+     * </ul>
      */
     private byte flags;
 
@@ -493,6 +497,20 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
     }
 
     /**
+     * @param oldValOnPrimary {@code True} If old value for 'invoke' operation was non null on primary node.
+     */
+    public void oldValueOnPrimary(boolean oldValOnPrimary) {
+        setFlag(oldValOnPrimary, OLD_VAL_ON_PRIMARY);
+    }
+
+    /**
+     * @return {@code True} If old value for 'invoke' operation was non null on primary node.
+     */
+    public boolean oldValueOnPrimary() {
+        return isFlag(OLD_VAL_ON_PRIMARY);
+    }
+
+    /**
      * Sets keep binary flag value.
      *
      * @param keepBinary Keep binary flag value.

http://git-wip-us.apache.org/repos/asf/ignite/blob/e90c9c88/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index a764d5d..7039399 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -26,9 +26,11 @@ import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
 import org.apache.ignite.internal.processors.cache.GridCacheMessage;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
@@ -76,6 +78,7 @@ import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.UTILITY_CACHE_POOL;
+import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
 import static org.apache.ignite.internal.processors.cache.GridCacheUtils.isNearEnabled;
 import static org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishFuture.FINISH_NEAR_ONE_PHASE_SINCE;
 import static org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx.FinalizationStatus.USER_FINISH;
@@ -1193,6 +1196,49 @@ public class IgniteTxHandler {
                             if (info != null && !info.isNew() && !info.isDeleted())
                                 res.addPreloadEntry(info);
                         }
+
+                        if (cacheCtx.readThroughConfigured() &&
+                            !entry.skipStore() &&
+                            entry.op() == TRANSFORM &&
+                            entry.oldValueOnPrimary() &&
+                            !entry.hasValue()) {
+                            while (true) {
+                                try {
+                                    GridCacheEntryEx cached = entry.cached();
+
+                                    if (cached == null)
+                                        cached = cacheCtx.cache().entryEx(entry.key(), req.topologyVersion());
+
+                                    CacheObject val = cached.innerGet(
+                                        /*ver*/null,
+                                        tx,
+                                        /*readSwap*/true,
+                                        /*readThrough*/false,
+                                        /*updateMetrics*/false,
+                                        /*evt*/false,
+                                        /*tmp*/false,
+                                        tx.subjectId(),
+                                        /*transformClo*/null,
+                                        tx.resolveTaskName(),
+                                        /*expiryPlc*/null,
+                                        /*keepBinary*/true);
+
+                                    if (val == null)
+                                        val = cacheCtx.toCacheObject(cacheCtx.store().load(null, entry.key()));
+
+                                    if (val != null)
+                                        entry.readValue(val);
+
+                                    break;
+                                }
+                                catch (GridCacheEntryRemovedException e) {
+                                    if (log.isDebugEnabled())
+                                        log.debug("Got entry removed exception, will retry: " + entry.txKey());
+
+                                    entry.cached(null);
+                                }
+                            }
+                        }
                     }
                     catch (GridDhtInvalidPartitionException e) {
                         tx.addInvalidPartition(cacheCtx, e.partition());

http://git-wip-us.apache.org/repos/asf/ignite/blob/e90c9c88/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index f9ef423..d51d873 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -1249,8 +1249,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
                                     this,
                                     /*swap*/true,
                                     /*read-through*/false,
-                                    /*fail fast*/true,
-                                    /*unmarshal*/true,
                                     /*metrics*/true,
                                     /*event*/!skipVals,
                                     /*temporary*/false,
@@ -1334,9 +1332,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
                                     null,
                                     this,
                                     /*swap*/true,
-                                    /*no read-through*/false,
-                                    /*fail-fast*/true,
-                                    /*unmarshal*/true,
+                                    /*read-through*/false,
                                     /*metrics*/true,
                                     /*event*/true,
                                     /*temporary*/false,
@@ -1683,8 +1679,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
                                             IgniteTxLocalAdapter.this,
                                             cacheCtx.isSwapOrOffheapEnabled(),
                                             /*read-through*/false,
-                                            /*fail-fast*/true,
-                                            /*unmarshal*/true,
                                             /*metrics*/true,
                                             /*events*/!skipVals,
                                             /*temporary*/false,
@@ -2025,7 +2019,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
                     needReadVer,
                     singleRmv,
                     hasFilters,
-                    skipStore,
+                    /*read through*/(entryProcessor != null || cacheCtx.config().isLoadPreviousValue()) && !skipStore,
                     retval,
                     keepBinary);
             }
@@ -2194,7 +2188,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
                     needReadVer,
                     singleRmv,
                     hasFilters,
-                    skipStore,
+                    /*read through*/(invokeMap != null || cacheCtx.config().isLoadPreviousValue()) && !skipStore,
                     retval,
                     keepBinary);
             }
@@ -2214,7 +2208,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
      * @param needReadVer Read version flag.
      * @param singleRmv {@code True} for single remove operation.
      * @param hasFilters {@code True} if filters not empty.
-     * @param skipStore Skip store flag.
+     * @param readThrough Read through flag.
      * @param retval Return value flag.
      * @return Load future.
      */
@@ -2227,7 +2221,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
         final boolean needReadVer,
         final boolean singleRmv,
         final boolean hasFilters,
-        final boolean skipStore,
+        final boolean readThrough,
         final boolean retval,
         final boolean keepBinary) {
         GridInClosure3<KeyCacheObject, Object, GridCacheVersion> c =
@@ -2260,6 +2254,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
                         if (e.op() == TRANSFORM) {
                             GridCacheVersion ver;
 
+                            e.readValue(cacheVal);
+
                             try {
                                 ver = e.cached().version();
                             }
@@ -2286,7 +2282,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
         return loadMissing(
             cacheCtx,
             topVer,
-            /*read through*/cacheCtx.config().isLoadPreviousValue() && !skipStore,
+            readThrough,
             /*async*/true,
             keys,
             /*skipVals*/singleRmv,
@@ -2398,8 +2394,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
                                     this,
                                     /*swap*/false,
                                     /*read-through*/false,
-                                    /*fail-fast*/false,
-                                    /*unmarshal*/retval || needVal,
                                     /*metrics*/retval,
                                     /*events*/retval,
                                     /*temporary*/false,
@@ -2696,16 +2690,16 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
                     if (retval || invoke) {
                         if (!cacheCtx.isNear()) {
                             if (!hasPrevVal) {
-                                boolean readThrough =
-                                    (invoke || cacheCtx.loadPreviousValue()) && !txEntry.skipStore();
+                                // For non-local cache should read from store after lock on primary.
+                                boolean readThrough = cacheCtx.isLocal() &&
+                                    (invoke || cacheCtx.loadPreviousValue()) &&
+                                    !txEntry.skipStore();
 
                                 v = cached.innerGet(
                                     null,
                                     this,
                                     /*swap*/true,
                                     readThrough,
-                                    /*failFast*/false,
-                                    /*unmarshal*/true,
                                     /*metrics*/!invoke,
                                     /*event*/!invoke && !dht(),
                                     /*temporary*/false,
@@ -2937,7 +2931,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
                 IgniteInternalFuture<Boolean> fut = cacheCtx.cache().txLockAsync(enlisted,
                     timeout,
                     this,
-                    false,
+                    /*read*/entryProcessor != null, // Needed to force load from store.
                     retval,
                     isolation,
                     isInvalidate(),
@@ -3115,7 +3109,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
                 IgniteInternalFuture<Boolean> fut = cacheCtx.cache().txLockAsync(enlisted,
                     timeout,
                     this,
-                    false,
+                    /*read*/invokeMap != null, // Needed to force load from store.
                     retval,
                     isolation,
                     isInvalidate(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/e90c9c88/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index 4599060..9dc6a7f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -1621,7 +1621,8 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
                         expiryTime,
                         false,
                         topVer,
-                        GridDrType.DR_LOAD);
+                        GridDrType.DR_LOAD,
+                        false);
 
                     cctx.evicts().touch(entry, topVer);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/e90c9c88/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
index 348b6c9..400fb14 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
@@ -432,8 +432,6 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr
         @Nullable IgniteInternalTx tx,
         boolean readSwap,
         boolean readThrough,
-        boolean failFast,
-        boolean unmarshal,
         boolean updateMetrics,
         boolean evt,
         boolean tmp,
@@ -667,7 +665,8 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr
         long expireTime,
         boolean preload,
         AffinityTopologyVersion topVer,
-        GridDrType drType
+        GridDrType drType,
+        boolean fromStore
     ) throws IgniteCheckedException, GridCacheEntryRemovedException {
         assert false;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/e90c9c88/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractTest.java
index ce60232..45b6e9d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractTest.java
@@ -54,7 +54,7 @@ public abstract class IgniteCacheAbstractTest extends GridCommonAbstractTest {
     private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
 
     /** */
-    protected static final Map<Object, Object> storeMap = new ConcurrentHashMap8<>();
+    public static final Map<Object, Object> storeMap = new ConcurrentHashMap8<>();
 
     /**
      * @return Grids count to start.

http://git-wip-us.apache.org/repos/asf/ignite/blob/e90c9c88/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeReadThroughAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeReadThroughAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeReadThroughAbstractTest.java
new file mode 100644
index 0000000..294ebea
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeReadThroughAbstractTest.java
@@ -0,0 +1,382 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import javax.cache.configuration.Factory;
+import javax.cache.processor.EntryProcessor;
+import javax.cache.processor.EntryProcessorResult;
+import javax.cache.processor.MutableEntry;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMemoryMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cache.store.CacheStore;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ *
+ */
+public abstract class IgniteCacheInvokeReadThroughAbstractTest extends GridCommonAbstractTest {
+    /** */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private static volatile boolean failed;
+
+    /** */
+    protected boolean client;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
+
+        cfg.setClientMode(client);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        failed = false;
+
+        startNodes();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        IgniteCacheAbstractTest.storeMap.clear();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+
+        super.afterTestsStopped();
+    }
+
+    /**
+     * @return Store factory.
+     */
+    protected Factory<CacheStore> cacheStoreFactory() {
+        return new IgniteCacheAbstractTest.TestStoreFactory();
+    }
+
+    /**
+     * @param data Data.
+     * @param cacheName Cache name.
+     * @throws Exception If failed.
+     */
+    protected void putDataInStore(Map<Object, Object> data, String cacheName) throws Exception {
+        IgniteCacheAbstractTest.storeMap.putAll(data);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    protected abstract void startNodes() throws Exception;
+
+    /**
+     * @param ccfg Cache configuration.
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings("unchecked")
+    protected void invokeReadThrough(CacheConfiguration ccfg) throws Exception {
+        Ignite ignite0 = ignite(0);
+
+        ignite0.createCache(ccfg);
+
+        try {
+            int key = 0;
+
+            for (Ignite node : G.allGrids()) {
+                if (node.configuration().isClientMode() && ccfg.getNearConfiguration() != null)
+                    node.createNearCache(ccfg.getName(), ccfg.getNearConfiguration());
+            }
+
+            for (Ignite node : G.allGrids()) {
+                log.info("Test for node: " + node.name());
+
+                IgniteCache<Object, Object> cache = node.cache(ccfg.getName());
+
+                for (int i = 0; i < 50; i++)
+                    checkReadThrough(cache, key++, null, null);
+
+                Set<Object> keys = new HashSet<>();
+
+                for (int i = 0; i < 5; i++)
+                    keys.add(key++);
+
+                checkReadThroughInvokeAll(cache, keys, null, null);
+
+                keys = new HashSet<>();
+
+                for (int i = 0; i < 100; i++)
+                    keys.add(key++);
+
+                checkReadThroughInvokeAll(cache, keys, null, null);
+
+                if (ccfg.getAtomicityMode() == TRANSACTIONAL) {
+                    for (TransactionConcurrency concurrency : TransactionConcurrency.values()) {
+                        for (TransactionIsolation isolation : TransactionIsolation.values()) {
+                            log.info("Test tx [concurrency=" + concurrency + ", isolation=" + isolation + ']');
+
+                            for (int i = 0; i < 50; i++)
+                                checkReadThrough(cache, key++, concurrency, isolation);
+
+                            keys = new HashSet<>();
+
+                            for (int i = 0; i < 5; i++)
+                                keys.add(key++);
+
+                            checkReadThroughInvokeAll(cache, keys, concurrency, isolation);
+
+                            keys = new HashSet<>();
+
+                            for (int i = 0; i < 100; i++)
+                                keys.add(key++);
+
+                            checkReadThroughInvokeAll(cache, keys, concurrency, isolation);
+                        }
+                    }
+
+                    for (TransactionConcurrency concurrency : TransactionConcurrency.values()) {
+                        for (TransactionIsolation isolation : TransactionIsolation.values()) {
+                            log.info("Test tx2 [concurrency=" + concurrency + ", isolation=" + isolation + ']');
+
+                            for (int i = 0; i < 50; i++)
+                                checkReadThroughGetAndInvoke(cache, key++, concurrency, isolation);
+                        }
+                    }
+                }
+            }
+
+            ignite0.cache(ccfg.getName()).removeAll();
+        }
+        finally {
+            ignite0.destroyCache(ccfg.getName());
+        }
+    }
+
+    /**
+     * @param cache Cache.
+     * @param key Key.
+     * @param concurrency Transaction concurrency.
+     * @param isolation Transaction isolation.
+     * @throws Exception If failed.
+     */
+    private void checkReadThrough(IgniteCache<Object, Object> cache,
+        Object key,
+        @Nullable TransactionConcurrency concurrency,
+        @Nullable TransactionIsolation isolation) throws Exception {
+        putDataInStore(Collections.singletonMap(key, key), cache.getName());
+
+        Transaction tx = isolation != null ? cache.unwrap(Ignite.class).transactions().txStart(concurrency, isolation)
+            : null;
+
+        try {
+            Object ret = cache.invoke(key, new TestEntryProcessor());
+
+            assertEquals(key, ret);
+
+            if (tx != null)
+                tx.commit();
+        }
+        finally {
+            if (tx != null)
+                tx.close();
+        }
+
+        checkValue(cache.getName(), key, (Integer)key + 1);
+    }
+
+    /**
+     * @param cache Cache.
+     * @param key Key.
+     * @param concurrency Transaction concurrency.
+     * @param isolation Transaction isolation.
+     * @throws Exception If failed.
+     */
+    private void checkReadThroughGetAndInvoke(IgniteCache<Object, Object> cache,
+        Object key,
+        TransactionConcurrency concurrency,
+        TransactionIsolation isolation) throws Exception {
+        putDataInStore(Collections.singletonMap(key, key), cache.getName());
+
+        try (Transaction tx = cache.unwrap(Ignite.class).transactions().txStart(concurrency, isolation)) {
+            cache.get(key);
+
+            Object ret = cache.invoke(key, new TestEntryProcessor());
+
+            assertEquals(key, ret);
+
+            tx.commit();
+        }
+
+        checkValue(cache.getName(), key, (Integer)key + 1);
+    }
+
+    /**
+     * @param cache Cache.
+     * @param keys Key.
+     * @param concurrency Transaction concurrency.
+     * @param isolation Transaction isolation.
+     * @throws Exception If failed.
+     */
+    private void checkReadThroughInvokeAll(IgniteCache<Object, Object> cache,
+        Set<Object> keys,
+        @Nullable TransactionConcurrency concurrency,
+        @Nullable TransactionIsolation isolation) throws Exception {
+        Map<Object, Object> data = U.newHashMap(keys.size());
+
+        for (Object key : keys)
+            data.put(key, key);
+
+        putDataInStore(data, cache.getName());
+
+        Transaction tx = isolation != null ? cache.unwrap(Ignite.class).transactions().txStart(concurrency, isolation)
+            : null;
+
+        try {
+            Map<Object, EntryProcessorResult<Object>> ret = cache.invokeAll(keys, new TestEntryProcessor());
+
+            assertEquals(ret.size(), keys.size());
+
+            for (Object key : keys) {
+                EntryProcessorResult<Object> res = ret.get(key);
+
+                assertNotNull(res);
+                assertEquals(key, res.get());
+            }
+
+            if (tx != null)
+                tx.commit();
+        }
+        finally {
+            if (tx != null)
+                tx.close();
+        }
+
+        for (Object key : keys)
+            checkValue(cache.getName(), key, (Integer)key + 1);
+    }
+
+    /**
+     * @param cacheName Cache name.
+     * @param key Key.
+     * @param val Expected value.
+     */
+    private void checkValue(String cacheName, Object key, Object val) {
+        for (Ignite ignite : G.allGrids()) {
+            assertEquals("Unexpected value for node: " + ignite.name(),
+                val,
+                ignite.cache(cacheName).get(key));
+        }
+
+        assertFalse(failed);
+    }
+
+    /**
+     * @param cacheMode Cache mode.
+     * @param atomicityMode Atomicity mode.
+     * @param memoryMode Memory mode.
+     * @param backups Number of backups.
+     * @param nearCache Near cache flag.
+     * @return Cache configuration.
+     */
+    @SuppressWarnings("unchecked")
+    protected CacheConfiguration cacheConfiguration(CacheMode cacheMode,
+        CacheAtomicityMode atomicityMode,
+        CacheMemoryMode memoryMode,
+        int backups,
+        boolean nearCache) {
+        CacheConfiguration ccfg = new CacheConfiguration();
+
+        ccfg.setReadThrough(true);
+        ccfg.setWriteThrough(true);
+        ccfg.setCacheStoreFactory(cacheStoreFactory());
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+        ccfg.setAtomicityMode(atomicityMode);
+        ccfg.setCacheMode(cacheMode);
+        ccfg.setAffinity(new RendezvousAffinityFunction(false, 32));
+        ccfg.setAtomicWriteOrderMode(PRIMARY);
+        ccfg.setMemoryMode(memoryMode);
+
+        if (nearCache)
+            ccfg.setNearConfiguration(new NearCacheConfiguration());
+
+        if (cacheMode == PARTITIONED)
+            ccfg.setBackups(backups);
+
+        return ccfg;
+    }
+
+    /**
+     *
+     */
+    static class TestEntryProcessor implements EntryProcessor<Object, Object, Object> {
+        /** {@inheritDoc} */
+        @Override public Object process(MutableEntry<Object, Object> entry, Object... args) {
+            if (!entry.exists()) {
+                failed = true;
+
+                fail();
+            }
+
+            Integer val = (Integer)entry.getValue();
+
+            if (!val.equals(entry.getKey())) {
+                failed = true;
+
+                assertEquals(val, entry.getKey());
+            }
+
+            entry.setValue(val + 1);
+
+            return val;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/e90c9c88/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeReadThroughSingleNodeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeReadThroughSingleNodeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeReadThroughSingleNodeTest.java
new file mode 100644
index 0000000..b451abf
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeReadThroughSingleNodeTest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_TIERED;
+import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED;
+import static org.apache.ignite.cache.CacheMode.LOCAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+
+/**
+ *
+ */
+public class IgniteCacheInvokeReadThroughSingleNodeTest extends IgniteCacheInvokeReadThroughAbstractTest {
+    /** {@inheritDoc} */
+    @Override protected void startNodes() throws Exception {
+        startGrid(0);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testInvokeReadThroughAtomic() throws Exception {
+        invokeReadThrough(cacheConfiguration(PARTITIONED, ATOMIC, ONHEAP_TIERED, 1, false));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testInvokeReadThroughAtomic_Offheap() throws Exception {
+        invokeReadThrough(cacheConfiguration(PARTITIONED, ATOMIC, OFFHEAP_TIERED, 1, false));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testInvokeReadThroughAtomicNearCache() throws Exception {
+        invokeReadThrough(cacheConfiguration(PARTITIONED, ATOMIC, ONHEAP_TIERED, 1, true));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testInvokeReadThroughAtomicReplicated() throws Exception {
+        invokeReadThrough(cacheConfiguration(REPLICATED, ATOMIC, ONHEAP_TIERED, 0, false));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testInvokeReadThroughAtomicLocal() throws Exception {
+        invokeReadThrough(cacheConfiguration(LOCAL, ATOMIC, ONHEAP_TIERED, 0, false));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testInvokeReadThroughTx() throws Exception {
+        invokeReadThrough(cacheConfiguration(PARTITIONED, TRANSACTIONAL, ONHEAP_TIERED, 1, false));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testInvokeReadThroughTx_Offheap() throws Exception {
+        invokeReadThrough(cacheConfiguration(PARTITIONED, TRANSACTIONAL, OFFHEAP_TIERED, 1, false));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testInvokeReadThroughTxNearCache() throws Exception {
+        invokeReadThrough(cacheConfiguration(PARTITIONED, TRANSACTIONAL, ONHEAP_TIERED, 1, true));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testInvokeReadThroughTxReplicated() throws Exception {
+        invokeReadThrough(cacheConfiguration(REPLICATED, TRANSACTIONAL, ONHEAP_TIERED, 0, false));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testInvokeReadThroughTxLocal() throws Exception {
+        invokeReadThrough(cacheConfiguration(LOCAL, TRANSACTIONAL, ONHEAP_TIERED, 0, false));
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/e90c9c88/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeReadThroughTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeReadThroughTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeReadThroughTest.java
index 2464e81..9578227 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeReadThroughTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeReadThroughTest.java
@@ -17,111 +17,163 @@
 
 package org.apache.ignite.internal.processors.cache;
 
-import javax.cache.configuration.Factory;
-import javax.cache.processor.EntryProcessor;
-import javax.cache.processor.MutableEntry;
-import org.apache.ignite.IgniteCache;
-import org.apache.ignite.cache.CacheAtomicityMode;
-import org.apache.ignite.cache.CacheMode;
-import org.apache.ignite.cache.store.CacheStore;
-import org.apache.ignite.configuration.NearCacheConfiguration;
-
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
 import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_TIERED;
+import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED;
 import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
 
 /**
  *
  */
-public class IgniteCacheInvokeReadThroughTest extends IgniteCacheAbstractTest {
+public class IgniteCacheInvokeReadThroughTest extends IgniteCacheInvokeReadThroughAbstractTest {
     /** {@inheritDoc} */
-    @Override protected void beforeTest() throws Exception {
-        fail("https://issues.apache.org/jira/browse/IGNITE-114");
-    }
+    @Override protected void startNodes() throws Exception {
+        startGridsMultiThreaded(4);
 
-    /** */
-    private static volatile boolean failed;
+        client = true;
 
-    /** {@inheritDoc} */
-    @Override protected int gridCount() {
-        return 3;
+        startGrid(4);
     }
 
-    /** {@inheritDoc} */
-    @Override protected CacheMode cacheMode() {
-        return PARTITIONED;
+    /**
+     * @throws Exception If failed.
+     */
+    public void testInvokeReadThroughAtomic0() throws Exception {
+        invokeReadThrough(cacheConfiguration(PARTITIONED, ATOMIC, ONHEAP_TIERED, 0, false));
     }
 
-    /** {@inheritDoc} */
-    @Override protected CacheAtomicityMode atomicityMode() {
-        return TRANSACTIONAL;
+    /**
+     * @throws Exception If failed.
+     */
+    public void testInvokeReadThroughAtomic1() throws Exception {
+        invokeReadThrough(cacheConfiguration(PARTITIONED, ATOMIC, ONHEAP_TIERED, 1, false));
     }
 
-    /** {@inheritDoc} */
-    @Override protected NearCacheConfiguration nearConfiguration() {
-        return null;
+    /**
+     * @throws Exception If failed.
+     */
+    public void testInvokeReadThroughAtomic2() throws Exception {
+        invokeReadThrough(cacheConfiguration(PARTITIONED, ATOMIC, ONHEAP_TIERED, 2, false));
     }
 
-    /** {@inheritDoc} */
-    @Override protected Factory<CacheStore> cacheStoreFactory() {
-        return new TestStoreFactory();
+    /**
+     * @throws Exception If failed.
+     */
+    public void testInvokeReadThroughAtomicNearCache() throws Exception {
+        invokeReadThrough(cacheConfiguration(PARTITIONED, ATOMIC, ONHEAP_TIERED, 1, true));
     }
 
-    /** {@inheritDoc} */
-    @Override protected void beforeTestsStarted() throws Exception {
-        super.beforeTestsStarted();
-
-        failed = false;
+    /**
+     * @throws Exception If failed.
+     */
+    public void testInvokeReadThroughAtomicReplicated() throws Exception {
+        invokeReadThrough(cacheConfiguration(REPLICATED, ATOMIC, ONHEAP_TIERED, 0, false));
     }
 
     /**
      * @throws Exception If failed.
      */
-    public void testInvokeReadThrough() throws Exception {
-        IgniteCache<Integer, Integer> cache = jcache(0);
-
-        checkReadThrough(cache, primaryKey(cache));
+    public void testInvokeReadThroughAtomic0_Offheap() throws Exception {
+        invokeReadThrough(cacheConfiguration(PARTITIONED, ATOMIC, OFFHEAP_TIERED, 0, false));
+    }
 
-        checkReadThrough(cache, backupKey(cache));
+    /**
+     * @throws Exception If failed.
+     */
+    public void testInvokeReadThroughAtomic1_Offheap() throws Exception {
+        invokeReadThrough(cacheConfiguration(PARTITIONED, ATOMIC, OFFHEAP_TIERED, 1, false));
+    }
 
-        checkReadThrough(cache, nearKey(cache));
+    /**
+     * @throws Exception If failed.
+     */
+    public void testInvokeReadThroughAtomic2_Offheap() throws Exception {
+        invokeReadThrough(cacheConfiguration(PARTITIONED, ATOMIC, OFFHEAP_TIERED, 2, false));
     }
 
     /**
-     * @param cache Cache.
-     * @param key Key.
+     * @throws Exception If failed.
      */
-    private void checkReadThrough(IgniteCache<Integer, Integer> cache, Integer key) {
-        log.info("Test [key=" + key + ']');
+    public void testInvokeReadThroughAtomicNearCache_Offheap() throws Exception {
+        invokeReadThrough(cacheConfiguration(PARTITIONED, ATOMIC, OFFHEAP_TIERED, 1, true));
+    }
 
-        storeMap.put(key, key);
+    /**
+     * @throws Exception If failed.
+     */
+    public void testInvokeReadThroughAtomicReplicated_Offheap() throws Exception {
+        invokeReadThrough(cacheConfiguration(REPLICATED, ATOMIC, OFFHEAP_TIERED, 0, false));
+    }
 
-        Object ret = cache.invoke(key, new EntryProcessor<Integer, Integer, Object>() {
-            @Override public Object process(MutableEntry<Integer, Integer> entry, Object... args) {
-                if (!entry.exists()) {
-                    failed = true;
+    /**
+     * @throws Exception If failed.
+     */
+    public void testInvokeReadThroughTx0() throws Exception {
+        invokeReadThrough(cacheConfiguration(PARTITIONED, TRANSACTIONAL, ONHEAP_TIERED, 0, false));
+    }
 
-                    fail();
-                }
+    /**
+     * @throws Exception If failed.
+     */
+    public void testInvokeReadThroughTx1() throws Exception {
+        invokeReadThrough(cacheConfiguration(PARTITIONED, TRANSACTIONAL, ONHEAP_TIERED, 1, false));
+    }
 
-                Integer val = entry.getValue();
+    /**
+     * @throws Exception If failed.
+     */
+    public void testInvokeReadThroughTx2() throws Exception {
+        invokeReadThrough(cacheConfiguration(PARTITIONED, TRANSACTIONAL, ONHEAP_TIERED, 2, false));
+    }
 
-                if (!val.equals(entry.getKey())) {
-                    failed = true;
+    /**
+     * @throws Exception If failed.
+     */
+    public void testInvokeReadThroughTxNearCache() throws Exception {
+        invokeReadThrough(cacheConfiguration(PARTITIONED, TRANSACTIONAL, ONHEAP_TIERED, 1, true));
+    }
 
-                    assertEquals(val, entry.getKey());
-                }
+    /**
+     * @throws Exception If failed.
+     */
+    public void testInvokeReadThroughTxReplicated() throws Exception {
+        invokeReadThrough(cacheConfiguration(REPLICATED, TRANSACTIONAL, ONHEAP_TIERED, 0, false));
+    }
 
-                entry.setValue(val + 1);
+    /**
+     * @throws Exception If failed.
+     */
+    public void testInvokeReadThroughTx0_Offheap() throws Exception {
+        invokeReadThrough(cacheConfiguration(PARTITIONED, TRANSACTIONAL, OFFHEAP_TIERED, 0, false));
+    }
 
-                return val;
-            }
-        });
+    /**
+     * @throws Exception If failed.
+     */
+    public void testInvokeReadThroughTx1_Offheap() throws Exception {
+        invokeReadThrough(cacheConfiguration(PARTITIONED, TRANSACTIONAL, OFFHEAP_TIERED, 1, false));
+    }
 
-        assertEquals(key, ret);
+    /**
+     * @throws Exception If failed.
+     */
+    public void testInvokeReadThroughTx2_Offheap() throws Exception {
+        invokeReadThrough(cacheConfiguration(PARTITIONED, TRANSACTIONAL, OFFHEAP_TIERED, 2, false));
+    }
 
-        for (int i = 0; i < gridCount(); i++)
-            assertEquals("Unexpected value for node: " + i, key + 1, jcache(i).get(key));
+    /**
+     * @throws Exception If failed.
+     */
+    public void testInvokeReadThroughTxNearCache_Offheap() throws Exception {
+        invokeReadThrough(cacheConfiguration(PARTITIONED, TRANSACTIONAL, OFFHEAP_TIERED, 1, true));
+    }
 
-        assertFalse(failed);
+    /**
+     * @throws Exception If failed.
+     */
+    public void testInvokeReadThroughTxReplicated_Offheap() throws Exception {
+        invokeReadThrough(cacheConfiguration(REPLICATED, TRANSACTIONAL, OFFHEAP_TIERED, 0, false));
     }
 }


[14/16] ignite git commit: Fixed compilation

Posted by vk...@apache.org.
Fixed compilation


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e9546aa7
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e9546aa7
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e9546aa7

Branch: refs/heads/master
Commit: e9546aa76be97dde30d3d4777b5770d15cc4d1fa
Parents: 10916ed
Author: Valentin Kulichenko <va...@gmail.com>
Authored: Wed Jun 15 13:44:23 2016 +0300
Committer: Valentin Kulichenko <va...@gmail.com>
Committed: Wed Jun 15 13:52:16 2016 +0300

----------------------------------------------------------------------
 .../spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala    | 1 -
 1 file changed, 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/e9546aa7/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
index fb15ff1..f198f0c 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
@@ -32,7 +32,6 @@ import org.apache.spark.sql._
 import org.apache.spark._
 
 import scala.collection.JavaConversions._
-import scala.reflect.macros.whitebox
 
 /**
  * Ignite RDD. Represents Ignite cache as Spark RDD abstraction.


[12/16] ignite git commit: IGNITE-3215 - Added IgniteRDD.withKeepBinary method

Posted by vk...@apache.org.
IGNITE-3215 - Added IgniteRDD.withKeepBinary method


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6faa1f22
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6faa1f22
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6faa1f22

Branch: refs/heads/master
Commit: 6faa1f229bf24d5aa5dc6fc99023283639ecacfe
Parents: ab7fa49
Author: Valentin Kulichenko <va...@gmail.com>
Authored: Tue Jun 14 18:09:10 2016 +0300
Committer: Valentin Kulichenko <va...@gmail.com>
Committed: Wed Jun 15 12:21:14 2016 +0300

----------------------------------------------------------------------
 .../org/apache/ignite/spark/IgniteContext.scala |  4 +--
 .../org/apache/ignite/spark/IgniteRDD.scala     | 19 ++++++++---
 .../apache/ignite/spark/JavaIgniteContext.scala |  4 +--
 .../org/apache/ignite/spark/JavaIgniteRDD.scala |  2 ++
 .../ignite/spark/impl/IgniteAbstractRDD.scala   | 15 ++++++---
 .../apache/ignite/spark/impl/IgniteSqlRDD.scala |  5 +--
 .../spark/impl/JavaIgniteAbstractRDD.scala      | 34 --------------------
 .../org/apache/ignite/spark/IgniteRDDSpec.scala | 25 ++++++++++++--
 8 files changed, 58 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/6faa1f22/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
index 182605c..c63a370 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
@@ -106,7 +106,7 @@ class IgniteContext[K, V](
      * @return `IgniteRDD` instance.
      */
     def fromCache(cacheName: String): IgniteRDD[K, V] = {
-        new IgniteRDD[K, V](this, cacheName, null)
+        new IgniteRDD[K, V](this, cacheName, null, false)
     }
 
     /**
@@ -117,7 +117,7 @@ class IgniteContext[K, V](
      * @return `IgniteRDD` instance.
      */
     def fromCache(cacheCfg: CacheConfiguration[K, V]) = {
-        new IgniteRDD[K, V](this, cacheCfg.getName, cacheCfg)
+        new IgniteRDD[K, V](this, cacheCfg.getName, cacheCfg, false)
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/6faa1f22/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
index fa96212..cad96b9 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
@@ -45,8 +45,9 @@ import scala.collection.JavaConversions._
 class IgniteRDD[K, V] (
     val ic: IgniteContext[K, V],
     val cacheName: String,
-    val cacheCfg: CacheConfiguration[K, V]
-) extends IgniteAbstractRDD[(K, V), K, V] (ic, cacheName, cacheCfg) {
+    val cacheCfg: CacheConfiguration[K, V],
+    val keepBinary: Boolean
+) extends IgniteAbstractRDD[(K, V), K, V] (ic, cacheName, cacheCfg, keepBinary) {
     /**
      * Computes iterator based on given partition.
      *
@@ -127,7 +128,8 @@ class IgniteRDD[K, V] (
 
         qry.setArgs(args.map(_.asInstanceOf[Object]):_*)
 
-        new IgniteSqlRDD[(K, V), Cache.Entry[K, V], K, V](ic, cacheName, cacheCfg, qry, entry \u21d2 (entry.getKey, entry.getValue))
+        new IgniteSqlRDD[(K, V), Cache.Entry[K, V], K, V](ic, cacheName, cacheCfg, qry,
+            entry \u21d2 (entry.getKey, entry.getValue), keepBinary)
     }
 
     /**
@@ -144,7 +146,8 @@ class IgniteRDD[K, V] (
 
         val schema = buildSchema(ensureCache().query(qry).asInstanceOf[QueryCursorEx[java.util.List[_]]].fieldsMeta())
 
-        val rowRdd = new IgniteSqlRDD[Row, java.util.List[_], K, V](ic, cacheName, cacheCfg, qry, list \u21d2 Row.fromSeq(list))
+        val rowRdd = new IgniteSqlRDD[Row, java.util.List[_], K, V](
+            ic, cacheName, cacheCfg, qry, list \u21d2 Row.fromSeq(list), keepBinary)
 
         ic.sqlContext.createDataFrame(rowRdd, schema)
     }
@@ -290,6 +293,14 @@ class IgniteRDD[K, V] (
         ensureCache().removeAll()
     }
 
+    def withKeepBinary[K1, V1](): IgniteRDD[K1, V1] = {
+        new IgniteRDD[K1, V1](
+            ic.asInstanceOf[IgniteContext[K1, V1]],
+            cacheName,
+            cacheCfg.asInstanceOf[CacheConfiguration[K1, V1]],
+            true)
+    }
+
     /**
      * Builds spark schema from query metadata.
      *

http://git-wip-us.apache.org/repos/asf/ignite/blob/6faa1f22/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteContext.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteContext.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteContext.scala
index 44b1cd9..25184e7 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteContext.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteContext.scala
@@ -52,10 +52,10 @@ class JavaIgniteContext[K, V](
     }
 
     def fromCache(cacheName: String): JavaIgniteRDD[K, V] =
-        JavaIgniteRDD.fromIgniteRDD(new IgniteRDD[K, V](ic, cacheName, null))
+        JavaIgniteRDD.fromIgniteRDD(new IgniteRDD[K, V](ic, cacheName, null, false))
 
     def fromCache(cacheCfg: CacheConfiguration[K, V]) =
-        JavaIgniteRDD.fromIgniteRDD(new IgniteRDD[K, V](ic, cacheCfg.getName, cacheCfg))
+        JavaIgniteRDD.fromIgniteRDD(new IgniteRDD[K, V](ic, cacheCfg.getName, cacheCfg, false))
 
     def ignite(): Ignite = ic.ignite()
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/6faa1f22/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala
index cac0e15..1efc6ae 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala
@@ -96,6 +96,8 @@ class JavaIgniteRDD[K, V](override val rdd: IgniteRDD[K, V])
         savePairs(jrdd, f, overwrite = false)
 
     def clear(): Unit = rdd.clear()
+
+    def withKeepBinary[K1, V1](): JavaIgniteRDD[K1, V1] = new JavaIgniteRDD[K1, V1](rdd.withKeepBinary[K1, V1]())
 }
 
 object JavaIgniteRDD {

http://git-wip-us.apache.org/repos/asf/ignite/blob/6faa1f22/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteAbstractRDD.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteAbstractRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteAbstractRDD.scala
index 25b3b56..9d5171c 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteAbstractRDD.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteAbstractRDD.scala
@@ -27,13 +27,20 @@ import scala.reflect.ClassTag
 abstract class IgniteAbstractRDD[R:ClassTag, K, V] (
     ic: IgniteContext[K, V],
     cacheName: String,
-    cacheCfg: CacheConfiguration[K, V]
+    cacheCfg: CacheConfiguration[K, V],
+    keepBinary: Boolean
 ) extends RDD[R] (ic.sparkContext, deps = Nil) {
     protected def ensureCache(): IgniteCache[K, V] = {
         // Make sure to deploy the cache
-        if (cacheCfg != null)
-            ic.ignite().getOrCreateCache(cacheCfg)
+        val cache =
+            if (cacheCfg != null)
+                ic.ignite().getOrCreateCache(cacheCfg)
+            else
+                ic.ignite().getOrCreateCache(cacheName)
+
+        if (keepBinary)
+            cache.withKeepBinary()
         else
-            ic.ignite().getOrCreateCache(cacheName)
+            cache.asInstanceOf[IgniteCache[K, V]]
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/6faa1f22/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSqlRDD.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSqlRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSqlRDD.scala
index 762a6ed..b4579aa 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSqlRDD.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSqlRDD.scala
@@ -29,8 +29,9 @@ class IgniteSqlRDD[R: ClassTag, T, K, V](
     cacheName: String,
     cacheCfg: CacheConfiguration[K, V],
     qry: Query[T],
-    conv: (T) \u21d2 R
-) extends IgniteAbstractRDD[R, K, V](ic, cacheName, cacheCfg) {
+    conv: (T) \u21d2 R,
+    keepBinary: Boolean
+) extends IgniteAbstractRDD[R, K, V](ic, cacheName, cacheCfg, keepBinary) {
     override def compute(split: Partition, context: TaskContext): Iterator[R] = {
         new IgniteQueryIterator[T, R](ensureCache().query(qry).iterator(), conv)
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/6faa1f22/modules/spark/src/main/scala/org/apache/ignite/spark/impl/JavaIgniteAbstractRDD.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/JavaIgniteAbstractRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/JavaIgniteAbstractRDD.scala
deleted file mode 100644
index 13bd3e8..0000000
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/JavaIgniteAbstractRDD.scala
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.spark.impl
-
-import org.apache.ignite.IgniteCache
-import org.apache.ignite.spark.IgniteRDD
-import org.apache.spark.api.java.{JavaPairRDD, JavaRDDLike}
-
-abstract class JavaIgniteAbstractRDD[K, V](val rdd: IgniteRDD[K, V])
-    extends JavaRDDLike[(K, V), JavaPairRDD[K, V]] {
-
-    protected def ensureCache(): IgniteCache[K, V] = {
-        // Make sure to deploy the cache
-        if (rdd.cacheCfg != null)
-            rdd.ic.ignite().getOrCreateCache(rdd.cacheCfg)
-        else
-            rdd.ic.ignite().getOrCreateCache(rdd.cacheName)
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/6faa1f22/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteRDDSpec.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteRDDSpec.scala b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteRDDSpec.scala
index 15a51ae..dff82f4 100644
--- a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteRDDSpec.scala
+++ b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteRDDSpec.scala
@@ -18,7 +18,7 @@
 package org.apache.ignite.spark
 
 import org.apache.ignite.Ignition
-import org.apache.ignite.cache.query.annotations.{QueryTextField, QuerySqlField}
+import org.apache.ignite.cache.query.annotations.{QuerySqlField, QueryTextField}
 import org.apache.ignite.configuration.{CacheConfiguration, IgniteConfiguration}
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder
@@ -26,9 +26,10 @@ import org.apache.spark.SparkContext
 import org.junit.runner.RunWith
 import org.scalatest._
 import org.scalatest.junit.JUnitRunner
-import scala.collection.JavaConversions._
 
+import scala.collection.JavaConversions._
 import IgniteRDDSpec._
+import org.apache.ignite.binary.BinaryObject
 
 import scala.annotation.meta.field
 
@@ -294,6 +295,26 @@ class IgniteRDDSpec extends FunSpec with Matchers with BeforeAndAfterAll with Be
                 sc.stop()
             }
         }
+
+        it("should properly work with binary objects") {
+            val sc = new SparkContext("local[*]", "test")
+
+            try {
+                val ic = new IgniteContext[String, Entity](sc, () \u21d2 configuration("client", client = true))
+
+                val cache = ic.fromCache(PARTITIONED_CACHE_NAME)
+
+                cache.savePairs(sc.parallelize(0 until 10, 2).map(i \u21d2 (String.valueOf(i),
+                    new Entity(i, "name" + i, i * 100))))
+
+                val res = cache.withKeepBinary[String, BinaryObject]().map(t \u21d2 t._2.field[Int]("salary")).collect()
+
+                println(res)
+            }
+            finally {
+                sc.stop()
+            }
+        }
     }
 
     override protected def beforeEach() = {


[08/16] ignite git commit: Merge branch 'gridgain-7.6.1' of https://github.com/gridgain/apache-ignite into gridgain-7.6.1

Posted by vk...@apache.org.
Merge branch 'gridgain-7.6.1' of https://github.com/gridgain/apache-ignite into gridgain-7.6.1


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/fda4110b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/fda4110b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/fda4110b

Branch: refs/heads/master
Commit: fda4110baf5cba8c6d32d0cdd3d7e2c8945af2e5
Parents: 6094ee0 3a4f587
Author: Denis Magda <dm...@gridgain.com>
Authored: Tue Jun 14 13:25:38 2016 +0300
Committer: Denis Magda <dm...@gridgain.com>
Committed: Tue Jun 14 13:25:38 2016 +0300

----------------------------------------------------------------------
 .../affinity/fair/FairAffinityFunction.java     |   2 +
 .../rendezvous/RendezvousAffinityFunction.java  |   2 +
 .../configuration/CacheConfiguration.java       |   3 +
 .../ignite/internal/MarshallerContextImpl.java  |  51 ++-
 .../ignite/internal/binary/BinaryContext.java   |  43 ++-
 .../GridClientConnectionManagerAdapter.java     |  25 +-
 .../connection/GridClientNioTcpConnection.java  |   3 +
 .../GridClientOptimizedMarshaller.java          |   4 +-
 .../GridClientZipOptimizedMarshaller.java       | 167 ++++++++
 .../impl/GridTcpRouterNioListenerAdapter.java   |  11 +-
 .../processors/cache/GridCacheAdapter.java      |  46 ++-
 .../processors/cache/GridCacheEntryEx.java      |   9 +-
 .../processors/cache/GridCacheMapEntry.java     | 100 +++--
 .../processors/cache/GridCacheProcessor.java    |   4 +
 .../processors/cache/GridCacheUtils.java        |   3 +
 .../binary/CacheObjectBinaryProcessorImpl.java  |  29 +-
 .../distributed/dht/GridDhtCacheAdapter.java    |  10 +-
 .../distributed/dht/GridDhtLockFuture.java      |  18 +-
 .../dht/GridDhtTransactionalCacheAdapter.java   |   2 -
 .../distributed/dht/GridDhtTxPrepareFuture.java |  19 +-
 .../dht/GridPartitionedGetFuture.java           |   2 -
 .../dht/GridPartitionedSingleGetFuture.java     |   2 -
 .../dht/atomic/GridDhtAtomicCache.java          |   8 -
 .../dht/colocated/GridDhtColocatedCache.java    |   4 +-
 .../dht/preloader/GridDhtForceKeysFuture.java   |   3 +-
 .../dht/preloader/GridDhtPartitionDemander.java |   3 +-
 .../dht/preloader/GridDhtPartitionMap2.java     |   7 +-
 .../distributed/near/GridNearGetFuture.java     |   4 -
 .../cache/distributed/near/GridNearTxLocal.java |   7 +-
 .../local/atomic/GridLocalAtomicCache.java      |   8 -
 .../continuous/CacheContinuousQueryManager.java | 135 +++++++
 .../cache/transactions/IgniteTxAdapter.java     |   2 -
 .../cache/transactions/IgniteTxEntry.java       |  24 +-
 .../cache/transactions/IgniteTxHandler.java     |  46 +++
 .../transactions/IgniteTxLocalAdapter.java      |  34 +-
 .../cacheobject/IgniteCacheObjectProcessor.java |   7 +
 .../IgniteCacheObjectProcessorImpl.java         |   5 +
 .../continuous/GridContinuousProcessor.java     | 216 ++++++++---
 .../datastreamer/DataStreamerImpl.java          |   3 +-
 .../message/GridClientHandshakeRequest.java     |   4 +-
 .../protocols/tcp/GridTcpRestNioListener.java   |  19 +-
 .../rest/protocols/tcp/GridTcpRestProtocol.java |  12 +-
 .../service/GridServiceProcessor.java           | 305 ++++++++-------
 .../ignite/internal/util/nio/GridNioServer.java |  10 +-
 .../ignite/internal/visor/cache/VisorCache.java |  56 +--
 .../visor/cache/VisorCachePartition.java        |  89 +++++
 .../visor/cache/VisorCachePartitions.java       |  88 +++++
 .../visor/cache/VisorCachePartitionsTask.java   | 152 ++++++++
 .../internal/visor/cache/VisorCacheV3.java      |  68 +---
 .../ignite/spi/discovery/tcp/ServerImpl.java    |   4 +-
 ...eClientReconnectContinuousProcessorTest.java |  60 ++-
 .../ignite/internal/binary/AffinityKey.java     |  69 ++++
 .../binary/GridBinaryAffinityKeySelfTest.java   |  15 +
 ...aultBinaryMappersBinaryMetaDataSelfTest.java |  17 +
 .../processors/cache/GridCacheTestEntryEx.java  |   5 +-
 .../cache/IgniteCacheAbstractTest.java          |   2 +-
 ...gniteCacheInvokeReadThroughAbstractTest.java | 382 +++++++++++++++++++
 ...iteCacheInvokeReadThroughSingleNodeTest.java | 106 +++++
 .../cache/IgniteCacheInvokeReadThroughTest.java | 182 +++++----
 .../IgniteCacheReadThroughStoreCallTest.java    | 288 ++++++++++++++
 .../IgniteCacheLoaderWriterAbstractTest.java    |  10 +
 ...ridCacheContinuousQueryAbstractSelfTest.java |   2 +-
 .../IgniteNoCustomEventsOnNodeStart.java        |  85 +++++
 .../service/GridServiceClientNodeTest.java      | 102 ++++-
 .../testsuites/IgniteCacheTestSuite2.java       |   3 +
 .../testsuites/IgniteCacheTestSuite4.java       |   4 +
 66 files changed, 2609 insertions(+), 601 deletions(-)
----------------------------------------------------------------------



[06/16] ignite git commit: Tcp discovery minor: create buffered input stream with size = socket receiveBufferSize. (cherry picked from commit 54425bf)

Posted by vk...@apache.org.
Tcp discovery minor: create buffered input stream with size = socket receiveBufferSize.
(cherry picked from commit 54425bf)


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3a4f587b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3a4f587b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3a4f587b

Branch: refs/heads/master
Commit: 3a4f587b457bd1bb86665bf51757a2aa3f986f07
Parents: c88ce20
Author: sboikov <sb...@gridgain.com>
Authored: Tue Jun 14 11:42:52 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Jun 14 11:50:51 2016 +0300

----------------------------------------------------------------------
 .../java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java     | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/3a4f587b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index a9a86bc..37a1539 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -5206,7 +5206,9 @@ class ServerImpl extends TcpDiscoveryImpl {
                     for (IgniteInClosure<Socket> connLsnr : spi.incomeConnLsnrs)
                         connLsnr.apply(sock);
 
-                    in = new BufferedInputStream(sock.getInputStream());
+                    int rcvBufSize = sock.getReceiveBufferSize();
+
+                    in = new BufferedInputStream(sock.getInputStream(), rcvBufSize > 0 ? rcvBufSize : 8192);
 
                     byte[] buf = new byte[4];
                     int read = 0;


[05/16] ignite git commit: ignite-3038 Do not use custom discovery events to start continuous queries for system caches (cherry picked from commit 7f878c5)

Posted by vk...@apache.org.
ignite-3038 Do not use custom discovery events to start continuous queries for system caches
(cherry picked from commit 7f878c5)


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c88ce20a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c88ce20a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c88ce20a

Branch: refs/heads/master
Commit: c88ce20a62e315418cac014c3419ce254c591bee
Parents: e5fcebb
Author: sboikov <sb...@gridgain.com>
Authored: Tue Jun 14 11:16:29 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Jun 14 11:16:29 2016 +0300

----------------------------------------------------------------------
 .../processors/continuous/IgniteNoCustomEventsOnNodeStart.java      | 1 -
 1 file changed, 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/c88ce20a/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/IgniteNoCustomEventsOnNodeStart.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/IgniteNoCustomEventsOnNodeStart.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/IgniteNoCustomEventsOnNodeStart.java
index cbeb23f..5ecc27a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/IgniteNoCustomEventsOnNodeStart.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/IgniteNoCustomEventsOnNodeStart.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.internal.processors.continuous;
 
-import com.mchange.v2.c3p0.util.TestUtils;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage;
 import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;


[11/16] ignite git commit: IGNITE-3273 - fixed

Posted by vk...@apache.org.
IGNITE-3273 - fixed


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ab7fa496
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ab7fa496
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ab7fa496

Branch: refs/heads/master
Commit: ab7fa496459d9e8560c56d0daa9c56c171dea405
Parents: 84547ef
Author: Sergi Vladykin <se...@gmail.com>
Authored: Tue Jun 14 21:47:11 2016 +0300
Committer: Sergi Vladykin <se...@gmail.com>
Committed: Wed Jun 15 10:29:39 2016 +0300

----------------------------------------------------------------------
 .../processors/query/h2/sql/GridSqlConst.java   |  5 ++
 .../processors/query/h2/sql/GridSqlJoin.java    | 17 +++--
 .../processors/query/h2/sql/GridSqlType.java    |  5 ++
 .../query/IgniteSqlSplitterSelfTest.java        | 75 ++++++++++++++++++++
 4 files changed, 93 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/ab7fa496/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlConst.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlConst.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlConst.java
index 36c95ce..976eb2c 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlConst.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlConst.java
@@ -19,12 +19,17 @@ package org.apache.ignite.internal.processors.query.h2.sql;
 
 import java.util.Collections;
 import org.h2.value.Value;
+import org.h2.value.ValueBoolean;
 
 /**
  * Constant value.
  */
 public class GridSqlConst extends GridSqlElement implements GridSqlValue {
     /** */
+    public static final GridSqlElement TRUE = new GridSqlConst(ValueBoolean.get(true))
+        .resultType(GridSqlType.BOOLEAN);
+
+    /** */
     private final Value val;
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/ab7fa496/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlJoin.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlJoin.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlJoin.java
index 0148404..f1ad2e5 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlJoin.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlJoin.java
@@ -36,13 +36,15 @@ public class GridSqlJoin extends GridSqlElement {
      * @param on Join condition.
      */
     public GridSqlJoin(GridSqlElement leftTbl, GridSqlElement rightTbl, boolean leftOuter, @Nullable GridSqlElement on) {
-        super(new ArrayList<GridSqlElement>(on == null ? 2 : 3));
+        super(new ArrayList<GridSqlElement>(3));
 
         addChild(leftTbl);
         addChild(rightTbl);
 
-        if (on != null)
-            addChild(on);
+        if (on == null) // To avoid query nesting issues in FROM clause we need to always generate ON condition.
+            on = GridSqlConst.TRUE;
+
+        addChild(on);
 
         this.leftOuter = leftOuter;
     }
@@ -64,8 +66,8 @@ public class GridSqlJoin extends GridSqlElement {
     /**
      * @return {@code JOIN ON} condition.
      */
-    @Nullable public GridSqlElement on() {
-        return size() < 3 ? null : child(2);
+    public GridSqlElement on() {
+        return child(2);
     }
 
     /** {@inheritDoc} */
@@ -78,10 +80,7 @@ public class GridSqlJoin extends GridSqlElement {
 
         buff.append(rightTable().getSQL());
 
-        GridSqlElement on = on();
-
-        if (on != null)
-            buff.append(" \n ON ").append(StringUtils.unEnclose(on.getSQL()));
+        buff.append(" \n ON ").append(StringUtils.unEnclose(on().getSQL()));
 
         return buff.toString();
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/ab7fa496/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlType.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlType.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlType.java
index febf174..efe9138 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlType.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlType.java
@@ -22,6 +22,7 @@ import org.apache.ignite.internal.util.typedef.internal.S;
 import org.h2.expression.Expression;
 import org.h2.table.Column;
 import org.h2.value.Value;
+import org.h2.value.ValueBoolean;
 import org.h2.value.ValueDouble;
 import org.h2.value.ValueLong;
 
@@ -43,6 +44,10 @@ public final class GridSqlType {
     /** */
     public static final GridSqlType UUID = new GridSqlType(Value.UUID, 0, Integer.MAX_VALUE, 36, "UUID");
 
+    /** */
+    public static final GridSqlType BOOLEAN = new GridSqlType(Value.BOOLEAN, 0, ValueBoolean.PRECISION,
+        ValueBoolean.DISPLAY_SIZE, "BOOLEAN");
+
     /** H2 type. */
     private final int type;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/ab7fa496/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
index d0e2780..fd52469 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
@@ -313,6 +313,81 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest {
     }
 
     /**
+     *
+     */
+    public void testImplicitJoinConditionGeneration() {
+        IgniteCache<Integer, Person> p = ignite(0).createCache(cacheConfig("P", true, Integer.class, Person.class));
+        IgniteCache<Integer, Department> d = ignite(0).createCache(cacheConfig("D", true, Integer.class, Department.class));
+        IgniteCache<Integer, Org> o = ignite(0).createCache(cacheConfig("O", true, Integer.class, Org.class));
+
+        try {
+            info("Plan: " + p.query(new SqlFieldsQuery(
+                "explain select P.Person.*,dep.*,org.* " +
+                    "from P.Person inner join D.Department dep ON dep.id=P.Person.depId " +
+                    "left join O.Org org ON org.id=dep.orgId"
+            )).getAll());
+
+            assertEquals(0, p.query(new SqlFieldsQuery(
+                "select P.Person.*,dep.*,org.* " +
+                    "from P.Person inner join D.Department dep ON dep.id=P.Person.depId " +
+                    "left join O.Org org ON org.id=dep.orgId"
+            )).getAll().size());
+        }
+        finally {
+            p.destroy();
+            d.destroy();
+            o.destroy();
+        }
+    }
+
+    /**
+     *
+     */
+    public static class Person {
+        /** */
+        @QuerySqlField
+        private int id;
+
+        /** */
+        @QuerySqlField
+        private String name;
+
+        /** */
+        @QuerySqlField
+        private int depId;
+    }
+
+    /**
+     *
+     */
+    public static class Org {
+        /** */
+        @QuerySqlField(index = true)
+        private int id;
+
+        /** */
+        @QuerySqlField
+        private String name;
+    }
+
+    /**
+     *
+     */
+    public static class Department {
+        /** */
+        @QuerySqlField(index = true)
+        private int id;
+
+        /** */
+        @QuerySqlField(index = true)
+        private int orgId;
+
+        /** */
+        @QuerySqlField
+        private String name;
+    }
+
+    /**
      * Test value.
      */
     private static class GroupIndexTestValue implements Serializable {


[13/16] ignite git commit: IGNITE-3215 - Added IgniteRDD.withKeepBinary method (ScalaDoc)

Posted by vk...@apache.org.
IGNITE-3215 - Added IgniteRDD.withKeepBinary method (ScalaDoc)


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/10916ed0
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/10916ed0
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/10916ed0

Branch: refs/heads/master
Commit: 10916ed03688f820cef93866c8ea71226bba3716
Parents: 6faa1f2
Author: Valentin Kulichenko <va...@gmail.com>
Authored: Wed Jun 15 12:19:44 2016 +0300
Committer: Valentin Kulichenko <va...@gmail.com>
Committed: Wed Jun 15 12:21:22 2016 +0300

----------------------------------------------------------------------
 .../src/main/scala/org/apache/ignite/spark/IgniteRDD.scala    | 7 +++++++
 1 file changed, 7 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/10916ed0/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
index cad96b9..fb15ff1 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql._
 import org.apache.spark._
 
 import scala.collection.JavaConversions._
+import scala.reflect.macros.whitebox
 
 /**
  * Ignite RDD. Represents Ignite cache as Spark RDD abstraction.
@@ -293,6 +294,12 @@ class IgniteRDD[K, V] (
         ensureCache().removeAll()
     }
 
+    /**
+     * Returns `IgniteRDD` that will operate with binary objects. This method
+     * behaves similar to [[org.apache.ignite.IgniteCache#withKeepBinary]].
+     *
+     * @return New `IgniteRDD` instance for binary objects.
+     */
     def withKeepBinary[K1, V1](): IgniteRDD[K1, V1] = {
         new IgniteRDD[K1, V1](
             ic.asInstanceOf[IgniteContext[K1, V1]],


[16/16] ignite git commit: Merge branch 'gridgain-7.6.1'

Posted by vk...@apache.org.
Merge branch 'gridgain-7.6.1'

# Conflicts:
#	modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheSyncRebalanceModeSelfTest.java


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b742f5fc
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b742f5fc
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b742f5fc

Branch: refs/heads/master
Commit: b742f5fc8eadd850a01246be7f88f08f424a471f
Parents: b500d3a 5c14928
Author: Valentin Kulichenko <va...@gmail.com>
Authored: Wed Jun 15 15:04:34 2016 +0300
Committer: Valentin Kulichenko <va...@gmail.com>
Committed: Wed Jun 15 15:04:34 2016 +0300

----------------------------------------------------------------------
 .../ignite/internal/MarshallerContextImpl.java  |  51 ++-
 .../ignite/internal/binary/BinaryContext.java   |  43 ++-
 .../processors/cache/GridCacheAdapter.java      |  46 ++-
 .../processors/cache/GridCacheEntryEx.java      |   9 +-
 .../processors/cache/GridCacheMapEntry.java     | 100 +++--
 .../processors/cache/GridCacheUtils.java        |   3 +
 .../binary/CacheObjectBinaryProcessorImpl.java  |  29 +-
 .../distributed/dht/GridDhtCacheAdapter.java    |  10 +-
 .../distributed/dht/GridDhtLockFuture.java      |  18 +-
 .../dht/GridDhtTransactionalCacheAdapter.java   |   2 -
 .../distributed/dht/GridDhtTxPrepareFuture.java |  19 +-
 .../dht/GridPartitionedGetFuture.java           |   2 -
 .../dht/GridPartitionedSingleGetFuture.java     |   2 -
 .../dht/atomic/GridDhtAtomicCache.java          |   8 -
 .../dht/colocated/GridDhtColocatedCache.java    |   4 +-
 .../dht/preloader/GridDhtForceKeysFuture.java   |   3 +-
 .../dht/preloader/GridDhtPartitionDemander.java |   3 +-
 .../distributed/near/GridNearGetFuture.java     |   4 -
 .../cache/distributed/near/GridNearTxLocal.java |   7 +-
 .../local/atomic/GridLocalAtomicCache.java      |   8 -
 .../continuous/CacheContinuousQueryEntry.java   |  16 +
 .../continuous/CacheContinuousQueryHandler.java |   2 +-
 .../continuous/CacheContinuousQueryManager.java | 135 +++++++
 .../cache/transactions/IgniteTxAdapter.java     |   2 -
 .../cache/transactions/IgniteTxEntry.java       |  24 +-
 .../cache/transactions/IgniteTxHandler.java     |  46 +++
 .../transactions/IgniteTxLocalAdapter.java      |  34 +-
 .../cacheobject/IgniteCacheObjectProcessor.java |   7 +
 .../IgniteCacheObjectProcessorImpl.java         |   5 +
 .../continuous/GridContinuousProcessor.java     | 216 ++++++++---
 .../datastreamer/DataStreamerImpl.java          |   3 +-
 .../service/GridServiceProcessor.java           | 305 ++++++++-------
 .../ignite/spi/discovery/tcp/ServerImpl.java    |   4 +-
 ...eClientReconnectContinuousProcessorTest.java |  60 ++-
 .../ignite/internal/binary/AffinityKey.java     |  69 ++++
 .../binary/GridBinaryAffinityKeySelfTest.java   |  15 +
 ...aultBinaryMappersBinaryMetaDataSelfTest.java |  17 +
 .../processors/cache/GridCacheTestEntryEx.java  |   5 +-
 .../cache/IgniteCacheAbstractTest.java          |   2 +-
 ...gniteCacheInvokeReadThroughAbstractTest.java | 382 +++++++++++++++++++
 ...iteCacheInvokeReadThroughSingleNodeTest.java | 106 +++++
 .../cache/IgniteCacheInvokeReadThroughTest.java | 182 +++++----
 .../IgniteCacheReadThroughStoreCallTest.java    | 288 ++++++++++++++
 .../IgniteCacheLoaderWriterAbstractTest.java    |  10 +
 ...ridCacheContinuousQueryAbstractSelfTest.java |   2 +-
 ...niteCacheContinuousQueryBackupQueueTest.java | 135 +++++++
 .../IgniteNoCustomEventsOnNodeStart.java        |  85 +++++
 .../service/GridServiceClientNodeTest.java      | 102 ++++-
 .../testsuites/IgniteCacheTestSuite2.java       |   3 +
 .../testsuites/IgniteCacheTestSuite4.java       |   4 +
 .../IgniteCacheQuerySelfTestSuite3.java         |   2 +
 .../org/apache/ignite/spark/IgniteContext.scala |   4 +-
 .../org/apache/ignite/spark/IgniteRDD.scala     |  25 +-
 .../apache/ignite/spark/JavaIgniteContext.scala |   4 +-
 .../org/apache/ignite/spark/JavaIgniteRDD.scala |   2 +
 .../ignite/spark/impl/IgniteAbstractRDD.scala   |  15 +-
 .../apache/ignite/spark/impl/IgniteSqlRDD.scala |   5 +-
 .../spark/impl/JavaIgniteAbstractRDD.scala      |  34 --
 .../org/apache/ignite/spark/IgniteRDDSpec.scala |  25 +-
 59 files changed, 2242 insertions(+), 511 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/b742f5fc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/b742f5fc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/b742f5fc/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala
----------------------------------------------------------------------


[15/16] ignite git commit: IGNITE-2344 - WebSessionFilter doesn't support session ID renewal. Fixes #780.

Posted by vk...@apache.org.
IGNITE-2344 - WebSessionFilter doesn't support session ID renewal. Fixes #780.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5c149287
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5c149287
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5c149287

Branch: refs/heads/master
Commit: 5c1492872358cac78a6954796ea1e3420bf7c26e
Parents: e9546aa
Author: samaitra <sa...@gmail.com>
Authored: Wed Jun 15 12:58:33 2016 +0300
Committer: Valentin Kulichenko <va...@gmail.com>
Committed: Wed Jun 15 13:52:36 2016 +0300

----------------------------------------------------------------------
 .../cache/websession/WebSessionFilter.java      |  39 ++-
 .../internal/websession/WebSessionSelfTest.java | 330 ++++++++++++++++++-
 2 files changed, 357 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/5c149287/modules/web/src/main/java/org/apache/ignite/cache/websession/WebSessionFilter.java
----------------------------------------------------------------------
diff --git a/modules/web/src/main/java/org/apache/ignite/cache/websession/WebSessionFilter.java b/modules/web/src/main/java/org/apache/ignite/cache/websession/WebSessionFilter.java
index 2b7442f..6c51876 100644
--- a/modules/web/src/main/java/org/apache/ignite/cache/websession/WebSessionFilter.java
+++ b/modules/web/src/main/java/org/apache/ignite/cache/websession/WebSessionFilter.java
@@ -31,9 +31,8 @@ import javax.servlet.ServletContext;
 import javax.servlet.ServletException;
 import javax.servlet.ServletRequest;
 import javax.servlet.ServletResponse;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletRequestWrapper;
-import javax.servlet.http.HttpSession;
+import javax.servlet.http.*;
+
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteClientDisconnectedException;
@@ -963,6 +962,22 @@ public class WebSessionFilter implements Filter {
 
             return newId;
         }
+
+        /** {@inheritDoc} */
+        @Override public void login(String username, String password) throws ServletException{
+            HttpServletRequest req = (HttpServletRequest)getRequest();
+
+            req.login(username, password);
+
+            String newId = req.getSession(false).getId();
+
+            this.ses.setId(newId);
+
+            this.ses = createSession(ses, newId);
+            this.ses.servletContext(ctx);
+            this.ses.filter(WebSessionFilter.this);
+            this.ses.resetUpdates();
+        }
     }
 
     /**
@@ -1026,5 +1041,23 @@ public class WebSessionFilter implements Filter {
 
             return newId;
         }
+
+        /** {@inheritDoc} */
+        @Override public void login(String username, String password) throws ServletException{
+            final HttpServletRequest req = (HttpServletRequest)getRequest();
+
+            req.login(username, password);
+
+            final String newId = req.getSession(false).getId();
+
+            if (!F.eq(newId, ses.getId())) {
+                try {
+                    ses = createSessionV2(ses, newId);
+                }
+                catch (IOException e) {
+                    throw new IgniteException(e);
+                }
+            }
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/5c149287/modules/web/src/test/java/org/apache/ignite/internal/websession/WebSessionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/web/src/test/java/org/apache/ignite/internal/websession/WebSessionSelfTest.java b/modules/web/src/test/java/org/apache/ignite/internal/websession/WebSessionSelfTest.java
index 1e01d3c..0ab1130 100644
--- a/modules/web/src/test/java/org/apache/ignite/internal/websession/WebSessionSelfTest.java
+++ b/modules/web/src/test/java/org/apache/ignite/internal/websession/WebSessionSelfTest.java
@@ -17,16 +17,11 @@
 
 package org.apache.ignite.internal.websession;
 
-import java.io.BufferedReader;
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.Serializable;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
+import java.io.*;
+import java.net.HttpURLConnection;
 import java.net.URL;
 import java.net.URLConnection;
-import java.util.Random;
+import java.util.*;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -51,6 +46,7 @@ import org.apache.ignite.marshaller.Marshaller;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.eclipse.jetty.security.HashLoginService;
 import org.eclipse.jetty.server.Server;
 import org.eclipse.jetty.servlet.ServletHolder;
 import org.eclipse.jetty.webapp.WebAppContext;
@@ -92,6 +88,13 @@ public class WebSessionSelfTest extends GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
+    public void testSessionRenewalDuringLogin() throws Exception {
+        testSessionRenewalDuringLogin("/modules/core/src/test/config/websession/example-cache.xml");
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testSingleRequestMetaInf() throws Exception {
         testSingleRequest("ignite-webapp-config.xml");
     }
@@ -293,6 +296,171 @@ public class WebSessionSelfTest extends GridCommonAbstractTest {
     }
 
     /**
+     * Tests session renewal during login. Checks modification attribute in cache.
+     *
+     * @param cfg Configuration.
+     * @throws Exception If failed.
+     */
+    private void testSessionRenewalDuringLogin(String cfg) throws Exception {
+        Server srv = null;
+        String sesId = null;
+        try {
+            srv = startServerWithLoginService(TEST_JETTY_PORT, cfg, null, new SessionLoginServlet());
+
+            URLConnection conn = new URL("http://localhost:" + TEST_JETTY_PORT + "/ignitetest/test").openConnection();
+
+            conn.connect();
+
+            String sesIdCookie1 = getSessionIdFromCookie(conn);
+
+            X.println(">>>", "Initial session Cookie: " + sesIdCookie1, ">>>");
+
+            try (BufferedReader rdr = new BufferedReader(new InputStreamReader(conn.getInputStream()))) {
+                sesId = rdr.readLine();
+
+                if (!keepBinary()) {
+                    IgniteCache<String, HttpSession> cache = G.ignite().cache(getCacheName());
+
+                    assertNotNull(cache);
+
+                    HttpSession ses = cache.get(sesId);
+
+                    assertNotNull(ses);
+
+                    assertEquals("val1", ses.getAttribute("key1"));
+                }
+                else {
+                    final IgniteCache<String, WebSessionEntity> cache = G.ignite().cache(getCacheName());
+
+                    assertNotNull(cache);
+
+                    final WebSessionEntity entity = cache.get(sesId);
+
+                    assertNotNull(entity);
+
+                    final byte[] data = entity.attributes().get("key1");
+
+                    assertNotNull(data);
+
+                    final Marshaller marshaller = G.ignite().configuration().getMarshaller();
+
+                    final String val = marshaller.unmarshal(data, getClass().getClassLoader());
+
+                    assertEquals("val1", val);
+                }
+            }
+
+            URLConnection conn2 = new URL("http://localhost:" + TEST_JETTY_PORT + "/ignitetest/login").openConnection();
+
+            HttpURLConnection con = (HttpURLConnection) conn2;
+
+            con.addRequestProperty("Cookie", "JSESSIONID=" + sesIdCookie1);
+
+            con.setRequestMethod("POST");
+
+            con.setDoOutput(true);
+
+            String sesIdCookie2 = getSessionIdFromCookie(con);
+
+            X.println(">>>", "Logged In session Cookie: " + sesIdCookie2, ">>>");
+
+            try (BufferedReader rdr = new BufferedReader(new InputStreamReader(con.getInputStream()))) {
+                String sesId2 = rdr.readLine();
+
+                if (!keepBinary()) {
+                    IgniteCache<String, HttpSession> cache = G.ignite().cache(getCacheName());
+
+                    assertNotNull(cache);
+
+                    HttpSession ses = cache.get(sesId2);
+
+                    assertNotNull(ses);
+
+                    assertEquals("val1", ses.getAttribute("key1"));
+
+                }
+                else {
+                    final IgniteCache<String, WebSessionEntity> cache = G.ignite().cache(getCacheName());
+
+                    assertNotNull(cache);
+
+                    final WebSessionEntity entity = cache.get(sesId2);
+
+                    assertNotNull(entity);
+
+                    final byte[] data = entity.attributes().get("key1");
+
+                    assertNotNull(data);
+
+                    final Marshaller marshaller = G.ignite().configuration().getMarshaller();
+
+                    final String val = marshaller.unmarshal(data, getClass().getClassLoader());
+
+                    assertEquals("val1", val);
+
+                }
+
+            }
+
+            URLConnection conn3 = new URL("http://localhost:" + TEST_JETTY_PORT + "/ignitetest/simple").openConnection();
+
+            conn3.addRequestProperty("Cookie", "JSESSIONID=" + sesIdCookie2);
+
+            conn3.connect();
+
+            String sesIdCookie3 = getSessionIdFromCookie(conn3);
+
+            X.println(">>>", "Post Logged In session Cookie: " + sesIdCookie3, ">>>");
+
+            assertEquals(sesIdCookie2, sesIdCookie3);
+
+            try (BufferedReader rdr = new BufferedReader(new InputStreamReader(conn3.getInputStream()))) {
+                String sesId3 = rdr.readLine();
+
+                if (!keepBinary()) {
+                    IgniteCache<String, HttpSession> cache = G.ignite().cache(getCacheName());
+
+                    HttpSession session = cache.get(sesId3);
+
+                    assertNotNull(session);
+
+                    assertNotNull(cache);
+
+                    HttpSession ses = cache.get(sesId3);
+
+                    assertNotNull(ses);
+
+                    assertEquals("val1", ses.getAttribute("key1"));
+                }
+                else {
+                    final IgniteCache<String, WebSessionEntity> cache = G.ignite().cache(getCacheName());
+
+                    assertNotNull(cache);
+
+                    final WebSessionEntity entity = cache.get(sesId3);
+
+                    assertNotNull(entity);
+
+                    assertNotNull(cache.get(sesId3));
+
+                    final byte[] data = entity.attributes().get("key1");
+
+                    assertNotNull(data);
+
+                    final Marshaller marshaller = G.ignite().configuration().getMarshaller();
+
+                    final String val = marshaller.unmarshal(data, getClass().getClassLoader());
+
+                    assertEquals("val1", val);
+                }
+            }
+        }
+        finally {
+            stopServerWithLoginService(srv);
+        }
+    }
+
+    /**
      * Tests invalidated sessions.
      *
      * @throws Exception Exception If failed.
@@ -668,6 +836,35 @@ public class WebSessionSelfTest extends GridCommonAbstractTest {
     }
 
     /**
+     * Starts server with Login Service and create a realm file.
+     *
+     * @param port Port number.
+     * @param cfg Configuration.
+     * @param gridName Grid name.
+     * @param servlet Servlet.
+     * @return Server.
+     * @throws Exception In case of error.
+     */
+    private Server startServerWithLoginService(int port, @Nullable String cfg, @Nullable String gridName, HttpServlet servlet)
+            throws Exception {
+        Server srv = new Server(port);
+
+        WebAppContext ctx = getWebContext(cfg, gridName, keepBinary(), servlet);
+
+        HashLoginService hashLoginService = new HashLoginService();
+        hashLoginService.setName("Test Realm");
+        createRealm();
+        hashLoginService.setConfig("realm.properties");
+        ctx.getSecurityHandler().setLoginService(hashLoginService);
+
+        srv.setHandler(ctx);
+
+        srv.start();
+
+        return srv;
+    }
+
+    /**
      * Stops server.
      *
      * @param srv Server.
@@ -679,6 +876,60 @@ public class WebSessionSelfTest extends GridCommonAbstractTest {
     }
 
     /**
+     * Stops server and delete realm file.
+     *
+     * @param srv Server.
+     * @throws Exception In case of error.
+     */
+    private void stopServerWithLoginService(@Nullable Server srv) throws Exception{
+        if (srv != null){
+            srv.stop();
+            File realmFile = new File("realm.properties");
+            realmFile.delete();
+        }
+    }
+
+    /** Creates a realm file to store test user credentials */
+    private void createRealm() throws Exception{
+        File realmFile = new File("realm.properties");
+        FileWriter fileWriter = new FileWriter(realmFile);
+        fileWriter.append("admin:admin");
+        fileWriter.flush();
+        fileWriter.close();
+    }
+
+    /**
+     * Retrieves HttpSession sessionId from Cookie
+     *
+     * @param conn URLConnection
+     * @return sesId
+     */
+    private String getSessionIdFromCookie(URLConnection conn) {
+        String sessionCookieValue = null;
+        String sesId = null;
+        Map<String, List<String>> headerFields = conn.getHeaderFields();
+        Set<String> headerFieldsSet = headerFields.keySet();
+        Iterator<String> hearerFieldsIter = headerFieldsSet.iterator();
+
+        while (hearerFieldsIter.hasNext()) {
+            String headerFieldKey = hearerFieldsIter.next();
+
+            if ("Set-Cookie".equalsIgnoreCase(headerFieldKey)) {
+                List<String> headerFieldValue = headerFields.get(headerFieldKey);
+
+                for (String headerValue : headerFieldValue) {
+                    String[] fields = headerValue.split(";");
+                    sessionCookieValue = fields[0];
+                    sesId = sessionCookieValue.substring(sessionCookieValue.indexOf("=")+1,
+                            sessionCookieValue.length());
+                }
+            }
+        }
+
+        return sesId;
+    }
+
+    /**
      * Test servlet.
      */
     private static class SessionCreateServlet extends HttpServlet {
@@ -831,6 +1082,67 @@ public class WebSessionSelfTest extends GridCommonAbstractTest {
     }
 
     /**
+     * Test session behavior on id change.
+     */
+    private static class SessionLoginServlet extends HttpServlet {
+        /** {@inheritDoc} */
+        @Override protected void doGet(HttpServletRequest req, HttpServletResponse res)
+                throws ServletException, IOException {
+
+            if (req.getPathInfo().equals("/test")) {
+                HttpSession ses = req.getSession(true);
+                assertNotNull(ses);
+                ses.setAttribute("checkCnt", 0);
+                ses.setAttribute("key1", "val1");
+                ses.setAttribute("key2", "val2");
+                ses.setAttribute("mkey", new TestObj());
+
+                Profile p = (Profile) ses.getAttribute("profile");
+
+                if (p == null) {
+                    p = new Profile();
+                    ses.setAttribute("profile", p);
+                }
+
+                p.setMarker(req.getParameter("marker"));
+
+                X.println(">>>", "Request session test: " + ses.getId(), ">>>");
+
+                res.getWriter().write(ses.getId());
+
+                res.getWriter().flush();
+
+            } else if (req.getPathInfo().equals("/simple")) {
+                HttpSession session = req.getSession();
+                X.println(">>>", "Request session simple: " + session.getId(), ">>>");
+
+                res.getWriter().write(session.getId());
+
+                res.getWriter().flush();
+            }
+        }
+        /** {@inheritDoc} */
+        @Override protected void doPost(HttpServletRequest req, HttpServletResponse res)
+                throws ServletException, IOException {
+            if (req.getPathInfo().equals("/login")) {
+                try {
+                    req.login("admin", "admin");
+                } catch (Exception e) {
+                    X.printerrln("Login failed.");
+                }
+
+                HttpSession session = req.getSession();
+
+                X.println(">>>", "Logged In session: " + session.getId(), ">>>");
+
+                res.getWriter().write(session.getId());
+
+                res.getWriter().flush();
+            }
+        }
+    }
+
+    /**
      * Servlet for restarts test.
      */
     private static class RestartsTestServlet extends HttpServlet {
@@ -932,4 +1244,4 @@ public class WebSessionSelfTest extends GridCommonAbstractTest {
             return result;
         }
     }
-}
\ No newline at end of file
+}


[09/16] ignite git commit: IGNITE-3272 Fixed "Memory consumption in ContinuousQueryHandler". (cherry picked from commit 5f44672)

Posted by vk...@apache.org.
IGNITE-3272 Fixed "Memory consumption in ContinuousQueryHandler". (cherry picked from commit 5f44672)


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e9a33f23
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e9a33f23
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e9a33f23

Branch: refs/heads/master
Commit: e9a33f2376733ec3c507963c6ff98e3ad0514281
Parents: fda4110
Author: nikolay_tikhonov <nt...@gridgain.com>
Authored: Tue Jun 14 18:17:33 2016 +0300
Committer: nikolay_tikhonov <nt...@gridgain.com>
Committed: Tue Jun 14 18:19:18 2016 +0300

----------------------------------------------------------------------
 .../continuous/CacheContinuousQueryEntry.java   |  16 +++
 .../continuous/CacheContinuousQueryHandler.java |   2 +-
 ...niteCacheContinuousQueryBackupQueueTest.java | 135 +++++++++++++++++++
 .../IgniteCacheQuerySelfTestSuite3.java         |   2 +
 4 files changed, 154 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/e9a33f23/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
index 63dc4cb..74f930a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
@@ -200,6 +200,22 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
     }
 
     /**
+     * @return If entry filtered then will return light-weight <i><b>new entry</b></i> without values and key
+     * (avoid to huge memory consumption), otherwise {@code this}.
+     */
+    CacheContinuousQueryEntry forBackupQueue() {
+        if (!isFiltered())
+            return this;
+
+        CacheContinuousQueryEntry e =
+            new CacheContinuousQueryEntry(cacheId, evtType, null, null, null, keepBinary, part, updateCntr, topVer);
+
+        e.flags = flags;
+
+        return e;
+    }
+
+    /**
      * @return {@code True} if entry sent by backup node.
      */
     boolean isBackup() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/e9a33f23/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 3b77d48..5012569 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -740,7 +740,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
         if (!primary && !internal && entry.updateCounter() != -1L /* Skip init query and expire entries */) {
             entry.markBackup();
 
-            backupQueue.add(entry);
+            backupQueue.add(entry.forBackupQueue());
         }
 
         return notify;

http://git-wip-us.apache.org/repos/asf/ignite/blob/e9a33f23/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryBackupQueueTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryBackupQueueTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryBackupQueueTest.java
new file mode 100644
index 0000000..aea1954
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryBackupQueueTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.io.Serializable;
+import java.util.concurrent.TimeUnit;
+import javax.cache.configuration.Factory;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryEventFilter;
+import javax.cache.event.CacheEntryUpdatedListener;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ *
+ */
+public class IgniteCacheContinuousQueryBackupQueueTest extends GridCommonAbstractTest {
+    /** */
+    private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** Keys count. */
+    private static final int KEYS_COUNT = 1024;
+
+    /** Grid count. */
+    private static final int GRID_COUNT = 2;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        CacheConfiguration ccfg = new CacheConfiguration();
+        ccfg.setCacheMode(PARTITIONED);
+        ccfg.setAtomicityMode(ATOMIC);
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+        ccfg.setBackups(1);
+
+        cfg.setCacheConfiguration(ccfg);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected long getTestTimeout() {
+        return TimeUnit.MINUTES.toMillis(2);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testBackupQueue() throws Exception {
+        startGridsMultiThreaded(GRID_COUNT);
+
+        final CacheEventListener lsnr = new CacheEventListener();
+
+        ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+
+        qry.setLocalListener(lsnr);
+        qry.setRemoteFilterFactory(new FilterFactory());
+
+        try (QueryCursor<?> ignore = grid(0).cache(null).query(qry)) {
+            for (int i = 0; i < KEYS_COUNT; i++) {
+                log.info("Put key: " + i);
+
+                for (int j = 0; j < 100; j++)
+                    grid(i % GRID_COUNT).cache(null).put(i, new byte[1024 * 50]);
+            }
+
+            log.info("Finish.");
+        }
+    }
+
+    /**
+     *
+     */
+    private static class FilterFactory implements Factory<CacheEntryEventFilter<Object, Object>> {
+        /** {@inheritDoc} */
+        @Override public CacheEntryEventFilter<Object, Object> create() {
+            return new CacheEventFilter();
+        }
+    }
+
+    /**
+     *
+     */
+    private static class CacheEventFilter implements CacheEntryEventFilter<Object, Object>, Serializable {
+        /** {@inheritDoc} */
+        @Override public boolean evaluate(CacheEntryEvent<?, ?> evt) {
+            return false;
+        }
+    }
+
+    /**
+     *
+     */
+    private static class CacheEventListener implements CacheEntryUpdatedListener<Object, Object> {
+        /** {@inheritDoc} */
+        @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) {
+            fail();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/e9a33f23/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java
index ce02823..94b9dce 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java
@@ -55,6 +55,7 @@ import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheCon
 import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryTxOffheapTieredTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryTxOffheapValuesTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryTxSelfTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.IgniteCacheContinuousQueryBackupQueueTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.IgniteCacheContinuousQueryClientReconnectTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.IgniteCacheContinuousQueryClientTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.IgniteCacheContinuousQueryClientTxReconnectTest;
@@ -113,6 +114,7 @@ public class IgniteCacheQuerySelfTestSuite3 extends TestSuite {
         suite.addTestSuite(CacheKeepBinaryIterationStoreEnabledTest.class);
         suite.addTestSuite(CacheKeepBinaryIterationSwapEnabledTest.class);
         suite.addTestSuite(CacheKeepBinaryIterationNearEnabledTest.class);
+        suite.addTestSuite(IgniteCacheContinuousQueryBackupQueueTest.class);
 
         return suite;
     }


[07/16] ignite git commit: ignite-gg-11181 - scanCount with offheap index fix

Posted by vk...@apache.org.
ignite-gg-11181 - scanCount with offheap index fix


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6094ee0d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6094ee0d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6094ee0d

Branch: refs/heads/master
Commit: 6094ee0d73f26e913dd5ff4c64d72a75e450479b
Parents: 4f8ba17
Author: Sergi Vladykin <se...@gmail.com>
Authored: Mon Jun 6 00:14:05 2016 +0300
Committer: Denis Magda <dm...@gridgain.com>
Committed: Tue Jun 14 13:25:04 2016 +0300

----------------------------------------------------------------------
 .../unsafe/GridOffheapSnapTreeSelfTest.java     |   2 +-
 .../query/h2/opt/GridH2AbstractKeyValueRow.java |  23 +--
 .../query/h2/opt/GridH2KeyValueRowOffheap.java  |  17 +-
 .../cache/IgniteCacheOffheapIndexScanTest.java  | 195 +++++++++++++++++++
 .../IgniteCacheQuerySelfTestSuite.java          |   2 +
 5 files changed, 226 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/6094ee0d/modules/core/src/test/java/org/apache/ignite/internal/util/offheap/unsafe/GridOffheapSnapTreeSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/offheap/unsafe/GridOffheapSnapTreeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/offheap/unsafe/GridOffheapSnapTreeSelfTest.java
index 463b6dc..92d9ec2 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/util/offheap/unsafe/GridOffheapSnapTreeSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/offheap/unsafe/GridOffheapSnapTreeSelfTest.java
@@ -313,7 +313,7 @@ public class GridOffheapSnapTreeSelfTest extends GridCommonAbstractTest {
         }
 
         @Override public int hashCode() {
-            return ptr;
+            throw new IllegalStateException();
         }
 
         @Override public String toString() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/6094ee0d/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java
index ca5442a..fe6851d 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java
@@ -351,41 +351,37 @@ public abstract class GridH2AbstractKeyValueRow extends GridH2Row {
 
     /** {@inheritDoc} */
     @Override public void setKeyAndVersion(SearchRow old) {
-        assert false;
+        throw new IllegalStateException();
     }
 
     /** {@inheritDoc} */
     @Override public void setKey(long key) {
-        assert false;
+        throw new IllegalStateException();
     }
 
     /** {@inheritDoc} */
     @Override public Row getCopy() {
-        assert false;
-
-        return null;
+        throw new IllegalStateException();
     }
 
     /** {@inheritDoc} */
     @Override public void setDeleted(boolean deleted) {
-        assert false;
+        throw new IllegalStateException();
     }
 
     /** {@inheritDoc} */
     @Override public long getKey() {
-        assert false;
-
-        return 0;
+        throw new IllegalStateException();
     }
 
     /** {@inheritDoc} */
     @Override public void setSessionId(int sesId) {
-        assert false;
+        throw new IllegalStateException();
     }
 
     /** {@inheritDoc} */
     @Override public void setVersion(int ver) {
-        assert false;
+        throw new IllegalStateException();
     }
 
     /**
@@ -469,4 +465,9 @@ public abstract class GridH2AbstractKeyValueRow extends GridH2Row {
             throw new IllegalStateException();
         }
     }
+
+    /** {@inheritDoc} */
+    @Override public final int hashCode() {
+        throw new IllegalStateException();
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/6094ee0d/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOffheap.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOffheap.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOffheap.java
index 2dd9f25..ee68431 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOffheap.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOffheap.java
@@ -170,7 +170,7 @@ public class GridH2KeyValueRowOffheap extends GridH2AbstractKeyValueRow {
             }
         }
         else
-            assert false : col;
+            throw new IllegalStateException("Column: " + col);
 
         Data data = Data.create(null, bytes);
 
@@ -378,4 +378,19 @@ public class GridH2KeyValueRowOffheap extends GridH2AbstractKeyValueRow {
     @Override protected void addOffheapRowId(SB sb) {
         sb.a('-').a(ptr);
     }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object obj) {
+        if (this == obj)
+            return true;
+
+        if (obj instanceof GridH2KeyValueRowOffheap) {
+            GridH2KeyValueRowOffheap row = (GridH2KeyValueRowOffheap)obj;
+
+            if (pointer() == row.pointer())
+                return true;
+        }
+
+        return false;
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/6094ee0d/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapIndexScanTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapIndexScanTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapIndexScanTest.java
new file mode 100644
index 0000000..dbc8a65
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapIndexScanTest.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.io.Serializable;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheMemoryMode;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheMode.LOCAL;
+
+/**
+ * Based scanCount with offheap index issue.
+ */
+public class IgniteCacheOffheapIndexScanTest extends GridCommonAbstractTest {
+    /** */
+    private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private static IgniteCache<Integer, Object> cache;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+        disco.setIpFinder(ipFinder);
+
+        cfg.setDiscoverySpi(disco);
+
+        CacheConfiguration<?,?> cacheCfg = new CacheConfiguration<>();
+
+        cacheCfg.setCacheMode(LOCAL);
+        cacheCfg.setMemoryMode(CacheMemoryMode.OFFHEAP_TIERED);
+        cacheCfg.setSqlOnheapRowCacheSize(256);
+        cacheCfg.setIndexedTypes(
+            Integer.class, Person.class
+        );
+
+        cfg.setCacheConfiguration(cacheCfg);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        startGridsMultiThreaded(1, false);
+
+        cache = grid(0).cache(null);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testQueryPlan() throws Exception {
+        for (int i = 0 ; i < 1000; i++)
+            cache.put(i, new Person(i, "firstName" + i, "lastName" + i, i % 100));
+
+        final AtomicBoolean end = new AtomicBoolean();
+
+        IgniteInternalFuture<?> fut = multithreadedAsync(new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                while(!end.get())
+                    cache.query(new SqlFieldsQuery("select _val from Person")).getAll();
+
+                return null;
+            }
+        }, 5);
+
+        for (int i = 0; i < 150; i++) {
+            String plan = (String)cache.query(new SqlFieldsQuery(
+                "explain analyze select count(*) from Person where salary = 50")).getAll().get(0).get(0);
+
+            assertTrue(plan, plan.contains("scanCount: 11 "));
+
+            Thread.sleep(100);
+        }
+
+        end.set(true);
+
+        fut.get();
+    }
+
+    /**
+     * Person record used for query test.
+     */
+    public static class Person implements Serializable {
+        /** Person ID. */
+        @QuerySqlField(index = true)
+        private int id;
+
+        /** Organization ID. */
+        @QuerySqlField(index = true)
+        private int orgId;
+
+        /** First name (not-indexed). */
+        @QuerySqlField
+        private String firstName;
+
+        /** Last name (not indexed). */
+        @QuerySqlField
+        private String lastName;
+
+        /** Salary. */
+        @QuerySqlField(index = true)
+        private double salary;
+
+        /**
+         * Constructs empty person.
+         */
+        public Person() {
+            // No-op.
+        }
+
+        /**
+         * Constructs person record that is not linked to any organization.
+         *
+         * @param id Person ID.
+         * @param firstName First name.
+         * @param lastName Last name.
+         * @param salary Salary.
+         */
+        public Person(int id, String firstName, String lastName, double salary) {
+            this(id, 0, firstName, lastName, salary);
+        }
+
+        /**
+         * Constructs person record.
+         *
+         * @param id Person ID.
+         * @param orgId Organization ID.
+         * @param firstName First name.
+         * @param lastName Last name.
+         * @param salary Salary.
+         */
+        public Person(int id, int orgId, String firstName, String lastName, double salary) {
+            this.id = id;
+            this.orgId = orgId;
+            this.firstName = firstName;
+            this.lastName = lastName;
+            this.salary = salary;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            return this == o || (o instanceof Person) && id == ((Person)o).id;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return id;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return "Person [firstName=" + firstName +
+                ", id=" + id +
+                ", orgId=" + orgId +
+                ", lastName=" + lastName +
+                ", salary=" + salary +
+                ']';
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/6094ee0d/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
index ebad581..7b39453 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
@@ -29,6 +29,7 @@ import org.apache.ignite.internal.processors.cache.IgniteCacheCollocatedQuerySel
 import org.apache.ignite.internal.processors.cache.IgniteCacheDuplicateEntityConfigurationSelfTest;
 import org.apache.ignite.internal.processors.cache.IgniteCacheLargeResultSelfTest;
 import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapEvictQueryTest;
+import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapIndexScanTest;
 import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapTieredMultithreadedSelfTest;
 import org.apache.ignite.internal.processors.cache.IgniteCachePartitionedQueryMultiThreadedSelfTest;
 import org.apache.ignite.internal.processors.cache.IgniteCacheQueryEvictsMultiThreadedSelfTest;
@@ -101,6 +102,7 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite {
         suite.addTestSuite(IgniteCacheOffheapEvictQueryTest.class);
         suite.addTestSuite(IgniteCacheSqlQueryMultiThreadedSelfTest.class);
         suite.addTestSuite(IgniteCacheOffheapTieredMultithreadedSelfTest.class);
+        suite.addTestSuite(IgniteCacheOffheapIndexScanTest.class);
         suite.addTestSuite(IgniteCacheQueryNodeRestartSelfTest.class);
         suite.addTestSuite(IgniteCacheQueryNodeRestartSelfTest2.class);
         suite.addTestSuite(IgniteCacheQueryNodeFailTest.class);


[02/16] ignite git commit: ignite-114 Load value from store for cache 'invoke' (cherry picked from commit e10ffef)

Posted by vk...@apache.org.
http://git-wip-us.apache.org/repos/asf/ignite/blob/e90c9c88/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheReadThroughStoreCallTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheReadThroughStoreCallTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheReadThroughStoreCallTest.java
new file mode 100644
index 0000000..bb092d4
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheReadThroughStoreCallTest.java
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.cache.Cache;
+import javax.cache.configuration.Factory;
+import javax.cache.processor.EntryProcessor;
+import javax.cache.processor.MutableEntry;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cache.store.CacheStore;
+import org.apache.ignite.cache.store.CacheStoreAdapter;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.lang.IgniteBiInClosure;
+import org.apache.ignite.lang.IgniteRunnable;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+import org.jetbrains.annotations.Nullable;
+import org.jsr166.ConcurrentHashMap8;
+
+import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.transactions.TransactionIsolation.values;
+
+/**
+ *
+ */
+public class IgniteCacheReadThroughStoreCallTest extends GridCommonAbstractTest {
+    /** */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private static final Map<Object, Object> storeMap = new ConcurrentHashMap8<>();
+
+    /** */
+    protected boolean client;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
+
+        cfg.setClientMode(client);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+
+        storeMap.clear();
+
+        super.afterTest();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testMultiNode() throws Exception {
+        startGridsMultiThreaded(4);
+
+        client = true;
+
+        startGrid(4);
+
+        checkLoadCount(cacheConfiguration(PARTITIONED, ATOMIC, 0));
+
+        checkLoadCount(cacheConfiguration(PARTITIONED, ATOMIC, 1));
+
+        checkLoadCount(cacheConfiguration(PARTITIONED, ATOMIC, 2));
+
+        checkLoadCount(cacheConfiguration(PARTITIONED, TRANSACTIONAL, 0));
+
+        checkLoadCount(cacheConfiguration(PARTITIONED, TRANSACTIONAL, 1));
+
+        checkLoadCount(cacheConfiguration(PARTITIONED, TRANSACTIONAL, 2));
+    }
+
+    /**
+     * @param ccfg Cache configuration.
+     * @throws Exception If failed.
+     */
+    private void checkLoadCount(CacheConfiguration<Object, Object> ccfg) throws Exception {
+        storeMap.clear();
+
+        Ignite ignite0 = ignite(0);
+
+        ignite0.createCache(ccfg);
+
+        try {
+            int key = 0;
+
+            for (Ignite node : G.allGrids()) {
+                log.info("Test for node: " + node.name());
+
+                final IgniteCache<Object, Object> cache = node.cache(ccfg.getName());
+
+                for (int i = 0; i < 50; i++) {
+                    final int k = key++;
+
+                    checkReadThrough(cache, new IgniteRunnable() {
+                        @Override public void run() {
+                            cache.invoke(k, new TestEntryProcessor());
+                        }
+                    }, null, null, 1);
+                }
+
+                for (int i = 0; i < 50; i++) {
+                    final int k = key++;
+
+                    checkReadThrough(cache, new IgniteRunnable() {
+                        @Override public void run() {
+                            cache.put(k, k);
+                        }
+                    }, null, null, 0);
+                }
+
+                if (ccfg.getAtomicityMode() == TRANSACTIONAL) {
+                    for (TransactionConcurrency concurrency : TransactionConcurrency.values()) {
+                        for (TransactionIsolation isolation : values()) {
+                            log.info("Test tx [concurrency=" + concurrency + ", isolation=" + isolation + ']');
+
+                            for (int i = 0; i < 50; i++) {
+                                final int k = key++;
+
+                                checkReadThrough(cache, new IgniteRunnable() {
+                                    @Override public void run() {
+                                        cache.invoke(k, new TestEntryProcessor());
+                                    }
+                                }, concurrency, isolation, 2);
+                            }
+                        }
+                    }
+                }
+            }
+
+            ignite0.cache(ccfg.getName()).removeAll();
+        }
+        finally {
+            ignite0.destroyCache(ccfg.getName());
+        }
+    }
+
+    /**
+     * @param cache Cache.
+     * @param c Cache operation Closure.
+     * @param concurrency Transaction concurrency.
+     * @param isolation Transaction isolation.
+     * @param expLoadCnt Expected number of store 'load' calls.
+     * @throws Exception If failed.
+     */
+    private void checkReadThrough(IgniteCache<Object, Object> cache,
+        IgniteRunnable c,
+        @Nullable TransactionConcurrency concurrency,
+        @Nullable TransactionIsolation isolation,
+        int expLoadCnt) throws Exception {
+        TestStore.loadCnt.set(0);
+
+        Transaction tx = isolation != null ? cache.unwrap(Ignite.class).transactions().txStart(concurrency, isolation)
+            : null;
+
+        try {
+            c.run();
+
+            if (tx != null)
+                tx.commit();
+        }
+        finally {
+            if (tx != null)
+                tx.close();
+        }
+
+        assertEquals(expLoadCnt, TestStore.loadCnt.get());
+    }
+
+    /**
+     * @param cacheMode Cache mode.
+     * @param atomicityMode Atomicity mode.
+     * @param backups Number of backups.
+     * @return Cache configuration.
+     */
+    @SuppressWarnings("unchecked")
+    protected CacheConfiguration<Object, Object> cacheConfiguration(CacheMode cacheMode,
+        CacheAtomicityMode atomicityMode,
+        int backups) {
+        CacheConfiguration ccfg = new CacheConfiguration();
+
+        ccfg.setReadThrough(true);
+        ccfg.setWriteThrough(true);
+        ccfg.setCacheStoreFactory(new TestStoreFactory());
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+        ccfg.setAtomicityMode(atomicityMode);
+        ccfg.setCacheMode(cacheMode);
+        ccfg.setAffinity(new RendezvousAffinityFunction(false, 32));
+        ccfg.setAtomicWriteOrderMode(PRIMARY);
+
+        if (cacheMode == PARTITIONED)
+            ccfg.setBackups(backups);
+
+        return ccfg;
+    }
+
+    /**
+     *
+     */
+    public static class TestStoreFactory implements Factory<CacheStore> {
+        /** {@inheritDoc} */
+        @Override public CacheStore create() {
+            return new TestStore();
+        }
+    }
+
+    /**
+     *
+     */
+    public static class TestStore extends CacheStoreAdapter<Object, Object> {
+        /** */
+        static AtomicInteger loadCnt = new AtomicInteger();
+
+        /** {@inheritDoc} */
+        @Override public void loadCache(IgniteBiInClosure<Object, Object> clo, Object... args) {
+            fail();
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object load(Object key) {
+            loadCnt.incrementAndGet();
+
+            return storeMap.get(key);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void write(Cache.Entry<?, ?> entry) {
+            storeMap.put(entry.getKey(), entry.getValue());
+        }
+
+        /** {@inheritDoc} */
+        @Override public void delete(Object key) {
+            storeMap.remove(key);
+        }
+    }
+
+    /**
+     *
+     */
+    static class TestEntryProcessor implements EntryProcessor<Object, Object, Object> {
+        /** {@inheritDoc} */
+        @Override public Object process(MutableEntry<Object, Object> entry, Object... args) {
+            Object val = entry.getValue();
+
+            entry.setValue(entry.getKey());
+
+            return val;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/e90c9c88/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheLoaderWriterAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheLoaderWriterAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheLoaderWriterAbstractTest.java
index 28a954b..dd6b268 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheLoaderWriterAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheLoaderWriterAbstractTest.java
@@ -147,6 +147,16 @@ public abstract class IgniteCacheLoaderWriterAbstractTest extends IgniteCacheAbs
         assertFalse(storeMap.containsKey(key));
 
         assertNull(cache.get(key));
+
+        ldrCallCnt.set(0);
+
+        cache.invoke(key, new EntryProcessor<Object, Object, Object>() {
+            @Override public Object process(MutableEntry<Object, Object> e, Object... args) {
+                return null;
+            }
+        });
+
+        checkCalls(1, 0);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/e90c9c88/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
index 094558d..018fa17 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
@@ -65,8 +65,10 @@ import org.apache.ignite.internal.processors.cache.IgniteCacheConfigurationDefau
 import org.apache.ignite.internal.processors.cache.IgniteCacheConfigurationTemplateTest;
 import org.apache.ignite.internal.processors.cache.IgniteCacheDynamicStopSelfTest;
 import org.apache.ignite.internal.processors.cache.IgniteCacheGetCustomCollectionsSelfTest;
+import org.apache.ignite.internal.processors.cache.IgniteCacheInvokeReadThroughSingleNodeTest;
 import org.apache.ignite.internal.processors.cache.IgniteCacheInvokeReadThroughTest;
 import org.apache.ignite.internal.processors.cache.IgniteCacheLoadRebalanceEvictionSelfTest;
+import org.apache.ignite.internal.processors.cache.IgniteCacheReadThroughStoreCallTest;
 import org.apache.ignite.internal.processors.cache.IgniteCacheTxCopyOnReadDisabledTest;
 import org.apache.ignite.internal.processors.cache.IgniteCacheTxLocalPeekModesTest;
 import org.apache.ignite.internal.processors.cache.IgniteCacheTxLocalStoreValueTest;
@@ -206,7 +208,9 @@ public class IgniteCacheTestSuite4 extends TestSuite {
         suite.addTestSuite(IgniteCacheTxLocalPeekModesTest.class);
         suite.addTestSuite(IgniteCacheTxReplicatedPeekModesTest.class);
 
+        suite.addTestSuite(IgniteCacheInvokeReadThroughSingleNodeTest.class);
         suite.addTestSuite(IgniteCacheInvokeReadThroughTest.class);
+        suite.addTestSuite(IgniteCacheReadThroughStoreCallTest.class);
         suite.addTestSuite(GridCacheVersionMultinodeTest.class);
 
         suite.addTestSuite(IgniteCacheNearReadCommittedTest.class);


[04/16] ignite git commit: ignite-3038 Do not use custom discovery events to start continuous queries for system caches (cherry picked from commit 7f878c5)

Posted by vk...@apache.org.
ignite-3038 Do not use custom discovery events to start continuous queries for system caches
(cherry picked from commit 7f878c5)


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e5fcebb0
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e5fcebb0
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e5fcebb0

Branch: refs/heads/master
Commit: e5fcebb095fc189487d1d4f267fbf2164aef7329
Parents: e90c9c8
Author: sboikov <sb...@gridgain.com>
Authored: Tue Jun 14 10:17:50 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Jun 14 11:15:13 2016 +0300

----------------------------------------------------------------------
 .../ignite/internal/MarshallerContextImpl.java  |  51 +++-
 .../processors/cache/GridCacheAdapter.java      |  12 +-
 .../binary/CacheObjectBinaryProcessorImpl.java  |  29 +-
 .../continuous/CacheContinuousQueryManager.java | 135 ++++++++
 .../cacheobject/IgniteCacheObjectProcessor.java |   7 +
 .../IgniteCacheObjectProcessorImpl.java         |   5 +
 .../continuous/GridContinuousProcessor.java     | 216 +++++++++----
 .../service/GridServiceProcessor.java           | 305 ++++++++++---------
 ...eClientReconnectContinuousProcessorTest.java |  60 +++-
 ...ridCacheContinuousQueryAbstractSelfTest.java |   2 +-
 .../IgniteNoCustomEventsOnNodeStart.java        |  86 ++++++
 .../service/GridServiceClientNodeTest.java      | 102 ++++++-
 .../testsuites/IgniteCacheTestSuite2.java       |   3 +
 13 files changed, 775 insertions(+), 238 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/e5fcebb0/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java
index 8f566a9..b4c9607 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java
@@ -39,6 +39,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheTryPutFailedException;
 import org.apache.ignite.internal.util.GridStripedLock;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.plugin.PluginProvider;
 
@@ -64,6 +65,9 @@ public class MarshallerContextImpl extends MarshallerContextAdapter {
     /** Non-volatile on purpose. */
     private int failedCnt;
 
+    /** */
+    private ContinuousQueryListener lsnr;
+
     /**
      * @param plugins Plugins.
      * @throws IgniteCheckedException In case of error.
@@ -75,25 +79,58 @@ public class MarshallerContextImpl extends MarshallerContextAdapter {
     }
 
     /**
+     * @param ctx Context.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void onContinuousProcessorStarted(GridKernalContext ctx) throws IgniteCheckedException {
+        if (ctx.clientNode()) {
+            lsnr = new ContinuousQueryListener(ctx.log(MarshallerContextImpl.class), workDir);
+
+            ctx.continuous().registerStaticRoutine(
+                CU.MARSH_CACHE_NAME,
+                lsnr,
+                null,
+                null);
+        }
+    }
+
+    /**
      * @param ctx Kernal context.
      * @throws IgniteCheckedException In case of error.
      */
     public void onMarshallerCacheStarted(GridKernalContext ctx) throws IgniteCheckedException {
         assert ctx != null;
 
-        if (!ctx.isDaemon()) {
+        log = ctx.log(MarshallerContextImpl.class);
+
+        cache = ctx.cache().marshallerCache();
+
+        if (ctx.cache().marshallerCache().context().affinityNode()) {
             ctx.cache().marshallerCache().context().continuousQueries().executeInternalQuery(
-                new ContinuousQueryListener(ctx.log(MarshallerContextImpl.class), workDir),
+                new ContinuousQueryListener(log, workDir),
                 null,
-                ctx.cache().marshallerCache().context().affinityNode(),
+                true,
                 true,
                 false
             );
         }
-
-        log = ctx.log(MarshallerContextImpl.class);
-
-        cache = ctx.cache().marshallerCache();
+        else {
+            if (lsnr != null) {
+                ctx.closure().runLocalSafe(new Runnable() {
+                    @SuppressWarnings("unchecked")
+                    @Override public void run() {
+                        try {
+                            Iterable entries = cache.context().continuousQueries().existingEntries(false, null);
+
+                            lsnr.onUpdated(entries);
+                        }
+                        catch (IgniteCheckedException e) {
+                            U.error(log, "Failed to load marshaller cache entries: " + e, e);
+                        }
+                    }
+                });
+            }
+        }
 
         latch.countDown();
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/e5fcebb0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 2212926..3dbd0f9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -4014,8 +4014,18 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
 
     /**
      * @return Distributed ignite cache iterator.
+     * @throws IgniteCheckedException If failed.
      */
     public Iterator<Cache.Entry<K, V>> igniteIterator() throws IgniteCheckedException {
+        return igniteIterator(ctx.keepBinary());
+    }
+
+    /**
+     * @param keepBinary
+     * @return Distributed ignite cache iterator.
+     * @throws IgniteCheckedException If failed.
+     */
+    public Iterator<Cache.Entry<K, V>> igniteIterator(boolean keepBinary) throws IgniteCheckedException {
         GridCacheContext ctx0 = ctx.isNear() ? ctx.near().dht().context() : ctx;
 
         final CacheOperationContext opCtx = ctx.operationContextPerCall();
@@ -4023,7 +4033,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
         if (!ctx0.isSwapOrOffheapEnabled() && ctx0.kernalContext().discovery().size() == 1)
             return localIteratorHonorExpirePolicy(opCtx);
 
-        final GridCloseableIterator<Map.Entry<K, V>> iter = ctx0.queries().createScanQuery(null, null, ctx.keepBinary())
+        final GridCloseableIterator<Map.Entry<K, V>> iter = ctx0.queries().createScanQuery(null, null, keepBinary)
             .keepAll(false)
             .executeScanQuery();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/e5fcebb0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
index 18751c1..139a8a2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
@@ -150,9 +150,6 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
     /** Metadata updates collected before metadata cache is initialized. */
     private final Map<Integer, BinaryMetadata> metaBuf = new ConcurrentHashMap<>();
 
-    /** */
-    private UUID metaCacheQryId;
-
     /**
      * @param ctx Kernal context.
      */
@@ -256,6 +253,17 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
     }
 
     /** {@inheritDoc} */
+    @Override public void onContinuousProcessorStarted(GridKernalContext ctx) throws IgniteCheckedException {
+        if (clientNode && !ctx.isDaemon()) {
+            ctx.continuous().registerStaticRoutine(
+                CU.UTILITY_CACHE_NAME,
+                new MetaDataEntryListener(),
+                new MetaDataEntryFilter(),
+                null);
+        }
+    }
+
+    /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
     @Override public void onUtilityCacheStarted() throws IgniteCheckedException {
         IgniteCacheProxy<Object, Object> proxy = ctx.cache().jcache(CU.UTILITY_CACHE_NAME);
@@ -272,13 +280,6 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
         if (clientNode) {
             assert !metaDataCache.context().affinityNode();
 
-            metaCacheQryId = metaDataCache.context().continuousQueries().executeInternalQuery(
-                new MetaDataEntryListener(),
-                new MetaDataEntryFilter(),
-                false,
-                true,
-                false);
-
             while (true) {
                 ClusterNode oldestSrvNode =
                     CU.oldestAliveCacheServerNode(ctx.cache().context(), AffinityTopologyVersion.NONE);
@@ -359,14 +360,6 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
         }
     }
 
-    /** {@inheritDoc} */
-    @Override public void onKernalStop(boolean cancel) {
-        super.onKernalStop(cancel);
-
-        if (metaCacheQryId != null)
-            metaDataCache.context().continuousQueries().cancelInternalQuery(metaCacheQryId);
-    }
-
     /**
      * @param key Metadata key.
      * @param newMeta Metadata.

http://git-wip-us.apache.org/repos/asf/ignite/blob/e5fcebb0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index b042249..c966527 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -31,6 +31,7 @@ import java.util.UUID;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import javax.cache.Cache;
 import javax.cache.configuration.CacheEntryListenerConfiguration;
 import javax.cache.configuration.Factory;
 import javax.cache.event.CacheEntryCreatedListener;
@@ -46,6 +47,7 @@ import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
 import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.query.CacheQueryEntryEvent;
 import org.apache.ignite.cache.query.ContinuousQuery;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.GridKernalContext;
@@ -57,8 +59,10 @@ import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateFuture;
 import org.apache.ignite.internal.processors.continuous.GridContinuousHandler;
 import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.CI2;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteAsyncCallback;
 import org.apache.ignite.lang.IgniteClosure;
@@ -728,6 +732,70 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
     }
 
     /**
+     * @param keepBinary Keep binary flag.
+     * @param filter Filter.
+     * @return Iterable for events created for existing cache entries.
+     * @throws IgniteCheckedException If failed.
+     */
+    public Iterable<CacheEntryEvent<?, ?>> existingEntries(final boolean keepBinary, final CacheEntryEventFilter filter)
+        throws IgniteCheckedException {
+        final Iterator<Cache.Entry<?, ?>> it = cctx.cache().igniteIterator(keepBinary);
+
+        final Cache cache = cctx.kernalContext().cache().jcache(cctx.name());
+
+        return new Iterable<CacheEntryEvent<?, ?>>() {
+            @Override public Iterator<CacheEntryEvent<?, ?>> iterator() {
+                return new Iterator<CacheEntryEvent<?, ?>>() {
+                    private CacheQueryEntryEvent<?, ?> next;
+
+                    {
+                        advance();
+                    }
+
+                    @Override public boolean hasNext() {
+                        return next != null;
+                    }
+
+                    @Override public CacheEntryEvent<?, ?> next() {
+                        if (!hasNext())
+                            throw new NoSuchElementException();
+
+                        CacheEntryEvent next0 = next;
+
+                        advance();
+
+                        return next0;
+                    }
+
+                    @Override public void remove() {
+                        throw new UnsupportedOperationException();
+                    }
+
+                    private void advance() {
+                        next = null;
+
+                        while (next == null) {
+                            if (!it.hasNext())
+                                break;
+
+                            Cache.Entry e = it.next();
+
+                            next = new CacheEntryEventImpl(
+                                cache,
+                                CREATED,
+                                e.getKey(),
+                                e.getValue());
+
+                            if (filter != null && !filter.evaluate(next))
+                                next = null;
+                        }
+                    }
+                };
+            }
+        };
+    }
+
+    /**
      * @param nodes Nodes.
      * @return {@code True} if all nodes greater than {@link GridContinuousProcessor#QUERY_MSG_VER_2_SINCE},
      *     otherwise {@code false}.
@@ -1129,4 +1197,71 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
                 lsnr.acknowledgeBackupOnTimeout(ctx);
         }
     }
+
+    /**
+     *
+     */
+    private static class CacheEntryEventImpl extends CacheQueryEntryEvent {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** */
+        @GridToStringInclude
+        private Object key;
+
+        /** */
+        @GridToStringInclude
+        private Object val;
+
+        /**
+         * @param src Event source.
+         * @param evtType Event type.
+         * @param key Key.
+         * @param val Value.
+         */
+        public CacheEntryEventImpl(Cache src, EventType evtType, Object key, Object val) {
+            super(src, evtType);
+
+            this.key = key;
+            this.val = val;
+        }
+
+        /** {@inheritDoc} */
+        @Override public long getPartitionUpdateCounter() {
+            return 0;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object getOldValue() {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean isOldValueAvailable() {
+            return false;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object getKey() {
+            return key;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object getValue() {
+            return val;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object unwrap(Class cls) {
+            if (cls.isAssignableFrom(getClass()))
+                return cls.cast(this);
+
+            throw new IllegalArgumentException("Unwrapping to class is not supported: " + cls);
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(CacheEntryEventImpl.class, this);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/e5fcebb0/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java
index 686f308..b8ac301 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java
@@ -22,6 +22,7 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.internal.GridComponent;
+import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.processors.GridProcessor;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.CacheObjectContext;
@@ -34,6 +35,12 @@ import org.jetbrains.annotations.Nullable;
  */
 public interface IgniteCacheObjectProcessor extends GridProcessor {
     /**
+     * @param ctx Context.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void onContinuousProcessorStarted(GridKernalContext ctx) throws IgniteCheckedException;
+
+    /**
      * @see GridComponent#onKernalStart()
      * @throws IgniteCheckedException If failed.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/e5fcebb0/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java
index 9b47e59..3203548 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java
@@ -236,6 +236,11 @@ public class IgniteCacheObjectProcessorImpl extends GridProcessorAdapter impleme
     }
 
     /** {@inheritDoc} */
+    @Override public void onContinuousProcessorStarted(GridKernalContext ctx) throws IgniteCheckedException {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
     @Override public void onUtilityCacheStarted() throws IgniteCheckedException {
         // No-op.
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/e5fcebb0/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index 0db9b27..fce48c4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -30,12 +30,15 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import javax.cache.event.CacheEntryUpdatedListener;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.events.Event;
@@ -82,6 +85,7 @@ import org.jsr166.ConcurrentHashMap8;
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
 import static org.apache.ignite.events.EventType.EVT_NODE_SEGMENTED;
+import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE;
 import static org.apache.ignite.internal.GridTopic.TOPIC_CONTINUOUS;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
 import static org.apache.ignite.internal.processors.continuous.GridContinuousMessageType.MSG_EVT_ACK;
@@ -136,6 +140,9 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
     /** */
     private boolean processorStopped;
 
+    /** Query sequence number for message topic. */
+    private final AtomicLong seq = new AtomicLong();
+
     /**
      * @param ctx Kernal context.
      */
@@ -254,13 +261,11 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
                         UUID routineId = msg.routineId();
 
                         unregisterRemote(routineId);
+                    }
 
-                        if (snd.isClient()) {
-                            Map<UUID, LocalRoutineInfo> clientRoutineMap = clientInfos.get(snd.id());
-
-                            if (clientRoutineMap != null)
-                                clientRoutineMap.remove(msg.routineId());
-                        }
+                    for (Map<UUID, LocalRoutineInfo> clientInfo : clientInfos.values()) {
+                        if (clientInfo.remove(msg.routineId()) != null)
+                            break;
                     }
                 }
             });
@@ -309,6 +314,12 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
             }
         });
 
+        ctx.marshallerContext().onContinuousProcessorStarted(ctx);
+
+        ctx.cacheObjects().onContinuousProcessorStarted(ctx);
+
+        ctx.service().onContinuousProcessorStarted(ctx);
+
         if (log.isDebugEnabled())
             log.debug("Continuous processor started.");
     }
@@ -392,16 +403,33 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
 
     /** {@inheritDoc} */
     @Override @Nullable public Serializable collectDiscoveryData(UUID nodeId) {
+        if (log.isDebugEnabled()) {
+            log.debug("collectDiscoveryData [node=" + nodeId +
+                ", loc=" + ctx.localNodeId() +
+                ", locInfos=" + locInfos +
+                ", clientInfos=" + clientInfos +
+                ']');
+        }
+
         if (!nodeId.equals(ctx.localNodeId()) || !locInfos.isEmpty()) {
             Map<UUID, Map<UUID, LocalRoutineInfo>> clientInfos0 = U.newHashMap(clientInfos.size());
 
             for (Map.Entry<UUID, Map<UUID, LocalRoutineInfo>> e : clientInfos.entrySet()) {
-                Map<UUID, LocalRoutineInfo> copy = U.newHashMap(e.getValue().size());
+                Map<UUID, LocalRoutineInfo> cp = U.newHashMap(e.getValue().size());
 
                 for (Map.Entry<UUID, LocalRoutineInfo> e0 : e.getValue().entrySet())
-                    copy.put(e0.getKey(), e0.getValue());
+                    cp.put(e0.getKey(), e0.getValue());
+
+                clientInfos0.put(e.getKey(), cp);
+            }
+
+            if (nodeId.equals(ctx.localNodeId()) && ctx.discovery().localNode().isClient()) {
+                Map<UUID, LocalRoutineInfo> infos = new HashMap<>();
 
-                clientInfos0.put(e.getKey(), copy);
+                for (Map.Entry<UUID, LocalRoutineInfo> e : locInfos.entrySet())
+                    infos.put(e.getKey(), e.getValue());
+
+                clientInfos0.put(ctx.localNodeId(), infos);
             }
 
             DiscoveryData data = new DiscoveryData(ctx.localNodeId(), clientInfos0);
@@ -429,6 +457,14 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
     @Override public void onDiscoveryDataReceived(UUID joiningNodeId, UUID rmtNodeId, Serializable obj) {
         DiscoveryData data = (DiscoveryData)obj;
 
+        if (log.isDebugEnabled()) {
+            log.info("onDiscoveryDataReceived [joining=" + joiningNodeId +
+                ", rmtNodeId=" + rmtNodeId +
+                ", loc=" + ctx.localNodeId() +
+                ", data=" + data +
+                ']');
+        }
+
         if (!ctx.isDaemon() && data != null) {
             for (DiscoveryDataItem item : data.items) {
                 try {
@@ -456,29 +492,31 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
             for (Map.Entry<UUID, Map<UUID, LocalRoutineInfo>> entry : data.clientInfos.entrySet()) {
                 UUID clientNodeId = entry.getKey();
 
-                Map<UUID, LocalRoutineInfo> clientRoutineMap = entry.getValue();
+                if (!ctx.localNodeId().equals(clientNodeId)) {
+                    Map<UUID, LocalRoutineInfo> clientRoutineMap = entry.getValue();
 
-                for (Map.Entry<UUID, LocalRoutineInfo> e : clientRoutineMap.entrySet()) {
-                    UUID routineId = e.getKey();
-                    LocalRoutineInfo info = e.getValue();
+                    for (Map.Entry<UUID, LocalRoutineInfo> e : clientRoutineMap.entrySet()) {
+                        UUID routineId = e.getKey();
+                        LocalRoutineInfo info = e.getValue();
 
-                    try {
-                        if (info.prjPred != null)
-                            ctx.resource().injectGeneric(info.prjPred);
-
-                        if (info.prjPred == null || info.prjPred.apply(ctx.discovery().localNode())) {
-                            if (registerHandler(clientNodeId,
-                                routineId,
-                                info.hnd,
-                                info.bufSize,
-                                info.interval,
-                                info.autoUnsubscribe,
-                                false))
-                                info.hnd.onListenerRegistered(routineId, ctx);
+                        try {
+                            if (info.prjPred != null)
+                                ctx.resource().injectGeneric(info.prjPred);
+
+                            if (info.prjPred == null || info.prjPred.apply(ctx.discovery().localNode())) {
+                                if (registerHandler(clientNodeId,
+                                    routineId,
+                                    info.hnd,
+                                    info.bufSize,
+                                    info.interval,
+                                    info.autoUnsubscribe,
+                                    false))
+                                    info.hnd.onListenerRegistered(routineId, ctx);
+                            }
+                        }
+                        catch (IgniteCheckedException err) {
+                            U.error(log, "Failed to register continuous handler.", err);
                         }
-                    }
-                    catch (IgniteCheckedException err) {
-                        U.error(log, "Failed to register continuous handler.", err);
                     }
                 }
 
@@ -536,6 +574,47 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * Registers routine info to be sent in discovery data during this node join
+     * (to be used for internal queries started from client nodes).
+     *
+     * @param cacheName Cache name.
+     * @param locLsnr Local listener.
+     * @param rmtFilter Remote filter.
+     * @param prjPred Projection predicate.
+     * @return Routine ID.
+     * @throws IgniteCheckedException If failed.
+     */
+    public UUID registerStaticRoutine(
+        String cacheName,
+        CacheEntryUpdatedListener<?, ?> locLsnr,
+        CacheEntryEventSerializableFilter rmtFilter,
+        @Nullable IgnitePredicate<ClusterNode> prjPred) throws IgniteCheckedException {
+        String topicPrefix = "CONTINUOUS_QUERY_STATIC" + "_" + cacheName;
+
+        CacheContinuousQueryHandler hnd = new CacheContinuousQueryHandler(
+            cacheName,
+            TOPIC_CACHE.topic(topicPrefix, ctx.localNodeId(), seq.incrementAndGet()),
+            locLsnr,
+            rmtFilter,
+            true,
+            false,
+            true,
+            false);
+
+        hnd.internal(true);
+
+        final UUID routineId = UUID.randomUUID();
+
+        LocalRoutineInfo routineInfo = new LocalRoutineInfo(prjPred, hnd, 1, 0, true);
+
+        locInfos.put(routineId, routineInfo);
+
+        registerMessageListener(hnd);
+
+        return routineId;
+    }
+
+    /**
      * @param hnd Handler.
      * @param bufSize Buffer size.
      * @param interval Time interval.
@@ -609,29 +688,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
         }
 
         // Register per-routine notifications listener if ordered messaging is used.
-        if (hnd.orderedTopic() != null) {
-            ctx.io().addMessageListener(hnd.orderedTopic(), new GridMessageListener() {
-                @Override public void onMessage(UUID nodeId, Object obj) {
-                    GridContinuousMessage msg = (GridContinuousMessage)obj;
-
-                    // Only notification can be ordered.
-                    assert msg.type() == MSG_EVT_NOTIFICATION;
-
-                    if (msg.data() == null && msg.dataBytes() != null) {
-                        try {
-                            msg.data(marsh.unmarshal(msg.dataBytes(), U.resolveClassLoader(ctx.config())));
-                        }
-                        catch (IgniteCheckedException e) {
-                            U.error(log, "Failed to process message (ignoring): " + msg, e);
-
-                            return;
-                        }
-                    }
-
-                    processNotification(nodeId, msg);
-                }
-            });
-        }
+        registerMessageListener(hnd);
 
         StartFuture fut = new StartFuture(ctx, routineId);
 
@@ -663,6 +720,35 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * @param hnd Handler.
+     */
+    private void registerMessageListener(GridContinuousHandler hnd) {
+        if (hnd.orderedTopic() != null) {
+            ctx.io().addMessageListener(hnd.orderedTopic(), new GridMessageListener() {
+                @Override public void onMessage(UUID nodeId, Object obj) {
+                    GridContinuousMessage msg = (GridContinuousMessage)obj;
+
+                    // Only notification can be ordered.
+                    assert msg.type() == MSG_EVT_NOTIFICATION;
+
+                    if (msg.data() == null && msg.dataBytes() != null) {
+                        try {
+                            msg.data(marsh.unmarshal(msg.dataBytes(), U.resolveClassLoader(ctx.config())));
+                        }
+                        catch (IgniteCheckedException e) {
+                            U.error(log, "Failed to process message (ignoring): " + msg, e);
+
+                            return;
+                        }
+                    }
+
+                    processNotification(nodeId, msg);
+                }
+            });
+        }
+    }
+
+    /**
      * @param routineId Consume ID.
      * @return Future.
      */
@@ -809,12 +895,28 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
     @Override public void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException {
         cancelFutures(new IgniteClientDisconnectedCheckedException(reconnectFut, "Client node disconnected."));
 
-        for (UUID rmtId : rmtInfos.keySet())
-            unregisterRemote(rmtId);
+        if (log.isDebugEnabled()) {
+            log.debug("onDisconnected [rmtInfos=" + rmtInfos +
+                ", locInfos=" + locInfos +
+                ", clientInfos=" + clientInfos + ']');
+        }
+
+        for (Map.Entry<UUID, RemoteRoutineInfo> e : rmtInfos.entrySet()) {
+            RemoteRoutineInfo info = e.getValue();
+
+            if (!ctx.localNodeId().equals(info.nodeId) || info.autoUnsubscribe)
+                unregisterRemote(e.getKey());
+        }
 
         rmtInfos.clear();
 
         clientInfos.clear();
+
+        if (log.isDebugEnabled()) {
+            log.debug("after onDisconnected [rmtInfos=" + rmtInfos +
+                ", locInfos=" + locInfos +
+                ", clientInfos=" + clientInfos + ']');
+        }
     }
 
     /**
@@ -1040,6 +1142,9 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
         }
 
         if (doRegister) {
+            if (log.isDebugEnabled())
+                log.debug("Register handler: [nodeId=" + nodeId + ", routineId=" + routineId + ", info=" + info + ']');
+
             if (interval > 0) {
                 IgniteThread checker = new IgniteThread(new GridWorker(ctx.gridName(), "continuous-buffer-checker", log) {
                     @SuppressWarnings("ConstantConditions")
@@ -1154,6 +1259,9 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
             stopLock.unlock();
         }
 
+        if (log.isDebugEnabled())
+            log.debug("unregisterRemote [routineId=" + routineId + ", loc=" + loc + ", rmt=" + remote + ']');
+
         if (remote != null)
             unregisterHandler(routineId, remote.hnd, false);
         else if (loc != null) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/e5fcebb0/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index b507e5f..0bc1141 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -35,7 +35,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import javax.cache.Cache;
 import javax.cache.event.CacheEntryEvent;
-import javax.cache.event.CacheEntryListenerException;
 import javax.cache.event.CacheEntryUpdatedListener;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
@@ -156,12 +155,6 @@ public class GridServiceProcessor extends GridProcessorAdapter {
     /** Topology listener. */
     private GridLocalEventListener topLsnr = new TopologyListener();
 
-    /** Deployment listener ID. */
-    private UUID cfgQryId;
-
-    /** Assignment listener ID. */
-    private UUID assignQryId;
-
     /**
      * @param ctx Kernal context.
      */
@@ -178,6 +171,22 @@ public class GridServiceProcessor extends GridProcessorAdapter {
             new ServicesCompatibilityState(srvcCompatibilitySysProp != null ? srvcCompatibilitySysProp : false, false));
     }
 
+    /**
+     * @param ctx Context.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void onContinuousProcessorStarted(GridKernalContext ctx) throws IgniteCheckedException {
+        if (ctx.clientNode()) {
+            assert !ctx.isDaemon();
+
+            ctx.continuous().registerStaticRoutine(
+                CU.UTILITY_CACHE_NAME,
+                new ServiceEntriesListener(),
+                null,
+                null);
+        }
+    }
+
     /** {@inheritDoc} */
     @Override public void start() throws IgniteCheckedException {
         ctx.addNodeAttribute(ATTR_SERVICES_COMPATIBILITY_MODE, srvcCompatibilitySysProp);
@@ -209,13 +218,32 @@ public class GridServiceProcessor extends GridProcessorAdapter {
             if (ctx.deploy().enabled())
                 ctx.cache().context().deploy().ignoreOwnership(true);
 
-            boolean affNode = cache.context().affinityNode();
+            if (!ctx.clientNode()) {
+                assert cache.context().affinityNode();
+
+                cache.context().continuousQueries().executeInternalQuery(new ServiceEntriesListener(),
+                    null,
+                    true,
+                    true,
+                    false);
+            }
+            else {
+                assert !ctx.isDaemon();
 
-            cfgQryId = cache.context().continuousQueries().executeInternalQuery(
-                new DeploymentListener(), null, affNode, true, !affNode);
+                ctx.closure().runLocalSafe(new Runnable() {
+                    @Override public void run() {
+                        try {
+                            Iterable<CacheEntryEvent<?, ?>> entries =
+                                cache.context().continuousQueries().existingEntries(false, null);
 
-            assignQryId = cache.context().continuousQueries().executeInternalQuery(
-                new AssignmentListener(), null, affNode, true, !affNode);
+                            onSystemCacheUpdated(entries);
+                        }
+                        catch (IgniteCheckedException e) {
+                            U.error(log, "Failed to load service entries: " + e, e);
+                        }
+                    }
+                });
+            }
         }
         finally {
             if (ctx.deploy().enabled())
@@ -249,12 +277,6 @@ public class GridServiceProcessor extends GridProcessorAdapter {
         if (!ctx.clientNode())
             ctx.event().removeLocalEventListener(topLsnr);
 
-        if (cfgQryId != null)
-            cache.context().continuousQueries().cancelInternalQuery(cfgQryId);
-
-        if (assignQryId != null)
-            cache.context().continuousQueries().cancelInternalQuery(assignQryId);
-
         Collection<ServiceContextImpl> ctxs = new ArrayList<>();
 
         synchronized (locSvcs) {
@@ -1305,92 +1327,116 @@ public class GridServiceProcessor extends GridProcessorAdapter {
     /**
      * Service deployment listener.
      */
-    private class DeploymentListener implements CacheEntryUpdatedListener<Object, Object> {
+    @SuppressWarnings("unchecked")
+    private class ServiceEntriesListener implements CacheEntryUpdatedListener<Object, Object> {
         /** {@inheritDoc} */
         @Override public void onUpdated(final Iterable<CacheEntryEvent<?, ?>> deps) {
             depExe.submit(new BusyRunnable() {
                 @Override public void run0() {
-                    boolean firstTime = true;
+                    onSystemCacheUpdated(deps);
+                }
+            });
+        }
+    }
 
-                    for (CacheEntryEvent<?, ?> e : deps) {
-                        if (!(e.getKey() instanceof GridServiceDeploymentKey))
-                            continue;
+    /**
+     * @param evts Update events.
+     */
+    private void onSystemCacheUpdated(final Iterable<CacheEntryEvent<?, ?>> evts) {
+        boolean firstTime = true;
 
-                        if (firstTime) {
-                            markCompatibilityStateAsUsed();
+        for (CacheEntryEvent<?, ?> e : evts) {
+            if (e.getKey() instanceof GridServiceDeploymentKey) {
+                if (firstTime) {
+                    markCompatibilityStateAsUsed();
 
-                            firstTime = false;
-                        }
+                    firstTime = false;
+                }
 
-                        GridServiceDeployment dep;
+                processDeployment((CacheEntryEvent)e);
+            }
+            else if (e.getKey() instanceof GridServiceAssignmentsKey) {
+                if (firstTime) {
+                    markCompatibilityStateAsUsed();
 
-                        try {
-                            dep = (GridServiceDeployment)e.getValue();
-                        }
-                        catch (IgniteException ex) {
-                            if (X.hasCause(ex, ClassNotFoundException.class))
-                                continue;
-                            else
-                                throw ex;
-                        }
+                    firstTime = false;
+                }
 
-                        if (dep != null) {
-                            svcName.set(dep.configuration().getName());
+                processAssignment((CacheEntryEvent)e);
+            }
+        }
+    }
 
-                            // Ignore other utility cache events.
-                            AffinityTopologyVersion topVer = ctx.discovery().topologyVersionEx();
+    /**
+     * @param e Entry.
+     */
+    private void processDeployment(CacheEntryEvent<GridServiceDeploymentKey, GridServiceDeployment> e) {
+        GridServiceDeployment dep;
 
-                            ClusterNode oldest = U.oldest(ctx.discovery().nodes(topVer), null);
+        try {
+            dep = e.getValue();
+        }
+        catch (IgniteException ex) {
+            if (X.hasCause(ex, ClassNotFoundException.class))
+                return;
+            else
+                throw ex;
+        }
 
-                            if (oldest.isLocal())
-                                onDeployment(dep, topVer);
-                        }
-                        // Handle undeployment.
-                        else {
-                            String name = ((GridServiceDeploymentKey)e.getKey()).name();
+        if (dep != null) {
+            svcName.set(dep.configuration().getName());
 
-                            svcName.set(name);
+            // Ignore other utility cache events.
+            AffinityTopologyVersion topVer = ctx.discovery().topologyVersionEx();
 
-                            Collection<ServiceContextImpl> ctxs;
+            ClusterNode oldest = U.oldest(ctx.discovery().nodes(topVer), null);
 
-                            synchronized (locSvcs) {
-                                ctxs = locSvcs.remove(name);
-                            }
+            if (oldest.isLocal())
+                onDeployment(dep, topVer);
+        }
+        // Handle undeployment.
+        else {
+            String name = e.getKey().name();
 
-                            if (ctxs != null) {
-                                synchronized (ctxs) {
-                                    cancel(ctxs, ctxs.size());
-                                }
-                            }
+            svcName.set(name);
 
-                            // Finish deployment futures if undeployment happened.
-                            GridFutureAdapter<?> fut = depFuts.remove(name);
+            Collection<ServiceContextImpl> ctxs;
 
-                            if (fut != null)
-                                fut.onDone();
+            synchronized (locSvcs) {
+                ctxs = locSvcs.remove(name);
+            }
 
-                            // Complete undeployment future.
-                            fut = undepFuts.remove(name);
+            if (ctxs != null) {
+                synchronized (ctxs) {
+                    cancel(ctxs, ctxs.size());
+                }
+            }
 
-                            if (fut != null)
-                                fut.onDone();
+            // Finish deployment futures if undeployment happened.
+            GridFutureAdapter<?> fut = depFuts.remove(name);
 
-                            GridServiceAssignmentsKey key = new GridServiceAssignmentsKey(name);
+            if (fut != null)
+                fut.onDone();
 
-                            // Remove assignment on primary node in case of undeploy.
-                            if (cache.cache().affinity().isPrimary(ctx.discovery().localNode(), key)) {
-                                try {
-                                    cache.getAndRemove(key);
-                                }
-                                catch (IgniteCheckedException ex) {
-                                    U.error(log, "Failed to remove assignments for undeployed service: " + name, ex);
-                                }
-                            }
-                        }
-                    }
+            // Complete undeployment future.
+            fut = undepFuts.remove(name);
+
+            if (fut != null)
+                fut.onDone();
+
+            GridServiceAssignmentsKey key = new GridServiceAssignmentsKey(name);
+
+            // Remove assignment on primary node in case of undeploy.
+            if (cache.cache().affinity().isPrimary(ctx.discovery().localNode(), key)) {
+                try {
+                    cache.getAndRemove(key);
                 }
-            });
+                catch (IgniteCheckedException ex) {
+                    U.error(log, "Failed to remove assignments for undeployed service: " + name, ex);
+                }
+            }
         }
+    }
 
         /**
          * Deployment callback.
@@ -1446,7 +1492,6 @@ public class GridServiceProcessor extends GridProcessorAdapter {
                         }
                     }
                 });
-            }
         }
     }
 
@@ -1619,79 +1664,59 @@ public class GridServiceProcessor extends GridProcessorAdapter {
     }
 
     /**
-     * Assignment listener.
+     * @param e Entry.
      */
-    private class AssignmentListener implements CacheEntryUpdatedListener<Object, Object> {
-        /** {@inheritDoc} */
-        @Override public void onUpdated(final Iterable<CacheEntryEvent<?, ?>> assignCol) throws CacheEntryListenerException {
-            depExe.submit(new BusyRunnable() {
-                @Override public void run0() {
-                    boolean firstTime = true;
-
-                    for (CacheEntryEvent<?, ?> e : assignCol) {
-                        if (!(e.getKey() instanceof GridServiceAssignmentsKey))
-                            continue;
-
-                        if (firstTime) {
-                            markCompatibilityStateAsUsed();
-
-                            firstTime = false;
-                        }
+    private void processAssignment(CacheEntryEvent<GridServiceAssignmentsKey, GridServiceAssignments> e) {
+        GridServiceAssignments assigns;
 
-                        GridServiceAssignments assigns;
-
-                        try {
-                            assigns = (GridServiceAssignments)e.getValue();
-                        }
-                        catch (IgniteException ex) {
-                            if (X.hasCause(ex, ClassNotFoundException.class))
-                                continue;
-                            else
-                                throw ex;
-                        }
+        try {
+            assigns = e.getValue();
+        }
+        catch (IgniteException ex) {
+            if (X.hasCause(ex, ClassNotFoundException.class))
+                return;
+            else
+                throw ex;
+        }
 
-                        if (assigns != null) {
-                            svcName.set(assigns.name());
+        if (assigns != null) {
+            svcName.set(assigns.name());
 
-                            Throwable t = null;
+            Throwable t = null;
 
-                            try {
-                                redeploy(assigns);
-                            }
-                            catch (Error | RuntimeException th) {
-                                t = th;
-                            }
+            try {
+                redeploy(assigns);
+            }
+            catch (Error | RuntimeException th) {
+                t = th;
+            }
 
-                            GridServiceDeploymentFuture fut = depFuts.get(assigns.name());
+            GridServiceDeploymentFuture fut = depFuts.get(assigns.name());
 
-                            if (fut != null && fut.configuration().equalsIgnoreNodeFilter(assigns.configuration())) {
-                                depFuts.remove(assigns.name(), fut);
+            if (fut != null && fut.configuration().equalsIgnoreNodeFilter(assigns.configuration())) {
+                depFuts.remove(assigns.name(), fut);
 
-                                // Complete deployment futures once the assignments have been stored in cache.
-                                fut.onDone(null, t);
-                            }
-                        }
-                        // Handle undeployment.
-                        else {
-                            String name = ((GridServiceAssignmentsKey)e.getKey()).name();
+                // Complete deployment futures once the assignments have been stored in cache.
+                fut.onDone(null, t);
+            }
+        }
+        // Handle undeployment.
+        else {
+            String name = e.getKey().name();
 
-                            svcName.set(name);
+            svcName.set(name);
 
-                            Collection<ServiceContextImpl> ctxs;
+            Collection<ServiceContextImpl> ctxs;
 
-                            synchronized (locSvcs) {
-                                ctxs = locSvcs.remove(name);
-                            }
+            synchronized (locSvcs) {
+                ctxs = locSvcs.remove(name);
+            }
 
-                            if (ctxs != null) {
-                                synchronized (ctxs) {
-                                    cancel(ctxs, ctxs.size());
-                                }
-                            }
-                        }
-                    }
+            if (ctxs != null) {
+                synchronized (ctxs) {
+                    cancel(ctxs, ctxs.size());
                 }
-            });
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/e5fcebb0/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectContinuousProcessorTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectContinuousProcessorTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectContinuousProcessorTest.java
index 4c44adc..0ff5883 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectContinuousProcessorTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectContinuousProcessorTest.java
@@ -125,6 +125,7 @@ public class IgniteClientReconnectContinuousProcessorTest extends IgniteClientRe
     }
 
     /**
+     * @param stopFromClient If {@code true} stops listener from client node, otherwise from server.
      * @throws Exception If failed.
      */
     private void testMessageListenerReconnect(boolean stopFromClient) throws Exception {
@@ -178,9 +179,11 @@ public class IgniteClientReconnectContinuousProcessorTest extends IgniteClientRe
         assertTrue(locLsnr.latch.await(5000, MILLISECONDS));
         assertTrue(latch.await(5000, MILLISECONDS));
 
-        log.info("Stop listen, should not get remote messages anymore.");
+        Ignite stopFrom = (stopFromClient ? client : srv);
 
-        (stopFromClient ? client : srv).message().stopRemoteListen(opId);
+        log.info("Stop listen, should not get remote messages anymore [from=" + stopFrom.name() + ']');
+
+        stopFrom.message().stopRemoteListen(opId);
 
         srv.message().send(topic, "msg3");
 
@@ -243,6 +246,59 @@ public class IgniteClientReconnectContinuousProcessorTest extends IgniteClientRe
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testCacheContinuousQueryReconnectNewServer() throws Exception {
+        Ignite client = grid(serverCount());
+
+        assertTrue(client.cluster().localNode().isClient());
+
+        IgniteCache<Object, Object> clientCache = client.getOrCreateCache(new CacheConfiguration<>());
+
+        CacheEventListener lsnr = new CacheEventListener();
+
+        ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+
+        qry.setAutoUnsubscribe(true);
+
+        qry.setLocalListener(lsnr);
+
+        QueryCursor<?> cur = clientCache.query(qry);
+
+        continuousQueryReconnect(client, clientCache, lsnr);
+
+        // Check new server registers listener for reconnected client.
+        try (Ignite newSrv = startGrid(serverCount() + 1)) {
+            awaitPartitionMapExchange();
+
+            lsnr.latch = new CountDownLatch(10);
+
+            IgniteCache<Object, Object> newSrvCache = newSrv.cache(null);
+
+            for (Integer key : primaryKeys(newSrvCache, 10))
+                newSrvCache.put(key, key);
+
+            assertTrue(lsnr.latch.await(5000, MILLISECONDS));
+        }
+
+        cur.close();
+
+        // Check new server does not register listener for closed query.
+        try (Ignite newSrv = startGrid(serverCount() + 1)) {
+            awaitPartitionMapExchange();
+
+            lsnr.latch = new CountDownLatch(5);
+
+            IgniteCache<Object, Object> newSrvCache = newSrv.cache(null);
+
+            for (Integer key : primaryKeys(newSrvCache, 5))
+                newSrvCache.put(key, key);
+
+            assertFalse(lsnr.latch.await(3000, MILLISECONDS));
+        }
+    }
+
+    /**
      * @param client Client.
      * @param clientCache Client cache.
      * @param lsnr Continuous query listener.

http://git-wip-us.apache.org/repos/asf/ignite/blob/e5fcebb0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
index dbe282e..f372e0d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
@@ -208,7 +208,7 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo
         for (int i = 0; i < gridCount(); i++) {
             GridContinuousProcessor proc = grid(i).context().continuous();
 
-            assertEquals(String.valueOf(i), 3, ((Map)U.field(proc, "locInfos")).size());
+            assertEquals(String.valueOf(i), 2, ((Map)U.field(proc, "locInfos")).size());
             assertEquals(String.valueOf(i), 0, ((Map)U.field(proc, "rmtInfos")).size());
             assertEquals(String.valueOf(i), 0, ((Map)U.field(proc, "startFuts")).size());
             assertEquals(String.valueOf(i), 0, ((Map)U.field(proc, "stopFuts")).size());

http://git-wip-us.apache.org/repos/asf/ignite/blob/e5fcebb0/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/IgniteNoCustomEventsOnNodeStart.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/IgniteNoCustomEventsOnNodeStart.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/IgniteNoCustomEventsOnNodeStart.java
new file mode 100644
index 0000000..cbeb23f
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/IgniteNoCustomEventsOnNodeStart.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.continuous;
+
+import com.mchange.v2.c3p0.util.TestUtils;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage;
+import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Sanity test to verify there are no unnecessary messages on node start.
+ */
+public class IgniteNoCustomEventsOnNodeStart extends GridCommonAbstractTest {
+    /** */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private boolean client;
+
+    /** */
+    private static volatile boolean failed;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        TestTcpDiscoverySpi discoSpi = new TestTcpDiscoverySpi();
+        discoSpi.setIpFinder(IP_FINDER);
+
+        cfg.setDiscoverySpi(discoSpi);
+
+        cfg.setClientMode(client);
+
+        return cfg;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNoCustomEventsOnStart() throws Exception {
+        failed = false;
+
+        for (int i = 0; i < 5; i++) {
+            client = i % 2 == 1;
+
+            startGrid(i);
+        }
+
+        assertFalse(failed);
+    }
+
+    /**
+     *
+     */
+    static class TestTcpDiscoverySpi extends TcpDiscoverySpi {
+        /** {@inheritDoc} */
+        @Override public void sendCustomEvent(DiscoverySpiCustomMessage msg) {
+            if (GridTestUtils.getFieldValue(msg, "delegate") instanceof CacheAffinityChangeMessage)
+                return;
+
+            failed = true;
+
+            fail("Should not be called: " + msg);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/e5fcebb0/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceClientNodeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceClientNodeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceClientNodeTest.java
index c3b9cf4..665294d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceClientNodeTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceClientNodeTest.java
@@ -21,6 +21,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
@@ -34,47 +35,118 @@ public class GridServiceClientNodeTest extends GridCommonAbstractTest {
     protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
 
     /** */
-    private static final int NODE_CNT = 3;
+    private boolean client;
 
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(gridName);
 
-        ((TcpDiscoverySpi) cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setMaxMissedClientHeartbeats(30);
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setHeartbeatFrequency(1000);
 
-        if (gridName.equals(getTestGridName(NODE_CNT - 1)))
-            cfg.setClientMode(true);
+        cfg.setClientMode(client);
 
         return cfg;
     }
 
     /** {@inheritDoc} */
-    @Override protected void beforeTestsStarted() throws Exception {
-        super.beforeTestsStarted();
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
 
-        startGrids(NODE_CNT);
+        super.afterTest();
     }
 
-    /** {@inheritDoc} */
-    @Override protected void afterTestsStopped() throws Exception {
-        stopAllGrids();
+    /**
+     * @throws Exception If failed.
+     */
+    public void testDeployFromClient() throws Exception {
+        startGrids(3);
+
+        client = true;
+
+        Ignite ignite = startGrid(3);
+
+        checkDeploy(ignite, "service1");
     }
 
     /**
      * @throws Exception If failed.
      */
-    public void testDeployFromClient() throws Exception {
-        Ignite ignite = ignite(NODE_CNT - 1);
+    public void testDeployFromClientAfterRouterStop1() throws Exception {
+        startGrid(0);
 
-        assertTrue(ignite.configuration().isClientMode());
+        client = true;
 
-        String svcName = "testService";
+        Ignite ignite = startGrid(1);
+
+        client = false;
+
+        startGrid(2);
+
+        U.sleep(1000);
+
+        stopGrid(0);
+
+        awaitPartitionMapExchange();
+
+        checkDeploy(ignite, "service1");
+
+        startGrid(3);
+
+        for (int i = 0; i < 10; i++)
+            checkDeploy(ignite, "service2-" + i);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testDeployFromClientAfterRouterStop2() throws Exception {
+        startGrid(0);
+
+        client = true;
+
+        Ignite ignite = startGrid(1);
+
+        client = false;
+
+        startGrid(2);
+
+        client = true;
+
+        startGrid(3);
+
+        client = false;
+
+        startGrid(4);
+
+        U.sleep(1000);
+
+        stopGrid(0);
+
+        awaitPartitionMapExchange();
+
+        checkDeploy(ignite, "service1");
+
+        startGrid(5);
+
+        for (int i = 0; i < 10; i++)
+            checkDeploy(ignite, "service2-" + i);
+    }
+
+    /**
+     * @param client Client node.
+     * @param svcName Service name.
+     * @throws Exception If failed.
+     */
+    private void checkDeploy(Ignite client, String svcName) throws Exception {
+        assertTrue(client.configuration().isClientMode());
 
         CountDownLatch latch = new CountDownLatch(1);
 
         DummyService.exeLatch(svcName, latch);
 
-        ignite.services().deployClusterSingleton(svcName, new DummyService());
+        client.services().deployClusterSingleton(svcName, new DummyService());
 
         assertTrue(latch.await(5000, TimeUnit.MILLISECONDS));
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/e5fcebb0/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
index 652643d..dc412a9 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
@@ -128,6 +128,7 @@ import org.apache.ignite.internal.processors.cache.local.GridCacheLocalMultithre
 import org.apache.ignite.internal.processors.cache.local.GridCacheLocalTxMultiThreadedSelfTest;
 import org.apache.ignite.internal.processors.cache.local.GridCacheLocalTxSingleThreadedSelfTest;
 import org.apache.ignite.internal.processors.cache.local.GridCacheLocalTxTimeoutSelfTest;
+import org.apache.ignite.internal.processors.continuous.IgniteNoCustomEventsOnNodeStart;
 
 /**
  * Test suite.
@@ -259,6 +260,8 @@ public class IgniteCacheTestSuite2 extends TestSuite {
         suite.addTest(new TestSuite(CacheEnumOperationsTest.class));
         suite.addTest(new TestSuite(IgniteCacheIncrementTxTest.class));
 
+        suite.addTest(new TestSuite(IgniteNoCustomEventsOnNodeStart.class));
+
         return suite;
     }
 }