You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by su...@apache.org on 2012/10/19 06:58:56 UTC

svn commit: r1399966 - in /hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/

Author: suresh
Date: Fri Oct 19 04:58:55 2012
New Revision: 1399966

URL: http://svn.apache.org/viewvc?rev=1399966&view=rev
Log:
HDFS-4072. Merging change 1399965 from trunk.

Modified:
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReplication.java

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1399966&r1=1399965&r2=1399966&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Fri Oct 19 04:58:55 2012
@@ -137,6 +137,9 @@ Release 2.0.3-alpha - Unreleased
 
     HDFS-4055. TestAuditLogs is flaky. (Binglin Chang via eli)
 
+    HDFS-4072. On file deletion remove corresponding blocks pending
+    replications. (Jing Zhao via suresh)
+
 Release 2.0.2-alpha - 2012-09-07 
   
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1399966&r1=1399965&r2=1399966&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Fri Oct 19 04:58:55 2012
@@ -289,7 +289,7 @@ public class BlockManager {
   }
 
   private static BlockTokenSecretManager createBlockTokenSecretManager(
-      final Configuration conf) throws IOException {
+      final Configuration conf) {
     final boolean isEnabled = conf.getBoolean(
         DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, 
         DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_DEFAULT);
@@ -1261,7 +1261,7 @@ public class BlockManager {
           // Move the block-replication into a "pending" state.
           // The reason we use 'pending' is so we can retry
           // replications that fail after an appropriate amount of time.
-          pendingReplications.add(block, targets.length);
+          pendingReplications.increment(block, targets.length);
           if(NameNode.stateChangeLog.isDebugEnabled()) {
             NameNode.stateChangeLog.debug(
                 "BLOCK* block " + block
@@ -1307,8 +1307,11 @@ public class BlockManager {
 
   /**
    * Choose target datanodes according to the replication policy.
-   * @throws IOException if the number of targets < minimum replication.
-   * @see BlockPlacementPolicy#chooseTarget(String, int, DatanodeDescriptor, HashMap, long)
+   * 
+   * @throws IOException
+   *           if the number of targets &lt; minimum replication.
+   * @see BlockPlacementPolicy#chooseTarget(String, int, DatanodeDescriptor,
+   *      List, boolean, HashMap, long)
    */
   public DatanodeDescriptor[] chooseTarget(final String src,
       final int numOfReplicas, final DatanodeDescriptor client,
@@ -1812,7 +1815,7 @@ assert storedBlock.findDatanode(dn) < 0 
 
   /**
    * Queue the given reported block for later processing in the
-   * standby node. {@see PendingDataNodeMessages}.
+   * standby node. See {@link PendingDataNodeMessages}.
    * @param reason a textual reason to report in the debug logs
    */
   private void queueReportedBlock(DatanodeDescriptor dn, Block block,
@@ -1977,14 +1980,15 @@ assert storedBlock.findDatanode(dn) < 0 
   }
   
   /**
-   * Faster version of {@link addStoredBlock()}, intended for use with 
-   * initial block report at startup.  If not in startup safe mode, will
-   * call standard addStoredBlock().
-   * Assumes this method is called "immediately" so there is no need to
-   * refresh the storedBlock from blocksMap.
-   * Doesn't handle underReplication/overReplication, or worry about
+   * Faster version of
+   * {@link #addStoredBlock(BlockInfo, DatanodeDescriptor, DatanodeDescriptor, boolean)}
+   * , intended for use with initial block report at startup. If not in startup
+   * safe mode, will call standard addStoredBlock(). Assumes this method is
+   * called "immediately" so there is no need to refresh the storedBlock from
+   * blocksMap. Doesn't handle underReplication/overReplication, or worry about
    * pendingReplications or corruptReplicas, because it's in startup safe mode.
    * Doesn't log every block, because there are typically millions of them.
+   * 
    * @throws IOException
    */
   private void addStoredBlockImmediate(BlockInfo storedBlock,
@@ -2534,7 +2538,7 @@ assert storedBlock.findDatanode(dn) < 0 
     //
     // Modify the blocks->datanode map and node's map.
     //
-    pendingReplications.remove(block);
+    pendingReplications.decrement(block);
     processAndHandleReportedBlock(node, block, ReplicaState.FINALIZED,
         delHintNode);
   }
@@ -2670,7 +2674,7 @@ assert storedBlock.findDatanode(dn) < 0 
   }
 
   /** 
-   * Simpler, faster form of {@link countNodes()} that only returns the number
+   * Simpler, faster form of {@link #countNodes(Block)} that only returns the number
    * of live nodes.  If in startup safemode (or its 30-sec extension period),
    * then it gains speed by ignoring issues of excess replicas or nodes
    * that are decommissioned or in process of becoming decommissioned.
@@ -2819,6 +2823,8 @@ assert storedBlock.findDatanode(dn) < 0 
     addToInvalidates(block);
     corruptReplicas.removeFromCorruptReplicasMap(block);
     blocksMap.removeBlock(block);
+    // Remove the block from pendingReplications
+    pendingReplications.remove(block);
     if (postponedMisreplicatedBlocks.remove(block)) {
       postponedMisreplicatedBlocksCount--;
     }

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java?rev=1399966&r1=1399965&r2=1399966&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java Fri Oct 19 04:58:55 2012
@@ -72,7 +72,7 @@ class PendingReplicationBlocks {
   /**
    * Add a block to the list of pending Replications
    */
-  void add(Block block, int numReplicas) {
+  void increment(Block block, int numReplicas) {
     synchronized (pendingReplications) {
       PendingBlockInfo found = pendingReplications.get(block);
       if (found == null) {
@@ -89,7 +89,7 @@ class PendingReplicationBlocks {
    * Decrement the number of pending replication requests
    * for this block.
    */
-  void remove(Block block) {
+  void decrement(Block block) {
     synchronized (pendingReplications) {
       PendingBlockInfo found = pendingReplications.get(block);
       if (found != null) {
@@ -104,6 +104,16 @@ class PendingReplicationBlocks {
     }
   }
 
+  /**
+   * Remove the whole record of the given block from pendingReplications,
+   * unlike {@link #decrement(Block)}, which drops a single pending request.
+   * @param block The block whose pending replication record is to be removed
+   */
+  void remove(Block block) {
+    synchronized (pendingReplications) {
+      pendingReplications.remove(block);
+    }
+  }
 
   public void clear() {
     synchronized (pendingReplications) {

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReplication.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReplication.java?rev=1399966&r1=1399965&r2=1399966&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReplication.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReplication.java Fri Oct 19 04:58:55 2012
@@ -20,14 +20,30 @@ package org.apache.hadoop.hdfs.server.bl
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.junit.Test;
 
 /**
- * This class tests the internals of PendingReplicationBlocks.java
+ * This class tests the internals of PendingReplicationBlocks.java,
+ * as well as how PendingReplicationBlocks acts in BlockManager
  */
 public class TestPendingReplication {
   final static int TIMEOUT = 3;     // 3 seconds
+  private static final int DFS_REPLICATION_INTERVAL = 1;
+  // Number of datanodes in the cluster
+  private static final int DATANODE_COUNT = 5;
 
   @Test
   public void testPendingReplication() {
@@ -40,7 +56,7 @@ public class TestPendingReplication {
     //
     for (int i = 0; i < 10; i++) {
       Block block = new Block(i, i, 0);
-      pendingReplications.add(block, i);
+      pendingReplications.increment(block, i);
     }
     assertEquals("Size of pendingReplications ",
                  10, pendingReplications.size());
@@ -50,15 +66,15 @@ public class TestPendingReplication {
     // remove one item and reinsert it
     //
     Block blk = new Block(8, 8, 0);
-    pendingReplications.remove(blk);             // removes one replica
+    pendingReplications.decrement(blk);             // removes one replica
     assertEquals("pendingReplications.getNumReplicas ",
                  7, pendingReplications.getNumReplicas(blk));
 
     for (int i = 0; i < 7; i++) {
-      pendingReplications.remove(blk);           // removes all replicas
+      pendingReplications.decrement(blk);           // removes all replicas
     }
     assertTrue(pendingReplications.size() == 9);
-    pendingReplications.add(blk, 8);
+    pendingReplications.increment(blk, 8);
     assertTrue(pendingReplications.size() == 10);
 
     //
@@ -86,7 +102,7 @@ public class TestPendingReplication {
 
     for (int i = 10; i < 15; i++) {
       Block block = new Block(i, i, 0);
-      pendingReplications.add(block, i);
+      pendingReplications.increment(block, i);
     }
     assertTrue(pendingReplications.size() == 15);
 
@@ -116,4 +132,70 @@ public class TestPendingReplication {
     }
     pendingReplications.stop();
   }
+  
+  /**
+   * Test if BlockManager can correctly remove corresponding pending records
+   * when a file is deleted
+   * 
+   * @throws Exception if the cluster cannot be started or an HDFS call fails
+   */
+  @Test
+  public void testPendingAndInvalidate() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 1024);
+    conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
+        DFS_REPLICATION_INTERVAL);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY,
+        DFS_REPLICATION_INTERVAL);
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(
+        DATANODE_COUNT).build();
+    try {
+      // waitActive() is inside the try so a failed startup is still shut down
+      cluster.waitActive();
+      FSNamesystem namesystem = cluster.getNamesystem();
+      BlockManager bm = namesystem.getBlockManager();
+      DistributedFileSystem fs = cluster.getFileSystem();
+      // 1. create a file
+      Path filePath = new Path("/tmp.txt");
+      DFSTestUtil.createFile(fs, filePath, 1024, (short) 3, 0L);
+      
+      // 2. disable the heartbeats
+      for (DataNode dn : cluster.getDataNodes()) {
+        DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true);
+      }
+      
+      // 3. mark two replicas of the file's block as corrupt
+      LocatedBlock block = NameNodeAdapter.getBlockLocations(
+          cluster.getNameNode(), filePath.toString(), 0, 1).get(0);
+      namesystem.writeLock();
+      try {
+        bm.findAndMarkBlockAsCorrupt(block.getBlock(), block.getLocations()[0],
+            "TEST");
+        bm.findAndMarkBlockAsCorrupt(block.getBlock(), block.getLocations()[1],
+            "TEST");
+      } finally {
+        namesystem.writeUnlock();
+      }
+      BlockManagerTestUtil.computeAllPendingWork(bm);
+      BlockManagerTestUtil.updateState(bm);
+      assertEquals(1L, bm.getPendingReplicationBlocksCount());
+      assertEquals(2, bm.pendingReplications.getNumReplicas(block.getBlock()
+          .getLocalBlock()));
+      
+      // 4. delete the file
+      fs.delete(filePath, true);
+      // retry at most 10 times, each time sleep for 1s. Note that 10s is much
+      // less than the default pending record timeout (5~10min)
+      int retries = 10;
+      long pendingNum = bm.getPendingReplicationBlocksCount();
+      while (pendingNum != 0 && retries-- > 0) {
+        Thread.sleep(1000);  // let NN do the deletion
+        BlockManagerTestUtil.updateState(bm);
+        pendingNum = bm.getPendingReplicationBlocksCount();
+      }
+      assertEquals(0L, pendingNum);
+    } finally {
+      cluster.shutdown();
+    }
+  }
 }