You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by zj...@apache.org on 2015/06/30 01:30:48 UTC

[32/50] [abbrv] hadoop git commit: HADOOP-11203. Allow distcp to accept bandwidth in fractional MegaBytes. Contributed by Raju Bairishetti

HADOOP-11203. Allow distcp to accept bandwidth in fractional MegaBytes. Contributed by Raju Bairishetti


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/04fdf610
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/04fdf610
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/04fdf610

Branch: refs/heads/YARN-2928
Commit: 04fdf61066ca6be3eeea8eb9431d67923399fb90
Parents: 711141d
Author: Amareshwari Sriramadasu <am...@apache.org>
Authored: Fri Jun 26 09:52:06 2015 +0530
Committer: Zhijie Shen <zj...@apache.org>
Committed: Mon Jun 29 10:28:26 2015 -0700

----------------------------------------------------------------------
 hadoop-common-project/hadoop-common/CHANGES.txt    |  3 +++
 .../org/apache/hadoop/tools/DistCpConstants.java   |  2 +-
 .../apache/hadoop/tools/DistCpOptionSwitch.java    |  5 +++--
 .../org/apache/hadoop/tools/DistCpOptions.java     |  6 +++---
 .../org/apache/hadoop/tools/OptionsParser.java     |  2 +-
 .../org/apache/hadoop/tools/mapred/CopyMapper.java | 17 ++++++++++++++++-
 .../tools/mapred/RetriableFileCopyCommand.java     |  2 +-
 .../hadoop/tools/util/ThrottledInputStream.java    |  6 +++---
 .../org/apache/hadoop/tools/TestOptionsParser.java | 16 +++++++++-------
 9 files changed, 40 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/04fdf610/hadoop-common-project/hadoop-common/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index b2975dc..5901794 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -53,6 +53,9 @@ Trunk (Unreleased)
 
   IMPROVEMENTS
 
+    HADOOP-11203. Allow distcp to accept bandwidth in fractional MegaBytes
+    (Raju Bairishetti via amareshwari)
+
     HADOOP-8017. Configure hadoop-main pom to get rid of M2E plugin execution
     not covered (Eric Charles via bobby)
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/04fdf610/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java
index 21dca62..93d6a62 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java
@@ -30,7 +30,7 @@ public class DistCpConstants {
   public static final int DEFAULT_MAPS = 20;
 
   /* Default bandwidth if none specified */
-  public static final int DEFAULT_BANDWIDTH_MB = 100;
+  public static final float DEFAULT_BANDWIDTH_MB = 100;
 
   /* Default strategy for copying. Implementation looked up
      from distcp-default.xml

http://git-wip-us.apache.org/repos/asf/hadoop/blob/04fdf610/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java
index ed4a0b2..f16a5d2 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java
@@ -174,10 +174,11 @@ public enum DistCpOptionSwitch {
               "copied to <= n bytes")),
 
   /**
-   * Specify bandwidth per map in MB
+   * Specify bandwidth per map in MB, accepts bandwidth as a fraction
    */
   BANDWIDTH(DistCpConstants.CONF_LABEL_BANDWIDTH_MB,
-      new Option("bandwidth", true, "Specify bandwidth per map in MB")),
+      new Option("bandwidth", true, "Specify bandwidth per map in MB,"
+          + " accepts bandwidth as a fraction.")),
 
   /**
    * Path containing a list of strings, which when found in the path of

http://git-wip-us.apache.org/repos/asf/hadoop/blob/04fdf610/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java
index 302b626..5b4ccf9 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java
@@ -47,7 +47,7 @@ public class DistCpOptions {
   public static final int maxNumListstatusThreads = 40;
   private int numListstatusThreads = 0;  // Indicates that flag is not set.
   private int maxMaps = DistCpConstants.DEFAULT_MAPS;
-  private int mapBandwidth = DistCpConstants.DEFAULT_BANDWIDTH_MB;
+  private float mapBandwidth = DistCpConstants.DEFAULT_BANDWIDTH_MB;
 
   private String sslConfigurationFile;
 
@@ -366,7 +366,7 @@ public class DistCpOptions {
    *
    * @return Bandwidth in MB
    */
