You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by sz...@apache.org on 2010/04/02 18:42:52 UTC

svn commit: r930318 - in /hadoop/mapreduce/trunk: CHANGES.txt src/test/mapred/org/apache/hadoop/tools/TestHarFileSystem.java src/tools/org/apache/hadoop/tools/HadoopArchives.java

Author: szetszwo
Date: Fri Apr  2 16:42:52 2010
New Revision: 930318

URL: http://svn.apache.org/viewvc?rev=930318&view=rev
Log:
MAPREDUCE-1428.  Make block size and the size of archive created files configurable.  Contributed by mahadev

Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/TestHarFileSystem.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/HadoopArchives.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=930318&r1=930317&r2=930318&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Fri Apr  2 16:42:52 2010
@@ -238,6 +238,9 @@ Trunk (unreleased changes)
     MAPREDUCE-1514.  Add documentation on replication, permissions, new options,
     limitations and internals of har.  (mahadev via szetszwo)
 
+    MAPREDUCE-1428.  Make block size and the size of archive created files
+    configurable.  (mahadev via szetszwo)
+
   OPTIMIZATIONS
 
     MAPREDUCE-270. Fix the tasktracker to optionally send an out-of-band

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/TestHarFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/TestHarFileSystem.java?rev=930318&r1=930317&r2=930318&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/TestHarFileSystem.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/TestHarFileSystem.java Fri Apr  2 16:42:52 2010
@@ -112,36 +112,9 @@ public class TestHarFileSystem extends T
       }
     }
   }
