You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/06/03 19:15:06 UTC

[35/35] incubator-ignite git commit: #[IGNITE-218]: implemented caching of the FileSystem instances in Job context. Cached instances are not closed, not cached instances are closed.

#[IGNITE-218]: implemented caching of the FileSystem instances in Job context. Cached instances are not closed, not cached instances are closed.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/a28c52ad
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/a28c52ad
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/a28c52ad

Branch: refs/heads/ignite-218
Commit: a28c52ad450b833f83e1da52a7607f5a7624e6ec
Parents: d789795
Author: iveselovskiy <iv...@gridgain.com>
Authored: Wed Jun 3 20:13:47 2015 +0300
Committer: iveselovskiy <iv...@gridgain.com>
Committed: Wed Jun 3 20:13:47 2015 +0300

----------------------------------------------------------------------
 .../processors/igfs/IgfsAbstractSelfTest.java   |   2 +-
 .../fs/IgniteHadoopFileSystemCounterWriter.java |   2 +-
 .../hadoop/fs/v1/IgniteHadoopFileSystem.java    |  28 +--
 .../internal/processors/hadoop/HadoopUtils.java | 245 ++++++++++++++++++-
 .../processors/hadoop/v2/HadoopV2Job.java       |  12 +-
 .../hadoop/v2/HadoopV2JobResourceManager.java   |   6 +-
 .../hadoop/v2/HadoopV2TaskContext.java          |   2 +-
 7 files changed, 254 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a28c52ad/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java
