You are viewing a plain-text version of this content; the hyperlink to its canonical version was lost in the plain-text conversion.
Posted to common-commits@hadoop.apache.org by xy...@apache.org on 2017/12/12 23:59:16 UTC

[41/50] hadoop git commit: MAPREDUCE-7018. Apply erasure coding properly to framework tarball and support plain tar (miklos.szegedi@cloudera.com via rkanter)

MAPREDUCE-7018. Apply erasure coding properly to framework tarball and support plain tar (miklos.szegedi@cloudera.com via rkanter)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/2316f526
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/2316f526
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/2316f526

Branch: refs/heads/HDFS-7240
Commit: 2316f526902e827b6c1b92a5bddef72d211bc742
Parents: 00129c5
Author: Robert Kanter <rk...@apache.org>
Authored: Mon Dec 11 14:00:42 2017 -0800
Committer: Robert Kanter <rk...@apache.org>
Committed: Mon Dec 11 14:00:42 2017 -0800

----------------------------------------------------------------------
 .../mapred/uploader/FrameworkUploader.java      | 59 +++++++++------
 .../mapred/uploader/TestFrameworkUploader.java  | 79 +++++++++++---------
 2 files changed, 79 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/2316f526/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-uploader/src/main/java/org/apache/hadoop/mapred/uploader/FrameworkUploader.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-uploader/src/main/java/org/apache/hadoop/mapred/uploader/FrameworkUploader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-uploader/src/main/java/org/apache/hadoop/mapred/uploader/FrameworkUploader.java
