Posted to commits@ignite.apache.org by sb...@apache.org on 2015/06/10 16:11:24 UTC

[03/28] incubator-ignite git commit: ignite-545: merge from sprint-6

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteStreamSelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteStreamSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteStreamSelfTestSuite.java
new file mode 100644
index 0000000..a277fc8
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteStreamSelfTestSuite.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.testsuites;
+
+import org.apache.ignite.stream.socket.*;
+
+import junit.framework.*;
+
+/**
+ * Stream test suite.
+ */
+public class IgniteStreamSelfTestSuite extends TestSuite {
+    /**
+     * @return Stream tests suite.
+     * @throws Exception If failed.
+     */
+    public static TestSuite suite() throws Exception {
+        TestSuite suite = new TestSuite("Ignite Stream Test Suite");
+
+        suite.addTest(new TestSuite(SocketStreamerSelfTest.class));
+
+        return suite;
+    }
+}

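As a usage note: a JUnit 3 suite like the one above is typically composed into a larger aggregate suite. A minimal sketch (the enclosing suite class is hypothetical):

    import junit.framework.TestSuite;

    public class IgniteAllTestsSuiteSketch {
        /** Aggregates the stream suite defined above into a parent suite. */
        public static TestSuite suite() throws Exception {
            TestSuite all = new TestSuite("Ignite All Tests");

            all.addTest(IgniteStreamSelfTestSuite.suite());

            return all;
        }
    }
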
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteUtilSelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteUtilSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteUtilSelfTestSuite.java
index 32cd038..1c75a7f 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteUtilSelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteUtilSelfTestSuite.java
@@ -67,7 +67,7 @@ public class IgniteUtilSelfTestSuite extends TestSuite {
         suite.addTestSuite(GridNioSelfTest.class);
         suite.addTestSuite(GridNioFilterChainSelfTest.class);
         suite.addTestSuite(GridNioSslSelfTest.class);
-        suite.addTestSuite(GridNioDelimitedBufferTest.class);
+        suite.addTestSuite(GridNioDelimitedBufferSelfTest.class);
 
         return suite;
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/gce/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/gce/TcpDiscoveryGoogleStorageIpFinder.java
----------------------------------------------------------------------
diff --git a/modules/gce/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/gce/TcpDiscoveryGoogleStorageIpFinder.java b/modules/gce/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/gce/TcpDiscoveryGoogleStorageIpFinder.java
index b496f60..48991e8 100644
--- a/modules/gce/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/gce/TcpDiscoveryGoogleStorageIpFinder.java
+++ b/modules/gce/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/gce/TcpDiscoveryGoogleStorageIpFinder.java
@@ -68,34 +68,37 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
 * Note that this finder is shared by default (see {@link org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder#isShared()}).
  */
 public class TcpDiscoveryGoogleStorageIpFinder extends TcpDiscoveryIpFinderAdapter {
-    /* Default object's content. */
+    /** Default object's content. */
     private final static ByteArrayInputStream OBJECT_CONTENT =  new ByteArrayInputStream(new byte[0]);
 
     /** Grid logger. */
     @LoggerResource
     private IgniteLogger log;
 
-    /* Google Cloud Platform's project name.*/
+    /** Google Cloud Platform's project name. */
     private String projectName;
 
-    /* Google Storage bucket name. */
+    /** Google Storage bucket name. */
     private String bucketName;
 
-    /* Service account p12 private key file name. */
-    private String serviceAccountP12FilePath;
+    /** Service account p12 private key file name. */
+    private String srvcAccountP12FilePath;
 
-    /* Service account id. */
-    private String serviceAccountId;
+    /** Service account id. */
+    private String srvcAccountId;
 
-    /* Google storage. */
+    /** Google storage. */
     private Storage storage;
 
-    /* Init routine guard. */
+    /** Init routine guard. */
     private final AtomicBoolean initGuard = new AtomicBoolean();
 
-    /* Init routine latch. */
+    /** Init routine latch. */
     private final CountDownLatch initLatch = new CountDownLatch(1);
 
+    /**
+     * Default constructor; marks the IP finder as shared by default.
+     */
     public TcpDiscoveryGoogleStorageIpFinder() {
         setShared(true);
     }
@@ -221,7 +224,7 @@ public class TcpDiscoveryGoogleStorageIpFinder extends TcpDiscoveryIpFinderAdapt
      */
     @IgniteSpiConfiguration(optional = false)
     public void setServiceAccountP12FilePath(String p12FileName) {
-        this.serviceAccountP12FilePath = p12FileName;
+        this.srvcAccountP12FilePath = p12FileName;
     }
 
     /**
@@ -235,7 +238,7 @@ public class TcpDiscoveryGoogleStorageIpFinder extends TcpDiscoveryIpFinderAdapt
      */
     @IgniteSpiConfiguration(optional = false)
     public void setServiceAccountId(String id) {
-        this.serviceAccountId = id;
+        this.srvcAccountId = id;
     }
 
     /**
@@ -245,13 +248,13 @@ public class TcpDiscoveryGoogleStorageIpFinder extends TcpDiscoveryIpFinderAdapt
      */
     private void init() throws IgniteSpiException {
         if (initGuard.compareAndSet(false, true)) {
-            if (serviceAccountId == null ||
-                serviceAccountP12FilePath == null ||
+            if (srvcAccountId == null ||
+                srvcAccountP12FilePath == null ||
                 projectName == null ||
                 bucketName == null) {
                 throw new IgniteSpiException(
                     "One or more of the required parameters is not set [serviceAccountId=" +
-                        serviceAccountId + ", serviceAccountP12FilePath=" + serviceAccountP12FilePath + ", projectName=" +
+                        srvcAccountId + ", serviceAccountP12FilePath=" + srvcAccountP12FilePath + ", projectName=" +
                         projectName + ", bucketName=" + bucketName + "]");
             }
 
@@ -265,12 +268,12 @@ public class TcpDiscoveryGoogleStorageIpFinder extends TcpDiscoveryIpFinderAdapt
                     throw new IgniteSpiException(e);
                 }
 
-                GoogleCredential credential;
+                GoogleCredential cred;
 
                 try {
-                    credential = new GoogleCredential.Builder().setTransport(httpTransport)
-                        .setJsonFactory(JacksonFactory.getDefaultInstance()).setServiceAccountId(serviceAccountId)
-                        .setServiceAccountPrivateKeyFromP12File(new File(serviceAccountP12FilePath))
+                    cred = new GoogleCredential.Builder().setTransport(httpTransport)
+                        .setJsonFactory(JacksonFactory.getDefaultInstance()).setServiceAccountId(srvcAccountId)
+                        .setServiceAccountPrivateKeyFromP12File(new File(srvcAccountP12FilePath))
                         .setServiceAccountScopes(Collections.singleton(StorageScopes.DEVSTORAGE_FULL_CONTROL)).build();
 
                 }
@@ -279,7 +282,7 @@ public class TcpDiscoveryGoogleStorageIpFinder extends TcpDiscoveryIpFinderAdapt
                 }
 
                 try {
-                    storage = new Storage.Builder(httpTransport, JacksonFactory.getDefaultInstance(), credential)
+                    storage = new Storage.Builder(httpTransport, JacksonFactory.getDefaultInstance(), cred)
                         .setApplicationName(projectName).build();
                 }
                 catch (Exception e) {

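For reference, wiring this IP finder into a node configuration looks roughly as follows. All parameter values are placeholders, and the setProjectName()/setBucketName() setters are assumed to mirror the service-account setters shown in the hunks above:

    import org.apache.ignite.configuration.IgniteConfiguration;
    import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
    import org.apache.ignite.spi.discovery.tcp.ipfinder.gce.TcpDiscoveryGoogleStorageIpFinder;

    TcpDiscoveryGoogleStorageIpFinder ipFinder = new TcpDiscoveryGoogleStorageIpFinder();

    // All four parameters are mandatory; init() throws IgniteSpiException if any is missing.
    ipFinder.setProjectName("my-project");                                    // placeholder
    ipFinder.setBucketName("my-discovery-bucket");                            // placeholder
    ipFinder.setServiceAccountId("svc@my-project.iam.gserviceaccount.com");   // placeholder
    ipFinder.setServiceAccountP12FilePath("/path/to/key.p12");                // placeholder

    TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
    discoSpi.setIpFinder(ipFinder);

    IgniteConfiguration cfg = new IgniteConfiguration();
    cfg.setDiscoverySpi(discoSpi);
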
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
index 66e9761..d910507 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
@@ -20,10 +20,12 @@ package org.apache.ignite.hadoop.fs;
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.*;
+import org.apache.hadoop.mapreduce.*;
 import org.apache.ignite.*;
 import org.apache.ignite.internal.processors.hadoop.*;
 import org.apache.ignite.internal.processors.hadoop.counter.*;
 import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
+import org.apache.ignite.internal.processors.igfs.*;
 import org.apache.ignite.internal.util.typedef.*;
 
 import java.io.*;
@@ -37,9 +39,6 @@ public class IgniteHadoopFileSystemCounterWriter implements HadoopCounterWriter
     public static final String PERFORMANCE_COUNTER_FILE_NAME = "performance";
 
     /** */
-    private static final String DEFAULT_USER_NAME = "anonymous";
-
-    /** */
     public static final String COUNTER_WRITER_DIR_PROPERTY = "ignite.counters.fswriter.directory";
 
     /** */
@@ -52,15 +51,14 @@ public class IgniteHadoopFileSystemCounterWriter implements HadoopCounterWriter
     @Override public void write(HadoopJobInfo jobInfo, HadoopJobId jobId, HadoopCounters cntrs)
         throws IgniteCheckedException {
 
-        Configuration hadoopCfg = new Configuration();
+        Configuration hadoopCfg = HadoopUtils.safeCreateConfiguration();
 
         for (Map.Entry<String, String> e : ((HadoopDefaultJobInfo)jobInfo).properties().entrySet())
             hadoopCfg.set(e.getKey(), e.getValue());
 
         String user = jobInfo.user();
 
-        if (F.isEmpty(user))
-            user = DEFAULT_USER_NAME;
+        user = IgfsUtils.fixUserName(user);
 
         String dir = jobInfo.property(COUNTER_WRITER_DIR_PROPERTY);
 
@@ -72,7 +70,9 @@ public class IgniteHadoopFileSystemCounterWriter implements HadoopCounterWriter
         HadoopPerformanceCounter perfCntr = HadoopPerformanceCounter.getCounter(cntrs, null);
 
         try {
-            FileSystem fs = jobStatPath.getFileSystem(hadoopCfg);
+            hadoopCfg.set(MRJobConfig.USER_NAME, user);
+
+            FileSystem fs = HadoopUtils.fileSystemForMrUser(jobStatPath.toUri(), hadoopCfg, true);
 
             fs.mkdirs(jobStatPath);
 

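The removed DEFAULT_USER_NAME constant suggests that IgfsUtils.fixUserName() now centralizes the fallback to the anonymous user. A plausible sketch of its semantics (the actual implementation is not part of this diff):

    // Assumed semantics of IgfsUtils.fixUserName(); not shown in this commit.
    public static String fixUserName(@Nullable String userName) {
        // Fall back to the default the removed DEFAULT_USER_NAME constant used to provide.
        return F.isEmpty(userName) ? "anonymous" : userName;
    }
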
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java
index ba891f8..6a630fb 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java
@@ -20,15 +20,16 @@ package org.apache.ignite.hadoop.fs;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.permission.*;
-import org.apache.hadoop.ipc.*;
 import org.apache.ignite.*;
 import org.apache.ignite.igfs.*;
 import org.apache.ignite.igfs.secondary.*;
 import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.processors.hadoop.fs.*;
 import org.apache.ignite.internal.processors.hadoop.igfs.*;
 import org.apache.ignite.internal.processors.igfs.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.jetbrains.annotations.*;
+import org.apache.ignite.internal.processors.hadoop.fs.HadoopLazyConcurrentMap.*;
 
 import java.io.*;
 import java.net.*;
@@ -37,15 +38,45 @@ import java.util.*;
 import static org.apache.ignite.internal.processors.igfs.IgfsEx.*;
 
 /**
- * Adapter to use any Hadoop file system {@link FileSystem} as  {@link IgfsSecondaryFileSystem}.
+ * Adapter to use any Hadoop file system {@link FileSystem} as {@link IgfsSecondaryFileSystem}.
+ * In fact, this class deals with different FileSystems depending on the user context,
+ * see {@link IgfsUserContext#currentUser()}.
  */
-public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSystem, AutoCloseable {
-    /** Hadoop file system. */
-    private final FileSystem fileSys;
-
-    /** Properties of file system */
+public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSystem {
+    /** Properties of the file system, see {@link #properties()}.
+     *
+     * See {@link IgfsEx#SECONDARY_FS_CONFIG_PATH}.
+     * See {@link IgfsEx#SECONDARY_FS_URI}.
+     * See {@link IgfsEx#SECONDARY_FS_USER_NAME}.
+     */
     private final Map<String, String> props = new HashMap<>();
 
+    /** Secondary file system provider. */
+    private final SecondaryFileSystemProvider secProvider;
+
+    /** The default user name. It is used if no user context is set. */
+    private final String dfltUserName;
+
+    /** FileSystem instance created for the default user.
+     * Stored outside the fileSysLazyMap for performance reasons. */
+    private final FileSystem dfltFs;
+
+    /** Lazy per-user cache for the file systems. It is cleared in the {@link #close()} method. */
+    private final HadoopLazyConcurrentMap<String, FileSystem> fileSysLazyMap = new HadoopLazyConcurrentMap<>(
+        new ValueFactory<String, FileSystem>() {
+            @Override public FileSystem createValue(String key) {
+                try {
+                    assert !F.isEmpty(key);
+
+                    return secProvider.createFileSystem(key);
+                }
+                catch (IOException ioe) {
+                    throw new IgniteException(ioe);
+                }
+            }
+        }
+    );
+
     /**
      * Simple constructor that is to be used by default.
      *
@@ -77,8 +108,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
      * @throws IgniteCheckedException In case of error.
      */
     public IgniteHadoopIgfsSecondaryFileSystem(@Nullable String uri, @Nullable String cfgPath,
-        @Nullable String userName)
-            throws IgniteCheckedException {
+        @Nullable String userName) throws IgniteCheckedException {
         // Treat empty uri and userName arguments as nulls to improve configuration usability:
         if (F.isEmpty(uri))
             uri = null;
@@ -89,27 +119,31 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
         if (F.isEmpty(userName))
             userName = null;
 
+        this.dfltUserName = IgfsUtils.fixUserName(userName);
+
         try {
-            SecondaryFileSystemProvider secProvider = new SecondaryFileSystemProvider(uri, cfgPath, userName);
+            this.secProvider = new SecondaryFileSystemProvider(uri, cfgPath);
 
-            fileSys = secProvider.createFileSystem();
+            // File system creation for the default user name.
+            // The value is *not* stored in the 'fileSysLazyMap' cache, but saved in a separate field:
+            this.dfltFs = secProvider.createFileSystem(dfltUserName);
+        }
+        catch (IOException e) {
+            throw new IgniteCheckedException(e);
+        }
 
-            uri = secProvider.uri().toString();
+        assert dfltFs != null;
 
-            if (!uri.endsWith("/"))
-                uri += "/";
+        uri = secProvider.uri().toString();
 
-            if (cfgPath != null)
-                props.put(SECONDARY_FS_CONFIG_PATH, cfgPath);
+        if (!uri.endsWith("/"))
+            uri += "/";
 
-            if (userName != null)
-                props.put(SECONDARY_FS_USER_NAME, userName);
+        if (cfgPath != null)
+            props.put(SECONDARY_FS_CONFIG_PATH, cfgPath);
 
-            props.put(SECONDARY_FS_URI, uri);
-        }
-        catch (IOException e) {
-            throw new IgniteCheckedException(e);
-        }
+        props.put(SECONDARY_FS_URI, uri);
+        props.put(SECONDARY_FS_USER_NAME, dfltUserName);
     }
 
     /**
@@ -119,7 +153,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
      * @return Hadoop path.
      */
     private Path convert(IgfsPath path) {
-        URI uri = fileSys.getUri();
+        URI uri = fileSysForUser().getUri();
 
         return new Path(uri.getScheme(), uri.getAuthority(), path.toString());
     }
@@ -131,14 +165,9 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
      * @param detailMsg Detailed error message.
      * @return Appropriate exception.
      */
-    @SuppressWarnings({"ThrowableResultOfMethodCallIgnored", "unchecked"})
     private IgfsException handleSecondaryFsError(IOException e, String detailMsg) {
-        boolean wrongVer = X.hasCause(e, RemoteException.class) ||
-            (e.getMessage() != null && e.getMessage().contains("Failed on local"));
-
-        return !wrongVer ? cast(detailMsg, e) :
-            new IgfsInvalidHdfsVersionException("HDFS version you are connecting to differs from local " +
-                "version.", e);    }
+        return cast(detailMsg, e);
+    }
 
     /**
      * Cast IO exception to IGFS exception.
@@ -178,7 +207,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
     /** {@inheritDoc} */
     @Override public boolean exists(IgfsPath path) {
         try {
-            return fileSys.exists(convert(path));
+            return fileSysForUser().exists(convert(path));
         }
         catch (IOException e) {
             throw handleSecondaryFsError(e, "Failed to check file existence [path=" + path + "]");
@@ -189,6 +218,8 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
     @Nullable @Override public IgfsFile update(IgfsPath path, Map<String, String> props) {
         HadoopIgfsProperties props0 = new HadoopIgfsProperties(props);
 
+        final FileSystem fileSys = fileSysForUser();
+
         try {
             if (props0.userName() != null || props0.groupName() != null)
                 fileSys.setOwner(convert(path), props0.userName(), props0.groupName());
@@ -208,7 +239,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
     @Override public void rename(IgfsPath src, IgfsPath dest) {
         // Delegate to the secondary file system.
         try {
-            if (!fileSys.rename(convert(src), convert(dest)))
+            if (!fileSysForUser().rename(convert(src), convert(dest)))
                 throw new IgfsException("Failed to rename (secondary file system returned false) " +
                     "[src=" + src + ", dest=" + dest + ']');
         }
@@ -220,7 +251,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
     /** {@inheritDoc} */
     @Override public boolean delete(IgfsPath path, boolean recursive) {
         try {
-            return fileSys.delete(convert(path), recursive);
+            return fileSysForUser().delete(convert(path), recursive);
         }
         catch (IOException e) {
             throw handleSecondaryFsError(e, "Failed to delete file [path=" + path + ", recursive=" + recursive + "]");
@@ -230,7 +261,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
     /** {@inheritDoc} */
     @Override public void mkdirs(IgfsPath path) {
         try {
-            if (!fileSys.mkdirs(convert(path)))
+            if (!fileSysForUser().mkdirs(convert(path)))
                 throw new IgniteException("Failed to make directories [path=" + path + "]");
         }
         catch (IOException e) {
@@ -241,7 +272,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
     /** {@inheritDoc} */
     @Override public void mkdirs(IgfsPath path, @Nullable Map<String, String> props) {
         try {
-            if (!fileSys.mkdirs(convert(path), new HadoopIgfsProperties(props).permission()))
+            if (!fileSysForUser().mkdirs(convert(path), new HadoopIgfsProperties(props).permission()))
                 throw new IgniteException("Failed to make directories [path=" + path + ", props=" + props + "]");
         }
         catch (IOException e) {
@@ -252,7 +283,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
     /** {@inheritDoc} */
     @Override public Collection<IgfsPath> listPaths(IgfsPath path) {
         try {
-            FileStatus[] statuses = fileSys.listStatus(convert(path));
+            FileStatus[] statuses = fileSysForUser().listStatus(convert(path));
 
             if (statuses == null)
                 throw new IgfsPathNotFoundException("Failed to list files (path not found): " + path);
@@ -275,7 +306,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
     /** {@inheritDoc} */
     @Override public Collection<IgfsFile> listFiles(IgfsPath path) {
         try {
-            FileStatus[] statuses = fileSys.listStatus(convert(path));
+            FileStatus[] statuses = fileSysForUser().listStatus(convert(path));
 
             if (statuses == null)
                 throw new IgfsPathNotFoundException("Failed to list files (path not found): " + path);
@@ -302,13 +333,13 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
 
     /** {@inheritDoc} */
     @Override public IgfsSecondaryFileSystemPositionedReadable open(IgfsPath path, int bufSize) {
-        return new HadoopIgfsSecondaryFileSystemPositionedReadable(fileSys, convert(path), bufSize);
+        return new HadoopIgfsSecondaryFileSystemPositionedReadable(fileSysForUser(), convert(path), bufSize);
     }
 
     /** {@inheritDoc} */
     @Override public OutputStream create(IgfsPath path, boolean overwrite) {
         try {
-            return fileSys.create(convert(path), overwrite);
+            return fileSysForUser().create(convert(path), overwrite);
         }
         catch (IOException e) {
             throw handleSecondaryFsError(e, "Failed to create file [path=" + path + ", overwrite=" + overwrite + "]");
@@ -322,8 +353,8 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
             new HadoopIgfsProperties(props != null ? props : Collections.<String, String>emptyMap());
 
         try {
-            return fileSys.create(convert(path), props0.permission(), overwrite, bufSize, (short)replication, blockSize,
-                null);
+            return fileSysForUser().create(convert(path), props0.permission(), overwrite, bufSize,
+                (short)replication, blockSize, null);
         }
         catch (IOException e) {
             throw handleSecondaryFsError(e, "Failed to create file [path=" + path + ", props=" + props +
@@ -336,7 +367,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
     @Override public OutputStream append(IgfsPath path, int bufSize, boolean create,
         @Nullable Map<String, String> props) {
         try {
-            return fileSys.append(convert(path), bufSize);
+            return fileSysForUser().append(convert(path), bufSize);
         }
         catch (IOException e) {
             throw handleSecondaryFsError(e, "Failed to append file [path=" + path + ", bufSize=" + bufSize + "]");
@@ -346,7 +377,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
     /** {@inheritDoc} */
     @Override public IgfsFile info(final IgfsPath path) {
         try {
-            final FileStatus status = fileSys.getFileStatus(convert(path));
+            final FileStatus status = fileSysForUser().getFileStatus(convert(path));
 
             if (status == null)
                 return null;
@@ -421,7 +452,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
         try {
             // We don't use FileSystem#getUsed() since it counts only the files
             // in the filesystem root, not all the files recursively.
-            return fileSys.getContentSummary(new Path("/")).getSpaceConsumed();
+            return fileSysForUser().getContentSummary(new Path("/")).getSpaceConsumed();
         }
         catch (IOException e) {
             throw handleSecondaryFsError(e, "Failed to get used space size of file system.");
@@ -429,25 +460,57 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
     }
 
     /** {@inheritDoc} */
-    @Nullable @Override public Map<String, String> properties() {
+    @Override public Map<String, String> properties() {
         return props;
     }
 
     /** {@inheritDoc} */
-    @Override public void close() throws IgniteCheckedException {
+    @Override public void close() throws IgniteException {
+        Exception e = null;
+
         try {
-            fileSys.close();
+            dfltFs.close();
         }
-        catch (IOException e) {
-            throw new IgniteCheckedException(e);
+        catch (Exception e0) {
+            e = e0;
+        }
+
+        try {
+            fileSysLazyMap.close();
+        }
+        catch (IgniteCheckedException ice) {
+            if (e == null)
+                e = ice;
         }
+
+        if (e != null)
+            throw new IgniteException(e);
     }
 
     /**
      * Gets the underlying {@link FileSystem}.
+     * This method is used solely for testing.
      * @return the underlying Hadoop {@link FileSystem}.
      */
     public FileSystem fileSystem() {
-        return fileSys;
+        return fileSysForUser();
+    }
+
+    /**
+     * Gets the FileSystem for the current context user.
+     * @return the FileSystem instance, never null.
+     */
+    private FileSystem fileSysForUser() {
+        String user = IgfsUserContext.currentUser();
+
+        if (F.isEmpty(user))
+            user = dfltUserName; // default is never empty.
+
+        assert !F.isEmpty(user);
+
+        if (F.eq(user, dfltUserName))
+            return dfltFs; // optimization
+
+        return fileSysLazyMap.getOrCreate(user);
     }
 }

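HadoopLazyConcurrentMap itself is not part of this diff; its contract, as used above, is to create each value at most once per key even under concurrent getOrCreate() calls. That contract can be sketched with the standard FutureTask-per-key idiom (a sketch under assumed semantics, not the actual implementation):

    import java.util.concurrent.*;

    public class LazyConcurrentMapSketch<K, V> {
        /** Factory invoked at most once per key. */
        public interface ValueFactory<K, V> {
            V createValue(K key);
        }

        private final ConcurrentMap<K, FutureTask<V>> map = new ConcurrentHashMap<>();

        private final ValueFactory<K, V> factory;

        public LazyConcurrentMapSketch(ValueFactory<K, V> factory) {
            this.factory = factory;
        }

        public V getOrCreate(final K key) {
            FutureTask<V> fut = map.get(key);

            if (fut == null) {
                FutureTask<V> newFut = new FutureTask<>(new Callable<V>() {
                    @Override public V call() {
                        return factory.createValue(key);
                    }
                });

                fut = map.putIfAbsent(key, newFut);

                if (fut == null) {
                    fut = newFut;

                    fut.run(); // First caller computes the value; concurrent callers block in get().
                }
            }

            try {
                return fut.get();
            }
            catch (InterruptedException | ExecutionException e) {
                throw new RuntimeException(e);
            }
        }
    }
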
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
index 1f53a06..9d94e5b 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
@@ -22,7 +22,7 @@ import org.apache.hadoop.fs.*;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.permission.*;
 import org.apache.hadoop.hdfs.*;
-import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.security.*;
 import org.apache.hadoop.util.*;
 import org.apache.ignite.*;
 import org.apache.ignite.igfs.*;
@@ -97,21 +97,8 @@ public class IgniteHadoopFileSystem extends FileSystem {
     /** Grid remote client. */
     private HadoopIgfsWrapper rmtClient;
 
-    /** User name for each thread. */
-    private final ThreadLocal<String> userName = new ThreadLocal<String>(){
-        /** {@inheritDoc} */
-        @Override protected String initialValue() {
-            return DFLT_USER_NAME;
-        }
-    };
-
-    /** Working directory for each thread. */
-    private final ThreadLocal<Path> workingDir = new ThreadLocal<Path>(){
-        /** {@inheritDoc} */
-        @Override protected Path initialValue() {
-            return getHomeDirectory();
-        }
-    };
+    /** Working directory. */
+    private Path workingDir;
 
     /** Default replication factor. */
     private short dfltReplication;
@@ -129,6 +116,9 @@ public class IgniteHadoopFileSystem extends FileSystem {
     /** Secondary URI string. */
     private URI secondaryUri;
 
+    /** The user name this file system was created on behalf of. */
+    private String user;
+
     /** IGFS mode resolver. */
     private IgfsModeResolver modeRslvr;
 
@@ -153,9 +143,6 @@ public class IgniteHadoopFileSystem extends FileSystem {
     /** Custom-provided sequential reads before prefetch. */
     private int seqReadsBeforePrefetch;
 
-    /** The cache was disabled when the instance was creating. */
-    private boolean cacheEnabled;
-
     /** {@inheritDoc} */
     @Override public URI getUri() {
         if (uri == null)
@@ -182,6 +169,22 @@ public class IgniteHadoopFileSystem extends FileSystem {
     }
 
     /**
+     * Gets the non-null user name as viewed by the Hadoop file system.
+     * @return The user name, never null.
+     * @throws IOException If the current Hadoop user cannot be obtained.
+     */
+    public static String getFsHadoopUser() throws IOException {
+        UserGroupInformation currUgi = UserGroupInformation.getCurrentUser();
+
+        String user = currUgi.getShortUserName();
+
+        user = IgfsUtils.fixUserName(user);
+
+        assert user != null;
+
+        return user;
+    }
+
+    /**
      * Public setter that can be used by direct users of FS or Visor.
      *
      * @param colocateFileWrites Whether all ongoing file writes should be colocated.
@@ -207,10 +210,6 @@ public class IgniteHadoopFileSystem extends FileSystem {
 
             setConf(cfg);
 
-            String disableCacheName = String.format("fs.%s.impl.disable.cache", name.getScheme());
-
-            cacheEnabled = !cfg.getBoolean(disableCacheName, false);
-
             mgmt = cfg.getBoolean(IGFS_MANAGEMENT, false);
 
             if (!IGFS_SCHEME.equals(name.getScheme()))
@@ -221,7 +220,7 @@ public class IgniteHadoopFileSystem extends FileSystem {
 
             uriAuthority = uri.getAuthority();
 
-            setUser(cfg.get(MRJobConfig.USER_NAME, DFLT_USER_NAME));
+            user = getFsHadoopUser();
 
             // Override sequential reads before prefetch if needed.
             seqReadsBeforePrefetch = parameter(cfg, PARAM_IGFS_SEQ_READS_BEFORE_PREFETCH, uriAuthority, 0);
@@ -244,7 +243,7 @@ public class IgniteHadoopFileSystem extends FileSystem {
 
             String logDir = logDirFile != null ? logDirFile.getAbsolutePath() : null;
 
-            rmtClient = new HadoopIgfsWrapper(uriAuthority, logDir, cfg, LOG);
+            rmtClient = new HadoopIgfsWrapper(uriAuthority, logDir, cfg, LOG, user);
 
             // Handshake.
             IgfsHandshakeResponse handshake = rmtClient.handshake(logDir);
@@ -289,13 +288,12 @@ public class IgniteHadoopFileSystem extends FileSystem {
 
                 String secUri = props.get(SECONDARY_FS_URI);
                 String secConfPath = props.get(SECONDARY_FS_CONFIG_PATH);
-                String secUserName = props.get(SECONDARY_FS_USER_NAME);
 
                 try {
-                    SecondaryFileSystemProvider secProvider = new SecondaryFileSystemProvider(secUri, secConfPath,
-                        secUserName);
+                    SecondaryFileSystemProvider secProvider = new SecondaryFileSystemProvider(secUri, secConfPath);
+
+                    secondaryFs = secProvider.createFileSystem(user);
 
-                    secondaryFs = secProvider.createFileSystem();
                     secondaryUri = secProvider.uri();
                 }
                 catch (IOException e) {
@@ -306,6 +304,9 @@ public class IgniteHadoopFileSystem extends FileSystem {
                             "will have no effect): " + e.getMessage());
                 }
             }
+
+            // Set the working directory to the home directory of the current FS user:
+            setWorkingDirectory(null);
         }
         finally {
             leaveBusy();
@@ -337,15 +338,13 @@ public class IgniteHadoopFileSystem extends FileSystem {
     @Override protected void finalize() throws Throwable {
         super.finalize();
 
-        close0();
+        close();
     }
 
     /** {@inheritDoc} */
     @Override public void close() throws IOException {
-        if (cacheEnabled && get(getUri(), getConf()) == this)
-            return;
-
-        close0();
+        if (closeGuard.compareAndSet(false, true))
+            close0();
     }
 
     /**
@@ -354,27 +353,25 @@ public class IgniteHadoopFileSystem extends FileSystem {
      * @throws IOException If failed.
      */
     private void close0() throws IOException {
-        if (closeGuard.compareAndSet(false, true)) {
-            if (LOG.isDebugEnabled())
-                LOG.debug("File system closed [uri=" + uri + ", endpoint=" + uriAuthority + ']');
+        if (LOG.isDebugEnabled())
+            LOG.debug("File system closed [uri=" + uri + ", endpoint=" + uriAuthority + ']');
 
-            if (rmtClient == null)
-                return;
+        if (rmtClient == null)
+            return;
 
-            super.close();
+        super.close();
 
-            rmtClient.close(false);
+        rmtClient.close(false);
 
-            if (clientLog.isLogEnabled())
-                clientLog.close();
+        if (clientLog.isLogEnabled())
+            clientLog.close();
 
-            if (secondaryFs != null)
-                U.closeQuiet(secondaryFs);
+        if (secondaryFs != null)
+            U.closeQuiet(secondaryFs);
 
-            // Reset initialized resources.
-            uri = null;
-            rmtClient = null;
-        }
+        // Reset initialized resources.
+        uri = null;
+        rmtClient = null;
     }
 
     /** {@inheritDoc} */
@@ -849,22 +846,11 @@ public class IgniteHadoopFileSystem extends FileSystem {
 
     /** {@inheritDoc} */
     @Override public Path getHomeDirectory() {
-        Path path = new Path("/user/" + userName.get());
+        Path path = new Path("/user/" + user);
 
         return path.makeQualified(getUri(), null);
     }
 
-    /**
-     * Set user name and default working directory for current thread.
-     *
-     * @param userName User name.
-     */
-    public void setUser(String userName) {
-        this.userName.set(userName);
-
-        setWorkingDirectory(null);
-    }
-
     /** {@inheritDoc} */
     @Override public void setWorkingDirectory(Path newPath) {
         if (newPath == null) {
@@ -873,7 +859,7 @@ public class IgniteHadoopFileSystem extends FileSystem {
             if (secondaryFs != null)
                 secondaryFs.setWorkingDirectory(toSecondary(homeDir));
 
-            workingDir.set(homeDir);
+            workingDir = homeDir;
         }
         else {
             Path fixedNewPath = fixRelativePart(newPath);
@@ -886,13 +872,13 @@ public class IgniteHadoopFileSystem extends FileSystem {
             if (secondaryFs != null)
                 secondaryFs.setWorkingDirectory(toSecondary(fixedNewPath));
 
-            workingDir.set(fixedNewPath);
+            workingDir = fixedNewPath;
         }
     }
 
     /** {@inheritDoc} */
     @Override public Path getWorkingDirectory() {
-        return workingDir.get();
+        return workingDir;
     }
 
     /** {@inheritDoc} */
@@ -1153,7 +1139,7 @@ public class IgniteHadoopFileSystem extends FileSystem {
             return null;
 
         return path.isAbsolute() ? new IgfsPath(path.toUri().getPath()) :
-            new IgfsPath(convert(workingDir.get()), path.toUri().getPath());
+            new IgfsPath(convert(workingDir), path.toUri().getPath());
     }
 
     /**
@@ -1191,9 +1177,16 @@ public class IgniteHadoopFileSystem extends FileSystem {
      */
     @SuppressWarnings("deprecation")
     private FileStatus convert(IgfsFile file) {
-        return new FileStatus(file.length(), file.isDirectory(), getDefaultReplication(),
-            file.groupBlockSize(), file.modificationTime(), file.accessTime(), permission(file),
-            file.property(PROP_USER_NAME, DFLT_USER_NAME), file.property(PROP_GROUP_NAME, "users"),
+        return new FileStatus(
+            file.length(),
+            file.isDirectory(),
+            getDefaultReplication(),
+            file.groupBlockSize(),
+            file.modificationTime(),
+            file.accessTime(),
+            permission(file),
+            file.property(PROP_USER_NAME, user),
+            file.property(PROP_GROUP_NAME, "users"),
             convert(file.path())) {
             @Override public String toString() {
                 return "FileStatus [path=" + getPath() + ", isDir=" + isDir() + ", len=" + getLen() +
@@ -1247,4 +1240,12 @@ public class IgniteHadoopFileSystem extends FileSystem {
     @Override public String toString() {
         return S.toString(IgniteHadoopFileSystem.class, this);
     }
+
+    /**
+     * Returns the name of the user this file system was created on behalf of.
+     * @return The user name.
+     */
+    public String user() {
+        return user;
+    }
 }

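Since the user is now taken from UserGroupInformation instead of the removed setUser() call, a caller that needs a specific identity wraps file system access in UserGroupInformation.doAs(). A hedged sketch (the URI authority and user name are placeholders):

    import java.net.URI;
    import java.security.PrivilegedExceptionAction;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.security.UserGroupInformation;

    UserGroupInformation ugi = UserGroupInformation.createRemoteUser("alice"); // placeholder user

    FileSystem fs = ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
        @Override public FileSystem run() throws Exception {
            // getFsHadoopUser() inside initialize() now resolves to "alice".
            return FileSystem.get(URI.create("igfs://igfs@localhost"), new Configuration());
        }
    });
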
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
index 9cfb79b..8330143 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
@@ -22,7 +22,6 @@ import org.apache.hadoop.conf.*;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.permission.*;
-import org.apache.hadoop.mapreduce.*;
 import org.apache.hadoop.util.*;
 import org.apache.ignite.*;
 import org.apache.ignite.igfs.*;
@@ -40,6 +39,7 @@ import java.util.*;
 import java.util.concurrent.atomic.*;
 
 import static org.apache.ignite.configuration.FileSystemConfiguration.*;
+import static org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem.*;
 import static org.apache.ignite.igfs.IgfsMode.*;
 import static org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.*;
 import static org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsUtils.*;
@@ -91,11 +91,14 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
     /** Grid remote client. */
     private HadoopIgfsWrapper rmtClient;
 
+    /** The name of the user this file system was created on behalf of. */
+    private final String user;
+
     /** Working directory. */
     private IgfsPath workingDir;
 
     /** URI. */
-    private URI uri;
+    private final URI uri;
 
     /** Authority. */
     private String uriAuthority;
@@ -141,6 +144,8 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
 
         uri = name;
 
+        user = getFsHadoopUser();
+
         try {
             initialize(name, cfg);
         }
@@ -152,7 +157,7 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
             throw e;
         }
 
-        workingDir = new IgfsPath("/user/" + cfg.get(MRJobConfig.USER_NAME, DFLT_USER_NAME));
+        workingDir = new IgfsPath("/user/" + user);
     }
 
     /** {@inheritDoc} */
@@ -240,7 +245,7 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
 
             String logDir = logDirFile != null ? logDirFile.getAbsolutePath() : null;
 
-            rmtClient = new HadoopIgfsWrapper(uriAuthority, logDir, cfg, LOG);
+            rmtClient = new HadoopIgfsWrapper(uriAuthority, logDir, cfg, LOG, user);
 
             // Handshake.
             IgfsHandshakeResponse handshake = rmtClient.handshake(logDir);
@@ -284,13 +289,12 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
 
                 String secUri = props.get(SECONDARY_FS_URI);
                 String secConfPath = props.get(SECONDARY_FS_CONFIG_PATH);
-                String secUserName = props.get(SECONDARY_FS_USER_NAME);
 
                 try {
-                    SecondaryFileSystemProvider secProvider = new SecondaryFileSystemProvider(secUri, secConfPath,
-                        secUserName);
+                    SecondaryFileSystemProvider secProvider = new SecondaryFileSystemProvider(secUri, secConfPath);
+
+                    secondaryFs = secProvider.createAbstractFileSystem(user);
 
-                    secondaryFs = secProvider.createAbstractFileSystem();
                     secondaryUri = secProvider.uri();
                 }
                 catch (IOException e) {
@@ -929,7 +933,7 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
             file.modificationTime(),
             file.accessTime(),
             permission(file),
-            file.property(PROP_USER_NAME, DFLT_USER_NAME),
+            file.property(PROP_USER_NAME, user),
             file.property(PROP_GROUP_NAME, "users"),
             convert(file.path())) {
             @Override public String toString() {
@@ -983,4 +987,12 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
     @Override public String toString() {
         return S.toString(IgniteHadoopFileSystem.class, this);
     }
-}
+
+    /**
+     * Returns the name of the user this file system was created on behalf of.
+     * @return The user name.
+     */
+    public String user() {
+        return user;
+    }
+}
\ No newline at end of file

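As a usage note for the v2 API: AbstractFileSystem implementations are reached through FileContext, bound via the standard fs.AbstractFileSystem.<scheme>.impl property. A sketch with a placeholder authority:

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileContext;

    Configuration cfg = new Configuration();

    // Standard Hadoop binding pattern for v2 file systems; the authority below is a placeholder.
    cfg.set("fs.AbstractFileSystem.igfs.impl",
        "org.apache.ignite.hadoop.fs.v2.IgniteHadoopFileSystem");

    FileContext fctx = FileContext.getFileContext(URI.create("igfs://igfs@localhost"), cfg);
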
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
index d0a327e..2e855d0 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
@@ -89,7 +89,7 @@ public class HadoopDefaultJobInfo implements HadoopJobInfo, Externalizable {
             if (jobCls0 == null) { // It is enough to have only one class loader with only Hadoop classes.
                 synchronized (HadoopDefaultJobInfo.class) {
                     if ((jobCls0 = jobCls) == null) {
-                        HadoopClassLoader ldr = new HadoopClassLoader(null, "hadoop-main");
+                        HadoopClassLoader ldr = new HadoopClassLoader(null, "hadoop-job");
 
                         jobCls = jobCls0 = ldr.loadClass(HadoopV2Job.class.getName());
                     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
index 00be422..68a9ef6 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
@@ -26,10 +26,16 @@ import org.apache.hadoop.mapreduce.JobPriority;
 import org.apache.hadoop.mapreduce.JobStatus;
 import org.apache.hadoop.mapreduce.*;
 import org.apache.ignite.*;
+import org.apache.ignite.hadoop.fs.v1.*;
+import org.apache.ignite.internal.processors.hadoop.fs.*;
 import org.apache.ignite.internal.processors.hadoop.v2.*;
+import org.apache.ignite.internal.util.*;
+import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
 
 import java.io.*;
+import java.net.*;
 import java.util.*;
 
 /**
@@ -57,6 +63,41 @@ public class HadoopUtils {
     /** Old reducer class attribute. */
     private static final String OLD_REDUCE_CLASS_ATTR = "mapred.reducer.class";
 
+    /** Lazy per-user cache for the file systems. It is cleared and nulled in the #close() method. */
+    private static final HadoopLazyConcurrentMap<FsCacheKey, FileSystem> fileSysLazyMap = new HadoopLazyConcurrentMap<>(
+        new HadoopLazyConcurrentMap.ValueFactory<FsCacheKey, FileSystem>() {
+            @Override public FileSystem createValue(FsCacheKey key) {
+                try {
+                    assert key != null;
+
+                    // Explicitly disable FileSystem caching:
+                    URI uri = key.uri();
+
+                    String scheme = uri.getScheme();
+
+                    // Copy the configuration to avoid altering the external object.
+                    Configuration cfg = new Configuration(key.configuration());
+
+                    String prop = HadoopUtils.disableFsCachePropertyName(scheme);
+
+                    cfg.setBoolean(prop, true);
+
+                    return FileSystem.get(uri, cfg, key.user());
+                }
+                catch (IOException | InterruptedException ioe) {
+                    throw new IgniteException(ioe);
+                }
+            }
+        }
+    );
+
+    /**
+     * Constructor.
+     */
+    private HadoopUtils() {
+        // No-op.
+    }
+
     /**
      * Wraps native split.
      *
@@ -126,11 +167,13 @@ public class HadoopUtils {
                 break;
 
             case PHASE_REDUCE:
-                assert status.totalReducerCnt() > 0;
-
                 setupProgress = 1;
                 mapProgress = 1;
-                reduceProgress = 1f - status.pendingReducerCnt() / (float)status.totalReducerCnt();
+
+                if (status.totalReducerCnt() > 0)
+                    reduceProgress = 1f - status.pendingReducerCnt() / (float)status.totalReducerCnt();
+                else
+                    reduceProgress = 1f;
 
                 break;
 
@@ -300,9 +343,242 @@ public class HadoopUtils {
     }
 
     /**
-     * Constructor.
+     * Creates a {@link Configuration} with the thread context class loader temporarily switched to
+     * the one that loaded {@link Configuration}, so that an inappropriate class loader does not get
+     * cached inside the Configuration object.
+     * @return New instance of {@link Configuration}.
      */
-    private HadoopUtils() {
-        // No-op.
+    public static Configuration safeCreateConfiguration() {
+        final ClassLoader cl0 = Thread.currentThread().getContextClassLoader();
+
+        Thread.currentThread().setContextClassLoader(Configuration.class.getClassLoader());
+
+        try {
+            return new Configuration();
+        }
+        finally {
+            Thread.currentThread().setContextClassLoader(cl0);
+        }
+    }
+
+    /**
+     * Creates a {@link JobConf} with the thread context class loader temporarily switched to
+     * the one that loaded {@link JobConf}, so that an inappropriate class loader does not get
+     * cached inside the Configuration object.
+     * @return New instance of {@link JobConf}.
+     */
+    public static JobConf safeCreateJobConf() {
+        final ClassLoader cl0 = Thread.currentThread().getContextClassLoader();
+
+        Thread.currentThread().setContextClassLoader(JobConf.class.getClassLoader());
+
+        try {
+            return new JobConf();
+        }
+        finally {
+            Thread.currentThread().setContextClassLoader(cl0);
+        }
+    }
+
+    /**
+     * Gets the non-null user name as viewed by Hadoop.
+     * @param cfg The Hadoop job configuration; must not be null.
+     * @return The user name, never null.
+     * @throws IOException If the current Hadoop user cannot be obtained.
+     */
+    private static String getMrHadoopUser(Configuration cfg) throws IOException {
+        String user = cfg.get(MRJobConfig.USER_NAME);
+
+        if (user == null)
+            user = IgniteHadoopFileSystem.getFsHadoopUser();
+
+        return user;
+    }
+
+    /**
+     * Common method to get the V1 file system in the MapReduce engine.
+     * It creates the file system for the user specified in the
+     * configuration with the {@link MRJobConfig#USER_NAME} property.
+     * @param uri The file system URI.
+     * @param cfg The configuration.
+     * @param doCacheFs Whether to take the file system from the static per-user cache.
+     * @return The file system.
+     * @throws IOException If the file system could not be created.
+     */
+    public static FileSystem fileSystemForMrUser(@Nullable URI uri, Configuration cfg, boolean doCacheFs) throws IOException {
+        final String usr = getMrHadoopUser(cfg);
+
+        assert usr != null;
+
+        if (uri == null)
+            uri = FileSystem.getDefaultUri(cfg);
+
+        final FileSystem fs;
+
+        if (doCacheFs) {
+            try {
+                fs = getWithCaching(uri, cfg, usr);
+            }
+            catch (IgniteException ie) {
+                throw new IOException(ie);
+            }
+        }
+        else {
+            try {
+                fs = FileSystem.get(uri, cfg, usr);
+            }
+            catch (InterruptedException ie) {
+                Thread.currentThread().interrupt();
+
+                throw new IOException(ie);
+            }
+        }
+
+        assert fs != null;
+        assert !(fs instanceof IgniteHadoopFileSystem) || F.eq(usr, ((IgniteHadoopFileSystem)fs).user());
+
+        return fs;
+    }
+
+    /**
+     * File system cache key: a triplet of URI scheme, authority and user name.
+     * Note that the configuration is not a part of the key;
+     * it is used solely to initialize the first instance
+     * created for the key.
+     */
+    public static final class FsCacheKey {
+        /** */
+        private final URI uri;
+
+        /** */
+        private final String usr;
+
+        /** */
+        private final String equalityKey;
+
+        /** */
+        private final Configuration cfg;
+
+        /**
+         * Constructor.
+         */
+        public FsCacheKey(URI uri, String usr, Configuration cfg) {
+            assert uri != null;
+            assert usr != null;
+            assert cfg != null;
+
+            this.uri = fixUri(uri, cfg);
+            this.usr = usr;
+            this.cfg = cfg;
+
+            this.equalityKey = createEqualityKey();
+        }
+
+        /**
+         * Creates the String key used for equality and hashing.
+         */
+        private String createEqualityKey() {
+            GridStringBuilder sb = new GridStringBuilder("(").a(usr).a(")@");
+
+            if (uri.getScheme() != null)
+                sb.a(uri.getScheme().toLowerCase());
+
+            sb.a("://");
+
+            if (uri.getAuthority() != null)
+                sb.a(uri.getAuthority().toLowerCase());
+
+            return sb.toString();
+        }
+
+        /**
+         * The URI.
+         */
+        public URI uri() {
+            return uri;
+        }
+
+        /**
+         * The user.
+         */
+        public String user() {
+            return usr;
+        }
+
+        /**
+         * The configuration.
+         */
+        public Configuration configuration() {
+            return cfg;
+        }
+
+        /** {@inheritDoc} */
+        @SuppressWarnings("SimplifiableIfStatement")
+        @Override public boolean equals(Object obj) {
+            if (obj == this)
+                return true;
+
+            if (obj == null || getClass() != obj.getClass())
+                return false;
+
+            return equalityKey.equals(((FsCacheKey)obj).equalityKey);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return equalityKey.hashCode();
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return equalityKey;
+        }
+    }
+
+    /**
+     * Gets a FileSystem instance, caching it in the static per-user map. The cache is a singleton
+     * for each class loader.
+     *
+     * <p/>Note that the file systems in the cache are keyed by a triplet {scheme, authority, user}.
+     * The Configuration is not a part of the key. This means that for the given key file system is
+     * initialized only once with the Configuration passed in upon the file system creation.
+     *
+     * @param uri The file system URI.
+     * @param cfg The configuration.
+     * @param usr The user to create file system for.
+     * @return The file system: either created, or taken from the cache.
+     */
+    private static FileSystem getWithCaching(URI uri, Configuration cfg, String usr) {
+        FsCacheKey key = new FsCacheKey(uri, usr, cfg);
+
+        return fileSysLazyMap.getOrCreate(key);
+    }
+
+    /**
+     * Gets the property name to disable file system cache.
+     * @param scheme The file system URI scheme.
+     * @return The property name. If scheme is null,
+     * returns "fs.null.impl.disable.cache".
+     */
+    public static String disableFsCachePropertyName(@Nullable String scheme) {
+        return String.format("fs.%s.impl.disable.cache", scheme);
+    }
+
+    /**
+     * Fixes the file system URI using logic similar to that of {@link FileSystem#get(URI, Configuration, String)}.
+     * @param uri0 The uri.
+     * @param cfg The cfg.
+     * @return Correct URI.
+     */
+    public static URI fixUri(URI uri0, Configuration cfg) {
+        if (uri0 == null)
+            return FileSystem.getDefaultUri(cfg);
+
+        String scheme = uri0.getScheme();
+        String authority = uri0.getAuthority();
+
+        if (authority == null) {
+            URI dfltUri = FileSystem.getDefaultUri(cfg);
+
+            if (scheme == null || (scheme.equals(dfltUri.getScheme()) && dfltUri.getAuthority() != null))
+                return dfltUri;
+        }
+
+        return uri0;
     }
 }

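safeCreateConfiguration() and safeCreateJobConf() instantiate the same context-class-loader swap; the pattern generalizes to any factory. A sketch (not part of the commit):

    import java.util.concurrent.Callable;

    // Generalized form of the class-loader swap used by safeCreateConfiguration():
    public static <T> T withContextClassLoader(ClassLoader ldr, Callable<T> factory) throws Exception {
        ClassLoader oldLdr = Thread.currentThread().getContextClassLoader();

        Thread.currentThread().setContextClassLoader(ldr);

        try {
            return factory.call();
        }
        finally {
            Thread.currentThread().setContextClassLoader(oldLdr);
        }
    }
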
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java
index 27805f8..dd679de 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java
@@ -19,26 +19,26 @@ package org.apache.ignite.internal.processors.hadoop;
 
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.fs.*;
+import org.apache.hadoop.security.*;
+import org.apache.ignite.internal.processors.igfs.*;
 import org.apache.ignite.internal.util.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.jetbrains.annotations.*;
 
 import java.io.*;
 import java.net.*;
+import java.security.*;
 
 /**
  * Encapsulates logic of secondary filesystem creation.
  */
 public class SecondaryFileSystemProvider {
     /** Configuration of the secondary filesystem, never null. */
-    private final Configuration cfg = new Configuration();
+    private final Configuration cfg = HadoopUtils.safeCreateConfiguration();
 
     /** The secondary filesystem URI, never null. */
     private final URI uri;
 
-    /** Optional user name to log into secondary filesystem with. */
-    private @Nullable final String userName;
-
     /**
      * Creates new provider with given config parameters. The configuration URL is optional. The filesystem URI must be
      * specified either explicitly or in the configuration provided.
@@ -47,13 +47,10 @@ public class SecondaryFileSystemProvider {
      * property in the provided configuration.
      * @param secConfPath the secondary Fs path (file path on the local file system, optional).
      * See {@link IgniteUtils#resolveIgniteUrl(String)} on how the path is resolved.
-     * @param userName User name.
      * @throws IOException In case of error.
      */
     public SecondaryFileSystemProvider(final @Nullable String secUri,
-        final @Nullable String secConfPath, @Nullable String userName) throws IOException {
-        this.userName = userName;
-
+        final @Nullable String secConfPath) throws IOException {
         if (secConfPath != null) {
             URL url = U.resolveIgniteUrl(secConfPath);
 
@@ -79,7 +76,7 @@ public class SecondaryFileSystemProvider {
         }
 
         // Disable caching:
-        String prop = String.format("fs.%s.impl.disable.cache", uri.getScheme());
+        String prop = HadoopUtils.disableFsCachePropertyName(uri.getScheme());
 
         cfg.setBoolean(prop, true);
     }
@@ -88,20 +85,18 @@ public class SecondaryFileSystemProvider {
      * @return {@link org.apache.hadoop.fs.FileSystem} instance for this secondary Fs.
      * @throws IOException In case of error.
      */
-    public FileSystem createFileSystem() throws IOException {
+    public FileSystem createFileSystem(String userName) throws IOException {
+        userName = IgfsUtils.fixUserName(userName);
+
         final FileSystem fileSys;
 
-        if (userName == null)
-            fileSys = FileSystem.get(uri, cfg);
-        else {
-            try {
-                fileSys = FileSystem.get(uri, cfg, userName);
-            }
-            catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
+        try {
+            fileSys = FileSystem.get(uri, cfg, userName);
+        }
+        catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
 
-                throw new IOException("Failed to create file system due to interrupt.", e);
-            }
+            throw new IOException("Failed to create file system due to interrupt.", e);
         }
 
         return fileSys;
@@ -109,10 +104,26 @@ public class SecondaryFileSystemProvider {
 
     /**
      * @return {@link org.apache.hadoop.fs.AbstractFileSystem} instance for this secondary Fs.
-     * @throws IOException
+     * @throws IOException in case of error.
      */
-    public AbstractFileSystem createAbstractFileSystem() throws IOException {
-        return AbstractFileSystem.get(uri, cfg);
+    public AbstractFileSystem createAbstractFileSystem(String userName) throws IOException {
+        userName = IgfsUtils.fixUserName(userName);
+
+        String ticketCachePath = cfg.get(CommonConfigurationKeys.KERBEROS_TICKET_CACHE_PATH);
+
+        UserGroupInformation ugi = UserGroupInformation.getBestUGI(ticketCachePath, userName);
+
+        try {
+            return ugi.doAs(new PrivilegedExceptionAction<AbstractFileSystem>() {
+                @Override public AbstractFileSystem run() throws IOException {
+                    return AbstractFileSystem.get(uri, cfg);
+                }
+            });
+        }
+        catch (InterruptedException ie) {
+            Thread.currentThread().interrupt();
+
+            throw new IOException("Failed to create file system due to interrupt.", ie);
+        }
     }
 
     /**

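For context, the UserGroupInformation pattern used above, shown in isolation as a self-contained sketch (the "someUser" name and the file:/// URI are hypothetical and only demonstrate the flow):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.security.UserGroupInformation;

    import java.io.IOException;
    import java.net.URI;
    import java.security.PrivilegedExceptionAction;

    public class UgiDoAsSketch {
        public static void main(String[] args) throws Exception {
            // Null ticket cache path: getBestUGI() falls back to a simple (non-Kerberos) user.
            UserGroupInformation ugi = UserGroupInformation.getBestUGI(null, "someUser");

            // The file system call inside run() executes on behalf of "someUser".
            FileSystem fs = ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
                @Override public FileSystem run() throws IOException {
                    return FileSystem.get(URI.create("file:///"), new Configuration());
                }
            });

            System.out.println("Opened " + fs.getUri() + " as " + ugi.getShortUserName());
        }
    }
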
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopDistributedFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopDistributedFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopDistributedFileSystem.java
deleted file mode 100644
index 509f443..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopDistributedFileSystem.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.fs;
-
-import org.apache.hadoop.conf.*;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.hdfs.*;
-import org.apache.hadoop.mapreduce.*;
-
-import java.io.*;
-import java.net.*;
-
-import static org.apache.ignite.configuration.FileSystemConfiguration.*;
-
-/**
- * Wrapper of HDFS for support of separated working directory.
- */
-public class HadoopDistributedFileSystem extends DistributedFileSystem {
-    /** User name for each thread. */
-    private final ThreadLocal<String> userName = new ThreadLocal<String>() {
-        /** {@inheritDoc} */
-        @Override protected String initialValue() {
-            return DFLT_USER_NAME;
-        }
-    };
-
-    /** Working directory for each thread. */
-    private final ThreadLocal<Path> workingDir = new ThreadLocal<Path>() {
-        /** {@inheritDoc} */
-        @Override protected Path initialValue() {
-            return getHomeDirectory();
-        }
-    };
-
-    /** {@inheritDoc} */
-    @Override public void initialize(URI uri, Configuration conf) throws IOException {
-        super.initialize(uri, conf);
-
-        setUser(conf.get(MRJobConfig.USER_NAME, DFLT_USER_NAME));
-    }
-
-    /**
-     * Set user name and default working directory for current thread.
-     *
-     * @param userName User name.
-     */
-    public void setUser(String userName) {
-        this.userName.set(userName);
-
-        setWorkingDirectory(getHomeDirectory());
-    }
-
-    /** {@inheritDoc} */
-    @Override public Path getHomeDirectory() {
-        Path path = new Path("/user/" + userName.get());
-
-        return path.makeQualified(getUri(), null);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setWorkingDirectory(Path dir) {
-        Path fixedDir = fixRelativePart(dir);
-
-        String res = fixedDir.toUri().getPath();
-
-        if (!DFSUtil.isValidName(res))
-            throw new IllegalArgumentException("Invalid DFS directory name " + res);
-
-        workingDir.set(fixedDir);
-    }
-
-    /** {@inheritDoc} */
-    @Override public Path getWorkingDirectory() {
-        return workingDir.get();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java
index f3f51d4..d90bc28 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java
@@ -19,8 +19,6 @@ package org.apache.ignite.internal.processors.hadoop.fs;
 
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.fs.*;
-import org.apache.hadoop.hdfs.protocol.*;
-import org.apache.ignite.hadoop.fs.v1.*;
 
 /**
  * Utilities for configuring file systems to support a separate working directory per thread.
@@ -30,19 +28,6 @@ public class HadoopFileSystemsUtils {
     public static final String LOC_FS_WORK_DIR_PROP = "fs." + FsConstants.LOCAL_FS_URI.getScheme() + ".workDir";
 
     /**
-     * Set user name and default working directory for current thread if it's supported by file system.
-     *
-     * @param fs File system.
-     * @param userName User name.
-     */
-    public static void setUser(FileSystem fs, String userName) {
-        if (fs instanceof IgniteHadoopFileSystem)
-            ((IgniteHadoopFileSystem)fs).setUser(userName);
-        else if (fs instanceof HadoopDistributedFileSystem)
-            ((HadoopDistributedFileSystem)fs).setUser(userName);
-    }
-
-    /**
      * Setup wrappers of filesystems to support the separate working directory.
      *
      * @param cfg Config for setup.
@@ -51,7 +36,5 @@ public class HadoopFileSystemsUtils {
         cfg.set("fs." + FsConstants.LOCAL_FS_URI.getScheme() + ".impl", HadoopLocalFileSystemV1.class.getName());
         cfg.set("fs.AbstractFileSystem." + FsConstants.LOCAL_FS_URI.getScheme() + ".impl",
                 HadoopLocalFileSystemV2.class.getName());
-
-        cfg.set("fs." + HdfsConstants.HDFS_URI_SCHEME + ".impl", HadoopDistributedFileSystem.class.getName());
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java
new file mode 100644
index 0000000..71b38c4
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.fs;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.util.future.*;
+import org.jsr166.*;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.locks.*;
+
+/**
+ * Maps values by keys.
+ * Values are created lazily using {@link ValueFactory}.
+ *
+ * Despite the name, this class does not depend on any Hadoop classes.
+ */
+public class HadoopLazyConcurrentMap<K, V extends Closeable> {
+    /** The map storing the actual values. */
+    private final ConcurrentMap<K, ValueWrapper> map = new ConcurrentHashMap8<>();
+
+    /** The factory passed in by the client. Will be used for lazy value creation. */
+    private final ValueFactory<K, V> factory;
+
+    /** Lock used to close the objects. */
+    private final ReadWriteLock closeLock = new ReentrantReadWriteLock();
+
+    /** Flag indicating that this map is closed and cleared. */
+    private boolean closed;
+
+    /**
+     * Constructor.
+     * @param factory the factory to create new values lazily.
+     */
+    public HadoopLazyConcurrentMap(ValueFactory<K, V> factory) {
+        this.factory = factory;
+    }
+
+    /**
+     * Gets cached or creates a new value of V.
+     * Never returns null.
+     * @param k The key to associate the value with.
+     * @return The cached or newly created value, never null.
+     * @throws IgniteException On error.
+     */
+    public V getOrCreate(K k) {
+        ValueWrapper w = map.get(k);
+
+        if (w == null) {
+            closeLock.readLock().lock();
+
+            try {
+                if (closed)
+                    throw new IllegalStateException("Failed to create value for key [" + k
+                        + "]: the map is already closed.");
+
+                final ValueWrapper wNew = new ValueWrapper(k);
+
+                w = map.putIfAbsent(k, wNew);
+
+                if (w == null) {
+                    wNew.init();
+
+                    w = wNew;
+                }
+            }
+            finally {
+                closeLock.readLock().unlock();
+            }
+        }
+
+        try {
+            V v = w.getValue();
+
+            assert v != null;
+
+            return v;
+        }
+        catch (IgniteCheckedException ie) {
+            throw new IgniteException(ie);
+        }
+    }
+
+    /**
+     * Clears the map and closes all the values.
+     */
+    public void close() throws IgniteCheckedException {
+        closeLock.writeLock().lock();
+
+        try {
+            closed = true;
+
+            Exception err = null;
+
+            Set<K> keySet = map.keySet();
+
+            for (K key : keySet) {
+                V v = null;
+
+                try {
+                    v = map.get(key).getValue();
+                }
+                catch (IgniteCheckedException ignore) {
+                    // No-op.
+                }
+
+                if (v != null) {
+                    try {
+                        v.close();
+                    }
+                    catch (Exception err0) {
+                        if (err == null)
+                            err = err0;
+                    }
+                }
+            }
+
+            map.clear();
+
+            if (err != null)
+                throw new IgniteCheckedException(err);
+        }
+        finally {
+            closeLock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * Helper class that drives the lazy value creation.
+     */
+    private class ValueWrapper {
+        /** Future. */
+        private final GridFutureAdapter<V> fut = new GridFutureAdapter<>();
+
+        /** The key. */
+        private final K key;
+
+        /**
+         * Creates new wrapper.
+         *
+         * @param key The key.
+         */
+        private ValueWrapper(K key) {
+            this.key = key;
+        }
+
+        /**
+         * Initializes the value using the factory.
+         */
+        private void init() {
+            try {
+                final V v0 = factory.createValue(key);
+
+                if (v0 == null)
+                    throw new IgniteException("Failed to create non-null value [key=" + key + ']');
+
+                fut.onDone(v0);
+            }
+            catch (Throwable e) {
+                fut.onDone(e);
+            }
+        }
+
+        /**
+         * Gets the available value or blocks until the value is initialized.
+         * @return the value, never null.
+         * @throws IgniteCheckedException on error.
+         */
+        V getValue() throws IgniteCheckedException {
+            return fut.get();
+        }
+    }
+
+    /**
+     * Interface representing the factory that creates map values.
+     * @param <K> The type of the key.
+     * @param <V> The type of the value.
+     */
+    public interface ValueFactory<K, V> {
+        /**
+         * Creates the new value. Should never return null.
+         *
+         * @param key The key to create the value for.
+         * @return The value.
+         * @throws IgniteException On failure.
+         */
+        public V createValue(K key);
+    }
+}
\ No newline at end of file
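
A usage sketch of the new map (a fragment: StringWriter is an arbitrary Closeable picked for illustration, and checked-exception handling is elided). Concurrent getOrCreate() calls for the same key block on a single future, so the factory runs at most once per key:

    HadoopLazyConcurrentMap<String, StringWriter> lazyMap = new HadoopLazyConcurrentMap<>(
        new HadoopLazyConcurrentMap.ValueFactory<String, StringWriter>() {
            @Override public StringWriter createValue(String key) {
                return new StringWriter(); // Invoked once per key, on first request.
            }
        });

    StringWriter w1 = lazyMap.getOrCreate("a"); // Creates the value.
    StringWriter w2 = lazyMap.getOrCreate("a"); // Returns the cached instance.

    assert w1 == w2;

    lazyMap.close(); // Closes every created value; later getOrCreate() calls fail.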

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEx.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEx.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEx.java
index 2f19226..b9c5113 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEx.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEx.java
@@ -85,4 +85,10 @@ public interface HadoopIgfsEx extends HadoopIgfs {
      * @throws IOException If failed.
      */
     public void flush(HadoopIgfsStreamDelegate delegate) throws IOException;
+
+    /**
+     * The user this Igfs instance works on behalf of.
+     * @return the user name.
+     */
+    public String user();
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java
index 44e531e..47ba0e8 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java
@@ -23,6 +23,7 @@ import org.apache.ignite.igfs.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.processors.igfs.*;
 import org.apache.ignite.internal.util.future.*;
+import org.apache.ignite.lang.*;
 import org.jetbrains.annotations.*;
 
 import java.io.*;
@@ -46,25 +47,35 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
     /** Logger. */
     private final Log log;
 
+    /** The user this Igfs works on behalf of. */
+    private final String user;
+
     /**
      * Constructor.
      *
      * @param igfs Target IGFS.
      * @param log Log.
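+     * @param userName User name.
+     * @throws IgniteCheckedException If failed.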
      */
-    public HadoopIgfsInProc(IgfsEx igfs, Log log) {
+    public HadoopIgfsInProc(IgfsEx igfs, Log log, String userName) throws IgniteCheckedException {
+        this.user = IgfsUtils.fixUserName(userName);
+
         this.igfs = igfs;
+
         this.log = log;
 
         bufSize = igfs.configuration().getBlockSize() * 2;
     }
 
     /** {@inheritDoc} */
-    @Override public IgfsHandshakeResponse handshake(String logDir) {
-        igfs.clientLogDirectory(logDir);
+    @Override public IgfsHandshakeResponse handshake(final String logDir) {
+        return IgfsUserContext.doAs(user, new IgniteOutClosure<IgfsHandshakeResponse>() {
+            @Override public IgfsHandshakeResponse apply() {
+                igfs.clientLogDirectory(logDir);
 
-        return new IgfsHandshakeResponse(igfs.name(), igfs.proxyPaths(), igfs.groupBlockSize(),
-            igfs.globalSampling());
+                return new IgfsHandshakeResponse(igfs.name(), igfs.proxyPaths(), igfs.groupBlockSize(),
+                    igfs.globalSampling());
+            }
+        });
     }
 
     /** {@inheritDoc} */
@@ -82,9 +93,13 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
     }
 
     /** {@inheritDoc} */
-    @Override public IgfsFile info(IgfsPath path) throws IgniteCheckedException {
+    @Override public IgfsFile info(final IgfsPath path) throws IgniteCheckedException {
         try {
-            return igfs.info(path);
+            return IgfsUserContext.doAs(user, new IgniteOutClosure<IgfsFile>() {
+                @Override public IgfsFile apply() {
+                    return igfs.info(path);
+                }
+            });
         }
         catch (IgniteException e) {
             throw new IgniteCheckedException(e);
@@ -95,9 +110,13 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
     }
 
     /** {@inheritDoc} */
-    @Override public IgfsFile update(IgfsPath path, Map<String, String> props) throws IgniteCheckedException {
+    @Override public IgfsFile update(final IgfsPath path, final Map<String, String> props) throws IgniteCheckedException {
         try {
-            return igfs.update(path, props);
+            return IgfsUserContext.doAs(user, new IgniteOutClosure<IgfsFile>() {
+                @Override public IgfsFile apply() {
+                    return igfs.update(path, props);
+                }
+            });
         }
         catch (IgniteException e) {
             throw new IgniteCheckedException(e);
@@ -108,9 +127,15 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
     }
 
     /** {@inheritDoc} */
-    @Override public Boolean setTimes(IgfsPath path, long accessTime, long modificationTime) throws IgniteCheckedException {
+    @Override public Boolean setTimes(final IgfsPath path, final long accessTime, final long modificationTime) throws IgniteCheckedException {
         try {
-            igfs.setTimes(path, accessTime, modificationTime);
+            IgfsUserContext.doAs(user, new IgniteOutClosure<Void>() {
+                @Override public Void apply() {
+                    igfs.setTimes(path, accessTime, modificationTime);
+
+                    return null;
+                }
+            });
 
             return true;
         }
@@ -124,9 +149,15 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
     }
 
     /** {@inheritDoc} */
-    @Override public Boolean rename(IgfsPath src, IgfsPath dest) throws IgniteCheckedException {
+    @Override public Boolean rename(final IgfsPath src, final IgfsPath dest) throws IgniteCheckedException {
         try {
-            igfs.rename(src, dest);
+            IgfsUserContext.doAs(user, new IgniteOutClosure<Void>() {
+                @Override public Void apply() {
+                    igfs.rename(src, dest);
+
+                    return null;
+                }
+            });
 
             return true;
         }
@@ -139,9 +170,13 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
     }
 
     /** {@inheritDoc} */
-    @Override public Boolean delete(IgfsPath path, boolean recursive) throws IgniteCheckedException {
+    @Override public Boolean delete(final IgfsPath path, final boolean recursive) throws IgniteCheckedException {
         try {
-            return igfs.delete(path, recursive);
+            return IgfsUserContext.doAs(user, new IgniteOutClosure<Boolean>() {
+                @Override public Boolean apply() {
+                    return igfs.delete(path, recursive);
+                }
+            });
         }
         catch (IgniteException e) {
             throw new IgniteCheckedException(e);
@@ -154,18 +189,32 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
     /** {@inheritDoc} */
     @Override public IgfsStatus fsStatus() throws IgniteCheckedException {
         try {
-            return igfs.globalSpace();
+            return IgfsUserContext.doAs(user, new Callable<IgfsStatus>() {
+                @Override public IgfsStatus call() throws IgniteCheckedException {
+                    return igfs.globalSpace();
+                }
+            });
         }
         catch (IllegalStateException e) {
             throw new HadoopIgfsCommunicationException("Failed to get file system status because Grid is " +
                 "stopping.");
         }
+        catch (IgniteCheckedException | RuntimeException | Error e) {
+            throw e;
+        }
+        catch (Exception e) {
+            throw new AssertionError("Should never be reached.");
+        }
     }
 
     /** {@inheritDoc} */
-    @Override public Collection<IgfsPath> listPaths(IgfsPath path) throws IgniteCheckedException {
+    @Override public Collection<IgfsPath> listPaths(final IgfsPath path) throws IgniteCheckedException {
         try {
-            return igfs.listPaths(path);
+            return IgfsUserContext.doAs(user, new IgniteOutClosure<Collection<IgfsPath>>() {
+                @Override public Collection<IgfsPath> apply() {
+                    return igfs.listPaths(path);
+                }
+            });
         }
         catch (IgniteException e) {
             throw new IgniteCheckedException(e);
@@ -176,9 +225,13 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
     }
 
     /** {@inheritDoc} */
-    @Override public Collection<IgfsFile> listFiles(IgfsPath path) throws IgniteCheckedException {
+    @Override public Collection<IgfsFile> listFiles(final IgfsPath path) throws IgniteCheckedException {
         try {
-            return igfs.listFiles(path);
+            return IgfsUserContext.doAs(user, new IgniteOutClosure<Collection<IgfsFile>>() {
+                @Override public Collection<IgfsFile> apply() {
+                    return igfs.listFiles(path);
+                }
+            });
         }
         catch (IgniteException e) {
             throw new IgniteCheckedException(e);
@@ -189,9 +242,15 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
     }
 
     /** {@inheritDoc} */
-    @Override public Boolean mkdirs(IgfsPath path, Map<String, String> props) throws IgniteCheckedException {
+    @Override public Boolean mkdirs(final IgfsPath path, final Map<String, String> props) throws IgniteCheckedException {
         try {
-            igfs.mkdirs(path, props);
+            IgfsUserContext.doAs(user, new IgniteOutClosure<Void>() {
+                @Override public Void apply() {
+                    igfs.mkdirs(path, props);
+
+                    return null;
+                }
+            });
 
             return true;
         }
@@ -205,9 +264,13 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
     }
 
     /** {@inheritDoc} */
-    @Override public IgfsPathSummary contentSummary(IgfsPath path) throws IgniteCheckedException {
+    @Override public IgfsPathSummary contentSummary(final IgfsPath path) throws IgniteCheckedException {
         try {
-            return igfs.summary(path);
+            return IgfsUserContext.doAs(user, new IgniteOutClosure<IgfsPathSummary>() {
+                @Override public IgfsPathSummary apply() {
+                    return igfs.summary(path);
+                }
+            });
         }
         catch (IgniteException e) {
             throw new IgniteCheckedException(e);
@@ -219,10 +282,14 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
     }
 
     /** {@inheritDoc} */
-    @Override public Collection<IgfsBlockLocation> affinity(IgfsPath path, long start, long len)
+    @Override public Collection<IgfsBlockLocation> affinity(final IgfsPath path, final long start, final long len)
         throws IgniteCheckedException {
         try {
-            return igfs.affinity(path, start, len);
+            return IgfsUserContext.doAs(user, new IgniteOutClosure<Collection<IgfsBlockLocation>>() {
+                @Override public Collection<IgfsBlockLocation> apply() {
+                    return igfs.affinity(path, start, len);
+                }
+            });
         }
         catch (IgniteException e) {
             throw new IgniteCheckedException(e);
@@ -233,11 +300,15 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
     }
 
     /** {@inheritDoc} */
-    @Override public HadoopIgfsStreamDelegate open(IgfsPath path) throws IgniteCheckedException {
+    @Override public HadoopIgfsStreamDelegate open(final IgfsPath path) throws IgniteCheckedException {
         try {
-            IgfsInputStreamAdapter stream = igfs.open(path, bufSize);
+            return IgfsUserContext.doAs(user, new IgniteOutClosure<HadoopIgfsStreamDelegate>() {
+                @Override public HadoopIgfsStreamDelegate apply() {
+                    IgfsInputStreamAdapter stream = igfs.open(path, bufSize);
 
-            return new HadoopIgfsStreamDelegate(this, stream, stream.fileInfo().length());
+                    return new HadoopIgfsStreamDelegate(HadoopIgfsInProc.this, stream, stream.fileInfo().length());
+                }
+            });
         }
         catch (IgniteException e) {
             throw new IgniteCheckedException(e);
@@ -248,12 +319,16 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
     }
 
     /** {@inheritDoc} */
-    @Override public HadoopIgfsStreamDelegate open(IgfsPath path, int seqReadsBeforePrefetch)
+    @Override public HadoopIgfsStreamDelegate open(final IgfsPath path, final int seqReadsBeforePrefetch)
         throws IgniteCheckedException {
         try {
-            IgfsInputStreamAdapter stream = igfs.open(path, bufSize, seqReadsBeforePrefetch);
+            return IgfsUserContext.doAs(user, new IgniteOutClosure<HadoopIgfsStreamDelegate>() {
+                @Override public HadoopIgfsStreamDelegate apply() {
+                    IgfsInputStreamAdapter stream = igfs.open(path, bufSize, seqReadsBeforePrefetch);
 
-            return new HadoopIgfsStreamDelegate(this, stream, stream.fileInfo().length());
+                    return new HadoopIgfsStreamDelegate(HadoopIgfsInProc.this, stream, stream.fileInfo().length());
+                }
+            });
         }
         catch (IgniteException e) {
             throw new IgniteCheckedException(e);
@@ -264,13 +339,17 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
     }
 
     /** {@inheritDoc} */
-    @Override public HadoopIgfsStreamDelegate create(IgfsPath path, boolean overwrite, boolean colocate,
-        int replication, long blockSize, @Nullable Map<String, String> props) throws IgniteCheckedException {
+    @Override public HadoopIgfsStreamDelegate create(final IgfsPath path, final boolean overwrite, final boolean colocate,
+        final int replication, final long blockSize, final @Nullable Map<String, String> props) throws IgniteCheckedException {
         try {
-            IgfsOutputStream stream = igfs.create(path, bufSize, overwrite,
-                colocate ? igfs.nextAffinityKey() : null, replication, blockSize, props);
+            return IgfsUserContext.doAs(user, new IgniteOutClosure<HadoopIgfsStreamDelegate>() {
+                @Override public HadoopIgfsStreamDelegate apply() {
+                    IgfsOutputStream stream = igfs.create(path, bufSize, overwrite,
+                        colocate ? igfs.nextAffinityKey() : null, replication, blockSize, props);
 
-            return new HadoopIgfsStreamDelegate(this, stream);
+                    return new HadoopIgfsStreamDelegate(HadoopIgfsInProc.this, stream);
+                }
+            });
         }
         catch (IgniteException e) {
             throw new IgniteCheckedException(e);
@@ -281,12 +360,16 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
     }
 
     /** {@inheritDoc} */
-    @Override public HadoopIgfsStreamDelegate append(IgfsPath path, boolean create,
-        @Nullable Map<String, String> props) throws IgniteCheckedException {
+    @Override public HadoopIgfsStreamDelegate append(final IgfsPath path, final boolean create,
+        final @Nullable Map<String, String> props) throws IgniteCheckedException {
         try {
-            IgfsOutputStream stream = igfs.append(path, bufSize, create, props);
+            return IgfsUserContext.doAs(user, new IgniteOutClosure<HadoopIgfsStreamDelegate>() {
+                @Override public HadoopIgfsStreamDelegate apply() {
+                    IgfsOutputStream stream = igfs.append(path, bufSize, create, props);
 
-            return new HadoopIgfsStreamDelegate(this, stream);
+                    return new HadoopIgfsStreamDelegate(HadoopIgfsInProc.this, stream);
+                }
+            });
         }
         catch (IgniteException e) {
             throw new IgniteCheckedException(e);
@@ -407,4 +490,9 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
         if (lsnr0 != null && log.isDebugEnabled())
             log.debug("Removed stream event listener [delegate=" + delegate + ']');
     }
+
+    /** {@inheritDoc} */
+    @Override public String user() {
+        return user;
+    }
 }
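
The wrapping pattern this class now applies to every IGFS operation, shown once in isolation (igfs and path are assumed to be in scope; "someUser" is illustrative):

    IgfsFile file = IgfsUserContext.doAs("someUser", new IgniteOutClosure<IgfsFile>() {
        @Override public IgfsFile apply() {
            // Runs with "someUser" as the current IGFS user.
            return igfs.info(path);
        }
    });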

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIpcIo.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIpcIo.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIpcIo.java
index 0264e7b..3561e95 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIpcIo.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIpcIo.java
@@ -41,7 +41,7 @@ import java.util.concurrent.locks.*;
 @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
 public class HadoopIgfsIpcIo implements HadoopIgfsIo {
     /** Logger. */
-    private Log log;
+    private final Log log;
 
     /** Request futures map. */
     private ConcurrentMap<Long, HadoopIgfsFuture> reqMap =