Posted to hdfs-commits@hadoop.apache.org by ha...@apache.org on 2009/10/01 01:39:33 UTC

svn commit: r820497 [2/7] - in /hadoop/hdfs/trunk: ./ .eclipse.templates/.launches/ src/contrib/hdfsproxy/ src/docs/src/documentation/content/xdocs/ src/java/ src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/protocol/ src/java/org/apach...

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/common/HdfsConstants.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/common/HdfsConstants.java?rev=820497&r1=820496&r2=820497&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/common/HdfsConstants.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/common/HdfsConstants.java Wed Sep 30 23:39:30 2009
@@ -17,6 +17,10 @@
  */
 package org.apache.hadoop.hdfs.server.common;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
 /************************************
  * Some handy internal HDFS constants
  *
@@ -80,5 +84,77 @@
       return description;
     }
   }
+
+  /**
+   * Block replica states, which a replica can go through while being constructed.
+   */
+  static public enum ReplicaState {
+    /** Replica is finalized. The state when replica is not modified. */
+    FINALIZED(0),
+    /** Replica is being written to. */
+    RBW(1),
+    /** Replica is waiting to be recovered. */
+    RWR(2),
+    /** Replica is under recovery. */
+    RUR(3),
+    /** Temporary replica: created for replication and relocation only. */
+    TEMPORARY(4);
+
+    private int value;
+
+    private ReplicaState(int v) {
+      value = v;
+    }
+
+    public int getValue() {
+      return value;
+    }
+
+    public static ReplicaState getState(int v) {
+      return ReplicaState.values()[v];
+    }
+
+    /** Read from in */
+    public static ReplicaState read(DataInput in) throws IOException {
+      return values()[in.readByte()];
+    }
+
+    /** Write to out */
+    public void write(DataOutput out) throws IOException {
+      out.writeByte(ordinal());
+    }
+  }
+
+  /**
+   * States a block can go through while it is under construction.
+   */
+  static public enum BlockUCState {
+    /**
+     * Block construction completed.<br>
+     * The block has at least one {@link ReplicaState#FINALIZED} replica,
+     * and is not going to be modified.
+     */
+    COMPLETE,
+    /**
+     * The block is under construction.<br>
+     * It has been recently allocated for write or append.
+     */
+    UNDER_CONSTRUCTION,
+    /**
+     * The block is under recovery.<br>
+     * When a file lease expires, its last block may not be {@link #COMPLETE}
+     * and needs to go through a recovery procedure, 
+     * which synchronizes the contents of the existing replicas.
+     */
+    UNDER_RECOVERY,
+    /**
+     * The block is committed.<br>
+     * The client reported that all bytes are written to data-nodes
+     * with the given generation stamp and block length, but no 
+     * {@link ReplicaState#FINALIZED} 
+     * replicas have yet been reported by data-nodes themselves.
+     */
+    COMMITTED;
+  }
 }
 
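An aside on the new ReplicaState enum above: it doubles as a one-byte wire type, since write() emits the ordinal and read() maps a byte back to the enum constant. A minimal round-trip sketch, assuming the enum is visible as shown in the diff; the class name and streams below are illustrative, not part of this commit:

  import java.io.ByteArrayInputStream;
  import java.io.ByteArrayOutputStream;
  import java.io.DataInputStream;
  import java.io.DataOutputStream;
  import java.io.IOException;

  import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;

  public class ReplicaStateRoundTrip {
    public static void main(String[] args) throws IOException {
      // Serialize the state as a single byte (its ordinal).
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      ReplicaState.RBW.write(new DataOutputStream(bytes));

      // Read it back; values()[b] recovers the enum constant.
      ReplicaState state = ReplicaState.read(
          new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
      System.out.println(state);            // RBW
      System.out.println(state.getValue()); // 1
    }
  }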

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/common/Storage.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/common/Storage.java?rev=820497&r1=820496&r2=820497&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/common/Storage.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/common/Storage.java Wed Sep 30 23:39:30 2009
@@ -74,6 +74,9 @@
    * any upgrade code that uses this constant should also be removed. */
   public static final int PRE_GENERATIONSTAMP_LAYOUT_VERSION = -13;
   
+  // last layout version that did not support persistent rbw replicas
+  public static final int PRE_RBW_LAYOUT_VERSION = -19;
+  
   private   static final String STORAGE_FILE_LOCK     = "in_use.lock";
   protected static final String STORAGE_FILE_VERSION  = "VERSION";
   public static final String STORAGE_DIR_CURRENT   = "current";
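
For orientation, layout versions in this code base are negative and grow more negative as the format evolves, so code elsewhere in this commit compares with '<' to detect layouts that already persist rbw replicas (see linkAllBlocks in DataStorage.java below). A hedged sketch of just that comparison; the class and method names are invented for this note:

  public class LayoutVersionCheck {
    // Mirrors Storage.PRE_RBW_LAYOUT_VERSION from the diff above.
    static final int PRE_RBW_LAYOUT_VERSION = -19;

    // A layout that persists rbw replicas is strictly newer, i.e. numerically smaller, than -19.
    static boolean supportsPersistentRbw(int layoutVersion) {
      return layoutVersion < PRE_RBW_LAYOUT_VERSION;
    }

    public static void main(String[] args) {
      System.out.println(supportsPersistentRbw(-18)); // false: older layout
      System.out.println(supportsPersistentRbw(-19)); // false: last pre-rbw layout
      System.out.println(supportsPersistentRbw(-20)); // true: rbw directories exist on disk
    }
  }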

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=820497&r1=820496&r2=820497&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Wed Sep 30 23:39:30 2009
@@ -39,6 +39,7 @@
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.Daemon;
@@ -55,7 +56,6 @@
   static final Log ClientTraceLog = DataNode.ClientTraceLog;
   
   private Block block; // the block to receive
-  protected boolean finalized;
   private DataInputStream in = null; // from where data are read
   private DataChecksum checksum; // from where chunks of a block can be read
   private OutputStream out = null; // to block file at local disk
@@ -65,7 +65,6 @@
   private ByteBuffer buf; // contains one full packet.
   private int bufRead; //amount of valid data in the buf
   private int maxPacketReadLen;
-  protected long offsetInBlock;
   protected final String inAddr;
   protected final String myAddr;
   private String mirrorAddr;
@@ -73,46 +72,83 @@
   private Daemon responder = null;
   private BlockTransferThrottler throttler;
   private FSDataset.BlockWriteStreams streams;
-  private boolean isRecovery = false;
   private String clientName;
   DatanodeInfo srcDataNode = null;
   private Checksum partialCrc = null;
   private final DataNode datanode;
+  final private ReplicaInPipelineInterface replicaInfo;
 
   BlockReceiver(Block block, DataInputStream in, String inAddr,
-                String myAddr, boolean isRecovery, String clientName, 
-                DatanodeInfo srcDataNode, DataNode datanode) throws IOException {
+                String myAddr, BlockConstructionStage stage, 
+                long newGs, long minBytesRcvd, long maxBytesRcvd, 
+                String clientName, DatanodeInfo srcDataNode, DataNode datanode)
+                throws IOException {
     try{
       this.block = block;
       this.in = in;
       this.inAddr = inAddr;
       this.myAddr = myAddr;
-      this.isRecovery = isRecovery;
       this.clientName = clientName;
-      this.offsetInBlock = 0;
       this.srcDataNode = srcDataNode;
       this.datanode = datanode;
-      this.checksum = DataChecksum.newDataChecksum(in);
-      this.bytesPerChecksum = checksum.getBytesPerChecksum();
-      this.checksumSize = checksum.getChecksumSize();
       //
       // Open local disk out
       //
-      streams = datanode.data.writeToBlock(block, isRecovery);
-      this.finalized = datanode.data.isValidBlock(block);
+      if (clientName.length() == 0) { //replication or move
+        replicaInfo = datanode.data.createTemporary(block);
+      } else {
+        switch (stage) {
+        case PIPELINE_SETUP_CREATE:
+          replicaInfo = datanode.data.createRbw(block);
+          break;
+        case PIPELINE_SETUP_STREAMING_RECOVERY:
+          replicaInfo = datanode.data.recoverRbw(
+              block, newGs, minBytesRcvd, maxBytesRcvd);
+          block.setGenerationStamp(newGs);
+          break;
+        case PIPELINE_SETUP_APPEND:
+          replicaInfo = datanode.data.append(block, newGs, minBytesRcvd);
+          if (datanode.blockScanner != null) { // remove from block scanner
+            datanode.blockScanner.deleteBlock(block);
+          }
+          block.setGenerationStamp(newGs);
+          break;
+        case PIPELINE_SETUP_APPEND_RECOVERY:
+          replicaInfo = datanode.data.recoverAppend(block, newGs, minBytesRcvd);
+          if (datanode.blockScanner != null) { // remove from block scanner
+            datanode.blockScanner.deleteBlock(block);
+          }
+          block.setGenerationStamp(newGs);
+          break;
+        default: throw new IOException("Unsupported stage " + stage + 
+              " while receiving block " + block + " from " + inAddr);
+        }
+      }
+      streams = replicaInfo.createStreams();
       if (streams != null) {
         this.out = streams.dataOut;
         this.checksumOut = new DataOutputStream(new BufferedOutputStream(
                                                   streams.checksumOut, 
                                                   SMALL_BUFFER_SIZE));
-        // If this block is for appends, then remove it from periodic
-        // validation.
-        if (datanode.blockScanner != null && isRecovery) {
-          datanode.blockScanner.deleteBlock(block);
+        
+        // read checksum meta information
+        this.checksum = DataChecksum.newDataChecksum(in);
+        this.bytesPerChecksum = checksum.getBytesPerChecksum();
+        this.checksumSize = checksum.getChecksumSize();
+        
+        // write data chunk header if creating a new replica
+        if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE 
+            || clientName.length() == 0) {
+          BlockMetadataHeader.writeHeader(checksumOut, checksum);
+        } else {
+          datanode.data.setChannelPosition(block, streams, 0, 
+              BlockMetadataHeader.getHeaderSize());
         }
       }
-    } catch (BlockAlreadyExistsException bae) {
+    } catch (ReplicaAlreadyExistsException bae) {
       throw bae;
+    } catch (ReplicaNotFoundException bne) {
+      throw bne;
     } catch(IOException ioe) {
       IOUtils.closeStream(this);
       cleanupBlock();
@@ -288,7 +324,7 @@
    * It tries to read a full packet with single read call.
    * Consecutive packets are usually of the same length.
    */
-  private int readNextPacket() throws IOException {
+  private void readNextPacket() throws IOException {
     /* This dances around buf a little bit, mainly to read 
     * full packet with a single read and to accept arbitrary size
      * for next packet at the same time.
@@ -324,12 +360,6 @@
     int payloadLen = buf.getInt();
     buf.reset();
     
-    if (payloadLen == 0) {
-      //end of stream!
-      buf.limit(buf.position() + SIZE_OF_INTEGER);
-      return 0;
-    }
-    
     // check corrupt values for pktLen, 100MB upper limit should be ok?
     if (payloadLen < 0 || payloadLen > (100*1024*1024)) {
       throw new IOException("Incorrect value for packet payload : " +
@@ -369,42 +399,58 @@
     if (pktSize > maxPacketReadLen) {
       maxPacketReadLen = pktSize;
     }
-    
-    return payloadLen;
   }
   
   /** 
    * Receives and processes a packet. It can contain many chunks.
-   * returns size of the packet.
+   * returns the number of data bytes in the packet.
    */
   private int receivePacket() throws IOException {
-    
-    int payloadLen = readNextPacket();
-    
-    if (payloadLen <= 0) {
-      return payloadLen;
-    }
+    // read the next packet
+    readNextPacket();
     
     buf.mark();
     //read the header
     buf.getInt(); // packet length
-    offsetInBlock = buf.getLong(); // get offset of packet in block
+    long offsetInBlock = buf.getLong(); // get offset of packet in block
+    
+    if (offsetInBlock > replicaInfo.getNumBytes()) {
+      throw new IOException("Received an out-of-sequence packet for " + block + 
+          " from " + inAddr + " at offset " + offsetInBlock +
+          ". Expecting packet starting at " + replicaInfo.getNumBytes());
+    }
     long seqno = buf.getLong();    // get seqno
     boolean lastPacketInBlock = (buf.get() != 0);
     
+    int len = buf.getInt();
+    if (len < 0) {
+      throw new IOException("Got wrong length during writeBlock(" + block + 
+                            ") from " + inAddr + " at offset " + 
+                            offsetInBlock + ": " + len); 
+    } 
     int endOfHeader = buf.position();
     buf.reset();
     
     if (LOG.isDebugEnabled()){
       LOG.debug("Receiving one packet for block " + block +
-                " of length " + payloadLen +
+                " of length " + len +
                 " seqno " + seqno +
                 " offsetInBlock " + offsetInBlock +
                 " lastPacketInBlock " + lastPacketInBlock);
     }
     
-    setBlockPosition(offsetInBlock);
+    // update received bytes
+    offsetInBlock += len;
+    if (replicaInfo.getNumBytes() < offsetInBlock) {
+      replicaInfo.setNumBytes(offsetInBlock);
+    }
     
+    // put in queue for pending acks
+    if (responder != null) {
+      ((PacketResponder)responder.getRunnable()).enqueue(seqno,
+                                      lastPacketInBlock, offsetInBlock); 
+    }  
+
     //First write the packet to the mirror:
     if (mirrorOut != null) {
       try {
@@ -416,19 +462,10 @@
     }
 
     buf.position(endOfHeader);        
-    int len = buf.getInt();
     
-    if (len < 0) {
-      throw new IOException("Got wrong length during writeBlock(" + block + 
-                            ") from " + inAddr + " at offset " + 
-                            offsetInBlock + ": " + len); 
-    } 
-
-    if (len == 0) {
-      LOG.debug("Receiving empty packet for block " + block);
+    if (lastPacketInBlock || len == 0) {
+      LOG.debug("Receiving an empty packet or the end of the block " + block);
     } else {
-      offsetInBlock += len;
-
       int checksumLen = ((len + bytesPerChecksum - 1)/bytesPerChecksum)*
                                                             checksumSize;
 
@@ -454,8 +491,10 @@
       }
 
       try {
-        if (!finalized) {
+        if (replicaInfo.getBytesOnDisk()<offsetInBlock) {
           //finally write to the disk :
+          setBlockPosition(offsetInBlock-len);
+          
           out.write(pktBuf, dataOff, len);
 
           // If this is a partial chunk, then verify that this is the only
@@ -476,6 +515,7 @@
           } else {
             checksumOut.write(pktBuf, checksumOff, checksumLen);
           }
+          replicaInfo.setBytesOnDisk(offsetInBlock);
           datanode.myMetrics.bytesWritten.inc(len);
         }
       } catch (IOException iex) {
@@ -487,17 +527,11 @@
     /// flush entire packet before sending ack
     flush();
 
-    // put in queue for pending acks
-    if (responder != null) {
-      ((PacketResponder)responder.getRunnable()).enqueue(seqno,
-                                      lastPacketInBlock); 
-    }
-    
     if (throttler != null) { // throttle I/O
-      throttler.throttle(payloadLen);
+      throttler.throttle(len);
     }
     
-    return payloadLen;
+    return len;
   }
 
   void writeChecksumHeader(DataOutputStream mirrorOut) throws IOException {
@@ -518,10 +552,6 @@
       throttler = throttlerArg;
 
     try {
-      // write data chunk header
-      if (!finalized) {
-        BlockMetadataHeader.writeHeader(checksumOut, checksum);
-      }
       if (clientName.length() > 0) {
         responder = new Daemon(datanode.threadGroup, 
                                new PacketResponder(this, block, mirrIn, 
@@ -530,20 +560,10 @@
       }
 
       /* 
-       * Receive until packet length is zero.
+       * Receive until packet has zero bytes of data.
        */
       while (receivePacket() > 0) {}
 
-      // flush the mirror out
-      if (mirrorOut != null) {
-        try {
-          mirrorOut.writeInt(0); // mark the end of the block
-          mirrorOut.flush();
-        } catch (IOException e) {
-          handleMirrorOutError(e);
-        }
-      }
-
       // wait for all outstanding packet responses. And then
       // indicate responder to gracefully shutdown.
       // Mark that responder has been closed for future processing
@@ -560,7 +580,7 @@
         close();
 
         // Finalize the block. Does this fsync()?
-        block.setNumBytes(offsetInBlock);
+        block.setNumBytes(replicaInfo.getNumBytes());
         datanode.data.finalizeBlock(block);
         datanode.myMetrics.blocksWritten.inc();
       }
@@ -601,21 +621,6 @@
    * Sets the file pointer in the local block file to the specified value.
    */
   private void setBlockPosition(long offsetInBlock) throws IOException {
-    if (finalized) {
-      if (!isRecovery) {
-        throw new IOException("Write to offset " + offsetInBlock +
-                              " of block " + block +
-                              " that is already finalized.");
-      }
-      if (offsetInBlock > datanode.data.getLength(block)) {
-        throw new IOException("Write to offset " + offsetInBlock +
-                              " of block " + block +
-                              " that is already finalized and is of size " +
-                              datanode.data.getLength(block));
-      }
-      return;
-    }
-
     if (datanode.data.getChannelPosition(block, streams) == offsetInBlock) {
       return;                   // nothing to do 
     }
@@ -732,12 +737,13 @@
     * enqueue the seqno that is still to be acked by the downstream datanode.
      * @param seqno
      * @param lastPacketInBlock
+     * @param lastByteInPacket
      */
-    synchronized void enqueue(long seqno, boolean lastPacketInBlock) {
+    synchronized void enqueue(long seqno, boolean lastPacketInBlock, long lastByteInPacket) {
       if (running) {
         LOG.debug("PacketResponder " + numTargets + " adding seqno " + seqno +
                   " to ack queue.");
-        ackQueue.addLast(new Packet(seqno, lastPacketInBlock));
+        ackQueue.addLast(new Packet(seqno, lastPacketInBlock, lastByteInPacket));
         notifyAll();
       }
     }
@@ -808,26 +814,22 @@
             // If this is the last packet in block, then close block
             // file and finalize the block before responding success
             if (pkt.lastPacketInBlock) {
-              if (!receiver.finalized) {
-                receiver.close();
-                final long endTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
-                block.setNumBytes(receiver.offsetInBlock);
-                datanode.data.finalizeBlock(block);
-                datanode.myMetrics.blocksWritten.inc();
-                datanode.notifyNamenodeReceivedBlock(block, 
-                    DataNode.EMPTY_DEL_HINT);
-                if (ClientTraceLog.isInfoEnabled() &&
-                    receiver.clientName.length() > 0) {
-                  long offset = 0;
-                  ClientTraceLog.info(String.format(DN_CLIENTTRACE_FORMAT,
-                        receiver.inAddr, receiver.myAddr, block.getNumBytes(),
-                        "HDFS_WRITE", receiver.clientName, offset,
-                        datanode.dnRegistration.getStorageID(), block, endTime-startTime));
-                } else {
-                  LOG.info("Received block " + block + 
-                           " of size " + block.getNumBytes() + 
-                           " from " + receiver.inAddr);
-                }
+              receiver.close();
+              final long endTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
+              block.setNumBytes(replicaInfo.getNumBytes());
+              datanode.data.finalizeBlock(block);
+              datanode.closeBlock(block, DataNode.EMPTY_DEL_HINT);
+              if (ClientTraceLog.isInfoEnabled() &&
+                  receiver.clientName.length() > 0) {
+                long offset = 0;
+                ClientTraceLog.info(String.format(DN_CLIENTTRACE_FORMAT,
+                    receiver.inAddr, receiver.myAddr, block.getNumBytes(),
+                    "HDFS_WRITE", receiver.clientName, offset,
+                    datanode.dnRegistration.getStorageID(), block, endTime-startTime));
+              } else {
+                LOG.info("Received block " + block + 
+                    " of size " + block.getNumBytes() + 
+                    " from " + receiver.inAddr);
               }
               lastPacket = true;
             }
@@ -835,6 +837,9 @@
             replyOut.writeLong(expected);
             SUCCESS.write(replyOut);
             replyOut.flush();
+            if (pkt.lastByteInBlock>replicaInfo.getBytesAcked()) {
+              replicaInfo.setBytesAcked(pkt.lastByteInBlock);
+            }
         } catch (Exception e) {
           LOG.warn("IOException in BlockReceiver.lastNodeRun: ", e);
           if (running) {
@@ -867,6 +872,7 @@
       }
 
       boolean lastPacketInBlock = false;
+      Packet pkt = null;
       final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
       while (running && datanode.shouldRun && !lastPacketInBlock) {
 
@@ -889,7 +895,6 @@
               } else {
                 LOG.debug("PacketResponder " + numTargets + " got seqno = " + 
                     seqno);
-                Packet pkt = null;
                 synchronized (this) {
                   while (running && datanode.shouldRun && ackQueue.size() == 0) {
                     if (LOG.isDebugEnabled()) {
@@ -947,14 +952,12 @@
             
             // If this is the last packet in block, then close block
             // file and finalize the block before responding success
-            if (lastPacketInBlock && !receiver.finalized) {
+            if (lastPacketInBlock) {
               receiver.close();
               final long endTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
-              block.setNumBytes(receiver.offsetInBlock);
+              block.setNumBytes(replicaInfo.getNumBytes());
               datanode.data.finalizeBlock(block);
-              datanode.myMetrics.blocksWritten.inc();
-              datanode.notifyNamenodeReceivedBlock(block, 
-                  DataNode.EMPTY_DEL_HINT);
+              datanode.closeBlock(block, DataNode.EMPTY_DEL_HINT);
               if (ClientTraceLog.isInfoEnabled() &&
                   receiver.clientName.length() > 0) {
                 long offset = 0;
@@ -978,12 +981,14 @@
                       " responded my status " +
                       " for seqno " + expected);
 
+            boolean success = true;
             // forward responses from downstream datanodes.
             for (int i = 0; i < numTargets && datanode.shouldRun; i++) {
               try {
                 if (op == SUCCESS) {
                   op = Status.read(mirrorIn);
                   if (op != SUCCESS) {
+                    success = false;
                     LOG.debug("PacketResponder for block " + block +
                               ": error code received from downstream " +
                               " datanode[" + i + "] " + op);
@@ -991,6 +996,7 @@
                 }
               } catch (Throwable e) {
                 op = ERROR;
+                success = false;
               }
               op.write(replyOut);
             }
@@ -998,6 +1004,10 @@
             LOG.debug("PacketResponder " + block + " " + numTargets + 
                       " responded other status " + " for seqno " + expected);
 
+            if (pkt != null && success && 
+                pkt.lastByteInBlock>replicaInfo.getBytesAcked()) {
+              replicaInfo.setBytesAcked(pkt.lastByteInBlock);
+            }
             // If we were unable to read the seqno from downstream, then stop.
             if (expected == -2) {
               running = false;
@@ -1039,10 +1049,12 @@
   static private class Packet {
     long seqno;
     boolean lastPacketInBlock;
+    long lastByteInBlock;
 
-    Packet(long seqno, boolean lastPacketInBlock) {
+    Packet(long seqno, boolean lastPacketInBlock, long lastByteInPacket) {
       this.seqno = seqno;
       this.lastPacketInBlock = lastPacketInBlock;
+      this.lastByteInBlock = lastByteInPacket;
     }
   }
 }
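
A note on the packet framing that the rewritten receivePacket() relies on: the header is a 4-byte packet length, an 8-byte offset into the block, an 8-byte sequence number, a 1-byte last-packet flag, and a 4-byte data length, and a data length of zero now marks the end of the block (receiveBlock loops "while (receivePacket() > 0)"). A self-contained sketch of that framing; the values are arbitrary and only the field order and widths come from the diff:

  import java.nio.ByteBuffer;

  public class PacketHeaderSketch {
    public static void main(String[] args) {
      // Build a header in the order the sender writes it.
      ByteBuffer buf = ByteBuffer.allocate(25); // 4 + 8 + 8 + 1 + 4 bytes
      buf.putInt(1012);      // total packet length
      buf.putLong(4096L);    // offset of this packet's data in the block
      buf.putLong(7L);       // sequence number, echoed back in acks
      buf.put((byte) 0);     // last-packet-in-block flag
      buf.putInt(1000);      // data length; 0 means "end of block"
      buf.flip();

      // Parse it back in the order BlockReceiver.receivePacket() reads it.
      int packetLen = buf.getInt();
      long offsetInBlock = buf.getLong();
      long seqno = buf.getLong();
      boolean lastPacketInBlock = buf.get() != 0;
      int dataLen = buf.getInt();
      System.out.println(packetLen + " " + offsetInBlock + " " + seqno
          + " " + lastPacketInBlock + " " + dataLen);
    }
  }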

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java?rev=820497&r1=820496&r2=820497&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java Wed Sep 30 23:39:30 2009
@@ -46,13 +46,18 @@
   static final Log ClientTraceLog = DataNode.ClientTraceLog;
   
   private Block block; // the block to read from
+
+  /** the replica to read from */
+  private final Replica replica;
+  /** The visible length of a replica. */
+  private final long replicaVisibleLength;
+
   private InputStream blockIn; // data stream
   private long blockInPosition = -1; // updated while using transferTo().
   private DataInputStream checksumIn; // checksum datastream
   private DataChecksum checksum; // checksum stream
   private long offset; // starting position to read
   private long endOffset; // ending position
-  private long blockLength;
   private int bytesPerChecksum; // chunk size
   private int checksumSize; // checksum size
   private boolean corruptChecksumOk; // if need to verify checksum
@@ -86,10 +91,29 @@
       throws IOException {
     try {
       this.block = block;
+      synchronized(datanode.data) { 
+        this.replica = datanode.data.getReplica(block.getBlockId());
+        if (replica == null) {
+          throw new ReplicaNotFoundException(block);
+        }
+        this.replicaVisibleLength = replica.getVisibleLength();
+      }
+      if (replica.getGenerationStamp() < block.getGenerationStamp()) {
+        throw new IOException(
+            "replica.getGenerationStamp() < block.getGenerationStamp(), block="
+            + block + ", replica=" + replica);
+      }
+      if (replicaVisibleLength < 0) {
+        throw new IOException("The replica is not readable, block="
+            + block + ", replica=" + replica);
+      }
+      if (DataNode.LOG.isDebugEnabled()) {
+        DataNode.LOG.debug("block=" + block + ", replica=" + replica);
+      }
+      
       this.chunkOffsetOK = chunkOffsetOK;
       this.corruptChecksumOk = corruptChecksumOk;
       this.verifyChecksum = verifyChecksum;
-      this.blockLength = datanode.data.getLength(block);
       this.transferToAllowed = datanode.transferToAllowed;
       this.clientTraceFmt = clientTraceFmt;
 
@@ -119,18 +143,18 @@
        * blockLength.
        */        
       bytesPerChecksum = checksum.getBytesPerChecksum();
-      if (bytesPerChecksum > 10*1024*1024 && bytesPerChecksum > blockLength){
+      if (bytesPerChecksum > 10*1024*1024 && bytesPerChecksum > replicaVisibleLength) {
         checksum = DataChecksum.newDataChecksum(checksum.getChecksumType(),
-                                   Math.max((int)blockLength, 10*1024*1024));
+            Math.max((int)replicaVisibleLength, 10*1024*1024));
         bytesPerChecksum = checksum.getBytesPerChecksum();        
       }
       checksumSize = checksum.getChecksumSize();
 
       if (length < 0) {
-        length = blockLength;
+        length = replicaVisibleLength;
       }
 
-      endOffset = blockLength;
+      endOffset = replicaVisibleLength;
       if (startOffset < 0 || startOffset > endOffset
           || (length + startOffset) > endOffset) {
         String msg = " Offset " + startOffset + " and length " + length
@@ -163,6 +187,18 @@
       }
       seqno = 0;
 
+      //sleep a few times if getBytesOnDisk() < visible length
+      for(int i = 0; i < 30 && replica.getBytesOnDisk() < replicaVisibleLength; i++) {
+        try {
+          Thread.sleep(100);
+        } catch (InterruptedException ie) {
+          throw new IOException(ie);
+        }
+      }
+      if (DataNode.LOG.isDebugEnabled()) {
+        DataNode.LOG.debug("replica=" + replica);
+      }
+
       blockIn = datanode.data.getBlockInputStream(block, offset); // seek to offset
     } catch (IOException ioe) {
       IOUtils.closeStream(this);
@@ -234,10 +270,6 @@
 
     int len = Math.min((int) (endOffset - offset),
                        bytesPerChecksum*maxChunks);
-    if (len == 0) {
-      return 0;
-    }
-
     int numChunks = (len + bytesPerChecksum - 1)/bytesPerChecksum;
     int packetLen = len + numChunks*checksumSize + 4;
     pkt.clear();
@@ -246,7 +278,7 @@
     pkt.putInt(packetLen);
     pkt.putLong(offset);
     pkt.putLong(seqno);
-    pkt.put((byte)((offset + len >= endOffset) ? 1 : 0));
+    pkt.put((byte)((len == 0) ? 1 : 0));
                //why no ByteBuf.putBoolean()?
     pkt.putInt(len);
     
@@ -407,7 +439,8 @@
         seqno++;
       }
       try {
-        out.writeInt(0); // mark the end of block        
+        // send an empty packet to mark the end of the block
+        sendChunks(pktBuf, maxChunksPerPacket, streamForSendChunks);        
         out.flush();
       } catch (IOException e) { //socket error
         throw ioeToSocketException(e);
@@ -420,7 +453,7 @@
       close();
     }
 
-    blockReadFully = (initialOffset == 0 && offset >= blockLength);
+    blockReadFully = initialOffset == 0 && offset >= replicaVisibleLength;
 
     return totalRead;
   }
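
One more aside on the sender: the packet sizing visible above is packetLen = len + numChunks*checksumSize + 4, where numChunks rounds len up to whole checksum chunks. A worked sketch; the chunk size and checksum width are assumed defaults, only the formula comes from the diff:

  public class PacketSizeSketch {
    public static void main(String[] args) {
      int bytesPerChecksum = 512; // assumed io.bytes.per.checksum
      int checksumSize = 4;       // e.g. a CRC32 checksum is 4 bytes per chunk
      int len = 1000;             // data bytes carried by this packet

      int numChunks = (len + bytesPerChecksum - 1) / bytesPerChecksum; // ceil(1000/512) = 2
      int packetLen = len + numChunks * checksumSize + 4;              // 1000 + 8 + 4 = 1012

      System.out.println("numChunks=" + numChunks + ", packetLen=" + packetLen);
    }
  }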

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java?rev=820497&r1=820496&r2=820497&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java Wed Sep 30 23:39:30 2009
@@ -29,11 +29,11 @@
 import java.io.PrintStream;
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Random;
 import java.util.TreeSet;
 import java.util.regex.Matcher;
@@ -211,8 +211,8 @@
   private void init() {
     
     // get the list of blocks and arrange them in random order
-    Block arr[] = dataset.getBlockReport();
-    Collections.shuffle(Arrays.asList(arr));
+    List<Block> arr = dataset.getFinalizedBlocks();
+    Collections.shuffle(arr);
     
     blockInfoSet = new TreeSet<BlockScanInfo>();
     blockMap = new HashMap<Block, BlockScanInfo>();

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=820497&r1=820496&r2=820497&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Wed Sep 30 23:39:30 2009
@@ -34,11 +34,10 @@
 import java.util.AbstractList;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.EnumSet;
-import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -55,11 +54,13 @@
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
 import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
-import org.apache.hadoop.hdfs.server.common.GenerationStamp;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
 import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
 import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
@@ -68,6 +69,7 @@
 import org.apache.hadoop.hdfs.server.namenode.StreamFile;
 import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockMetaDataInfo;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -75,7 +77,9 @@
 import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
 import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.http.HttpServer;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.RPC;
@@ -168,8 +172,6 @@
 
   volatile boolean shouldRun = true;
   private LinkedList<Block> receivedBlockList = new LinkedList<Block>();
-  /** list of blocks being recovered */
-  private final Map<Block, Block> ongoingRecovery = new HashMap<Block, Block>();
   private LinkedList<String> delHints = new LinkedList<String>();
   public final static String EMPTY_DEL_HINT = "";
   AtomicInteger xmitsInProgress = new AtomicInteger();
@@ -912,7 +914,7 @@
       processDistributedUpgradeCommand((UpgradeCommand)cmd);
       break;
     case DatanodeProtocol.DNA_RECOVERBLOCK:
-      recoverBlocks(bcmd.getBlocks(), bcmd.getTargets());
+      recoverBlocks(((BlockRecoveryCommand)cmd).getRecoveringBlocks());
       break;
     case DatanodeProtocol.DNA_ACCESSKEYUPDATE:
       LOG.info("DatanodeCommand action: DNA_ACCESSKEYUPDATE");
@@ -989,13 +991,12 @@
       // and can be safely GC'ed.
       //
       long brStartTime = now();
-      Block[] bReport = data.getBlockReport();
+      BlockListAsLongs bReport = data.getBlockReport();
 
-      cmd = namenode.blockReport(dnRegistration,
-              BlockListAsLongs.convertToArrayLongs(bReport));
+      cmd = namenode.blockReport(dnRegistration, bReport.getBlockListAsLongs());
       long brTime = now() - brStartTime;
       myMetrics.blockReports.inc(brTime);
-      LOG.info("BlockReport of " + bReport.length +
+      LOG.info("BlockReport of " + bReport.getNumberOfBlocks() +
           " blocks got processed in " + brTime + " msecs");
       //
       // If we have sent the first block report, then wait a random
@@ -1250,7 +1251,8 @@
               EnumSet.of(AccessTokenHandler.AccessMode.WRITE));
         }
         DataTransferProtocol.Sender.opWriteBlock(out,
-            b.getBlockId(), b.getGenerationStamp(), 0, false, "",
+            b.getBlockId(), b.getGenerationStamp(), 0, 
+            BlockConstructionStage.PIPELINE_SETUP_CREATE, 0, 0, 0, "",
             srcNode, targets, accessToken);
 
         // send data & checksum
@@ -1273,6 +1275,20 @@
       }
     }
   }
+  
+  /**
+   * After a block becomes finalized, a datanode increments its metric counter,
+   * notifies the namenode, and adds the block to the block scanner.
+   * @param block
+   * @param delHint
+   */
+  void closeBlock(Block block, String delHint) {
+    myMetrics.blocksWritten.inc();
+    notifyNamenodeReceivedBlock(block, delHint);
+    if (blockScanner != null) {
+      blockScanner.addBlock(block);
+    }
+  }
 
   /**
    * No matter what kind of exception we get, keep retrying to offerService().
@@ -1514,16 +1530,16 @@
     return info;
   }
 
-  public Daemon recoverBlocks(final Block[] blocks, final DatanodeInfo[][] targets) {
+  public Daemon recoverBlocks(final Collection<RecoveringBlock> blocks) {
     Daemon d = new Daemon(threadGroup, new Runnable() {
       /** Recover a list of blocks. It is run by the primary datanode. */
       public void run() {
-        for(int i = 0; i < blocks.length; i++) {
+        for(RecoveringBlock b : blocks) {
           try {
-            logRecoverBlock("NameNode", blocks[i], targets[i]);
-            recoverBlock(blocks[i], false, targets[i], true);
+            logRecoverBlock("NameNode", b.getBlock(), b.getLocations());
+            recoverBlock(b);
           } catch (IOException e) {
-            LOG.warn("recoverBlocks FAILED, blocks[" + i + "]=" + blocks[i], e);
+            LOG.warn("recoverBlocks FAILED: " + b, e);
           }
         }
       }
@@ -1548,6 +1564,38 @@
     }
   }
 
+  @Override // InterDatanodeProtocol
+  public ReplicaRecoveryInfo initReplicaRecovery(RecoveringBlock rBlock)
+  throws IOException {
+    return data.initReplicaRecovery(rBlock);
+  }
+
+  /**
+   * Convenience method, which unwraps RemoteException.
+   * @throws IOException not a RemoteException.
+   */
+  private static ReplicaRecoveryInfo callInitReplicaRecovery(
+      InterDatanodeProtocol datanode,
+      RecoveringBlock rBlock) throws IOException {
+    try {
+      return datanode.initReplicaRecovery(rBlock);
+    } catch(RemoteException re) {
+      throw re.unwrapRemoteException();
+    }
+  }
+
+  /**
+   * Update replica with the new generation stamp and length.  
+   */
+  @Override // InterDatanodeProtocol
+  public Block updateReplicaUnderRecovery(Block oldBlock,
+                                          long recoveryId,
+                                          long newLength) throws IOException {
+    ReplicaInfo r =
+      data.updateReplicaUnderRecovery(oldBlock, recoveryId, newLength);
+    return new Block(r);
+  }
+
   /** {@inheritDoc} */
   public long getProtocolVersion(String protocol, long clientVersion
       ) throws IOException {
@@ -1560,164 +1608,171 @@
         + ": " + protocol);
   }
 
-  /** A convenient class used in lease recovery */
+  /** A convenient class used in block recovery */
   private static class BlockRecord { 
     final DatanodeID id;
     final InterDatanodeProtocol datanode;
-    final Block block;
+    final ReplicaRecoveryInfo rInfo;
     
-    BlockRecord(DatanodeID id, InterDatanodeProtocol datanode, Block block) {
+    BlockRecord(DatanodeID id,
+                InterDatanodeProtocol datanode,
+                ReplicaRecoveryInfo rInfo) {
       this.id = id;
       this.datanode = datanode;
-      this.block = block;
+      this.rInfo = rInfo;
     }
 
     /** {@inheritDoc} */
     public String toString() {
-      return "block:" + block + " node:" + id;
+      return "block:" + rInfo + " node:" + id;
     }
   }
 
   /** Recover a block */
-  private LocatedBlock recoverBlock(Block block, boolean keepLength,
-      DatanodeInfo[] targets, boolean closeFile) throws IOException {
-
+  private void recoverBlock(RecoveringBlock rBlock) throws IOException {
+    Block block = rBlock.getBlock();
+    DatanodeInfo[] targets = rBlock.getLocations();
     DatanodeID[] datanodeids = (DatanodeID[])targets;
-    // If the block is already being recovered, then skip recovering it.
-    // This can happen if the namenode and client start recovering the same
-    // file at the same time.
-    synchronized (ongoingRecovery) {
-      Block tmp = new Block();
-      tmp.set(block.getBlockId(), block.getNumBytes(), GenerationStamp.WILDCARD_STAMP);
-      if (ongoingRecovery.get(tmp) != null) {
-        String msg = "Block " + block + " is already being recovered, " +
-                     " ignoring this request to recover it.";
-        LOG.info(msg);
-        throw new IOException(msg);
-      }
-      ongoingRecovery.put(block, block);
-    }
-    try {
-      List<BlockRecord> syncList = new ArrayList<BlockRecord>();
-      long minlength = Long.MAX_VALUE;
-      int errorCount = 0;
+    List<BlockRecord> syncList = new ArrayList<BlockRecord>(datanodeids.length);
+    int errorCount = 0;
 
-      //check generation stamps
-      for(DatanodeID id : datanodeids) {
-        try {
-          InterDatanodeProtocol datanode = dnRegistration.equals(id)?
-              this: DataNode.createInterDataNodeProtocolProxy(id, getConf());
-          BlockMetaDataInfo info = datanode.getBlockMetaDataInfo(block);
-          if (info != null && info.getGenerationStamp() >= block.getGenerationStamp()) {
-            if (keepLength) {
-              if (info.getNumBytes() == block.getNumBytes()) {
-                syncList.add(new BlockRecord(id, datanode, new Block(info)));
-              }
-            }
-            else {
-              syncList.add(new BlockRecord(id, datanode, new Block(info)));
-              if (info.getNumBytes() < minlength) {
-                minlength = info.getNumBytes();
-              }
-            }
-          }
-        } catch (IOException e) {
-          ++errorCount;
-          InterDatanodeProtocol.LOG.warn(
-              "Failed to getBlockMetaDataInfo for block (=" + block 
-              + ") from datanode (=" + id + ")", e);
+    //check generation stamps
+    for(DatanodeID id : datanodeids) {
+      try {
+        InterDatanodeProtocol datanode = dnRegistration.equals(id)?
+            this: DataNode.createInterDataNodeProtocolProxy(id, getConf());
+        ReplicaRecoveryInfo info = callInitReplicaRecovery(datanode, rBlock);
+        if (info != null &&
+            info.getGenerationStamp() >= block.getGenerationStamp() &&
+            info.getNumBytes() > 0) {
+          syncList.add(new BlockRecord(id, datanode, info));
         }
+      } catch (RecoveryInProgressException ripE) {
+        InterDatanodeProtocol.LOG.warn(
+            "Recovery for replica " + block + " on data-node " + id
+            + " is already in progress. Recovery id = "
+            + rBlock.getNewGenerationStamp() + " is aborted.", ripE);
+        return;
+      } catch (IOException e) {
+        ++errorCount;
+        InterDatanodeProtocol.LOG.warn(
+            "Failed to obtain replica info for block (=" + block 
+            + ") from datanode (=" + id + ")", e);
       }
+    }
 
-      if (syncList.isEmpty() && errorCount > 0) {
-        throw new IOException("All datanodes failed: block=" + block
-            + ", datanodeids=" + Arrays.asList(datanodeids));
-      }
-      if (!keepLength) {
-        block.setNumBytes(minlength);
-      }
-      return syncBlock(block, syncList, targets, closeFile);
-    } finally {
-      synchronized (ongoingRecovery) {
-        ongoingRecovery.remove(block);
-      }
+    if (errorCount == datanodeids.length) {
+      throw new IOException("All datanodes failed: block=" + block
+          + ", datanodeids=" + Arrays.asList(datanodeids));
     }
+
+    syncBlock(rBlock, syncList);
   }
 
   /** Block synchronization */
-  private LocatedBlock syncBlock(Block block, List<BlockRecord> syncList,
-      DatanodeInfo[] targets, boolean closeFile) throws IOException {
+  private void syncBlock(RecoveringBlock rBlock,
+                         List<BlockRecord> syncList) throws IOException {
+    Block block = rBlock.getBlock();
+    long recoveryId = rBlock.getNewGenerationStamp();
     if (LOG.isDebugEnabled()) {
       LOG.debug("block=" + block + ", (length=" + block.getNumBytes()
-          + "), syncList=" + syncList + ", closeFile=" + closeFile);
+          + "), syncList=" + syncList);
     }
 
-    //syncList.isEmpty() that all datanodes do not have the block
-    //so the block can be deleted.
+    // syncList.isEmpty() means that none of the data-nodes have the block
+    // or their replicas have 0 length.
+    // The block can be deleted.
     if (syncList.isEmpty()) {
-      namenode.commitBlockSynchronization(block, 0, 0, closeFile, true,
-          DatanodeID.EMPTY_ARRAY);
-      //always return a new access token even if everything else stays the same
-      LocatedBlock b = new LocatedBlock(block, targets);
-      if (isAccessTokenEnabled) {
-        b.setAccessToken(accessTokenHandler.generateToken(null, b.getBlock()
-            .getBlockId(), EnumSet.of(AccessTokenHandler.AccessMode.WRITE)));
-      }
-      return b;
+      namenode.commitBlockSynchronization(block, recoveryId, 0,
+          true, true, DatanodeID.EMPTY_ARRAY);
+      return;
     }
 
-    List<DatanodeID> successList = new ArrayList<DatanodeID>();
+    // Calculate the best available replica state.
+    ReplicaState bestState = ReplicaState.RWR;
+    long finalizedLength = -1;
+    for(BlockRecord r : syncList) {
+      assert r.rInfo.getNumBytes() > 0 : "zero length replica";
+      ReplicaState rState = r.rInfo.getOriginalReplicaState(); 
+      if(rState.getValue() < bestState.getValue())
+        bestState = rState;
+      if(rState == ReplicaState.FINALIZED) {
+        if(finalizedLength > 0 && finalizedLength != r.rInfo.getNumBytes())
+          throw new IOException("Inconsistent size of finalized replicas. " +
+              "Replica " + r.rInfo + " expected size: " + finalizedLength);
+        finalizedLength = r.rInfo.getNumBytes();
+      }
+    }
 
-    long generationstamp = namenode.nextGenerationStamp(block);
-    Block newblock = new Block(block.getBlockId(), block.getNumBytes(), generationstamp);
+    // Calculate list of nodes that will participate in the recovery
+    // and the new block size
+    List<BlockRecord> participatingList = new ArrayList<BlockRecord>();
+    Block newBlock = new Block(block.getBlockId(), -1, recoveryId);
+    switch(bestState) {
+    case FINALIZED:
+      assert finalizedLength > 0 : "finalizedLength is not positive";
+      for(BlockRecord r : syncList) {
+        ReplicaState rState = r.rInfo.getOriginalReplicaState();
+        if(rState == ReplicaState.FINALIZED ||
+           rState == ReplicaState.RBW &&
+                      r.rInfo.getNumBytes() == finalizedLength)
+          participatingList.add(r);
+      }
+      newBlock.setNumBytes(finalizedLength);
+      break;
+    case RBW:
+    case RWR:
+      long minLength = Long.MAX_VALUE;
+      for(BlockRecord r : syncList) {
+        ReplicaState rState = r.rInfo.getOriginalReplicaState();
+        if(rState == bestState) {
+          minLength = Math.min(minLength, r.rInfo.getNumBytes());
+          participatingList.add(r);
+        }
+      }
+      newBlock.setNumBytes(minLength);
+      break;
+    case RUR:
+    case TEMPORARY:
+      assert false : "bad replica state: " + bestState;
+    }
 
-    for(BlockRecord r : syncList) {
+    List<DatanodeID> failedList = new ArrayList<DatanodeID>();
+    List<DatanodeID> successList = new ArrayList<DatanodeID>();
+    for(BlockRecord r : participatingList) {
       try {
-        r.datanode.updateBlock(r.block, newblock, closeFile);
+        Block reply = r.datanode.updateReplicaUnderRecovery(
+            r.rInfo, recoveryId, newBlock.getNumBytes());
+        assert reply.equals(newBlock) &&
+               reply.getNumBytes() == newBlock.getNumBytes() :
+          "Updated replica must be the same as the new block.";
         successList.add(r.id);
       } catch (IOException e) {
         InterDatanodeProtocol.LOG.warn("Failed to updateBlock (newblock="
-            + newblock + ", datanode=" + r.id + ")", e);
+            + newBlock + ", datanode=" + r.id + ")", e);
+        failedList.add(r.id);
       }
     }
 
-    if (!successList.isEmpty()) {
-      DatanodeID[] nlist = successList.toArray(new DatanodeID[successList.size()]);
-
-      namenode.commitBlockSynchronization(block,
-          newblock.getGenerationStamp(), newblock.getNumBytes(), closeFile, false,
-          nlist);
-      DatanodeInfo[] info = new DatanodeInfo[nlist.length];
-      for (int i = 0; i < nlist.length; i++) {
-        info[i] = new DatanodeInfo(nlist[i]);
-      }
-      LocatedBlock b = new LocatedBlock(newblock, info); // success
-      // should have used client ID to generate access token, but since 
-      // owner ID is not checked, we simply pass null for now.
-      if (isAccessTokenEnabled) {
-        b.setAccessToken(accessTokenHandler.generateToken(null, b.getBlock()
-            .getBlockId(), EnumSet.of(AccessTokenHandler.AccessMode.WRITE)));
+    // If any of the data-nodes failed, the recovery fails, because
+    // we never know the actual state of the replica on failed data-nodes.
+    // The recovery should be started over.
+    if(!failedList.isEmpty()) {
+      StringBuilder b = new StringBuilder();
+      for(DatanodeID id : failedList) {
+        b.append("\n  " + id);
       }
-      return b;
+      throw new IOException("Cannot recover " + block + ", the following "
+          + failedList.size() + " data-nodes failed {" + b + "\n}");
     }
 
-    //failed
-    StringBuilder b = new StringBuilder();
-    for(BlockRecord r : syncList) {
-      b.append("\n  " + r.id);
-    }
-    throw new IOException("Cannot recover " + block + ", none of these "
-        + syncList.size() + " datanodes success {" + b + "\n}");
+    // Notify the name-node about successfully recovered replicas.
+    DatanodeID[] nlist = successList.toArray(new DatanodeID[successList.size()]);
+    namenode.commitBlockSynchronization(block,
+        newBlock.getGenerationStamp(), newBlock.getNumBytes(), true, false,
+        nlist);
   }
   
-  // ClientDataNodeProtocol implementation
-  /** {@inheritDoc} */
-  public LocatedBlock recoverBlock(Block block, boolean keepLength, DatanodeInfo[] targets
-      ) throws IOException {
-    logRecoverBlock("Client", block, targets);
-    return recoverBlock(block, keepLength, targets, false);
-  }
-
   private static void logRecoverBlock(String who,
       Block block, DatanodeID[] targets) {
     StringBuilder msg = new StringBuilder(targets[0].getName());
@@ -1727,4 +1782,11 @@
     LOG.info(who + " calls recoverBlock(block=" + block
         + ", targets=[" + msg + "])");
   }
+
+  // ClientDataNodeProtocol implementation
+  /** {@inheritDoc} */
+  @Override // ClientDataNodeProtocol
+  public long getReplicaVisibleLength(final Block block) throws IOException {
+    return data.getReplicaVisibleLength(block);
+  }
 }
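
A reading aid for the new syncBlock() above: the "best" state is the one with the smallest ReplicaState value (FINALIZED=0 beats RBW=1 beats RWR=2); when a FINALIZED replica exists the recovered length is the common finalized length, otherwise it is the minimum length among replicas in the best state. A standalone sketch of just that selection, using a simplified stand-in type rather than the actual BlockRecord/ReplicaRecoveryInfo classes:

  import java.util.ArrayList;
  import java.util.List;

  public class RecoveryLengthSketch {
    /** Simplified stand-in for a replica report: a state value plus a length.
     *  0=FINALIZED, 1=RBW, 2=RWR, matching HdfsConstants.ReplicaState. */
    static class Replica {
      final int state;
      final long numBytes;
      Replica(int state, long numBytes) { this.state = state; this.numBytes = numBytes; }
    }

    static long recoveredLength(List<Replica> syncList) {
      int bestState = 2;        // start from RWR, the weakest state considered
      long finalizedLength = -1;
      for (Replica r : syncList) {
        bestState = Math.min(bestState, r.state);
        if (r.state == 0) {
          finalizedLength = r.numBytes; // all finalized replicas must agree on this
        }
      }
      if (bestState == 0) {
        return finalizedLength;
      }
      // RBW or RWR: truncate to the shortest replica in the best state.
      long minLength = Long.MAX_VALUE;
      for (Replica r : syncList) {
        if (r.state == bestState) {
          minLength = Math.min(minLength, r.numBytes);
        }
      }
      return minLength;
    }

    public static void main(String[] args) {
      List<Replica> rbwOnly = new ArrayList<Replica>();
      rbwOnly.add(new Replica(1, 700));
      rbwOnly.add(new Replica(1, 650));
      rbwOnly.add(new Replica(2, 600));
      System.out.println(recoveredLength(rbwOnly)); // 650: shortest RBW replica

      List<Replica> withFinalized = new ArrayList<Replica>();
      withFinalized.add(new Replica(0, 1024));
      withFinalized.add(new Replica(1, 1024));
      System.out.println(recoveredLength(withFinalized)); // 1024: finalized length
    }
  }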

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java?rev=820497&r1=820496&r2=820497&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java Wed Sep 30 23:39:30 2009
@@ -53,6 +53,9 @@
   final static String BLOCK_SUBDIR_PREFIX = "subdir";
   final static String BLOCK_FILE_PREFIX = "blk_";
   final static String COPY_FILE_PREFIX = "dncp_";
+  final static String STORAGE_DIR_RBW = "rbw";
+  final static String STORAGE_DIR_FINALIZED = "finalized";
+  final static String STORAGE_DIR_DETACHED = "detach";
   
   private String storageID;
 
@@ -270,6 +273,8 @@
     File curDir = sd.getCurrentDir();
     File prevDir = sd.getPreviousDir();
     assert curDir.exists() : "Current directory must exist.";
+    // Cleanup directory "detach"
+    cleanupDetachDir(new File(curDir, STORAGE_DIR_DETACHED));
     // delete previous dir before upgrading
     if (prevDir.exists())
       deleteDir(prevDir);
@@ -277,8 +282,11 @@
     assert !tmpDir.exists() : "previous.tmp directory must not exist.";
     // rename current to tmp
     rename(curDir, tmpDir);
-    // hardlink blocks
-    linkBlocks(tmpDir, curDir, this.getLayoutVersion());
+    // hard link finalized & rbw blocks
+    linkAllBlocks(tmpDir, curDir);
+    // create current directory if not exists
+    if (!curDir.exists() && !curDir.mkdirs())
+      throw new IOException("Cannot create directory " + curDir);
     // write version file
     this.layoutVersion = FSConstants.LAYOUT_VERSION;
     assert this.namespaceID == nsInfo.getNamespaceID() :
@@ -290,6 +298,30 @@
     LOG.info("Upgrade of " + sd.getRoot()+ " is complete.");
   }
 
+  /**
+   * Cleanup the detachDir. 
+   * 
+   * If the directory is not empty, report an error;
+   * otherwise remove the directory.
+   * 
+   * @param detachDir detach directory
+   * @throws IOException if the directory is not empty or it cannot be removed
+   */
+  private void cleanupDetachDir(File detachDir) throws IOException {
+    if (layoutVersion >= PRE_RBW_LAYOUT_VERSION &&
+        detachDir.exists() && detachDir.isDirectory() ) {
+      
+        if (detachDir.list().length != 0 ) {
+          throw new IOException("Detached directory " + detachDir +
+              " is not empty. Please manually move each file under this " +
+              "directory to the finalized directory if the finalized " +
+              "directory tree does not have the file.");
+        } else if (!detachDir.delete()) {
+          throw new IOException("Cannot remove directory " + detachDir);
+        }
+    }
+  }
+  
   void doRollback( StorageDirectory sd,
                    NamespaceInfo nsInfo
                    ) throws IOException {
@@ -359,8 +391,34 @@
       doFinalize(it.next());
     }
   }
+
+  /**
+   * Hardlink all finalized and RBW blocks in fromDir to toDir
+   * @param fromDir directory where the snapshot is stored
+   * @param toDir the current data directory
+   * @throws IOException if error occurs during hardlink
+   */
+  private void linkAllBlocks(File fromDir, File toDir) throws IOException {
+    // do the link
+    int diskLayoutVersion = this.getLayoutVersion();
+    if (diskLayoutVersion < PRE_RBW_LAYOUT_VERSION) { // RBW version
+      // hardlink finalized blocks in tmpDir/finalized
+      linkBlocks(new File(fromDir, STORAGE_DIR_FINALIZED), 
+          new File(toDir, STORAGE_DIR_FINALIZED), diskLayoutVersion);
+      // hardlink rbw blocks in tmpDir/rbw
+      linkBlocks(new File(fromDir, STORAGE_DIR_RBW), 
+          new File(toDir, STORAGE_DIR_RBW), diskLayoutVersion);
+    } else { // pre-RBW version
+      // hardlink finalized blocks in tmpDir
+      linkBlocks(fromDir, 
+          new File(toDir, STORAGE_DIR_FINALIZED), diskLayoutVersion);      
+    }    
+  }
   
   static void linkBlocks(File from, File to, int oldLV) throws IOException {
+    if (!from.exists()) {
+      return;
+    }
     if (!from.isDirectory()) {
       if (from.getName().startsWith(COPY_FILE_PREFIX)) {
         FileInputStream in = new FileInputStream(from);
@@ -387,7 +445,7 @@
       return;
     }
     // from is a directory
-    if (!to.mkdir())
+    if (!to.mkdirs())
       throw new IOException("Cannot create directory " + to);
     String[] blockNames = from.list(new java.io.FilenameFilter() {
         public boolean accept(File dir, String name) {

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=820497&r1=820496&r2=820497&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Wed Sep 30 23:39:30 2009
@@ -38,6 +38,7 @@
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
 import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.MetaDataInputStream;
 import org.apache.hadoop.io.IOUtils;
@@ -208,7 +209,8 @@
    */
   @Override
   protected void opWriteBlock(DataInputStream in, long blockId, long blockGs,
-      int pipelineSize, boolean isRecovery,
+      int pipelineSize, BlockConstructionStage stage,
+      long newGs, long minBytesRcvd, long maxBytesRcvd,
       String client, DatanodeInfo srcDataNode, DatanodeInfo[] targets,
       AccessToken accessToken) throws IOException {
 
@@ -250,11 +252,17 @@
     String firstBadLink = "";           // first datanode that failed in connection setup
     DataTransferProtocol.Status mirrorInStatus = SUCCESS;
     try {
-      // open a block receiver and check if the block does not exist
-      blockReceiver = new BlockReceiver(block, in, 
-          s.getRemoteSocketAddress().toString(),
-          s.getLocalSocketAddress().toString(),
-          isRecovery, client, srcDataNode, datanode);
+      if (client.length() == 0 || 
+          stage != BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
+        // open a block receiver
+        blockReceiver = new BlockReceiver(block, in, 
+            s.getRemoteSocketAddress().toString(),
+            s.getLocalSocketAddress().toString(),
+            stage, newGs, minBytesRcvd, maxBytesRcvd,
+            client, srcDataNode, datanode);
+      } else {
+        datanode.data.recoverClose(block, newGs, minBytesRcvd);
+      }
 
       //
       // Open network conn to backup machine, if 
@@ -282,10 +290,13 @@
 
           // Write header: Copied from DFSClient.java!
           DataTransferProtocol.Sender.opWriteBlock(mirrorOut,
-              block.getBlockId(), block.getGenerationStamp(), pipelineSize,
-              isRecovery, client, srcDataNode, targets, accessToken);
+              blockId, blockGs, 
+              pipelineSize, stage, newGs, minBytesRcvd, maxBytesRcvd, client, 
+              srcDataNode, targets, accessToken);
 
-          blockReceiver.writeChecksumHeader(mirrorOut);
+          if (blockReceiver != null) { // send checksum header
+            blockReceiver.writeChecksumHeader(mirrorOut);
+          }
           mirrorOut.flush();
 
           // read connect ack (only for clients, not for replication req)
@@ -336,24 +347,31 @@
       }
 
       // receive the block and mirror to the next target
-      String mirrorAddr = (mirrorSock == null) ? null : mirrorNode;
-      blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut,
-                                 mirrorAddr, null, targets.length);
+      if (blockReceiver != null) {
+        String mirrorAddr = (mirrorSock == null) ? null : mirrorNode;
+        blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut,
+            mirrorAddr, null, targets.length);
+      }
 
-      // if this write is for a replication request (and not
-      // from a client), then confirm block. For client-writes,
+      // update the block's generation stamp and length for a client close recovery
+      if (client.length() != 0 && 
+          stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
+        block.setGenerationStamp(newGs);
+        block.setNumBytes(minBytesRcvd);
+      }
+      
+      // if this write is for a replication request or is recovering
+      // a failed close for a client, then confirm the block. For other client writes,
       // the block is finalized in the PacketResponder.
-      if (client.length() == 0) {
-        datanode.notifyNamenodeReceivedBlock(block, DataNode.EMPTY_DEL_HINT);
+      if (client.length() == 0 || 
+          stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
+        datanode.closeBlock(block, DataNode.EMPTY_DEL_HINT);
         LOG.info("Received block " + block + 
                  " src: " + remoteAddress +
                  " dest: " + localAddress +
                  " of size " + block.getNumBytes());
       }
 
-      if (datanode.blockScanner != null) {
-        datanode.blockScanner.addBlock(block);
-      }
       
     } catch (IOException ioe) {
       LOG.info("writeBlock " + block + " received exception " + ioe);
@@ -569,7 +587,7 @@
       blockReceiver = new BlockReceiver(
           block, proxyReply, proxySock.getRemoteSocketAddress().toString(),
           proxySock.getLocalSocketAddress().toString(),
-          false, "", null, datanode);
+          null, 0, 0, 0, "", null, datanode);
 
       // receive a block
       blockReceiver.receiveBlock(null, null, null, null,