Posted to hdfs-commits@hadoop.apache.org by ha...@apache.org on 2009/09/24 22:51:39 UTC

svn commit: r818622 [1/2] - in /hadoop/hdfs/branches/HDFS-265: ./ src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/protocol/ src/java/org/apache/hadoop/hdfs/server/datanode/ src/java/org/apache/hadoop/hdfs/server/namenode/ src/test/aop/...

Author: hairong
Date: Thu Sep 24 20:51:38 2009
New Revision: 818622

URL: http://svn.apache.org/viewvc?rev=818622&view=rev
Log:
HDFS-624. Support a new algorithm for pipeline recovery and pipeline setup for append. Contributed by Hairong Kuang. 

Added:
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaAlreadyExistsException.java
      - copied, changed from r818583, hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/BlockAlreadyExistsException.java
Removed:
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/BlockAlreadyExistsException.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/BlockNotFoundException.java
Modified:
    hadoop/hdfs/branches/HDFS-265/CHANGES.txt
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaNotFoundException.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
    hadoop/hdfs/branches/HDFS-265/src/test/aop/org/apache/hadoop/hdfs/DFSClientAspects.aj
    hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
    hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java
    hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
    hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend3.java
    hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
    hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
    hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestWriteToReplica.java

Modified: hadoop/hdfs/branches/HDFS-265/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/CHANGES.txt?rev=818622&r1=818621&r2=818622&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/CHANGES.txt (original)
+++ hadoop/hdfs/branches/HDFS-265/CHANGES.txt Thu Sep 24 20:51:38 2009
@@ -35,6 +35,9 @@
     HDFS-592. Allow clients to fetch a new generation stamp from NameNode for
     pipeline recovery. (hairong)
 
+    HDFS-624. Support a new algorithm for pipeline recovery and pipeline setup 
+    for append. (hairong)
+
   IMPROVEMENTS
 
     HDFS-509. Redesign DataNode volumeMap to include all types of Replicas.

Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/DFSClient.java?rev=818622&r1=818621&r2=818622&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/DFSClient.java Thu Sep 24 20:51:38 2009
@@ -36,8 +36,6 @@
 import java.nio.ByteBuffer;
 import java.util.AbstractMap;
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -2425,6 +2423,13 @@
       long getLastByteOffsetBlock() {
         return offsetInBlock + dataPos - dataStart;
       }
+      
+      public String toString() {
+        return "packet seqno:" + this.seqno +
+        " offsetInBlock:" + this.offsetInBlock + 
+        " lastPacketInBlock:" + this.lastPacketInBlock +
+        " lastByteOffsetInBlock: " + this.getLastByteOffsetBlock();
+      }
     }
   
     //
@@ -2436,8 +2441,6 @@
 // if they are received, the DataStreamer closes the current block.
     //
     class DataStreamer extends Daemon {
-      private static final int MAX_RECOVERY_ERROR_COUNT = 5; // try block recovery 5 times
-      private int recoveryErrorCount = 0; // number of times block recovery failed
       private volatile boolean streamerClosed = false;
       private Block block; // its length is number of bytes acked
       private AccessToken accessToken;
@@ -2446,7 +2449,7 @@
       private ResponseProcessor response = null;
       private volatile DatanodeInfo[] nodes = null; // list of targets for current block
       volatile boolean hasError = false;
-      volatile int errorIndex = 0;
+      volatile int errorIndex = -1;
       private BlockConstructionStage stage;  // block construction stage
       private long bytesSent = 0; // number of bytes that've been sent
 
@@ -2511,7 +2514,17 @@
               "of file " + src);
 
         }
-        processDatanodeError(true, true);
+      }
+      
+      /**
+       * Initialize for data streaming
+       */
+      private void initDataStreaming() {
+        this.setName("DataStreamer for file " + src +
+            " block " + block);
+        response = new ResponseProcessor(nodes);
+        response.start();
+        stage = BlockConstructionStage.DATA_STREAMING;
       }
 
       /*
@@ -2533,38 +2546,41 @@
 
           Packet one = null;
 
-          // process IO errors if any
-          boolean doSleep = processDatanodeError(hasError, false);
+          try {
+            // process datanode IO errors if any
+            boolean doSleep = false;
+            if (hasError && errorIndex>=0) {
+              doSleep = processDatanodeError();
+            }
 
-          synchronized (dataQueue) {
-            // wait for a packet to be sent.
-            while ((!streamerClosed && !hasError && clientRunning 
-                && dataQueue.size() == 0) || doSleep) {
-              try {
-                dataQueue.wait(1000);
-              } catch (InterruptedException  e) {
+            synchronized (dataQueue) {
+              // wait for a packet to be sent.
+              while ((!streamerClosed && !hasError && clientRunning 
+                  && dataQueue.size() == 0) || doSleep) {
+                try {
+                  dataQueue.wait(1000);
+                } catch (InterruptedException  e) {
+                }
+                doSleep = false;
               }
-              doSleep = false;
-            }
-            if (streamerClosed || hasError || dataQueue.size() == 0 || !clientRunning) {
-              continue;
+              if (streamerClosed || hasError || dataQueue.size() == 0 || !clientRunning) {
+                continue;
+              }
+              // get packet to be sent.
+              one = dataQueue.getFirst();
             }
-            // get packet to be sent.
-            one = dataQueue.getFirst();
-          }
 
-          try {
             long offsetInBlock = one.offsetInBlock;
 
             // get new block from namenode.
-            if (blockStream == null) {
+            if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
               LOG.debug("Allocating new block");
-              nodes = nextBlockOutputStream(src); 
-              this.setName("DataStreamer for file " + src +
-                  " block " + block);
-              response = new ResponseProcessor(nodes);
-              response.start();
-              stage = BlockConstructionStage.DATA_STREAMING;
+              nodes = nextBlockOutputStream(src);
+              initDataStreaming();
+            } else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {
+              LOG.debug("Append to block " + block);
+              setupPipelineForAppendOrRecovery();
+              initDataStreaming();
             }
 
             if (offsetInBlock >= blockSize) {
@@ -2584,6 +2600,15 @@
               dataQueue.notifyAll();
             }
 
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("DataStreamer block " + block +
+                  " sending packet seqno:" + one.seqno +
+                  " size:" + buf.remaining() +
+                  " offsetInBlock:" + one.offsetInBlock + 
+                  " lastPacketInBlock:" + one.lastPacketInBlock +
+                  " lastByteOffsetInBlock" + one.getLastByteOffsetBlock());
+            }
+
             // write out data to remote datanode
             blockStream.write(buf.array(), buf.position(), buf.remaining());
             blockStream.flush();
@@ -2610,13 +2635,6 @@
                 blockStream.flush();
               }
             }
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("DataStreamer block " + block +
-                  " wrote packet seqno:" + one.seqno +
-                  " size:" + buf.remaining() +
-                  " offsetInBlock:" + one.offsetInBlock + 
-                  " lastPacketInBlock:" + one.lastPacketInBlock);
-            }
           } catch (Throwable e) {
             LOG.warn("DataStreamer Exception: " + 
                 StringUtils.stringifyException(e));
@@ -2624,6 +2642,9 @@
               setLastException((IOException)e);
             }
             hasError = true;
+            if (errorIndex == -1) { // not a datanode error
+              streamerClosed = true;
+            }
           }
 
 
@@ -2751,10 +2772,20 @@
               }
 
               // processes response status from all datanodes.
+              String replies = null;
+              if (LOG.isDebugEnabled()) {
+                replies = "DFSClient Replies for seqno " + seqno + " are";
+              }
               for (int i = 0; i < targets.length && clientRunning; i++) {
                 final DataTransferProtocol.Status reply
                     = DataTransferProtocol.Status.read(blockReplyStream);
+                if (LOG.isDebugEnabled()) {
+                  replies += " " + reply;
+                }
                 if (reply != SUCCESS) {
+                  if (LOG.isDebugEnabled()) {
+                    LOG.debug(replies);
+                  }
                   errorIndex = i; // first bad datanode
                   throw new IOException("Bad response " + reply +
                       " for block " + block +
@@ -2763,11 +2794,18 @@
                 }
               }
 
-              if (one != null) {
-                // update bytesAcked
-                block.setNumBytes(one.getLastByteOffsetBlock());
+              if (LOG.isDebugEnabled()) {
+                LOG.debug(replies);
               }
               
+              if (one == null) {
+                throw new IOException("Panic: responder did not receive " +
+                    "an ack for a packet: " + seqno);
+              }
+              
+              // update bytesAcked
+              block.setNumBytes(one.getLastByteOffsetBlock());
+
               synchronized (dataQueue) {
                 ackQueue.removeFirst();
                 dataQueue.notifyAll();
@@ -2778,6 +2816,7 @@
                   setLastException((IOException)e);
                 }
                 hasError = true;
+                errorIndex = errorIndex==-1 ? 0 : errorIndex;
                 synchronized (dataQueue) {
                   dataQueue.notifyAll();
                 }
@@ -2800,21 +2839,12 @@
       // threads and mark stream as closed. Returns true if we should
       // sleep for a while after returning from this call.
       //
-      private boolean processDatanodeError(boolean error, boolean isAppend) {
-        if (!error) {
-          return false;
-        }
+      private boolean processDatanodeError() throws IOException {
         if (response != null) {
           LOG.info("Error Recovery for block " + block +
           " waiting for responder to exit. ");
           return true;
         }
-        if (errorIndex >= 0) {
-          LOG.warn("Error Recovery for block " + block
-              + " bad datanode[" + errorIndex + "] "
-              + (nodes == null? "nodes == null": nodes[errorIndex].getName()));
-        }
-
         closeStream();
 
         // move packets from ack queue to front of the data queue
@@ -2823,31 +2853,49 @@
           ackQueue.clear();
         }
 
+        boolean doSleep = setupPipelineForAppendOrRecovery();
+        
+        if (!streamerClosed && clientRunning) {
+          initDataStreaming();
+        }
+        
+        return doSleep;
+      }
+
+
+      /**
+       * Open a DataOutputStream to a DataNode pipeline so that 
+       * it can be written to.
+       * This happens when a file is appended or data streaming fails.
+       * It keeps on trying until a pipeline is set up.
+       */
+      private boolean setupPipelineForAppendOrRecovery() throws IOException {
+        // check number of datanodes
+        if (nodes == null || nodes.length == 0) {
+          String msg = "Could not get block locations. " + "Source file \""
+              + src + "\" - Aborting...";
+          LOG.warn(msg);
+          setLastException(new IOException(msg));
+          streamerClosed = true;
+          return false;
+        }
+        
         boolean success = false;
+        long newGS = 0L;
         while (!success && !streamerClosed && clientRunning) {
-          DatanodeInfo[] newnodes = null;
-          if (nodes == null) {
-            String msg = "Could not get block locations. " + "Source file \""
-                + src + "\" - Aborting...";
-            LOG.warn(msg);
-            setLastException(new IOException(msg));
-            streamerClosed = true;
-            return false;
-          }
-          StringBuilder pipelineMsg = new StringBuilder();
-          for (int j = 0; j < nodes.length; j++) {
-            pipelineMsg.append(nodes[j].getName());
-            if (j < nodes.length - 1) {
-              pipelineMsg.append(", ");
-            }
-          }
+          boolean isRecovery = hasError;
           // remove bad datanode from list of datanodes.
           // If errorIndex was not set (i.e. appends), then do not remove 
           // any datanodes
           // 
-          if (errorIndex < 0) {
-            newnodes = nodes;
-          } else {
+          if (errorIndex >= 0) {
+            StringBuilder pipelineMsg = new StringBuilder();
+            for (int j = 0; j < nodes.length; j++) {
+              pipelineMsg.append(nodes[j].getName());
+              if (j < nodes.length - 1) {
+                pipelineMsg.append(", ");
+              }
+            }
             if (nodes.length <= 1) {
               lastException = new IOException("All datanodes " + pipelineMsg
                   + " are bad. Aborting...");
@@ -2857,86 +2905,32 @@
             LOG.warn("Error Recovery for block " + block +
                 " in pipeline " + pipelineMsg + 
                 ": bad datanode " + nodes[errorIndex].getName());
-            newnodes =  new DatanodeInfo[nodes.length-1];
+            DatanodeInfo[] newnodes = new DatanodeInfo[nodes.length-1];
             System.arraycopy(nodes, 0, newnodes, 0, errorIndex);
             System.arraycopy(nodes, errorIndex+1, newnodes, errorIndex,
                 newnodes.length-errorIndex);
+            nodes = newnodes;
+            this.hasError = false;
+            lastException = null;
+            errorIndex = -1;
           }
 
-          // Tell the primary datanode to do error recovery 
-          // by stamping appropriate generation stamps.
-          //
-          LocatedBlock newBlock = null;
-          ClientDatanodeProtocol primary =  null;
-          DatanodeInfo primaryNode = null;
-          try {
-            // Pick the "least" datanode as the primary datanode to avoid deadlock.
-            primaryNode = Collections.min(Arrays.asList(newnodes));
-            primary = createClientDatanodeProtocolProxy(primaryNode, conf);
-            newBlock = primary.recoverBlock(block, isAppend, newnodes);
-          } catch (IOException e) {
-            recoveryErrorCount++;
-            if (recoveryErrorCount > MAX_RECOVERY_ERROR_COUNT) {
-              if (nodes.length > 1) {
-                // if the primary datanode failed, remove it from the list.
-                // The original bad datanode is left in the list because it is
-                // conservative to remove only one datanode in one iteration.
-                for (int j = 0; j < nodes.length; j++) {
-                  if (nodes[j].equals(primaryNode)) {
-                    errorIndex = j; // forget original bad node.
-                  }
-                }
-                // remove primary node from list
-                newnodes =  new DatanodeInfo[nodes.length-1];
-                System.arraycopy(nodes, 0, newnodes, 0, errorIndex);
-                System.arraycopy(nodes, errorIndex+1, newnodes, errorIndex,
-                    newnodes.length-errorIndex);
-                nodes = newnodes;
-                LOG.warn("Error Recovery for block " + block + " failed "
-                    + " because recovery from primary datanode " + primaryNode
-                    + " failed " + recoveryErrorCount + " times. "
-                    + " Pipeline was " + pipelineMsg
-                    + ". Marking primary datanode as bad.");
-                recoveryErrorCount = 0; 
-                errorIndex = -1;
-                return true;          // sleep when we return from here
-              }
-              String emsg = "Error Recovery for block " + block + " failed "
-                  + " because recovery from primary datanode " + primaryNode
-                  + " failed " + recoveryErrorCount + " times. "
-                  + " Pipeline was " + pipelineMsg + ". Aborting...";
-              LOG.warn(emsg);
-              lastException = new IOException(emsg);
-              streamerClosed = true;
-              return false;       // abort with IOexception
-            } 
-            LOG.warn("Error Recovery for block " + block + " failed "
-                + " because recovery from primary datanode " + primaryNode
-                + " failed " + recoveryErrorCount + " times. "
-                + " Pipeline was " + pipelineMsg + ". Will retry...");
-            return true;          // sleep when we return from here
-          } finally {
-            RPC.stopProxy(primary);
-          }
-          recoveryErrorCount = 0; // block recovery successful
-
-          // If the block recovery generated a new generation stamp, use that
-          // from now on.  Also, setup new pipeline
-          // newBlock should never be null and it should contain a newly 
-          // generated access token.
-          block = newBlock.getBlock();
-          accessToken = newBlock.getAccessToken();
-          nodes = newBlock.getLocations();
-
-          this.hasError = false;
-          lastException = null;
-          errorIndex = 0;
-          success = createBlockOutputStream(nodes, clientName, true);
+          // get a new generation stamp and an access token
+          LocatedBlock lb = namenode.updateBlockForPipeline(block, clientName);
+          newGS = lb.getBlock().getGenerationStamp();
+          accessToken = lb.getAccessToken();
+          
+          // set up the pipeline again with the remaining nodes
+          success = createBlockOutputStream(nodes, newGS, isRecovery);
         }
 
-        if (!streamerClosed && clientRunning) {
-          response = new ResponseProcessor(nodes);
-          response.start();
+        if (success) {
+          // update pipeline at the namenode
+          Block newBlock = new Block(
+              block.getBlockId(), block.getNumBytes(), newGS);
+          namenode.updatePipeline(clientName, block, newBlock, nodes);
+          // update client side generation stamp
+          block = newBlock;
         }
         return false; // do not sleep, continue processing
       }
@@ -2956,7 +2950,7 @@
         do {
           hasError = false;
           lastException = null;
-          errorIndex = 0;
+          errorIndex = -1;
           retry = false;
           success = false;
 
@@ -2970,7 +2964,7 @@
           //
           // Connect to first DataNode in the list.
           //
-          success = createBlockOutputStream(nodes, clientName, false);
+          success = createBlockOutputStream(nodes, 0L, false);
 
           if (!success) {
             LOG.info("Abandoning block " + block);
@@ -2998,7 +2992,7 @@
       // connects to the first datanode in the pipeline
       // Returns true if success, otherwise return failure.
       //
-      private boolean createBlockOutputStream(DatanodeInfo[] nodes, String client,
+      private boolean createBlockOutputStream(DatanodeInfo[] nodes, long newGS,
           boolean recoveryFlag) {
         DataTransferProtocol.Status pipelineStatus = SUCCESS;
         String firstBadLink = "";
@@ -3033,11 +3027,11 @@
               DataNode.SMALL_BUFFER_SIZE));
           blockReplyStream = new DataInputStream(NetUtils.getInputStream(s));
 
-          // send the request: newGS now uses a dummy value 0 for now
+          // send the request
           DataTransferProtocol.Sender.opWriteBlock(out,
               block.getBlockId(), block.getGenerationStamp(),
-              nodes.length, recoveryFlag?stage.getRecoveryStage():stage, 0,
-              block.getNumBytes(), bytesSent, client, null, nodes, accessToken);
+              nodes.length, recoveryFlag?stage.getRecoveryStage():stage, newGS,
+              block.getNumBytes(), bytesSent, clientName, null, nodes, accessToken);
           checksum.writeHeader(out);
           out.flush();
 
@@ -3070,6 +3064,8 @@
                 break;
               }
             }
+          } else {
+            errorIndex = 0;
           }
           hasError = true;
           setLastException(ie);

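The DFSClient change above replaces the old recoverBlock call to a primary datanode with a client-driven recovery: drop the bad datanode from the pipeline, fetch a new generation stamp (and access token) from the NameNode via updateBlockForPipeline, re-open the pipeline to the remaining datanodes, and finally commit the new pipeline with updatePipeline. The following is a minimal standalone sketch of that flow; NamenodeStub and PipelineStub are hypothetical stand-ins for ClientProtocol and the datanode write streams, not the committed classes.

    import java.io.IOException;

    /** Hypothetical stand-in for the two new ClientProtocol calls. */
    interface NamenodeStub {
      long updateBlockForPipeline(long blockId, String clientName) throws IOException;
      void updatePipeline(String clientName, long blockId, long newGS,
                          String[] newNodes) throws IOException;
    }

    /** Hypothetical stand-in for re-opening the datanode write pipeline. */
    interface PipelineStub {
      boolean createBlockOutputStream(String[] nodes, long newGS,
                                      boolean recoveryFlag) throws IOException;
    }

    class PipelineRecoverySketch {
      /** Drop the bad datanode, get a new GS, rebuild the pipeline, then commit it at the NameNode. */
      static void recover(NamenodeStub namenode, PipelineStub pipeline,
                          String clientName, long blockId,
                          String[] nodes, int errorIndex) throws IOException {
        if (nodes.length <= 1) {
          throw new IOException("All datanodes are bad. Aborting...");
        }
        // remove the bad datanode from the list of datanodes
        String[] newNodes = new String[nodes.length - 1];
        System.arraycopy(nodes, 0, newNodes, 0, errorIndex);
        System.arraycopy(nodes, errorIndex + 1, newNodes, errorIndex,
                         newNodes.length - errorIndex);
        // get a new generation stamp (in HDFS this also returns a new access token)
        long newGS = namenode.updateBlockForPipeline(blockId, clientName);
        // set up the pipeline again with the remaining nodes, in recovery mode
        if (pipeline.createBlockOutputStream(newNodes, newGS, true)) {
          // update the pipeline (new GS, new length, new locations) at the namenode
          namenode.updatePipeline(clientName, blockId, newGS, newNodes);
        }
      }
    }

The same path is reused for pipeline setup on append; the only difference is that no datanode is removed because errorIndex is -1.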
Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java?rev=818622&r1=818621&r2=818622&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java Thu Sep 24 20:51:38 2009
@@ -44,8 +44,9 @@
    * Compared to the previous version the following changes have been introduced:
    * (Only the latest change is reflected.
    * The log of historical changes can be retrieved from the svn).
-   * 49: added a new method getNewStampForPipeline(Block, String) 
-   * to support pipeline recovery
+   * 49: added two new methods to support pipeline recovery and append
+   *     updateBlockForPipeline(Block, String) and
+   *     updatePipeline(String, Block, Block, DatanodeID[])
    */
   public static final long versionID = 49L;
   
@@ -524,6 +525,19 @@
    * @return a located block with a new generation stamp and an access token
    * @throws IOException if any error occurs
    */
-  public LocatedBlock getNewStampForPipeline(Block block, String clientName) 
+  public LocatedBlock updateBlockForPipeline(Block block, String clientName) 
+  throws IOException;
+
+  /**
+   * Update a pipeline for a block under construction
+   * 
+   * @param clientName the name of the client
+   * @param oldBlock the old block
+   * @param newBlock the new block containing new generation stamp and length
+   * @param newNodes datanodes in the pipeline
+   * @throws IOException if any error occurs
+   */
+  public void updatePipeline(String clientName, Block oldBlock, 
+      Block newBlock, DatanodeID[] newNodes)
   throws IOException;
 }

Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=818622&r1=818621&r2=818622&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Thu Sep 24 20:51:38 2009
@@ -56,7 +56,6 @@
   static final Log ClientTraceLog = DataNode.ClientTraceLog;
   
   private Block block; // the block to receive
-  protected boolean finalized;
   private DataInputStream in = null; // from where data are read
   private DataChecksum checksum; // from where chunks of a block can be read
   private OutputStream out = null; // to block file at local disk
@@ -73,7 +72,6 @@
   private Daemon responder = null;
   private BlockTransferThrottler throttler;
   private FSDataset.BlockWriteStreams streams;
-  private boolean isRecovery = false;
   private String clientName;
   DatanodeInfo srcDataNode = null;
   private Checksum partialCrc = null;
@@ -93,34 +91,41 @@
       this.clientName = clientName;
       this.srcDataNode = srcDataNode;
       this.datanode = datanode;
-      this.finalized = datanode.data.isValidBlock(block);
       //
       // Open local disk out
       //
       if (clientName.length() == 0) { //replication or move
-        replicaInfo = datanode.data.writeToTemporary(block);
+        replicaInfo = datanode.data.createTemporary(block);
       } else {
         switch (stage) {
         case PIPELINE_SETUP_CREATE:
-          isRecovery = false;
-          replicaInfo = datanode.data.writeToRbw(block, isRecovery);
+          replicaInfo = datanode.data.createRbw(block);
           break;
         case PIPELINE_SETUP_STREAMING_RECOVERY:
-          isRecovery = true;
-          if (finalized) {
-            replicaInfo = datanode.data.append(block);
-            finalized = false;
+          if (datanode.data.isValidBlock(block)) {
+            // pipeline failed after the replica is finalized. This will be 
+            // handled differently when pipeline close/recovery is introduced
+            replicaInfo = datanode.data.append(block, newGs, maxBytesRcvd);
           } else {
-            replicaInfo = datanode.data.writeToRbw(block, isRecovery);
+            replicaInfo = datanode.data.recoverRbw(
+                block, newGs, minBytesRcvd, maxBytesRcvd);
           }
+          block.setGenerationStamp(newGs);
           break;
         case PIPELINE_SETUP_APPEND:
+          replicaInfo = datanode.data.append(block, newGs, minBytesRcvd);
+          if (datanode.blockScanner != null) { // remove from block scanner
+            datanode.blockScanner.deleteBlock(block);
+          }
+          block.setGenerationStamp(newGs);
+          break;
         case PIPELINE_SETUP_APPEND_RECOVERY:
-          isRecovery = true;
-          replicaInfo = datanode.data.append(block);
-          finalized = false;
+          replicaInfo = datanode.data.recoverAppend(block, newGs, minBytesRcvd);
+          if (datanode.blockScanner != null) { // remove from block scanner
+            datanode.blockScanner.deleteBlock(block);
+          }
+          block.setGenerationStamp(newGs);
           break;
-        case PIPELINE_CLOSE_RECOVERY:
         default: throw new IOException("Unsupported stage " + stage + 
               " while receiving block " + block + " from " + inAddr);
         }
@@ -131,20 +136,24 @@
         this.checksumOut = new DataOutputStream(new BufferedOutputStream(
                                                   streams.checksumOut, 
                                                   SMALL_BUFFER_SIZE));
-        // If this block is for appends, then remove it from periodic
-        // validation.
-        if (datanode.blockScanner != null && isRecovery) {
-          datanode.blockScanner.deleteBlock(block);
-        }
         
         // read checksum meta information
         this.checksum = DataChecksum.newDataChecksum(in);
         this.bytesPerChecksum = checksum.getBytesPerChecksum();
         this.checksumSize = checksum.getChecksumSize();
+        
+        // write data chunk header if creating a new replica
+        if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE 
+            || clientName.length() == 0) {
+          BlockMetadataHeader.writeHeader(checksumOut, checksum);
+        } else {
+          datanode.data.setChannelPosition(block, streams, 0, 
+              BlockMetadataHeader.getHeaderSize());
+        }
       }
-    } catch (BlockAlreadyExistsException bae) {
+    } catch (ReplicaAlreadyExistsException bae) {
       throw bae;
-    } catch (BlockNotFoundException bne) {
+    } catch (ReplicaNotFoundException bne) {
       throw bne;
     } catch(IOException ioe) {
       IOUtils.closeStream(this);
@@ -494,7 +503,7 @@
       }
 
       try {
-        if (!finalized  && replicaInfo.getBytesOnDisk()<offsetInBlock) {
+        if (replicaInfo.getBytesOnDisk()<offsetInBlock) {
           //finally write to the disk :
           setBlockPosition(offsetInBlock-len);
           
@@ -561,10 +570,6 @@
       throttler = throttlerArg;
 
     try {
-      // write data chunk header
-      if (!finalized) {
-        BlockMetadataHeader.writeHeader(checksumOut, checksum);
-      }
       if (clientName.length() > 0) {
         responder = new Daemon(datanode.threadGroup, 
                                new PacketResponder(this, block, mirrIn, 
@@ -644,21 +649,6 @@
    * Sets the file pointer in the local block file to the specified value.
    */
   private void setBlockPosition(long offsetInBlock) throws IOException {
-    if (finalized) {
-      if (!isRecovery) {
-        throw new IOException("Write to offset " + offsetInBlock +
-                              " of block " + block +
-                              " that is already finalized.");
-      }
-      if (offsetInBlock > datanode.data.getLength(block)) {
-        throw new IOException("Write to offset " + offsetInBlock +
-                              " of block " + block +
-                              " that is already finalized and is of size " +
-                              datanode.data.getLength(block));
-      }
-      return;
-    }
-
     if (datanode.data.getChannelPosition(block, streams) == offsetInBlock) {
       return;                   // nothing to do 
     }
@@ -852,26 +842,24 @@
             // If this is the last packet in block, then close block
             // file and finalize the block before responding success
             if (pkt.lastPacketInBlock) {
-              if (!receiver.finalized) {
-                receiver.close();
-                final long endTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
-                block.setNumBytes(replicaInfo.getNumBytes());
-                datanode.data.finalizeBlock(block);
-                datanode.myMetrics.blocksWritten.inc();
-                datanode.notifyNamenodeReceivedBlock(block, 
-                    DataNode.EMPTY_DEL_HINT);
-                if (ClientTraceLog.isInfoEnabled() &&
-                    receiver.clientName.length() > 0) {
-                  long offset = 0;
-                  ClientTraceLog.info(String.format(DN_CLIENTTRACE_FORMAT,
-                        receiver.inAddr, receiver.myAddr, block.getNumBytes(),
-                        "HDFS_WRITE", receiver.clientName, offset,
-                        datanode.dnRegistration.getStorageID(), block, endTime-startTime));
-                } else {
-                  LOG.info("Received block " + block + 
-                           " of size " + block.getNumBytes() + 
-                           " from " + receiver.inAddr);
-                }
+              receiver.close();
+              final long endTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
+              block.setNumBytes(replicaInfo.getNumBytes());
+              datanode.data.finalizeBlock(block);
+              datanode.myMetrics.blocksWritten.inc();
+              datanode.notifyNamenodeReceivedBlock(block, 
+                  DataNode.EMPTY_DEL_HINT);
+              if (ClientTraceLog.isInfoEnabled() &&
+                  receiver.clientName.length() > 0) {
+                long offset = 0;
+                ClientTraceLog.info(String.format(DN_CLIENTTRACE_FORMAT,
+                    receiver.inAddr, receiver.myAddr, block.getNumBytes(),
+                    "HDFS_WRITE", receiver.clientName, offset,
+                    datanode.dnRegistration.getStorageID(), block, endTime-startTime));
+              } else {
+                LOG.info("Received block " + block + 
+                    " of size " + block.getNumBytes() + 
+                    " from " + receiver.inAddr);
               }
               lastPacket = true;
             }
@@ -879,6 +867,9 @@
             replyOut.writeLong(expected);
             SUCCESS.write(replyOut);
             replyOut.flush();
+            if (pkt.lastByteInBlock>replicaInfo.getBytesAcked()) {
+              replicaInfo.setBytesAcked(pkt.lastByteInBlock);
+            }
         } catch (Exception e) {
           LOG.warn("IOException in BlockReceiver.lastNodeRun: ", e);
           if (running) {
@@ -911,6 +902,7 @@
       }
 
       boolean lastPacketInBlock = false;
+      Packet pkt = null;
       final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
       while (running && datanode.shouldRun && !lastPacketInBlock) {
 
@@ -933,7 +925,6 @@
               } else {
                 LOG.debug("PacketResponder " + numTargets + " got seqno = " + 
                     seqno);
-                Packet pkt = null;
                 synchronized (this) {
                   while (running && datanode.shouldRun && ackQueue.size() == 0) {
                     if (LOG.isDebugEnabled()) {
@@ -951,9 +942,6 @@
                   }
                   pkt = ackQueue.removeFirst();
                   expected = pkt.seqno;
-                  if (pkt.lastByteInBlock > replicaInfo.getBytesAcked()) {
-                    replicaInfo.setBytesAcked(pkt.lastByteInBlock);
-                  }
                   notifyAll();
                   LOG.debug("PacketResponder " + numTargets + " seqno = " + seqno);
                   if (seqno != expected) {
@@ -994,7 +982,7 @@
             
             // If this is the last packet in block, then close block
             // file and finalize the block before responding success
-            if (lastPacketInBlock && !receiver.finalized) {
+            if (lastPacketInBlock) {
               receiver.close();
               final long endTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
               block.setNumBytes(replicaInfo.getNumBytes());
@@ -1025,12 +1013,14 @@
                       " responded my status " +
                       " for seqno " + expected);
 
+            boolean success = true;
             // forward responses from downstream datanodes.
             for (int i = 0; i < numTargets && datanode.shouldRun; i++) {
               try {
                 if (op == SUCCESS) {
                   op = Status.read(mirrorIn);
                   if (op != SUCCESS) {
+                    success = false;
                     LOG.debug("PacketResponder for block " + block +
                               ": error code received from downstream " +
                               " datanode[" + i + "] " + op);
@@ -1038,6 +1028,7 @@
                 }
               } catch (Throwable e) {
                 op = ERROR;
+                success = false;
               }
               op.write(replyOut);
             }
@@ -1045,6 +1036,10 @@
             LOG.debug("PacketResponder " + block + " " + numTargets + 
                       " responded other status " + " for seqno " + expected);
 
+            if (pkt != null && success && 
+                pkt.lastByteInBlock>replicaInfo.getBytesAcked()) {
+              replicaInfo.setBytesAcked(pkt.lastByteInBlock);
+            }
             // If we were unable to read the seqno from downstream, then stop.
             if (expected == -2) {
               running = false;

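The BlockReceiver constructor above no longer keys its behavior off the removed finalized and isRecovery flags; it dispatches on the block construction stage and calls the corresponding FSDatasetInterface method. A minimal sketch of that dispatch, assuming a hypothetical ReplicaOps interface with simplified signatures in place of FSDatasetInterface:

    import java.io.IOException;

    enum Stage {
      PIPELINE_SETUP_CREATE, PIPELINE_SETUP_STREAMING_RECOVERY,
      PIPELINE_SETUP_APPEND, PIPELINE_SETUP_APPEND_RECOVERY
    }

    /** Hypothetical, simplified stand-in for the FSDatasetInterface calls used above. */
    interface ReplicaOps {
      void createRbw(long blockId) throws IOException;
      void recoverRbw(long blockId, long newGS, long minRcvd, long maxRcvd) throws IOException;
      void append(long blockId, long newGS, long expectedLen) throws IOException;
      void recoverAppend(long blockId, long newGS, long expectedLen) throws IOException;
      boolean isFinalized(long blockId);
    }

    class StageDispatchSketch {
      static void open(ReplicaOps data, Stage stage, long blockId, long newGS,
                       long minBytesRcvd, long maxBytesRcvd) throws IOException {
        switch (stage) {
        case PIPELINE_SETUP_CREATE:
          data.createRbw(blockId);                      // brand-new RBW replica
          break;
        case PIPELINE_SETUP_STREAMING_RECOVERY:
          if (data.isFinalized(blockId)) {              // pipeline failed after finalize
            data.append(blockId, newGS, maxBytesRcvd);
          } else {
            data.recoverRbw(blockId, newGS, minBytesRcvd, maxBytesRcvd);
          }
          break;
        case PIPELINE_SETUP_APPEND:
          data.append(blockId, newGS, minBytesRcvd);    // reopen a finalized replica
          break;
        case PIPELINE_SETUP_APPEND_RECOVERY:
          data.recoverAppend(blockId, newGS, minBytesRcvd);
          break;
        default:
          throw new IOException("Unsupported stage " + stage);
        }
        // the committed code additionally bumps the block's generation stamp to newGS
        // and, for the append stages, removes the block from the block scanner
      }
    }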
Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=818622&r1=818621&r2=818622&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java Thu Sep 24 20:51:38 2009
@@ -1094,36 +1094,61 @@
   }
 
   @Override  // FSDatasetInterface
-  public ReplicaInPipelineInterface append(Block b)
-      throws IOException {
+  public synchronized ReplicaInPipelineInterface append(Block b,
+      long newGS, long expectedBlockLen) throws IOException {
     // If the block was successfully finalized because all packets
     // were successfully processed at the Datanode but the ack for
     // some of the packets were not received by the client. The client 
     // re-opens the connection and retries sending those packets.
     // The other reason is that an "append" is occurring to this block.
     
-    ReplicaInfo replicaInfo = volumeMap.get(b);
     // check the validity of the parameter
+    if (newGS < b.getGenerationStamp()) {
+      throw new IOException("The new generation stamp " + newGS + 
+          " should be greater than the replica " + b + "'s generation stamp");
+    }
+    ReplicaInfo replicaInfo = volumeMap.get(b);
     if (replicaInfo == null) {
-      throw new BlockNotFoundException(
-          BlockNotFoundException.NON_EXISTENT_REPLICA + b);
+      throw new ReplicaNotFoundException(
+          ReplicaNotFoundException.NON_EXISTENT_REPLICA + b);
     }  
+    DataNode.LOG.info("Appending to replica " + replicaInfo);
     if (replicaInfo.getState() != ReplicaState.FINALIZED) {
-      throw new BlockNotFoundException(
-          BlockNotFoundException.UNFINALIZED_REPLICA + b);
+      throw new ReplicaNotFoundException(
+          ReplicaNotFoundException.UNFINALIZED_REPLICA + b);
+    }
+    if (replicaInfo.getNumBytes() != expectedBlockLen) {
+      throw new IOException("Corrupted replica " + replicaInfo + 
+          " with a length of " + replicaInfo.getNumBytes() + 
+          " expected length is " + expectedBlockLen);
     }
-    
-    DataNode.LOG.info("Reopen Block for append " + b);
 
+    return append((FinalizedReplica)replicaInfo, newGS, b.getNumBytes());
+  }
+  
+  /** Append to a finalized replica
+   * Change a finalized replica to be a RBW replica and 
+   * bump its generation stamp to be the newGS
+   * 
+   * @param replicaInfo a finalized replica
+   * @param newGS new generation stamp
+   * @param estimateBlockLen the estimated block length
+   * @return a RBW replica
+   * @throws IOException if moving the replica from finalized directory 
+   *         to rbw directory fails
+   */
+  private synchronized ReplicaBeingWritten append(FinalizedReplica replicaInfo, 
+      long newGS, long estimateBlockLen) throws IOException {
     // unlink the finalized replica
     replicaInfo.detachBlock(1);
     
-    // construct a RBW replica
+    // construct a RBW replica with the new GS
     File blkfile = replicaInfo.getBlockFile();
-    FSVolume v = volumes.getNextVolume(b.getNumBytes());
-    File newBlkFile = v.createRbwFile(b);
+    FSVolume v = volumes.getNextVolume(estimateBlockLen);
+    File newBlkFile = v.createRbwFile(replicaInfo);
     File oldmeta = replicaInfo.getMetaFile();
-    ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(replicaInfo,
+    ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(
+        replicaInfo.getBlockId(), replicaInfo.getNumBytes(), newGS,
         v, newBlkFile.getParentFile(), Thread.currentThread());
     File newmeta = newReplicaInfo.getMetaFile();
 
@@ -1132,7 +1157,7 @@
       DataNode.LOG.debug("Renaming " + oldmeta + " to " + newmeta);
     }
     if (!oldmeta.renameTo(newmeta)) {
-      throw new IOException("Block " + b + " reopen failed. " +
+      throw new IOException("Block " + replicaInfo + " reopen failed. " +
                             " Unable to move meta file  " + oldmeta +
                             " to rbw dir " + newmeta);
     }
@@ -1140,13 +1165,14 @@
     // rename block file to rbw directory
     if (DataNode.LOG.isDebugEnabled()) {
       DataNode.LOG.debug("Renaming " + blkfile + " to " + newBlkFile);
+      DataNode.LOG.debug("Old block file length is " + blkfile.length());
     }
     if (!blkfile.renameTo(newBlkFile)) {
       if (!newmeta.renameTo(oldmeta)) {  // restore the meta file
         DataNode.LOG.warn("Cannot move meta file " + newmeta + 
             "back to the finalized directory " + oldmeta);
       }
-      throw new IOException("Block " + b + " reopen failed. " +
+      throw new IOException("Block " + replicaInfo + " reopen failed. " +
                               " Unable to move block file " + blkfile +
                               " to rbw dir " + newBlkFile);
     }
@@ -1157,47 +1183,165 @@
     return newReplicaInfo;
   }
 
-  @Override
-  public ReplicaInPipelineInterface writeToRbw(Block b, boolean isRecovery)
-      throws IOException {
-    ReplicaInfo replicaInfo = volumeMap.get(b);
-    ReplicaBeingWritten newReplicaInfo;
-    if (replicaInfo == null) { // create a new block
-      FSVolume v = volumes.getNextVolume(b.getNumBytes());
-      // create a rbw file to hold block in the designated volume
-      File f = v.createRbwFile(b);
-      newReplicaInfo = new ReplicaBeingWritten(b.getBlockId(), 
-          b.getGenerationStamp(), v, f.getParentFile());
-      volumeMap.add(newReplicaInfo);
-    } else {
-      if (!isRecovery) {
-        throw new BlockAlreadyExistsException("Block " + b +
-        " already exists in state " + replicaInfo.getState() +
-        " and thus cannot be created.");
-      }
-      if (replicaInfo.getState() != ReplicaState.RBW) {
-        throw new BlockNotFoundException(
-            BlockNotFoundException.NON_RBW_REPLICA + b);
-      }
-      newReplicaInfo = (ReplicaBeingWritten)replicaInfo;
-      synchronized (this) {
-        //
-        // Is it already in the write process?
-        //
-        newReplicaInfo.stopWriter();
-        newReplicaInfo.setWriter(Thread.currentThread());
+  @Override  // FSDatasetInterface
+  public synchronized ReplicaInPipelineInterface recoverAppend(Block b,
+      long newGS, long expectedBlockLen) throws IOException {
+    DataNode.LOG.info("Recover failed append to " + b);
+
+    ReplicaInfo replicaInfo = volumeMap.get(b.getBlockId());
+    if (replicaInfo == null) {
+      throw new ReplicaNotFoundException(
+          ReplicaNotFoundException.NON_EXISTENT_REPLICA + b);
+    }
+    
+    // check generation stamp
+    long replicaGenerationStamp = replicaInfo.getGenerationStamp();
+    if (replicaGenerationStamp < b.getGenerationStamp() ||
+        replicaGenerationStamp > newGS) {
+      throw new ReplicaNotFoundException(
+          ReplicaNotFoundException.UNEXPECTED_GS_REPLICA + replicaGenerationStamp
+          + ". Expected GS range is [" + b.getGenerationStamp() + ", " + 
+          newGS + "].");
+    }
+    
+    // stop the previous writer before check a replica's length
+    long replicaLen = replicaInfo.getNumBytes();
+    if (replicaInfo.getState() == ReplicaState.RBW) {
+      ReplicaBeingWritten rbw = (ReplicaBeingWritten)replicaInfo;
+      // kill the previous writer
+      rbw.stopWriter();
+      rbw.setWriter(Thread.currentThread());
+      // check length: bytesRcvd, bytesOnDisk, and bytesAcked should be the same
+      if (replicaLen != rbw.getBytesOnDisk() 
+          || replicaLen != rbw.getBytesAcked()) {
+        throw new ReplicaAlreadyExistsException("RBW replica " + replicaInfo + 
+            "bytesRcvd(" + rbw.getNumBytes() + "), bytesOnDisk(" + 
+            rbw.getBytesOnDisk() + "), and bytesAcked(" + rbw.getBytesAcked() +
+            ") are not the same.");
       }
     }
+    
+    // check block length
+    if (replicaLen != expectedBlockLen) {
+      throw new IOException("Corrupted replica " + replicaInfo + 
+          " with a length of " + replicaLen + 
+          " expected length is " + expectedBlockLen);
+    }
+
+    // change the replica's state/gs etc.
+    switch (replicaInfo.getState()) {
+    case FINALIZED:
+      return append((FinalizedReplica)replicaInfo, newGS, b.getNumBytes());
+    case RBW:
+      bumpReplicaGS(replicaInfo, newGS);
+      return (ReplicaBeingWritten)replicaInfo;
+    default:
+      throw new ReplicaNotFoundException(
+          ReplicaNotFoundException.UNFINALIZED_AND_NONRBW_REPLICA + replicaInfo);
+    }
+  }
+
+  /**
+   * Bump a replica's generation stamp to a new one.
+   * Its on-disk meta file name is renamed to be the new one too.
+   * 
+   * @param replicaInfo a replica
+   * @param newGS new generation stamp
+   * @throws IOException if rename fails
+   */
+  private void bumpReplicaGS(ReplicaInfo replicaInfo, 
+      long newGS) throws IOException { 
+    long oldGS = replicaInfo.getGenerationStamp();
+    File oldmeta = replicaInfo.getMetaFile();
+    replicaInfo.setGenerationStamp(newGS);
+    File newmeta = replicaInfo.getMetaFile();
+
+    // rename meta file to new GS
+    if (DataNode.LOG.isDebugEnabled()) {
+      DataNode.LOG.debug("Renaming " + oldmeta + " to " + newmeta);
+    }
+    if (!oldmeta.renameTo(newmeta)) {
+      replicaInfo.setGenerationStamp(oldGS); // restore old GS
+      throw new IOException("Block " + (Block)replicaInfo + " reopen failed. " +
+                            " Unable to move meta file  " + oldmeta +
+                            " to " + newmeta);
+    }
+  }
 
+  @Override
+  public synchronized ReplicaInPipelineInterface createRbw(Block b)
+      throws IOException {
+    ReplicaInfo replicaInfo = volumeMap.get(b.getBlockId());
+    if (replicaInfo != null) {
+      throw new ReplicaAlreadyExistsException("Block " + b +
+      " already exists in state " + replicaInfo.getState() +
+      " and thus cannot be created.");
+    }
+    // create a new block
+    FSVolume v = volumes.getNextVolume(b.getNumBytes());
+    // create a rbw file to hold block in the designated volume
+    File f = v.createRbwFile(b);
+    ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(b.getBlockId(), 
+        b.getGenerationStamp(), v, f.getParentFile());
+    volumeMap.add(newReplicaInfo);
     return newReplicaInfo;
   }
   
   @Override
-  public ReplicaInPipelineInterface writeToTemporary(Block b)
+  public synchronized ReplicaInPipelineInterface recoverRbw(Block b,
+      long newGS, long minBytesRcvd, long maxBytesRcvd)
       throws IOException {
-    ReplicaInfo replicaInfo = volumeMap.get(b);
+    DataNode.LOG.info("Recover the RBW replica " + b);
+
+    ReplicaInfo replicaInfo = volumeMap.get(b.getBlockId());
+    if (replicaInfo == null) {
+      throw new ReplicaNotFoundException(
+          ReplicaNotFoundException.NON_EXISTENT_REPLICA + b);
+    }
+    
+    // check the replica's state
+    if (replicaInfo.getState() != ReplicaState.RBW) {
+      throw new ReplicaNotFoundException(
+          ReplicaNotFoundException.NON_RBW_REPLICA + replicaInfo);
+    }
+    ReplicaBeingWritten rbw = (ReplicaBeingWritten)replicaInfo;
+    
+    DataNode.LOG.info("Recovering replica " + rbw);
+
+    // Stop the previous writer
+    rbw.stopWriter();
+    rbw.setWriter(Thread.currentThread());
+
+    // check generation stamp
+    long replicaGenerationStamp = rbw.getGenerationStamp();
+    if (replicaGenerationStamp < b.getGenerationStamp() ||
+        replicaGenerationStamp > newGS) {
+      throw new ReplicaNotFoundException(
+          ReplicaNotFoundException.UNEXPECTED_GS_REPLICA + b +
+          ". Expected GS range is [" + b.getGenerationStamp() + ", " + 
+          newGS + "].");
+    }
+    
+    // check replica length
+    if (rbw.getBytesAcked() < minBytesRcvd || rbw.getNumBytes() > maxBytesRcvd){
+      throw new ReplicaNotFoundException("Unmatched length replica " + 
+          replicaInfo + ": BytesAcked = " + rbw.getBytesAcked() + 
+          " BytesRcvd = " + rbw.getNumBytes() + " are not in the range of [" + 
+          minBytesRcvd + ", " + maxBytesRcvd + "].");
+    }
+
+    // bump the replica's generation stamp to newGS
+    bumpReplicaGS(rbw, newGS);
+    
+    return rbw;
+  }
+  
+  @Override
+  public synchronized ReplicaInPipelineInterface createTemporary(Block b)
+      throws IOException {
+    ReplicaInfo replicaInfo = volumeMap.get(b.getBlockId());
     if (replicaInfo != null) {
-      throw new BlockAlreadyExistsException("Block " + b +
+      throw new ReplicaAlreadyExistsException("Block " + b +
           " already exists in state " + replicaInfo.getState() +
           " and thus cannot be created.");
     }

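bumpReplicaGS above renames a replica's on-disk meta file so that its name carries the new generation stamp, and keeps the old stamp if the rename fails. A self-contained sketch of the same pattern, using an assumed blk_<id>_<genstamp>.meta naming scheme rather than the real FSDataset directory layout:

    import java.io.File;
    import java.io.IOException;

    class BumpGenStampSketch {
      /** Assumed naming scheme for this sketch: blk_<id>_<genstamp>.meta. */
      static File metaFile(File dir, long blockId, long genStamp) {
        return new File(dir, "blk_" + blockId + "_" + genStamp + ".meta");
      }

      /** Rename the meta file so its name reflects the new generation stamp. */
      static void bumpGenStamp(File dir, long blockId, long oldGS, long newGS)
          throws IOException {
        File oldMeta = metaFile(dir, blockId, oldGS);
        File newMeta = metaFile(dir, blockId, newGS);
        if (!oldMeta.renameTo(newMeta)) {
          // as in bumpReplicaGS, the caller restores the old generation stamp on failure
          throw new IOException("Unable to move meta file " + oldMeta + " to " + newMeta);
        }
      }
    }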
Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java?rev=818622&r1=818621&r2=818622&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java Thu Sep 24 20:51:38 2009
@@ -180,35 +180,64 @@
   }
     
   /**
-   * Creates a temporary replica and returns output streams to write data and CRC
+   * Creates a temporary replica and returns the meta information of the replica
    * 
    * @param b block
-   * @return the meata info of the replica which is being written to
+   * @return the meta info of the replica which is being written to
    * @throws IOException if an error occurs
    */
-  public ReplicaInPipelineInterface writeToTemporary(Block b) throws IOException;
+  public ReplicaInPipelineInterface createTemporary(Block b)
+  throws IOException;
 
   /**
-   * Creates/recovers a RBW replica and returns output streams to 
-   * write data and CRC
+   * Creates a RBW replica and returns the meta info of the replica
    * 
    * @param b block
-   * @param isRecovery True if this is part of error recovery, otherwise false
-   * @return the meata info of the replica which is being written to
+   * @return the meta info of the replica which is being written to
+   * @throws IOException if an error occurs
+   */
+  public ReplicaInPipelineInterface createRbw(Block b) throws IOException;
+
+  /**
+   * Recovers a RBW replica and returns the meta info of the replica
+   * 
+   * @param b block
+   * @param newGS the new generation stamp for the replica
+   * @param minBytesRcvd the minimum number of bytes that the replica could have
+   * @param maxBytesRcvd the maximum number of bytes that the replica could have
+   * @return the meta info of the replica which is being written to
    * @throws IOException if an error occurs
    */
-  public ReplicaInPipelineInterface writeToRbw(Block b, boolean isRecovery)
+  public ReplicaInPipelineInterface recoverRbw(Block b, 
+      long newGS, long minBytesRcvd, long maxBytesRcvd)
   throws IOException;
 
   /**
-   * Append to a finalized replica and returns output streams to write data and CRC
+   * Append to a finalized replica and returns the meta info of the replica
+   * 
    * @param b block
+   * @param newGS the new generation stamp for the replica
+   * @param expectedBlockLen the number of bytes the replica is expected to have
    * @return the meta info of the replica which is being written to
    * @throws IOException
    */
-  public ReplicaInPipelineInterface append(Block b) throws IOException;
+  public ReplicaInPipelineInterface append(Block b, 
+      long newGS, long expectedBlockLen) throws IOException;
 
   /**
+   * Recover a failed append to a finalized replica
+   * and returns the meta info of the replica
+   * 
+   * @param b block
+   * @param newGS the new generation stamp for the replica
+   * @param expectedBlockLen the number of bytes the replica is expected to have
+   * @return the meta info of the replica which is being written to
+   * @throws IOException
+   */
+  public ReplicaInPipelineInterface recoverAppend(Block b,
+      long newGS, long expectedBlockLen) throws IOException;
+  
+  /**
    * Update the block to the new generation stamp and length.  
    */
   public void updateBlock(Block oldblock, Block newblock) throws IOException;

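recoverRbw, as documented above, accepts a replica only if its generation stamp falls within [the block's old GS, newGS] and its acked/received lengths fall within [minBytesRcvd, maxBytesRcvd]; recoverAppend applies the same generation-stamp check with an exact expected length. A minimal sketch of the range checks, with plain long parameters in place of Block and ReplicaInfo (the names are illustrative, not the committed API):

    import java.io.IOException;

    class RecoveryCheckSketch {
      /** Reject replicas whose GS or length is outside the ranges the client advertised. */
      static void checkRecoverable(long replicaGS, long oldGS, long newGS,
                                   long bytesAcked, long bytesRcvd,
                                   long minBytesRcvd, long maxBytesRcvd)
          throws IOException {
        if (replicaGS < oldGS || replicaGS > newGS) {
          throw new IOException("Unexpected generation stamp " + replicaGS +
              "; expected GS range is [" + oldGS + ", " + newGS + "]");
        }
        if (bytesAcked < minBytesRcvd || bytesRcvd > maxBytesRcvd) {
          throw new IOException("Unmatched length: bytesAcked = " + bytesAcked +
              " bytesRcvd = " + bytesRcvd + " are not in the range of [" +
              minBytesRcvd + ", " + maxBytesRcvd + "]");
        }
      }
    }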
Copied: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaAlreadyExistsException.java (from r818583, hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/BlockAlreadyExistsException.java)
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaAlreadyExistsException.java?p2=hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaAlreadyExistsException.java&p1=hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/BlockAlreadyExistsException.java&r1=818583&r2=818622&rev=818622&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/BlockAlreadyExistsException.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaAlreadyExistsException.java Thu Sep 24 20:51:38 2009
@@ -25,14 +25,14 @@
  * Exception indicating that the target block already exists 
  * and is not set to be recovered/overwritten.  
  */
-class BlockAlreadyExistsException extends IOException {
+class ReplicaAlreadyExistsException extends IOException {
   private static final long serialVersionUID = 1L;
 
-  public BlockAlreadyExistsException() {
+  public ReplicaAlreadyExistsException() {
     super();
   }
 
-  public BlockAlreadyExistsException(String msg) {
+  public ReplicaAlreadyExistsException(String msg) {
     super(msg);
   }
 }

Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaNotFoundException.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaNotFoundException.java?rev=818622&r1=818621&r2=818622&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaNotFoundException.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaNotFoundException.java Thu Sep 24 20:51:38 2009
@@ -15,6 +15,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
+
 package org.apache.hadoop.hdfs.server.datanode;
 
 import java.io.IOException;
@@ -22,12 +24,30 @@
 import org.apache.hadoop.hdfs.protocol.Block;
 
 /**
- * Exception indicating that a replica is not found.
+ * Exception indicating that DataNode does not have a replica
+ * that matches the target block.  
  */
 class ReplicaNotFoundException extends IOException {
   private static final long serialVersionUID = 1L;
+  final static String NON_RBW_REPLICA = "Cannot recover a non-RBW replica ";
+  final static String UNFINALIZED_REPLICA = 
+    "Cannot append to an unfinalized replica ";
+  final static String UNFINALIZED_AND_NONRBW_REPLICA = 
+    "Cannot recover appending to a replica that's not FINALIZED and not RBW ";
+  final static String NON_EXISTENT_REPLICA =
+    "Cannot append to a non-existent replica ";
+  final static String UNEXPECTED_GS_REPLICA =
+    "Cannot append to a replica with unexpeted generation stamp ";
+
+  public ReplicaNotFoundException() {
+    super();
+  }
 
   ReplicaNotFoundException(Block b) {
     super("Replica not found for " + b);
   }
-}
\ No newline at end of file
+  
+  public ReplicaNotFoundException(String msg) {
+    super(msg);
+  }
+}

Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=818622&r1=818621&r2=818622&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Thu Sep 24 20:51:38 2009
@@ -3753,24 +3753,13 @@
     return nextGenerationStamp();
   }
 
-  /**
-   * Get a new generation stamp together with an access token for 
-   * a block under construction
-   * 
-   * This method is called for recovering a failed pipeline or setting up
-   * a pipeline to append to a block.
-   * 
-   * @param block a block
-   * @param clientName the name of a client
-   * @return a located block with a new generation stamp and an access token
-   * @throws IOException if any error occurs
-   */
-  synchronized LocatedBlock getNewStampForPipeline(Block block, String clientName) 
+
+  private INodeFileUnderConstruction checkUCBlock(Block block, String clientName) 
   throws IOException {
     // check safe mode
     if (isInSafeMode())
       throw new SafeModeException("Cannot get a new generation stamp and an " +
-      		"access token for block " + block, safeMode);
+                                "access token for block " + block, safeMode);
     
     // check stored block state
     BlockInfo storedBlock = blockManager.getStoredBlock(block);
@@ -3794,6 +3783,26 @@
           " is accessed by a non lease holder " + clientName); 
     }
 
+    return pendingFile;
+  }
+  
+  /**
+   * Get a new generation stamp together with an access token for 
+   * a block under construction
+   * 
+   * This method is called for recovering a failed pipeline or setting up
+   * a pipeline to append to a block.
+   * 
+   * @param block a block
+   * @param clientName the name of a client
+   * @return a located block with a new generation stamp and an access token
+   * @throws IOException if any error occurs
+   */
+  synchronized LocatedBlock updateBlockForPipeline(Block block, 
+      String clientName) throws IOException {
+    // check validity of parameters
+    checkUCBlock(block, clientName);
+    
     // get a new generation stamp and an access token
     block.setGenerationStamp(nextGenerationStamp());
     LocatedBlock locatedBlock = new LocatedBlock(block, new DatanodeInfo[0]);
@@ -3804,6 +3813,72 @@
     return locatedBlock;
   }
   
+  
+  /**
+   * Update a pipeline for a block under construction
+   * 
+   * @param clientName the name of the client
+   * @param oldBlock the old block
+   * @param newBlock a new block with a new generation stamp and length
+   * @param newNodes datanodes in the pipeline
+   * @throws IOException if any error occurs
+   */
+  synchronized void updatePipeline(String clientName, Block oldBlock, 
+      Block newBlock, DatanodeID[] newNodes)
+      throws IOException {
+    LOG.info("updatePipeline(block=" + oldBlock
+        + ", newGenerationStamp=" + newBlock.getGenerationStamp()
+        + ", newLength=" + newBlock.getNumBytes()
+        + ", newNodes=" + Arrays.asList(newNodes)
+        + ", clientName=" + clientName
+        + ")");
+
+    // check the validity of the block and lease holder name
+    final INodeFileUnderConstruction pendingFile = 
+      checkUCBlock(oldBlock, clientName);
+    final BlockInfo oldblockinfo = pendingFile.getLastBlock();
+
+    // check new GS & length: this is not expected
+    if (newBlock.getGenerationStamp() <= oldblockinfo.getGenerationStamp() ||
+        newBlock.getNumBytes() < oldblockinfo.getNumBytes()) {
+      String msg = "Update " + oldBlock + " (len = " + 
+      oldblockinfo.getNumBytes() + ") to an older state: " + newBlock + 
+      " (len = " + newBlock.getNumBytes() +")";
+      LOG.warn(msg);
+      throw new IOException(msg);
+    }
+    
+    // Remove old block from blocks map. This always has to be done
+    // because the generation stamp of this block is changing.
+    blockManager.removeBlockFromMap(oldblockinfo);
+
+    // update last block, construct newblockinfo and add it to the blocks map
+    BlockInfoUnderConstruction newblockinfo = 
+      new BlockInfoUnderConstruction(
+          newBlock, pendingFile.getReplication());
+    blockManager.addINode(newblockinfo, pendingFile);
+
+    // find the DatanodeDescriptor objects
+    DatanodeDescriptor[] descriptors = null;
+    if (newNodes.length > 0) {
+      descriptors = new DatanodeDescriptor[newNodes.length];
+      for(int i = 0; i < newNodes.length; i++) {
+        descriptors[i] = getDatanode(newNodes[i]);
+      }
+    }
+    // add locations into the INodeUnderConstruction
+    pendingFile.setLastBlock(newblockinfo, descriptors);
+
+    // persist blocks only if append is supported
+    String src = leaseManager.findPath(pendingFile);
+    if (supportAppends) {
+      dir.persistBlocks(src, pendingFile);
+      getEditLog().logSync();
+    }
+    LOG.info("updatePipeline(" + oldBlock + ") successfully to " + newBlock);
+    return;
+  }
+
   // rename was successful. If any part of the renamed subtree had
   // files that were being written to, update with new filename.
   //

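As a reading aid, the following hypothetical client-side fragment shows the intended two-step handshake with the NameNode methods above: fetch a new generation stamp with updateBlockForPipeline, rebuild the pipeline on the surviving datanodes (elided), then commit the result with updatePipeline. The helper class is made up and is not DFSClient code; only the ClientProtocol signatures in this patch are assumed.

import java.io.IOException;

import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;

/** Hypothetical sketch only; not part of this patch. */
class PipelineRecoverySketch {
  static void recoverPipeline(ClientProtocol namenode, String clientName,
      Block oldBlock, DatanodeID[] survivingNodes, long bytesAcked)
      throws IOException {
    // 1. Ask the NameNode for a new generation stamp (and access token)
    //    for the block under construction.
    LocatedBlock updated = namenode.updateBlockForPipeline(oldBlock, clientName);
    Block newBlock = updated.getBlock();

    // 2. (Elided) re-establish the write pipeline on the surviving datanodes
    //    with stage PIPELINE_SETUP_STREAMING_RECOVERY and the new stamp.

    // 3. Report the new stamp, length, and pipeline membership; the NameNode
    //    updates the last block and, if append is enabled, persists it.
    newBlock.setNumBytes(bytesAcked);
    namenode.updatePipeline(clientName, oldBlock, newBlock, survivingNodes);
  }
}

Note that updatePipeline rejects a generation stamp that is not newer, or a length that is shorter, than the recorded last block, so the values passed must reflect the post-recovery state.
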
Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=818622&r1=818621&r2=818622&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java Thu Sep 24 20:51:38 2009
@@ -672,11 +672,19 @@
   }
 
   @Override
-  public LocatedBlock getNewStampForPipeline(Block block, String clientName)
+  public LocatedBlock updateBlockForPipeline(Block block, String clientName)
   throws IOException {
-    return namesystem.getNewStampForPipeline(block, clientName);
+    return namesystem.updateBlockForPipeline(block, clientName);
   }
 
+
+  @Override
+  public void updatePipeline(String clientName, Block oldBlock,
+      Block newBlock, DatanodeID[] newNodes)
+      throws IOException {
+    namesystem.updatePipeline(clientName, oldBlock, newBlock, newNodes);
+  }
+  
   /** {@inheritDoc} */
   public void commitBlockSynchronization(Block block,
       long newgenerationstamp, long newlength,

Modified: hadoop/hdfs/branches/HDFS-265/src/test/aop/org/apache/hadoop/hdfs/DFSClientAspects.aj
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/test/aop/org/apache/hadoop/hdfs/DFSClientAspects.aj?rev=818622&r1=818621&r2=818622&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/test/aop/org/apache/hadoop/hdfs/DFSClientAspects.aj (original)
+++ hadoop/hdfs/branches/HDFS-265/src/test/aop/org/apache/hadoop/hdfs/DFSClientAspects.aj Thu Sep 24 20:51:38 2009
@@ -35,7 +35,7 @@
 
   before(DataStreamer datastreamer) : callCreateBlockOutputStream(datastreamer) {
     Assert.assertFalse(datastreamer.hasError);
-    Assert.assertEquals(0, datastreamer.errorIndex);
+    Assert.assertEquals(-1, datastreamer.errorIndex);
   }
 
   pointcut pipelineInitNonAppend(DataStreamer datastreamer):
@@ -66,14 +66,12 @@
         + " errorIndex=" + datastreamer.errorIndex);
   }
 
-  pointcut pipelineErrorAfterInit(boolean onError, boolean isAppend,
-      DataStreamer datastreamer):
-    call(* processDatanodeError(boolean, boolean))
-    && args(onError, isAppend)
-    && target(datastreamer)
-    && if(onError && !isAppend);
+  pointcut pipelineErrorAfterInit(DataStreamer datastreamer):
+    call(* processDatanodeError())
+    && within (DFSClient.DFSOutputStream.DataStreamer)
+    && target(datastreamer);
 
-  before(DataStreamer datastreamer) : pipelineErrorAfterInit(boolean, boolean, datastreamer) {
+  before(DataStreamer datastreamer) : pipelineErrorAfterInit(datastreamer) {
     LOG.info("FI: before pipelineErrorAfterInit: errorIndex="
         + datastreamer.errorIndex);
     try {

Modified: hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java?rev=818622&r1=818621&r2=818622&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java Thu Sep 24 20:51:38 2009
@@ -55,7 +55,7 @@
 
       // test getNewStampAndToken on a finalized block
       try {
-        namenode.getNewStampForPipeline(firstBlock, "");
+        namenode.updateBlockForPipeline(firstBlock, "");
         Assert.fail("Can not get a new GS from a finalized block");
       } catch (IOException e) {
         Assert.assertTrue(e.getMessage().contains("is not under Construction"));
@@ -66,7 +66,7 @@
         long newBlockId = firstBlock.getBlockId() + 1;
         Block newBlock = new Block(newBlockId, 0, 
             firstBlock.getGenerationStamp());
-        namenode.getNewStampForPipeline(newBlock, "");
+        namenode.updateBlockForPipeline(newBlock, "");
         Assert.fail("Cannot get a new GS from a non-existent block");
       } catch (IOException e) {
         Assert.assertTrue(e.getMessage().contains("does not exist"));
@@ -92,7 +92,7 @@
         // test non-lease holder
         DFSClient dfs = ((DistributedFileSystem)fileSys).dfs;
         try {
-          namenode.getNewStampForPipeline(firstBlock, "test" + dfs.clientName);
+          namenode.updateBlockForPipeline(firstBlock, "test" + dfs.clientName);
           Assert.fail("Cannot get a new GS for a non lease holder");
         } catch (LeaseExpiredException e) {
           Assert.assertTrue(e.getMessage().startsWith("Lease mismatch"));
@@ -100,14 +100,14 @@
 
         // test null lease holder
         try {
-          namenode.getNewStampForPipeline(firstBlock, null);
+          namenode.updateBlockForPipeline(firstBlock, null);
           Assert.fail("Cannot get a new GS for a null lease holder");
         } catch (LeaseExpiredException e) {
           Assert.assertTrue(e.getMessage().startsWith("Lease mismatch"));
         }
 
         // test getNewStampAndToken on a rbw block
-        namenode.getNewStampForPipeline(firstBlock, dfs.clientName);
+        namenode.updateBlockForPipeline(firstBlock, dfs.clientName);
       } finally {
         IOUtils.closeStream(out);
       }

Modified: hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java?rev=818622&r1=818621&r2=818622&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java Thu Sep 24 20:51:38 2009
@@ -215,9 +215,12 @@
 
     public void setTimes(String src, long mtime, long atime) throws IOException {}
 
-    public LocatedBlock getNewStampForPipeline(Block block, String clientName)
-    throws IOException { return null; }
+    @Override public LocatedBlock updateBlockForPipeline(Block block, 
+        String clientName) throws IOException { return null; }
 
+    @Override public void updatePipeline(String clientName, Block oldblock,
+        Block newBlock, DatanodeID[] newNodes)
+        throws IOException {}
   }
   
   public void testNotYetReplicatedErrors() throws IOException

Modified: hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java?rev=818622&r1=818621&r2=818622&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java Thu Sep 24 20:51:38 2009
@@ -46,6 +46,7 @@
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
 import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
@@ -165,13 +166,13 @@
     sendRecvData(description, false);
   }
   
-  private void testWrite(Block block, BlockConstructionStage stage, 
+  private void testWrite(Block block, BlockConstructionStage stage, long newGS,
       String description, Boolean eofExcepted) throws IOException {
     sendBuf.reset();
     recvBuf.reset();
     DataTransferProtocol.Sender.opWriteBlock(sendOut, 
         block.getBlockId(), block.getGenerationStamp(), 0,
-        stage, 0L, 0L, 0L, "cl", null,
+        stage, newGS, block.getNumBytes(), block.getNumBytes(), "cl", null,
         new DatanodeInfo[1], AccessToken.DUMMY_TOKEN);
     if (eofExcepted) {
       ERROR.write(recvOut);
@@ -198,23 +199,29 @@
       // get the first blockid for the file
       Block firstBlock = DFSTestUtil.getFirstBlock(fileSys, file);
       // test PIPELINE_SETUP_CREATE on a finalized block
-      testWrite(firstBlock, BlockConstructionStage.PIPELINE_SETUP_CREATE,
+      testWrite(firstBlock, BlockConstructionStage.PIPELINE_SETUP_CREATE, 0L,
           "Cannot create an existing block", true);
       // test PIPELINE_DATA_STREAMING on a finalized block
-      testWrite(firstBlock, BlockConstructionStage.DATA_STREAMING,
+      testWrite(firstBlock, BlockConstructionStage.DATA_STREAMING, 0L,
           "Unexpected stage", true);
       // test PIPELINE_SETUP_STREAMING_RECOVERY on an existing block
+      long newGS = firstBlock.getGenerationStamp() + 1;
       testWrite(firstBlock, 
-          BlockConstructionStage.PIPELINE_SETUP_STREAMING_RECOVERY,
-          "Successful for now", false);
+          BlockConstructionStage.PIPELINE_SETUP_STREAMING_RECOVERY, 
+          newGS, "Successful for now", false);
+      firstBlock.setGenerationStamp(newGS);
       // test PIPELINE_SETUP_APPEND on an existing block
+      newGS = firstBlock.getGenerationStamp() + 1;
       testWrite(firstBlock, 
           BlockConstructionStage.PIPELINE_SETUP_APPEND,
-          "Append to a finalized replica", false);
-      // test PIPELINE_SETUP_APPEND on an existing block
+          newGS, "Append to a finalized replica", false);
+      firstBlock.setGenerationStamp(newGS);
+      // test PIPELINE_SETUP_APPEND_RECOVERY on an existing block
+      newGS = firstBlock.getGenerationStamp() + 1;
       testWrite(firstBlock, 
-          BlockConstructionStage.PIPELINE_SETUP_APPEND_RECOVERY,
+          BlockConstructionStage.PIPELINE_SETUP_APPEND_RECOVERY, newGS,
           "Recover appending to a finalized replica", false);
+      firstBlock.setGenerationStamp(newGS);
 
       /* Test writing to a new block */
       long newBlockId = firstBlock.getBlockId() + 1;
@@ -222,48 +229,58 @@
           firstBlock.getGenerationStamp());
 
       // test PIPELINE_SETUP_CREATE on a new block
-      testWrite(newBlock, BlockConstructionStage.PIPELINE_SETUP_CREATE,
+      testWrite(newBlock, BlockConstructionStage.PIPELINE_SETUP_CREATE, 0L,
           "Create a new block", false);
       // test PIPELINE_SETUP_STREAMING_RECOVERY on a new block
+      newGS = newBlock.getGenerationStamp() + 1;
       newBlock.setBlockId(newBlock.getBlockId()+1);
       testWrite(newBlock, 
-          BlockConstructionStage.PIPELINE_SETUP_STREAMING_RECOVERY,
-          "Recover a new block", false);
+          BlockConstructionStage.PIPELINE_SETUP_STREAMING_RECOVERY, newGS,
+          "Recover a new block", true);
+      
       // test PIPELINE_SETUP_APPEND on a new block
-      newBlock.setBlockId(newBlock.getBlockId()+1);
+      newGS = newBlock.getGenerationStamp() + 1;
       testWrite(newBlock, 
-          BlockConstructionStage.PIPELINE_SETUP_APPEND,
+          BlockConstructionStage.PIPELINE_SETUP_APPEND, newGS,
           "Cannot append to a new block", true);
+
       // test PIPELINE_SETUP_APPEND_RECOVERY on a new block
       newBlock.setBlockId(newBlock.getBlockId()+1);
+      newGS = newBlock.getGenerationStamp() + 1;
       testWrite(newBlock, 
-          BlockConstructionStage.PIPELINE_SETUP_APPEND_RECOVERY,
+          BlockConstructionStage.PIPELINE_SETUP_APPEND_RECOVERY, newGS,
           "Cannot append to a new block", true);
 
       /* Test writing to RBW replicas */
-      // change first block to a RBW
-      DFSOutputStream out = (DFSOutputStream)(fileSys.append(file).
+      Path file1 = new Path("dataprotocol1.dat");    
+      DFSTestUtil.createFile(fileSys, file1, 1L, (short)numDataNodes, 0L);
+      DFSOutputStream out = (DFSOutputStream)(fileSys.append(file1).
           getWrappedStream()); 
       out.write(1);
       out.hflush();
-      FSDataInputStream in = fileSys.open(file);
+      FSDataInputStream in = fileSys.open(file1);
       firstBlock = DFSTestUtil.getAllBlocks(in).get(0).getBlock();
+      firstBlock.setNumBytes(2L);
       
       try {
         // test PIPELINE_SETUP_CREATE on a RBW block
-        testWrite(firstBlock, BlockConstructionStage.PIPELINE_SETUP_CREATE,
+        testWrite(firstBlock, BlockConstructionStage.PIPELINE_SETUP_CREATE, 0L,
             "Cannot create a RBW block", true);
         // test PIPELINE_SETUP_APPEND on an existing block
+        newGS = firstBlock.getGenerationStamp() + 1;
         testWrite(firstBlock, BlockConstructionStage.PIPELINE_SETUP_APPEND,
-            "Cannot append to a RBW replica", true);
+            newGS, "Cannot append to a RBW replica", true);
         // test PIPELINE_SETUP_APPEND_RECOVERY on a RBW block
         testWrite(firstBlock, 
             BlockConstructionStage.PIPELINE_SETUP_APPEND_RECOVERY,
-            "Cannot append to a RBW replica", true);
+            newGS, "Recover append to a RBW replica", false);
+        firstBlock.setGenerationStamp(newGS);
         // test PIPELINE_SETUP_STREAMING_RECOVERY on a RBW block
+        newGS = firstBlock.getGenerationStamp() + 1;
         testWrite(firstBlock, 
             BlockConstructionStage.PIPELINE_SETUP_STREAMING_RECOVERY,
-            "Recover a RBW replica", false);
+            newGS, "Recover a RBW replica", false);
+        firstBlock.setGenerationStamp(newGS);
       } finally {
         IOUtils.closeStream(in);
         IOUtils.closeStream(out);

Modified: hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend3.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend3.java?rev=818622&r1=818621&r2=818622&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend3.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend3.java Thu Sep 24 20:51:38 2009
@@ -37,7 +37,7 @@
 
 /** This class implements some of tests posted in HADOOP-2658. */
 public class TestFileAppend3 extends junit.framework.TestCase {
-  static final long BLOCK_SIZE = 64 * 1024;
+  static final long BLOCK_SIZE = 3 * 64 * 1024;
   static final short REPLICATION = 3;
   static final int DATANODE_NUM = 5;
 
@@ -220,6 +220,7 @@
     FSDataOutputStream out = fs.append(p);
     final int len2 = (int)BLOCK_SIZE/2; 
     AppendTestUtil.write(out, len1, len2);
+    out.sync();
     
     //c. Rename file to file.new.
     final Path pnew = new Path(p + ".new");

Modified: hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java?rev=818622&r1=818621&r2=818622&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java Thu Sep 24 20:51:38 2009
@@ -467,10 +467,11 @@
   }
 
   @Override
-  public ReplicaInPipelineInterface append(Block b) throws IOException {
+  public synchronized ReplicaInPipelineInterface append(Block b,
+      long newGS, long expectedBlockLen) throws IOException {
     BInfo binfo = blockMap.get(b);
     if (binfo == null || !binfo.isFinalized()) {
-      throw new BlockNotFoundException("Block " + b
+      throw new ReplicaNotFoundException("Block " + b
           + " is not valid, and cannot be appended to.");
     }
     binfo.unfinalizeBlock();
@@ -478,30 +479,51 @@
   }
 
   @Override
-  public synchronized ReplicaInPipelineInterface writeToRbw(Block b,
-      boolean isRecovery) throws IOException {
-    if (isValidBlock(b)) {
-      throw new BlockAlreadyExistsException("Block " + b
-          + " is valid, and cannot be written to.");
+  public synchronized ReplicaInPipelineInterface recoverAppend(Block b,
+      long newGS, long expectedBlockLen) throws IOException {
+    BInfo binfo = blockMap.get(b);
+    if (binfo == null) {
+      throw new ReplicaNotFoundException("Block " + b
+          + " is not valid, and cannot be appended to.");
     }
+    if (binfo.isFinalized()) {
+      binfo.unfinalizeBlock();
+    }
+    binfo.theBlock.setGenerationStamp(newGS);
+    return binfo;
+  }
+
+  @Override
+  public synchronized ReplicaInPipelineInterface recoverRbw(Block b,
+      long newGS, long minBytesRcvd, long maxBytesRcvd) throws IOException {
     BInfo binfo = blockMap.get(b);
-    if (isRecovery && binfo != null) {
-      return binfo;
+    if (binfo == null) {
+      throw new ReplicaNotFoundException("Block " + b
+          + " does not exist, and cannot be appended to.");
     }
-    binfo = new BInfo(b, true);
-    blockMap.put(binfo.theBlock, binfo);
+    if (binfo.isFinalized()) {
+      throw new ReplicaAlreadyExistsException("Block " + b
+          + " is valid, and cannot be written to.");
+    }
+    binfo.theBlock.setGenerationStamp(newGS);
     return binfo;
   }
 
   @Override
-  public synchronized ReplicaInPipelineInterface writeToTemporary(Block b)
+  public synchronized ReplicaInPipelineInterface createRbw(Block b) 
+  throws IOException {
+    return createTemporary(b);
+  }
+
+  @Override
+  public synchronized ReplicaInPipelineInterface createTemporary(Block b)
       throws IOException {
     if (isValidBlock(b)) {
-          throw new BlockAlreadyExistsException("Block " + b + 
+          throw new ReplicaAlreadyExistsException("Block " + b + 
               " is valid, and cannot be written to.");
       }
     if (isBeingWritten(b)) {
-        throw new BlockAlreadyExistsException("Block " + b + 
+        throw new ReplicaAlreadyExistsException("Block " + b + 
             " is being written, and cannot be written to.");
     }
     BInfo binfo = new BInfo(b, true);

Modified: hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java?rev=818622&r1=818621&r2=818622&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java Thu Sep 24 20:51:38 2009
@@ -64,7 +64,7 @@
     int bytesAdded = 0;
     for (int i = startingBlockId; i < startingBlockId+NUMBLOCKS; ++i) {
       Block b = new Block(i, 0, 0); // we pass expected len as zero, - fsdataset should use the sizeof actual data written
-      ReplicaInPipelineInterface bInfo = fsdataset.writeToRbw(b, false);
+      ReplicaInPipelineInterface bInfo = fsdataset.createRbw(b);
       BlockWriteStreams out = bInfo.createStreams();
       try {
         OutputStream dataOut  = out.dataOut;