Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2007/04/16 23:44:46 UTC

svn commit: r529410 [12/27] - in /lucene/hadoop/trunk: ./ src/contrib/abacus/src/examples/org/apache/hadoop/abacus/examples/ src/contrib/abacus/src/java/org/apache/hadoop/abacus/ src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/ src/...

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/InMemoryFileSystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/InMemoryFileSystem.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/InMemoryFileSystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/InMemoryFileSystem.java Mon Apr 16 14:44:35 2007
@@ -38,447 +38,447 @@
  */
 public class InMemoryFileSystem extends ChecksumFileSystem {
   private static class RawInMemoryFileSystem extends FileSystem {
-  private URI uri;
-  private int fsSize;
-  private volatile int totalUsed;
-  private Path staticWorkingDir;
-  
-  //pathToFileAttribs is the final place where a file is put after it is closed
-  private Map <String, FileAttributes> pathToFileAttribs = new HashMap();
-  
-  //tempFileAttribs is a temp place which is updated while reserving memory for
-  //files we are going to create. It is read in the createRaw method and the
-  //temp key/value is discarded. If the file makes it to "close", then it
-  //ends up being in the pathToFileAttribs map.
-  private Map <String, FileAttributes> tempFileAttribs = new HashMap();
+    private URI uri;
+    private int fsSize;
+    private volatile int totalUsed;
+    private Path staticWorkingDir;
   
-  public RawInMemoryFileSystem() {
-    setConf(new Configuration());
-  }
+    //pathToFileAttribs is the final place where a file is put after it is closed
+    private Map <String, FileAttributes> pathToFileAttribs = new HashMap();
+  
+    //tempFileAttribs is a temp place which is updated while reserving memory for
+    //files we are going to create. It is read in the createRaw method and the
+    //temp key/value is discarded. If the file makes it to "close", then it
+    //ends up being in the pathToFileAttribs map.
+    private Map <String, FileAttributes> tempFileAttribs = new HashMap();
+  
+    public RawInMemoryFileSystem() {
+      setConf(new Configuration());
+    }
 
-  public RawInMemoryFileSystem(URI uri, Configuration conf) {
-    initialize(uri, conf);
-  }
+    public RawInMemoryFileSystem(URI uri, Configuration conf) {
+      initialize(uri, conf);
+    }
   
-  //inherit javadoc
-  public void initialize(URI uri, Configuration conf) {
-    setConf(conf);
-    int size = Integer.parseInt(conf.get("fs.inmemory.size.mb", "100"));
-    this.fsSize = size * 1024 * 1024;
-    this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
-    this.staticWorkingDir = new Path(this.uri.getPath());
-    LOG.info("Initialized InMemoryFileSystem: " + uri.toString() + 
-             " of size (in bytes): " + fsSize);
-  }
+    //inherit javadoc
+    public void initialize(URI uri, Configuration conf) {
+      setConf(conf);
+      int size = Integer.parseInt(conf.get("fs.inmemory.size.mb", "100"));
+      this.fsSize = size * 1024 * 1024;
+      this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
+      this.staticWorkingDir = new Path(this.uri.getPath());
+      LOG.info("Initialized InMemoryFileSystem: " + uri.toString() + 
+               " of size (in bytes): " + fsSize);
+    }
 
-  //inherit javadoc
-  public URI getUri() {
-    return uri;
-  }
+    //inherit javadoc
+    public URI getUri() {
+      return uri;
+    }
 
-  /** @deprecated */
-  public String getName() {
-    return uri.toString();
-  }
+    /** @deprecated */
+    public String getName() {
+      return uri.toString();
+    }
 
-  /**
-   * Return 1x1 'inmemory' cell if the file exists.
-   * Return null otherwise.
-   */
-  public String[][] getFileCacheHints(Path f, long start, long len)
+    /**
+     * Return 1x1 'inmemory' cell if the file exists.
+     * Return null otherwise.
+     */
+    public String[][] getFileCacheHints(Path f, long start, long len)
       throws IOException {
-    if (! exists(f)) {
-      return null;
-    } else {
-      return new String[][] {{"inmemory"}};
+      if (! exists(f)) {
+        return null;
+      } else {
+        return new String[][] {{"inmemory"}};
+      }
     }
-  }
 
-  private class InMemoryInputStream extends FSInputStream {
-    private DataInputBuffer din = new DataInputBuffer();
-    private FileAttributes fAttr;
-    
-    public InMemoryInputStream(Path f) throws IOException {
-      synchronized (RawInMemoryFileSystem.this) {
-        fAttr = pathToFileAttribs.get(getPath(f));
-        if (fAttr == null) { 
-          throw new FileNotFoundException("File " + f + " does not exist");
-        }                            
-        din.reset(fAttr.data, 0, fAttr.size);
+    private class InMemoryInputStream extends FSInputStream {
+      private DataInputBuffer din = new DataInputBuffer();
+      private FileAttributes fAttr;
+    
+      public InMemoryInputStream(Path f) throws IOException {
+        synchronized (RawInMemoryFileSystem.this) {
+          fAttr = pathToFileAttribs.get(getPath(f));
+          if (fAttr == null) { 
+            throw new FileNotFoundException("File " + f + " does not exist");
+          }                            
+          din.reset(fAttr.data, 0, fAttr.size);
+        }
       }
-    }
     
-    public long getPos() throws IOException {
-      return din.getPosition();
-    }
+      public long getPos() throws IOException {
+        return din.getPosition();
+      }
     
-    public void seek(long pos) throws IOException {
-      if ((int)pos > fAttr.size)
-        throw new IOException("Cannot seek after EOF");
-      din.reset(fAttr.data, (int)pos, fAttr.size - (int)pos);
-    }
+      public void seek(long pos) throws IOException {
+        if ((int)pos > fAttr.size)
+          throw new IOException("Cannot seek after EOF");
+        din.reset(fAttr.data, (int)pos, fAttr.size - (int)pos);
+      }
     
-    public boolean seekToNewSource(long targetPos) throws IOException {
-      return false;
-    }
+      public boolean seekToNewSource(long targetPos) throws IOException {
+        return false;
+      }
 
-    public int available() throws IOException {
-      return din.available(); 
-    }
-    public boolean markSupport() { return false; }
+      public int available() throws IOException {
+        return din.available(); 
+      }
+      public boolean markSupport() { return false; }
 
-    public int read() throws IOException {
-      return din.read();
-    }
+      public int read() throws IOException {
+        return din.read();
+      }
 
-    public int read(byte[] b, int off, int len) throws IOException {
-      return din.read(b, off, len);
-    }
+      public int read(byte[] b, int off, int len) throws IOException {
+        return din.read(b, off, len);
+      }
     
-    public long skip(long n) throws IOException { return din.skip(n); }
-  }
-
-  public FSDataInputStream open(Path f, int bufferSize) throws IOException {
-    return new FSDataInputStream(new InMemoryInputStream(f), bufferSize);
-  }
+      public long skip(long n) throws IOException { return din.skip(n); }
+    }
 
-  private class InMemoryOutputStream extends OutputStream {
-    private int count;
-    private FileAttributes fAttr;
-    private Path f;
-    
-    public InMemoryOutputStream(Path f, FileAttributes fAttr) 
-    throws IOException {
-      this.fAttr = fAttr;
-      this.f = f;
+    public FSDataInputStream open(Path f, int bufferSize) throws IOException {
+      return new FSDataInputStream(new InMemoryInputStream(f), bufferSize);
     }
+
+    private class InMemoryOutputStream extends OutputStream {
+      private int count;
+      private FileAttributes fAttr;
+      private Path f;
     
-    public long getPos() throws IOException {
-      return count;
-    }
+      public InMemoryOutputStream(Path f, FileAttributes fAttr) 
+        throws IOException {
+        this.fAttr = fAttr;
+        this.f = f;
+      }
     
-    public void close() throws IOException {
-      synchronized (RawInMemoryFileSystem.this) {
-        pathToFileAttribs.put(getPath(f), fAttr);
+      public long getPos() throws IOException {
+        return count;
       }
-    }
     
-    public void write(byte[] b, int off, int len) throws IOException {
-      if ((off < 0) || (off > b.length) || (len < 0) ||
-          ((off + len) > b.length) || ((off + len) < 0)) {
-        throw new IndexOutOfBoundsException();
-      } else if (len == 0) {
-        return;
+      public void close() throws IOException {
+        synchronized (RawInMemoryFileSystem.this) {
+          pathToFileAttribs.put(getPath(f), fAttr);
+        }
       }
-      int newcount = count + len;
-      if (newcount > fAttr.size) {
-        throw new IOException("Insufficient space");
+    
+      public void write(byte[] b, int off, int len) throws IOException {
+        if ((off < 0) || (off > b.length) || (len < 0) ||
+            ((off + len) > b.length) || ((off + len) < 0)) {
+          throw new IndexOutOfBoundsException();
+        } else if (len == 0) {
+          return;
+        }
+        int newcount = count + len;
+        if (newcount > fAttr.size) {
+          throw new IOException("Insufficient space");
+        }
+        System.arraycopy(b, off, fAttr.data, count, len);
+        count = newcount;
       }
-      System.arraycopy(b, off, fAttr.data, count, len);
-      count = newcount;
-    }
     
-    public void write(int b) throws IOException {
-      int newcount = count + 1;
-      if (newcount > fAttr.size) {
-        throw new IOException("Insufficient space");
+      public void write(int b) throws IOException {
+        int newcount = count + 1;
+        if (newcount > fAttr.size) {
+          throw new IOException("Insufficient space");
+        }
+        fAttr.data[count] = (byte)b;
+        count = newcount;
       }
-      fAttr.data[count] = (byte)b;
-      count = newcount;
     }
-  }
   
-  public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize,
-      short replication, long blockSize, Progressable progress)
+    public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize,
+                                     short replication, long blockSize, Progressable progress)
       throws IOException {
-    synchronized (this) {
-      if (exists(f) && ! overwrite) {
-        throw new IOException("File already exists:"+f);
-      }
-      FileAttributes fAttr =(FileAttributes) tempFileAttribs.remove(getPath(f));
-      if (fAttr != null)
-        return create(f, fAttr);
-      return null;
+      synchronized (this) {
+        if (exists(f) && ! overwrite) {
+          throw new IOException("File already exists:"+f);
+        }
+        FileAttributes fAttr =(FileAttributes) tempFileAttribs.remove(getPath(f));
+        if (fAttr != null)
+          return create(f, fAttr);
+        return null;
+      }
     }
-  }
   
-  public FSDataOutputStream create(Path f, FileAttributes fAttr)
+    public FSDataOutputStream create(Path f, FileAttributes fAttr)
       throws IOException {
-    // the path is not added into the filesystem (in the pathToFileAttribs
-    // map) until close is called on the outputstream that this method is
-    // going to return
-    // Create an output stream out of data byte array
-    return new FSDataOutputStream(new InMemoryOutputStream(f, fAttr),
-        getConf());
-  }
+      // the path is not added into the filesystem (in the pathToFileAttribs
+      // map) until close is called on the outputstream that this method is
+      // going to return
+      // Create an output stream out of data byte array
+      return new FSDataOutputStream(new InMemoryOutputStream(f, fAttr),
+                                    getConf());
+    }
 
-  public void close() throws IOException {
-    super.close();
-    synchronized (this) {
-      if (pathToFileAttribs != null) { 
-        pathToFileAttribs.clear();
-      }
-      pathToFileAttribs = null;
-      if (tempFileAttribs != null) {
-        tempFileAttribs.clear();
+    public void close() throws IOException {
+      super.close();
+      synchronized (this) {
+        if (pathToFileAttribs != null) { 
+          pathToFileAttribs.clear();
+        }
+        pathToFileAttribs = null;
+        if (tempFileAttribs != null) {
+          tempFileAttribs.clear();
+        }
+        tempFileAttribs = null;
       }
-      tempFileAttribs = null;
     }
-  }
 
-  /**
-   * Replication is not supported for the inmemory file system.
-   */
-  public short getReplication(Path src) throws IOException {
-    return 1;
-  }
+    /**
+     * Replication is not supported for the inmemory file system.
+     */
+    public short getReplication(Path src) throws IOException {
+      return 1;
+    }
 
-  public boolean setReplication(Path src, short replication)
+    public boolean setReplication(Path src, short replication)
       throws IOException {
-    return true;
-  }
-
-  public boolean rename(Path src, Path dst) throws IOException {
-    synchronized (this) {
-      if (exists(dst)) {
-        throw new IOException ("Path " + dst + " already exists");
-      }
-      FileAttributes fAttr = pathToFileAttribs.remove(getPath(src));
-      if (fAttr == null) return false;
-      pathToFileAttribs.put(getPath(dst), fAttr);
       return true;
     }
-  }
 
-  public boolean delete(Path f) throws IOException {
-    synchronized (this) {
-      FileAttributes fAttr = pathToFileAttribs.remove(getPath(f));
-      if (fAttr != null) {
-        fAttr.data = null;
-        totalUsed -= fAttr.size;
+    public boolean rename(Path src, Path dst) throws IOException {
+      synchronized (this) {
+        if (exists(dst)) {
+          throw new IOException ("Path " + dst + " already exists");
+        }
+        FileAttributes fAttr = pathToFileAttribs.remove(getPath(src));
+        if (fAttr == null) return false;
+        pathToFileAttribs.put(getPath(dst), fAttr);
         return true;
       }
-      return false;
     }
-  }
 
-  public boolean exists(Path f) throws IOException {
-    synchronized (this) {
-      return pathToFileAttribs.containsKey(getPath(f));
+    public boolean delete(Path f) throws IOException {
+      synchronized (this) {
+        FileAttributes fAttr = pathToFileAttribs.remove(getPath(f));
+        if (fAttr != null) {
+          fAttr.data = null;
+          totalUsed -= fAttr.size;
+          return true;
+        }
+        return false;
+      }
+    }
+
+    public boolean exists(Path f) throws IOException {
+      synchronized (this) {
+        return pathToFileAttribs.containsKey(getPath(f));
+      }
     }
-  }
   
-  /**
-   * Directory operations are not supported
-   */
-  public boolean isDirectory(Path f) throws IOException {
-    return !isFile(f);
-  }
+    /**
+     * Directory operations are not supported
+     */
+    public boolean isDirectory(Path f) throws IOException {
+      return !isFile(f);
+    }
 
-  public boolean isFile(Path f) throws IOException {
-    return exists(f);
-  }
+    public boolean isFile(Path f) throws IOException {
+      return exists(f);
+    }
 
-  public long getLength(Path f) throws IOException {
-    synchronized (this) {
-      return pathToFileAttribs.get(getPath(f)).size;
+    public long getLength(Path f) throws IOException {
+      synchronized (this) {
+        return pathToFileAttribs.get(getPath(f)).size;
+      }
     }
-  }
   
-  /**
-   * Directory operations are not supported
-   */
-  public Path[] listPaths(Path f) throws IOException {
-    return null;
-  }
+    /**
+     * Directory operations are not supported
+     */
+    public Path[] listPaths(Path f) throws IOException {
+      return null;
+    }
 
-  public void setWorkingDirectory(Path new_dir) {
-    staticWorkingDir = new_dir;
-  }
+    public void setWorkingDirectory(Path new_dir) {
+      staticWorkingDir = new_dir;
+    }
   
-  public Path getWorkingDirectory() {
-    return staticWorkingDir;
-  }
+    public Path getWorkingDirectory() {
+      return staticWorkingDir;
+    }
 
-  public boolean mkdirs(Path f) throws IOException {
-    return true;
-  }
+    public boolean mkdirs(Path f) throws IOException {
+      return true;
+    }
   
-  /** lock operations are not supported */
-  public void lock(Path f, boolean shared) throws IOException {}
-  public void release(Path f) throws IOException {}
+    /** lock operations are not supported */
+    public void lock(Path f, boolean shared) throws IOException {}
+    public void release(Path f) throws IOException {}
   
-  /** copy/move operations are not supported */
-  public void copyFromLocalFile(boolean delSrc, Path src, Path dst)
+    /** copy/move operations are not supported */
+    public void copyFromLocalFile(boolean delSrc, Path src, Path dst)
       throws IOException {
-  }
+    }
 
-  public void copyToLocalFile(boolean delSrc, Path src, Path dst)
+    public void copyToLocalFile(boolean delSrc, Path src, Path dst)
       throws IOException {
-  }
+    }
 
-  public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
+    public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
       throws IOException {
-    return fsOutputFile;
-  }
+      return fsOutputFile;
+    }
 
-  public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile)
+    public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile)
       throws IOException {
-  }
+    }
 
-  public long getBlockSize(Path f) throws IOException {
-    return getDefaultBlockSize();
-  }
+    public long getBlockSize(Path f) throws IOException {
+      return getDefaultBlockSize();
+    }
 
-  public long getDefaultBlockSize() {
-    return 32 * 1024; //some random large number. can be anything actually
-  }
+    public long getDefaultBlockSize() {
+      return 32 * 1024; //some random large number. can be anything actually
+    }
 
-  public short getDefaultReplication() {
-    return 1;
-  }
+    public short getDefaultReplication() {
+      return 1;
+    }
   
-  /** Some APIs exclusively for InMemoryFileSystem */
+    /** Some APIs exclusively for InMemoryFileSystem */
   
-  /** Register a path with its size. */
-  public boolean reserveSpace(Path f, int size) {
-    synchronized (this) {
-      if (!canFitInMemory(size))
-        return false;
-      FileAttributes fileAttr;
-      try {
-        fileAttr = new FileAttributes(size);
-      } catch (OutOfMemoryError o) {
-        return false;
+    /** Register a path with its size. */
+    public boolean reserveSpace(Path f, int size) {
+      synchronized (this) {
+        if (!canFitInMemory(size))
+          return false;
+        FileAttributes fileAttr;
+        try {
+          fileAttr = new FileAttributes(size);
+        } catch (OutOfMemoryError o) {
+          return false;
+        }
+        totalUsed += size;
+        tempFileAttribs.put(getPath(f), fileAttr);
+        return true;
       }
-      totalUsed += size;
-      tempFileAttribs.put(getPath(f), fileAttr);
-      return true;
     }
-  }
   
-  /** This API getClosedFiles could have been implemented over listPathsRaw
-   * but it is an overhead to maintain directory structures for this impl of
-   * the in-memory fs.
-   */
-  public Path[] getFiles(PathFilter filter) {
-    synchronized (this) {
-      List<String> closedFilesList = new ArrayList<String>();
-      synchronized (pathToFileAttribs) {
-        Set paths = pathToFileAttribs.keySet();
-        if (paths == null || paths.isEmpty()) {
-          return new Path[0];
-        }
-        Iterator iter = paths.iterator();
-        while (iter.hasNext()) {
-          String f = (String)iter.next();
-          if (filter.accept(new Path(f))) {
-            closedFilesList.add(f);
+    /** This API getClosedFiles could have been implemented over listPathsRaw
+     * but it is an overhead to maintain directory structures for this impl of
+     * the in-memory fs.
+     */
+    public Path[] getFiles(PathFilter filter) {
+      synchronized (this) {
+        List<String> closedFilesList = new ArrayList<String>();
+        synchronized (pathToFileAttribs) {
+          Set paths = pathToFileAttribs.keySet();
+          if (paths == null || paths.isEmpty()) {
+            return new Path[0];
+          }
+          Iterator iter = paths.iterator();
+          while (iter.hasNext()) {
+            String f = (String)iter.next();
+            if (filter.accept(new Path(f))) {
+              closedFilesList.add(f);
+            }
           }
         }
+        String [] names = 
+          closedFilesList.toArray(new String[closedFilesList.size()]);
+        Path [] results = new Path[names.length];
+        for (int i = 0; i < names.length; i++) {
+          results[i] = new Path(names[i]);
+        }
+        return results;
       }
-      String [] names = 
-        closedFilesList.toArray(new String[closedFilesList.size()]);
-      Path [] results = new Path[names.length];
-      for (int i = 0; i < names.length; i++) {
-        results[i] = new Path(names[i]);
-      }
-      return results;
     }
-  }
   
-  public int getNumFiles(PathFilter filter) {
-    return getFiles(filter).length;
-  }
+    public int getNumFiles(PathFilter filter) {
+      return getFiles(filter).length;
+    }
 
-  public int getFSSize() {
-    return fsSize;
-  }
+    public int getFSSize() {
+      return fsSize;
+    }
   
-  public float getPercentUsed() {
-    if (fsSize > 0)
-      return (float)totalUsed/fsSize;
-    else return 0.1f;
-  }
+    public float getPercentUsed() {
+      if (fsSize > 0)
+        return (float)totalUsed/fsSize;
+      else return 0.1f;
+    }
  
-  private boolean canFitInMemory(int size) {
-    if (size + totalUsed < fsSize)
-      return true;
-    return false;
-  }
+    private boolean canFitInMemory(int size) {
+      if (size + totalUsed < fsSize)
+        return true;
+      return false;
+    }
   
-  private String getPath(Path f) {
-    return f.toUri().getPath();
-  }
+    private String getPath(Path f) {
+      return f.toUri().getPath();
+    }
   
-  private static class FileAttributes {
-    private byte[] data;
-    private int size;
-    
-    public FileAttributes(int size) {
-      this.size = size;
-      this.data = new byte[size];
+    private static class FileAttributes {
+      private byte[] data;
+      private int size;
+    
+      public FileAttributes(int size) {
+        this.size = size;
+        this.data = new byte[size];
+      }
     }
   }
-  }
     
-    public InMemoryFileSystem() {
-        super(new RawInMemoryFileSystem());
-    }
+  public InMemoryFileSystem() {
+    super(new RawInMemoryFileSystem());
+  }
     
-    public InMemoryFileSystem(URI uri, Configuration conf) {
-        super(new RawInMemoryFileSystem(uri, conf));
-    }
+  public InMemoryFileSystem(URI uri, Configuration conf) {
+    super(new RawInMemoryFileSystem(uri, conf));
+  }
     
-    /** copy/move operations are not supported */
-    public void copyFromLocalFile(boolean delSrc, Path src, Path dst)
+  /** copy/move operations are not supported */
+  public void copyFromLocalFile(boolean delSrc, Path src, Path dst)
     throws IOException {}
-    public void copyToLocalFile(boolean delSrc, Path src, Path dst)
+  public void copyToLocalFile(boolean delSrc, Path src, Path dst)
     throws IOException {}
     
-    public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
+  public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
     throws IOException {
-        return fsOutputFile;
-    }
+    return fsOutputFile;
+  }
     
-    public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile)
+  public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile)
     throws IOException {
-    }
+  }
     
-    /**
-     * Register a file with its size. This will also register a checksum for the
-     * file that the user is trying to create. This is required since none of
-     * the FileSystem APIs accept the size of the file as argument. But since it
-     * is required for us to apriori know the size of the file we are going to
-     * create, the user must call this method for each file he wants to create
-     * and reserve memory for that file. We either succeed in reserving memory
-     * for both the main file and the checksum file and return true, or return
-     * false.
-     */
-    public boolean reserveSpaceWithCheckSum(Path f, int size) {
-        // get the size of the checksum file (we know it is going to be 'int'
-        // since this is an inmem fs with file sizes that will fit in 4 bytes)
-        long checksumSize = getChecksumFileLength(f, size);
-        RawInMemoryFileSystem mfs = (RawInMemoryFileSystem)getRawFileSystem();
-        synchronized(mfs) {
-            return mfs.reserveSpace(f, size) && 
-            mfs.reserveSpace(getChecksumFile(f),
-                    (int)getChecksumFileLength(f, size));
-        }
-    }
-    public Path[] getFiles(PathFilter filter) {
-        return ((RawInMemoryFileSystem)getRawFileSystem()).getFiles(filter);
+  /**
+   * Register a file with its size. This will also register a checksum for the
+   * file that the user is trying to create. This is required since none of
+   * the FileSystem APIs accept the size of the file as argument. But since it
+   * is required for us to apriori know the size of the file we are going to
+   * create, the user must call this method for each file he wants to create
+   * and reserve memory for that file. We either succeed in reserving memory
+   * for both the main file and the checksum file and return true, or return
+   * false.
+   */
+  public boolean reserveSpaceWithCheckSum(Path f, int size) {
+    // get the size of the checksum file (we know it is going to be 'int'
+    // since this is an inmem fs with file sizes that will fit in 4 bytes)
+    long checksumSize = getChecksumFileLength(f, size);
+    RawInMemoryFileSystem mfs = (RawInMemoryFileSystem)getRawFileSystem();
+    synchronized(mfs) {
+      return mfs.reserveSpace(f, size) && 
+        mfs.reserveSpace(getChecksumFile(f),
+                         (int)getChecksumFileLength(f, size));
     }
+  }
+  public Path[] getFiles(PathFilter filter) {
+    return ((RawInMemoryFileSystem)getRawFileSystem()).getFiles(filter);
+  }
     
-    public int getNumFiles(PathFilter filter) {
-      return ((RawInMemoryFileSystem)getRawFileSystem()).getNumFiles(filter);
-    }
+  public int getNumFiles(PathFilter filter) {
+    return ((RawInMemoryFileSystem)getRawFileSystem()).getNumFiles(filter);
+  }
 
-    public int getFSSize() {
-        return ((RawInMemoryFileSystem)getRawFileSystem()).getFSSize();
-    }
+  public int getFSSize() {
+    return ((RawInMemoryFileSystem)getRawFileSystem()).getFSSize();
+  }
     
-    public float getPercentUsed() {
-        return ((RawInMemoryFileSystem)getRawFileSystem()).getPercentUsed();
-    }
+  public float getPercentUsed() {
+    return ((RawInMemoryFileSystem)getRawFileSystem()).getPercentUsed();
+  }
 }
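
The comments in the diff above pin down InMemoryFileSystem's lifecycle: space must be reserved before create() will succeed, and a file only appears in pathToFileAttribs once its output stream is closed. A minimal sketch of that reserve/create/close sequence (the "ramfs" URI scheme and the payload are illustrative assumptions):

  import java.net.URI;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.*;

  Configuration conf = new Configuration();
  conf.set("fs.inmemory.size.mb", "1");            // 1 MB in-memory fs
  InMemoryFileSystem imfs =
      new InMemoryFileSystem(URI.create("ramfs://test"), conf);
  Path p = new Path("/spill.out");
  byte[] payload = "some map output".getBytes();
  // reserve space for the data file and its checksum file up front;
  // create() consumes the reservation, close() publishes the file
  if (imfs.reserveSpaceWithCheckSum(p, payload.length)) {
    FSDataOutputStream out = imfs.create(p);
    out.write(payload);
    out.close();                                   // file becomes visible here
    FSDataInputStream in = imfs.open(p);
    byte[] back = new byte[payload.length];
    in.readFully(back);                            // checksummed read-back
    in.close();
  }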

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalFileSystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalFileSystem.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalFileSystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalFileSystem.java Mon Apr 16 14:44:35 2007
@@ -28,78 +28,78 @@
  * @author Mike Cafarella
  *****************************************************************/
 public class LocalFileSystem extends ChecksumFileSystem {
-    static final URI NAME = URI.create("file:///");
+  static final URI NAME = URI.create("file:///");
 
-    public LocalFileSystem() {
-        super(new RawLocalFileSystem());
-    }
+  public LocalFileSystem() {
+    super(new RawLocalFileSystem());
+  }
     
-    public LocalFileSystem( FileSystem rawLocalFileSystem ) {
-        super(rawLocalFileSystem);
-    }
+  public LocalFileSystem( FileSystem rawLocalFileSystem ) {
+    super(rawLocalFileSystem);
+  }
     
-    /** Convert a path to a File. */
-    public File pathToFile(Path path) {
-      return ((RawLocalFileSystem)fs).pathToFile(path);
-    }
+  /** Convert a path to a File. */
+  public File pathToFile(Path path) {
+    return ((RawLocalFileSystem)fs).pathToFile(path);
+  }
 
-    @Override
+  @Override
     public void copyFromLocalFile(boolean delSrc, Path src, Path dst)
     throws IOException {
-      FileUtil.copy(this, src, this, dst, delSrc, getConf());
-    }
+    FileUtil.copy(this, src, this, dst, delSrc, getConf());
+  }
 
-    @Override
+  @Override
     public void copyToLocalFile(boolean delSrc, Path src, Path dst)
     throws IOException {
-      FileUtil.copy(this, src, this, dst, delSrc, getConf());
-    }
+    FileUtil.copy(this, src, this, dst, delSrc, getConf());
+  }
 
-    /**
-     * Moves files to a bad file directory on the same device, so that their
-     * storage will not be reused.
-     */
-    public boolean reportChecksumFailure(Path p, FSDataInputStream in,
-                                      long inPos,
-                                      FSDataInputStream sums, long sumsPos) {
-      try {
-        // canonicalize f
-        File f = ((RawLocalFileSystem)fs).pathToFile(p).getCanonicalFile();
+  /**
+   * Moves files to a bad file directory on the same device, so that their
+   * storage will not be reused.
+   */
+  public boolean reportChecksumFailure(Path p, FSDataInputStream in,
+                                       long inPos,
+                                       FSDataInputStream sums, long sumsPos) {
+    try {
+      // canonicalize f
+      File f = ((RawLocalFileSystem)fs).pathToFile(p).getCanonicalFile();
       
-        // find highest writable parent dir of f on the same device
-        String device = new DF(f, getConf()).getMount();
-        File parent = f.getParentFile();
-        File dir = null;
-        while (parent!=null && parent.canWrite() && parent.toString().startsWith(device)) {
-          dir = parent;
-          parent = parent.getParentFile();
-        }
+      // find highest writable parent dir of f on the same device
+      String device = new DF(f, getConf()).getMount();
+      File parent = f.getParentFile();
+      File dir = null;
+      while (parent!=null && parent.canWrite() && parent.toString().startsWith(device)) {
+        dir = parent;
+        parent = parent.getParentFile();
+      }
 
-        if (dir==null) {
-          throw new IOException(
-              "not able to find the highest writable parent dir");
-        }
+      if (dir==null) {
+        throw new IOException(
+                              "not able to find the highest writable parent dir");
+      }
         
-        // move the file there
-        File badDir = new File(dir, "bad_files");
-        if (!badDir.mkdirs()) {
-          if (!badDir.isDirectory()) {
-            throw new IOException("Mkdirs failed to create " + badDir.toString());
-          }
+      // move the file there
+      File badDir = new File(dir, "bad_files");
+      if (!badDir.mkdirs()) {
+        if (!badDir.isDirectory()) {
+          throw new IOException("Mkdirs failed to create " + badDir.toString());
         }
-        String suffix = "." + new Random().nextInt();
-        File badFile = new File(badDir,f.getName()+suffix);
-        LOG.warn("Moving bad file " + f + " to " + badFile);
-        in.close();                               // close it first
-        f.renameTo(badFile);                      // rename it
-
-        // move checksum file too
-        File checkFile = ((RawLocalFileSystem)fs).pathToFile(getChecksumFile(p));
-        checkFile.renameTo(new File(badDir, checkFile.getName()+suffix));
-
-      } catch (IOException e) {
-        LOG.warn("Error moving bad file " + p + ": " + e);
       }
-      return false;
+      String suffix = "." + new Random().nextInt();
+      File badFile = new File(badDir,f.getName()+suffix);
+      LOG.warn("Moving bad file " + f + " to " + badFile);
+      in.close();                               // close it first
+      f.renameTo(badFile);                      // rename it
+
+      // move checksum file too
+      File checkFile = ((RawLocalFileSystem)fs).pathToFile(getChecksumFile(p));
+      checkFile.renameTo(new File(badDir, checkFile.getName()+suffix));
+
+    } catch (IOException e) {
+      LOG.warn("Error moving bad file " + p + ": " + e);
     }
+    return false;
+  }
 }
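
reportChecksumFailure above quarantines corrupt data instead of deleting it: it walks up to the highest writable parent directory on the same device, moves the file and its checksum companion into a bad_files directory there, and returns false so the caller still treats the data as lost. A hedged sketch of a call site (the verifyChunk helper and the stream/position variables are hypothetical; in Hadoop the checksummed read path makes this call internally):

  try {
    verifyChunk(dataIn, sumsIn);                   // hypothetical checksum verification
  } catch (ChecksumException ce) {
    // moves p and its .crc companion to bad_files on the same device;
    // returns false here, so the failure must still be surfaced
    lfs.reportChecksumFailure(p, dataIn, dataPos, sumsIn, sumsPos);
    throw ce;
  }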

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/PositionedReadable.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/PositionedReadable.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/PositionedReadable.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/PositionedReadable.java Mon Apr 16 14:44:35 2007
@@ -28,7 +28,7 @@
    * change the current offset of a file, and is thread-safe.
    */
   public int read(long position, byte[] buffer, int offset, int length)
-  throws IOException;
+    throws IOException;
   
   /**
    * Read the specified number of bytes, from a given
@@ -36,7 +36,7 @@
    * change the current offset of a file, and is thread-safe.
    */
   public void readFully(long position, byte[] buffer, int offset, int length)
-  throws IOException;
+    throws IOException;
   
   /**
   * Read number of bytes equal to the length of the buffer, from a given
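
The contract in these javadocs is the point of the interface: positioned reads take an explicit file offset, never move the stream's current offset, and are therefore safe to issue from multiple threads. A small sketch against FSDataInputStream, which implements PositionedReadable (the FileSystem handle fs, the path, and the offsets are illustrative):

  import org.apache.hadoop.fs.*;

  FSDataInputStream in = fs.open(new Path("/data/part-00000"));
  byte[] buf = new byte[16];
  long before = in.getPos();
  in.readFully(1024L, buf, 0, buf.length);   // pread 16 bytes at offset 1024
  assert in.getPos() == before;              // stream cursor is unchanged
  in.close();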

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/RawLocalFileSystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/RawLocalFileSystem.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/RawLocalFileSystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/RawLocalFileSystem.java Mon Apr 16 14:44:35 2007
@@ -123,7 +123,7 @@
     }
     
     public int read(long position, byte[] b, int off, int len)
-    throws IOException {
+      throws IOException {
       ByteBuffer bb = ByteBuffer.wrap(b, off, len);
       try {
         return fis.getChannel().read(bb, position);
@@ -179,8 +179,8 @@
   }
   
   public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize,
-      short replication, long blockSize, Progressable progress)
-  throws IOException {
+                                   short replication, long blockSize, Progressable progress)
+    throws IOException {
     if (exists(f) && ! overwrite) {
       throw new IOException("File already exists:"+f);
     }
@@ -200,8 +200,8 @@
   
   /** Set the replication of the given file */
   public boolean setReplication( Path src,
-      short replication
-  ) throws IOException {
+                                 short replication
+                                 ) throws IOException {
     return true;
   }
   
@@ -261,24 +261,24 @@
     Path parent = f.getParent();
     File p2f = pathToFile(f);
     return (parent == null || mkdirs(parent)) &&
-    (p2f.mkdir() || p2f.isDirectory());
+      (p2f.mkdir() || p2f.isDirectory());
   }
   
   /**
    * Set the working directory to the given directory.
    */
   @Override
-  public void setWorkingDirectory(Path newDir) {
+    public void setWorkingDirectory(Path newDir) {
     workingDir = newDir;
   }
   
   @Override
-  public Path getWorkingDirectory() {
+    public Path getWorkingDirectory() {
     return workingDir;
   }
   
   /** @deprecated */ @Deprecated
-  public void lock(Path p, boolean shared) throws IOException {
+    public void lock(Path p, boolean shared) throws IOException {
     File f = pathToFile(p);
     f.createNewFile();
     
@@ -301,7 +301,7 @@
   }
   
   /** @deprecated */ @Deprecated
-  public void release(Path p) throws IOException {
+    public void release(Path p) throws IOException {
     File f = pathToFile(p);
     
     FileLock lockObj;
@@ -335,26 +335,26 @@
   }
   
   @Override
-  public void copyFromLocalFile(boolean delSrc, Path src, Path dst)
-  throws IOException {
+    public void copyFromLocalFile(boolean delSrc, Path src, Path dst)
+    throws IOException {
     FileUtil.copy(this, src, this, dst, delSrc, getConf());
   }
   
   @Override
-  public void copyToLocalFile(boolean delSrc, Path src, Path dst)
-  throws IOException {
+    public void copyToLocalFile(boolean delSrc, Path src, Path dst)
+    throws IOException {
     FileUtil.copy(this, src, this, dst, delSrc, getConf());
   }
   
   // We can write output directly to the final location
   public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
-  throws IOException {
+    throws IOException {
     return fsOutputFile;
   }
   
   // It's in the right place - nothing to do.
   public void completeLocalOutput(Path fsWorkingFile, Path tmpLocalFile)
-  throws IOException {
+    throws IOException {
   }
   
   public void close() throws IOException {
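
The startLocalOutput/completeLocalOutput pair seen here is a two-phase output contract: a caller asks the filesystem where to stage its output, writes there, then hands the staged file back. As the comments note, the local filesystem short-circuits both phases because output can be written straight to its final location. A sketch of a generic caller (the paths and the FileSystem handle fs are illustrative):

  Path fsOutput = new Path("/results/part-00000");
  Path tmpLocal = new Path("/tmp/part-00000.staging");
  // RawLocalFileSystem returns fsOutput here, so we write in place
  Path writeTo = fs.startLocalOutput(fsOutput, tmpLocal);
  FSDataOutputStream out = fs.create(writeTo);
  out.writeBytes("done\n");
  out.close();
  // a no-op locally; a remote filesystem would promote tmpLocal here
  fs.completeLocalOutput(fsOutput, tmpLocal);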

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/ArrayWritable.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/ArrayWritable.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/ArrayWritable.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/ArrayWritable.java Mon Apr 16 14:44:35 2007
@@ -48,8 +48,8 @@
 
   public void setValueClass(Class valueClass) {
     if (valueClass != this.valueClass) {
-        this.valueClass = valueClass;
-        this.values = null;
+      this.valueClass = valueClass;
+      this.values = null;
     }
   }
   

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/BooleanWritable.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/BooleanWritable.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/BooleanWritable.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/BooleanWritable.java Mon Apr 16 14:44:35 2007
@@ -24,86 +24,86 @@
  * A WritableComparable for booleans. 
  */
 public class BooleanWritable implements WritableComparable {
-    private boolean value;
+  private boolean value;
 
-    /** 
-     */
-    public BooleanWritable() {};
-
-    /** 
-     */
-    public BooleanWritable(boolean value) {
-        set(value);
-    }
-
-    /** 
-     * Set the value of the BooleanWritable
-     */    
-    public void set(boolean value) {
-        this.value = value;
-    }
-
-    /**
-     * Returns the value of the BooleanWritable
-     */
-    public boolean get() {
-        return value;
-    }
-
-    /**
-     */
-    public void readFields(DataInput in) throws IOException {
-        value = in.readBoolean();
-    }
-
-    /**
-     */
-    public void write(DataOutput out) throws IOException {
-        out.writeBoolean(value);
-    }
-
-    /**
-     */
-    public boolean equals(Object o) {
-        if (!(o instanceof BooleanWritable)) {
-            return false;
-        }
-        BooleanWritable other = (BooleanWritable) o;
-        return this.value == other.value;
-    }
-
-    public int hashCode() {
-      return value ? 0 : 1;
-    }
-
-
-
-    /**
-     */
-    public int compareTo(Object o) {
-        boolean a = this.value;
-        boolean b = ((BooleanWritable) o).value;
-        return ((a == b) ? 0 : (a == false) ? -1 : 1);
-    }
-
-    /** 
-     * A Comparator optimized for BooleanWritable. 
-     */ 
-    public static class Comparator extends WritableComparator {
-        public Comparator() {
-            super(BooleanWritable.class);
-        }
-
-        public int compare(byte[] b1, int s1, int l1,
-                           byte[] b2, int s2, int l2) {
-            boolean a = (readInt(b1, s1) == 1) ? true : false;
-            boolean b = (readInt(b2, s2) == 1) ? true : false;
-            return ((a == b) ? 0 : (a == false) ? -1 : 1);
-        }
-    }
-
-
-    static {
-      WritableComparator.define(BooleanWritable.class, new Comparator());
-    }
+  /** 
+   */
+  public BooleanWritable() {};
+
+  /** 
+   */
+  public BooleanWritable(boolean value) {
+    set(value);
+  }
+
+  /** 
+   * Set the value of the BooleanWritable
+   */    
+  public void set(boolean value) {
+    this.value = value;
+  }
+
+  /**
+   * Returns the value of the BooleanWritable
+   */
+  public boolean get() {
+    return value;
+  }
+
+  /**
+   */
+  public void readFields(DataInput in) throws IOException {
+    value = in.readBoolean();
+  }
+
+  /**
+   */
+  public void write(DataOutput out) throws IOException {
+    out.writeBoolean(value);
+  }
+
+  /**
+   */
+  public boolean equals(Object o) {
+    if (!(o instanceof BooleanWritable)) {
+      return false;
+    }
+    BooleanWritable other = (BooleanWritable) o;
+    return this.value == other.value;
+  }
+
+  public int hashCode() {
+    return value ? 0 : 1;
+  }
+
+
+
+  /**
+   */
+  public int compareTo(Object o) {
+    boolean a = this.value;
+    boolean b = ((BooleanWritable) o).value;
+    return ((a == b) ? 0 : (a == false) ? -1 : 1);
+  }
+
+  /** 
+   * A Comparator optimized for BooleanWritable. 
+   */ 
+  public static class Comparator extends WritableComparator {
+    public Comparator() {
+      super(BooleanWritable.class);
+    }
+
+    public int compare(byte[] b1, int s1, int l1,
+                       byte[] b2, int s2, int l2) {
+      boolean a = (readInt(b1, s1) == 1) ? true : false;
+      boolean b = (readInt(b2, s2) == 1) ? true : false;
+      return ((a == b) ? 0 : (a == false) ? -1 : 1);
+    }
+  }
+
+
+  static {
+    WritableComparator.define(BooleanWritable.class, new Comparator());
+  }
 }
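
The static block at the end registers the optimized Comparator with WritableComparator, so lookups by class get the raw byte-level implementation instead of the generic deserialize-then-compareTo fallback. A brief sketch of both comparison paths:

  import org.apache.hadoop.io.*;

  BooleanWritable f = new BooleanWritable(false);
  BooleanWritable t = new BooleanWritable(true);
  System.out.println(f.compareTo(t));          // -1: false sorts before true

  // resolves to the Comparator registered in the static block above
  WritableComparator cmp = WritableComparator.get(BooleanWritable.class);
  System.out.println(cmp.compare(f, t));       // -1 as well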

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/BytesWritable.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/BytesWritable.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/BytesWritable.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/BytesWritable.java Mon Apr 16 14:44:35 2007
@@ -194,7 +194,7 @@
      * Compare the buffers in serialized form.
      */
     public int compare(byte[] b1, int s1, int l1,
-        byte[] b2, int s2, int l2) {
+                       byte[] b2, int s2, int l2) {
       int size1 = readInt(b1, s1);
       int size2 = readInt(b2, s2);
       return compareBytes(b1,s1+4, size1, b2, s2+4, size2);

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java Mon Apr 16 14:44:35 2007
@@ -82,7 +82,7 @@
   static public CompressionType getCompressionType(Configuration job) {
     String name = job.get("io.seqfile.compression.type");
     return name == null ? CompressionType.RECORD : 
-                          CompressionType.valueOf(name);
+      CompressionType.valueOf(name);
   }
   
   /**
@@ -106,9 +106,9 @@
    * @throws IOException
    */
   public static Writer 
-  createWriter(FileSystem fs, Configuration conf, Path name, 
-      Class keyClass, Class valClass) 
-  throws IOException {
+    createWriter(FileSystem fs, Configuration conf, Path name, 
+                 Class keyClass, Class valClass) 
+    throws IOException {
     return createWriter(fs,conf,name,keyClass,valClass,
                         getCompressionType(conf));
   }
@@ -125,19 +125,19 @@
    * @throws IOException
    */
   public static Writer 
-  createWriter(FileSystem fs, Configuration conf, Path name, 
-      Class keyClass, Class valClass, CompressionType compressionType) 
-  throws IOException {
+    createWriter(FileSystem fs, Configuration conf, Path name, 
+                 Class keyClass, Class valClass, CompressionType compressionType) 
+    throws IOException {
     Writer writer = null;
     
     if (compressionType == CompressionType.NONE) {
       writer = new Writer(fs, conf, name, keyClass, valClass, null, new Metadata());
     } else if (compressionType == CompressionType.RECORD) {
       writer = new RecordCompressWriter(fs, conf, name, keyClass, valClass, 
-          new DefaultCodec());
+                                        new DefaultCodec());
     } else if (compressionType == CompressionType.BLOCK){
       writer = new BlockCompressWriter(fs, conf, name, keyClass, valClass, 
-          new DefaultCodec());
+                                       new DefaultCodec());
     }
     
     return writer;
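
Together, getCompressionType and this createWriter factory encode the three SequenceFile layouts: NONE writes records verbatim, RECORD compresses each value, and BLOCK buffers and compresses batches of records. A minimal write/read sketch driven by that switch (the path and record types are illustrative):

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.*;
  import org.apache.hadoop.io.*;

  Configuration conf = new Configuration();
  FileSystem fs = FileSystem.getLocal(conf);
  Path name = new Path("/tmp/demo.seq");
  SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, name,
      Text.class, IntWritable.class, SequenceFile.CompressionType.BLOCK);
  writer.append(new Text("key"), new IntWritable(42));
  writer.close();

  SequenceFile.Reader reader = new SequenceFile.Reader(fs, name, conf);
  Text k = new Text();
  IntWritable v = new IntWritable();
  while (reader.next(k, v)) {
    System.out.println(k + " -> " + v);        // key -> 42
  }
  reader.close();
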
@@ -156,19 +156,19 @@
    * @throws IOException
    */
   public static Writer
-  createWriter(FileSystem fs, Configuration conf, Path name, 
-      Class keyClass, Class valClass, CompressionType compressionType,
-      Progressable progress) throws IOException {
+    createWriter(FileSystem fs, Configuration conf, Path name, 
+                 Class keyClass, Class valClass, CompressionType compressionType,
+                 Progressable progress) throws IOException {
     Writer writer = null;
     
     if (compressionType == CompressionType.NONE) {
       writer = new Writer(fs, conf, name, keyClass, valClass, progress, new Metadata()); 
     } else if (compressionType == CompressionType.RECORD) {
       writer = new RecordCompressWriter(fs, conf, name, 
-          keyClass, valClass, new DefaultCodec(), progress, new Metadata());
+                                        keyClass, valClass, new DefaultCodec(), progress, new Metadata());
     } else if (compressionType == CompressionType.BLOCK){
       writer = new BlockCompressWriter(fs, conf, name, 
-          keyClass, valClass, new DefaultCodec(), progress, new Metadata());
+                                       keyClass, valClass, new DefaultCodec(), progress, new Metadata());
     }
     
     return writer;
@@ -187,15 +187,15 @@
    * @throws IOException
    */
   public static Writer 
-  createWriter(FileSystem fs, Configuration conf, Path name, 
-      Class keyClass, Class valClass, 
-      CompressionType compressionType, CompressionCodec codec) 
-  throws IOException {
+    createWriter(FileSystem fs, Configuration conf, Path name, 
+                 Class keyClass, Class valClass, 
+                 CompressionType compressionType, CompressionCodec codec) 
+    throws IOException {
     if ((codec instanceof GzipCodec) && 
         !NativeCodeLoader.isNativeCodeLoaded() && 
         !ZlibFactory.isNativeZlibLoaded()) {
       throw new IllegalArgumentException("SequenceFile doesn't work with " +
-          "GzipCodec without native-hadoop code!");
+                                         "GzipCodec without native-hadoop code!");
     }
     
     Writer writer = null;
@@ -204,10 +204,10 @@
       writer = new Writer(fs, conf, name, keyClass, valClass); 
     } else if (compressionType == CompressionType.RECORD) {
       writer = new RecordCompressWriter(fs, conf, name, keyClass, valClass, 
-          codec);
+                                        codec);
     } else if (compressionType == CompressionType.BLOCK){
       writer = new BlockCompressWriter(fs, conf, name, keyClass, valClass, 
-          codec);
+                                       codec);
     }
     
     return writer;
@@ -228,15 +228,15 @@
    * @throws IOException
    */
   public static Writer
-  createWriter(FileSystem fs, Configuration conf, Path name, 
-      Class keyClass, Class valClass, 
-      CompressionType compressionType, CompressionCodec codec,
-      Progressable progress, Metadata metadata) throws IOException {
+    createWriter(FileSystem fs, Configuration conf, Path name, 
+                 Class keyClass, Class valClass, 
+                 CompressionType compressionType, CompressionCodec codec,
+                 Progressable progress, Metadata metadata) throws IOException {
     if ((codec instanceof GzipCodec) && 
         !NativeCodeLoader.isNativeCodeLoaded() && 
         !ZlibFactory.isNativeZlibLoaded()) {
       throw new IllegalArgumentException("SequenceFile doesn't work with " +
-          "GzipCodec without native-hadoop code!");
+                                         "GzipCodec without native-hadoop code!");
     }
     
     Writer writer = null;
@@ -245,10 +245,10 @@
       writer = new Writer(fs, conf, name, keyClass, valClass, progress, metadata);
     } else if (compressionType == CompressionType.RECORD) {
       writer = new RecordCompressWriter(fs, conf, name, 
-          keyClass, valClass, codec, progress, metadata);
+                                        keyClass, valClass, codec, progress, metadata);
     } else if (compressionType == CompressionType.BLOCK){
       writer = new BlockCompressWriter(fs, conf, name, 
-          keyClass, valClass, codec, progress, metadata);
+                                       keyClass, valClass, codec, progress, metadata);
     }
     
     return writer;
@@ -268,12 +268,12 @@
    * @throws IOException
    */
   public static Writer
-  createWriter(FileSystem fs, Configuration conf, Path name, 
-      Class keyClass, Class valClass, 
-      CompressionType compressionType, CompressionCodec codec,
-      Progressable progress) throws IOException {
+    createWriter(FileSystem fs, Configuration conf, Path name, 
+                 Class keyClass, Class valClass, 
+                 CompressionType compressionType, CompressionCodec codec,
+                 Progressable progress) throws IOException {
     Writer writer = createWriter(fs, conf, name, keyClass, valClass, 
-        compressionType, codec, progress, new Metadata());
+                                 compressionType, codec, progress, new Metadata());
     return writer;
   }
 
@@ -289,15 +289,15 @@
    * @throws IOException
    */
   private static Writer
-  createWriter(Configuration conf, FSDataOutputStream out, 
-      Class keyClass, Class valClass, boolean compress, boolean blockCompress,
-      CompressionCodec codec, Metadata metadata)
-  throws IOException {
+    createWriter(Configuration conf, FSDataOutputStream out, 
+                 Class keyClass, Class valClass, boolean compress, boolean blockCompress,
+                 CompressionCodec codec, Metadata metadata)
+    throws IOException {
     if ((codec instanceof GzipCodec) && 
         !NativeCodeLoader.isNativeCodeLoaded() && 
         !ZlibFactory.isNativeZlibLoaded()) {
       throw new IllegalArgumentException("SequenceFile doesn't work with " +
-          "GzipCodec without native-hadoop code!");
+                                         "GzipCodec without native-hadoop code!");
     }
 
     Writer writer = null;
@@ -324,12 +324,12 @@
    * @throws IOException
    */
   private static Writer
-  createWriter(Configuration conf, FSDataOutputStream out, 
-      Class keyClass, Class valClass, boolean compress, boolean blockCompress,
-      CompressionCodec codec)
-  throws IOException {
+    createWriter(Configuration conf, FSDataOutputStream out, 
+                 Class keyClass, Class valClass, boolean compress, boolean blockCompress,
+                 CompressionCodec codec)
+    throws IOException {
     Writer writer = createWriter(conf, out, keyClass, valClass, compress, 
-        blockCompress, codec, new Metadata());
+                                 blockCompress, codec, new Metadata());
     return writer;
   }
 
@@ -347,15 +347,15 @@
    * @throws IOException
    */
   public static Writer
-  createWriter(Configuration conf, FSDataOutputStream out, 
-      Class keyClass, Class valClass, CompressionType compressionType,
-      CompressionCodec codec, Metadata metadata)
-  throws IOException {
+    createWriter(Configuration conf, FSDataOutputStream out, 
+                 Class keyClass, Class valClass, CompressionType compressionType,
+                 CompressionCodec codec, Metadata metadata)
+    throws IOException {
     if ((codec instanceof GzipCodec) && 
         !NativeCodeLoader.isNativeCodeLoaded() && 
         !ZlibFactory.isNativeZlibLoaded()) {
       throw new IllegalArgumentException("SequenceFile doesn't work with " +
-          "GzipCodec without native-hadoop code!");
+                                         "GzipCodec without native-hadoop code!");
     }
 
     Writer writer = null;
@@ -383,12 +383,12 @@
    * @throws IOException
    */
   public static Writer
-  createWriter(Configuration conf, FSDataOutputStream out, 
-      Class keyClass, Class valClass, CompressionType compressionType,
-      CompressionCodec codec)
-  throws IOException {
+    createWriter(Configuration conf, FSDataOutputStream out, 
+                 Class keyClass, Class valClass, CompressionType compressionType,
+                 CompressionCodec codec)
+    throws IOException {
     Writer writer = createWriter(conf, out, keyClass, valClass, compressionType,
-        codec, new Metadata());
+                                 codec, new Metadata());
     return writer;
   }
   
@@ -401,14 +401,14 @@
      * @throws IOException
      */
     public void writeUncompressedBytes(DataOutputStream outStream)
-    throws IOException;
+      throws IOException;
 
     /** Write compressed bytes to outStream. 
      * Note: that it will NOT compress the bytes if they are not compressed.
      * @param outStream : Stream to write compressed bytes into.
      */
     public void writeCompressedBytes(DataOutputStream outStream) 
-    throws IllegalArgumentException, IOException;
+      throws IllegalArgumentException, IOException;
   }
   
   private static class UncompressedBytes implements ValueBytes {
@@ -433,14 +433,14 @@
     }
     
     public void writeUncompressedBytes(DataOutputStream outStream)
-    throws IOException {
+      throws IOException {
       outStream.write(data, 0, dataSize);
     }
 
     public void writeCompressedBytes(DataOutputStream outStream) 
-    throws IllegalArgumentException, IOException {
+      throws IllegalArgumentException, IOException {
       throw 
-      new IllegalArgumentException("UncompressedBytes cannot be compressed!");
+        new IllegalArgumentException("UncompressedBytes cannot be compressed!");
     }
 
   } // UncompressedBytes
@@ -471,7 +471,7 @@
     }
     
     public void writeUncompressedBytes(DataOutputStream outStream)
-    throws IOException {
+      throws IOException {
       if (decompressedStream == null) {
         rawData = new DataInputBuffer();
         decompressedStream = codec.createInputStream(rawData);
@@ -488,7 +488,7 @@
     }
 
     public void writeCompressedBytes(DataOutputStream outStream) 
-    throws IllegalArgumentException, IOException {
+      throws IllegalArgumentException, IOException {
       outStream.write(data, 0, dataSize);
     }
 
@@ -562,10 +562,10 @@
         Map.Entry<Text, Text> en1 = (Map.Entry<Text, Text>)iter1.next();
         Map.Entry<Text, Text> en2 = (Map.Entry<Text, Text>)iter2.next();
         if (!en1.getKey().equals(en2.getKey())) {
-           return false;
+          return false;
         }
         if (!en1.getValue().equals(en2.getValue())) {
-           return false;
+          return false;
         }
       }
       if (iter1.hasNext() || iter2.hasNext()) {
@@ -624,14 +624,14 @@
     
     /** Create the named file. */
     public Writer(FileSystem fs, Configuration conf, Path name, 
-        Class keyClass, Class valClass)
+                  Class keyClass, Class valClass)
       throws IOException {
       this(fs, conf, name, keyClass, valClass, null, new Metadata());
     }
     
     /** Create the named file with write-progress reporter. */
     public Writer(FileSystem fs, Configuration conf, Path name, 
-        Class keyClass, Class valClass, Progressable progress, Metadata metadata)
+                  Class keyClass, Class valClass, Progressable progress, Metadata metadata)
       throws IOException {
       init(name, conf, fs.create(name, progress), keyClass, valClass, false, null, metadata);
       initializeFileHeader();
@@ -641,8 +641,8 @@
     
     /** Write to an arbitrary stream using a specified buffer size. */
     private Writer(Configuration conf, FSDataOutputStream out, 
-        Class keyClass, Class valClass, Metadata metadata)
-    throws IOException {
+                   Class keyClass, Class valClass, Metadata metadata)
+      throws IOException {
       init(null, conf, out, keyClass, valClass, false, null, metadata);
       
       initializeFileHeader();
@@ -652,13 +652,13 @@
 
     /** Write the initial part of file header. */
     void initializeFileHeader() 
-    throws IOException{
+      throws IOException {
       out.write(VERSION);
     }
 
     /** Write the final part of file header. */
     void finalizeFileHeader() 
-    throws IOException{
+      throws IOException {
       out.write(sync);                       // write the sync bytes
       out.flush();                           // flush header
     }
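
The sync bytes emitted here are what allow a Reader to re-find record
boundaries mid-stream. A hedged read-side sketch, assuming the key/value types
from the first sketch (the path and offset are illustrative):

    SequenceFile.Reader reader =
        new SequenceFile.Reader(fs, new Path("/tmp/example.seq"), conf);
    // Advance to the first sync point at or after byte 1024, e.g. when an
    // input split starts in the middle of a file.
    reader.sync(1024L);
    Text key = new Text();
    IntWritable val = new IntWritable();
    while (reader.next(key, val)) {
      // records from that sync point onward
    }
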
@@ -668,7 +668,7 @@
     
     /** Write and flush the file header. */
     void writeFileHeader() 
-    throws IOException {
+      throws IOException {
       Text.writeString(out, keyClass.getName());
       Text.writeString(out, valClass.getName());
       
@@ -683,9 +683,9 @@
     
     /** Initialize. */
     void init(Path name, Configuration conf, FSDataOutputStream out,
-                      Class keyClass, Class valClass,
-                      boolean compress, CompressionCodec codec, Metadata metadata) 
-    throws IOException {
+              Class keyClass, Class valClass,
+              boolean compress, CompressionCodec codec, Metadata metadata) 
+      throws IOException {
       this.target = name;
       this.conf = conf;
       this.out = out;
@@ -741,10 +741,10 @@
       throws IOException {
       if (key.getClass() != keyClass)
         throw new IOException("wrong key class: "+key.getClass().getName()
-            +" is not "+keyClass);
+                              +" is not "+keyClass);
       if (val.getClass() != valClass)
         throw new IOException("wrong value class: "+val.getClass().getName()
-            +" is not "+valClass);
+                              +" is not "+valClass);
 
       buffer.reset();
 
@@ -772,8 +772,8 @@
     }
 
     public synchronized void appendRaw(
-        byte[] keyData, int keyOffset, int keyLength, ValueBytes val) 
-    throws IOException {
+                                       byte[] keyData, int keyOffset, int keyLength, ValueBytes val) 
+      throws IOException {
       if (keyLength == 0)
         throw new IOException("zero length keys not allowed: " + keyLength);
 
@@ -808,8 +808,8 @@
     
     /** Create the named file. */
     public RecordCompressWriter(FileSystem fs, Configuration conf, Path name, 
-        Class keyClass, Class valClass, CompressionCodec codec) 
-    throws IOException {
+                                Class keyClass, Class valClass, CompressionCodec codec) 
+      throws IOException {
       super.init(name, conf, fs.create(name), keyClass, valClass, true, codec, new Metadata());
       
       initializeFileHeader();
@@ -819,11 +819,11 @@
     
     /** Create the named file with write-progress reporter. */
     public RecordCompressWriter(FileSystem fs, Configuration conf, Path name, 
-        Class keyClass, Class valClass, CompressionCodec codec,
-        Progressable progress, Metadata metadata)
-    throws IOException {
+                                Class keyClass, Class valClass, CompressionCodec codec,
+                                Progressable progress, Metadata metadata)
+      throws IOException {
       super.init(name, conf, fs.create(name, progress), 
-          keyClass, valClass, true, codec, metadata);
+                 keyClass, valClass, true, codec, metadata);
       
       initializeFileHeader();
       writeFileHeader();
@@ -832,15 +832,15 @@
     
     /** Create the named file with write-progress reporter. */
     public RecordCompressWriter(FileSystem fs, Configuration conf, Path name, 
-        Class keyClass, Class valClass, CompressionCodec codec,
-        Progressable progress)
-    throws IOException {
+                                Class keyClass, Class valClass, CompressionCodec codec,
+                                Progressable progress)
+      throws IOException {
       this(fs, conf, name, keyClass, valClass, codec, progress, new Metadata());
     }
     
     /** Write to an arbitrary stream using a specified buffer size. */
     private RecordCompressWriter(Configuration conf, FSDataOutputStream out,
-                   Class keyClass, Class valClass, CompressionCodec codec, Metadata metadata)
+                                 Class keyClass, Class valClass, CompressionCodec codec, Metadata metadata)
       throws IOException {
       super.init(null, conf, out, keyClass, valClass, true, codec, metadata);
       
@@ -858,10 +858,10 @@
       throws IOException {
       if (key.getClass() != keyClass)
         throw new IOException("wrong key class: "+key.getClass().getName()
-            +" is not "+keyClass);
+                              +" is not "+keyClass);
       if (val.getClass() != valClass)
         throw new IOException("wrong value class: "+val.getClass().getName()
-            +" is not "+valClass);
+                              +" is not "+valClass);
 
       buffer.reset();
 
@@ -886,9 +886,9 @@
 
     /** Append a key/value pair. */
     public synchronized void appendRaw(
-        byte[] keyData, int keyOffset, int keyLength,
-        ValueBytes val
-        ) throws IOException {
+                                       byte[] keyData, int keyOffset, int keyLength,
+                                       ValueBytes val
+                                       ) throws IOException {
 
       if (keyLength == 0)
         throw new IOException("zero length keys not allowed");
@@ -924,8 +924,8 @@
     
     /** Create the named file. */
     public BlockCompressWriter(FileSystem fs, Configuration conf, Path name, 
-        Class keyClass, Class valClass, CompressionCodec codec) 
-    throws IOException {
+                               Class keyClass, Class valClass, CompressionCodec codec) 
+      throws IOException {
       super.init(name, conf, fs.create(name), keyClass, valClass, true, codec, new Metadata());
       init(conf.getInt("io.seqfile.compress.blocksize", 1000000));
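
As the init calls show, a BlockCompressWriter buffers io.seqfile.compress.blocksize
bytes (1000000 by default) before compressing and flushing a block. Tuning it is
a one-line Configuration change; the 4 MB value below is purely illustrative:

    Configuration conf = new Configuration();
    // Larger blocks generally compress better, at the cost of more memory
    // held per open writer.
    conf.setInt("io.seqfile.compress.blocksize", 4 * 1024 * 1024);
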
       
@@ -936,11 +936,11 @@
     
     /** Create the named file with write-progress reporter. */
     public BlockCompressWriter(FileSystem fs, Configuration conf, Path name, 
-        Class keyClass, Class valClass, CompressionCodec codec,
-        Progressable progress, Metadata metadata)
-    throws IOException {
+                               Class keyClass, Class valClass, CompressionCodec codec,
+                               Progressable progress, Metadata metadata)
+      throws IOException {
       super.init(name, conf, fs.create(name, progress), keyClass, valClass, 
-          true, codec, metadata);
+                 true, codec, metadata);
       init(conf.getInt("io.seqfile.compress.blocksize", 1000000));
       
       initializeFileHeader();
@@ -950,15 +950,15 @@
     
     /** Create the named file with write-progress reporter. */
     public BlockCompressWriter(FileSystem fs, Configuration conf, Path name, 
-        Class keyClass, Class valClass, CompressionCodec codec,
-        Progressable progress)
-    throws IOException {
+                               Class keyClass, Class valClass, CompressionCodec codec,
+                               Progressable progress)
+      throws IOException {
       this(fs, conf, name, keyClass, valClass, codec, progress, new Metadata());
     }
     
     /** Write to an arbitrary stream using a specified buffer size. */
     private BlockCompressWriter(Configuration conf, FSDataOutputStream out,
-                   Class keyClass, Class valClass, CompressionCodec codec, Metadata metadata)
+                                Class keyClass, Class valClass, CompressionCodec codec, Metadata metadata)
       throws IOException {
       super.init(null, conf, out, keyClass, valClass, true, codec, metadata);
       init(1000000);
@@ -978,12 +978,12 @@
     
     /** Workhorse to check and write out compressed data/lengths */
     private synchronized 
-    void writeBuffer(DataOutputBuffer uncompressedDataBuffer) 
-    throws IOException {
+      void writeBuffer(DataOutputBuffer uncompressedDataBuffer) 
+      throws IOException {
       deflateFilter.resetState();
       buffer.reset();
       deflateOut.write(uncompressedDataBuffer.getData(), 0, 
-          uncompressedDataBuffer.getLength());
+                       uncompressedDataBuffer.getLength());
       deflateOut.flush();
       deflateFilter.finish();
       
@@ -1070,9 +1070,9 @@
     
     /** Append a key/value pair. */
     public synchronized void appendRaw(
-        byte[] keyData, int keyOffset, int keyLength,
-        ValueBytes val
-        ) throws IOException {
+                                       byte[] keyData, int keyOffset, int keyLength,
+                                       ValueBytes val
+                                       ) throws IOException {
       
       if (keyLength == 0)
         throw new IOException("zero length keys not allowed");
@@ -1218,7 +1218,7 @@
           try {
             Class codecClass = conf.getClassByName(codecClassname);
             this.codec = (CompressionCodec)
-                 ReflectionUtils.newInstance(codecClass, conf);
+              ReflectionUtils.newInstance(codecClass, conf);
           } catch (ClassNotFoundException cnfe) {
             throw new IllegalArgumentException("Unknown codec: " + 
                                                codecClassname, cnfe);
@@ -1293,7 +1293,7 @@
     
     /** Read a compressed buffer */
     private synchronized void readBuffer(DataInputBuffer buffer, 
-        CompressionInputStream filter) throws IOException {
+                                         CompressionInputStream filter) throws IOException {
       // Read data into a temporary buffer
       DataOutputBuffer dataBuffer = new DataOutputBuffer();
       int dataBufferLength = WritableUtils.readVInt(in);
@@ -1378,7 +1378,7 @@
         if (skipValBytes > 0) {
           if (valIn.skipBytes(skipValBytes) != skipValBytes) {
             throw new IOException("Failed to seek to " + currentKey + 
-                "(th) value!");
+                                  "(th) value!");
           }
         }
       }
@@ -1390,7 +1390,7 @@
      * @throws IOException
      */
     public synchronized void getCurrentValue(Writable val) 
-    throws IOException {
+      throws IOException {
       if (val instanceof Configurable) {
         ((Configurable) val).setConf(this.conf);
       }
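
A sketch of the two-step read protocol this method serves: next(key) positions
the stream at a record, and getCurrentValue deserializes its value only on
demand (reader as in the earlier sketch):

    Text key = new Text();
    IntWritable val = new IntWritable();
    while (reader.next(key)) {       // advance to the next record's key
      reader.getCurrentValue(val);   // then lazily fetch its value
    }
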
@@ -1404,8 +1404,8 @@
         if (valIn.read() > 0) {
           LOG.info("available bytes: " + valIn.available());
           throw new IOException(val+" read "+(valBuffer.getPosition()-keyLength)
-              + " bytes, should read " +
-              (valBuffer.getLength()-keyLength));
+                                + " bytes, should read " +
+                                (valBuffer.getLength()-keyLength));
         }
       } else {
         // Get the value
@@ -1428,7 +1428,7 @@
     public synchronized boolean next(Writable key) throws IOException {
       if (key.getClass() != keyClass)
         throw new IOException("wrong key class: "+key.getClass().getName()
-            +" is not "+keyClass);
+                              +" is not "+keyClass);
 
       if (!blockCompressed) {
         outBuf.reset();
@@ -1443,7 +1443,7 @@
         valBuffer.mark(0);
         if (valBuffer.getPosition() != keyLength)
           throw new IOException(key + " read " + valBuffer.getPosition()
-              + " bytes, should read " + keyLength);
+                                + " bytes, should read " + keyLength);
       } else {
         //Reset syncSeen
         syncSeen = false;
@@ -1489,7 +1489,7 @@
     }
     
     private synchronized int checkAndReadSync(int length) 
-    throws IOException {
+      throws IOException {
       if (version > 1 && sync != null &&
           length == SYNC_ESCAPE) {              // process a sync entry
         //LOG.info("sync@"+in.getPos());
@@ -1514,7 +1514,7 @@
       // Unsupported for block-compressed sequence files
       if (blockCompressed) {
         throw new IOException("Unsupported call for block-compressed" +
-            " SequenceFiles - use SequenceFile.Reader.next(DataOutputStream, ValueBytes)");
+                              " SequenceFiles - use SequenceFile.Reader.next(DataOutputStream, ValueBytes)");
       }
       if (in.getPos() >= end)
         return -1;
@@ -1548,7 +1548,7 @@
      * @throws IOException
      */
     public int nextRaw(DataOutputBuffer key, ValueBytes val) 
-    throws IOException {
+      throws IOException {
       if (!blockCompressed) {
         if (in.getPos() >= end) 
           return -1;
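
nextRaw pairs with Writer.appendRaw to copy records without deserializing them,
which is how the Sorter below moves data. A sketch, assuming createValueBytes()
is visible to the caller, reusing fs and conf from the first sketch, and
writing into an open SequenceFile.Writer named writer (file name illustrative):

    SequenceFile.Reader in =
        new SequenceFile.Reader(fs, new Path("in.seq"), conf);
    DataOutputBuffer rawKey = new DataOutputBuffer();
    SequenceFile.ValueBytes rawVal = in.createValueBytes();
    while (in.nextRaw(rawKey, rawVal) != -1) {
      // Keys travel as raw bytes; values stay opaque behind ValueBytes.
      writer.appendRaw(rawKey.getData(), 0, rawKey.getLength(), rawVal);
      rawKey.reset();
    }
    in.close();
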
@@ -1607,7 +1607,7 @@
      * @throws IOException
      */
     public int nextRawKey(DataOutputBuffer key) 
-    throws IOException {
+      throws IOException {
       if (!blockCompressed) {
         if (in.getPos() >= end) 
           return -1;
@@ -1650,7 +1650,7 @@
      * @throws IOException
      */
     public int nextRawValue(ValueBytes val) 
-    throws IOException {
+      throws IOException {
       
       // Position stream to current value
       seekToCurrentValue();
@@ -1776,7 +1776,7 @@
 
     /** Sort and merge using an arbitrary {@link WritableComparator}. */
     public Sorter(FileSystem fs, WritableComparator comparator, Class valClass, 
-        Configuration conf) {
+                  Configuration conf) {
       this.fs = fs;
       this.comparator = comparator;
       this.keyClass = comparator.getKeyClass();
@@ -1827,7 +1827,7 @@
      * @return iterator the RawKeyValueIterator
      */
     public RawKeyValueIterator sortAndIterate(Path[] inFiles, Path tempDir, 
-                                    boolean deleteInput) throws IOException {
+                                              boolean deleteInput) throws IOException {
       Path outFile = new Path(tempDir + Path.SEPARATOR + "all.2");
       if (fs.exists(outFile)) {
         throw new IOException("already exists: " + outFile);
@@ -1915,15 +1915,15 @@
           int bytesProcessed = 0;
           rawKeys.reset();
           while (!atEof && 
-              bytesProcessed < memoryLimit && count < recordLimit) {
+                 bytesProcessed < memoryLimit && count < recordLimit) {
 
             // Read a record into buffer
             // Note: Attempt to re-use 'rawValue' as far as possible
             int keyOffset = rawKeys.getLength();       
             ValueBytes rawValue = 
               (count == keyOffsets.length || rawValues[count] == null) ? 
-                  in.createValueBytes() : 
-                  rawValues[count];
+              in.createValueBytes() : 
+              rawValues[count];
             int recordLength = in.nextRaw(rawKeys, rawValue);
             if (recordLength == -1) {
               in.close();
@@ -1959,7 +1959,7 @@
           rawBuffer = rawKeys.getData();
           sort(count);
           flush(count, bytesProcessed, isCompressed, isBlockCompressed, codec, 
-              segments==0 && atEof);
+                segments==0 && atEof);
           segments++;
         }
         return segments;
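
The buffering, sorting and flushing above is driven from the public sort entry
points; an illustrative call (paths are placeholders):

    SequenceFile.Sorter sorter = new SequenceFile.Sorter(
        fs, WritableComparator.get(Text.class), IntWritable.class, conf);
    // Sort one or more inputs into a single output, deleting the inputs.
    sorter.sort(new Path[] { new Path("unsorted.seq") },
                new Path("sorted.seq"), true);
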
@@ -2002,8 +2002,8 @@
       }
 
       private void flush(int count, int bytesProcessed, boolean isCompressed, 
-          boolean isBlockCompressed, CompressionCodec codec, boolean done) 
-      throws IOException {
+                         boolean isBlockCompressed, CompressionCodec codec, boolean done) 
+        throws IOException {
         if (out == null) {
           outName = done ? outFile : outFile.suffix(".0");
           out = fs.create(outName);
@@ -2014,7 +2014,7 @@
 
         long segmentStart = out.getPos();
         Writer writer = createWriter(conf, out, keyClass, valClass, 
-            isCompressed, isBlockCompressed, codec);
+                                     isCompressed, isBlockCompressed, codec);
         
         if (!done) {
           writer.sync = null;                     // disable sync on temp files
@@ -2076,16 +2076,16 @@
       Progress getProgress();
     }    
     
-  /**
-   * Merges the list of segments of type <code>SegmentDescriptor</code>
-   * @param segments the list of SegmentDescriptors
+    /**
+     * Merges the list of segments of type <code>SegmentDescriptor</code>
+     * @param segments the list of SegmentDescriptors
      * @param tmpDir the directory to write temporary files into
-   * @return RawKeyValueIterator
-   * @throws IOException
-   */
+     * @return RawKeyValueIterator
+     * @throws IOException
+     */
     public RawKeyValueIterator merge(List <SegmentDescriptor> segments, 
                                      Path tmpDir) 
-    throws IOException {
+      throws IOException {
       MergeQueue mQueue = new MergeQueue(segments, tmpDir);
       return mQueue.merge();
     }
@@ -2102,7 +2102,7 @@
      */
     public RawKeyValueIterator merge(Path [] inNames, boolean deleteInputs,
                                      Path tmpDir) 
-    throws IOException {
+      throws IOException {
       return merge(inNames, deleteInputs, 
                    (inNames.length < factor) ? inNames.length : factor,
                    tmpDir);
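
An illustrative call to the overload above, merging presorted inputs with the
default factor (paths are placeholders; sorter as in the previous sketch):

    SequenceFile.Sorter.RawKeyValueIterator it = sorter.merge(
        new Path[] { new Path("part-0"), new Path("part-1") },
        false,                    // keep the inputs
        new Path("/tmp/merge"));  // scratch space for intermediate passes
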
@@ -2120,12 +2120,12 @@
      */
     public RawKeyValueIterator merge(Path [] inNames, boolean deleteInputs,
                                      int factor, Path tmpDir) 
-    throws IOException {
+      throws IOException {
       //get the segments from inNames
       ArrayList <SegmentDescriptor> a = new ArrayList <SegmentDescriptor>();
       for (int i = 0; i < inNames.length; i++) {
         SegmentDescriptor s = new SegmentDescriptor(0, 
-                              fs.getLength(inNames[i]), inNames[i]);
+                                                    fs.getLength(inNames[i]), inNames[i]);
         s.preserveInput(!deleteInputs);
         s.doSync();
         a.add(s);
@@ -2146,7 +2146,7 @@
      */
     public RawKeyValueIterator merge(Path [] inNames, Path tempDir, 
                                      boolean deleteInputs) 
-    throws IOException {
+      throws IOException {
      //outFile will be used as the prefix for the temporary files that
      //hold intermediate merge outputs
       this.outFile = new Path(tempDir + Path.SEPARATOR + "merged");
@@ -2154,7 +2154,7 @@
       ArrayList <SegmentDescriptor> a = new ArrayList <SegmentDescriptor>();
       for (int i = 0; i < inNames.length; i++) {
         SegmentDescriptor s = new SegmentDescriptor(0, 
-                              fs.getLength(inNames[i]), inNames[i]);
+                                                    fs.getLength(inNames[i]), inNames[i]);
         s.preserveInput(!deleteInputs);
         s.doSync();
         a.add(s);
@@ -2177,8 +2177,8 @@
     * @deprecated call #cloneFileAttributes(Path,Path,Progressable) instead
      */
     public Writer cloneFileAttributes(FileSystem ignoredFileSys,
-                  Path inputFile, Path outputFile, Progressable prog) 
-    throws IOException {
+                                      Path inputFile, Path outputFile, Progressable prog) 
+      throws IOException {
       return cloneFileAttributes(inputFile, outputFile, prog);
     }
 
@@ -2193,7 +2193,7 @@
      * @throws IOException
      */
     public Writer cloneFileAttributes(Path inputFile, Path outputFile, 
-                  Progressable prog) throws IOException {
+                                      Progressable prog) throws IOException {
       FileSystem srcFileSys = inputFile.getFileSystem(conf);
       Reader reader = new Reader(srcFileSys, inputFile, 4096, conf);
       boolean compress = reader.isCompressed();
@@ -2205,12 +2205,12 @@
       FSDataOutputStream out;
       if (prog != null)
         out = dstFileSys.create(outputFile, true, 
-            conf.getInt("io.file.buffer.size", 4096), prog);
+                                conf.getInt("io.file.buffer.size", 4096), prog);
       else
         out = dstFileSys.create(outputFile, true, 
-            conf.getInt("io.file.buffer.size", 4096));
+                                conf.getInt("io.file.buffer.size", 4096));
       Writer writer = createWriter(conf, out, keyClass, valClass, compress, 
-                          blockCompress, codec);
+                                   blockCompress, codec);
       return writer;
     }
 
@@ -2222,7 +2222,7 @@
      * @throws IOException
      */
     public void writeFile(RawKeyValueIterator records, Writer writer) 
-    throws IOException {
+      throws IOException {
       while(records.next()) {
         writer.appendRaw(records.getKey().getData(), 0, 
                          records.getKey().getLength(), records.getValue());
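
Together with cloneFileAttributes above, writeFile makes persisting a merge a
three-liner; a sketch reusing sorter and it from the previous sketches (paths
illustrative):

    // The output inherits compression settings from an existing input file.
    SequenceFile.Writer merged = sorter.cloneFileAttributes(
        new Path("part-0"), new Path("merged.seq"), null);
    sorter.writeFile(it, merged);
    merged.close();
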
@@ -2251,7 +2251,7 @@
     private int mergePass(Path tmpDir) throws IOException {
       LOG.debug("running merge pass");
       Writer writer = cloneFileAttributes(
-              outFile.suffix(".0"), outFile, null);
+                                          outFile.suffix(".0"), outFile, null);
       RawKeyValueIterator r = merge(outFile.suffix(".0"), 
                                     outFile.suffix(".0.index"), tmpDir);
       writeFile(r, writer);
@@ -2268,7 +2268,7 @@
      * @throws IOException
      */
     private RawKeyValueIterator merge(Path inName, Path indexIn, Path tmpDir) 
-    throws IOException {
+      throws IOException {
       //get the segments from indexIn
       //we create a SegmentContainer so that we can track segments belonging to
       //inName and delete inName as soon as we see that we have looked at all
@@ -2281,7 +2281,7 @@
     
     /** This class implements the core of the merge logic */
     private class MergeQueue extends PriorityQueue 
-    implements RawKeyValueIterator {
+      implements RawKeyValueIterator {
       private boolean compress;
       private boolean blockCompress;
       private DataOutputBuffer rawKey = new DataOutputBuffer();
@@ -2300,7 +2300,7 @@
           compress = stream.in.isCompressed();
           blockCompress = stream.in.isBlockCompressed();
         } else if (compress != stream.in.isCompressed() || 
-            blockCompress != stream.in.isBlockCompressed()) {
+                   blockCompress != stream.in.isBlockCompressed()) {
           throw new IOException("All merged files must be compressed or not.");
         } 
         super.put(stream);
@@ -2323,8 +2323,8 @@
         SegmentDescriptor msa = (SegmentDescriptor)a;
         SegmentDescriptor msb = (SegmentDescriptor)b;
         return comparator.compare(msa.getKey().getData(), 0, 
-            msa.getKey().getLength(), msb.getKey().getData(), 0, 
-            msb.getKey().getLength()) < 0;
+                                  msa.getKey().getLength(), msb.getKey().getData(), 0, 
+                                  msb.getKey().getLength()) < 0;
       }
       public void close() throws IOException {
         SegmentDescriptor ms;                           // close inputs
@@ -2436,8 +2436,8 @@
                                                 tmpFilename.toString());
             LOG.info("writing intermediate results to " + outputFile);
             Writer writer = cloneFileAttributes(
-                      fs.makeQualified(segmentsToMerge.get(0).segmentPathName), 
-                      fs.makeQualified(outputFile), null);
+                                                fs.makeQualified(segmentsToMerge.get(0).segmentPathName), 
+                                                fs.makeQualified(outputFile), null);
             writer.sync = null; //disable sync for temp files
             writeFile(this, writer);
             writer.close();
@@ -2447,7 +2447,7 @@
             this.close();
             
             SegmentDescriptor tempSegment = 
-                 new SegmentDescriptor(0, fs.getLength(outputFile), outputFile);
+              new SegmentDescriptor(0, fs.getLength(outputFile), outputFile);
             //put the segment back in the TreeMap
             sortedSegmentSizes.put(tempSegment, null);
             numSegments = sortedSegmentSizes.size();
@@ -2476,7 +2476,7 @@
         if (numDescriptors > sortedSegmentSizes.size())
           numDescriptors = sortedSegmentSizes.size();
         SegmentDescriptor[] SegmentDescriptors = 
-                                   new SegmentDescriptor[numDescriptors];
+          new SegmentDescriptor[numDescriptors];
         Iterator iter = sortedSegmentSizes.keySet().iterator();
         int i = 0;
         while (i < numDescriptors) {
@@ -2507,7 +2507,7 @@
        * @param segmentPathName the path name of the file containing the segment
        */
       public SegmentDescriptor (long segmentOffset, long segmentLength, 
-              Path segmentPathName) {
+                                Path segmentPathName) {
         this.segmentOffset = segmentOffset;
         this.segmentLength = segmentLength;
         this.segmentPathName = segmentPathName;
@@ -2534,7 +2534,7 @@
           return (this.segmentOffset < that.segmentOffset ? -1 : 1);
         }
         return (this.segmentPathName.toString()).
-                compareTo(that.segmentPathName.toString());
+          compareTo(that.segmentPathName.toString());
       }
 
       /** Fills up the rawKey object with the key returned by the Reader
@@ -2543,25 +2543,25 @@
        */
       public boolean nextRawKey() throws IOException {
         if (in == null) {
-        int bufferSize = conf.getInt("io.file.buffer.size", 4096); 
-        if (fs.getUri().getScheme().startsWith("ramfs")) {
-          bufferSize = conf.getInt("io.bytes.per.checksum", 512);
-        }
-        Reader reader = new Reader(fs, segmentPathName, 
-            bufferSize, segmentOffset, 
-                segmentLength, conf);
-        
-        //sometimes we ignore syncs especially for temp merge files
-        if (ignoreSync) reader.sync = null;
-
-        if (reader.keyClass != keyClass)
-          throw new IOException("wrong key class: " + reader.getKeyClass() +
-                                " is not " + keyClass);
-        if (reader.valClass != valClass)
-          throw new IOException("wrong value class: "+reader.getValueClass()+
-                                " is not " + valClass);
-        this.in = reader;
-        rawKey = new DataOutputBuffer();
+          int bufferSize = conf.getInt("io.file.buffer.size", 4096); 
+          if (fs.getUri().getScheme().startsWith("ramfs")) {
+            bufferSize = conf.getInt("io.bytes.per.checksum", 512);
+          }
+          Reader reader = new Reader(fs, segmentPathName, 
+                                     bufferSize, segmentOffset, 
+                                     segmentLength, conf);
+        
+          //sometimes we ignore syncs, especially for temp merge files
+          if (ignoreSync) reader.sync = null;
+
+          if (reader.keyClass != keyClass)
+            throw new IOException("wrong key class: " + reader.getKeyClass() +
+                                  " is not " + keyClass);
+          if (reader.valClass != valClass)
+            throw new IOException("wrong value class: "+reader.getValueClass()+
+                                  " is not " + valClass);
+          this.in = reader;
+          rawKey = new DataOutputBuffer();
         }
         rawKey.reset();
         int keyLength = 
@@ -2616,7 +2616,7 @@
        * @param parent the parent SegmentContainer that holds the segment
        */
       public LinkedSegmentsDescriptor (long segmentOffset, long segmentLength, 
-              Path segmentPathName, SegmentContainer parent) {
+                                       Path segmentPathName, SegmentContainer parent) {
         super(segmentOffset, segmentLength, segmentPathName);
         this.parentContainer = parent;
       }
@@ -2640,7 +2640,7 @@
       
       //the list of segments read from the file
       private ArrayList <SegmentDescriptor> segments = 
-                                   new ArrayList <SegmentDescriptor>();
+        new ArrayList <SegmentDescriptor>();
       /** This constructor is there primarily to serve the sort routine that 
        * generates a single output file with an associated index file */
       public SegmentContainer(Path inName, Path indexIn) throws IOException {
@@ -2652,7 +2652,7 @@
           long segmentLength = WritableUtils.readVLong(fsIndexIn);
           Path segmentName = inName;
           segments.add(new LinkedSegmentsDescriptor(segmentOffset, 
-                                 segmentLength, segmentName, this));
+                                                    segmentLength, segmentName, this));
         }
         fsIndexIn.close();
         fs.delete(indexIn);

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/Text.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/Text.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/Text.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/Text.java Mon Apr 16 14:44:35 2007
@@ -308,8 +308,8 @@
   }
   
   public static String decode(byte[] utf8, int start, int length) 
-      throws CharacterCodingException {
-      return decode(ByteBuffer.wrap(utf8, start, length), true);
+    throws CharacterCodingException {
+    return decode(ByteBuffer.wrap(utf8, start, length), true);
   }
   
   /**
@@ -414,7 +414,7 @@
    * @throws MalformedInputException if the byte array contains invalid utf-8
    */
   public static void validateUTF8(byte[] utf8) throws MalformedInputException {
-     validateUTF8(utf8, 0, utf8.length);     
+    validateUTF8(utf8, 0, utf8.length);     
   }
   
   /**