-  
-  // test archives with a -p option
-  public void testRelativeArchives() throws Exception {
-    fs.delete(archivePath,true);
-    Configuration conf = mapred.createJobConf();
-    HadoopArchives har = new HadoopArchives(conf);
-    String[] args = new String[6];
-    args[0] = "-archiveName";
-    args[1] = "foo.har";
-    args[2] = "-p";
-    args[3] =  fs.getHomeDirectory().toString();
-    args[4] = "test";
-    args[5] = archivePath.toString();
-    int ret = ToolRunner.run(har, args);
-    assertTrue("failed test", ret == 0);
-    Path finalPath = new Path(archivePath, "foo.har");
-    Path fsPath = new Path(inputPath.toUri().getPath());
-    Path filePath = new Path(finalPath, "test");
-    //make it a har path 
-    Path harPath = new Path("har://" + filePath.toUri().getPath());
-    assertTrue(fs.exists(new Path(finalPath, "_index")));
-    assertTrue(fs.exists(new Path(finalPath, "_masterindex")));
-    assertTrue(!fs.exists(new Path(finalPath, "_logs")));
-    args = new String[2];
-    args[0] = "-ls";
-    args[1] = harPath.toString();
-    FsShell shell = new FsShell(conf);
-    ret = ToolRunner.run(shell, args);
-    // fileb and filec
-    assertTrue(ret == 0);
+
+  /* check bytes in the har output files */
+  private void  checkBytes(Path harPath, Configuration conf) throws IOException {
     Path harFilea = new Path(harPath, "a");
     Path harFileb = new Path(harPath, "b");
     Path harFilec = new Path(harPath, "c");
@@ -160,7 +133,95 @@ public class TestHarFileSystem extends T
     fin.close();
     assertTrue("strings are equal ", (b[0] == "c".getBytes()[0]));
   }
+
+  /**
+   * check if the block size of the part files is what we had specified
+   */
+  private void checkBlockSize(FileSystem fs, Path finalPath, long blockSize) throws IOException {
+    FileStatus[] statuses = fs.globStatus(new Path(finalPath, "part-*"));
+    for (FileStatus status: statuses) {
+      assertTrue(status.getBlockSize() == blockSize);
+    }
+  }
   
+  // test archives with a -p option
+  public void testRelativeArchives() throws Exception {
+    fs.delete(archivePath, true);
+    Configuration conf = mapred.createJobConf();
+    HadoopArchives har = new HadoopArchives(conf);
+
+    {
+      String[] args = new String[6];
+      args[0] = "-archiveName";
+      args[1] = "foo1.har";
+      args[2] = "-p";
+      args[3] = fs.getHomeDirectory().toString();
+      args[4] = "test";
+      args[5] = archivePath.toString();
+      int ret = ToolRunner.run(har, args);
+      assertTrue("failed test", ret == 0);
+      Path finalPath = new Path(archivePath, "foo1.har");
+      Path fsPath = new Path(inputPath.toUri().getPath());
+      Path filePath = new Path(finalPath, "test");
+      // make it a har path
+      Path harPath = new Path("har://" + filePath.toUri().getPath());
+      assertTrue(fs.exists(new Path(finalPath, "_index")));
+      assertTrue(fs.exists(new Path(finalPath, "_masterindex")));
+      /*check for existence of only 1 part file, since part file size == 2GB */
+      assertTrue(fs.exists(new Path(finalPath, "part-0")));
+      assertTrue(!fs.exists(new Path(finalPath, "part-1")));
+      assertTrue(!fs.exists(new Path(finalPath, "part-2")));
+      assertTrue(!fs.exists(new Path(finalPath, "_logs")));
+      FileStatus[] statuses = fs.listStatus(finalPath);
+      args = new String[2];
+      args[0] = "-ls";
+      args[1] = harPath.toString();
+      FsShell shell = new FsShell(conf);
+      ret = ToolRunner.run(shell, args);
+      // fileb and filec
+      assertTrue(ret == 0);
+      checkBytes(harPath, conf);
+      /* check block size for part files */
+      checkBlockSize(fs, finalPath, 512 * 1024 * 1024l);
+    }
+    
+    /** now try with different block size and part file size **/
+    {
+      String[] args = new String[8];
+      args[0] = "-Dhar.block.size=512";
+      args[1] = "-Dhar.partfile.size=1";
+      args[2] = "-archiveName";
+      args[3] = "foo.har";
+      args[4] = "-p";
+      args[5] = fs.getHomeDirectory().toString();
+      args[6] = "test";
+      args[7] = archivePath.toString();
+      int ret = ToolRunner.run(har, args);
+      assertTrue("failed test", ret == 0);
+      Path finalPath = new Path(archivePath, "foo.har");
+      Path fsPath = new Path(inputPath.toUri().getPath());
+      Path filePath = new Path(finalPath, "test");
+      // make it a har path
+      Path harPath = new Path("har://" + filePath.toUri().getPath());
+      assertTrue(fs.exists(new Path(finalPath, "_index")));
+      assertTrue(fs.exists(new Path(finalPath, "_masterindex")));
+      /*check for existence of 3 part files, since part file size == 1 */
+      assertTrue(fs.exists(new Path(finalPath, "part-0")));
+      assertTrue(fs.exists(new Path(finalPath, "part-1")));
+      assertTrue(fs.exists(new Path(finalPath, "part-2")));
+      assertTrue(!fs.exists(new Path(finalPath, "_logs")));
+      FileStatus[] statuses = fs.listStatus(finalPath);
+      args = new String[2];
+      args[0] = "-ls";
+      args[1] = harPath.toString();
+      FsShell shell = new FsShell(conf);
+      ret = ToolRunner.run(shell, args);
+      // fileb and filec
+      assertTrue(ret == 0);
+      checkBytes(harPath, conf);
+      checkBlockSize(fs, finalPath, 512);
+    }
+  }
  
   public void testArchivesWithMapred() throws Exception {
     fs.delete(archivePath, true);

Modified: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/HadoopArchives.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/HadoopArchives.java?rev=930318&r1=930317&r2=930318&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/HadoopArchives.java (original)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/HadoopArchives.java Fri Apr  2 16:42:52 2010
@@ -86,7 +86,10 @@ public class HadoopArchives implements T
   static final String TOTAL_SIZE_LABEL = NAME + ".total.size";
   static final String DST_HAR_LABEL = NAME + ".archive.name";
   static final String SRC_PARENT_LABEL = NAME + ".parent.path";
-
+  /** the size of the blocks that will be created when archiving **/
+  static final String HAR_BLOCKSIZE_LABEL = NAME + ".block.size";
+  /** the size of the part files that will be created when archiving **/
+  static final String HAR_PARTSIZE_LABEL = NAME + ".partfile.size";
   static final String SPACE_REPLACE_LABEL = NAME + ".space.replace.enable";
   static final boolean SPACE_REPLACE_DEFAULT = false;
   static final String SPACE_REPLACEMENT_LABEL = NAME + ".space.replacement";
@@ -104,9 +107,10 @@ public class HadoopArchives implements T
     + " by the space replacement option."
     + "  The resulted har contains only the replaced paths.";
 
-  // size of each part file
-  // its fixed for now.
-  static final long partSize = 2 * 1024 * 1024 * 1024l;
+  /** size of each part file **/
+  long partSize = 2 * 1024 * 1024 * 1024l;
+  /** size of blocks in hadoop archives **/
+  long blockSize = 512 * 1024 * 1024l;
 
   private static final String usage = "archive"
   + " -archiveName NAME -p <parent path> <src>* <dest>" +
@@ -487,6 +491,10 @@ public class HadoopArchives implements T
     int numFiles = 0;
     long totalSize = 0;
     FileSystem fs = parentPath.getFileSystem(conf);
+    this.blockSize = conf.getLong(HAR_BLOCKSIZE_LABEL, blockSize);
+    this.partSize = conf.getLong(HAR_PARTSIZE_LABEL, partSize);
+    conf.setLong(HAR_BLOCKSIZE_LABEL, blockSize);
+    conf.setLong(HAR_PARTSIZE_LABEL, partSize);
     conf.set(DST_HAR_LABEL, archiveName);
     conf.set(SRC_PARENT_LABEL, parentPath.makeQualified(fs).toString());
     Path outputPath = new Path(dest, archiveName);
@@ -599,7 +607,8 @@ public class HadoopArchives implements T
     FileSystem destFs = null;
     byte[] buffer;
     int buf_size = 128 * 1024;
-    
+    long blockSize = 512 * 1024 * 1024l;
+
     // configure the mapper and create 
     // the part file.
     // use map reduce framework to write into
@@ -616,6 +625,7 @@ public class HadoopArchives implements T
       // create a file name using the partition
       // we need to write to this directory
       tmpOutputDir = FileOutputFormat.getWorkOutputPath(conf);
+      blockSize = conf.getLong(HAR_BLOCKSIZE_LABEL, blockSize);
       // get the output path and write to the tmp 
       // directory 
       partname = "part-" + partId;
@@ -631,10 +641,11 @@ public class HadoopArchives implements T
         //this was a stale copy
         if (destFs.exists(tmpOutput)) {
           destFs.delete(tmpOutput, false);
-        }
-        partStream = destFs.create(tmpOutput);
+        } 
+        partStream = destFs.create(tmpOutput, false, conf.getInt("io.file.buffer.size", 4096), 
+            destFs.getDefaultReplication(), blockSize);
       } catch(IOException ie) {
-        throw new RuntimeException("Unable to open output file " + tmpOutput);
+        throw new RuntimeException("Unable to open output file " + tmpOutput, ie);
       }
       buffer = new byte[buf_size];
     }