You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2018/01/18 23:48:03 UTC

[32/49] hadoop git commit: MAPREDUCE-7032. Add the ability to specify a delayed replication count (miklos.szegedi@cloudera.com via rkanter)

MAPREDUCE-7032. Add the ability to specify a delayed replication count (miklos.szegedi@cloudera.com via rkanter)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d716084f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d716084f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d716084f

Branch: refs/heads/YARN-7402
Commit: d716084f4503bf826ef10424d7025ea1ff4ee104
Parents: 5ac1099
Author: Robert Kanter <rk...@apache.org>
Authored: Tue Jan 16 10:45:45 2018 -0800
Committer: Robert Kanter <rk...@apache.org>
Committed: Tue Jan 16 10:45:45 2018 -0800

----------------------------------------------------------------------
 .../mapred/uploader/FrameworkUploader.java      | 124 +++++++++++++++++--
 .../mapred/uploader/TestFrameworkUploader.java  |  21 +++-
 2 files changed, 128 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/d716084f/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-uploader/src/main/java/org/apache/hadoop/mapred/uploader/FrameworkUploader.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-uploader/src/main/java/org/apache/hadoop/mapred/uploader/FrameworkUploader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-uploader/src/main/java/org/apache/hadoop/mapred/uploader/FrameworkUploader.java
index 899689d..ee482d7 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-uploader/src/main/java/org/apache/hadoop/mapred/uploader/FrameworkUploader.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-uploader/src/main/java/org/apache/hadoop/mapred/uploader/FrameworkUploader.java
@@ -25,6 +25,8 @@ import org.apache.commons.cli.Options;
 import org.apache.commons.compress.archivers.ArchiveEntry;
 import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
@@ -43,6 +45,8 @@ import java.io.OutputStream;
 import java.nio.file.Files;
 import java.nio.file.NotLinkException;
 import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
@@ -73,7 +77,15 @@ public class FrameworkUploader implements Runnable {
   @VisibleForTesting
   String target = null;
   @VisibleForTesting
-  short replication = 10;
+  Path targetPath = null;
+  @VisibleForTesting
+  short initialReplication = 3;
+  @VisibleForTesting
+  short finalReplication = 10;
+  @VisibleForTesting
+  short acceptableReplication = 9;
+  @VisibleForTesting
+  int timeout = 10;
   private boolean ignoreSymlink = false;
 
   @VisibleForTesting
@@ -101,9 +113,10 @@ public class FrameworkUploader implements Runnable {
       LOG.info(
           "Suggested mapreduce.application.classpath $PWD/" + alias + "/*");
       System.out.println("Suggested classpath $PWD/" + alias + "/*");
-    } catch (UploaderException|IOException e) {
+    } catch (UploaderException|IOException|InterruptedException e) {
       LOG.error("Error in execution " + e.getMessage());
       e.printStackTrace();
+      throw new RuntimeException(e);
     }
   }
 
