You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by sz...@apache.org on 2010/04/02 18:42:52 UTC
svn commit: r930318 - in /hadoop/mapreduce/trunk: CHANGES.txt
src/test/mapred/org/apache/hadoop/tools/TestHarFileSystem.java
src/tools/org/apache/hadoop/tools/HadoopArchives.java
Author: szetszwo
Date: Fri Apr 2 16:42:52 2010
New Revision: 930318
URL: http://svn.apache.org/viewvc?rev=930318&view=rev
Log:
MAPREDUCE-1428. Make block size and the size of archive created files configurable. Contributed by mahadev
Modified:
hadoop/mapreduce/trunk/CHANGES.txt
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/TestHarFileSystem.java
hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/HadoopArchives.java
Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=930318&r1=930317&r2=930318&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Fri Apr 2 16:42:52 2010
@@ -238,6 +238,9 @@ Trunk (unreleased changes)
MAPREDUCE-1514. Add documentation on replication, permissions, new options,
limitations and internals of har. (mahadev via szetszwo)
+ MAPREDUCE-1428. Make block size and the size of archive created files
+ configurable. (mahadev via szetszwo)
+
OPTIMIZATIONS
MAPREDUCE-270. Fix the tasktracker to optionally send an out-of-band
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/TestHarFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/TestHarFileSystem.java?rev=930318&r1=930317&r2=930318&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/TestHarFileSystem.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/TestHarFileSystem.java Fri Apr 2 16:42:52 2010
@@ -112,36 +112,9 @@ public class TestHarFileSystem extends T
}
}
}
-
- // test archives with a -p option
- public void testRelativeArchives() throws Exception {
- fs.delete(archivePath,true);
- Configuration conf = mapred.createJobConf();
- HadoopArchives har = new HadoopArchives(conf);
- String[] args = new String[6];
- args[0] = "-archiveName";
- args[1] = "foo.har";
- args[2] = "-p";
- args[3] = fs.getHomeDirectory().toString();
- args[4] = "test";
- args[5] = archivePath.toString();
- int ret = ToolRunner.run(har, args);
- assertTrue("failed test", ret == 0);
- Path finalPath = new Path(archivePath, "foo.har");
- Path fsPath = new Path(inputPath.toUri().getPath());
- Path filePath = new Path(finalPath, "test");
- //make it a har path
- Path harPath = new Path("har://" + filePath.toUri().getPath());
- assertTrue(fs.exists(new Path(finalPath, "_index")));
- assertTrue(fs.exists(new Path(finalPath, "_masterindex")));
- assertTrue(!fs.exists(new Path(finalPath, "_logs")));
- args = new String[2];
- args[0] = "-ls";
- args[1] = harPath.toString();
- FsShell shell = new FsShell(conf);
- ret = ToolRunner.run(shell, args);
- // fileb and filec
- assertTrue(ret == 0);
+
+ /* check bytes in the har output files */
+ private void checkBytes(Path harPath, Configuration conf) throws IOException {
Path harFilea = new Path(harPath, "a");
Path harFileb = new Path(harPath, "b");
Path harFilec = new Path(harPath, "c");
@@ -160,7 +133,95 @@ public class TestHarFileSystem extends T
fin.close();
assertTrue("strings are equal ", (b[0] == "c".getBytes()[0]));
}
+
+ /**
+ * check if the block size of the part files is what we had specified
+ */
+ private void checkBlockSize(FileSystem fs, Path finalPath, long blockSize) throws IOException {
+ FileStatus[] statuses = fs.globStatus(new Path(finalPath, "part-*"));
+ for (FileStatus status: statuses) {
+ assertTrue(status.getBlockSize() == blockSize);
+ }
+ }
+ // test archives with a -p option
+ public void testRelativeArchives() throws Exception {
+ fs.delete(archivePath, true);
+ Configuration conf = mapred.createJobConf();
+ HadoopArchives har = new HadoopArchives(conf);
+
+ {
+ String[] args = new String[6];
+ args[0] = "-archiveName";
+ args[1] = "foo1.har";
+ args[2] = "-p";
+ args[3] = fs.getHomeDirectory().toString();
+ args[4] = "test";
+ args[5] = archivePath.toString();
+ int ret = ToolRunner.run(har, args);
+ assertTrue("failed test", ret == 0);
+ Path finalPath = new Path(archivePath, "foo1.har");
+ Path fsPath = new Path(inputPath.toUri().getPath());
+ Path filePath = new Path(finalPath, "test");
+ // make it a har path
+ Path harPath = new Path("har://" + filePath.toUri().getPath());
+ assertTrue(fs.exists(new Path(finalPath, "_index")));
+ assertTrue(fs.exists(new Path(finalPath, "_masterindex")));
+ /*check for existence of only 1 part file, since part file size == 2GB */
+ assertTrue(fs.exists(new Path(finalPath, "part-0")));
+ assertTrue(!fs.exists(new Path(finalPath, "part-1")));
+ assertTrue(!fs.exists(new Path(finalPath, "part-2")));
+ assertTrue(!fs.exists(new Path(finalPath, "_logs")));
+ FileStatus[] statuses = fs.listStatus(finalPath);
+ args = new String[2];
+ args[0] = "-ls";
+ args[1] = harPath.toString();
+ FsShell shell = new FsShell(conf);
+ ret = ToolRunner.run(shell, args);
+ // fileb and filec
+ assertTrue(ret == 0);
+ checkBytes(harPath, conf);
+ /* check block size for part files */
+ checkBlockSize(fs, finalPath, 512 * 1024 * 1024l);
+ }
+
+ /** now try with different block size and part file size **/
+ {
+ String[] args = new String[8];
+ args[0] = "-Dhar.block.size=512";
+ args[1] = "-Dhar.partfile.size=1";
+ args[2] = "-archiveName";
+ args[3] = "foo.har";
+ args[4] = "-p";
+ args[5] = fs.getHomeDirectory().toString();
+ args[6] = "test";
+ args[7] = archivePath.toString();
+ int ret = ToolRunner.run(har, args);
+ assertTrue("failed test", ret == 0);
+ Path finalPath = new Path(archivePath, "foo.har");
+ Path fsPath = new Path(inputPath.toUri().getPath());
+ Path filePath = new Path(finalPath, "test");
+ // make it a har path
+ Path harPath = new Path("har://" + filePath.toUri().getPath());
+ assertTrue(fs.exists(new Path(finalPath, "_index")));
+ assertTrue(fs.exists(new Path(finalPath, "_masterindex")));
+ /*check for existence of 3 part files, since part file size == 1 */
+ assertTrue(fs.exists(new Path(finalPath, "part-0")));
+ assertTrue(fs.exists(new Path(finalPath, "part-1")));
+ assertTrue(fs.exists(new Path(finalPath, "part-2")));
+ assertTrue(!fs.exists(new Path(finalPath, "_logs")));
+ FileStatus[] statuses = fs.listStatus(finalPath);
+ args = new String[2];
+ args[0] = "-ls";
+ args[1] = harPath.toString();
+ FsShell shell = new FsShell(conf);
+ ret = ToolRunner.run(shell, args);
+ // fileb and filec
+ assertTrue(ret == 0);
+ checkBytes(harPath, conf);
+ checkBlockSize(fs, finalPath, 512);
+ }
+ }
public void testArchivesWithMapred() throws Exception {
fs.delete(archivePath, true);
Modified: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/HadoopArchives.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/HadoopArchives.java?rev=930318&r1=930317&r2=930318&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/HadoopArchives.java (original)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/HadoopArchives.java Fri Apr 2 16:42:52 2010
@@ -86,7 +86,10 @@ public class HadoopArchives implements T
static final String TOTAL_SIZE_LABEL = NAME + ".total.size";
static final String DST_HAR_LABEL = NAME + ".archive.name";
static final String SRC_PARENT_LABEL = NAME + ".parent.path";
-
+ /** the size of the blocks that will be created when archiving **/
+ static final String HAR_BLOCKSIZE_LABEL = NAME + ".block.size";
+ /** the size of the part files that will be created when archiving **/
+ static final String HAR_PARTSIZE_LABEL = NAME + ".partfile.size";
static final String SPACE_REPLACE_LABEL = NAME + ".space.replace.enable";
static final boolean SPACE_REPLACE_DEFAULT = false;
static final String SPACE_REPLACEMENT_LABEL = NAME + ".space.replacement";
@@ -104,9 +107,10 @@ public class HadoopArchives implements T
+ " by the space replacement option."
+ " The resulted har contains only the replaced paths.";
- // size of each part file
- // its fixed for now.
- static final long partSize = 2 * 1024 * 1024 * 1024l;
+ /** size of each part file **/
+ long partSize = 2 * 1024 * 1024 * 1024l;
+ /** size of blocks in hadoop archives **/
+ long blockSize = 512 * 1024 * 1024l;
private static final String usage = "archive"
+ " -archiveName NAME -p <parent path> <src>* <dest>" +
@@ -487,6 +491,10 @@ public class HadoopArchives implements T
int numFiles = 0;
long totalSize = 0;
FileSystem fs = parentPath.getFileSystem(conf);
+ this.blockSize = conf.getLong(HAR_BLOCKSIZE_LABEL, blockSize);
+ this.partSize = conf.getLong(HAR_PARTSIZE_LABEL, partSize);
+ conf.setLong(HAR_BLOCKSIZE_LABEL, blockSize);
+ conf.setLong(HAR_PARTSIZE_LABEL, partSize);
conf.set(DST_HAR_LABEL, archiveName);
conf.set(SRC_PARENT_LABEL, parentPath.makeQualified(fs).toString());
Path outputPath = new Path(dest, archiveName);
@@ -599,7 +607,8 @@ public class HadoopArchives implements T
FileSystem destFs = null;
byte[] buffer;
int buf_size = 128 * 1024;
-
+ long blockSize = 512 * 1024 * 1024l;
+
// configure the mapper and create
// the part file.
// use map reduce framework to write into
@@ -616,6 +625,7 @@ public class HadoopArchives implements T
// create a file name using the partition
// we need to write to this directory
tmpOutputDir = FileOutputFormat.getWorkOutputPath(conf);
+ blockSize = conf.getLong(HAR_BLOCKSIZE_LABEL, blockSize);
// get the output path and write to the tmp
// directory
partname = "part-" + partId;
@@ -631,10 +641,11 @@ public class HadoopArchives implements T
//this was a stale copy
if (destFs.exists(tmpOutput)) {
destFs.delete(tmpOutput, false);
- }
- partStream = destFs.create(tmpOutput);
+ }
+ partStream = destFs.create(tmpOutput, false, conf.getInt("io.file.buffer.size", 4096),
+ destFs.getDefaultReplication(), blockSize);
} catch(IOException ie) {
- throw new RuntimeException("Unable to open output file " + tmpOutput);
+ throw new RuntimeException("Unable to open output file " + tmpOutput, ie);
}
buffer = new byte[buf_size];
}