You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/06/02 17:25:38 UTC
[2/6] incubator-ignite git commit: #[IGNITE-218]: fixed by review
results.
#[IGNITE-218]: fixed by review results.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/a8319c6b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/a8319c6b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/a8319c6b
Branch: refs/heads/ignite-218
Commit: a8319c6b65eac290aff211eeb72c054c1b8d3091
Parents: 2488969
Author: iveselovskiy <iv...@gridgain.com>
Authored: Tue Jun 2 16:57:15 2015 +0300
Committer: iveselovskiy <iv...@gridgain.com>
Committed: Tue Jun 2 16:57:15 2015 +0300
----------------------------------------------------------------------
.../fs/IgniteHadoopFileSystemCounterWriter.java | 5 +-
.../hadoop/fs/v1/IgniteHadoopFileSystem.java | 20 ++++--
.../internal/processors/hadoop/HadoopUtils.java | 58 ++++++++++++++-
.../processors/hadoop/v2/HadoopV2Job.java | 1 -
.../hadoop/v2/HadoopV2JobResourceManager.java | 74 ++++----------------
.../hadoop/v2/HadoopV2TaskContext.java | 5 +-
6 files changed, 86 insertions(+), 77 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a8319c6b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
index 7a6a269..7c47b3f 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
@@ -25,7 +25,6 @@ import org.apache.ignite.*;
import org.apache.ignite.internal.processors.hadoop.*;
import org.apache.ignite.internal.processors.hadoop.counter.*;
import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
-import org.apache.ignite.internal.processors.hadoop.v2.*;
import org.apache.ignite.internal.processors.igfs.*;
import org.apache.ignite.internal.util.typedef.*;
@@ -73,8 +72,8 @@ public class IgniteHadoopFileSystemCounterWriter implements HadoopCounterWriter
try {
hadoopCfg.set(MRJobConfig.USER_NAME, user);
- // TODO: Check if FileSystem can be closed here safely.
- FileSystem fs = HadoopV2JobResourceManager.fileSystemForMrUser(jobStatPath.toUri(), hadoopCfg);
+ // TODO: Check if FileSystem should be closed, see https://issues.apache.org/jira/browse/IGNITE-980
+ FileSystem fs = HadoopUtils.fileSystemForMrUser(jobStatPath.toUri(), hadoopCfg);
fs.mkdirs(jobStatPath);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a8319c6b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
index 969a6b0..4ed3862 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
@@ -176,12 +176,9 @@ public class IgniteHadoopFileSystem extends FileSystem {
* @return the user name, never null.
*/
public static String getFsHadoopUser() throws IOException {
- String user = null;
-
UserGroupInformation currUgi = UserGroupInformation.getCurrentUser();
- if (currUgi != null)
- user = currUgi.getShortUserName();
+ String user = currUgi.getShortUserName();
user = IgfsUtils.fixUserName(user);
@@ -355,11 +352,22 @@ public class IgniteHadoopFileSystem extends FileSystem {
@Override public void close() throws IOException {
if (closeGuard.compareAndSet(false, true)) {
if (cacheEnabled) {
- // TODO: get must take in count user name.
- FileSystem cached = get(getUri(), getConf());
+ FileSystem cached;
+
+ try {
+ cached = get(getUri(), getConf(), user);
+ }
+ catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+
+ throw new IOException(ie);
+ }
if (cached == this)
return; // do not close cached instances.
+ else
+ // For some reason we created another Fs.
+ cached.close();
}
close0();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a8319c6b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
index 9085051..8e47abb 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
@@ -26,10 +26,14 @@ import org.apache.hadoop.mapreduce.JobPriority;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.*;
import org.apache.ignite.*;
+import org.apache.ignite.hadoop.fs.v1.*;
import org.apache.ignite.internal.processors.hadoop.v2.*;
+import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
import java.io.*;
+import java.net.*;
import java.util.*;
/**
@@ -58,6 +62,13 @@ public class HadoopUtils {
private static final String OLD_REDUCE_CLASS_ATTR = "mapred.reducer.class";
/**
+ * Constructor.
+ */
+ private HadoopUtils() {
+ // No-op.
+ }
+
+ /**
* Wraps native split.
*
* @param id Split ID.
@@ -322,9 +333,50 @@ public class HadoopUtils {
}
/**
- * Constructor.
+ * Gets non-null user name as per the Hadoop viewpoint.
+ * @param cfg the Hadoop job configuration, must not be null.
+ * @return the user name, never null.
*/
- private HadoopUtils() {
- // No-op.
+ private static String getMrHadoopUser(Configuration cfg) throws IOException {
+ String user = cfg.get(MRJobConfig.USER_NAME);
+
+ if (user == null)
+ user = IgniteHadoopFileSystem.getFsHadoopUser();
+
+ return user;
+ }
+
+ /**
+ * Common method to get the V1 file system in MapRed engine.
+ * It creates the filesystem for the user specified in the
+ * configuration with {@link MRJobConfig#USER_NAME} property.
+ * @param uri the file system uri.
+ * @param cfg the configuration.
+ * @return the file system
+ * @throws IOException if the user or file system cannot be resolved, or the calling thread is interrupted.
+ */
+ public static FileSystem fileSystemForMrUser(@Nullable URI uri, Configuration cfg) throws IOException {
+ final String usr = getMrHadoopUser(cfg);
+
+ assert usr != null;
+
+ if (uri == null)
+ uri = FileSystem.getDefaultUri(cfg);
+
+ final FileSystem fs;
+
+ try {
+ fs = FileSystem.get(uri, cfg, usr);
+ }
+ catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+
+ throw new IOException(ie);
+ }
+
+ assert fs != null;
+ assert !(fs instanceof IgniteHadoopFileSystem) || F.eq(usr, ((IgniteHadoopFileSystem)fs).user());
+
+ return fs;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a8319c6b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
index 34ba053..fd5deaf 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
@@ -39,7 +39,6 @@ import java.util.Queue;
import java.util.concurrent.*;
import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*;
-import static org.apache.ignite.internal.processors.hadoop.v2.HadoopV2JobResourceManager.*;
/**
* Hadoop job implementation for v2 API.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a8319c6b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
index aaf7410..e9c0365 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.processors.hadoop.v2;
-import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.Path;
@@ -25,7 +24,6 @@ import org.apache.hadoop.mapred.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.util.*;
import org.apache.ignite.*;
-import org.apache.ignite.hadoop.fs.v1.*;
import org.apache.ignite.internal.processors.hadoop.*;
import org.apache.ignite.internal.processors.hadoop.fs.*;
import org.apache.ignite.internal.util.typedef.*;
@@ -95,56 +93,6 @@ public class HadoopV2JobResourceManager {
}
/**
- * Gets non-null user name as per the Hadoop viewpoint.
- * @param cfg the Hadoop job configuration, may be null.
- * @return the user name, never null.
- */
- // TODO: Move to HadoopUtils.
- private static String getMrHadoopUser(Configuration cfg) throws IOException {
- String user = cfg.get(MRJobConfig.USER_NAME);
-
- if (user == null)
- user = IgniteHadoopFileSystem.getFsHadoopUser();
-
- return user;
- }
-
- /**
- * Common method to get the V1 file system in MapRed engine.
- * It creates the filesystem for the user specified in the
- * configuration with {@link MRJobConfig#USER_NAME} property.
- * @param uri the file system uri.
- * @param cfg the configuration.
- * @return the file system
- * @throws IOException
- */
- // TODO: Move to HadoopUtils.
- public static FileSystem fileSystemForMrUser(@Nullable URI uri, Configuration cfg) throws IOException {
- final String usr = getMrHadoopUser(cfg);
-
- assert usr != null;
-
- if (uri == null)
- uri = FileSystem.getDefaultUri(cfg);
-
- final FileSystem fs;
-
- try {
- fs = FileSystem.get(uri, cfg, usr);
- }
- catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
-
- throw new IOException(ie);
- }
-
- assert fs != null;
- assert !(fs instanceof IgniteHadoopFileSystem) || F.eq(usr, ((IgniteHadoopFileSystem)fs).user());
-
- return fs;
- }
-
- /**
* Prepare job resources. Resolve the classpath list and download it if needed.
*
* @param download {@code true} If need to download resources.
@@ -164,16 +112,18 @@ public class HadoopV2JobResourceManager {
stagingDir = new Path(new URI(mrDir));
if (download) {
- // TODO: Create new ticket to investigate possibility closing it right-away.
- FileSystem fs = fileSystemForMrUser(stagingDir.toUri(), cfg);
+ // TODO: Check if FileSystem should be closed, see https://issues.apache.org/jira/browse/IGNITE-980
+ FileSystem fs = HadoopUtils.fileSystemForMrUser(stagingDir.toUri(), cfg);
if (!fs.exists(stagingDir))
- throw new IgniteCheckedException("Failed to find map-reduce submission directory (does not exist): " +
- stagingDir);
+ throw new IgniteCheckedException("Failed to find map-reduce submission " +
+ "directory (does not exist): " + stagingDir);
if (!FileUtil.copy(fs, stagingDir, jobLocDir, false, cfg))
- throw new IgniteCheckedException("Failed to copy job submission directory contents to local file system " +
- "[path=" + stagingDir + ", locDir=" + jobLocDir.getAbsolutePath() + ", jobId=" + jobId + ']');
+ throw new IgniteCheckedException("Failed to copy job submission directory "
+ + "contents to local file system "
+ + "[path=" + stagingDir + ", locDir=" + jobLocDir.getAbsolutePath()
+ + ", jobId=" + jobId + ']');
}
File jarJobFile = new File(jobLocDir, "job.jar");
@@ -258,8 +208,8 @@ public class HadoopV2JobResourceManager {
FileSystem dstFs = FileSystem.getLocal(cfg);
- // TODO: Create new ticket to investigate possibility closing it right-away.
- FileSystem srcFs = fileSystemForMrUser(srcPath.toUri(), cfg);
+ // TODO: Check if FileSystem should be closed, see https://issues.apache.org/jira/browse/IGNITE-980
+ FileSystem srcFs = HadoopUtils.fileSystemForMrUser(srcPath.toUri(), cfg);
if (extract) {
File archivesPath = new File(jobLocDir.getAbsolutePath(), ".cached-archives");
@@ -342,8 +292,8 @@ public class HadoopV2JobResourceManager {
public void cleanupStagingDirectory() {
try {
if (stagingDir != null)
- // TODO: Create new ticket to investigate possibility closing it right-away.
- fileSystemForMrUser(stagingDir.toUri(), ctx.getJobConf()).delete(stagingDir, true);
+ // TODO: Check if FileSystem should be closed, see https://issues.apache.org/jira/browse/IGNITE-980
+ HadoopUtils.fileSystemForMrUser(stagingDir.toUri(), ctx.getJobConf()).delete(stagingDir, true);
}
catch (Exception e) {
log.error("Failed to remove job staging directory [path=" + stagingDir + ", jobId=" + jobId + ']' , e);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a8319c6b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
index 7033d22..7384421 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
@@ -423,7 +423,7 @@ public class HadoopV2TaskContext extends HadoopTaskContext {
private Object readExternalSplit(HadoopExternalSplit split) throws IgniteCheckedException {
Path jobDir = new Path(jobConf().get(MRJobConfig.MAPREDUCE_JOB_DIR));
- try (FileSystem fs = HadoopV2JobResourceManager.fileSystemForMrUser(jobDir.toUri(), jobConf());
+ try (FileSystem fs = fileSystemForMrUser(jobDir.toUri(), jobConf());
FSDataInputStream in = fs.open(JobSubmissionFiles.getJobSplitFile(jobDir))) {
in.seek(split.offset());
@@ -466,7 +466,8 @@ public class HadoopV2TaskContext extends HadoopTaskContext {
try {
UserGroupInformation currUser = UserGroupInformation.getCurrentUser();
- // TODO: Ensure that UserGroupInformation.getCurrentUser() cannot return null, or add null-check.
+ assert currUser != null;
+
ugiUser = currUser.getShortUserName();
}
catch (IOException ioe) {