You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/06/04 12:42:53 UTC

[15/16] incubator-ignite git commit: #[IGNITE-218]: cleanup

#[IGNITE-218]: cleanup


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/5cbb7ddd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/5cbb7ddd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/5cbb7ddd

Branch: refs/heads/ignite-218
Commit: 5cbb7ddd8c985e07670e9281c36c1ba46a968a8f
Parents: a28c52a
Author: iveselovskiy <iv...@gridgain.com>
Authored: Thu Jun 4 13:42:00 2015 +0300
Committer: iveselovskiy <iv...@gridgain.com>
Committed: Thu Jun 4 13:42:00 2015 +0300

----------------------------------------------------------------------
 .../processors/igfs/IgfsAbstractSelfTest.java   |   2 +-
 .../hadoop/fs/v1/IgniteHadoopFileSystem.java    |   9 +-
 .../internal/processors/hadoop/HadoopUtils.java | 146 +++++++------------
 .../hadoop/SecondaryFileSystemProvider.java     |   2 +-
 .../hadoop/v2/HadoopV2JobResourceManager.java   |   4 +-
 5 files changed, 61 insertions(+), 102 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5cbb7ddd/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java
index 90768db..a8a8957 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java
@@ -786,7 +786,7 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest {
      * @throws Exception If failed.
      */
     @SuppressWarnings("ConstantConditions")
-    public void _testFormat() throws Exception {
+    public void testFormat() throws Exception {
         // Test works too long and fails.
         fail("https://issues.apache.org/jira/browse/IGNITE-586");
         

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5cbb7ddd/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
index 9b008c6..9d94e5b 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
@@ -143,9 +143,6 @@ public class IgniteHadoopFileSystem extends FileSystem {
     /** Custom-provided sequential reads before prefetch. */
     private int seqReadsBeforePrefetch;
 
-    /** The cache was disabled when the instance was creating. */
-    //private boolean cacheEnabled;
-
     /** {@inheritDoc} */
     @Override public URI getUri() {
         if (uri == null)
@@ -213,10 +210,6 @@ public class IgniteHadoopFileSystem extends FileSystem {
 
             setConf(cfg);
 
-            //String disableCacheName = String.format("fs.%s.impl.disable.cache", name.getScheme());
-
-            //cacheEnabled = !cfg.getBoolean(disableCacheName, false);
-
             mgmt = cfg.getBoolean(IGFS_MANAGEMENT, false);
 
             if (!IGFS_SCHEME.equals(name.getScheme()))
@@ -345,7 +338,7 @@ public class IgniteHadoopFileSystem extends FileSystem {
     @Override protected void finalize() throws Throwable {
         super.finalize();
 
-        close0();
+        close();
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5cbb7ddd/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
index f4323b6..94f1647 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
@@ -34,7 +34,6 @@ import org.apache.ignite.internal.util.typedef.internal.*;
 import org.jetbrains.annotations.*;
 
 import java.io.*;
-import java.lang.reflect.*;
 import java.net.*;
 import java.util.*;
 
@@ -63,6 +62,32 @@ public class HadoopUtils {
     /** Old reducer class attribute. */
     private static final String OLD_REDUCE_CLASS_ATTR = "mapred.reducer.class";
 
+    /** Lazy per-user cache for the file systems. It is cleared and nulled in #close() method. */
+    // TODO: cleam up this cache upon Ignite node stop, see https://issues.apache.org/jira/browse/IGNITE-980 .
+    private static final HadoopLazyConcurrentMap<FsCacheKey, FileSystem> fileSysLazyMap = new HadoopLazyConcurrentMap<>(
+        new HadoopLazyConcurrentMap.ValueFactory<FsCacheKey, FileSystem>() {
+            @Override public FileSystem createValue(FsCacheKey key) {
+                try {
+                    assert key != null;
+
+                    // Explicitly disable FileSystem caching:
+                    URI uri = key.uri();
+
+                    String scheme = uri.getScheme();
+
+                    String prop = HadoopUtils.disableFsCachePropertyName(scheme);
+
+                    key.configuration().setBoolean(prop, true);
+
+                    return FileSystem.get(uri, key.configuration(), key.user());
+                }
+                catch (IOException | InterruptedException ioe) {
+                    throw new IgniteException(ioe);
+                }
+            }
+        }
+    );
+
     /**
      * Constructor.
      */
@@ -376,8 +401,6 @@ public class HadoopUtils {
      * @throws IOException
      */
     public static FileSystem fileSystemForMrUser(@Nullable URI uri, Configuration cfg, boolean doCacheFs) throws IOException {
-        X.println("######## fileSystemForMrUser: " + HadoopUtils.class.getClassLoader());
-
         final String usr = getMrHadoopUser(cfg);
 
         assert usr != null;
@@ -412,42 +435,8 @@ public class HadoopUtils {
         return fs;
     }
 
-    /** Lazy per-user cache for the file systems. It is cleared and nulled in #close() method. */
-    private static final HadoopLazyConcurrentMap<FsCacheKey, FileSystem> fileSysLazyMap = new HadoopLazyConcurrentMap<>(
-        new HadoopLazyConcurrentMap.ValueFactory<FsCacheKey, FileSystem>() {
-            @Override public FileSystem createValue(FsCacheKey key) {
-                try {
-                    assert key != null;
-
-                    // Disable cache:
-                    URI uri = key.uri();
-                    String scheme = uri.getScheme();
-                    assert scheme != null;
-                    String property = HadoopUtils.disableFsCahcePropertyName(scheme);
-                    key.configuration().setBoolean(property, true);
-
-                    FileSystem fs = FileSystem.get(uri, key.configuration(), key.user());
-
-                    // DIAGNOSTIC: Make sure this Fs is not cached by Hadoop:
-                    try {
-                        Object cached = getCached(fs);
-
-                        assert cached == null;
-                    } catch (Exception e) {
-                        e.printStackTrace();
-                    }
-
-                    return fs;
-                }
-                catch (IOException | InterruptedException ioe) {
-                    throw new IgniteException(ioe);
-                }
-            }
-        }
-    );
-
     /**
-     * Note that configuration is not actually a part of the key.
+     * Note that configuration is not a part of the key.
      * It is used solely to initialize the first instance
      * that is created for the key.
      */
@@ -465,6 +454,7 @@ public class HadoopUtils {
         private final Configuration cfg;
 
         /**
+         * Constructor
          */
         public FsCacheKey(URI uri, String usr, Configuration cfg) {
             assert uri != null;
@@ -479,6 +469,7 @@ public class HadoopUtils {
         }
 
         /**
+         * Creates String key used for equality and hashing.
          */
         private String createEqualityKey() {
             String scheme = uri.getScheme() == null ? "" : uri.getScheme().toLowerCase();
@@ -489,24 +480,28 @@ public class HadoopUtils {
         }
 
         /**
+         * The URI.
          */
         public URI uri() {
             return uri;
         }
 
         /**
+         * The User.
          */
         public String user() {
             return usr;
         }
 
         /**
+         * The Configuration.
          */
         public Configuration configuration() {
             return cfg;
         }
 
         /** {@inheritDoc} */
+        @SuppressWarnings("SimplifiableIfStatement")
         @Override public boolean equals(Object obj) {
             if (obj == this)
                 return true;
@@ -528,17 +523,32 @@ public class HadoopUtils {
         }
     }
 
-    public static FileSystem getWithCaching(URI uri, Configuration cfg, String usr) {
+    /**
+     * Gets FileSystem caching it in static Ignite cache. The cache is a singleton
+     * for each class loader.
+     *
+     * <p/>Note that the file systems in the cache are keyed by a triplet {scheme, authority, user}.
+     * The Configuration is not a part of the key. This means that for the given key file system is
+     * initialized only once with the Configuration passed in upon the file system creation.
+     *
+     * @param uri The file system URI.
+     * @param cfg The configuration.
+     * @param usr The user to create file system for.
+     * @return The file system: either created, or taken from the cache.
+     */
+    private static FileSystem getWithCaching(URI uri, Configuration cfg, String usr) {
         FsCacheKey key = new FsCacheKey(uri, usr, cfg);
 
-        X.println("#### Key = " + key);
-
-        FileSystem fs = fileSysLazyMap.getOrCreate(key);
-
-        return fs;
+        return fileSysLazyMap.getOrCreate(key);
     }
 
-    public static String disableFsCahcePropertyName(String scheme) {
+    /**
+     * Gets the property name to disable file system cache.
+     * @param scheme The file system URI scheme.
+     * @return The property name. If scheme is null,
+     * returns "fs.null.impl.disable.cache".
+     */
+    public static String disableFsCachePropertyName(@Nullable String scheme) {
         return String.format("fs.%s.impl.disable.cache", scheme);
     }
 
@@ -566,50 +576,4 @@ public class HadoopUtils {
 
         return uri0;
     }
-
-
-    /**
-     * DIAGNOSTIC.
-     * @return The cached instance in FileSystem cache taken by 'fs.key'.
-     */
-    static Object getCached(FileSystem fs) throws Exception {
-        assert fs != null;
-
-        Field keyField = FileSystem.class.getDeclaredField("key");
-
-        keyField.setAccessible(true);
-
-        Object key = keyField.get(fs);
-
-        Map map = getMap();
-
-        Object cachedFs = map.get(key);
-
-        return cachedFs;
-    }
-
-    /**
-     * DIAGNOSTIC.
-     * @return The FileSystem.CACHE.map .
-     */
-    private static Map getMap() throws Exception {
-        Field CACHEField = FileSystem.class.getDeclaredField("CACHE");
-
-        CACHEField.setAccessible(true);
-
-        Object cacheObj = CACHEField.get(null);
-
-        Field mapField = cacheObj.getClass().getDeclaredField("map");
-
-        mapField.setAccessible(true);
-
-        Map map = (Map)mapField.get(cacheObj);
-
-        return map;
-    }
-
-    public static void main(String[] args) {
-        System.out.println(String.format("fs.%s.impl.disable.cache", null));
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5cbb7ddd/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java
index f9a68f1..dd679de 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java
@@ -76,7 +76,7 @@ public class SecondaryFileSystemProvider {
         }
 
         // Disable caching:
-        String prop = String.format("fs.%s.impl.disable.cache", uri.getScheme());
+        String prop = HadoopUtils.disableFsCachePropertyName(uri.getScheme());
 
         cfg.setBoolean(prop, true);
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5cbb7ddd/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
index bb72f15..6e2764b 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
@@ -84,7 +84,9 @@ public class HadoopV2JobResourceManager {
         try {
             cfg.set(HadoopFileSystemsUtils.LOC_FS_WORK_DIR_PROP, dir.getAbsolutePath());
 
-            if(!cfg.getBoolean("fs.file.impl.disable.cache", false))
+            String prop = HadoopUtils.disableFsCachePropertyName("file");
+
+            if (!cfg.getBoolean(prop, false))
                 FileSystem.getLocal(cfg).setWorkingDirectory(new Path(dir.getAbsolutePath()));
         }
         finally {