Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2008/06/05 09:27:13 UTC

svn commit: r663487 - in /hadoop/core/trunk: ./ bin/ conf/ src/java/org/apache/hadoop/fs/ src/java/org/apache/hadoop/util/ src/test/org/apache/hadoop/fs/

Author: ddas
Date: Thu Jun  5 00:27:13 2008
New Revision: 663487

URL: http://svn.apache.org/viewvc?rev=663487&view=rev
Log:
HADOOP-3307. Support for Archives in Hadoop. Contributed by Mahadev Konar.

Added:
    hadoop/core/trunk/src/java/org/apache/hadoop/fs/HarFileSystem.java
    hadoop/core/trunk/src/java/org/apache/hadoop/util/HadoopArchives.java
    hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestHarFileSystem.java
Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/bin/hadoop
    hadoop/core/trunk/conf/hadoop-default.xml
    hadoop/core/trunk/src/java/org/apache/hadoop/fs/FilterFileSystem.java
    hadoop/core/trunk/src/java/org/apache/hadoop/util/CopyFiles.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=663487&r1=663486&r2=663487&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Thu Jun  5 00:27:13 2008
@@ -141,6 +141,8 @@
 
     HADOOP-3187. Quotas for namespace management. (Hairong Kuang via ddas)
 
+    HADOOP-3307. Support for Archives in Hadoop. (Mahadev Konar via ddas)
+
   IMPROVEMENTS
    
     HADOOP-2928. Remove deprecated FileSystem.getContentLength().

Modified: hadoop/core/trunk/bin/hadoop
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/bin/hadoop?rev=663487&r1=663486&r2=663487&view=diff
==============================================================================
--- hadoop/core/trunk/bin/hadoop (original)
+++ hadoop/core/trunk/bin/hadoop Thu Jun  5 00:27:13 2008
@@ -233,6 +233,9 @@
 elif [ "$COMMAND" = "daemonlog" ] ; then
   CLASS=org.apache.hadoop.log.LogLevel
   HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS"
+elif [ "$COMMAND" = "archive" ] ; then
+  CLASS=org.apache.hadoop.util.HadoopArchives
+  HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS"
 else
   CLASS=$COMMAND
 fi
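
A note on usage: the new archive subcommand dispatches to org.apache.hadoop.util.HadoopArchives below, whose usage string is "archive -archiveName NAME <src>* <dest>". As a rough, hypothetical example (the paths are made up), creating an archive from the command line would look something like:

  bin/hadoop archive -archiveName foo.har /user/hadoop/dir1 /user/hadoop/dir2 /user/hadoop/archives

The last argument is the destination directory; the archive itself is created as the directory foo.har underneath it.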

Modified: hadoop/core/trunk/conf/hadoop-default.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/conf/hadoop-default.xml?rev=663487&r1=663486&r2=663487&view=diff
==============================================================================
--- hadoop/core/trunk/conf/hadoop-default.xml (original)
+++ hadoop/core/trunk/conf/hadoop-default.xml Thu Jun  5 00:27:13 2008
@@ -210,6 +210,12 @@
 </property>
 
 <property>
+  <name>fs.har.impl</name>
+  <value>org.apache.hadoop.fs.HarFileSystem</value>
+  <description>The filesystem for Hadoop archives. </description>
+</property>
+
+<property>
   <name>fs.inmemory.size.mb</name>
   <value>75</value>
   <description>The size of the in-memory filsystem instance in MB</description>
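
The fs.har.impl entry lets the generic FileSystem factory resolve har:// URIs to HarFileSystem. A minimal sketch of client code, assuming the usual org.apache.hadoop.fs and org.apache.hadoop.conf imports and a hypothetical namenode and archive path, might read:

  Configuration conf = new Configuration();
  // har://<underlying-scheme>-<host>:<port>/<path-to-archive>.har/<file-inside-archive>
  Path p = new Path("har://hdfs-namenode:8020/user/hadoop/foo.har/dir/file.txt");
  FileSystem harFs = p.getFileSystem(conf);   // resolved through fs.har.impl
  FSDataInputStream in = harFs.open(p);       // reads from the enclosing part-* file
  in.close();

The har:///path form (no authority) is also accepted and falls back to the default filesystem in the configuration, as described in HarFileSystem.decodeHarURI below.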

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/fs/FilterFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/fs/FilterFileSystem.java?rev=663487&r1=663486&r2=663487&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/fs/FilterFileSystem.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/fs/FilterFileSystem.java Thu Jun  5 00:27:13 2008
@@ -44,6 +44,12 @@
   
   protected FileSystem fs;
   
+  /*
+   * No-arg constructor so that extending classes can be constructed and then initialized.
+   */
+  public FilterFileSystem() {
+  }
+  
   public FilterFileSystem(FileSystem fs) {
     this.fs = fs;
   }
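
The new no-arg constructor matters because FileSystem implementations named by fs.<scheme>.impl are instantiated reflectively and only afterwards configured through initialize(URI, Configuration). A simplified sketch of that pattern (MyArchiveFs is a hypothetical subclass standing in for HarFileSystem, which follows in full below):

  public class MyArchiveFs extends FilterFileSystem {
    public MyArchiveFs() {                          // no-arg, reflection friendly
    }
    @Override
    public void initialize(URI name, Configuration conf) throws IOException {
      // derive the underlying URI from 'name' and bind the wrapped fs here
      fs = FileSystem.get(FileSystem.getDefaultUri(conf), conf);
    }
  }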

Added: hadoop/core/trunk/src/java/org/apache/hadoop/fs/HarFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/fs/HarFileSystem.java?rev=663487&view=auto
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/fs/HarFileSystem.java (added)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/fs/HarFileSystem.java Thu Jun  5 00:27:13 2008
@@ -0,0 +1,874 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.LineRecordReader;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * This is an implementation of the Hadoop Archive 
+ * Filesystem. This archive Filesystem has index files
+ * of the form _index* and has contents of the form
+ * part-*. The index files store the indexes of the 
+ * real files. The index files are of the form _masterindex
+ * and _index. The master index is a level of indirection 
+ * into the index file to make the lookups faster. The index
+ * file is sorted by the hash code of the paths that it contains,
+ * and the master index contains pointers to positions in 
+ * the index for ranges of hashcodes.
+ */
+
+public class HarFileSystem extends FilterFileSystem {
+  public static final int VERSION = 1;
+  // uri representation of this Har filesystem
+  private URI uri;
+  // the version of this har filesystem
+  private int version;
+  // underlying uri 
+  private URI underLyingURI;
+  // the top level path of the archive
+  // in the underlying file system
+  private Path archivePath;
+  // the masterIndex of the archive
+  private Path masterIndex;
+  // the index file 
+  private Path archiveIndex;
+  // the har auth
+  private String harAuth;
+  
+  /**
+   * Public, no-argument constructor for HarFileSystem.
+   *
+   */
+  public HarFileSystem() {
+  }
+  
+  /**
+   * Constructor to create a HarFileSystem with an
+   * underlying filesystem.
+   * @param fs
+   */
+  public HarFileSystem(FileSystem fs) {
+    super(fs);
+  }
+  
+  /**
+   * Initialize a Har filesystem per har archive. The 
+   * archive home directory is the top level directory
+   * in the filesystem that contains the HAR archive.
+   * Be careful with this method: you do not want to keep
+   * creating new FileSystem instances for every call to 
+   * path.getFileSystem().
+   * The URI of a Har filesystem is 
+   * har://underlyingfsscheme-host:port/archivepath
+   * or 
+   * har:///archivepath, in which case the default underlying
+   * filesystem from the configuration is used.
+   */
+  public void initialize(URI name, Configuration conf) throws IOException {
+    //decode the name
+    underLyingURI = decodeHarURI(name, conf);
+    //  we got the right har Path- now check if this is 
+    //truly a har filesystem
+    Path harPath = archivePath(new Path(name.toString()));
+    if (harPath == null) { 
+      throw new IOException("Invalid path for the Har Filesystem. " + 
+                           name.toString());
+    }
+    if (fs == null) {
+      fs = FileSystem.get(underLyingURI, conf);
+    }
+    this.uri = harPath.toUri();
+    this.archivePath = new Path(this.uri.getPath());
+    this.harAuth = getHarAuth(this.underLyingURI);
+    //check for the underlying fs containing
+    // the index file
+    this.masterIndex = new Path(archivePath, "_masterindex");
+    this.archiveIndex = new Path(archivePath, "_index");
+    if (!fs.exists(masterIndex) || !fs.exists(archiveIndex)) {
+      throw new IOException("Invalid path for the Har Filesystem. " +
+          "No index file in " + harPath);
+    }
+    try{ 
+      this.version = getHarVersion();
+    } catch(IOException io) {
+      throw new IOException("Unable to " +
+          "read the version of the Har file system: " + this.archivePath);
+    }
+    if (this.version != HarFileSystem.VERSION) {
+      throw new IOException("Invalid version " + 
+          this.version + " expected " + HarFileSystem.VERSION);
+    }
+  }
+  
+  // get the version of the filesystem from the masterindex file
+  // the version is currently not useful since it's the first version 
+  // of archives
+  public int getHarVersion() throws IOException { 
+    FSDataInputStream masterIn = fs.open(masterIndex);
+    LineRecordReader.LineReader lmaster = new LineRecordReader.LineReader(
+                                              masterIn, getConf());
+    Text line = new Text();
+    lmaster.readLine(line);
+    try {
+      masterIn.close();
+    } catch(IOException e){
+      //disregard it.
+      // it's only a read.
+    }
+    String versionLine = line.toString();
+    String[] arr = versionLine.split(" ");
+    int version = Integer.parseInt(arr[0]);
+    return version;
+  }
+  
+  /*
+   * find the parent path that is the 
+   * archive path in the path. The last
+   * path segment that ends with .har is 
+   * the path that will be returned.
+   */
+  private Path archivePath(Path p) {
+    Path retPath = null;
+    Path tmp = p;
+    for (int i=0; i< p.depth(); i++) {
+      if (tmp.toString().endsWith(".har")) {
+        retPath = tmp;
+        break;
+      }
+      tmp = tmp.getParent();
+    }
+    return retPath;
+  }
+
+  /**
+   * decode the raw URI to get the underlying URI
+   * @param rawURI raw Har URI
+   * @param conf the configuration used to look up the default filesystem
+   * @return filtered URI of the underlying fileSystem
+   */
+  private URI decodeHarURI(URI rawURI, Configuration conf) throws IOException {
+    String tmpAuth = rawURI.getAuthority();
+    //we are using the default file
+    //system in the config 
+    //so create a underlying uri and 
+    //return it
+    if (tmpAuth == null) {
+      //create a path 
+      return FileSystem.getDefaultUri(conf);
+    }
+    String host = rawURI.getHost();
+    String[] str = host.split("-", 2);
+    if (str[0] == null) {
+      throw new IOException("URI: " + rawURI + " is an invalid Har URI.");
+    }
+    String underLyingScheme = str[0];
+    String underLyingHost = (str.length > 1)? str[1]:null;
+    int underLyingPort = rawURI.getPort();
+    String auth = (underLyingHost == null && underLyingPort == -1)?
+                  null:(underLyingHost+":"+underLyingPort);
+    URI tmp = null;
+    if (rawURI.getQuery() != null) {
+      // query component not allowed
+      throw new IOException("query component in Path not supported  " + rawURI);
+    }
+    try {
+      tmp = new URI(underLyingScheme, auth, rawURI.getPath(), 
+            rawURI.getQuery(), rawURI.getFragment());
+    } catch (URISyntaxException e) {
+        // do nothing should not happen
+    }
+    return tmp;
+  }
+  
+  /**
+   * return the top level archive.
+   */
+  public Path getWorkingDirectory() {
+    return new Path(uri.toString());
+  }
+  
+  /**
+   * Create a har specific auth of the form
+   * underlyingscheme-host:port
+   * @param underLyingUri the URI of the underlying
+   * filesystem
+   * @return har specific auth
+   */
+  private String getHarAuth(URI underLyingUri) {
+    String auth = underLyingUri.getScheme() + "-";
+    if (underLyingUri.getHost() != null) {
+      auth += underLyingUri.getHost() + ":";
+      if (underLyingUri.getPort() != -1) {
+        auth +=  underLyingUri.getPort();
+      }
+    }
+    else {
+      auth += ":";
+    }
+    return auth;
+  }
+  
+  /**
+   * Returns the uri of this filesystem.
+   * The uri is of the form 
+   * har://underlyingfsscheme-host:port/pathintheunderlyingfs
+   */
+  @Override
+  public URI getUri() {
+    return this.uri;
+  }
+  
+  /**
+   * This method returns the path 
+   * inside the har filesystem,
+   * i.e. the path relative to the
+   * root of the archive.
+   * @param path the fully qualified path in the har filesystem.
+   * @return relative path in the filesystem.
+   */
+  private Path getPathInHar(Path path) {
+    Path harPath = new Path(path.toUri().getPath());
+    if (archivePath.compareTo(harPath) == 0)
+      return new Path(Path.SEPARATOR);
+    Path tmp = new Path(harPath.getName());
+    Path parent = harPath.getParent();
+    while (!(parent.compareTo(archivePath) == 0)) {
+      if (parent.toString().equals(Path.SEPARATOR)) {
+        tmp = null;
+        break;
+      }
+      tmp = new Path(parent.getName(), tmp);
+      parent = parent.getParent();
+    }
+    if (tmp != null) 
+      tmp = new Path(Path.SEPARATOR, tmp);
+    return tmp;
+  }
+  
+  // the relative path of p: basically
+  // getting rid of the leading / and resolving
+  // against initial. Parsing and doing string
+  // manipulation is not good - so just use
+  // the Path API to do it.
+  private Path makeRelative(String initial, Path p) {
+    Path root = new Path(Path.SEPARATOR);
+    if (root.compareTo(p) == 0)
+      return new Path(initial);
+    Path retPath = new Path(p.getName());
+    Path parent = p.getParent();
+    for (int i=0; i < p.depth()-1; i++) {
+      retPath = new Path(parent.getName(), retPath);
+      parent = parent.getParent();
+    }
+    return new Path(initial, retPath.toString());
+  }
+  
+  /* this makes a path qualified in the har filesystem
+   * (non-Javadoc)
+   * @see org.apache.hadoop.fs.FilterFileSystem#makeQualified(
+   * org.apache.hadoop.fs.Path)
+   */
+  @Override
+  public Path makeQualified(Path path) {
+    // make sure that we just get the 
+    // path component 
+    Path fsPath = path;
+    if (!path.isAbsolute()) {
+      fsPath = new Path(archivePath, path);
+    }
+
+    URI tmpURI = fsPath.toUri();
+    fsPath = new Path(tmpURI.getPath());
+    //change this to Har uri 
+    URI tmp = null;
+    try {
+      tmp = new URI(uri.getScheme(), harAuth, fsPath.toString(),
+                    tmpURI.getQuery(), tmpURI.getFragment());
+    } catch(URISyntaxException ue) {
+      LOG.error("Error in URI ", ue);
+    }
+    if (tmp != null) {
+      return new Path(tmp.toString());
+    }
+    return null;
+  }
+  
+  /**
+   * get block locations from the underlying fs
+   * @param f the input path for the blocks
+   * @param start the start in the file
+   * @param len the length in the file
+   * @return block locations for this segment of file
+   * @throws IOException
+   */
+  @Override
+  public BlockLocation[] getFileBlockLocations(Path f, long start,
+      long len) throws IOException {
+    // need to look up the file in the underlying fs
+    // look up the index 
+    
+    // make sure this is a part of this har filesystem
+    Path p = makeQualified(f);
+    Path harPath = getPathInHar(p);
+    String line = fileStatusInIndex(harPath);
+    if (line == null)  {
+      throw new FileNotFoundException("File " + f + " not found");
+    }
+    HarStatus harStatus = new HarStatus(line);
+    if (harStatus.isDir()) 
+      return new BlockLocation[0];
+    return fs.getFileBlockLocations(new Path(archivePath, harStatus.getPartName()), 
+        harStatus.getStartIndex(), harStatus.getLength());
+  }
+  
+  /**
+   * the hash of the path p inside
+   * the har filesystem
+   * @param p the path in the harfilesystem
+   * @return the hash code of the path.
+   */
+  public static int getHarHash(Path p) {
+    return (p.toString().hashCode() & 0x7fffffff);
+  }
+  
+  static class Store {
+    public Store() {
+      begin = end = startHash = endHash = 0;
+    }
+    public Store(long begin, long end, int startHash, int endHash) {
+      this.begin = begin;
+      this.end = end;
+      this.startHash = startHash;
+      this.endHash = endHash;
+    }
+    public long begin;
+    public long end;
+    public int startHash;
+    public int endHash;
+  }
+  
+  // make sure that this harPath is relative to the har filesystem
+  // this only works for relative paths. This returns the line matching
+  // the file in the index. Returns null if there is no matching 
+  // filename in the index file.
+  private String fileStatusInIndex(Path harPath) throws IOException {
+    // read the index file 
+    int hashCode = getHarHash(harPath);
+    // get the master index to find the pos 
+    // in the index file
+    FSDataInputStream in = fs.open(masterIndex);
+    FileStatus masterStat = fs.getFileStatus(masterIndex);
+    LineRecordReader.LineReader lin = new LineRecordReader.LineReader(in,
+                                          getConf());
+    Text line = new Text();
+    long read = lin.readLine(line);
+    // ignore the first line; this is the header of the index files
+    String[] readStr = null;
+    List<Store> stores = new ArrayList<Store>();
+    while(read < masterStat.getLen()) {
+      int b = lin.readLine(line);
+      read += b;
+      readStr = line.toString().split(" ");
+      int startHash = Integer.parseInt(readStr[0]);
+      int endHash  = Integer.parseInt(readStr[1]);
+      if (startHash <= hashCode && hashCode <= endHash) {
+        stores.add(new Store(Long.parseLong(readStr[2]), 
+            Long.parseLong(readStr[3]), startHash,
+            endHash));
+      }
+      line.clear();
+    }
+    try {
+      lin.close();
+    } catch(IOException io){
+      // do nothing just a read.
+    }
+    FSDataInputStream aIn = fs.open(archiveIndex);
+    LineRecordReader.LineReader aLin = new LineRecordReader.LineReader(aIn, 
+                                           getConf());
+    String retStr = null;
+    // now start reading the real index file
+     read = 0;
+    for (Store s: stores) {
+      aIn.seek(s.begin);
+      while (read + s.begin < s.end) {
+        int tmp = aLin.readLine(line);
+        read += tmp;
+        String lineFeed = line.toString();
+        String[] parsed = lineFeed.split(" ");
+        if (harPath.compareTo(new Path(parsed[0])) == 0) {
+          // bingo!
+          retStr = lineFeed;
+          break;
+        }
+        line.clear();
+      }
+      if (retStr != null)
+        break;
+    }
+    try {
+      aIn.close();
+    } catch(IOException io) {
+      //do nothing
+    }
+    return retStr;
+  }
+  
+  // a single line parser for hadoop archives status 
+  // stored in a single line in the index files 
+  // the format is of the form 
+  // filename "dir"/"file" partFileName startIndex length 
+  // <space separated children>
+  private static class HarStatus {
+    boolean isDir;
+    String name;
+    List<String> children;
+    String partName;
+    long startIndex;
+    long length;
+    public HarStatus(String harString) {
+      String[] splits = harString.split(" ");
+      this.name = splits[0];
+      this.isDir = "dir".equals(splits[1]) ? true: false;
+      // this is equal to "none" if it's a directory
+      this.partName = splits[2];
+      this.startIndex = Long.parseLong(splits[3]);
+      this.length = Long.parseLong(splits[4]);
+      if (isDir) {
+        children = new ArrayList<String>();
+        for (int i = 5; i < splits.length; i++) {
+          children.add(splits[i]);
+        }
+      }
+    }
+    public boolean isDir() {
+      return isDir;
+    }
+    
+    public String getName() {
+      return name;
+    }
+    
+    public List<String> getChildren() {
+      return children;
+    }
+    public String getFileName() {
+      return name;
+    }
+    public String getPartName() {
+      return partName;
+    }
+    public long getStartIndex() {
+      return startIndex;
+    }
+    public long getLength() {
+      return length;
+    }
+  }
+  
+  /**
+   * return the filestatus of files in har archive.
+   * The permissions returned are those of the archive
+   * index files. The permissions are not persisted 
+   * while creating a hadoop archive.
+   * @param f the path in har filesystem
+   * @return filestatus.
+   * @throws IOException
+   */
+  @Override
+  public FileStatus getFileStatus(Path f) throws IOException {
+    FileStatus archiveStatus = fs.getFileStatus(archiveIndex);
+    // get the fs DataInputStream for the underlying file
+    // look up the index.
+    Path p = makeQualified(f);
+    Path harPath = getPathInHar(p);
+    if (harPath == null) {
+      throw new IOException("Invalid file name: " + f + " in " + uri);
+    }
+    String readStr = fileStatusInIndex(harPath);
+    if (readStr == null) {
+      throw new FileNotFoundException("File: " +  f + " does not exist in " + uri);
+    }
+    HarStatus hstatus = null;
+    hstatus = new HarStatus(readStr);
+    return new FileStatus(hstatus.isDir()?0:hstatus.getLength(), hstatus.isDir(),
+        (int)archiveStatus.getReplication(), archiveStatus.getBlockSize(),
+        archiveStatus.getModificationTime(), new FsPermission(
+        archiveStatus.getPermission()), archiveStatus.getOwner(), 
+        archiveStatus.getGroup(), 
+            makeRelative(this.uri.toString(), new Path(hstatus.name)));
+  }
+
+  /**
+   * Returns a har input stream which fakes end of 
+   * file. It reads the index files to get the part 
+   * file name and the size and start of the file.
+   */
+  @Override
+  public FSDataInputStream open(Path f, int bufferSize) throws IOException {
+    // get the fs DataInputStream for the underlying file
+    // look up the index.
+    Path p = makeQualified(f);
+    Path harPath = getPathInHar(p);
+    if (harPath == null) {
+      throw new IOException("Invalid file name: " + f + " in " + uri);
+    }
+    String readStr = fileStatusInIndex(harPath);
+    if (readStr == null) {
+      throw new FileNotFoundException(f + ": not found in " + archivePath);
+    }
+    HarStatus hstatus = new HarStatus(readStr); 
+    // we got it.. woo hooo!!! 
+    if (hstatus.isDir()) {
+      throw new FileNotFoundException(f + " : not a file in " +
+                archivePath);
+    }
+    return new HarFSDataInputStream(fs, new Path(archivePath, 
+        hstatus.getPartName()),
+        hstatus.getStartIndex(), hstatus.getLength(), bufferSize);
+  }
+ 
+  /*
+   * create throws an exception in Har filesystem.
+   * The archive once created cannot be changed.
+   */
+  public FSDataOutputStream create(Path f, int bufferSize) 
+                                    throws IOException {
+    throw new IOException("Har: Create not implemented");
+  }
+  
+  public FSDataOutputStream create(Path f,
+      FsPermission permission,
+      boolean overwrite,
+      int bufferSize,
+      short replication,
+      long blockSize,
+      Progressable progress) throws IOException {
+    throw new IOException("Har: create not implemented.");
+  }
+  
+  @Override
+  public void close() throws IOException {
+    if (fs != null) {
+      try {
+        fs.close();
+      } catch(IOException ie) {
+        //this might already be closed
+        // ignore
+      }
+    }
+  }
+  
+  /**
+   * Not implemented.
+   */
+  @Override
+  public boolean setReplication(Path src, short replication) throws IOException{
+    throw new IOException("Har: setreplication not implemented");
+  }
+  
+  /**
+   * Not implemented.
+   */
+  @Override
+  public boolean delete(Path f, boolean recursive) throws IOException { 
+    throw new IOException("Har: delete not implemented");
+  }
+  
+  /**
+   * liststatus returns the children of a directory 
+   * after looking up the index files.
+   */
+  @Override
+  public FileStatus[] listStatus(Path f) throws IOException {
+    //need to see if the file is in the index file
+    //get the filestatus of the archive directory
+    // we will create fake filestatuses to return
+    // to the client
+    List<FileStatus> statuses = new ArrayList<FileStatus>();
+    FileStatus archiveStatus = fs.getFileStatus(archiveIndex);
+    Path tmpPath = makeQualified(f);
+    Path harPath = getPathInHar(tmpPath);
+    String readStr = fileStatusInIndex(harPath);
+    if (readStr == null) {
+      throw new FileNotFoundException("File " + f + " not found in " + archivePath);
+    }
+    HarStatus hstatus = new HarStatus(readStr);
+    if (!hstatus.isDir()) 
+        statuses.add(new FileStatus(hstatus.getLength(), 
+            hstatus.isDir(),
+            archiveStatus.getReplication(), archiveStatus.getBlockSize(),
+            archiveStatus.getModificationTime(), 
+            new FsPermission(archiveStatus.getPermission()),
+            archiveStatus.getOwner(), archiveStatus.getGroup(), 
+            makeRelative(this.uri.toString(), new Path(hstatus.name))));
+    else 
+      for (String child: hstatus.children) {
+        FileStatus tmp = getFileStatus(new Path(tmpPath, child));
+        statuses.add(tmp);
+      }
+    return statuses.toArray(new FileStatus[statuses.size()]);
+  }
+  
+  /**
+   * return the top level archive path.
+   */
+  public Path getHomeDirectory() {
+    return new Path(uri.toString());
+  }
+  
+  public void setWorkingDirectory(Path newDir) {
+    //does nothing.
+  }
+  
+  /**
+   * not implemented.
+   */
+  public boolean mkdirs(Path f, FsPermission permission) throws IOException {
+    throw new IOException("Har: mkdirs not implemented");
+  }
+  
+  /**
+   * not implemented.
+   */
+  public void copyFromLocalFile(boolean delSrc, Path src, Path dst) throws 
+        IOException {
+    throw new IOException("Har: copyfromlocalfile not implemented");
+  }
+  
+  /**
+   * copies the file in the har filesystem to a local file.
+   */
+  public void copyToLocalFile(boolean delSrc, Path src, Path dst) 
+    throws IOException {
+    FileUtil.copy(this, src, getLocal(getConf()), dst, false, getConf());
+  }
+  
+  /**
+   * not implemented.
+   */
+  public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile) 
+    throws IOException {
+    throw new IOException("Har: startLocalOutput not implemented");
+  }
+  
+  /**
+   * not implemented.
+   */
+  public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile) 
+    throws IOException {
+    throw new IOException("Har: completeLocalOutput not implemented");
+  }
+  
+  /**
+   * not implemented.
+   */
+  public void setOwner(Path p, String username, String groupname)
+    throws IOException {
+    throw new IOException("Har: setowner not implemented");
+  }
+
+  /**
+   * Not implemented.
+   */
+  public void setPermission(Path p, FsPermission permisssion) 
+    throws IOException {
+    throw new IOException("Har: setPermission not implemented");
+  }
+  
+  /**
+   * Hadoop archives input stream. This input stream fakes EOF 
+   * since archive files are part of bigger part files.
+   */
+  private static class HarFSDataInputStream extends FSDataInputStream {
+    /**
+     * Create an input stream that fakes all the reads/positions/seeking.
+     */
+    private static class HarFsInputStream extends FSInputStream {
+      private long position, start, end;
+      //The underlying data input stream that the
+      // underlying filesystem will return.
+      private FSDataInputStream underLyingStream;
+      //one byte buffer
+      private byte[] oneBytebuff = new byte[1];
+      HarFsInputStream(FileSystem fs, Path path, long start,
+          long length, int bufferSize) throws IOException {
+        underLyingStream = fs.open(path, bufferSize);
+        underLyingStream.seek(start);
+        // the start of this file in the part file
+        this.start = start;
+        // the position pointer in the part file
+        this.position = start;
+        // the end pointer in the part file
+        this.end = start + length;
+      }
+      
+      public synchronized int available() throws IOException {
+        long remaining = end - underLyingStream.getPos();
+        if (remaining > (long)Integer.MAX_VALUE) {
+          return Integer.MAX_VALUE;
+        }
+        return (int) remaining;
+      }
+      
+      public synchronized  void close() throws IOException {
+        underLyingStream.close();
+        super.close();
+      }
+      
+      //not implemented
+      @Override
+      public void mark(int readLimit) {
+        // do nothing 
+      }
+      
+      /**
+       * reset is not implemented
+       */
+      public void reset() throws IOException {
+        throw new IOException("reset not implemented.");
+      }
+      
+      public synchronized int read() throws IOException {
+        int ret = read(oneBytebuff, 0, 1);
+        return (ret <= 0) ? -1: (oneBytebuff[0] & 0xff);
+      }
+      
+      public synchronized int read(byte[] b) throws IOException {
+        int ret = read(b, 0, b.length);
+        if (ret != -1) {
+          position += ret;
+        }
+        return ret;
+      }
+      
+      /**
+       * Reads up to len bytes from the part file, but never past the end of this file's region.
+       */
+      public synchronized int read(byte[] b, int offset, int len) 
+        throws IOException {
+        int newlen = len;
+        int ret = -1;
+        if (position + len > end) {
+          newlen = (int) (end - position);
+        }
+        // end case
+        if (newlen == 0) 
+          return ret;
+        ret = underLyingStream.read(b, offset, newlen);
+        position += ret;
+        return ret;
+      }
+      
+      public synchronized long skip(long n) throws IOException {
+        long tmpN = n;
+        if (tmpN > 0) {
+          if (position + tmpN > end) {
+            tmpN = end - position;
+          }
+          underLyingStream.seek(tmpN + position);
+          position += tmpN;
+          return tmpN;
+        }
+        return (tmpN < 0)? -1 : 0;
+      }
+      
+      public synchronized long getPos() throws IOException {
+        return (position - start);
+      }
+      
+      public synchronized void seek(long pos) throws IOException {
+        if (pos < 0 || (start + pos > end)) {
+          throw new IOException("Failed to seek: EOF");
+        }
+        position = start + pos;
+        underLyingStream.seek(position);
+      }
+
+      public boolean seekToNewSource(long targetPos) throws IOException {
+        //do not need to implement this
+        // hdfs in itself does seektonewsource 
+        // while reading.
+        return false;
+      }
+      
+      /**
+       * implementing position readable. 
+       */
+      public int read(long pos, byte[] b, int offset, int length) 
+      throws IOException {
+        int nlength = length;
+        if (start + nlength + pos > end) {
+          nlength = (int) (end - (start + pos));
+        }
+        return underLyingStream.read(pos + start , b, offset, nlength);
+      }
+      
+      /**
+       * position readable again.
+       */
+      public void readFully(long pos, byte[] b, int offset, int length) 
+      throws IOException {
+        if (start + length + pos > end) {
+          throw new IOException("Not enough bytes to read.");
+        }
+        underLyingStream.readFully(pos + start, b, offset, length);
+      }
+      
+      public void readFully(long pos, byte[] b) throws IOException {
+          readFully(pos, b, 0, b.length);
+      }
+      
+    }
+  
+    /**
+     * constructor for har input stream.
+     * @param fs the underlying filesystem
+     * @param p The path in the underlying filesystem
+     * @param start the start position in the part file
+     * @param length the length of valid data in the part file
+     * @param bufsize the buffer size
+     * @throws IOException
+     */
+    public HarFSDataInputStream(FileSystem fs, Path  p, long start, 
+        long length, int bufsize) throws IOException {
+        super(new HarFsInputStream(fs, p, start, length, bufsize));
+    }
+
+    /**
+     * constructor for har input stream.
+     * @param fs the underlying filesystem
+     * @param p the path in the underlying file system
+     * @param start the start position in the part file
+     * @param length the length of valid data in the part file.
+     * @throws IOException
+     */
+    public HarFSDataInputStream(FileSystem fs, Path  p, long start, long length)
+      throws IOException {
+        super(new HarFsInputStream(fs, p, start, length, 0));
+    }
+  }
+}
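
For reference, the _index file that HarStatus parses above holds one space separated line per entry: the path inside the archive, "dir" or "file", the part file name ("none" for directories), the start offset and length within the part file, and, for directories, the child names. A small sketch of that parsing logic, using a hypothetical index line, would be:

  // hypothetical line in the format HarStatus expects
  String indexLine = "/user/hadoop/a.txt file part-0 0 1024 ";
  String[] fields = indexLine.split(" ");
  boolean isDir   = "dir".equals(fields[1]);     // false: this entry is a file
  String partName = fields[2];                   // part file that holds the bytes
  long startIndex = Long.parseLong(fields[3]);   // offset of the file in the part file
  long length     = Long.parseLong(fields[4]);   // number of bytes belonging to the file

The _masterindex consulted by fileStatusInIndex holds, per line, a start hash, an end hash, and the begin and end offsets of the matching region of _index, so only a slice of the index has to be scanned for a lookup.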

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/util/CopyFiles.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/util/CopyFiles.java?rev=663487&r1=663486&r2=663487&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/util/CopyFiles.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/util/CopyFiles.java Thu Jun  5 00:27:13 2008
@@ -851,7 +851,7 @@
   }
 
   private static final Random RANDOM = new Random();
-  private static String getRandomId() {
+  public static String getRandomId() {
     return Integer.toString(RANDOM.nextInt(Integer.MAX_VALUE), 36);
   }
 

Added: hadoop/core/trunk/src/java/org/apache/hadoop/util/HadoopArchives.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/util/HadoopArchives.java?rev=663487&view=auto
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/util/HadoopArchives.java (added)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/util/HadoopArchives.java Thu Jun  5 00:27:13 2008
@@ -0,0 +1,670 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.util;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.HarFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.SequenceFileRecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TextOutputFormat;
+import org.apache.hadoop.mapred.lib.NullOutputFormat;
+
+
+/**
+ * An archive creation utility.
+ * This class provides methods that can be used 
+ * to create hadoop archives. For an overview of 
+ * Hadoop archives, look at {@link HarFileSystem}.
+ */
+public class HadoopArchives implements Tool {
+  private static final Log LOG = LogFactory.getLog(HadoopArchives.class);
+  
+  private static final String NAME = "har"; 
+  static final String SRC_LIST_LABEL = NAME + ".src.list";
+  static final String DST_DIR_LABEL = NAME + ".dest.path";
+  static final String TMP_DIR_LABEL = NAME + ".tmp.dir";
+  static final String JOB_DIR_LABEL = NAME + ".job.dir";
+  static final String SRC_COUNT_LABEL = NAME + ".src.count";
+  static final String TOTAL_SIZE_LABEL = NAME + ".total.size";
+  static final String DST_HAR_LABEL = NAME + ".archive.name";
+  // size of each part file
+  // it's fixed for now.
+  static final long partSize = 2L * 1024 * 1024 * 1024;
+
+  private static final String usage = "archive"
+  + " -archiveName NAME <src>* <dest>" +
+  "\n";
+  
+ 
+  private JobConf conf;
+
+  public void setConf(Configuration conf) {
+    if (conf instanceof JobConf) {
+      this.conf = (JobConf) conf;
+    } else {
+      this.conf = new JobConf(conf);
+    }
+  }
+
+  public Configuration getConf() {
+    return this.conf;
+  }
+
+  public HadoopArchives(Configuration conf) {
+    setConf(conf);
+  }
+
+  // check the src paths
+  private static void checkPaths(Configuration conf, List<Path> paths) throws
+  IOException {
+    for (Path p : paths) {
+      FileSystem fs = p.getFileSystem(conf);
+      if (!fs.exists(p)) {
+        throw new FileNotFoundException("Source " + p + " does not exist.");
+      }
+    }
+  }
+
+  /**
+   * this assumes that there are only two types of entries: file and dir
+   * @param fs the input filesystem
+   * @param p the top level path 
+   * @param out the list of paths output of recursive ls
+   * @throws IOException
+   */
+  private void recursivels(FileSystem fs, Path p, List<FileStatus> out) 
+  throws IOException {
+    FileStatus fstatus = fs.getFileStatus(p);
+    if (!fstatus.isDir()) {
+      out.add(fstatus);
+      return;
+    }
+    else {
+      out.add(fstatus);
+      FileStatus[] listStatus = fs.listStatus(p);
+      for (FileStatus stat: listStatus) {
+        recursivels(fs, stat.getPath(), out);
+      }
+    }
+  }
+
+  /**
+   * Input format of a hadoop archive job responsible for 
+   * generating splits of the file list
+   */
+
+  static class HArchiveInputFormat implements InputFormat<LongWritable, Text> {
+    public void validateInput(JobConf jconf) throws IOException{};
+
+    //generate input splits from the src file lists
+    public InputSplit[] getSplits(JobConf jconf, int numSplits)
+    throws IOException {
+      String srcfilelist = jconf.get(SRC_LIST_LABEL, "");
+      if ("".equals(srcfilelist)) {
+          throw new IOException("Unable to get the " +
+              "src file for archive generation.");
+      }
+      long totalSize = jconf.getLong(TOTAL_SIZE_LABEL, -1);
+      if (totalSize == -1) {
+        throw new IOException("Invalid size of files to archive");
+      }
+      //we should be safe since this is set by our own code
+      Path src = new Path(srcfilelist);
+      FileSystem fs = src.getFileSystem(jconf);
+      FileStatus fstatus = fs.getFileStatus(src);
+      ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
+      LongWritable key = new LongWritable();
+      Text value = new Text();
+      SequenceFile.Reader reader = null;
+      // the remaining bytes in the file split
+      long remaining = fstatus.getLen();
+      // the count of sizes calculated till now
+      long currentCount = 0L;
+      // the endposition of the split
+      long lastPos = 0L;
+      // the start position of the split
+      long startPos = 0L;
+      long targetSize = totalSize/numSplits;
+      // create splits of size target size so that all the maps 
+      // have equal sized data to read and write to.
+      try {
+        reader = new SequenceFile.Reader(fs, src, jconf);
+        while(reader.next(key, value)) {
+          if (currentCount + key.get() > targetSize && currentCount != 0){
+            long size = lastPos - startPos;
+            splits.add(new FileSplit(src, startPos, size, (String[]) null));
+            remaining = remaining - size;
+            startPos = lastPos;
+            currentCount = 0L;
+          }
+          currentCount += key.get();
+          lastPos = reader.getPosition();
+        }
+        // the remaining not equal to the target size.
+        if (remaining != 0) {
+          splits.add(new FileSplit(src, startPos, remaining, (String[])null));
+        }
+      }
+      finally { 
+        reader.close();
+      }
+      return splits.toArray(new FileSplit[splits.size()]);
+    }
+
+    public RecordReader<LongWritable, Text> getRecordReader(InputSplit split,
+        JobConf job, Reporter reporter) throws IOException {
+      return new SequenceFileRecordReader<LongWritable, Text>(job,
+                 (FileSplit)split);
+    }
+  }
+
+  private boolean checkValidName(String name) {
+    if (name.endsWith(".har")) 
+      return true;
+    return false;
+  }
+  
+
+  private Path largestDepth(List<Path> paths) {
+    Path deepest = paths.get(0);
+    for (Path p: paths) {
+      if (p.depth() > deepest.depth()) {
+        deepest = p;
+      }
+    }
+    return deepest;
+  }
+  
+  // this method is tricky. This method writes 
+  // the top level directories in such a way so that 
+  // the output only contains valid directories in archives.
+  // so for an input path specified by the user 
+  // as /user/hadoop
+  // we need to index 
+  // / as the root 
+  // /user as a directory
+  // /user/hadoop as a directory
+  // so for multiple input paths it makes sure that it
+  // does the right thing.
+  // so if the user specifies the input directories as 
+  // /user/harry and /user/hadoop
+  // we need to write / and user as its child
+  // and /user and harry and hadoop as its children
+  private void writeTopLevelDirs(SequenceFile.Writer srcWriter, 
+      List<Path> paths) throws IOException {
+    //these are qualified paths 
+    List<Path> justDirs = new ArrayList<Path>();
+    for (Path p: paths) {
+      if (!p.getFileSystem(getConf()).isFile(p)) {
+        justDirs.add(new Path(p.toUri().getPath()));
+      }
+      else {
+        justDirs.add(new Path(p.getParent().toUri().getPath()));
+      }
+    }
+    
+    //get the largest depth path
+    // this is tricky
+    TreeMap<String, HashSet<String>> allpaths = new TreeMap<String, HashSet<String>>();
+    Path deepest = largestDepth(paths);
+    Path root = new Path(Path.SEPARATOR);
+    for (int i = 0; i < deepest.depth(); i++) {
+      List<Path> parents = new ArrayList<Path>();
+      for (Path p: justDirs) {
+        if (p.compareTo(root) == 0){
+          //do nothing
+        }
+        else {
+          Path parent = p.getParent();
+          if (allpaths.containsKey(parent.toString())) {
+            HashSet<String> children = allpaths.get(parent.toString());
+            children.add(p.getName());
+          }
+          else {
+            HashSet<String> children = new HashSet<String>();
+            children.add(p.getName());
+            allpaths.put(parent.toString(), children);
+          }
+          parents.add(parent);
+        }
+      }
+      justDirs = parents;
+    }
+    Set<Map.Entry<String, HashSet<String>>> keyVals = allpaths.entrySet();
+    for (Map.Entry<String, HashSet<String>> entry : keyVals) {
+      HashSet<String> children = entry.getValue();
+      String toWrite = entry.getKey() + " dir ";
+      StringBuffer sbuff = new StringBuffer();
+      sbuff.append(toWrite);
+      for (String child: children) {
+        sbuff.append(child + " ");
+      }
+      toWrite = sbuff.toString();
+      srcWriter.append(new LongWritable(0L), new Text(toWrite));
+    }
+  }
+  
+  /**archive the given source paths into
+   * the dest
+   * @param srcPaths the src paths to be archived
+   * @param archiveName the name of the archive to create
+   * @param dest the dest dir that will contain the archive
+   */
+  public void archive(List<Path> srcPaths, String archiveName, Path dest) 
+  throws IOException {
+    boolean isValid = checkValidName(archiveName);
+    if (!isValid) { 
+      throw new IOException("Invalid archiveName " + archiveName);
+    }
+    checkPaths(conf, srcPaths);
+    int numFiles = 0;
+    long totalSize = 0;
+    conf.set(DST_HAR_LABEL, archiveName);
+    Path outputPath = new Path(dest, archiveName);
+    FileOutputFormat.setOutputPath(conf, outputPath);
+    conf.set(DST_DIR_LABEL, outputPath.toString());
+    final String randomId = CopyFiles.getRandomId();
+    Path jobDirectory = new Path(conf.getSystemDir(), NAME + "_" + randomId);
+    conf.set(JOB_DIR_LABEL, jobDirectory.toString());
+    //get a tmp directory for input splits
+    FileSystem jobfs = jobDirectory.getFileSystem(conf);
+    jobfs.mkdirs(jobDirectory);
+    Path srcFiles = new Path(jobDirectory, "_har_src_files");
+    conf.set(SRC_LIST_LABEL, srcFiles.toString());
+    SequenceFile.Writer srcWriter = SequenceFile.createWriter(jobfs, conf,
+        srcFiles, LongWritable.class, Text.class, 
+        SequenceFile.CompressionType.NONE);
+    // get the list of files 
+    // create single list of files and dirs
+    try {
+      // write the top level dirs in first 
+      writeTopLevelDirs(srcWriter, srcPaths);
+      // these are the input paths passed 
+      // from the command line
+      // we do a recursive ls on these paths 
+      // and then write them to the input file 
+      // one at a time
+      for (Path src: srcPaths) {
+        FileSystem fs = src.getFileSystem(conf);
+        ArrayList<FileStatus> allFiles = new ArrayList<FileStatus>();
+        recursivels(fs, src, allFiles);
+        for (FileStatus stat: allFiles) {
+          String toWrite = "";
+          long len = stat.isDir()? 0:stat.getLen();
+          if (stat.isDir()) {
+            toWrite = "" + fs.makeQualified(stat.getPath()) + " dir ";
+            //get the children 
+            FileStatus[] list = fs.listStatus(stat.getPath());
+            StringBuffer sbuff = new StringBuffer();
+            sbuff.append(toWrite);
+            for (FileStatus stats: list) {
+              sbuff.append(stats.getPath().getName() + " ");
+            }
+            toWrite = sbuff.toString();
+          }
+          else {
+            toWrite +=  fs.makeQualified(stat.getPath()) + " file ";
+          }
+          srcWriter.append(new LongWritable(len), new 
+              Text(toWrite));
+          numFiles++;
+          totalSize += len;
+        }
+      }
+    } finally {
+      srcWriter.close();
+    }
+    //increase the replication of src files
+    jobfs.setReplication(srcFiles, (short) 10);
+    conf.setInt(SRC_COUNT_LABEL, numFiles);
+    conf.setLong(TOTAL_SIZE_LABEL, totalSize);
+    int numMaps = (int)(totalSize/partSize);
+    //run at least one map.
+    conf.setNumMapTasks(numMaps == 0? 1:numMaps);
+    conf.setNumReduceTasks(1);
+    conf.setInputFormat(HArchiveInputFormat.class);
+    conf.setOutputFormat(NullOutputFormat.class);
+    conf.setMapperClass(HArchivesMapper.class);
+    conf.setReducerClass(HArchivesReducer.class);
+    conf.setMapOutputKeyClass(IntWritable.class);
+    conf.setMapOutputValueClass(Text.class);
+    FileInputFormat.addInputPath(conf, jobDirectory);
+    //make sure no speculative execution is done
+    conf.setSpeculativeExecution(false);
+    JobClient.runJob(conf);
+    //delete the tmp job directory
+    try {
+      jobfs.delete(jobDirectory, true);
+    } catch(IOException ie) {
+      LOG.info("Unable to clean tmp directory " + jobDirectory);
+    }
+  }
+
+  static class HArchivesMapper 
+  implements Mapper<LongWritable, Text, IntWritable, Text> {
+    private JobConf conf = null;
+    int partId = -1 ; 
+    Path tmpOutputDir = null;
+    Path tmpOutput = null;
+    String partname = null;
+    FSDataOutputStream partStream = null;
+    FileSystem destFs = null;
+    byte[] buffer;
+    int buf_size = 128 * 1024;
+    
+    // configure the mapper and create 
+    // the part file.
+    // use map reduce framework to write into
+    // tmp files. 
+    public void configure(JobConf conf) {
+      this.conf = conf;
+      // this is tightly tied to map reduce
+      // since it does not expose an api 
+      // to get the partition
+      partId = conf.getInt("mapred.task.partition", -1);
+      // create a file name using the partition
+      // we need to write to this directory
+      tmpOutputDir = FileOutputFormat.getWorkOutputPath(conf);
+      // get the output path and write to the tmp 
+      // directory 
+      partname = "part-" + partId;
+      tmpOutput = new Path(tmpOutputDir, partname);
+      try {
+        destFs = tmpOutput.getFileSystem(conf);
+        //this was a stale copy
+        if (destFs.exists(tmpOutput)) {
+          destFs.delete(tmpOutput, false);
+        }
+        partStream = destFs.create(tmpOutput);
+      } catch(IOException ie) {
+        throw new RuntimeException("Unable to open output file " + tmpOutput);
+      }
+      buffer = new byte[buf_size];
+    }
+
+    // copy raw data.
+    public void copyData(Path input, FSDataInputStream fsin, 
+        FSDataOutputStream fout, Reporter reporter) throws IOException {
+      try {
+        for (int cbread=0; (cbread = fsin.read(buffer))>= 0;) {
+          fout.write(buffer, 0,cbread);
+          reporter.progress();
+        }
+      } finally {
+        fsin.close();
+      }
+    }
+    
+    // the relative path of p. basically 
+    // getting rid of the scheme. Parsing and doing 
+    // string manipulation is not good - so
+    // just use the path api to do it.
+    private Path makeRelative(Path p) {
+      Path retPath = new Path(p.toUri().getPath());
+      return retPath;
+    }
+    
+    static class MapStat {
+      private String pathname;
+      private boolean isDir;
+      private List<String> children;
+      public MapStat(String line) {
+        String[] splits = line.split(" ");
+        pathname = splits[0];
+        if ("dir".equals(splits[1])) {
+          isDir = true;
+        }
+        else {
+          isDir = false;
+        }
+        if (isDir) {
+          children = new ArrayList<String>();
+          for (int i = 2; i < splits.length; i++) {
+            children.add(splits[i]);
+          }
+        }
+      }
+    }
+    // read files from the split input 
+    // and write it onto the part files.
+    // also output hash(name) and string 
+    // for reducer to create index 
+    // and masterindex files.
+    public void map(LongWritable key, Text value,
+        OutputCollector<IntWritable, Text> out,
+        Reporter reporter) throws IOException {
+      String line  = value.toString();
+      MapStat mstat = new MapStat(line);
+      Path srcPath = new Path(mstat.pathname);
+      String towrite = null;
+      Path relPath = makeRelative(srcPath);
+      int hash = HarFileSystem.getHarHash(relPath);
+      long startPos = partStream.getPos();
+      if (mstat.isDir) { 
+        towrite = relPath.toString() + " " + "dir none " + 0 + " " + 0 + " ";
+        StringBuffer sbuff = new StringBuffer();
+        sbuff.append(towrite);
+        for (String child: mstat.children) {
+          sbuff.append(child + " ");
+        }
+        towrite = sbuff.toString();
+        //reading directories is also progress
+        reporter.progress();
+      }
+      else {
+        FileSystem srcFs = srcPath.getFileSystem(conf);
+        FileStatus srcStatus = srcFs.getFileStatus(srcPath);
+        FSDataInputStream input = srcFs.open(srcStatus.getPath());
+        reporter.setStatus("Copying file " + srcStatus.getPath() + 
+            " to archive.");
+        copyData(srcStatus.getPath(), input, partStream, reporter);
+        towrite = relPath.toString() + " file " + partname + " " + startPos
+        + " " + srcStatus.getLen() + " ";
+      }
+      out.collect(new IntWritable(hash), new Text(towrite));
+    }
+    
+    public void close() throws IOException {
+      // close the part files.
+      partStream.close();
+    }
+  }
+  
+  /** the reducer for creating the index and the master index 
+   * 
+   */
+  static class HArchivesReducer implements Reducer<IntWritable, 
+  Text, Text, Text> {
+    private JobConf conf = null;
+    private long startIndex = 0;
+    private long endIndex = 0;
+    private long startPos = 0;
+    private Path masterIndex = null;
+    private Path index = null;
+    private FileSystem fs = null;
+    private FSDataOutputStream outStream = null;
+    private FSDataOutputStream indexStream = null;
+    private int numIndexes = 1000;
+    private Path tmpOutputDir = null;
+    private int written = 0;
+    private int keyVal = 0;
+    
+    // configure 
+    public void configure(JobConf conf) {
+      this.conf = conf;
+      tmpOutputDir = FileOutputFormat.getWorkOutputPath(this.conf);
+      masterIndex = new Path(tmpOutputDir, "_masterindex");
+      index = new Path(tmpOutputDir, "_index");
+      try {
+        fs = masterIndex.getFileSystem(conf);
+        if (fs.exists(masterIndex)) {
+          fs.delete(masterIndex, false);
+        }
+        if (fs.exists(index)) {
+          fs.delete(index, false);
+        }
+        indexStream = fs.create(index);
+        outStream = fs.create(masterIndex);
+        String version = HarFileSystem.VERSION + " \n";
+        outStream.write(version.getBytes());
+        
+      } catch(IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+    
+    // create the index and master index. The input to 
+    // the reduce is already sorted by the hash of the 
+    // files. So we just need to write it to the index. 
+    // We update the masterindex as soon as we have written 
+    // numIndexes entries to the index.
+    public void reduce(IntWritable key, Iterator<Text> values,
+        OutputCollector<Text, Text> out,
+        Reporter reporter) throws IOException {
+      keyVal = key.get();
+      while(values.hasNext()) {
+        Text value = values.next();
+        String towrite = value.toString() + "\n";
+        indexStream.write(towrite.getBytes());
+        written++;
+        if (written > numIndexes -1) {
+          // every 1000 indexes we report status
+          reporter.setStatus("Creating index for archives");
+          reporter.progress();
+          endIndex = keyVal;
+          String masterWrite = startIndex + " " + endIndex + " " + startPos 
+                              +  " " + indexStream.getPos() + " \n" ;
+          outStream.write(masterWrite.getBytes());
+          startPos = indexStream.getPos();
+          startIndex = endIndex;
+          written = 0;
+        }
+      }
+    }
+    
+    public void close() throws IOException {
+      //write the last part of the master index.
+      if (written > 0) {
+        String masterWrite = startIndex + " " + keyVal + " " + startPos  +
+                             " " + indexStream.getPos() + " \n";
+        outStream.write(masterWrite.getBytes());
+      }
+      // close the streams
+      outStream.close();
+      indexStream.close();
+      // try increasing the replication 
+      fs.setReplication(index, (short) 10);
+      fs.setReplication(masterIndex, (short) 10);
+    }
+    
+  }
+  
+  /** the main driver for creating the archives
+   *  It takes at least two command line parameters: the src and the 
+   *  dest. It does an lsr on the source paths.
+   *  The mapper creates the archives and the reducer creates 
+   *  the archive index.
+   */
+
+  public int run(String[] args) throws Exception {
+    List<Path> srcPaths = new ArrayList<Path>();
+    Path destPath = null;
+    // check we were supposed to archive or 
+    // unarchive
+    String archiveName = null;
+    if (args.length < 2) {
+      System.out.println(usage);
+      throw new IOException("Invalid usage.");
+    }
+    if (!"-archiveName".equals(args[0])) {
+      System.out.println(usage);
+      throw new IOException("Archive Name not specified.");
+    }
+    archiveName = args[1];
+    if (!checkValidName(archiveName)) {
+      throw new IOException("Invalid name for archives. " + archiveName);
+    }
+    for (int i = 2; i < args.length; i++) {
+      if (i == (args.length - 1)) {
+        destPath = new Path(args[i]);
+      }
+      else {
+        srcPaths.add(new Path(args[i]));
+      }
+    }
+    // do a glob on the srcPaths and then pass it on
+    List<Path> globPaths = new ArrayList<Path>();
+    for (Path p: srcPaths) {
+      FileSystem fs = p.getFileSystem(getConf());
+      FileStatus[] statuses = fs.globStatus(p);
+      for (FileStatus status: statuses) {
+        globPaths.add(fs.makeQualified(status.getPath()));
+      }
+    }
+    archive(globPaths, archiveName, destPath);
+    return 0;
+  }
+
+  /** the main function **/
+  public static void main(String[] args) {
+    JobConf job = new JobConf(HadoopArchives.class);
+    HadoopArchives harchives = new HadoopArchives(job);
+    try {
+      int res = harchives.run(args);
+      System.exit(res);
+    } catch(Exception e) {
+      System.err.println(e.getLocalizedMessage());
+      System.exit(1);
+    }
+  }
+}
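
The reducer above lays out the archive index in two files: _index gets one
line per archived file, and _masterindex gets a version line followed by one
line per block of numIndexes index entries, each recording the hash range of
the block and its byte range inside _index. A minimal, illustrative sketch of
decoding one such master index line is below; the class and field names are
assumptions for illustration, and the sample line is made up rather than
taken from a real archive.

    // Illustrative sketch only; class and field names are assumptions.
    public class MasterIndexLineSketch {
      public static void main(String[] args) {
        String line = "0 1000 0 48123 ";          // hypothetical _masterindex data line
        String[] f = line.trim().split(" ");
        int startHash = Integer.parseInt(f[0]);   // hash of the first file in the block
        int endHash   = Integer.parseInt(f[1]);   // hash of the last file in the block
        long startPos = Long.parseLong(f[2]);     // block's starting byte offset in _index
        long endPos   = Long.parseLong(f[3]);     // block's ending byte offset in _index
        System.out.println("hashes " + startHash + ".." + endHash
            + " cover bytes [" + startPos + ", " + endPos + ") of _index");
      }
    }

Presumably this two-level layout lets HarFileSystem locate a file's index
entry by reading the small _masterindex plus a single block of _index, rather
than scanning the whole index.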

Added: hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestHarFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestHarFileSystem.java?rev=663487&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestHarFileSystem.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestHarFileSystem.java Thu Jun  5 00:27:13 2008
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs;
+
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.dfs.MiniDFSCluster;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FsShell;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.mapred.TextOutputFormat;
+import org.apache.hadoop.util.HadoopArchives;
+import org.apache.hadoop.util.ToolRunner;
+
+import junit.framework.TestCase;
+
+/**
+ * Tests the har file system:
+ * creates a har archive,
+ * runs fs shell commands on it,
+ * and then runs a map reduce job over it.
+ */
+public class TestHarFileSystem extends TestCase {
+  private Path inputPath;
+  private MiniDFSCluster dfscluster;
+  private MiniMRCluster mapred;
+  private FileSystem fs;
+  private Path filea, fileb, filec;
+  private Path archivePath;
+  
+  protected void setUp() throws Exception {
+    super.setUp();
+    dfscluster = new MiniDFSCluster(new JobConf(), 2, true, null);
+    fs = dfscluster.getFileSystem();
+    mapred = new MiniMRCluster(2, fs.getUri().toString(), 1);
+    inputPath = new Path(fs.getHomeDirectory(), "test"); 
+    filea = new Path(inputPath,"a");
+    fileb = new Path(inputPath,"b");
+    filec = new Path(inputPath,"c");
+    archivePath = new Path(fs.getHomeDirectory(), "tmp");
+  }
+  
+  protected void tearDown() throws Exception {
+    try {
+      if (mapred != null) {
+        mapred.shutdown();
+      }
+      if (dfscluster != null) {
+        dfscluster.shutdown();
+      }
+    } catch(Exception e) {
+      System.err.println(e);
+    }
+    super.tearDown();
+  }
+  
+  static class TextMapperReducer implements Mapper<LongWritable, Text, Text, Text>, 
+            Reducer<Text, Text, Text, Text> {
+    
+    public void configure(JobConf conf) {
+      //do nothing 
+    }
+
+    public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
+      output.collect(value, new Text(""));
+    }
+
+    public void close() throws IOException {
+      // do nothing
+    }
+
+    public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
+      while(values.hasNext()) { 
+        values.next();
+        output.collect(key, null);
+      }
+    }
+  }
+  
+  public void testArchives() throws Exception {
+    fs.mkdirs(inputPath);
+    
+    FSDataOutputStream out = fs.create(filea); 
+    out.write("a".getBytes());
+    out.close();
+    out = fs.create(fileb);
+    out.write("b".getBytes());
+    out.close();
+    out = fs.create(filec);
+    out.write("c".getBytes());
+    out.close();
+    Configuration conf = mapred.createJobConf();
+    HadoopArchives har = new HadoopArchives(conf);
+    String[] args = new String[4];
+    args[0] = "-archiveName";
+    args[1] = "foo.har";
+    args[2] = inputPath.toString();
+    args[3] = archivePath.toString();
+    int ret = ToolRunner.run(har, args);
+    // check for the existence of the archive
+    assertTrue(ret == 0);
+    Path finalPath = new Path(archivePath, "foo.har");
+    Path fsPath = new Path(inputPath.toUri().getPath());
+    String relative = fsPath.toString().substring(1);
+    Path filePath = new Path(finalPath, relative);
+    //make it a har path 
+    Path harPath = new Path("har://" + filePath.toUri().getPath());
+    assertTrue(fs.exists(new Path(finalPath, "_index")));
+    assertTrue(fs.exists(new Path(finalPath, "_masterindex")));
+    // creation tested
+    // check that the archive contents match the originals:
+    // do an ls and read back all the files
+    FsShell shell = new FsShell(conf);
+    args = new String[2];
+    args[0] = "-ls";
+    args[1] = harPath.toString();
+    ret = ToolRunner.run(shell, args);
+    // ls should work.
+    assertTrue((ret == 0));
+    //now check for contents of filea
+    // fileb and filec
+    Path harFilea = new Path(harPath, "a");
+    Path harFileb = new Path(harPath, "b");
+    Path harFilec = new Path(harPath, "c");
+    FileSystem harFs = harFilea.getFileSystem(conf);
+    FSDataInputStream fin = harFs.open(harFilea);
+    byte[] b = new byte[4];
+    int readBytes = fin.read(b);
+    fin.close();
+    assertTrue("strings are equal ", (b[0] == "a".getBytes()[0]));
+    fin = harFs.open(harFileb);
+    fin.read(b);
+    fin.close();
+    assertTrue("strings are equal ", (b[0] == "b".getBytes()[0]));
+    fin = harFs.open(harFilec);
+    fin.read(b);
+    fin.close();
+    assertTrue("strings are equal ", (b[0] == "c".getBytes()[0]));
+    // ok all files match 
+    // run a map reduce job
+    Path outdir = new Path(fs.getHomeDirectory(), "mapout"); 
+    JobConf jobconf = mapred.createJobConf();
+    FileInputFormat.addInputPath(jobconf, harPath);
+    jobconf.setInputFormat(TextInputFormat.class);
+    jobconf.setOutputFormat(TextOutputFormat.class);
+    FileOutputFormat.setOutputPath(jobconf, outdir);
+    jobconf.setMapperClass(TextMapperReducer.class);
+    jobconf.setMapOutputKeyClass(Text.class);
+    jobconf.setMapOutputValueClass(Text.class);
+    jobconf.setReducerClass(TextMapperReducer.class);
+    jobconf.setNumReduceTasks(1);
+    JobClient.runJob(jobconf);
+    args[1] = outdir.toString();
+    ret = ToolRunner.run(shell, args);
+    
+    FileStatus[] status = fs.globStatus(new Path(outdir, "part*"));
+    Path reduceFile = status[0].getPath();
+    FSDataInputStream reduceIn = fs.open(reduceFile);
+    b = new byte[6];
+    // the reducer output is exactly 6 bytes ("a\nb\nc\n")
+    reduceIn.readFully(b);
+    Text readTxt = new Text(b);
+    assertTrue("a\nb\nc\n".equals(readTxt.toString()));
+    assertTrue("number of bytes left should be -1", reduceIn.read(b) == -1);
+    reduceIn.close();
+  }
+}
\ No newline at end of file
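
Reading a file back out of an archive from client code follows the same steps
as the test above: build a har:// path from the absolute path of the .har
directory plus the archived file's original path, then open it through the
ordinary FileSystem API. The sketch below is illustrative only; it assumes an
archive created as in the test (foo.har under /user/hadoop/tmp, archiving
/user/hadoop/test which contains a one-byte file "a") and a configuration
whose default filesystem is the HDFS instance that holds the archive.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class HarReadSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // har:// + <absolute path of the .har dir> + <original path of the file>
        Path harFile = new Path("har:///user/hadoop/tmp/foo.har/user/hadoop/test/a");
        FileSystem harFs = harFile.getFileSystem(conf); // the har scheme resolves to HarFileSystem
        FSDataInputStream in = harFs.open(harFile);
        byte[] buf = new byte[1];
        in.readFully(buf);                              // file "a" holds the single byte 'a'
        in.close();
        System.out.println("read: " + new String(buf));
      }
    }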