You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/06/02 17:25:38 UTC

[2/6] incubator-ignite git commit: #[IGNITE-218]: fixed by review results.

#[IGNITE-218]: fixed by review results.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/a8319c6b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/a8319c6b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/a8319c6b

Branch: refs/heads/ignite-218
Commit: a8319c6b65eac290aff211eeb72c054c1b8d3091
Parents: 2488969
Author: iveselovskiy <iv...@gridgain.com>
Authored: Tue Jun 2 16:57:15 2015 +0300
Committer: iveselovskiy <iv...@gridgain.com>
Committed: Tue Jun 2 16:57:15 2015 +0300

----------------------------------------------------------------------
 .../fs/IgniteHadoopFileSystemCounterWriter.java |  5 +-
 .../hadoop/fs/v1/IgniteHadoopFileSystem.java    | 20 ++++--
 .../internal/processors/hadoop/HadoopUtils.java | 58 ++++++++++++++-
 .../processors/hadoop/v2/HadoopV2Job.java       |  1 -
 .../hadoop/v2/HadoopV2JobResourceManager.java   | 74 ++++----------------
 .../hadoop/v2/HadoopV2TaskContext.java          |  5 +-
 6 files changed, 86 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a8319c6b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
index 7a6a269..7c47b3f 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
@@ -25,7 +25,6 @@ import org.apache.ignite.*;
 import org.apache.ignite.internal.processors.hadoop.*;
 import org.apache.ignite.internal.processors.hadoop.counter.*;
 import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
-import org.apache.ignite.internal.processors.hadoop.v2.*;
 import org.apache.ignite.internal.processors.igfs.*;
 import org.apache.ignite.internal.util.typedef.*;
 
