You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2015/12/23 15:16:59 UTC

[01/12] ignite git commit: Merge branch 'ignite-1.5' of https://github.com/apache/ignite into ignite-2206

Repository: ignite
Updated Branches:
  refs/heads/ignite-2206 31d3289df -> 04827ca7f


Merge branch 'ignite-1.5' of https://github.com/apache/ignite into ignite-2206


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e35ae061
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e35ae061
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e35ae061

Branch: refs/heads/ignite-2206
Commit: e35ae061f7f3b88b1539ffb03440e86243a31b90
Parents: e5b92ac 66b33bc
Author: iveselovskiy <iv...@gridgain.com>
Authored: Wed Dec 23 12:25:15 2015 +0300
Committer: iveselovskiy <iv...@gridgain.com>
Committed: Wed Dec 23 12:25:15 2015 +0300

----------------------------------------------------------------------
 .../binary/BinaryMarshallerSelfTest.java        | 82 ++++++++++----------
 .../cache/IgniteCacheAbstractQuerySelfTest.java |  2 +-
 2 files changed, 42 insertions(+), 42 deletions(-)
----------------------------------------------------------------------



[07/12] ignite git commit: IGNITE-2206: cleanup by review results.

Posted by vo...@apache.org.
IGNITE-2206: cleanup by review results.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/83e12492
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/83e12492
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/83e12492

Branch: refs/heads/ignite-2206
Commit: 83e12492e6c792828a4eb1eb2b90cbd66081a770
Parents: 5a4586e
Author: iveselovskiy <iv...@gridgain.com>
Authored: Wed Dec 23 16:46:35 2015 +0300
Committer: iveselovskiy <iv...@gridgain.com>
Committed: Wed Dec 23 16:46:35 2015 +0300

----------------------------------------------------------------------
 .../binary/BinaryMarshallerSelfTest.java        |  82 +++++-----
 .../org/apache/ignite/hadoop/HadoopFsIssue.java |  71 ---------
 .../fs/CachingHadoopFileSystemFactory.java      |  48 +++---
 .../fs/IgniteHadoopIgfsSecondaryFileSystem.java |  70 ++++-----
 .../hadoop/fs/v1/IgniteHadoopFileSystem.java    |  90 ++++++-----
 .../hadoop/fs/v2/HadoopV2FileSystemFactory.java |  11 --
 .../hadoop/fs/v2/IgniteHadoopFileSystem.java    |  27 +---
 .../KerberosSecondaryFileSystemProvider.java    |  55 -------
 .../hadoop/SecondaryFileSystemProvider.java     | 151 -------------------
 .../hadoop/fs/HadoopLazyConcurrentMap.java      |  10 +-
 ...oopSecondaryFileSystemConfigurationTest.java |  15 +-
 .../IgniteHadoopFileSystemAbstractSelfTest.java |   2 +-
 .../testsuites/IgniteHadoopTestSuite.java       |   2 +-
 .../cache/IgniteCacheAbstractQuerySelfTest.java |   2 +-
 parent/pom.xml                                  |   2 +-
 15 files changed, 177 insertions(+), 461 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/83e12492/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryMarshallerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryMarshallerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryMarshallerSelfTest.java
index 9f7beb8..ac9771f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryMarshallerSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryMarshallerSelfTest.java
@@ -17,6 +17,40 @@
 
 package org.apache.ignite.internal.binary;
 
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.io.Serializable;
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.net.InetSocketAddress;
+import java.sql.Timestamp;
+import java.util.AbstractQueue;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Queue;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListSet;
 import junit.framework.Assert;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.binary.BinaryCollectionFactory;
