You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/10/09 16:47:02 UTC

[1/7] ignite git commit: ignite-1624 Tests muted

Repository: ignite
Updated Branches:
  refs/heads/ignite-1093-2 f2da44dcf -> d8bfdce2e


ignite-1624 Tests muted


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f60cba79
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f60cba79
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f60cba79

Branch: refs/heads/ignite-1093-2
Commit: f60cba7986bcde8578e4e0dc4a065fb834514373
Parents: b3d347e
Author: agura <ag...@gridgain.com>
Authored: Wed Oct 7 13:34:33 2015 +0300
Committer: agura <ag...@gridgain.com>
Committed: Wed Oct 7 13:34:33 2015 +0300

----------------------------------------------------------------------
 .../ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java    | 2 ++
 1 file changed, 2 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/f60cba79/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
index 9fbf5b1..0bb9a42 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
@@ -1074,6 +1074,8 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
      * @throws Exception If failed.
      */
     public void testJoinError() throws Exception {
+        fail("https://issues.apache.org/jira/browse/IGNITE-1624");
+
         startServerNodes(1);
 
         Ignite ignite = G.ignite("server-0");


[6/7] ignite git commit: ignite-1526: full support of IBM JDK by Ignite

Posted by sb...@apache.org.
ignite-1526: full support of IBM JDK by Ignite


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a1e5cc57
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a1e5cc57
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a1e5cc57

Branch: refs/heads/ignite-1093-2
Commit: a1e5cc5783fc4b20f8c01f3b7af17a9ac772afd6
Parents: d917733
Author: Andrey Gura <ag...@gridgain.com>
Authored: Fri Oct 9 13:54:56 2015 +0300
Committer: Denis Magda <dm...@gridgain.com>
Committed: Fri Oct 9 13:54:56 2015 +0300

----------------------------------------------------------------------
 .../discovery/GridDiscoveryManager.java         |   17 +-
 .../internal/portable/PortableContext.java      |    7 +
 .../portable/api/PortableMarshaller.java        |   14 +-
 .../ignite/internal/util/GridJavaProcess.java   |   12 +-
 .../ignite/internal/util/lang/GridFunc.java     |   12 +
 .../apache/ignite/marshaller/Marshaller.java    |    2 +-
 .../optimized/OptimizedMarshallerUtils.java     |    6 +-
 .../cache/GridCacheAbstractFullApiSelfTest.java |  486 ++++---
 .../CacheNoValueClassOnServerNodeTest.java      |    1 +
 ...tomicClientOnlyMultiNodeFullApiSelfTest.java |   71 +-
 ...achePartitionedMultiNodeFullApiSelfTest.java |  129 +-
 .../testframework/junits/GridAbstractTest.java  |  116 +-
 .../junits/IgniteTestResources.java             |    8 +-
 .../junits/common/GridCommonAbstractTest.java   |   15 +-
 .../junits/multijvm/AffinityProcessProxy.java   |  440 ++++--
 .../multijvm/IgniteCacheProcessProxy.java       | 1346 ++++++++++++++----
 .../multijvm/IgniteClusterProcessProxy.java     |  115 +-
 .../multijvm/IgniteEventsProcessProxy.java      |   50 +-
 .../junits/multijvm/IgniteNodeRunner.java       |   39 +-
 .../junits/multijvm/IgniteProcessProxy.java     |  107 +-
 .../cache/CacheConfigurationP2PTest.java        |    3 +
 21 files changed, 2185 insertions(+), 811 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/a1e5cc57/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index a6f5f08..2ed4520 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -987,19 +987,24 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
 
         boolean locP2pEnabled = locNode.attribute(ATTR_PEER_CLASSLOADING);
 
-        boolean warned = false;
+        boolean ipV4Warned = false;
+
+        boolean jvmMajVerWarned = false;
 
         for (ClusterNode n : nodes) {
             int rmtJvmMajVer = nodeJavaMajorVersion(n);
 
-            if (locJvmMajVer != rmtJvmMajVer)
-                throw new IgniteCheckedException("Local node's java major version is different from remote node's one" +
-                    " [locJvmMajVer=" + locJvmMajVer + ", rmtJvmMajVer=" + rmtJvmMajVer + "]");
+            if (locJvmMajVer != rmtJvmMajVer && !jvmMajVerWarned) {
+                U.warn(log, "Local java version is different from remote [loc=" +
+                    locJvmMajVer + ", rmt=" + rmtJvmMajVer + "]");
+
+                jvmMajVerWarned = true;
+            }
 
             String rmtPreferIpV4 = n.attribute("java.net.preferIPv4Stack");
 
             if (!F.eq(rmtPreferIpV4, locPreferIpV4)) {
-                if (!warned)
+                if (!ipV4Warned)
                     U.warn(log, "Local node's value of 'java.net.preferIPv4Stack' " +
                         "system property differs from remote node's " +
                         "(all nodes in topology should have identical value) " +
@@ -1008,7 +1013,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
                         ", rmtAddrs=" + U.addressesAsString(n) + ']',
                         "Local and remote 'java.net.preferIPv4Stack' system properties do not match.");
 
-                warned = true;
+                ipV4Warned = true;
             }
 
             // Daemon nodes are allowed to have any deployment they need.

http://git-wip-us.apache.org/repos/asf/ignite/blob/a1e5cc57/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableContext.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableContext.java
index 2ee96b7..1ad42ab 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableContext.java
@@ -967,6 +967,9 @@ public class PortableContext implements Externalizable {
         }
     }
 
+    /**
+     * Basic class ID mapper.
+     */
     private static class BasicClassIdMapper implements PortableIdMapper {
         /** {@inheritDoc} */
         @Override public int typeId(String clsName) {
@@ -1121,6 +1124,10 @@ public class PortableContext implements Externalizable {
         /** Whether the following type is registered in a cache or not */
         private final boolean registered;
 
+        /**
+         * @param id Id.
+         * @param registered Registered.
+         */
         public Type(int id, boolean registered) {
             this.id = id;
             this.registered = registered;

http://git-wip-us.apache.org/repos/asf/ignite/blob/a1e5cc57/modules/core/src/main/java/org/apache/ignite/internal/portable/api/PortableMarshaller.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/api/PortableMarshaller.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/api/PortableMarshaller.java
index de0df8d..3dfbdf0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/portable/api/PortableMarshaller.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/api/PortableMarshaller.java
@@ -29,12 +29,6 @@ import org.apache.ignite.internal.portable.GridPortableMarshaller;
 import org.apache.ignite.internal.portable.PortableContext;
 import org.apache.ignite.marshaller.AbstractMarshaller;
 import org.apache.ignite.marshaller.MarshallerContext;
-import org.apache.ignite.internal.portable.api.PortableException;
-import org.apache.ignite.internal.portable.api.PortableIdMapper;
-import org.apache.ignite.internal.portable.api.PortableObject;
-import org.apache.ignite.internal.portable.api.PortableProtocolVersion;
-import org.apache.ignite.internal.portable.api.PortableSerializer;
-import org.apache.ignite.internal.portable.api.PortableTypeConfiguration;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -336,7 +330,7 @@ public class PortableMarshaller extends AbstractMarshaller {
 
     /** {@inheritDoc} */
     @Override public <T> T unmarshal(InputStream in, @Nullable ClassLoader clsLdr) throws IgniteCheckedException {
-        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+        ByteArrayOutputStream buf = new ByteArrayOutputStream();
 
         byte[] arr = new byte[4096];
         int cnt;
@@ -345,11 +339,11 @@ public class PortableMarshaller extends AbstractMarshaller {
         // returns number of bytes remaining.
         try {
             while ((cnt = in.read(arr)) != -1)
-                buffer.write(arr, 0, cnt);
+                buf.write(arr, 0, cnt);
 
-            buffer.flush();
+            buf.flush();
 
-            return impl.deserialize(buffer.toByteArray(), clsLdr);
+            return impl.deserialize(buf.toByteArray(), clsLdr);
         }
         catch (IOException e) {
             throw new PortableException("Failed to unmarshal the object from InputStream", e);

http://git-wip-us.apache.org/repos/asf/ignite/blob/a1e5cc57/modules/core/src/main/java/org/apache/ignite/internal/util/GridJavaProcess.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridJavaProcess.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridJavaProcess.java
index 92c20fe..3371eb8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridJavaProcess.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridJavaProcess.java
@@ -89,7 +89,7 @@ public final class GridJavaProcess {
      */
     public static GridJavaProcess exec(Class cls, String params, @Nullable IgniteLogger log,
         @Nullable IgniteInClosure<String> printC, @Nullable GridAbsClosure procKilledC) throws Exception {
-        return exec(cls.getCanonicalName(), params, log, printC, procKilledC, null, null);
+        return exec(cls.getCanonicalName(), params, log, printC, procKilledC, null, null, null);
     }
 
     /**
@@ -108,7 +108,7 @@ public final class GridJavaProcess {
     public static GridJavaProcess exec(Class cls, String params, @Nullable IgniteLogger log,
         @Nullable IgniteInClosure<String> printC, @Nullable GridAbsClosure procKilledC,
         @Nullable Collection<String> jvmArgs, @Nullable String cp) throws Exception {
-        return exec(cls.getCanonicalName(), params, log, printC, procKilledC, jvmArgs, cp);
+        return exec(cls.getCanonicalName(), params, log, printC, procKilledC, null, jvmArgs, cp);
     }
 
     /**
@@ -116,9 +116,10 @@ public final class GridJavaProcess {
      *
      * @param clsName Class with main() method to be run.
      * @param params main() method parameters.
+     * @param log Log to use.
      * @param printC Optional closure to be called each time wrapped process prints line to system.out or system.err.
      * @param procKilledC Optional closure to be called when process termination is detected.
-     * @param log Log to use.
+     * @param javaHome Java home location. The process will be started under given JVM.
      * @param jvmArgs JVM arguments to use.
      * @param cp Additional classpath.
      * @return Wrapper around {@link Process}
@@ -126,7 +127,7 @@ public final class GridJavaProcess {
      */
     public static GridJavaProcess exec(String clsName, String params, @Nullable IgniteLogger log,
         @Nullable IgniteInClosure<String> printC, @Nullable GridAbsClosure procKilledC,
-        @Nullable Collection<String> jvmArgs, @Nullable String cp) throws Exception {
+        @Nullable String javaHome, @Nullable Collection<String> jvmArgs, @Nullable String cp) throws Exception {
         if (!(U.isLinux() || U.isMacOs() || U.isWindows()))
             throw new Exception("Your OS is not supported.");
 
@@ -140,7 +141,8 @@ public final class GridJavaProcess {
 
         List<String> procCommands = new ArrayList<>();
 
-        String javaBin = System.getProperty("java.home") + File.separator + "bin" + File.separator + "java";
+        String javaBin = (javaHome == null ? System.getProperty("java.home") : javaHome) +
+            File.separator + "bin" + File.separator + "java";
 
         procCommands.add(javaBin);
         procCommands.addAll(jvmArgs == null ? U.jvmArgs() : jvmArgs);

http://git-wip-us.apache.org/repos/asf/ignite/blob/a1e5cc57/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
index ffeeca0..43bc5f3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
@@ -103,6 +103,9 @@ public class GridFunc {
 
     /** */
     private static final IgniteClosure IDENTITY = new C1() {
+        /** */
+        private static final long serialVersionUID = -6338573080046225172L;
+
         @Override public Object apply(Object o) {
             return o;
         }
@@ -1196,6 +1199,9 @@ public class GridFunc {
         A.notNull(nodeId, "nodeId");
 
         return new P1<T>() {
+            /** */
+            private static final long serialVersionUID = -7082730222779476623L;
+
             @Override public boolean apply(ClusterNode e) {
                 return e.id().equals(nodeId);
             }
@@ -1705,6 +1711,9 @@ public class GridFunc {
         assert c != null;
 
         return new GridSerializableList<T2>() {
+            /** */
+            private static final long serialVersionUID = 3126625219739967068L;
+
             @Override public T2 get(int idx) {
                 return trans.apply(c.get(idx));
             }
@@ -1766,6 +1775,9 @@ public class GridFunc {
         assert m != null;
 
         return isEmpty(p) || isAlwaysTrue(p) ? m : new GridSerializableMap<K, V>() {
+            /** */
+            private static final long serialVersionUID = 5531745605372387948L;
+
             /** Entry predicate. */
             private IgnitePredicate<Entry<K, V>> ep = new P1<Map.Entry<K, V>>() {
                 @Override public boolean apply(Entry<K, V> e) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/a1e5cc57/modules/core/src/main/java/org/apache/ignite/marshaller/Marshaller.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/marshaller/Marshaller.java b/modules/core/src/main/java/org/apache/ignite/marshaller/Marshaller.java
index 3e815fd..a76daa8 100644
--- a/modules/core/src/main/java/org/apache/ignite/marshaller/Marshaller.java
+++ b/modules/core/src/main/java/org/apache/ignite/marshaller/Marshaller.java
@@ -92,7 +92,7 @@ public interface Marshaller {
     public byte[] marshal(@Nullable Object obj) throws IgniteCheckedException;
 
     /**
-     * Unmarshals object from the output stream using given class loader.
+     * Unmarshals object from the input stream using given class loader.
      * This method should not close given input stream.
      *
      * @param <T> Type of unmarshalled object.

http://git-wip-us.apache.org/repos/asf/ignite/blob/a1e5cc57/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedMarshallerUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedMarshallerUtils.java b/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedMarshallerUtils.java
index 4abbd04..584083c 100644
--- a/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedMarshallerUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedMarshallerUtils.java
@@ -42,6 +42,10 @@ class OptimizedMarshallerUtils {
     /** */
     private static final Unsafe UNSAFE = GridUnsafe.unsafe();
 
+    /** Use default {@code serialVersionUid} for {@link Serializable} classes. */
+    private static final boolean USE_DFLT_SUID =
+        Boolean.valueOf(System.getProperty("ignite.marsh.optimized.useDefaultSUID", Boolean.TRUE.toString()));
+
     /** */
     static final long HASH_SET_MAP_OFF;
 
@@ -283,7 +287,7 @@ class OptimizedMarshallerUtils {
      */
     @SuppressWarnings("ForLoopReplaceableByForEach")
     static short computeSerialVersionUid(Class cls, List<Field> fields) throws IOException {
-        if (Serializable.class.isAssignableFrom(cls) && !Enum.class.isAssignableFrom(cls))
+        if (USE_DFLT_SUID && Serializable.class.isAssignableFrom(cls) && !Enum.class.isAssignableFrom(cls))
             return (short)ObjectStreamClass.lookup(cls).getSerialVersionUID();
 
         MessageDigest md;

http://git-wip-us.apache.org/repos/asf/ignite/blob/a1e5cc57/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
index 2a64963..ec3ea0c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache;
 
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Sets;
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -49,6 +50,7 @@ import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.IgniteTransactions;
 import org.apache.ignite.cache.CacheEntryProcessor;
 import org.apache.ignite.cache.CacheMemoryMode;
@@ -75,6 +77,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.resources.LoggerResource;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.swapspace.inmemory.GridTestSwapSpaceSpi;
 import org.apache.ignite.testframework.GridTestUtils;
@@ -117,48 +120,37 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
     /** */
     public static final CacheEntryProcessor<String, Integer, String> ERR_PROCESSOR =
         new CacheEntryProcessor<String, Integer, String>() {
+            /** */
+            private static final long serialVersionUID = 0L;
+
             @Override public String process(MutableEntry<String, Integer> e, Object... args) {
                 throw new RuntimeException("Failed!");
             }
         };
 
     /** Increment processor for invoke operations. */
-    public static final EntryProcessor<String, Integer, String> INCR_PROCESSOR = new EntryProcessor<String, Integer, String>() {
-        @Override public String process(MutableEntry<String, Integer> e, Object... args) {
-            assertNotNull(e.getKey());
-
-            Integer old = e.getValue();
-
-            e.setValue(old == null ? 1 : old + 1);
-
-            return String.valueOf(old);
-        }
-    };
+    public static final EntryProcessor<String, Integer, String> INCR_PROCESSOR = new IncrementEntryProcessor();
 
     /** Increment processor for invoke operations with IgniteEntryProcessor. */
     public static final CacheEntryProcessor<String, Integer, String> INCR_IGNITE_PROCESSOR =
         new CacheEntryProcessor<String, Integer, String>() {
+            /** */
+            private static final long serialVersionUID = 0L;
+
             @Override public String process(MutableEntry<String, Integer> e, Object... args) {
                 return INCR_PROCESSOR.process(e, args);
             }
         };
 
     /** Increment processor for invoke operations. */
-    public static final EntryProcessor<String, Integer, String> RMV_PROCESSOR = new EntryProcessor<String, Integer, String>() {
-        @Override public String process(MutableEntry<String, Integer> e, Object... args) {
-            assertNotNull(e.getKey());
-
-            Integer old = e.getValue();
-
-            e.remove();
-
-            return String.valueOf(old);
-        }
-    };
+    public static final EntryProcessor<String, Integer, String> RMV_PROCESSOR = new RemoveEntryProcessor();
 
     /** Increment processor for invoke operations with IgniteEntryProcessor. */
     public static final CacheEntryProcessor<String, Integer, String> RMV_IGNITE_PROCESSOR =
         new CacheEntryProcessor<String, Integer, String>() {
+            /** */
+            private static final long serialVersionUID = 0L;
+
             @Override public String process(MutableEntry<String, Integer> e, Object... args) {
                 return RMV_PROCESSOR.process(e, args);
             }
@@ -346,21 +338,8 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
             assert jcache(i).localSize() != 0 || F.isEmpty(keysCol);
         }
 
-        for (int i = 0; i < gridCount(); i++) {
-            executeOnLocalOrRemoteJvm(i, new TestIgniteIdxRunnable() {
-                @Override public void run(int idx) throws Exception {
-                    GridCacheContext<String, Integer> ctx = context(idx);
-
-                    int sum = 0;
-
-                    for (String key : map.keySet())
-                        if (ctx.affinity().localNode(key, new AffinityTopologyVersion(ctx.discovery().topologyVersion())))
-                            sum++;
-
-                    assertEquals("Incorrect key size on cache #" + idx, sum, jcache(idx).localSize(ALL));
-                }
-            });
-        }
+        for (int i = 0; i < gridCount(); i++)
+            executeOnLocalOrRemoteJvm(i, new CheckCacheSizeTask(map));
 
         for (int i = 0; i < gridCount(); i++) {
             Collection<String> keysCol = mapped.get(grid(i).localNode());
@@ -1350,13 +1329,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
 
         assertEquals((Integer)3, cache.get("k1"));
 
-        EntryProcessor<String, Integer, Integer> c = new EntryProcessor<String, Integer, Integer>() {
-            @Override public Integer process(MutableEntry<String, Integer> e, Object... args) {
-                e.remove();
-
-                return null;
-            }
-        };
+        EntryProcessor<String, Integer, Integer> c = new RemoveAndReturnNullEntryProcessor();
 
         assertNull(cache.invoke("k1", c));
         assertNull(cache.get("k1"));
@@ -1364,11 +1337,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
         for (int i = 0; i < gridCount(); i++)
             assertNull(jcache(i).localPeek("k1", ONHEAP));
 
-        final EntryProcessor<String, Integer, Integer> errProcessor = new EntryProcessor<String, Integer, Integer>() {
-            @Override public Integer process(MutableEntry<String, Integer> e, Object... args) {
-                throw new EntryProcessorException("Test entry processor exception.");
-            }
-        };
+        final EntryProcessor<String, Integer, Integer> errProcessor = new FailedEntryProcessor();
 
         GridTestUtils.assertThrows(log, new Callable<Void>() {
             @Override public Void call() throws Exception {
@@ -2001,7 +1970,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
             assertFalse(cacheAsync.<Boolean>future().get());
         }
 
-        cache.localEvict(Arrays.asList("key2"));
+        cache.localEvict(Collections.singletonList("key2"));
 
         // Same checks inside tx.
         Transaction tx = inTx ? transactions().txStart() : null;
@@ -2357,27 +2326,8 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
             for (int i = 0; i < cnt; i++)
                 cache.remove(String.valueOf(i));
 
-            for (int g = 0; g < gridCount(); g++) {
-                executeOnLocalOrRemoteJvm(g, new TestIgniteIdxRunnable() {
-                    @Override public void run(int idx) throws Exception {
-                        for (int i = 0; i < cnt; i++) {
-                            String key = String.valueOf(i);
-
-                            GridCacheContext<String, Integer> cctx = context(idx);
-
-                            GridCacheEntryEx entry = cctx.isNear() ? cctx.near().dht().peekEx(key) :
-                                cctx.cache().peekEx(key);
-
-                            if (grid(idx).affinity(null).mapKeyToPrimaryAndBackups(key).contains(grid(idx).localNode())) {
-                                assertNotNull(entry);
-                                assertTrue(entry.deleted());
-                            }
-                            else
-                                assertNull(entry);
-                        }
-                    }
-                });
-            }
+            for (int g = 0; g < gridCount(); g++)
+                executeOnLocalOrRemoteJvm(g, new CheckEntriesDeletedTask(cnt));
         }
     }
 
@@ -2587,8 +2537,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
         c.add(null);
 
         GridTestUtils.assertThrows(log, new Callable<Void>() {
-            @Override
-            public Void call() throws Exception {
+            @Override public Void call() throws Exception {
                 cache.removeAll(c);
 
                 return null;
@@ -2725,7 +2674,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
     public void testRemoveAfterClear() throws Exception {
         IgniteEx ignite = grid(0);
 
-        boolean affNode = ((IgniteKernal)ignite).context().cache().internalCache(null).context().affinityNode();
+        boolean affNode = ignite.context().cache().internalCache(null).context().affinityNode();
 
         if (!affNode) {
             if (gridCount() < 2)
@@ -2766,13 +2715,6 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
     }
 
     /**
-     *
-     */
-    private void xxx() {
-        System.out.printf("");
-    }
-
-    /**
      * @throws Exception In case of error.
      */
     public void testClear() throws Exception {
@@ -3597,26 +3539,9 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
             info("Local keys (primary + backup): " + locKeys);
         }
 
-        for (int i = 0; i < gridCount(); i++) {
-            grid(i).events().localListen(new IgnitePredicate<Event>() {
-                @Override public boolean apply(Event evt) {
-                    info("Received event: " + evt);
-
-                    switch (evt.type()) {
-                        case EVT_CACHE_OBJECT_SWAPPED:
-                            swapEvts.incrementAndGet();
-
-                            break;
-                        case EVT_CACHE_OBJECT_UNSWAPPED:
-                            unswapEvts.incrementAndGet();
-
-                            break;
-                    }
-
-                    return true;
-                }
-            }, EVT_CACHE_OBJECT_SWAPPED, EVT_CACHE_OBJECT_UNSWAPPED);
-        }
+        for (int i = 0; i < gridCount(); i++)
+            grid(i).events().localListen(
+                new SwapEvtsLocalListener(swapEvts, unswapEvts), EVT_CACHE_OBJECT_SWAPPED, EVT_CACHE_OBJECT_UNSWAPPED);
 
         cache.localEvict(F.asList(k2, k3));
 
@@ -3934,7 +3859,6 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
                 map.put(entry.getKey(), entry.getValue());
         }
 
-        assert map != null;
         assert map.size() == 2;
         assert map.get("key1") == 1;
         assert map.get("key2") == 2;
@@ -3951,32 +3875,8 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
         if (nearEnabled())
             assertEquals(keys.size(), jcache().localSize(CachePeekMode.ALL));
         else {
-            for (int i = 0; i < gridCount(); i++) {
-                executeOnLocalOrRemoteJvm(i, new TestIgniteIdxRunnable() {
-                    @Override public void run(int idx) throws Exception {
-                        GridCacheContext<String, Integer> ctx = context(idx);
-
-                        if (ctx.cache().configuration().getMemoryMode() == OFFHEAP_TIERED)
-                            return;
-
-                        int size = 0;
-
-                        for (String key : keys) {
-                            if (ctx.affinity().localNode(key, ctx.discovery().topologyVersionEx())) {
-                                GridCacheEntryEx e =
-                                    ctx.isNear() ? ctx.near().dht().peekEx(key) : ctx.cache().peekEx(key);
-
-                                assert e != null : "Entry is null [idx=" + idx + ", key=" + key + ", ctx=" + ctx + ']';
-                                assert !e.deleted() : "Entry is deleted: " + e;
-
-                                size++;
-                            }
-                        }
-
-                        assertEquals("Incorrect size on cache #" + idx, size, jcache(idx).localSize(ALL));
-                    }
-                });
-            }
+            for (int i = 0; i < gridCount(); i++)
+                executeOnLocalOrRemoteJvm(i, new CheckEntriesTask(keys));
         }
     }
 
@@ -3989,21 +3889,8 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
             assertEquals("Invalid key size: " + jcache().localSize(ALL),
                 keys.size(), jcache().localSize(ALL));
         else {
-            for (int i = 0; i < gridCount(); i++) {
-                executeOnLocalOrRemoteJvm(i, new TestIgniteIdxRunnable() {
-                    @Override public void run(int idx) throws Exception {
-                        GridCacheContext<String, Integer> ctx = context(idx);
-
-                        int size = 0;
-
-                        for (String key : keys)
-                            if (ctx.affinity().localNode(key, ctx.discovery().topologyVersionEx()))
-                                size++;
-
-                        assertEquals("Incorrect key size on cache #" + idx, size, jcache(idx).localSize(ALL));
-                    }
-                });
-            }
+            for (int i = 0; i < gridCount(); i++)
+                executeOnLocalOrRemoteJvm(i, new CheckKeySizeTask(keys));
         }
     }
 
@@ -4061,27 +3948,8 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
      * @param cnt Keys count.
      * @return Collection of keys for which given cache is primary.
      */
-    protected List<String> primaryKeysForCache(final IgniteCache<String, Integer> cache, final int cnt, final int startFrom) {
-        return executeOnLocalOrRemoteJvm(cache, new TestCacheCallable<String, Integer, List<String>>() {
-            @Override public List<String> call(Ignite ignite, IgniteCache<String, Integer> cache) throws Exception {
-                List<String> found = new ArrayList<>();
-
-                Affinity<Object> affinity = ignite.affinity(cache.getName());
-
-                for (int i = startFrom; i < startFrom + 100_000; i++) {
-                    String key = "key" + i;
-
-                    if (affinity.isPrimary(ignite.cluster().localNode(), key)) {
-                        found.add(key);
-
-                        if (found.size() == cnt)
-                            return found;
-                    }
-                }
-
-                throw new IgniteException("Unable to find " + cnt + " keys as primary for cache.");
-            }
-        });
+    protected List<String> primaryKeysForCache(IgniteCache<String, Integer> cache, int cnt, int startFrom) {
+        return executeOnLocalOrRemoteJvm(cache, new CheckPrimaryKeysTask(startFrom, cnt));
     }
 
     /**
@@ -4272,18 +4140,8 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
      * Checks iterators are cleared.
      */
     private void checkIteratorsCleared() {
-        for (int j = 0; j < gridCount(); j++) {
-            executeOnLocalOrRemoteJvm(j, new TestIgniteIdxRunnable() {
-                @Override public void run(int idx) throws Exception {
-                    GridCacheQueryManager queries = context(idx).queries();
-
-                    Map map = GridTestUtils.getFieldValue(queries, GridCacheQueryManager.class, "qryIters");
-
-                    for (Object obj : map.values())
-                        assertEquals("Iterators not removed for grid " + idx, 0, ((Map)obj).size());
-                }
-            });
-        }
+        for (int j = 0; j < gridCount(); j++)
+            executeOnLocalOrRemoteJvm(j, new CheckIteratorTask());
     }
 
     /**
@@ -5226,4 +5084,280 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
         /** */
         ONE_BY_ONE
     }
+
+    /**
+     * Entry processor that removes the entry and returns its previous value converted by {@code String.valueOf}.
+     */
+    private static class RemoveEntryProcessor implements EntryProcessor<String, Integer, String>, Serializable {
+        /** {@inheritDoc} */
+        @Override public String process(MutableEntry<String, Integer> e, Object... args) {
+            assertNotNull(e.getKey());
+
+            Integer old = e.getValue();
+
+            e.remove();
+
+            return String.valueOf(old);
+        }
+    }
+
+    /**
+     * Sets the value to 1 if it is {@code null}, otherwise increments it; returns the old value as a string.
+     */
+    private static class IncrementEntryProcessor implements EntryProcessor<String, Integer, String>, Serializable {
+        /** {@inheritDoc} */
+        @Override public String process(MutableEntry<String, Integer> e, Object... args) {
+            assertNotNull(e.getKey());
+
+            Integer old = e.getValue();
+
+            e.setValue(old == null ? 1 : old + 1);
+
+            return String.valueOf(old);
+        }
+    }
+
+    /**
+     * Checks that each locally mapped key has a non-deleted entry and that local cache size matches their count.
+     */
+    private static class CheckEntriesTask extends TestIgniteIdxRunnable {
+        /** Keys whose entries are checked. */
+        private final Collection<String> keys;
+
+        /**
+         * @param keys Keys whose entries are checked.
+         */
+        public CheckEntriesTask(Collection<String> keys) {
+            this.keys = keys;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void run(int idx) throws Exception {
+            GridCacheContext<String, Integer> ctx = ((IgniteKernal)ignite).<String, Integer>internalCache().context();
+
+            if (ctx.cache().configuration().getMemoryMode() == OFFHEAP_TIERED)
+                return;
+
+            int size = 0;
+
+            for (String key : keys) {
+                if (ctx.affinity().localNode(key, ctx.discovery().topologyVersionEx())) {
+                    GridCacheEntryEx e =
+                        ctx.isNear() ? ctx.near().dht().peekEx(key) : ctx.cache().peekEx(key);
+
+                    assert e != null : "Entry is null [idx=" + idx + ", key=" + key + ", ctx=" + ctx + ']';
+                    assert !e.deleted() : "Entry is deleted: " + e;
+
+                    size++;
+                }
+            }
+
+            assertEquals("Incorrect size on cache #" + idx, size, ignite.cache(ctx.name()).localSize(ALL));
+        }
+    }
+
+    /**
+     * Checks that local cache size equals the number of map keys that are mapped to the local node.
+     */
+    private static class CheckCacheSizeTask extends TestIgniteIdxRunnable {
+        private final Map<String, Integer> map;
+
+        /**
+         * @param map Map whose keys are counted against the local cache size.
+         */
+        public CheckCacheSizeTask(Map<String, Integer> map) {
+            this.map = map;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void run(int idx) throws Exception {
+            GridCacheContext<String, Integer> ctx = ((IgniteKernal)ignite).<String, Integer>internalCache().context();
+
+            int size = 0;
+
+            for (String key : map.keySet())
+                if (ctx.affinity().localNode(key, new AffinityTopologyVersion(ctx.discovery().topologyVersion())))
+                    size++;
+
+            assertEquals("Incorrect key size on cache #" + idx, size, ignite.cache(ctx.name()).localSize(ALL));
+        }
+    }
+
+    /**
+     * Finds {@code cnt} keys named {@code "key" + i} for which the local node is primary; fails if too few exist.
+     */
+    private static class CheckPrimaryKeysTask implements TestCacheCallable<String, Integer, List<String>> {
+        /** Index of the first key candidate. */
+        private final int startFrom;
+
+        /** Number of keys to find. */
+        private final int cnt;
+
+        /**
+         * @param startFrom Index of the first key candidate.
+         * @param cnt Number of keys to find.
+         */
+        public CheckPrimaryKeysTask(int startFrom, int cnt) {
+            this.startFrom = startFrom;
+            this.cnt = cnt;
+        }
+
+        /** {@inheritDoc} */
+        @Override public List<String> call(Ignite ignite, IgniteCache<String, Integer> cache) throws Exception {
+            List<String> found = new ArrayList<>();
+
+            Affinity<Object> affinity = ignite.affinity(cache.getName());
+
+            for (int i = startFrom; i < startFrom + 100_000; i++) {
+                String key = "key" + i;
+
+                if (affinity.isPrimary(ignite.cluster().localNode(), key)) {
+                    found.add(key);
+
+                    if (found.size() == cnt)
+                        return found;
+                }
+            }
+
+            throw new IgniteException("Unable to find " + cnt + " keys as primary for cache.");
+        }
+    }
+
+    /**
+     * Checks that every map stored in the {@code qryIters} field of the cache query manager is empty.
+     */
+    private static class CheckIteratorTask extends TestIgniteIdxCallable<Void> {
+        /**
+         * @param idx Grid index; used only in the assertion message.
+         */
+        @Override public Void call(int idx) throws Exception {
+            GridCacheContext<String, Integer> ctx = ((IgniteKernal)ignite).<String, Integer>internalCache().context();
+            GridCacheQueryManager queries = ctx.queries();
+
+            Map map = GridTestUtils.getFieldValue(queries, GridCacheQueryManager.class, "qryIters");
+
+            for (Object obj : map.values())
+                assertEquals("Iterators not removed for grid " + idx, 0, ((Map)obj).size());
+
+            return null;
+        }
+    }
+
+    /**
+     * Entry processor that removes the entry and always returns {@code null}.
+     */
+    private static class RemoveAndReturnNullEntryProcessor implements
+        EntryProcessor<String, Integer, Integer>, Serializable {
+
+        /** {@inheritDoc} */
+        @Override public Integer process(MutableEntry<String, Integer> e, Object... args) {
+            e.remove();
+
+            return null;
+        }
+    }
+
+    /**
+     * Event listener that logs each event and counts swap and unswap events in the supplied counters.
+     */
+    private static class SwapEvtsLocalListener implements IgnitePredicate<Event> {
+        @LoggerResource
+        private IgniteLogger log;
+
+        /** Counter incremented on each {@code EVT_CACHE_OBJECT_SWAPPED} event. */
+        private final AtomicInteger swapEvts;
+
+        /** Counter incremented on each {@code EVT_CACHE_OBJECT_UNSWAPPED} event. */
+        private final AtomicInteger unswapEvts;
+
+        /**
+         * @param swapEvts Counter of swap events.
+         * @param unswapEvts Counter of unswap events.
+         */
+        public SwapEvtsLocalListener(AtomicInteger swapEvts, AtomicInteger unswapEvts) {
+            this.swapEvts = swapEvts;
+            this.unswapEvts = unswapEvts;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean apply(Event evt) {
+            log.info("Received event: " + evt);
+
+            switch (evt.type()) {
+                case EVT_CACHE_OBJECT_SWAPPED:
+                    swapEvts.incrementAndGet();
+
+                    break;
+                case EVT_CACHE_OBJECT_UNSWAPPED:
+                    unswapEvts.incrementAndGet();
+
+                    break;
+            }
+
+            return true;
+        }
+    }
+
+    private static class CheckEntriesDeletedTask extends TestIgniteIdxRunnable { // Checks entries of keys "0".."cnt-1".
+        private final int cnt; // Number of keys to check.
+
+        public CheckEntriesDeletedTask(int cnt) {
+            this.cnt = cnt;
+        }
+
+        @Override public void run(int idx) throws Exception {
+            for (int i = 0; i < cnt; i++) {
+                String key = String.valueOf(i);
+
+                GridCacheContext<String, Integer> ctx = ((IgniteKernal)ignite).<String, Integer>internalCache().context();
+
+                GridCacheEntryEx entry = ctx.isNear() ? ctx.near().dht().peekEx(key) : ctx.cache().peekEx(key);
+
+                if (ignite.affinity(null).mapKeyToPrimaryAndBackups(key).contains(((IgniteKernal)ignite).localNode())) {
+                    assertNotNull(entry); // Local node is primary or backup for the key: entry must be present...
+                    assertTrue(entry.deleted()); // ...and flagged as deleted.
+                }
+                else
+                    assertNull(entry); // Local node does not own the key: no entry expected.
+            }
+        }
+    }
+
+    /**
+     * Checks that local size of {@code ignite.cache(null)} equals the number of given keys mapped to the local node.
+     */
+    private static class CheckKeySizeTask extends TestIgniteIdxRunnable {
+        /** Keys to count. */
+        private final Collection<String> keys;
+
+        /**
+         * @param keys Keys to count.
+         */
+        public CheckKeySizeTask(Collection<String> keys) {
+            this.keys = keys;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void run(int idx) throws Exception {
+            GridCacheContext<String, Integer> ctx = ((IgniteKernal)ignite).<String, Integer>internalCache().context();
+
+            int size = 0;
+
+            for (String key : keys)
+                if (ctx.affinity().localNode(key, ctx.discovery().topologyVersionEx()))
+                    size++;
+
+            assertEquals("Incorrect key size on cache #" + idx, size, ignite.cache(null).localSize(ALL));
+        }
+    }
+
+    /**
+     * Entry processor that always throws {@link EntryProcessorException}.
+     */
+    private static class FailedEntryProcessor implements EntryProcessor<String, Integer, Integer>, Serializable {
+        /** {@inheritDoc} */
+        @Override public Integer process(MutableEntry<String, Integer> e, Object... args) {
+            throw new EntryProcessorException("Test entry processor exception.");
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/a1e5cc57/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheNoValueClassOnServerNodeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheNoValueClassOnServerNodeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheNoValueClassOnServerNodeTest.java
index da694b5..c6ce81e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheNoValueClassOnServerNodeTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheNoValueClassOnServerNodeTest.java
@@ -111,6 +111,7 @@ public class CacheNoValueClassOnServerNodeTest extends GridCommonAbstractTest {
                         }
                     },
                     null,
+                    null,
                     jvmArgs,
                     cp
                 );

http://git-wip-us.apache.org/repos/asf/ignite/blob/a1e5cc57/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheAtomicClientOnlyMultiNodeFullApiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheAtomicClientOnlyMultiNodeFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheAtomicClientOnlyMultiNodeFullApiSelfTest.java
index 1511c45..927ee62 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheAtomicClientOnlyMultiNodeFullApiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheAtomicClientOnlyMultiNodeFullApiSelfTest.java
@@ -22,6 +22,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -29,6 +30,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import javax.cache.expiry.Duration;
 import javax.cache.expiry.TouchedExpiryPolicy;
 import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.CachePeekMode;
 import org.apache.ignite.configuration.CacheConfiguration;
@@ -37,6 +39,7 @@ import org.apache.ignite.events.Event;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.resources.LoggerResource;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
@@ -106,9 +109,11 @@ public class GridCacheAtomicClientOnlyMultiNodeFullApiSelfTest extends GridCache
 
         affinityNodes(); // Just to ack cache configuration to log..
 
-        checkKeySize(map.keySet());
+        Set<String> keys = new LinkedHashSet<>(map.keySet());
 
-        checkSize(map.keySet());
+        checkKeySize(keys);
+
+        checkSize(keys);
 
         int fullCacheSize = 0;
 
@@ -317,24 +322,8 @@ public class GridCacheAtomicClientOnlyMultiNodeFullApiSelfTest extends GridCache
         Collection<String> locKeys = new HashSet<>();
 
         for (int i = 0; i < gridCount(); i++) {
-            grid(i).events().localListen(new IgnitePredicate<Event>() {
-                @Override public boolean apply(Event evt) {
-                    info("Received event: " + evt);
-
-                    switch (evt.type()) {
-                        case EVT_CACHE_OBJECT_SWAPPED:
-                            swapEvts.incrementAndGet();
-
-                            break;
-                        case EVT_CACHE_OBJECT_UNSWAPPED:
-                            unswapEvts.incrementAndGet();
-
-                            break;
-                    }
-
-                    return true;
-                }
-            }, EVT_CACHE_OBJECT_SWAPPED, EVT_CACHE_OBJECT_UNSWAPPED);
+            grid(i).events().localListen(
+                new LocalListener(swapEvts, unswapEvts), EVT_CACHE_OBJECT_SWAPPED, EVT_CACHE_OBJECT_UNSWAPPED);
         }
 
         cache.localEvict(Collections.singleton(k2));
@@ -416,4 +405,46 @@ public class GridCacheAtomicClientOnlyMultiNodeFullApiSelfTest extends GridCache
         assertEquals(cnt, swapEvts.get());
         assertEquals(cnt, unswapEvts.get());
     }
+
+    /**
+     * Event listener that logs each event and counts swap and unswap events in the supplied counters.
+     */
+    private static class LocalListener implements IgnitePredicate<Event> {
+        /** Logger. */
+        @LoggerResource
+        private IgniteLogger log;
+
+        /** Counter incremented on each {@code EVT_CACHE_OBJECT_SWAPPED} event. */
+        private final AtomicInteger swapEvts;
+
+        /** Counter incremented on each {@code EVT_CACHE_OBJECT_UNSWAPPED} event. */
+        private final AtomicInteger unswapEvts;
+
+        /**
+         * @param swapEvts Counter of swap events.
+         * @param unswapEvts Counter of unswap events.
+         */
+        public LocalListener(AtomicInteger swapEvts, AtomicInteger unswapEvts) {
+            this.swapEvts = swapEvts;
+            this.unswapEvts = unswapEvts;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean apply(Event evt) {
+            log.info("Received event: " + evt);
+
+            switch (evt.type()) {
+                case EVT_CACHE_OBJECT_SWAPPED:
+                    swapEvts.incrementAndGet();
+
+                    break;
+                case EVT_CACHE_OBJECT_UNSWAPPED:
+                    unswapEvts.incrementAndGet();
+
+                    break;
+            }
+
+            return true;
+        }
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/a1e5cc57/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedMultiNodeFullApiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedMultiNodeFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedMultiNodeFullApiSelfTest.java
index c04bf2e..a2440e2 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedMultiNodeFullApiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedMultiNodeFullApiSelfTest.java
@@ -20,11 +20,14 @@ package org.apache.ignite.internal.processors.cache.distributed.near;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteAtomicLong;
 import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cache.CacheMemoryMode;
 import org.apache.ignite.cache.CachePeekMode;
 import org.apache.ignite.cache.affinity.Affinity;
@@ -36,6 +39,8 @@ import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.resources.LoggerResource;
 
 import static org.apache.ignite.cache.CacheMode.PARTITIONED;
 import static org.apache.ignite.cache.CacheMode.REPLICATED;
@@ -93,7 +98,7 @@ public class GridCachePartitionedMultiNodeFullApiSelfTest extends GridCacheParti
 
         atomicClockModeDelay(c0);
 
-        c1.removeAll(putMap.keySet());
+        c1.removeAll(new HashSet<>(putMap.keySet()));
 
         for (int i = 0; i < size; i++) {
             assertNull(c0.get(i));
@@ -159,22 +164,8 @@ public class GridCachePartitionedMultiNodeFullApiSelfTest extends GridCacheParti
             info("Finished putting value [i=" + i + ']');
         }
 
-        for (int i = 0; i < gridCount(); i++) {
-            executeOnLocalOrRemoteJvm(i, new TestIgniteIdxRunnable() {
-                @Override public void run(int idx) throws Exception {
-                    assertEquals(0, context(idx).tm().idMapSize());
-
-                    IgniteCache<Object, Object> cache = grid(idx).cache(null);
-                    ClusterNode node = grid(idx).localNode();
-
-                    for (int k = 0; k < size; k++) {
-                        if (affinity(cache).isPrimaryOrBackup(node, k))
-                            assertEquals("Check failed for node: " + node.id(), k,
-                                cache.localPeek(k, CachePeekMode.ONHEAP, CachePeekMode.OFFHEAP));
-                    }
-                }
-            });
-        }
+        for (int i = 0; i < gridCount(); i++)
+            executeOnLocalOrRemoteJvm(i, new CheckAffinityTask(size));
 
         for (int i = 0; i < size; i++) {
             info("Putting value 2 [i=" + i + ']');
@@ -199,28 +190,9 @@ public class GridCachePartitionedMultiNodeFullApiSelfTest extends GridCacheParti
 
         final IgniteAtomicLong unswapEvts = grid(0).atomicLong("unswapEvts", 0, true);
 
-        for (int i = 0; i < gridCount(); i++) {
-            final int iCopy = i;
-
-            grid(i).events().localListen(new IgnitePredicate<Event>() {
-                @Override public boolean apply(Event evt) {
-                    info("Received event: " + evt);
-
-                    switch (evt.type()) {
-                        case EVT_CACHE_OBJECT_SWAPPED:
-                            grid(iCopy).atomicLong("swapEvts", 0, false).incrementAndGet();
-
-                            break;
-                        case EVT_CACHE_OBJECT_UNSWAPPED:
-                            grid(iCopy).atomicLong("unswapEvts", 0, false).incrementAndGet();
-
-                            break;
-                    }
-
-                    return true;
-                }
-            }, EVT_CACHE_OBJECT_SWAPPED, EVT_CACHE_OBJECT_UNSWAPPED);
-        }
+        for (int i = 0; i < gridCount(); i++)
+            grid(i).events().localListen(
+                    new SwapUnswapLocalListener(), EVT_CACHE_OBJECT_SWAPPED, EVT_CACHE_OBJECT_UNSWAPPED);
 
         jcache().put("key", 1);
 
@@ -254,13 +226,8 @@ public class GridCachePartitionedMultiNodeFullApiSelfTest extends GridCacheParti
 
             boolean nearEnabled = nearEnabled(c);
 
-            if (nearEnabled) {
-                executeOnLocalOrRemoteJvm(i, new TestIgniteIdxRunnable() {
-                    @Override public void run(int idx) throws Exception {
-                        assertTrue(((IgniteKernal)ignite(idx)).internalCache().context().isNear());
-                    }
-                });
-            }
+            if (nearEnabled)
+                executeOnLocalOrRemoteJvm(i, new IsNearTask());
 
             Integer nearPeekVal = nearEnabled ? 1 : null;
 
@@ -476,4 +443,74 @@ public class GridCachePartitionedMultiNodeFullApiSelfTest extends GridCacheParti
             assertFalse(affinity(cache).isPrimaryOrBackup(other, key));
         }
     }
+
+    /**
+     * Event listener that increments the {@code "swapEvts"} / {@code "unswapEvts"} atomic longs on matching events.
+     */
+    private static class SwapUnswapLocalListener implements IgnitePredicate<Event> {
+        /** Logger. */
+        @LoggerResource
+        private IgniteLogger log;
+
+        /** Ignite instance used to look up the atomic long counters. */
+        @IgniteInstanceResource
+        private Ignite ignite;
+
+        /** {@inheritDoc} */
+        @Override public boolean apply(Event evt) {
+            log.info("Received event: " + evt);
+
+            switch (evt.type()) {
+                case EVT_CACHE_OBJECT_SWAPPED:
+                    ignite.atomicLong("swapEvts", 0, false).incrementAndGet();
+
+                    break;
+                case EVT_CACHE_OBJECT_UNSWAPPED:
+                    ignite.atomicLong("unswapEvts", 0, false).incrementAndGet();
+
+                    break;
+            }
+
+            return true;
+        }
+    }
+
+    /**
+     * Checks that {@code tm().idMapSize()} is 0 and that each locally owned key in {@code [0, size)} peeks to itself.
+     */
+    private static class CheckAffinityTask extends TestIgniteIdxRunnable {
+        /** Number of integer keys to check, starting from 0. */
+        private final int size;
+
+        /**
+         * @param size Number of integer keys to check, starting from 0.
+         */
+        public CheckAffinityTask(int size) {
+            this.size = size;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void run(int idx) throws Exception {
+            assertEquals(0, ((IgniteKernal)ignite).<String, Integer>internalCache().context().tm().idMapSize());
+
+            IgniteCache<Object, Object> cache = ignite.cache(null);
+            ClusterNode node = ((IgniteKernal)ignite).localNode();
+
+            for (int k = 0; k < size; k++) {
+                if (affinity(cache).isPrimaryOrBackup(node, k))
+                    assertEquals("Check failed for node: " + node.id(), k,
+                        cache.localPeek(k, CachePeekMode.ONHEAP, CachePeekMode.OFFHEAP));
+            }
+        }
+    }
+
+    /**
+     * Asserts that {@code isNear()} is {@code true} for the context of the node's internal cache.
+     */
+    private static class IsNearTask extends TestIgniteIdxRunnable {
+        /** {@inheritDoc} */
+        @Override public void run(int idx) throws Exception {
+            assertTrue(((IgniteKernal)ignite).internalCache().context().isNear());
+        }
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/a1e5cc57/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
index f54fe06..d133a84 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
@@ -73,6 +73,7 @@ import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.marshaller.Marshaller;
 import org.apache.ignite.marshaller.MarshallerExclusions;
 import org.apache.ignite.marshaller.jdk.JdkMarshaller;
+import org.apache.ignite.resources.IgniteInstanceResource;
 import org.apache.ignite.spi.checkpoint.sharedfs.SharedFsCheckpointSpi;
 import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
@@ -1471,6 +1472,8 @@ public abstract class GridAbstractTest extends TestCase {
 
         if (!isMultiJvmObject(ignite))
             try {
+                job.setIgnite(ignite);
+
                 return job.call(idx);
             }
             catch (Exception e) {
@@ -1532,11 +1535,7 @@ public abstract class GridAbstractTest extends TestCase {
 
         IgniteProcessProxy proxy = (IgniteProcessProxy)ignite;
 
-        return proxy.remoteCompute().call(new IgniteCallable<R>() {
-            @Override public R call() throws Exception {
-                return job.call(idx);
-            }
-        });
+        return proxy.remoteCompute().call(new ExecuteRemotelyTask<>(job, idx));
     }
 
     /**
@@ -1546,15 +1545,7 @@ public abstract class GridAbstractTest extends TestCase {
      * @param job Job.
      */
     public static <R> R executeRemotely(IgniteProcessProxy proxy, final TestIgniteCallable<R> job) {
-        final UUID id = proxy.getId();
-
-        return proxy.remoteCompute().call(new IgniteCallable<R>() {
-            @Override public R call() throws Exception {
-                Ignite ignite = Ignition.ignite(id);
-
-                return job.call(ignite);
-            }
-        });
+        return proxy.remoteCompute().call(new TestRemoteTask<>(proxy.getId(), job));
     }
 
     /**
@@ -1571,6 +1562,8 @@ public abstract class GridAbstractTest extends TestCase {
         final String cacheName = cache.getName();
 
         return proxy.remoteCompute().call(new IgniteCallable<R>() {
+            private static final long serialVersionUID = -3868429485920845137L;
+
             @Override public R call() throws Exception {
                 Ignite ignite = Ignition.ignite(id);
                 IgniteCache<K,V> cache = ignite.cache(cacheName);
@@ -1745,6 +1738,22 @@ public abstract class GridAbstractTest extends TestCase {
     }
 
     /**
+     * @param name Name of the grid instance to return.
+     * @param remote If {@code false}, the instance is looked up with {@code G.ignite(name)}.
+     * @param thisRemote With {@code remote} set: use {@code IgniteNodeRunner.startedInstance()} instead of the proxy.
+     */
+    public static IgniteEx grid(String name, boolean remote, boolean thisRemote) {
+        if (!remote)
+            return (IgniteEx)G.ignite(name);
+        else {
+            if (thisRemote)
+                return IgniteNodeRunner.startedInstance();
+            else
+                return IgniteProcessProxy.ignite(name);
+        }
+    }
+
+    /**
      *
      */
     private static interface WriteReplaceOwner {
@@ -1781,6 +1790,67 @@ public abstract class GridAbstractTest extends TestCase {
     }
 
     /**
+     * Remote computation task that resolves the Ignite instance by ID and passes it to the wrapped job.
+     */
+    private static class TestRemoteTask<R> implements IgniteCallable<R> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** ID passed to {@code Ignition.ignite(UUID)} to resolve the instance. */
+        private final UUID id;
+
+        /** Job to call with the resolved instance. */
+        private final TestIgniteCallable<R> job;
+
+        /**
+         * @param id ID used to resolve the Ignite instance.
+         * @param job Job to call with the resolved instance.
+         */
+        public TestRemoteTask(UUID id, TestIgniteCallable<R> job) {
+            this.id = id;
+            this.job = job;
+        }
+
+        /** {@inheritDoc} */
+        @Override public R call() throws Exception {
+            Ignite ignite = Ignition.ignite(id);
+
+            return job.call(ignite);
+        }
+    }
+
+    /**
+     * Callable that hands its Ignite instance to the wrapped job and then calls the job with the grid index.
+     */
+    private static class ExecuteRemotelyTask<R> implements IgniteCallable<R> {
+        /** Ignite instance passed to the job before it is called. */
+        @IgniteInstanceResource
+        protected Ignite ignite;
+
+        /** Job to call. */
+        private final TestIgniteIdxCallable<R> job;
+
+        /** Grid index passed to the job. */
+        private final int idx;
+
+        /**
+         * @param job Job to call.
+         * @param idx Grid index passed to the job.
+         */
+        public ExecuteRemotelyTask(TestIgniteIdxCallable<R> job, int idx) {
+            this.job = job;
+            this.idx = idx;
+        }
+
+        /** {@inheritDoc} */
+        @Override public R call() throws Exception {
+            job.setIgnite(ignite);
+
+            return job.call(idx);
+        }
+    }
+
+    /**
      * Test counters.
      */
     protected class TestCounters {
@@ -1923,17 +1993,27 @@ public abstract class GridAbstractTest extends TestCase {
     }
 
     /** */
-    public static interface TestIgniteIdxCallable<R> extends Serializable {
+    public static abstract class TestIgniteIdxCallable<R> implements Serializable {
+        @IgniteInstanceResource
+        protected Ignite ignite;
+
+        /**
+         * @param ignite Ignite.
+         */
+        public void setIgnite(Ignite ignite) {
+            this.ignite = ignite;
+        }
+
         /**
          * @param idx Grid index.
          */
-        R call(int idx) throws Exception;
+        protected abstract R call(int idx) throws Exception;
     }
 
     /** */
-    public abstract static class TestIgniteIdxRunnable implements TestIgniteIdxCallable<Object> {
+    public abstract static class TestIgniteIdxRunnable extends TestIgniteIdxCallable<Void> {
         /** {@inheritDoc} */
-        @Override public Object call(int idx) throws Exception {
+        @Override public Void call(int idx) throws Exception {
             run(idx);
 
             return null;

http://git-wip-us.apache.org/repos/asf/ignite/blob/a1e5cc57/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteTestResources.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteTestResources.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteTestResources.java
index eb72252..406318f 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteTestResources.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteTestResources.java
@@ -42,6 +42,9 @@ import org.jetbrains.annotations.Nullable;
  * Test resources for injection.
  */
 public class IgniteTestResources {
+    /** Marshaller class name. */
+    public static final String MARSH_CLASS_NAME = "test.marshaller.class";
+
     /** */
     private static final IgniteLogger rootLog = new GridTestLog4jLogger(false);
 
@@ -230,8 +233,9 @@ public class IgniteTestResources {
      * @throws IgniteCheckedException If failed.
      */
     @SuppressWarnings("unchecked")
-    public synchronized Marshaller getMarshaller() throws IgniteCheckedException {
-        String marshallerName = GridTestProperties.getProperty(GridTestProperties.MARSH_CLASS_NAME);
+    public static synchronized Marshaller getMarshaller() throws IgniteCheckedException {
+        String marshallerName =
+            System.getProperty(MARSH_CLASS_NAME, GridTestProperties.getProperty(GridTestProperties.MARSH_CLASS_NAME));
 
         Marshaller marsh;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/a1e5cc57/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
index 4bcf51e..e4c2129 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
@@ -246,10 +246,12 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
     protected static <K, V> boolean nearEnabled(final IgniteCache<K,V> cache) {
         CacheConfiguration cfg = GridAbstractTest.executeOnLocalOrRemoteJvm(cache,
             new TestCacheCallable<K, V, CacheConfiguration>() {
-            @Override public CacheConfiguration call(Ignite ignite, IgniteCache<K, V> cache) throws Exception {
-                return ((IgniteKernal)ignite).<K, V>internalCache(cache.getName()).context().config();
-            }
-        });
+                private static final long serialVersionUID = 0L;
+
+                @Override public CacheConfiguration call(Ignite ignite, IgniteCache<K, V> cache) throws Exception {
+                    return ((IgniteKernal)ignite).<K, V>internalCache(cache.getName()).context().config();
+                }
+            });
 
         return isNearEnabled(cfg);
     }
@@ -285,10 +287,13 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
      * @throws Exception If failed.
      */
     @SuppressWarnings("unchecked")
-    protected static <K> void loadAll(Cache<K, ?> cache, final Set<K> keys, final boolean replaceExistingValues) throws Exception {
+    protected static <K> void loadAll(Cache<K, ?> cache, final Set<K> keys, final boolean replaceExistingValues)
+        throws Exception {
         IgniteCache<K, Object> cacheCp = (IgniteCache<K, Object>)cache;
 
         GridAbstractTest.executeOnLocalOrRemoteJvm(cacheCp, new TestCacheRunnable<K, Object>() {
+            private static final long serialVersionUID = -3030833765012500545L;
+
             @Override public void run(Ignite ignite, IgniteCache<K, Object> cache) throws Exception {
                 final AtomicReference<Exception> ex = new AtomicReference<>();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/a1e5cc57/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/AffinityProcessProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/AffinityProcessProxy.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/AffinityProcessProxy.java
index e1959e5..57fbcfc 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/AffinityProcessProxy.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/AffinityProcessProxy.java
@@ -19,12 +19,12 @@ package org.apache.ignite.testframework.junits.multijvm;
 
 import java.util.Collection;
 import java.util.Map;
-import java.util.UUID;
+import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCompute;
-import org.apache.ignite.Ignition;
 import org.apache.ignite.cache.affinity.Affinity;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.resources.IgniteInstanceResource;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -38,160 +38,388 @@ public class AffinityProcessProxy<K> implements Affinity<K> {
     /** Cache name. */
     private final String cacheName;
 
-    /** Grid id. */
-    private final UUID gridId;
-
     /**
      * @param cacheName Cache name.
-     * @param proxy Ignite ptocess proxy.
+     * @param proxy Ignite process proxy.
      */
     public AffinityProcessProxy(String cacheName, IgniteProcessProxy proxy) {
         this.cacheName = cacheName;
-        gridId = proxy.getId();
-        compute = proxy.remoteCompute();
-    }
-
-    /**
-     * Returns cache instance. Method to be called from closure at another JVM.
-     *
-     * @return Cache.
-     */
-    private Affinity<Object> affinity() {
-        return Ignition.ignite(gridId).affinity(cacheName);
+        this.compute = proxy.remoteCompute();
     }
 
     /** {@inheritDoc} */
     @Override public int partitions() {
-        return (int)compute.call(new IgniteCallable<Object>() {
-            @Override public Object call() throws Exception {
-                return affinity().partitions();
-            }
-        });
+        return compute.call(new PartitionsTask(cacheName));
     }
 
     /** {@inheritDoc} */
-    @Override public int partition(final K key) {
-        return (int)compute.call(new IgniteCallable<Object>() {
-            @Override public Object call() throws Exception {
-                return affinity().partition(key);
-            }
-        });
+    @Override public int partition(K key) {
+        return compute.call(new PartitionTask<>(cacheName, key));
     }
 
     /** {@inheritDoc} */
-    @Override public boolean isPrimary(final ClusterNode n, final K key) {
-        return (boolean)compute.call(new IgniteCallable<Object>() {
-            @Override public Object call() throws Exception {
-                return affinity().isPrimary(n, key);
-            }
-        });
+    @Override public boolean isPrimary(ClusterNode n, K key) {
+        return compute.call(new PrimaryOrBackupNodeTask<>(cacheName, key, n, true, false));
     }
 
     /** {@inheritDoc} */
-    @Override public boolean isBackup(final ClusterNode n, final K key) {
-        return (boolean)compute.call(new IgniteCallable<Object>() {
-            @Override public Object call() throws Exception {
-                return affinity().isBackup(n, key);
-            }
-        });
+    @Override public boolean isBackup(ClusterNode n, K key) {
+        return compute.call(new PrimaryOrBackupNodeTask<>(cacheName, key, n, false, true));
     }
 
     /** {@inheritDoc} */
-    @Override public boolean isPrimaryOrBackup(final ClusterNode n, final K key) {
-        return (boolean)compute.call(new IgniteCallable<Object>() {
-            @Override public Object call() throws Exception {
-                return affinity().isPrimaryOrBackup(n, key);
-            }
-        });
+    @Override public boolean isPrimaryOrBackup(ClusterNode n, K key) {
+        return compute.call(new PrimaryOrBackupNodeTask<>(cacheName, key, n, true, true));
     }
 
     /** {@inheritDoc} */
-    @Override public int[] primaryPartitions(final ClusterNode n) {
-        return (int[])compute.call(new IgniteCallable<Object>() {
-            @Override public Object call() throws Exception {
-                return affinity().primaryPartitions(n);
-            }
-        });
+    @Override public int[] primaryPartitions(ClusterNode n) {
+        return compute.call(new GetPartitionsTask(cacheName, n, true, false));
     }
 
     /** {@inheritDoc} */
-    @Override public int[] backupPartitions(final ClusterNode n) {
-        return (int[])compute.call(new IgniteCallable<Object>() {
-            @Override public Object call() throws Exception {
-                return affinity().backupPartitions(n);
-            }
-        });
+    @Override public int[] backupPartitions(ClusterNode n) {
+        return compute.call(new GetPartitionsTask(cacheName, n, false, true));
     }
 
     /** {@inheritDoc} */
-    @Override public int[] allPartitions(final ClusterNode n) {
-        return (int[])compute.call(new IgniteCallable<Object>() {
-            @Override public Object call() throws Exception {
-                return affinity().allPartitions(n);
-            }
-        });
+    @Override public int[] allPartitions(ClusterNode n) {
+        return compute.call(new GetPartitionsTask(cacheName, n, true, true));
     }
 
     /** {@inheritDoc} */
-    @Override public Object affinityKey(final K key) {
-        return compute.call(new IgniteCallable<Object>() {
-            @Override public Object call() throws Exception {
-                return affinity().affinityKey(key);
-            }
-        });
+    @Override public Object affinityKey(K key) {
+        return compute.call(new AffinityKeyTask<>(cacheName, key));
     }
 
     /** {@inheritDoc} */
-    @Override public Map<ClusterNode, Collection<K>> mapKeysToNodes(final Collection<? extends K> keys) {
-        return (Map<ClusterNode, Collection<K>>)compute.call(new IgniteCallable<Object>() {
-            @Override public Object call() throws Exception {
-                return affinity().mapKeysToNodes(keys);
-            }
-        });
+    @Override public Map<ClusterNode, Collection<K>> mapKeysToNodes(Collection<? extends K> keys) {
+        return compute.call(new MapKeysToNodesTask<>(cacheName, keys));
     }
 
     /** {@inheritDoc} */
-    @Nullable @Override public ClusterNode mapKeyToNode(final K key) {
-        return (ClusterNode)compute.call(new IgniteCallable<Object>() {
-            @Override public Object call() throws Exception {
-                return affinity().mapKeyToNode(key);
-            }
-        });
+    @Nullable @Override public ClusterNode mapKeyToNode(K key) {
+        return compute.call(new MapKeyToNodeTask<>(cacheName, key));
     }
 
     /** {@inheritDoc} */
-    @Override public Collection<ClusterNode> mapKeyToPrimaryAndBackups(final K key) {
-        return (Collection<ClusterNode>)compute.call(new IgniteCallable<Object>() {
-            @Override public Object call() throws Exception {
-                return affinity().mapKeyToPrimaryAndBackups(key);
-            }
-        });
+    @Override public Collection<ClusterNode> mapKeyToPrimaryAndBackups(K key) {
+        return compute.call(new MapKeyToPrimaryAndBackupsTask<>(cacheName, key));
     }
 
     /** {@inheritDoc} */
-    @Override public ClusterNode mapPartitionToNode(final int part) {
-        return (ClusterNode)compute.call(new IgniteCallable<Object>() {
-            @Override public Object call() throws Exception {
-                return affinity().mapPartitionToNode(part);
-            }
-        });
+    @Override public ClusterNode mapPartitionToNode(int part) {
+        return compute.call(new MapPartitionToNode<>(cacheName, part));
     }
 
     /** {@inheritDoc} */
-    @Override public Map<Integer, ClusterNode> mapPartitionsToNodes(final Collection<Integer> parts) {
-        return (Map<Integer, ClusterNode>)compute.call(new IgniteCallable<Object>() {
-            @Override public Object call() throws Exception {
-                return affinity().mapPartitionsToNodes(parts);
-            }
-        });
+    @Override public Map<Integer, ClusterNode> mapPartitionsToNodes(Collection<Integer> parts) {
+        return compute.call(new MapPartitionsToNodes<>(cacheName, parts));
     }
 
     /** {@inheritDoc} */
-    @Override public Collection<ClusterNode> mapPartitionToPrimaryAndBackups(final int part) {
-        return (Collection<ClusterNode>)compute.call(new IgniteCallable<Object>() {
-            @Override public Object call() throws Exception {
-                return affinity().mapPartitionToPrimaryAndBackups(part);
-            }
-        });
+    @Override public Collection<ClusterNode> mapPartitionToPrimaryAndBackups(int part) {
+        return compute.call(new MapPartitionsToPrimaryAndBackupsTask<>(cacheName, part));
+    }
+
+    /**
+     * Callable that checks whether a node is primary, backup, or either for a key, as selected by the two flags.
+     */
+    private static class PrimaryOrBackupNodeTask<K> extends AffinityTaskAdapter<K, Boolean> {
+        /** Key. */
+        private final K key;
+
+        /** Node. */
+        private final ClusterNode n;
+
+        /** When set, primary ownership is checked (combined with {@code backup}: primary-or-backup). */
+        private final boolean primary;
+
+        /** When set, backup ownership is checked (combined with {@code primary}: primary-or-backup). */
+        private final boolean backup;
+
+        /**
+         * @param cacheName Cache name.
+         * @param key Key.
+         * @param n Node to check; {@code primary} and {@code backup} select which affinity check is run.
+         */
+        public PrimaryOrBackupNodeTask(String cacheName, K key, ClusterNode n,
+            boolean primary, boolean backup) {
+            super(cacheName);
+            this.key = key;
+            this.n = n;
+            this.primary = primary;
+            this.backup = backup;
+        }
+
+        /** {@inheritDoc} Dispatches on the flags; throws {@link IllegalStateException} if neither flag is set. */
+        @Override public Boolean call() throws Exception {
+            if (primary && backup)
+                return affinity().isPrimaryOrBackup(n, key);
+            else if (primary)
+                return affinity().isPrimary(n, key);
+            else if (backup)
+                return affinity().isBackup(n, key);
+            else
+                throw new IllegalStateException("primary or backup or both flags should be switched on");
+        }
+    }
+
+    /**
+     * Callable that returns the result of {@code Affinity.mapKeyToPrimaryAndBackups(key)} for the given cache name.
+     */
+    private static class MapKeyToPrimaryAndBackupsTask<K> extends AffinityTaskAdapter<K, Collection<ClusterNode>> {
+        /** Key. */
+        private final K key;
+
+        /**
+         * @param cacheName Cache name.
+         * @param key Key.
+         */
+        public MapKeyToPrimaryAndBackupsTask(String cacheName, K key) {
+            super(cacheName);
+            this.key = key;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Collection<ClusterNode> call() throws Exception {
+            return affinity().mapKeyToPrimaryAndBackups(key);
+        }
+    }
+
+    /**
+     * Callable that returns the result of {@code Affinity.partitions()} for the given cache name.
+     */
+    private static class PartitionsTask extends AffinityTaskAdapter<Void, Integer> {
+        /**
+         * @param cacheName Cache name.
+         */
+        public PartitionsTask(String cacheName) {
+            super(cacheName);
+        }
+
+        /** {@inheritDoc} */
+        @Override public Integer call() throws Exception {
+            return affinity().partitions();
+        }
+    }
+
+    /**
+     * Callable that returns the result of {@code Affinity.partition(key)} for the given cache name.
+     */
+    private static class PartitionTask<K> extends AffinityTaskAdapter<K, Integer> {
+        /** Key. */
+        private final K key;
+
+        /**
+         * @param cacheName Cache name.
+         * @param key Key.
+         */
+        public PartitionTask(String cacheName, K key) {
+            super(cacheName);
+            this.key = key;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Integer call() throws Exception {
+            return affinity().partition(key);
+        }
+    }
+
+    /**
+     * Callable that returns a node's primary, backup, or all partition ids, as selected by the two flags.
+     */
+    private static class GetPartitionsTask extends AffinityTaskAdapter<Void, int[]> {
+        /** Node. */
+        private final ClusterNode n;
+
+        /** When set, primary partitions are requested (combined with {@code backup}: all partitions). */
+        private final boolean primary;
+
+        /** When set, backup partitions are requested (combined with {@code primary}: all partitions). */
+        private final boolean backup;
+
+        /**
+         * @param cacheName Cache name.
+         * @param n Node whose partitions are queried.
+         * @param primary Request primary partitions.
+         * @param backup Request backup partitions.
+         */
+        public GetPartitionsTask(String cacheName, ClusterNode n, boolean primary, boolean backup) {
+            super(cacheName);
+            this.n = n;
+            this.primary = primary;
+            this.backup = backup;
+        }
+
+        /** {@inheritDoc} Dispatches on the flags; throws {@link IllegalStateException} if neither flag is set. */
+        @Override public int[] call() throws Exception {
+            if (primary && backup)
+                return affinity().allPartitions(n);
+            else if (primary)
+                return affinity().primaryPartitions(n);
+            else if (backup)
+                return affinity().backupPartitions(n);
+            else
+                throw new IllegalStateException("primary or backup or both flags should be switched on");
+        }
+    }
+
+    /**
+     * Callable that returns the result of {@code Affinity.affinityKey(key)} for the given cache name.
+     */
+    private static class AffinityKeyTask<K> extends AffinityTaskAdapter<K, Object> {
+        /** Key. */
+        private final K key;
+
+        /**
+         * @param cacheName Cache name.
+         * @param key Key.
+         */
+        public AffinityKeyTask(String cacheName, K key) {
+            super(cacheName);
+            this.key = key;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object call() throws Exception {
+            return affinity().affinityKey(key);
+        }
+    }
+
+    /**
+     * @param <K> Type of the keys passed to {@code Affinity.mapKeysToNodes(keys)}, whose result this callable returns.
+     */
+    private static class MapKeysToNodesTask<K> extends AffinityTaskAdapter<K, Map<ClusterNode, Collection<K>>> {
+        /** Keys. */
+        private final Collection<? extends K> keys;
+
+        /**
+         * @param cacheName Cache name.
+         * @param keys Keys.
+         */
+        public MapKeysToNodesTask(String cacheName, Collection<? extends K> keys) {
+            super(cacheName);
+            this.keys = keys;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Map<ClusterNode, Collection<K>> call() throws Exception {
+            return affinity().mapKeysToNodes(keys);
+        }
+    }
+
+    /**
+     * Callable that returns the result of {@code Affinity.mapKeyToNode(key)} for the given cache name.
+     */
+    private static class MapKeyToNodeTask<K> extends AffinityTaskAdapter<K, ClusterNode> {
+        /** Key. */
+        private final K key;
+
+        /**
+         * @param cacheName Cache name.
+         * @param key Key.
+         */
+        public MapKeyToNodeTask(String cacheName, K key) {
+            super(cacheName);
+            this.key = key;
+        }
+
+        /** {@inheritDoc} */
+        @Override public ClusterNode call() throws Exception {
+            return affinity().mapKeyToNode(key);
+        }
+    }
+
+    /**
+     * Callable that returns the result of {@code Affinity.mapPartitionToNode(part)} for the given cache name.
+     */
+    private static class MapPartitionToNode<K> extends AffinityTaskAdapter<K, ClusterNode> {
+        /** Partition. */
+        private final int part;
+
+        /**
+         * @param cacheName Cache name.
+         * @param part Partition.
+         */
+        public MapPartitionToNode(String cacheName, int part) {
+            super(cacheName);
+            this.part = part;
+        }
+
+        /** {@inheritDoc} */
+        @Override public ClusterNode call() throws Exception {
+            return affinity().mapPartitionToNode(part);
+        }
+    }
+
+    /**
+     * Callable that returns the result of {@code Affinity.mapPartitionsToNodes(parts)} for the given cache name.
+     */
+    private static class MapPartitionsToNodes<K> extends AffinityTaskAdapter<K, Map<Integer, ClusterNode>> {
+        /** Partition ids to map. */
+        private final Collection<Integer> parts;
+
+        /**
+         * @param cacheName Cache name.
+         * @param parts Partition ids to map.
+         */
+        public MapPartitionsToNodes(String cacheName, Collection<Integer> parts) {
+            super(cacheName);
+            this.parts = parts;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Map<Integer, ClusterNode> call() throws Exception {
+            return affinity().mapPartitionsToNodes(parts);
+        }
+    }
+
+    /**
+     * Callable that returns {@code Affinity.mapPartitionToPrimaryAndBackups(part)}; takes one partition despite its name.
+     */
+    private static class MapPartitionsToPrimaryAndBackupsTask<K> extends AffinityTaskAdapter<K, Collection<ClusterNode>> {
+        /** Partition. */
+        private final int part;
+
+        /**
+         * @param cacheName Cache name.
+         * @param part Partition.
+         */
+        public MapPartitionsToPrimaryAndBackupsTask(String cacheName, int part) {
+            super(cacheName);
+            this.part = part;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Collection<ClusterNode> call() throws Exception {
+            return affinity().mapPartitionToPrimaryAndBackups(part);
+        }
+    }
+
+    /**
+     * Base class for the affinity callables: keeps the cache name and looks up its affinity via the {@code ignite} field.
+     */
+    private abstract static class AffinityTaskAdapter<K, R> implements IgniteCallable<R> {
+        /** Ignite instance; annotated with {@code @IgniteInstanceResource} rather than assigned in this class. */
+        @IgniteInstanceResource
+        protected Ignite ignite;
+
+        /** Cache name. */
+        protected final String cacheName;
+
+        /**
+         * @param cacheName Cache name.
+         */
+        public AffinityTaskAdapter(String cacheName) {
+            this.cacheName = cacheName;
+        }
+
+        /**
+         * @return Affinity obtained from {@code ignite.affinity(cacheName)}.
+         */
+        protected Affinity<K> affinity() {
+            return ignite.affinity(cacheName);
+        }
+    }
 }
\ No newline at end of file


[7/7] ignite git commit: Merge remote-tracking branch 'remotes/origin/master' into ignite-1093-2

Posted by sb...@apache.org.
Merge remote-tracking branch 'remotes/origin/master' into ignite-1093-2


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d8bfdce2
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d8bfdce2
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d8bfdce2

Branch: refs/heads/ignite-1093-2
Commit: d8bfdce2e3718db1d0d078539d006e960a0e95c0
Parents: f2da44d a1e5cc5
Author: Anton Vinogradov <av...@apache.org>
Authored: Fri Oct 9 17:46:27 2015 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Fri Oct 9 17:46:27 2015 +0300

----------------------------------------------------------------------
 .../JettyRestProcessorAbstractSelfTest.java     |   27 +-
 .../apache/ignite/IgniteSystemProperties.java   |    3 +
 .../discovery/GridDiscoveryManager.java         |   17 +-
 .../internal/portable/PortableContext.java      |    7 +
 .../portable/api/PortableMarshaller.java        |   14 +-
 .../dht/atomic/GridNearAtomicUpdateFuture.java  |    8 +-
 .../colocated/GridDhtColocatedLockFuture.java   |   11 +-
 .../distributed/near/GridNearLockFuture.java    |   11 +-
 .../near/GridNearOptimisticTxPrepareFuture.java |   24 +-
 .../datastructures/DataStructuresProcessor.java |   48 +-
 .../processors/rest/GridRestProcessor.java      |  356 ++++-
 .../handlers/cache/GridCacheCommandHandler.java |    2 +-
 .../ignite/internal/util/GridJavaProcess.java   |   12 +-
 .../ignite/internal/util/IgniteUtils.java       |    2 +-
 .../ignite/internal/util/lang/GridFunc.java     |   12 +
 .../apache/ignite/marshaller/Marshaller.java    |    2 +-
 .../optimized/OptimizedMarshallerUtils.java     |    6 +-
 .../cache/GridCacheAbstractFullApiSelfTest.java |  486 ++++---
 .../CacheGetFutureHangsSelfTest.java            |    3 +
 .../CacheNoValueClassOnServerNodeTest.java      |    1 +
 ...niteCacheClientNodeChangingTopologyTest.java |    6 +-
 ...gniteAtomicLongChangingTopologySelfTest.java |  155 +-
 ...tomicClientOnlyMultiNodeFullApiSelfTest.java |   71 +-
 ...achePartitionedMultiNodeFullApiSelfTest.java |  129 +-
 .../tcp/TcpClientDiscoverySpiSelfTest.java      |    2 +
 .../testframework/junits/GridAbstractTest.java  |  116 +-
 .../junits/IgniteTestResources.java             |    8 +-
 .../junits/common/GridCommonAbstractTest.java   |   15 +-
 .../junits/multijvm/AffinityProcessProxy.java   |  440 ++++--
 .../multijvm/IgniteCacheProcessProxy.java       | 1346 ++++++++++++++----
 .../multijvm/IgniteClusterProcessProxy.java     |  115 +-
 .../multijvm/IgniteEventsProcessProxy.java      |   50 +-
 .../junits/multijvm/IgniteNodeRunner.java       |   39 +-
 .../junits/multijvm/IgniteProcessProxy.java     |  107 +-
 .../cache/CacheConfigurationP2PTest.java        |    3 +
 .../visor/commands/kill/VisorKillCommand.scala  |    2 +-
 .../scala/org/apache/ignite/visor/visor.scala   |    1 -
 37 files changed, 2745 insertions(+), 912 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/d8bfdce2/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
----------------------------------------------------------------------


[4/7] ignite git commit: ignite-master Minor fix in visorcmd.

Posted by sb...@apache.org.
ignite-master Minor fix in visorcmd.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d9177333
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d9177333
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d9177333

Branch: refs/heads/ignite-1093-2
Commit: d9177333814450ad5a7b2ea88464bbbb2664b959
Parents: 3036c8d
Author: Andrey <an...@gridgain.com>
Authored: Fri Oct 9 10:13:19 2015 +0700
Committer: Andrey <an...@gridgain.com>
Committed: Fri Oct 9 10:13:19 2015 +0700

----------------------------------------------------------------------
 .../org/apache/ignite/visor/commands/kill/VisorKillCommand.scala   | 2 +-
 .../src/main/scala/org/apache/ignite/visor/visor.scala             | 1 -
 2 files changed, 1 insertion(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/d9177333/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/kill/VisorKillCommand.scala
----------------------------------------------------------------------
diff --git a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/kill/VisorKillCommand.scala b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/kill/VisorKillCommand.scala
index 6cd1ffa..dffd5f1 100644
--- a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/kill/VisorKillCommand.scala
+++ b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/kill/VisorKillCommand.scala
@@ -131,7 +131,7 @@ class VisorKillCommand extends VisorConsoleCommand {
             if (kill && restart)
                 scold("Only one of '-k' or '-r' can be specified.")
             else if (!kill && !restart)
-                scold("Invalid command arguments: " + args)
+                scold("Missing '-k' or '-r' option in command: " + args)
             else if (id8.isDefined && id.isDefined)
                 scold("Only one of -id8 or -id is allowed.")
             else {

http://git-wip-us.apache.org/repos/asf/ignite/blob/d9177333/modules/visor-console/src/main/scala/org/apache/ignite/visor/visor.scala
----------------------------------------------------------------------
diff --git a/modules/visor-console/src/main/scala/org/apache/ignite/visor/visor.scala b/modules/visor-console/src/main/scala/org/apache/ignite/visor/visor.scala
index 7bd6818..28ef0d7 100644
--- a/modules/visor-console/src/main/scala/org/apache/ignite/visor/visor.scala
+++ b/modules/visor-console/src/main/scala/org/apache/ignite/visor/visor.scala
@@ -461,7 +461,6 @@ object visor extends VisorTag {
         shortInfo = "Starts or stops grid-wide events logging.",
         longInfo = Seq(
             "Logging of discovery and failure grid-wide events.",
-            "Logging starts by default when Visor starts.",
             " ",
             "Events are logged to a file. If path is not provided,",
             "it will log into '<Ignite home folder>/work/visor/visor-log'.",


[2/7] ignite git commit: Fix REST authentication.

Posted by sb...@apache.org.
Fix REST authentication.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f025714e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f025714e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f025714e

Branch: refs/heads/ignite-1093-2
Commit: f025714ef0d21ffff7ebf6088c254f26dd9aa3fc
Parents: f60cba7
Author: ashutak <as...@gridgain.com>
Authored: Wed Oct 7 16:11:18 2015 +0300
Committer: ashutak <as...@gridgain.com>
Committed: Wed Oct 7 16:11:18 2015 +0300

----------------------------------------------------------------------
 .../JettyRestProcessorAbstractSelfTest.java     |  27 +-
 .../apache/ignite/IgniteSystemProperties.java   |   3 +
 .../processors/rest/GridRestProcessor.java      | 356 +++++++++++++++++--
 .../handlers/cache/GridCacheCommandHandler.java |   2 +-
 .../ignite/internal/util/IgniteUtils.java       |   2 +-
 5 files changed, 341 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/f025714e/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java
index 8db4cd7..ac0edff 100644
--- a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java
@@ -84,6 +84,13 @@ public abstract class JettyRestProcessorAbstractSelfTest extends AbstractRestPro
     protected abstract int restPort();
 
     /**
+     * @return Security enabled flag. Should be the same as {@code ctx.security().enabled()}.
+     */
+    protected boolean securityEnabled() {
+        return false;
+    }
+
+    /**
      * @param params Command parameters.
      * @return Returned content.
      * @throws Exception If failed.
@@ -133,7 +140,7 @@ public abstract class JettyRestProcessorAbstractSelfTest extends AbstractRestPro
         return "\\{\\\"affinityNodeId\\\":\\\"\\w{8}-\\w{4}-\\w{4}-\\w{4}-\\w{12}\\\"\\," +
             "\\\"error\\\":\\\"\\\"\\," +
             "\\\"response\\\":\\\"" + res + "\\\"\\," +
-            "\\\"sessionToken\\\":\\\"\\\"," +
+            "\\\"sessionToken\\\":\\\"" + (securityEnabled() && success ? ".+" : "") + "\\\"," +
             "\\\"successStatus\\\":" + (success ? 0 : 1) + "\\}";
     }
 
@@ -157,7 +164,7 @@ public abstract class JettyRestProcessorAbstractSelfTest extends AbstractRestPro
     private String integerPattern(int res, boolean success) {
         return "\\{\\\"error\\\":\\\"\\\"\\," +
             "\\\"response\\\":" + res + "\\," +
-            "\\\"sessionToken\\\":\\\"\\\"," +
+            "\\\"sessionToken\\\":\\\"" + (securityEnabled() && success ? ".+" : "") + "\\\"," +
             "\\\"successStatus\\\":" + (success ? 0 : 1) + "\\}";
     }
 
@@ -170,7 +177,7 @@ public abstract class JettyRestProcessorAbstractSelfTest extends AbstractRestPro
         return "\\{\\\"affinityNodeId\\\":\\\"\\\"\\," +
             "\\\"error\\\":\\\"\\\"\\," +
             "\\\"response\\\":" + res + "\\," +
-            "\\\"sessionToken\\\":\\\"\\\"," +
+            "\\\"sessionToken\\\":\\\"" + (securityEnabled() && success ? ".+" : "") + "\\\"," +
             "\\\"successStatus\\\":" + (success ? 0 : 1) + "\\}";
     }
 
@@ -183,7 +190,7 @@ public abstract class JettyRestProcessorAbstractSelfTest extends AbstractRestPro
         return "\\{\\\"affinityNodeId\\\":\\\"\\\"\\," +
             "\\\"error\\\":\\\"\\\"\\," +
             "\\\"response\\\":" + res + "\\," +
-            "\\\"sessionToken\\\":\\\"\\\"," +
+            "\\\"sessionToken\\\":\\\"" + (securityEnabled() && success ? ".+" : "") + "\\\"," +
             "\\\"successStatus\\\":" + (success ? 0 : 1) + "\\}";
     }
 
@@ -196,7 +203,7 @@ public abstract class JettyRestProcessorAbstractSelfTest extends AbstractRestPro
         return "\\{\\\"affinityNodeId\\\":\\\"\\w{8}-\\w{4}-\\w{4}-\\w{4}-\\w{12}\\\"\\," +
             "\\\"error\\\":\\\"\\\"\\," +
             "\\\"response\\\":" + res + "\\," +
-            "\\\"sessionToken\\\":\\\"\\\"," +
+            "\\\"sessionToken\\\":\\\"" + (securityEnabled() && success ? ".+" : "") + "\\\"," +
             "\\\"successStatus\\\":" + (success ? 0 : 1) + "\\}";
     }
 
@@ -209,7 +216,7 @@ public abstract class JettyRestProcessorAbstractSelfTest extends AbstractRestPro
         return "\\{\\\"affinityNodeId\\\":\\\"\\\"\\," +
             "\\\"error\\\":\\\"\\\"\\," +
             "\\\"response\\\":" + res + "\\," +
-            "\\\"sessionToken\\\":\\\"\\\"," +
+            "\\\"sessionToken\\\":\\\"" + (securityEnabled() && success ? ".+" : "") + "\\\"," +
             "\\\"successStatus\\\":" + (success ? 0 : 1) + "\\}";
     }
 
@@ -222,7 +229,7 @@ public abstract class JettyRestProcessorAbstractSelfTest extends AbstractRestPro
         return "\\{\\\"affinityNodeId\\\":\\\"(\\w{8}-\\w{4}-\\w{4}-\\w{4}-\\w{12})?\\\"\\," +
             "\\\"error\\\":\\\"\\\"\\," +
             "\\\"response\\\":" + res + "\\," +
-            "\\\"sessionToken\\\":\\\"\\\"," +
+            "\\\"sessionToken\\\":\\\"" + (securityEnabled() && success ? ".+" : "") + "\\\"," +
             "\\\"successStatus\\\":" + (success ? 0 : 1) + "\\}";
     }
 
@@ -234,7 +241,7 @@ public abstract class JettyRestProcessorAbstractSelfTest extends AbstractRestPro
     private String pattern(String res, boolean success) {
         return "\\{\\\"error\\\":\\\"" + (!success ? ".+" : "") + "\\\"\\," +
             "\\\"response\\\":" + res + "\\," +
-            "\\\"sessionToken\\\":\\\"\\\"," +
+            "\\\"sessionToken\\\":\\\"" + (securityEnabled() && success ? ".+" : "") + "\\\"," +
             "\\\"successStatus\\\":" + (success ? 0 : 1) + "\\}";
     }
 
@@ -246,7 +253,7 @@ public abstract class JettyRestProcessorAbstractSelfTest extends AbstractRestPro
     private String stringPattern(String res, boolean success) {
         return "\\{\\\"error\\\":\\\"" + (!success ? ".+" : "") + "\\\"\\," +
             "\\\"response\\\":\\\"" + res + "\\\"\\," +
-            "\\\"sessionToken\\\":\\\"\\\"," +
+            "\\\"sessionToken\\\":\\\"" + (securityEnabled() && success ? ".+" : "") + "\\\"," +
             "\\\"successStatus\\\":" + (success ? 0 : 1) + "\\}";
     }
 
@@ -1316,4 +1323,4 @@ public abstract class JettyRestProcessorAbstractSelfTest extends AbstractRestPro
             return id;
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/f025714e/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index 1e4c8b7..5d3b08b 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -96,6 +96,9 @@ public final class IgniteSystemProperties {
      */
     public static final String IGNITE_JETTY_LOG_NO_OVERRIDE = "IGNITE_JETTY_LOG_NO_OVERRIDE";
 
+    /** This property allows overriding the default ({@code 30}) REST session expiration time (in seconds). */
+    public static final String IGNITE_REST_SESSION_TIMEOUT = "IGNITE_REST_SESSION_TIMEOUT";
+
     /**
      * This property allows to override maximum count of task results stored on one node
      * in REST processor.

http://git-wip-us.apache.org/repos/asf/ignite/blob/f025714e/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java
index d606ba4..d54c8bb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.rest;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.EnumMap;
 import java.util.HashMap;
@@ -30,7 +31,9 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.atomic.AtomicLong;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.configuration.ConnectorConfiguration;
 import org.apache.ignite.configuration.ConnectorMessageInterceptor;
 import org.apache.ignite.internal.GridKernalContext;
@@ -53,8 +56,10 @@ import org.apache.ignite.internal.processors.security.SecurityContext;
 import org.apache.ignite.internal.util.GridSpinReadWriteLock;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.typedef.C1;
+import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.LT;
+import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.internal.util.worker.GridWorker;
 import org.apache.ignite.internal.util.worker.GridWorkerFuture;
@@ -65,6 +70,7 @@ import org.apache.ignite.plugin.security.AuthenticationContext;
 import org.apache.ignite.plugin.security.SecurityCredentials;
 import org.apache.ignite.plugin.security.SecurityException;
 import org.apache.ignite.plugin.security.SecurityPermission;
+import org.apache.ignite.thread.IgniteThread;
 import org.jsr166.LongAdder8;
 
 import static org.apache.ignite.internal.processors.rest.GridRestResponse.STATUS_AUTH_FAILED;
@@ -80,8 +86,11 @@ public class GridRestProcessor extends GridProcessorAdapter {
     private static final String HTTP_PROTO_CLS =
         "org.apache.ignite.internal.processors.rest.protocols.http.jetty.GridJettyRestProtocol";
 
-    /** */
-    public static final byte[] ZERO_BYTES = new byte[0];
+    /** Delay between sessions timeout checks. */
+    private static final int SES_TIMEOUT_CHECK_DELAY = 1_000;
+
+    /** Default session timeout (in milliseconds). */
+    private static final int DEFAULT_SES_TIMEOUT = 30_000;
 
     /** Protocols. */
     private final Collection<GridRestProtocol> protos = new ArrayList<>();
@@ -98,8 +107,14 @@ public class GridRestProcessor extends GridProcessorAdapter {
     /** Workers count. */
     private final LongAdder8 workersCnt = new LongAdder8();
 
-    /** SecurityContext map. */
-    private ConcurrentMap<UUID, SecurityContext> sesMap = new ConcurrentHashMap<>();
+    /** ClientId-SessionId map. */
+    private final ConcurrentMap<UUID, UUID> clientId2SesId = new ConcurrentHashMap<>();
+
+    /** SessionId-Session map. */
+    private final ConcurrentMap<UUID, Session> sesId2Ses = new ConcurrentHashMap<>();
+
+    /** Thread that periodically removes timed-out sessions. */
+    private final Thread sesTimeoutCheckerThread;
 
     /** Protocol handler. */
     private final GridRestProtocolHandler protoHnd = new GridRestProtocolHandler() {
@@ -112,6 +127,9 @@ public class GridRestProcessor extends GridProcessorAdapter {
         }
     };
 
+    /** Session time to live. */
+    private final long sesTtl;
+
     /**
      * @param req Request.
      * @return Future.
@@ -195,22 +213,40 @@ public class GridRestProcessor extends GridProcessorAdapter {
         if (log.isDebugEnabled())
             log.debug("Received request from client: " + req);
 
-        SecurityContext subjCtx = null;
-
         if (ctx.security().enabled()) {
+            Session ses;
+
             try {
-                subjCtx = authenticate(req);
+                ses = session(req);
+            }
+            catch (IgniteCheckedException e) {
+                GridRestResponse res = new GridRestResponse(STATUS_FAILED, e.getMessage());
 
-                authorize(req, subjCtx);
+                return new GridFinishedFuture<>(res);
+            }
+
+            assert ses != null;
+
+            req.clientId(ses.clientId);
+            req.sessionToken(U.uuidToBytes(ses.sesId));
+
+            if (log.isDebugEnabled())
+                log.debug("Next clientId and sessionToken were extracted according to request: " +
+                    "[clientId="+req.clientId()+", sesTok="+Arrays.toString(req.sessionToken())+"]");
+
+            SecurityContext secCtx0 = ses.secCtx;
+
+            try {
+                if (secCtx0 == null)
+                    ses.secCtx = secCtx0 = authenticate(req);
+
+                authorize(req, secCtx0);
             }
             catch (SecurityException e) {
-                assert subjCtx != null;
+                assert secCtx0 != null;
 
                 GridRestResponse res = new GridRestResponse(STATUS_SECURITY_CHECK_FAILED, e.getMessage());
 
-                updateSession(req, subjCtx);
-                res.sessionTokenBytes(ZERO_BYTES);
-
                 return new GridFinishedFuture<>(res);
             }
             catch (IgniteCheckedException e) {
@@ -228,16 +264,18 @@ public class GridRestProcessor extends GridProcessorAdapter {
             return new GridFinishedFuture<>(
                 new IgniteCheckedException("Failed to find registered handler for command: " + req.command()));
 
-        final SecurityContext subjCtx0 = subjCtx;
-
         return res.chain(new C1<IgniteInternalFuture<GridRestResponse>, GridRestResponse>() {
             @Override public GridRestResponse apply(IgniteInternalFuture<GridRestResponse> f) {
                 GridRestResponse res;
 
+                boolean failed = false;
+
                 try {
                     res = f.get();
                 }
                 catch (Exception e) {
+                    failed = true;
+
                     if (!X.hasCause(e, VisorClusterGroupEmptyException.class))
                         LT.error(log, e, "Failed to handle request: " + req.command());
 
@@ -249,10 +287,8 @@ public class GridRestProcessor extends GridProcessorAdapter {
 
                 assert res != null;
 
-                if (ctx.security().enabled()) {
-                    updateSession(req, subjCtx0);
-                    res.sessionTokenBytes(ZERO_BYTES);
-                }
+                if (ctx.security().enabled() && !failed)
+                    res.sessionTokenBytes(req.sessionToken());
 
                 interceptResponse(res, req);
 
@@ -262,10 +298,137 @@ public class GridRestProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * @param req Request.
+     * @return Not null session.
+     * @throws IgniteCheckedException If failed.
+     */
+    private Session session(final GridRestRequest req) throws IgniteCheckedException {
+        final UUID clientId = req.clientId();
+        final byte[] sesTok = req.sessionToken();
+
+        while (true) {
+            if (F.isEmpty(sesTok) && clientId == null) {
+                Session ses = Session.random();
+
+                UUID oldSesId = clientId2SesId.put(ses.clientId, ses.sesId);
+
+                assert oldSesId == null : "Got an illegal state for request: " + req;
+
+                Session oldSes = sesId2Ses.put(ses.sesId, ses);
+
+                assert oldSes == null : "Got an illegal state for request: " + req;
+
+                return ses;
+            }
+
+            if (F.isEmpty(sesTok) && clientId != null) {
+                UUID sesId = clientId2SesId.get(clientId);
+
+                if (sesId == null) {
+                    Session ses = Session.fromClientId(clientId);
+
+                    if (clientId2SesId.putIfAbsent(ses.clientId, ses.sesId) != null)
+                        continue; // Another thread has already registered a session with this clientId.
+
+                    Session prevSes = sesId2Ses.put(ses.sesId, ses);
+
+                    assert prevSes == null : "Got an illegal state for request: " + req;
+
+                    return ses;
+                }
+                else {
+                    Session ses = sesId2Ses.get(sesId);
+
+                    if (ses == null || !ses.touch())
+                        continue; // Retry until the timeout thread finishes removing the timed-out session.
+
+                    return ses;
+                }
+            }
+
+            if (!F.isEmpty(sesTok) && clientId == null) {
+                UUID sesId = U.bytesToUuid(sesTok, 0);
+
+                Session ses = sesId2Ses.get(sesId);
+
+                if (ses == null)
+                    throw new IgniteCheckedException("Failed to handle request - unknown session token " +
+                        "(maybe expired session) [sesTok=" + U.byteArray2HexString(sesTok) + "]");
+
+                if (!ses.touch())
+                    continue; // Retry until the timeout thread finishes removing the timed-out session.
+
+                return ses;
+            }
+
+            if (!F.isEmpty(sesTok) && clientId != null) {
+                UUID sesId = clientId2SesId.get(clientId);
+
+                if (sesId == null || !sesId.equals(U.bytesToUuid(sesTok, 0)))
+                    throw new IgniteCheckedException("Failed to handle request - unsupported case (misamatched " +
+                        "clientId and session token) [clientId=" + clientId + ", sesTok=" +
+                        U.byteArray2HexString(sesTok) + "]");
+
+                Session ses = sesId2Ses.get(sesId);
+
+                if (ses == null)
+                    throw new IgniteCheckedException("Failed to handle request - unknown session token " +
+                        "(maybe expired session) [sesTok=" + U.byteArray2HexString(sesTok) + "]");
+
+                if (!ses.touch())
+                    continue; // Retry until the timeout thread finishes removing the timed-out session.
+
+                return ses;
+            }
+
+            assert false : "Got an unreachable state.";
+        }
+    }
+
+    /**
      * @param ctx Context.
      */
     public GridRestProcessor(GridKernalContext ctx) {
         super(ctx);
+
+        long sesExpTime0;
+        String sesExpTime = null;
+
+        try {
+            sesExpTime = System.getProperty(IgniteSystemProperties.IGNITE_REST_SESSION_TIMEOUT);
+
+            if (sesExpTime != null)
+                sesExpTime0 = Long.valueOf(sesExpTime) * 1000;
+            else
+                sesExpTime0 = DEFAULT_SES_TIMEOUT;
+        }
+        catch (NumberFormatException ignore) {
+            U.warn(log, "Failed parsing IGNITE_REST_SESSION_TIMEOUT system variable [IGNITE_REST_SESSION_TIMEOUT="
+                + sesExpTime + "]");
+
+            sesExpTime0 = DEFAULT_SES_TIMEOUT;
+        }
+
+        sesTtl = sesExpTime0;
+
+        sesTimeoutCheckerThread = new IgniteThread(ctx.gridName(), "session-timeout-worker",
+            new GridWorker(ctx.gridName(), "session-timeout-worker", log) {
+                @Override protected void body() throws InterruptedException {
+                    while (!isCancelled()) {
+                        Thread.sleep(SES_TIMEOUT_CHECK_DELAY);
+
+                        for (Map.Entry<UUID, Session> e : sesId2Ses.entrySet()) {
+                            Session ses = e.getValue();
+
+                            if (ses.isTimedOut(sesTtl)) {
+                                sesId2Ses.remove(ses.sesId, ses);
+
+                                clientId2SesId.remove(ses.clientId, ses.sesId);
+                            }
+                        }
+                    }
+                }
+            });
     }
 
     /** {@inheritDoc} */
@@ -310,6 +473,10 @@ public class GridRestProcessor extends GridProcessorAdapter {
             for (GridRestProtocol proto : protos)
                 proto.onKernalStart();
 
+            sesTimeoutCheckerThread.setDaemon(true);
+
+            sesTimeoutCheckerThread.start();
+
             startLatch.countDown();
 
             if (log.isDebugEnabled())
@@ -334,6 +501,8 @@ public class GridRestProcessor extends GridProcessorAdapter {
                 }
             }
 
+            U.interrupt(sesTimeoutCheckerThread);
+
             if (interrupted)
                 Thread.currentThread().interrupt();
 
@@ -483,13 +652,8 @@ public class GridRestProcessor extends GridProcessorAdapter {
      * @throws IgniteCheckedException If authentication failed.
      */
     private SecurityContext authenticate(GridRestRequest req) throws IgniteCheckedException {
-        UUID clientId = req.clientId();
-        SecurityContext secCtx = clientId == null ? null : sesMap.get(clientId);
+        assert req.clientId() != null;
 
-        if (secCtx != null)
-            return secCtx;
-
-        // Authenticate client if invalid session.
         AuthenticationContext authCtx = new AuthenticationContext();
 
         authCtx.subjectType(REMOTE_CLIENT);
@@ -531,18 +695,6 @@ public class GridRestProcessor extends GridProcessorAdapter {
     }
 
     /**
-     * Update session.
-     * @param req REST request.
-     * @param sCtx Security context.
-     */
-    private void updateSession(GridRestRequest req, SecurityContext sCtx) {
-        if (sCtx != null) {
-            UUID id = req.clientId();
-            sesMap.put(id, sCtx);
-        }
-    }
-
-    /**
      * @param req REST request.
      * @param sCtx Security context.
      * @throws SecurityException If authorization failed.
@@ -719,4 +871,134 @@ public class GridRestProcessor extends GridProcessorAdapter {
         X.println(">>>   protosSize: " + protos.size());
         X.println(">>>   handlersSize: " + handlers.size());
     }
-}
\ No newline at end of file
+
+    /**
+     * Session.
+     */
+    private static class Session {
+        /** Expiration flag. It is the final state of lastTouchTime. */
+        private static final Long TIMEDOUT_FLAG = 0L;
+
+        /** Client id. */
+        private final UUID clientId;
+
+        /** Session token id. */
+        private final UUID sesId;
+
+        /** Security context. */
+        private volatile SecurityContext secCtx;
+
+        /**
+         * Time when the session was last used.
+         * Once this time has been set to TIMEDOUT_FLAG, it must never be changed.
+         */
+        private final AtomicLong lastTouchTime = new AtomicLong(U.currentTimeMillis());
+
+        /**
+         * @param clientId Client ID.
+         * @param sesId session ID.
+         */
+        private Session(UUID clientId, UUID sesId) {
+            this.clientId = clientId;
+            this.sesId = sesId;
+        }
+
+        /**
+         * Static constructor.
+         *
+         * @return New session instance with random client ID and random session ID.
+         */
+        static Session random() {
+            return new Session(UUID.randomUUID(), UUID.randomUUID());
+        }
+
+        /**
+         * Static constructor.
+         *
+         * @param clientId Client ID.
+         * @return New session instance with given client ID and random session ID.
+         */
+        static Session fromClientId(UUID clientId) {
+            return new Session(clientId, UUID.randomUUID());
+        }
+
+        /**
+         * Static constructor.
+         *
+         * @param sesTokId Session token ID.
+         * @return New session instance with random client ID and given session ID.
+         */
+        static Session fromSessionToken(UUID sesTokId) {
+            return new Session(UUID.randomUUID(), sesTokId);
+        }
+
+        /**
+         * Checks expiration of session and if expired then sets TIMEDOUT_FLAG.
+         *
+         * @param sesTimeout Session timeout.
+         * @return <code>True</code> if expired.
+         * @see #touch()
+         */
+        boolean isTimedOut(long sesTimeout) {
+            long time0 = lastTouchTime.get();
+
+            if (time0 == TIMEDOUT_FLAG)
+                return true;
+
+            return U.currentTimeMillis() - time0 > sesTimeout && lastTouchTime.compareAndSet(time0, TIMEDOUT_FLAG);
+        }
+
+        /**
+         * Checks whether the session has timed out (TIMEDOUT_FLAG) and, if not, tries to update the last touch time.
+         *
+         * @return {@code False} if session timed out (not successfully touched).
+         * @see #isTimedOut(long)
+         */
+        boolean touch() {
+            while (true) {
+                long time0 = lastTouchTime.get();
+
+                if (time0 == TIMEDOUT_FLAG)
+                    return false;
+
+                boolean success = lastTouchTime.compareAndSet(time0, U.currentTimeMillis());
+
+                if (success)
+                    return true;
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (!(o instanceof Session))
+                return false;
+
+            Session ses = (Session)o;
+
+            if (clientId != null ? !clientId.equals(ses.clientId) : ses.clientId != null)
+                return false;
+
+            if (sesId != null ? !sesId.equals(ses.sesId) : ses.sesId != null)
+                return false;
+
+            return true;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            int res = clientId != null ? clientId.hashCode() : 0;
+
+            res = 31 * res + (sesId != null ? sesId.hashCode() : 0);
+
+            return res;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(Session.class, this);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/f025714e/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java
index b3af2f2..9d32c17 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java
@@ -1391,4 +1391,4 @@ public class GridCacheCommandHandler extends GridRestCommandHandlerAdapter {
             return c.sizeAsync(new CachePeekMode[]{CachePeekMode.PRIMARY});
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/f025714e/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index e5090cb..3c1913a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -9302,4 +9302,4 @@ public abstract class IgniteUtils {
             throw new IgniteInterruptedCheckedException(e);
         }
     }
-}
\ No newline at end of file
+}


[5/7] ignite git commit: ignite-1526: full support of IBM JDK by Ignite

Posted by sb...@apache.org.
http://git-wip-us.apache.org/repos/asf/ignite/blob/a1e5cc57/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java
index ac8c5af..d89e397 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java
@@ -22,7 +22,6 @@ import java.util.Collection;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
-import java.util.UUID;
 import java.util.concurrent.locks.Lock;
 import javax.cache.CacheException;
 import javax.cache.CacheManager;
@@ -35,7 +34,6 @@ import javax.cache.processor.EntryProcessorResult;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCompute;
-import org.apache.ignite.Ignition;
 import org.apache.ignite.cache.CacheEntryProcessor;
 import org.apache.ignite.cache.CacheMetrics;
 import org.apache.ignite.cache.CachePeekMode;
@@ -48,8 +46,8 @@ import org.apache.ignite.internal.util.future.IgniteFinishedFutureImpl;
 import org.apache.ignite.lang.IgniteBiPredicate;
 import org.apache.ignite.lang.IgniteCallable;
 import org.apache.ignite.lang.IgniteFuture;
-import org.apache.ignite.lang.IgniteRunnable;
 import org.apache.ignite.mxbean.CacheMetricsMXBean;
+import org.apache.ignite.resources.IgniteInstanceResource;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -63,9 +61,6 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
     /** Cache name. */
     private final String cacheName;
 
-    /** Grid id. */
-    private final UUID gridId;
-
     /** With async. */
     private final boolean isAsync;
 
@@ -82,31 +77,16 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
 
     /**
      * @param name Name.
-     * @param async
+     * @param async Async flag.
      * @param proxy Ignite Process Proxy.
      */
     public IgniteCacheProcessProxy(String name, boolean async, IgniteProcessProxy proxy) {
         cacheName = name;
         isAsync = async;
-        gridId = proxy.getId();
         igniteProxy = proxy;
         compute = proxy.remoteCompute();
     }
 
-    /**
-     * Returns cache instance. Method to be called from closure at another JVM.
-     *
-     * @return Cache.
-     */
-    private IgniteCache<Object, Object> cache() {
-        IgniteCache cache = Ignition.ignite(gridId).cache(cacheName);
-
-        if (isAsync)
-            cache = cache.withAsync();
-
-        return cache;
-    }
-
     /** {@inheritDoc} */
     @Override public IgniteCache<K, V> withAsync() {
         return new IgniteCacheProcessProxy<>(cacheName, true, igniteProxy);
@@ -124,14 +104,8 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
     }
 
     /** {@inheritDoc} */
-    @Override public <C extends Configuration<K, V>> C getConfiguration(final Class<C> clazz) {
-        final Class cl = clazz;
-
-        return (C)compute.call(new IgniteCallable<Object>() {
-            @Override public Object call() throws Exception {
-                return cache().getConfiguration(cl);
-            }
-        });
+    @Override public <C extends Configuration<K, V>> C getConfiguration(Class<C> clazz) {
+        return compute.call(new GetConfigurationTask<>(cacheName, isAsync, clazz));
     }
 
     /** {@inheritDoc} */
@@ -149,33 +123,26 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
         throw new UnsupportedOperationException("Method should be supported.");
     }
 
+    /** {@inheritDoc} */
     @Override public IgniteCache<K, V> withNoRetries() {
         throw new UnsupportedOperationException("Method should be supported.");
     }
 
     /** {@inheritDoc} */
-    @Override public void loadCache(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... args) throws CacheException {
+    @Override public void loadCache(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... args)
+        throws CacheException {
         throw new UnsupportedOperationException("Method should be supported.");
     }
 
     /** {@inheritDoc} */
-    @Override public void localLoadCache(@Nullable final IgniteBiPredicate<K, V> p, @Nullable final Object... args) throws CacheException {
-        final IgniteBiPredicate pCopy = p;
-
-        compute.run(new IgniteRunnable() {
-            @Override public void run() {
-                cache().localLoadCache(pCopy, args);
-            }
-        });
+    @Override public void localLoadCache(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... args)
+        throws CacheException {
+        compute.call(new LocalLoadCacheTask<>(cacheName, isAsync, p, args));
     }
 
     /** {@inheritDoc} */
-    @Override public V getAndPutIfAbsent(final K key, final V val) throws CacheException {
-        return (V)compute.call(new IgniteCallable<Object>() {
-            @Override public Object call() throws Exception {
-                return cache().getAndPutIfAbsent(key, val);
-            }
-        });
+    @Override public V getAndPutIfAbsent(K key, V val) throws CacheException {
+        return compute.call(new GetAndPutIfAbsentTask<>(cacheName, isAsync, key, val));
     }
 
     /** {@inheritDoc} */
@@ -189,12 +156,8 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
     }
 
     /** {@inheritDoc} */
-    @Override public boolean isLocalLocked(final K key, final boolean byCurrThread) {
-        return compute.call(new IgniteCallable<Boolean>() {
-            @Override public Boolean call() throws Exception {
-                return cache().isLocalLocked(key, byCurrThread);
-            }
-        });
+    @Override public boolean isLocalLocked(K key, boolean byCurrThread) {
+        return compute.call(new IsLocalLockedTask<>(cacheName, isAsync, key, byCurrThread));
     }
 
     /** {@inheritDoc} */
@@ -203,18 +166,8 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
     }
 
     /** {@inheritDoc} */
-    @SuppressWarnings("unchecked")
-    @Override public Iterable<Entry<K, V>> localEntries(final CachePeekMode... peekModes) throws CacheException {
-        return (Iterable<Entry<K, V>>)compute.call(new IgniteCallable<Object>() {
-            @Override public Object call() throws Exception {
-                Collection<Entry> res = new ArrayList<>();
-
-                for (Entry e : cache().localEntries(peekModes))
-                    res.add(new CacheEntryImpl(e.getKey(), e.getValue()));
-
-                return res;
-            }
-        });
+    @Override public Iterable<Entry<K, V>> localEntries(CachePeekMode... peekModes) throws CacheException {
+        return compute.call(new LocalEntriesTask<K, V>(cacheName, isAsync, peekModes));
     }
 
     /** {@inheritDoc} */
@@ -223,21 +176,13 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
     }
 
     /** {@inheritDoc} */
-    @Override public void localEvict(final Collection<? extends K> keys) {
-        compute.run(new IgniteRunnable() {
-            @Override public void run() {
-                cache().localEvict(keys);
-            }
-        });
+    @Override public void localEvict(Collection<? extends K> keys) {
+        compute.call(new LocalEvictTask<>(cacheName, isAsync, keys));
     }
 
     /** {@inheritDoc} */
-    @Override public V localPeek(final K key, final CachePeekMode... peekModes) {
-        return (V)compute.call(new IgniteCallable<Object>() {
-            @Override public Object call() throws Exception {
-                return cache().localPeek(key, peekModes);
-            }
-        });
+    @Override public V localPeek(K key, CachePeekMode... peekModes) {
+        return compute.call(new LocalPeekTask<K, V>(cacheName, isAsync, key, peekModes));
     }
 
     /** {@inheritDoc} */
@@ -246,274 +191,160 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
     }
 
     /** {@inheritDoc} */
-    @Override public int size(final CachePeekMode... peekModes) throws CacheException {
-        return (int)compute.call(new IgniteCallable<Object>() {
-            @Override public Object call() throws Exception {
-                return cache().size(peekModes);
-            }
-        });
+    @Override public int size(CachePeekMode... peekModes) throws CacheException {
+        return compute.call(new SizeTask(cacheName, isAsync, peekModes, false));
     }
 
     /** {@inheritDoc} */
-    @Override public int localSize(final CachePeekMode... peekModes) {
-        return (int)compute.call(new IgniteCallable<Object>() {
-            @Override public Object call() throws Exception {
-                return cache().localSize(peekModes);
-            }
-        });
+    @Override public int localSize(CachePeekMode... peekModes) {
+        return compute.call(new SizeTask(cacheName, isAsync, peekModes, true));
     }
 
     /** {@inheritDoc} */
-    @Override  public <T> Map<K, EntryProcessorResult<T>> invokeAll(Map<? extends K, ? extends EntryProcessor<K, V, T>> map,
-        Object... args) {
+    @Override  public <T> Map<K, EntryProcessorResult<T>> invokeAll(
+        Map<? extends K, ? extends EntryProcessor<K, V, T>> map,
+        Object... args)
+    {
         throw new UnsupportedOperationException("Method should be supported.");
     }
 
     /** {@inheritDoc} */
-    @Override public V get(final K key) {
-        return (V)compute.call(new IgniteCallable<Object>() {
-            @Override public Object call() throws Exception {
-                return cache().get(key);
-            }
-        });
+    @Override public V get(K key) {
+        return compute.call(new GetTask<K, V>(cacheName, isAsync, key));
     }
 
     /** {@inheritDoc} */
-    @Override public Map<K, V> getAll(final Set<? extends K> keys) {
-        return (Map<K, V>)compute.call(new IgniteCallable<Object>() {
-            @Override public Object call() throws Exception {
-                return cache().getAll(keys);
-            }
-        });
+    @Override public Map<K, V> getAll(Set<? extends K> keys) {
+        return compute.call(new GetAllTask<K, V>(cacheName, isAsync, keys));
     }
 
-    @Override public Map<K, V> getAllOutTx(final Set<? extends K> keys) {
-        return (Map<K, V>)compute.call(new IgniteCallable<Object>() {
-            @Override public Object call() throws Exception {
-                return cache().getAllOutTx(keys);
-            }
-        });
+    /** {@inheritDoc} */
+    @Override public Map<K, V> getAllOutTx(Set<? extends K> keys) {
+        return compute.call(new GetAllOutTxTask<K, V>(cacheName, isAsync, keys));
     }
 
     /** {@inheritDoc} */
-    @Override public boolean containsKey(final K key) {
-        return (boolean)compute.call(new IgniteCallable<Object>() {
-            @Override public Object call() throws Exception {
-                return cache().containsKey(key);
-            }
-        });
+    @Override public boolean containsKey(K key) {
+        return compute.call(new ContainsKeyTask<>(cacheName, isAsync, key));
     }
 
     /** {@inheritDoc} */
-    @Override  public void loadAll(Set<? extends K> keys, boolean replaceExistingValues, CompletionListener completionLsnr) {
+    @Override  public void loadAll(Set<? extends K> keys, boolean replaceExistVals, CompletionListener completionLsnr) {
         throw new UnsupportedOperationException("Oparetion can't be supported automatically.");
     }
 
     /** {@inheritDoc} */
-    @Override public boolean containsKeys(final Set<? extends K> keys) {
-        return (boolean)compute.call(new IgniteCallable<Object>() {
-            @Override public Object call() throws Exception {
-                return cache().containsKeys(keys);
-            }
-        });
+    @Override public boolean containsKeys(Set<? extends K> keys) {
+        return compute.call(new ContainsKeysTask<>(cacheName, isAsync, keys));
     }
 
     /** {@inheritDoc} */
-    @Override public void put(final K key, final V val) {;
-        compute.run(new IgniteRunnable() {
-            @Override public void run() {
-                cache().put(key, val);
-            }
-        });
+    @Override public void put(K key, V val) {
+        compute.call(new PutTask<>(cacheName, isAsync, key, val));
     }
 
     /** {@inheritDoc} */
-    @Override public V getAndPut(final K key, final V val) {
-        return (V)compute.call(new IgniteCallable<Object>() {
-            @Override public Object call() throws Exception {
-                return cache().getAndPut(key, val);
-            }
-        });
+    @Override public V getAndPut(K key, V val) {
+        return compute.call(new GetAndPutTask<>(cacheName, isAsync, key, val));
     }
 
     /** {@inheritDoc} */
-    @Override public void putAll(final Map<? extends K, ? extends V> map) {
-        compute.run(new IgniteRunnable() {
-            @Override public void run() {
-                cache().putAll(map);
-            }
-        });
+    @Override public void putAll(Map<? extends K, ? extends V> map) {
+        compute.call(new PutAllTask<>(cacheName, isAsync, map));
     }
 
     /** {@inheritDoc} */
-    @Override public boolean putIfAbsent(final K key, final V val) {
-        return (boolean)compute.call(new IgniteCallable<Object>() {
-            @Override public Object call() throws Exception {
-                return cache().putIfAbsent(key, val);
-            }
-        });
+    @Override public boolean putIfAbsent(K key, V val) {
+        return compute.call(new PutIfAbsentTask<>(cacheName, isAsync, key, val));
     }
 
     /** {@inheritDoc} */
-    @Override public boolean remove(final K key) {
-        return (boolean)compute.call(new IgniteCallable<Object>() {
-            @Override public Object call() throws Exception {
-                return cache().remove(key);
-            }
-        });
+    @Override public boolean remove(K key) {
+        return compute.call(new RemoveTask<>(cacheName, isAsync, key));
     }
 
     /** {@inheritDoc} */
-    @Override public boolean remove(final K key, final V oldVal) {
-        return (boolean)compute.call(new IgniteCallable<Object>() {
-            @Override public Object call() throws Exception {
-                return cache().remove(key, oldVal);
-            }
-        });
+    @Override public boolean remove(K key, V oldVal) {
+        return compute.call(new RemoveIfExistsTask<>(cacheName, isAsync, key, oldVal));
     }
 
     /** {@inheritDoc} */
-    @Override public V getAndRemove(final K key) {
-        return (V)compute.call(new IgniteCallable<Object>() {
-            @Override public Object call() throws Exception {
-                return cache().getAndRemove(key);
-            }
-        });
+    @Override public V getAndRemove(K key) {
+        return compute.call(new GetAndRemoveTask<K, V>(cacheName, isAsync, key));
     }
 
     /** {@inheritDoc} */
-    @Override public boolean replace(final K key, final V oldVal, final V newVal) {
-        return (boolean)compute.call(new IgniteCallable<Object>() {
-            @Override public Object call() throws Exception {
-                return cache().replace(key, oldVal, newVal);
-            }
-        });
+    @Override public boolean replace(K key, V oldVal, V newVal) {
+        return compute.call(new ReplaceIfExistsTask<>(cacheName, isAsync, key, oldVal, newVal));
     }
 
     /** {@inheritDoc} */
-    @Override public boolean replace(final K key, final V val) {
-        return (boolean)compute.call(new IgniteCallable<Object>() {
-            @Override public Object call() throws Exception {
-                return cache().replace(key, val);
-            }
-        });
+    @Override public boolean replace(K key, V val) {
+        return compute.call(new ReplaceTask<>(cacheName, isAsync, key, val));
     }
 
     /** {@inheritDoc} */
-    @Override public V getAndReplace(final K key, final V val) {
-        return (V)compute.call(new IgniteCallable<Object>() {
-            @Override public Object call() throws Exception {
-                return cache().getAndReplace(key, val);
-            }
-        });
+    @Override public V getAndReplace(K key, V val) {
+        return compute.call(new GetAndReplaceTask<>(cacheName, isAsync, key, val));
     }
 
     /** {@inheritDoc} */
-    @Override public void removeAll(final Set<? extends K> keys) {
-        compute.run(new IgniteRunnable() {
-            @Override public void run() {
-                cache().removeAll(keys);
-            }
-        });
+    @Override public void removeAll(Set<? extends K> keys) {
+        compute.call(new RemoveAllKeysTask<>(cacheName, isAsync, keys));
     }
 
     /** {@inheritDoc} */
     @Override public void removeAll() {
-        compute.run(new IgniteRunnable() {
-            @Override public void run() {
-                IgniteCache<Object, Object> cache = cache();
-
-                cache.removeAll();
-
-                if (isAsync)
-                    cache.future().get();
-            }
-        });
+        compute.call(new RemoveAllTask<K, V>(cacheName, isAsync));
     }
 
     /** {@inheritDoc} */
     @Override public void clear() {
-        compute.run(new IgniteRunnable() {
-            @Override public void run() {
-                cache().clear();
-            }
-        });
+        compute.call(new ClearTask(cacheName, isAsync));
     }
 
     /** {@inheritDoc} */
-    @Override public void clear(final K key) {
-        compute.run(new IgniteRunnable() {
-            @Override public void run() {
-                cache().clear(key);
-            }
-        });
+    @Override public void clear(K key) {
+        compute.call(new ClearKeyTask<>(cacheName, isAsync, false, key));
     }
 
     /** {@inheritDoc} */
-    @Override public void clearAll(final Set<? extends K> keys) {
-        compute.run(new IgniteRunnable() {
-            @Override public void run() {
-                cache().clearAll(keys);
-            }
-        });
+    @Override public void clearAll(Set<? extends K> keys) {
+        compute.call(new ClearAllKeys<>(cacheName, isAsync, false, keys));
     }
 
     /** {@inheritDoc} */
-    @Override public void localClear(final K key) {
-        compute.run(new IgniteRunnable() {
-            @Override public void run() {
-                cache().localClear(key);
-            }
-        });
+    @Override public void localClear(K key) {
+        compute.call(new ClearKeyTask<>(cacheName, isAsync, true, key));
     }
 
     /** {@inheritDoc} */
-    @Override public void localClearAll(final Set<? extends K> keys) {
-        compute.run(new IgniteRunnable() {
-            @Override public void run() {
-                cache().localClearAll(keys);
-            }
-        });
+    @Override public void localClearAll(Set<? extends K> keys) {
+        compute.call(new ClearAllKeys<>(cacheName, isAsync, true, keys));
     }
 
     /** {@inheritDoc} */
-    @Override public <T> T invoke(final K key, final EntryProcessor<K, V, T> entryProcessor, final Object... arguments) {
-        return (T)compute.call(new IgniteCallable<Object>() {
-            @Override public Object call() throws Exception {
-                return cache().invoke(key,
-                    (EntryProcessor<Object, Object, Object>)entryProcessor, arguments);
-            }
-        });
+    @Override public <T> T invoke(K key, EntryProcessor<K, V, T> processor, Object... args) {
+        return compute.call(new InvokeTask<>(cacheName, isAsync, key, processor, args));
     }
 
     /** {@inheritDoc} */
-    @Override public <T> T invoke(final K key, final CacheEntryProcessor<K, V, T> entryProcessor, final Object... arguments) {
-        return (T)compute.call(new IgniteCallable<Object>() {
-            @Override public Object call() throws Exception {
-                return cache().invoke(key,
-                    (CacheEntryProcessor<Object, Object, Object>)entryProcessor, arguments);
-            }
-        });
+    @Override public <T> T invoke(K key, CacheEntryProcessor<K, V, T> processor, Object... args) {
+        return compute.call(new InvokeTask<>(cacheName, isAsync, key, processor, args));
     }
 
     /** {@inheritDoc} */
-    @Override  public <T> Map<K, EntryProcessorResult<T>> invokeAll(final Set<? extends K> keys, final EntryProcessor<K, V, T> entryProcessor,
-        final Object... args) {
-        return (Map<K, EntryProcessorResult<T>>)compute.call(new IgniteCallable<Object>() {
-            @Override public Object call() throws Exception {
-                return cache().invokeAll(keys,
-                    (EntryProcessor<Object, Object, Object>)entryProcessor, args);
-            }
-        });
+    @Override  public <T> Map<K, EntryProcessorResult<T>> invokeAll(
+        Set<? extends K> keys,
+        EntryProcessor<K, V, T> processor,
+        Object... args)
+    {
+        return compute.call(new InvokeAllTask<>(cacheName, isAsync, keys, processor, args));
     }
 
     /** {@inheritDoc} */
     @Override public String getName() {
-        return (String)compute.call(new IgniteCallable<Object>() {
-            @Override public Object call() throws Exception {
-                return cache().getName();
-            }
-        });
+        return compute.call(new GetNameTask(cacheName, isAsync));
     }
 
     /** {@inheritDoc} */
@@ -523,72 +354,47 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
 
     /** {@inheritDoc} */
     @Override public void close() {
-        compute.run(new IgniteRunnable() {
-            @Override public void run() {
-                cache().close();
-            }
-        });
+        compute.call(new CloseTask(cacheName, isAsync));
     }
 
     /** {@inheritDoc} */
     @Override public void destroy() {
-        compute.run(new IgniteRunnable() {
-            @Override public void run() {
-                cache().destroy();
-            }
-        });
+        compute.call(new DestroyTask(cacheName, isAsync));
     }
 
     /** {@inheritDoc} */
     @Override public boolean isClosed() {
-        return (boolean)compute.call(new IgniteCallable<Object>() {
-            @Override public Object call() throws Exception {
-                return cache().isClosed();
-            }
-        });
+        return compute.call(new IsClosedTask(cacheName, isAsync));
     }
 
     /** {@inheritDoc} */
-    @Override public <T> T unwrap(final Class<T> clazz) {
+    @SuppressWarnings("unchecked")
+    @Override public <T> T unwrap(Class<T> clazz) {
         if (Ignite.class.equals(clazz))
             return (T)igniteProxy;
 
         try {
-            return (T)compute.call(new IgniteCallable<Object>() {
-                @Override public Object call() throws Exception {
-                    return cache().unwrap(clazz);
-                }
-            });
+            return compute.call(new UnwrapTask<>(cacheName, isAsync, clazz));
         }
         catch (Exception e) {
-            throw new IllegalArgumentException("Looks like class " + clazz + " is unmarshallable. Exception type:" + e.getClass(), e);
+            throw new IllegalArgumentException("Looks like class " + clazz +
+                " is unmarshallable. Exception type:" + e.getClass(), e);
         }
     }
 
     /** {@inheritDoc} */
-    @Override  public void registerCacheEntryListener(CacheEntryListenerConfiguration<K, V> cacheEntryListenerConfiguration) {
+    @Override  public void registerCacheEntryListener(CacheEntryListenerConfiguration<K, V> lsnrCfg) {
         throw new UnsupportedOperationException("Method should be supported.");
     }
 
     /** {@inheritDoc} */
-    @Override  public void deregisterCacheEntryListener(CacheEntryListenerConfiguration<K, V> cacheEntryListenerConfiguration) {
+    @Override  public void deregisterCacheEntryListener(CacheEntryListenerConfiguration<K, V> lsnrCfg) {
         throw new UnsupportedOperationException("Method should be supported.");
     }
 
     /** {@inheritDoc} */
     @Override public Iterator<Entry<K, V>> iterator() {
-        final Collection<Entry<K, V>> col = (Collection<Entry<K, V>>)compute.call(new IgniteCallable<Object>() {
-            @Override public Object call() throws Exception {
-                Collection res = new ArrayList();
-
-                for (Object o : cache())
-                    res.add(o);
-
-                return res;
-            }
-        });
-
-        return col.iterator();
+        return compute.call(new IteratorTask<K, V>(cacheName, isAsync)).iterator();
     }
 
     /** {@inheritDoc} */
@@ -616,4 +422,968 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
     @Override public CacheMetricsMXBean mxBean() {
         throw new UnsupportedOperationException("Method should be supported.");
     }
+
+    /**
+     * Task returning the result of {@code cache().getConfiguration(clazz)}.
+     */
+    private static class GetConfigurationTask<K, V, C extends Configuration<K, V>> extends CacheTaskAdapter<K, V, C> {
+        /** Configuration class passed to {@code getConfiguration}. */
+        private final Class<C> clazz;
+
+        /**
+         * @param cacheName Cache name.
+         * @param async Async.
+         * @param clazz Configuration class passed to {@code getConfiguration}.
+         */
+        public GetConfigurationTask(String cacheName, boolean async, Class<C> clazz) {
+            super(cacheName, async);
+            this.clazz = clazz;
+        }
+
+        /** {@inheritDoc} */
+        @Override public C call() throws Exception {
+            return cache().getConfiguration(clazz);
+        }
+    }
+
+    /**
+     * Task invoking {@code cache().localLoadCache(p, args)}; always yields {@code null}.
+     */
+    private static class LocalLoadCacheTask<K, V> extends CacheTaskAdapter<K, V, Void> {
+        /** Predicate. */
+        private final IgniteBiPredicate<K, V> p;
+
+        /** Args. */
+        private final Object[] args;
+
+        /**
+         * @param cacheName Cache name.
+         * @param async Async.
+         * @param p Predicate passed to {@code localLoadCache}.
+         * @param args Arguments passed to {@code localLoadCache}.
+         */
+        public LocalLoadCacheTask(String cacheName, boolean async, IgniteBiPredicate<K, V> p, Object[] args) {
+            super(cacheName, async);
+            this.p = p;
+            this.args = args;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Void call() {
+            cache().localLoadCache(p, args);
+
+            return null;
+        }
+    }
+
+    /**
+     * Task returning the result of {@code cache().getAndPutIfAbsent(key, val)}.
+     */
+    private static class GetAndPutIfAbsentTask<K, V> extends CacheTaskAdapter<K, V, V> {
+        /** Key. */
+        private final K key;
+
+        /** Value. */
+        private final V val;
+
+        /**
+         * @param cacheName Cache name.
+         * @param async Async.
+         * @param key Key.
+         * @param val Value.
+         */
+        public GetAndPutIfAbsentTask(String cacheName, boolean async, K key, V val) {
+            super(cacheName, async);
+            this.key = key;
+            this.val = val;
+        }
+
+        /** {@inheritDoc} */
+        @Override public V call() throws Exception {
+            return cache().getAndPutIfAbsent(key, val);
+        }
+    }
+
+    /**
+     * Task returning the result of {@code cache().isLocalLocked(key, byCurrThread)}.
+     */
+    private static class IsLocalLockedTask<K> extends CacheTaskAdapter<K, Void, Boolean> {
+        /** Key. */
+        private final K key;
+
+        /** By current thread. */
+        private final boolean byCurrThread;
+
+        /**
+         * @param cacheName Cache name.
+         * @param async Async.
+         * @param key Key.
+         * @param byCurrThread By current thread.
+         */
+        public IsLocalLockedTask(String cacheName, boolean async, K key, boolean byCurrThread) {
+            super(cacheName, async);
+            this.key = key;
+            this.byCurrThread = byCurrThread;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Boolean call() throws Exception {
+            return cache().isLocalLocked(key, byCurrThread);
+        }
+    }
+
+    /**
+     * Task copying {@code cache().localEntries(peekModes)} into a list of {@code CacheEntryImpl} key/value pairs.
+     */
+    private static class LocalEntriesTask<K, V> extends CacheTaskAdapter<K, V, Iterable<Entry<K, V>>> {
+        /** Peek modes. */
+        private final CachePeekMode[] peekModes;
+
+        /**
+         * @param cacheName Cache name.
+         * @param async Async.
+         * @param peekModes Peek modes.
+         */
+        public LocalEntriesTask(String cacheName, boolean async, CachePeekMode[] peekModes) {
+            super(cacheName, async);
+            this.peekModes = peekModes;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Iterable<Entry<K, V>> call() throws Exception {
+            Collection<Entry<K, V>> res = new ArrayList<>();
+
+            for (Entry<K, V> e : cache().localEntries(peekModes))
+                res.add(new CacheEntryImpl<>(e.getKey(), e.getValue()));
+
+            return res;
+        }
+    }
+
+    /**
+     * Task invoking {@code cache().localEvict(keys)}; always yields {@code null}.
+     */
+    private static class LocalEvictTask<K> extends CacheTaskAdapter<K, Void, Void> {
+        /** Keys. */
+        private final Collection<? extends K> keys;
+
+        /**
+         * @param cacheName Cache name.
+         * @param async Async.
+         * @param keys Keys.
+         */
+        public LocalEvictTask(String cacheName, boolean async, Collection<? extends K> keys) {
+            super(cacheName, async);
+            this.keys = keys;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Void call() {
+            cache().localEvict(keys);
+
+            return null;
+        }
+    }
+
+    /**
+     * Task returning the result of {@code cache().localPeek(key, peekModes)}.
+     */
+    private static class LocalPeekTask<K, V> extends CacheTaskAdapter<K, V, V> {
+        /** Key. */
+        private final K key;
+
+        /** Peek modes. */
+        private final CachePeekMode[] peekModes;
+
+        /**
+         * @param cacheName Cache name.
+         * @param async Async.
+         * @param key Key.
+         * @param peekModes Peek modes.
+         */
+        public LocalPeekTask(String cacheName, boolean async, K key, CachePeekMode[] peekModes) {
+            super(cacheName, async);
+            this.key = key;
+            this.peekModes = peekModes;
+        }
+
+        /** {@inheritDoc} */
+        @Override public V call() throws Exception {
+            return cache().localPeek(key, peekModes);
+        }
+    }
+
+    /**
+     * Task returning {@code cache().localSize(peekModes)} when {@code loc} is set, else {@code cache().size(peekModes)}.
+     */
+    private static class SizeTask extends CacheTaskAdapter<Void, Void, Integer> {
+        /** Peek modes. */
+        private final CachePeekMode[] peekModes;
+
+        /** Whether to call {@code localSize} ({@code true}) or {@code size} ({@code false}). */
+        private final boolean loc;
+
+        /**
+         * @param cacheName Cache name.
+         * @param async Async.
+         * @param peekModes Peek modes.
+         * @param loc {@code True} to call {@code localSize}, {@code false} to call {@code size}.
+         */
+        public SizeTask(String cacheName, boolean async, CachePeekMode[] peekModes, boolean loc) {
+            super(cacheName, async);
+            this.loc = loc;
+            this.peekModes = peekModes;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Integer call() throws Exception {
+            return loc ? cache().localSize(peekModes) : cache().size(peekModes);
+        }
+    }
+
+    /**
+     * Task returning the result of {@code cache().get(key)}.
+     */
+    private static class GetTask<K, V> extends CacheTaskAdapter<K, V, V> {
+        /** Key. */
+        private final K key;
+
+        /**
+         * @param cacheName Cache name.
+         * @param async Async.
+         * @param key Key.
+         */
+        public GetTask(String cacheName, boolean async, K key) {
+            super(cacheName, async);
+            this.key = key;
+        }
+
+        /** {@inheritDoc} */
+        @Override public V call() throws Exception {
+            return cache().get(key);
+        }
+    }
+
+    /**
+     * Task invoking {@code removeAll()} on {@code cache()}; if {@code async} is set it then calls {@code future().get()}.
+     */
+    private static class RemoveAllTask<K, V> extends CacheTaskAdapter<K, V, Void> {
+        /**
+         * @param cacheName Cache name.
+         * @param async Async.
+         */
+        public RemoveAllTask(String cacheName, boolean async) {
+            super(cacheName, async);
+        }
+
+        /** {@inheritDoc} */
+        @Override public Void call() {
+            IgniteCache<K, V> cache = cache();
+
+            cache.removeAll();
+
+            if (async)
+                cache.future().get();
+
+            return null;
+        }
+    }
+
+    /**
+     * Task invoking {@code cache().put(key, val)}; always yields {@code null}.
+     */
+    private static class PutTask<K, V> extends CacheTaskAdapter<K, V, Void> {
+        /** Key. */
+        private final K key;
+
+        /** Value. */
+        private final V val;
+
+        /**
+         * @param cacheName Cache name.
+         * @param async Async.
+         * @param key Key.
+         * @param val Value.
+         */
+        public PutTask(String cacheName, boolean async, K key, V val) {
+            super(cacheName, async);
+            this.key = key;
+            this.val = val;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Void call() {
+            cache().put(key, val);
+
+            return null;
+        }
+    }
+
+    /**
+     * Task returning the result of {@code cache().containsKey(key)}.
+     */
+    private static class ContainsKeyTask<K> extends CacheTaskAdapter<K, Object, Boolean> {
+        /** Key. */
+        private final K key;
+
+        /**
+         * @param cacheName Cache name.
+         * @param async Async.
+         * @param key Key.
+         */
+        public ContainsKeyTask(String cacheName, boolean async, K key) {
+            super(cacheName, async);
+            this.key = key;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Boolean call() throws Exception {
+            return cache().containsKey(key);
+        }
+    }
+
+    /**
+     * Task invoking {@code cache().clear()}; always yields {@code null}.
+     */
+    private static class ClearTask extends CacheTaskAdapter<Object, Object, Void> {
+        /**
+         * @param cacheName Cache name.
+         * @param async Async.
+         */
+        public ClearTask(String cacheName, boolean async) {
+            super(cacheName, async);
+        }
+
+        /** {@inheritDoc} */
+        @Override public Void call() {
+            cache().clear();
+
+            return null;
+        }
+    }
+
+    /**
+     * Task collecting every entry produced by iterating {@code cache()} into an {@code ArrayList}.
+     */
+    private static class IteratorTask<K, V> extends CacheTaskAdapter<K, V, Collection<Entry<K, V>>> {
+        /**
+         * @param cacheName Cache name.
+         * @param async Async.
+         */
+        public IteratorTask(String cacheName, boolean async) {
+            super(cacheName, async);
+        }
+
+        /** {@inheritDoc} */
+        @Override public Collection<Entry<K, V>> call() throws Exception {
+            Collection<Entry<K, V>> res = new ArrayList<>();
+
+            for (Entry<K, V> o : cache())
+                res.add(o);
+
+            return res;
+        }
+    }
+
+    /**
+     * Task returning the result of {@code cache().replace(key, val)}.
+     */
+    private static class ReplaceTask<K, V> extends CacheTaskAdapter<K, V, Boolean> {
+        /** Key. */
+        private final K key;
+
+        /** Value. */
+        private final V val;
+
+        /**
+         * @param cacheName Cache name.
+         * @param async Async.
+         * @param key Key.
+         * @param val Value.
+         */
+        public ReplaceTask(String cacheName, boolean async, K key, V val) {
+            super(cacheName, async);
+            this.key = key;
+            this.val = val;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Boolean call() throws Exception {
+            return cache().replace(key, val);
+        }
+    }
+
+    /**
+     * Task returning the result of {@code cache().getName()}.
+     */
+    private static class GetNameTask extends CacheTaskAdapter<Void, Void, String> {
+        /**
+         * @param cacheName Cache name.
+         * @param async Async.
+         */
+        public GetNameTask(String cacheName, boolean async) {
+            super(cacheName, async);
+        }
+
+        /** {@inheritDoc} */
+        @Override public String call() throws Exception {
+            return cache().getName();
+        }
+    }
+
+    /**
+     * Task returning the result of {@code cache().remove(key)}.
+     */
+    private static class RemoveTask<K> extends CacheTaskAdapter<K, Void, Boolean> {
+        /** Key. */
+        private final K key;
+
+        /**
+         * @param cacheName Cache name.
+         * @param async Async.
+         * @param key Key.
+         */
+        public RemoveTask(String cacheName, boolean async, K key) {
+            super(cacheName, async);
+            this.key = key;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Boolean call() throws Exception {
+            return cache().remove(key);
+        }
+    }
+
+    /**
+     * Task invoking {@code cache().putAll(map)}; always yields {@code null}.
+     */
+    private static class PutAllTask<K, V> extends CacheTaskAdapter<K, V, Void> {
+        /** Map. */
+        private final Map<? extends K, ? extends V> map;
+
+        /**
+         * @param cacheName Cache name.
+         * @param async Async.
+         * @param map Map.
+         */
+        public PutAllTask(String cacheName, boolean async, Map<? extends K, ? extends V> map) {
+            super(cacheName, async);
+            this.map = map;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Void call() {
+            cache().putAll(map);
+
+            return null;
+        }
+    }
+
+    /**
+     * Task invoking {@code cache().removeAll(keys)}; always yields {@code null}.
+     */
+    private static class RemoveAllKeysTask<K> extends CacheTaskAdapter<K, Void, Void> {
+        /** Keys. */
+        private final Set<? extends K> keys;
+
+        /**
+         * @param cacheName Cache name.
+         * @param async Async.
+         * @param keys Keys.
+         */
+        public RemoveAllKeysTask(String cacheName, boolean async, Set<? extends K> keys) {
+            super(cacheName, async);
+            this.keys = keys;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Void call() {
+            cache().removeAll(keys);
+
+            return null;
+        }
+    }
+
+    /**
+     * Task returning the result of {@code cache().getAll(keys)}.
+     */
+    private static class GetAllTask<K, V> extends CacheTaskAdapter<K, V, Map<K, V>> {
+        /** Keys. */
+        private final Set<? extends K> keys;
+
+        /**
+         * @param cacheName Cache name.
+         * @param async Async.
+         * @param keys Keys.
+         */
+        public GetAllTask(String cacheName, boolean async, Set<? extends K> keys) {
+            super(cacheName, async);
+            this.keys = keys;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Map<K, V> call() throws Exception {
+            return cache().getAll(keys);
+        }
+    }
+
+    /**
+     * Task returning the result of {@code cache().getAllOutTx(keys)}.
+     */
+    private static class GetAllOutTxTask<K, V> extends CacheTaskAdapter<K, V, Map<K, V>> {
+        /** Keys. */
+        private final Set<? extends K> keys;
+
+        /**
+         * @param cacheName Cache name.
+         * @param async Async.
+         * @param keys Keys.
+         */
+        public GetAllOutTxTask(String cacheName, boolean async, Set<? extends K> keys) {
+            super(cacheName, async);
+            this.keys = keys;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Map<K, V> call() throws Exception {
+            return cache().getAllOutTx(keys);
+        }
+    }
+
+    /**
+     * Task returning the result of {@code cache().containsKeys(keys)}.
+     */
+    private static class ContainsKeysTask<K, V> extends CacheTaskAdapter<K, V, Boolean> {
+        /** Keys. */
+        private final Set<? extends K> keys;
+
+        /**
+         * @param cacheName Cache name.
+         * @param async Async.
+         * @param keys Keys.
+         */
+        public ContainsKeysTask(String cacheName, boolean async, Set<? extends K> keys) {
+            super(cacheName, async);
+            this.keys = keys;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Boolean call() throws Exception {
+            return cache().containsKeys(keys);
+        }
+    }
+
+    /**
+     * Task returning the result of {@code cache().getAndPut(key, val)}.
+     */
+    private static class GetAndPutTask<K, V> extends CacheTaskAdapter<K, V, V> {
+        /** Key. */
+        private final K key;
+
+        /** Value. */
+        private final V val;
+
+        /**
+         * @param cacheName Cache name.
+         * @param async Async.
+         * @param key Key.
+         * @param val Value.
+         */
+        public GetAndPutTask(String cacheName, boolean async, K key, V val) {
+            super(cacheName, async);
+            this.key = key;
+            this.val = val;
+        }
+
+        /** {@inheritDoc} */
+        @Override public V call() throws Exception {
+            return cache().getAndPut(key, val);
+        }
+    }
+
+    /**
+     * Task that returns {@code putIfAbsent(key, val)} of the cache obtained from {@code cache()}.
+     */
+    private static class PutIfAbsentTask<K, V> extends CacheTaskAdapter<K, V, Boolean> {
+        /** Key passed to {@code putIfAbsent}. */
+        private final K key;
+
+        /** Value passed to {@code putIfAbsent}. */
+        private final V val;
+
+        /**
+         * @param cacheName Cache name.
+         * @param async Async flag passed to {@code CacheTaskAdapter}.
+         * @param key Key.
+         * @param val Value.
+         */
+        public PutIfAbsentTask(String cacheName, boolean async, K key, V val) {
+            super(cacheName, async);
+            this.key = key;
+            this.val = val;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Boolean call() throws Exception {
+            return cache().putIfAbsent(key, val);
+        }
+    }
+
+    /**
+     * Task that returns {@code remove(key, oldVal)} of the cache obtained from {@code cache()}.
+     */
+    private static class RemoveIfExistsTask<K, V> extends CacheTaskAdapter<K, V, Boolean> {
+        /** Key passed to {@code remove}. */
+        private final K key;
+
+        /** Old value passed as the second argument of {@code remove}. */
+        private final V oldVal;
+
+        /**
+         * @param cacheName Cache name.
+         * @param async Async flag passed to {@code CacheTaskAdapter}.
+         * @param key Key.
+         * @param oldVal Old value.
+         */
+        public RemoveIfExistsTask(String cacheName, boolean async, K key, V oldVal) {
+            super(cacheName, async);
+            this.key = key;
+            this.oldVal = oldVal;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Boolean call() throws Exception {
+            return cache().remove(key, oldVal);
+        }
+    }
+
+    /**
+     * Task that returns {@code getAndRemove(key)} of the cache obtained from {@code cache()}.
+     */
+    private static class GetAndRemoveTask<K, V> extends CacheTaskAdapter<K, V, V> {
+        /** Key passed to {@code getAndRemove}. */
+        private final K key;
+
+        /**
+         * @param cacheName Cache name.
+         * @param async Async flag passed to {@code CacheTaskAdapter}.
+         * @param key Key.
+         */
+        public GetAndRemoveTask(String cacheName, boolean async, K key) {
+            super(cacheName, async);
+            this.key = key;
+        }
+
+        /** {@inheritDoc} */
+        @Override public V call() throws Exception {
+            return cache().getAndRemove(key);
+        }
+    }
+
+    /**
+     * Task that returns {@code replace(key, oldVal, newVal)} of the cache obtained from {@code cache()}.
+     */
+    private static class ReplaceIfExistsTask<K, V> extends CacheTaskAdapter<K, V, Boolean> {
+        /** Key passed to {@code replace}. */
+        private final K key;
+
+        /** Old value, second argument of {@code replace}. */
+        private final V oldVal;
+
+        /** New value, third argument of {@code replace}. */
+        private final V newVal;
+
+        /**
+         * @param cacheName Cache name.
+         * @param async Async flag passed to {@code CacheTaskAdapter}.
+         * @param key Key.
+         * @param oldVal Old value.
+         * @param newVal New value.
+         */
+        public ReplaceIfExistsTask(String cacheName, boolean async, K key, V oldVal, V newVal) {
+            super(cacheName, async);
+            this.key = key;
+            this.oldVal = oldVal;
+            this.newVal = newVal;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Boolean call() throws Exception {
+            return cache().replace(key, oldVal, newVal);
+        }
+    }
+
+    /**
+     * Task that returns {@code getAndReplace(key, val)} of the cache obtained from {@code cache()}.
+     */
+    private static class GetAndReplaceTask<K, V> extends CacheTaskAdapter<K, V, V> {
+        /** Key passed to {@code getAndReplace}. */
+        private final K key;
+
+        /** Value passed to {@code getAndReplace}. */
+        private final V val;
+
+        /**
+         * @param cacheName Cache name.
+         * @param async Async flag passed to {@code CacheTaskAdapter}.
+         * @param key Key.
+         * @param val Value.
+         */
+        public GetAndReplaceTask(String cacheName, boolean async, K key, V val) {
+            super(cacheName, async);
+            this.key = key;
+            this.val = val;
+        }
+
+        /** {@inheritDoc} */
+        @Override public V call() throws Exception {
+            return cache().getAndReplace(key, val);
+        }
+    }
+
+    /**
+     * Task that clears one key: {@code localClear(key)} if {@code loc} is set, {@code clear(key)} otherwise.
+     */
+    private static class ClearKeyTask<K> extends CacheTaskAdapter<K, Void, Void> {
+        /** Key to clear. */
+        private final K key;
+
+        /** Local flag (constructor argument {@code loc}): selects {@code localClear} instead of {@code clear}. */
+        private final boolean loc;
+
+        /**
+         * @param cacheName Cache name.
+         * @param async Async flag passed to {@code CacheTaskAdapter}.
+         * @param key Key.
+         */
+        public ClearKeyTask(String cacheName, boolean async, boolean loc, K key) {
+            super(cacheName, async);
+            this.key = key;
+            this.loc = loc;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Void call() {
+            if (loc)
+                cache().localClear(key);
+            else
+                cache().clear(key);
+
+            return null;
+        }
+    }
+
+    /**
+     * Task that clears a key set: {@code localClearAll(keys)} if {@code loc} is set, {@code clearAll(keys)} otherwise.
+     */
+    private static class ClearAllKeys<K> extends CacheTaskAdapter<K, Void, Void> {
+        /** Keys to clear. */
+        private final Set<? extends K> keys;
+
+        /** Local flag (constructor argument {@code loc}): selects {@code localClearAll} instead of {@code clearAll}. */
+        private final boolean loc;
+
+        /**
+         * @param cacheName Cache name.
+         * @param async Async flag passed to {@code CacheTaskAdapter}.
+         * @param keys Keys.
+         */
+        public ClearAllKeys(String cacheName, boolean async, boolean loc, Set<? extends K> keys) {
+            super(cacheName, async);
+            this.keys = keys;
+            this.loc = loc;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Void call() {
+            if (loc)
+                cache().localClearAll(keys);
+            else
+                cache().clearAll(keys);
+
+            return null;
+        }
+    }
+
+    /**
+     * Task that returns {@code invoke(key, processor, args)} of the cache obtained from {@code cache()}.
+     */
+    private static class InvokeTask<K, V, R> extends CacheTaskAdapter<K, V, R> {
+        /** Key passed to {@code invoke}. */
+        private final K key;
+
+        /** Entry processor passed to {@code invoke}. */
+        private final EntryProcessor<K, V, R> processor;
+
+        /** Arguments passed to {@code invoke} after the processor. */
+        private final Object[] args;
+
+        /**
+         * @param cacheName Cache name.
+         * @param async Async flag passed to {@code CacheTaskAdapter}.
+         * @param key Key.
+         * @param processor Processor.
+         * @param args Args.
+         */
+        public InvokeTask(String cacheName, boolean async, K key, EntryProcessor<K, V, R> processor,
+            Object[] args) {
+            super(cacheName, async);
+            this.args = args;
+            this.key = key;
+            this.processor = processor;
+        }
+
+        /** {@inheritDoc} */
+        @Override public R call() throws Exception {
+            return cache().invoke(key, processor, args);
+        }
+    }
+
+    /**
+     * Task that returns {@code invokeAll(keys, processor, args)} of the cache obtained from {@code cache()}.
+     */
+    private static class InvokeAllTask<K, V, T> extends CacheTaskAdapter<K, V, Map<K, EntryProcessorResult<T>>> {
+        /** Keys passed to {@code invokeAll}. */
+        private final Set<? extends K> keys;
+
+        /** Entry processor passed to {@code invokeAll}. */
+        private final EntryProcessor<K, V, T> processor;
+
+        /** Arguments passed to {@code invokeAll} after the processor. */
+        private final Object[] args;
+
+        /**
+         * @param cacheName Cache name.
+         * @param async Async flag passed to {@code CacheTaskAdapter}.
+         * @param keys Keys.
+         * @param processor Processor.
+         * @param args Args.
+         */
+        public InvokeAllTask(String cacheName, boolean async, Set<? extends K> keys,
+            EntryProcessor<K, V, T> processor, Object[] args) {
+            super(cacheName, async);
+            this.args = args;
+            this.keys = keys;
+            this.processor = processor;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Map<K, EntryProcessorResult<T>> call() throws Exception {
+            return cache().invokeAll(keys, processor, args);
+        }
+    }
+
+    /**
+     * Task that calls {@code close()} on the cache obtained from {@code cache()} and returns {@code null}.
+     */
+    private static class CloseTask extends CacheTaskAdapter<Void, Void, Void> {
+        /**
+         * @param cacheName Cache name.
+         * @param async Async flag passed to {@code CacheTaskAdapter}.
+         */
+        public CloseTask(String cacheName, boolean async) {
+            super(cacheName, async);
+        }
+
+        /** {@inheritDoc} */
+        @Override public Void call() {
+            cache().close();
+
+            return null;
+        }
+    }
+
+    /**
+     * Task that calls {@code destroy()} on the cache obtained from {@code cache()} and returns {@code null}.
+     */
+    private static class DestroyTask extends CacheTaskAdapter<Void, Void, Void> {
+        /**
+         * @param cacheName Cache name.
+         * @param async Async flag passed to {@code CacheTaskAdapter}.
+         */
+        public DestroyTask(String cacheName, boolean async) {
+            super(cacheName, async);
+        }
+
+        /** {@inheritDoc} */
+        @Override public Void call() {
+            cache().destroy();
+
+            return null;
+        }
+    }
+
+    /**
+     * Task that returns {@code isClosed()} of the cache obtained from {@code cache()}.
+     */
+    private static class IsClosedTask extends CacheTaskAdapter<Void, Void, Boolean> {
+        /**
+         * @param cacheName Cache name.
+         * @param async Async flag passed to {@code CacheTaskAdapter}.
+         */
+        public IsClosedTask(String cacheName, boolean async) {
+            super(cacheName, async);
+        }
+
+        /** {@inheritDoc} */
+        @Override public Boolean call() throws Exception {
+            return cache().isClosed();
+        }
+    }
+
+    /**
+     * Task that returns {@code unwrap(clazz)} of the cache obtained from {@code cache()}.
+     */
+    private static class UnwrapTask<R> extends CacheTaskAdapter<Void, Void, R> {
+        /** Class passed to {@code unwrap}; also fixes the task's result type {@code R}. */
+        private final Class<R> clazz;
+
+        /**
+         * @param cacheName Cache name.
+         * @param async Async flag passed to {@code CacheTaskAdapter}.
+         * @param clazz Class to unwrap the cache to.
+         */
+        public UnwrapTask(String cacheName, boolean async, Class<R> clazz) {
+            super(cacheName, async);
+            this.clazz = clazz;
+        }
+
+        /** {@inheritDoc} */
+        @Override public R call() throws Exception {
+            return cache().unwrap(clazz);
+        }
+    }
+
+    /**
+     * Base class of the cache tasks: stores the cache name and async flag and resolves the cache via {@link #cache()}.
+     */
+    private abstract static class CacheTaskAdapter<K, V, R> implements IgniteCallable<R> {
+        /** Ignite instance; never assigned here, the field is annotated with {@code @IgniteInstanceResource}. */
+        @IgniteInstanceResource
+        protected Ignite ignite;
+
+        /** Name of the cache looked up by {@link #cache()}. */
+        protected final String cacheName;
+
+        /** Async flag: when set, {@link #cache()} returns the {@code withAsync()} view of the cache. */
+        protected final boolean async;
+
+        /**
+         * @param cacheName Cache name.
+         * @param async Async.
+         */
+        public CacheTaskAdapter(String cacheName, boolean async) {
+            this.async = async;
+            this.cacheName = cacheName;
+        }
+
+        /**
+         * @return {@code ignite.cache(cacheName)}, or its {@code withAsync()} view when {@link #async} is set.
+         */
+        protected IgniteCache<K, V> cache() {
+            IgniteCache<K, V> cache = ignite.cache(cacheName);
+
+            return async ? cache.withAsync() : cache;
+        }
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/a1e5cc57/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteClusterProcessProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteClusterProcessProxy.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteClusterProcessProxy.java
index 05d6533..633e9d0 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteClusterProcessProxy.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteClusterProcessProxy.java
@@ -26,7 +26,6 @@ import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCluster;
 import org.apache.ignite.IgniteCompute;
 import org.apache.ignite.IgniteException;
-import org.apache.ignite.Ignition;
 import org.apache.ignite.cluster.ClusterGroup;
 import org.apache.ignite.cluster.ClusterMetrics;
 import org.apache.ignite.cluster.ClusterNode;
@@ -36,6 +35,7 @@ import org.apache.ignite.internal.cluster.IgniteClusterEx;
 import org.apache.ignite.lang.IgniteCallable;
 import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.resources.IgniteInstanceResource;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -43,9 +43,6 @@ import org.jetbrains.annotations.Nullable;
  */
 @SuppressWarnings("TransientFieldInNonSerializableClass")
 public class IgniteClusterProcessProxy implements IgniteClusterEx {
-    /** Grid id. */
-    private final UUID gridId;
-
     /** Compute. */
     private final transient IgniteCompute compute;
 
@@ -57,21 +54,11 @@ public class IgniteClusterProcessProxy implements IgniteClusterEx {
      */
     public IgniteClusterProcessProxy(IgniteProcessProxy proxy) {
         this.proxy = proxy;
-        gridId = proxy.getId();
         compute = proxy.remoteCompute();
     }
 
-    /**
-     * Returns cluster instance. Method to be called from closure at another JVM.
-     *
-     * @return Cache.
-     */
-    private IgniteClusterEx cluster() {
-        return (IgniteClusterEx)Ignition.ignite(gridId).cluster();
-    }
-
     /** {@inheritDoc} */
-    @Override public ClusterGroupEx forSubjectId(final UUID subjId) {
+    @Override public ClusterGroupEx forSubjectId(UUID subjId) {
         throw new UnsupportedOperationException("Operation is not supported yet.");
     }
 
@@ -83,11 +70,7 @@ public class IgniteClusterProcessProxy implements IgniteClusterEx {
 
     /** {@inheritDoc} */
     @Override public ClusterNode localNode() {
-        return compute.call(new IgniteCallable<ClusterNode>() {
-            @Override public ClusterNode call() throws Exception {
-                return cluster().localNode();
-            }
-        });
+        return compute.call(new LocalNodeTask());
     }
 
     /** {@inheritDoc} */
@@ -285,38 +268,22 @@ public class IgniteClusterProcessProxy implements IgniteClusterEx {
 
     /** {@inheritDoc} */
     @Override public Collection<ClusterNode> nodes() {
-        return compute.call(new IgniteCallable<Collection<ClusterNode>>() {
-            @Override public Collection<ClusterNode> call() throws Exception {
-                return cluster().nodes();
-            }
-        });
+        return compute.call(new NodesTask());
     }
 
     /** {@inheritDoc} */
-    @Override public ClusterNode node(final UUID nid) {
-        return compute.call(new IgniteCallable<ClusterNode>() {
-            @Override public ClusterNode call() throws Exception {
-                return cluster().node(nid);
-            }
-        });
+    @Override public ClusterNode node(UUID nid) {
+        return compute.call(new NodeTask(nid));
     }
 
     /** {@inheritDoc} */
     @Override public ClusterNode node() {
-        return compute.call(new IgniteCallable<ClusterNode>() {
-            @Override public ClusterNode call() throws Exception {
-                return cluster().node();
-            }
-        });
+        return compute.call(new NodeTask(null));
     }
 
     /** {@inheritDoc} */
     @Override public Collection<String> hostNames() {
-        return compute.call(new IgniteCallable<Collection<String>>() {
-            @Override public Collection<String> call() throws Exception {
-                return cluster().hostNames();
-            }
-        });
+        return compute.call(new HostNamesTask());
     }
 
     /** {@inheritDoc} */
@@ -333,4 +300,70 @@ public class IgniteClusterProcessProxy implements IgniteClusterEx {
     @Nullable @Override public IgniteFuture<?> clientReconnectFuture() {
         throw new UnsupportedOperationException("Operation is not supported yet.");
     }
+
+    /**
+     * Task that returns {@code cluster().localNode()}.
+     */
+    private static class LocalNodeTask extends ClusterTaskAdapter<ClusterNode> {
+        /** {@inheritDoc} */
+        @Override public ClusterNode call() throws Exception {
+            return cluster().localNode();
+        }
+    }
+
+    /**
+     * Task that returns {@code cluster().nodes()}.
+     */
+    private static class NodesTask extends ClusterTaskAdapter<Collection<ClusterNode>> {
+        /** {@inheritDoc} */
+        @Override public Collection<ClusterNode> call() throws Exception {
+            return cluster().nodes();
+        }
+    }
+
+    /**
+     * Task that returns {@code cluster().node(nodeId)}, or {@code cluster().node()} when the id is {@code null}.
+     */
+    private static class NodeTask extends ClusterTaskAdapter<ClusterNode> {
+        /** Node id; {@code null} selects the no-argument {@code node()} call. */
+        private final UUID nodeId;
+
+        /**
+         * @param nodeId Node id, or {@code null} to call {@code cluster().node()}.
+         */
+        public NodeTask(UUID nodeId) {
+            this.nodeId = nodeId;
+        }
+
+        /** {@inheritDoc} */
+        @Override public ClusterNode call() throws Exception {
+            return nodeId == null ? cluster().node() : cluster().node(nodeId);
+        }
+    }
+
+    /**
+     * Task that returns {@code cluster().hostNames()}.
+     */
+    private static class HostNamesTask extends ClusterTaskAdapter<Collection<String>> {
+        /** {@inheritDoc} */
+        @Override public Collection<String> call() throws Exception {
+            return cluster().hostNames();
+        }
+    }
+
+    /**
+     * Base class of the cluster tasks: exposes the cluster of the {@code ignite} field via {@link #cluster()}.
+     */
+    private abstract static class ClusterTaskAdapter<R> implements IgniteCallable<R> {
+        /** Ignite instance; never assigned here, the field is annotated with {@code @IgniteInstanceResource}. */
+        @IgniteInstanceResource
+        protected Ignite ignite;
+
+        /**
+         * @return {@code ignite.cluster()}.
+         */
+        protected IgniteCluster cluster() {
+            return ignite.cluster();
+        }
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/a1e5cc57/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteEventsProcessProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteEventsProcessProxy.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteEventsProcessProxy.java
index 860f889..d5af81e 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteEventsProcessProxy.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteEventsProcessProxy.java
@@ -20,15 +20,16 @@ package org.apache.ignite.testframework.junits.multijvm;
 import java.util.Collection;
 import java.util.List;
 import java.util.UUID;
+import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteEvents;
 import org.apache.ignite.IgniteException;
-import org.apache.ignite.Ignition;
 import org.apache.ignite.cluster.ClusterGroup;
 import org.apache.ignite.events.Event;
 import org.apache.ignite.lang.IgniteBiPredicate;
 import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.lang.IgniteRunnable;
+import org.apache.ignite.resources.IgniteInstanceResource;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -39,23 +40,11 @@ public class IgniteEventsProcessProxy implements IgniteEvents {
     /** Ignite proxy. */
     private final transient IgniteProcessProxy igniteProxy;
 
-    /** Grid id. */
-    private final UUID gridId;
-
     /**
      * @param igniteProxy Ignite proxy.
      */
     public IgniteEventsProcessProxy(IgniteProcessProxy igniteProxy) {
         this.igniteProxy = igniteProxy;
-
-        gridId = igniteProxy.getId();
-    }
-
-    /**
-     * @return Events instance.
-     */
-    private IgniteEvents events() {
-        return Ignition.ignite(gridId).events();
     }
 
     /** {@inheritDoc} */
@@ -105,11 +94,7 @@ public class IgniteEventsProcessProxy implements IgniteEvents {
 
     /** {@inheritDoc} */
     @Override public void localListen(final IgnitePredicate<? extends Event> lsnr, final int... types) {
-        igniteProxy.remoteCompute().run(new IgniteRunnable() {
-            @Override public void run() {
-                events().localListen(lsnr, types);
-            }
-        });
+        igniteProxy.remoteCompute().run(new LocalListenTask(lsnr, types));
     }
 
     /** {@inheritDoc} */
@@ -151,4 +136,33 @@ public class IgniteEventsProcessProxy implements IgniteEvents {
     @Override public <R> IgniteFuture<R> future() {
         throw new UnsupportedOperationException("Operation isn't supported yet.");
     }
+
+    /**
+     * Runnable that calls {@code ignite.events().localListen(lsnr, types)}.
+     */
+    private static class LocalListenTask implements IgniteRunnable {
+        /** Ignite instance; never assigned here, the field is annotated with {@code @IgniteInstanceResource}. */
+        @IgniteInstanceResource
+        private Ignite ignite;
+
+        /** Listener passed to {@code localListen}. */
+        private final IgnitePredicate<? extends Event> lsnr;
+
+        /** Event types passed to {@code localListen}. */
+        private final int[] types;
+
+        /**
+         * @param lsnr Listener.
+         * @param types Types.
+         */
+        public LocalListenTask(IgnitePredicate<? extends Event> lsnr, int[] types) {
+            this.lsnr = lsnr;
+            this.types = types;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void run() {
+            ignite.events().localListen(lsnr, types);
+        }
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/a1e5cc57/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteNodeRunner.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteNodeRunner.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteNodeRunner.java
index f46e8e9..0597eda 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteNodeRunner.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteNodeRunner.java
@@ -25,13 +25,12 @@ import java.io.FileOutputStream;
 import java.io.FileReader;
 import java.io.IOException;
 import java.io.OutputStream;
-import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Set;
 import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.Ignition;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteEx;
@@ -39,8 +38,9 @@ import org.apache.ignite.internal.processors.cache.GridCacheAbstractFullApiSelfT
 import org.apache.ignite.internal.util.GridJavaProcess;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.marshaller.optimized.OptimizedMarshaller;
+import org.apache.ignite.marshaller.Marshaller;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.testframework.junits.IgniteTestResources;
 import sun.jvmstat.monitor.HostIdentifier;
 import sun.jvmstat.monitor.MonitoredHost;
 import sun.jvmstat.monitor.MonitoredVm;
@@ -99,30 +99,6 @@ public class IgniteNodeRunner {
     public static String storeToFile(IgniteConfiguration cfg) throws IOException {
         String fileName = IGNITE_CONFIGURATION_FILE + cfg.getNodeId();
 
-        // Check marshaller configuration, because read configuration method expect specific marshaller.
-        if (cfg.getMarshaller() instanceof OptimizedMarshaller){
-            OptimizedMarshaller marsh = (OptimizedMarshaller)cfg.getMarshaller();
-
-            try {
-                Field isRequireFiled = marsh.getClass().getDeclaredField("requireSer");
-
-                isRequireFiled.setAccessible(true);
-
-                boolean isRequireSer = isRequireFiled.getBoolean(marsh);
-
-                if (isRequireSer)
-                    throw new UnsupportedOperationException("Unsupported marshaller configuration. " +
-                        "readCfgFromFileAndDeleteFile method expect " + OptimizedMarshaller.class.getSimpleName() +
-                        "with requireSerializeble flag in 'false'.");
-            }
-            catch (NoSuchFieldException|IllegalAccessException e) {
-                throw new IgniteException("Failed to check filed of " + OptimizedMarshaller.class.getSimpleName(), e);
-            }
-        }
-        else
-            throw new UnsupportedOperationException("Unsupported marshaller. " +
-                "readCfgFromFileAndDeleteFile method expect " + OptimizedMarshaller.class.getSimpleName());
-
         try(OutputStream out = new BufferedOutputStream(new FileOutputStream(fileName))) {
             cfg.setMBeanServer(null);
             cfg.setMarshaller(null);
@@ -143,11 +119,16 @@ public class IgniteNodeRunner {
      * @throws IOException If failed.
      * @see #storeToFile(IgniteConfiguration)
      */
-    private static IgniteConfiguration readCfgFromFileAndDeleteFile(String fileName) throws IOException {
+    private static IgniteConfiguration readCfgFromFileAndDeleteFile(String fileName)
+        throws IOException, IgniteCheckedException {
         try(BufferedReader cfgReader = new BufferedReader(new FileReader(fileName))) {
             IgniteConfiguration cfg = (IgniteConfiguration)new XStream().fromXML(cfgReader);
 
-            cfg.setMarshaller(new OptimizedMarshaller(false));
+            Marshaller marsh = IgniteTestResources.getMarshaller();
+
+            cfg.setMarshaller(marsh);
+
+            X.println("Configured marshaller class: " + marsh.getClass().getName());
 
             TcpDiscoverySpi disco = new TcpDiscoverySpi();
             disco.setIpFinder(GridCacheAbstractFullApiSelfTest.LOCAL_IP_FINDER);

http://git-wip-us.apache.org/repos/asf/ignite/blob/a1e5cc57/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java
index ec7dab7..aa1d470 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java
@@ -41,13 +41,11 @@ import org.apache.ignite.IgniteFileSystem;
 import org.apache.ignite.IgniteIllegalStateException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.IgniteMessaging;
-import org.apache.ignite.internal.portable.api.IgnitePortables;
 import org.apache.ignite.IgniteQueue;
 import org.apache.ignite.IgniteScheduler;
 import org.apache.ignite.IgniteServices;
 import org.apache.ignite.IgniteSet;
 import org.apache.ignite.IgniteTransactions;
-import org.apache.ignite.Ignition;
 import org.apache.ignite.cache.affinity.Affinity;
 import org.apache.ignite.cluster.ClusterGroup;
 import org.apache.ignite.cluster.ClusterNode;
@@ -62,6 +60,7 @@ import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.cluster.IgniteClusterEx;
+import org.apache.ignite.internal.portable.api.IgnitePortables;
 import org.apache.ignite.internal.processors.cache.GridCacheUtilityKey;
 import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
 import org.apache.ignite.internal.processors.hadoop.Hadoop;
@@ -78,6 +77,8 @@ import org.apache.ignite.lang.IgniteProductVersion;
 import org.apache.ignite.lang.IgniteRunnable;
 import org.apache.ignite.plugin.IgnitePlugin;
 import org.apache.ignite.plugin.PluginNotFoundException;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.testframework.junits.IgniteTestResources;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -88,6 +89,9 @@ public class IgniteProcessProxy implements IgniteEx {
     /** Grid proxies. */
     private static final transient ConcurrentMap<String, IgniteProcessProxy> gridProxies = new ConcurrentHashMap<>();
 
+    /** Property that specify alternative {@code JAVA_HOME}. */
+    private static final String TEST_MULTIJVM_JAVA_HOME = "test.multijvm.java.home";
+
     /** Jvm process with ignite instance. */
     private final transient GridJavaProcess proc;
 
@@ -108,7 +112,7 @@ public class IgniteProcessProxy implements IgniteEx {
      * @param log Logger.
      * @param locJvmGrid Local JVM grid.
      */
-    public IgniteProcessProxy(final IgniteConfiguration cfg, final IgniteLogger log, final Ignite locJvmGrid)
+    public IgniteProcessProxy(IgniteConfiguration cfg, IgniteLogger log, Ignite locJvmGrid)
         throws Exception {
         this.cfg = cfg;
         this.locJvmGrid = locJvmGrid;
@@ -121,7 +125,9 @@ public class IgniteProcessProxy implements IgniteEx {
         Collection<String> filteredJvmArgs = new ArrayList<>();
 
         for (String arg : jvmArgs) {
-            if(!arg.toLowerCase().startsWith("-agentlib"))
+            if(arg.startsWith("-Xmx") || arg.startsWith("-Xms") ||
+                arg.startsWith("-cp") || arg.startsWith("-classpath") ||
+                arg.startsWith("-D" + IgniteTestResources.MARSH_CLASS_NAME))
                 filteredJvmArgs.add(arg);
         }
 
@@ -130,7 +136,7 @@ public class IgniteProcessProxy implements IgniteEx {
         locJvmGrid.events().localListen(new NodeStartedListener(id, rmtNodeStartedLatch), EventType.EVT_NODE_JOINED);
 
         proc = GridJavaProcess.exec(
-            IgniteNodeRunner.class,
+            IgniteNodeRunner.class.getCanonicalName(),
             cfgFileName, // Params.
             this.log,
             // Optional closure to be called each time wrapped process prints line to system.out or system.err.
@@ -140,6 +146,7 @@ public class IgniteProcessProxy implements IgniteEx {
                 }
             },
             null,
+            System.getProperty(TEST_MULTIJVM_JAVA_HOME),
             filteredJvmArgs, // JVM Args.
             System.getProperty("surefire.test.class.path")
         );
@@ -149,11 +156,7 @@ public class IgniteProcessProxy implements IgniteEx {
         IgniteProcessProxy prevVal = gridProxies.putIfAbsent(cfg.getGridName(), this);
 
         if (prevVal != null) {
-            remoteCompute().run(new IgniteRunnable() {
-                @Override public void run() {
-                    G.stop(cfg.getGridName(), true);
-                }
-            });
+            remoteCompute().run(new StopGridTask(cfg.getGridName(), true));
 
             throw new IllegalStateException("There was found instance assotiated with " + cfg.getGridName() +
                 ", instance= " + prevVal + ". New started node was stopped.");
@@ -208,30 +211,17 @@ public class IgniteProcessProxy implements IgniteEx {
      * @param gridName Grid name.
      * @param cancel Cacnel flag.
      */
-    public static void stop(final String gridName, final boolean cancel) {
+    public static void stop(String gridName, boolean cancel) {
         IgniteProcessProxy proxy = gridProxies.get(gridName);
 
         if (proxy != null) {
-            proxy.remoteCompute().run(new IgniteRunnable() {
-                @Override public void run() {
-                    G.stop(gridName, cancel);
-                }
-            });
+            proxy.remoteCompute().run(new StopGridTask(gridName, cancel));
 
             gridProxies.remove(gridName, proxy);
         }
     }
 
     /**
-     * For usage in closures.
-     *
-     * @return Ignite instance.
-     */
-    private Ignite igniteById() {
-        return Ignition.ignite(id);
-    }
-
-    /**
      * @param locNodeId ID of local node the requested grid instance is managing.
      * @return An instance of named grid. This method never returns {@code null}.
      * @throws IgniteIllegalStateException Thrown if grid was not properly initialized or grid instance was stopped or
@@ -357,11 +347,7 @@ public class IgniteProcessProxy implements IgniteEx {
 
     /** {@inheritDoc} */
     @Override public ClusterNode localNode() {
-        return remoteCompute().call(new IgniteCallable<ClusterNode>() {
-            @Override public ClusterNode call() throws Exception {
-                return ((IgniteEx)Ignition.ignite(id)).localNode();
-            }
-        });
+        return remoteCompute().call(new NodeTask());
     }
 
     /** {@inheritDoc} */
@@ -467,7 +453,10 @@ public class IgniteProcessProxy implements IgniteEx {
     }
 
     /** {@inheritDoc} */
-    @Override  public <K, V> IgniteCache<K, V> createNearCache(@Nullable String cacheName, NearCacheConfiguration<K, V> nearCfg) {
+    @Override  public <K, V> IgniteCache<K, V> createNearCache(
+        @Nullable String cacheName,
+        NearCacheConfiguration<K, V> nearCfg)
+    {
         throw new UnsupportedOperationException("Operation isn't supported yet.");
     }
 
@@ -508,7 +497,8 @@ public class IgniteProcessProxy implements IgniteEx {
     }
 
     /** {@inheritDoc} */
-    @Override  public IgniteAtomicSequence atomicSequence(String name, long initVal, boolean create) throws IgniteException {
+    @Override  public IgniteAtomicSequence atomicSequence(String name, long initVal, boolean create)
+        throws IgniteException {
         throw new UnsupportedOperationException("Operation isn't supported yet.");
     }
 
@@ -524,8 +514,12 @@ public class IgniteProcessProxy implements IgniteEx {
     }
 
     /** {@inheritDoc} */
-    @Override  public <T, S> IgniteAtomicStamped<T, S> atomicStamped(String name, @Nullable T initVal, @Nullable S initStamp,
-        boolean create) throws IgniteException {
+    @Override  public <T, S> IgniteAtomicStamped<T, S> atomicStamped(
+        String name,
+        @Nullable T initVal,
+        @Nullable S initStamp,
+        boolean create) throws IgniteException
+    {
         throw new UnsupportedOperationException("Operation isn't supported yet.");
     }
 
@@ -572,11 +566,7 @@ public class IgniteProcessProxy implements IgniteEx {
             }
         }, EventType.EVT_NODE_LEFT, EventType.EVT_NODE_FAILED);
 
-        compute().run(new IgniteRunnable() {
-            @Override public void run() {
-                igniteById().close();
-            }
-        });
+        compute().run(new StopGridTask(localJvmGrid().name(), true));
 
         try {
             assert U.await(rmtNodeStoppedLatch, 15, TimeUnit.SECONDS) : "NodeId=" + id;
@@ -616,4 +606,43 @@ public class IgniteProcessProxy implements IgniteEx {
 
         return locJvmGrid.compute(grp);
     }
+
+    /**
+     * Runnable that stops the grid with the given name by calling {@code G.stop(gridName, cancel)}.
+     */
+    private static class StopGridTask implements IgniteRunnable {
+        /** Name of the grid to stop. */
+        private final String gridName;
+
+        /** Cancel flag forwarded to {@code G.stop}. */
+        private final boolean cancel;
+
+        /**
+         * @param gridName Name of the grid to stop.
+         * @param cancel Cancel flag forwarded to {@code G.stop}.
+         */
+        public StopGridTask(String gridName, boolean cancel) {
+            this.gridName = gridName;
+            this.cancel = cancel;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void run() {
+            G.stop(gridName, cancel);
+        }
+    }
+
+    /**
+     * Callable that returns the local node of the Ignite instance held in the {@code ignite} field.
+     */
+    private static class NodeTask implements IgniteCallable<ClusterNode> {
+        /** Ignite instance; presumably injected through {@link IgniteInstanceResource} before the call. */
+        @IgniteInstanceResource
+        private Ignite ignite;
+
+        /** {@inheritDoc} */
+        @Override public ClusterNode call() throws Exception {
+            return ((IgniteEx)ignite).localNode();
+        }
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/a1e5cc57/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheConfigurationP2PTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheConfigurationP2PTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheConfigurationP2PTest.java
index 5298dd4..3777154 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheConfigurationP2PTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheConfigurationP2PTest.java
@@ -103,6 +103,7 @@ public class CacheConfigurationP2PTest extends GridCommonAbstractTest {
                         }
                     },
                     null,
+                    null,
                     jvmArgs,
                     null
                 );
@@ -119,6 +120,7 @@ public class CacheConfigurationP2PTest extends GridCommonAbstractTest {
                         }
                     },
                     null,
+                    null,
                     jvmArgs,
                     null
                 );
@@ -139,6 +141,7 @@ public class CacheConfigurationP2PTest extends GridCommonAbstractTest {
                         }
                     },
                     null,
+                    null,
                     jvmArgs,
                     cp
                 );


[3/7] ignite git commit: ignite-1183 Fixed data structures create/destroy from client node

Posted by sb...@apache.org.
ignite-1183 Fixed data structures create/destroy from client node


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3036c8d8
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3036c8d8
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3036c8d8

Branch: refs/heads/ignite-1093-2
Commit: 3036c8d85db223a26f48f273ad08b6ea87e2e2ba
Parents: f025714
Author: sboikov <sb...@gridgain.com>
Authored: Thu Oct 8 16:33:53 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Oct 8 16:33:53 2015 +0300

----------------------------------------------------------------------
 .../dht/atomic/GridNearAtomicUpdateFuture.java  |   8 +-
 .../colocated/GridDhtColocatedLockFuture.java   |  11 +-
 .../distributed/near/GridNearLockFuture.java    |  11 +-
 .../near/GridNearOptimisticTxPrepareFuture.java |  24 +--
 .../datastructures/DataStructuresProcessor.java |  48 ++++--
 .../CacheGetFutureHangsSelfTest.java            |   3 +
 ...niteCacheClientNodeChangingTopologyTest.java |   6 +-
 ...gniteAtomicLongChangingTopologySelfTest.java | 155 +++++++++++++++++--
 8 files changed, 216 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/3036c8d8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 41df53a..97aa646 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -585,8 +585,12 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
                 if (req != null) {
                     res = new GridNearAtomicUpdateResponse(cctx.cacheId(), nodeId, req.futureVersion());
 
-                    res.addFailedKeys(req.keys(), new ClusterTopologyCheckedException("Primary node left grid before " +
-                        "response is received: " + nodeId));
+                    ClusterTopologyCheckedException e = new ClusterTopologyCheckedException("Primary node left grid " +
+                        "before response is received: " + nodeId);
+
+                    e.retryReadyFuture(cctx.shared().nextAffinityReadyFuture(req.topologyVersion()));
+
+                    res.addFailedKeys(req.keys(), e);
                 }
             }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/3036c8d8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index 33a5cbd..be09f54 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@ -598,7 +598,7 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
             // Continue mapping on the same topology version as it was before.
             this.topVer.compareAndSet(null, topVer);
 
-            map(keys, false);
+            map(keys, false, true);
 
             markInitialized();
 
@@ -654,7 +654,7 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
                     this.topVer.compareAndSet(null, topVer);
                 }
 
-                map(keys, remap);
+                map(keys, remap, false);
 
                 if (c != null)
                     c.run();
@@ -691,8 +691,9 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
      *
      * @param keys Keys.
      * @param remap Remap flag.
+     * @param topLocked {@code True} if thread already acquired lock preventing topology change.
      */
-    private void map(Collection<KeyCacheObject> keys, boolean remap) {
+    private void map(Collection<KeyCacheObject> keys, boolean remap, boolean topLocked) {
         try {
             AffinityTopologyVersion topVer = this.topVer.get();
 
@@ -819,7 +820,9 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
                                     boolean clientFirst = false;
 
                                     if (first) {
-                                        clientFirst = clientNode && (tx == null || !tx.hasRemoteLocks());
+                                        clientFirst = clientNode &&
+                                            !topLocked &&
+                                            (tx == null || !tx.hasRemoteLocks());
 
                                         first = false;
                                     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/3036c8d8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
index dcc8da6..e6b1e02 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
@@ -718,7 +718,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
             // Continue mapping on the same topology version as it was before.
             this.topVer.compareAndSet(null, topVer);
 
-            map(keys, false);
+            map(keys, false, true);
 
             markInitialized();
 
@@ -773,7 +773,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
                     this.topVer.compareAndSet(null, topVer);
                 }
 
-                map(keys, remap);
+                map(keys, remap, false);
 
                 markInitialized();
             }
@@ -807,8 +807,9 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
      *
      * @param keys Keys.
      * @param remap Remap flag.
+     * @param topLocked {@code True} if thread already acquired lock preventing topology change.
      */
-    private void map(Iterable<KeyCacheObject> keys, boolean remap) {
+    private void map(Iterable<KeyCacheObject> keys, boolean remap, boolean topLocked) {
         try {
             AffinityTopologyVersion topVer = this.topVer.get();
 
@@ -938,7 +939,9 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
                                         boolean clientFirst = false;
 
                                         if (first) {
-                                            clientFirst = clientNode && (tx == null || !tx.hasRemoteLocks());
+                                            clientFirst = clientNode &&
+                                                !topLocked &&
+                                                (tx == null || !tx.hasRemoteLocks());
 
                                             first = false;
                                         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/3036c8d8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
index 25028c4..1fb33a2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
@@ -271,7 +271,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
 
             cctx.mvcc().addFuture(this);
 
-            prepare0(false);
+            prepare0(false, true);
 
             return;
         }
@@ -338,7 +338,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
                 return;
             }
 
-            prepare0(remap);
+            prepare0(remap, false);
 
             if (c != null)
                 c.run();
@@ -428,8 +428,9 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
      * Initializes future.
      *
      * @param remap Remap flag.
+     * @param topLocked {@code True} if thread already acquired lock preventing topology change.
      */
-    private void prepare0(boolean remap) {
+    private void prepare0(boolean remap, boolean topLocked) {
         try {
             boolean txStateCheck = remap ? tx.state() == PREPARING : tx.state(PREPARING);
 
@@ -451,7 +452,8 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
 
             prepare(
                 tx.optimistic() && tx.serializable() ? tx.readEntries() : Collections.<IgniteTxEntry>emptyList(),
-                tx.writeEntries());
+                tx.writeEntries(),
+                topLocked);
 
             markInitialized();
         }
@@ -466,11 +468,13 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
     /**
      * @param reads Read entries.
      * @param writes Write entries.
+     * @param topLocked {@code True} if thread already acquired lock preventing topology change.
      * @throws IgniteCheckedException If failed.
      */
     private void prepare(
         Iterable<IgniteTxEntry> reads,
-        Iterable<IgniteTxEntry> writes
+        Iterable<IgniteTxEntry> writes,
+        boolean topLocked
     ) throws IgniteCheckedException {
         AffinityTopologyVersion topVer = tx.topologyVersion();
 
@@ -497,7 +501,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
         GridDistributedTxMapping cur = null;
 
         for (IgniteTxEntry read : reads) {
-            GridDistributedTxMapping updated = map(read, topVer, cur, false);
+            GridDistributedTxMapping updated = map(read, topVer, cur, false, topLocked);
 
             if (cur != updated) {
                 mappings.offer(updated);
@@ -514,7 +518,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
         }
 
         for (IgniteTxEntry write : writes) {
-            GridDistributedTxMapping updated = map(write, topVer, cur, true);
+            GridDistributedTxMapping updated = map(write, topVer, cur, true, topLocked);
 
             if (cur != updated) {
                 mappings.offer(updated);
@@ -647,13 +651,15 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
      * @param topVer Topology version.
      * @param cur Current mapping.
      * @param waitLock Wait lock flag.
+     * @param topLocked {@code True} if thread already acquired lock preventing topology change.
      * @return Mapping.
      */
     private GridDistributedTxMapping map(
         IgniteTxEntry entry,
         AffinityTopologyVersion topVer,
         @Nullable GridDistributedTxMapping cur,
-        boolean waitLock
+        boolean waitLock,
+        boolean topLocked
     ) {
         GridCacheContext cacheCtx = entry.context();
 
@@ -685,7 +691,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
         }
 
         if (cur == null || !cur.node().id().equals(primary.id()) || cur.near() != cacheCtx.isNear()) {
-            boolean clientFirst = cur == null && cctx.kernalContext().clientNode();
+            boolean clientFirst = cur == null && !topLocked && cctx.kernalContext().clientNode();
 
             cur = new GridDistributedTxMapping(primary);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/3036c8d8/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
index ef2c543..7c5e97c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
@@ -505,14 +505,20 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
 
                 return dataStructure;
             }
-            catch (ClusterTopologyCheckedException e) {
-                IgniteInternalFuture<?> fut = e.retryReadyFuture();
-
-                fut.get();
-            }
             catch (IgniteTxRollbackCheckedException ignore) {
                 // Safe to retry right away.
             }
+            catch (IgniteCheckedException e) {
+                ClusterTopologyCheckedException topErr = e.getCause(ClusterTopologyCheckedException.class);
+
+                if (topErr == null)
+                    throw e;
+
+                IgniteInternalFuture<?> fut = topErr.retryReadyFuture();
+
+                if (fut != null)
+                    fut.get();
+            }
         }
     }
 
@@ -593,14 +599,20 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
                 if (afterRmv != null && rmvInfo != null)
                     afterRmv.applyx(rmvInfo);
             }
-            catch (ClusterTopologyCheckedException e) {
-                IgniteInternalFuture<?> fut = e.retryReadyFuture();
-
-                fut.get();
-            }
             catch (IgniteTxRollbackCheckedException ignore) {
                 // Safe to retry right away.
             }
+            catch (IgniteCheckedException e) {
+                ClusterTopologyCheckedException topErr = e.getCause(ClusterTopologyCheckedException.class);
+
+                if (topErr == null)
+                    throw e;
+
+                IgniteInternalFuture<?> fut = topErr.retryReadyFuture();
+
+                if (fut != null)
+                    fut.get();
+            }
         }
     }
 
@@ -995,14 +1007,20 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
 
                 return col;
             }
-            catch (ClusterTopologyCheckedException e) {
-                IgniteInternalFuture<?> fut = e.retryReadyFuture();
-
-                fut.get();
-            }
             catch (IgniteTxRollbackCheckedException ignore) {
                 // Safe to retry right away.
             }
+            catch (IgniteCheckedException e) {
+                ClusterTopologyCheckedException topErr = e.getCause(ClusterTopologyCheckedException.class);
+
+                if (topErr == null)
+                    throw e;
+
+                IgniteInternalFuture<?> fut = topErr.retryReadyFuture();
+
+                if (fut != null)
+                    fut.get();
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/3036c8d8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheGetFutureHangsSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheGetFutureHangsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheGetFutureHangsSelfTest.java
index e8622aa..51e76f6 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheGetFutureHangsSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheGetFutureHangsSelfTest.java
@@ -31,6 +31,7 @@ import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.marshaller.optimized.OptimizedMarshaller;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.jetbrains.annotations.Nullable;
 
@@ -54,6 +55,8 @@ public class CacheGetFutureHangsSelfTest extends GridCommonAbstractTest {
     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(gridName);
 
+        ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1);
+
         OptimizedMarshaller marsh = new OptimizedMarshaller();
         marsh.setRequireSerializable(false);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/3036c8d8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
index 2d29c49..1b3dc7a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
@@ -118,7 +118,11 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
 
         cfg.setClientMode(client);
 
-        cfg.setCommunicationSpi(new TestCommunicationSpi());
+        TestCommunicationSpi commSpi = new TestCommunicationSpi();
+
+        commSpi.setSharedMemoryPort(-1);
+
+        cfg.setCommunicationSpi(commSpi);
 
         cfg.setCacheConfiguration(ccfg);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/3036c8d8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteAtomicLongChangingTopologySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteAtomicLongChangingTopologySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteAtomicLongChangingTopologySelfTest.java
index 337334e..32a86e4 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteAtomicLongChangingTopologySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteAtomicLongChangingTopologySelfTest.java
@@ -17,21 +17,37 @@
 
 package org.apache.ignite.internal.processors.cache.distributed.dht;
 
-import org.apache.ignite.*;
-import org.apache.ignite.cache.*;
-import org.apache.ignite.configuration.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReferenceArray;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteAtomicLong;
+import org.apache.ignite.IgniteQueue;
+import org.apache.ignite.configuration.AtomicConfiguration;
+import org.apache.ignite.configuration.CollectionConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
-import org.apache.ignite.spi.discovery.tcp.*;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
-import org.apache.ignite.testframework.*;
-import org.apache.ignite.testframework.junits.common.*;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 
-import java.util.*;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.*;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
 
 /**
  *
@@ -52,6 +68,9 @@ public class IgniteAtomicLongChangingTopologySelfTest extends GridCommonAbstract
     /** */
     private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
 
+    /** */
+    private boolean client;
+
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(gridName);
@@ -60,16 +79,18 @@ public class IgniteAtomicLongChangingTopologySelfTest extends GridCommonAbstract
 
         discoSpi.setIpFinder(IP_FINDER);
 
-        cfg.setDiscoverySpi(discoSpi);
+        cfg.setDiscoverySpi(discoSpi).setNetworkTimeout(30_000);
 
         AtomicConfiguration atomicCfg = new AtomicConfiguration();
-        atomicCfg.setCacheMode(CacheMode.PARTITIONED);
+        atomicCfg.setCacheMode(PARTITIONED);
         atomicCfg.setBackups(1);
 
         cfg.setAtomicConfiguration(atomicCfg);
 
         ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1);
 
+        cfg.setClientMode(client);
+
         return cfg;
     }
 
@@ -111,6 +132,110 @@ public class IgniteAtomicLongChangingTopologySelfTest extends GridCommonAbstract
     /**
      * @throws Exception If failed.
      */
+    public void testClientAtomicLongCreateCloseFailover() throws Exception {
+        testFailoverWithClient(new IgniteInClosure<Ignite>() {
+            @Override public void apply(Ignite ignite) {
+                for (int i = 0; i < 100; i++) {
+                    IgniteAtomicLong l = ignite.atomicLong("long-" + 1, 0, true); // NOTE(review): 1, not i - intended?
+
+                    l.close();
+                }
+            }
+        });
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testClientQueueCreateCloseFailover() throws Exception {
+        testFailoverWithClient(new IgniteInClosure<Ignite>() {
+            @Override public void apply(Ignite ignite) {
+                for (int i = 0; i < 100; i++) {
+                    CollectionConfiguration colCfg = new CollectionConfiguration();
+
+                    colCfg.setBackups(1);
+                    colCfg.setCacheMode(PARTITIONED);
+                    colCfg.setAtomicityMode(i % 2 == 0 ? TRANSACTIONAL : ATOMIC); // Alternate modes per iteration.
+
+                    IgniteQueue q = ignite.queue("q-" + i, 0, colCfg); // Distinct queue name per iteration.
+
+                    q.close();
+                }
+            }
+        });
+    }
+
+    /**
+     * @param c Closure applied to the client node repeatedly for about 30 seconds while server nodes restart.
+     * @throws Exception If failed.
+     */
+    private void testFailoverWithClient(IgniteInClosure<Ignite> c) throws Exception {
+        startGridsMultiThreaded(GRID_CNT, false);
+
+        client = true; // getConfiguration() passes this flag to setClientMode().
+
+        Ignite ignite = startGrid(GRID_CNT);
+
+        assertTrue(ignite.configuration().isClientMode());
+
+        client = false; // Reset so that nodes started afterwards are configured with client mode off.
+
+        final AtomicBoolean finished = new AtomicBoolean();
+
+        IgniteInternalFuture<?> fut = restartThread(finished);
+
+        long stop = System.currentTimeMillis() + 30_000;
+
+        try {
+            int iter = 0;
+
+            while (System.currentTimeMillis() < stop) {
+                log.info("Iteration: " + iter++);
+
+                c.apply(ignite);
+            }
+
+            finished.set(true);
+
+            fut.get();
+        }
+        finally {
+            finished.set(true); // Make the restart loop exit even when the closure throws.
+        }
+    }
+
+    /**
+     * @param finished Flag checked by the restart loop; the loop exits once it is set.
+     * @return Future of the asynchronously run loop that stops and starts nodes {@code 0..GRID_CNT-1} in turn.
+     */
+    private IgniteInternalFuture<?> restartThread(final AtomicBoolean finished) {
+        return GridTestUtils.runAsync(new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                while (!finished.get()) {
+                    for (int i = 0; i < GRID_CNT; i++) {
+                        log.info("Stop node: " + i);
+
+                        stopGrid(i);
+
+                        U.sleep(500); // Pause between stopping the node and starting it again.
+
+                        log.info("Start node: " + i);
+
+                        startGrid(i);
+
+                        if (finished.get())
+                            break;
+                    }
+                }
+
+                return null;
+            }
+        }, "restart-thread");
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testIncrementConsistency() throws Exception {
         startGrids(GRID_CNT);