Posted to hdfs-commits@hadoop.apache.org by ha...@apache.org on 2009/09/29 20:18:15 UTC

svn commit: r820053 - in /hadoop/hdfs/branches/HDFS-265: ./ src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/server/datanode/ src/test/hdfs/org/apache/hadoop/hdfs/ src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/

Author: hairong
Date: Tue Sep 29 18:18:14 2009
New Revision: 820053

URL: http://svn.apache.org/viewvc?rev=820053&view=rev
Log:
Revert the changes of HDFS-642 by merging -r 819843:819842
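(For reference, this is a reverse merge: the equivalent of running
svn merge -r 819843:819842 . at the root of an HDFS-265 working copy
to undo revision 819843, then committing the result.)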

Modified:
    hadoop/hdfs/branches/HDFS-265/CHANGES.txt
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaNotFoundException.java
    hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
    hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend3.java
    hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreation.java
    hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
    hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
    hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestWriteToReplica.java

Modified: hadoop/hdfs/branches/HDFS-265/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/CHANGES.txt?rev=820053&r1=820052&r2=820053&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/CHANGES.txt (original)
+++ hadoop/hdfs/branches/HDFS-265/CHANGES.txt Tue Sep 29 18:18:14 2009
@@ -50,8 +50,6 @@
    HDFS-627. Support replica update in data-node.
    (Tsz Wo (Nicholas), SZE and Hairong Kuang via shv)
 
-    HDFS-642. Support pipeline close and close error recovery. (hairong)
-
   IMPROVEMENTS
 
     HDFS-509. Redesign DataNode volumeMap to include all types of Replicas.

Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/DFSClient.java?rev=820053&r1=820052&r2=820053&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/DFSClient.java Tue Sep 29 18:18:14 2009
@@ -1431,8 +1431,8 @@
         int dataLen = in.readInt();
       
         // Sanity check the lengths
-        if ( ( dataLen <= 0 && !lastPacketInBlock ) ||
-             ( dataLen != 0 && lastPacketInBlock) ||
+        if ( dataLen < 0 || 
+             ( (dataLen % bytesPerChecksum) != 0 && !lastPacketInBlock ) ||
              (seqno != (lastSeqNo + 1)) ) {
              throw new IOException("BlockReader: error in packet header" +
                                    "(chunkOffset : " + chunkOffset + 
@@ -2598,16 +2598,7 @@
         response.start();
         stage = BlockConstructionStage.DATA_STREAMING;
       }
-      
-      private void endBlock() {
-        LOG.debug("Closing old block " + block);
-        this.setName("DataStreamer for file " + src);
-        closeResponder();
-        closeStream();
-        nodes = null;
-        stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
-      }
-      
+
       /*
        * streamer thread is the only thread that opens streams to datanode, 
        * and closes them. Any error recovery is also done by this thread.
@@ -2651,6 +2642,8 @@
               one = dataQueue.getFirst();
             }
 
+            long offsetInBlock = one.offsetInBlock;
+
             // get new block from namenode.
             if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
               LOG.debug("Allocating new block");
@@ -2662,34 +2655,14 @@
               initDataStreaming();
             }
 
-            long lastByteOffsetInBlock = one.getLastByteOffsetBlock();
-            if (lastByteOffsetInBlock > blockSize) {
+            if (offsetInBlock >= blockSize) {
               throw new IOException("BlockSize " + blockSize +
                   " is smaller than data size. " +
                   " Offset of packet in block " + 
-                  lastByteOffsetInBlock +
+                  offsetInBlock +
                   " Aborting file " + src);
             }
 
-            if (one.lastPacketInBlock) {
-              // wait for all data packets have been successfully acked
-              synchronized (dataQueue) {
-                while (!streamerClosed && !hasError && 
-                    ackQueue.size() != 0 && clientRunning) {
-                  try {
-                    // wait for acks to arrive from datanodes
-                    dataQueue.wait(1000);
-                  } catch (InterruptedException  e) {
-                  }
-                }
-              }
-              if (streamerClosed || hasError || !clientRunning) {
-                continue;
-              }
-              stage = BlockConstructionStage.PIPELINE_CLOSE;
-            }
-            
-            // send the packet
             ByteBuffer buf = one.getBuffer();
 
             synchronized (dataQueue) {
@@ -2701,7 +2674,11 @@
 
             if (LOG.isDebugEnabled()) {
               LOG.debug("DataStreamer block " + block +
-                  " sending packet " + one);
+                  " sending packet seqno:" + one.seqno +
+                  " size:" + buf.remaining() +
+                  " offsetInBlock:" + one.offsetInBlock + 
+                  " lastPacketInBlock:" + one.lastPacketInBlock +
+                  " lastByteOffsetInBlock" + one.getLastByteOffsetBlock());
             }
 
             // write out data to remote datanode
@@ -2713,31 +2690,22 @@
             if (bytesSent < tmpBytesSent) {
               bytesSent = tmpBytesSent;
             }
-
-            if (streamerClosed || hasError || !clientRunning) {
-              continue;
-            }
-
-            // Is this block full?
+            
             if (one.lastPacketInBlock) {
-              // wait for the close packet has been acked
               synchronized (dataQueue) {
-                while (!streamerClosed && !hasError && 
-                    ackQueue.size() != 0 && clientRunning) {
-                  dataQueue.wait(1000);// wait for acks to arrive from datanodes
+                while (!streamerClosed && !hasError && ackQueue.size() != 0 && clientRunning) {
+                  try {
+                    dataQueue.wait(1000);   // wait for acks to arrive from datanodes
+                  } catch (InterruptedException  e) {
+                  }
                 }
               }
-              if (streamerClosed || hasError || !clientRunning) {
-                continue;
+              
+              if (ackQueue.isEmpty()) { // done receiving all acks
+                // indicate end-of-block
+                blockStream.writeInt(0);
+                blockStream.flush();
               }
-
-              endBlock();
-            }
-            if (progress != null) { progress.progress(); }
-
-            // This is used by unit test to trigger race conditions.
-            if (artificialSlowdown != 0 && clientRunning) {
-              Thread.sleep(artificialSlowdown); 
             }
           } catch (Throwable e) {
             LOG.warn("DataStreamer Exception: " + 
@@ -2750,6 +2718,29 @@
               streamerClosed = true;
             }
           }
+
+
+          if (streamerClosed || hasError || !clientRunning) {
+            continue;
+          }
+
+          // Is this block full?
+          if (one.lastPacketInBlock) {
+            LOG.debug("Closing old block " + block);
+            this.setName("DataStreamer for file " + src);
+            closeResponder();
+            closeStream();
+            nodes = null;
+            stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
+          }
+          if (progress != null) { progress.progress(); }
+
+          // This is used by unit test to trigger race conditions.
+          if (artificialSlowdown != 0 && clientRunning) {
+            try { 
+              Thread.sleep(artificialSlowdown); 
+            } catch (InterruptedException e) {}
+          }
         }
         closeInternal();
       }
@@ -2937,15 +2928,7 @@
         boolean doSleep = setupPipelineForAppendOrRecovery();
         
         if (!streamerClosed && clientRunning) {
-          if (stage == BlockConstructionStage.PIPELINE_CLOSE) {
-            synchronized (dataQueue) {
-              dataQueue.remove();  // remove the end of block packet
-              dataQueue.notifyAll();
-            }
-            endBlock();
-          } else {
-            initDataStreaming();
-          }
+          initDataStreaming();
         }
         
         return doSleep;
@@ -3409,6 +3392,15 @@
               ", blockSize=" + blockSize +
               ", appendChunk=" + appendChunk);
         }
+        //
+        // if we allocated a new packet because we encountered a block
+        // boundary, reset bytesCurBlock.
+        //
+        if (bytesCurBlock == blockSize) {
+          currentPacket.lastPacketInBlock = true;
+          bytesCurBlock = 0;
+          lastFlushOffset = -1;
+        }
         waitAndQueuePacket(currentPacket);
         currentPacket = null;
 
@@ -3421,20 +3413,6 @@
         }
         int psize = Math.min((int)(blockSize-bytesCurBlock), writePacketSize);
         computePacketChunkSize(psize, bytesPerChecksum);
-        
-        //
-        // if encountering a block boundary, send an empty packet to 
-        // indicate the end of block and reset bytesCurBlock.
-        //
-        if (bytesCurBlock == blockSize) {
-          currentPacket = new Packet(DataNode.PKT_HEADER_LEN+4, 0, 
-              bytesCurBlock);
-          currentPacket.lastPacketInBlock = true;
-          waitAndQueuePacket(currentPacket);
-          currentPacket = null;
-          bytesCurBlock = 0;
-          lastFlushOffset = -1;
-        }
       }
     }
   
@@ -3578,22 +3556,21 @@
       try {
         flushBuffer();       // flush from all upper layers
 
-        if (currentPacket != null) { 
-          waitAndQueuePacket(currentPacket);
-        }
-
-        if (bytesCurBlock != 0) {
-          // send an empty packet to mark the end of the block
-          currentPacket = new Packet(DataNode.PKT_HEADER_LEN+4, 0, 
+        // Mark that this packet is the last packet in block.
+        // If there are no outstanding packets and the last packet
+        // was not the last one in the current block, then create a
+        // packet with empty payload.
+        if (currentPacket == null && bytesCurBlock != 0) {
+          currentPacket = new Packet(packetSize, chunksPerPacket,
               bytesCurBlock);
+        }
+        if (currentPacket != null) { 
           currentPacket.lastPacketInBlock = true;
         }
 
         flushInternal();             // flush all data to Datanodes
-        LOG.info("Done flushing");
         // get last block before destroying the streamer
         Block lastBlock = streamer.getBlock();
-        LOG.info("Closing the streams...");
         closeThreads(false);
         completeFile(lastBlock);
         leasechecker.remove(src);
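
Note: the net effect of the DFSClient revert is that end-of-block is again
signaled by a bare zero-length integer written after the last data packet
has been acked, instead of by a dedicated empty last packet and a
PIPELINE_CLOSE stage. A minimal, self-contained sketch of the restored
client-side handshake (names are illustrative, not the actual DFSClient
members):

    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.util.Queue;

    class EndOfBlockSketch {
      // Hedged sketch: wait for all outstanding acks to drain, then write
      // the zero-length sentinel that datanodes read as "end of block".
      static void finishBlock(DataOutputStream blockStream, Queue<?> ackQueue)
          throws IOException, InterruptedException {
        synchronized (ackQueue) {
          while (!ackQueue.isEmpty()) {
            ackQueue.wait(1000);   // wake periodically; acks may trickle in
          }
        }
        blockStream.writeInt(0);   // zero payload length == end of block
        blockStream.flush();
      }
    }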

Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=820053&r1=820052&r2=820053&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Tue Sep 29 18:18:14 2009
@@ -102,8 +102,14 @@
           replicaInfo = datanode.data.createRbw(block);
           break;
         case PIPELINE_SETUP_STREAMING_RECOVERY:
-          replicaInfo = datanode.data.recoverRbw(
-              block, newGs, minBytesRcvd, maxBytesRcvd);
+          if (datanode.data.isValidBlock(block)) {
+            // pipeline failed after the replica is finalized. This will be 
+            // handled differently when pipeline close/recovery is introduced
+            replicaInfo = datanode.data.append(block, newGs, maxBytesRcvd);
+          } else {
+            replicaInfo = datanode.data.recoverRbw(
+                block, newGs, minBytesRcvd, maxBytesRcvd);
+          }
           block.setGenerationStamp(newGs);
           break;
         case PIPELINE_SETUP_APPEND:
@@ -324,7 +330,7 @@
    * It tries to read a full packet with single read call.
    * Consecutive packets are usually of the same length.
    */
-  private void readNextPacket() throws IOException {
+  private int readNextPacket() throws IOException {
     /* This dances around buf a little bit, mainly to read 
      * full packet with single read and to accept arbitarary size  
      * for next packet at the same time.
@@ -360,6 +366,12 @@
     int payloadLen = buf.getInt();
     buf.reset();
     
+    if (payloadLen == 0) {
+      //end of stream!
+      buf.limit(buf.position() + SIZE_OF_INTEGER);
+      return 0;
+    }
+    
     // check corrupt values for pktLen, 100MB upper limit should be ok?
     if (payloadLen < 0 || payloadLen > (100*1024*1024)) {
       throw new IOException("Incorrect value for packet payload : " +
@@ -399,15 +411,21 @@
     if (pktSize > maxPacketReadLen) {
       maxPacketReadLen = pktSize;
     }
+    
+    return payloadLen;
   }
   
   /** 
    * Receives and processes a packet. It can contain many chunks.
-   * returns the number of data bytes that the packet has.
+   * returns size of the packet.
    */
   private int receivePacket() throws IOException {
-    // read the next packet
-    readNextPacket();
+    
+    int payloadLen = readNextPacket();
+    
+    if (payloadLen <= 0) {
+      return payloadLen;
+    }
     
     buf.mark();
     //read the header
@@ -433,7 +451,7 @@
     
     if (LOG.isDebugEnabled()){
       LOG.debug("Receiving one packet for block " + block +
-                " of length " + len +
+                " of length " + payloadLen +
                 " seqno " + seqno +
                 " offsetInBlock " + offsetInBlock +
                 " lastPacketInBlock " + lastPacketInBlock);
@@ -444,12 +462,6 @@
     if (replicaInfo.getNumBytes() < offsetInBlock) {
       replicaInfo.setNumBytes(offsetInBlock);
     }
-    
-    // put in queue for pending acks
-    if (responder != null) {
-      ((PacketResponder)responder.getRunnable()).enqueue(seqno,
-                                      lastPacketInBlock, offsetInBlock); 
-    }  
 
     //First write the packet to the mirror:
     if (mirrorOut != null) {
@@ -463,8 +475,8 @@
 
     buf.position(endOfHeader);        
     
-    if (lastPacketInBlock || len == 0) {
-      LOG.debug("Receiving an empty packet or the end of the block " + block);
+    if (len == 0) {
+      LOG.debug("Receiving empty packet for block " + block);
     } else {
       int checksumLen = ((len + bytesPerChecksum - 1)/bytesPerChecksum)*
                                                             checksumSize;
@@ -527,11 +539,17 @@
     /// flush entire packet before sending ack
     flush();
 
+    // put in queue for pending acks
+    if (responder != null) {
+      ((PacketResponder)responder.getRunnable()).enqueue(seqno,
+                                      lastPacketInBlock, offsetInBlock); 
+    }
+    
     if (throttler != null) { // throttle I/O
-      throttler.throttle(len);
+      throttler.throttle(payloadLen);
     }
     
-    return len;
+    return payloadLen;
   }
 
   void writeChecksumHeader(DataOutputStream mirrorOut) throws IOException {
@@ -560,10 +578,20 @@
       }
 
       /* 
-       * Receive until packet has zero bytes of data.
+       * Receive until packet length is zero.
        */
       while (receivePacket() > 0) {}
 
+      // flush the mirror out
+      if (mirrorOut != null) {
+        try {
+          mirrorOut.writeInt(0); // mark the end of the block
+          mirrorOut.flush();
+        } catch (IOException e) {
+          handleMirrorOutError(e);
+        }
+      }
+
       // wait for all outstanding packet responses. And then
       // indicate responder to gracefully shutdown.
       // Mark that responder has been closed for future processing
@@ -818,7 +846,9 @@
               final long endTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
               block.setNumBytes(replicaInfo.getNumBytes());
               datanode.data.finalizeBlock(block);
-              datanode.closeBlock(block, DataNode.EMPTY_DEL_HINT);
+              datanode.myMetrics.blocksWritten.inc();
+              datanode.notifyNamenodeReceivedBlock(block, 
+                  DataNode.EMPTY_DEL_HINT);
               if (ClientTraceLog.isInfoEnabled() &&
                   receiver.clientName.length() > 0) {
                 long offset = 0;
@@ -957,7 +987,9 @@
               final long endTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
               block.setNumBytes(replicaInfo.getNumBytes());
               datanode.data.finalizeBlock(block);
-              datanode.closeBlock(block, DataNode.EMPTY_DEL_HINT);
+              datanode.myMetrics.blocksWritten.inc();
+              datanode.notifyNamenodeReceivedBlock(block, 
+                  DataNode.EMPTY_DEL_HINT);
               if (ClientTraceLog.isInfoEnabled() &&
                   receiver.clientName.length() > 0) {
                 long offset = 0;
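
Note: on the datanode side, the revert makes readNextPacket() and
receivePacket() return the packet's payload length, with zero meaning end
of stream; the receiver drains packets until that sentinel arrives and
then forwards it to the mirror. A simplified sketch of that loop (framing
details omitted; not the real BlockReceiver API):

    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    class ReceiveLoopSketch {
      // Hedged sketch: consume packets until a zero payload length shows
      // up, then propagate the end-of-block marker downstream.
      static void receiveBlock(DataInputStream in, DataOutputStream mirrorOut)
          throws IOException {
        int payloadLen;
        while ((payloadLen = in.readInt()) > 0) {
          byte[] packet = new byte[payloadLen];
          in.readFully(packet);    // simplified: header, checksums, data
        }
        if (mirrorOut != null) {
          mirrorOut.writeInt(0);   // forward the end-of-block marker
          mirrorOut.flush();
        }
      }
    }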

Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java?rev=820053&r1=820052&r2=820053&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java Tue Sep 29 18:18:14 2009
@@ -270,6 +270,10 @@
 
     int len = Math.min((int) (endOffset - offset),
                        bytesPerChecksum*maxChunks);
+    if (len == 0) {
+      return 0;
+    }
+
     int numChunks = (len + bytesPerChecksum - 1)/bytesPerChecksum;
     int packetLen = len + numChunks*checksumSize + 4;
     pkt.clear();
@@ -278,7 +282,7 @@
     pkt.putInt(packetLen);
     pkt.putLong(offset);
     pkt.putLong(seqno);
-    pkt.put((byte)((len == 0) ? 1 : 0));
+    pkt.put((byte)((offset + len >= endOffset) ? 1 : 0));
                //why no ByteBuf.putBoolean()?
     pkt.putInt(len);
     
@@ -439,8 +443,7 @@
         seqno++;
       }
       try {
-        // send an empty packet to mark the end of the block
-        sendChunks(pktBuf, maxChunksPerPacket, streamForSendChunks);        
+        out.writeInt(0); // mark the end of block        
         out.flush();
       } catch (IOException e) { //socket error
         throw ioeToSocketException(e);
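
Note: after the revert, BlockSender derives the last-packet flag from the
offset (offset + len >= endOffset) instead of sending an extra empty
packet, and it terminates the stream with out.writeInt(0). The packet
header layout, reconstructed from the pkt.put* calls in the hunk above
(field widths assumed):

    import java.nio.ByteBuffer;

    class PacketHeaderSketch {
      // Hedged sketch of the header that sendChunks() builds; the field
      // order follows the diff above.
      static ByteBuffer encodeHeader(int packetLen, long offsetInBlock,
          long seqno, boolean lastPacketInBlock, int dataLen) {
        ByteBuffer pkt = ByteBuffer.allocate(4 + 8 + 8 + 1 + 4);
        pkt.putInt(packetLen);       // payload length: data + checksums + 4
        pkt.putLong(offsetInBlock);  // offset of this packet in the block
        pkt.putLong(seqno);          // packet sequence number
        pkt.put((byte) (lastPacketInBlock ? 1 : 0)); // no putBoolean() in NIO
        pkt.putInt(dataLen);         // number of data bytes that follow
        pkt.flip();                  // ready to be written to the socket
        return pkt;
      }
    }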

Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=820053&r1=820052&r2=820053&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Tue Sep 29 18:18:14 2009
@@ -1275,20 +1275,6 @@
       }
     }
   }
-  
-  /**
-   * After a block becomes finalized, a datanode increases metric counter,
-   * notifies namenode, and adds it to the block scanner
-   * @param block
-   * @param delHint
-   */
-  void closeBlock(Block block, String delHint) {
-    myMetrics.blocksWritten.inc();
-    notifyNamenodeReceivedBlock(block, delHint);
-    if (blockScanner != null) {
-      blockScanner.addBlock(block);
-    }
-  }
 
   /**
    * No matter what kind of exception we get, keep retrying to offerService().

Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=820053&r1=820052&r2=820053&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Tue Sep 29 18:18:14 2009
@@ -252,17 +252,12 @@
     String firstBadLink = "";           // first datanode that failed in connection setup
     DataTransferProtocol.Status mirrorInStatus = SUCCESS;
     try {
-      if (client.length() == 0 || 
-          stage != BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
-        // open a block receiver
-        blockReceiver = new BlockReceiver(block, in, 
-            s.getRemoteSocketAddress().toString(),
-            s.getLocalSocketAddress().toString(),
-            stage, newGs, minBytesRcvd, maxBytesRcvd,
-            client, srcDataNode, datanode);
-      } else {
-        datanode.data.recoverClose(block, newGs, minBytesRcvd);
-      }
+      // open a block receiver and check if the block does not exist
+      blockReceiver = new BlockReceiver(block, in, 
+          s.getRemoteSocketAddress().toString(),
+          s.getLocalSocketAddress().toString(),
+          stage, newGs, minBytesRcvd, maxBytesRcvd,
+          client, srcDataNode, datanode);
 
       //
       // Open network conn to backup machine, if 
@@ -294,9 +289,7 @@
               pipelineSize, stage, newGs, minBytesRcvd, maxBytesRcvd, client, 
               srcDataNode, targets, accessToken);
 
-          if (blockReceiver != null) { // send checksum header
-            blockReceiver.writeChecksumHeader(mirrorOut);
-          }
+          blockReceiver.writeChecksumHeader(mirrorOut);
           mirrorOut.flush();
 
           // read connect ack (only for clients, not for replication req)
@@ -347,30 +340,24 @@
       }
 
       // receive the block and mirror to the next target
-      if (blockReceiver != null) {
-        String mirrorAddr = (mirrorSock == null) ? null : mirrorNode;
-        blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut,
-            mirrorAddr, null, targets.length);
-      }
+      String mirrorAddr = (mirrorSock == null) ? null : mirrorNode;
+      blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut,
+                                 mirrorAddr, null, targets.length);
 
-      // update its generation stamp
-      if (client.length() != 0 && 
-          stage != BlockConstructionStage.PIPELINE_SETUP_CREATE) {
-        block.setGenerationStamp(newGs);
-      }
-      
-      // if this write is for a replication request or recovering
-      // a failed close for client, then confirm block. For other client-writes,
+      // if this write is for a replication request (and not
+      // from a client), then confirm block. For client-writes,
       // the block is finalized in the PacketResponder.
-      if (client.length() == 0 || 
-          stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
-        datanode.closeBlock(block, DataNode.EMPTY_DEL_HINT);
+      if (client.length() == 0) {
+        datanode.notifyNamenodeReceivedBlock(block, DataNode.EMPTY_DEL_HINT);
         LOG.info("Received block " + block + 
                  " src: " + remoteAddress +
                  " dest: " + localAddress +
                  " of size " + block.getNumBytes());
       }
 
+      if (datanode.blockScanner != null) {
+        datanode.blockScanner.addBlock(block);
+      }
       
     } catch (IOException ioe) {
       LOG.info("writeBlock " + block + " received exception " + ioe);

Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=820053&r1=820052&r2=820053&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java Tue Sep 29 18:18:14 2009
@@ -1175,21 +1175,17 @@
     return newReplicaInfo;
   }
 
-  private ReplicaInfo recoverCheck(Block b, long newGS, 
-      long expectedBlockLen) throws IOException {
+  @Override  // FSDatasetInterface
+  public synchronized ReplicaInPipelineInterface recoverAppend(Block b,
+      long newGS, long expectedBlockLen) throws IOException {
+    DataNode.LOG.info("Recover failed append to " + b);
+
     ReplicaInfo replicaInfo = volumeMap.get(b.getBlockId());
     if (replicaInfo == null) {
       throw new ReplicaNotFoundException(
           ReplicaNotFoundException.NON_EXISTENT_REPLICA + b);
     }
     
-    // check state
-    if (replicaInfo.getState() != ReplicaState.FINALIZED &&
-        replicaInfo.getState() != ReplicaState.RBW) {
-      throw new ReplicaNotFoundException(
-          ReplicaNotFoundException.UNFINALIZED_AND_NONRBW_REPLICA + replicaInfo);
-    }
-
     // check generation stamp
     long replicaGenerationStamp = replicaInfo.getGenerationStamp();
     if (replicaGenerationStamp < b.getGenerationStamp() ||
@@ -1223,39 +1219,20 @@
           " with a length of " + replicaLen + 
           " expected length is " + expectedBlockLen);
     }
-    
-    return replicaInfo;
-  }
-  @Override  // FSDatasetInterface
-  public synchronized ReplicaInPipelineInterface recoverAppend(Block b,
-      long newGS, long expectedBlockLen) throws IOException {
-    DataNode.LOG.info("Recover failed append to " + b);
-
-    ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen);
 
     // change the replica's state/gs etc.
-    if (replicaInfo.getState() == ReplicaState.FINALIZED ) {
+    switch (replicaInfo.getState()) {
+    case FINALIZED:
       return append((FinalizedReplica)replicaInfo, newGS, b.getNumBytes());
-    } else { //RBW
+    case RBW:
       bumpReplicaGS(replicaInfo, newGS);
       return (ReplicaBeingWritten)replicaInfo;
+    default:
+      throw new ReplicaNotFoundException(
+          ReplicaNotFoundException.UNFINALIZED_AND_NONRBW_REPLICA + replicaInfo);
     }
   }
 
-  @Override
-  public void recoverClose(Block b, long newGS,
-      long expectedBlockLen) throws IOException {
-    DataNode.LOG.info("Recover failed close " + b);
-    // check replica's state
-    ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen);
-    // bump the replica's GS
-    bumpReplicaGS(replicaInfo, newGS);
-    // finalize the replica if RBW
-    if (replicaInfo.getState() == ReplicaState.RBW) {
-      finalizeBlock(replicaInfo);
-    }
-  }
-  
   /**
    * Bump a replica's generation stamp to a new one.
    * Its on-disk meta file name is renamed to be the new one too.
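
Note: recoverAppend() above bumps the replica's generation stamp, and the
javadoc in the hunk points out that the on-disk meta file is renamed to
match. HDFS meta files are conventionally named blk_<id>_<genstamp>.meta,
so the rename amounts to the following (standalone illustration only; the
real FSDataset helper differs):

    import java.io.File;
    import java.io.IOException;

    class BumpGsSketch {
      // Hedged sketch: rename blk_<id>_<oldGS>.meta to blk_<id>_<newGS>.meta.
      static void bumpReplicaGS(File dir, long blockId, long oldGS, long newGS)
          throws IOException {
        File oldMeta = new File(dir, "blk_" + blockId + "_" + oldGS + ".meta");
        File newMeta = new File(dir, "blk_" + blockId + "_" + newGS + ".meta");
        if (!oldMeta.renameTo(newMeta)) {
          throw new IOException("Cannot rename " + oldMeta + " to " + newMeta);
        }
      }
    }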

Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java?rev=820053&r1=820052&r2=820053&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java Tue Sep 29 18:18:14 2009
@@ -240,18 +240,6 @@
       long newGS, long expectedBlockLen) throws IOException;
   
   /**
-   * Recover a failed pipeline close
-   * It bumps the replica's generation stamp and finalize it if RBW replica
-   * 
-   * @param b block
-   * @param newGS the new generation stamp for the replica
-   * @param expectedBlockLen the number of bytes the replica is expected to have
-   * @throws IOException
-   */
-  public void recoverClose(Block b,
-      long newGS, long expectedBlockLen) throws IOException;
-  
-  /**
    * Update the block to the new generation stamp and length.  
    */
   public void updateBlock(Block oldblock, Block newblock) throws IOException;

Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaNotFoundException.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaNotFoundException.java?rev=820053&r1=820052&r2=820053&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaNotFoundException.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaNotFoundException.java Tue Sep 29 18:18:14 2009
@@ -33,7 +33,7 @@
   final static String UNFINALIZED_REPLICA = 
     "Cannot append to an unfinalized replica ";
   final static String UNFINALIZED_AND_NONRBW_REPLICA = 
-    "Cannot recover append/close to a replica that's not FINALIZED and not RBW ";
+    "Cannot recover appending to a replica that's not FINALIZED and not RBW ";
   final static String NON_EXISTENT_REPLICA =
     "Cannot append to a non-existent replica ";
   final static String UNEXPECTED_GS_REPLICA =

Modified: hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java?rev=820053&r1=820052&r2=820053&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java Tue Sep 29 18:18:14 2009
@@ -46,6 +46,7 @@
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
 import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
@@ -153,7 +154,10 @@
 
     sendOut.writeInt(0);           // chunk length
     sendOut.writeInt(0);           // zero checksum
-        
+    
+    // mark the end of block
+    sendOut.writeInt(0);
+    
     //ok finally write a block with 0 len
     SUCCESS.write(recvOut);
     Text.writeString(recvOut, ""); // first bad node
@@ -173,11 +177,6 @@
     if (eofExcepted) {
       ERROR.write(recvOut);
       sendRecvData(description, true);
-    } else if (stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
-      //ok finally write a block with 0 len
-      SUCCESS.write(recvOut);
-      Text.writeString(recvOut, ""); // first bad node
-      sendRecvData(description, false);
     } else {
       writeZeroLengthPacket(block, description);
     }
@@ -209,7 +208,8 @@
       long newGS = firstBlock.getGenerationStamp() + 1;
       testWrite(firstBlock, 
           BlockConstructionStage.PIPELINE_SETUP_STREAMING_RECOVERY, 
-          newGS, "Cannot recover data streaming to a finalized replica", true);
+          newGS, "Successful for now", false);
+      firstBlock.setGenerationStamp(newGS);
       // test PIPELINE_SETUP_APPEND on an existing block
       newGS = firstBlock.getGenerationStamp() + 1;
       testWrite(firstBlock, 
@@ -217,21 +217,10 @@
           newGS, "Append to a finalized replica", false);
       firstBlock.setGenerationStamp(newGS);
       // test PIPELINE_SETUP_APPEND_RECOVERY on an existing block
-      file = new Path("dataprotocol1.dat");    
-      DFSTestUtil.createFile(fileSys, file, 1L, (short)numDataNodes, 0L);
-      firstBlock = DFSTestUtil.getFirstBlock(fileSys, file);
       newGS = firstBlock.getGenerationStamp() + 1;
       testWrite(firstBlock, 
           BlockConstructionStage.PIPELINE_SETUP_APPEND_RECOVERY, newGS,
           "Recover appending to a finalized replica", false);
-      // test PIPELINE_CLOSE_RECOVERY on an existing block
-      file = new Path("dataprotocol2.dat");    
-      DFSTestUtil.createFile(fileSys, file, 1L, (short)numDataNodes, 0L);
-      firstBlock = DFSTestUtil.getFirstBlock(fileSys, file);
-      newGS = firstBlock.getGenerationStamp() + 1;
-      testWrite(firstBlock, 
-          BlockConstructionStage.PIPELINE_CLOSE_RECOVERY, newGS,
-          "Recover failed close to a finalized replica", false);
       firstBlock.setGenerationStamp(newGS);
 
       /* Test writing to a new block */
@@ -287,19 +276,11 @@
             newGS, "Recover append to a RBW replica", false);
         firstBlock.setGenerationStamp(newGS);
         // test PIPELINE_SETUP_STREAMING_RECOVERY on a RBW block
-        file = new Path("dataprotocol2.dat");    
-        DFSTestUtil.createFile(fileSys, file, 1L, (short)numDataNodes, 0L);
-        out = (DFSOutputStream)(fileSys.append(file).
-            getWrappedStream()); 
-        out.write(1);
-        out.hflush();
-        in = fileSys.open(file);
-        firstBlock = DFSTestUtil.getAllBlocks(in).get(0).getBlock();
-        firstBlock.setNumBytes(2L);
         newGS = firstBlock.getGenerationStamp() + 1;
         testWrite(firstBlock, 
             BlockConstructionStage.PIPELINE_SETUP_STREAMING_RECOVERY,
             newGS, "Recover a RBW replica", false);
+        firstBlock.setGenerationStamp(newGS);
       } finally {
         IOUtils.closeStream(in);
         IOUtils.closeStream(out);

Modified: hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend3.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend3.java?rev=820053&r1=820052&r2=820053&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend3.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend3.java Tue Sep 29 18:18:14 2009
@@ -36,7 +36,7 @@
 
 /** This class implements some of tests posted in HADOOP-2658. */
 public class TestFileAppend3 extends junit.framework.TestCase {
-  static final long BLOCK_SIZE = 64 * 1024;
+  static final long BLOCK_SIZE = 3 * 64 * 1024;
   static final short REPLICATION = 3;
   static final int DATANODE_NUM = 5;
 

Modified: hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreation.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreation.java?rev=820053&r1=820052&r2=820053&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreation.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreation.java Tue Sep 29 18:18:14 2009
@@ -48,7 +48,6 @@
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
 import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.log4j.Level;
 
 

Modified: hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java?rev=820053&r1=820052&r2=820053&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java Tue Sep 29 18:18:14 2009
@@ -484,36 +484,18 @@
   public synchronized ReplicaInPipelineInterface recoverAppend(Block b,
       long newGS, long expectedBlockLen) throws IOException {
     BInfo binfo = blockMap.get(b);
-    if (binfo == null) {
+    if (binfo == null || !binfo.isFinalized()) {
       throw new ReplicaNotFoundException("Block " + b
           + " is not valid, and cannot be appended to.");
     }
     if (binfo.isFinalized()) {
       binfo.unfinalizeBlock();
     }
-    blockMap.remove(b);
     binfo.theBlock.setGenerationStamp(newGS);
-    blockMap.put(binfo.theBlock, binfo);
     return binfo;
   }
 
   @Override
-  public void recoverClose(Block b, long newGS,
-      long expectedBlockLen) throws IOException {
-    BInfo binfo = blockMap.get(b);
-    if (binfo == null) {
-      throw new ReplicaNotFoundException("Block " + b
-          + " is not valid, and cannot be appended to.");
-    }
-    if (!binfo.isFinalized()) {
-      binfo.finalizeBlock(binfo.getNumBytes());
-    }
-    blockMap.remove(b);
-    binfo.theBlock.setGenerationStamp(newGS);
-    blockMap.put(binfo.theBlock, binfo);
-  }
-  
-  @Override
   public synchronized ReplicaInPipelineInterface recoverRbw(Block b,
       long newGS, long minBytesRcvd, long maxBytesRcvd) throws IOException {
     BInfo binfo = blockMap.get(b);
@@ -525,9 +507,7 @@
       throw new ReplicaAlreadyExistsException("Block " + b
           + " is valid, and cannot be written to.");
     }
-    blockMap.remove(b);
     binfo.theBlock.setGenerationStamp(newGS);
-    blockMap.put(binfo.theBlock, binfo);
     return binfo;
   }
 

Modified: hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java?rev=820053&r1=820052&r2=820053&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java Tue Sep 29 18:18:14 2009
@@ -32,11 +32,8 @@
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Sender;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.AccessToken;
@@ -117,12 +114,17 @@
       DataOutputStream out = new DataOutputStream(
           s.getOutputStream());
 
-      Sender.opWriteBlock(out, block.getBlock().getBlockId(), 
-          block.getBlock().getGenerationStamp(), 1, 
-          BlockConstructionStage.PIPELINE_SETUP_CREATE, 
-          0L, 0L, 0L, "", null, new DatanodeInfo[0], 
-          AccessToken.DUMMY_TOKEN);
-
+      out.writeShort( DataTransferProtocol.DATA_TRANSFER_VERSION );
+      WRITE_BLOCK.write(out);
+      out.writeLong( block.getBlock().getBlockId());
+      out.writeLong( block.getBlock().getGenerationStamp() );
+      out.writeInt(1);
+      out.writeBoolean( false );       // recovery flag
+      Text.writeString( out, "" );
+      out.writeBoolean(false); // Not sending src node information
+      out.writeInt(0);
+      AccessToken.DUMMY_TOKEN.write(out);
+      
       // write check header
       out.writeByte( 1 );
       out.writeInt( 512 );

Modified: hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestWriteToReplica.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestWriteToReplica.java?rev=820053&r1=820052&r2=820053&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestWriteToReplica.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestWriteToReplica.java Tue Sep 29 18:18:14 2009
@@ -41,25 +41,6 @@
   final private static int RUR = 4;
   final private static int NON_EXISTENT = 5;
   
-  // test close
-  @Test
-  public void testClose() throws Exception {
-    MiniDFSCluster cluster = new MiniDFSCluster(new Configuration(), 1, true, null);
-    try {
-      cluster.waitActive();
-      DataNode dn = cluster.getDataNodes().get(0);
-      FSDataset dataSet = (FSDataset)dn.data;
-
-      // set up replicasMap
-      setup(dataSet);
-
-      // test close
-      testClose(dataSet);
-    } finally {
-      cluster.shutdown();
-    }
-  }
-
   // test append
   @Test
   public void testAppend() throws Exception {
@@ -98,7 +79,7 @@
     }
   }
   
-  // test writeToTemporary
+  // test writeToRbw
   @Test
   public void testWriteToTempoary() throws Exception {
     MiniDFSCluster cluster = new MiniDFSCluster(new Configuration(), 1, true, null);
@@ -132,8 +113,9 @@
         blocks[TEMPORARY].getGenerationStamp(), vol, 
         vol.createTmpFile(blocks[TEMPORARY]).getParentFile()));
     
-    replicaInfo = new ReplicaBeingWritten(blocks[RBW], vol, 
-        vol.createRbwFile(blocks[RBW]).getParentFile(), null);
+    replicaInfo = new ReplicaBeingWritten(blocks[RBW].getBlockId(),
+        blocks[RBW].getGenerationStamp(), vol, 
+        vol.createRbwFile(blocks[RBW]).getParentFile());
     replicasMap.add(replicaInfo);
     replicaInfo.getBlockFile().createNewFile();
     replicaInfo.getMetaFile().createNewFile();
@@ -145,10 +127,8 @@
   }
   
   private void testAppend(FSDataset dataSet) throws IOException {
-    long newGS = blocks[FINALIZED].getGenerationStamp()+1;
-    dataSet.append(blocks[FINALIZED], newGS, 
+    dataSet.append(blocks[FINALIZED], blocks[FINALIZED].getGenerationStamp()+1, 
         blocks[FINALIZED].getNumBytes());  // successful
-    blocks[FINALIZED].setGenerationStamp(newGS);
     
     try {
       dataSet.append(blocks[TEMPORARY], blocks[TEMPORARY].getGenerationStamp()+1, 
@@ -197,106 +177,8 @@
       Assert.assertEquals(ReplicaNotFoundException.NON_EXISTENT_REPLICA + 
           blocks[NON_EXISTENT], e.getMessage());
     }
-    
-    newGS = blocks[FINALIZED].getGenerationStamp()+1;
-    dataSet.recoverAppend(blocks[FINALIZED], newGS, 
-        blocks[FINALIZED].getNumBytes());  // successful
-    blocks[FINALIZED].setGenerationStamp(newGS);
-    
-    try {
-      dataSet.recoverAppend(blocks[TEMPORARY], blocks[TEMPORARY].getGenerationStamp()+1, 
-          blocks[TEMPORARY].getNumBytes());
-      Assert.fail("Should not have appended to a temporary replica " 
-          + blocks[TEMPORARY]);
-    } catch (ReplicaNotFoundException e) {
-      Assert.assertTrue(e.getMessage().startsWith(
-          ReplicaNotFoundException.UNFINALIZED_AND_NONRBW_REPLICA));
-    }
-
-    newGS = blocks[RBW].getGenerationStamp()+1;
-    dataSet.recoverAppend(blocks[RBW], newGS, blocks[RBW].getNumBytes());
-    blocks[RBW].setGenerationStamp(newGS);
-
-    try {
-      dataSet.recoverAppend(blocks[RWR], blocks[RWR].getGenerationStamp()+1,
-          blocks[RBW].getNumBytes());
-      Assert.fail("Should not have appended to an RWR replica" + blocks[RWR]);
-    } catch (ReplicaNotFoundException e) {
-      Assert.assertTrue(e.getMessage().startsWith(
-          ReplicaNotFoundException.UNFINALIZED_AND_NONRBW_REPLICA));
-    }
-
-    try {
-      dataSet.recoverAppend(blocks[RUR], blocks[RUR].getGenerationStamp()+1,
-          blocks[RUR].getNumBytes());
-      Assert.fail("Should not have appended to an RUR replica" + blocks[RUR]);
-    } catch (ReplicaNotFoundException e) {
-      Assert.assertTrue(e.getMessage().startsWith(
-          ReplicaNotFoundException.UNFINALIZED_AND_NONRBW_REPLICA));
-    }
-
-    try {
-      dataSet.recoverAppend(blocks[NON_EXISTENT], 
-          blocks[NON_EXISTENT].getGenerationStamp(), 
-          blocks[NON_EXISTENT].getNumBytes());
-      Assert.fail("Should not have appended to a non-existent replica " + 
-          blocks[NON_EXISTENT]);
-    } catch (ReplicaNotFoundException e) {
-      Assert.assertTrue(e.getMessage().startsWith(
-          ReplicaNotFoundException.NON_EXISTENT_REPLICA));
-    }
   }
 
-  private void testClose(FSDataset dataSet) throws IOException {
-    long newGS = blocks[FINALIZED].getGenerationStamp()+1;
-    dataSet.recoverClose(blocks[FINALIZED], newGS, 
-        blocks[FINALIZED].getNumBytes());  // successful
-    blocks[FINALIZED].setGenerationStamp(newGS);
-    
-    try {
-      dataSet.recoverClose(blocks[TEMPORARY], blocks[TEMPORARY].getGenerationStamp()+1, 
-          blocks[TEMPORARY].getNumBytes());
-      Assert.fail("Should not have recovered close a temporary replica " 
-          + blocks[TEMPORARY]);
-    } catch (ReplicaNotFoundException e) {
-      Assert.assertTrue(e.getMessage().startsWith(
-          ReplicaNotFoundException.UNFINALIZED_AND_NONRBW_REPLICA));
-    }
-
-    newGS = blocks[RBW].getGenerationStamp()+1;
-    dataSet.recoverClose(blocks[RBW], newGS, blocks[RBW].getNumBytes());
-    blocks[RBW].setGenerationStamp(newGS);
-
-    try {
-      dataSet.recoverClose(blocks[RWR], blocks[RWR].getGenerationStamp()+1,
-          blocks[RBW].getNumBytes());
-      Assert.fail("Should not have recovered close an RWR replica" + blocks[RWR]);
-    } catch (ReplicaNotFoundException e) {
-      Assert.assertTrue(e.getMessage().startsWith(
-          ReplicaNotFoundException.UNFINALIZED_AND_NONRBW_REPLICA));
-    }
-
-    try {
-      dataSet.recoverClose(blocks[RUR], blocks[RUR].getGenerationStamp()+1,
-          blocks[RUR].getNumBytes());
-      Assert.fail("Should not have recovered close an RUR replica" + blocks[RUR]);
-    } catch (ReplicaNotFoundException e) {
-      Assert.assertTrue(e.getMessage().startsWith(
-          ReplicaNotFoundException.UNFINALIZED_AND_NONRBW_REPLICA));
-    }
-
-    try {
-      dataSet.recoverClose(blocks[NON_EXISTENT], 
-          blocks[NON_EXISTENT].getGenerationStamp(), 
-          blocks[NON_EXISTENT].getNumBytes());
-      Assert.fail("Should not have recovered close a non-existent replica " + 
-          blocks[NON_EXISTENT]);
-    } catch (ReplicaNotFoundException e) {
-      Assert.assertTrue(e.getMessage().startsWith(
-          ReplicaNotFoundException.NON_EXISTENT_REPLICA));
-    }
-  }
-  
   private void testWriteToRbw(FSDataset dataSet) throws IOException {
     try {
       dataSet.recoverRbw(blocks[FINALIZED],