@@ -147,7 +160,7 @@ public class FrameworkUploader implements Runnable {
     if (targetStream == null) {
       validateTargetPath();
       int lastIndex = target.indexOf('#');
-      Path targetPath =
+      targetPath =
           new Path(
               target.substring(
                   0, lastIndex == -1 ? target.length() : lastIndex));
@@ -160,7 +173,7 @@ public class FrameworkUploader implements Runnable {
       targetStream = null;
       if (fileSystem instanceof DistributedFileSystem) {
         LOG.info("Set replication to " +
-            replication + " for path: " + targetPath);
+            initialReplication + " for path: " + targetPath);
         LOG.info("Disabling Erasure Coding for path: " + targetPath);
         DistributedFileSystem dfs = (DistributedFileSystem)fileSystem;
         DistributedFileSystem.HdfsDataOutputStreamBuilder builder =
@@ -168,13 +181,13 @@ public class FrameworkUploader implements Runnable {
             .overwrite(true)
             .ecPolicyName(
                 SystemErasureCodingPolicies.getReplicationPolicy().getName());
-        if (replication > 0) {
-          builder.replication(replication);
+        if (initialReplication > 0) {
+          builder.replication(initialReplication);
         }
         targetStream = builder.build();
       } else {
         LOG.warn("Cannot set replication to " +
-            replication + " for path: " + targetPath +
+            initialReplication + " for path: " + targetPath +
             " on a non-distributed fileystem " +
             fileSystem.getClass().getName());
       }
@@ -190,8 +203,70 @@ public class FrameworkUploader implements Runnable {
     }
   }
 
+  /**
+   * Returns the smallest per-block host count of targetPath, logging each
+   * block's count. Returns 0 if no block is seen (e.g. a zero-length file).
+   */
+  private long getSmallestReplicatedBlockCount()
+      throws IOException {
+    FileSystem fileSystem = targetPath.getFileSystem(new Configuration());
+    FileStatus status = fileSystem.getFileStatus(targetPath);
+    long length = status.getLen();
+    HashMap<Long, Integer> blockCount = new HashMap<>();
+    // Start with 0s for each expected block offset
+    for (long offset = 0; offset < length; offset += status.getBlockSize()) {
+      blockCount.put(offset, 0);
+    }
+    // Count replicas; merge() tolerates offsets that were not seeded above
+    BlockLocation[] locations = fileSystem.getFileBlockLocations(
+        targetPath, 0, length);
+    for (BlockLocation location : locations) {
+      blockCount.merge(
+          location.getOffset(), location.getHosts().length, Integer::sum);
+    }
+    // Print out the results
+    for (long offset = 0; offset < length; offset += status.getBlockSize()) {
+      LOG.info(String.format(
+          "Replication counts offset:%d blocks:%d",
+          offset, blockCount.get(offset)));
+    }
+    // Collections.min() throws NoSuchElementException on an empty collection
+    return blockCount.isEmpty() ? 0 : Collections.min(blockCount.values());
+  }
+
+  /**
+   * Sets finalReplication on HDFS, then waits up to timeout seconds (none if
+   * timeout <= 0) for acceptableReplication, logging an error if not reached.
+   */
+  private void endUpload()
+      throws IOException, InterruptedException {
+    FileSystem fileSystem = targetPath.getFileSystem(new Configuration());
+    if (fileSystem instanceof DistributedFileSystem) {
+      fileSystem.setReplication(targetPath, finalReplication);
+      LOG.info("Set replication to " +
+          finalReplication + " for path: " + targetPath);
+      long startTime = System.currentTimeMillis();
+      long currentReplication = 0;
+      while (System.currentTimeMillis() - startTime < timeout * 1000L &&
+          currentReplication < acceptableReplication) {
+        Thread.sleep(1000);
+        currentReplication = getSmallestReplicatedBlockCount();
+      }
+      // Judge by the replication actually observed, not elapsed time alone
+      if (timeout > 0 && currentReplication < acceptableReplication) {
+        LOG.error(String.format("Timed out after %d seconds while waiting" +
+            " for acceptable replication of %d (current replication is %d)",
+            timeout, acceptableReplication, currentReplication));
+      }
+    } else {
+      LOG.info("Cannot set replication to " + finalReplication +
+          " for path: " + targetPath + " on a non-distributed fileystem " +
+          fileSystem.getClass().getName());
+    }
+  }
+
   @VisibleForTesting
-  void buildPackage() throws IOException, UploaderException {
+  void buildPackage()
+      throws IOException, UploaderException, InterruptedException {
     beginUpload();
     LOG.info("Compressing tarball");
     try (TarArchiveOutputStream out = new TarArchiveOutputStream(
@@ -206,6 +281,7 @@ public class FrameworkUploader implements Runnable {
           out.closeArchiveEntry();
         }
       }
+      endUpload();
     } finally {
       if (targetStream != null) {
         targetStream.close();
@@ -378,8 +454,21 @@ public class FrameworkUploader implements Runnable {
         .hasArg().create("target"));
     opts.addOption(OptionBuilder
         .withDescription(
-            "Desired replication count")
-        .hasArg().create("replication"));
+            "Desired initial replication count. Default 3.")
+        .hasArg().create("initialReplication"));
+    opts.addOption(OptionBuilder
+        .withDescription(
+            "Desired final replication count. Default 10.")
+        .hasArg().create("finalReplication"));
+    opts.addOption(OptionBuilder
+        .withDescription(
+            "Desired acceptable replication count. Default 9.")
+        .hasArg().create("acceptableReplication"));
+    opts.addOption(OptionBuilder
+        .withDescription(
+            "Desired timeout for the acceptable" +
+                " replication in seconds. Default 10")
+        .hasArg().create("timeout"));
     opts.addOption(OptionBuilder
         .withDescription("Ignore symlinks into the same directory")
         .create("nosymlink"));
@@ -395,8 +484,19 @@ public class FrameworkUploader implements Runnable {
         "whitelist", DefaultJars.DEFAULT_MR_JARS);
     blacklist = parser.getCommandLine().getOptionValue(
         "blacklist", DefaultJars.DEFAULT_EXCLUDED_MR_JARS);
-    replication = Short.parseShort(parser.getCommandLine().getOptionValue(
-        "replication", "10"));
+    initialReplication =
+        Short.parseShort(parser.getCommandLine().getOptionValue(
+            "initialReplication", "3"));
+    finalReplication =
+        Short.parseShort(parser.getCommandLine().getOptionValue(
+            "finalReplication", "10"));
+    acceptableReplication =
+        Short.parseShort(
+            parser.getCommandLine().getOptionValue(
+                "acceptableReplication", "9"));
+    timeout =
+        Integer.parseInt(
+            parser.getCommandLine().getOptionValue("timeout", "10"));
     if (parser.getCommandLine().hasOption("nosymlink")) {
       ignoreSymlink = true;
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d716084f/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-uploader/src/test/java/org/apache/hadoop/mapred/uploader/TestFrameworkUploader.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-uploader/src/test/java/org/apache/hadoop/mapred/uploader/TestFrameworkUploader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-uploader/src/test/java/org/apache/hadoop/mapred/uploader/TestFrameworkUploader.java
index ef64bfe..61c0b12 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-uploader/src/test/java/org/apache/hadoop/mapred/uploader/TestFrameworkUploader.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-uploader/src/test/java/org/apache/hadoop/mapred/uploader/TestFrameworkUploader.java
@@ -104,7 +104,10 @@ public class TestFrameworkUploader {
             "-blacklist", "C",
             "-fs", "hdfs://C:8020",
             "-target", "D",
-            "-replication", "100"};
+            "-initialReplication", "100",
+            "-acceptableReplication", "120",
+            "-finalReplication", "140",
+            "-timeout", "10"};
     FrameworkUploader uploader = new FrameworkUploader();
     boolean success = uploader.parseArguments(args);
     Assert.assertTrue("Expected to print help", success);
@@ -116,8 +119,14 @@ public class TestFrameworkUploader {
         uploader.blacklist);
     Assert.assertEquals("Target mismatch", "hdfs://C:8020/D",
         uploader.target);
-    Assert.assertEquals("Replication mismatch", 100,
-        uploader.replication);
+    Assert.assertEquals("Initial replication mismatch", 100,
+        uploader.initialReplication);
+    Assert.assertEquals("Acceptable replication mismatch", 120,
+        uploader.acceptableReplication);
+    Assert.assertEquals("Final replication mismatch", 140,
+        uploader.finalReplication);
+    Assert.assertEquals("Timeout mismatch", 10,
+        uploader.timeout);
   }
 
   /**
@@ -176,7 +185,8 @@ public class TestFrameworkUploader {
    * Test building a tarball from source jars.
    */
   @Test
-  public void testBuildTarBall() throws IOException, UploaderException {
+  public void testBuildTarBall()
+      throws IOException, UploaderException, InterruptedException {
     String[] testFiles = {"upload.tar", "upload.tar.gz"};
     for (String testFile: testFiles) {
       File parent = new File(testDir);
@@ -232,7 +242,8 @@ public class TestFrameworkUploader {
    * Test upload to HDFS.
    */
   @Test
-  public void testUpload() throws IOException, UploaderException {
+  public void testUpload()
+      throws IOException, UploaderException, InterruptedException {
     final String fileName = "/upload.tar.gz";
     File parent = new File(testDir);
     try {


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org