@@ -53,43 +87,9 @@ import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
 import sun.misc.Unsafe;
 
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.io.Serializable;
-import java.lang.reflect.Field;
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.lang.reflect.Proxy;
-import java.math.BigDecimal;
-import java.math.BigInteger;
-import java.net.InetSocketAddress;
-import java.sql.Timestamp;
-import java.util.AbstractQueue;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Queue;
-import java.util.TreeMap;
-import java.util.TreeSet;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentSkipListSet;
-
-import static org.apache.ignite.internal.binary.streams.BinaryMemoryAllocator.*;
-import static org.junit.Assert.*;
+import static org.apache.ignite.internal.binary.streams.BinaryMemoryAllocator.INSTANCE;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertNotEquals;
 
 /**
  * Binary marshaller tests.
@@ -2339,8 +2339,8 @@ public class BinaryMarshallerSelfTest extends GridCommonAbstractTest {
 
         // Check direct field access.
         assertNull(objBin.field(fieldName));
-        assertEquals(Integer.valueOf(1), objBin.field(fieldNameA));
-        assertEquals(Integer.valueOf(2), objBin.field(fieldNameB));
+        assertEquals(1, objBin.field(fieldNameA));
+        assertEquals(2, objBin.field(fieldNameB));
 
         // Check metadata.
         BinaryType type = objBin.type();
@@ -2363,8 +2363,8 @@ public class BinaryMarshallerSelfTest extends GridCommonAbstractTest {
         assert fieldB.exists(objBin);
 
         assertNull(field.value(objBin));
-        assertEquals(Integer.valueOf(1), fieldA.value(objBin));
-        assertEquals(Integer.valueOf(2), fieldB.value(objBin));
+        assertEquals(1, fieldA.value(objBin));
+        assertEquals(2, fieldB.value(objBin));
 
         // Check object deserialization.
         DuplicateFieldsB deserialized = objBin.deserialize();

http://git-wip-us.apache.org/repos/asf/ignite/blob/83e12492/modules/hadoop/src/main/java/org/apache/ignite/hadoop/HadoopFsIssue.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/HadoopFsIssue.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/HadoopFsIssue.java
deleted file mode 100644
index 82314f1..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/HadoopFsIssue.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.hadoop;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocatedFileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
-import org.apache.ignite.internal.processors.hadoop.SecondaryFileSystemProvider;
-
-/**
- * Comment.
- */
-public class HadoopFsIssue {
-    /**
-     *
-     * @param args
-     */
-    public static void main(String args[]) {
-        String uri = null;
-        String cfgPath = null;
-        String user = null;
-
-        for (String arg : args) {
-            if (arg.startsWith("uri="))
-                uri = arg.split("=")[1].trim();
-            else if (arg.startsWith("cfg="))
-                cfgPath = arg.split("=")[1].trim();
-            else if (arg.startsWith("user="))
-                user = arg.split("=")[1].trim();
-            else
-                throw new IllegalArgumentException("Unknown argument:" + arg);
-        }
-
-        System.out.println("Connecting to HDFS with the following settings [uri=" + uri + ", cfg=" + cfgPath + ", user=" + user + ']');
-
-        try {
-            SecondaryFileSystemProvider provider = new SecondaryFileSystemProvider(uri, cfgPath);
-
-            FileSystem fs = provider.createFileSystem(user);
-
-            RemoteIterator<LocatedFileStatus> iter = fs.listFiles(new Path("/tmp"), true);
-
-            System.out.println("Got the iterator");
-
-            while (iter.hasNext()) {
-                LocatedFileStatus status = iter.next();
-
-                System.out.println(status);
-            }
-        }
-        catch (Exception e) {
-            e.printStackTrace();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/83e12492/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/CachingHadoopFileSystemFactory.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/CachingHadoopFileSystemFactory.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/CachingHadoopFileSystemFactory.java
index 1e97b30..52d4db5 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/CachingHadoopFileSystemFactory.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/CachingHadoopFileSystemFactory.java
@@ -73,50 +73,57 @@ public class CachingHadoopFileSystemFactory implements HadoopFileSystemFactory,
     /** */
     protected List<String> cfgPathStr;
 
-    int getCount = 0;
-
     /**
-     *
+     * Public non-arg constructor.
      */
     public CachingHadoopFileSystemFactory() {
-        //
-
-
-
+        // noop
     }
 
+    /** {@inheritDoc} */
     @Override public FileSystem create(String userName) throws IOException {
         A.ensure(cfg != null, "cfg");
 
-        if (getCount == 0)
-            assert fileSysLazyMap.size() == 0;
-
-        getCount++;
-
         return fileSysLazyMap.getOrCreate(userName);
     }
 
-    // TODO: Add getter.
-
     /**
      * Uri setter.
-     * @param uriStr
+     *
+     * @param uriStr The URI to set.
      */
     public void setUri(String uriStr) {
         this.uriStr = uriStr;
     }
 
-    // TODO: Add getter.
+    /**
+     * Gets the URI.
+     *
+     * @return The URI.
+     */
+    public URI getUri() {
+        return uri;
+    }
 
     /**
      * Configuration(s) setter, to be invoked from Spring config.
-     * @param cfgPaths
+     *
+     * @param cfgPaths The config paths collection to set.
      */
     public void setConfigPaths(List<String> cfgPaths) {
         this.cfgPathStr = cfgPaths;
     }
 
     /**
+     * Gets the config paths collection.
+     *
+     * @return The config paths collection.
+     */
+    public List<String> getConfigPaths() {
+        return cfgPathStr;
+    }
+
+    /**
      * @return {@link org.apache.hadoop.fs.FileSystem}  instance for this secondary Fs.
      * @throws IOException
      */
@@ -159,7 +166,9 @@ public class CachingHadoopFileSystemFactory implements HadoopFileSystemFactory,
 
         if (cfgPathStr != null) {
             for (String confPath : cfgPathStr) {
-                if (confPath != null) {
+                if (confPath == null)
+                    throw new IgniteException("Null config path encountered.");
+                else {
                     URL url = U.resolveIgniteUrl(confPath);
 
                     if (url == null) {
@@ -170,9 +179,6 @@ public class CachingHadoopFileSystemFactory implements HadoopFileSystemFactory,
 
                     cfg.addResource(url);
                 }
-                else {
-                    // TODO: Throw exception.
-                }
             }
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/83e12492/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java
index aa1952d..9942ec4 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java
@@ -68,15 +68,11 @@ import static org.apache.ignite.internal.processors.igfs.IgfsEx.PROP_USER_NAME;
 public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSystem, LifecycleAware,
     HadoopPayloadAware {
     /** The default user name. It is used if no user context is set. */
-    private String dfltUsrName = IgfsUtils.fixUserName(null);
+    private @Nullable String dfltUsrName;
 
     /** */
     private HadoopFileSystemFactory fsFactory;
 
-    /** FileSystem instance created for the default user. Stored outside due to performance reasons. */
-    // TODO: Remove.
-    private volatile FileSystem dfltFs;
-
     /**
      * Default constructor for Spring.
      */
@@ -90,6 +86,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
      * @param uri URI of file system.
      * @throws IgniteCheckedException In case of error.
      */
+    @Deprecated
     public IgniteHadoopIgfsSecondaryFileSystem(String uri) throws IgniteCheckedException {
         this(uri, null, null);
     }
@@ -128,14 +125,13 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
             fac.setConfigPaths(Collections.singletonList(cfgPath));
 
         setFileSystemFactory(fac);
-        setUserName(userName);
+        setDefaultUserName(userName);
     }
 
-    // TODO: Add getter.
-    // TODO: Add docs.
     /**
+     * Sets secondary file system factory.
      *
-     * @param factory
+     * @param factory The factory to set.
      */
     public void setFileSystemFactory(HadoopFileSystemFactory factory) {
         A.ensure(factory != null, "Factory value must not be null.");
@@ -143,17 +139,31 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
         this.fsFactory = factory;
     }
 
-    // TODO: Add getter.
-    // TODO: Add docs.
-    // TODO: Rename to "setDefaultUserName"
+    /**
+     * Gets the secondary file system factory.
+     *
+     * @return The secondary file system factory.
+     */
+    public HadoopFileSystemFactory getFileSystemFactory() {
+        return fsFactory;
+    }
+
+    /**
+     * Sets the default user name.
+     *
+     * @param usrName The user name to set.
+     */
+    public void setDefaultUserName(String usrName) {
+        this.dfltUsrName = usrName;
+    }
 
     /**
+     * Gets the default user name.
      *
-     * @param usrName
+     * @return The default user name.
      */
-    public void setUserName(String usrName) {
-        // TODO: Move fix to start routine.
-        this.dfltUsrName = IgfsUtils.fixUserName(usrName);
+    public String getDefaultUserName() {
+        return dfltUsrName;
     }
 
     /**
@@ -486,13 +496,10 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
         String user = IgfsUserContext.currentUser();
 
         if (F.isEmpty(user))
-            user = dfltUsrName; // default is never empty.
+            user = IgfsUtils.fixUserName(dfltUsrName);
 
         assert !F.isEmpty(user);
 
-        if (F.eq(user, dfltUsrName))
-            return dfltFs; // optimization
-
         try {
             return fsFactory.create(user);
         }
@@ -513,27 +520,8 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
 
     /** {@inheritDoc} */
     @Override public void stop() throws IgniteException {
-        Exception e = null;
-
-        try {
-            if (dfltFs != null)
-                dfltFs.close();
-        }
-        catch (Exception e0) {
-            e = e0;
-        }
-
-        try {
-            if (fsFactory instanceof LifecycleAware)
-                ((LifecycleAware)fsFactory).stop();
-        }
-        catch (IgniteException ie) {
-            if (e == null)
-                e = ie;
-        }
-
-        if (e != null)
-            throw new IgniteException(e);
+        if (fsFactory instanceof LifecycleAware)
+             ((LifecycleAware)fsFactory).stop();
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/83e12492/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
index 1546995..545c905 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
@@ -164,9 +164,8 @@ public class IgniteHadoopFileSystem extends FileSystem {
     /** IGFS mode resolver. */
     private IgfsModeResolver modeRslvr;
 
-    // TODO: Secondary file system must be changed to factory.
-    /** Secondary file system instance. */
-    private FileSystem secondaryFs;
+    /** The secondary file system factory. */
+    private HadoopFileSystemFactory factory;
 
     /** Management connection flag. */
     private boolean mgmt;
@@ -263,7 +262,6 @@ public class IgniteHadoopFileSystem extends FileSystem {
                     "://[name]/[optional_path], actual=" + name + ']');
 
             uri = name;
-            System.out.println("uri initialized: " + uri);
 
             uriAuthority = uri.getAuthority();
 
@@ -331,14 +329,7 @@ public class IgniteHadoopFileSystem extends FileSystem {
             }
 
             if (initSecondary) {
-//                Map<String, String> props = paths.properties();
-//
-//                String secUri = props.get(SECONDARY_FS_URI);
-//                String secConfPath = props.get(SECONDARY_FS_CONFIG_PATH);
-
-//                byte[] secFsFacoryBytes = handshake.getSecondaryFileSystemFactoryBytes();
-
-                HadoopFileSystemFactory factory = (HadoopFileSystemFactory)paths.getPayload();
+                factory = (HadoopFileSystemFactory)paths.getPayload();
 
                 A.ensure(factory != null, "Secondary file system factory should not be null.");
 
@@ -346,13 +337,11 @@ public class IgniteHadoopFileSystem extends FileSystem {
                     ((LifecycleAware) factory).start();
 
                 try {
-                    secondaryFs = factory.create(user);
+                    FileSystem secFs = factory.create(user);
 
-                    secondaryUri = secondaryFs.getUri();
+                    secondaryUri = secFs.getUri();
 
                     A.ensure(secondaryUri != null, "Secondary file system uri should not be null.");
-
-                    //assert secondaryUri.equals(uri2);
                 }
                 catch (IOException e) {
                     if (!mgmt)
@@ -371,23 +360,6 @@ public class IgniteHadoopFileSystem extends FileSystem {
         }
     }
 
-//    /**
-//     *
-//     * @param in
-//     * @throws IOException
-//     * @throws ClassNotFoundException
-//     */
-//    static HadoopFileSystemFactory readFactory(byte[] factoryBytes) throws IOException, ClassNotFoundException {
-//        ObjectInput oi = new ObjectInputStream(new ByteArrayInputStream(factoryBytes));
-//
-//        try {
-//            return (HadoopFileSystemFactory<F>) oi.readObject();
-//        }
-//        finally {
-//            oi.close();
-//        }
-//    }
-
     /** {@inheritDoc} */
     @Override protected void checkPath(Path path) {
         URI uri = path.toUri();
@@ -441,9 +413,8 @@ public class IgniteHadoopFileSystem extends FileSystem {
         if (clientLog.isLogEnabled())
             clientLog.close();
 
-        U.closeQuiet(secondaryFs);
-
-        System.out.println("closed " + uri);
+        if (factory instanceof LifecycleAware)
+            ((LifecycleAware) factory).stop();
 
         // Reset initialized resources.
         uri = null;
@@ -458,6 +429,8 @@ public class IgniteHadoopFileSystem extends FileSystem {
             A.notNull(p, "p");
 
             if (mode(p) == PROXY) {
+                final FileSystem secondaryFs = secondaryFs();
+
                 if (secondaryFs == null) {
                     assert mgmt;
 
@@ -486,6 +459,8 @@ public class IgniteHadoopFileSystem extends FileSystem {
             A.notNull(p, "p");
 
             if (mode(p) == PROXY) {
+                final FileSystem secondaryFs = secondaryFs();
+
                 if (secondaryFs == null) {
                     assert mgmt;
 
@@ -515,6 +490,8 @@ public class IgniteHadoopFileSystem extends FileSystem {
 
         try {
             if (mode(p) == PROXY) {
+                final FileSystem secondaryFs = secondaryFs();
+
                 if (secondaryFs == null) {
                     assert mgmt;
 
@@ -544,6 +521,8 @@ public class IgniteHadoopFileSystem extends FileSystem {
             IgfsMode mode = mode(path);
 
             if (mode == PROXY) {
+                final FileSystem secondaryFs = secondaryFs();
+
                 if (secondaryFs == null) {
                     assert mgmt;
 
@@ -616,6 +595,8 @@ public class IgniteHadoopFileSystem extends FileSystem {
                     path + ", overwrite=" + overwrite + ", bufSize=" + bufSize + ']');
 
             if (mode == PROXY) {
+                final FileSystem secondaryFs = secondaryFs();
+
                 if (secondaryFs == null) {
                     assert mgmt;
 
@@ -697,6 +678,8 @@ public class IgniteHadoopFileSystem extends FileSystem {
                     ", path=" + path + ", bufSize=" + bufSize + ']');
 
             if (mode == PROXY) {
+                final FileSystem secondaryFs = secondaryFs();
+
                 if (secondaryFs == null) {
                     assert mgmt;
 
@@ -760,6 +743,8 @@ public class IgniteHadoopFileSystem extends FileSystem {
             IgfsMode mode = mode(srcPath);
 
             if (mode == PROXY) {
+                final FileSystem secondaryFs = secondaryFs();
+
                 if (secondaryFs == null) {
                     assert mgmt;
 
@@ -820,6 +805,8 @@ public class IgniteHadoopFileSystem extends FileSystem {
             IgfsMode mode = mode(path);
 
             if (mode == PROXY) {
+                final FileSystem secondaryFs = secondaryFs();
+
                 if (secondaryFs == null) {
                     assert mgmt;
 
@@ -865,6 +852,8 @@ public class IgniteHadoopFileSystem extends FileSystem {
             IgfsMode mode = mode(path);
 
             if (mode == PROXY) {
+                final FileSystem secondaryFs = secondaryFs();
+
                 if (secondaryFs == null) {
                     assert mgmt;
 
@@ -929,6 +918,8 @@ public class IgniteHadoopFileSystem extends FileSystem {
 
     /** {@inheritDoc} */
     @Override public void setWorkingDirectory(Path newPath) {
+        final FileSystem secondaryFs = secondaryFs();
+
         if (newPath == null) {
             Path homeDir = getHomeDirectory();
 
@@ -969,6 +960,8 @@ public class IgniteHadoopFileSystem extends FileSystem {
             IgfsMode mode = mode(path);
 
             if (mode == PROXY) {
+                final FileSystem secondaryFs = secondaryFs();
+
                 if (secondaryFs == null) {
                     assert mgmt;
 
@@ -1010,6 +1003,8 @@ public class IgniteHadoopFileSystem extends FileSystem {
 
         try {
             if (mode(f) == PROXY) {
+                final FileSystem secondaryFs = secondaryFs();
+
                 if (secondaryFs == null) {
                     assert mgmt;
 
@@ -1040,6 +1035,8 @@ public class IgniteHadoopFileSystem extends FileSystem {
 
         try {
             if (mode(f) == PROXY) {
+                final FileSystem secondaryFs = secondaryFs();
+
                 if (secondaryFs == null) {
                     assert mgmt;
 
@@ -1071,6 +1068,8 @@ public class IgniteHadoopFileSystem extends FileSystem {
             IgfsPath path = convert(status.getPath());
 
             if (mode(status.getPath()) == PROXY) {
+                final FileSystem secondaryFs = secondaryFs();
+
                 if (secondaryFs == null) {
                     assert mgmt;
 
@@ -1149,7 +1148,7 @@ public class IgniteHadoopFileSystem extends FileSystem {
      * @return Secondary file system path.
      */
     private Path toSecondary(Path path) {
-        assert secondaryFs != null;
+        assert factory != null;
         assert secondaryUri != null;
 
         return convertPath(path, secondaryUri);
@@ -1324,4 +1323,21 @@ public class IgniteHadoopFileSystem extends FileSystem {
     public String user() {
         return user;
     }
+
+    /**
+     * Gets cached or creates a {@link FileSystem}.
+     *
+     * @return The secondary file system.
+     */
+    private @Nullable FileSystem secondaryFs() {
+        if (factory == null)
+            return null;
+
+        try {
+            return factory.create(user);
+        }
+        catch (IOException ioe) {
+            throw new IgniteException();
+        }
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/83e12492/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/HadoopV2FileSystemFactory.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/HadoopV2FileSystemFactory.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/HadoopV2FileSystemFactory.java
deleted file mode 100644
index c2ab620..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/HadoopV2FileSystemFactory.java
+++ /dev/null
@@ -1,11 +0,0 @@
-//package org.apache.ignite.hadoop.fs.v2;
-//
-//import org.apache.hadoop.fs.AbstractFileSystem;
-//import org.apache.hadoop.fs.FileSystem;
-//
-///**
-// * Created by ivan on 18.12.15.
-// */
-//public interface HadoopV2FileSystemFactory {
-//    AbstractFileSystem create(String uri, String configPath, String userName);
-//}

http://git-wip-us.apache.org/repos/asf/ignite/blob/83e12492/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
index 96f97dc..d665d4c 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
@@ -73,6 +73,7 @@ import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lifecycle.LifecycleAware;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.configuration.FileSystemConfiguration.DFLT_IGFS_LOG_BATCH_SIZE;
@@ -168,6 +169,9 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
     /** Secondary file system instance. */
     private AbstractFileSystem secondaryFs;
 
+    /** Secondary file system factory. */
+    private HadoopAbstractFileSystemFactory factory;
+
     /** Whether custom sequential reads before prefetch value is provided. */
     private boolean seqReadsBeforePrefetchOverride;
 
@@ -332,32 +336,14 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
             }
 
             if (initSecondary) {
-//                Map<String, String> props = paths.properties();
-//
-//                String secUri = props.get(SECONDARY_FS_URI);
-//                String secConfPath = props.get(SECONDARY_FS_CONFIG_PATH);
-
-                HadoopAbstractFileSystemFactory factory
-                    = (HadoopAbstractFileSystemFactory)paths.getPayload();
+                factory = (HadoopAbstractFileSystemFactory)paths.getPayload();
 
                 A.ensure(secondaryUri != null, "File system factory uri should not be null.");
 
-                //secondaryUri = factory.uri();
-
                 try {
-                    //SecondaryFileSystemProvider secProvider = new SecondaryFileSystemProvider(secUri, secConfPath);
-
                     secondaryFs = factory.get(user);
 
                     secondaryUri = secondaryFs.getUri();
-
-//                    assert secondaryUri != null;
-//
-//                    URI uri2 = ((DefaultHadoopFileSystemFactory)factory).uri();
-//                    assert secondaryUri.equals(uri2);
-
-                    //secondaryFs = secProvider.createAbstractFileSystem(user);
-                    //secondaryUri = secProvider.uri();
                 }
                 catch (IOException e) {
                     throw new IOException("Failed to connect to the secondary file system: " + secondaryUri, e);
@@ -380,6 +366,9 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
             if (clientLog.isLogEnabled())
                 clientLog.close();
 
+            if (factory instanceof LifecycleAware)
+                ((LifecycleAware) factory).stop();
+
             // Reset initialized resources.
             rmtClient = null;
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/83e12492/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/KerberosSecondaryFileSystemProvider.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/KerberosSecondaryFileSystemProvider.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/KerberosSecondaryFileSystemProvider.java
deleted file mode 100644
index 503ac46..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/KerberosSecondaryFileSystemProvider.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop;
-
-import java.io.IOException;
-import java.security.PrivilegedExceptionAction;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * See https://issues.apache.org/jira/browse/IGNITE-2195 .
- */
-public class KerberosSecondaryFileSystemProvider extends SecondaryFileSystemProvider {
-    /**
-     * Constructor.
-     **/
-    public KerberosSecondaryFileSystemProvider(@Nullable String secUri, @Nullable String secConfPath) throws IOException {
-        super(secUri, secConfPath);
-    }
-
-    /** {@inheritDoc} */
-    @Override public FileSystem createFileSystem(String userName) throws IOException {
-        UserGroupInformation.setConfiguration(cfg);
-
-        UserGroupInformation ugi = UserGroupInformation.createProxyUser(userName, UserGroupInformation.getCurrentUser());
-
-        try {
-            return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
-                @Override public FileSystem run() throws Exception {
-                    return FileSystem.get(uri, cfg);
-                }
-            });
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-
-            throw new IOException("Failed to create file system due to interrupt.", e);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/83e12492/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java
deleted file mode 100644
index 1e7ac7f..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop;
-
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.URL;
-import java.security.PrivilegedExceptionAction;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.AbstractFileSystem;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.ignite.internal.processors.hadoop.fs.HadoopFileSystemsUtils;
-import org.apache.ignite.internal.processors.igfs.IgfsUtils;
-import org.apache.ignite.internal.util.IgniteUtils;
-import static org.apache.ignite.internal.util.typedef.F.*;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Encapsulates logic of secondary filesystem creation.
- */
-public class SecondaryFileSystemProvider {
-    /** Configuration of the secondary filesystem, never null. */
-    protected final Configuration cfg = HadoopUtils.safeCreateConfiguration();
-
-    /** The secondary filesystem URI, never null. */
-    protected final URI uri;
-
-    /** Configuration file path. */
-    @Nullable protected final String confPath;
-
-    /**
-     * Creates new provider with given config parameters. The configuration URL is optional. The filesystem URI must be
-     * specified either explicitly or in the configuration provided.
-     *
-     * @param secUri the secondary Fs URI (optional). If not given explicitly, it must be specified as "fs.defaultFS"
-     * property in the provided configuration.
-     * @param secConfPath the secondary Fs path (file path on the local file system, optional).
-     * See {@link IgniteUtils#resolveIgniteUrl(String)} on how the path resolved.
-     * @throws IOException
-     */
-    public SecondaryFileSystemProvider(@Nullable String secUri, @Nullable String secConfPath) throws IOException {
-        confPath = secConfPath;
-
-        if (confPath != null) {
-            URL url = U.resolveIgniteUrl(confPath);
-
-            if (url == null) {
-                // If secConfPath is given, it should be resolvable:
-                throw new IllegalArgumentException("Failed to resolve secondary file system configuration path " +
-                    "(ensure that it exists locally and you have read access to it): " + confPath);
-            }
-
-            cfg.addResource(url);
-        }
-
-        // if secondary fs URI is not given explicitly, try to get it from the configuration:
-        if (secUri == null)
-            uri = FileSystem.getDefaultUri(cfg);
-        else {
-            try {
-                uri = new URI(secUri);
-            }
-            catch (URISyntaxException use) {
-                throw new IOException("Failed to resolve secondary file system URI: " + secUri);
-            }
-        }
-
-        // Disable caching:
-        String prop = HadoopFileSystemsUtils.disableFsCachePropertyName(uri.getScheme());
-
-        cfg.setBoolean(prop, true);
-    }
-
-    /**
-     * @return {@link org.apache.hadoop.fs.FileSystem}  instance for this secondary Fs.
-     * @throws IOException
-     */
-    public FileSystem createFileSystem(String userName) throws IOException {
-        userName = IgfsUtils.fixUserName(userName);
-
-        final FileSystem fileSys;
-
-        try {
-           fileSys = FileSystem.get(uri, cfg, userName);
-        }
-        catch (InterruptedException e) {
-           Thread.currentThread().interrupt();
-
-           throw new IOException("Failed to create file system due to interrupt.", e);
-        }
-
-        return fileSys;
-    }
-
-    /**
-     * @return {@link org.apache.hadoop.fs.AbstractFileSystem} instance for this secondary Fs.
-     * @throws IOException in case of error.
-     */
-    public AbstractFileSystem createAbstractFileSystem(String userName) throws IOException {
-        userName = IgfsUtils.fixUserName(userName);
-
-        String ticketCachePath = cfg.get(CommonConfigurationKeys.KERBEROS_TICKET_CACHE_PATH);
-
-        UserGroupInformation ugi = UserGroupInformation.getBestUGI(ticketCachePath, userName);
-
-        try {
-            return ugi.doAs(new PrivilegedExceptionAction<AbstractFileSystem>() {
-                @Override public AbstractFileSystem run() throws IOException {
-                    return AbstractFileSystem.get(uri, cfg);
-                }
-            });
-        } catch (InterruptedException ie) {
-            Thread.currentThread().interrupt();
-
-            throw new IOException("Failed to create file system due to interrupt.", ie);
-        }
-    }
-
-    /**
-     * @return the secondary fs URI, never null.
-     */
-    public URI uri() {
-        return uri;
-    }
-
-    /**
-     * @return The configuration path, if any.
-     */
-    @Nullable public String configurationPath() {
-        return confPath;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/83e12492/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java
index 58b5120..2b20639 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java
@@ -57,10 +57,6 @@ public class HadoopLazyConcurrentMap<K, V extends Closeable> {
         assert getClass().getClassLoader() == Ignite.class.getClassLoader();
     }
 
-    public int size () {
-        return map.size();
-    }
-
     /**
      * Gets cached or creates a new value of V.
      * Never returns null.
@@ -77,7 +73,7 @@ public class HadoopLazyConcurrentMap<K, V extends Closeable> {
             try {
                 if (closed)
                     throw new IllegalStateException("Failed to create value for key [" + k
-                        + "]: the map is already closed.");
+                        + "]: the map is already closed. this = " + System.identityHashCode(this));
 
                 final ValueWrapper wNew = new ValueWrapper(k);
 
@@ -116,6 +112,10 @@ public class HadoopLazyConcurrentMap<K, V extends Closeable> {
             if (closed)
                 return;
 
+            // TODO: debug:
+            System.out.println("##### closed: " + System.identityHashCode(this));
+            Thread.dumpStack();
+
             closed = true;
 
             Exception err = null;

http://git-wip-us.apache.org/repos/asf/ignite/blob/83e12492/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java
index 4ddfb0d..98ab317 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java
@@ -22,6 +22,7 @@ import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.net.URI;
+import java.util.Collections;
 import java.util.concurrent.Callable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -34,9 +35,9 @@ import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.FileSystemConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.hadoop.fs.CachingHadoopFileSystemFactory;
 import org.apache.ignite.hadoop.fs.IgniteHadoopIgfsSecondaryFileSystem;
 import org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem;
-import org.apache.ignite.internal.processors.hadoop.SecondaryFileSystemProvider;
 import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsUtils;
 import org.apache.ignite.internal.processors.igfs.IgfsCommonAbstractTest;
 import org.apache.ignite.internal.util.typedef.G;
@@ -173,12 +174,16 @@ public class HadoopSecondaryFileSystemConfigurationTest extends IgfsCommonAbstra
         else
             primaryConfFullPath = null;
 
-        SecondaryFileSystemProvider provider =
-            new SecondaryFileSystemProvider(primaryFsUriStr, primaryConfFullPath);
+        CachingHadoopFileSystemFactory fac = new CachingHadoopFileSystemFactory();
 
-        primaryFs = provider.createFileSystem(null);
+        fac.setConfigPaths(Collections.singletonList(primaryConfFullPath));
+        fac.setUri(primaryFsUriStr);
 
-        primaryFsUri = provider.uri();
+        fac.start();
+
+        primaryFs = fac.create(null); //provider.createFileSystem(null);
+
+        primaryFsUri = primaryFs.getUri();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/83e12492/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemAbstractSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemAbstractSelfTest.java
index 310c390..d473592 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemAbstractSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemAbstractSelfTest.java
@@ -391,7 +391,7 @@ public abstract class IgniteHadoopFileSystemAbstractSelfTest extends IgfsCommonA
             IgniteHadoopIgfsSecondaryFileSystem sec = new IgniteHadoopIgfsSecondaryFileSystem();
 
             sec.setFileSystemFactory(fac);
-            sec.setUserName(SECONDARY_FS_USER);
+            sec.setDefaultUserName(SECONDARY_FS_USER);
 
             // NB: start() will be invoked upon IgfsImpl init.
             cfg.setSecondaryFileSystem(sec);

http://git-wip-us.apache.org/repos/asf/ignite/blob/83e12492/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
index eac6bb8..0216f4b 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
@@ -183,7 +183,7 @@ public class IgniteHadoopTestSuite extends TestSuite {
      * @throws Exception If failed.
      */
     public static void downloadHadoop() throws Exception {
-        String ver = IgniteSystemProperties.getString("hadoop.version", "2.6.0");
+        String ver = IgniteSystemProperties.getString("hadoop.version", "2.4.1");
 
         X.println("Will use Hadoop version: " + ver);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/83e12492/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java
index 1507543..3782596 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java
@@ -475,7 +475,7 @@ public abstract class IgniteCacheAbstractQuerySelfTest extends GridCommonAbstrac
             Cache.Entry<BinaryObject, BinaryObject> entry = F.first(qry.getAll());
 
             assertNotNull(entry);
-            assertEquals(Long.valueOf(100500L), entry.getKey().field("id"));
+            assertEquals(100500L, entry.getKey().field("id"));
             assertEquals(val1, entry.getValue().deserialize());
         }
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/83e12492/parent/pom.xml
----------------------------------------------------------------------
diff --git a/parent/pom.xml b/parent/pom.xml
index 0481088..f665d40 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -69,7 +69,7 @@
         <guava14.version>14.0.1</guava14.version>
         <guava16.version>16.0.1</guava16.version>
         <h2.version>1.3.175</h2.version>
-        <hadoop.version>2.6.0</hadoop.version>
+        <hadoop.version>2.4.1</hadoop.version>
         <httpclient.version>4.5.1</httpclient.version>
         <httpcore.version>4.4.3</httpcore.version>
         <jackson.version>1.9.13</jackson.version>


[12/12] ignite git commit: IGNITE-2206: Fixed a bug with file system factory deserialization.

Posted by vo...@apache.org.
IGNITE-2206: Fixed a bug with file system factory deserialization.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/04827ca7
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/04827ca7
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/04827ca7

Branch: refs/heads/ignite-2206
Commit: 04827ca7f2f96d50e890b98603f1a9f41e1c853a
Parents: 061c222
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Wed Dec 23 17:17:40 2015 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Wed Dec 23 17:17:40 2015 +0300

----------------------------------------------------------------------
 .../internal/processors/igfs/IgfsPaths.java     | 55 +++++++++++++-------
 .../hadoop/fs/v1/IgniteHadoopFileSystem.java    | 35 ++++++++-----
 .../hadoop/fs/v2/IgniteHadoopFileSystem.java    | 41 +++++++++------
 .../hadoop/fs/HadoopLazyConcurrentMap.java      |  4 --
 4 files changed, 81 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/04827ca7/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPaths.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPaths.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPaths.java
index cd34655..0bb581b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPaths.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPaths.java
@@ -27,6 +27,8 @@ import java.io.ObjectOutput;
 import java.io.ObjectOutputStream;
 import java.util.ArrayList;
 import java.util.List;
+
+import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.igfs.IgfsMode;
 import org.apache.ignite.igfs.IgfsPath;
 import org.apache.ignite.internal.util.typedef.T2;
@@ -41,7 +43,7 @@ public class IgfsPaths implements Externalizable {
     private static final long serialVersionUID = 0L;
 
     /** */
-    private Object payload;
+    private byte[] payloadBytes;
 
     /** Default IGFS mode. */
     private IgfsMode dfltMode;
@@ -62,11 +64,27 @@ public class IgfsPaths implements Externalizable {
      * @param payload Payload.
      * @param dfltMode Default IGFS mode.
      * @param pathModes Path modes.
+     * @throws IgniteCheckedException If failed.
      */
-    public IgfsPaths(Object payload, IgfsMode dfltMode, @Nullable List<T2<IgfsPath, IgfsMode>> pathModes) {
-        this.payload = payload;
+    public IgfsPaths(Object payload, IgfsMode dfltMode, @Nullable List<T2<IgfsPath, IgfsMode>> pathModes)
+        throws IgniteCheckedException {
         this.dfltMode = dfltMode;
         this.pathModes = pathModes;
+
+        if (payload == null)
+            payloadBytes = null;
+        else {
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+
+            try (ObjectOutput oo = new ObjectOutputStream(baos)) {
+                oo.writeObject(payload);
+            }
+            catch (IOException e) {
+                throw new IgniteCheckedException("Failed to serialize secondary file system factory: " + payload, e);
+            }
+
+            payloadBytes = baos.toByteArray();
+        }
     }
 
     /**
@@ -85,20 +103,25 @@ public class IgfsPaths implements Externalizable {
 
     /**
      * @return Payload.
+     *
+     * @throws IgniteCheckedException If failed to deserialize the payload.
      */
-    @Nullable public Object getPayload() {
-        return payload;
+    @Nullable public Object getPayload() throws IgniteCheckedException {
+        if (payloadBytes == null)
+            return null;
+        else {
+            try (ObjectInput oi = new ObjectInputStream(new ByteArrayInputStream(payloadBytes))) {
+                return oi.readObject();
+            }
+            catch (IOException | ClassNotFoundException e) {
+                throw new IgniteCheckedException("Failed to deserialize secondary file system factory. ", e);
+            }
+        }
     }
 
     /** {@inheritDoc} */
     @Override public void writeExternal(ObjectOutput out) throws IOException {
-        ByteArrayOutputStream baos = new ByteArrayOutputStream();
-
-        try (ObjectOutput oo = new ObjectOutputStream(baos)) {
-            oo.writeObject(payload);
-        }
-
-        U.writeByteArray(out, baos.toByteArray());
+        U.writeByteArray(out, payloadBytes);
 
         U.writeEnum(out, dfltMode);
 
@@ -120,13 +143,7 @@ public class IgfsPaths implements Externalizable {
 
     /** {@inheritDoc} */
     @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-        byte[] factoryBytes = U.readByteArray(in);
-
-        assert factoryBytes != null;
-
-        try (ObjectInput oi = new ObjectInputStream(new ByteArrayInputStream(factoryBytes))) {
-            payload = oi.readObject();
-        }
+        payloadBytes = U.readByteArray(in);
 
         dfltMode = IgfsMode.fromOrdinal(in.readByte());
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/04827ca7/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
index 545c905..20cd9ca 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
@@ -17,19 +17,6 @@
 
 package org.apache.ignite.hadoop.fs.v1;
 
-import java.io.BufferedOutputStream;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.ContentSummary;
@@ -43,6 +30,7 @@ import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Progressable;
+import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.hadoop.fs.HadoopFileSystemFactory;
 import org.apache.ignite.igfs.IgfsBlockLocation;
@@ -71,6 +59,20 @@ import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lifecycle.LifecycleAware;
 import org.jetbrains.annotations.Nullable;
 
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
 import static org.apache.ignite.configuration.FileSystemConfiguration.DFLT_IGFS_LOG_BATCH_SIZE;
 import static org.apache.ignite.configuration.FileSystemConfiguration.DFLT_IGFS_LOG_DIR;
 import static org.apache.ignite.igfs.IgfsMode.PROXY;
@@ -329,7 +331,12 @@ public class IgniteHadoopFileSystem extends FileSystem {
             }
 
             if (initSecondary) {
-                factory = (HadoopFileSystemFactory)paths.getPayload();
+                try {
+                    factory = (HadoopFileSystemFactory) paths.getPayload();
+                }
+                catch (IgniteCheckedException e) {
+                    throw new IOException("Failed to get secondary file system factory.", e);
+                }
 
                 A.ensure(factory != null, "Secondary file system factory should not be null.");
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/04827ca7/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
index d665d4c..59d870c 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
@@ -17,22 +17,6 @@
 
 package org.apache.ignite.hadoop.fs.v2;
 
-import java.io.BufferedOutputStream;
-import java.io.Closeable;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.EnumSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -51,6 +35,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.Progressable;
+import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.igfs.IgfsBlockLocation;
 import org.apache.ignite.igfs.IgfsFile;
@@ -76,6 +61,23 @@ import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lifecycle.LifecycleAware;
 import org.jetbrains.annotations.Nullable;
 
+import java.io.BufferedOutputStream;
+import java.io.Closeable;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+
 import static org.apache.ignite.configuration.FileSystemConfiguration.DFLT_IGFS_LOG_BATCH_SIZE;
 import static org.apache.ignite.configuration.FileSystemConfiguration.DFLT_IGFS_LOG_DIR;
 import static org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem.getFsHadoopUser;
@@ -336,7 +338,12 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
             }
 
             if (initSecondary) {
-                factory = (HadoopAbstractFileSystemFactory)paths.getPayload();
+                try {
+                    factory = (HadoopAbstractFileSystemFactory) paths.getPayload();
+                }
+                catch (IgniteCheckedException e) {
+                    throw new IOException("Failed to get secondary file system factory.", e);
+                }
 
                 A.ensure(secondaryUri != null, "File system factory uri should not be null.");
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/04827ca7/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java
index 2b20639..5808d01 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java
@@ -112,10 +112,6 @@ public class HadoopLazyConcurrentMap<K, V extends Closeable> {
             if (closed)
                 return;
 
-            // TODO: debug:
-            System.out.println("##### closed: " + System.identityHashCode(this));
-            Thread.dumpStack();
-
             closed = true;
 
             Exception err = null;


[11/12] ignite git commit: Merge branch 'ignite-2206' of https://github.com/iveselovskiy/ignite into ignite-2206

Posted by vo...@apache.org.
Merge branch 'ignite-2206' of https://github.com/iveselovskiy/ignite into ignite-2206


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/061c2227
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/061c2227
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/061c2227

Branch: refs/heads/ignite-2206
Commit: 061c2227fb2f74db6bcc599c0eb4ad0ee12ef597
Parents: 23df4bb 5d2d87c
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Wed Dec 23 17:00:24 2015 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Wed Dec 23 17:00:24 2015 +0300

----------------------------------------------------------------------
 .../ignite/igfs/IgniteHadoopFileSystemShmemAbstractSelfTest.java  | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------



[06/12] ignite git commit: Merge remote-tracking branch 'origin/ignite-1.5' into ignite-1.5

Posted by vo...@apache.org.
Merge remote-tracking branch 'origin/ignite-1.5' into ignite-1.5


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9db05f5f
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9db05f5f
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9db05f5f

Branch: refs/heads/ignite-2206
Commit: 9db05f5f6c64d3b6473d65a501a31287a6c7942c
Parents: 53ec76f 16a9e6d
Author: sboikov <sb...@gridgain.com>
Authored: Wed Dec 23 15:07:22 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Dec 23 15:07:22 2015 +0300

----------------------------------------------------------------------
 .../query/GridCacheDistributedQueryManager.java |   6 +-
 ...iteCacheScanPredicateDeploymentSelfTest.java | 114 +++++++++++++++++++
 .../testsuites/IgniteCacheTestSuite3.java       |   2 +
 .../p2p/CacheDeploymentAlwaysTruePredicate.java |  29 +++++
 4 files changed, 149 insertions(+), 2 deletions(-)
----------------------------------------------------------------------



[10/12] ignite git commit: IGNITE-2206: IgfsPaths simplification.

Posted by vo...@apache.org.
IGNITE-2206: IgfsPaths simplification.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/23df4bbc
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/23df4bbc
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/23df4bbc

Branch: refs/heads/ignite-2206
Commit: 23df4bbc150e965413e3308b0398cb29208ea929
Parents: 32ffef6
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Wed Dec 23 17:00:07 2015 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Wed Dec 23 17:00:07 2015 +0300

----------------------------------------------------------------------
 .../internal/processors/igfs/IgfsPaths.java     | 63 ++++++--------------
 1 file changed, 18 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/23df4bbc/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPaths.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPaths.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPaths.java
index 986f59f..cd34655 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPaths.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPaths.java
@@ -92,7 +92,13 @@ public class IgfsPaths implements Externalizable {
 
     /** {@inheritDoc} */
     @Override public void writeExternal(ObjectOutput out) throws IOException {
-        writePayload(out);
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+
+        try (ObjectOutput oo = new ObjectOutputStream(baos)) {
+            oo.writeObject(payload);
+        }
+
+        U.writeByteArray(out, baos.toByteArray());
 
         U.writeEnum(out, dfltMode);
 
@@ -101,7 +107,10 @@ public class IgfsPaths implements Externalizable {
             out.writeInt(pathModes.size());
 
             for (T2<IgfsPath, IgfsMode> pathMode : pathModes) {
+                assert pathMode.getKey() != null;
+
                 pathMode.getKey().writeExternal(out);
+
                 U.writeEnum(out, pathMode.getValue());
             }
         }
@@ -109,31 +118,16 @@ public class IgfsPaths implements Externalizable {
             out.writeBoolean(false);
     }
 
-    /**
-     * Write payload.
-     *
-     * @param out Output stream.
-     * @throws IOException If failed.
-     */
-    private void writePayload(ObjectOutput out) throws IOException {
-        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        byte[] factoryBytes = U.readByteArray(in);
 
-        ObjectOutput oo = new ObjectOutputStream(baos);
+        assert factoryBytes != null;
 
-        try {
-            oo.writeObject(payload);
-        }
-        finally {
-            oo.close();
+        try (ObjectInput oi = new ObjectInputStream(new ByteArrayInputStream(factoryBytes))) {
+            payload = oi.readObject();
         }
 
-        U.writeByteArray(out, baos.toByteArray());
-    }
-
-    /** {@inheritDoc} */
-    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-        readPayload(in);
-
         dfltMode = IgfsMode.fromOrdinal(in.readByte());
 
         if (in.readBoolean()) {
@@ -143,32 +137,11 @@ public class IgfsPaths implements Externalizable {
 
             for (int i = 0; i < size; i++) {
                 IgfsPath path = new IgfsPath();
-                path.readExternal(in);
 
-                T2<IgfsPath, IgfsMode> entry = new T2<>(path, IgfsMode.fromOrdinal(in.readByte()));
+                path.readExternal(in);
 
-                pathModes.add(entry);
+                pathModes.add(new T2<>(path, IgfsMode.fromOrdinal(in.readByte())));
             }
         }
     }
-
-    /**
-     * Read payload.
-     *
-     * @param in Input stream.
-     * @throws IOException If failed.
-     * @throws ClassNotFoundException If failed.
-     */
-    private void readPayload(ObjectInput in) throws IOException, ClassNotFoundException {
-        byte[] factoryBytes = U.readByteArray(in);
-
-        ObjectInput oi = new ObjectInputStream(new ByteArrayInputStream(factoryBytes));
-
-        try {
-            payload = oi.readObject();
-        }
-        finally {
-            oi.close();
-        }
-    }
 }
\ No newline at end of file


[05/12] ignite git commit: ignite-1.5 Fixed hang on metadata update inside put in atomic cache when topology read lock is held. Also fixed several test issues.

Posted by vo...@apache.org.
ignite-1.5 Fixed hang on metadata update inside put in atomic cache when topology read lock is held. Also fixed several test issues.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/53ec76ff
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/53ec76ff
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/53ec76ff

Branch: refs/heads/ignite-2206
Commit: 53ec76ffe65d5788fc1ffa32c2fba66222e51dcc
Parents: 66b33bc
Author: sboikov <sb...@gridgain.com>
Authored: Wed Dec 23 15:06:48 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Dec 23 15:06:48 2015 +0300

----------------------------------------------------------------------
 .../managers/communication/GridIoManager.java   |  39 +-
 .../processors/cache/GridCacheAdapter.java      |  47 ++-
 .../processors/cache/GridCacheProxyImpl.java    |  29 ++
 .../cache/GridCacheSharedContext.java           |  10 +-
 .../processors/cache/IgniteCacheProxy.java      |  35 ++
 .../processors/cache/IgniteInternalCache.java   |  26 ++
 .../binary/CacheObjectBinaryProcessorImpl.java  |   4 +-
 .../distributed/dht/GridDhtLockFuture.java      |   2 +-
 .../dht/atomic/GridDhtAtomicCache.java          |  95 +++--
 .../dht/atomic/GridNearAtomicUpdateFuture.java  | 150 ++++---
 .../colocated/GridDhtColocatedLockFuture.java   |  11 +-
 .../distributed/near/GridNearLockFuture.java    |  11 +-
 ...arOptimisticSerializableTxPrepareFuture.java |   5 +-
 .../near/GridNearOptimisticTxPrepareFuture.java |   5 +-
 ...ridNearOptimisticTxPrepareFutureAdapter.java |  12 +-
 .../transactions/IgniteTxLocalAdapter.java      |   2 +
 .../cache/transactions/IgniteTxManager.java     |  61 ++-
 .../datastreamer/DataStreamProcessor.java       |  12 +-
 .../ignite/internal/util/lang/GridFunc.java     |   1 +
 .../test/config/websession/example-cache.xml    |   9 +-
 ...niteClientReconnectFailoverAbstractTest.java |   3 +-
 .../cache/GridCacheAbstractFullApiSelfTest.java |  22 +-
 .../cache/GridCacheAbstractSelfTest.java        |   3 +-
 ...yMetadataUpdateChangingTopologySelfTest.java |   7 +-
 ...niteBinaryMetadataUpdateNodeRestartTest.java | 411 +++++++++++++++++++
 .../distributed/IgniteCacheManyClientsTest.java |   2 +
 ...ContinuousQueryFailoverAbstractSelfTest.java | 128 +++---
 ...ridCacheContinuousQueryAbstractSelfTest.java |   3 +
 .../service/ClosureServiceClientsNodesTest.java |  22 +-
 .../GridServiceProcessorStopSelfTest.java       |  21 +-
 ...cpCommunicationSpiMultithreadedSelfTest.java |  21 +
 .../spi/discovery/tcp/TcpDiscoverySelfTest.java |  14 +-
 .../testframework/GridSpiTestContext.java       |  18 +-
 .../cache/websession/WebSessionFilter.java      |  82 ++--
 .../cache/websession/WebSessionListener.java    |  25 +-
 .../internal/websession/WebSessionSelfTest.java |   2 -
 36 files changed, 1023 insertions(+), 327 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index bf7c7e4..42f8dae 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -666,6 +666,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
      *
      * @param plc Policy.
      * @return Execution pool.
+     * @throws IgniteCheckedException If failed.
      */
     private Executor pool(byte plc) throws IgniteCheckedException {
         switch (plc) {
@@ -767,6 +768,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
      * @param msg Message.
      * @param plc Execution policy.
      * @param msgC Closure to call when message processing finished.
+     * @throws IgniteCheckedException If failed.
      */
     private void processRegularMessage(
         final UUID nodeId,
@@ -824,6 +826,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
      * @param msg Ordered message.
      * @param plc Execution policy.
      * @param msgC Closure to call when message processing finished ({@code null} for sync processing).
+     * @throws IgniteCheckedException If failed.
      */
     @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
     private void processOrderedMessage(
@@ -1029,7 +1032,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
      * @param ordered Ordered flag.
      * @param timeout Timeout.
      * @param skipOnTimeout Whether message can be skipped on timeout.
-     * @param ackClosure Ack closure.
+     * @param ackC Ack closure.
      * @throws IgniteCheckedException Thrown in case of any errors.
      */
     private void send(
@@ -1041,7 +1044,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
         boolean ordered,
         long timeout,
         boolean skipOnTimeout,
-        IgniteInClosure<IgniteException> ackClosure
+        IgniteInClosure<IgniteException> ackC
     ) throws IgniteCheckedException {
         assert node != null;
         assert topic != null;
@@ -1062,8 +1065,8 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
             else
                 processRegularMessage0(ioMsg, locNodeId);
 
-            if (ackClosure != null)
-                ackClosure.apply(null);
+            if (ackC != null)
+                ackC.apply(null);
         }
         else {
             if (topicOrd < 0)
@@ -1071,7 +1074,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
 
             try {
                 if ((CommunicationSpi)getSpi() instanceof TcpCommunicationSpi)
-                    ((TcpCommunicationSpi)(CommunicationSpi)getSpi()).sendMessage(node, ioMsg, ackClosure);
+                    ((TcpCommunicationSpi)(CommunicationSpi)getSpi()).sendMessage(node, ioMsg, ackC);
                 else
                     getSpi().sendMessage(node, ioMsg);
             }
@@ -1197,12 +1200,12 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
      * @param topic Topic to send the message to.
      * @param msg Message to send.
      * @param plc Type of processing.
-     * @param ackClosure Ack closure.
+     * @param ackC Ack closure.
      * @throws IgniteCheckedException Thrown in case of any errors.
      */
     public void send(ClusterNode node, GridTopic topic, Message msg, byte plc,
-        IgniteInClosure<IgniteException> ackClosure) throws IgniteCheckedException {
-        send(node, topic, topic.ordinal(), msg, plc, false, 0, false, ackClosure);
+        IgniteInClosure<IgniteException> ackC) throws IgniteCheckedException {
+        send(node, topic, topic.ordinal(), msg, plc, false, 0, false, ackC);
     }
 
     /**
@@ -1233,12 +1236,12 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
      * @param topic Topic to send the message to.
      * @param msg Message to send.
      * @param plc Type of processing.
-     * @param ackClosure Ack closure.
+     * @param ackC Ack closure.
      * @throws IgniteCheckedException Thrown in case of any errors.
      */
-    public void send(ClusterNode node, Object topic, Message msg, byte plc, IgniteInClosure<IgniteException> ackClosure)
+    public void send(ClusterNode node, Object topic, Message msg, byte plc, IgniteInClosure<IgniteException> ackC)
         throws IgniteCheckedException {
-        send(node, topic, -1, msg, plc, false, 0, false, ackClosure);
+        send(node, topic, -1, msg, plc, false, 0, false, ackC);
     }
 
     /**
@@ -1280,7 +1283,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
      * @param plc Type of processing.
      * @param timeout Timeout to keep a message on receiving queue.
      * @param skipOnTimeout Whether message can be skipped on timeout.
-     * @param ackClosure Ack closure.
+     * @param ackC Ack closure.
      * @throws IgniteCheckedException Thrown in case of any errors.
      */
     public void sendOrderedMessage(
@@ -1290,11 +1293,11 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
         byte plc,
         long timeout,
         boolean skipOnTimeout,
-        IgniteInClosure<IgniteException> ackClosure
+        IgniteInClosure<IgniteException> ackC
     ) throws IgniteCheckedException {
         assert timeout > 0 || skipOnTimeout;
 
-        send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, ackClosure);
+        send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, ackC);
     }
 
      /**
@@ -1385,6 +1388,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
      * @param topic Topic to subscribe to.
      * @param p Message predicate.
      */
+    @SuppressWarnings("unchecked")
     public void addUserMessageListener(@Nullable final Object topic, @Nullable final IgniteBiPredicate<UUID, ?> p) {
         if (p != null) {
             try {
@@ -1406,6 +1410,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
      * @param topic Topic to unsubscribe from.
      * @param p Message predicate.
      */
+    @SuppressWarnings("unchecked")
     public void removeUserMessageListener(@Nullable Object topic, IgniteBiPredicate<UUID, ?> p) {
         try {
             removeMessageListener(TOPIC_COMM_USER,
@@ -1423,7 +1428,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
      * @param plc Type of processing.
      * @param timeout Timeout to keep a message on receiving queue.
      * @param skipOnTimeout Whether message can be skipped on timeout.
-     * @param ackClosure Ack closure.
+     * @param ackC Ack closure.
      * @throws IgniteCheckedException Thrown in case of any errors.
      */
     public void sendOrderedMessage(
@@ -1433,7 +1438,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
         byte plc,
         long timeout,
         boolean skipOnTimeout,
-        IgniteInClosure<IgniteException> ackClosure
+        IgniteInClosure<IgniteException> ackC
     ) throws IgniteCheckedException {
         assert timeout > 0 || skipOnTimeout;
 
@@ -1442,7 +1447,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
         if (node == null)
             throw new IgniteCheckedException("Failed to send message to node (has node left grid?): " + nodeId);
 
-        send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, ackClosure);
+        send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, ackC);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index cc4e962..5d4c386 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -2077,8 +2077,32 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
     }
 
     /** {@inheritDoc} */
+    @Nullable @Override public <T> EntryProcessorResult<T> invoke(@Nullable AffinityTopologyVersion topVer,
+        K key,
+        EntryProcessor<K, V, T> entryProcessor,
+        Object... args) throws IgniteCheckedException {
+        return invoke0(topVer, key, entryProcessor, args);
+    }
+
+    /** {@inheritDoc} */
     @Override public <T> EntryProcessorResult<T> invoke(final K key,
         final EntryProcessor<K, V, T> entryProcessor,
+        final Object... args) throws IgniteCheckedException {
+        return invoke0(null, key, entryProcessor, args);
+    }
+
+    /**
+     * @param topVer Locked topology version.
+     * @param key Key.
+     * @param entryProcessor Entry processor.
+     * @param args Entry processor arguments.
+     * @return Invoke result.
+     * @throws IgniteCheckedException If failed.
+     */
+    private <T> EntryProcessorResult<T> invoke0(
+        @Nullable final AffinityTopologyVersion topVer,
+        final K key,
+        final EntryProcessor<K, V, T> entryProcessor,
         final Object... args)
         throws IgniteCheckedException {
         A.notNull(key, "key", entryProcessor, "entryProcessor");
@@ -2089,8 +2113,15 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
         return syncOp(new SyncOp<EntryProcessorResult<T>>(true) {
             @Nullable @Override public EntryProcessorResult<T> op(IgniteTxLocalAdapter tx)
                 throws IgniteCheckedException {
-                IgniteInternalFuture<GridCacheReturn> fut =
-                    tx.invokeAsync(ctx, key, (EntryProcessor<K, V, Object>)entryProcessor, args);
+                assert topVer == null || tx.implicit();
+
+                if (topVer != null)
+                    tx.topologyVersion(topVer);
+
+                IgniteInternalFuture<GridCacheReturn> fut = tx.invokeAsync(ctx,
+                    key,
+                    (EntryProcessor<K, V, Object>)entryProcessor,
+                    args);
 
                 Map<K, EntryProcessorResult<T>> resMap = fut.get().value();
 
@@ -2324,16 +2355,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
         });
     }
 
-    /**
-     * Tries to put value in cache. Will fail with {@link GridCacheTryPutFailedException}
-     * if topology exchange is in progress.
-     *
-     * @param key Key.
-     * @param val value.
-     * @return Old value.
-     * @throws IgniteCheckedException In case of error.
-     */
-    @Nullable public V tryPutIfAbsent(K key, V val) throws IgniteCheckedException {
+    /** {@inheritDoc} */
+    @Nullable @Override public V tryPutIfAbsent(K key, V val) throws IgniteCheckedException {
         // Supported only in ATOMIC cache.
         throw new UnsupportedOperationException();
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
index d1d93d8..8ffd273 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
@@ -36,6 +36,7 @@ import org.apache.ignite.cache.CachePeekMode;
 import org.apache.ignite.cache.affinity.Affinity;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.affinity.GridCacheAffinityProxy;
 import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
@@ -1231,6 +1232,34 @@ public class GridCacheProxyImpl<K, V> implements IgniteInternalCache<K, V>, Exte
     }
 
     /** {@inheritDoc} */
+    @Nullable @Override public V tryPutIfAbsent(K key, V val) throws IgniteCheckedException {
+        CacheOperationContext prev = gate.enter(opCtx);
+
+        try {
+            return delegate.tryPutIfAbsent(key, val);
+        }
+        finally {
+            gate.leave(prev);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public <T> EntryProcessorResult<T> invoke(
+        AffinityTopologyVersion topVer,
+        K key,
+        EntryProcessor<K, V, T> entryProcessor,
+        Object... args) throws IgniteCheckedException {
+        CacheOperationContext prev = gate.enter(opCtx);
+
+        try {
+            return delegate.invoke(topVer, key, entryProcessor, args);
+        }
+        finally {
+            gate.leave(prev);
+        }
+    }
+
+    /** {@inheritDoc} */
     @Override public void removeAll()
         throws IgniteCheckedException {
         CacheOperationContext prev = gate.enter(opCtx);

http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
index 5ed1df9..2221d3b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
@@ -91,9 +91,6 @@ public class GridCacheSharedContext<K, V> {
     /** Tx metrics. */
     private volatile TransactionMetricsAdapter txMetrics;
 
-    /** Preloaders start future. */
-    private IgniteInternalFuture<Object> preloadersStartFut;
-
     /** Store session listeners. */
     private Collection<CacheStoreSessionListener> storeSesLsnrs;
 
@@ -578,12 +575,7 @@ public class GridCacheSharedContext<K, V> {
     @Nullable public AffinityTopologyVersion lockedTopologyVersion(IgniteInternalTx ignore) {
         long threadId = Thread.currentThread().getId();
 
-        IgniteInternalTx tx = txMgr.anyActiveThreadTx(threadId, ignore);
-
-        AffinityTopologyVersion topVer = null;
-
-        if (tx != null && tx.topologyVersionSnapshot() != null)
-            topVer = tx.topologyVersionSnapshot();
+        AffinityTopologyVersion topVer = txMgr.lockedTopologyVersion(threadId, ignore);
 
         if (topVer == null)
             topVer = mvccMgr.lastExplicitLockTopologyVersion(threadId);

http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index 271a2cf..27a7587 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -64,6 +64,7 @@ import org.apache.ignite.internal.AsyncSupportAdapter;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.query.CacheQuery;
 import org.apache.ignite.internal.processors.cache.query.CacheQueryFuture;
 import org.apache.ignite.internal.processors.query.GridQueryProcessor;
@@ -1483,6 +1484,40 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
         return invoke(key, (EntryProcessor<K, V, T>)entryProcessor, args);
     }
 
+    /**
+     * @param topVer Locked topology version.
+     * @param key Key.
+     * @param entryProcessor Entry processor.
+     * @param args Arguments.
+     * @return Invoke result.
+     */
+    public <T> T invoke(@Nullable AffinityTopologyVersion topVer,
+        K key,
+        EntryProcessor<K, V, T> entryProcessor,
+        Object... args) {
+        try {
+            GridCacheGateway<K, V> gate = this.gate;
+
+            CacheOperationContext prev = onEnter(gate, opCtx);
+
+            try {
+                if (isAsync())
+                    throw new UnsupportedOperationException();
+                else {
+                    EntryProcessorResult<T> res = delegate.invoke(topVer, key, entryProcessor, args);
+
+                    return res != null ? res.get() : null;
+                }
+            }
+            finally {
+                onLeave(gate, prev);
+            }
+        }
+        catch (IgniteCheckedException e) {
+            throw cacheException(e);
+        }
+    }
+
     /** {@inheritDoc} */
     @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(Set<? extends K> keys,
         EntryProcessor<K, V, T> entryProcessor,

http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java
index 186de68..433290c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java
@@ -40,6 +40,7 @@ import org.apache.ignite.cache.store.CacheStore;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -1863,4 +1864,29 @@ public interface IgniteInternalCache<K, V> extends Iterable<Cache.Entry<K, V>> {
      * @throws IgniteCheckedException If failed.
      */
     public V getTopologySafe(K key) throws IgniteCheckedException;
+
+    /**
+     * Tries to put value in cache. Will fail with {@link GridCacheTryPutFailedException}
+     * if topology exchange is in progress.
+     *
+     * @param key Key.
+     * @param val value.
+     * @return Old value.
+     * @throws IgniteCheckedException In case of error.
+     */
+    @Nullable public V tryPutIfAbsent(K key, V val) throws IgniteCheckedException;
+
+    /**
+     * @param topVer Locked topology version.
+     * @param key Key.
+     * @param entryProcessor Entry processor.
+     * @param args Arguments.
+     * @return Invoke result.
+     * @throws IgniteCheckedException If failed.
+     */
+    @Nullable public <T> EntryProcessorResult<T> invoke(
+        @Nullable AffinityTopologyVersion topVer,
+        K key,
+        EntryProcessor<K, V, T> entryProcessor,
+        Object... args) throws IgniteCheckedException;
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
index b335179..7586a42 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
@@ -489,7 +489,9 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
             BinaryMetadata oldMeta = metaDataCache.localPeek(key);
             BinaryMetadata mergedMeta = BinaryUtils.mergeMetadata(oldMeta, newMeta0);
 
-            BinaryObjectException err = metaDataCache.invoke(key, new MetadataProcessor(mergedMeta));
+            AffinityTopologyVersion topVer = ctx.cache().context().lockedTopologyVersion(null);
+
+            BinaryObjectException err = metaDataCache.invoke(topVer, key, new MetadataProcessor(mergedMeta));
 
             if (err != null)
                 throw err;

http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
index f0d2e15..98711b8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
@@ -743,7 +743,7 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
         if (tx != null) {
             cctx.tm().txContext(tx);
 
-            set = cctx.tm().setTxTopologyHint(tx);
+            set = cctx.tm().setTxTopologyHint(tx.topologyVersionSnapshot());
         }
 
         try {

http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 481317a..634a9ea 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -77,6 +77,7 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSing
 import org.apache.ignite.internal.processors.cache.dr.GridCacheDrExpirationInfo;
 import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx;
@@ -1240,7 +1241,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                 top.readLock();
 
                 try {
-                    if (topology().stopping()) {
+                    if (top.stopping()) {
                         res.addFailedKeys(keys, new IgniteCheckedException("Failed to perform cache operation " +
                             "(cache is stopped): " + name()));
 
@@ -1289,48 +1290,59 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
                         GridCacheReturn retVal = null;
 
-                        if (keys.size() > 1 &&                             // Several keys ...
-                            writeThrough() && !req.skipStore() &&          // and store is enabled ...
-                            !ctx.store().isLocal() &&                      // and this is not local store ...
-                            !ctx.dr().receiveEnabled()                     // and no DR.
-                        ) {
-                            // This method can only be used when there are no replicated entries in the batch.
-                            UpdateBatchResult updRes = updateWithBatch(node,
-                                hasNear,
-                                req,
-                                res,
-                                locked,
-                                ver,
-                                dhtFut,
-                                completionCb,
-                                ctx.isDrEnabled(),
-                                taskName,
-                                expiry,
-                                sndPrevVal);
+                        IgniteTxManager tm = ctx.tm();
 
-                            deleted = updRes.deleted();
-                            dhtFut = updRes.dhtFuture();
+                        // Needed for metadata cache transaction.
+                        boolean set = tm.setTxTopologyHint(req.topologyVersion());
 
-                            if (req.operation() == TRANSFORM)
-                                retVal = updRes.invokeResults();
+                        try {
+                            if (keys.size() > 1 &&                             // Several keys ...
+                                writeThrough() && !req.skipStore() &&          // and store is enabled ...
+                                !ctx.store().isLocal() &&                      // and this is not local store ...
+                                !ctx.dr().receiveEnabled()                     // and no DR.
+                                ) {
+                                // This method can only be used when there are no replicated entries in the batch.
+                                UpdateBatchResult updRes = updateWithBatch(node,
+                                    hasNear,
+                                    req,
+                                    res,
+                                    locked,
+                                    ver,
+                                    dhtFut,
+                                    completionCb,
+                                    ctx.isDrEnabled(),
+                                    taskName,
+                                    expiry,
+                                    sndPrevVal);
+
+                                deleted = updRes.deleted();
+                                dhtFut = updRes.dhtFuture();
+
+                                if (req.operation() == TRANSFORM)
+                                    retVal = updRes.invokeResults();
+                            }
+                            else {
+                                UpdateSingleResult updRes = updateSingle(node,
+                                    hasNear,
+                                    req,
+                                    res,
+                                    locked,
+                                    ver,
+                                    dhtFut,
+                                    completionCb,
+                                    ctx.isDrEnabled(),
+                                    taskName,
+                                    expiry,
+                                    sndPrevVal);
+
+                                retVal = updRes.returnValue();
+                                deleted = updRes.deleted();
+                                dhtFut = updRes.dhtFuture();
+                            }
                         }
-                        else {
-                            UpdateSingleResult updRes = updateSingle(node,
-                                hasNear,
-                                req,
-                                res,
-                                locked,
-                                ver,
-                                dhtFut,
-                                completionCb,
-                                ctx.isDrEnabled(),
-                                taskName,
-                                expiry,
-                                sndPrevVal);
-
-                            retVal = updRes.returnValue();
-                            deleted = updRes.deleted();
-                            dhtFut = updRes.dhtFuture();
+                        finally {
+                            if (set)
+                                tm.setTxTopologyHint(null);
                         }
 
                         if (retVal == null)
@@ -2782,8 +2794,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         if (log.isDebugEnabled())
             log.debug("Processing dht atomic update response [nodeId=" + nodeId + ", res=" + res + ']');
 
-        GridDhtAtomicUpdateFuture updateFut = (GridDhtAtomicUpdateFuture)ctx.mvcc().
-            atomicFuture(res.futureVersion());
+        GridDhtAtomicUpdateFuture updateFut = (GridDhtAtomicUpdateFuture)ctx.mvcc().atomicFuture(res.futureVersion());
 
         if (updateFut != null)
             updateFut.onResult(nodeId, res);

http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index eefdc73..3c86083 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.UUID;
@@ -47,8 +46,8 @@ import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearAtomicCache;
 import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.CI1;
@@ -288,7 +287,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
             // Cannot remap.
             remapCnt = 1;
 
-            state.map(topVer);
+            state.map(topVer, null);
         }
     }
 
@@ -415,7 +414,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
             cache.topology().readUnlock();
         }
 
-        state.map(topVer);
+        state.map(topVer, null);
     }
 
     /**
@@ -582,7 +581,9 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
                     req = mappings != null ? mappings.get(nodeId) : null;
 
                 if (req != null && req.response() == null) {
-                    res = new GridNearAtomicUpdateResponse(cctx.cacheId(), nodeId, req.futureVersion(),
+                    res = new GridNearAtomicUpdateResponse(cctx.cacheId(),
+                        nodeId,
+                        req.futureVersion(),
                         cctx.deploymentEnabled());
 
                     ClusterTopologyCheckedException e = new ClusterTopologyCheckedException("Primary node left grid " +
@@ -603,6 +604,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
          * @param res Response.
          * @param nodeErr {@code True} if response was created on node failure.
          */
+        @SuppressWarnings("unchecked")
         void onResult(UUID nodeId, GridNearAtomicUpdateResponse res, boolean nodeErr) {
             GridNearAtomicUpdateRequest req;
 
@@ -774,7 +776,11 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
                     return;
                 }
 
-                IgniteInternalFuture<AffinityTopologyVersion> fut = cctx.affinity().affinityReadyFuture(remapTopVer);
+                IgniteInternalFuture<AffinityTopologyVersion> fut =
+                    cctx.shared().exchange().affinityReadyFuture(remapTopVer);
+
+                if (fut == null)
+                    fut = new GridFinishedFuture<>(remapTopVer);
 
                 fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
                     @Override public void apply(final IgniteInternalFuture<AffinityTopologyVersion> fut) {
@@ -783,7 +789,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
                                 try {
                                     AffinityTopologyVersion topVer = fut.get();
 
-                                    map(topVer);
+                                    map(topVer, remapKeys);
                                 }
                                 catch (IgniteCheckedException e) {
                                     onDone(e);
@@ -819,8 +825,9 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
 
         /**
          * @param topVer Topology version.
+         * @param remapKeys Keys to remap.
          */
-        void map(AffinityTopologyVersion topVer) {
+        void map(AffinityTopologyVersion topVer, @Nullable Collection<KeyCacheObject> remapKeys) {
             Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer);
 
             if (F.isEmpty(topNodes)) {
@@ -832,68 +839,78 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
 
             Exception err = null;
             GridNearAtomicUpdateRequest singleReq0 = null;
-            Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = null;
+            Map<UUID, GridNearAtomicUpdateRequest> mappings0 = null;
 
             int size = keys.size();
 
-            synchronized (this) {
-                assert futVer == null : this;
-                assert this.topVer == AffinityTopologyVersion.ZERO : this;
+            GridCacheVersion futVer = cctx.versions().next(topVer);
 
-                resCnt = 0;
+            GridCacheVersion updVer;
 
-                this.topVer = topVer;
+            // Assign version on near node in CLOCK ordering mode even if fastMap is false.
+            if (cctx.config().getAtomicWriteOrderMode() == CLOCK) {
+                updVer = this.updVer;
 
-                futVer = cctx.versions().next(topVer);
+                if (updVer == null) {
+                    updVer = cctx.versions().next(topVer);
 
-                if (storeFuture()) {
-                    if (!cctx.mvcc().addAtomicFuture(futVer, GridNearAtomicUpdateFuture.this)) {
-                        assert isDone() : GridNearAtomicUpdateFuture.this;
-
-                        return;
-                    }
+                    if (log.isDebugEnabled())
+                        log.debug("Assigned fast-map version for update on near node: " + updVer);
                 }
+            }
+            else
+                updVer = null;
 
-                // Assign version on near node in CLOCK ordering mode even if fastMap is false.
-                if (updVer == null)
-                    updVer = cctx.config().getAtomicWriteOrderMode() == CLOCK ? cctx.versions().next(topVer) : null;
+            try {
+                if (size == 1 && !fastMap) {
+                    assert remapKeys == null || remapKeys.size() == 1;
 
-                if (updVer != null && log.isDebugEnabled())
-                    log.debug("Assigned fast-map version for update on near node: " + updVer);
+                    singleReq0 = mapSingleUpdate(topVer, futVer, updVer);
+                }
+                else {
+                    Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = mapUpdate(topNodes,
+                        topVer,
+                        futVer,
+                        updVer,
+                        remapKeys);
+
+                    if (pendingMappings.size() == 1)
+                        singleReq0 = F.firstValue(pendingMappings);
+                    else {
+                        if (syncMode == PRIMARY_SYNC) {
+                            mappings0 = U.newHashMap(pendingMappings.size());
 
-                try {
-                    if (size == 1 && !fastMap) {
-                        assert remapKeys == null || remapKeys.size() == 1;
+                            for (GridNearAtomicUpdateRequest req : pendingMappings.values()) {
+                                if (req.hasPrimary())
+                                    mappings0.put(req.nodeId(), req);
+                            }
+                        }
+                        else
+                            mappings0 = pendingMappings;
 
-                        singleReq0 = singleReq = mapSingleUpdate();
+                        assert !mappings0.isEmpty() || size == 0 : GridNearAtomicUpdateFuture.this;
                     }
-                    else {
-                        pendingMappings = mapUpdate(topNodes);
+                }
 
-                        if (pendingMappings.size() == 1)
-                            singleReq0 = singleReq = F.firstValue(pendingMappings);
-                        else {
-                            if (syncMode == PRIMARY_SYNC) {
-                                mappings = U.newHashMap(pendingMappings.size());
+                synchronized (this) {
+                    assert this.futVer == null : this;
+                    assert this.topVer == AffinityTopologyVersion.ZERO : this;
 
-                                for (GridNearAtomicUpdateRequest req : pendingMappings.values()) {
-                                    if (req.hasPrimary())
-                                        mappings.put(req.nodeId(), req);
-                                }
-                            }
-                            else
-                                mappings = new HashMap<>(pendingMappings);
+                    this.topVer = topVer;
+                    this.updVer = updVer;
+                    this.futVer = futVer;
 
-                            assert !mappings.isEmpty() || size == 0 : GridNearAtomicUpdateFuture.this;
-                        }
-                    }
+                    resCnt = 0;
 
-                    remapKeys = null;
-                }
-                catch (Exception e) {
-                    err = e;
+                    singleReq = singleReq0;
+                    mappings = mappings0;
+
+                    this.remapKeys = null;
                 }
             }
+            catch (Exception e) {
+                err = e;
+            }
 
             if (err != null) {
                 onDone(err);
@@ -901,16 +918,24 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
                 return;
             }
 
+            if (storeFuture()) {
+                if (!cctx.mvcc().addAtomicFuture(futVer, GridNearAtomicUpdateFuture.this)) {
+                    assert isDone() : GridNearAtomicUpdateFuture.this;
+
+                    return;
+                }
+            }
+
             // Optimize mapping for single key.
             if (singleReq0 != null)
                 mapSingle(singleReq0.nodeId(), singleReq0);
             else {
-                assert pendingMappings != null;
+                assert mappings0 != null;
 
                 if (size == 0)
                     onDone(new GridCacheReturn(cctx, true, true, null, true));
                 else
-                    doUpdate(pendingMappings);
+                    doUpdate(mappings0);
             }
         }
 
@@ -958,10 +983,18 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
 
         /**
          * @param topNodes Cache nodes.
+         * @param topVer Topology version.
+         * @param futVer Future version.
+         * @param updVer Update version.
+         * @param remapKeys Keys to remap.
          * @return Mapping.
          * @throws Exception If failed.
          */
-        private Map<UUID, GridNearAtomicUpdateRequest> mapUpdate(Collection<ClusterNode> topNodes) throws Exception {
+        private Map<UUID, GridNearAtomicUpdateRequest> mapUpdate(Collection<ClusterNode> topNodes,
+            AffinityTopologyVersion topVer,
+            GridCacheVersion futVer,
+            @Nullable GridCacheVersion updVer,
+            @Nullable Collection<KeyCacheObject> remapKeys) throws Exception {
             Iterator<?> it = null;
 
             if (vals != null)
@@ -999,7 +1032,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
                         throw new NullPointerException("Null value.");
                 }
                 else if (conflictPutVals != null) {
-                    GridCacheDrInfo conflictPutVal =  conflictPutValsIt.next();
+                    GridCacheDrInfo conflictPutVal = conflictPutValsIt.next();
 
                     val = conflictPutVal.value();
                     conflictVer = conflictPutVal.version();
@@ -1082,10 +1115,15 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
         }
 
         /**
+         * @param topVer Topology version.
+         * @param futVer Future version.
+         * @param updVer Update version.
          * @return Request.
          * @throws Exception If failed.
          */
-        private GridNearAtomicUpdateRequest mapSingleUpdate() throws Exception {
+        private GridNearAtomicUpdateRequest mapSingleUpdate(AffinityTopologyVersion topVer,
+            GridCacheVersion futVer,
+            @Nullable GridCacheVersion updVer) throws Exception {
             Object key = F.first(keys);
 
             Object val;

http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index 22b329c..a5f5286 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@ -48,7 +48,6 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLock
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockRequest;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockResponse;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -596,12 +595,8 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
         AffinityTopologyVersion topVer = cctx.mvcc().lastExplicitLockTopologyVersion(threadId);
 
         // If there is another system transaction in progress, use it's topology version to prevent deadlock.
-        if (topVer == null && tx != null && tx.system()) {
-            IgniteInternalTx tx0 = cctx.tm().anyActiveThreadTx(Thread.currentThread().getId(), tx);
-
-            if (tx0 != null)
-                topVer = tx0.topologyVersionSnapshot();
-        }
+        if (topVer == null && tx != null && tx.system())
+            topVer = cctx.tm().lockedTopologyVersion(Thread.currentThread().getId(), tx);
 
         if (topVer != null && tx != null)
             tx.topologyVersion(topVer);
@@ -980,7 +975,7 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
      * @throws IgniteCheckedException If failed.
      */
     private void proceedMapping() throws IgniteCheckedException {
-        boolean set = tx != null && cctx.shared().tm().setTxTopologyHint(tx);
+        boolean set = tx != null && cctx.shared().tm().setTxTopologyHint(tx.topologyVersionSnapshot());
 
         try {
             proceedMapping0();

http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
index 23e0f6b..55c5ab6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
@@ -46,7 +46,6 @@ import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCa
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTransactionalCacheAdapter;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -723,12 +722,8 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
         AffinityTopologyVersion topVer = cctx.mvcc().lastExplicitLockTopologyVersion(threadId);
 
         // If there is another system transaction in progress, use it's topology version to prevent deadlock.
-        if (topVer == null && tx != null && tx.system()) {
-            IgniteInternalTx tx0 = cctx.tm().anyActiveThreadTx(threadId, tx);
-
-            if (tx0 != null)
-                topVer = tx0.topologyVersionSnapshot();
-        }
+        if (topVer == null && tx != null && tx.system())
+            topVer = cctx.tm().lockedTopologyVersion(threadId, tx);
 
         if (topVer != null && tx != null)
             tx.topologyVersion(topVer);
@@ -1098,7 +1093,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
      * @throws IgniteCheckedException If failed.
      */
     private void proceedMapping() throws IgniteCheckedException {
-        boolean set = tx != null && cctx.shared().tm().setTxTopologyHint(tx);
+        boolean set = tx != null && cctx.shared().tm().setTxTopologyHint(tx.topologyVersionSnapshot());
 
         try {
             proceedMapping0();

http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
index f52b3fc..37dc564 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
@@ -84,7 +84,8 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
      * @param cctx Context.
      * @param tx Transaction.
      */
-    public GridNearOptimisticSerializableTxPrepareFuture(GridCacheSharedContext cctx, GridNearTxLocal tx) {
+    public GridNearOptimisticSerializableTxPrepareFuture(GridCacheSharedContext cctx,
+        GridNearTxLocal tx) {
         super(cctx, tx);
 
         assert tx.optimistic() && tx.serializable() : tx;
@@ -304,7 +305,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
             return;
         }
 
-        boolean set = cctx.tm().setTxTopologyHint(tx);
+        boolean set = cctx.tm().setTxTopologyHint(tx.topologyVersionSnapshot());
 
         try {
             prepare(tx.readEntries(), tx.writeEntries(), remap, topLocked);

http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
index 2ce14af..a9f158a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
@@ -72,7 +72,8 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
      * @param cctx Context.
      * @param tx Transaction.
      */
-    public GridNearOptimisticTxPrepareFuture(GridCacheSharedContext cctx, GridNearTxLocal tx) {
+    public GridNearOptimisticTxPrepareFuture(GridCacheSharedContext cctx,
+        GridNearTxLocal tx) {
         super(cctx, tx);
 
         assert tx.optimistic() && !tx.serializable() : tx;
@@ -405,7 +406,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
         if (isDone())
             return;
 
-        boolean set = cctx.tm().setTxTopologyHint(tx);
+        boolean set = cctx.tm().setTxTopologyHint(tx.topologyVersionSnapshot());
 
         try {
             assert !m.empty();

http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
index b3eab34..fa7020b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
@@ -23,7 +23,6 @@ import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
 import org.apache.ignite.internal.util.GridConcurrentHashSet;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -41,7 +40,8 @@ public abstract class GridNearOptimisticTxPrepareFutureAdapter extends GridNearT
      * @param cctx Context.
      * @param tx Transaction.
      */
-    public GridNearOptimisticTxPrepareFutureAdapter(GridCacheSharedContext cctx, GridNearTxLocal tx) {
+    public GridNearOptimisticTxPrepareFutureAdapter(GridCacheSharedContext cctx,
+        GridNearTxLocal tx) {
         super(cctx, tx);
 
         assert tx.optimistic() : tx;
@@ -55,12 +55,8 @@ public abstract class GridNearOptimisticTxPrepareFutureAdapter extends GridNearT
         AffinityTopologyVersion topVer = cctx.mvcc().lastExplicitLockTopologyVersion(threadId);
 
         // If there is another system transaction in progress, use it's topology version to prevent deadlock.
-        if (topVer == null && tx != null && tx.system()) {
-            IgniteInternalTx tx0 = cctx.tm().anyActiveThreadTx(threadId, tx);
-
-            if (tx0 != null)
-                topVer = tx0.topologyVersionSnapshot();
-        }
+        if (topVer == null && tx != null && tx.system())
+            topVer = cctx.tm().lockedTopologyVersion(threadId, tx);
 
         if (topVer != null) {
             tx.topologyVersion(topVer);

http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 720832e..70c79a5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -3203,8 +3203,10 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
     }
 
     /**
+     * @param cacheCtx Cache context.
      * @param loadFut Missing keys load future.
      * @param ret Future result.
+     * @param keepBinary Keep binary flag.
      * @return Future.
      */
     private IgniteInternalFuture optimisticPutFuture(

http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index d2b803a..d384e4e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -114,8 +114,8 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
     /** Committing transactions. */
     private final ThreadLocal<IgniteInternalTx> threadCtx = new ThreadLocal<>();
 
-    /** Transaction which topology version should be used when mapping internal tx. */
-    private final ThreadLocal<IgniteInternalTx> txTopology = new ThreadLocal<>();
+    /** Topology version should be used when mapping internal tx. */
+    private final ThreadLocal<AffinityTopologyVersion> txTop = new ThreadLocal<>();
 
     /** Per-thread transaction map. */
     private final ConcurrentMap<Long, IgniteInternalTx> threadMap = newMap();
@@ -130,7 +130,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
     private final ConcurrentMap<GridCacheVersion, IgniteInternalTx> nearIdMap = newMap();
 
     /** TX handler. */
-    private IgniteTxHandler txHandler;
+    private IgniteTxHandler txHnd;
 
     /** Committed local transactions. */
     private final GridBoundedConcurrentOrderedMap<GridCacheVersion, Boolean> completedVersSorted =
@@ -197,7 +197,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
     @Override protected void start0() throws IgniteCheckedException {
         txFinishSync = new GridCacheTxFinishSync<>(cctx);
 
-        txHandler = new IgniteTxHandler(cctx);
+        txHnd = new IgniteTxHandler(cctx);
     }
 
     /** {@inheritDoc} */
@@ -212,7 +212,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
      * @return TX handler.
      */
     public IgniteTxHandler txHandler() {
-        return txHandler;
+        return txHnd;
     }
 
     /**
@@ -607,13 +607,17 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
     /**
      * @param threadId Thread ID.
      * @param ignore Transaction to ignore.
-     * @return Any transaction associated with the current thread.
+     * @return Not null topology version if current thread holds lock preventing topology change.
      */
-    public IgniteInternalTx anyActiveThreadTx(long threadId, IgniteInternalTx ignore) {
+    @Nullable public AffinityTopologyVersion lockedTopologyVersion(long threadId, IgniteInternalTx ignore) {
         IgniteInternalTx tx = threadMap.get(threadId);
 
-        if (tx != null && tx.topologyVersionSnapshot() != null)
-            return tx;
+        if (tx != null) {
+            AffinityTopologyVersion topVer = tx.topologyVersionSnapshot();
+
+            if (topVer != null)
+                return topVer;
+        }
 
         for (GridCacheContext cacheCtx : cctx.cache().context().cacheContexts()) {
             if (!cacheCtx.systemTx())
@@ -621,22 +625,27 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
 
             tx = sysThreadMap.get(new TxThreadKey(threadId, cacheCtx.cacheId()));
 
-            if (tx != null && tx != ignore && tx.topologyVersionSnapshot() != null)
-                return tx;
+            if (tx != null && tx != ignore) {
+                AffinityTopologyVersion topVer = tx.topologyVersionSnapshot();
+
+                if (topVer != null)
+                    return topVer;
+            }
         }
 
-        return txTopology.get();
+        return txTop.get();
     }
 
     /**
-     * @param tx Transaction.
+     * @param topVer Locked topology version.
+     * @return {@code True} if topology hint was set.
      */
-    public boolean setTxTopologyHint(IgniteInternalTx tx) {
-        if (tx == null)
-            txTopology.remove();
+    public boolean setTxTopologyHint(@Nullable AffinityTopologyVersion topVer) {
+        if (topVer == null)
+            txTop.remove();
         else {
-            if (txTopology.get() == null) {
-                txTopology.set(tx);
+            if (txTop.get() == null) {
+                txTop.set(topVer);
 
                 return true;
             }
@@ -1807,8 +1816,10 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
             this.evtNodeId = evtNodeId;
         }
 
-        /** {@inheritDoc} */
-        @Override public void onTimeout() {
+        /**
+         *
+         */
+        private void onTimeout0() {
             try {
                 cctx.kernalContext().gateway().readLock();
             }
@@ -1861,6 +1872,16 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
                 cctx.kernalContext().gateway().readUnlock();
             }
         }
+
+        /** {@inheritDoc} */
+        @Override public void onTimeout() {
+            // Should not block timeout thread.
+            cctx.kernalContext().closure().runLocalSafe(new Runnable() {
+                @Override public void run() {
+                    onTimeout0();
+                }
+            });
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
index a2aab77..da39209 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
@@ -62,6 +62,9 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter {
     /** Marshaller. */
     private final Marshaller marsh;
 
+    /** */
+    private byte[] marshErrBytes;
+
     /**
      * @param ctx Kernal context.
      */
@@ -86,6 +89,9 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter {
         if (ctx.config().isDaemon())
             return;
 
+        marshErrBytes = marsh.marshal(new IgniteCheckedException("Failed to marshal response error, " +
+            "see node log for details."));
+
         flusher = new IgniteThread(new GridWorker(ctx.gridName(), "grid-data-loader-flusher", log) {
             @Override protected void body() throws InterruptedException {
                 while (!isCancelled()) {
@@ -324,10 +330,10 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter {
         try {
             errBytes = err != null ? marsh.marshal(err) : null;
         }
-        catch (IgniteCheckedException e) {
-            U.error(log, "Failed to marshal message.", e);
+        catch (Exception e) {
+            U.error(log, "Failed to marshal error [err=" + err + ", marshErr=" + e + ']', e);
 
-            return;
+            errBytes = marshErrBytes;
         }
 
         DataStreamerResponse res = new DataStreamerResponse(reqId, errBytes, forceLocDep);

http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
index 8d5a8e7..8eeca6b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
@@ -3408,6 +3408,7 @@ public class GridFunc {
      * @return First element in given collection for which predicate evaluates to
      *      {@code true} - or {@code null} if such element cannot be found.
      */
+    @SafeVarargs
     @Nullable public static <V> V find(Iterable<? extends V> c, @Nullable V dfltVal,
         @Nullable IgnitePredicate<? super V>... p) {
         A.notNull(c, "c");

http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/test/config/websession/example-cache.xml
----------------------------------------------------------------------
diff --git a/modules/core/src/test/config/websession/example-cache.xml b/modules/core/src/test/config/websession/example-cache.xml
index d5bfeb7..0cc0e1e 100644
--- a/modules/core/src/test/config/websession/example-cache.xml
+++ b/modules/core/src/test/config/websession/example-cache.xml
@@ -130,14 +130,7 @@
         <property name="discoverySpi">
             <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
                 <property name="ipFinder">
-                    <!--
-                        Ignite provides several options for automatic discovery that can be used
-                        instead os static IP based discovery. For information on all options refer
-                        to our documentation: http://apacheignite.readme.io/docs/cluster-config
-                    -->
-                    <!-- Uncomment static IP finder to enable static-based discovery of initial nodes. -->
-                    <!--<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">-->
-                    <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">
+                    <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
                         <property name="addresses">
                             <list>
                                 <!-- In distributed environment, replace with actual host IP address. -->

http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverAbstractTest.java
index f050c72..7e217b7 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverAbstractTest.java
@@ -117,7 +117,8 @@ public abstract class IgniteClientReconnectFailoverAbstractTest extends IgniteCl
                     }
 
                     return null;
-                } catch (Throwable e) {
+                }
+                catch (Throwable e) {
                     log.error("Unexpected error in operation thread: " + e, e);
 
                     stop.set(true);

http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
index 5b294cc..0d9c541 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
@@ -3277,9 +3277,9 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
      * @throws Exception If failed.
      */
     public void testPeekExpired() throws Exception {
-        IgniteCache<String, Integer> c = jcache();
+        final IgniteCache<String, Integer> c = jcache();
 
-        String key = primaryKeysForCache(c, 1).get(0);
+        final String key = primaryKeysForCache(c, 1).get(0);
 
         info("Using key: " + key);
 
@@ -3295,6 +3295,12 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
 
         Thread.sleep(ttl + 100);
 
+        GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                return peek(c, key) == null;
+            }
+        }, 2000);
+
         assert peek(c, key) == null;
 
         assert c.localSize() == 0 : "Cache is not empty.";
@@ -3307,9 +3313,9 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
      */
     public void testPeekExpiredTx() throws Exception {
         if (txShouldBeUsed()) {
-            IgniteCache<String, Integer> c = jcache();
+            final IgniteCache<String, Integer> c = jcache();
 
-            String key = "1";
+            final String key = "1";
             int ttl = 500;
 
             try (Transaction tx = grid(0).transactions().txStart()) {
@@ -3320,9 +3326,13 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
                 tx.commit();
             }
 
-            Thread.sleep(ttl + 100);
+            GridTestUtils.waitForCondition(new GridAbsPredicate() {
+                @Override public boolean apply() {
+                    return peek(c, key) == null;
+                }
+            }, 2000);
 
-            assertNull(c.localPeek(key, ONHEAP));
+            assertNull(peek(c, key));
 
             assert c.localSize() == 0;
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java
index 52fbf4c..b3d1384 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java
@@ -416,9 +416,8 @@ public abstract class GridCacheAbstractSelfTest extends GridCommonAbstractTest {
      * @param cache Cache projection.
      * @param key Key.
      * @return Value.
-     * @throws Exception If failed.
      */
-    @Nullable protected <K, V> V peek(IgniteCache<K, V> cache, K key) throws Exception {
+    @Nullable protected <K, V> V peek(IgniteCache<K, V> cache, K key) {
         return offheapTiered(cache) ? cache.localPeek(key, CachePeekMode.SWAP, CachePeekMode.OFFHEAP) :
             cache.localPeek(key, CachePeekMode.ONHEAP);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteBinaryMetadataUpdateChangingTopologySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteBinaryMetadataUpdateChangingTopologySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteBinaryMetadataUpdateChangingTopologySelfTest.java
index e53650c..c95c586 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteBinaryMetadataUpdateChangingTopologySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteBinaryMetadataUpdateChangingTopologySelfTest.java
@@ -152,7 +152,7 @@ public class IgniteBinaryMetadataUpdateChangingTopologySelfTest extends GridComm
         private List<Object> recordedMsgs = new ArrayList<>();
 
         /** {@inheritDoc} */
-        @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackClosure)
+        @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC)
             throws IgniteSpiException {
             if (msg instanceof GridIoMessage) {
                 Object msg0 = ((GridIoMessage)msg).message();
@@ -174,7 +174,7 @@ public class IgniteBinaryMetadataUpdateChangingTopologySelfTest extends GridComm
                 }
             }
 
-            super.sendMessage(node, msg, ackClosure);
+            super.sendMessage(node, msg, ackC);
         }
 
         /**
@@ -238,6 +238,9 @@ public class IgniteBinaryMetadataUpdateChangingTopologySelfTest extends GridComm
         }
     }
 
+    /**
+     *
+     */
     private static class TestValue {
         /** Field1. */
         private String field1;

http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteBinaryMetadataUpdateNodeRestartTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteBinaryMetadataUpdateNodeRestartTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteBinaryMetadataUpdateNodeRestartTest.java
new file mode 100644
index 0000000..814fb08
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteBinaryMetadataUpdateNodeRestartTest.java
@@ -0,0 +1,411 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.cache.CacheException;
+import javax.cache.processor.MutableEntry;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheEntryProcessor;
+import org.apache.ignite.cluster.ClusterTopologyException;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ *
+ */
+public class IgniteBinaryMetadataUpdateNodeRestartTest extends GridCommonAbstractTest {
+    /** */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private static final String ATOMIC_CACHE = "atomicCache";
+
+    /** */
+    private static final String TX_CACHE = "txCache";
+
+    /** */
+    private static final int SRVS = 3;
+
+    /** */
+    private static final int CLIENTS = 1;
+
+    /** */
+    private boolean client;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
+
+        ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1);
+
+        cfg.setMarshaller(null);
+
+        CacheConfiguration ccfg1 = cacheConfiguration(TX_CACHE, TRANSACTIONAL);
+        CacheConfiguration ccfg2 = cacheConfiguration(ATOMIC_CACHE, ATOMIC);
+        
+        cfg.setCacheConfiguration(ccfg1, ccfg2);
+
+        cfg.setClientMode(client);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+
+        super.afterTestsStopped();
+    }
+
+    /**
+     * @param name Cache name.
+     * @param atomicityMode Cache atomicity mode.
+     * @return Cache configuration.
+     */
+    private CacheConfiguration cacheConfiguration(String name, CacheAtomicityMode atomicityMode) {
+        CacheConfiguration ccfg = new CacheConfiguration();
+
+        ccfg.setCacheMode(PARTITIONED);
+        ccfg.setBackups(1);
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+        ccfg.setName(name);
+        ccfg.setAtomicityMode(atomicityMode);
+
+        return ccfg;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNodeRestart() throws Exception {
+        for (int i = 0; i < 10; i++) {
+            log.info("Iteration: " + i);
+
+            client = false;
+
+            startGridsMultiThreaded(SRVS);
+
+            client = true;
+
+            startGrid(SRVS);
+
+            final AtomicBoolean stop = new AtomicBoolean();
+
+            try {
+                IgniteInternalFuture<?> restartFut = GridTestUtils.runAsync(new Callable<Void>() {
+                    @Override public Void call() throws Exception {
+                        while (!stop.get()) {
+                            log.info("Start node.");
+
+                            startGrid(SRVS + CLIENTS);
+
+                            log.info("Stop node.");
+
+                            stopGrid(SRVS + CLIENTS);
+                        }
+
+                        return null;
+                    }
+                }, "restart-thread");
+
+                final AtomicInteger idx = new AtomicInteger();
+
+                IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() {
+                    @Override public Object call() throws Exception {
+                        int threadIdx = idx.getAndIncrement();
+
+                        int node = threadIdx % (SRVS + CLIENTS);
+
+                        Ignite ignite = ignite(node);
+
+                        log.info("Started thread: " + ignite.name());
+
+                        Thread.currentThread().setName("update-thread-" + threadIdx + "-" + ignite.name());
+
+                        IgniteCache<Object, Object> cache1 = ignite.cache(ATOMIC_CACHE);
+                        IgniteCache<Object, Object> cache2 = ignite.cache(TX_CACHE);
+
+                        ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                        while (!stop.get()) {
+                            try {
+                                cache1.put(new TestClass1(true), create(rnd.nextInt(20) + 1));
+
+                                cache1.invoke(new TestClass1(true), new TestEntryProcessor(rnd.nextInt(20) + 1));
+
+                                cache2.put(new TestClass1(true), create(rnd.nextInt(20) + 1));
+
+                                cache2.invoke(new TestClass1(true), new TestEntryProcessor(rnd.nextInt(20) + 1));
+                            }
+                            catch (CacheException | IgniteException e) {
+                                log.info("Error: " + e);
+
+                                if (X.hasCause(e, ClusterTopologyException.class)) {
+                                    ClusterTopologyException cause = X.cause(e, ClusterTopologyException.class);
+
+                                    if (cause.retryReadyFuture() != null)
+                                        cause.retryReadyFuture().get();
+                                }
+                            }
+                        }
+
+                        return null;
+                    }
+                }, 10, "update-thread");
+
+                U.sleep(5_000);
+
+                stop.set(true);
+
+                restartFut.get();
+
+                fut.get();
+            }
+            finally {
+                stop.set(true);
+
+                stopAllGrids();
+            }
+        }
+    }
+
+    /**
+     * @param id Class ID.
+     * @return Test class instance.
+     */
+    private static Object create(int id) {
+        switch (id) {
+            case 1: return new TestClass1(true);
+
+            case 2: return new TestClass2();
+
+            case 3: return new TestClass3();
+
+            case 4: return new TestClass4();
+
+            case 5: return new TestClass5();
+
+            case 6: return new TestClass6();
+
+            case 7: return new TestClass7();
+
+            case 8: return new TestClass8();
+
+            case 9: return new TestClass9();
+
+            case 10: return new TestClass10();
+
+            case 11: return new TestClass11();
+
+            case 12: return new TestClass12();
+
+            case 13: return new TestClass13();
+
+            case 14: return new TestClass14();
+
+            case 15: return new TestClass15();
+
+            case 16: return new TestClass16();
+
+            case 17: return new TestClass17();
+
+            case 18: return new TestClass18();
+
+            case 19: return new TestClass19();
+
+            case 20: return new TestClass20();
+        }
+
+        fail();
+
+        return null;
+    }
+
+    /**
+     *
+     */
+    static class TestEntryProcessor implements CacheEntryProcessor<Object, Object, Object> {
+        /** */
+        private int id;
+
+        /**
+         * @param id Value id.
+         */
+        public TestEntryProcessor(int id) {
+            this.id = id;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object process(MutableEntry<Object, Object> entry, Object... args) {
+            entry.setValue(create(id));
+
+            return null;
+        }
+    }
+
+    /**
+     *
+     */
+    static class TestClass1 {
+        /** */
+        int val;
+
+        /**
+         * @param setVal Set value flag.
+         */
+        public TestClass1(boolean setVal) {
+            this.val = setVal ? ThreadLocalRandom.current().nextInt(10_000) : 0;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o) return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            TestClass1 that = (TestClass1)o;
+
+            return val == that.val;
+
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return val;
+        }
+    }
+
+    /**
+     *
+     */
+    static class TestClass2 {}
+
+    /**
+     *
+     */
+    static class TestClass3 {}
+
+    /**
+     *
+     */
+    static class TestClass4 {}
+
+    /**
+     *
+     */
+    static class TestClass5 {}
+
+    /**
+     *
+     */
+    static class TestClass6 {}
+
+    /**
+     *
+     */
+    static class TestClass7 {}
+
+    /**
+     *
+     */
+    static class TestClass8 {}
+
+    /**
+     *
+     */
+    static class TestClass9 {}
+
+    /**
+     *
+     */
+    static class TestClass10 {}
+
+    /**
+     *
+     */
+    static class TestClass11 {}
+
+    /**
+     *
+     */
+    static class TestClass12 {}
+
+    /**
+     *
+     */
+    static class TestClass13 {}
+
+    /**
+     *
+     */
+    static class TestClass14 {}
+
+    /**
+     *
+     */
+    static class TestClass15 {}
+
+    /**
+     *
+     */
+    static class TestClass16 {}
+
+    /**
+     *
+     */
+    static class TestClass17 {}
+
+    /**
+     *
+     */
+    static class TestClass18 {}
+
+    /**
+     *
+     */
+    static class TestClass19 {}
+
+    /**
+     *
+     */
+    static class TestClass20 {}
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java
index 242b12d..8d4af19 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java
@@ -65,6 +65,8 @@ public class IgniteCacheManyClientsTest extends GridCommonAbstractTest {
     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(gridName);
 
+        cfg.setFailureDetectionTimeout(20_000);
+
         cfg.setConnectorConfiguration(null);
         cfg.setPeerClassLoadingEnabled(false);
         cfg.setTimeServerPortRange(200);


[08/12] ignite git commit: Merge branch 'ignite-1.5' into ignite-2206

Posted by vo...@apache.org.
Merge branch 'ignite-1.5' into ignite-2206


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/32ffef6b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/32ffef6b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/32ffef6b

Branch: refs/heads/ignite-2206
Commit: 32ffef6bb66f39bf1825c8442da23fa47699b5c9
Parents: 83e1249 9db05f5
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Wed Dec 23 16:55:21 2015 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Wed Dec 23 16:55:21 2015 +0300

----------------------------------------------------------------------
 .../managers/communication/GridIoManager.java   |  39 +-
 .../processors/cache/GridCacheAdapter.java      |  47 ++-
 .../processors/cache/GridCacheProxyImpl.java    |  29 ++
 .../cache/GridCacheSharedContext.java           |  10 +-
 .../processors/cache/IgniteCacheProxy.java      |  35 ++
 .../processors/cache/IgniteInternalCache.java   |  26 ++
 .../binary/CacheObjectBinaryProcessorImpl.java  |   4 +-
 .../distributed/dht/GridDhtLockFuture.java      |   2 +-
 .../dht/atomic/GridDhtAtomicCache.java          |  95 +++--
 .../dht/atomic/GridNearAtomicUpdateFuture.java  | 150 ++++---
 .../colocated/GridDhtColocatedLockFuture.java   |  11 +-
 .../distributed/near/GridNearLockFuture.java    |  11 +-
 ...arOptimisticSerializableTxPrepareFuture.java |   5 +-
 .../near/GridNearOptimisticTxPrepareFuture.java |   5 +-
 ...ridNearOptimisticTxPrepareFutureAdapter.java |  12 +-
 .../query/GridCacheDistributedQueryManager.java |   6 +-
 .../transactions/IgniteTxLocalAdapter.java      |   2 +
 .../cache/transactions/IgniteTxManager.java     |  61 ++-
 .../datastreamer/DataStreamProcessor.java       |  12 +-
 .../ignite/internal/util/lang/GridFunc.java     |   1 +
 .../test/config/websession/example-cache.xml    |   9 +-
 ...niteClientReconnectFailoverAbstractTest.java |   3 +-
 .../cache/GridCacheAbstractFullApiSelfTest.java |  22 +-
 .../cache/GridCacheAbstractSelfTest.java        |   3 +-
 ...iteCacheScanPredicateDeploymentSelfTest.java | 114 +++++
 ...yMetadataUpdateChangingTopologySelfTest.java |   7 +-
 ...niteBinaryMetadataUpdateNodeRestartTest.java | 411 +++++++++++++++++++
 .../distributed/IgniteCacheManyClientsTest.java |   2 +
 ...ContinuousQueryFailoverAbstractSelfTest.java | 128 +++---
 ...ridCacheContinuousQueryAbstractSelfTest.java |   3 +
 .../service/ClosureServiceClientsNodesTest.java |  22 +-
 .../GridServiceProcessorStopSelfTest.java       |  21 +-
 ...cpCommunicationSpiMultithreadedSelfTest.java |  21 +
 .../spi/discovery/tcp/TcpDiscoverySelfTest.java |  14 +-
 .../testframework/GridSpiTestContext.java       |  18 +-
 .../testsuites/IgniteCacheTestSuite3.java       |   2 +
 .../p2p/CacheDeploymentAlwaysTruePredicate.java |  29 ++
 .../cache/websession/WebSessionFilter.java      |  82 ++--
 .../cache/websession/WebSessionListener.java    |  25 +-
 .../internal/websession/WebSessionSelfTest.java |   2 -
 40 files changed, 1172 insertions(+), 329 deletions(-)
----------------------------------------------------------------------



[03/12] ignite git commit: IGNITE-2200 - Fixed deployment. - Fixes #367.

Posted by vo...@apache.org.
IGNITE-2200 - Fixed deployment. - Fixes #367.

Signed-off-by: Alexey Goncharuk <al...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/16a9e6da
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/16a9e6da
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/16a9e6da

Branch: refs/heads/ignite-2206
Commit: 16a9e6da2a8baca35d9b95bc0a28995d3f1c5780
Parents: 66b33bc
Author: Alexey Goncharuk <al...@gmail.com>
Authored: Wed Dec 23 14:52:28 2015 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Wed Dec 23 14:52:28 2015 +0300

----------------------------------------------------------------------
 .../query/GridCacheDistributedQueryManager.java |   6 +-
 ...iteCacheScanPredicateDeploymentSelfTest.java | 114 +++++++++++++++++++
 .../testsuites/IgniteCacheTestSuite3.java       |   2 +
 .../p2p/CacheDeploymentAlwaysTruePredicate.java |  29 +++++
 4 files changed, 149 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/16a9e6da/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
index 7f63b4c..353fbd3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
@@ -570,7 +570,8 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage
                 qry.query().subjectId(),
                 qry.query().taskHash(),
                 queryTopologyVersion(),
-                cctx.deploymentEnabled());
+                // Force deployment anyway if scan query is used.
+                cctx.deploymentEnabled() || (qry.query().scanFilter() != null && cctx.gridDeploy().enabled()));
 
             addQueryFuture(req.id(), fut);
 
@@ -616,7 +617,8 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage
                 qry.subjectId(),
                 qry.taskHash(),
                 queryTopologyVersion(),
-                cctx.deploymentEnabled());
+                // Force deployment anyway if scan query is used.
+                cctx.deploymentEnabled() || (qry.scanFilter() != null && cctx.gridDeploy().enabled()));
 
             sendRequest(fut, req, nodes);
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/16a9e6da/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheScanPredicateDeploymentSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheScanPredicateDeploymentSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheScanPredicateDeploymentSelfTest.java
new file mode 100644
index 0000000..e1fbb0d
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheScanPredicateDeploymentSelfTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.List;
+import javax.cache.Cache;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.query.ScanQuery;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheRebalanceMode.SYNC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ *
+ */
+public class IgniteCacheScanPredicateDeploymentSelfTest extends GridCommonAbstractTest {
+    /** IP finder. */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** Test value. */
+    protected static final String TEST_PREDICATE = "org.apache.ignite.tests.p2p.CacheDeploymentAlwaysTruePredicate";
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        if (getTestGridName(3).equals(gridName))
+            cfg.setClassLoader(getExternalClassLoader());
+
+        cfg.setCacheConfiguration(cacheConfiguration());
+
+        TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+        disco.setIpFinder(IP_FINDER);
+
+        cfg.setDiscoverySpi(disco);
+
+        cfg.setConnectorConfiguration(null);
+
+        return cfg;
+    }
+
+    /**
+     * @return Cache configuration.
+     * @throws Exception In case of error.
+     */
+    protected CacheConfiguration cacheConfiguration() throws Exception {
+        CacheConfiguration cfg = defaultCacheConfiguration();
+
+        cfg.setCacheMode(PARTITIONED);
+        cfg.setWriteSynchronizationMode(FULL_SYNC);
+        cfg.setRebalanceMode(SYNC);
+        cfg.setAtomicityMode(atomicityMode());
+        cfg.setBackups(1);
+
+        return cfg;
+    }
+
+    protected CacheAtomicityMode atomicityMode() {
+        return TRANSACTIONAL;
+    }
+
+    /**
+     * @throws Exception In case of error.
+     */
+    public void testDeployScanPredicate() throws Exception {
+        startGrids(4);
+
+        awaitPartitionMapExchange();
+
+        try {
+            IgniteCache<Object, Object> cache = grid(3).cache(null);
+
+            // It is important that there are no too many keys.
+            for (int i = 0; i < 1; i++)
+                cache.put(i, i);
+
+            Class predCls = grid(3).configuration().getClassLoader().loadClass(TEST_PREDICATE);
+
+            IgniteBiPredicate<Object, Object> pred = (IgniteBiPredicate<Object, Object>)predCls.newInstance();
+
+            List<Cache.Entry<Object, Object>> all = cache.query(new ScanQuery<>(pred)).getAll();
+
+            assertEquals(1, all.size());
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/16a9e6da/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java
index 176ab3f..b86a33d 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java
@@ -33,6 +33,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheValueConsistencyTran
 import org.apache.ignite.internal.processors.cache.GridCacheValueConsistencyTransactionalSelfTest;
 import org.apache.ignite.internal.processors.cache.GridCacheVersionSelfTest;
 import org.apache.ignite.internal.processors.cache.IgniteCacheInterceptorSelfTestSuite;
+import org.apache.ignite.internal.processors.cache.IgniteCacheScanPredicateDeploymentSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.CacheAsyncOperationsTest;
 import org.apache.ignite.internal.processors.cache.distributed.GridCacheMixedModeSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.IgniteTxGetAfterStopTest;
@@ -127,6 +128,7 @@ public class IgniteCacheTestSuite3 extends TestSuite {
         suite.addTestSuite(GridCacheConditionalDeploymentSelfTest.class);
         suite.addTestSuite(GridCacheAtomicEntryProcessorDeploymentSelfTest.class);
         suite.addTestSuite(GridCacheTransactionalEntryProcessorDeploymentSelfTest.class);
+        suite.addTestSuite(IgniteCacheScanPredicateDeploymentSelfTest.class);
 
         suite.addTestSuite(GridCachePutArrayValueSelfTest.class);
         suite.addTestSuite(GridCacheReplicatedUnswapAdvancedSelfTest.class);

http://git-wip-us.apache.org/repos/asf/ignite/blob/16a9e6da/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentAlwaysTruePredicate.java
----------------------------------------------------------------------
diff --git a/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentAlwaysTruePredicate.java b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentAlwaysTruePredicate.java
new file mode 100644
index 0000000..100ab95
--- /dev/null
+++ b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentAlwaysTruePredicate.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.tests.p2p;
+
+import org.apache.ignite.lang.IgniteBiPredicate;
+
+/**
+ *
+ */
+public class CacheDeploymentAlwaysTruePredicate implements IgniteBiPredicate<Object, Object> {
+    /** */
+    @Override public boolean apply(Object o, Object o2) {
+        return true;
+    }
+}


[09/12] ignite git commit: IGNITE-2206: diagnostic added to reproduce the bug.

Posted by vo...@apache.org.
IGNITE-2206: diagnostic added to reproduce the bug.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5d2d87cf
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5d2d87cf
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5d2d87cf

Branch: refs/heads/ignite-2206
Commit: 5d2d87cff6e824b0d37f76b3f8e8941ec49d2efc
Parents: 83e1249
Author: iveselovskiy <iv...@gridgain.com>
Authored: Wed Dec 23 16:56:13 2015 +0300
Committer: iveselovskiy <iv...@gridgain.com>
Committed: Wed Dec 23 16:56:13 2015 +0300

----------------------------------------------------------------------
 .../ignite/igfs/IgniteHadoopFileSystemShmemAbstractSelfTest.java  | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/5d2d87cf/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemShmemAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemShmemAbstractSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemShmemAbstractSelfTest.java
index d8cf74c..53b700b 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemShmemAbstractSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemShmemAbstractSelfTest.java
@@ -46,7 +46,8 @@ public abstract class IgniteHadoopFileSystemShmemAbstractSelfTest extends Ignite
     @Override protected IgfsIpcEndpointConfiguration primaryIpcEndpointConfiguration(final String gridName) {
         IgfsIpcEndpointConfiguration endpointCfg = new IgfsIpcEndpointConfiguration();
 
-        endpointCfg.setType(IgfsIpcEndpointType.SHMEM);
+        // TODO: this change is for debug purposes only:
+        endpointCfg.setType(IgfsIpcEndpointType.TCP/*IgfsIpcEndpointType.SHMEM*/);
         endpointCfg.setPort(DFLT_IPC_PORT + getTestGridIndex(gridName));
 
         return endpointCfg;


[04/12] ignite git commit: ignite-1.5 Fixed hang on metadata update inside put in atomic cache when topology read lock is held. Also fixed several test issues.

Posted by vo...@apache.org.
http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
index 5a4ba14..283da80 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
@@ -1009,107 +1009,111 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
      */
     private void checkEvents(final List<T3<Object, Object, Object>> expEvts, final CacheEventListener2 lsnr,
         boolean lostAllow, boolean wait) throws Exception {
-        if (wait)
+        if (wait) {
             GridTestUtils.waitForCondition(new PA() {
-                @Override public boolean apply() {
+                @Override
+                public boolean apply() {
                     return expEvts.size() == lsnr.size();
                 }
             }, 2000L);
+        }
 
-        Map<Integer, List<CacheEntryEvent<?, ?>>> prevMap = new HashMap<>(lsnr.evts.size());
+        synchronized (lsnr) {
+            Map<Integer, List<CacheEntryEvent<?, ?>>> prevMap = new HashMap<>(lsnr.evts.size());
 
-        for (Map.Entry<Integer, List<CacheEntryEvent<?, ?>>> e : lsnr.evts.entrySet())
-            prevMap.put(e.getKey(), new ArrayList<>(e.getValue()));
+            for (Map.Entry<Integer, List<CacheEntryEvent<?, ?>>> e : lsnr.evts.entrySet())
+                prevMap.put(e.getKey(), new ArrayList<>(e.getValue()));
 
-        List<T3<Object, Object, Object>> lostEvts = new ArrayList<>();
+            List<T3<Object, Object, Object>> lostEvts = new ArrayList<>();
 
-        for (T3<Object, Object, Object> exp : expEvts) {
-            List<CacheEntryEvent<?, ?>> rcvdEvts = lsnr.evts.get(exp.get1());
+            for (T3<Object, Object, Object> exp : expEvts) {
+                List<CacheEntryEvent<?, ?>> rcvdEvts = lsnr.evts.get(exp.get1());
 
-            if (F.eq(exp.get2(), exp.get3()))
-                continue;
+                if (F.eq(exp.get2(), exp.get3()))
+                    continue;
 
-            if (rcvdEvts == null || rcvdEvts.isEmpty()) {
-                lostEvts.add(exp);
+                if (rcvdEvts == null || rcvdEvts.isEmpty()) {
+                    lostEvts.add(exp);
 
-                continue;
-            }
+                    continue;
+                }
 
-            Iterator<CacheEntryEvent<?, ?>> iter = rcvdEvts.iterator();
+                Iterator<CacheEntryEvent<?, ?>> iter = rcvdEvts.iterator();
 
-            boolean found = false;
+                boolean found = false;
 
-            while (iter.hasNext()) {
-                CacheEntryEvent<?, ?> e = iter.next();
+                while (iter.hasNext()) {
+                    CacheEntryEvent<?, ?> e = iter.next();
 
-                if ((exp.get2() != null && e.getValue() != null && exp.get2().equals(e.getValue()))
-                    && equalOldValue(e, exp)) {
-                    found = true;
+                    if ((exp.get2() != null && e.getValue() != null && exp.get2().equals(e.getValue()))
+                            && equalOldValue(e, exp)) {
+                        found = true;
 
-                    iter.remove();
+                        iter.remove();
 
-                    break;
+                        break;
+                    }
                 }
-            }
 
-            // Lost event is acceptable.
-            if (!found)
-                lostEvts.add(exp);
-        }
+                // Lost event is acceptable.
+                if (!found)
+                    lostEvts.add(exp);
+            }
 
-        boolean dup = false;
+            boolean dup = false;
 
-        // Check duplicate.
-        if (!lsnr.evts.isEmpty()) {
-            for (List<CacheEntryEvent<?, ?>> evts : lsnr.evts.values()) {
-                if (!evts.isEmpty()) {
-                    for (CacheEntryEvent<?, ?> e : evts) {
-                        boolean found = false;
+            // Check duplicate.
+            if (!lsnr.evts.isEmpty()) {
+                for (List<CacheEntryEvent<?, ?>> evts : lsnr.evts.values()) {
+                    if (!evts.isEmpty()) {
+                        for (CacheEntryEvent<?, ?> e : evts) {
+                            boolean found = false;
 
-                        for (T3<Object, Object, Object> lostEvt : lostEvts) {
-                            if (e.getKey().equals(lostEvt.get1()) && e.getValue().equals(lostEvt.get2())) {
-                                found = true;
+                            for (T3<Object, Object, Object> lostEvt : lostEvts) {
+                                if (e.getKey().equals(lostEvt.get1()) && e.getValue().equals(lostEvt.get2())) {
+                                    found = true;
 
-                                lostEvts.remove(lostEvt);
+                                    lostEvts.remove(lostEvt);
 
-                                break;
+                                    break;
+                                }
                             }
-                        }
 
-                        if (!found) {
-                            dup = true;
+                            if (!found) {
+                                dup = true;
 
-                            break;
+                                break;
+                            }
                         }
                     }
                 }
-            }
 
-            if (dup) {
-                for (List<CacheEntryEvent<?, ?>> e : lsnr.evts.values()) {
-                    if (!e.isEmpty()) {
-                        for (CacheEntryEvent<?, ?> event : e)
-                            log.error("Got duplicate event: " + event);
+                if (dup) {
+                    for (List<CacheEntryEvent<?, ?>> e : lsnr.evts.values()) {
+                        if (!e.isEmpty()) {
+                            for (CacheEntryEvent<?, ?> event : e)
+                                log.error("Got duplicate event: " + event);
+                        }
                     }
                 }
             }
-        }
 
-        if (!lostAllow && lostEvts.size() > 100) {
-            log.error("Lost event cnt: " + lostEvts.size());
+            if (!lostAllow && lostEvts.size() > 100) {
+                log.error("Lost event cnt: " + lostEvts.size());
 
-            for (T3<Object, Object, Object> e : lostEvts)
-                log.error("Lost event: " + e);
+                for (T3<Object, Object, Object> e : lostEvts)
+                    log.error("Lost event: " + e);
 
-            fail("Lose events, see log for details.");
-        }
+                fail("Lose events, see log for details.");
+            }
 
-        log.error("Lost event cnt: " + lostEvts.size());
+            log.error("Lost event cnt: " + lostEvts.size());
 
-        expEvts.clear();
+            expEvts.clear();
 
-        lsnr.evts.clear();
-        lsnr.vals.clear();
+            lsnr.evts.clear();
+            lsnr.vals.clear();
+        }
     }
 
     /**
@@ -2111,7 +2115,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
         /**
          * @return Count events.
          */
-        public int size() {
+        public synchronized int size() {
             int size = 0;
 
             for (List<CacheEntryEvent<?, ?>> e : evts.values())

http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
index 5f5dfd4..db59a7f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
@@ -62,6 +62,7 @@ import org.apache.ignite.internal.util.typedef.PA;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiInClosure;
 import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
@@ -128,6 +129,8 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo
 
         cfg.setDiscoverySpi(disco);
 
+        ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1);
+
         return cfg;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ClosureServiceClientsNodesTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ClosureServiceClientsNodesTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ClosureServiceClientsNodesTest.java
index b529b6c..49c6968 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ClosureServiceClientsNodesTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ClosureServiceClientsNodesTest.java
@@ -27,6 +27,7 @@ import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cluster.ClusterGroup;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.lang.IgniteCallable;
 import org.apache.ignite.marshaller.optimized.OptimizedMarshaller;
@@ -38,6 +39,7 @@ import org.apache.ignite.services.ServiceDescriptor;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 
 /**
@@ -171,14 +173,20 @@ public class ClosureServiceClientsNodesTest extends GridCommonAbstractTest {
         for (int i = 0 ; i < NODES_CNT; i++) {
             log.info("Iteration: " + i);
 
-            Ignite ignite = grid(i);
+            final Ignite ignite = grid(i);
 
             ignite.services().deployNodeSingleton(SINGLETON_NAME, new TestService());
 
-            ClusterGroup grp = ignite.cluster();
+            final ClusterGroup grp = ignite.cluster();
 
             assertEquals(NODES_CNT, grp.nodes().size());
 
+            GridTestUtils.waitForCondition(new GridAbsPredicate() {
+                @Override public boolean apply() {
+                    return ignite.services(grp).serviceDescriptors().size() == 1;
+                }
+            }, 5000);
+
             Collection<ServiceDescriptor> srvDscs = ignite.services(grp).serviceDescriptors();
 
             assertEquals(1, srvDscs.size());
@@ -206,14 +214,20 @@ public class ClosureServiceClientsNodesTest extends GridCommonAbstractTest {
         for (int i = 0 ; i < NODES_CNT; i++) {
             log.info("Iteration: " + i);
 
-            Ignite ignite = grid(i);
+            final Ignite ignite = grid(i);
 
             ignite.services(ignite.cluster().forClients()).deployNodeSingleton(SINGLETON_NAME, new TestService());
 
-            ClusterGroup grp = ignite.cluster();
+            final ClusterGroup grp = ignite.cluster();
 
             assertEquals(NODES_CNT, grp.nodes().size());
 
+            GridTestUtils.waitForCondition(new GridAbsPredicate() {
+                @Override public boolean apply() {
+                    return ignite.services(grp).serviceDescriptors().size() == 1;
+                }
+            }, 5000);
+
             Collection<ServiceDescriptor> srvDscs = ignite.services(grp).serviceDescriptors();
 
             assertEquals(1, srvDscs.size());

http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorStopSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorStopSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorStopSelfTest.java
index dfea37a..92b18ab 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorStopSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorStopSelfTest.java
@@ -17,15 +17,18 @@
 
 package org.apache.ignite.internal.processors.service;
 
+import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteServices;
 import org.apache.ignite.Ignition;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.services.Service;
 import org.apache.ignite.services.ServiceContext;
+import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 
 /**
@@ -49,10 +52,8 @@ public class GridServiceProcessorStopSelfTest extends GridCommonAbstractTest {
 
         final Ignite ignite = startGrid(0);
 
-        Thread t = new Thread(new Runnable() {
-            @Override public void run() {
-                Thread.currentThread().setName("deploy-thread");
-
+        IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>() {
+            @Override public Void call() throws Exception {
                 IgniteServices svcs = ignite.services();
 
                 IgniteServices services = svcs.withAsync();
@@ -67,13 +68,13 @@ public class GridServiceProcessorStopSelfTest extends GridCommonAbstractTest {
                 catch (IgniteException e) {
                     finishLatch.countDown();
                 }
-                catch (Throwable e) {
-                    log.error("Service deployment error: ", e);
+                finally {
+                    finishLatch.countDown();
                 }
-            }
-        });
 
-        t.start();
+                return null;
+            }
+        }, "deploy-thread");
 
         depLatch.await();
 
@@ -85,6 +86,8 @@ public class GridServiceProcessorStopSelfTest extends GridCommonAbstractTest {
             U.dumpThreads(log);
 
         assertTrue("Deploy future isn't completed", wait);
+
+        fut.get();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
index 731b0c7..7bbf531 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
@@ -37,6 +37,7 @@ import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
 import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.internal.util.nio.GridCommunicationClient;
 import org.apache.ignite.internal.util.nio.GridNioRecoveryDescriptor;
@@ -55,6 +56,7 @@ import org.apache.ignite.spi.communication.GridTestMessage;
 import org.apache.ignite.testframework.GridSpiTestContext;
 import org.apache.ignite.testframework.GridTestNode;
 import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.GridTestKernalContext;
 import org.apache.ignite.testframework.junits.IgniteTestResources;
 import org.apache.ignite.testframework.junits.spi.GridSpiAbstractTest;
 import org.jsr166.ConcurrentLinkedDeque8;
@@ -90,6 +92,9 @@ public class GridTcpCommunicationSpiMultithreadedSelfTest extends GridSpiAbstrac
     /** Initialized nodes */
     private static final List<ClusterNode> nodes = new ArrayList<>();
 
+    /** */
+    private static GridTimeoutProcessor timeoutProcessor;
+
     /** Flag indicating if listener should reject messages. */
     private static boolean reject;
 
@@ -472,6 +477,12 @@ public class GridTcpCommunicationSpiMultithreadedSelfTest extends GridSpiAbstrac
 
         Map<ClusterNode, GridSpiTestContext> ctxs = new HashMap<>();
 
+        timeoutProcessor = new GridTimeoutProcessor(new GridTestKernalContext(log));
+
+        timeoutProcessor.start();
+
+        timeoutProcessor.onKernalStart();
+
         for (int i = 0; i < getSpiCount(); i++) {
             CommunicationSpi<Message> spi = newCommunicationSpi();
 
@@ -485,6 +496,8 @@ public class GridTcpCommunicationSpiMultithreadedSelfTest extends GridSpiAbstrac
 
             GridSpiTestContext ctx = initSpiContext();
 
+            ctx.timeoutProcessor(timeoutProcessor);
+
             ctx.setLocalNode(node);
 
             info(">>> Initialized context: nodeId=" + ctx.localNode().id());
@@ -548,6 +561,14 @@ public class GridTcpCommunicationSpiMultithreadedSelfTest extends GridSpiAbstrac
 
     /** {@inheritDoc} */
     @Override protected void afterTestsStopped() throws Exception {
+        if (timeoutProcessor != null) {
+            timeoutProcessor.onKernalStop(true);
+
+            timeoutProcessor.stop(true);
+
+            timeoutProcessor = null;
+        }
+
         for (CommunicationSpi<Message> spi : spis.values()) {
             spi.onContextDestroyed();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
index 5af0596..0df7da6 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
@@ -897,8 +897,8 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
      */
     public void testIpFinderCleaning() throws Exception {
         try {
-            ipFinder.registerAddresses(Arrays.asList(new InetSocketAddress("host1", 1024),
-                new InetSocketAddress("host2", 1024)));
+            ipFinder.registerAddresses(Arrays.asList(new InetSocketAddress("1.1.1.1", 1024),
+                new InetSocketAddress("1.1.1.2", 1024)));
 
             Ignite g1 = startGrid(1);
 
@@ -912,13 +912,19 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
                 }
             }, timeout);
 
+            if (ipFinder.getRegisteredAddresses().size() != 1) {
+                log.error("Failed to wait for IP cleanup, will dump threads.");
+
+                U.dumpThreads(log);
+            }
+
             assert ipFinder.getRegisteredAddresses().size() == 1 : "ipFinder=" + ipFinder.getRegisteredAddresses();
 
             // Check that missing addresses are returned back.
             ipFinder.unregisterAddresses(ipFinder.getRegisteredAddresses()); // Unregister valid address.
 
-            ipFinder.registerAddresses(Arrays.asList(new InetSocketAddress("host1", 1024),
-                new InetSocketAddress("host2", 1024)));
+            ipFinder.registerAddresses(Arrays.asList(new InetSocketAddress("1.1.1.1", 1024),
+                new InetSocketAddress("1.1.1.2", 1024)));
 
             GridTestUtils.waitForCondition(new GridAbsPredicate() {
                 @Override public boolean apply() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
index e257a97..0bffe8b 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
@@ -41,6 +41,8 @@ import org.apache.ignite.internal.managers.communication.GridIoManager;
 import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
 import org.apache.ignite.internal.managers.communication.GridMessageListener;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
+import org.apache.ignite.internal.processors.timeout.GridSpiTimeoutObject;
+import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.plugin.extensions.communication.MessageFactory;
@@ -89,6 +91,16 @@ public class GridSpiTestContext implements IgniteSpiContext {
     /** */
     private MessageFactory factory;
 
+    /** */
+    private GridTimeoutProcessor timeoutProcessor;
+
+    /**
+     * @param timeoutProcessor Timeout processor.
+     */
+    public void timeoutProcessor(GridTimeoutProcessor timeoutProcessor) {
+        this.timeoutProcessor = timeoutProcessor;
+    }
+
     /** {@inheritDoc} */
     @Override public Collection<ClusterNode> remoteNodes() {
         return rmtNodes;
@@ -530,12 +542,14 @@ public class GridSpiTestContext implements IgniteSpiContext {
 
     /** {@inheritDoc} */
     @Override public void addTimeoutObject(IgniteSpiTimeoutObject obj) {
-        // No-op.
+        if (timeoutProcessor != null)
+            timeoutProcessor.addTimeoutObject(new GridSpiTimeoutObject(obj));
     }
 
     /** {@inheritDoc} */
     @Override public void removeTimeoutObject(IgniteSpiTimeoutObject obj) {
-        // No-op.
+        if (timeoutProcessor != null)
+            timeoutProcessor.removeTimeoutObject(new GridSpiTimeoutObject(obj));
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/web/src/main/java/org/apache/ignite/cache/websession/WebSessionFilter.java
----------------------------------------------------------------------
diff --git a/modules/web/src/main/java/org/apache/ignite/cache/websession/WebSessionFilter.java b/modules/web/src/main/java/org/apache/ignite/cache/websession/WebSessionFilter.java
index 77e2dae..4a84931 100644
--- a/modules/web/src/main/java/org/apache/ignite/cache/websession/WebSessionFilter.java
+++ b/modules/web/src/main/java/org/apache/ignite/cache/websession/WebSessionFilter.java
@@ -38,14 +38,16 @@ import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.IgniteTransactions;
-import org.apache.ignite.cache.CachePartialUpdateException;
+import org.apache.ignite.cluster.ClusterTopologyException;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.internal.util.typedef.C1;
 import org.apache.ignite.internal.util.typedef.G;
 import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteClosure;
+import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.startup.servlet.ServletContextListenerStartup;
 import org.apache.ignite.transactions.Transaction;
 
@@ -191,6 +193,9 @@ public class WebSessionFilter implements Filter {
     /** Transactions enabled flag. */
     private boolean txEnabled;
 
+    /** */
+    private int retries;
+
     /** {@inheritDoc} */
     @Override public void init(FilterConfig cfg) throws ServletException {
         ctx = cfg.getServletContext();
@@ -207,8 +212,6 @@ public class WebSessionFilter implements Filter {
             cfg.getInitParameter(WEB_SES_MAX_RETRIES_ON_FAIL_NAME_PARAM),
             ctx.getInitParameter(WEB_SES_MAX_RETRIES_ON_FAIL_NAME_PARAM));
 
-        int retries;
-
         try {
             retries = retriesStr != null ? Integer.parseInt(retriesStr) : DFLT_MAX_RETRIES_ON_FAIL;
         }
@@ -226,10 +229,6 @@ public class WebSessionFilter implements Filter {
 
         log = webSesIgnite.log();
 
-        if (webSesIgnite == null)
-            throw new IgniteException("Grid for web sessions caching is not started (is it configured?): " +
-                gridName);
-
         cache = webSesIgnite.cache(cacheName);
 
         if (cache == null)
@@ -409,41 +408,62 @@ public class WebSessionFilter implements Filter {
 
         WebSession cached = new WebSession(ses, true);
 
-        try {
-            while (true) {
-                try {
-                    IgniteCache<String, WebSession> cache0;
-
-                    if (cached.getMaxInactiveInterval() > 0) {
-                        long ttl = cached.getMaxInactiveInterval() * 1000;
+        for (int i = 0; i < retries; i++) {
+            try {
+                IgniteCache<String, WebSession> cache0;
 
-                        ExpiryPolicy plc = new ModifiedExpiryPolicy(new Duration(MILLISECONDS, ttl));
+                if (cached.getMaxInactiveInterval() > 0) {
+                    long ttl = cached.getMaxInactiveInterval() * 1000;
 
-                        cache0 = cache.withExpiryPolicy(plc);
-                    }
-                    else
-                        cache0 = cache;
+                    ExpiryPolicy plc = new ModifiedExpiryPolicy(new Duration(MILLISECONDS, ttl));
 
-                    WebSession old = cache0.getAndPutIfAbsent(sesId, cached);
+                    cache0 = cache.withExpiryPolicy(plc);
+                }
+                else
+                    cache0 = cache;
 
-                    if (old != null) {
-                        cached = old;
+                WebSession old = cache0.getAndPutIfAbsent(sesId, cached);
 
-                        if (cached.isNew())
-                            cached = new WebSession(cached, false);
-                    }
+                if (old != null) {
+                    cached = old;
 
-                    break;
+                    if (cached.isNew())
+                        cached = new WebSession(cached, false);
                 }
-                catch (CachePartialUpdateException e) {
+
+                break;
+            }
+            catch (CacheException | IgniteException e) {
+                if (log.isDebugEnabled())
+                    log.debug(e.getMessage());
+
+                if (i == retries - 1)
+                    throw new IgniteException("Failed to save session: " + sesId, e);
+                else {
                     if (log.isDebugEnabled())
-                        log.debug(e.getMessage());
+                        log.debug("Failed to save session (will retry): " + sesId);
+
+                    IgniteFuture<?> retryFut = null;
+
+                    if (X.hasCause(e, ClusterTopologyException.class)) {
+                        ClusterTopologyException cause = X.cause(e, ClusterTopologyException.class);
+
+                        assert cause != null : e;
+
+                        retryFut = cause.retryReadyFuture();
+                    }
+
+                    if (retryFut != null) {
+                        try {
+                            retryFut.get();
+                        }
+                        catch (IgniteException retryErr) {
+                            throw new IgniteException("Failed to save session: " + sesId, retryErr);
+                        }
+                    }
                 }
             }
         }
-        catch (CacheException e) {
-            throw new IgniteException("Failed to save session: " + sesId, e);
-        }
 
         return cached;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/web/src/main/java/org/apache/ignite/cache/websession/WebSessionListener.java
----------------------------------------------------------------------
diff --git a/modules/web/src/main/java/org/apache/ignite/cache/websession/WebSessionListener.java b/modules/web/src/main/java/org/apache/ignite/cache/websession/WebSessionListener.java
index 82f1633..b826031 100644
--- a/modules/web/src/main/java/org/apache/ignite/cache/websession/WebSessionListener.java
+++ b/modules/web/src/main/java/org/apache/ignite/cache/websession/WebSessionListener.java
@@ -30,12 +30,14 @@ import javax.cache.processor.EntryProcessor;
 import javax.cache.processor.MutableEntry;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.cache.CachePartialUpdateException;
-import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.cluster.ClusterTopologyException;
 import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteFuture;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 
@@ -117,7 +119,7 @@ class WebSessionListener {
 
                     break;
                 }
-                catch (CachePartialUpdateException ignored) {
+                catch (CacheException | IgniteException e) {
                     if (i == retries - 1) {
                         U.warn(log, "Failed to apply updates for session (maximum number of retries exceeded) [sesId=" +
                             sesId + ", retries=" + retries + ']');
@@ -125,12 +127,25 @@ class WebSessionListener {
                     else {
                         U.warn(log, "Failed to apply updates for session (will retry): " + sesId);
 
-                        U.sleep(RETRY_DELAY);
+                        IgniteFuture<?> retryFut = null;
+
+                        if (X.hasCause(e, ClusterTopologyException.class)) {
+                            ClusterTopologyException cause = X.cause(e, ClusterTopologyException.class);
+
+                            assert cause != null : e;
+
+                            retryFut = cause.retryReadyFuture();
+                        }
+
+                        if (retryFut != null)
+                            retryFut.get();
+                        else
+                            U.sleep(RETRY_DELAY);
                     }
                 }
             }
         }
-        catch (CacheException | IgniteInterruptedCheckedException e) {
+        catch (Exception e) {
             U.error(log, "Failed to update session attributes [id=" + sesId + ']', e);
         }
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/web/src/test/java/org/apache/ignite/internal/websession/WebSessionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/web/src/test/java/org/apache/ignite/internal/websession/WebSessionSelfTest.java b/modules/web/src/test/java/org/apache/ignite/internal/websession/WebSessionSelfTest.java
index 4508edb..7a321d6 100644
--- a/modules/web/src/test/java/org/apache/ignite/internal/websession/WebSessionSelfTest.java
+++ b/modules/web/src/test/java/org/apache/ignite/internal/websession/WebSessionSelfTest.java
@@ -142,7 +142,6 @@ public class WebSessionSelfTest extends GridCommonAbstractTest {
                     }
 
                     assert idx != -1;
-                    assert srv != null;
 
                     stopServer(srv);
 
@@ -181,7 +180,6 @@ public class WebSessionSelfTest extends GridCommonAbstractTest {
                 }
 
                 assert idx != -1;
-                assert srv != null;
 
                 int port = TEST_JETTY_PORT + idx;
 


[02/12] ignite git commit: Merge branch 'ignite-2206' of https://github.com/apache/ignite into ignite-2206

Posted by vo...@apache.org.
Merge branch 'ignite-2206' of https://github.com/apache/ignite into ignite-2206


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5a4586eb
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5a4586eb
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5a4586eb

Branch: refs/heads/ignite-2206
Commit: 5a4586ebd43b356361957b5eebc6bbfd206168fc
Parents: e35ae06 31d3289
Author: iveselovskiy <iv...@gridgain.com>
Authored: Wed Dec 23 13:45:10 2015 +0300
Committer: iveselovskiy <iv...@gridgain.com>
Committed: Wed Dec 23 13:45:10 2015 +0300

----------------------------------------------------------------------
 .../igfs/secondary/IgfsSecondaryFileSystem.java |  22 --
 .../processors/hadoop/HadoopPayloadAware.java   |  22 +-
 .../ignite/internal/processors/igfs/IgfsEx.java |  13 -
 .../processors/igfs/IgfsHandshakeResponse.java  |   3 +-
 .../internal/processors/igfs/IgfsImpl.java      |  10 +-
 .../internal/processors/igfs/IgfsPaths.java     |  32 +--
 .../igfs/IgfsSecondaryFileSystemImpl.java       |  15 --
 .../ignite/internal/util/lang/GridFunc.java     |  24 --
 .../visor/node/VisorIgfsConfiguration.java      |  57 -----
 .../fs/CachingHadoopFileSystemFactory.java      | 208 +++++++++++++++
 .../hadoop/fs/HadoopFileSystemFactory.java      |  41 +++
 .../fs/IgniteHadoopIgfsSecondaryFileSystem.java | 256 ++++++-------------
 .../fs/v1/DefaultHadoopFileSystemFactory.java   | 185 --------------
 .../hadoop/fs/v1/HadoopFileSystemFactory.java   |  21 --
 .../hadoop/fs/v1/IgniteHadoopFileSystem.java    |   4 +-
 .../hadoop/IgfsSecondaryFileSystemEx.java       |  15 --
 .../hadoop/SecondaryFileSystemProvider.java     |   5 +-
 .../IgniteHadoopFileSystemAbstractSelfTest.java |   8 +-
 18 files changed, 375 insertions(+), 566 deletions(-)
----------------------------------------------------------------------