index d1cd740..a374262 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-uploader/src/main/java/org/apache/hadoop/mapred/uploader/FrameworkUploader.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-uploader/src/main/java/org/apache/hadoop/mapred/uploader/FrameworkUploader.java
@@ -81,7 +81,6 @@ public class FrameworkUploader implements Runnable {
 
   @VisibleForTesting
   OutputStream targetStream = null;
-  private Path targetPath = null;
   private String alias = null;
 
   private void printHelp(Options options) {
@@ -140,11 +139,12 @@ public class FrameworkUploader implements Runnable {
     }
   }
 
-  private void beginUpload() throws IOException, UploaderException {
+  @VisibleForTesting
+  void beginUpload() throws IOException, UploaderException {
     if (targetStream == null) {
       validateTargetPath();
       int lastIndex = target.indexOf('#');
-      targetPath =
+      Path targetPath =
           new Path(
               target.substring(
                   0, lastIndex == -1 ? target.length() : lastIndex));
@@ -153,7 +153,37 @@ public class FrameworkUploader implements Runnable {
           targetPath.getName();
       LOG.info("Target " + targetPath);
       FileSystem fileSystem = targetPath.getFileSystem(new Configuration());
-      targetStream = fileSystem.create(targetPath, true);
+
+      targetStream = null;
+      if (fileSystem instanceof DistributedFileSystem) {
+        LOG.info("Set replication to " +
+            replication + " for path: " + targetPath);
+        LOG.info("Disabling Erasure Coding for path: " + targetPath);
+        DistributedFileSystem dfs = (DistributedFileSystem)fileSystem;
+        DistributedFileSystem.HdfsDataOutputStreamBuilder builder =
+            dfs.createFile(targetPath)
+            .overwrite(true)
+            .ecPolicyName(
+                SystemErasureCodingPolicies.getReplicationPolicy().getName());
+        if (replication > 0) {
+          builder.replication(replication);
+        }
+        targetStream = builder.build();
+      } else {
+        LOG.warn("Cannot set replication to " +
+            replication + " for path: " + targetPath +
+            " on a non-distributed fileystem " +
+            fileSystem.getClass().getName());
+      }
+      if (targetStream == null) {
+        targetStream = fileSystem.create(targetPath, true);
+      }
+
+      if (targetPath.getName().endsWith("gz") ||
+          targetPath.getName().endsWith("tgz")) {
+        LOG.info("Creating GZip");
+        targetStream = new GZIPOutputStream(targetStream);
+      }
     }
   }
 
@@ -162,7 +192,7 @@ public class FrameworkUploader implements Runnable {
     beginUpload();
     LOG.info("Compressing tarball");
     try (TarArchiveOutputStream out = new TarArchiveOutputStream(
-        new GZIPOutputStream(targetStream))) {
+        targetStream)) {
       for (String fullPath : filteredInputFiles) {
         LOG.info("Adding " + fullPath);
         File file = new File(fullPath);
@@ -178,25 +208,6 @@ public class FrameworkUploader implements Runnable {
         targetStream.close();
       }
     }
-
-    if (targetPath == null) {
-      return;
-    }
-
-    // Set file attributes
-    FileSystem fileSystem = targetPath.getFileSystem(new Configuration());
-    if (fileSystem instanceof DistributedFileSystem) {
-      LOG.info("Disabling Erasure Coding for path: " + targetPath);
-      DistributedFileSystem dfs = (DistributedFileSystem) fileSystem;
-      dfs.setErasureCodingPolicy(targetPath,
-          SystemErasureCodingPolicies.getReplicationPolicy().getName());
-    }
-
-    if (replication > 0) {
-      LOG.info("Set replication to " +
-          replication + " for path: " + targetPath);
-      fileSystem.setReplication(targetPath, replication);
-    }
   }
 
   private void parseLists() throws UploaderException {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2316f526/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-uploader/src/test/java/org/apache/hadoop/mapred/uploader/TestFrameworkUploader.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-uploader/src/test/java/org/apache/hadoop/mapred/uploader/TestFrameworkUploader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-uploader/src/test/java/org/apache/hadoop/mapred/uploader/TestFrameworkUploader.java
index 9d03165..f3e4fc5 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-uploader/src/test/java/org/apache/hadoop/mapred/uploader/TestFrameworkUploader.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-uploader/src/test/java/org/apache/hadoop/mapred/uploader/TestFrameworkUploader.java
@@ -30,6 +30,7 @@ import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.PrintStream;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -171,46 +172,54 @@ public class TestFrameworkUploader {
    */
   @Test
   public void testBuildTarBall() throws IOException, UploaderException {
-    File parent = new File(testDir);
-    try {
-      parent.deleteOnExit();
-      FrameworkUploader uploader = prepareTree(parent);
+    String[] testFiles = {"upload.tar", "upload.tar.gz"};
+    for (String testFile: testFiles) {
+      File parent = new File(testDir);
+      try {
+        parent.deleteOnExit();
+        FrameworkUploader uploader = prepareTree(parent);
 
-      File gzipFile = new File("upload.tar.gz");
-      gzipFile.deleteOnExit();
-      Assert.assertTrue("Creating output", gzipFile.createNewFile());
-      uploader.targetStream = new FileOutputStream(gzipFile);
+        File gzipFile =
+            new File(parent.getAbsolutePath() + "/" + testFile);
+        gzipFile.deleteOnExit();
 
-      uploader.buildPackage();
+        uploader.target =
+            "file:///" + gzipFile.getAbsolutePath();
+        uploader.beginUpload();
+        uploader.buildPackage();
+        InputStream stream = new FileInputStream(gzipFile);
+        if (gzipFile.getName().endsWith(".gz")) {
+          stream = new GZIPInputStream(stream);
+        }
 
-      TarArchiveInputStream result = null;
-      try {
-        result =
-            new TarArchiveInputStream(
-                new GZIPInputStream(new FileInputStream(gzipFile)));
-        Set<String> fileNames = new HashSet<>();
-        Set<Long> sizes = new HashSet<>();
-        TarArchiveEntry entry1 = result.getNextTarEntry();
-        fileNames.add(entry1.getName());
-        sizes.add(entry1.getSize());
-        TarArchiveEntry entry2 = result.getNextTarEntry();
-        fileNames.add(entry2.getName());
-        sizes.add(entry2.getSize());
-        Assert.assertTrue(
-            "File name error", fileNames.contains("a.jar"));
-        Assert.assertTrue(
-            "File size error", sizes.contains((long) 13));
-        Assert.assertTrue(
-            "File name error", fileNames.contains("b.jar"));
-        Assert.assertTrue(
-            "File size error", sizes.contains((long) 14));
-      } finally {
-        if (result != null) {
-          result.close();
+        TarArchiveInputStream result = null;
+        try {
+          result =
+              new TarArchiveInputStream(stream);
+          Set<String> fileNames = new HashSet<>();
+          Set<Long> sizes = new HashSet<>();
+          TarArchiveEntry entry1 = result.getNextTarEntry();
+          fileNames.add(entry1.getName());
+          sizes.add(entry1.getSize());
+          TarArchiveEntry entry2 = result.getNextTarEntry();
+          fileNames.add(entry2.getName());
+          sizes.add(entry2.getSize());
+          Assert.assertTrue(
+              "File name error", fileNames.contains("a.jar"));
+          Assert.assertTrue(
+              "File size error", sizes.contains((long) 13));
+          Assert.assertTrue(
+              "File name error", fileNames.contains("b.jar"));
+          Assert.assertTrue(
+              "File size error", sizes.contains((long) 14));
+        } finally {
+          if (result != null) {
+            result.close();
+          }
         }
+      } finally {
+        FileUtils.deleteDirectory(parent);
       }
-    } finally {
-      FileUtils.deleteDirectory(parent);
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org