@@ -73,8 +72,8 @@ public class IgniteHadoopFileSystemCounterWriter implements HadoopCounterWriter
         try {
             hadoopCfg.set(MRJobConfig.USER_NAME, user);
 
-            // TODO: Check if FileSystem can be closed here safely.
-            FileSystem fs = HadoopV2JobResourceManager.fileSystemForMrUser(jobStatPath.toUri(), hadoopCfg);
+            // TODO: Check if FileSystem should be closed, see https://issues.apache.org/jira/browse/IGNITE-980
+            FileSystem fs = HadoopUtils.fileSystemForMrUser(jobStatPath.toUri(), hadoopCfg);
 
             fs.mkdirs(jobStatPath);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a8319c6b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
index 969a6b0..4ed3862 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
@@ -176,12 +176,9 @@ public class IgniteHadoopFileSystem extends FileSystem {
      * @return the user name, never null.
      */
     public static String getFsHadoopUser() throws IOException {
-        String user = null;
-
         UserGroupInformation currUgi = UserGroupInformation.getCurrentUser();
 
-        if (currUgi != null)
-            user = currUgi.getShortUserName();
+        String user = currUgi.getShortUserName();
 
         user = IgfsUtils.fixUserName(user);
 
@@ -355,11 +352,22 @@ public class IgniteHadoopFileSystem extends FileSystem {
     @Override public void close() throws IOException {
         if (closeGuard.compareAndSet(false, true)) {
             if (cacheEnabled) {
-                // TODO: get must take in count user name.
-                FileSystem cached = get(getUri(), getConf());
+                FileSystem cached;
+
+                try {
+                    cached = get(getUri(), getConf(), user);
+                }
+                catch (InterruptedException ie) {
+                    Thread.currentThread().interrupt();
+
+                    throw new IOException(ie);
+                }
 
                 if (cached == this)
                     return; // do not close cached instances.
+                else
+                    // The lookup returned an instance other than this one, so close that instance too.
+                    cached.close();
             }
 
             close0();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a8319c6b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
index 9085051..8e47abb 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
@@ -26,10 +26,14 @@ import org.apache.hadoop.mapreduce.JobPriority;
 import org.apache.hadoop.mapreduce.JobStatus;
 import org.apache.hadoop.mapreduce.*;
 import org.apache.ignite.*;
+import org.apache.ignite.hadoop.fs.v1.*;
 import org.apache.ignite.internal.processors.hadoop.v2.*;
+import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
 
 import java.io.*;
+import java.net.*;
 import java.util.*;
 
 /**
@@ -58,6 +62,13 @@ public class HadoopUtils {
     private static final String OLD_REDUCE_CLASS_ATTR = "mapred.reducer.class";
 
     /**
+     * Constructor.
+     */
+    private HadoopUtils() {
+        // No-op.
+    }
+
+    /**
      * Wraps native split.
      *
      * @param id Split ID.
@@ -322,9 +333,50 @@ public class HadoopUtils {
     }
 
     /**
-     * Constructor.
+     * Gets non-null user name as per the Hadoop viewpoint.
+     * @param cfg the Hadoop job configuration, must not be null.
+     * @return the user name, never null.
      */
-    private HadoopUtils() {
-        // No-op.
+    private static String getMrHadoopUser(Configuration cfg) throws IOException {
+        String user = cfg.get(MRJobConfig.USER_NAME);
+
+        if (user == null)
+            user = IgniteHadoopFileSystem.getFsHadoopUser();
+
+        return user;
+    }
+
+    /**
+     * Common method to get the V1 file system in MapRed engine.
+     * It creates the filesystem for the user specified in the
+     * configuration with {@link MRJobConfig#USER_NAME} property.
+     * @param uri the file system URI; if {@code null}, the default file system URI of {@code cfg} is used.
+     * @param cfg the configuration.
+     * @return the file system
+     * @throws IOException if the file system cannot be obtained or the calling thread is interrupted.
+     */
+    public static FileSystem fileSystemForMrUser(@Nullable URI uri, Configuration cfg) throws IOException {
+        final String usr = getMrHadoopUser(cfg);
+
+        assert usr != null;
+
+        if (uri == null)
+            uri = FileSystem.getDefaultUri(cfg);
+
+        final FileSystem fs;
+
+        try {
+            fs = FileSystem.get(uri, cfg, usr);
+        }
+        catch (InterruptedException ie) {
+            Thread.currentThread().interrupt();
+
+            throw new IOException(ie);
+        }
+
+        assert fs != null;
+        assert !(fs instanceof IgniteHadoopFileSystem) || F.eq(usr, ((IgniteHadoopFileSystem)fs).user());
+
+        return fs;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a8319c6b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
index 34ba053..fd5deaf 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
@@ -39,7 +39,6 @@ import java.util.Queue;
 import java.util.concurrent.*;
 
 import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*;
-import static org.apache.ignite.internal.processors.hadoop.v2.HadoopV2JobResourceManager.*;
 
 /**
  * Hadoop job implementation for v2 API.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a8319c6b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
index aaf7410..e9c0365 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.internal.processors.hadoop.v2;
 
-import org.apache.hadoop.conf.*;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.fs.Path;
@@ -25,7 +24,6 @@ import org.apache.hadoop.mapred.*;
 import org.apache.hadoop.mapreduce.*;
 import org.apache.hadoop.util.*;
 import org.apache.ignite.*;
-import org.apache.ignite.hadoop.fs.v1.*;
 import org.apache.ignite.internal.processors.hadoop.*;
 import org.apache.ignite.internal.processors.hadoop.fs.*;
 import org.apache.ignite.internal.util.typedef.*;
@@ -95,56 +93,6 @@ public class HadoopV2JobResourceManager {
     }
 
     /**
-     * Gets non-null user name as per the Hadoop viewpoint.
-     * @param cfg the Hadoop job configuration, may be null.
-     * @return the user name, never null.
-     */
-    // TODO: Move to HadoopUtils.
-    private static String getMrHadoopUser(Configuration cfg) throws IOException {
-        String user = cfg.get(MRJobConfig.USER_NAME);
-
-        if (user == null)
-            user = IgniteHadoopFileSystem.getFsHadoopUser();
-
-        return user;
-    }
-
-    /**
-     * Common method to get the V1 file system in MapRed engine.
-     * It creates the filesystem for the user specified in the
-     * configuration with {@link MRJobConfig#USER_NAME} property.
-     * @param uri the file system uri.
-     * @param cfg the configuration.
-     * @return the file system
-     * @throws IOException
-     */
-    // TODO: Move to HadoopUtils.
-    public static FileSystem fileSystemForMrUser(@Nullable URI uri, Configuration cfg) throws IOException {
-        final String usr = getMrHadoopUser(cfg);
-
-        assert usr != null;
-
-        if (uri == null)
-            uri = FileSystem.getDefaultUri(cfg);
-
-        final FileSystem fs;
-
-        try {
-            fs = FileSystem.get(uri, cfg, usr);
-        }
-        catch (InterruptedException ie) {
-            Thread.currentThread().interrupt();
-
-            throw new IOException(ie);
-        }
-
-        assert fs != null;
-        assert !(fs instanceof IgniteHadoopFileSystem) || F.eq(usr, ((IgniteHadoopFileSystem)fs).user());
-
-        return fs;
-    }
-
-    /**
      * Prepare job resources. Resolve the classpath list and download it if needed.
      *
      * @param download {@code true} If need to download resources.
@@ -164,16 +112,18 @@ public class HadoopV2JobResourceManager {
                 stagingDir = new Path(new URI(mrDir));
 
                 if (download) {
-                    // TODO: Create new ticket to investigate possibility closing it right-away.
-                    FileSystem fs = fileSystemForMrUser(stagingDir.toUri(), cfg);
+                    // TODO: Check if FileSystem should be closed, see https://issues.apache.org/jira/browse/IGNITE-980
+                    FileSystem fs = HadoopUtils.fileSystemForMrUser(stagingDir.toUri(), cfg);
 
                     if (!fs.exists(stagingDir))
-                        throw new IgniteCheckedException("Failed to find map-reduce submission directory (does not exist): " +
-                            stagingDir);
+                        throw new IgniteCheckedException("Failed to find map-reduce submission " +
+                            "directory (does not exist): " + stagingDir);
 
                     if (!FileUtil.copy(fs, stagingDir, jobLocDir, false, cfg))
-                        throw new IgniteCheckedException("Failed to copy job submission directory contents to local file system " +
-                            "[path=" + stagingDir + ", locDir=" + jobLocDir.getAbsolutePath() + ", jobId=" + jobId + ']');
+                        throw new IgniteCheckedException("Failed to copy job submission directory "
+                            + "contents to local file system "
+                            + "[path=" + stagingDir + ", locDir=" + jobLocDir.getAbsolutePath()
+                            + ", jobId=" + jobId + ']');
                 }
 
                 File jarJobFile = new File(jobLocDir, "job.jar");
@@ -258,8 +208,8 @@ public class HadoopV2JobResourceManager {
 
             FileSystem dstFs = FileSystem.getLocal(cfg);
 
-            // TODO: Create new ticket to investigate possibility closing it right-away.
-            FileSystem srcFs = fileSystemForMrUser(srcPath.toUri(), cfg);
+            // TODO: Check if FileSystem should be closed, see https://issues.apache.org/jira/browse/IGNITE-980
+            FileSystem srcFs = HadoopUtils.fileSystemForMrUser(srcPath.toUri(), cfg);
 
             if (extract) {
                 File archivesPath = new File(jobLocDir.getAbsolutePath(), ".cached-archives");
@@ -342,8 +292,8 @@ public class HadoopV2JobResourceManager {
     public void cleanupStagingDirectory() {
         try {
             if (stagingDir != null)
-                // TODO: Create new ticket to investigate possibility closing it right-away.
-                fileSystemForMrUser(stagingDir.toUri(), ctx.getJobConf()).delete(stagingDir, true);
+                // TODO: Check if FileSystem should be closed, see https://issues.apache.org/jira/browse/IGNITE-980
+                HadoopUtils.fileSystemForMrUser(stagingDir.toUri(), ctx.getJobConf()).delete(stagingDir, true);
         }
         catch (Exception e) {
             log.error("Failed to remove job staging directory [path=" + stagingDir + ", jobId=" + jobId + ']' , e);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a8319c6b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
index 7033d22..7384421 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
@@ -423,7 +423,7 @@ public class HadoopV2TaskContext extends HadoopTaskContext {
     private Object readExternalSplit(HadoopExternalSplit split) throws IgniteCheckedException {
         Path jobDir = new Path(jobConf().get(MRJobConfig.MAPREDUCE_JOB_DIR));
 
-        try (FileSystem fs = HadoopV2JobResourceManager.fileSystemForMrUser(jobDir.toUri(), jobConf());
+        try (FileSystem fs = fileSystemForMrUser(jobDir.toUri(), jobConf());
             FSDataInputStream in = fs.open(JobSubmissionFiles.getJobSplitFile(jobDir))) {
 
             in.seek(split.offset());
@@ -466,7 +466,8 @@ public class HadoopV2TaskContext extends HadoopTaskContext {
         try {
             UserGroupInformation currUser = UserGroupInformation.getCurrentUser();
 
-            // TODO: Ensure that UserGroupInformation.getCurrentUser() cannot return null, or add null-check.
+            assert currUser != null;
+
             ugiUser = currUser.getShortUserName();
         }
         catch (IOException ioe) {