You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/06/04 12:42:53 UTC
[15/16] incubator-ignite git commit: #[IGNITE-218]: cleanup
#[IGNITE-218]: cleanup
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/5cbb7ddd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/5cbb7ddd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/5cbb7ddd
Branch: refs/heads/ignite-218
Commit: 5cbb7ddd8c985e07670e9281c36c1ba46a968a8f
Parents: a28c52a
Author: iveselovskiy <iv...@gridgain.com>
Authored: Thu Jun 4 13:42:00 2015 +0300
Committer: iveselovskiy <iv...@gridgain.com>
Committed: Thu Jun 4 13:42:00 2015 +0300
----------------------------------------------------------------------
.../processors/igfs/IgfsAbstractSelfTest.java | 2 +-
.../hadoop/fs/v1/IgniteHadoopFileSystem.java | 9 +-
.../internal/processors/hadoop/HadoopUtils.java | 146 +++++++------------
.../hadoop/SecondaryFileSystemProvider.java | 2 +-
.../hadoop/v2/HadoopV2JobResourceManager.java | 4 +-
5 files changed, 61 insertions(+), 102 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5cbb7ddd/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java
index 90768db..a8a8957 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java
@@ -786,7 +786,7 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest {
* @throws Exception If failed.
*/
@SuppressWarnings("ConstantConditions")
- public void _testFormat() throws Exception {
+ public void testFormat() throws Exception {
// Test works too long and fails.
fail("https://issues.apache.org/jira/browse/IGNITE-586");
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5cbb7ddd/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
index 9b008c6..9d94e5b 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
@@ -143,9 +143,6 @@ public class IgniteHadoopFileSystem extends FileSystem {
/** Custom-provided sequential reads before prefetch. */
private int seqReadsBeforePrefetch;
- /** The cache was disabled when the instance was creating. */
- //private boolean cacheEnabled;
-
/** {@inheritDoc} */
@Override public URI getUri() {
if (uri == null)
@@ -213,10 +210,6 @@ public class IgniteHadoopFileSystem extends FileSystem {
setConf(cfg);
- //String disableCacheName = String.format("fs.%s.impl.disable.cache", name.getScheme());
-
- //cacheEnabled = !cfg.getBoolean(disableCacheName, false);
-
mgmt = cfg.getBoolean(IGFS_MANAGEMENT, false);
if (!IGFS_SCHEME.equals(name.getScheme()))
@@ -345,7 +338,7 @@ public class IgniteHadoopFileSystem extends FileSystem {
@Override protected void finalize() throws Throwable {
super.finalize();
- close0();
+ close();
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5cbb7ddd/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
index f4323b6..94f1647 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
@@ -34,7 +34,6 @@ import org.apache.ignite.internal.util.typedef.internal.*;
import org.jetbrains.annotations.*;
import java.io.*;
-import java.lang.reflect.*;
import java.net.*;
import java.util.*;
@@ -63,6 +62,32 @@ public class HadoopUtils {
/** Old reducer class attribute. */
private static final String OLD_REDUCE_CLASS_ATTR = "mapred.reducer.class";
+ /** Lazy per-user cache for the file systems. It is cleared and nulled in #close() method. */
+ // TODO: clean up this cache upon Ignite node stop, see https://issues.apache.org/jira/browse/IGNITE-980 .
+ private static final HadoopLazyConcurrentMap<FsCacheKey, FileSystem> fileSysLazyMap = new HadoopLazyConcurrentMap<>(
+ new HadoopLazyConcurrentMap.ValueFactory<FsCacheKey, FileSystem>() {
+ @Override public FileSystem createValue(FsCacheKey key) {
+ try {
+ assert key != null;
+
+ // Explicitly disable FileSystem caching:
+ URI uri = key.uri();
+
+ String scheme = uri.getScheme();
+
+ String prop = HadoopUtils.disableFsCachePropertyName(scheme);
+
+ key.configuration().setBoolean(prop, true);
+
+ return FileSystem.get(uri, key.configuration(), key.user());
+ }
+ catch (IOException | InterruptedException ioe) {
+ throw new IgniteException(ioe);
+ }
+ }
+ }
+ );
+
/**
* Constructor.
*/
@@ -376,8 +401,6 @@ public class HadoopUtils {
* @throws IOException
*/
public static FileSystem fileSystemForMrUser(@Nullable URI uri, Configuration cfg, boolean doCacheFs) throws IOException {
- X.println("######## fileSystemForMrUser: " + HadoopUtils.class.getClassLoader());
-
final String usr = getMrHadoopUser(cfg);
assert usr != null;
@@ -412,42 +435,8 @@ public class HadoopUtils {
return fs;
}
- /** Lazy per-user cache for the file systems. It is cleared and nulled in #close() method. */
- private static final HadoopLazyConcurrentMap<FsCacheKey, FileSystem> fileSysLazyMap = new HadoopLazyConcurrentMap<>(
- new HadoopLazyConcurrentMap.ValueFactory<FsCacheKey, FileSystem>() {
- @Override public FileSystem createValue(FsCacheKey key) {
- try {
- assert key != null;
-
- // Disable cache:
- URI uri = key.uri();
- String scheme = uri.getScheme();
- assert scheme != null;
- String property = HadoopUtils.disableFsCahcePropertyName(scheme);
- key.configuration().setBoolean(property, true);
-
- FileSystem fs = FileSystem.get(uri, key.configuration(), key.user());
-
- // DIAGNOSTIC: Make sure this Fs is not cached by Hadoop:
- try {
- Object cached = getCached(fs);
-
- assert cached == null;
- } catch (Exception e) {
- e.printStackTrace();
- }
-
- return fs;
- }
- catch (IOException | InterruptedException ioe) {
- throw new IgniteException(ioe);
- }
- }
- }
- );
-
/**
- * Note that configuration is not actually a part of the key.
+ * Note that configuration is not a part of the key.
* It is used solely to initialize the first instance
* that is created for the key.
*/
@@ -465,6 +454,7 @@ public class HadoopUtils {
private final Configuration cfg;
/**
+ * Constructor
*/
public FsCacheKey(URI uri, String usr, Configuration cfg) {
assert uri != null;
@@ -479,6 +469,7 @@ public class HadoopUtils {
}
/**
+ * Creates String key used for equality and hashing.
*/
private String createEqualityKey() {
String scheme = uri.getScheme() == null ? "" : uri.getScheme().toLowerCase();
@@ -489,24 +480,28 @@ public class HadoopUtils {
}
/**
+ * The URI.
*/
public URI uri() {
return uri;
}
/**
+ * The User.
*/
public String user() {
return usr;
}
/**
+ * The Configuration.
*/
public Configuration configuration() {
return cfg;
}
/** {@inheritDoc} */
+ @SuppressWarnings("SimplifiableIfStatement")
@Override public boolean equals(Object obj) {
if (obj == this)
return true;
@@ -528,17 +523,32 @@ public class HadoopUtils {
}
}
- public static FileSystem getWithCaching(URI uri, Configuration cfg, String usr) {
+ /**
+ * Gets the FileSystem, caching it in a static Ignite cache. The cache is a singleton
+ * for each class loader.
+ *
+ * <p/>Note that the file systems in the cache are keyed by a triplet {scheme, authority, user}.
+ * The Configuration is not a part of the key. This means that for a given key the file system is
+ * initialized only once, with the Configuration that was passed in when the file system was created.
+ *
+ * @param uri The file system URI.
+ * @param cfg The configuration.
+ * @param usr The user to create file system for.
+ * @return The file system: either created, or taken from the cache.
+ */
+ private static FileSystem getWithCaching(URI uri, Configuration cfg, String usr) {
FsCacheKey key = new FsCacheKey(uri, usr, cfg);
- X.println("#### Key = " + key);
-
- FileSystem fs = fileSysLazyMap.getOrCreate(key);
-
- return fs;
+ return fileSysLazyMap.getOrCreate(key);
}
- public static String disableFsCahcePropertyName(String scheme) {
+ /**
+ * Gets the property name to disable file system cache.
+ * @param scheme The file system URI scheme.
+ * @return The property name. If scheme is null,
+ * returns "fs.null.impl.disable.cache".
+ */
+ public static String disableFsCachePropertyName(@Nullable String scheme) {
return String.format("fs.%s.impl.disable.cache", scheme);
}
@@ -566,50 +576,4 @@ public class HadoopUtils {
return uri0;
}
-
-
- /**
- * DIAGNOSTIC.
- * @return The cached instance in FileSystem cache taken by 'fs.key'.
- */
- static Object getCached(FileSystem fs) throws Exception {
- assert fs != null;
-
- Field keyField = FileSystem.class.getDeclaredField("key");
-
- keyField.setAccessible(true);
-
- Object key = keyField.get(fs);
-
- Map map = getMap();
-
- Object cachedFs = map.get(key);
-
- return cachedFs;
- }
-
- /**
- * DIAGNOSTIC.
- * @return The FileSystem.CACHE.map .
- */
- private static Map getMap() throws Exception {
- Field CACHEField = FileSystem.class.getDeclaredField("CACHE");
-
- CACHEField.setAccessible(true);
-
- Object cacheObj = CACHEField.get(null);
-
- Field mapField = cacheObj.getClass().getDeclaredField("map");
-
- mapField.setAccessible(true);
-
- Map map = (Map)mapField.get(cacheObj);
-
- return map;
- }
-
- public static void main(String[] args) {
- System.out.println(String.format("fs.%s.impl.disable.cache", null));
- }
-
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5cbb7ddd/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java
index f9a68f1..dd679de 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java
@@ -76,7 +76,7 @@ public class SecondaryFileSystemProvider {
}
// Disable caching:
- String prop = String.format("fs.%s.impl.disable.cache", uri.getScheme());
+ String prop = HadoopUtils.disableFsCachePropertyName(uri.getScheme());
cfg.setBoolean(prop, true);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5cbb7ddd/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
index bb72f15..6e2764b 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
@@ -84,7 +84,9 @@ public class HadoopV2JobResourceManager {
try {
cfg.set(HadoopFileSystemsUtils.LOC_FS_WORK_DIR_PROP, dir.getAbsolutePath());
- if(!cfg.getBoolean("fs.file.impl.disable.cache", false))
+ String prop = HadoopUtils.disableFsCachePropertyName("file");
+
+ if (!cfg.getBoolean(prop, false))
FileSystem.getLocal(cfg).setWorkingDirectory(new Path(dir.getAbsolutePath()));
}
finally {