-  public int getMapBandwidth() {
+  public float getMapBandwidth() {
     return mapBandwidth;
   }
 
@@ -375,7 +375,7 @@ public class DistCpOptions {
    *
    * @param mapBandwidth - per map bandwidth
    */
-  public void setMapBandwidth(int mapBandwidth) {
+  public void setMapBandwidth(float mapBandwidth) {
     assert mapBandwidth > 0 : "Bandwidth " + mapBandwidth + " is invalid (should be > 0)";
     this.mapBandwidth = mapBandwidth;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/04fdf610/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java
index 37add1e..b414513 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java
@@ -293,7 +293,7 @@ public class OptionsParser {
                                      DistCpOptions option) {
     if (command.hasOption(DistCpOptionSwitch.BANDWIDTH.getSwitch())) {
       try {
-        Integer mapBandwidth = Integer.parseInt(
+        Float mapBandwidth = Float.parseFloat(
             getVal(command, DistCpOptionSwitch.BANDWIDTH.getSwitch()).trim());
         if (mapBandwidth <= 0) {
           throw new IllegalArgumentException("Bandwidth specified is not " +

http://git-wip-us.apache.org/repos/asf/hadoop/blob/04fdf610/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java
index cca36df..f75fe76 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java
@@ -62,6 +62,8 @@ public class CopyMapper extends Mapper<Text, CopyListingFileStatus, Text, Text>
     BYTESEXPECTED,// Number of bytes expected to be copied.
     BYTESFAILED,  // Number of bytes that failed to be copied.
     BYTESSKIPPED, // Number of bytes that were skipped from copy.
+    SLEEP_TIME_MS, // Time map slept while trying to honor bandwidth cap.
+    BANDWIDTH_IN_BYTES, // Effective transfer rate in B/s.
   }
 
   /**
@@ -85,7 +87,9 @@ public class CopyMapper extends Mapper<Text, CopyListingFileStatus, Text, Text>
   private EnumSet<FileAttribute> preserve = EnumSet.noneOf(FileAttribute.class);
 
   private FileSystem targetFS = null;
-  private Path    targetWorkPath = null;
+  private Path targetWorkPath = null;
+  private long startEpoch;
+  private long totalBytesCopied = 0;
 
   /**
    * Implementation of the Mapper::setup() method. This extracts the DistCp-
@@ -118,6 +122,7 @@ public class CopyMapper extends Mapper<Text, CopyListingFileStatus, Text, Text>
     if (conf.get(DistCpConstants.CONF_LABEL_SSL_CONF) != null) {
       initializeSSLConf(context);
     }
+    startEpoch = System.currentTimeMillis();
   }
 
   /**
@@ -288,6 +293,7 @@ public class CopyMapper extends Mapper<Text, CopyListingFileStatus, Text, Text>
     incrementCounter(context, Counter.BYTESEXPECTED, sourceFileStatus.getLen());
     incrementCounter(context, Counter.BYTESCOPIED, bytesCopied);
     incrementCounter(context, Counter.COPY, 1);
+    totalBytesCopied += bytesCopied;
   }
 
   private void createTargetDirsWithRetry(String description,
@@ -373,4 +379,13 @@ public class CopyMapper extends Mapper<Text, CopyListingFileStatus, Text, Text>
       return false;
     }
   }
+
+  @Override
+  protected void cleanup(Context context)
+      throws IOException, InterruptedException {
+    super.cleanup(context);
+    long secs = (System.currentTimeMillis() - startEpoch) / 1000;
+    incrementCounter(context, Counter.BANDWIDTH_IN_BYTES,
+        totalBytesCopied / ((secs == 0 ? 1 : secs)));
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/04fdf610/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java
index 65d644b..6b5078c 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java
@@ -293,7 +293,7 @@ public class RetriableFileCopyCommand extends RetriableCommand {
       Configuration conf) throws IOException {
     try {
       FileSystem fs = path.getFileSystem(conf);
-      long bandwidthMB = conf.getInt(DistCpConstants.CONF_LABEL_BANDWIDTH_MB,
+      float bandwidthMB = conf.getFloat(DistCpConstants.CONF_LABEL_BANDWIDTH_MB,
               DistCpConstants.DEFAULT_BANDWIDTH_MB);
       FSDataInputStream in = fs.open(path);
       return new ThrottledInputStream(in, bandwidthMB * 1024 * 1024);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/04fdf610/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ThrottledInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ThrottledInputStream.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ThrottledInputStream.java
index 9e435d9..2be8ef0 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ThrottledInputStream.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ThrottledInputStream.java
@@ -39,7 +39,7 @@ import com.google.common.base.Preconditions;
 public class ThrottledInputStream extends InputStream {
 
   private final InputStream rawStream;
-  private final long maxBytesPerSec;
+  private final float maxBytesPerSec;
   private final long startTime = System.currentTimeMillis();
 
   private long bytesRead = 0;
@@ -51,8 +51,8 @@ public class ThrottledInputStream extends InputStream {
     this(rawStream, Long.MAX_VALUE);
   }
 
-  public ThrottledInputStream(InputStream rawStream, long maxBytesPerSec) {
-    assert maxBytesPerSec > 0 : "Bandwidth " + maxBytesPerSec + " is invalid"; 
+  public ThrottledInputStream(InputStream rawStream, float maxBytesPerSec) {
+    assert maxBytesPerSec > 0 : "Bandwidth " + maxBytesPerSec + " is invalid";
     this.rawStream = rawStream;
     this.maxBytesPerSec = maxBytesPerSec;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/04fdf610/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java
index b9d9ada..616872b 100644
--- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java
+++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java
@@ -32,6 +32,8 @@ import java.util.NoSuchElementException;
 
 public class TestOptionsParser {
 
+  private static final float DELTA = 0.001f;
+
   @Test
   public void testParseIgnoreFailure() {
     DistCpOptions options = OptionsParser.parse(new String[] {
@@ -104,14 +106,14 @@ public class TestOptionsParser {
     DistCpOptions options = OptionsParser.parse(new String[] {
         "hdfs://localhost:8020/source/first",
         "hdfs://localhost:8020/target/"});
-    Assert.assertEquals(options.getMapBandwidth(), DistCpConstants.DEFAULT_BANDWIDTH_MB);
+    Assert.assertEquals(options.getMapBandwidth(), DistCpConstants.DEFAULT_BANDWIDTH_MB, DELTA);
 
     options = OptionsParser.parse(new String[] {
         "-bandwidth",
-        "11",
+        "11.2",
         "hdfs://localhost:8020/source/first",
         "hdfs://localhost:8020/target/"});
-    Assert.assertEquals(options.getMapBandwidth(), 11);
+    Assert.assertEquals(options.getMapBandwidth(), 11.2, DELTA);
   }
 
   @Test(expected=IllegalArgumentException.class)
@@ -585,8 +587,8 @@ public class TestOptionsParser {
     options.appendToConf(conf);
     Assert.assertTrue(conf.getBoolean(DistCpOptionSwitch.IGNORE_FAILURES.getConfigLabel(), false));
     Assert.assertTrue(conf.getBoolean(DistCpOptionSwitch.ATOMIC_COMMIT.getConfigLabel(), false));
-    Assert.assertEquals(conf.getInt(DistCpOptionSwitch.BANDWIDTH.getConfigLabel(), -1),
-        DistCpConstants.DEFAULT_BANDWIDTH_MB);
+    Assert.assertEquals(conf.getFloat(DistCpOptionSwitch.BANDWIDTH.getConfigLabel(), -1),
+        DistCpConstants.DEFAULT_BANDWIDTH_MB, DELTA);
 
     conf = new Configuration();
     Assert.assertFalse(conf.getBoolean(DistCpOptionSwitch.SYNC_FOLDERS.getConfigLabel(), false));
@@ -597,14 +599,14 @@ public class TestOptionsParser {
         "-delete",
         "-pu",
         "-bandwidth",
-        "11",
+        "11.2",
         "hdfs://localhost:8020/source/first",
         "hdfs://localhost:8020/target/"});
     options.appendToConf(conf);
     Assert.assertTrue(conf.getBoolean(DistCpOptionSwitch.SYNC_FOLDERS.getConfigLabel(), false));
     Assert.assertTrue(conf.getBoolean(DistCpOptionSwitch.DELETE_MISSING.getConfigLabel(), false));
     Assert.assertEquals(conf.get(DistCpOptionSwitch.PRESERVE_STATUS.getConfigLabel()), "U");
-    Assert.assertEquals(conf.getInt(DistCpOptionSwitch.BANDWIDTH.getConfigLabel(), -1), 11);
+    Assert.assertEquals(conf.getFloat(DistCpOptionSwitch.BANDWIDTH.getConfigLabel(), -1), 11.2, DELTA);
   }
 
   @Test