Posted to hdfs-commits@hadoop.apache.org by st...@apache.org on 2009/11/28 21:06:08 UTC

svn commit: r885143 [7/18] - in /hadoop/hdfs/branches/HDFS-326: ./ .eclipse.templates/ .eclipse.templates/.launches/ conf/ ivy/ lib/ src/ant/org/apache/hadoop/ant/ src/ant/org/apache/hadoop/ant/condition/ src/c++/ src/c++/libhdfs/ src/c++/libhdfs/docs/...

Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/DFSClient.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/DFSClient.java Sat Nov 28 20:05:56 2009
@@ -1,3 +1,4 @@
+
 /**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -36,8 +37,6 @@
 import java.nio.ByteBuffer;
 import java.util.AbstractMap;
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -65,12 +64,15 @@
 import org.apache.hadoop.fs.FSInputChecker;
 import org.apache.hadoop.fs.FSInputStream;
 import org.apache.hadoop.fs.FSOutputSummer;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FsServerDefaults;
 import org.apache.hadoop.fs.FsStatus;
 import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.Syncable;
+import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
 import org.apache.hadoop.hdfs.protocol.Block;
@@ -84,6 +86,9 @@
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
+import org.apache.hadoop.hdfs.security.BlockAccessToken;
+import org.apache.hadoop.hdfs.security.InvalidAccessTokenException;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
 import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
@@ -103,8 +108,6 @@
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.security.AccessControlException;
-import org.apache.hadoop.security.AccessToken;
-import org.apache.hadoop.security.InvalidAccessTokenException;
 import org.apache.hadoop.security.UnixUserGroupInformation;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.DataChecksum;
@@ -125,12 +128,15 @@
  ********************************************************/
 public class DFSClient implements FSConstants, java.io.Closeable {
   public static final Log LOG = LogFactory.getLog(DFSClient.class);
+  public static final long SERVER_DEFAULTS_VALIDITY_PERIOD = 60 * 60 * 1000L; // 1 hour
   public static final int MAX_BLOCK_ACQUIRE_FAILURES = 3;
   private static final int TCP_WINDOW_SIZE = 128 * 1024; // 128 KB
   private final ClientProtocol namenode;
   private final ClientProtocol rpcNamenode;
   final UnixUserGroupInformation ugi;
   volatile boolean clientRunning = true;
+  private volatile FsServerDefaults serverDefaults;
+  private volatile long serverDefaultsLastUpdate;
   Random r = new Random();
   final String clientName;
   final LeaseChecker leasechecker = new LeaseChecker();
@@ -245,13 +251,14 @@
     throws IOException {
     this.conf = conf;
     this.stats = stats;
-    this.socketTimeout = conf.getInt("dfs.socket.timeout", 
+    this.socketTimeout = conf.getInt(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 
                                      HdfsConstants.READ_TIMEOUT);
     this.datanodeWriteTimeout = conf.getInt("dfs.datanode.socket.write.timeout",
                                             HdfsConstants.WRITE_TIMEOUT);
     this.socketFactory = NetUtils.getSocketFactory(conf, ClientProtocol.class);
     // dfs.write.packet.size is an internal config variable
-    this.writePacketSize = conf.getInt("dfs.write.packet.size", 64*1024);
+    this.writePacketSize = conf.getInt(DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY, 
+		                       DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT);
     this.maxBlockAcquireFailures = 
                           conf.getInt("dfs.client.max.block.acquire.failures",
                                       MAX_BLOCK_ACQUIRE_FAILURES);
@@ -270,7 +277,7 @@
     } else {
       this.clientName = "DFSClient_" + r.nextInt();
     }
-    defaultBlockSize = conf.getLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
+    defaultBlockSize = conf.getLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
     defaultReplication = (short) conf.getInt("dfs.replication", 3);
 
     if (nameNodeAddr != null && rpcNamenode == null) {
@@ -337,6 +344,18 @@
   }
 
   /**
+   * Get server default values for a number of configuration params.
+   */
+  public FsServerDefaults getServerDefaults() throws IOException {
+    long now = System.currentTimeMillis();
+    if (now - serverDefaultsLastUpdate > SERVER_DEFAULTS_VALIDITY_PERIOD) {
+      serverDefaults = namenode.getServerDefaults();
+      serverDefaultsLastUpdate = now;
+    }
+    return serverDefaults;
+  }
+
+  /**
    * Report corrupt blocks that were discovered by the client.
    */
   public void reportBadBlocks(LocatedBlock[] blocks) throws IOException {
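
As a usage sketch of the caching behaviour above (the wrapper class, helper name and printed fields are illustrative, assuming an already-constructed DFSClient):

    import java.io.IOException;
    import org.apache.hadoop.fs.FsServerDefaults;
    import org.apache.hadoop.hdfs.DFSClient;

    class ServerDefaultsExample {
      static void showDefaults(DFSClient client) throws IOException {
        // The first call asks the namenode; later calls reuse the cached copy
        // until SERVER_DEFAULTS_VALIDITY_PERIOD (one hour) has elapsed.
        FsServerDefaults d = client.getServerDefaults();
        System.out.println("blockSize=" + d.getBlockSize()
            + " bytesPerChecksum=" + d.getBytesPerChecksum()
            + " replication=" + d.getReplication());
      }
    }
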
@@ -408,10 +427,24 @@
    * namenode, and then reads from all the right places.  Creates
    * inner subclass of InputStream that does the right out-of-band
    * work.
+   * @deprecated Use {@link #open(String, int, boolean)} instead.
    */
-  DFSInputStream open(String src, int buffersize, boolean verifyChecksum,
+  @Deprecated
+  public DFSInputStream open(String src, int buffersize, boolean verifyChecksum,
                       FileSystem.Statistics stats
       ) throws IOException {
+    return open(src, buffersize, verifyChecksum);
+  }
+  
+
+  /**
+   * Create an input stream that obtains a nodelist from the
+   * namenode, and then reads from all the right places.  Creates
+   * inner subclass of InputStream that does the right out-of-band
+   * work.
+   */
+  public DFSInputStream open(String src, int buffersize, boolean verifyChecksum
+      ) throws IOException {
     checkOpen();
     //    Get block info from namenode
     return new DFSInputStream(src, buffersize, verifyChecksum);
@@ -513,6 +546,23 @@
   }
 
   /**
+   * Call
+   * {@link #create(String,FsPermission,EnumSet,boolean,short,long,Progressable,int)}
+   * with createParent set to true.
+   */
+  public OutputStream create(String src, 
+      FsPermission permission,
+      EnumSet<CreateFlag> flag, 
+      short replication,
+      long blockSize,
+      Progressable progress,
+      int buffersize
+      ) throws IOException {
+    return create(src, permission, flag, true,
+        replication, blockSize, progress, buffersize);
+  }
+
+  /**
    * Create a new dfs file with the specified block replication 
    * with write-progress reporting and return an output stream for writing
    * into the file.  
@@ -521,14 +571,16 @@
    * @param permission The permission of the directory being created.
    * If permission == null, use {@link FsPermission#getDefault()}.
    * @param flag do not check for file existence if true
+   * @param createParent create missing parent directory if true
    * @param replication block replication
    * @return output stream
    * @throws IOException
-   * @see ClientProtocol#create(String, FsPermission, String, EnumSetWritable, short, long)
+   * @see ClientProtocol#create(String, FsPermission, String, EnumSetWritable, boolean, short, long)
    */
   public OutputStream create(String src, 
                              FsPermission permission,
                              EnumSet<CreateFlag> flag, 
+                             boolean createParent,
                              short replication,
                              long blockSize,
                              Progressable progress,
@@ -541,11 +593,36 @@
     FsPermission masked = permission.applyUMask(FsPermission.getUMask(conf));
     LOG.debug(src + ": masked=" + masked);
     OutputStream result = new DFSOutputStream(src, masked,
-        flag, replication, blockSize, progress, buffersize,
-        conf.getInt("io.bytes.per.checksum", 512));
+        flag, createParent, replication, blockSize, progress, buffersize,
+        conf.getInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, 
+                    DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT));
     leasechecker.put(src, result);
     return result;
   }
+  
+  /**
+   * Same as {@link #create(String, FsPermission, EnumSet, short, long,
+   * Progressable, int)} except that the permission
+   * is absolute (i.e. it has already been masked with umask).
+   * 
+   */
+  public OutputStream primitiveCreate(String src, 
+                             FsPermission absPermission,
+                             EnumSet<CreateFlag> flag,
+                             boolean createParent,
+                             short replication,
+                             long blockSize,
+                             Progressable progress,
+                             int buffersize,
+                             int bytesPerChecksum)
+    throws IOException {
+    checkOpen();
+    OutputStream result = new DFSOutputStream(src, absPermission,
+        flag, createParent, replication, blockSize, progress, buffersize,
+        bytesPerChecksum);
+    leasechecker.put(src, result);
+    return result;
+  } 
 
   /**
    * Append to an existing HDFS file.  
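
A hedged usage sketch of the new create() overloads above; the wrapper class, paths, replication and sizes are illustrative, not taken from this patch:

    import java.io.IOException;
    import java.io.OutputStream;
    import java.util.EnumSet;
    import org.apache.hadoop.fs.CreateFlag;
    import org.apache.hadoop.fs.permission.FsPermission;
    import org.apache.hadoop.hdfs.DFSClient;

    class CreateExample {
      static OutputStream createStrict(DFSClient client) throws IOException {
        // Seven-argument overload keeps the old behaviour: createParent == true.
        OutputStream withParents = client.create("/user/alice/out/part-0",
            FsPermission.getDefault(),
            EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE),
            (short) 3,           // replication
            64L * 1024 * 1024,   // block size
            null,                // no Progressable
            64 * 1024);          // buffer size
        withParents.close();

        // New overload: with createParent == false a missing parent directory
        // fails the call instead of being created silently.
        return client.create("/user/alice/out/part-1",
            FsPermission.getDefault(),
            EnumSet.of(CreateFlag.CREATE),
            false,               // createParent
            (short) 3, 64L * 1024 * 1024, null, 64 * 1024);
      }
    }
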
@@ -572,7 +649,8 @@
                                      DSQuotaExceededException.class);
     }
     OutputStream result = new DFSOutputStream(src, buffersize, progress,
-        lastBlock, stat, conf.getInt("io.bytes.per.checksum", 512));
+        lastBlock, stat, conf.getInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, 
+                                     DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT));
     leasechecker.put(src, result);
     return result;
   }
@@ -599,8 +677,10 @@
 
   /**
    * Rename file or directory.
-   * See {@link ClientProtocol#rename(String, String)}. 
+   * See {@link ClientProtocol#rename(String, String)}.
+   * @deprecated Use {@link #rename(String, String, Options.Rename...)} instead.
    */
+  @Deprecated
   public boolean rename(String src, String dst) throws IOException {
     checkOpen();
     try {
@@ -613,6 +693,34 @@
   }
 
   /**
+   * Move blocks from srcs to trg and delete srcs.
+   * See {@link ClientProtocol#concat(String, String [])}. 
+   */
+  public void concat(String trg, String [] srcs) throws IOException {
+    checkOpen();
+    try {
+      namenode.concat(trg, srcs);
+    } catch(RemoteException re) {
+      throw re.unwrapRemoteException(AccessControlException.class,
+                                     NSQuotaExceededException.class,
+                                     DSQuotaExceededException.class);
+    }
+  }
+  /**
+   * Rename file or directory.
+   * See {@link ClientProtocol#rename(String, String, Options.Rename...)}
+   */
+  public void rename(String src, String dst, Options.Rename... options) throws IOException {
+    checkOpen();
+    try {
+      namenode.rename(src, dst, options);
+    } catch(RemoteException re) {
+      throw re.unwrapRemoteException(AccessControlException.class,
+                                     NSQuotaExceededException.class,
+                                     DSQuotaExceededException.class);
+    }
+  }
+  /**
    * Delete file or directory.
    * See {@link ClientProtocol#delete(String)}. 
    */
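
A short sketch of how the new concat() and rename(..., Options.Rename...) calls above might be used together; the wrapper class and paths are illustrative:

    import java.io.IOException;
    import org.apache.hadoop.fs.Options;
    import org.apache.hadoop.hdfs.DFSClient;

    class ConcatRenameExample {
      static void mergeAndPublish(DFSClient client) throws IOException {
        // Move the blocks of the later parts onto the first part (no data copy).
        client.concat("/tmp/job/part-00000",
            new String[] {"/tmp/job/part-00001", "/tmp/job/part-00002"});

        // The new rename throws on failure instead of returning false and can
        // atomically replace an existing destination.
        client.rename("/tmp/job/part-00000", "/data/final/output",
            Options.Rename.OVERWRITE);
      }
    }
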
@@ -667,7 +775,7 @@
    * @return The checksum 
    * @see DistributedFileSystem#getFileChecksum(Path)
    */
-  MD5MD5CRC32FileChecksum getFileChecksum(String src) throws IOException {
+  public MD5MD5CRC32FileChecksum getFileChecksum(String src) throws IOException {
     checkOpen();
     return getFileChecksum(src, namenode, socketFactory, socketTimeout);    
   }
@@ -941,8 +1049,9 @@
 
   /**
    */
+  @Deprecated
   public boolean mkdirs(String src) throws IOException {
-    return mkdirs(src, null);
+    return mkdirs(src, null, true);
   }
 
   /**
@@ -952,10 +1061,11 @@
    * @param src The path of the directory being created
    * @param permission The permission of the directory being created.
    * If permission == null, use {@link FsPermission#getDefault()}.
+   * @param createParent create missing parent directory if true
    * @return True if the operation success.
-   * @see ClientProtocol#mkdirs(String, FsPermission)
+   * @see ClientProtocol#mkdirs(String, FsPermission, boolean)
    */
-  public boolean mkdirs(String src, FsPermission permission)throws IOException{
+  public boolean mkdirs(String src, FsPermission permission, boolean createParent)throws IOException{
     checkOpen();
     if (permission == null) {
       permission = FsPermission.getDefault();
@@ -963,7 +1073,31 @@
     FsPermission masked = permission.applyUMask(FsPermission.getUMask(conf));
     LOG.debug(src + ": masked=" + masked);
     try {
-      return namenode.mkdirs(src, masked);
+      return namenode.mkdirs(src, masked, createParent);
+    } catch(RemoteException re) {
+      throw re.unwrapRemoteException(AccessControlException.class,
+                                     NSQuotaExceededException.class,
+                                     DSQuotaExceededException.class,
+                                     FileNotFoundException.class,
+                                     FileAlreadyExistsException.class);
+    }
+  }
+  
+  /**
+   * Same as {@link #mkdirs(String, FsPermission, boolean)} except
+   * that the permission has already been masked against umask.
+   */
+  public boolean primitiveMkdir(String src, FsPermission absPermission)
+    throws IOException{
+    checkOpen();
+    if (absPermission == null) {
+      absPermission = 
+        FsPermission.getDefault().applyUMask(FsPermission.getUMask(conf));
+    } 
+
+    LOG.debug(src + ": masked=" + absPermission);
+    try {
+      return namenode.mkdirs(src, absPermission, true);
     } catch(RemoteException re) {
       throw re.unwrapRemoteException(AccessControlException.class,
                                      NSQuotaExceededException.class,
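
A usage sketch of mkdirs() with the new createParent flag; the wrapper class and paths are illustrative:

    import java.io.IOException;
    import org.apache.hadoop.fs.permission.FsPermission;
    import org.apache.hadoop.hdfs.DFSClient;

    class MkdirsExample {
      static void makeDirs(DFSClient client) throws IOException {
        // Like "mkdir -p": create any missing ancestors as well.
        client.mkdirs("/data/2009/11/28", FsPermission.getDefault(), true);

        // Strict variant: fails with FileNotFoundException if "/data/2010" is absent.
        client.mkdirs("/data/2010/01", FsPermission.getDefault(), false);
      }
    }
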
@@ -1081,10 +1215,17 @@
       }
     }
 
-    synchronized void close() {
-      while (!pendingCreates.isEmpty()) {
-        String src = pendingCreates.firstKey();
-        OutputStream out = pendingCreates.remove(src);
+    void close() {
+      while (true) {
+        String src;
+        OutputStream out;
+        synchronized (this) {
+          if (pendingCreates.isEmpty()) {
+            return;
+          }
+          src = pendingCreates.firstKey();
+          out = pendingCreates.remove(src);
+        }
         if (out != null) {
           try {
             out.close();
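
The reworked LeaseChecker.close() above narrows the lock: each entry is removed under synchronization, but the potentially slow stream close happens outside it. A standalone sketch of that pattern, assuming a generic map of pending streams (not from this patch):

    import java.io.Closeable;
    import java.io.IOException;
    import java.util.SortedMap;
    import java.util.TreeMap;

    class PendingCloser {
      private final SortedMap<String, Closeable> pending =
          new TreeMap<String, Closeable>();

      void closeAll() {
        while (true) {
          Closeable next;
          synchronized (this) {          // hold the lock only to pick the next entry
            if (pending.isEmpty()) {
              return;
            }
            next = pending.remove(pending.firstKey());
          }
          try {
            next.close();                // slow I/O runs without the lock held
          } catch (IOException e) {
            // log and keep draining the remaining entries
          }
        }
      }
    }
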
@@ -1349,8 +1490,8 @@
         int dataLen = in.readInt();
       
         // Sanity check the lengths
-        if ( dataLen < 0 || 
-             ( (dataLen % bytesPerChecksum) != 0 && !lastPacketInBlock ) ||
+        if ( ( dataLen <= 0 && !lastPacketInBlock ) ||
+             ( dataLen != 0 && lastPacketInBlock) ||
              (seqno != (lastSeqNo + 1)) ) {
              throw new IOException("BlockReader: error in packet header" +
                                    "(chunkOffset : " + chunkOffset + 
@@ -1414,7 +1555,7 @@
       checksumSize = this.checksum.getChecksumSize();
     }
 
-    public static BlockReader newBlockReader(Socket sock, String file, long blockId, AccessToken accessToken, 
+    public static BlockReader newBlockReader(Socket sock, String file, long blockId, BlockAccessToken accessToken, 
         long genStamp, long startOffset, long len, int bufferSize) throws IOException {
       return newBlockReader(sock, file, blockId, accessToken, genStamp, startOffset, len, bufferSize,
           true);
@@ -1422,7 +1563,7 @@
 
     /** Java Doc required */
     public static BlockReader newBlockReader( Socket sock, String file, long blockId, 
-                                       AccessToken accessToken,
+                                       BlockAccessToken accessToken,
                                        long genStamp,
                                        long startOffset, long len,
                                        int bufferSize, boolean verifyChecksum)
@@ -1433,7 +1574,7 @@
 
     public static BlockReader newBlockReader( Socket sock, String file,
                                        long blockId, 
-                                       AccessToken accessToken,
+                                       BlockAccessToken accessToken,
                                        long genStamp,
                                        long startOffset, long len,
                                        int bufferSize, boolean verifyChecksum,
@@ -1529,6 +1670,7 @@
     private BlockReader blockReader = null;
     private boolean verifyChecksum;
     private LocatedBlocks locatedBlocks = null;
+    private long lastBlockBeingWrittenLength = 0;
     private DatanodeInfo currentNode = null;
     private Block currentBlock = null;
     private long pos = 0;
@@ -1552,7 +1694,7 @@
       this.verifyChecksum = verifyChecksum;
       this.buffersize = buffersize;
       this.src = src;
-      prefetchSize = conf.getLong("dfs.read.prefetch.size", prefetchSize);
+      prefetchSize = conf.getLong(DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY, prefetchSize);
       openInfo();
     }
 
@@ -1561,6 +1703,9 @@
      */
     synchronized void openInfo() throws IOException {
       LocatedBlocks newInfo = callGetBlockLocations(namenode, src, 0, prefetchSize);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("newInfo = " + newInfo);
+      }
       if (newInfo == null) {
         throw new IOException("Cannot open filename " + src);
       }
@@ -1575,11 +1720,46 @@
         }
       }
       this.locatedBlocks = newInfo;
+      this.lastBlockBeingWrittenLength = 0;
+      if (!locatedBlocks.isLastBlockComplete()) {
+        final LocatedBlock last = locatedBlocks.getLastLocatedBlock();
+        if (last != null) {
+          final long len = readBlockLength(last);
+          last.getBlock().setNumBytes(len);
+          this.lastBlockBeingWrittenLength = len; 
+        }
+      }
+
       this.currentNode = null;
     }
+
+    /** Read the block length from one of the datanodes. */
+    private long readBlockLength(LocatedBlock locatedblock) throws IOException {
+      if (locatedblock == null || locatedblock.getLocations().length == 0) {
+        return 0;
+      }
+      for(DatanodeInfo datanode : locatedblock.getLocations()) {
+        try {
+          final ClientDatanodeProtocol cdp = createClientDatanodeProtocolProxy(
+              datanode, conf);
+          final long n = cdp.getReplicaVisibleLength(locatedblock.getBlock());
+          if (n >= 0) {
+            return n;
+          }
+        }
+        catch(IOException ioe) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Failed to getReplicaVisibleLength from datanode "
+                + datanode + " for block " + locatedblock.getBlock(), ioe);
+          }
+        }
+      }
+      throw new IOException("Cannot obtain block length for " + locatedblock);
+    }
     
     public synchronized long getFileLength() {
-      return (locatedBlocks == null) ? 0 : locatedBlocks.getFileLength();
+      return locatedBlocks == null? 0:
+          locatedBlocks.getFileLength() + lastBlockBeingWrittenLength;
     }
 
     /**
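
With the last-block handling above, a client can read a file that is still being written up to its currently visible (acked) length. A hedged usage sketch; the wrapper class, helper name and path are illustrative:

    import java.io.IOException;
    import java.io.InputStream;
    import org.apache.hadoop.hdfs.DFSClient;

    class TailReadExample {
      static long readVisibleBytes(DFSClient client, String path) throws IOException {
        // open() now accounts for the block still under construction, so the
        // stream exposes the bytes already acknowledged by the datanodes.
        InputStream in = client.open(path, 64 * 1024, true);
        try {
          byte[] buf = new byte[8192];
          long total = 0;
          int n;
          while ((n = in.read(buf)) > 0) {
            total += n;
          }
          return total;
        } finally {
          in.close();
        }
      }
    }
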
@@ -1615,17 +1795,36 @@
     private synchronized LocatedBlock getBlockAt(long offset,
         boolean updatePosition) throws IOException {
       assert (locatedBlocks != null) : "locatedBlocks is null";
-      // search cached blocks first
-      int targetBlockIdx = locatedBlocks.findBlock(offset);
-      if (targetBlockIdx < 0) { // block is not cached
-        targetBlockIdx = LocatedBlocks.getInsertIndex(targetBlockIdx);
-        // fetch more blocks
-        LocatedBlocks newBlocks;
-        newBlocks = callGetBlockLocations(namenode, src, offset, prefetchSize);
-        assert (newBlocks != null) : "Could not find target position " + offset;
-        locatedBlocks.insertRange(targetBlockIdx, newBlocks.getLocatedBlocks());
+
+      final LocatedBlock blk;
+
+      //check offset
+      if (offset < 0 || offset >= getFileLength()) {
+        throw new IOException("offset < 0 || offset >= getFileLength(), offset="
+            + offset
+            + ", updatePosition=" + updatePosition
+            + ", locatedBlocks=" + locatedBlocks);
+      }
+      else if (offset >= locatedBlocks.getFileLength()) {
+        // offset to the portion of the last block,
+        // which is not known to the name-node yet;
+        // getting the last block 
+        blk = locatedBlocks.getLastLocatedBlock();
+      }
+      else {
+        // search cached blocks first
+        int targetBlockIdx = locatedBlocks.findBlock(offset);
+        if (targetBlockIdx < 0) { // block is not cached
+          targetBlockIdx = LocatedBlocks.getInsertIndex(targetBlockIdx);
+          // fetch more blocks
+          LocatedBlocks newBlocks;
+          newBlocks = callGetBlockLocations(namenode, src, offset, prefetchSize);
+          assert (newBlocks != null) : "Could not find target position " + offset;
+          locatedBlocks.insertRange(targetBlockIdx, newBlocks.getLocatedBlocks());
+        }
+        blk = locatedBlocks.get(targetBlockIdx);
       }
-      LocatedBlock blk = locatedBlocks.get(targetBlockIdx);
+
       // update current position
       if (updatePosition) {
         this.pos = offset;
@@ -1662,6 +1861,27 @@
     private synchronized List<LocatedBlock> getBlockRange(long offset, 
                                                           long length) 
                                                         throws IOException {
+      final List<LocatedBlock> blocks;
+      if (locatedBlocks.isLastBlockComplete()) {
+        blocks = getFinalizedBlockRange(offset, length);
+      }
+      else {
+        if (length + offset > locatedBlocks.getFileLength()) {
+          length = locatedBlocks.getFileLength() - offset;
+        }
+        blocks = getFinalizedBlockRange(offset, length);
+        blocks.add(locatedBlocks.getLastLocatedBlock());
+      }
+      return blocks;
+    }
+
+    /**
+     * Get blocks in the specified range.
+     * Includes only the complete blocks.
+     * Fetch them from the namenode if not cached.
+     */
+    private synchronized List<LocatedBlock> getFinalizedBlockRange(
+        long offset, long length) throws IOException {
       assert (locatedBlocks != null) : "locatedBlocks is null";
       List<LocatedBlock> blockRange = new ArrayList<LocatedBlock>();
       // search cached blocks first
@@ -1715,6 +1935,8 @@
       //
       DatanodeInfo chosenNode = null;
       int refetchToken = 1; // only need to get a new access token once
+      failures = 0;
+      
       while (true) {
         //
         // Compute desired block
@@ -1732,7 +1954,7 @@
           NetUtils.connect(s, targetAddr, socketTimeout);
           s.setSoTimeout(socketTimeout);
           Block blk = targetBlock.getBlock();
-          AccessToken accessToken = targetBlock.getAccessToken();
+          BlockAccessToken accessToken = targetBlock.getAccessToken();
           
           blockReader = BlockReader.newBlockReader(s, src, blk.getBlockId(), 
               accessToken, 
@@ -1944,6 +2166,7 @@
       //
       Socket dn = null;
       int refetchToken = 1; // only need to get a new access token once
+      failures = 0;
       
       while (true) {
         // cached block locations may have been updated by chooseDataNode()
@@ -1959,7 +2182,7 @@
           dn = socketFactory.createSocket();
           NetUtils.connect(dn, targetAddr, socketTimeout);
           dn.setSoTimeout(socketTimeout);
-          AccessToken accessToken = block.getAccessToken();
+          BlockAccessToken accessToken = block.getAccessToken();
               
           int len = (int) (end - start + 1);
               
@@ -2139,14 +2362,18 @@
       return pos;
     }
 
-    /**
+    /** Return the size of the remaining available bytes
+     * if the size is less than or equal to {@link Integer#MAX_VALUE},
+     * otherwise, return {@link Integer#MAX_VALUE}.
      */
     @Override
     public synchronized int available() throws IOException {
       if (closed) {
         throw new IOException("Stream closed");
       }
-      return (int) (getFileLength() - pos);
+
+      final long remaining = getFileLength() - pos;
+      return remaining <= Integer.MAX_VALUE? (int)remaining: Integer.MAX_VALUE;
     }
 
     /**
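
available() has to fit a possibly multi-gigabyte remainder into an int, hence the clamp above. The same idea in isolation (illustrative helper, not from this patch):

    class AvailableClamp {
      /** Clamp a long byte count into the range InputStream.available() can return. */
      static int clampToInt(long remaining) {
        return remaining <= Integer.MAX_VALUE ? (int) remaining : Integer.MAX_VALUE;
      }
      // clampToInt(5L * 1024 * 1024 * 1024) == Integer.MAX_VALUE
      // clampToInt(4096L)                   == 4096
    }
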
@@ -2165,8 +2392,8 @@
     }
   }
     
-  static class DFSDataInputStream extends FSDataInputStream {
-    DFSDataInputStream(DFSInputStream in)
+  public static class DFSDataInputStream extends FSDataInputStream {
+    public DFSDataInputStream(DFSInputStream in)
       throws IOException {
       super(in);
     }
@@ -2229,7 +2456,7 @@
     private final LinkedList<Packet> dataQueue = new LinkedList<Packet>();
     private final LinkedList<Packet> ackQueue = new LinkedList<Packet>();
     private Packet currentPacket = null;
-    private DataStreamer streamer = new DataStreamer();
+    private DataStreamer streamer;
     private long currentSeqno = 0;
     private long bytesCurBlock = 0; // bytes written in current block
     private int packetSize = 0; // write packet size, including the header.
@@ -2336,6 +2563,18 @@
         buffer.reset();
         return buffer;
       }
+      
+      // get the packet's last byte's offset in the block
+      long getLastByteOffsetBlock() {
+        return offsetInBlock + dataPos - dataStart;
+      }
+      
+      public String toString() {
+        return "packet seqno:" + this.seqno +
+        " offsetInBlock:" + this.offsetInBlock + 
+        " lastPacketInBlock:" + this.lastPacketInBlock +
+        " lastByteOffsetInBlock: " + this.getLastByteOffsetBlock();
+      }
     }
   
     //
@@ -2347,18 +2586,102 @@
     // of them are received, the DataStreamer closes the current block.
     //
     class DataStreamer extends Daemon {
-      private static final int MAX_RECOVERY_ERROR_COUNT = 5; // try block recovery 5 times
-      private int recoveryErrorCount = 0; // number of times block recovery failed
       private volatile boolean streamerClosed = false;
-      private Block block;
-      private AccessToken accessToken;
+      private Block block; // its length is number of bytes acked
+      private BlockAccessToken accessToken;
       private DataOutputStream blockStream;
       private DataInputStream blockReplyStream;
       private ResponseProcessor response = null;
       private volatile DatanodeInfo[] nodes = null; // list of targets for current block
+      private ArrayList<DatanodeInfo> excludedNodes = new ArrayList<DatanodeInfo>();
       volatile boolean hasError = false;
-      volatile int errorIndex = 0;
-  
+      volatile int errorIndex = -1;
+      private BlockConstructionStage stage;  // block construction stage
+      private long bytesSent = 0; // number of bytes that've been sent
+
+      /**
+       * Default constructor for file create
+       */
+      private DataStreamer() {
+        stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
+      }
+      
+      /**
+       * Construct a data streamer for append
+       * @param lastBlock last block of the file to be appended
+       * @param stat status of the file to be appended
+       * @param bytesPerChecksum number of bytes per checksum
+       * @throws IOException if error occurs
+       */
+      private DataStreamer(LocatedBlock lastBlock, FileStatus stat,
+          int bytesPerChecksum) throws IOException {
+        stage = BlockConstructionStage.PIPELINE_SETUP_APPEND;
+        block = lastBlock.getBlock();
+        bytesSent = block.getNumBytes();
+        accessToken = lastBlock.getAccessToken();
+        long usedInLastBlock = stat.getLen() % blockSize;
+        int freeInLastBlock = (int)(blockSize - usedInLastBlock);
+
+        // calculate the amount of free space in the pre-existing 
+        // last crc chunk
+        int usedInCksum = (int)(stat.getLen() % bytesPerChecksum);
+        int freeInCksum = bytesPerChecksum - usedInCksum;
+
+        // if there is space in the last block, then we have to 
+        // append to that block
+        if (freeInLastBlock == blockSize) {
+          throw new IOException("The last block for file " + 
+              src + " is full.");
+        }
+
+        if (usedInCksum > 0 && freeInCksum > 0) {
+          // if there is space in the last partial chunk, then 
+          // setup in such a way that the next packet will have only 
+          // one chunk that fills up the partial chunk.
+          //
+          computePacketChunkSize(0, freeInCksum);
+          resetChecksumChunk(freeInCksum);
+          appendChunk = true;
+        } else {
+          // if the remaining space in the block is smaller than 
+          // the expected size of a packet, then create 
+          // a smaller size packet.
+          //
+          computePacketChunkSize(Math.min(writePacketSize, freeInLastBlock), 
+              bytesPerChecksum);
+        }
+
+        // setup pipeline to append to the last block XXX retries??
+        nodes = lastBlock.getLocations();
+        errorIndex = -1;   // no errors yet.
+        if (nodes.length < 1) {
+          throw new IOException("Unable to retrieve block locations " +
+              "for last block " + block +
+              " of file " + src);
+
+        }
+      }
+      
+      /**
+       * Initialize for data streaming
+       */
+      private void initDataStreaming() {
+        this.setName("DataStreamer for file " + src +
+            " block " + block);
+        response = new ResponseProcessor(nodes);
+        response.start();
+        stage = BlockConstructionStage.DATA_STREAMING;
+      }
+      
+      private void endBlock() {
+        LOG.debug("Closing old block " + block);
+        this.setName("DataStreamer for file " + src);
+        closeResponder();
+        closeStream();
+        nodes = null;
+        stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
+      }
+      
       /*
        * streamer thread is the only thread that opens streams to datanode, 
        * and closes them. Any error recovery is also done by this thread.
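
The append constructor above sizes the first packet so that it completes the partially filled CRC chunk left at the end of the file. A small worked sketch of that arithmetic; the class, method name and sample values are illustrative:

    class AppendChunkMath {
      static void planAppend(long fileLen, int bytesPerChecksum) {
        int usedInCksum = (int) (fileLen % bytesPerChecksum); // bytes in the partial chunk
        int freeInCksum = bytesPerChecksum - usedInCksum;      // room left in that chunk

        if (usedInCksum > 0 && freeInCksum > 0) {
          // First packet carries exactly one chunk of freeInCksum bytes so the
          // existing partial checksum can be completed.
          System.out.println("first packet payload = " + freeInCksum + " bytes");
        } else {
          System.out.println("last chunk is aligned; resume with full "
              + bytesPerChecksum + "-byte chunks");
        }
      }
      // planAppend(1000, 512) prints "first packet payload = 24 bytes"
    }
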
@@ -2378,47 +2701,69 @@
 
           Packet one = null;
 
-          // process IO errors if any
-          boolean doSleep = processDatanodeError(hasError, false);
+          try {
+            // process datanode IO errors if any
+            boolean doSleep = false;
+            if (hasError && errorIndex>=0) {
+              doSleep = processDatanodeError();
+            }
 
-          synchronized (dataQueue) {
-            // wait for a packet to be sent.
-            while ((!streamerClosed && !hasError && clientRunning 
-                && dataQueue.size() == 0) || doSleep) {
-              try {
-                dataQueue.wait(1000);
-              } catch (InterruptedException  e) {
+            synchronized (dataQueue) {
+              // wait for a packet to be sent.
+              while ((!streamerClosed && !hasError && clientRunning 
+                  && dataQueue.size() == 0) || doSleep) {
+                try {
+                  dataQueue.wait(1000);
+                } catch (InterruptedException  e) {
+                }
+                doSleep = false;
               }
-              doSleep = false;
-            }
-            if (streamerClosed || hasError || dataQueue.size() == 0 || !clientRunning) {
-              continue;
+              if (streamerClosed || hasError || dataQueue.size() == 0 || !clientRunning) {
+                continue;
+              }
+              // get packet to be sent.
+              one = dataQueue.getFirst();
             }
-            // get packet to be sent.
-            one = dataQueue.getFirst();
-          }
-
-          try {
-            long offsetInBlock = one.offsetInBlock;
 
             // get new block from namenode.
-            if (blockStream == null) {
+            if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
               LOG.debug("Allocating new block");
-              nodes = nextBlockOutputStream(src); 
-              this.setName("DataStreamer for file " + src +
-                  " block " + block);
-              response = new ResponseProcessor(nodes);
-              response.start();
+              nodes = nextBlockOutputStream(src);
+              initDataStreaming();
+            } else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {
+              LOG.debug("Append to block " + block);
+              setupPipelineForAppendOrRecovery();
+              initDataStreaming();
             }
 
-            if (offsetInBlock >= blockSize) {
+            long lastByteOffsetInBlock = one.getLastByteOffsetBlock();
+            if (lastByteOffsetInBlock > blockSize) {
               throw new IOException("BlockSize " + blockSize +
                   " is smaller than data size. " +
                   " Offset of packet in block " + 
-                  offsetInBlock +
+                  lastByteOffsetInBlock +
                   " Aborting file " + src);
             }
 
+            if (one.lastPacketInBlock) {
+              // wait until all data packets have been successfully acked
+              synchronized (dataQueue) {
+                while (!streamerClosed && !hasError && 
+                    ackQueue.size() != 0 && clientRunning) {
+                  try {
+                    // wait for acks to arrive from datanodes
+                    dataQueue.wait(1000);
+                  } catch (InterruptedException  e) {
+                  }
+                }
+              }
+              if (streamerClosed || hasError || !clientRunning) {
+                continue;
+              }
+              stage = BlockConstructionStage.PIPELINE_CLOSE;
+            }
+            
+            // send the packet
             ByteBuffer buf = one.getBuffer();
 
             synchronized (dataQueue) {
@@ -2428,19 +2773,45 @@
               dataQueue.notifyAll();
             }
 
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("DataStreamer block " + block +
+                  " sending packet " + one);
+            }
+
             // write out data to remote datanode
             blockStream.write(buf.array(), buf.position(), buf.remaining());
+            blockStream.flush();
+            
+            // update bytesSent
+            long tmpBytesSent = one.getLastByteOffsetBlock();
+            if (bytesSent < tmpBytesSent) {
+              bytesSent = tmpBytesSent;
+            }
+
+            if (streamerClosed || hasError || !clientRunning) {
+              continue;
+            }
 
+            // Is this block full?
             if (one.lastPacketInBlock) {
-              blockStream.writeInt(0); // indicate end-of-block 
+              // wait until the close packet has been acked
+              synchronized (dataQueue) {
+                while (!streamerClosed && !hasError && 
+                    ackQueue.size() != 0 && clientRunning) {
+                  dataQueue.wait(1000);// wait for acks to arrive from datanodes
+                }
+              }
+              if (streamerClosed || hasError || !clientRunning) {
+                continue;
+              }
+
+              endBlock();
             }
-            blockStream.flush();
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("DataStreamer block " + block +
-                  " wrote packet seqno:" + one.seqno +
-                  " size:" + buf.remaining() +
-                  " offsetInBlock:" + one.offsetInBlock + 
-                  " lastPacketInBlock:" + one.lastPacketInBlock);
+            if (progress != null) { progress.progress(); }
+
+            // This is used by unit test to trigger race conditions.
+            if (artificialSlowdown != 0 && clientRunning) {
+              Thread.sleep(artificialSlowdown); 
             }
           } catch (Throwable e) {
             LOG.warn("DataStreamer Exception: " + 
@@ -2449,47 +2820,16 @@
               setLastException((IOException)e);
             }
             hasError = true;
-          }
-
-
-          if (streamerClosed || hasError || !clientRunning) {
-            continue;
-          }
-
-          // Is this block full?
-          if (one.lastPacketInBlock) {
-            synchronized (dataQueue) {
-              while (!streamerClosed && !hasError && ackQueue.size() != 0 && clientRunning) {
-                try {
-                  dataQueue.wait(1000);   // wait for acks to arrive from datanodes
-                } catch (InterruptedException  e) {
-                }
-              }
-            }
-            if (streamerClosed || hasError || !clientRunning) {
-              continue;
+            if (errorIndex == -1) { // not a datanode error
+              streamerClosed = true;
             }
-
-            LOG.debug("Closing old block " + block);
-            this.setName("DataStreamer for file " + src);
-            closeResponder();
-            closeStream();
-            nodes = null;
-          }
-          if (progress != null) { progress.progress(); }
-
-          // This is used by unit test to trigger race conditions.
-          if (artificialSlowdown != 0 && clientRunning) {
-            try { 
-              Thread.sleep(artificialSlowdown); 
-            } catch (InterruptedException e) {}
           }
         }
         closeInternal();
       }
 
       private void closeInternal() {
-        closeResponder();
+        closeResponder();       // close and join
         closeStream();
         streamerClosed = true;
         closed = true;
@@ -2502,10 +2842,16 @@
        * close both streamer and DFSOutputStream, should be called only 
        * by an external thread and only after all data to be sent has 
        * been flushed to datanode.
+       * 
+       * Interrupt this data streamer if force is true
+       * 
+       * @param force if this data stream is forced to be closed 
        */
-      void close() {
+      void close(boolean force) {
         streamerClosed = true;
-        this.interrupt();
+        if (force) {
+          this.interrupt();
+        }
       }
 
       private void closeResponder() {
@@ -2563,12 +2909,12 @@
               // verify seqno from datanode
               long seqno = blockReplyStream.readLong();
               LOG.debug("DFSClient received ack for seqno " + seqno);
+              Packet one = null;
               if (seqno == -1) {
                 continue;
               } else if (seqno == -2) {
                 // do nothing
               } else {
-                Packet one = null;
                 synchronized (dataQueue) {
                   one = ackQueue.getFirst();
                 }
@@ -2581,10 +2927,20 @@
               }
 
               // processes response status from all datanodes.
+              String replies = null;
+              if (LOG.isDebugEnabled()) {
+                replies = "DFSClient Replies for seqno " + seqno + " are";
+              }
               for (int i = 0; i < targets.length && clientRunning; i++) {
                 final DataTransferProtocol.Status reply
                     = DataTransferProtocol.Status.read(blockReplyStream);
+                if (LOG.isDebugEnabled()) {
+                  replies += " " + reply;
+                }
                 if (reply != SUCCESS) {
+                  if (LOG.isDebugEnabled()) {
+                    LOG.debug(replies);
+                  }
                   errorIndex = i; // first bad datanode
                   throw new IOException("Bad response " + reply +
                       " for block " + block +
@@ -2593,6 +2949,18 @@
                 }
               }
 
+              if (LOG.isDebugEnabled()) {
+                LOG.debug(replies);
+              }
+              
+              if (one == null) {
+                throw new IOException("Panic: responder did not receive " +
+                    "an ack for a packet: " + seqno);
+              }
+              
+              // update bytesAcked
+              block.setNumBytes(one.getLastByteOffsetBlock());
+
               synchronized (dataQueue) {
                 ackQueue.removeFirst();
                 dataQueue.notifyAll();
@@ -2603,6 +2971,7 @@
                   setLastException((IOException)e);
                 }
                 hasError = true;
+                errorIndex = errorIndex==-1 ? 0 : errorIndex;
                 synchronized (dataQueue) {
                   dataQueue.notifyAll();
                 }
@@ -2625,21 +2994,12 @@
       // threads and mark stream as closed. Returns true if we should
       // sleep for a while after returning from this call.
       //
-      private boolean processDatanodeError(boolean error, boolean isAppend) {
-        if (!error) {
-          return false;
-        }
+      private boolean processDatanodeError() throws IOException {
         if (response != null) {
           LOG.info("Error Recovery for block " + block +
           " waiting for responder to exit. ");
           return true;
         }
-        if (errorIndex >= 0) {
-          LOG.warn("Error Recovery for block " + block
-              + " bad datanode[" + errorIndex + "] "
-              + (nodes == null? "nodes == null": nodes[errorIndex].getName()));
-        }
-
         closeStream();
 
         // move packets from ack queue to front of the data queue
@@ -2648,31 +3008,57 @@
           ackQueue.clear();
         }
 
-        boolean success = false;
-        while (!success && !streamerClosed && clientRunning) {
-          DatanodeInfo[] newnodes = null;
-          if (nodes == null) {
-            String msg = "Could not get block locations. " + "Source file \""
-                + src + "\" - Aborting...";
-            LOG.warn(msg);
-            setLastException(new IOException(msg));
-            streamerClosed = true;
-            return false;
-          }
-          StringBuilder pipelineMsg = new StringBuilder();
-          for (int j = 0; j < nodes.length; j++) {
-            pipelineMsg.append(nodes[j].getName());
-            if (j < nodes.length - 1) {
-              pipelineMsg.append(", ");
+        boolean doSleep = setupPipelineForAppendOrRecovery();
+        
+        if (!streamerClosed && clientRunning) {
+          if (stage == BlockConstructionStage.PIPELINE_CLOSE) {
+            synchronized (dataQueue) {
+              dataQueue.remove();  // remove the end of block packet
+              dataQueue.notifyAll();
             }
+            endBlock();
+          } else {
+            initDataStreaming();
           }
+        }
+        
+        return doSleep;
+      }
+
+
+      /**
+       * Open a DataOutputStream to a DataNode pipeline so that 
+       * it can be written to.
+       * This happens when a file is appended or data streaming fails.
+       * It keeps trying until a pipeline is set up.
+       */
+      private boolean setupPipelineForAppendOrRecovery() throws IOException {
+        // check number of datanodes
+        if (nodes == null || nodes.length == 0) {
+          String msg = "Could not get block locations. " + "Source file \""
+              + src + "\" - Aborting...";
+          LOG.warn(msg);
+          setLastException(new IOException(msg));
+          streamerClosed = true;
+          return false;
+        }
+        
+        boolean success = false;
+        long newGS = 0L;
+        while (!success && !streamerClosed && clientRunning) {
+          boolean isRecovery = hasError;
           // remove bad datanode from list of datanodes.
           // If errorIndex was not set (i.e. appends), then do not remove 
           // any datanodes
           // 
-          if (errorIndex < 0) {
-            newnodes = nodes;
-          } else {
+          if (errorIndex >= 0) {
+            StringBuilder pipelineMsg = new StringBuilder();
+            for (int j = 0; j < nodes.length; j++) {
+              pipelineMsg.append(nodes[j].getName());
+              if (j < nodes.length - 1) {
+                pipelineMsg.append(", ");
+              }
+            }
             if (nodes.length <= 1) {
               lastException = new IOException("All datanodes " + pipelineMsg
                   + " are bad. Aborting...");
@@ -2682,86 +3068,32 @@
             LOG.warn("Error Recovery for block " + block +
                 " in pipeline " + pipelineMsg + 
                 ": bad datanode " + nodes[errorIndex].getName());
-            newnodes =  new DatanodeInfo[nodes.length-1];
+            DatanodeInfo[] newnodes = new DatanodeInfo[nodes.length-1];
             System.arraycopy(nodes, 0, newnodes, 0, errorIndex);
             System.arraycopy(nodes, errorIndex+1, newnodes, errorIndex,
                 newnodes.length-errorIndex);
+            nodes = newnodes;
+            this.hasError = false;
+            lastException = null;
+            errorIndex = -1;
           }
 
-          // Tell the primary datanode to do error recovery 
-          // by stamping appropriate generation stamps.
-          //
-          LocatedBlock newBlock = null;
-          ClientDatanodeProtocol primary =  null;
-          DatanodeInfo primaryNode = null;
-          try {
-            // Pick the "least" datanode as the primary datanode to avoid deadlock.
-            primaryNode = Collections.min(Arrays.asList(newnodes));
-            primary = createClientDatanodeProtocolProxy(primaryNode, conf);
-            newBlock = primary.recoverBlock(block, isAppend, newnodes);
-          } catch (IOException e) {
-            recoveryErrorCount++;
-            if (recoveryErrorCount > MAX_RECOVERY_ERROR_COUNT) {
-              if (nodes.length > 1) {
-                // if the primary datanode failed, remove it from the list.
-                // The original bad datanode is left in the list because it is
-                // conservative to remove only one datanode in one iteration.
-                for (int j = 0; j < nodes.length; j++) {
-                  if (nodes[j].equals(primaryNode)) {
-                    errorIndex = j; // forget original bad node.
-                  }
-                }
-                // remove primary node from list
-                newnodes =  new DatanodeInfo[nodes.length-1];
-                System.arraycopy(nodes, 0, newnodes, 0, errorIndex);
-                System.arraycopy(nodes, errorIndex+1, newnodes, errorIndex,
-                    newnodes.length-errorIndex);
-                nodes = newnodes;
-                LOG.warn("Error Recovery for block " + block + " failed "
-                    + " because recovery from primary datanode " + primaryNode
-                    + " failed " + recoveryErrorCount + " times. "
-                    + " Pipeline was " + pipelineMsg
-                    + ". Marking primary datanode as bad.");
-                recoveryErrorCount = 0; 
-                errorIndex = -1;
-                return true;          // sleep when we return from here
-              }
-              String emsg = "Error Recovery for block " + block + " failed "
-                  + " because recovery from primary datanode " + primaryNode
-                  + " failed " + recoveryErrorCount + " times. "
-                  + " Pipeline was " + pipelineMsg + ". Aborting...";
-              LOG.warn(emsg);
-              lastException = new IOException(emsg);
-              streamerClosed = true;
-              return false;       // abort with IOexception
-            } 
-            LOG.warn("Error Recovery for block " + block + " failed "
-                + " because recovery from primary datanode " + primaryNode
-                + " failed " + recoveryErrorCount + " times. "
-                + " Pipeline was " + pipelineMsg + ". Will retry...");
-            return true;          // sleep when we return from here
-          } finally {
-            RPC.stopProxy(primary);
-          }
-          recoveryErrorCount = 0; // block recovery successful
-
-          // If the block recovery generated a new generation stamp, use that
-          // from now on.  Also, setup new pipeline
-          // newBlock should never be null and it should contain a newly 
-          // generated access token.
-          block = newBlock.getBlock();
-          accessToken = newBlock.getAccessToken();
-          nodes = newBlock.getLocations();
-
-          this.hasError = false;
-          lastException = null;
-          errorIndex = 0;
-          success = createBlockOutputStream(nodes, clientName, true);
+          // get a new generation stamp and an access token
+          LocatedBlock lb = namenode.updateBlockForPipeline(block, clientName);
+          newGS = lb.getBlock().getGenerationStamp();
+          accessToken = lb.getAccessToken();
+          
+          // set up the pipeline again with the remaining nodes
+          success = createBlockOutputStream(nodes, newGS, isRecovery);
         }
 
-        if (!streamerClosed && clientRunning) {
-          response = new ResponseProcessor(nodes);
-          response.start();
+        if (success) {
+          // update pipeline at the namenode
+          Block newBlock = new Block(
+              block.getBlockId(), block.getNumBytes(), newGS);
+          namenode.updatePipeline(clientName, block, newBlock, nodes);
+          // update client side generation stamp
+          block = newBlock;
         }
         return false; // do not sleep, continue processing
       }
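
Pipeline recovery above drops the datanode at errorIndex and continues with the survivors. A minimal sketch of that array surgery (generic helper, not from this patch):

    import java.util.Arrays;

    class PipelineTrim {
      /** Return a copy of nodes with the entry at errorIndex removed. */
      static <T> T[] dropBadNode(T[] nodes, int errorIndex) {
        T[] survivors = Arrays.copyOf(nodes, nodes.length - 1);
        System.arraycopy(nodes, 0, survivors, 0, errorIndex);
        System.arraycopy(nodes, errorIndex + 1, survivors, errorIndex,
            nodes.length - 1 - errorIndex);
        return survivors;
      }
      // dropBadNode(new String[] {"dn1", "dn2", "dn3"}, 1) -> ["dn1", "dn3"]
    }
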
@@ -2781,24 +3113,31 @@
         do {
           hasError = false;
           lastException = null;
-          errorIndex = 0;
+          errorIndex = -1;
           retry = false;
           success = false;
 
           long startTime = System.currentTimeMillis();
-          lb = locateFollowingBlock(startTime);
+          DatanodeInfo[] w = excludedNodes.toArray(
+              new DatanodeInfo[excludedNodes.size()]);
+          lb = locateFollowingBlock(startTime, w.length > 0 ? w : null);
           block = lb.getBlock();
+          block.setNumBytes(0);
           accessToken = lb.getAccessToken();
           nodes = lb.getLocations();
 
           //
           // Connect to first DataNode in the list.
           //
-          success = createBlockOutputStream(nodes, clientName, false);
+          success = createBlockOutputStream(nodes, 0L, false);
 
           if (!success) {
             LOG.info("Abandoning block " + block);
             namenode.abandonBlock(block, src, clientName);
+            block = null;
+
+            LOG.info("Excluding datanode " + nodes[errorIndex]);
+            excludedNodes.add(nodes[errorIndex]);
 
             // Connection failed.  Let's wait a little bit and retry
             retry = true;
@@ -2806,6 +3145,7 @@
               if (System.currentTimeMillis() - startTime > 5000) {
                 LOG.info("Waiting to find target node: " + nodes[0].getName());
               }
+              //TODO fix this timeout. Extract it to a constant, maybe make it available from conf
               Thread.sleep(6000);
             } catch (InterruptedException iex) {
             }
@@ -2821,7 +3161,7 @@
       // connects to the first datanode in the pipeline
       // Returns true if success, otherwise return failure.
       //
-      private boolean createBlockOutputStream(DatanodeInfo[] nodes, String client,
+      private boolean createBlockOutputStream(DatanodeInfo[] nodes, long newGS,
           boolean recoveryFlag) {
         DataTransferProtocol.Status pipelineStatus = SUCCESS;
         String firstBadLink = "";
@@ -2856,9 +3196,11 @@
               DataNode.SMALL_BUFFER_SIZE));
           blockReplyStream = new DataInputStream(NetUtils.getInputStream(s));
 
+          // send the request
           DataTransferProtocol.Sender.opWriteBlock(out,
-              block.getBlockId(), block.getGenerationStamp(), nodes.length,
-              recoveryFlag, client, null, nodes, accessToken);
+              block.getBlockId(), block.getGenerationStamp(),
+              nodes.length, recoveryFlag?stage.getRecoveryStage():stage, newGS,
+              block.getNumBytes(), bytesSent, clientName, null, nodes, accessToken);
           checksum.writeHeader(out);
           out.flush();
 
@@ -2891,6 +3233,8 @@
                 break;
               }
             }
+          } else {
+            errorIndex = 0;
           }
           hasError = true;
           setLastException(ie);
@@ -2899,14 +3243,15 @@
         }
       }
 
-      private LocatedBlock locateFollowingBlock(long start) throws IOException {
+      private LocatedBlock locateFollowingBlock(long start,
+          DatanodeInfo[] excludedNodes) throws IOException {
         int retries = conf.getInt("dfs.client.block.write.locateFollowingBlock.retries", 5);
         long sleeptime = 400;
         while (true) {
           long localstart = System.currentTimeMillis();
           while (true) {
             try {
-              return namenode.addBlock(src, clientName);
+              return namenode.addBlock(src, clientName, block, excludedNodes);
             } catch (RemoteException e) {
               IOException ue = 
                 e.unwrapRemoteException(FileNotFoundException.class,
@@ -2947,59 +3292,15 @@
         } 
       }
 
-      void initAppend(LocatedBlock lastBlock, FileStatus stat,
-          int bytesPerChecksum) throws IOException {
-        block = lastBlock.getBlock();
-        accessToken = lastBlock.getAccessToken();
-        long usedInLastBlock = stat.getLen() % blockSize;
-        int freeInLastBlock = (int)(blockSize - usedInLastBlock);
-
-        // calculate the amount of free space in the pre-existing 
-        // last crc chunk
-        int usedInCksum = (int)(stat.getLen() % bytesPerChecksum);
-        int freeInCksum = bytesPerChecksum - usedInCksum;
-
-        // if there is space in the last block, then we have to 
-        // append to that block
-        if (freeInLastBlock > blockSize) {
-          throw new IOException("The last block for file " + 
-              src + " is full.");
-        }
-
-        if (usedInCksum > 0 && freeInCksum > 0) {
-          // if there is space in the last partial chunk, then 
-          // setup in such a way that the next packet will have only 
-          // one chunk that fills up the partial chunk.
-          //
-          computePacketChunkSize(0, freeInCksum);
-          resetChecksumChunk(freeInCksum);
-          appendChunk = true;
-        } else {
-          // if the remaining space in the block is smaller than 
-          // that expected size of of a packet, then create 
-          // smaller size packet.
-          //
-          computePacketChunkSize(Math.min(writePacketSize, freeInLastBlock), 
-              bytesPerChecksum);
-        }
-
-        // setup pipeline to append to the last block XXX retries??
-        nodes = lastBlock.getLocations();
-        errorIndex = -1;   // no errors yet.
-        if (nodes.length < 1) {
-          throw new IOException("Unable to retrieve blocks locations " +
-              " for last block " + block +
-              "of file " + src);
-
-        }
-        processDatanodeError(true, true);
+      Block getBlock() {
+        return block;
       }
 
       DatanodeInfo[] getNodes() {
         return nodes;
       }
 
-      AccessToken getAccessToken() {
+      BlockAccessToken getAccessToken() {
         return accessToken;
       }
 
@@ -3058,10 +3359,10 @@
 
     /**
      * Create a new output stream to the given DataNode.
-     * @see ClientProtocol#create(String, FsPermission, String, boolean, short, long)
+     * @see ClientProtocol#create(String, FsPermission, String, EnumSetWritable, boolean, short, long)
      */
     DFSOutputStream(String src, FsPermission masked, EnumSet<CreateFlag> flag,
-        short replication, long blockSize, Progressable progress,
+        boolean createParent, short replication, long blockSize, Progressable progress,
         int buffersize, int bytesPerChecksum) throws IOException {
       this(src, blockSize, progress, bytesPerChecksum);
 
@@ -3069,12 +3370,15 @@
 
       try {
         namenode.create(
-            src, masked, clientName, new EnumSetWritable<CreateFlag>(flag), replication, blockSize);
+            src, masked, clientName, new EnumSetWritable<CreateFlag>(flag), createParent, replication, blockSize);
       } catch(RemoteException re) {
         throw re.unwrapRemoteException(AccessControlException.class,
+                                       FileAlreadyExistsException.class,
+                                       FileNotFoundException.class,
                                        NSQuotaExceededException.class,
                                        DSQuotaExceededException.class);
       }
+      streamer = new DataStreamer();
       streamer.start();
     }
   
@@ -3094,9 +3398,10 @@
       if (lastBlock != null) {
         // indicate that we are appending to an existing block
         bytesCurBlock = lastBlock.getBlockSize();
-        streamer.initAppend(lastBlock, stat, bytesPerChecksum);
+        streamer = new DataStreamer(lastBlock, stat, bytesPerChecksum);
       } else {
         computePacketChunkSize(writePacketSize, bytesPerChecksum);
+        streamer = new DataStreamer();
       }
       streamer.start();
     }
@@ -3185,36 +3490,56 @@
               ", blockSize=" + blockSize +
               ", appendChunk=" + appendChunk);
         }
-        //
-        // if we allocated a new packet because we encountered a block
-        // boundary, reset bytesCurBlock.
-        //
-        if (bytesCurBlock == blockSize) {
-          currentPacket.lastPacketInBlock = true;
-          bytesCurBlock = 0;
-          lastFlushOffset = -1;
-        }
         waitAndQueuePacket(currentPacket);
         currentPacket = null;
 
-        // If this was the first write after reopening a file, then the above
-        // write filled up any partial chunk. Tell the summer to generate full 
+        // If the reopened file did not end at a chunk boundary and the above
+        // write filled up its partial chunk, tell the summer to generate full 
         // crc chunks from now on.
-        if (appendChunk) {
+        if (appendChunk && bytesCurBlock%bytesPerChecksum == 0) {
           appendChunk = false;
           resetChecksumChunk(bytesPerChecksum);
         }
-        int psize = Math.min((int)(blockSize-bytesCurBlock), writePacketSize);
-        computePacketChunkSize(psize, bytesPerChecksum);
+
+        if (!appendChunk) {
+          int psize = Math.min((int)(blockSize-bytesCurBlock), writePacketSize);
+          computePacketChunkSize(psize, bytesPerChecksum);
+        }
+        //
+        // if encountering a block boundary, send an empty packet to 
+        // indicate the end of block and reset bytesCurBlock.
+        //
+        if (bytesCurBlock == blockSize) {
+          currentPacket = new Packet(DataNode.PKT_HEADER_LEN+4, 0, 
+              bytesCurBlock);
+          currentPacket.lastPacketInBlock = true;
+          waitAndQueuePacket(currentPacket);
+          currentPacket = null;
+          bytesCurBlock = 0;
+          lastFlushOffset = -1;
+        }
       }
     }
   
+    @Override
+    @Deprecated
+    public synchronized void sync() throws IOException {
+      hflush();
+    }
+    
     /**
-     * All data is written out to datanodes. It is not guaranteed 
-     * that data has been flushed to persistent store on the 
-     * datanode. Block allocations are persisted on namenode.
+     * Flushes out to all replicas of the block. 
+     * The data is in the buffers of the DNs 
+     * but not necessarily in the DNs' OS buffers. 
+     *
+     * It is a synchronous operation. When it returns,
+     * it guarantees that flushed data becomes visible to new readers. 
+     * It is not guaranteed that data has been flushed to 
+     * persistent store on the datanode. 
+     * Block allocations are persisted on the namenode.
      */
-    public synchronized void sync() throws IOException {
+    @Override
+    public synchronized void hflush() throws IOException {
       checkOpen();
       isClosed();
       try {
@@ -3258,12 +3583,24 @@
         }
       } catch (IOException e) {
           lastException = new IOException("IOException flush:" + e);
-          closeThreads();
+          closeThreads(true);
           throw e;
       }
     }
 
     /**
+     * The expected semantics are that all data has been flushed out to all 
+     * replicas and all replicas have done the POSIX fsync equivalent, i.e. the 
+     * OS has flushed it to the disk device (but the disk may have it in its cache).
+     * 
+     * Right now it is implemented as hflush by default.
+     */
+    @Override
+    public synchronized void hsync() throws IOException {
+      hflush();
+    }
+    
+    /**
      * Waits till all existing data is flushed and confirmations 
      * received from datanodes. 
      */
@@ -3299,13 +3636,14 @@
       }
       streamer.setLastException(new IOException("Lease timeout of " +
                                (hdfsTimeout/1000) + " seconds expired."));
-      closeThreads();
+      closeThreads(true);
     }
  
     // shutdown datastreamer and responseprocessor threads.
-    private void closeThreads() throws IOException {
+    // interrupt datastreamer if force is true
+    private void closeThreads(boolean force) throws IOException {
       try {
-        streamer.close();
+        streamer.close(force);
         streamer.join();
         if (s != null) {
           s.close();
@@ -3336,21 +3674,22 @@
       try {
         flushBuffer();       // flush from all upper layers
 
-        // Mark that this packet is the last packet in block.
-        // If there are no outstanding packets and the last packet
-        // was not the last one in the current block, then create a
-        // packet with empty payload.
-        if (currentPacket == null && bytesCurBlock != 0) {
-          currentPacket = new Packet(packetSize, chunksPerPacket,
-              bytesCurBlock);
-        }
         if (currentPacket != null) { 
+          waitAndQueuePacket(currentPacket);
+        }
+
+        if (bytesCurBlock != 0) {
+          // send an empty packet to mark the end of the block
+          currentPacket = new Packet(DataNode.PKT_HEADER_LEN+4, 0, 
+              bytesCurBlock);
           currentPacket.lastPacketInBlock = true;
         }
 
         flushInternal();             // flush all data to Datanodes
-        closeThreads();
-        completeFile();
+        // get last block before destroying the streamer
+        Block lastBlock = streamer.getBlock();
+        closeThreads(false);
+        completeFile(lastBlock);
         leasechecker.remove(src);
       } finally {
         closed = true;
@@ -3359,11 +3698,11 @@
 
     // should be called holding (this) lock since setTestFilename() may 
     // be called during unit tests
-    private void completeFile() throws IOException {
+    private void completeFile(Block last) throws IOException {
       long localstart = System.currentTimeMillis();
       boolean fileComplete = false;
       while (!fileComplete) {
-        fileComplete = namenode.complete(src, clientName);
+        fileComplete = namenode.complete(src, clientName, last);
         if (!fileComplete) {
           if (!clientRunning ||
                 (hdfsTimeout > 0 &&
@@ -3411,7 +3750,7 @@
     /**
      * Returns the access token currently used by streamer, for testing only
      */
-    AccessToken getAccessToken() {
+    BlockAccessToken getAccessToken() {
       return streamer.getAccessToken();
     }
 

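A minimal usage sketch of the hflush()/hsync() semantics described in the patched javadoc above, assuming FSDataOutputStream on this branch delegates the Syncable hflush()/hsync() calls to the wrapped DFSOutputStream; the path and payload are illustrative only:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class HflushSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);   // assumes the default FS is HDFS
        FSDataOutputStream out = fs.create(new Path("/tmp/hflush-demo.txt"));
        out.writeBytes("first record\n");
        // hflush: pushes data to every datanode in the pipeline and makes it
        // visible to new readers, but does not guarantee it reached disk.
        out.hflush();
        out.writeBytes("second record\n");
        // hsync: intended to also flush the datanodes' OS buffers to the disk
        // device; per the javadoc above it currently falls back to hflush.
        out.hsync();
        out.close();
      }
    }

The distinction matters for readers tailing a file that is still open for write: once hflush() returns, the flushed bytes are visible to a new reader even though they may not yet be durable.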
Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java Sat Nov 28 20:05:56 2009
@@ -37,6 +37,7 @@
 import org.apache.hadoop.hdfs.DFSClient.DFSOutputStream;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.fs.Options;
 
 
 /****************************************************************
@@ -177,12 +178,20 @@
     }
     return dfs.getBlockLocations(getPathName(file.getPath()), start, len);
   }
+  
+  @Override
+  public BlockLocation[] getFileBlockLocations(Path p, 
+      long start, long len) throws IOException {
+    return dfs.getBlockLocations(getPathName(p), start, len);
+
+  }
 
   @Override
   public void setVerifyChecksum(boolean verifyChecksum) {
     this.verifyChecksum = verifyChecksum;
   }
 
+  @SuppressWarnings("deprecation")
   @Override
   public FSDataInputStream open(Path f, int bufferSize) throws IOException {
     return new DFSClient.DFSDataInputStream(
@@ -203,11 +212,33 @@
     EnumSet<CreateFlag> flag, int bufferSize, short replication, long blockSize,
     Progressable progress) throws IOException {
 
-    return new FSDataOutputStream
-       (dfs.create(getPathName(f), permission,
+    return new FSDataOutputStream(dfs.create(getPathName(f), permission,
                    flag, replication, blockSize, progress, bufferSize),
         statistics);
   }
+  
+  @SuppressWarnings("deprecation")
+  @Override
+  protected FSDataOutputStream primitiveCreate(Path f,
+    FsPermission absolutePermission, EnumSet<CreateFlag> flag, int bufferSize,
+    short replication, long blockSize, Progressable progress,
+    int bytesPerChecksum) throws IOException {
+    return new FSDataOutputStream(dfs.primitiveCreate(getPathName(f),
+        absolutePermission, flag, true, replication, blockSize,
+        progress, bufferSize, bytesPerChecksum),statistics);
+   } 
+
+  /**
+   * Same as create(), except it fails if the parent directory doesn't already exist.
+   * @see #create(Path, FsPermission, EnumSet, int, short, long, Progressable)
+   */
+  public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
+      EnumSet<CreateFlag> flag, int bufferSize, short replication,
+      long blockSize, Progressable progress) throws IOException {
+
+    return new FSDataOutputStream(dfs.create(getPathName(f), permission, flag,
+        false, replication, blockSize, progress, bufferSize), statistics);
+  }
 
   @Override
   public boolean setReplication(Path src, 
@@ -215,19 +246,41 @@
                                ) throws IOException {
     return dfs.setReplication(getPathName(src), replication);
   }
-
+  
   /**
-   * Rename files/dirs
+   * This is a DFS-only operation; it is not part of FileSystem.
+   * It moves the blocks from srcs to trg
+   * and deletes srcs afterwards.
+   * All blocks should be the same size.
+   * @param trg existing file to append to
+   * @param psrcs list of files (same block size, same replication)
+   * @throws IOException
    */
+  public void concat(Path trg, Path [] psrcs) throws IOException {
+    String [] srcs = new String [psrcs.length];
+    for(int i=0; i<psrcs.length; i++) {
+      srcs[i] = getPathName(psrcs[i]);
+    }
+    dfs.concat(getPathName(trg), srcs);
+  }
+
+  /** {@inheritDoc} */
+  @SuppressWarnings("deprecation")
   @Override
   public boolean rename(Path src, Path dst) throws IOException {
     return dfs.rename(getPathName(src), getPathName(dst));
   }
 
-  /**
-   * requires a boolean check to delete a non 
-   * empty directory recursively.
+  /** 
+   * {@inheritDoc}
+   * This rename operation is guaranteed to be atomic.
    */
+  @SuppressWarnings("deprecation")
+  @Override
+  public void rename(Path src, Path dst, Options.Rename... options) throws IOException {
+    dfs.rename(getPathName(src), getPathName(dst), options);
+  }
+  
   @Override
   public boolean delete(Path f, boolean recursive) throws IOException {
    return dfs.delete(getPathName(f), recursive);
@@ -268,9 +321,24 @@
     return stats;
   }
 
+  /**
+   * Create a directory with the given name and permission, but only when
+   * the parent directory exists.
+   */
+  public boolean mkdir(Path f, FsPermission permission) throws IOException {
+    return dfs.mkdirs(getPathName(f), permission, false);
+  }
+
   @Override
   public boolean mkdirs(Path f, FsPermission permission) throws IOException {
-    return dfs.mkdirs(getPathName(f), permission);
+    return dfs.mkdirs(getPathName(f), permission, true);
+  }
+
+  @SuppressWarnings("deprecation")
+  @Override
+  protected boolean primitiveMkdir(Path f, FsPermission absolutePermission)
+    throws IOException {
+    return dfs.primitiveMkdir(getPathName(f), absolutePermission);
   }
 
   /** {@inheritDoc} */
@@ -434,6 +502,12 @@
     dfs.metaSave(pathname);
   }
 
+  /** {@inheritDoc} */
+  @Override
+  public FsServerDefaults getServerDefaults() throws IOException {
+    return dfs.getServerDefaults();
+  }
+
   /**
    * We need to find the blocks that didn't match.  Likely only one 
    * is corrupt but we will report both to the namenode.  In the future,

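A hedged sketch of the DistributedFileSystem-specific calls added above (mkdir, createNonRecursive, concat); the paths, replication factor, and block size are illustrative, and the cluster is assumed to be the configured default filesystem:

    import java.util.EnumSet;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.CreateFlag;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.permission.FsPermission;
    import org.apache.hadoop.hdfs.DistributedFileSystem;

    public class DfsOnlyOpsSketch {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        DistributedFileSystem dfs = (DistributedFileSystem) fs;  // assumes HDFS is the default FS

        // mkdir: create a single directory, failing if the parent is missing
        // (unlike mkdirs, which creates missing parents).
        dfs.mkdir(new Path("/data/2009"), FsPermission.getDefault());

        // createNonRecursive: like create(), but fails when the parent
        // directory does not already exist.
        dfs.createNonRecursive(new Path("/data/2009/part-0"),
            FsPermission.getDefault(), EnumSet.of(CreateFlag.CREATE),
            4096, (short) 3, 64L * 1024 * 1024, null).close();

        // concat: append the blocks of srcs onto trg and delete srcs;
        // trg must already exist and all files must share block size
        // and replication.
        Path trg = new Path("/data/2009/all");                 // assumed to exist
        Path[] srcs = { new Path("/data/2009/part-1"), new Path("/data/2009/part-2") };
        dfs.concat(trg, srcs);
      }
    }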
Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/HftpFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/HftpFileSystem.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/HftpFileSystem.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/HftpFileSystem.java Sat Nov 28 20:05:56 2009
@@ -27,9 +27,11 @@
 import java.net.URISyntaxException;
 import java.net.URL;
 import java.text.ParseException;
+import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.Random;
+import java.util.TimeZone;
 
 import javax.security.auth.login.LoginException;
 
@@ -37,15 +39,12 @@
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FSInputStream;
 import org.apache.hadoop.fs.FileChecksum;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hdfs.server.common.ThreadLocalDateFormat;
-import org.apache.hadoop.hdfs.server.namenode.ListPathsServlet;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.UnixUserGroupInformation;
@@ -59,6 +58,8 @@
 import org.xml.sax.helpers.DefaultHandler;
 import org.xml.sax.helpers.XMLReaderFactory;
 
+
+
 /** An implementation of a protocol for accessing filesystems over HTTP.
  * The following implementation provides a limited, read-only interface
  * to a filesystem over HTTP.
@@ -74,7 +75,21 @@
   protected UserGroupInformation ugi; 
   protected final Random ran = new Random();
 
-  protected static final ThreadLocalDateFormat df = ListPathsServlet.df;
+  public static final String HFTP_TIMEZONE = "UTC";
+  public static final String HFTP_DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ssZ";
+
+  public static final SimpleDateFormat getDateFormat() {
+    final SimpleDateFormat df = new SimpleDateFormat(HFTP_DATE_FORMAT);
+    df.setTimeZone(TimeZone.getTimeZone(HFTP_TIMEZONE));
+    return df;
+  }
+
+  protected static final ThreadLocal<SimpleDateFormat> df =
+    new ThreadLocal<SimpleDateFormat>() {
+      protected SimpleDateFormat initialValue() {
+        return getDateFormat();
+      }
+    };
 
   @Override
   public void initialize(URI name, Configuration conf) throws IOException {
@@ -100,55 +115,48 @@
     } 
   }
 
-  /**
-   * Open an HTTP connection to the namenode to read file data and metadata.
-   * @param path The path component of the URL
-   * @param query The query component of the URL
-   */
-  protected HttpURLConnection openConnection(String path, String query)
-      throws IOException {
+
+  /* 
+    Construct a URL pointing to a file on the namenode.
+  */
+  URL getNamenodeFileURL(Path f) throws IOException {
+    return getNamenodeURL("/data" + f.toUri().getPath(), "ugi=" + ugi);
+  }
+
+  /* 
+    Construct a URL pointing to the namenode. 
+  */
+  URL getNamenodeURL(String path, String query) throws IOException {
     try {
       final URL url = new URI("http", null, nnAddr.getHostName(),
           nnAddr.getPort(), path, query, null).toURL();
       if (LOG.isTraceEnabled()) {
         LOG.trace("url=" + url);
       }
-      HttpURLConnection connection = (HttpURLConnection)url.openConnection();
-      connection.setRequestMethod("GET");
-      connection.connect();
-      return connection;
+      return url;
     } catch (URISyntaxException e) {
-      throw (IOException)new IOException().initCause(e);
+      throw new IOException(e);
     }
   }
 
+  /**
+   * Open an HTTP connection to the namenode to read file data and metadata.
+   * @param path The path component of the URL
+   * @param query The query component of the URL
+   */
+  protected HttpURLConnection openConnection(String path, String query)
+      throws IOException {
+    final URL url = getNamenodeURL(path, query);
+    HttpURLConnection connection = (HttpURLConnection)url.openConnection();
+    connection.setRequestMethod("GET");
+    connection.connect();
+    return connection;
+  }
+
   @Override
   public FSDataInputStream open(Path f, int buffersize) throws IOException {
-    HttpURLConnection connection = null;
-    connection = openConnection("/data" + f.toUri().getPath(), "ugi=" + ugi);
-    final InputStream in = connection.getInputStream();
-    return new FSDataInputStream(new FSInputStream() {
-        public int read() throws IOException {
-          return in.read();
-        }
-        public int read(byte[] b, int off, int len) throws IOException {
-          return in.read(b, off, len);
-        }
-
-        public void close() throws IOException {
-          in.close();
-        }
-
-        public void seek(long pos) throws IOException {
-          throw new IOException("Can't seek!");
-        }
-        public long getPos() throws IOException {
-          throw new IOException("Position unknown!");
-        }
-        public boolean seekToNewSource(long targetPos) throws IOException {
-          return false;
-        }
-      });
+    URL u = getNamenodeURL("/data" + f.toUri().getPath(), "ugi=" + ugi);
+    return new FSDataInputStream(new ByteRangeInputStream(u));
   }
 
   /** Class to parse and store a listing reply from the server. */
@@ -168,10 +176,11 @@
       long modif;
       long atime = 0;
       try {
-        modif = df.parse(attrs.getValue("modified")).getTime();
+        final SimpleDateFormat ldf = df.get();
+        modif = ldf.parse(attrs.getValue("modified")).getTime();
         String astr = attrs.getValue("accesstime");
         if (astr != null) {
-          atime = df.parse(astr).getTime();
+          atime = ldf.parse(astr).getTime();
         }
       } catch (ParseException e) { throw new SAXException(e); }
       FileStatus fs = "file".equals(qname)
@@ -286,7 +295,7 @@
 
   @Override
   public Path getWorkingDirectory() {
-    return new Path("/").makeQualified(this);
+    return new Path("/").makeQualified(getUri(), null);
   }
 
   @Override

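The ThreadLocal wrapping of SimpleDateFormat introduced above is the standard workaround for SimpleDateFormat not being thread-safe; a small standalone sketch of the same pattern follows (the class and method names here are illustrative, not part of the patch):

    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.TimeZone;

    public class PerThreadDateFormat {
      // One SimpleDateFormat per thread: the class keeps mutable parse/format
      // state internally and must not be shared across threads.
      private static final ThreadLocal<SimpleDateFormat> DF =
          new ThreadLocal<SimpleDateFormat>() {
            @Override protected SimpleDateFormat initialValue() {
              SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssZ");
              df.setTimeZone(TimeZone.getTimeZone("UTC"));
              return df;
            }
          };

      public static String format(Date d) {
        return DF.get().format(d);
      }

      public static Date parse(String s) throws ParseException {
        return DF.get().parse(s);
      }

      public static void main(String[] args) throws ParseException {
        String now = format(new Date());
        System.out.println(now + " round-trips to " + parse(now));
      }
    }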
Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/HsftpFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/HsftpFileSystem.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/HsftpFileSystem.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/HsftpFileSystem.java Sat Nov 28 20:05:56 2009
@@ -65,9 +65,9 @@
    * @throws IOException
    */
   private static void setupSsl(Configuration conf) throws IOException {
-    Configuration sslConf = new Configuration(false);
-    sslConf.addResource(conf.get("dfs.https.client.keystore.resource",
-        "ssl-client.xml"));
+    Configuration sslConf = new HdfsConfiguration(false);
+    sslConf.addResource(conf.get(DFSConfigKeys.DFS_CLIENT_HTTPS_KEYSTORE_RESOURCE_KEY,
+                             DFSConfigKeys.DFS_CLIENT_HTTPS_KEYSTORE_RESOURCE_DEFAULT));
     FileInputStream fis = null;
     try {
       SSLContext sc = SSLContext.getInstance("SSL");

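For context on the setupSsl() change above, a small hedged sketch of how the client-side SSL resource would now be resolved; it only exercises the configuration lookup and assumes the DFSConfigKeys constants shown in the diff (the default resource is typically ssl-client.xml):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    public class SslClientConfSketch {
      public static void main(String[] args) {
        Configuration conf = new HdfsConfiguration();
        // Same lookup as the patched setupSsl(): read the resource name from
        // the DFS_CLIENT_HTTPS_KEYSTORE_RESOURCE_KEY setting, falling back
        // to the shipped default.
        String resource = conf.get(
            DFSConfigKeys.DFS_CLIENT_HTTPS_KEYSTORE_RESOURCE_KEY,
            DFSConfigKeys.DFS_CLIENT_HTTPS_KEYSTORE_RESOURCE_DEFAULT);
        // Load that resource into a fresh config that skips the default resources.
        Configuration sslConf = new HdfsConfiguration(false);
        sslConf.addResource(resource);
        System.out.println("SSL client configuration resource: " + resource);
      }
    }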
Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/protocol/Block.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/protocol/Block.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/protocol/Block.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/protocol/Block.java Sat Nov 28 20:05:56 2009
@@ -40,10 +40,6 @@
        });
   }
 
-  // generation stamp of blocks that pre-date the introduction of
-  // a generation stamp.
-  public static final long GRANDFATHER_GENERATION_STAMP = 0;
-
   public static final Pattern blockFilePattern = Pattern
       .compile(BLOCK_FILE_PREFIX + "(-??\\d++)$");
   public static final Pattern metaFilePattern = Pattern
@@ -70,7 +66,7 @@
   public static long getGenerationStamp(String metaFile) {
     Matcher m = metaFilePattern.matcher(metaFile);
     return m.matches() ? Long.parseLong(m.group(2))
-        : GRANDFATHER_GENERATION_STAMP;
+        : GenerationStamp.GRANDFATHER_GENERATION_STAMP;
   }
 
   /**
@@ -91,9 +87,13 @@
     set(blkid, len, generationStamp);
   }
 
-  public Block(final long blkid) {this(blkid, 0, GenerationStamp.WILDCARD_STAMP);}
+  public Block(final long blkid) {
+    this(blkid, 0, GenerationStamp.GRANDFATHER_GENERATION_STAMP);
+  }
 
-  public Block(Block blk) {this(blk.blockId, blk.numBytes, blk.generationStamp);}
+  public Block(Block blk) {
+    this(blk.blockId, blk.numBytes, blk.generationStamp);
+  }
 
   /**
    * Find the blockid from the given filename
@@ -164,32 +164,13 @@
     }
   }
 
-  /////////////////////////////////////
-  // Comparable
-  /////////////////////////////////////
-  static void validateGenerationStamp(long generationstamp) {
-    if (generationstamp == GenerationStamp.WILDCARD_STAMP) {
-      throw new IllegalStateException("generationStamp (=" + generationstamp
-          + ") == GenerationStamp.WILDCARD_STAMP");
-    }    
-  }
-
-  /** {@inheritDoc} */
+  @Override // Comparable
   public int compareTo(Block b) {
-    //Wildcard generationStamp is NOT ALLOWED here
-    validateGenerationStamp(this.generationStamp);
-    validateGenerationStamp(b.generationStamp);
-
-    if (blockId < b.blockId) {
-      return -1;
-    } else if (blockId == b.blockId) {
-      return GenerationStamp.compare(generationStamp, b.generationStamp);
-    } else {
-      return 1;
-    }
+    return blockId < b.blockId ? -1 :
+           blockId > b.blockId ? 1 : 0;
   }
 
-  /** {@inheritDoc} */
+  @Override // Object
   public boolean equals(Object o) {
     if (this == o) {
       return true;
@@ -197,16 +178,12 @@
     if (!(o instanceof Block)) {
       return false;
     }
-    final Block that = (Block)o;
-    //Wildcard generationStamp is ALLOWED here
-    return this.blockId == that.blockId
-      && GenerationStamp.equalsWithWildcard(
-          this.generationStamp, that.generationStamp);
+    return compareTo((Block)o) == 0;
   }
 
-  /** {@inheritDoc} */
+  @Override // Object
   public int hashCode() {
     //GenerationStamp is IRRELEVANT and should not be used here
-    return 37 * 17 + (int) (blockId^(blockId>>>32));
+    return (int)(blockId^(blockId>>>32));
   }
 }
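
The compareTo/equals/hashCode rewrite above makes block identity depend on blockId alone, with the generation stamp no longer consulted. A standalone sketch of that contract using a hypothetical SimpleBlock class (not the real org.apache.hadoop.hdfs.protocol.Block):

    import java.util.TreeSet;

    public class SimpleBlock implements Comparable<SimpleBlock> {
      final long blockId;
      final long generationStamp;   // deliberately ignored by the contract below

      SimpleBlock(long blockId, long generationStamp) {
        this.blockId = blockId;
        this.generationStamp = generationStamp;
      }

      @Override public int compareTo(SimpleBlock b) {
        // Order purely by blockId, mirroring the new Block.compareTo.
        return blockId < b.blockId ? -1 : blockId > b.blockId ? 1 : 0;
      }

      @Override public boolean equals(Object o) {
        return (o instanceof SimpleBlock) && compareTo((SimpleBlock) o) == 0;
      }

      @Override public int hashCode() {
        // Consistent with equals: derived from blockId only.
        return (int) (blockId ^ (blockId >>> 32));
      }

      public static void main(String[] args) {
        TreeSet<SimpleBlock> set = new TreeSet<SimpleBlock>();
        set.add(new SimpleBlock(42, 1));
        // Same blockId, newer generation stamp: treated as the same block.
        System.out.println(set.contains(new SimpleBlock(42, 7)));  // prints true
      }
    }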