Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2018/01/18 23:48:03 UTC
[32/49] hadoop git commit: MAPREDUCE-7032. Add the ability to specify a delayed replication count (miklos.szegedi@cloudera.com via rkanter)
MAPREDUCE-7032. Add the ability to specify a delayed replication count (miklos.szegedi@cloudera.com via rkanter)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d716084f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d716084f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d716084f
Branch: refs/heads/YARN-7402
Commit: d716084f4503bf826ef10424d7025ea1ff4ee104
Parents: 5ac1099
Author: Robert Kanter <rk...@apache.org>
Authored: Tue Jan 16 10:45:45 2018 -0800
Committer: Robert Kanter <rk...@apache.org>
Committed: Tue Jan 16 10:45:45 2018 -0800
----------------------------------------------------------------------
.../mapred/uploader/FrameworkUploader.java | 124 +++++++++++++++++--
.../mapred/uploader/TestFrameworkUploader.java | 21 +++-
2 files changed, 128 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
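What the patch does: the tarball is now written with a low initial replication (default 3); once the upload finishes, endUpload() raises the replication to the final value (default 10) and polls the least-replicated block of the file until it reaches the acceptable count (default 9) or the timeout (default 10 seconds) expires. Below is a minimal standalone sketch of that wait loop using only public FileSystem APIs; the class and method names (ReplicationWaitSketch, minBlockReplication) are illustrative and not part of the patch.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.IOException;

public class ReplicationWaitSketch {

  // Smallest replica count over all blocks of the file (0 if it has none).
  static long minBlockReplication(FileSystem fs, Path path) throws IOException {
    FileStatus status = fs.getFileStatus(path);
    BlockLocation[] locations =
        fs.getFileBlockLocations(path, 0, status.getLen());
    long min = Long.MAX_VALUE;
    for (BlockLocation location : locations) {
      min = Math.min(min, location.getHosts().length);
    }
    return locations.length == 0 ? 0 : min;
  }

  public static void main(String[] args) throws Exception {
    Path target = new Path(args[0]);
    short finalReplication = 10;       // defaults taken from the patch
    short acceptableReplication = 9;
    int timeoutSeconds = 10;

    FileSystem fs = target.getFileSystem(new Configuration());
    fs.setReplication(target, finalReplication);  // ask HDFS to re-replicate
    long deadline = System.currentTimeMillis() + timeoutSeconds * 1000L;
    // Poll once per second until every block has enough replicas or we time out.
    while (System.currentTimeMillis() < deadline
        && minBlockReplication(fs, target) < acceptableReplication) {
      Thread.sleep(1000);
    }
  }
}

----------------------------------------------------------------------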
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d716084f/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-uploader/src/main/java/org/apache/hadoop/mapred/uploader/FrameworkUploader.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-uploader/src/main/java/org/apache/hadoop/mapred/uploader/FrameworkUploader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-uploader/src/main/java/org/apache/hadoop/mapred/uploader/FrameworkUploader.java
index 899689d..ee482d7 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-uploader/src/main/java/org/apache/hadoop/mapred/uploader/FrameworkUploader.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-uploader/src/main/java/org/apache/hadoop/mapred/uploader/FrameworkUploader.java
@@ -25,6 +25,8 @@ import org.apache.commons.cli.Options;
import org.apache.commons.compress.archivers.ArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
@@ -43,6 +45,8 @@ import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.NotLinkException;
import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
@@ -73,7 +77,15 @@ public class FrameworkUploader implements Runnable {
@VisibleForTesting
String target = null;
@VisibleForTesting
- short replication = 10;
+ Path targetPath = null;
+ @VisibleForTesting
+ short initialReplication = 3;
+ @VisibleForTesting
+ short finalReplication = 10;
+ @VisibleForTesting
+ short acceptableReplication = 9;
+ @VisibleForTesting
+ int timeout = 10;
private boolean ignoreSymlink = false;
@VisibleForTesting
@@ -101,9 +113,10 @@ public class FrameworkUploader implements Runnable {
LOG.info(
"Suggested mapreduce.application.classpath $PWD/" + alias + "/*");
System.out.println("Suggested classpath $PWD/" + alias + "/*");
- } catch (UploaderException|IOException e) {
+ } catch (UploaderException|IOException|InterruptedException e) {
LOG.error("Error in execution " + e.getMessage());
e.printStackTrace();
+ throw new RuntimeException(e);
}
}
@@ -147,7 +160,7 @@ public class FrameworkUploader implements Runnable {
if (targetStream == null) {
validateTargetPath();
int lastIndex = target.indexOf('#');
- Path targetPath =
+ targetPath =
new Path(
target.substring(
0, lastIndex == -1 ? target.length() : lastIndex));
@@ -160,7 +173,7 @@ public class FrameworkUploader implements Runnable {
targetStream = null;
if (fileSystem instanceof DistributedFileSystem) {
LOG.info("Set replication to " +
- replication + " for path: " + targetPath);
+ initialReplication + " for path: " + targetPath);
LOG.info("Disabling Erasure Coding for path: " + targetPath);
DistributedFileSystem dfs = (DistributedFileSystem)fileSystem;
DistributedFileSystem.HdfsDataOutputStreamBuilder builder =
@@ -168,13 +181,13 @@ public class FrameworkUploader implements Runnable {
.overwrite(true)
.ecPolicyName(
SystemErasureCodingPolicies.getReplicationPolicy().getName());
- if (replication > 0) {
- builder.replication(replication);
+ if (initialReplication > 0) {
+ builder.replication(initialReplication);
}
targetStream = builder.build();
} else {
LOG.warn("Cannot set replication to " +
- replication + " for path: " + targetPath +
+ initialReplication + " for path: " + targetPath +
" on a non-distributed fileystem " +
fileSystem.getClass().getName());
}
@@ -190,8 +203,70 @@ public class FrameworkUploader implements Runnable {
}
}
+ private long getSmallestReplicatedBlockCount()
+ throws IOException {
+ FileSystem fileSystem = targetPath.getFileSystem(new Configuration());
+ FileStatus status = fileSystem.getFileStatus(targetPath);
+ long length = status.getLen();
+ HashMap<Long, Integer> blockCount = new HashMap<>();
+
+ // Start with 0s for each offset
+ for (long offset = 0; offset < length; offset += status.getBlockSize()) {
+ blockCount.put(offset, 0);
+ }
+
+ // Count blocks
+ BlockLocation[] locations = fileSystem.getFileBlockLocations(
+ targetPath, 0, length);
+ for (BlockLocation location : locations) {
+ final int replicas = location.getHosts().length;
+ blockCount.compute(
+ location.getOffset(), (key, value) -> value + replicas);
+ }
+
+ // Print out the results
+ for (long offset = 0; offset < length; offset += status.getBlockSize()) {
+ LOG.info(String.format(
+ "Replication counts offset:%d blocks:%d",
+ offset, blockCount.get(offset)));
+ }
+
+ return Collections.min(blockCount.values());
+ }
+
+ private void endUpload()
+ throws IOException, InterruptedException {
+ FileSystem fileSystem = targetPath.getFileSystem(new Configuration());
+ if (fileSystem instanceof DistributedFileSystem) {
+ fileSystem.setReplication(targetPath, finalReplication);
+ LOG.info("Set replication to " +
+ finalReplication + " for path: " + targetPath);
+ long startTime = System.currentTimeMillis();
+ long endTime = startTime;
+ long currentReplication = 0;
+ while (endTime - startTime < timeout * 1000 &&
+ currentReplication < acceptableReplication) {
+ Thread.sleep(1000);
+ endTime = System.currentTimeMillis();
+ currentReplication = getSmallestReplicatedBlockCount();
+ }
+ if (endTime - startTime >= timeout * 1000) {
+ LOG.error(String.format(
+ "Timed out after %d seconds while waiting for acceptable" +
+ " replication of %d (current replication is %d)",
+ timeout, acceptableReplication, currentReplication));
+ }
+ } else {
+ LOG.info("Cannot set replication to " +
+ finalReplication + " for path: " + targetPath +
+ " on a non-distributed fileystem " +
+ fileSystem.getClass().getName());
+ }
+ }
+
@VisibleForTesting
- void buildPackage() throws IOException, UploaderException {
+ void buildPackage()
+ throws IOException, UploaderException, InterruptedException {
beginUpload();
LOG.info("Compressing tarball");
try (TarArchiveOutputStream out = new TarArchiveOutputStream(
@@ -206,6 +281,7 @@ public class FrameworkUploader implements Runnable {
out.closeArchiveEntry();
}
}
+ endUpload();
} finally {
if (targetStream != null) {
targetStream.close();
@@ -378,8 +454,21 @@ public class FrameworkUploader implements Runnable {
.hasArg().create("target"));
opts.addOption(OptionBuilder
.withDescription(
- "Desired replication count")
- .hasArg().create("replication"));
+ "Desired initial replication count. Default 3.")
+ .hasArg().create("initialReplication"));
+ opts.addOption(OptionBuilder
+ .withDescription(
+ "Desired final replication count. Default 10.")
+ .hasArg().create("finalReplication"));
+ opts.addOption(OptionBuilder
+ .withDescription(
+ "Desired acceptable replication count. Default 9.")
+ .hasArg().create("acceptableReplication"));
+ opts.addOption(OptionBuilder
+ .withDescription(
+ "Desired timeout for the acceptable" +
+ " replication in seconds. Default 10")
+ .hasArg().create("timeout"));
opts.addOption(OptionBuilder
.withDescription("Ignore symlinks into the same directory")
.create("nosymlink"));
@@ -395,8 +484,19 @@ public class FrameworkUploader implements Runnable {
"whitelist", DefaultJars.DEFAULT_MR_JARS);
blacklist = parser.getCommandLine().getOptionValue(
"blacklist", DefaultJars.DEFAULT_EXCLUDED_MR_JARS);
- replication = Short.parseShort(parser.getCommandLine().getOptionValue(
- "replication", "10"));
+ initialReplication =
+ Short.parseShort(parser.getCommandLine().getOptionValue(
+ "initialReplication", "3"));
+ finalReplication =
+ Short.parseShort(parser.getCommandLine().getOptionValue(
+ "finalReplication", "10"));
+ acceptableReplication =
+ Short.parseShort(
+ parser.getCommandLine().getOptionValue(
+ "acceptableReplication", "9"));
+ timeout =
+ Integer.parseInt(
+ parser.getCommandLine().getOptionValue("timeout", "10"));
if (parser.getCommandLine().hasOption("nosymlink")) {
ignoreSymlink = true;
}
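With this change the single -replication option is replaced by four flags. Hypothetically, one might exercise them the same way the updated parseArguments test does; the namenode URI, target and values below are placeholders rather than anything mandated by the patch, and the fragment assumes same-package access to FrameworkUploader, as in TestFrameworkUploader.

String[] args = {
    "-fs", "hdfs://namenode:8020",          // placeholder namenode URI
    "-target", "mr-framework.tar.gz#mrfw",  // placeholder target and alias
    "-initialReplication", "3",             // replication while writing
    "-finalReplication", "10",              // replication requested afterwards
    "-acceptableReplication", "9",          // smallest per-block count to wait for
    "-timeout", "10"                        // seconds to wait before giving up
};
FrameworkUploader uploader = new FrameworkUploader();
boolean parsed = uploader.parseArguments(args);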
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d716084f/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-uploader/src/test/java/org/apache/hadoop/mapred/uploader/TestFrameworkUploader.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-uploader/src/test/java/org/apache/hadoop/mapred/uploader/TestFrameworkUploader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-uploader/src/test/java/org/apache/hadoop/mapred/uploader/TestFrameworkUploader.java
index ef64bfe..61c0b12 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-uploader/src/test/java/org/apache/hadoop/mapred/uploader/TestFrameworkUploader.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-uploader/src/test/java/org/apache/hadoop/mapred/uploader/TestFrameworkUploader.java
@@ -104,7 +104,10 @@ public class TestFrameworkUploader {
"-blacklist", "C",
"-fs", "hdfs://C:8020",
"-target", "D",
- "-replication", "100"};
+ "-initialReplication", "100",
+ "-acceptableReplication", "120",
+ "-finalReplication", "140",
+ "-timeout", "10"};
FrameworkUploader uploader = new FrameworkUploader();
boolean success = uploader.parseArguments(args);
Assert.assertTrue("Expected to print help", success);
@@ -116,8 +119,14 @@ public class TestFrameworkUploader {
uploader.blacklist);
Assert.assertEquals("Target mismatch", "hdfs://C:8020/D",
uploader.target);
- Assert.assertEquals("Replication mismatch", 100,
- uploader.replication);
+ Assert.assertEquals("Initial replication mismatch", 100,
+ uploader.initialReplication);
+ Assert.assertEquals("Acceptable replication mismatch", 120,
+ uploader.acceptableReplication);
+ Assert.assertEquals("Final replication mismatch", 140,
+ uploader.finalReplication);
+ Assert.assertEquals("Timeout mismatch", 10,
+ uploader.timeout);
}
/**
@@ -176,7 +185,8 @@ public class TestFrameworkUploader {
* Test building a tarball from source jars.
*/
@Test
- public void testBuildTarBall() throws IOException, UploaderException {
+ public void testBuildTarBall()
+ throws IOException, UploaderException, InterruptedException {
String[] testFiles = {"upload.tar", "upload.tar.gz"};
for (String testFile: testFiles) {
File parent = new File(testDir);
@@ -232,7 +242,8 @@ public class TestFrameworkUploader {
* Test upload to HDFS.
*/
@Test
- public void testUpload() throws IOException, UploaderException {
+ public void testUpload()
+ throws IOException, UploaderException, InterruptedException {
final String fileName = "/upload.tar.gz";
File parent = new File(testDir);
try {
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org