You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/06/23 09:25:04 UTC

incubator-ignite git commit: # some refactoring + comments.

Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-980 f27987549 -> 484de67c1


# some refactoring + comments.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/484de67c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/484de67c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/484de67c

Branch: refs/heads/ignite-980
Commit: 484de67c11bce70f61db49f737d7aa0e981fae06
Parents: f279875
Author: iveselovskiy <iv...@gridgain.com>
Authored: Tue Jun 23 10:24:38 2015 +0300
Committer: iveselovskiy <iv...@gridgain.com>
Committed: Tue Jun 23 10:24:38 2015 +0300

----------------------------------------------------------------------
 .../hadoop/fs/HadoopFileSystemCache.java        | 242 -------------------
 .../hadoop/fs/HadoopFileSystemCacheUtil.java    | 241 ++++++++++++++++++
 .../processors/hadoop/v2/HadoopV2Job.java       |   6 +-
 .../hadoop/v2/HadoopV2JobResourceManager.java   |  23 +-
 .../hadoop/v2/HadoopV2TaskContext.java          |  13 +-
 5 files changed, 258 insertions(+), 267 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/484de67c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemCache.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemCache.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemCache.java
deleted file mode 100644
index 96b32db..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemCache.java
+++ /dev/null
@@ -1,242 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.fs;
-
-import org.apache.hadoop.conf.*;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.mapreduce.*;
-import org.apache.ignite.*;
-import org.apache.ignite.hadoop.fs.v1.*;
-import org.apache.ignite.internal.util.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.net.*;
-
-/**
- * Static caches of file systems used by Map-Reduce tasks and jobs.
- * This class
- */
-public class HadoopFileSystemCache {
-    /**
-     * Creates HadoopLazyConcurrentMap.
-     * @return a new HadoopLazyConcurrentMap.
-     */
-    public static HadoopLazyConcurrentMap<FsCacheKey, FileSystem> createHadoopLazyConcurrentMap() {
-        return new HadoopLazyConcurrentMap<>(
-            new HadoopLazyConcurrentMap.ValueFactory<FsCacheKey, FileSystem>() {
-                @Override public FileSystem createValue(FsCacheKey key) {
-                    try {
-                        assert key != null;
-
-                        // Explicitly disable FileSystem caching:
-                        URI uri = key.uri();
-
-                        String scheme = uri.getScheme();
-
-                        // Copy the configuration to avoid altering the external object.
-                        Configuration cfg = new Configuration(key.configuration());
-
-                        String prop = HadoopFileSystemsUtils.disableFsCachePropertyName(scheme);
-
-                        cfg.setBoolean(prop, true);
-
-                        return FileSystem.get(uri, cfg, key.user());
-                    }
-                    catch (IOException | InterruptedException ioe) {
-                        throw new IgniteException(ioe);
-                    }
-                }
-            }
-        );
-    }
-
-    /**
-     * Gets non-null user name as per the Hadoop viewpoint.
-     * @param cfg the Hadoop job configuration, may be null.
-     * @return the user name, never null.
-     */
-    private static String getMrHadoopUser(Configuration cfg) throws IOException {
-        String user = cfg.get(MRJobConfig.USER_NAME);
-
-        if (user == null)
-            user = IgniteHadoopFileSystem.getFsHadoopUser();
-
-        return user;
-    }
-
-    /**
-     * Common method to get the V1 file system in MapRed engine.
-     * It gets the filesystem for the user specified in the
-     * configuration with {@link MRJobConfig#USER_NAME} property.
-     * The file systems are created and cached in the given map upon first request.
-     *
-     * @param uri The file system uri.
-     * @param cfg The configuration.
-     * @param map The caching map.
-     * @return The file system.
-     * @throws IOException On error.
-     */
-    public static FileSystem fileSystemForMrUser(@Nullable URI uri, Configuration cfg,
-        HadoopLazyConcurrentMap<FsCacheKey, FileSystem> map)
-            throws IOException {
-        assert map != null;
-        assert cfg != null;
-
-        final String usr = getMrHadoopUser(cfg);
-
-        assert usr != null;
-
-        if (uri == null)
-            uri = FileSystem.getDefaultUri(cfg);
-
-        final FileSystem fs;
-
-        try {
-            final FsCacheKey key = new FsCacheKey(uri, usr, cfg);
-
-            fs = map.getOrCreate(key);
-        }
-        catch (IgniteException ie) {
-            throw new IOException(ie);
-        }
-
-        assert fs != null;
-        assert !(fs instanceof IgniteHadoopFileSystem) || F.eq(usr, ((IgniteHadoopFileSystem)fs).user());
-
-        return fs;
-    }
-
-    /**
-     * Takes Fs URI using logic similar to that used in FileSystem#get(1,2,3).
-     * @param uri0 The uri.
-     * @param cfg The cfg.
-     * @return Correct URI.
-     */
-    private static URI fixUri(URI uri0, Configuration cfg) {
-        if (uri0 == null)
-            return FileSystem.getDefaultUri(cfg);
-
-        String scheme = uri0.getScheme();
-        String authority = uri0.getAuthority();
-
-        if (authority == null) {
-            URI dfltUri = FileSystem.getDefaultUri(cfg);
-
-            if (scheme == null || (scheme.equals(dfltUri.getScheme()) && dfltUri.getAuthority() != null))
-                return dfltUri;
-        }
-
-        return uri0;
-    }
-
-    /**
-     * Note that configuration is not a part of the key.
-     * It is used solely to initialize the first instance
-     * that is created for the key.
-     */
-    public static final class FsCacheKey {
-        /** */
-        private final URI uri;
-
-        /** */
-        private final String usr;
-
-        /** */
-        private final String equalityKey;
-
-        /** */
-        private final Configuration cfg;
-
-        /**
-         * Constructor
-         */
-        public FsCacheKey(URI uri, String usr, Configuration cfg) {
-            assert uri != null;
-            assert usr != null;
-            assert cfg != null;
-
-            this.uri = fixUri(uri, cfg);
-            this.usr = usr;
-            this.cfg = cfg;
-
-            this.equalityKey = createEqualityKey();
-        }
-
-        /**
-         * Creates String key used for equality and hashing.
-         */
-        private String createEqualityKey() {
-            GridStringBuilder sb = new GridStringBuilder("(").a(usr).a(")@");
-
-            if (uri.getScheme() != null)
-                sb.a(uri.getScheme().toLowerCase());
-
-            sb.a("://");
-
-            if (uri.getAuthority() != null)
-                sb.a(uri.getAuthority().toLowerCase());
-
-            return sb.toString();
-        }
-
-        /**
-         * The URI.
-         */
-        public URI uri() {
-            return uri;
-        }
-
-        /**
-         * The User.
-         */
-        public String user() {
-            return usr;
-        }
-
-        /**
-         * The Configuration.
-         */
-        public Configuration configuration() {
-            return cfg;
-        }
-
-        /** {@inheritDoc} */
-        @SuppressWarnings("SimplifiableIfStatement")
-        @Override public boolean equals(Object obj) {
-            if (obj == this)
-                return true;
-
-            if (obj == null || getClass() != obj.getClass())
-                return false;
-
-            return equalityKey.equals(((FsCacheKey)obj).equalityKey);
-        }
-
-        /** {@inheritDoc} */
-        @Override public int hashCode() {
-            return equalityKey.hashCode();
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return equalityKey;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/484de67c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemCacheUtil.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemCacheUtil.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemCacheUtil.java
new file mode 100644
index 0000000..397b13e
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemCacheUtil.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.fs;
+
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.ignite.*;
+import org.apache.ignite.hadoop.fs.v1.*;
+import org.apache.ignite.internal.util.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.net.*;
+
+/**
+ * File system cache utility methods used by Map-Reduce tasks and jobs.
+ */
+public class HadoopFileSystemCacheUtil {
+    /**
+     * A common static factory method. Creates new HadoopLazyConcurrentMap.
+     * @return a new HadoopLazyConcurrentMap.
+     */
+    public static HadoopLazyConcurrentMap<FsCacheKey, FileSystem> createHadoopLazyConcurrentMap() {
+        return new HadoopLazyConcurrentMap<>(
+            new HadoopLazyConcurrentMap.ValueFactory<FsCacheKey, FileSystem>() {
+                @Override public FileSystem createValue(FsCacheKey key) {
+                    try {
+                        assert key != null;
+
+                        // Explicitly disable FileSystem caching:
+                        URI uri = key.uri();
+
+                        String scheme = uri.getScheme();
+
+                        // Copy the configuration to avoid altering the external object.
+                        Configuration cfg = new Configuration(key.configuration());
+
+                        String prop = HadoopFileSystemsUtils.disableFsCachePropertyName(scheme);
+
+                        cfg.setBoolean(prop, true);
+
+                        return FileSystem.get(uri, cfg, key.user());
+                    }
+                    catch (IOException | InterruptedException ioe) {
+                        throw new IgniteException(ioe);
+                    }
+                }
+            }
+        );
+    }
+
+    /**
+     * Gets non-null user name as per the Hadoop viewpoint.
+     * @param cfg the Hadoop job configuration, may be null.
+     * @return the user name, never null.
+     */
+    private static String getMrHadoopUser(Configuration cfg) throws IOException {
+        String user = cfg.get(MRJobConfig.USER_NAME);
+
+        if (user == null)
+            user = IgniteHadoopFileSystem.getFsHadoopUser();
+
+        return user;
+    }
+
+    /**
+     * Common method to get the V1 file system in MapRed engine.
+     * It gets the filesystem for the user specified in the
+     * configuration with {@link MRJobConfig#USER_NAME} property.
+     * The file systems are created and cached in the given map upon first request.
+     *
+     * @param uri The file system uri.
+     * @param cfg The configuration.
+     * @param map The caching map.
+     * @return The file system.
+     * @throws IOException On error.
+     */
+    public static FileSystem fileSystemForMrUserWithCaching(@Nullable URI uri, Configuration cfg,
+        HadoopLazyConcurrentMap<FsCacheKey, FileSystem> map)
+            throws IOException {
+        assert map != null;
+        assert cfg != null;
+
+        final String usr = getMrHadoopUser(cfg);
+
+        assert usr != null;
+
+        if (uri == null)
+            uri = FileSystem.getDefaultUri(cfg);
+
+        final FileSystem fs;
+
+        try {
+            final FsCacheKey key = new FsCacheKey(uri, usr, cfg);
+
+            fs = map.getOrCreate(key);
+        }
+        catch (IgniteException ie) {
+            throw new IOException(ie);
+        }
+
+        assert fs != null;
+        assert !(fs instanceof IgniteHadoopFileSystem) || F.eq(usr, ((IgniteHadoopFileSystem)fs).user());
+
+        return fs;
+    }
+
+    /**
+     * Takes Fs URI using logic similar to that used in FileSystem#get(1,2,3).
+     * @param uri0 The uri.
+     * @param cfg The cfg.
+     * @return Correct URI.
+     */
+    private static URI fixUri(URI uri0, Configuration cfg) {
+        if (uri0 == null)
+            return FileSystem.getDefaultUri(cfg);
+
+        String scheme = uri0.getScheme();
+        String authority = uri0.getAuthority();
+
+        if (authority == null) {
+            URI dfltUri = FileSystem.getDefaultUri(cfg);
+
+            if (scheme == null || (scheme.equals(dfltUri.getScheme()) && dfltUri.getAuthority() != null))
+                return dfltUri;
+        }
+
+        return uri0;
+    }
+
+    /**
+     * Note that configuration is not a part of the key.
+     * It is used solely to initialize the first instance
+     * that is created for the key.
+     */
+    public static final class FsCacheKey {
+        /** */
+        private final URI uri;
+
+        /** */
+        private final String usr;
+
+        /** */
+        private final String equalityKey;
+
+        /** */
+        private final Configuration cfg;
+
+        /**
+         * Constructor
+         */
+        public FsCacheKey(URI uri, String usr, Configuration cfg) {
+            assert uri != null;
+            assert usr != null;
+            assert cfg != null;
+
+            this.uri = fixUri(uri, cfg);
+            this.usr = usr;
+            this.cfg = cfg;
+
+            this.equalityKey = createEqualityKey();
+        }
+
+        /**
+         * Creates String key used for equality and hashing.
+         */
+        private String createEqualityKey() {
+            GridStringBuilder sb = new GridStringBuilder("(").a(usr).a(")@");
+
+            if (uri.getScheme() != null)
+                sb.a(uri.getScheme().toLowerCase());
+
+            sb.a("://");
+
+            if (uri.getAuthority() != null)
+                sb.a(uri.getAuthority().toLowerCase());
+
+            return sb.toString();
+        }
+
+        /**
+         * The URI.
+         */
+        public URI uri() {
+            return uri;
+        }
+
+        /**
+         * The User.
+         */
+        public String user() {
+            return usr;
+        }
+
+        /**
+         * The Configuration.
+         */
+        public Configuration configuration() {
+            return cfg;
+        }
+
+        /** {@inheritDoc} */
+        @SuppressWarnings("SimplifiableIfStatement")
+        @Override public boolean equals(Object obj) {
+            if (obj == this)
+                return true;
+
+            if (obj == null || getClass() != obj.getClass())
+                return false;
+
+            return equalityKey.equals(((FsCacheKey)obj).equalityKey);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return equalityKey.hashCode();
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return equalityKey;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/484de67c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
index 319640d..d1765a8 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
@@ -43,7 +43,7 @@ import java.util.concurrent.*;
 import java.util.concurrent.atomic.*;
 
 import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*;
-import static org.apache.ignite.internal.processors.hadoop.fs.HadoopFileSystemCache.*;
+import static org.apache.ignite.internal.processors.hadoop.fs.HadoopFileSystemCacheUtil.*;
 
 /**
  * Hadoop job implementation for v2 API.
@@ -84,7 +84,7 @@ public class HadoopV2Job implements HadoopJob {
     private volatile byte[] jobConfData;
 
     /** File system cache map. */
-    private final HadoopLazyConcurrentMap<HadoopFileSystemCache.FsCacheKey, FileSystem> fsMap
+    private final HadoopLazyConcurrentMap<FsCacheKey, FileSystem> fsMap
         = createHadoopLazyConcurrentMap();
 
     /** Disposal guard. */
@@ -399,6 +399,6 @@ public class HadoopV2Job implements HadoopJob {
      * @throws IOException On error.
      */
     public FileSystem fileSystem(@Nullable URI uri, Configuration cfg) throws IOException {
-        return HadoopFileSystemCache.fileSystemForMrUser(uri, cfg, fsMap);
+        return fileSystemForMrUserWithCaching(uri, cfg, fsMap);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/484de67c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
index 55a31c6..97ad179 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
@@ -61,8 +61,8 @@ class HadoopV2JobResourceManager {
     /** Staging directory to delivery job jar and config to the work nodes. */
     private Path stagingDir;
 
-    /** TODO */
-    private final HadoopV2Job fsProvider;
+    /** The job. */
+    private final HadoopV2Job job;
 
     /**
      * Creates new instance.
@@ -70,11 +70,11 @@ class HadoopV2JobResourceManager {
      * @param ctx Hadoop job context.
      * @param log Logger.
      */
-    public HadoopV2JobResourceManager(HadoopJobId jobId, JobContextImpl ctx, IgniteLogger log, HadoopV2Job fsProvider) {
+    public HadoopV2JobResourceManager(HadoopJobId jobId, JobContextImpl ctx, IgniteLogger log, HadoopV2Job job) {
         this.jobId = jobId;
         this.ctx = ctx;
         this.log = log.getLogger(HadoopV2JobResourceManager.class);
-        this.fsProvider = fsProvider;
+        this.job = job;
     }
 
     /**
@@ -119,10 +119,7 @@ class HadoopV2JobResourceManager {
                 stagingDir = new Path(new URI(mrDir));
 
                 if (download) {
-//                    assert ((HadoopClassLoader)HadoopFileSystemCache.class.getClassLoader()).name()
-//                        .equals(HadoopClassLoader.nameForJob(locNodeId));
-
-                    FileSystem fs = fsProvider.fileSystem(stagingDir.toUri(), cfg);
+                    FileSystem fs = job.fileSystem(stagingDir.toUri(), cfg);
 
                     if (!fs.exists(stagingDir))
                         throw new IgniteCheckedException("Failed to find map-reduce submission " +
@@ -217,10 +214,7 @@ class HadoopV2JobResourceManager {
 
             FileSystem dstFs = FileSystem.getLocal(cfg);
 
-//            assert ((HadoopClassLoader)HadoopFileSystemCache.class.getClassLoader()).name()
-//                .equals(HadoopClassLoader.nameForJob(locNodeId));
-
-            FileSystem srcFs = fsProvider.fileSystem(srcPath.toUri(), cfg);
+            FileSystem srcFs = job.fileSystem(srcPath.toUri(), cfg);
 
             if (extract) {
                 File archivesPath = new File(jobLocDir.getAbsolutePath(), ".cached-archives");
@@ -303,10 +297,7 @@ class HadoopV2JobResourceManager {
     public void cleanupStagingDirectory() {
         try {
             if (stagingDir != null) {
-//                assert ((HadoopClassLoader)HadoopFileSystemCache.class.getClassLoader()).name()
-//                    .equals(HadoopClassLoader.nameForJob(locNodeId));
-
-                FileSystem fs = fsProvider.fileSystem(stagingDir.toUri(), ctx.getJobConf());
+                FileSystem fs = job.fileSystem(stagingDir.toUri(), ctx.getJobConf());
 
                 fs.delete(stagingDir, true);
             }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/484de67c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
index f007038..90b0e43 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
@@ -45,7 +45,7 @@ import java.security.*;
 import java.util.*;
 import java.util.concurrent.*;
 
-import static org.apache.ignite.internal.processors.hadoop.fs.HadoopFileSystemCache.*;
+import static org.apache.ignite.internal.processors.hadoop.fs.HadoopFileSystemCacheUtil.*;
 import static org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.*;
 import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*;
 
@@ -56,13 +56,15 @@ public class HadoopV2TaskContext extends HadoopTaskContext {
     /** */
     private static final boolean COMBINE_KEY_GROUPING_SUPPORTED;
 
-    /** Lazy per-user file system cache used by Hadoop tasks. */
-    private static final HadoopLazyConcurrentMap<HadoopFileSystemCache.FsCacheKey, FileSystem> fsMap
+    /** Lazy per-user file system cache used by the Hadoop task. */
+    private static final HadoopLazyConcurrentMap<FsCacheKey, FileSystem> fsMap
         = createHadoopLazyConcurrentMap();
 
     /**
      * This method is called with reflection upon Job finish with class loader of each task.
      * This will clean up all the Fs created for specific task.
+     * Each class loader sees uses its own instance of <code>fsMap<code/> since the class loaders
+     * are different.
      *
      * @throws IgniteCheckedException On error.
      */
@@ -444,12 +446,11 @@ public class HadoopV2TaskContext extends HadoopTaskContext {
         try {
             // This assertion uses .startsWith() instead of .equals() because task class loaders may
             // be reused between tasks of the same job.
-            assert ((HadoopClassLoader)HadoopFileSystemCache.class.getClassLoader()).name()
+            assert ((HadoopClassLoader)getClass().getClassLoader()).name()
                 .startsWith(HadoopClassLoader.nameForTask(taskInfo(), true));
 
-            // Task class loader.
             // We also cache Fs there, all them will be cleared explicitly upon the Job end.
-            fs = HadoopFileSystemCache.fileSystemForMrUser(jobDir.toUri(), jobConf(), fsMap);
+            fs = fileSystemForMrUserWithCaching(jobDir.toUri(), jobConf(), fsMap);
         }
         catch (IOException e) {
             throw new IgniteCheckedException(e);