Posted to hdfs-commits@hadoop.apache.org by ha...@apache.org on 2009/09/29 08:59:49 UTC

svn commit: r819843 - in /hadoop/hdfs/branches/HDFS-265: ./ src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/server/datanode/ src/test/hdfs/org/apache/hadoop/hdfs/ src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/

Author: hairong
Date: Tue Sep 29 06:59:47 2009
New Revision: 819843

URL: http://svn.apache.org/viewvc?rev=819843&view=rev
Log:
HDFS-642. Support pipeline close and close error recovery. Contributed by Hairong Kuang.

Modified:
    hadoop/hdfs/branches/HDFS-265/CHANGES.txt
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaNotFoundException.java
    hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
    hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend3.java
    hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreation.java
    hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
    hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
    hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestWriteToReplica.java

Modified: hadoop/hdfs/branches/HDFS-265/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/CHANGES.txt?rev=819843&r1=819842&r2=819843&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/CHANGES.txt (original)
+++ hadoop/hdfs/branches/HDFS-265/CHANGES.txt Tue Sep 29 06:59:47 2009
@@ -50,6 +50,8 @@
    HDFS-627. Support replica update in data-node.
    (Tsz Wo (Nicholas), SZE and Hairong Kuang via shv)
 
+    HDFS-642. Support pipeline close and close error recovery. (hairong)
+
   IMPROVEMENTS
 
     HDFS-509. Redesign DataNode volumeMap to include all types of Replicas.

Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/DFSClient.java?rev=819843&r1=819842&r2=819843&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/DFSClient.java Tue Sep 29 06:59:47 2009
@@ -1431,8 +1431,8 @@
         int dataLen = in.readInt();
       
         // Sanity check the lengths
-        if ( dataLen < 0 || 
-             ( (dataLen % bytesPerChecksum) != 0 && !lastPacketInBlock ) ||
+        if ( ( dataLen <= 0 && !lastPacketInBlock ) ||
+             ( dataLen != 0 && lastPacketInBlock) ||
              (seqno != (lastSeqNo + 1)) ) {
              throw new IOException("BlockReader: error in packet header" +
                                    "(chunkOffset : " + chunkOffset + 
@@ -2598,7 +2598,16 @@
         response.start();
         stage = BlockConstructionStage.DATA_STREAMING;
       }
-
+      
+      private void endBlock() {
+        LOG.debug("Closing old block " + block);
+        this.setName("DataStreamer for file " + src);
+        closeResponder();
+        closeStream();
+        nodes = null;
+        stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
+      }
+      
       /*
        * streamer thread is the only thread that opens streams to datanode, 
        * and closes them. Any error recovery is also done by this thread.
@@ -2642,8 +2651,6 @@
               one = dataQueue.getFirst();
             }
 
-            long offsetInBlock = one.offsetInBlock;
-
             // get new block from namenode.
             if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
               LOG.debug("Allocating new block");
@@ -2655,14 +2662,34 @@
               initDataStreaming();
             }
 
-            if (offsetInBlock >= blockSize) {
+            long lastByteOffsetInBlock = one.getLastByteOffsetBlock();
+            if (lastByteOffsetInBlock > blockSize) {
               throw new IOException("BlockSize " + blockSize +
                   " is smaller than data size. " +
                   " Offset of packet in block " + 
-                  offsetInBlock +
+                  lastByteOffsetInBlock +
                   " Aborting file " + src);
             }
 
+            if (one.lastPacketInBlock) {
+              // wait until all data packets have been successfully acked
+              synchronized (dataQueue) {
+                while (!streamerClosed && !hasError && 
+                    ackQueue.size() != 0 && clientRunning) {
+                  try {
+                    // wait for acks to arrive from datanodes
+                    dataQueue.wait(1000);
+                  } catch (InterruptedException  e) {
+                  }
+                }
+              }
+              if (streamerClosed || hasError || !clientRunning) {
+                continue;
+              }
+              stage = BlockConstructionStage.PIPELINE_CLOSE;
+            }
+            
+            // send the packet
             ByteBuffer buf = one.getBuffer();
 
             synchronized (dataQueue) {
@@ -2674,11 +2701,7 @@
 
             if (LOG.isDebugEnabled()) {
               LOG.debug("DataStreamer block " + block +
-                  " sending packet seqno:" + one.seqno +
-                  " size:" + buf.remaining() +
-                  " offsetInBlock:" + one.offsetInBlock + 
-                  " lastPacketInBlock:" + one.lastPacketInBlock +
-                  " lastByteOffsetInBlock" + one.getLastByteOffsetBlock());
+                  " sending packet " + one);
             }
 
             // write out data to remote datanode
@@ -2690,22 +2713,31 @@
             if (bytesSent < tmpBytesSent) {
               bytesSent = tmpBytesSent;
             }
-            
+
+            if (streamerClosed || hasError || !clientRunning) {
+              continue;
+            }
+
+            // Is this block full?
             if (one.lastPacketInBlock) {
+              // wait until the close packet has been acked
               synchronized (dataQueue) {
-                while (!streamerClosed && !hasError && ackQueue.size() != 0 && clientRunning) {
-                  try {
-                    dataQueue.wait(1000);   // wait for acks to arrive from datanodes
-                  } catch (InterruptedException  e) {
-                  }
+                while (!streamerClosed && !hasError && 
+                    ackQueue.size() != 0 && clientRunning) {
+                  dataQueue.wait(1000);// wait for acks to arrive from datanodes
                 }
               }
-              
-              if (ackQueue.isEmpty()) { // done receiving all acks
-                // indicate end-of-block
-                blockStream.writeInt(0);
-                blockStream.flush();
+              if (streamerClosed || hasError || !clientRunning) {
+                continue;
               }
+
+              endBlock();
+            }
+            if (progress != null) { progress.progress(); }
+
+            // This is used by unit test to trigger race conditions.
+            if (artificialSlowdown != 0 && clientRunning) {
+              Thread.sleep(artificialSlowdown); 
             }
           } catch (Throwable e) {
             LOG.warn("DataStreamer Exception: " + 
@@ -2718,29 +2750,6 @@
               streamerClosed = true;
             }
           }
-
-
-          if (streamerClosed || hasError || !clientRunning) {
-            continue;
-          }
-
-          // Is this block full?
-          if (one.lastPacketInBlock) {
-            LOG.debug("Closing old block " + block);
-            this.setName("DataStreamer for file " + src);
-            closeResponder();
-            closeStream();
-            nodes = null;
-            stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
-          }
-          if (progress != null) { progress.progress(); }
-
-          // This is used by unit test to trigger race conditions.
-          if (artificialSlowdown != 0 && clientRunning) {
-            try { 
-              Thread.sleep(artificialSlowdown); 
-            } catch (InterruptedException e) {}
-          }
         }
         closeInternal();
       }
@@ -2928,7 +2937,15 @@
         boolean doSleep = setupPipelineForAppendOrRecovery();
         
         if (!streamerClosed && clientRunning) {
-          initDataStreaming();
+          if (stage == BlockConstructionStage.PIPELINE_CLOSE) {
+            synchronized (dataQueue) {
+              dataQueue.remove();  // remove the end of block packet
+              dataQueue.notifyAll();
+            }
+            endBlock();
+          } else {
+            initDataStreaming();
+          }
         }
         
         return doSleep;
@@ -3392,15 +3409,6 @@
               ", blockSize=" + blockSize +
               ", appendChunk=" + appendChunk);
         }
-        //
-        // if we allocated a new packet because we encountered a block
-        // boundary, reset bytesCurBlock.
-        //
-        if (bytesCurBlock == blockSize) {
-          currentPacket.lastPacketInBlock = true;
-          bytesCurBlock = 0;
-          lastFlushOffset = -1;
-        }
         waitAndQueuePacket(currentPacket);
         currentPacket = null;
 
@@ -3413,6 +3421,20 @@
         }
         int psize = Math.min((int)(blockSize-bytesCurBlock), writePacketSize);
         computePacketChunkSize(psize, bytesPerChecksum);
+        
+        //
+        // if encountering a block boundary, send an empty packet to 
+        // indicate the end of block and reset bytesCurBlock.
+        //
+        if (bytesCurBlock == blockSize) {
+          currentPacket = new Packet(DataNode.PKT_HEADER_LEN+4, 0, 
+              bytesCurBlock);
+          currentPacket.lastPacketInBlock = true;
+          waitAndQueuePacket(currentPacket);
+          currentPacket = null;
+          bytesCurBlock = 0;
+          lastFlushOffset = -1;
+        }
       }
     }
   
@@ -3556,21 +3578,22 @@
       try {
         flushBuffer();       // flush from all upper layers
 
-        // Mark that this packet is the last packet in block.
-        // If there are no outstanding packets and the last packet
-        // was not the last one in the current block, then create a
-        // packet with empty payload.
-        if (currentPacket == null && bytesCurBlock != 0) {
-          currentPacket = new Packet(packetSize, chunksPerPacket,
-              bytesCurBlock);
-        }
         if (currentPacket != null) { 
+          waitAndQueuePacket(currentPacket);
+        }
+
+        if (bytesCurBlock != 0) {
+          // send an empty packet to mark the end of the block
+          currentPacket = new Packet(DataNode.PKT_HEADER_LEN+4, 0, 
+              bytesCurBlock);
           currentPacket.lastPacketInBlock = true;
         }
 
         flushInternal();             // flush all data to Datanodes
+        LOG.info("Done flushing");
         // get last block before destroying the streamer
         Block lastBlock = streamer.getBlock();
+        LOG.info("Closing the streams...");
         closeThreads(false);
         completeFile(lastBlock);
         leasechecker.remove(src);
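
For readers skimming the DFSClient hunks above, here is a condensed, self-contained sketch of the close flow this change introduces (simplified Packet and stage types of my own, not the actual DFSClient internals): the writer queues an empty packet flagged lastPacketInBlock at the block boundary; the streamer drains all outstanding acks, switches to the PIPELINE_CLOSE stage, sends the empty packet, waits for its ack, and only then tears the pipeline down in endBlock().

    import java.util.ArrayDeque;
    import java.util.Deque;

    class ClosePacketFlowSketch {
      static class Packet {
        final long seqno; final int dataLen; final boolean lastPacketInBlock;
        Packet(long seqno, int dataLen, boolean last) {
          this.seqno = seqno; this.dataLen = dataLen; this.lastPacketInBlock = last;
        }
      }
      enum Stage { DATA_STREAMING, PIPELINE_CLOSE, PIPELINE_SETUP_CREATE }

      private final Deque<Packet> dataQueue = new ArrayDeque<Packet>();
      private final Deque<Packet> ackQueue = new ArrayDeque<Packet>();
      private Stage stage = Stage.DATA_STREAMING;

      // writer side: queue an empty end-of-block packet instead of a raw int 0
      void writerHitsBlockBoundary(long nextSeqno) {
        dataQueue.addLast(new Packet(nextSeqno, 0, true));
      }

      // one iteration of the streamer loop, reduced to the close handling
      void streamerLoopBody() throws InterruptedException {
        Packet one = dataQueue.peekFirst();
        if (one == null) return;
        if (one.lastPacketInBlock) {
          waitForEmptyAckQueue();          // drain acks for all data packets
          stage = Stage.PIPELINE_CLOSE;
        }
        send(one);                         // the empty packet travels the pipeline
        dataQueue.removeFirst();
        ackQueue.addLast(one);
        if (one.lastPacketInBlock) {
          waitForEmptyAckQueue();          // wait for the close packet's own ack
          endBlock();                      // close streams, reset for the next block
        }
      }

      // the responder thread (not shown) removes acked packets from ackQueue
      // and calls dataQueue.notifyAll(), just as in the real client
      private void waitForEmptyAckQueue() throws InterruptedException {
        synchronized (dataQueue) {
          while (!ackQueue.isEmpty()) {
            dataQueue.wait(1000);
          }
        }
      }

      private void send(Packet p) { /* write header and (possibly zero) payload */ }

      private void endBlock() { stage = Stage.PIPELINE_SETUP_CREATE; }
    }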

Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=819843&r1=819842&r2=819843&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Tue Sep 29 06:59:47 2009
@@ -102,14 +102,8 @@
           replicaInfo = datanode.data.createRbw(block);
           break;
         case PIPELINE_SETUP_STREAMING_RECOVERY:
-          if (datanode.data.isValidBlock(block)) {
-            // pipeline failed after the replica is finalized. This will be 
-            // handled differently when pipeline close/recovery is introduced
-            replicaInfo = datanode.data.append(block, newGs, maxBytesRcvd);
-          } else {
-            replicaInfo = datanode.data.recoverRbw(
-                block, newGs, minBytesRcvd, maxBytesRcvd);
-          }
+          replicaInfo = datanode.data.recoverRbw(
+              block, newGs, minBytesRcvd, maxBytesRcvd);
           block.setGenerationStamp(newGs);
           break;
         case PIPELINE_SETUP_APPEND:
@@ -330,7 +324,7 @@
    * It tries to read a full packet with single read call.
    * Consecutive packets are usually of the same length.
    */
-  private int readNextPacket() throws IOException {
+  private void readNextPacket() throws IOException {
     /* This dances around buf a little bit, mainly to read 
      * full packet with single read and to accept arbitarary size  
      * for next packet at the same time.
@@ -366,12 +360,6 @@
     int payloadLen = buf.getInt();
     buf.reset();
     
-    if (payloadLen == 0) {
-      //end of stream!
-      buf.limit(buf.position() + SIZE_OF_INTEGER);
-      return 0;
-    }
-    
     // check corrupt values for pktLen, 100MB upper limit should be ok?
     if (payloadLen < 0 || payloadLen > (100*1024*1024)) {
       throw new IOException("Incorrect value for packet payload : " +
@@ -411,21 +399,15 @@
     if (pktSize > maxPacketReadLen) {
       maxPacketReadLen = pktSize;
     }
-    
-    return payloadLen;
   }
   
   /** 
    * Receives and processes a packet. It can contain many chunks.
-   * returns size of the packet.
+   * returns the number of data bytes that the packet has.
    */
   private int receivePacket() throws IOException {
-    
-    int payloadLen = readNextPacket();
-    
-    if (payloadLen <= 0) {
-      return payloadLen;
-    }
+    // read the next packet
+    readNextPacket();
     
     buf.mark();
     //read the header
@@ -451,7 +433,7 @@
     
     if (LOG.isDebugEnabled()){
       LOG.debug("Receiving one packet for block " + block +
-                " of length " + payloadLen +
+                " of length " + len +
                 " seqno " + seqno +
                 " offsetInBlock " + offsetInBlock +
                 " lastPacketInBlock " + lastPacketInBlock);
@@ -462,6 +444,12 @@
     if (replicaInfo.getNumBytes() < offsetInBlock) {
       replicaInfo.setNumBytes(offsetInBlock);
     }
+    
+    // put in queue for pending acks
+    if (responder != null) {
+      ((PacketResponder)responder.getRunnable()).enqueue(seqno,
+                                      lastPacketInBlock, offsetInBlock); 
+    }  
 
     //First write the packet to the mirror:
     if (mirrorOut != null) {
@@ -475,8 +463,8 @@
 
     buf.position(endOfHeader);        
     
-    if (len == 0) {
-      LOG.debug("Receiving empty packet for block " + block);
+    if (lastPacketInBlock || len == 0) {
+      LOG.debug("Receiving an empty packet or the end of the block " + block);
     } else {
       int checksumLen = ((len + bytesPerChecksum - 1)/bytesPerChecksum)*
                                                             checksumSize;
@@ -539,17 +527,11 @@
     /// flush entire packet before sending ack
     flush();
 
-    // put in queue for pending acks
-    if (responder != null) {
-      ((PacketResponder)responder.getRunnable()).enqueue(seqno,
-                                      lastPacketInBlock, offsetInBlock); 
-    }
-    
     if (throttler != null) { // throttle I/O
-      throttler.throttle(payloadLen);
+      throttler.throttle(len);
     }
     
-    return payloadLen;
+    return len;
   }
 
   void writeChecksumHeader(DataOutputStream mirrorOut) throws IOException {
@@ -578,20 +560,10 @@
       }
 
       /* 
-       * Receive until packet length is zero.
+       * Receive until the packet has zero bytes of data.
        */
       while (receivePacket() > 0) {}
 
-      // flush the mirror out
-      if (mirrorOut != null) {
-        try {
-          mirrorOut.writeInt(0); // mark the end of the block
-          mirrorOut.flush();
-        } catch (IOException e) {
-          handleMirrorOutError(e);
-        }
-      }
-
       // wait for all outstanding packet responses. And then
       // indicate responder to gracefully shutdown.
       // Mark that responder has been closed for future processing
@@ -846,9 +818,7 @@
               final long endTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
               block.setNumBytes(replicaInfo.getNumBytes());
               datanode.data.finalizeBlock(block);
-              datanode.myMetrics.blocksWritten.inc();
-              datanode.notifyNamenodeReceivedBlock(block, 
-                  DataNode.EMPTY_DEL_HINT);
+              datanode.closeBlock(block, DataNode.EMPTY_DEL_HINT);
               if (ClientTraceLog.isInfoEnabled() &&
                   receiver.clientName.length() > 0) {
                 long offset = 0;
@@ -987,9 +957,7 @@
               final long endTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
               block.setNumBytes(replicaInfo.getNumBytes());
               datanode.data.finalizeBlock(block);
-              datanode.myMetrics.blocksWritten.inc();
-              datanode.notifyNamenodeReceivedBlock(block, 
-                  DataNode.EMPTY_DEL_HINT);
+              datanode.closeBlock(block, DataNode.EMPTY_DEL_HINT);
               if (ClientTraceLog.isInfoEnabled() &&
                   receiver.clientName.length() > 0) {
                 long offset = 0;

Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java?rev=819843&r1=819842&r2=819843&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java Tue Sep 29 06:59:47 2009
@@ -270,10 +270,6 @@
 
     int len = Math.min((int) (endOffset - offset),
                        bytesPerChecksum*maxChunks);
-    if (len == 0) {
-      return 0;
-    }
-
     int numChunks = (len + bytesPerChecksum - 1)/bytesPerChecksum;
     int packetLen = len + numChunks*checksumSize + 4;
     pkt.clear();
@@ -282,7 +278,7 @@
     pkt.putInt(packetLen);
     pkt.putLong(offset);
     pkt.putLong(seqno);
-    pkt.put((byte)((offset + len >= endOffset) ? 1 : 0));
+    pkt.put((byte)((len == 0) ? 1 : 0));
                //why no ByteBuf.putBoolean()?
     pkt.putInt(len);
     
@@ -443,7 +439,8 @@
         seqno++;
       }
       try {
-        out.writeInt(0); // mark the end of block        
+        // send an empty packet to mark the end of the block
+        sendChunks(pktBuf, maxChunksPerPacket, streamForSendChunks);        
         out.flush();
       } catch (IOException e) { //socket error
         throw ioeToSocketException(e);
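
With the change above, the sender no longer writes a bare int 0 after the last data packet; the end of a block is now signalled by a normal packet that carries zero data bytes and has its lastPacketInBlock flag set. Below is a minimal sketch of that packet's layout, inferred from the header writes in this hunk (a hypothetical helper class, not part of the patch):

    import java.nio.ByteBuffer;

    class EndOfBlockPacketSketch {
      static ByteBuffer buildEndOfBlockPacket(long offsetInBlock, long seqno) {
        int dataLen = 0;                   // no chunk data and no checksums
        int packetLen = dataLen + 4;       // payload is just the dataLen field itself
        ByteBuffer pkt = ByteBuffer.allocate(4 + 8 + 8 + 1 + 4);
        pkt.putInt(packetLen);             // packet payload length
        pkt.putLong(offsetInBlock);        // offset of this packet in the block
        pkt.putLong(seqno);                // sequence number
        pkt.put((byte) 1);                 // lastPacketInBlock == true
        pkt.putInt(dataLen);               // zero data bytes follow
        pkt.flip();
        return pkt;
      }

      public static void main(String[] args) {
        ByteBuffer pkt = buildEndOfBlockPacket(4096L, 7L);
        System.out.println("end-of-block packet: " + pkt.remaining() + " bytes");
      }
    }

This is also why BlockReceiver can drop its payloadLen == 0 special case above: the receiver simply keeps reading packets until it sees one with zero data bytes and the last-packet flag set.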

Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=819843&r1=819842&r2=819843&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Tue Sep 29 06:59:47 2009
@@ -1275,6 +1275,20 @@
       }
     }
   }
+  
+  /**
+   * After a block becomes finalized, the datanode increases its blocks-written
+   * metric counter, notifies the namenode, and adds the block to the block scanner.
+   * @param block
+   * @param delHint
+   */
+  void closeBlock(Block block, String delHint) {
+    myMetrics.blocksWritten.inc();
+    notifyNamenodeReceivedBlock(block, delHint);
+    if (blockScanner != null) {
+      blockScanner.addBlock(block);
+    }
+  }
 
   /**
    * No matter what kind of exception we get, keep retrying to offerService().

Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=819843&r1=819842&r2=819843&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Tue Sep 29 06:59:47 2009
@@ -252,12 +252,17 @@
     String firstBadLink = "";           // first datanode that failed in connection setup
     DataTransferProtocol.Status mirrorInStatus = SUCCESS;
     try {
-      // open a block receiver and check if the block does not exist
-      blockReceiver = new BlockReceiver(block, in, 
-          s.getRemoteSocketAddress().toString(),
-          s.getLocalSocketAddress().toString(),
-          stage, newGs, minBytesRcvd, maxBytesRcvd,
-          client, srcDataNode, datanode);
+      if (client.length() == 0 || 
+          stage != BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
+        // open a block receiver
+        blockReceiver = new BlockReceiver(block, in, 
+            s.getRemoteSocketAddress().toString(),
+            s.getLocalSocketAddress().toString(),
+            stage, newGs, minBytesRcvd, maxBytesRcvd,
+            client, srcDataNode, datanode);
+      } else {
+        datanode.data.recoverClose(block, newGs, minBytesRcvd);
+      }
 
       //
       // Open network conn to backup machine, if 
@@ -289,7 +294,9 @@
               pipelineSize, stage, newGs, minBytesRcvd, maxBytesRcvd, client, 
               srcDataNode, targets, accessToken);
 
-          blockReceiver.writeChecksumHeader(mirrorOut);
+          if (blockReceiver != null) { // send checksum header
+            blockReceiver.writeChecksumHeader(mirrorOut);
+          }
           mirrorOut.flush();
 
           // read connect ack (only for clients, not for replication req)
@@ -340,24 +347,30 @@
       }
 
       // receive the block and mirror to the next target
-      String mirrorAddr = (mirrorSock == null) ? null : mirrorNode;
-      blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut,
-                                 mirrorAddr, null, targets.length);
+      if (blockReceiver != null) {
+        String mirrorAddr = (mirrorSock == null) ? null : mirrorNode;
+        blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut,
+            mirrorAddr, null, targets.length);
+      }
 
-      // if this write is for a replication request (and not
-      // from a client), then confirm block. For client-writes,
+      // update its generation stamp
+      if (client.length() != 0 && 
+          stage != BlockConstructionStage.PIPELINE_SETUP_CREATE) {
+        block.setGenerationStamp(newGs);
+      }
+      
+      // if this write is for a replication request, or it recovers a failed
+      // close for a client, then confirm the block. For other client-writes,
       // the block is finalized in the PacketResponder.
-      if (client.length() == 0) {
-        datanode.notifyNamenodeReceivedBlock(block, DataNode.EMPTY_DEL_HINT);
+      if (client.length() == 0 || 
+          stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
+        datanode.closeBlock(block, DataNode.EMPTY_DEL_HINT);
         LOG.info("Received block " + block + 
                  " src: " + remoteAddress +
                  " dest: " + localAddress +
                  " of size " + block.getNumBytes());
       }
 
-      if (datanode.blockScanner != null) {
-        datanode.blockScanner.addBlock(block);
-      }
       
     } catch (IOException ioe) {
       LOG.info("writeBlock " + block + " received exception " + ioe);

Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=819843&r1=819842&r2=819843&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java Tue Sep 29 06:59:47 2009
@@ -1175,17 +1175,21 @@
     return newReplicaInfo;
   }
 
-  @Override  // FSDatasetInterface
-  public synchronized ReplicaInPipelineInterface recoverAppend(Block b,
-      long newGS, long expectedBlockLen) throws IOException {
-    DataNode.LOG.info("Recover failed append to " + b);
-
+  private ReplicaInfo recoverCheck(Block b, long newGS, 
+      long expectedBlockLen) throws IOException {
     ReplicaInfo replicaInfo = volumeMap.get(b.getBlockId());
     if (replicaInfo == null) {
       throw new ReplicaNotFoundException(
           ReplicaNotFoundException.NON_EXISTENT_REPLICA + b);
     }
     
+    // check state
+    if (replicaInfo.getState() != ReplicaState.FINALIZED &&
+        replicaInfo.getState() != ReplicaState.RBW) {
+      throw new ReplicaNotFoundException(
+          ReplicaNotFoundException.UNFINALIZED_AND_NONRBW_REPLICA + replicaInfo);
+    }
+
     // check generation stamp
     long replicaGenerationStamp = replicaInfo.getGenerationStamp();
     if (replicaGenerationStamp < b.getGenerationStamp() ||
@@ -1219,20 +1223,39 @@
           " with a length of " + replicaLen + 
           " expected length is " + expectedBlockLen);
     }
+    
+    return replicaInfo;
+  }
+  @Override  // FSDatasetInterface
+  public synchronized ReplicaInPipelineInterface recoverAppend(Block b,
+      long newGS, long expectedBlockLen) throws IOException {
+    DataNode.LOG.info("Recover failed append to " + b);
+
+    ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen);
 
     // change the replica's state/gs etc.
-    switch (replicaInfo.getState()) {
-    case FINALIZED:
+    if (replicaInfo.getState() == ReplicaState.FINALIZED ) {
       return append((FinalizedReplica)replicaInfo, newGS, b.getNumBytes());
-    case RBW:
+    } else { //RBW
       bumpReplicaGS(replicaInfo, newGS);
       return (ReplicaBeingWritten)replicaInfo;
-    default:
-      throw new ReplicaNotFoundException(
-          ReplicaNotFoundException.UNFINALIZED_AND_NONRBW_REPLICA + replicaInfo);
     }
   }
 
+  @Override
+  public void recoverClose(Block b, long newGS,
+      long expectedBlockLen) throws IOException {
+    DataNode.LOG.info("Recover failed close " + b);
+    // check replica's state
+    ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen);
+    // bump the replica's GS
+    bumpReplicaGS(replicaInfo, newGS);
+    // finalize the replica if RBW
+    if (replicaInfo.getState() == ReplicaState.RBW) {
+      finalizeBlock(replicaInfo);
+    }
+  }
+  
   /**
    * Bump a replica's generation stamp to a new one.
    * Its on-disk meta file name is renamed to be the new one too.

Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java?rev=819843&r1=819842&r2=819843&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java Tue Sep 29 06:59:47 2009
@@ -240,6 +240,18 @@
       long newGS, long expectedBlockLen) throws IOException;
   
   /**
+   * Recover a failed pipeline close.
+   * It bumps the replica's generation stamp and finalizes the replica if it
+   * is still RBW.
+   * 
+   * @param b block
+   * @param newGS the new generation stamp for the replica
+   * @param expectedBlockLen the number of bytes the replica is expected to have
+   * @throws IOException
+   */
+  public void recoverClose(Block b,
+      long newGS, long expectedBlockLen) throws IOException;
+  
+  /**
    * Update the block to the new generation stamp and length.  
    */
   public void updateBlock(Block oldblock, Block newblock) throws IOException;
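
As a quick illustration of the contract documented above, here is a toy in-memory dataset (hypothetical ToyDataset and Replica types, unrelated to the real FSDataset) that recovers a failed close the way the javadoc describes: look up the replica, check its length, bump its generation stamp, and finalize it if it is still RBW.

    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;

    class ToyDataset {
      enum State { FINALIZED, RBW }

      static class Replica {
        long genStamp; long numBytes; State state;
        Replica(long gs, long len, State s) { genStamp = gs; numBytes = len; state = s; }
      }

      private final Map<Long, Replica> volumeMap = new HashMap<Long, Replica>();

      // simplified recoverClose(): the real FSDataset additionally validates the
      // generation-stamp range and rejects non-FINALIZED, non-RBW replicas
      void recoverClose(long blockId, long newGS, long expectedBlockLen)
          throws IOException {
        Replica r = volumeMap.get(blockId);
        if (r == null) {
          throw new IOException("Cannot recover close of non-existent replica " + blockId);
        }
        if (r.numBytes != expectedBlockLen) {
          throw new IOException("Replica " + blockId + " has length " + r.numBytes
              + " but " + expectedBlockLen + " was expected");
        }
        r.genStamp = newGS;                // bump the generation stamp
        if (r.state == State.RBW) {
          r.state = State.FINALIZED;       // finalize an RBW replica
        }
      }
    }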

Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaNotFoundException.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaNotFoundException.java?rev=819843&r1=819842&r2=819843&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaNotFoundException.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaNotFoundException.java Tue Sep 29 06:59:47 2009
@@ -33,7 +33,7 @@
   final static String UNFINALIZED_REPLICA = 
     "Cannot append to an unfinalized replica ";
   final static String UNFINALIZED_AND_NONRBW_REPLICA = 
-    "Cannot recover appending to a replica that's not FINALIZED and not RBW ";
+    "Cannot recover append/close to a replica that's not FINALIZED and not RBW ";
   final static String NON_EXISTENT_REPLICA =
     "Cannot append to a non-existent replica ";
   final static String UNEXPECTED_GS_REPLICA =

Modified: hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java?rev=819843&r1=819842&r2=819843&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java Tue Sep 29 06:59:47 2009
@@ -46,7 +46,6 @@
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
 import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
@@ -154,10 +153,7 @@
 
     sendOut.writeInt(0);           // chunk length
     sendOut.writeInt(0);           // zero checksum
-    
-    // mark the end of block
-    sendOut.writeInt(0);
-    
+        
     //ok finally write a block with 0 len
     SUCCESS.write(recvOut);
     Text.writeString(recvOut, ""); // first bad node
@@ -177,6 +173,11 @@
     if (eofExcepted) {
       ERROR.write(recvOut);
       sendRecvData(description, true);
+    } else if (stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
+      //ok finally write a block with 0 len
+      SUCCESS.write(recvOut);
+      Text.writeString(recvOut, ""); // first bad node
+      sendRecvData(description, false);
     } else {
       writeZeroLengthPacket(block, description);
     }
@@ -208,8 +209,7 @@
       long newGS = firstBlock.getGenerationStamp() + 1;
       testWrite(firstBlock, 
           BlockConstructionStage.PIPELINE_SETUP_STREAMING_RECOVERY, 
-          newGS, "Successful for now", false);
-      firstBlock.setGenerationStamp(newGS);
+          newGS, "Cannot recover data streaming to a finalized replica", true);
       // test PIPELINE_SETUP_APPEND on an existing block
       newGS = firstBlock.getGenerationStamp() + 1;
       testWrite(firstBlock, 
@@ -217,10 +217,21 @@
           newGS, "Append to a finalized replica", false);
       firstBlock.setGenerationStamp(newGS);
       // test PIPELINE_SETUP_APPEND_RECOVERY on an existing block
+      file = new Path("dataprotocol1.dat");    
+      DFSTestUtil.createFile(fileSys, file, 1L, (short)numDataNodes, 0L);
+      firstBlock = DFSTestUtil.getFirstBlock(fileSys, file);
       newGS = firstBlock.getGenerationStamp() + 1;
       testWrite(firstBlock, 
           BlockConstructionStage.PIPELINE_SETUP_APPEND_RECOVERY, newGS,
           "Recover appending to a finalized replica", false);
+      // test PIPELINE_CLOSE_RECOVERY on an existing block
+      file = new Path("dataprotocol2.dat");    
+      DFSTestUtil.createFile(fileSys, file, 1L, (short)numDataNodes, 0L);
+      firstBlock = DFSTestUtil.getFirstBlock(fileSys, file);
+      newGS = firstBlock.getGenerationStamp() + 1;
+      testWrite(firstBlock, 
+          BlockConstructionStage.PIPELINE_CLOSE_RECOVERY, newGS,
+          "Recover failed close to a finalized replica", false);
       firstBlock.setGenerationStamp(newGS);
 
       /* Test writing to a new block */
@@ -276,11 +287,19 @@
             newGS, "Recover append to a RBW replica", false);
         firstBlock.setGenerationStamp(newGS);
         // test PIPELINE_SETUP_STREAMING_RECOVERY on a RBW block
+        file = new Path("dataprotocol2.dat");    
+        DFSTestUtil.createFile(fileSys, file, 1L, (short)numDataNodes, 0L);
+        out = (DFSOutputStream)(fileSys.append(file).
+            getWrappedStream()); 
+        out.write(1);
+        out.hflush();
+        in = fileSys.open(file);
+        firstBlock = DFSTestUtil.getAllBlocks(in).get(0).getBlock();
+        firstBlock.setNumBytes(2L);
         newGS = firstBlock.getGenerationStamp() + 1;
         testWrite(firstBlock, 
             BlockConstructionStage.PIPELINE_SETUP_STREAMING_RECOVERY,
             newGS, "Recover a RBW replica", false);
-        firstBlock.setGenerationStamp(newGS);
       } finally {
         IOUtils.closeStream(in);
         IOUtils.closeStream(out);

Modified: hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend3.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend3.java?rev=819843&r1=819842&r2=819843&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend3.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend3.java Tue Sep 29 06:59:47 2009
@@ -36,7 +36,7 @@
 
 /** This class implements some of tests posted in HADOOP-2658. */
 public class TestFileAppend3 extends junit.framework.TestCase {
-  static final long BLOCK_SIZE = 3 * 64 * 1024;
+  static final long BLOCK_SIZE = 64 * 1024;
   static final short REPLICATION = 3;
   static final int DATANODE_NUM = 5;
 

Modified: hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreation.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreation.java?rev=819843&r1=819842&r2=819843&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreation.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreation.java Tue Sep 29 06:59:47 2009
@@ -48,6 +48,7 @@
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.log4j.Level;
 
 

Modified: hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java?rev=819843&r1=819842&r2=819843&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java Tue Sep 29 06:59:47 2009
@@ -484,18 +484,36 @@
   public synchronized ReplicaInPipelineInterface recoverAppend(Block b,
       long newGS, long expectedBlockLen) throws IOException {
     BInfo binfo = blockMap.get(b);
-    if (binfo == null || !binfo.isFinalized()) {
+    if (binfo == null) {
       throw new ReplicaNotFoundException("Block " + b
           + " is not valid, and cannot be appended to.");
     }
     if (binfo.isFinalized()) {
       binfo.unfinalizeBlock();
     }
+    blockMap.remove(b);
     binfo.theBlock.setGenerationStamp(newGS);
+    blockMap.put(binfo.theBlock, binfo);
     return binfo;
   }
 
   @Override
+  public void recoverClose(Block b, long newGS,
+      long expectedBlockLen) throws IOException {
+    BInfo binfo = blockMap.get(b);
+    if (binfo == null) {
+      throw new ReplicaNotFoundException("Block " + b
+          + " is not valid, and cannot be appended to.");
+    }
+    if (!binfo.isFinalized()) {
+      binfo.finalizeBlock(binfo.getNumBytes());
+    }
+    blockMap.remove(b);
+    binfo.theBlock.setGenerationStamp(newGS);
+    blockMap.put(binfo.theBlock, binfo);
+  }
+  
+  @Override
   public synchronized ReplicaInPipelineInterface recoverRbw(Block b,
       long newGS, long minBytesRcvd, long maxBytesRcvd) throws IOException {
     BInfo binfo = blockMap.get(b);
@@ -507,7 +525,9 @@
       throw new ReplicaAlreadyExistsException("Block " + b
           + " is valid, and cannot be written to.");
     }
+    blockMap.remove(b);
     binfo.theBlock.setGenerationStamp(newGS);
+    blockMap.put(binfo.theBlock, binfo);
     return binfo;
   }
 

Modified: hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java?rev=819843&r1=819842&r2=819843&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java Tue Sep 29 06:59:47 2009
@@ -32,8 +32,11 @@
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Sender;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.AccessToken;
@@ -114,17 +117,12 @@
       DataOutputStream out = new DataOutputStream(
           s.getOutputStream());
 
-      out.writeShort( DataTransferProtocol.DATA_TRANSFER_VERSION );
-      WRITE_BLOCK.write(out);
-      out.writeLong( block.getBlock().getBlockId());
-      out.writeLong( block.getBlock().getGenerationStamp() );
-      out.writeInt(1);
-      out.writeBoolean( false );       // recovery flag
-      Text.writeString( out, "" );
-      out.writeBoolean(false); // Not sending src node information
-      out.writeInt(0);
-      AccessToken.DUMMY_TOKEN.write(out);
-      
+      Sender.opWriteBlock(out, block.getBlock().getBlockId(), 
+          block.getBlock().getGenerationStamp(), 1, 
+          BlockConstructionStage.PIPELINE_SETUP_CREATE, 
+          0L, 0L, 0L, "", null, new DatanodeInfo[0], 
+          AccessToken.DUMMY_TOKEN);
+
       // write check header
       out.writeByte( 1 );
       out.writeInt( 512 );

Modified: hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestWriteToReplica.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestWriteToReplica.java?rev=819843&r1=819842&r2=819843&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestWriteToReplica.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestWriteToReplica.java Tue Sep 29 06:59:47 2009
@@ -41,6 +41,25 @@
   final private static int RUR = 4;
   final private static int NON_EXISTENT = 5;
   
+  // test close
+  @Test
+  public void testClose() throws Exception {
+    MiniDFSCluster cluster = new MiniDFSCluster(new Configuration(), 1, true, null);
+    try {
+      cluster.waitActive();
+      DataNode dn = cluster.getDataNodes().get(0);
+      FSDataset dataSet = (FSDataset)dn.data;
+
+      // set up replicasMap
+      setup(dataSet);
+
+      // test close
+      testClose(dataSet);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
   // test append
   @Test
   public void testAppend() throws Exception {
@@ -79,7 +98,7 @@
     }
   }
   
-  // test writeToRbw
+  // test writeToTemporary
   @Test
   public void testWriteToTempoary() throws Exception {
     MiniDFSCluster cluster = new MiniDFSCluster(new Configuration(), 1, true, null);
@@ -113,9 +132,8 @@
         blocks[TEMPORARY].getGenerationStamp(), vol, 
         vol.createTmpFile(blocks[TEMPORARY]).getParentFile()));
     
-    replicaInfo = new ReplicaBeingWritten(blocks[RBW].getBlockId(),
-        blocks[RBW].getGenerationStamp(), vol, 
-        vol.createRbwFile(blocks[RBW]).getParentFile());
+    replicaInfo = new ReplicaBeingWritten(blocks[RBW], vol, 
+        vol.createRbwFile(blocks[RBW]).getParentFile(), null);
     replicasMap.add(replicaInfo);
     replicaInfo.getBlockFile().createNewFile();
     replicaInfo.getMetaFile().createNewFile();
@@ -127,8 +145,10 @@
   }
   
   private void testAppend(FSDataset dataSet) throws IOException {
-    dataSet.append(blocks[FINALIZED], blocks[FINALIZED].getGenerationStamp()+1, 
+    long newGS = blocks[FINALIZED].getGenerationStamp()+1;
+    dataSet.append(blocks[FINALIZED], newGS, 
         blocks[FINALIZED].getNumBytes());  // successful
+    blocks[FINALIZED].setGenerationStamp(newGS);
     
     try {
       dataSet.append(blocks[TEMPORARY], blocks[TEMPORARY].getGenerationStamp()+1, 
@@ -177,8 +197,106 @@
       Assert.assertEquals(ReplicaNotFoundException.NON_EXISTENT_REPLICA + 
           blocks[NON_EXISTENT], e.getMessage());
     }
+    
+    newGS = blocks[FINALIZED].getGenerationStamp()+1;
+    dataSet.recoverAppend(blocks[FINALIZED], newGS, 
+        blocks[FINALIZED].getNumBytes());  // successful
+    blocks[FINALIZED].setGenerationStamp(newGS);
+    
+    try {
+      dataSet.recoverAppend(blocks[TEMPORARY], blocks[TEMPORARY].getGenerationStamp()+1, 
+          blocks[TEMPORARY].getNumBytes());
+      Assert.fail("Should not have appended to a temporary replica " 
+          + blocks[TEMPORARY]);
+    } catch (ReplicaNotFoundException e) {
+      Assert.assertTrue(e.getMessage().startsWith(
+          ReplicaNotFoundException.UNFINALIZED_AND_NONRBW_REPLICA));
+    }
+
+    newGS = blocks[RBW].getGenerationStamp()+1;
+    dataSet.recoverAppend(blocks[RBW], newGS, blocks[RBW].getNumBytes());
+    blocks[RBW].setGenerationStamp(newGS);
+
+    try {
+      dataSet.recoverAppend(blocks[RWR], blocks[RWR].getGenerationStamp()+1,
+          blocks[RBW].getNumBytes());
+      Assert.fail("Should not have appended to an RWR replica" + blocks[RWR]);
+    } catch (ReplicaNotFoundException e) {
+      Assert.assertTrue(e.getMessage().startsWith(
+          ReplicaNotFoundException.UNFINALIZED_AND_NONRBW_REPLICA));
+    }
+
+    try {
+      dataSet.recoverAppend(blocks[RUR], blocks[RUR].getGenerationStamp()+1,
+          blocks[RUR].getNumBytes());
+      Assert.fail("Should not have appended to an RUR replica" + blocks[RUR]);
+    } catch (ReplicaNotFoundException e) {
+      Assert.assertTrue(e.getMessage().startsWith(
+          ReplicaNotFoundException.UNFINALIZED_AND_NONRBW_REPLICA));
+    }
+
+    try {
+      dataSet.recoverAppend(blocks[NON_EXISTENT], 
+          blocks[NON_EXISTENT].getGenerationStamp(), 
+          blocks[NON_EXISTENT].getNumBytes());
+      Assert.fail("Should not have appended to a non-existent replica " + 
+          blocks[NON_EXISTENT]);
+    } catch (ReplicaNotFoundException e) {
+      Assert.assertTrue(e.getMessage().startsWith(
+          ReplicaNotFoundException.NON_EXISTENT_REPLICA));
+    }
   }
 
+  private void testClose(FSDataset dataSet) throws IOException {
+    long newGS = blocks[FINALIZED].getGenerationStamp()+1;
+    dataSet.recoverClose(blocks[FINALIZED], newGS, 
+        blocks[FINALIZED].getNumBytes());  // successful
+    blocks[FINALIZED].setGenerationStamp(newGS);
+    
+    try {
+      dataSet.recoverClose(blocks[TEMPORARY], blocks[TEMPORARY].getGenerationStamp()+1, 
+          blocks[TEMPORARY].getNumBytes());
+      Assert.fail("Should not have recovered close a temporary replica " 
+          + blocks[TEMPORARY]);
+    } catch (ReplicaNotFoundException e) {
+      Assert.assertTrue(e.getMessage().startsWith(
+          ReplicaNotFoundException.UNFINALIZED_AND_NONRBW_REPLICA));
+    }
+
+    newGS = blocks[RBW].getGenerationStamp()+1;
+    dataSet.recoverClose(blocks[RBW], newGS, blocks[RBW].getNumBytes());
+    blocks[RBW].setGenerationStamp(newGS);
+
+    try {
+      dataSet.recoverClose(blocks[RWR], blocks[RWR].getGenerationStamp()+1,
+          blocks[RBW].getNumBytes());
+      Assert.fail("Should not have recovered close an RWR replica" + blocks[RWR]);
+    } catch (ReplicaNotFoundException e) {
+      Assert.assertTrue(e.getMessage().startsWith(
+          ReplicaNotFoundException.UNFINALIZED_AND_NONRBW_REPLICA));
+    }
+
+    try {
+      dataSet.recoverClose(blocks[RUR], blocks[RUR].getGenerationStamp()+1,
+          blocks[RUR].getNumBytes());
+      Assert.fail("Should not have recovered close an RUR replica" + blocks[RUR]);
+    } catch (ReplicaNotFoundException e) {
+      Assert.assertTrue(e.getMessage().startsWith(
+          ReplicaNotFoundException.UNFINALIZED_AND_NONRBW_REPLICA));
+    }
+
+    try {
+      dataSet.recoverClose(blocks[NON_EXISTENT], 
+          blocks[NON_EXISTENT].getGenerationStamp(), 
+          blocks[NON_EXISTENT].getNumBytes());
+      Assert.fail("Should not have recovered close a non-existent replica " + 
+          blocks[NON_EXISTENT]);
+    } catch (ReplicaNotFoundException e) {
+      Assert.assertTrue(e.getMessage().startsWith(
+          ReplicaNotFoundException.NON_EXISTENT_REPLICA));
+    }
+  }
+  
   private void testWriteToRbw(FSDataset dataSet) throws IOException {
     try {
       dataSet.recoverRbw(blocks[FINALIZED],