You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2015/06/02 14:48:02 UTC

[3/3] incubator-ignite git commit: # IGNITE-218: Review.

# IGNITE-218: Review.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/2488969b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/2488969b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/2488969b

Branch: refs/heads/ignite-218
Commit: 2488969b9cdd6e49214380bf0e259c7499d7665e
Parents: a6fc5b8
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Tue Jun 2 15:48:17 2015 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Tue Jun 2 15:48:17 2015 +0300

----------------------------------------------------------------------
 .../fs/IgniteHadoopFileSystemCounterWriter.java | 24 +++---
 .../hadoop/fs/v1/IgniteHadoopFileSystem.java    |  1 +
 .../hadoop/v2/HadoopV2JobResourceManager.java   | 88 ++++++++++----------
 .../hadoop/v2/HadoopV2TaskContext.java          |  2 +
 4 files changed, 58 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2488969b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
index bbafcd7..7a6a269 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
@@ -73,19 +73,19 @@ public class IgniteHadoopFileSystemCounterWriter implements HadoopCounterWriter
         try {
             hadoopCfg.set(MRJobConfig.USER_NAME, user);
 
-            try (FileSystem fs = HadoopV2JobResourceManager.fileSystemForMrUser(jobStatPath.toUri(), hadoopCfg)) {
-                fs.mkdirs(jobStatPath);
-
-                try (PrintStream out = new PrintStream(fs.create(
-                        new Path(jobStatPath, PERFORMANCE_COUNTER_FILE_NAME)))) {
-                    for (T2<String, Long> evt : perfCntr.evts()) {
-                        out.print(evt.get1());
-                        out.print(':');
-                        out.println(evt.get2().toString());
-                    }
-
-                    out.flush();
+            // TODO: Check if FileSystem can be closed here safely.
+            FileSystem fs = HadoopV2JobResourceManager.fileSystemForMrUser(jobStatPath.toUri(), hadoopCfg);
+
+            fs.mkdirs(jobStatPath);
+
+            try (PrintStream out = new PrintStream(fs.create(new Path(jobStatPath, PERFORMANCE_COUNTER_FILE_NAME)))) {
+                for (T2<String, Long> evt : perfCntr.evts()) {
+                    out.print(evt.get1());
+                    out.print(':');
+                    out.println(evt.get2().toString());
                 }
+
+                out.flush();
             }
         }
         catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2488969b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
index 4f703d2..969a6b0 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
@@ -355,6 +355,7 @@ public class IgniteHadoopFileSystem extends FileSystem {
     @Override public void close() throws IOException {
         if (closeGuard.compareAndSet(false, true)) {
             if (cacheEnabled) {
+                // TODO: get must take the user name into account.
                 FileSystem cached = get(getUri(), getConf());
 
                 if (cached == this)

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2488969b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
index 75602dc..aaf7410 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
@@ -99,6 +99,7 @@ public class HadoopV2JobResourceManager {
      * @param cfg the Hadoop job configuration, may be null.
      * @return the user name, never null.
      */
+    // TODO: Move to HadoopUtils.
     private static String getMrHadoopUser(Configuration cfg) throws IOException {
         String user = cfg.get(MRJobConfig.USER_NAME);
 
@@ -117,10 +118,11 @@ public class HadoopV2JobResourceManager {
      * @return the file system
      * @throws IOException
      */
+    // TODO: Move to HadoopUtils.
     public static FileSystem fileSystemForMrUser(@Nullable URI uri, Configuration cfg) throws IOException {
-        final String user = getMrHadoopUser(cfg);
+        final String usr = getMrHadoopUser(cfg);
 
-        assert user != null;
+        assert usr != null;
 
         if (uri == null)
             uri = FileSystem.getDefaultUri(cfg);
@@ -128,17 +130,16 @@ public class HadoopV2JobResourceManager {
         final FileSystem fs;
 
         try {
-            fs = FileSystem.get(uri, cfg, user);
+            fs = FileSystem.get(uri, cfg, usr);
         }
         catch (InterruptedException ie) {
+            Thread.currentThread().interrupt();
+
             throw new IOException(ie);
         }
 
         assert fs != null;
-
-        if (fs instanceof IgniteHadoopFileSystem)
-            //noinspection StringEquality
-            assert user == ((IgniteHadoopFileSystem)fs).user();
+        assert !(fs instanceof IgniteHadoopFileSystem) || F.eq(usr, ((IgniteHadoopFileSystem)fs).user());
 
         return fs;
     }
@@ -163,18 +164,16 @@ public class HadoopV2JobResourceManager {
                 stagingDir = new Path(new URI(mrDir));
 
                 if (download) {
-                    try (FileSystem fs = fileSystemForMrUser(stagingDir.toUri(), cfg)) {
-                        if (!fs.exists(stagingDir))
-                            throw new IgniteCheckedException("Failed to find map-reduce " +
-                                "submission directory (does not exist): " +
-                                stagingDir);
-
-                        if (!FileUtil.copy(fs, stagingDir, jobLocDir, false, cfg))
-                            throw new IgniteCheckedException("Failed to copy job submission " +
-                                "directory contents to local file system " +
-                                "[path=" + stagingDir + ", locDir=" + jobLocDir.getAbsolutePath()
-                                + ", jobId=" + jobId + ']');
-                    }
+                    // TODO: Create new ticket to investigate the possibility of closing it right away.
+                    FileSystem fs = fileSystemForMrUser(stagingDir.toUri(), cfg);
+
+                    if (!fs.exists(stagingDir))
+                        throw new IgniteCheckedException("Failed to find map-reduce submission directory (does not exist): " +
+                            stagingDir);
+
+                    if (!FileUtil.copy(fs, stagingDir, jobLocDir, false, cfg))
+                        throw new IgniteCheckedException("Failed to copy job submission directory contents to local file system " +
+                            "[path=" + stagingDir + ", locDir=" + jobLocDir.getAbsolutePath() + ", jobId=" + jobId + ']');
                 }
 
                 File jarJobFile = new File(jobLocDir, "job.jar");
@@ -259,34 +258,35 @@ public class HadoopV2JobResourceManager {
 
             FileSystem dstFs = FileSystem.getLocal(cfg);
 
-            try (FileSystem srcFs = fileSystemForMrUser(srcPath.toUri(), cfg)) {
-                if (extract) {
-                    File archivesPath = new File(jobLocDir.getAbsolutePath(), ".cached-archives");
+            // TODO: Create new ticket to investigate the possibility of closing it right away.
+            FileSystem srcFs = fileSystemForMrUser(srcPath.toUri(), cfg);
 
-                    if (!archivesPath.exists() && !archivesPath.mkdir())
-                        throw new IOException("Failed to create directory " +
-                            "[path=" + archivesPath + ", jobId=" + jobId + ']');
+            if (extract) {
+                File archivesPath = new File(jobLocDir.getAbsolutePath(), ".cached-archives");
 
-                    File archiveFile = new File(archivesPath, locName);
+                if (!archivesPath.exists() && !archivesPath.mkdir())
+                    throw new IOException("Failed to create directory " +
+                        "[path=" + archivesPath + ", jobId=" + jobId + ']');
 
-                    FileUtil.copy(srcFs, srcPath, dstFs, new Path(archiveFile.toString()), false, cfg);
+                File archiveFile = new File(archivesPath, locName);
 
-                    String archiveNameLC = archiveFile.getName().toLowerCase();
+                FileUtil.copy(srcFs, srcPath, dstFs, new Path(archiveFile.toString()), false, cfg);
 
-                    if (archiveNameLC.endsWith(".jar"))
-                        RunJar.unJar(archiveFile, dstPath);
-                    else if (archiveNameLC.endsWith(".zip"))
-                        FileUtil.unZip(archiveFile, dstPath);
-                    else if (archiveNameLC.endsWith(".tar.gz") ||
-                        archiveNameLC.endsWith(".tgz") ||
-                        archiveNameLC.endsWith(".tar"))
-                        FileUtil.unTar(archiveFile, dstPath);
-                    else
-                        throw new IOException("Cannot unpack archive [path=" + srcPath + ", jobId=" + jobId + ']');
-                }
+                String archiveNameLC = archiveFile.getName().toLowerCase();
+
+                if (archiveNameLC.endsWith(".jar"))
+                    RunJar.unJar(archiveFile, dstPath);
+                else if (archiveNameLC.endsWith(".zip"))
+                    FileUtil.unZip(archiveFile, dstPath);
+                else if (archiveNameLC.endsWith(".tar.gz") ||
+                    archiveNameLC.endsWith(".tgz") ||
+                    archiveNameLC.endsWith(".tar"))
+                    FileUtil.unTar(archiveFile, dstPath);
                 else
-                    FileUtil.copy(srcFs, srcPath, dstFs, new Path(dstPath.toString()), false, cfg);
+                    throw new IOException("Cannot unpack archive [path=" + srcPath + ", jobId=" + jobId + ']');
             }
+            else
+                FileUtil.copy(srcFs, srcPath, dstFs, new Path(dstPath.toString()), false, cfg);
         }
 
         if (!res.isEmpty() && rsrcNameProp != null)
@@ -341,11 +341,9 @@ public class HadoopV2JobResourceManager {
      */
     public void cleanupStagingDirectory() {
         try {
-            if (stagingDir != null) {
-                try (FileSystem fs = fileSystemForMrUser(stagingDir.toUri(), ctx.getJobConf())) {
-                    fs.delete(stagingDir, true);
-                }
-            }
+            if (stagingDir != null)
+                // TODO: Create new ticket to investigate the possibility of closing it right away.
+                fileSystemForMrUser(stagingDir.toUri(), ctx.getJobConf()).delete(stagingDir, true);
         }
         catch (Exception e) {
             log.error("Failed to remove job staging directory [path=" + stagingDir + ", jobId=" + jobId + ']' , e);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2488969b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
index e838df3..7033d22 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
@@ -462,9 +462,11 @@ public class HadoopV2TaskContext extends HadoopTaskContext {
         assert user != null;
 
         String ugiUser;
+
         try {
             UserGroupInformation currUser = UserGroupInformation.getCurrentUser();
 
+            // TODO: Ensure that UserGroupInformation.getCurrentUser() cannot return null, or add null-check.
             ugiUser = currUser.getShortUserName();
         }
         catch (IOException ioe) {