Posted to hdfs-commits@hadoop.apache.org by ki...@apache.org on 2013/11/14 21:14:43 UTC

svn commit: r1542057 - in /hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ src/main/java/org/apache/hadoop/hdfs/server/namenode/ src/...

Author: kihwal
Date: Thu Nov 14 20:14:42 2013
New Revision: 1542057

URL: http://svn.apache.org/r1542057
Log:
svn merge -c 1542054 merging from trunk to branch-2 to fix HDFS-5438. Flaws in block report processing can cause data loss.

Modified:
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CorruptReplicasMap.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCorruptFilesJsp.java

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1542057&r1=1542056&r2=1542057&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Thu Nov 14 20:14:42 2013
@@ -182,6 +182,8 @@ Release 2.3.0 - UNRELEASED
     HDFS-5504. In HA mode, OP_DELETE_SNAPSHOT is not decrementing the safemode threshold, 
     leads to NN safemode. (Vinay via jing9)
 
+    HDFS-5438. Flaws in block report processing can cause data loss. (kihwal)
+
 Release 2.2.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java?rev=1542057&r1=1542056&r2=1542057&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java Thu Nov 14 20:14:42 2013
@@ -42,4 +42,8 @@ public class DFSClientFaultInjector {
   public boolean uncorruptPacket() {
     return false;
   }
+
+  public boolean failPacket() {
+    return false;
+  }
 }
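
The new failPacket() hook follows the existing fault-injector pattern: DFSOutputStream asks DFSClientFaultInjector.get().failPacket(), which always returns false in production, and a test installs a mock that returns true. A minimal sketch of that setup, mirroring the Mockito-based code in TestClientProtocolForPipelineRecovery further down in this commit:

    // Install a mock injector whose failPacket() returns true, then restore the original.
    DFSClientFaultInjector faultInjector = Mockito.mock(DFSClientFaultInjector.class);
    DFSClientFaultInjector oldInjector = DFSClientFaultInjector.instance;
    DFSClientFaultInjector.instance = faultInjector;
    Mockito.when(faultInjector.failPacket()).thenReturn(true);
    try {
      // ... perform the write that should hit the injected failure ...
    } finally {
      DFSClientFaultInjector.instance = oldInjector;  // always put the real injector back
    }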

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java?rev=1542057&r1=1542056&r2=1542057&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java Thu Nov 14 20:14:42 2013
@@ -151,6 +151,7 @@ public class DFSOutputStream extends FSO
   private final short blockReplication; // replication factor of file
   private boolean shouldSyncBlock = false; // force blocks to disk upon close
   private CachingStrategy cachingStrategy;
+  private boolean failPacket = false;
   
   private class Packet {
     long    seqno;               // sequencenumber of buffer in block
@@ -748,6 +749,16 @@ public class DFSOutputStream extends FSO
                                     one.seqno + " but received " + seqno);
             }
             isLastPacketInBlock = one.lastPacketInBlock;
+
+            // Fail the packet write for testing in order to force a
+            // pipeline recovery.
+            if (DFSClientFaultInjector.get().failPacket() &&
+                isLastPacketInBlock) {
+              failPacket = true;
+              throw new IOException(
+                    "Failing the last packet for testing.");
+            }
+              
             // update bytesAcked
             block.setNumBytes(one.getLastByteOffsetBlock());
 
@@ -1032,7 +1043,18 @@ public class DFSOutputStream extends FSO
         accessToken = lb.getBlockToken();
         
         // set up the pipeline again with the remaining nodes
-        success = createBlockOutputStream(nodes, newGS, isRecovery);
+        if (failPacket) { // for testing
+          success = createBlockOutputStream(nodes, newGS-1, isRecovery);
+          failPacket = false;
+          try {
+            // Give DNs time to send in bad reports. In real situations,
+            // good reports should follow bad ones, if client committed
+            // with those nodes.
+            Thread.sleep(2000);
+          } catch (InterruptedException ie) {}
+        } else {
+          success = createBlockOutputStream(nodes, newGS, isRecovery);
+        }
       }
 
       if (success) {
@@ -1894,7 +1916,9 @@ public class DFSOutputStream extends FSO
   // be called during unit tests
   private void completeFile(ExtendedBlock last) throws IOException {
     long localstart = Time.now();
+    long localTimeout = 400;
     boolean fileComplete = false;
+    int retries = dfsClient.getConf().nBlockWriteLocateFollowingRetry;
     while (!fileComplete) {
       fileComplete =
           dfsClient.namenode.complete(src, dfsClient.clientName, last, fileId);
@@ -1910,7 +1934,13 @@ public class DFSOutputStream extends FSO
             throw new IOException(msg);
         }
         try {
-          Thread.sleep(400);
+          Thread.sleep(localTimeout);
+          if (retries == 0) {
+            throw new IOException("Unable to close file because the last block"
+                + " does not have enough number of replicas.");
+          }
+          retries--;
+          localTimeout *= 2;
           if (Time.now() - localstart > 5000) {
             DFSClient.LOG.info("Could not complete " + src + " retrying...");
           }
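
completeFile() previously retried forever on a fixed 400 ms sleep; it now doubles the sleep on each attempt and gives up after the number of retries configured by DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY (the same key the new test sets to 3), throwing "Unable to close file ..." instead of hanging when the last block never reaches enough replicas. A minimal, hypothetical sketch of that bounded backoff, with isComplete() standing in for the dfsClient.namenode.complete(...) call:

    // Hypothetical sketch of the bounded exponential backoff added to completeFile().
    void waitUntilComplete(int retries) throws IOException, InterruptedException {
      long timeout = 400;                    // initial sleep in milliseconds
      while (!isComplete()) {                // stand-in for dfsClient.namenode.complete(...)
        Thread.sleep(timeout);
        if (retries == 0) {
          throw new IOException("Unable to close file because the last block"
              + " does not have enough number of replicas.");
        }
        retries--;
        timeout *= 2;                        // 400, 800, 1600, ... ms between attempts
      }
    }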

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java?rev=1542057&r1=1542056&r2=1542057&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java Thu Nov 14 20:14:42 2013
@@ -230,6 +230,29 @@ public class BlockInfoUnderConstruction 
   }
 
   /**
+   * Process the recorded replicas. When about to commit or finish the
+   * pipeline recovery, sort out bad replicas.
+   * @param genStamp  The final generation stamp for the block.
+   */
+  public void setGenerationStampAndVerifyReplicas(long genStamp) {
+    if (replicas == null)
+      return;
+
+    // Remove the replicas with wrong gen stamp.
+    // The replica list is unchanged.
+    for (ReplicaUnderConstruction r : replicas) {
+      if (genStamp != r.getGenerationStamp()) {
+        r.getExpectedLocation().removeBlock(this);
+        NameNode.blockStateChangeLog.info("BLOCK* Removing stale replica "
+            + "from location: " + r);
+      }
+    }
+
+    // Set the generation stamp for the block.
+    setGenerationStamp(genStamp);
+  }
+
+  /**
    * Commit block's length and generation stamp as reported by the client.
    * Set block state to {@link BlockUCState#COMMITTED}.
    * @param block - contains client reported block length and generation 
@@ -295,9 +318,13 @@ public class BlockInfoUnderConstruction 
   void addReplicaIfNotPresent(DatanodeDescriptor dn,
                      Block block,
                      ReplicaState rState) {
-    for(ReplicaUnderConstruction r : replicas)
-      if(r.getExpectedLocation() == dn)
+    for (ReplicaUnderConstruction r : replicas) {
+      if (r.getExpectedLocation() == dn) {
+        // Record the gen stamp from the report
+        r.setGenerationStamp(block.getGenerationStamp());
         return;
+      }
+    }
     replicas.add(new ReplicaUnderConstruction(block, dn, rState));
   }
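
The two changes above work as a pair: addReplicaIfNotPresent() now records the generation stamp carried by each block report on the matching expected location, and setGenerationStampAndVerifyReplicas() later drops any expected location whose recorded stamp differs from the final one before stamping the block. A much-simplified, hypothetical sketch of the combined effect, with Replica and Location standing in for ReplicaUnderConstruction and DatanodeDescriptor:

    // Hypothetical, simplified version of the gen-stamp bookkeeping.
    void onReplicaReported(Replica r, long reportedGenStamp) {
      r.setGenerationStamp(reportedGenStamp);          // remember what the DN actually has
    }

    void onCommit(long finalGenStamp, List<Replica> replicas, Block block) {
      for (Replica r : replicas) {
        if (r.getGenerationStamp() != finalGenStamp) {
          // The datanode only acked an older pipeline; do not count this copy.
          r.getLocation().removeBlock(block);
        }
      }
      block.setGenerationStamp(finalGenStamp);
    }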
 

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1542057&r1=1542056&r2=1542057&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Thu Nov 14 20:14:42 2013
@@ -57,6 +57,7 @@ import org.apache.hadoop.hdfs.security.t
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.AccessMode;
 import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
+import org.apache.hadoop.hdfs.server.blockmanagement.CorruptReplicasMap.Reason;
 import org.apache.hadoop.hdfs.server.blockmanagement.PendingDataNodeMessages.ReportedBlockInfo;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
@@ -1046,7 +1047,8 @@ public class BlockManager {
           + blk + " not found");
       return;
     }
-    markBlockAsCorrupt(new BlockToMarkCorrupt(storedBlock, reason), dn);
+    markBlockAsCorrupt(new BlockToMarkCorrupt(storedBlock, reason,
+        Reason.CORRUPTION_REPORTED), dn);
   }
 
   private void markBlockAsCorrupt(BlockToMarkCorrupt b,
@@ -1069,7 +1071,8 @@ public class BlockManager {
     node.addBlock(b.stored);
 
     // Add this replica to corruptReplicas Map
-    corruptReplicas.addToCorruptReplicasMap(b.corrupted, node, b.reason);
+    corruptReplicas.addToCorruptReplicasMap(b.corrupted, node, b.reason,
+        b.reasonCode);
     if (countNodes(b.stored).liveReplicas() >= bc.getBlockReplication()) {
       // the block is over-replicated so invalidate the replicas immediately
       invalidateBlock(b, node);
@@ -1575,22 +1578,27 @@ public class BlockManager {
     final BlockInfo stored;
     /** The reason to mark corrupt. */
     final String reason;
-    
-    BlockToMarkCorrupt(BlockInfo corrupted, BlockInfo stored, String reason) {
+    /** The reason code to be stored */
+    final Reason reasonCode;
+
+    BlockToMarkCorrupt(BlockInfo corrupted, BlockInfo stored, String reason,
+        Reason reasonCode) {
       Preconditions.checkNotNull(corrupted, "corrupted is null");
       Preconditions.checkNotNull(stored, "stored is null");
 
       this.corrupted = corrupted;
       this.stored = stored;
       this.reason = reason;
+      this.reasonCode = reasonCode;
     }
 
-    BlockToMarkCorrupt(BlockInfo stored, String reason) {
-      this(stored, stored, reason);
+    BlockToMarkCorrupt(BlockInfo stored, String reason, Reason reasonCode) {
+      this(stored, stored, reason, reasonCode);
     }
 
-    BlockToMarkCorrupt(BlockInfo stored, long gs, String reason) {
-      this(new BlockInfo(stored), stored, reason);
+    BlockToMarkCorrupt(BlockInfo stored, long gs, String reason,
+        Reason reasonCode) {
+      this(new BlockInfo(stored), stored, reason, reasonCode);
       //the corrupted block in datanode has a different generation stamp
       corrupted.setGenerationStamp(gs);
     }
@@ -1935,9 +1943,11 @@ assert storedBlock.findDatanode(dn) < 0 
       return storedBlock;
     }
 
-    //add replica if appropriate
+    // Add replica if appropriate. If the replica was previously corrupt
+    // but now okay, it might need to be updated.
     if (reportedState == ReplicaState.FINALIZED
-        && storedBlock.findDatanode(dn) < 0) {
+        && (storedBlock.findDatanode(dn) < 0
+        || corruptReplicas.isReplicaCorrupt(storedBlock, dn))) {
       toAdd.add(storedBlock);
     }
     return storedBlock;
@@ -2028,12 +2038,13 @@ assert storedBlock.findDatanode(dn) < 0 
           return new BlockToMarkCorrupt(storedBlock, reportedGS,
               "block is " + ucState + " and reported genstamp " + reportedGS
               + " does not match genstamp in block map "
-              + storedBlock.getGenerationStamp());
+              + storedBlock.getGenerationStamp(), Reason.GENSTAMP_MISMATCH);
         } else if (storedBlock.getNumBytes() != reported.getNumBytes()) {
           return new BlockToMarkCorrupt(storedBlock,
               "block is " + ucState + " and reported length " +
               reported.getNumBytes() + " does not match " +
-              "length in block map " + storedBlock.getNumBytes());
+              "length in block map " + storedBlock.getNumBytes(),
+              Reason.SIZE_MISMATCH);
         } else {
           return null; // not corrupt
         }
@@ -2049,7 +2060,7 @@ assert storedBlock.findDatanode(dn) < 0 
         return new BlockToMarkCorrupt(storedBlock, reportedGS,
             "reported " + reportedState + " replica with genstamp " + reportedGS
             + " does not match COMPLETE block's genstamp in block map "
-            + storedBlock.getGenerationStamp());
+            + storedBlock.getGenerationStamp(), Reason.GENSTAMP_MISMATCH);
       } else { // COMPLETE block, same genstamp
         if (reportedState == ReplicaState.RBW) {
           // If it's a RBW report for a COMPLETE block, it may just be that
@@ -2062,7 +2073,8 @@ assert storedBlock.findDatanode(dn) < 0 
           return null;
         } else {
           return new BlockToMarkCorrupt(storedBlock,
-              "reported replica has invalid state " + reportedState);
+              "reported replica has invalid state " + reportedState,
+              Reason.INVALID_STATE);
         }
       }
     case RUR:       // should not be reported
@@ -2073,7 +2085,7 @@ assert storedBlock.findDatanode(dn) < 0 
       " on " + dn + " size " + storedBlock.getNumBytes();
       // log here at WARN level since this is really a broken HDFS invariant
       LOG.warn(msg);
-      return new BlockToMarkCorrupt(storedBlock, msg);
+      return new BlockToMarkCorrupt(storedBlock, msg, Reason.INVALID_STATE);
     }
   }
 
@@ -2189,6 +2201,11 @@ assert storedBlock.findDatanode(dn) < 0 
         logAddStoredBlock(storedBlock, node);
       }
     } else {
+      // if the same block is added again and the replica was corrupt
+      // previously because of a wrong gen stamp, remove it from the
+      // corrupt block list.
+      corruptReplicas.removeFromCorruptReplicasMap(block, node,
+          Reason.GENSTAMP_MISMATCH);
       curReplicaDelta = 0;
       blockLog.warn("BLOCK* addStoredBlock: "
           + "Redundant addStoredBlock request received for " + storedBlock
@@ -2285,7 +2302,8 @@ assert storedBlock.findDatanode(dn) < 0 
     DatanodeDescriptor[] nodesCopy = nodes.toArray(new DatanodeDescriptor[0]);
     for (DatanodeDescriptor node : nodesCopy) {
       try {
-        if (!invalidateBlock(new BlockToMarkCorrupt(blk, null), node)) {
+        if (!invalidateBlock(new BlockToMarkCorrupt(blk, null,
+              Reason.ANY), node)) {
           removedFromBlocksMap = false;
         }
       } catch (IOException e) {
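
Threading a Reason through BlockToMarkCorrupt is what makes the stale-replica case recoverable: when a report with the wrong generation stamp marks a replica corrupt, the record carries GENSTAMP_MISMATCH, and when the same node later reports the block correctly, addStoredBlock() clears only that kind of record while leaving corruption reported for any other reason intact. A condensed sketch of the two calls involved (both methods appear in this commit):

    // 1. Stale-genstamp report: record the replica as corrupt, with the reason.
    corruptReplicas.addToCorruptReplicasMap(storedBlock, node,
        "reported genstamp does not match genstamp in block map",
        Reason.GENSTAMP_MISMATCH);

    // 2. Correct report later from the same node: clear only a GENSTAMP_MISMATCH
    //    record; corruption recorded for any other reason stays in place.
    corruptReplicas.removeFromCorruptReplicasMap(storedBlock, node,
        Reason.GENSTAMP_MISMATCH);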

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CorruptReplicasMap.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CorruptReplicasMap.java?rev=1542057&r1=1542056&r2=1542057&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CorruptReplicasMap.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CorruptReplicasMap.java Thu Nov 14 20:14:42 2013
@@ -36,8 +36,18 @@ import java.util.*;
 @InterfaceAudience.Private
 public class CorruptReplicasMap{
 
-  private SortedMap<Block, Collection<DatanodeDescriptor>> corruptReplicasMap =
-    new TreeMap<Block, Collection<DatanodeDescriptor>>();
+  /** The corruption reason code */
+  public static enum Reason {
+    NONE,                // not specified.
+    ANY,                 // wildcard reason
+    GENSTAMP_MISMATCH,   // mismatch in generation stamps
+    SIZE_MISMATCH,       // mismatch in sizes
+    INVALID_STATE,       // invalid state
+    CORRUPTION_REPORTED  // client or datanode reported the corruption
+  }
+
+  private SortedMap<Block, Map<DatanodeDescriptor, Reason>> corruptReplicasMap =
+    new TreeMap<Block, Map<DatanodeDescriptor, Reason>>();
   
   /**
    * Mark the block belonging to datanode as corrupt.
@@ -48,9 +58,22 @@ public class CorruptReplicasMap{
    */
   public void addToCorruptReplicasMap(Block blk, DatanodeDescriptor dn,
       String reason) {
-    Collection<DatanodeDescriptor> nodes = getNodes(blk);
+    addToCorruptReplicasMap(blk, dn, reason, Reason.NONE);
+  }
+
+  /**
+   * Mark the block belonging to datanode as corrupt.
+   *
+   * @param blk Block to be added to CorruptReplicasMap
+   * @param dn DatanodeDescriptor which holds the corrupt replica
+   * @param reason a textual reason (for logging purposes)
+   * @param reasonCode the enum representation of the reason
+   */
+  public void addToCorruptReplicasMap(Block blk, DatanodeDescriptor dn,
+      String reason, Reason reasonCode) {
+    Map <DatanodeDescriptor, Reason> nodes = corruptReplicasMap.get(blk);
     if (nodes == null) {
-      nodes = new TreeSet<DatanodeDescriptor>();
+      nodes = new HashMap<DatanodeDescriptor, Reason>();
       corruptReplicasMap.put(blk, nodes);
     }
     
@@ -61,8 +84,7 @@ public class CorruptReplicasMap{
       reasonText = "";
     }
     
-    if (!nodes.contains(dn)) {
-      nodes.add(dn);
+    if (!nodes.keySet().contains(dn)) {
       NameNode.blockStateChangeLog.info("BLOCK NameSystem.addToCorruptReplicasMap: "+
                                    blk.getBlockName() +
                                    " added as corrupt on " + dn +
@@ -76,6 +98,8 @@ public class CorruptReplicasMap{
                                    " by " + Server.getRemoteIp() +
                                    reasonText);
     }
+    // Add the node or update the reason.
+    nodes.put(dn, reasonCode);
   }
 
   /**
@@ -97,10 +121,24 @@ public class CorruptReplicasMap{
              false if the replica is not in the map
    */ 
   boolean removeFromCorruptReplicasMap(Block blk, DatanodeDescriptor datanode) {
-    Collection<DatanodeDescriptor> datanodes = corruptReplicasMap.get(blk);
+    return removeFromCorruptReplicasMap(blk, datanode, Reason.ANY);
+  }
+
+  boolean removeFromCorruptReplicasMap(Block blk, DatanodeDescriptor datanode,
+      Reason reason) {
+    Map <DatanodeDescriptor, Reason> datanodes = corruptReplicasMap.get(blk);
+    boolean removed = false;
     if (datanodes==null)
       return false;
-    if (datanodes.remove(datanode)) { // remove the replicas
+
+    // if reasons can be compared but don't match, return false.
+    Reason storedReason = datanodes.get(datanode);
+    if (reason != Reason.ANY && storedReason != null &&
+        reason != storedReason) {
+      return false;
+    }
+
+    if (datanodes.remove(datanode) != null) { // remove the replicas
       if (datanodes.isEmpty()) {
         // remove the block if there is no more corrupted replicas
         corruptReplicasMap.remove(blk);
@@ -118,7 +156,10 @@ public class CorruptReplicasMap{
    * @return collection of nodes. Null if does not exists
    */
   Collection<DatanodeDescriptor> getNodes(Block blk) {
-    return corruptReplicasMap.get(blk);
+    Map <DatanodeDescriptor, Reason> nodes = corruptReplicasMap.get(blk);
+    if (nodes == null)
+      return null;
+    return nodes.keySet();
   }
 
   /**
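
The map's value type changes from a plain set of datanodes to a per-datanode Reason, and removal becomes reason-aware: Reason.ANY keeps the old unconditional behavior, while a specific reason removes the record only if it matches what was stored. A small illustration of those semantics, where blk and dnA are hypothetical Block and DatanodeDescriptor instances:

    // Hypothetical illustration of the reason-aware removal semantics.
    CorruptReplicasMap map = new CorruptReplicasMap();
    map.addToCorruptReplicasMap(blk, dnA, "reported length mismatch", Reason.SIZE_MISMATCH);

    map.removeFromCorruptReplicasMap(blk, dnA, Reason.GENSTAMP_MISMATCH); // false: reasons differ
    map.removeFromCorruptReplicasMap(blk, dnA, Reason.SIZE_MISMATCH);     // true: record removed
    // Reason.ANY (or the two-argument overload) would also have removed it.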

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1542057&r1=1542056&r2=1542057&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Thu Nov 14 20:14:42 2013
@@ -5867,8 +5867,8 @@ public class FSNamesystem implements Nam
     }
 
     // Update old block with the new generation stamp and new length
-    blockinfo.setGenerationStamp(newBlock.getGenerationStamp());
     blockinfo.setNumBytes(newBlock.getNumBytes());
+    blockinfo.setGenerationStampAndVerifyReplicas(newBlock.getGenerationStamp());
 
     // find the DatanodeDescriptor objects
     final DatanodeManager dm = getBlockManager().getDatanodeManager();

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java?rev=1542057&r1=1542056&r2=1542057&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java Thu Nov 14 20:14:42 2013
@@ -26,10 +26,14 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.io.IOUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
+import org.mockito.Mockito;
+import org.mockito.stubbing.Answer;
+
 /**
  * This tests pipeline recovery related client protocol works correct or not.
  */
@@ -112,4 +116,55 @@ public class TestClientProtocolForPipeli
       cluster.shutdown();
     }
   }
+
+  /** Test whether corrupt replicas are detected correctly during pipeline
+   * recoveries.
+   */
+  @Test
+  public void testPipelineRecoveryForLastBlock() throws IOException {
+    DFSClientFaultInjector faultInjector
+        = Mockito.mock(DFSClientFaultInjector.class);
+    DFSClientFaultInjector oldInjector = DFSClientFaultInjector.instance;
+    DFSClientFaultInjector.instance = faultInjector;
+    Configuration conf = new HdfsConfiguration();
+
+    conf.setInt(DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY, 3);
+    MiniDFSCluster cluster = null;
+
+    try {
+      int numDataNodes = 3;
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDataNodes).build();
+      cluster.waitActive();
+      FileSystem fileSys = cluster.getFileSystem();
+
+      Path file = new Path("dataprotocol1.dat");
+      Mockito.when(faultInjector.failPacket()).thenReturn(true);
+      try {
+        DFSTestUtil.createFile(fileSys, file, 1L, (short)numDataNodes, 0L);
+      } catch (IOException e) {
+        // completeFile() should fail.
+        Assert.assertTrue(e.getMessage().startsWith("Unable to close file"));
+        return;
+      }
+
+      // At this point, the NN has let data corruption happen.
+      // Before failing the test, try reading the file. It should fail.
+      FSDataInputStream in = fileSys.open(file);
+      try {
+        int c = in.read();
+        // Test will fail with BlockMissingException if NN does not update the
+        // replica state based on the latest report.
+      } catch (org.apache.hadoop.hdfs.BlockMissingException bme) {
+        Assert.fail("Block is missing because the file was closed with"
+            + " corrupt replicas.");
+      }
+      Assert.fail("The file was closed with corrupt replicas, but read still"
+          + " works!");
+    } finally {
+      DFSClientFaultInjector.instance = oldInjector;
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
 }
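
The only passing outcome of testPipelineRecoveryForLastBlock() is the close failing with "Unable to close file ...": if the NameNode lets the file close despite the stale-genstamp replicas, the test fails whether the follow-up read throws BlockMissingException (data loss) or succeeds. Assuming a standard Maven build of the hadoop-hdfs module, the test can presumably be run on its own with surefire's test filter:

    mvn test -Dtest=TestClientProtocolForPipelineRecovery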

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCorruptFilesJsp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCorruptFilesJsp.java?rev=1542057&r1=1542056&r2=1542057&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCorruptFilesJsp.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCorruptFilesJsp.java Thu Nov 14 20:14:42 2013
@@ -93,6 +93,10 @@ public class TestCorruptFilesJsp  {
         in.close();
       }
 
+      try {
+        Thread.sleep(3000); // Wait for block reports. They shouldn't matter.
+      } catch (InterruptedException ie) {}
+
       // verify if all corrupt files were reported to NN
       badFiles = namenode.getNamesystem().listCorruptFileBlocks("/", null);
       assertTrue("Expecting 3 corrupt files, but got " + badFiles.size(),