Posted to hdfs-commits@hadoop.apache.org by st...@apache.org on 2009/11/28 21:06:08 UTC

svn commit: r885143 [8/18] - in /hadoop/hdfs/branches/HDFS-326: ./ .eclipse.templates/ .eclipse.templates/.launches/ conf/ ivy/ lib/ src/ant/org/apache/hadoop/ant/ src/ant/org/apache/hadoop/ant/condition/ src/c++/ src/c++/libhdfs/ src/c++/libhdfs/docs/...

Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java Sat Nov 28 20:05:56 2009
@@ -17,48 +17,98 @@
  */
 package org.apache.hadoop.hdfs.protocol;
 
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
+
 /**
  * This class provides an interface for accessing list of blocks that
  * has been implemented as long[].
- * This class is usefull for block report. Rather than send block reports
+ * This class is useful for block reports. Rather than send a block report
  * as a Block[] we can send it as a long[].
  *
+ * The structure of the array is as follows:
+ * 0: the length of the finalized replica list;
+ * 1: the length of the under-construction replica list;
+ * - followed by finalized replica list where each replica is represented by
+ *   3 longs: one for the blockId, one for the block length, and one for
+ *   the generation stamp;
+ * - followed by an invalid delimiting replica represented by three -1s;
+ * - followed by the under-construction replica list where each replica is
+ *   represented by 4 longs: three for the block id, length, and generation
+ *   stamp, and the fourth for the replica state.
  */
-public class BlockListAsLongs {
+public class BlockListAsLongs implements Iterable<Block> {
   /**
-   * A block as 3 longs
+   * A finalized block as 3 longs
    *   block-id and block length and generation stamp
    */
-  private static final int LONGS_PER_BLOCK = 3;
-  
-  private static int index2BlockId(int index) {
-    return index*LONGS_PER_BLOCK;
-  }
-  private static int index2BlockLen(int index) {
-    return (index*LONGS_PER_BLOCK) + 1;
-  }
-  private static int index2BlockGenStamp(int index) {
-    return (index*LONGS_PER_BLOCK) + 2;
+  private static final int LONGS_PER_FINALIZED_BLOCK = 3;
+
+  /**
+   * An under-construction block as 4 longs
+   *   block-id and block length, generation stamp and replica state
+   */
+  private static final int LONGS_PER_UC_BLOCK = 4;
+
+  /** Number of longs in the header */
+  private static final int HEADER_SIZE = 2;
+
+  /**
+   * Returns the index of the first long in blockList
+   * belonging to the specified block.
+   * The first long contains the block id.
+   */
+  private int index2BlockId(int blockIndex) {
+    if(blockIndex < 0 || blockIndex > getNumberOfBlocks())
+      return -1;
+    int finalizedSize = getNumberOfFinalizedReplicas();
+    if(blockIndex < finalizedSize)
+      return HEADER_SIZE + blockIndex * LONGS_PER_FINALIZED_BLOCK;
+    return HEADER_SIZE + (finalizedSize + 1) * LONGS_PER_FINALIZED_BLOCK
+            + (blockIndex - finalizedSize) * LONGS_PER_UC_BLOCK;
   }
-  
+
   private long[] blockList;
   
   /**
-   * Converting a block[] to a long[]
-   * @param blockArray - the input array block[]
-   * @return the output array of long[]
+   * Create block report from finalized and under construction lists of blocks.
+   * 
+   * @param finalized - list of finalized blocks
+   * @param uc - list of under construction blocks
    */
-  
-  public static long[] convertToArrayLongs(final Block[] blockArray) {
-    long[] blocksAsLongs = new long[blockArray.length * LONGS_PER_BLOCK];
+  public BlockListAsLongs(final List<? extends Block> finalized,
+                          final List<ReplicaInfo> uc) {
+    int finalizedSize = finalized == null ? 0 : finalized.size();
+    int ucSize = uc == null ? 0 : uc.size();
+    int len = HEADER_SIZE
+              + (finalizedSize + 1) * LONGS_PER_FINALIZED_BLOCK
+              + ucSize * LONGS_PER_UC_BLOCK;
 
-    BlockListAsLongs bl = new BlockListAsLongs(blocksAsLongs);
-    assert bl.getNumberOfBlocks() == blockArray.length;
+    blockList = new long[len];
 
-    for (int i = 0; i < blockArray.length; i++) {
-      bl.setBlock(i, blockArray[i]);
+    // set the header
+    blockList[0] = finalizedSize;
+    blockList[1] = ucSize;
+
+    // set finalized blocks
+    for (int i = 0; i < finalizedSize; i++) {
+      setBlock(i, finalized.get(i));
     }
-    return blocksAsLongs;
+
+    // set invalid delimiting block
+    setDelimitingBlock(finalizedSize);
+
+    // set under construction blocks
+    for (int i = 0; i < ucSize; i++) {
+      setBlock(finalizedSize + i, uc.get(i));
+    }
+  }
+
+  public BlockListAsLongs() {
+    this(null);
   }
 
   /**
@@ -67,33 +117,136 @@
    */
   public BlockListAsLongs(final long[] iBlockList) {
     if (iBlockList == null) {
-      blockList = new long[0];
-    } else {
-      if (iBlockList.length%LONGS_PER_BLOCK != 0) {
-        // must be multiple of LONGS_PER_BLOCK
-        throw new IllegalArgumentException();
-      }
-      blockList = iBlockList;
+      blockList = new long[HEADER_SIZE];
+      return;
+    }
+    blockList = iBlockList;
+  }
+
+  public long[] getBlockListAsLongs() {
+    return blockList;
+  }
+
+  /**
+   * Iterates over blocks in the block report.
+   * Avoids object allocation on each iteration.
+   */
+  public class BlockReportIterator implements Iterator<Block> {
+    private int currentBlockIndex;
+    private Block block;
+    private ReplicaState currentReplicaState;
+
+    BlockReportIterator() {
+      this.currentBlockIndex = 0;
+      this.block = new Block();
+      this.currentReplicaState = null;
+    }
+
+    public boolean hasNext() {
+      return currentBlockIndex < getNumberOfBlocks();
+    }
+
+    public Block next() {
+      block.set(blockId(currentBlockIndex),
+                blockLength(currentBlockIndex),
+                blockGenerationStamp(currentBlockIndex));
+      currentReplicaState = blockReplicaState(currentBlockIndex);
+      currentBlockIndex++;
+      return block;
+    }
+
+    public void remove() {
+      throw new UnsupportedOperationException("Sorry, can't remove.");
+    }
+
+    /**
+     * Get the state of the current replica.
+     * The state corresponds to the replica returned
+     * by the latest {@link #next()}. 
+     */
+    public ReplicaState getCurrentReplicaState() {
+      return currentReplicaState;
     }
   }
 
-  
+  /**
+   * Returns an iterator over blocks in the block report. 
+   */
+  public Iterator<Block> iterator() {
+    return getBlockReportIterator();
+  }
+
+  /**
+   * Returns {@link BlockReportIterator}. 
+   */
+  public BlockReportIterator getBlockReportIterator() {
+    return new BlockReportIterator();
+  }
+
   /**
    * The number of blocks
    * @return - the number of blocks
    */
   public int getNumberOfBlocks() {
-    return blockList.length/LONGS_PER_BLOCK;
+    assert blockList.length == HEADER_SIZE + 
+            (blockList[0] + 1) * LONGS_PER_FINALIZED_BLOCK +
+            blockList[1] * LONGS_PER_UC_BLOCK :
+              "Number of blocks is inconcistent with the array length";
+    return getNumberOfFinalizedReplicas() + getNumberOfUCReplicas();
   }
-  
-  
+
+  /**
+   * Returns the number of finalized replicas in the block report.
+   */
+  private int getNumberOfFinalizedReplicas() {
+    return (int)blockList[0];
+  }
+
+  /**
+   * Returns the number of under construction replicas in the block report.
+   */
+  private int getNumberOfUCReplicas() {
+    return (int)blockList[1];
+  }
+
+  /**
+   * Returns the id of the specified replica of the block report.
+   */
+  private long blockId(int index) {
+    return blockList[index2BlockId(index)];
+  }
+
+  /**
+   * Returns the length of the specified replica of the block report.
+   */
+  private long blockLength(int index) {
+    return blockList[index2BlockId(index) + 1];
+  }
+
+  /**
+   * Returns the generation stamp of the specified replica of the block report.
+   */
+  private long blockGenerationStamp(int index) {
+    return blockList[index2BlockId(index) + 2];
+  }
+
+  /**
+   * Returns the state of the specified replica of the block report.
+   */
+  private ReplicaState blockReplicaState(int index) {
+    if(index < getNumberOfFinalizedReplicas())
+      return ReplicaState.FINALIZED;
+    return ReplicaState.getState((int)blockList[index2BlockId(index) + 3]);
+  }
+
   /**
    * The block-id of the indexTh block
    * @param index - the block whose block-id is desired
    * @return the block-id
    */
+  @Deprecated
   public long getBlockId(final int index)  {
-    return blockList[index2BlockId(index)];
+    return blockId(index);
   }
   
   /**
@@ -101,8 +254,9 @@
    * @param index - the block whose block-len is desired
    * @return - the block-len
    */
+  @Deprecated
   public long getBlockLen(final int index)  {
-    return blockList[index2BlockLen(index)];
+    return blockLength(index);
   }
 
   /**
@@ -110,8 +264,9 @@
    * @param index - the block whose block-len is desired
    * @return - the generation stamp
    */
+  @Deprecated
   public long getBlockGenStamp(final int index)  {
-    return blockList[index2BlockGenStamp(index)];
+    return blockGenerationStamp(index);
   }
   
   /**
@@ -119,9 +274,28 @@
    * @param index - the index of the block to set
    * @param b - the block is set to the value of the this block
    */
-  void setBlock(final int index, final Block b) {
-    blockList[index2BlockId(index)] = b.getBlockId();
-    blockList[index2BlockLen(index)] = b.getNumBytes();
-    blockList[index2BlockGenStamp(index)] = b.getGenerationStamp();
+  private <T extends Block> void setBlock(final int index, final T b) {
+    int pos = index2BlockId(index);
+    blockList[pos] = b.getBlockId();
+    blockList[pos + 1] = b.getNumBytes();
+    blockList[pos + 2] = b.getGenerationStamp();
+    if(index < getNumberOfFinalizedReplicas())
+      return;
+    assert ((ReplicaInfo)b).getState() != ReplicaState.FINALIZED :
+      "Must be under-construction replica.";
+    blockList[pos + 3] = ((ReplicaInfo)b).getState().getValue();
+  }
+
+  /**
+   * Set the invalid delimiting block between the finalized and
+   * the under-construction lists.
+   * The invalid block has all three fields set to -1.
+   * @param finalizedSize - the size of the finalized list
+   */
+  private void setDelimitingBlock(final int finalizedSize) {
+    int idx = HEADER_SIZE + finalizedSize * LONGS_PER_FINALIZED_BLOCK;
+    blockList[idx] = -1;
+    blockList[idx+1] = -1;
+    blockList[idx+2] = -1;
   }
 }
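
For illustration only (not part of the patch): a minimal standalone sketch
of the array layout described in the class javadoc above, assuming two
finalized replicas and one RBW replica. BlockReportLayoutDemo and the
inlined constants 2/3/4 are hypothetical stand-ins for HEADER_SIZE,
LONGS_PER_FINALIZED_BLOCK, and LONGS_PER_UC_BLOCK; the decode loop mirrors
index2BlockId().

    // Header (2 longs), finalized replicas (3 longs each), a -1,-1,-1
    // delimiter, then under-construction replicas (4 longs each).
    public class BlockReportLayoutDemo {
      public static void main(String[] args) {
        long[] report = {
            2, 1,                  // header: 2 finalized, 1 under construction
            101, 64, 1000,         // finalized: id=101, len=64, genstamp=1000
            102, 64, 1001,         // finalized: id=102, len=64, genstamp=1001
            -1, -1, -1,            // delimiting "invalid" replica
            103, 32, 1002, 1       // UC: id=103, len=32, genstamp=1002, RBW=1
        };
        int finalized = (int) report[0];
        int uc = (int) report[1];
        for (int i = 0; i < finalized + uc; i++) {
          int pos = i < finalized
              ? 2 + i * 3                                      // finalized section
              : 2 + (finalized + 1) * 3 + (i - finalized) * 4; // skip delimiter
          System.out.println("id=" + report[pos]
              + " len=" + report[pos + 1]
              + " genstamp=" + report[pos + 2]
              + (i < finalized ? " FINALIZED" : " state=" + report[pos + 3]));
        }
      }
    }

The BlockReportIterator added above walks these same offsets while reusing
a single Block instance, which is the point of the no-allocation note in
its javadoc.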

Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java Sat Nov 28 20:05:56 2009
@@ -29,19 +29,10 @@
   public static final Log LOG = LogFactory.getLog(ClientDatanodeProtocol.class);
 
   /**
-   * 4: never return null and always return a newly generated access token
+   * 6: recoverBlock() removed.
    */
-  public static final long versionID = 4L;
+  public static final long versionID = 6L;
 
-  /** Start generation-stamp recovery for specified block
-   * @param block the specified block
-   * @param keepLength keep the block length
-   * @param targets the list of possible locations of specified block
-   * @return either a new generation stamp, or the original generation stamp. 
-   * Regardless of whether a new generation stamp is returned, a newly 
-   * generated access token is returned as part of the return value.
-   * @throws IOException
-   */
-  LocatedBlock recoverBlock(Block block, boolean keepLength,
-      DatanodeInfo[] targets) throws IOException;
+  /** Return the visible length of a replica. */
+  long getReplicaVisibleLength(Block b) throws IOException;
 }
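
A hedged usage sketch of the new getReplicaVisibleLength() call (not code
from this commit; VisibleLengthSketch is a hypothetical helper, and the
assumed use case is a reader asking how many bytes of an under-construction
replica are safe to read):

    import java.io.IOException;
    import org.apache.hadoop.hdfs.protocol.Block;
    import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;

    class VisibleLengthSketch {
      // Ask the datanode holding a replica of b how many of its bytes
      // are guaranteed visible to readers.
      static long visibleLength(ClientDatanodeProtocol datanode, Block b)
          throws IOException {
        return datanode.getReplicaVisibleLength(b);
      }
    }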

Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java Sat Nov 28 20:05:56 2009
@@ -21,8 +21,11 @@
 import java.io.IOException;
 
 import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FsServerDefaults;
+import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.protocol.FSConstants.UpgradeAction;
 import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
@@ -43,9 +46,9 @@
    * Compared to the previous version the following changes have been introduced:
    * (Only the latest change is reflected.
    * The log of historical changes can be retrieved from the svn).
-   * 45: add create flag for create command, see Hadoop-5438
+   * 52: adding concat() API
    */
-  public static final long versionID = 45L;
+  public static final long versionID = 52L;
   
   ///////////////////////////////////////
   // File contents
@@ -74,6 +77,13 @@
                                           long length) throws IOException;
 
   /**
+   * Get server default values for a number of configuration params.
+   * @return a set of server default configuration values
+   * @throws IOException
+   */
+  public FsServerDefaults getServerDefaults() throws IOException;
+
+  /**
    * Create a new file entry in the namespace.
    * <p>
    * This will create an empty file specified by the source path.
@@ -85,14 +95,15 @@
    * {@link #rename(String, String)} it until the file is completed
    * or explicitly as a result of lease expiration.
    * <p>
-   * Blocks have a maximum size.  Clients that intend to
-   * create multi-block files must also use {@link #addBlock(String, String)}.
+   * Blocks have a maximum size.  Clients that intend to create
+   * multi-block files must also use {@link #addBlock(String, String, Block)}.
    *
    * @param src path of the file being created.
    * @param masked masked permission.
    * @param clientName name of the current client.
    * @param flag indicates whether the file should be 
    * overwritten if it already exists or create if it does not exist or append.
+   * @param createParent create missing parent directory if true
    * @param replication block replication factor.
    * @param blockSize maximum block size.
    * 
@@ -107,6 +118,7 @@
                      FsPermission masked,
                              String clientName, 
                              EnumSetWritable<CreateFlag> flag, 
+                             boolean createParent,
                              short replication,
                              long blockSize
                              ) throws IOException;
@@ -177,9 +189,17 @@
   * addBlock() allocates a new block and selects the datanodes the block
   * data should be replicated to.
    * 
+   * addBlock() also commits the previous block by reporting
+   * to the name-node the actual generation stamp and the length
+   * of the block that the client has transmitted to data-nodes.
+   * 
    * @return LocatedBlock allocated block information.
    */
-  public LocatedBlock addBlock(String src, String clientName) throws IOException;
+  public LocatedBlock addBlock(String src, String clientName,
+                               Block previous) throws IOException;
+
+  public LocatedBlock addBlock(String src, String clientName,
+      Block previous, DatanodeInfo[] excludedNode) throws IOException;
 
   /**
    * The client is done writing data to the given filename, and would 
@@ -187,13 +207,18 @@
    *
    * The function returns whether the file has been closed successfully.
    * If the function returns false, the caller should try again.
+   * 
+   * close() also commits the last block of the file by reporting
+   * to the name-node the actual generation stamp and the length
+   * of the block that the client has transmitted to data-nodes.
    *
    * A call to complete() will not return true until all the file's
    * blocks have been replicated the minimum number of times.  Thus,
    * DataNode failures may cause a client to call complete() several
    * times before succeeding.
    */
-  public boolean complete(String src, String clientName) throws IOException;
+  public boolean complete(String src, String clientName,
+                          Block last) throws IOException;
 
   /**
    * The client wants to report corrupted blocks (blocks with specified
@@ -207,7 +232,6 @@
   ///////////////////////////////////////
   /**
    * Rename an item in the file system namespace.
-   * 
    * @param src existing file or directory name.
    * @param dst new name.
    * @return true if successful, or false if the old name does not exist
@@ -215,10 +239,45 @@
    * @throws IOException if the new name is invalid.
    * @throws QuotaExceededException if the rename would violate 
    *                                any quota restriction
+   * @deprecated Use {@link #rename(String, String, Options.Rename...)} instead.
    */
+  @Deprecated
   public boolean rename(String src, String dst) throws IOException;
 
   /**
+   * Moves blocks from srcs to trg and deletes srcs.
+   * 
+   * @param trg existing file
+   * @param srcs - list of existing files (same block size, same replication)
+   * @throws IOException if some arguments are invalid
+   * @throws QuotaExceededException if the concat would violate
+   *                                any quota restriction
+   */
+  public void concat(String trg, String [] srcs) throws IOException;
+
+  /**
+   * Rename src to dst.
+   * <ul>
+   * <li>Fails if src is a file and dst is a directory.
+   * <li>Fails if src is a directory and dst is a file.
+   * <li>Fails if the parent of dst does not exist or is a file.
+   * </ul>
+   * <p>
+   * Without OVERWRITE option, rename fails if the dst already exists.
+   * With OVERWRITE option, rename overwrites the dst, if it is a file 
+   * or an empty directory. Rename fails if dst is a non-empty directory.
+   * <p>
+   * This implementation of rename is atomic.
+   * <p>
+   * @param src existing file or directory name.
+   * @param dst new name.
+   * @param options Rename options
+   * @throws IOException if rename failed
+   */
+  public void rename(String src, String dst, Options.Rename... options)
+      throws IOException;
+  
+  /**
    * Delete the given file or directory from the file system.
    * <p>
    * Any blocks belonging to the deleted files will be garbage-collected.
@@ -250,6 +309,7 @@
    *
    * @param src The path of the directory being created
    * @param masked The masked permission of the directory being created
+   * @param createParent create missing parent directory if true
    * @return True if the operation success.
    * @throws {@link AccessControlException} if permission to create file is 
    * denied by the system. As usually on the client side the exception will 
@@ -257,7 +317,8 @@
    * @throws QuotaExceededException if the operation would violate 
    *                                any quota restriction.
    */
-  public boolean mkdirs(String src, FsPermission masked) throws IOException;
+  public boolean mkdirs(String src, FsPermission masked, boolean createParent)
+      throws IOException;
 
   /**
    * Get a listing of the indicated directory
@@ -344,7 +405,7 @@
    * percentage called threshold of blocks, which satisfy the minimal 
    * replication condition.
    * The minimal replication condition is that each block must have at least
-   * <tt>dfs.replication.min</tt> replicas.
+   * <tt>dfs.namenode.replication.min</tt> replicas.
    * When the threshold is reached the name node extends safe mode
    * for a configurable amount of time
   * to let the remaining data nodes check in before it
@@ -360,7 +421,7 @@
    * <h4>Configuration parameters:</h4>
    * <tt>dfs.safemode.threshold.pct</tt> is the threshold parameter.<br>
    * <tt>dfs.safemode.extension</tt> is the safe mode extension parameter.<br>
-   * <tt>dfs.replication.min</tt> is the minimal replication parameter.
+   * <tt>dfs.namenode.replication.min</tt> is the minimal replication parameter.
    * 
    * <h4>Special cases:</h4>
    * The name node does not enter safe mode at startup if the threshold is 
@@ -488,4 +549,32 @@
    *              by this call.
    */
   public void setTimes(String src, long mtime, long atime) throws IOException;
+  
+  /**
+   * Get a new generation stamp together with an access token for 
+   * a block under construction
+   * 
+   * This method is called only when a client needs to recover a failed
+   * pipeline or set up a pipeline for appending to a block.
+   * 
+   * @param block a block
+   * @param clientName the name of the client
+   * @return a located block with a new generation stamp and an access token
+   * @throws IOException if any error occurs
+   */
+  public LocatedBlock updateBlockForPipeline(Block block, String clientName) 
+  throws IOException;
+
+  /**
+   * Update a pipeline for a block under construction
+   * 
+   * @param clientName the name of the client
+   * @param oldBlock the old block
+   * @param newBlock the new block containing new generation stamp and length
+   * @param newNodes datanodes in the pipeline
+   * @throws IOException if any error occurs
+   */
+  public void updatePipeline(String clientName, Block oldBlock, 
+      Block newBlock, DatanodeID[] newNodes)
+  throws IOException;
 }
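
To make the revised addBlock()/complete() contract concrete, a rough
client-side sketch against this interface (not code from the commit;
WriteFlowSketch and the sleep-based retry are illustrative assumptions, and
real client logic with pipeline setup and recovery is far more involved):

    import java.io.IOException;
    import org.apache.hadoop.hdfs.protocol.Block;
    import org.apache.hadoop.hdfs.protocol.ClientProtocol;
    import org.apache.hadoop.hdfs.protocol.LocatedBlock;

    class WriteFlowSketch {
      static void writeFile(ClientProtocol namenode, String src,
                            String clientName, int numBlocks)
          throws IOException, InterruptedException {
        Block previous = null;  // no block to commit on the first call
        for (int i = 0; i < numBlocks; i++) {
          // addBlock() commits 'previous' (its actual generation stamp
          // and length) and allocates the next block with its pipeline.
          LocatedBlock lb = namenode.addBlock(src, clientName, previous);
          // ... stream the block's packets to the returned datanodes ...
          previous = lb.getBlock();
        }
        // complete() commits the last block; it returns false until the
        // minimal replication condition is met, so callers retry.
        while (!namenode.complete(src, clientName, previous)) {
          Thread.sleep(400); // back off before retrying
        }
      }
    }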

Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java Sat Nov 28 20:05:56 2009
@@ -24,8 +24,9 @@
 import java.io.IOException;
 import java.io.OutputStream;
 
+import org.apache.hadoop.hdfs.security.BlockAccessToken;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.security.AccessToken;
+import org.apache.hadoop.io.WritableUtils;
 
 /**
  * Transfer data to/from datanode using a streaming protocol.
@@ -38,12 +39,12 @@
    * when protocol changes. It is not very obvious. 
    */
   /*
-   * Version 16:
-   *    Datanode now needs to send back a status code together 
-   *    with firstBadLink during pipeline setup for dfs write
-   *    (only for DFSClients, not for other datanodes).
+   * Version 17:
+   *    Change the block write protocol to support pipeline recovery.
+   *    Additional fields, like recovery flags, new GS, minBytesRcvd, 
+   *    and maxBytesRcvd are included.
    */
-  public static final int DATA_TRANSFER_VERSION = 16;
+  public static final int DATA_TRANSFER_VERSION = 17;
 
   /** Operation */
   public enum Op {
@@ -119,6 +120,55 @@
     }
   };
   
+  public enum BlockConstructionStage {
+    /** The enum constants are always listed as a regular stage followed
+     * by its recovery stage.
+     * Changing this order will break getRecoveryStage().
+     */
+    // pipeline set up for block append
+    PIPELINE_SETUP_APPEND,
+    // pipeline set up for failed PIPELINE_SETUP_APPEND recovery
+    PIPELINE_SETUP_APPEND_RECOVERY,
+    // data streaming
+    DATA_STREAMING,
+    // pipeline setup for failed data streaming recovery
+    PIPELINE_SETUP_STREAMING_RECOVERY,
+    // close the block and pipeline
+    PIPELINE_CLOSE,
+    // Recover a failed PIPELINE_CLOSE
+    PIPELINE_CLOSE_RECOVERY,
+    // pipeline set up for block creation
+    PIPELINE_SETUP_CREATE;
+    
+    final static private byte RECOVERY_BIT = (byte)1;
+    
+    /**
+     * get the recovery stage of this stage
+     */
+    public BlockConstructionStage getRecoveryStage() {
+      if (this == PIPELINE_SETUP_CREATE) {
+        throw new IllegalArgumentException( "Unexpected blockStage " + this);
+      } else {
+        return values()[ordinal()|RECOVERY_BIT];
+      }
+    }
+    
+    private static BlockConstructionStage valueOf(byte code) {
+      return code < 0 || code >= values().length? null: values()[code];
+    }
+    
+    /** Read from in */
+    private static BlockConstructionStage readFields(DataInput in)
+    throws IOException {
+      return valueOf(in.readByte());
+    }
+
+    /** write to out */
+    private void write(DataOutput out) throws IOException {
+      out.writeByte(ordinal());
+    }
+  }    
+
   /** @deprecated Deprecated at 0.21.  Use Op.WRITE_BLOCK instead. */
   @Deprecated
   public static final byte OP_WRITE_BLOCK = Op.WRITE_BLOCK.code;
@@ -173,7 +223,7 @@
     /** Send OP_READ_BLOCK */
     public static void opReadBlock(DataOutputStream out,
         long blockId, long blockGs, long blockOffset, long blockLen,
-        String clientName, AccessToken accessToken) throws IOException {
+        String clientName, BlockAccessToken accessToken) throws IOException {
       op(out, Op.READ_BLOCK);
 
       out.writeLong(blockId);
@@ -187,15 +237,19 @@
     
     /** Send OP_WRITE_BLOCK */
     public static void opWriteBlock(DataOutputStream out,
-        long blockId, long blockGs, int pipelineSize, boolean isRecovery,
-        String client, DatanodeInfo src, DatanodeInfo[] targets,
-        AccessToken accesstoken) throws IOException {
+        long blockId, long blockGs, int pipelineSize, 
+        BlockConstructionStage stage, long newGs, long minBytesRcvd,
+        long maxBytesRcvd, String client, DatanodeInfo src, 
+        DatanodeInfo[] targets, BlockAccessToken accesstoken) throws IOException {
       op(out, Op.WRITE_BLOCK);
 
       out.writeLong(blockId);
       out.writeLong(blockGs);
       out.writeInt(pipelineSize);
-      out.writeBoolean(isRecovery);
+      stage.write(out);
+      WritableUtils.writeVLong(out, newGs);
+      WritableUtils.writeVLong(out, minBytesRcvd);
+      WritableUtils.writeVLong(out, maxBytesRcvd);
       Text.writeString(out, client);
 
       out.writeBoolean(src != null);
@@ -213,7 +267,7 @@
     /** Send OP_REPLACE_BLOCK */
     public static void opReplaceBlock(DataOutputStream out,
         long blockId, long blockGs, String storageId, DatanodeInfo src,
-        AccessToken accesstoken) throws IOException {
+        BlockAccessToken accesstoken) throws IOException {
       op(out, Op.REPLACE_BLOCK);
 
       out.writeLong(blockId);
@@ -226,7 +280,7 @@
 
     /** Send OP_COPY_BLOCK */
     public static void opCopyBlock(DataOutputStream out,
-        long blockId, long blockGs, AccessToken accesstoken) throws IOException {
+        long blockId, long blockGs, BlockAccessToken accesstoken) throws IOException {
       op(out, Op.COPY_BLOCK);
 
       out.writeLong(blockId);
@@ -237,7 +291,7 @@
 
     /** Send OP_BLOCK_CHECKSUM */
     public static void opBlockChecksum(DataOutputStream out,
-        long blockId, long blockGs, AccessToken accesstoken) throws IOException {
+        long blockId, long blockGs, BlockAccessToken accesstoken) throws IOException {
       op(out, Op.BLOCK_CHECKSUM);
 
       out.writeLong(blockId);
@@ -289,7 +343,7 @@
       final long offset = in.readLong();
       final long length = in.readLong();
       final String client = Text.readString(in);
-      final AccessToken accesstoken = readAccessToken(in);
+      final BlockAccessToken accesstoken = readAccessToken(in);
 
       opReadBlock(in, blockId, blockGs, offset, length, client, accesstoken);
     }
@@ -300,14 +354,18 @@
      */
     protected abstract void opReadBlock(DataInputStream in,
         long blockId, long blockGs, long offset, long length,
-        String client, AccessToken accesstoken) throws IOException;
+        String client, BlockAccessToken accesstoken) throws IOException;
     
     /** Receive OP_WRITE_BLOCK */
     private void opWriteBlock(DataInputStream in) throws IOException {
       final long blockId = in.readLong();          
       final long blockGs = in.readLong();
       final int pipelineSize = in.readInt(); // num of datanodes in entire pipeline
-      final boolean isRecovery = in.readBoolean(); // is this part of recovery?
+      final BlockConstructionStage stage = 
+        BlockConstructionStage.readFields(in);
+      final long newGs = WritableUtils.readVLong(in);
+      final long minBytesRcvd = WritableUtils.readVLong(in);
+      final long maxBytesRcvd = WritableUtils.readVLong(in);
       final String client = Text.readString(in); // working on behalf of this client
       final DatanodeInfo src = in.readBoolean()? DatanodeInfo.read(in): null;
 
@@ -319,10 +377,10 @@
       for (int i = 0; i < targets.length; i++) {
         targets[i] = DatanodeInfo.read(in);
       }
-      final AccessToken accesstoken = readAccessToken(in);
+      final BlockAccessToken accesstoken = readAccessToken(in);
 
-      opWriteBlock(in, blockId, blockGs, pipelineSize, isRecovery,
-          client, src, targets, accesstoken);
+      opWriteBlock(in, blockId, blockGs, pipelineSize, stage,
+          newGs, minBytesRcvd, maxBytesRcvd, client, src, targets, accesstoken);
     }
 
     /**
@@ -330,9 +388,11 @@
      * Write a block.
      */
     protected abstract void opWriteBlock(DataInputStream in,
-        long blockId, long blockGs, int pipelineSize, boolean isRecovery,
+        long blockId, long blockGs,
+        int pipelineSize, BlockConstructionStage stage,
+        long newGs, long minBytesRcvd, long maxBytesRcvd,
         String client, DatanodeInfo src, DatanodeInfo[] targets,
-        AccessToken accesstoken) throws IOException;
+        BlockAccessToken accesstoken) throws IOException;
 
     /** Receive OP_REPLACE_BLOCK */
     private void opReplaceBlock(DataInputStream in) throws IOException {
@@ -340,7 +400,7 @@
       final long blockGs = in.readLong();
       final String sourceId = Text.readString(in); // read del hint
       final DatanodeInfo src = DatanodeInfo.read(in); // read proxy source
-      final AccessToken accesstoken = readAccessToken(in);
+      final BlockAccessToken accesstoken = readAccessToken(in);
 
       opReplaceBlock(in, blockId, blockGs, sourceId, src, accesstoken);
     }
@@ -351,13 +411,13 @@
      */
     protected abstract void opReplaceBlock(DataInputStream in,
         long blockId, long blockGs, String sourceId, DatanodeInfo src,
-        AccessToken accesstoken) throws IOException;
+        BlockAccessToken accesstoken) throws IOException;
 
     /** Receive OP_COPY_BLOCK */
     private void opCopyBlock(DataInputStream in) throws IOException {
       final long blockId = in.readLong();          
       final long blockGs = in.readLong();
-      final AccessToken accesstoken = readAccessToken(in);
+      final BlockAccessToken accesstoken = readAccessToken(in);
 
       opCopyBlock(in, blockId, blockGs, accesstoken);
     }
@@ -367,13 +427,13 @@
      * It is used for balancing purpose; send to a proxy source.
      */
     protected abstract void opCopyBlock(DataInputStream in,
-        long blockId, long blockGs, AccessToken accesstoken) throws IOException;
+        long blockId, long blockGs, BlockAccessToken accesstoken) throws IOException;
 
     /** Receive OP_BLOCK_CHECKSUM */
     private void opBlockChecksum(DataInputStream in) throws IOException {
       final long blockId = in.readLong();          
       final long blockGs = in.readLong();
-      final AccessToken accesstoken = readAccessToken(in);
+      final BlockAccessToken accesstoken = readAccessToken(in);
 
       opBlockChecksum(in, blockId, blockGs, accesstoken);
     }
@@ -383,12 +443,12 @@
      * Get the checksum of a block 
      */
     protected abstract void opBlockChecksum(DataInputStream in,
-        long blockId, long blockGs, AccessToken accesstoken) throws IOException;
+        long blockId, long blockGs, BlockAccessToken accesstoken) throws IOException;
 
     /** Read an AccessToken */
-    static private AccessToken readAccessToken(DataInputStream in
+    static private BlockAccessToken readAccessToken(DataInputStream in
         ) throws IOException {
-      final AccessToken t = new AccessToken();
+      final BlockAccessToken t = new BlockAccessToken();
       t.readFields(in);
       return t; 
     }
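
The ordinal trick in BlockConstructionStage.getRecoveryStage() above
depends on the declaration order its comment warns about: each regular
stage sits at an even ordinal with its recovery stage at the next odd
ordinal, so OR-ing in the low bit maps a stage to its recovery variant
(and a recovery stage to itself). A standalone sketch of the same pattern,
with hypothetical shortened names:

    enum Stage {
      SETUP_APPEND,            // ordinal 0
      SETUP_APPEND_RECOVERY,   // ordinal 1 = 0 | 1
      DATA_STREAMING,          // ordinal 2
      STREAMING_RECOVERY,      // ordinal 3 = 2 | 1
      CLOSE,                   // ordinal 4
      CLOSE_RECOVERY;          // ordinal 5 = 4 | 1

      Stage recovery() {
        return values()[ordinal() | 1]; // valid only under this ordering
      }
    }

    class StagePairingDemo {
      public static void main(String[] args) {
        for (Stage s : Stage.values()) {
          // DATA_STREAMING -> STREAMING_RECOVERY, and each recovery
          // stage maps to itself (a fixed point).
          System.out.println(s + " -> " + s.recovery());
        }
      }
    }

PIPELINE_SETUP_CREATE has no recovery partner, which is why the real
getRecoveryStage() throws for it rather than indexing past the last value.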

Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java Sat Nov 28 20:05:56 2009
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.protocol;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
 
 /************************************
  * Some handy constants
@@ -27,7 +28,7 @@
   public static int MIN_BLOCKS_FOR_WRITE = 5;
 
   // Chunk the block Invalidate message
-  public static final int BLOCK_INVALIDATE_CHUNK = 100;
+  public static final int BLOCK_INVALIDATE_CHUNK = 1000;
 
   // Long that indicates "leave current quota unchanged"
   public static final long QUOTA_DONT_SET = Long.MAX_VALUE;
@@ -48,11 +49,15 @@
   public static int MAX_PATH_LENGTH = 8000;
   public static int MAX_PATH_DEPTH = 1000;
     
-  public static final int BUFFER_SIZE = new Configuration().getInt("io.file.buffer.size", 4096);
+  public static final int BUFFER_SIZE = new HdfsConfiguration().getInt("io.file.buffer.size", 4096);
   //Used for writing header etc.
   public static final int SMALL_BUFFER_SIZE = Math.min(BUFFER_SIZE/2, 512);
   //TODO mb@media-style.com: should be conf injected?
   public static final long DEFAULT_BLOCK_SIZE = 64 * 1024 * 1024;
+  public static final int DEFAULT_BYTES_PER_CHECKSUM = 512;
+  public static final int DEFAULT_WRITE_PACKET_SIZE = 64 * 1024;
+  public static final short DEFAULT_REPLICATION_FACTOR = 3;
+  public static final int DEFAULT_FILE_BUFFER_SIZE = 4096;
   public static final int DEFAULT_DATA_SOCKET_SIZE = 128 * 1024;
 
   public static final int SIZE_OF_INTEGER = Integer.SIZE / Byte.SIZE;
@@ -86,7 +91,7 @@
   // Version is reflected in the data storage file.
   // Versions are negative.
   // Decrement LAYOUT_VERSION to define a new version.
-  public static final int LAYOUT_VERSION = -19;
+  public static final int LAYOUT_VERSION = -22;
   // Current version: 
-  // -19: Sticky bit
+  // -22: added new OP_CONCAT_DELETE 
 }

Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java Sat Nov 28 20:05:56 2009
@@ -17,8 +17,8 @@
  */
 package org.apache.hadoop.hdfs.protocol;
 
+import org.apache.hadoop.hdfs.security.BlockAccessToken;
 import org.apache.hadoop.io.*;
-import org.apache.hadoop.security.AccessToken;
 
 import java.io.*;
 
@@ -44,7 +44,7 @@
   // else false. If block has few corrupt replicas, they are filtered and 
   // their locations are not part of this object
   private boolean corrupt;
-  private AccessToken accessToken = new AccessToken();
+  private BlockAccessToken accessToken = new BlockAccessToken();
 
   /**
    */
@@ -78,11 +78,11 @@
     }
   }
 
-  public AccessToken getAccessToken() {
+  public BlockAccessToken getAccessToken() {
     return accessToken;
   }
 
-  public void setAccessToken(AccessToken token) {
+  public void setAccessToken(BlockAccessToken token) {
     this.accessToken = token;
   }
 
@@ -145,4 +145,21 @@
       locs[i].readFields(in);
     }
   }
+
+  /** Read LocatedBlock from in. */
+  public static LocatedBlock read(DataInput in) throws IOException {
+    final LocatedBlock lb = new LocatedBlock();
+    lb.readFields(in);
+    return lb;
+  }
+
+  /** {@inheritDoc} */
+  public String toString() {
+    return getClass().getSimpleName() + "{" + b
+        + "; getBlockSize()=" + getBlockSize()
+        + "; corrupt=" + corrupt
+        + "; offset=" + offset
+        + "; locs=" + java.util.Arrays.asList(locs)
+        + "}";
+  }
 }

Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/protocol/LocatedBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/protocol/LocatedBlocks.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/protocol/LocatedBlocks.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/protocol/LocatedBlocks.java Sat Nov 28 20:05:56 2009
@@ -36,6 +36,8 @@
   private long fileLength;
   private List<LocatedBlock> blocks; // array of blocks with prioritized locations
   private boolean underConstruction;
+  private LocatedBlock lastLocatedBlock = null;
+  private boolean isLastBlockComplete = false;
 
   LocatedBlocks() {
     fileLength = 0;
@@ -43,11 +45,15 @@
     underConstruction = false;
   }
   
-  public LocatedBlocks(long flength, List<LocatedBlock> blks, boolean isUnderConstuction) {
-
+  /** Public constructor. */
+  public LocatedBlocks(long flength, boolean isUnderConstruction,
+      List<LocatedBlock> blks, 
+      LocatedBlock lastBlock, boolean isLastBlockCompleted) {
     fileLength = flength;
     blocks = blks;
    underConstruction = isUnderConstruction;
+    this.lastLocatedBlock = lastBlock;
+    this.isLastBlockComplete = isLastBlockCompleted;
   }
   
   /**
@@ -57,6 +63,16 @@
     return blocks;
   }
   
+  /** Get the last located block. */
+  public LocatedBlock getLastLocatedBlock() {
+    return lastLocatedBlock;
+  }
+  
+  /** Is the last block completed? */
+  public boolean isLastBlockComplete() {
+    return isLastBlockComplete;
+  }
+
   /**
    * Get located block.
    */
@@ -161,6 +177,15 @@
   public void write(DataOutput out) throws IOException {
     out.writeLong(this.fileLength);
     out.writeBoolean(underConstruction);
+
+    //write the last located block
+    final boolean isNull = lastLocatedBlock == null;
+    out.writeBoolean(isNull);
+    if (!isNull) {
+      lastLocatedBlock.write(out);
+    }
+    out.writeBoolean(isLastBlockComplete);
+
     // write located blocks
     int nrBlocks = locatedBlockCount();
     out.writeInt(nrBlocks);
@@ -175,6 +200,14 @@
   public void readFields(DataInput in) throws IOException {
     this.fileLength = in.readLong();
     underConstruction = in.readBoolean();
+
+    //read the last located block
+    final boolean isNull = in.readBoolean();
+    if (!isNull) {
+      lastLocatedBlock = LocatedBlock.read(in);
+    }
+    isLastBlockComplete = in.readBoolean();
+
     // read located blocks
     int nrBlocks = in.readInt();
     this.blocks = new ArrayList<LocatedBlock>(nrBlocks);
@@ -184,4 +217,18 @@
       this.blocks.add(blk);
     }
   }
+
+  /** {@inheritDoc} */
+  @Override
+  public String toString() {
+    final StringBuilder b = new StringBuilder(getClass().getSimpleName());
+    b.append("{")
+     .append("\n  fileLength=").append(fileLength)
+     .append("\n  underConstruction=").append(underConstruction)
+     .append("\n  blocks=").append(blocks)
+     .append("\n  lastLocatedBlock=").append(lastLocatedBlock)
+     .append("\n  isLastBlockComplete=").append(isLastBlockComplete)
+     .append("}");
+    return b.toString();
+  }
 }
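
The presence-flag framing used for lastLocatedBlock above is a small idiom
worth noting: write() emits a boolean first so readFields() knows whether a
serialized LocatedBlock follows. A generic, self-contained sketch of the
same idiom (OptionalFieldDemo and the String payload are illustrative, not
from the patch):

    import java.io.*;

    class OptionalFieldDemo {
      static void write(DataOutput out, String value) throws IOException {
        boolean isNull = value == null;
        out.writeBoolean(isNull);        // presence flag goes first
        if (!isNull) {
          out.writeUTF(value);           // payload only when present
        }
      }

      static String read(DataInput in) throws IOException {
        return in.readBoolean() ? null : in.readUTF();
      }

      public static void main(String[] args) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        write(new DataOutputStream(bos), "last-block");
        String back = read(new DataInputStream(
            new ByteArrayInputStream(bos.toByteArray())));
        System.out.println(back); // prints: last-block
      }
    }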

Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java Sat Nov 28 20:05:56 2009
@@ -25,6 +25,7 @@
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
+import java.lang.Class;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.Socket;
@@ -61,9 +62,15 @@
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.security.BlockAccessToken;
+import org.apache.hadoop.hdfs.security.AccessTokenHandler;
+import org.apache.hadoop.hdfs.security.ExportedAccessKeys;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
 import org.apache.hadoop.hdfs.server.common.Util;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.namenode.BlockPlacementPolicy;
+import org.apache.hadoop.hdfs.server.namenode.BlockPlacementPolicyDefault;
+import org.apache.hadoop.hdfs.server.namenode.UnsupportedActionException;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
 import org.apache.hadoop.io.IOUtils;
@@ -74,9 +81,6 @@
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetworkTopology;
-import org.apache.hadoop.security.AccessToken;
-import org.apache.hadoop.security.AccessTokenHandler;
-import org.apache.hadoop.security.ExportedAccessKeys;
 import org.apache.hadoop.security.UnixUserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Daemon;
@@ -365,7 +369,7 @@
     
     /* Send a block replace request to the output stream*/
     private void sendRequest(DataOutputStream out) throws IOException {
-      AccessToken accessToken = AccessToken.DUMMY_TOKEN;
+      BlockAccessToken accessToken = BlockAccessToken.DUMMY_TOKEN;
       if (isAccessTokenEnabled) {
         accessToken = accessTokenHandler.generateToken(null, block.getBlock()
             .getBlockId(), EnumSet.of(AccessTokenHandler.AccessMode.REPLACE,
@@ -772,18 +776,31 @@
       }
     }
   }
+
+  /* Check that this Balancer is compatible with the Block Placement Policy
+   * used by the Namenode.
+   */
+  private void checkReplicationPolicyCompatibility(Configuration conf) throws UnsupportedActionException {
+    if (BlockPlacementPolicy.getInstance(conf, null, null).getClass() != 
+        BlockPlacementPolicyDefault.class) {
+      throw new UnsupportedActionException("Balancer without BlockPlacementPolicyDefault");
+    }
+  }
   
   /** Default constructor */
-  Balancer() {
+  Balancer() throws UnsupportedActionException {
+    checkReplicationPolicyCompatibility(getConf());
   }
   
   /** Construct a balancer from the given configuration */
-  Balancer(Configuration conf) {
+  Balancer(Configuration conf) throws UnsupportedActionException {
+    checkReplicationPolicyCompatibility(conf);
     setConf(conf);
   } 
 
   /** Construct a balancer from the given configuration and threshold */
-  Balancer(Configuration conf, double threshold) {
+  Balancer(Configuration conf, double threshold) throws UnsupportedActionException {
+    checkReplicationPolicyCompatibility(conf);
     setConf(conf);
     this.threshold = threshold;
   }

Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/common/GenerationStamp.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/common/GenerationStamp.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/common/GenerationStamp.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/common/GenerationStamp.java Sat Nov 28 20:05:56 2009
@@ -17,35 +17,36 @@
  */
 package org.apache.hadoop.hdfs.server.common;
 
-import java.io.*;
-import org.apache.hadoop.io.*;
-
 /****************************************************************
  * A GenerationStamp is a Hadoop FS primitive, identified by a long.
  ****************************************************************/
-public class GenerationStamp implements WritableComparable<GenerationStamp> {
-  public static final long WILDCARD_STAMP = 1;
+public class GenerationStamp implements Comparable<GenerationStamp> {
+  /**
+   * The first valid generation stamp.
+   */
   public static final long FIRST_VALID_STAMP = 1000L;
 
-  static {                                      // register a ctor
-    WritableFactories.setFactory
-      (GenerationStamp.class,
-       new WritableFactory() {
-         public Writable newInstance() { return new GenerationStamp(0); }
-       });
-  }
+  /**
+   * Generation stamp of blocks that pre-date the introduction
+   * of a generation stamp.
+   */
+  public static final long GRANDFATHER_GENERATION_STAMP = 0;
 
-  long genstamp;
+  private volatile long genstamp;
 
   /**
    * Create a new instance, initialized to FIRST_VALID_STAMP.
    */
-  public GenerationStamp() {this(GenerationStamp.FIRST_VALID_STAMP);}
+  public GenerationStamp() {
+    this(GenerationStamp.FIRST_VALID_STAMP);
+  }
 
   /**
    * Create a new instance, initialized to the specified value.
    */
-  GenerationStamp(long stamp) {this.genstamp = stamp;}
+  GenerationStamp(long stamp) {
+    this.genstamp = stamp;
+  }
 
   /**
    * Returns the current generation stamp
@@ -69,46 +70,22 @@
     return this.genstamp;
   }
 
-  /////////////////////////////////////
-  // Writable
-  /////////////////////////////////////
-  public void write(DataOutput out) throws IOException {
-    out.writeLong(genstamp);
-  }
-
-  public void readFields(DataInput in) throws IOException {
-    this.genstamp = in.readLong();
-    if (this.genstamp < 0) {
-      throw new IOException("Bad Generation Stamp: " + this.genstamp);
-    }
-  }
-
-  /////////////////////////////////////
-  // Comparable
-  /////////////////////////////////////
-  public static int compare(long x, long y) {
-    return x < y? -1: x == y? 0: 1;
-  }
-
-  /** {@inheritDoc} */
+  @Override // Comparable
   public int compareTo(GenerationStamp that) {
-    return compare(this.genstamp, that.genstamp);
+    return this.genstamp < that.genstamp ? -1 :
+           this.genstamp > that.genstamp ? 1 : 0;
   }
 
-  /** {@inheritDoc} */
+  @Override // Object
   public boolean equals(Object o) {
     if (!(o instanceof GenerationStamp)) {
       return false;
     }
-    return genstamp == ((GenerationStamp)o).genstamp;
-  }
-
-  public static boolean equalsWithWildcard(long x, long y) {
-    return x == y || x == WILDCARD_STAMP || y == WILDCARD_STAMP;  
+    return compareTo((GenerationStamp)o) == 0;
   }
 
-  /** {@inheritDoc} */
+  @Override // Object
   public int hashCode() {
-    return 37 * 17 + (int) (genstamp^(genstamp>>>32));
+    return (int) (genstamp^(genstamp>>>32));
   }
 }
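
Two details of the rewritten GenerationStamp, shown in a hedged standalone
sketch (GenStampDemo is hypothetical, not part of the patch): the new
hashCode() is the standard fold of a long into an int, identical to
Long.hashCode(), and the explicit three-way compareTo() avoids the sign
errors a subtraction-based compare can produce when the difference
overflows:

    class GenStampDemo {
      public static void main(String[] args) {
        long genstamp = 0x123456789ABCL;
        // Same result as Long.valueOf(genstamp).hashCode().
        System.out.println((int) (genstamp ^ (genstamp >>> 32))
            == Long.valueOf(genstamp).hashCode());      // true
        // Subtraction overflows for widely separated values...
        long a = -2, b = Long.MAX_VALUE;                // clearly a < b
        System.out.println(a - b > 0);                  // true: wrong sign
        // ...while the explicit comparison is always correct.
        System.out.println(a < b ? -1 : a > b ? 1 : 0); // -1
      }
    }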

Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/common/HdfsConstants.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/common/HdfsConstants.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/common/HdfsConstants.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/common/HdfsConstants.java Sat Nov 28 20:05:56 2009
@@ -17,6 +17,10 @@
  */
 package org.apache.hadoop.hdfs.server.common;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
 /************************************
  * Some handy internal HDFS constants
  *
@@ -80,5 +84,77 @@
       return description;
     }
   }
+
+  /**
+   * Block replica states, which a replica can go through while being constructed.
+   */
+  static public enum ReplicaState {
+    /** Replica is finalized. The state when replica is not modified. */
+    FINALIZED(0),
+    /** Replica is being written to. */
+    RBW(1),
+    /** Replica is waiting to be recovered. */
+    RWR(2),
+    /** Replica is under recovery. */
+    RUR(3),
+    /** Temporary replica: created for replication and relocation only. */
+    TEMPORARY(4);
+
+    private int value;
+
+    private ReplicaState(int v) {
+      value = v;
+    }
+
+    public int getValue() {
+      return value;
+    }
+
+    public static ReplicaState getState(int v) {
+      return ReplicaState.values()[v];
+    }
+
+    /** Read from in */
+    public static ReplicaState read(DataInput in) throws IOException {
+      return values()[in.readByte()];
+    }
+
+    /** Write to out */
+    public void write(DataOutput out) throws IOException {
+      out.writeByte(ordinal());
+    }
+  }
+
+  /**
+   * States, which a block can go through while it is under construction.
+   */
+  static public enum BlockUCState {
+    /**
+     * Block construction completed.<br>
+     * The block has at least one {@link ReplicaState#FINALIZED} replica,
+     * and is not going to be modified.
+     */
+    COMPLETE,
+    /**
+     * The block is under construction.<br>
+     * It has been recently allocated for write or append.
+     */
+    UNDER_CONSTRUCTION,
+    /**
+     * The block is under recovery.<br>
+     * When a file lease expires, its last block may not be {@link #COMPLETE}
+     * and needs to go through a recovery procedure,
+     * which synchronizes the contents of the existing replicas.
+     */
+    UNDER_RECOVERY,
+    /**
+     * The block is committed.<br>
+     * The client reported that all bytes are written to data-nodes
+     * with the given generation stamp and block length, but no 
+     * {@link ReplicaState#FINALIZED} 
+     * replicas have yet been reported by data-nodes themselves.
+     */
+    COMMITTED;
+  }
 }
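
One subtlety in ReplicaState above: write() serializes ordinal() while
getValue() returns the explicit value field; the two agree only because
each constant is declared with a value equal to its ordinal. A small
round-trip sketch compiled against the class added in this patch
(ReplicaStateRoundTrip is a hypothetical demo):

    import java.io.*;
    import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;

    class ReplicaStateRoundTrip {
      public static void main(String[] args) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        for (ReplicaState s : ReplicaState.values()) {
          s.write(out);                    // writes ordinal() as one byte
        }
        DataInputStream in = new DataInputStream(
            new ByteArrayInputStream(bos.toByteArray()));
        for (ReplicaState s : ReplicaState.values()) {
          ReplicaState back = ReplicaState.read(in);
          // Each state survives the trip: s -> s, with value == ordinal.
          System.out.println(s + " -> " + back
              + " (value=" + back.getValue() + ")");
        }
      }
    }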
 

Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java Sat Nov 28 20:05:56 2009
@@ -36,18 +36,19 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.security.BlockAccessToken;
 import org.apache.hadoop.hdfs.server.namenode.DatanodeDescriptor;
 import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.AccessToken;
 import org.apache.hadoop.security.UnixUserGroupInformation;
 import org.apache.hadoop.util.VersionInfo;
 
 public class JspHelper {
   final static public String WEB_UGI_PROPERTY_NAME = "dfs.web.ugi";
 
-  public static final Configuration conf = new Configuration();
+  public static final Configuration conf = new HdfsConfiguration();
   public static final UnixUserGroupInformation webUGI
   = UnixUserGroupInformation.createImmutable(
       conf.getStrings(WEB_UGI_PROPERTY_NAME));
@@ -105,7 +106,7 @@
   }
 
   public static void streamBlockInAscii(InetSocketAddress addr, long blockId, 
-                                 AccessToken accessToken, long genStamp, long blockSize, 
+                                 BlockAccessToken accessToken, long genStamp, long blockSize, 
                                  long offsetIntoBlock, long chunkSizeToView, JspWriter out) 
     throws IOException {
     if (chunkSizeToView == 0) return;

Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/common/Storage.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/common/Storage.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/common/Storage.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/common/Storage.java Sat Nov 28 20:05:56 2009
@@ -74,6 +74,9 @@
    * any upgrade code that uses this constant should also be removed. */
   public static final int PRE_GENERATIONSTAMP_LAYOUT_VERSION = -13;
   
+  // last layout version that did not support persistent rbw replicas
+  public static final int PRE_RBW_LAYOUT_VERSION = -19;
+  
   private   static final String STORAGE_FILE_LOCK     = "in_use.lock";
   protected static final String STORAGE_FILE_VERSION  = "VERSION";
   public static final String STORAGE_DIR_CURRENT   = "current";

Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Sat Nov 28 20:05:56 2009
@@ -39,6 +39,7 @@
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.Daemon;
@@ -55,7 +56,6 @@
   static final Log ClientTraceLog = DataNode.ClientTraceLog;
   
   private Block block; // the block to receive
-  protected boolean finalized;
   private DataInputStream in = null; // from where data are read
   private DataChecksum checksum; // from where chunks of a block can be read
   private OutputStream out = null; // to block file at local disk
@@ -65,7 +65,6 @@
   private ByteBuffer buf; // contains one full packet.
   private int bufRead; //amount of valid data in the buf
   private int maxPacketReadLen;
-  protected long offsetInBlock;
   protected final String inAddr;
   protected final String myAddr;
   private String mirrorAddr;
@@ -73,46 +72,82 @@
   private Daemon responder = null;
   private BlockTransferThrottler throttler;
   private FSDataset.BlockWriteStreams streams;
-  private boolean isRecovery = false;
   private String clientName;
   DatanodeInfo srcDataNode = null;
   private Checksum partialCrc = null;
   private final DataNode datanode;
+  final private ReplicaInPipelineInterface replicaInfo;
 
   BlockReceiver(Block block, DataInputStream in, String inAddr,
-                String myAddr, boolean isRecovery, String clientName, 
-                DatanodeInfo srcDataNode, DataNode datanode) throws IOException {
+                String myAddr, BlockConstructionStage stage, 
+                long newGs, long minBytesRcvd, long maxBytesRcvd, 
+                String clientName, DatanodeInfo srcDataNode, DataNode datanode)
+                throws IOException {
     try{
       this.block = block;
       this.in = in;
       this.inAddr = inAddr;
       this.myAddr = myAddr;
-      this.isRecovery = isRecovery;
       this.clientName = clientName;
-      this.offsetInBlock = 0;
       this.srcDataNode = srcDataNode;
       this.datanode = datanode;
-      this.checksum = DataChecksum.newDataChecksum(in);
-      this.bytesPerChecksum = checksum.getBytesPerChecksum();
-      this.checksumSize = checksum.getChecksumSize();
       //
       // Open local disk out
       //
-      streams = datanode.data.writeToBlock(block, isRecovery);
-      this.finalized = datanode.data.isValidBlock(block);
+      if (clientName.length() == 0) { //replication or move
+        replicaInfo = datanode.data.createTemporary(block);
+      } else {
+        switch (stage) {
+        case PIPELINE_SETUP_CREATE:
+          replicaInfo = datanode.data.createRbw(block);
+          break;
+        case PIPELINE_SETUP_STREAMING_RECOVERY:
+          replicaInfo = datanode.data.recoverRbw(
+              block, newGs, minBytesRcvd, maxBytesRcvd);
+          block.setGenerationStamp(newGs);
+          break;
+        case PIPELINE_SETUP_APPEND:
+          replicaInfo = datanode.data.append(block, newGs, minBytesRcvd);
+          if (datanode.blockScanner != null) { // remove from block scanner
+            datanode.blockScanner.deleteBlock(block);
+          }
+          block.setGenerationStamp(newGs);
+          break;
+        case PIPELINE_SETUP_APPEND_RECOVERY:
+          replicaInfo = datanode.data.recoverAppend(block, newGs, minBytesRcvd);
+          if (datanode.blockScanner != null) { // remove from block scanner
+            datanode.blockScanner.deleteBlock(block);
+          }
+          block.setGenerationStamp(newGs);
+          break;
+        default: throw new IOException("Unsupported stage " + stage + 
+              " while receiving block " + block + " from " + inAddr);
+        }
+      }
+      // read checksum meta information
+      this.checksum = DataChecksum.newDataChecksum(in);
+      this.bytesPerChecksum = checksum.getBytesPerChecksum();
+      this.checksumSize = checksum.getChecksumSize();
+      
+      boolean isCreate = stage == BlockConstructionStage.PIPELINE_SETUP_CREATE 
+          || clientName.length() == 0;
+      streams = replicaInfo.createStreams(isCreate,
+          this.bytesPerChecksum, this.checksumSize);
       if (streams != null) {
         this.out = streams.dataOut;
         this.checksumOut = new DataOutputStream(new BufferedOutputStream(
                                                   streams.checksumOut, 
                                                   SMALL_BUFFER_SIZE));
-        // If this block is for appends, then remove it from periodic
-        // validation.
-        if (datanode.blockScanner != null && isRecovery) {
-          datanode.blockScanner.deleteBlock(block);
-        }
+        
+        // write data chunk header if creating a new replica
+        if (isCreate) {
+          BlockMetadataHeader.writeHeader(checksumOut, checksum);
+        } 
       }
-    } catch (BlockAlreadyExistsException bae) {
+    } catch (ReplicaAlreadyExistsException bae) {
       throw bae;
+    } catch (ReplicaNotFoundException bne) {
+      throw bne;
     } catch(IOException ioe) {
       IOUtils.closeStream(this);
       cleanupBlock();
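
The constructor now derives the dataset operation from the pipeline stage
instead of the old boolean isRecovery flag. A condensed sketch of that
mapping (error handling and the block-scanner bookkeeping omitted; method
names as in the patch, the dataset type name is an assumption):

    // Sketch: choose the replica to write into based on the stage.
    ReplicaInPipelineInterface selectReplica(FSDatasetInterface data,
        Block b, BlockConstructionStage stage, long newGs,
        long minRcvd, long maxRcvd, String clientName) throws IOException {
      if (clientName.length() == 0)             // replication or move
        return data.createTemporary(b);
      switch (stage) {
      case PIPELINE_SETUP_CREATE:               // fresh pipeline
        return data.createRbw(b);
      case PIPELINE_SETUP_STREAMING_RECOVERY:   // recover a streaming write
        return data.recoverRbw(b, newGs, minRcvd, maxRcvd);
      case PIPELINE_SETUP_APPEND:               // append to a finalized replica
        return data.append(b, newGs, minRcvd);
      case PIPELINE_SETUP_APPEND_RECOVERY:      // recover a failed append
        return data.recoverAppend(b, newGs, minRcvd);
      default:
        throw new IOException("Unsupported stage " + stage);
      }
    }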
@@ -288,7 +323,7 @@
   * It tries to read a full packet with a single read call.
    * Consecutive packets are usually of the same length.
    */
-  private int readNextPacket() throws IOException {
+  private void readNextPacket() throws IOException {
    /* This dances around buf a little bit, mainly to read 
     * a full packet with a single read and to accept an arbitrary size  
     * for the next packet at the same time.
@@ -324,12 +359,6 @@
     int payloadLen = buf.getInt();
     buf.reset();
     
-    if (payloadLen == 0) {
-      //end of stream!
-      buf.limit(buf.position() + SIZE_OF_INTEGER);
-      return 0;
-    }
-    
     // check corrupt values for pktLen, 100MB upper limit should be ok?
     if (payloadLen < 0 || payloadLen > (100*1024*1024)) {
       throw new IOException("Incorrect value for packet payload : " +
@@ -369,42 +398,68 @@
     if (pktSize > maxPacketReadLen) {
       maxPacketReadLen = pktSize;
     }
-    
-    return payloadLen;
   }
   
   /** 
    * Receives and processes a packet. It can contain many chunks.
-   * returns size of the packet.
+   * returns the number of data bytes that the packet has.
    */
   private int receivePacket() throws IOException {
-    
-    int payloadLen = readNextPacket();
-    
-    if (payloadLen <= 0) {
-      return payloadLen;
-    }
+    // read the next packet
+    readNextPacket();
     
     buf.mark();
     //read the header
     buf.getInt(); // packet length
-    offsetInBlock = buf.getLong(); // get offset of packet in block
+    long offsetInBlock = buf.getLong(); // get offset of packet in block
+    
+    if (offsetInBlock > replicaInfo.getNumBytes()) {
+      throw new IOException("Received an out-of-sequence packet for " + block + 
+          "from " + inAddr + " at offset " + offsetInBlock +
+          ". Expecting packet starting at " + replicaInfo.getNumBytes());
+    }
     long seqno = buf.getLong();    // get seqno
     boolean lastPacketInBlock = (buf.get() != 0);
     
+    int len = buf.getInt();
+    if (len < 0) {
+      throw new IOException("Got wrong length during writeBlock(" + block + 
+                            ") from " + inAddr + " at offset " + 
+                            offsetInBlock + ": " + len); 
+    } 
     int endOfHeader = buf.position();
     buf.reset();
     
+    return receivePacket(offsetInBlock, seqno, lastPacketInBlock, len, endOfHeader);
+  }
+
+  /** 
+   * Receives and processes a packet. It can contain many chunks.
+   * returns the number of data bytes that the packet has.
+   */
+  private int receivePacket(long offsetInBlock, long seqno,
+      boolean lastPacketInBlock, int len, int endOfHeader) throws IOException {
     if (LOG.isDebugEnabled()){
       LOG.debug("Receiving one packet for block " + block +
-                " of length " + payloadLen +
+                " of length " + len +
                 " seqno " + seqno +
                 " offsetInBlock " + offsetInBlock +
                 " lastPacketInBlock " + lastPacketInBlock);
     }
     
-    setBlockPosition(offsetInBlock);
+    // update received bytes
+    long firstByteInBlock = offsetInBlock;
+    offsetInBlock += len;
+    if (replicaInfo.getNumBytes() < offsetInBlock) {
+      replicaInfo.setNumBytes(offsetInBlock);
+    }
     
+    // put in queue for pending acks
+    if (responder != null) {
+      ((PacketResponder)responder.getRunnable()).enqueue(seqno,
+                                      lastPacketInBlock, offsetInBlock); 
+    }  
+
     //First write the packet to the mirror:
     if (mirrorOut != null) {
       try {
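
For reference, the header fields parsed above arrive in this order; a
stand-alone sketch of the parse (layout inferred from the reads in
receivePacket, not a canonical protocol definition; buf is a
java.nio.ByteBuffer positioned at the packet start):

    int packetLen = buf.getInt();        // payload + checksums + 4
    long offsetInBlock = buf.getLong();  // offset of the packet's first byte
    long seqno = buf.getLong();          // sequence number used for acks
    boolean lastPacketInBlock = buf.get() != 0;
    int dataLen = buf.getInt();          // 0 for the trailing empty packet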
@@ -416,25 +471,19 @@
     }
 
     buf.position(endOfHeader);        
-    int len = buf.getInt();
     
-    if (len < 0) {
-      throw new IOException("Got wrong length during writeBlock(" + block + 
-                            ") from " + inAddr + " at offset " + 
-                            offsetInBlock + ": " + len); 
-    } 
-
-    if (len == 0) {
-      LOG.debug("Receiving empty packet for block " + block);
+    if (lastPacketInBlock || len == 0) {
+      LOG.debug("Receiving an empty packet or the end of the block " + block);
     } else {
-      offsetInBlock += len;
-
       int checksumLen = ((len + bytesPerChecksum - 1)/bytesPerChecksum)*
                                                             checksumSize;
 
       if ( buf.remaining() != (checksumLen + len)) {
-        throw new IOException("Data remaining in packet does not match " +
-                              "sum of checksumLen and dataLen");
+        throw new IOException("Data remaining in packet does not match" +
+                              "sum of checksumLen and dataLen " +
+                              " size remaining: " + buf.remaining() +
+                              " data len: " + len +
+                              " checksum Len: " + checksumLen);
       }
       int checksumOff = buf.position();
       int dataOff = checksumOff + checksumLen;
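
The checksumLen arithmetic above rounds the data length up to whole
chunks. A worked example, assuming CRC32 checksums (checksumSize = 4):

    int len = 1000, bytesPerChecksum = 512, checksumSize = 4;
    int checksumLen = ((len + bytesPerChecksum - 1) / bytesPerChecksum)
                      * checksumSize;  // ceil(1000/512) = 2 chunks -> 8 bytes
    // the packet is consistent only if buf.remaining() == 8 + 1000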
@@ -454,9 +503,29 @@
       }
 
       try {
-        if (!finalized) {
+        long onDiskLen = replicaInfo.getBytesOnDisk();
+        if (onDiskLen<offsetInBlock) {
           //finally write to the disk :
-          out.write(pktBuf, dataOff, len);
+          
+          if (onDiskLen % bytesPerChecksum != 0) { 
+            // prepare to overwrite last checksum
+            adjustCrcFilePosition();
+          }
+          
+          // If this is a partial chunk, then read in pre-existing checksum
+          if (firstByteInBlock % bytesPerChecksum != 0) {
+            LOG.info("Packet starts at " + firstByteInBlock +
+                     " for block " + block +
+                     " which is not a multiple of bytesPerChecksum " +
+                     bytesPerChecksum);
+            long offsetInChecksum = BlockMetadataHeader.getHeaderSize() +
+                onDiskLen / bytesPerChecksum * checksumSize;
+            computePartialChunkCrc(onDiskLen, offsetInChecksum, bytesPerChecksum);
+          }
+
+          int startByteToDisk = dataOff + (int)(onDiskLen - firstByteInBlock);
+          int numBytesToDisk = (int)(offsetInBlock - onDiskLen);
+          out.write(pktBuf, startByteToDisk, numBytesToDisk);
 
           // If this is a partial chunk, then verify that this is the only
           // chunk in the packet. Calculate new crc for this chunk.
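
The two offsets above skip whatever is already durable, so a retransmitted
or overlapping packet writes only the bytes the replica does not yet have
on disk. A worked example with assumed values:

    // 700 bytes on disk; the packet starts at block offset 512 and carries
    // 488 bytes, so offsetInBlock (already advanced above) is 1000.
    long onDiskLen = 700, firstByteInBlock = 512, offsetInBlock = 1000;
    int dataOff = 0;  // where the packet's data begins inside pktBuf
    int startByteToDisk = dataOff + (int)(onDiskLen - firstByteInBlock); // 188
    int numBytesToDisk  = (int)(offsetInBlock - onDiskLen);              // 300
    // the packet's first 188 bytes (block bytes 512..699) are skipped;
    // only block bytes 700..999 are written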
@@ -468,7 +537,7 @@
                                     " len = " + len + 
                                     " bytesPerChecksum " + bytesPerChecksum);
             }
-            partialCrc.update(pktBuf, dataOff, len);
+            partialCrc.update(pktBuf, startByteToDisk, numBytesToDisk);
             byte[] buf = FSOutputSummer.convertToByteStream(partialCrc, checksumSize);
             checksumOut.write(buf);
             LOG.debug("Writing out partial crc for data len " + len);
@@ -476,7 +545,10 @@
           } else {
             checksumOut.write(pktBuf, checksumOff, checksumLen);
           }
+          replicaInfo.setBytesOnDisk(offsetInBlock);
           datanode.myMetrics.bytesWritten.inc(len);
+          // flush the entire packet
+          flush();
         }
       } catch (IOException iex) {
         datanode.checkDiskError(iex);
@@ -484,20 +556,11 @@
       }
     }
 
-    /// flush entire packet before sending ack
-    flush();
-
-    // put in queue for pending acks
-    if (responder != null) {
-      ((PacketResponder)responder.getRunnable()).enqueue(seqno,
-                                      lastPacketInBlock); 
-    }
-    
     if (throttler != null) { // throttle I/O
-      throttler.throttle(payloadLen);
+      throttler.throttle(len);
     }
     
-    return payloadLen;
+    return len;
   }
 
   void writeChecksumHeader(DataOutputStream mirrorOut) throws IOException {
@@ -518,10 +581,6 @@
       throttler = throttlerArg;
 
     try {
-      // write data chunk header
-      if (!finalized) {
-        BlockMetadataHeader.writeHeader(checksumOut, checksum);
-      }
       if (clientName.length() > 0) {
         responder = new Daemon(datanode.threadGroup, 
                                new PacketResponder(this, block, mirrIn, 
@@ -530,20 +589,10 @@
       }
 
       /* 
-       * Receive until packet length is zero.
+       * Receive until packet has zero bytes of data.
        */
       while (receivePacket() > 0) {}
 
-      // flush the mirror out
-      if (mirrorOut != null) {
-        try {
-          mirrorOut.writeInt(0); // mark the end of the block
-          mirrorOut.flush();
-        } catch (IOException e) {
-          handleMirrorOutError(e);
-        }
-      }
-
       // wait for all outstanding packet responses. And then
       // indicate responder to gracefully shutdown.
       // Mark that responder has been closed for future processing
@@ -560,7 +609,7 @@
         close();
 
         // Finalize the block. Does this fsync()?
-        block.setNumBytes(offsetInBlock);
+        block.setNumBytes(replicaInfo.getNumBytes());
         datanode.data.finalizeBlock(block);
         datanode.myMetrics.blocksWritten.inc();
       }
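
With the old payloadLen == 0 sentinel gone, end-of-block is now signalled
in-band: the sender's final packet carries lastPacketInBlock == true and
zero data bytes, so receivePacket() returns 0 and the receive loop above
exits. A sketch of the resulting contract:

    // old framing: a bare int 0 written after the last data packet
    // new framing: a normal packet header with dataLen == 0 and the
    //              last-packet flag set; no extra terminator follows
    while (receivePacket() > 0) { }  // one iteration per data packet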
@@ -598,29 +647,10 @@
   }
 
   /**
-   * Sets the file pointer in the local block file to the specified value.
+   * Adjust the file pointer in the local meta file so that the last checksum
+   * will be overwritten.
    */
-  private void setBlockPosition(long offsetInBlock) throws IOException {
-    if (finalized) {
-      if (!isRecovery) {
-        throw new IOException("Write to offset " + offsetInBlock +
-                              " of block " + block +
-                              " that is already finalized.");
-      }
-      if (offsetInBlock > datanode.data.getLength(block)) {
-        throw new IOException("Write to offset " + offsetInBlock +
-                              " of block " + block +
-                              " that is already finalized and is of size " +
-                              datanode.data.getLength(block));
-      }
-      return;
-    }
-
-    if (datanode.data.getChannelPosition(block, streams) == offsetInBlock) {
-      return;                   // nothing to do 
-    }
-    long offsetInChecksum = BlockMetadataHeader.getHeaderSize() +
-                            offsetInBlock / bytesPerChecksum * checksumSize;
+  private void adjustCrcFilePosition() throws IOException {
     if (out != null) {
      out.flush();
     }
@@ -628,23 +658,8 @@
       checksumOut.flush();
     }
 
-    // If this is a partial chunk, then read in pre-existing checksum
-    if (offsetInBlock % bytesPerChecksum != 0) {
-      LOG.info("setBlockPosition trying to set position to " +
-               offsetInBlock +
-               " for block " + block +
-               " which is not a multiple of bytesPerChecksum " +
-               bytesPerChecksum);
-      computePartialChunkCrc(offsetInBlock, offsetInChecksum, bytesPerChecksum);
-    }
-
-    LOG.info("Changing block file offset of block " + block + " from " + 
-        datanode.data.getChannelPosition(block, streams) +
-             " to " + offsetInBlock +
-             " meta file offset to " + offsetInChecksum);
-
-    // set the position of the block file
-    datanode.data.setChannelPosition(block, streams, offsetInBlock, offsetInChecksum);
+    // rollback the position of the meta file
+    datanode.data.adjustCrcChannelPosition(block, streams, checksumSize);
   }
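
When a packet extends a partially filled chunk, the last CRC on disk
covers only a prefix of that chunk and must be overwritten. A minimal
sketch of what adjustCrcChannelPosition is expected to do (assuming a
FileChannel-backed meta file; the real implementation lives in FSDataset):

    // Rewind the meta file by one checksum so the next write replaces
    // the CRC of the trailing partial chunk (checksumSize is 4 for CRC32).
    void rewindLastChecksum(java.nio.channels.FileChannel meta,
        int checksumSize) throws java.io.IOException {
      meta.position(meta.position() - checksumSize);
    }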
 
   /**
@@ -732,12 +747,13 @@
     * enqueue the seqno that is still to be acked by the downstream datanode.
      * @param seqno
      * @param lastPacketInBlock
+     * @param lastByteInPacket
      */
-    synchronized void enqueue(long seqno, boolean lastPacketInBlock) {
+    synchronized void enqueue(long seqno, boolean lastPacketInBlock, long lastByteInPacket) {
       if (running) {
         LOG.debug("PacketResponder " + numTargets + " adding seqno " + seqno +
                   " to ack queue.");
-        ackQueue.addLast(new Packet(seqno, lastPacketInBlock));
+        ackQueue.addLast(new Packet(seqno, lastPacketInBlock, lastByteInPacket));
         notifyAll();
       }
     }
@@ -798,9 +814,8 @@
             if (!running || !datanode.shouldRun) {
               break;
             }
-            Packet pkt = ackQueue.removeFirst();
+            Packet pkt = ackQueue.getFirst();
             long expected = pkt.seqno;
-            notifyAll();
             LOG.debug("PacketResponder " + numTargets +
                       " for block " + block + 
                       " acking for packet " + expected);
@@ -808,33 +823,34 @@
             // If this is the last packet in block, then close block
             // file and finalize the block before responding success
             if (pkt.lastPacketInBlock) {
-              if (!receiver.finalized) {
-                receiver.close();
-                final long endTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
-                block.setNumBytes(receiver.offsetInBlock);
-                datanode.data.finalizeBlock(block);
-                datanode.myMetrics.blocksWritten.inc();
-                datanode.notifyNamenodeReceivedBlock(block, 
-                    DataNode.EMPTY_DEL_HINT);
-                if (ClientTraceLog.isInfoEnabled() &&
-                    receiver.clientName.length() > 0) {
-                  long offset = 0;
-                  ClientTraceLog.info(String.format(DN_CLIENTTRACE_FORMAT,
-                        receiver.inAddr, receiver.myAddr, block.getNumBytes(),
-                        "HDFS_WRITE", receiver.clientName, offset,
-                        datanode.dnRegistration.getStorageID(), block, endTime-startTime));
-                } else {
-                  LOG.info("Received block " + block + 
-                           " of size " + block.getNumBytes() + 
-                           " from " + receiver.inAddr);
-                }
+              receiver.close();
+              final long endTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
+              block.setNumBytes(replicaInfo.getNumBytes());
+              datanode.data.finalizeBlock(block);
+              datanode.closeBlock(block, DataNode.EMPTY_DEL_HINT);
+              if (ClientTraceLog.isInfoEnabled() &&
+                  receiver.clientName.length() > 0) {
+                long offset = 0;
+                ClientTraceLog.info(String.format(DN_CLIENTTRACE_FORMAT,
+                    receiver.inAddr, receiver.myAddr, block.getNumBytes(),
+                    "HDFS_WRITE", receiver.clientName, offset,
+                    datanode.dnRegistration.getStorageID(), block, endTime-startTime));
+              } else {
+                LOG.info("Received block " + block + 
+                    " of size " + block.getNumBytes() + 
+                    " from " + receiver.inAddr);
               }
               lastPacket = true;
             }
 
-            replyOut.writeLong(expected);
-            SUCCESS.write(replyOut);
+            ackReply(expected);
             replyOut.flush();
+            // remove the packet from the ack queue
+            removeAckHead();
+            // update the bytes acked
+            if (pkt.lastByteInBlock>replicaInfo.getBytesAcked()) {
+              replicaInfo.setBytesAcked(pkt.lastByteInBlock);
+            }
         } catch (Exception e) {
           LOG.warn("IOException in BlockReceiver.lastNodeRun: ", e);
           if (running) {
@@ -854,6 +870,14 @@
                " for block " + block + " terminating");
     }
 
+    // This method is introduced to facilitate testing. Otherwise
+    // there would be little chance to bind an AspectJ advice to such a
+    // sequence of calls.
+    private void ackReply(long expected) throws IOException {
+      replyOut.writeLong(expected);
+      SUCCESS.write(replyOut);
+    }
+
     /**
      * Thread to process incoming acks.
      * @see java.lang.Runnable#run()
@@ -870,9 +894,11 @@
       final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
       while (running && datanode.shouldRun && !lastPacketInBlock) {
 
+        boolean isInterrupted = false;
         try {
             DataTransferProtocol.Status op = SUCCESS;
             boolean didRead = false;
+            Packet pkt = null;
             long expected = -2;
             try { 
               // read seqno from downstream datanode
@@ -888,7 +914,6 @@
               } else {
                 LOG.debug("PacketResponder " + numTargets + " got seqno = " + 
                     seqno);
-                Packet pkt = null;
                 synchronized (this) {
                   while (running && datanode.shouldRun && ackQueue.size() == 0) {
                     if (LOG.isDebugEnabled()) {
@@ -897,11 +922,15 @@
                                 " for block " + block +
                                 " waiting for local datanode to finish write.");
                     }
-                    wait();
+                    try {
+                      wait();
+                    } catch (InterruptedException e) {
+                      isInterrupted = true;
+                      throw e;
+                    }
                   }
-                  pkt = ackQueue.removeFirst();
+                  pkt = ackQueue.getFirst();
                   expected = pkt.seqno;
-                  notifyAll();
                   LOG.debug("PacketResponder " + numTargets + " seqno = " + seqno);
                   if (seqno != expected) {
                     throw new IOException("PacketResponder " + numTargets +
@@ -920,7 +949,7 @@
               }
             }
 
-            if (Thread.interrupted()) {
+            if (Thread.interrupted() || isInterrupted) {
               /* The receiver thread cancelled this thread. 
                * We could also check any other status updates from the 
                * receiver thread (e.g. if it is ok to write to replyOut). 
@@ -941,14 +970,12 @@
             
             // If this is the last packet in block, then close block
             // file and finalize the block before responding success
-            if (lastPacketInBlock && !receiver.finalized) {
+            if (lastPacketInBlock) {
               receiver.close();
               final long endTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
-              block.setNumBytes(receiver.offsetInBlock);
+              block.setNumBytes(replicaInfo.getNumBytes());
               datanode.data.finalizeBlock(block);
-              datanode.myMetrics.blocksWritten.inc();
-              datanode.notifyNamenodeReceivedBlock(block, 
-                  DataNode.EMPTY_DEL_HINT);
+              datanode.closeBlock(block, DataNode.EMPTY_DEL_HINT);
               if (ClientTraceLog.isInfoEnabled() &&
                   receiver.clientName.length() > 0) {
                 long offset = 0;
@@ -964,20 +991,21 @@
             }
 
             // send my status back to upstream datanode
-            replyOut.writeLong(expected); // send seqno upstream
-            SUCCESS.write(replyOut);
+            ackReply(expected);
 
             LOG.debug("PacketResponder " + numTargets + 
                       " for block " + block +
                       " responded my status " +
                       " for seqno " + expected);
 
+            boolean success = true;
             // forward responses from downstream datanodes.
             for (int i = 0; i < numTargets && datanode.shouldRun; i++) {
               try {
                 if (op == SUCCESS) {
                   op = Status.read(mirrorIn);
                   if (op != SUCCESS) {
+                    success = false;
                     LOG.debug("PacketResponder for block " + block +
                               ": error code received from downstream " +
                               " datanode[" + i + "] " + op);
@@ -985,13 +1013,23 @@
                 }
               } catch (Throwable e) {
                 op = ERROR;
+                success = false;
               }
               op.write(replyOut);
             }
             replyOut.flush();
+            
             LOG.debug("PacketResponder " + block + " " + numTargets + 
                       " responded other status " + " for seqno " + expected);
 
+            if (pkt != null) {
+              // remove the packet from the ack queue
+              removeAckHead();
+              // update bytes acked
+              if (success && pkt.lastByteInBlock>replicaInfo.getBytesAcked()) {
+                replicaInfo.setBytesAcked(pkt.lastByteInBlock);
+              }
+            }
             // If we were unable to read the seqno from downstream, then stop.
             if (expected == -2) {
               running = false;
@@ -1025,6 +1063,16 @@
       LOG.info("PacketResponder " + numTargets + 
                " for block " + block + " terminating");
     }
+    
+    /**
+     * Remove a packet from the head of the ack queue
+     * 
+     * This should be called only when the ack queue is not empty
+     */
+    private synchronized void removeAckHead() {
+      ackQueue.removeFirst();
+      notifyAll();
+    }
   }
   
   /**
@@ -1033,10 +1081,12 @@
   static private class Packet {
     long seqno;
     boolean lastPacketInBlock;
+    long lastByteInBlock;
 
-    Packet(long seqno, boolean lastPacketInBlock) {
+    Packet(long seqno, boolean lastPacketInBlock, long lastByteInPacket) {
       this.seqno = seqno;
       this.lastPacketInBlock = lastPacketInBlock;
+      this.lastByteInBlock = lastByteInPacket;
     }
   }
 }
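
A key behavioral change in PacketResponder: a packet now stays at the head
of ackQueue while it is being acked (getFirst) and is removed only after
the reply has been flushed, so bytesAcked never runs ahead of what has
actually been acknowledged upstream. A condensed sketch of the lifecycle
(synchronization as in the patch):

    // producer side (receivePacket): register the pending ack
    responder.enqueue(seqno, lastPacketInBlock, offsetInBlock);

    // consumer side (PacketResponder): peek, reply, then remove
    Packet pkt = ackQueue.getFirst();  // do NOT remove yet
    ackReply(pkt.seqno);               // seqno + SUCCESS upstream
    replyOut.flush();
    removeAckHead();                   // removeFirst() + notifyAll()
    if (pkt.lastByteInBlock > replicaInfo.getBytesAcked())
      replicaInfo.setBytesAcked(pkt.lastByteInBlock);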

Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java Sat Nov 28 20:05:56 2009
@@ -46,13 +46,18 @@
   static final Log ClientTraceLog = DataNode.ClientTraceLog;
   
   private Block block; // the block to read from
+
+  /** the replica to read from */
+  private final Replica replica;
+  /** The visible length of a replica. */
+  private final long replicaVisibleLength;
+
   private InputStream blockIn; // data stream
   private long blockInPosition = -1; // updated while using transferTo().
   private DataInputStream checksumIn; // checksum datastream
   private DataChecksum checksum; // checksum stream
   private long offset; // starting position to read
   private long endOffset; // ending position
-  private long blockLength;
   private int bytesPerChecksum; // chunk size
   private int checksumSize; // checksum size
   private boolean corruptChecksumOk; // if need to verify checksum
@@ -86,10 +91,29 @@
       throws IOException {
     try {
       this.block = block;
+      synchronized(datanode.data) { 
+        this.replica = datanode.data.getReplica(block.getBlockId());
+        if (replica == null) {
+          throw new ReplicaNotFoundException(block);
+        }
+        this.replicaVisibleLength = replica.getVisibleLength();
+      }
+      if (replica.getGenerationStamp() < block.getGenerationStamp()) {
+        throw new IOException(
+            "replica.getGenerationStamp() < block.getGenerationStamp(), block="
+            + block + ", replica=" + replica);
+      }
+      if (replicaVisibleLength < 0) {
+        throw new IOException("The replica is not readable, block="
+            + block + ", replica=" + replica);
+      }
+      if (DataNode.LOG.isDebugEnabled()) {
+        DataNode.LOG.debug("block=" + block + ", replica=" + replica);
+      }
+      
       this.chunkOffsetOK = chunkOffsetOK;
       this.corruptChecksumOk = corruptChecksumOk;
       this.verifyChecksum = verifyChecksum;
-      this.blockLength = datanode.data.getLength(block);
       this.transferToAllowed = datanode.transferToAllowed;
       this.clientTraceFmt = clientTraceFmt;
 
@@ -119,18 +143,18 @@
        * blockLength.
        */        
       bytesPerChecksum = checksum.getBytesPerChecksum();
-      if (bytesPerChecksum > 10*1024*1024 && bytesPerChecksum > blockLength){
+      if (bytesPerChecksum > 10*1024*1024 && bytesPerChecksum > replicaVisibleLength) {
         checksum = DataChecksum.newDataChecksum(checksum.getChecksumType(),
-                                   Math.max((int)blockLength, 10*1024*1024));
+            Math.max((int)replicaVisibleLength, 10*1024*1024));
         bytesPerChecksum = checksum.getBytesPerChecksum();        
       }
       checksumSize = checksum.getChecksumSize();
 
       if (length < 0) {
-        length = blockLength;
+        length = replicaVisibleLength;
       }
 
-      endOffset = blockLength;
+      endOffset = replicaVisibleLength;
       if (startOffset < 0 || startOffset > endOffset
           || (length + startOffset) > endOffset) {
         String msg = " Offset " + startOffset + " and length " + length
@@ -163,6 +187,18 @@
       }
       seqno = 0;
 
+      //sleep a few times if getBytesOnDisk() < visible length
+      for(int i = 0; i < 30 && replica.getBytesOnDisk() < replicaVisibleLength; i++) {
+        try {
+          Thread.sleep(100);
+        } catch (InterruptedException ie) {
+          throw new IOException(ie);
+        }
+      }
+      if (DataNode.LOG.isDebugEnabled()) {
+        DataNode.LOG.debug("replica=" + replica);
+      }
+
       blockIn = datanode.data.getBlockInputStream(block, offset); // seek to offset
     } catch (IOException ioe) {
       IOUtils.closeStream(this);
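
Because a replica may still be mid-write when a reader arrives, the
constructor polls briefly until the bytes on disk catch up with the
visible length, bounding the wait at roughly 3 seconds (30 x 100 ms) and
then proceeding with whatever is durable. An equivalent bounded wait as a
stand-alone sketch (helper name hypothetical):

    // Wait up to ~3s for an under-construction replica's on-disk length
    // to reach its visible length; give up quietly after the bound.
    static void awaitBytesOnDisk(Replica r, long visibleLen)
        throws java.io.IOException {
      for (int i = 0; i < 30 && r.getBytesOnDisk() < visibleLen; i++) {
        try {
          Thread.sleep(100);
        } catch (InterruptedException ie) {
          throw new java.io.IOException(ie);
        }
      }
    }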
@@ -234,10 +270,6 @@
 
     int len = Math.min((int) (endOffset - offset),
                        bytesPerChecksum*maxChunks);
-    if (len == 0) {
-      return 0;
-    }
-
     int numChunks = (len + bytesPerChecksum - 1)/bytesPerChecksum;
     int packetLen = len + numChunks*checksumSize + 4;
     pkt.clear();
@@ -246,7 +278,7 @@
     pkt.putInt(packetLen);
     pkt.putLong(offset);
     pkt.putLong(seqno);
-    pkt.put((byte)((offset + len >= endOffset) ? 1 : 0));
+    pkt.put((byte)((len == 0) ? 1 : 0));
                //why no ByteBuf.putBoolean()?
     pkt.putInt(len);
     
@@ -407,7 +439,8 @@
         seqno++;
       }
       try {
-        out.writeInt(0); // mark the end of block        
+        // send an empty packet to mark the end of the block
+        sendChunks(pktBuf, maxChunksPerPacket, streamForSendChunks);        
         out.flush();
       } catch (IOException e) { //socket error
         throw ioeToSocketException(e);
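
Mirroring the receiver-side change, the sender now ends the stream with a
regular empty packet rather than a bare zero int: at this point offset ==
endOffset, so sendChunks computes len == 0 and sets the last-packet flag
(the (len == 0) ? 1 : 0 change above). A compressed sketch of the send
loop's tail (names from the patch):

    while (endOffset > offset) {   // one data packet per iteration
      long sent = sendChunks(pktBuf, maxChunksPerPacket, streamForSendChunks);
      offset += sent;
      seqno++;
    }
    // len == 0 here: this packet carries no data and is flagged as the
    // last packet in the block -- the new end-of-block marker
    sendChunks(pktBuf, maxChunksPerPacket, streamForSendChunks);
    out.flush();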
@@ -420,7 +453,7 @@
       close();
     }
 
-    blockReadFully = (initialOffset == 0 && offset >= blockLength);
+    blockReadFully = initialOffset == 0 && offset >= replicaVisibleLength;
 
     return totalRead;
   }