You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/06/02 17:25:37 UTC
[1/6] incubator-ignite git commit: # ignite-sprint-5 potential NPE in
syncFuture listener
Repository: incubator-ignite
Updated Branches:
refs/heads/ignite-218 2488969b9 -> 9446023d4
# ignite-sprint-5 potential NPE in syncFuture listener
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/82cfb47c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/82cfb47c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/82cfb47c
Branch: refs/heads/ignite-218
Commit: 82cfb47cd652895d517dae50d670f13f7fcb9065
Parents: 45566fe
Author: sboikov <sb...@gridgain.com>
Authored: Tue Jun 2 16:03:21 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Jun 2 16:03:21 2015 +0300
----------------------------------------------------------------------
.../cache/distributed/dht/preloader/GridDhtPreloader.java | 9 ++++++---
1 file changed, 6 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/82cfb47c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index dc81993..1aef18c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
import org.apache.ignite.*;
import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
import org.apache.ignite.events.*;
import org.apache.ignite.internal.*;
import org.apache.ignite.internal.cluster.*;
@@ -227,12 +228,14 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
final long start = U.currentTimeMillis();
- if (cctx.config().getRebalanceDelay() >= 0) {
- U.log(log, "Starting rebalancing in " + cctx.config().getRebalanceMode() + " mode: " + cctx.name());
+ final CacheConfiguration cfg = cctx.config();
+
+ if (cfg.getRebalanceDelay() >= 0) {
+ U.log(log, "Starting rebalancing in " + cfg.getRebalanceMode() + " mode: " + cctx.name());
demandPool.syncFuture().listen(new CI1<Object>() {
@Override public void apply(Object t) {
- U.log(log, "Completed rebalancing in " + cctx.config().getRebalanceMode() + " mode " +
+ U.log(log, "Completed rebalancing in " + cfg.getRebalanceMode() + " mode " +
"[cache=" + cctx.name() + ", time=" + (U.currentTimeMillis() - start) + " ms]");
}
});
[6/6] incubator-ignite git commit: Merge branch 'ignite-sprint-5' of
https://git-wip-us.apache.org/repos/asf/incubator-ignite into ignite-218
Posted by sb...@apache.org.
Merge branch 'ignite-sprint-5' of https://git-wip-us.apache.org/repos/asf/incubator-ignite into ignite-218
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/9446023d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/9446023d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/9446023d
Branch: refs/heads/ignite-218
Commit: 9446023d42f0d4263fe5ffc511aec0b2e09b4eeb
Parents: a8319c6 8719b73
Author: iveselovskiy <iv...@gridgain.com>
Authored: Tue Jun 2 18:22:19 2015 +0300
Committer: iveselovskiy <iv...@gridgain.com>
Committed: Tue Jun 2 18:22:19 2015 +0300
----------------------------------------------------------------------
assembly/dependencies-visor-console.xml | 6 +++---
.../ignite/cache/eviction/random/RandomEvictionPolicy.java | 3 ++-
.../cache/distributed/dht/preloader/GridDhtPreloader.java | 9 ++++++---
.../random/RandomEvictionPolicyCacheSizeSelfTest.java | 6 ++++++
4 files changed, 17 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
[5/6] incubator-ignite git commit: Merge remote-tracking branch
'origin/ignite-sprint-5' into ignite-sprint-5
Posted by sb...@apache.org.
Merge remote-tracking branch 'origin/ignite-sprint-5' into ignite-sprint-5
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/8719b735
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/8719b735
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/8719b735
Branch: refs/heads/ignite-218
Commit: 8719b735123b62e40acedecdce63b361a9f99e6b
Parents: 51ef9a9 4f7f58e
Author: avinogradov <av...@gridgain.com>
Authored: Tue Jun 2 17:33:41 2015 +0300
Committer: avinogradov <av...@gridgain.com>
Committed: Tue Jun 2 17:33:41 2015 +0300
----------------------------------------------------------------------
.../ignite/cache/eviction/random/RandomEvictionPolicy.java | 3 ++-
.../cache/distributed/dht/preloader/GridDhtPreloader.java | 9 ++++++---
.../random/RandomEvictionPolicyCacheSizeSelfTest.java | 6 ++++++
3 files changed, 14 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
[4/6] incubator-ignite git commit: GG-10390
Posted by sb...@apache.org.
GG-10390
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/51ef9a9b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/51ef9a9b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/51ef9a9b
Branch: refs/heads/ignite-218
Commit: 51ef9a9bdb13d6c20f0ef1a5a3a160abf4cea103
Parents: 45566fe
Author: avinogradov <av...@gridgain.com>
Authored: Tue Jun 2 17:33:20 2015 +0300
Committer: avinogradov <av...@gridgain.com>
Committed: Tue Jun 2 17:33:20 2015 +0300
----------------------------------------------------------------------
assembly/dependencies-visor-console.xml | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/51ef9a9b/assembly/dependencies-visor-console.xml
----------------------------------------------------------------------
diff --git a/assembly/dependencies-visor-console.xml b/assembly/dependencies-visor-console.xml
index 949943d..d773769 100644
--- a/assembly/dependencies-visor-console.xml
+++ b/assembly/dependencies-visor-console.xml
@@ -61,9 +61,6 @@
<include>scala-*.jar</include>
<include>spring-*.jar</include>
</includes>
- <excludes>
- <exclude>spring-jdbc-*.jar</exclude>
- </excludes>
</fileSet>
<fileSet>
@@ -93,6 +90,9 @@
<fileSet>
<directory>target/libs</directory>
<outputDirectory>/visor-common</outputDirectory>
+ <excludes>
+ <exclude>spring-jdbc-*.jar</exclude>
+ </excludes>
</fileSet>
<fileSet>
[3/6] incubator-ignite git commit: ignite-937 RandomEvictionPolicy
calculates cache size incorrectly
Posted by sb...@apache.org.
ignite-937 RandomEvictionPolicy calculates cache size incorrectly
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/4f7f58ef
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/4f7f58ef
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/4f7f58ef
Branch: refs/heads/ignite-218
Commit: 4f7f58ef755da6bf2e26e60ab9d015644350abff
Parents: 82cfb47
Author: agura <ag...@gridgain.com>
Authored: Wed May 27 16:28:02 2015 +0300
Committer: agura <ag...@gridgain.com>
Committed: Tue Jun 2 17:00:38 2015 +0300
----------------------------------------------------------------------
.../ignite/cache/eviction/random/RandomEvictionPolicy.java | 3 ++-
.../eviction/random/RandomEvictionPolicyCacheSizeSelfTest.java | 6 ++++++
2 files changed, 8 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f7f58ef/modules/core/src/main/java/org/apache/ignite/cache/eviction/random/RandomEvictionPolicy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/eviction/random/RandomEvictionPolicy.java b/modules/core/src/main/java/org/apache/ignite/cache/eviction/random/RandomEvictionPolicy.java
index c88b31d..0ac9197 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/eviction/random/RandomEvictionPolicy.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/eviction/random/RandomEvictionPolicy.java
@@ -18,6 +18,7 @@
package org.apache.ignite.cache.eviction.random;
import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
import org.apache.ignite.cache.eviction.*;
import org.apache.ignite.configuration.*;
import org.apache.ignite.internal.util.typedef.internal.*;
@@ -87,7 +88,7 @@ public class RandomEvictionPolicy<K, V> implements EvictionPolicy<K, V>, RandomE
IgniteCache<K, V> cache = entry.unwrap(IgniteCache.class);
- int size = cache.size();
+ int size = cache.localSize(CachePeekMode.ONHEAP);
for (int i = max; i < size; i++) {
Cache.Entry<K, V> e = cache.randomEntry();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f7f58ef/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/random/RandomEvictionPolicyCacheSizeSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/random/RandomEvictionPolicyCacheSizeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/random/RandomEvictionPolicyCacheSizeSelfTest.java
index e9e7228..2a2004e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/random/RandomEvictionPolicyCacheSizeSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/random/RandomEvictionPolicyCacheSizeSelfTest.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.cache.eviction.random;
import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
import org.apache.ignite.cache.eviction.random.*;
import org.apache.ignite.configuration.*;
import org.apache.ignite.testframework.junits.common.*;
@@ -50,6 +51,7 @@ public class RandomEvictionPolicyCacheSizeSelfTest extends GridCommonAbstractTes
IgniteConfiguration cfg = super.getConfiguration(gridName);
CacheConfiguration ccfg = defaultCacheConfiguration();
+ ccfg.setAtomicityMode(CacheAtomicityMode.ATOMIC);
ccfg.setNearConfiguration(null);
ccfg.setEvictionPolicy(new RandomEvictionPolicy(PLC_MAX_SIZE));
@@ -67,6 +69,10 @@ public class RandomEvictionPolicyCacheSizeSelfTest extends GridCommonAbstractTes
for (int i = 0; i < KEYS_CNT; i++)
cache.put(i, i);
+ // Ensure that all entries are accessed without data races so that the cache size will be correct
+ for (int i = 0; i < KEYS_CNT; i++)
+ cache.get(i);
+
assertEquals(PLC_MAX_SIZE * GRID_CNT, cache.size());
}
}
[2/6] incubator-ignite git commit: #[IGNITE-218]: fixed by review
results.
Posted by sb...@apache.org.
#[IGNITE-218]: fixed by review results.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/a8319c6b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/a8319c6b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/a8319c6b
Branch: refs/heads/ignite-218
Commit: a8319c6b65eac290aff211eeb72c054c1b8d3091
Parents: 2488969
Author: iveselovskiy <iv...@gridgain.com>
Authored: Tue Jun 2 16:57:15 2015 +0300
Committer: iveselovskiy <iv...@gridgain.com>
Committed: Tue Jun 2 16:57:15 2015 +0300
----------------------------------------------------------------------
.../fs/IgniteHadoopFileSystemCounterWriter.java | 5 +-
.../hadoop/fs/v1/IgniteHadoopFileSystem.java | 20 ++++--
.../internal/processors/hadoop/HadoopUtils.java | 58 ++++++++++++++-
.../processors/hadoop/v2/HadoopV2Job.java | 1 -
.../hadoop/v2/HadoopV2JobResourceManager.java | 74 ++++----------------
.../hadoop/v2/HadoopV2TaskContext.java | 5 +-
6 files changed, 86 insertions(+), 77 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a8319c6b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
index 7a6a269..7c47b3f 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
@@ -25,7 +25,6 @@ import org.apache.ignite.*;
import org.apache.ignite.internal.processors.hadoop.*;
import org.apache.ignite.internal.processors.hadoop.counter.*;
import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
-import org.apache.ignite.internal.processors.hadoop.v2.*;
import org.apache.ignite.internal.processors.igfs.*;
import org.apache.ignite.internal.util.typedef.*;
@@ -73,8 +72,8 @@ public class IgniteHadoopFileSystemCounterWriter implements HadoopCounterWriter
try {
hadoopCfg.set(MRJobConfig.USER_NAME, user);
- // TODO: Check if FileSystem can be closed here safely.
- FileSystem fs = HadoopV2JobResourceManager.fileSystemForMrUser(jobStatPath.toUri(), hadoopCfg);
+ // TODO: Check if FileSystem should be closed, see https://issues.apache.org/jira/browse/IGNITE-980
+ FileSystem fs = HadoopUtils.fileSystemForMrUser(jobStatPath.toUri(), hadoopCfg);
fs.mkdirs(jobStatPath);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a8319c6b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
index 969a6b0..4ed3862 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
@@ -176,12 +176,9 @@ public class IgniteHadoopFileSystem extends FileSystem {
* @return the user name, never null.
*/
public static String getFsHadoopUser() throws IOException {
- String user = null;
-
UserGroupInformation currUgi = UserGroupInformation.getCurrentUser();
- if (currUgi != null)
- user = currUgi.getShortUserName();
+ String user = currUgi.getShortUserName();
user = IgfsUtils.fixUserName(user);
@@ -355,11 +352,22 @@ public class IgniteHadoopFileSystem extends FileSystem {
@Override public void close() throws IOException {
if (closeGuard.compareAndSet(false, true)) {
if (cacheEnabled) {
- // TODO: get must take in count user name.
- FileSystem cached = get(getUri(), getConf());
+ FileSystem cached;
+
+ try {
+ cached = get(getUri(), getConf(), user);
+ }
+ catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+
+ throw new IOException(ie);
+ }
if (cached == this)
return; // do not close cached instances.
+ else
+ // For some reason we created another Fs.
+ cached.close();
}
close0();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a8319c6b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
index 9085051..8e47abb 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
@@ -26,10 +26,14 @@ import org.apache.hadoop.mapreduce.JobPriority;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.*;
import org.apache.ignite.*;
+import org.apache.ignite.hadoop.fs.v1.*;
import org.apache.ignite.internal.processors.hadoop.v2.*;
+import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
import java.io.*;
+import java.net.*;
import java.util.*;
/**
@@ -58,6 +62,13 @@ public class HadoopUtils {
private static final String OLD_REDUCE_CLASS_ATTR = "mapred.reducer.class";
/**
+ * Constructor.
+ */
+ private HadoopUtils() {
+ // No-op.
+ }
+
+ /**
* Wraps native split.
*
* @param id Split ID.
@@ -322,9 +333,50 @@ public class HadoopUtils {
}
/**
- * Constructor.
+ * Gets non-null user name as per the Hadoop viewpoint.
+ * @param cfg the Hadoop job configuration, may be null.
+ * @return the user name, never null.
*/
- private HadoopUtils() {
- // No-op.
+ private static String getMrHadoopUser(Configuration cfg) throws IOException {
+ String user = cfg.get(MRJobConfig.USER_NAME);
+
+ if (user == null)
+ user = IgniteHadoopFileSystem.getFsHadoopUser();
+
+ return user;
+ }
+
+ /**
+ * Common method to get the V1 file system in MapRed engine.
+ * It creates the filesystem for the user specified in the
+ * configuration with {@link MRJobConfig#USER_NAME} property.
+ * @param uri the file system uri.
+ * @param cfg the configuration.
+ * @return the file system
+ * @throws IOException
+ */
+ public static FileSystem fileSystemForMrUser(@Nullable URI uri, Configuration cfg) throws IOException {
+ final String usr = getMrHadoopUser(cfg);
+
+ assert usr != null;
+
+ if (uri == null)
+ uri = FileSystem.getDefaultUri(cfg);
+
+ final FileSystem fs;
+
+ try {
+ fs = FileSystem.get(uri, cfg, usr);
+ }
+ catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+
+ throw new IOException(ie);
+ }
+
+ assert fs != null;
+ assert !(fs instanceof IgniteHadoopFileSystem) || F.eq(usr, ((IgniteHadoopFileSystem)fs).user());
+
+ return fs;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a8319c6b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
index 34ba053..fd5deaf 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
@@ -39,7 +39,6 @@ import java.util.Queue;
import java.util.concurrent.*;
import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*;
-import static org.apache.ignite.internal.processors.hadoop.v2.HadoopV2JobResourceManager.*;
/**
* Hadoop job implementation for v2 API.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a8319c6b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
index aaf7410..e9c0365 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.processors.hadoop.v2;
-import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.Path;
@@ -25,7 +24,6 @@ import org.apache.hadoop.mapred.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.util.*;
import org.apache.ignite.*;
-import org.apache.ignite.hadoop.fs.v1.*;
import org.apache.ignite.internal.processors.hadoop.*;
import org.apache.ignite.internal.processors.hadoop.fs.*;
import org.apache.ignite.internal.util.typedef.*;
@@ -95,56 +93,6 @@ public class HadoopV2JobResourceManager {
}
/**
- * Gets non-null user name as per the Hadoop viewpoint.
- * @param cfg the Hadoop job configuration, may be null.
- * @return the user name, never null.
- */
- // TODO: Move to HadoopUtils.
- private static String getMrHadoopUser(Configuration cfg) throws IOException {
- String user = cfg.get(MRJobConfig.USER_NAME);
-
- if (user == null)
- user = IgniteHadoopFileSystem.getFsHadoopUser();
-
- return user;
- }
-
- /**
- * Common method to get the V1 file system in MapRed engine.
- * It creates the filesystem for the user specified in the
- * configuration with {@link MRJobConfig#USER_NAME} property.
- * @param uri the file system uri.
- * @param cfg the configuration.
- * @return the file system
- * @throws IOException
- */
- // TODO: Move to HadoopUtils.
- public static FileSystem fileSystemForMrUser(@Nullable URI uri, Configuration cfg) throws IOException {
- final String usr = getMrHadoopUser(cfg);
-
- assert usr != null;
-
- if (uri == null)
- uri = FileSystem.getDefaultUri(cfg);
-
- final FileSystem fs;
-
- try {
- fs = FileSystem.get(uri, cfg, usr);
- }
- catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
-
- throw new IOException(ie);
- }
-
- assert fs != null;
- assert !(fs instanceof IgniteHadoopFileSystem) || F.eq(usr, ((IgniteHadoopFileSystem)fs).user());
-
- return fs;
- }
-
- /**
* Prepare job resources. Resolve the classpath list and download it if needed.
*
* @param download {@code true} If need to download resources.
@@ -164,16 +112,18 @@ public class HadoopV2JobResourceManager {
stagingDir = new Path(new URI(mrDir));
if (download) {
- // TODO: Create new ticket to investigate possibility closing it right-away.
- FileSystem fs = fileSystemForMrUser(stagingDir.toUri(), cfg);
+ // TODO: Check if FileSystem should be closed, see https://issues.apache.org/jira/browse/IGNITE-980
+ FileSystem fs = HadoopUtils.fileSystemForMrUser(stagingDir.toUri(), cfg);
if (!fs.exists(stagingDir))
- throw new IgniteCheckedException("Failed to find map-reduce submission directory (does not exist): " +
- stagingDir);
+ throw new IgniteCheckedException("Failed to find map-reduce submission " +
+ "directory (does not exist): " + stagingDir);
if (!FileUtil.copy(fs, stagingDir, jobLocDir, false, cfg))
- throw new IgniteCheckedException("Failed to copy job submission directory contents to local file system " +
- "[path=" + stagingDir + ", locDir=" + jobLocDir.getAbsolutePath() + ", jobId=" + jobId + ']');
+ throw new IgniteCheckedException("Failed to copy job submission directory "
+ + "contents to local file system "
+ + "[path=" + stagingDir + ", locDir=" + jobLocDir.getAbsolutePath()
+ + ", jobId=" + jobId + ']');
}
File jarJobFile = new File(jobLocDir, "job.jar");
@@ -258,8 +208,8 @@ public class HadoopV2JobResourceManager {
FileSystem dstFs = FileSystem.getLocal(cfg);
- // TODO: Create new ticket to investigate possibility closing it right-away.
- FileSystem srcFs = fileSystemForMrUser(srcPath.toUri(), cfg);
+ // TODO: Check if FileSystem should be closed, see https://issues.apache.org/jira/browse/IGNITE-980
+ FileSystem srcFs = HadoopUtils.fileSystemForMrUser(srcPath.toUri(), cfg);
if (extract) {
File archivesPath = new File(jobLocDir.getAbsolutePath(), ".cached-archives");
@@ -342,8 +292,8 @@ public class HadoopV2JobResourceManager {
public void cleanupStagingDirectory() {
try {
if (stagingDir != null)
- // TODO: Create new ticket to investigate possibility closing it right-away.
- fileSystemForMrUser(stagingDir.toUri(), ctx.getJobConf()).delete(stagingDir, true);
+ // TODO: Check if FileSystem should be closed, see https://issues.apache.org/jira/browse/IGNITE-980
+ HadoopUtils.fileSystemForMrUser(stagingDir.toUri(), ctx.getJobConf()).delete(stagingDir, true);
}
catch (Exception e) {
log.error("Failed to remove job staging directory [path=" + stagingDir + ", jobId=" + jobId + ']' , e);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a8319c6b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
index 7033d22..7384421 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
@@ -423,7 +423,7 @@ public class HadoopV2TaskContext extends HadoopTaskContext {
private Object readExternalSplit(HadoopExternalSplit split) throws IgniteCheckedException {
Path jobDir = new Path(jobConf().get(MRJobConfig.MAPREDUCE_JOB_DIR));
- try (FileSystem fs = HadoopV2JobResourceManager.fileSystemForMrUser(jobDir.toUri(), jobConf());
+ try (FileSystem fs = fileSystemForMrUser(jobDir.toUri(), jobConf());
FSDataInputStream in = fs.open(JobSubmissionFiles.getJobSplitFile(jobDir))) {
in.seek(split.offset());
@@ -466,7 +466,8 @@ public class HadoopV2TaskContext extends HadoopTaskContext {
try {
UserGroupInformation currUser = UserGroupInformation.getCurrentUser();
- // TODO: Ensure that UserGroupInformation.getCurrentUser() cannot return null, or add null-check.
+ assert currUser != null;
+
ugiUser = currUser.getShortUserName();
}
catch (IOException ioe) {