You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/05/16 09:07:38 UTC

[3/8] incubator-ignite git commit: Revert "ignite-471: fixed NPE in PortableMarshaller"

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8a6dd007/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceIoc.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceIoc.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceIoc.java
index 1e85ecd..8410e71 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceIoc.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceIoc.java
@@ -21,7 +21,6 @@ import org.apache.ignite.*;
 import org.apache.ignite.internal.managers.deployment.*;
 import org.apache.ignite.internal.util.*;
 import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
 import org.jetbrains.annotations.*;
 import org.jsr166.*;
 
@@ -39,8 +38,20 @@ class GridResourceIoc {
     private final ConcurrentMap<ClassLoader, Set<Class<?>>> taskMap =
         new ConcurrentHashMap8<>();
 
-    /** Class descriptors cache. */
-    private final ConcurrentMap<Class<?>, ClassDescriptor> clsDescs = new ConcurrentHashMap8<>();
+    /** Field cache. */
+    private final ConcurrentMap<Class<?>, ConcurrentMap<Class<? extends Annotation>, GridResourceField[]>> fieldCache =
+        new ConcurrentHashMap8<>();
+
+    /** Method cache. */
+    private final ConcurrentMap<Class<?>, ConcurrentMap<Class<? extends Annotation>, GridResourceMethod[]>> mtdCache =
+        new ConcurrentHashMap8<>();
+
+    /**
+     * Cache for classes that do not require injection with some annotation.
+     * Maps annotation classes to set a set of target classes to skip.
+     */
+    private final ConcurrentMap<Class<? extends Annotation>, Set<Class<?>>> skipCache =
+        new ConcurrentHashMap8<>();
 
     /** */
     private final ConcurrentMap<Class<?>, Class<? extends Annotation>[]> annCache =
@@ -53,8 +64,18 @@ class GridResourceIoc {
         Set<Class<?>> clss = taskMap.remove(ldr);
 
         if (clss != null) {
-            clsDescs.keySet().removeAll(clss);
-            annCache.keySet().removeAll(clss);
+            fieldCache.keySet().removeAll(clss);
+            mtdCache.keySet().removeAll(clss);
+
+            for (Map.Entry<Class<? extends Annotation>, Set<Class<?>>> e : skipCache.entrySet()) {
+                Set<Class<?>> skipClss = e.getValue();
+
+                if (skipClss != null)
+                    e.getValue().removeAll(clss);
+            }
+
+            for (Class<?> cls : clss)
+                annCache.remove(cls);
         }
     }
 
@@ -63,8 +84,8 @@ class GridResourceIoc {
      */
     void undeployAll() {
         taskMap.clear();
-        clsDescs.clear();
-        annCache.clear();
+        mtdCache.clear();
+        fieldCache.clear();
     }
 
     /**
@@ -86,26 +107,15 @@ class GridResourceIoc {
         @Nullable Class<?> depCls)
         throws IgniteCheckedException
     {
-        return injectInternal(target, annCls, injector, dep, depCls, null);
-    }
-
-    /**
-     * @param cls Class.
-     */
-    private ClassDescriptor descriptor(@Nullable GridDeployment dep, Class<?> cls) {
-        ClassDescriptor res = clsDescs.get(cls);
-
-        if (res == null) {
-            if (dep != null) {
-                Set<Class<?>> classes = F.addIfAbsent(taskMap, dep.classLoader(), F.<Class<?>>newCSet());
-
-                classes.add(cls);
-            }
+        assert target != null;
+        assert annCls != null;
+        assert injector != null;
 
-            res = F.addIfAbsent(clsDescs, cls, new ClassDescriptor(cls));
-        }
+        if (isAnnotationPresent(target, annCls, dep))
+            // Use identity hash set to compare via referential equality.
+            return injectInternal(target, annCls, injector, dep, depCls, new GridLeanIdentitySet<>());
 
-        return res;
+        return false;
     }
 
     /**
@@ -123,54 +133,73 @@ class GridResourceIoc {
         GridResourceInjector injector,
         @Nullable GridDeployment dep,
         @Nullable Class<?> depCls,
-        @Nullable Set<Object> checkedObjs)
+        Set<Object> checkedObjs)
         throws IgniteCheckedException
     {
-        Class<?> targetCls = target.getClass();
+        assert target != null;
+        assert annCls != null;
+        assert injector != null;
+        assert checkedObjs != null;
 
-        ClassDescriptor descr = descriptor(dep, targetCls);
+        Class<?> targetCls = target.getClass();
 
-        T2<GridResourceField[], GridResourceMethod[]> annotatedMembers = descr.annotatedMembers(annCls);
+        Set<Class<?>> skipClss = skipCache.get(annCls);
 
-        if (descr.recursiveFields().length == 0 && annotatedMembers == null)
+        // Skip this class if it does not need to be injected.
+        if (skipClss != null && skipClss.contains(targetCls))
             return false;
 
-        if (checkedObjs == null && descr.recursiveFields().length > 0)
-            checkedObjs = new GridLeanIdentitySet<>();
-
-        if (checkedObjs != null && !checkedObjs.add(target))
+        // Check if already inspected to avoid indefinite recursion.
+        if (!checkedObjs.add(target))
             return false;
 
+        int annCnt = 0;
+
         boolean injected = false;
 
-        for (Field field : descr.recursiveFields()) {
-            try {
-                Object obj = field.get(target);
+        for (GridResourceField field : getFieldsWithAnnotation(dep, targetCls, annCls)) {
+            if (field.processFieldValue()) {
+                Field f = field.getField();
 
-                if (obj != null) {
-                    assert checkedObjs != null;
+                try {
+                    Object obj = f.get(target);
 
-                    injected |= injectInternal(obj, annCls, injector, dep, depCls, checkedObjs);
+                    if (obj != null) {
+                        // Recursion.
+                        boolean injected0 = injectInternal(obj, annCls, injector, dep, depCls, checkedObjs);
+
+                        injected |= injected0;
+                    }
+                }
+                catch (IllegalAccessException e) {
+                    throw new IgniteCheckedException("Failed to inject resource [field=" + f.getName() +
+                        ", target=" + target + ']', e);
                 }
             }
-            catch (IllegalAccessException e) {
-                throw new IgniteCheckedException("Failed to inject resource [field=" + field.getName() +
-                    ", target=" + target + ']', e);
-            }
-        }
-
-        if (annotatedMembers != null) {
-            for (GridResourceField field : annotatedMembers.get1()) {
+            else {
                 injector.inject(field, target, depCls, dep);
 
                 injected = true;
             }
 
-            for (GridResourceMethod mtd : annotatedMembers.get2()) {
-                injector.inject(mtd, target, depCls, dep);
+            annCnt++;
+        }
 
-                injected = true;
-            }
+        for (GridResourceMethod mtd : getMethodsWithAnnotation(dep, targetCls, annCls)) {
+            injector.inject(mtd, target, depCls, dep);
+
+            injected = true;
+
+            annCnt++;
+        }
+
+        if (annCnt == 0) {
+            if (skipClss == null)
+                skipClss = F.addIfAbsent(skipCache, annCls, F.<Class<?>>newCSet());
+
+            assert skipClss != null;
+
+            skipClss.add(targetCls);
         }
 
         return injected;
@@ -188,9 +217,29 @@ class GridResourceIoc {
         assert target != null;
         assert annCls != null;
 
-        ClassDescriptor desc = descriptor(dep, target.getClass());
+        Class<?> targetCls = target.getClass();
+
+        Set<Class<?>> skipClss = skipCache.get(annCls);
+
+        if (skipClss != null && skipClss.contains(targetCls))
+            return false;
+
+        GridResourceField[] fields = getFieldsWithAnnotation(dep, targetCls, annCls);
+
+        if (fields.length > 0)
+            return true;
 
-        return desc.recursiveFields().length > 0 || desc.annotatedMembers(annCls) != null;
+        GridResourceMethod[] mtds = getMethodsWithAnnotation(dep, targetCls, annCls);
+
+        if (mtds.length > 0)
+            return true;
+
+        if (skipClss == null)
+            skipClss = F.addIfAbsent(skipCache, annCls, F.<Class<?>>newCSet());
+
+        skipClss.add(targetCls);
+
+        return false;
     }
 
     /**
@@ -211,14 +260,17 @@ class GridResourceIoc {
         Class<? extends Annotation>[] res = annCache.get(cls);
 
         if (res == null) {
-            Collection<Class<? extends Annotation>> res0 = new ArrayList<>();
+            Collection<Class<? extends Annotation>> res0 =
+                new HashSet<>(annClss.size(), 1.0f);
 
             for (Class<? extends Annotation> annCls : annClss) {
                 if (isAnnotationPresent(target, annCls, dep))
                     res0.add(annCls);
             }
 
-            res = res0.toArray(new Class[res0.size()]);
+            res = new Class[res0.size()];
+
+            res0.toArray(res);
 
             annCache.putIfAbsent(cls, res);
         }
@@ -227,6 +279,36 @@ class GridResourceIoc {
     }
 
     /**
+     * For tests only.
+     *
+     * @param cls Class for test.
+     * @return {@code true} if cached, {@code false} otherwise.
+     */
+    boolean isCached(Class<?> cls) {
+        return isCached(cls.getName());
+    }
+
+    /**
+     * For tests only.
+     *
+     * @param clsName Class for test.
+     * @return {@code true} if cached, {@code false} otherwise.
+     */
+    boolean isCached(String clsName) {
+        for (Class<?> aClass : fieldCache.keySet()) {
+            if (aClass.getName().equals(clsName))
+                return true;
+        }
+
+        for (Class<?> aClass : mtdCache.keySet()) {
+            if (aClass.getName().equals(clsName))
+                return true;
+        }
+
+        return false;
+    }
+
+    /**
      * Gets set of methods with given annotation.
      *
      * @param dep Deployment.
@@ -236,111 +318,156 @@ class GridResourceIoc {
      */
     GridResourceMethod[] getMethodsWithAnnotation(@Nullable GridDeployment dep, Class<?> cls,
         Class<? extends Annotation> annCls) {
-        ClassDescriptor desc = descriptor(dep, cls);
+        GridResourceMethod[] mtds = getMethodsFromCache(cls, annCls);
 
-        T2<GridResourceField[], GridResourceMethod[]> t2 = desc.annotatedMembers(annCls);
+        if (mtds == null) {
+            List<GridResourceMethod> mtdsList = new ArrayList<>();
 
-        return t2 == null ? GridResourceMethod.EMPTY_ARRAY : t2.get2();
-    }
+            for (Class cls0 = cls; !cls0.equals(Object.class); cls0 = cls0.getSuperclass()) {
+                for (Method mtd : cls0.getDeclaredMethods()) {
+                    Annotation ann = mtd.getAnnotation(annCls);
 
-    /** {@inheritDoc} */
-    public void printMemoryStats() {
-        X.println(">>>   taskMapSize: " + taskMap.size());
-        X.println(">>>   classDescriptorsCacheSize: " + clsDescs.size());
+                    if (ann != null)
+                        mtdsList.add(new GridResourceMethod(mtd, ann));
+                }
+            }
+
+            if (mtdsList.isEmpty())
+                mtds = GridResourceMethod.EMPTY_ARRAY;
+            else
+                mtds = mtdsList.toArray(new GridResourceMethod[mtdsList.size()]);
+
+            cacheMethods(dep, cls, annCls, mtds);
+        }
+
+        return mtds;
     }
 
     /**
+     * Gets all entries from the specified class or its super-classes that have
+     * been annotated with annotation provided.
      *
+     * @param cls Class in which search for methods.
+     * @param dep Deployment.
+     * @param annCls Annotation.
+     * @return Set of entries with given annotations.
      */
-    private static class ClassDescriptor {
-        /** */
-        private final Field[] recursiveFields;
-
-        /** */
-        private final Map<Class<? extends Annotation>, T2<GridResourceField[], GridResourceMethod[]>> annMap;
-
-        /**
-         * @param cls Class.
-         */
-        ClassDescriptor(Class<?> cls) {
-            Map<Class<? extends Annotation>, T2<List<GridResourceField>, List<GridResourceMethod>>> annMap
-                = new HashMap<>();
+    private GridResourceField[] getFieldsWithAnnotation(@Nullable GridDeployment dep, Class<?> cls,
+        Class<? extends Annotation> annCls) {
+        GridResourceField[] fields = getFieldsFromCache(cls, annCls);
 
-            List<Field> recursiveFieldsList = new ArrayList<>();
+        if (fields == null) {
+            List<GridResourceField> fieldsList = new ArrayList<>();
 
             boolean allowImplicitInjection = !GridNoImplicitInjection.class.isAssignableFrom(cls);
 
             for (Class cls0 = cls; !cls0.equals(Object.class); cls0 = cls0.getSuperclass()) {
                 for (Field field : cls0.getDeclaredFields()) {
-                    Annotation[] fieldAnns = field.getAnnotations();
+                    Annotation ann = field.getAnnotation(annCls);
 
-                    for (Annotation ann : fieldAnns) {
-                        T2<List<GridResourceField>, List<GridResourceMethod>> t2 = annMap.get(ann.annotationType());
+                    if (ann != null)
+                        fieldsList.add(new GridResourceField(field, ann));
+                    else if (allowImplicitInjection && GridResourceUtils.mayRequireResources(field)) {
+                        // Account for anonymous inner classes.
+                        fieldsList.add(new GridResourceField(field, null));
+                    }
+                }
+            }
 
-                        if (t2 == null) {
-                            t2 = new T2<List<GridResourceField>, List<GridResourceMethod>>(
-                                new ArrayList<GridResourceField>(),
-                                new ArrayList<GridResourceMethod>());
+            if (fieldsList.isEmpty())
+                fields = GridResourceField.EMPTY_ARRAY;
+            else
+                fields = fieldsList.toArray(new GridResourceField[fieldsList.size()]);
 
-                            annMap.put(ann.annotationType(), t2);
-                        }
+            cacheFields(dep, cls, annCls, fields);
+        }
 
-                        t2.get1().add(new GridResourceField(field, ann));
-                    }
+        return fields;
+    }
 
-                    if (allowImplicitInjection
-                        && fieldAnns.length == 0
-                        && GridResourceUtils.mayRequireResources(field)) {
-                        field.setAccessible(true);
+    /**
+     * Gets all fields for a given class with given annotation from cache.
+     *
+     * @param cls Class to get fields from.
+     * @param annCls Annotation class for fields.
+     * @return List of fields with given annotation, possibly {@code null}.
+     */
+    @Nullable private GridResourceField[] getFieldsFromCache(Class<?> cls, Class<? extends Annotation> annCls) {
+        Map<Class<? extends Annotation>, GridResourceField[]> annCache = fieldCache.get(cls);
 
-                        // Account for anonymous inner classes.
-                        recursiveFieldsList.add(field);
-                    }
-                }
+        return annCache != null ? annCache.get(annCls) : null;
+    }
 
-                for (Method mtd : cls0.getDeclaredMethods()) {
-                    for (Annotation ann : mtd.getAnnotations()) {
-                        T2<List<GridResourceField>, List<GridResourceMethod>> t2 = annMap.get(ann.annotationType());
+    /**
+     * Caches list of fields with given annotation from given class.
+     *
+     * @param cls Class the fields belong to.
+     * @param dep Deployment.
+     * @param annCls Annotation class for the fields.
+     * @param fields Fields to cache.
+     */
+    private void cacheFields(@Nullable GridDeployment dep, Class<?> cls, Class<? extends Annotation> annCls,
+        GridResourceField[] fields) {
+        if (dep != null) {
+            Set<Class<?>> classes = F.addIfAbsent(taskMap, dep.classLoader(), F.<Class<?>>newCSet());
 
-                        if (t2 == null) {
-                            t2 = new T2<List<GridResourceField>, List<GridResourceMethod>>(
-                                new ArrayList<GridResourceField>(),
-                                new ArrayList<GridResourceMethod>());
+            assert classes != null;
 
-                            annMap.put(ann.annotationType(), t2);
-                        }
+            classes.add(cls);
+        }
 
-                        t2.get2().add(new GridResourceMethod(mtd, ann));
-                    }
-                }
-            }
+        Map<Class<? extends Annotation>, GridResourceField[]> rsrcFields =
+            F.addIfAbsent(fieldCache, cls, F.<Class<? extends Annotation>, GridResourceField[]>newCMap());
 
-            recursiveFields = recursiveFieldsList.isEmpty() ? U.EMPTY_FIELDS
-                : recursiveFieldsList.toArray(new Field[recursiveFieldsList.size()]);
+        assert rsrcFields != null;
 
-            this.annMap = IgniteUtils.limitedMap(annMap.size());
+        rsrcFields.put(annCls, fields);
+    }
 
-            for (Map.Entry<Class<? extends Annotation>, T2<List<GridResourceField>, List<GridResourceMethod>>> entry
-                : annMap.entrySet()) {
-                GridResourceField[] fields = GridResourceField.toArray(entry.getValue().get1());
-                GridResourceMethod[] mtds = GridResourceMethod.toArray(entry.getValue().get2());
+    /**
+     * Gets all methods for a given class with given annotation from cache.
+     *
+     * @param cls Class to get methods from.
+     * @param annCls Annotation class for fields.
+     * @return List of methods with given annotation, possibly {@code null}.
+     */
+    @Nullable private GridResourceMethod[] getMethodsFromCache(Class<?> cls, Class<? extends Annotation> annCls) {
+        Map<Class<? extends Annotation>, GridResourceMethod[]> annCache = mtdCache.get(cls);
 
-                this.annMap.put(entry.getKey(), new T2<>(fields, mtds));
-            }
-        }
+        return annCache != null ? annCache.get(annCls) : null;
+    }
 
-        /**
-         * @return Recursive fields.
-         */
-        public Field[] recursiveFields() {
-            return recursiveFields;
-        }
+    /**
+     * Caches list of methods with given annotation from given class.
+     *
+     * @param rsrcCls Class the fields belong to.
+     * @param dep Deployment.
+     * @param annCls Annotation class for the fields.
+     * @param mtds Methods to cache.
+     */
+    private void cacheMethods(@Nullable GridDeployment dep, Class<?> rsrcCls, Class<? extends Annotation> annCls,
+        GridResourceMethod[] mtds) {
+        if (dep != null) {
+            Set<Class<?>> classes = F.addIfAbsent(taskMap, dep.classLoader(), F.<Class<?>>newCSet());
+
+            assert classes != null;
 
-        /**
-         * @return Fields.
-         */
-        @Nullable public T2<GridResourceField[], GridResourceMethod[]> annotatedMembers(Class<? extends Annotation> annCls) {
-            return annMap.get(annCls);
+            classes.add(rsrcCls);
         }
+
+        Map<Class<? extends Annotation>, GridResourceMethod[]> rsrcMtds = F.addIfAbsent(mtdCache,
+            rsrcCls, F.<Class<? extends Annotation>, GridResourceMethod[]>newCMap());
+
+        assert rsrcMtds != null;
+
+        rsrcMtds.put(annCls, mtds);
+    }
+
+    /** {@inheritDoc} */
+    public void printMemoryStats() {
+        X.println(">>>   taskMapSize: " + taskMap.size());
+        X.println(">>>   fieldCacheSize: " + fieldCache.size());
+        X.println(">>>   mtdCacheSize: " + mtdCache.size());
+        X.println(">>>   skipCacheSize: " + skipCache.size());
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8a6dd007/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceMethod.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceMethod.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceMethod.java
index ad08a40..aba9405 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceMethod.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceMethod.java
@@ -21,7 +21,6 @@ import org.apache.ignite.internal.util.typedef.internal.*;
 
 import java.lang.annotation.*;
 import java.lang.reflect.*;
-import java.util.*;
 
 /**
  * Wrapper for data where resource should be injected.
@@ -49,8 +48,6 @@ class GridResourceMethod {
 
         this.mtd = mtd;
         this.ann = ann;
-
-        mtd.setAccessible(true);
     }
 
     /**
@@ -71,16 +68,6 @@ class GridResourceMethod {
         return ann;
     }
 
-    /**
-     * @param c Closure.
-     */
-    public static GridResourceMethod[] toArray(Collection<GridResourceMethod> c) {
-        if (c.isEmpty())
-            return EMPTY_ARRAY;
-
-        return c.toArray(new GridResourceMethod[c.size()]);
-    }
-
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(GridResourceMethod.class, this);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8a6dd007/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceProcessor.java
index f5ba492..f08a287 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceProcessor.java
@@ -23,6 +23,7 @@ import org.apache.ignite.compute.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.managers.deployment.*;
 import org.apache.ignite.internal.processors.*;
+import org.apache.ignite.internal.util.lang.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.lifecycle.*;
 import org.apache.ignite.resources.*;
@@ -146,8 +147,8 @@ public class GridResourceProcessor extends GridProcessorAdapter {
                 Method mtd = rsrcMtd.getMethod();
 
                 try {
-                    // No need to call mtd.setAccessible(true);
-                    // It has been called in GridResourceMethod constructor.
+                    mtd.setAccessible(true);
+
                     mtd.invoke(target);
                 }
                 catch (IllegalArgumentException | InvocationTargetException | IllegalAccessException e) {
@@ -277,7 +278,7 @@ public class GridResourceProcessor extends GridProcessorAdapter {
             log.debug("Injecting resources: " + job);
 
         // Unwrap Proxy object.
-        Object obj = unwrapTarget(job);
+        Object obj = unwrapTarget(unwrapJob(job));
 
         injectToJob(dep, taskCls, obj, ses, jobCtx);
 
@@ -328,6 +329,19 @@ public class GridResourceProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * Gets rid of job wrapper, if any.
+     *
+     * @param job Job to unwrap.
+     * @return Unwrapped job.
+     */
+    private ComputeJob unwrapJob(ComputeJob job) {
+        if (job instanceof GridComputeJobWrapper)
+            return ((GridComputeJobWrapper)job).wrappedJob();
+
+        return job;
+    }
+
+    /**
      * Injects held resources into given grid task.
      *
      * @param dep Deployed class.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8a6dd007/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index 80fcbb0..2ab8e0a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -109,13 +109,10 @@ public abstract class IgniteUtils {
     private static final int[] GRID_EVTS;
 
     /** Empty integers array. */
-    public static final int[] EMPTY_INTS = new int[0];
+    private static final int[] EMPTY_INTS = new int[0];
 
     /** Empty  longs. */
-    public static final long[] EMPTY_LONGS = new long[0];
-
-    /** Empty  longs. */
-    public static final Field[] EMPTY_FIELDS = new Field[0];
+    private static final long[] EMPTY_LONGS = new long[0];
 
     /** System line separator. */
     private static final String NL = System.getProperty("line.separator");
@@ -8859,21 +8856,6 @@ public abstract class IgniteUtils {
     }
 
     /**
-     * Creates new map that limited by size.
-     *
-     * @param limit Limit for size.
-     */
-    public static <K, V> Map<K, V> limitedMap(int limit) {
-        if (limit == 0)
-            return Collections.emptyMap();
-
-        if (limit < 5)
-            return new GridLeanMap<>(limit);
-
-        return new HashMap<>(capacity(limit), 0.75f);
-    }
-
-    /**
      * Returns comparator that sorts remote node addresses. If remote node resides on the same host, then put
      * loopback addresses first, last otherwise.
      *

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8a6dd007/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridComputeJobWrapper.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridComputeJobWrapper.java b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridComputeJobWrapper.java
new file mode 100644
index 0000000..82c0078
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridComputeJobWrapper.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.lang;
+
+import org.apache.ignite.compute.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import java.util.concurrent.*;
+
+/**
+ * Convenient wrapper for grid job. It allows to create a job clone in cases when the same
+ * job needs to be cloned to multiple grid nodes during mapping phase of task execution.
+ */
+public class GridComputeJobWrapper implements ComputeJob, Callable<Object>,
+    GridPeerDeployAware {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    private final ComputeJob job;
+
+    /** Peer deploy aware class. */
+    private transient volatile GridPeerDeployAware p;
+
+    /**
+     * Creates a wrapper with given grid {@code job}.
+     *
+     * @param job Job to wrap.
+     */
+    public GridComputeJobWrapper(ComputeJob job) {
+        A.notNull(job, "job");
+
+        this.job = job;
+    }
+
+    /**
+     * Gets wrapped job.
+     *
+     * @return Wrapped job.
+     */
+    public ComputeJob wrappedJob() {
+        return job;
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public final Object call() throws Exception {
+        return execute();
+    }
+
+    /** {@inheritDoc} */
+    @Override public Class<?> deployClass() {
+        if (p == null)
+            p = U.detectPeerDeployAware(this);
+
+        return p.deployClass();
+    }
+
+    /** {@inheritDoc} */
+    @Override public ClassLoader classLoader() {
+        if (p == null)
+            p = U.detectPeerDeployAware(this);
+
+        return p.classLoader();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void cancel() {
+        job.cancel();
+    }
+
+    /** {@inheritDoc} */
+    @Override public Object execute() {
+        return job.execute();
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridComputeJobWrapper.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8a6dd007/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridBufferedParser.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridBufferedParser.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridBufferedParser.java
index a03d2c8..3f81dc4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridBufferedParser.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridBufferedParser.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.util.nio;
 
 import org.apache.ignite.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
 
 import java.io.*;
 import java.nio.*;
@@ -32,6 +33,9 @@ import java.nio.*;
  *     | MSG_SIZE  |   MESSAGE  | MSG_SIZE  |   MESSAGE  |
  *     +--+--+--+--+--+--+...+--+--+--+--+--+--+--+...+--+
  * </pre>
+ * <p>
+ * It expects that first 4 bytes in stream are {@link U#IGNITE_HEADER}. If beginning of a stream,
+ * isn't equal to these bytes than exception will be thrown.
  */
 public class GridBufferedParser implements GridNioParser {
     /** Buffer metadata key. */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8a6dd007/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDelimitedParser.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDelimitedParser.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDelimitedParser.java
deleted file mode 100644
index 256597c..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDelimitedParser.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.util.nio;
-
-import org.apache.ignite.*;
-
-import java.io.*;
-import java.nio.*;
-
-/**
- * This class implements stream parser based on {@link GridNioDelimitedBuffer}.
- * <p>
- * The rule for this parser is that every message sent over the stream is appended with
- * delimiter (bytes array). So, the stream structure is as follows:
- * <pre>
- *     +--+--+...+--+--+--+--+--+--+--+...+--+--+--+--+--+-
- *     |   MESSAGE  | DELIMITER  |  MESSAGE  | DELIMITER  |
- *     +--+--+...+--+--+--+--+--+--+--+...+--+--+--+--+--+-
- * </pre>
- */
-public class GridDelimitedParser implements GridNioParser {
-    /** Buffer metadata key. */
-    private static final int BUF_META_KEY = GridNioSessionMetaKey.nextUniqueKey();
-
-    /** Delimiter. */
-    private final byte[] delim;
-
-    /** Direct buffer. */
-    private final boolean directBuf;
-
-    /**
-     * @param delim Delimiter.
-     * @param directBuf Direct buffer.
-     */
-    public GridDelimitedParser(byte[] delim, boolean directBuf) {
-        this.delim = delim;
-        this.directBuf = directBuf;
-    }
-
-    /** {@inheritDoc} */
-    @Override public byte[] decode(GridNioSession ses, ByteBuffer buf) throws IOException, IgniteCheckedException {
-        GridNioDelimitedBuffer nioBuf = ses.meta(BUF_META_KEY);
-
-        // Decode for a given session is called per one thread, so there should not be any concurrency issues.
-        // However, we make some additional checks.
-        if (nioBuf == null) {
-            nioBuf = new GridNioDelimitedBuffer(delim);
-
-            GridNioDelimitedBuffer old = ses.addMeta(BUF_META_KEY, nioBuf);
-
-            assert old == null;
-        }
-
-        return nioBuf.read(buf);
-    }
-
-    /** {@inheritDoc} */
-    @Override public ByteBuffer encode(GridNioSession ses, Object msg) throws IOException, IgniteCheckedException {
-        byte[] msg0 = (byte[])msg;
-
-        int cap = msg0.length + delim.length;
-        ByteBuffer res = directBuf ? ByteBuffer.allocateDirect(cap) : ByteBuffer.allocate(cap);
-
-        res.put(msg0);
-        res.put(delim);
-
-        res.flip();
-
-        return res;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return this.getClass().getSimpleName();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8a6dd007/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioDelimitedBuffer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioDelimitedBuffer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioDelimitedBuffer.java
deleted file mode 100644
index 2b764ec..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioDelimitedBuffer.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.util.nio;
-
-import org.jetbrains.annotations.*;
-
-import java.nio.*;
-import java.util.*;
-
-/**
- * Buffer with message delimiter support.
- */
-public class GridNioDelimitedBuffer {
-    /** Delimiter. */
-    private final byte[] delim;
-
-    /** Data. */
-    private byte[] data = new byte[16384];
-
-    /** Count. */
-    private int cnt;
-
-    /** Index. */
-    private int idx;
-
-    /**
-     * @param delim Delimiter.
-     */
-    public GridNioDelimitedBuffer(byte[] delim) {
-        assert delim != null;
-        assert delim.length > 0;
-
-        this.delim = delim;
-
-        reset();
-    }
-
-    /**
-     * Resets buffer state.
-     */
-    private void reset() {
-        cnt = 0;
-        idx = 0;
-    }
-
-    /**
-     * @param buf Buffer.
-     * @return Message bytes or {@code null} if message is not fully read yet.
-     */
-    @Nullable public byte[] read(ByteBuffer buf) {
-        while(buf.hasRemaining()) {
-            if (cnt == data.length)
-                data = Arrays.copyOf(data, data.length * 2);
-
-            byte b = buf.get();
-
-            data[cnt++] = b;
-
-            if (b == delim[idx])
-                idx++;
-            else if (idx > 0) {
-                int pos = cnt - idx;
-
-                idx = 0;
-
-                for (int i = pos; i < cnt; i++) {
-                    if (data[pos] == delim[idx]) {
-                        pos++;
-
-                        idx++;
-                    }
-                    else {
-                        pos = cnt - idx;
-
-                        idx = 0;
-                    }
-                }
-            }
-
-            if (idx == delim.length) {
-                byte[] bytes = Arrays.copyOfRange(data, 0, cnt - delim.length);
-
-                reset();
-
-                return bytes;
-            }
-        }
-
-        return null;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8a6dd007/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index fd17791..2d5c541 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -203,7 +203,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     public static final int DFLT_ACK_SND_THRESHOLD = 16;
 
     /** Default socket write timeout. */
-    public static final long DFLT_SOCK_WRITE_TIMEOUT = 200;
+    public static final long DFLT_SOCK_WRITE_TIMEOUT = GridNioServer.DFLT_SES_WRITE_TIMEOUT;
 
     /** No-op runnable. */
     private static final IgniteRunnable NOOP = new IgniteRunnable() {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8a6dd007/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 4cff45b..27fa473 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -2689,26 +2689,8 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov
                 msgLsnr.apply(msg);
 
             if (redirectToClients(msg)) {
-                ByteBuffer marshalledMsg = null;
-
-                for (ClientMessageWorker clientMsgWorker : clientMsgWorkers.values()) {
-                    // Send a clone to client to avoid ConcurrentModificationException
-                    TcpDiscoveryAbstractMessage msgClone;
-
-                    try {
-                        if (marshalledMsg == null)
-                            marshalledMsg = marsh.marshal(msg);
-
-                        msgClone = marsh.unmarshal(marshalledMsg, null);
-                    }
-                    catch (IgniteCheckedException e) {
-                        U.error(log, "Failed to marshal message: " + msg, e);
-
-                        msgClone = msg;
-                    }
-
-                    clientMsgWorker.addMessage(msgClone);
-                }
+                for (ClientMessageWorker clientMsgWorker : clientMsgWorkers.values())
+                    clientMsgWorker.addMessage(msg);
             }
 
             Collection<TcpDiscoveryNode> failedNodes;
@@ -3951,6 +3933,18 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov
                 long topVer;
 
                 if (locNodeCoord) {
+                    if (!msg.client() && ipFinder.isShared()) {
+                        try {
+                            ipFinder.unregisterAddresses(leftNode.socketAddresses());
+                        }
+                        catch (IgniteSpiException e) {
+                            if (log.isDebugEnabled())
+                                log.debug("Failed to unregister left node address: " + leftNode);
+
+                            onException("Failed to unregister left node address: " + leftNode, e);
+                        }
+                    }
+
                     topVer = ring.incrementTopologyVersion();
 
                     msg.topologyVersion(topVer);
@@ -4118,6 +4112,20 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov
                 long topVer;
 
                 if (locNodeCoord) {
+                    if (!node.isClient() && ipFinder.isShared()) {
+                        try {
+                            ipFinder.unregisterAddresses(node.socketAddresses());
+                        }
+                        catch (IgniteSpiException e) {
+                            if (log.isDebugEnabled())
+                                log.debug("Failed to unregister failed node address [node=" + node +
+                                    ", err=" + e.getMessage() + ']');
+
+                            onException("Failed to unregister failed node address [node=" + node +
+                                ", err=" + e.getMessage() + ']', e);
+                        }
+                    }
+
                     topVer = ring.incrementTopologyVersion();
 
                     msg.topologyVersion(topVer);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8a6dd007/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java
index c52cbc1..6b3f068 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java
@@ -55,11 +55,11 @@ abstract class TcpDiscoverySpiAdapter extends IgniteSpiAdapter implements Discov
     /** Default socket operations timeout in milliseconds (value is <tt>200ms</tt>). */
     public static final long DFLT_SOCK_TIMEOUT = 200;
 
-    /** Default timeout for receiving message acknowledgement in milliseconds (value is <tt>200ms</tt>). */
-    public static final long DFLT_ACK_TIMEOUT = 200;
+    /** Default timeout for receiving message acknowledgement in milliseconds (value is <tt>5,000ms</tt>). */
+    public static final long DFLT_ACK_TIMEOUT = 5000;
 
-    /** Default network timeout in milliseconds (value is <tt>200ms</tt>). */
-    public static final long DFLT_NETWORK_TIMEOUT = 200;
+    /** Default network timeout in milliseconds (value is <tt>5,000ms</tt>). */
+    public static final long DFLT_NETWORK_TIMEOUT = 5000;
 
     /** Default value for thread priority (value is <tt>10</tt>). */
     public static final int DFLT_THREAD_PRI = 10;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8a6dd007/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java b/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java
deleted file mode 100644
index 0c4e2d1..0000000
--- a/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.stream;
-
-import org.apache.ignite.*;
-
-import java.util.*;
-
-/**
- * Convenience adapter for streamers. Adapters are optional components for
- * streaming from different data sources. The purpose of adapters is to
- * convert different message formats into Ignite stream key-value tuples
- * and feed the tuples into the provided {@link org.apache.ignite.IgniteDataStreamer}.
- */
-public abstract class StreamAdapter<T, K, V> {
-    /** Tuple extractor. */
-    private StreamTupleExtractor<T, K, V> extractor;
-
-    /** Streamer. */
-    private IgniteDataStreamer<K, V> stmr;
-
-    /** Ignite. */
-    private Ignite ignite;
-
-    /**
-     * Empty constructor.
-     */
-    protected StreamAdapter() {
-        // No-op.
-    }
-
-    /**
-     * Stream adapter.
-     *
-     * @param stmr Streamer.
-     * @param extractor Tuple extractor.
-     */
-    protected StreamAdapter(IgniteDataStreamer<K, V> stmr, StreamTupleExtractor<T, K, V> extractor) {
-        this.stmr = stmr;
-        this.extractor = extractor;
-    }
-
-    /**
-     * @return Provided data streamer.
-     */
-    public IgniteDataStreamer<K, V> getStreamer() {
-        return stmr;
-    }
-
-    /**
-     * @param stmr Ignite data streamer.
-     */
-    public void setStreamer(IgniteDataStreamer<K, V> stmr) {
-        this.stmr = stmr;
-    }
-
-    /**
-     * @return Provided tuple extractor.
-     */
-    public StreamTupleExtractor<T, K, V> getTupleExtractor() {
-        return extractor;
-    }
-
-    /**
-     * @param extractor Extractor for key-value tuples from messages.
-     */
-    public void setTupleExtractor(StreamTupleExtractor<T, K, V> extractor) {
-        this.extractor = extractor;
-    }
-
-    /**
-     * @return Provided {@link Ignite} instance.
-     */
-    public Ignite getIgnite() {
-        return ignite;
-    }
-
-    /**
-     * @param ignite {@link Ignite} instance.
-     */
-    public void setIgnite(Ignite ignite) {
-        this.ignite = ignite;
-    }
-
-    /**
-     * Converts given message to a tuple and adds it to the underlying streamer.
-     *
-     * @param msg Message to convert.
-     */
-    protected void addMessage(T msg) {
-        Map.Entry<K, V> e = extractor.extract(msg);
-
-        if (e != null)
-            stmr.addData(e);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8a6dd007/modules/core/src/main/java/org/apache/ignite/stream/StreamTupleExtractor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/StreamTupleExtractor.java b/modules/core/src/main/java/org/apache/ignite/stream/StreamTupleExtractor.java
deleted file mode 100644
index d2a4ede..0000000
--- a/modules/core/src/main/java/org/apache/ignite/stream/StreamTupleExtractor.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.stream;
-
-import java.util.*;
-
-/**
- * Stream tuple extractor to convert messages to Ignite key-value tuples.
- */
-public interface StreamTupleExtractor<T, K, V> {
-    /**
-     * Extracts a key-value tuple from a message.
-     *
-     * @param msg Message.
-     * @return Key-value tuple.
-     */
-    public Map.Entry<K, V> extract(T msg);
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8a6dd007/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketMessageConverter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketMessageConverter.java b/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketMessageConverter.java
deleted file mode 100644
index 8161d86..0000000
--- a/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketMessageConverter.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.stream.socket;
-
-/**
- * Socket message converter.
- */
-public interface SocketMessageConverter<T> {
-    /**
-     * Converter message represented by array of bytes to object.
-     *
-     * @param msg Message.
-     * @return Converted object.
-     */
-    public T convert(byte[] msg);
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8a6dd007/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java b/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
deleted file mode 100644
index d308897..0000000
--- a/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
+++ /dev/null
@@ -1,218 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.stream.socket;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.util.nio.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.marshaller.jdk.*;
-import org.apache.ignite.stream.*;
-
-import org.jetbrains.annotations.*;
-
-import java.net.*;
-import java.nio.*;
-
-/**
- * Server that receives data from TCP socket, converts it to key-value pairs using {@link StreamTupleExtractor} and
- * streams into {@link IgniteDataStreamer} instance.
- * <p>
- * By default server uses size-based message processing. That is every message sent over the socket is prepended with
- * 4-byte integer header containing message size. If message delimiter is defined (see {@link #setDelimiter}) then
- * delimiter-based message processing will be used. That is every message sent over the socket is appended with
- * provided delimiter.
- * <p>
- * Received messages through socket converts to Java object using standard serialization. Conversion functionality
- * can be customized via user defined {@link SocketMessageConverter} (e.g. in order to convert messages from
- * non Java clients).
- */
-public class SocketStreamer<T, K, V> extends StreamAdapter<T, K, V> {
-    /** Default threads. */
-    private static final int DFLT_THREADS = Runtime.getRuntime().availableProcessors();
-
-    /** Logger. */
-    private IgniteLogger log;
-
-    /** Address. */
-    private InetAddress addr;
-
-    /** Server port. */
-    private int port;
-
-    /** Threads number. */
-    private int threads = DFLT_THREADS;
-
-    /** Direct mode. */
-    private boolean directMode;
-
-    /** Delimiter. */
-    private byte[] delim;
-
-    /** Converter. */
-    private SocketMessageConverter<T> converter;
-
-    /** Server. */
-    private GridNioServer<byte[]> srv;
-
-    /**
-     * Sets server address.
-     *
-     * @param addr Address.
-     */
-    public void setAddr(InetAddress addr) {
-        this.addr = addr;
-    }
-
-    /**
-     * Sets port number.
-     *
-     * @param port Port.
-     */
-    public void setPort(int port) {
-        this.port = port;
-    }
-
-    /**
-     * Sets threadds amount.
-     *
-     * @param threads Threads.
-     */
-    public void setThreads(int threads) {
-        this.threads = threads;
-    }
-
-    /**
-     * Sets direct mode flag.
-     *
-     * @param directMode Direct mode.
-     */
-    public void setDirectMode(boolean directMode) {
-        this.directMode = directMode;
-    }
-
-    /**
-     * Sets message delimiter.
-     *
-     * @param delim Delimiter.
-     */
-    public void setDelimiter(byte[] delim) {
-        this.delim = delim;
-    }
-
-    /**
-     * Sets message converter.
-     *
-     * @param converter Converter.
-     */
-    public void setConverter(SocketMessageConverter<T> converter) {
-        this.converter = converter;
-    }
-
-    /**
-     * Starts streamer.
-     *
-     * @throws IgniteException If failed.
-     */
-    public void start() {
-        A.notNull(getTupleExtractor(), "tupleExtractor");
-        A.notNull(getStreamer(), "streamer");
-        A.notNull(getIgnite(), "ignite");
-        A.ensure(threads > 0, "threads > 0");
-
-        log = getIgnite().log();
-
-        GridNioServerListener<byte[]> lsnr = new GridNioServerListenerAdapter<byte[]>() {
-            @Override public void onConnected(GridNioSession ses) {
-                assert ses.accepted();
-
-                if (log.isDebugEnabled())
-                    log.debug("Accepted connection: " + ses.remoteAddress());
-            }
-
-            @Override public void onDisconnected(GridNioSession ses, @Nullable Exception e) {
-                if (e != null)
-                    log.error("Connection failed with exception", e);
-            }
-
-            @Override public void onMessage(GridNioSession ses, byte[] msg) {
-                addMessage(converter.convert(msg));
-            }
-        };
-
-        ByteOrder byteOrder = ByteOrder.BIG_ENDIAN;
-
-        GridNioParser parser = F.isEmpty(delim) ? new GridBufferedParser(directMode, byteOrder) :
-            new GridDelimitedParser(delim, directMode);
-
-        if (converter == null)
-            converter = new DefaultConverter<>();
-
-        GridNioFilter codec = new GridNioCodecFilter(parser, log, directMode);
-
-        GridNioFilter[] filters = new GridNioFilter[] {codec};
-
-        try {
-            srv = new GridNioServer.Builder<byte[]>()
-                .address(addr == null ? InetAddress.getLocalHost() : addr)
-                .port(port)
-                .listener(lsnr)
-                .logger(log)
-                .selectorCount(threads)
-                .byteOrder(byteOrder)
-                .filters(filters)
-                .build();
-        }
-        catch (IgniteCheckedException | UnknownHostException e) {
-            throw new IgniteException(e);
-        }
-
-        srv.start();
-
-        if (log.isDebugEnabled())
-            log.debug("Socket streaming server started on " + addr + ':' + port);
-    }
-
-    /**
-     * Stops streamer.
-     */
-    public void stop() {
-        srv.stop();
-
-        if (log.isDebugEnabled())
-            log.debug("Socket streaming server stopped");
-    }
-
-    /**
-     * Converts message to Java object using Jdk marshaller.
-     */
-    private static class DefaultConverter<T> implements SocketMessageConverter<T> {
-        /** Marshaller. */
-        private static final JdkMarshaller MARSH = new JdkMarshaller();
-
-        /** {@inheritDoc} */
-        @Override public T convert(byte[] msg) {
-            try {
-                return MARSH.unmarshal(ByteBuffer.wrap(msg), null);
-            }
-            catch (IgniteCheckedException e) {
-                throw new IgniteException(e);
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8a6dd007/modules/core/src/main/java/org/apache/ignite/stream/socket/package-info.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/socket/package-info.java b/modules/core/src/main/java/org/apache/ignite/stream/socket/package-info.java
deleted file mode 100644
index e1cef65..0000000
--- a/modules/core/src/main/java/org/apache/ignite/stream/socket/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Contains socket streamer implementation.
- */
-package org.apache.ignite.stream.socket;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8a6dd007/modules/core/src/main/resources/META-INF/classnames.properties
----------------------------------------------------------------------
diff --git a/modules/core/src/main/resources/META-INF/classnames.properties b/modules/core/src/main/resources/META-INF/classnames.properties
index 4f4c1ae..1d4a652 100644
--- a/modules/core/src/main/resources/META-INF/classnames.properties
+++ b/modules/core/src/main/resources/META-INF/classnames.properties
@@ -1140,6 +1140,7 @@ org.apache.ignite.internal.util.lang.GridAbsClosure
 org.apache.ignite.internal.util.lang.GridAbsClosureX
 org.apache.ignite.internal.util.lang.GridCloseableIterator
 org.apache.ignite.internal.util.lang.GridClosureException
+org.apache.ignite.internal.util.lang.GridComputeJobWrapper
 org.apache.ignite.internal.util.lang.GridFunc$1
 org.apache.ignite.internal.util.lang.GridFunc$10
 org.apache.ignite.internal.util.lang.GridFunc$100

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8a6dd007/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheOffheapMapEntrySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheOffheapMapEntrySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheOffheapMapEntrySelfTest.java
deleted file mode 100644
index 8c7d33d..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheOffheapMapEntrySelfTest.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cache.*;
-import org.apache.ignite.configuration.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.*;
-import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.*;
-import org.apache.ignite.internal.processors.cache.distributed.near.*;
-import org.apache.ignite.internal.processors.cache.local.*;
-
-import static org.apache.ignite.cache.CacheAtomicityMode.*;
-import static org.apache.ignite.cache.CacheMemoryMode.*;
-import static org.apache.ignite.cache.CacheMode.*;
-
-/**
- * Cache map entry self test.
- */
-public class CacheOffheapMapEntrySelfTest extends GridCacheAbstractSelfTest {
-    /** {@inheritDoc} */
-    @Override protected int gridCount() {
-        return 0;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void beforeTestsStarted() throws Exception {
-        // No-op.
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void afterTestsStopped() throws Exception {
-        stopAllGrids();
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void beforeTest() throws Exception {
-        startGrids(1);
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void afterTest() throws Exception {
-        // No-op.
-    }
-
-    /**
-     * @param gridName Grid name.
-     * @param memoryMode Memory mode.
-     * @param atomicityMode Atomicity mode.
-     * @param cacheMode Cache mode.
-     * @param cacheName Cache name.
-     * @return Cache configuration.
-     * @throws Exception If failed.
-     */
-    private CacheConfiguration cacheConfiguration(String gridName,
-        CacheMemoryMode memoryMode,
-        CacheAtomicityMode atomicityMode,
-        CacheMode cacheMode,
-        String cacheName)
-        throws Exception
-    {
-        CacheConfiguration cfg = super.cacheConfiguration(gridName);
-
-        cfg.setCacheMode(cacheMode);
-        cfg.setAtomicityMode(atomicityMode);
-        cfg.setMemoryMode(memoryMode);
-        cfg.setName(cacheName);
-
-        return cfg;
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testCacheMapEntry() throws Exception {
-        checkCacheMapEntry(ONHEAP_TIERED, ATOMIC, LOCAL, GridLocalCacheEntry.class);
-
-        checkCacheMapEntry(OFFHEAP_TIERED, ATOMIC, LOCAL, GridLocalCacheEntry.class);
-
-        checkCacheMapEntry(OFFHEAP_VALUES, ATOMIC, LOCAL, GridLocalCacheEntry.class);
-
-        checkCacheMapEntry(ONHEAP_TIERED, TRANSACTIONAL, LOCAL, GridLocalCacheEntry.class);
-
-        checkCacheMapEntry(OFFHEAP_TIERED, TRANSACTIONAL, LOCAL, GridLocalCacheEntry.class);
-
-        checkCacheMapEntry(OFFHEAP_VALUES, TRANSACTIONAL, LOCAL, GridLocalCacheEntry.class);
-
-        checkCacheMapEntry(ONHEAP_TIERED, ATOMIC, PARTITIONED, GridNearCacheEntry.class);
-
-        checkCacheMapEntry(OFFHEAP_TIERED, ATOMIC, PARTITIONED, GridNearOffHeapCacheEntry.class);
-
-        checkCacheMapEntry(OFFHEAP_VALUES, ATOMIC, PARTITIONED, GridNearOffHeapCacheEntry.class);
-
-        checkCacheMapEntry(ONHEAP_TIERED, TRANSACTIONAL, PARTITIONED, GridNearCacheEntry.class);
-
-        checkCacheMapEntry(OFFHEAP_TIERED, TRANSACTIONAL, PARTITIONED, GridNearOffHeapCacheEntry.class);
-
-        checkCacheMapEntry(OFFHEAP_VALUES, TRANSACTIONAL, PARTITIONED, GridNearOffHeapCacheEntry.class);
-
-        checkCacheMapEntry(ONHEAP_TIERED, ATOMIC, REPLICATED, GridDhtAtomicCacheEntry.class);
-
-        checkCacheMapEntry(OFFHEAP_TIERED, ATOMIC, REPLICATED, GridDhtAtomicOffHeapCacheEntry.class);
-
-        checkCacheMapEntry(OFFHEAP_VALUES, ATOMIC, REPLICATED, GridDhtAtomicOffHeapCacheEntry.class);
-
-        checkCacheMapEntry(ONHEAP_TIERED, TRANSACTIONAL, REPLICATED, GridDhtColocatedCacheEntry.class);
-
-        checkCacheMapEntry(OFFHEAP_TIERED, TRANSACTIONAL, REPLICATED, GridDhtColocatedOffHeapCacheEntry.class);
-
-        checkCacheMapEntry(OFFHEAP_VALUES, TRANSACTIONAL, REPLICATED, GridDhtColocatedOffHeapCacheEntry.class);
-    }
-
-    /**
-     * @param memoryMode Cache memory mode.
-     * @param atomicityMode Cache atomicity mode.
-     * @param cacheMode Cache mode.
-     * @param entryCls Class of cache map entry.
-     * @throws Exception If failed.
-     */
-    private void checkCacheMapEntry(CacheMemoryMode memoryMode,
-        CacheAtomicityMode atomicityMode,
-        CacheMode cacheMode,
-        Class<?> entryCls)
-        throws Exception
-    {
-        log.info("Test cache [memMode=" + memoryMode +
-            ", atomicityMode=" + atomicityMode +
-            ", cacheMode=" + cacheMode + ']');
-
-        CacheConfiguration cfg = cacheConfiguration(grid(0).name(),
-            memoryMode,
-            atomicityMode,
-            cacheMode,
-            "Cache");
-
-        try (IgniteCache jcache = grid(0).getOrCreateCache(cfg)) {
-            GridCacheAdapter<Integer, String> cache = ((IgniteKernal)grid(0)).internalCache(jcache.getName());
-
-            Integer key = primaryKey(grid(0).cache(null));
-
-            cache.put(key, "val");
-
-            GridCacheEntryEx entry = cache.entryEx(key);
-
-            entry.unswap(true);
-
-            assertNotNull(entry);
-
-            assertEquals(entry.getClass(), entryCls);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8a6dd007/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheRemoveAllSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheRemoveAllSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheRemoveAllSelfTest.java
deleted file mode 100644
index f5de96f..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheRemoveAllSelfTest.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cache.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.testframework.*;
-
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.*;
-
-/**
- * Test remove all method.
- */
-public class CacheRemoveAllSelfTest extends GridCacheAbstractSelfTest {
-    /** {@inheritDoc} */
-    @Override protected long getTestTimeout() {
-        return 60000;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected int gridCount() {
-        return 4;
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testRemoveAll() throws Exception {
-        IgniteCache<Integer, String> cache = grid(0).cache(null);
-
-        for (int i = 0; i < 10_000; ++i)
-            cache.put(i, "val");
-
-        final AtomicInteger igniteId = new AtomicInteger(gridCount());
-
-        IgniteInternalFuture fut = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() {
-            @Override public Object call() throws Exception {
-                for (int i = 0; i < 2; ++i)
-                    startGrid(igniteId.getAndIncrement());
-
-                return true;
-            }
-        }, 3, "start-node-thread");
-
-        cache.removeAll();
-
-        fut.get();
-
-        U.sleep(5000);
-
-        for (int i = 0; i < igniteId.get(); ++i) {
-            IgniteCache locCache = grid(i).cache(null);
-
-            assertEquals("Local size: " + locCache.localSize() + "\n" +
-                "On heap: " + locCache.localSize(CachePeekMode.ONHEAP) + "\n" +
-                "Off heap: " + locCache.localSize(CachePeekMode.OFFHEAP) + "\n" +
-                "Swap: " + locCache.localSize(CachePeekMode.SWAP) + "\n" +
-                "Primary: " + locCache.localSize(CachePeekMode.PRIMARY) + "\n" +
-                "Backup: " + locCache.localSize(CachePeekMode.BACKUP),
-                0, locCache.localSize());
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8a6dd007/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFailoverSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFailoverSelfTest.java
index 5d9ad35..5389ef9 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFailoverSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFailoverSelfTest.java
@@ -25,12 +25,12 @@ import org.apache.ignite.internal.cluster.*;
 import org.apache.ignite.internal.util.lang.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.spi.discovery.tcp.*;
 import org.apache.ignite.testframework.*;
 import org.apache.ignite.transactions.*;
 import org.jetbrains.annotations.*;
 
 import javax.cache.*;
+import java.util.*;
 import java.util.concurrent.atomic.*;
 
 import static org.apache.ignite.cache.CacheRebalanceMode.*;
@@ -70,12 +70,6 @@ public abstract class GridCacheAbstractFailoverSelfTest extends GridCacheAbstrac
 
         cfg.getTransactionConfiguration().setTxSerializableEnabled(true);
 
-        TcpDiscoverySpi discoSpi = (TcpDiscoverySpi)cfg.getDiscoverySpi();
-
-        discoSpi.setSocketTimeout(10_000);
-        discoSpi.setAckTimeout(10_000);
-        discoSpi.setNetworkTimeout(10_000);
-
         return cfg;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8a6dd007/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheAbstractNodeRestartSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheAbstractNodeRestartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheAbstractNodeRestartSelfTest.java
index 7e65f23..85e2c7c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheAbstractNodeRestartSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheAbstractNodeRestartSelfTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed;
 
 import org.apache.ignite.*;
 import org.apache.ignite.cache.*;
+import org.apache.ignite.cluster.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.spi.discovery.tcp.*;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
@@ -105,10 +106,6 @@ public abstract class GridCacheAbstractNodeRestartSelfTest extends GridCommonAbs
 
         disco.setIpFinder(ipFinder);
 
-        disco.setSocketTimeout(30_000);
-        disco.setAckTimeout(30_000);
-        disco.setNetworkTimeout(30_000);
-
         c.setDiscoverySpi(disco);
 
         return c;
@@ -515,7 +512,7 @@ public abstract class GridCacheAbstractNodeRestartSelfTest extends GridCommonAbs
                                 try {
                                     cache.put(key, Integer.toString(key));
                                 }
-                                catch (IgniteException | CacheException ignored) {
+                                catch (TransactionRollbackException | ClusterTopologyException | CacheException ignored) {
                                     // It is ok if primary node leaves grid.
                                 }
 
@@ -671,7 +668,7 @@ public abstract class GridCacheAbstractNodeRestartSelfTest extends GridCommonAbs
                                         tx.commit();
                                     }
                                 }
-                                catch (IgniteException | CacheException ignored) {
+                                catch (ClusterTopologyException | CacheException ignored) {
                                     // It is ok if primary node leaves grid.
                                 }
 
@@ -817,7 +814,7 @@ public abstract class GridCacheAbstractNodeRestartSelfTest extends GridCommonAbs
 
                                     tx.commit();
                                 }
-                                catch (IgniteException | CacheException ignored) {
+                                catch (ClusterTopologyException | CacheException ignored) {
                                     // It is ok if primary node leaves grid.
                                 }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8a6dd007/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheLockAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheLockAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheLockAbstractTest.java
index 2fe76e7..ab0f7d0 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheLockAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheLockAbstractTest.java
@@ -501,6 +501,8 @@ public abstract class GridCacheLockAbstractTest extends GridCommonAbstractTest {
      * @throws Throwable If failed.
      */
     public void testLockReentrancy() throws Throwable {
+        fail("https://issues.apache.org/jira/browse/IGNITE-835");
+
         Affinity<Integer> aff = ignite1.affinity(null);
 
         for (int i = 10; i < 100; i++) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8a6dd007/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxGetAfterStopTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxGetAfterStopTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxGetAfterStopTest.java
deleted file mode 100644
index 469f513..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxGetAfterStopTest.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cache.*;
-import org.apache.ignite.configuration.*;
-import org.apache.ignite.internal.processors.cache.*;
-import org.apache.ignite.transactions.*;
-import org.jetbrains.annotations.*;
-
-import static org.apache.ignite.cache.CacheAtomicityMode.*;
-import static org.apache.ignite.cache.CacheMode.*;
-
-/**
- *
- */
-public class IgniteTxGetAfterStopTest extends IgniteCacheAbstractTest {
-    /** */
-    private CacheMode cacheMode;
-
-    /** */
-    private NearCacheConfiguration nearCfg;
-
-    /** {@inheritDoc} */
-    @Override protected int gridCount() {
-        return 4;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected CacheMode cacheMode() {
-        return cacheMode;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected CacheAtomicityMode atomicityMode() {
-        return TRANSACTIONAL;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected NearCacheConfiguration nearConfiguration() {
-        return nearCfg;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void beforeTestsStarted() throws Exception {
-        // No-op.
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void afterTest() throws Exception {
-        super.afterTest();
-
-        stopAllGrids();
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testReplicated() throws Exception {
-        getAfterStop(REPLICATED, null);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testPartitioned() throws Exception {
-        getAfterStop(PARTITIONED, new NearCacheConfiguration());
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testPartitionedNearDisabled() throws Exception {
-        getAfterStop(PARTITIONED, null);
-    }
-
-    /**
-     * @param cacheMode Cache mode.
-     * @param nearCfg Near cache configuration.
-     * @throws Exception If failed.
-     */
-    private void getAfterStop(CacheMode cacheMode, @Nullable NearCacheConfiguration nearCfg) throws Exception {
-        this.cacheMode = cacheMode;
-        this.nearCfg = nearCfg;
-
-        startGrids();
-
-        IgniteCache<Integer, Integer> cache0 = jcache(0);
-        IgniteCache<Integer, Integer> cache1 = jcache(1);
-
-        Integer key0 = primaryKey(cache0);
-        Integer key1 = primaryKey(cache1);
-
-        try (Transaction tx = ignite(0).transactions().txStart()) {
-            log.info("Put: " + key0);
-
-            cache0.put(key0, key0);
-
-            log.info("Stop node.");
-
-            stopGrid(3);
-
-            log.info("Get: " + key1);
-
-            cache0.get(key1);
-
-            log.info("Commit.");
-
-            tx.commit();
-        }
-
-        assertEquals(key0, cache0.get(key0));
-        assertNull(cache1.get(key1));
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8a6dd007/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedNearDisabledLockSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedNearDisabledLockSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedNearDisabledLockSelfTest.java
deleted file mode 100644
index 69c7909..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedNearDisabledLockSelfTest.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed.dht;
-
-import org.apache.ignite.configuration.*;
-import org.apache.ignite.internal.processors.cache.distributed.near.*;
-
-/**
- *
- */
-public class GridCachePartitionedNearDisabledLockSelfTest extends GridCachePartitionedLockSelfTest {
-    /** {@inheritDoc} */
-    @Override protected CacheConfiguration cacheConfiguration() {
-        CacheConfiguration ccfg = super.cacheConfiguration();
-
-        assertNotNull(ccfg.getNearConfiguration());
-
-        ccfg.setNearConfiguration(null);
-
-        return ccfg;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected boolean isPartitioned() {
-        return false;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void testLockReentrancy() throws Throwable {
-        fail("https://issues.apache.org/jira/browse/IGNITE-835");
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8a6dd007/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java
index f996877..ee2f16b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java
@@ -199,7 +199,7 @@ public abstract class IgniteCachePrimaryNodeFailureRecoveryAbstractTest extends
 
             commSpi.blockMessages(ignite(2).cluster().localNode().id()); // Do not allow to finish prepare for key2.
 
-            IgniteInternalFuture<?> prepFut = txEx.prepareAsync();
+            IgniteInternalFuture<IgniteInternalTx> prepFut = txEx.prepareAsync();
 
             waitPrepared(ignite(1));
 
@@ -360,7 +360,7 @@ public abstract class IgniteCachePrimaryNodeFailureRecoveryAbstractTest extends
 
         commSpi.blockMessages(ignite(2).cluster().localNode().id()); // Do not allow to finish prepare for key2.
 
-        IgniteInternalFuture<?> prepFut = txEx.prepareAsync();
+        IgniteInternalFuture<IgniteInternalTx> prepFut = txEx.prepareAsync();
 
         waitPrepared(ignite(1));
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8a6dd007/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicReplicatedFailoverSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicReplicatedFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicReplicatedFailoverSelfTest.java
index 1e57c09..0a2781b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicReplicatedFailoverSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicReplicatedFailoverSelfTest.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
 
 import org.apache.ignite.cache.*;
-import org.apache.ignite.configuration.*;
 
 import static org.apache.ignite.cache.CacheMode.*;
 
@@ -30,9 +29,4 @@ public class GridCacheAtomicReplicatedFailoverSelfTest extends GridCacheAtomicFa
     @Override protected CacheMode cacheMode() {
         return REPLICATED;
     }
-
-    /** {@inheritDoc} */
-    @Override protected NearCacheConfiguration nearConfiguration() {
-        return null;
-    }
 }