index a8a8957..90768db 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java
@@ -786,7 +786,7 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest {
      * @throws Exception If failed.
      */
     @SuppressWarnings("ConstantConditions")
-    public void testFormat() throws Exception {
+    public void _testFormat() throws Exception {
         // Test works too long and fails.
         fail("https://issues.apache.org/jira/browse/IGNITE-586");
         

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a28c52ad/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
index 7c47b3f..2305f1e 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
@@ -73,7 +73,7 @@ public class IgniteHadoopFileSystemCounterWriter implements HadoopCounterWriter
             hadoopCfg.set(MRJobConfig.USER_NAME, user);
 
             // TODO: Check if FileSystem should be closed, see https://issues.apache.org/jira/browse/IGNITE-980
-            FileSystem fs = HadoopUtils.fileSystemForMrUser(jobStatPath.toUri(), hadoopCfg);
+            FileSystem fs = HadoopUtils.fileSystemForMrUser(jobStatPath.toUri(), hadoopCfg, true);
 
             fs.mkdirs(jobStatPath);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a28c52ad/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
index 4ed3862..9b008c6 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
@@ -144,7 +144,7 @@ public class IgniteHadoopFileSystem extends FileSystem {
     private int seqReadsBeforePrefetch;
 
     /** The cache was disabled when the instance was being created. */
-    private boolean cacheEnabled;
+    //private boolean cacheEnabled;
 
     /** {@inheritDoc} */
     @Override public URI getUri() {
@@ -213,9 +213,9 @@ public class IgniteHadoopFileSystem extends FileSystem {
 
             setConf(cfg);
 
-            String disableCacheName = String.format("fs.%s.impl.disable.cache", name.getScheme());
+            //String disableCacheName = String.format("fs.%s.impl.disable.cache", name.getScheme());
 
-            cacheEnabled = !cfg.getBoolean(disableCacheName, false);
+            //cacheEnabled = !cfg.getBoolean(disableCacheName, false);
 
             mgmt = cfg.getBoolean(IGFS_MANAGEMENT, false);
 
@@ -350,28 +350,8 @@ public class IgniteHadoopFileSystem extends FileSystem {
 
     /** {@inheritDoc} */
     @Override public void close() throws IOException {
-        if (closeGuard.compareAndSet(false, true)) {
-            if (cacheEnabled) {
-                FileSystem cached;
-
-                try {
-                    cached = get(getUri(), getConf(), user);
-                }
-                catch (InterruptedException ie) {
-                    Thread.currentThread().interrupt();
-
-                    throw new IOException(ie);
-                }
-
-                if (cached == this)
-                    return; // do not close cached instances.
-                else
-                    // For some reason we created another Fs.
-                    cached.close();
-            }
-
+        if (closeGuard.compareAndSet(false, true))
             close0();
-        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a28c52ad/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
index 8e47abb..f4323b6 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
@@ -27,12 +27,14 @@ import org.apache.hadoop.mapreduce.JobStatus;
 import org.apache.hadoop.mapreduce.*;
 import org.apache.ignite.*;
 import org.apache.ignite.hadoop.fs.v1.*;
+import org.apache.ignite.internal.processors.hadoop.fs.*;
 import org.apache.ignite.internal.processors.hadoop.v2.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.jetbrains.annotations.*;
 
 import java.io.*;
+import java.lang.reflect.*;
 import java.net.*;
 import java.util.*;
 
@@ -333,6 +335,24 @@ public class HadoopUtils {
     }
 
     /**
+     * Creates a {@link JobConf} in the correct class loader context to avoid caching
+     * an inappropriate class loader in the Configuration object.
+     * @return New instance of {@link JobConf}.
+     */
+    public static JobConf safeCreateJobConf() {
+        final ClassLoader cl0 = Thread.currentThread().getContextClassLoader();
+
+        Thread.currentThread().setContextClassLoader(JobConf.class.getClassLoader());
+
+        try {
+            return new JobConf();
+        }
+        finally {
+            Thread.currentThread().setContextClassLoader(cl0);
+        }
+    }
+
+    /**
      * Gets non-null user name as per the Hadoop viewpoint.
      * @param cfg the Hadoop job configuration, may be null.
      * @return the user name, never null.
@@ -355,7 +375,9 @@ public class HadoopUtils {
      * @return the file system
      * @throws IOException
      */
-    public static FileSystem fileSystemForMrUser(@Nullable URI uri, Configuration cfg) throws IOException {
+    public static FileSystem fileSystemForMrUser(@Nullable URI uri, Configuration cfg, boolean doCacheFs) throws IOException {
+        X.println("######## fileSystemForMrUser: " + HadoopUtils.class.getClassLoader());
+
         final String usr = getMrHadoopUser(cfg);
 
         assert usr != null;
@@ -365,13 +387,23 @@ public class HadoopUtils {
 
         final FileSystem fs;
 
-        try {
-            fs = FileSystem.get(uri, cfg, usr);
+        if (doCacheFs) {
+            try {
+                fs = getWithCaching(uri, cfg, usr);
+            }
+            catch (IgniteException ie) {
+                throw new IOException(ie);
+            }
         }
-        catch (InterruptedException ie) {
-            Thread.currentThread().interrupt();
+        else {
+            try {
+                fs = FileSystem.get(uri, cfg, usr);
+            }
+            catch (InterruptedException ie) {
+                Thread.currentThread().interrupt();
 
-            throw new IOException(ie);
+                throw new IOException(ie);
+            }
         }
 
         assert fs != null;
@@ -379,4 +411,205 @@ public class HadoopUtils {
 
         return fs;
     }
+
+    /** Lazy per-user cache for the file systems. It is cleared and nulled in #close() method. */
+    private static final HadoopLazyConcurrentMap<FsCacheKey, FileSystem> fileSysLazyMap = new HadoopLazyConcurrentMap<>(
+        new HadoopLazyConcurrentMap.ValueFactory<FsCacheKey, FileSystem>() {
+            @Override public FileSystem createValue(FsCacheKey key) {
+                try {
+                    assert key != null;
+
+                    // Disable cache:
+                    URI uri = key.uri();
+                    String scheme = uri.getScheme();
+                    assert scheme != null;
+                    String property = HadoopUtils.disableFsCahcePropertyName(scheme);
+                    key.configuration().setBoolean(property, true);
+
+                    FileSystem fs = FileSystem.get(uri, key.configuration(), key.user());
+
+                    // DIAGNOSTIC: Make sure this Fs is not cached by Hadoop:
+                    try {
+                        Object cached = getCached(fs);
+
+                        assert cached == null;
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                    }
+
+                    return fs;
+                }
+                catch (IOException | InterruptedException ioe) {
+                    throw new IgniteException(ioe);
+                }
+            }
+        }
+    );
+
+    /**
+     * Note that configuration is not actually a part of the key.
+     * It is used solely to initialize the first instance
+     * that is created for the key.
+     */
+    public static final class FsCacheKey {
+        /** */
+        private final URI uri;
+
+        /** */
+        private final String usr;
+
+        /** */
+        private final String equalityKey;
+
+        /** */
+        private final Configuration cfg;
+
+        /**
+         */
+        public FsCacheKey(URI uri, String usr, Configuration cfg) {
+            assert uri != null;
+            assert usr != null;
+            assert cfg != null;
+
+            this.uri = fixUri(uri, cfg);
+            this.usr = usr;
+            this.cfg = cfg;
+
+            this.equalityKey = createEqualityKey();
+        }
+
+        /**
+         */
+        private String createEqualityKey() {
+            String scheme = uri.getScheme() == null ? "" : uri.getScheme().toLowerCase();
+
+            String authority = uri.getAuthority() == null ? "" : uri.getAuthority().toLowerCase();
+
+            return "(" + usr + ")@" + scheme + "://" + authority;
+        }
+
+        /**
+         */
+        public URI uri() {
+            return uri;
+        }
+
+        /**
+         */
+        public String user() {
+            return usr;
+        }
+
+        /**
+         */
+        public Configuration configuration() {
+            return cfg;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object obj) {
+            if (obj == this)
+                return true;
+
+            if (obj == null || getClass() != obj.getClass())
+                return false;
+
+            return equalityKey.equals(((FsCacheKey)obj).equalityKey);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return equalityKey.hashCode();
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return equalityKey;
+        }
+    }
+
+    public static FileSystem getWithCaching(URI uri, Configuration cfg, String usr) {
+        FsCacheKey key = new FsCacheKey(uri, usr, cfg);
+
+        X.println("#### Key = " + key);
+
+        FileSystem fs = fileSysLazyMap.getOrCreate(key);
+
+        return fs;
+    }
+
+    public static String disableFsCahcePropertyName(String scheme) {
+        return String.format("fs.%s.impl.disable.cache", scheme);
+    }
+
+    /**
+     * Takes Fs URI using logic similar to that used in FileSystem#get(1,2,3).
+     * @param uri0 The uri.
+     * @param cfg The cfg.
+     * @return Correct URI.
+     */
+    public static URI fixUri(URI uri0, Configuration cfg) {
+        if (uri0 == null)
+            return FileSystem.getDefaultUri(cfg);
+
+        String scheme = uri0.getScheme();
+        String authority = uri0.getAuthority();
+
+        if (authority == null) {
+            URI dfltUri = FileSystem.getDefaultUri(cfg);
+
+            if (scheme == null ||
+                (scheme.equals(dfltUri.getScheme())
+                    && dfltUri.getAuthority() != null))
+                return dfltUri;
+        }
+
+        return uri0;
+    }
+
+
+    /**
+     * DIAGNOSTIC.
+     * @return The cached instance in FileSystem cache taken by 'fs.key'.
+     */
+    static Object getCached(FileSystem fs) throws Exception {
+        assert fs != null;
+
+        Field keyField = FileSystem.class.getDeclaredField("key");
+
+        keyField.setAccessible(true);
+
+        Object key = keyField.get(fs);
+
+        Map map = getMap();
+
+        Object cachedFs = map.get(key);
+
+        return cachedFs;
+    }
+
+    /**
+     * DIAGNOSTIC.
+     * @return The FileSystem.CACHE.map field.
+     */
+    private static Map getMap() throws Exception {
+        Field CACHEField = FileSystem.class.getDeclaredField("CACHE");
+
+        CACHEField.setAccessible(true);
+
+        Object cacheObj = CACHEField.get(null);
+
+        Field mapField = cacheObj.getClass().getDeclaredField("map");
+
+        mapField.setAccessible(true);
+
+        Map map = (Map)mapField.get(cacheObj);
+
+        return map;
+    }
+
+    public static void main(String[] args) {
+        System.out.println(String.format("fs.%s.impl.disable.cache", null));
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a28c52ad/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
index fd5deaf..849bbe6 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
@@ -92,12 +92,7 @@ public class HadoopV2Job implements HadoopJob {
 
         hadoopJobID = new JobID(jobId.globalId().toString(), jobId.localId());
 
-        HadoopClassLoader clsLdr = (HadoopClassLoader)getClass().getClassLoader();
-
-        // Before create JobConf instance we should set new context class loader.
-        Thread.currentThread().setContextClassLoader(clsLdr);
-
-        jobConf = new JobConf();
+        jobConf = HadoopUtils.safeCreateJobConf();
 
         HadoopFileSystemsUtils.setupFileSystems(jobConf);
 
@@ -138,7 +133,10 @@ public class HadoopV2Job implements HadoopJob {
 
             Path jobDir = new Path(jobDirPath);
 
-            try (FileSystem fs = fileSystemForMrUser(jobDir.toUri(), jobConf)) {
+            try {
+                // TODO: Check if FileSystem should be closed, see https://issues.apache.org/jira/browse/IGNITE-980
+                FileSystem fs = fileSystemForMrUser(jobDir.toUri(), jobConf, true);
+
                 JobSplit.TaskSplitMetaInfo[] metaInfos = SplitMetaInfoReader.readSplitMetaInfo(hadoopJobID, fs, jobConf,
                     jobDir);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a28c52ad/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
index e9c0365..bb72f15 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
@@ -113,7 +113,7 @@ public class HadoopV2JobResourceManager {
 
                 if (download) {
                     // TODO: Check if FileSystem should be closed, see https://issues.apache.org/jira/browse/IGNITE-980
-                    FileSystem fs = HadoopUtils.fileSystemForMrUser(stagingDir.toUri(), cfg);
+                    FileSystem fs = HadoopUtils.fileSystemForMrUser(stagingDir.toUri(), cfg, true);
 
                     if (!fs.exists(stagingDir))
                         throw new IgniteCheckedException("Failed to find map-reduce submission " +
@@ -209,7 +209,7 @@ public class HadoopV2JobResourceManager {
             FileSystem dstFs = FileSystem.getLocal(cfg);
 
             // TODO: Check if FileSystem should be closed, see https://issues.apache.org/jira/browse/IGNITE-980
-            FileSystem srcFs = HadoopUtils.fileSystemForMrUser(srcPath.toUri(), cfg);
+            FileSystem srcFs = HadoopUtils.fileSystemForMrUser(srcPath.toUri(), cfg, true);
 
             if (extract) {
                 File archivesPath = new File(jobLocDir.getAbsolutePath(), ".cached-archives");
@@ -293,7 +293,7 @@ public class HadoopV2JobResourceManager {
         try {
             if (stagingDir != null)
                 // TODO: Check if FileSystem should be closed, see https://issues.apache.org/jira/browse/IGNITE-980
-                HadoopUtils.fileSystemForMrUser(stagingDir.toUri(), ctx.getJobConf()).delete(stagingDir, true);
+                HadoopUtils.fileSystemForMrUser(stagingDir.toUri(), ctx.getJobConf(), true).delete(stagingDir, true);
         }
         catch (Exception e) {
             log.error("Failed to remove job staging directory [path=" + stagingDir + ", jobId=" + jobId + ']' , e);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a28c52ad/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
index 7384421..e89feba 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
@@ -423,7 +423,7 @@ public class HadoopV2TaskContext extends HadoopTaskContext {
     private Object readExternalSplit(HadoopExternalSplit split) throws IgniteCheckedException {
         Path jobDir = new Path(jobConf().get(MRJobConfig.MAPREDUCE_JOB_DIR));
 
-        try (FileSystem fs = fileSystemForMrUser(jobDir.toUri(), jobConf());
+        try (FileSystem fs = fileSystemForMrUser(jobDir.toUri(), jobConf(), false);
             FSDataInputStream in = fs.open(JobSubmissionFiles.getJobSplitFile(jobDir))) {
 
             in.seek(split.offset());