You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by am...@apache.org on 2015/06/26 06:22:34 UTC
hadoop git commit: HADOOP-11203. Allow distcp to accept bandwidth in
fractional MegaBytes. Contributed by Raju Bairishetti
Repository: hadoop
Updated Branches:
refs/heads/trunk 1403b84b1 -> 8ef07f767
HADOOP-11203. Allow distcp to accept bandwidth in fractional MegaBytes. Contributed by Raju Bairishetti
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/8ef07f76
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/8ef07f76
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/8ef07f76
Branch: refs/heads/trunk
Commit: 8ef07f767f0421b006b0fc77e5daf36c7b06abf1
Parents: 1403b84
Author: Amareshwari Sriramadasu <am...@apache.org>
Authored: Fri Jun 26 09:52:06 2015 +0530
Committer: Amareshwari Sriramadasu <am...@apache.org>
Committed: Fri Jun 26 09:52:06 2015 +0530
----------------------------------------------------------------------
hadoop-common-project/hadoop-common/CHANGES.txt | 3 +++
.../org/apache/hadoop/tools/DistCpConstants.java | 2 +-
.../apache/hadoop/tools/DistCpOptionSwitch.java | 5 +++--
.../org/apache/hadoop/tools/DistCpOptions.java | 6 +++---
.../org/apache/hadoop/tools/OptionsParser.java | 2 +-
.../org/apache/hadoop/tools/mapred/CopyMapper.java | 17 ++++++++++++++++-
.../tools/mapred/RetriableFileCopyCommand.java | 2 +-
.../hadoop/tools/util/ThrottledInputStream.java | 6 +++---
.../org/apache/hadoop/tools/TestOptionsParser.java | 16 +++++++++-------
9 files changed, 40 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8ef07f76/hadoop-common-project/hadoop-common/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index b2975dc..5901794 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -53,6 +53,9 @@ Trunk (Unreleased)
IMPROVEMENTS
+ HADOOP-11203. Allow distcp to accept bandwidth in fractional MegaBytes
+ (Raju Bairishetti via amareshwari)
+
HADOOP-8017. Configure hadoop-main pom to get rid of M2E plugin execution
not covered (Eric Charles via bobby)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8ef07f76/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java
index 21dca62..93d6a62 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java
@@ -30,7 +30,7 @@ public class DistCpConstants {
public static final int DEFAULT_MAPS = 20;
/* Default bandwidth if none specified */
- public static final int DEFAULT_BANDWIDTH_MB = 100;
+ public static final float DEFAULT_BANDWIDTH_MB = 100;
/* Default strategy for copying. Implementation looked up
from distcp-default.xml
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8ef07f76/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java
index ed4a0b2..f16a5d2 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java
@@ -174,10 +174,11 @@ public enum DistCpOptionSwitch {
"copied to <= n bytes")),
/**
- * Specify bandwidth per map in MB
+ * Specify bandwidth per map in MB, accepts bandwidth as a fraction
*/
BANDWIDTH(DistCpConstants.CONF_LABEL_BANDWIDTH_MB,
- new Option("bandwidth", true, "Specify bandwidth per map in MB")),
+ new Option("bandwidth", true, "Specify bandwidth per map in MB,"
+ + " accepts bandwidth as a fraction.")),
/**
* Path containing a list of strings, which when found in the path of
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8ef07f76/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java
index 302b626..5b4ccf9 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java
@@ -47,7 +47,7 @@ public class DistCpOptions {
public static final int maxNumListstatusThreads = 40;
private int numListstatusThreads = 0; // Indicates that flag is not set.
private int maxMaps = DistCpConstants.DEFAULT_MAPS;
- private int mapBandwidth = DistCpConstants.DEFAULT_BANDWIDTH_MB;
+ private float mapBandwidth = DistCpConstants.DEFAULT_BANDWIDTH_MB;
private String sslConfigurationFile;
@@ -366,7 +366,7 @@ public class DistCpOptions {
*
* @return Bandwidth in MB
*/
- public int getMapBandwidth() {
+ public float getMapBandwidth() {
return mapBandwidth;
}
@@ -375,7 +375,7 @@ public class DistCpOptions {
*
* @param mapBandwidth - per map bandwidth
*/
- public void setMapBandwidth(int mapBandwidth) {
+ public void setMapBandwidth(float mapBandwidth) {
assert mapBandwidth > 0 : "Bandwidth " + mapBandwidth + " is invalid (should be > 0)";
this.mapBandwidth = mapBandwidth;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8ef07f76/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java
index 37add1e..b414513 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java
@@ -293,7 +293,7 @@ public class OptionsParser {
DistCpOptions option) {
if (command.hasOption(DistCpOptionSwitch.BANDWIDTH.getSwitch())) {
try {
- Integer mapBandwidth = Integer.parseInt(
+ Float mapBandwidth = Float.parseFloat(
getVal(command, DistCpOptionSwitch.BANDWIDTH.getSwitch()).trim());
if (mapBandwidth <= 0) {
throw new IllegalArgumentException("Bandwidth specified is not " +
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8ef07f76/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java
index cca36df..f75fe76 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java
@@ -62,6 +62,8 @@ public class CopyMapper extends Mapper<Text, CopyListingFileStatus, Text, Text>
BYTESEXPECTED,// Number of bytes expected to be copied.
BYTESFAILED, // Number of bytes that failed to be copied.
BYTESSKIPPED, // Number of bytes that were skipped from copy.
+ SLEEP_TIME_MS, // Time map slept while trying to honor bandwidth cap.
+ BANDWIDTH_IN_BYTES, // Effective transfer rate in B/s.
}
/**
@@ -85,7 +87,9 @@ public class CopyMapper extends Mapper<Text, CopyListingFileStatus, Text, Text>
private EnumSet<FileAttribute> preserve = EnumSet.noneOf(FileAttribute.class);
private FileSystem targetFS = null;
- private Path targetWorkPath = null;
+ private Path targetWorkPath = null;
+ private long startEpoch;
+ private long totalBytesCopied = 0;
/**
* Implementation of the Mapper::setup() method. This extracts the DistCp-
@@ -118,6 +122,7 @@ public class CopyMapper extends Mapper<Text, CopyListingFileStatus, Text, Text>
if (conf.get(DistCpConstants.CONF_LABEL_SSL_CONF) != null) {
initializeSSLConf(context);
}
+ startEpoch = System.currentTimeMillis();
}
/**
@@ -288,6 +293,7 @@ public class CopyMapper extends Mapper<Text, CopyListingFileStatus, Text, Text>
incrementCounter(context, Counter.BYTESEXPECTED, sourceFileStatus.getLen());
incrementCounter(context, Counter.BYTESCOPIED, bytesCopied);
incrementCounter(context, Counter.COPY, 1);
+ totalBytesCopied += bytesCopied;
}
private void createTargetDirsWithRetry(String description,
@@ -373,4 +379,13 @@ public class CopyMapper extends Mapper<Text, CopyListingFileStatus, Text, Text>
return false;
}
}
+
+ @Override
+ protected void cleanup(Context context)
+ throws IOException, InterruptedException {
+ super.cleanup(context);
+ long secs = (System.currentTimeMillis() - startEpoch) / 1000;
+ incrementCounter(context, Counter.BANDWIDTH_IN_BYTES,
+ totalBytesCopied / ((secs == 0 ? 1 : secs)));
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8ef07f76/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java
index 65d644b..6b5078c 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java
@@ -293,7 +293,7 @@ public class RetriableFileCopyCommand extends RetriableCommand {
Configuration conf) throws IOException {
try {
FileSystem fs = path.getFileSystem(conf);
- long bandwidthMB = conf.getInt(DistCpConstants.CONF_LABEL_BANDWIDTH_MB,
+ float bandwidthMB = conf.getFloat(DistCpConstants.CONF_LABEL_BANDWIDTH_MB,
DistCpConstants.DEFAULT_BANDWIDTH_MB);
FSDataInputStream in = fs.open(path);
return new ThrottledInputStream(in, bandwidthMB * 1024 * 1024);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8ef07f76/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ThrottledInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ThrottledInputStream.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ThrottledInputStream.java
index 9e435d9..2be8ef0 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ThrottledInputStream.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ThrottledInputStream.java
@@ -39,7 +39,7 @@ import com.google.common.base.Preconditions;
public class ThrottledInputStream extends InputStream {
private final InputStream rawStream;
- private final long maxBytesPerSec;
+ private final float maxBytesPerSec;
private final long startTime = System.currentTimeMillis();
private long bytesRead = 0;
@@ -51,8 +51,8 @@ public class ThrottledInputStream extends InputStream {
this(rawStream, Long.MAX_VALUE);
}
- public ThrottledInputStream(InputStream rawStream, long maxBytesPerSec) {
- assert maxBytesPerSec > 0 : "Bandwidth " + maxBytesPerSec + " is invalid";
+ public ThrottledInputStream(InputStream rawStream, float maxBytesPerSec) {
+ assert maxBytesPerSec > 0 : "Bandwidth " + maxBytesPerSec + " is invalid";
this.rawStream = rawStream;
this.maxBytesPerSec = maxBytesPerSec;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8ef07f76/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java
index b9d9ada..616872b 100644
--- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java
+++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java
@@ -32,6 +32,8 @@ import java.util.NoSuchElementException;
public class TestOptionsParser {
+ private static final float DELTA = 0.001f;
+
@Test
public void testParseIgnoreFailure() {
DistCpOptions options = OptionsParser.parse(new String[] {
@@ -104,14 +106,14 @@ public class TestOptionsParser {
DistCpOptions options = OptionsParser.parse(new String[] {
"hdfs://localhost:8020/source/first",
"hdfs://localhost:8020/target/"});
- Assert.assertEquals(options.getMapBandwidth(), DistCpConstants.DEFAULT_BANDWIDTH_MB);
+ Assert.assertEquals(options.getMapBandwidth(), DistCpConstants.DEFAULT_BANDWIDTH_MB, DELTA);
options = OptionsParser.parse(new String[] {
"-bandwidth",
- "11",
+ "11.2",
"hdfs://localhost:8020/source/first",
"hdfs://localhost:8020/target/"});
- Assert.assertEquals(options.getMapBandwidth(), 11);
+ Assert.assertEquals(options.getMapBandwidth(), 11.2, DELTA);
}
@Test(expected=IllegalArgumentException.class)
@@ -585,8 +587,8 @@ public class TestOptionsParser {
options.appendToConf(conf);
Assert.assertTrue(conf.getBoolean(DistCpOptionSwitch.IGNORE_FAILURES.getConfigLabel(), false));
Assert.assertTrue(conf.getBoolean(DistCpOptionSwitch.ATOMIC_COMMIT.getConfigLabel(), false));
- Assert.assertEquals(conf.getInt(DistCpOptionSwitch.BANDWIDTH.getConfigLabel(), -1),
- DistCpConstants.DEFAULT_BANDWIDTH_MB);
+ Assert.assertEquals(conf.getFloat(DistCpOptionSwitch.BANDWIDTH.getConfigLabel(), -1),
+ DistCpConstants.DEFAULT_BANDWIDTH_MB, DELTA);
conf = new Configuration();
Assert.assertFalse(conf.getBoolean(DistCpOptionSwitch.SYNC_FOLDERS.getConfigLabel(), false));
@@ -597,14 +599,14 @@ public class TestOptionsParser {
"-delete",
"-pu",
"-bandwidth",
- "11",
+ "11.2",
"hdfs://localhost:8020/source/first",
"hdfs://localhost:8020/target/"});
options.appendToConf(conf);
Assert.assertTrue(conf.getBoolean(DistCpOptionSwitch.SYNC_FOLDERS.getConfigLabel(), false));
Assert.assertTrue(conf.getBoolean(DistCpOptionSwitch.DELETE_MISSING.getConfigLabel(), false));
Assert.assertEquals(conf.get(DistCpOptionSwitch.PRESERVE_STATUS.getConfigLabel()), "U");
- Assert.assertEquals(conf.getInt(DistCpOptionSwitch.BANDWIDTH.getConfigLabel(), -1), 11);
+ Assert.assertEquals(conf.getFloat(DistCpOptionSwitch.BANDWIDTH.getConfigLabel(), -1), 11.2, DELTA);
}
@Test