You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by ji...@apache.org on 2013/07/24 09:36:32 UTC

svn commit: r1506425 - in /hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ src/main/java/org/apache/hadoop/hdfs/server/datanode/ src/main/java/org/apache/hadoop/hdf...

Author: jing9
Date: Wed Jul 24 07:36:32 2013
New Revision: 1506425

URL: http://svn.apache.org/r1506425
Log:
HDFS-5020. Merge change r1506424 from branch-2.

Modified:
    hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
    hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java
    hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
    hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
    hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReplication.java

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1506425&r1=1506424&r2=1506425&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Wed Jul 24 07:36:32 2013
@@ -213,6 +213,9 @@ Release 2.1.0-beta - 2013-07-02
     HADOOP-9760. Move GSet and related classes to common from HDFS.
     (suresh)
 
+    HDFS-5020. Make DatanodeProtocol#blockReceivedAndDeleted idempotent. 
+    (jing9)
+
   OPTIMIZATIONS
 
     HDFS-4465. Optimize datanode ReplicasMap and ReplicaInfo. (atm)

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1506425&r1=1506424&r2=1506425&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Wed Jul 24 07:36:32 2013
@@ -1321,7 +1321,7 @@ public class BlockManager {
           // Move the block-replication into a "pending" state.
           // The reason we use 'pending' is so we can retry
           // replications that fail after an appropriate amount of time.
-          pendingReplications.increment(block, targets.length);
+          pendingReplications.increment(block, targets);
           if(blockLog.isDebugEnabled()) {
             blockLog.debug(
                 "BLOCK* block " + block
@@ -2601,6 +2601,8 @@ assert storedBlock.findDatanode(dn) < 0 
   void addBlock(DatanodeDescriptor node, Block block, String delHint)
       throws IOException {
     // decrement number of blocks scheduled to this datanode.
+    // For a retry request (of DatanodeProtocol#blockReceivedAndDeleted with
+    // RECEIVED_BLOCK), this approximate count is currently decremented again.
     node.decBlocksScheduled();
 
     // get the deletion hint node
@@ -2616,7 +2618,7 @@ assert storedBlock.findDatanode(dn) < 0 
     //
     // Modify the blocks->datanode map and node's map.
     //
-    pendingReplications.decrement(block);
+    pendingReplications.decrement(block, node);
     processAndHandleReportedBlock(node, block, ReplicaState.FINALIZED,
         delHintNode);
   }

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java?rev=1506425&r1=1506424&r2=1506425&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java Wed Jul 24 07:36:32 2013
@@ -22,8 +22,10 @@ import static org.apache.hadoop.util.Tim
 import java.io.PrintWriter;
 import java.sql.Time;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.commons.logging.Log;
@@ -71,14 +73,16 @@ class PendingReplicationBlocks {
 
   /**
    * Add a block to the list of pending Replications
+   * @param block The corresponding block
+   * @param targets The DataNodes where replicas of the block should be placed
    */
-  void increment(Block block, int numReplicas) {
+  void increment(Block block, DatanodeDescriptor[] targets) {
     synchronized (pendingReplications) {
       PendingBlockInfo found = pendingReplications.get(block);
       if (found == null) {
-        pendingReplications.put(block, new PendingBlockInfo(numReplicas));
+        pendingReplications.put(block, new PendingBlockInfo(targets));
       } else {
-        found.incrementReplicas(numReplicas);
+        found.incrementReplicas(targets);
         found.setTimeStamp();
       }
     }
@@ -88,15 +92,17 @@ class PendingReplicationBlocks {
    * One replication request for this block has finished.
    * Decrement the number of pending replication requests
    * for this block.
+   * 
+   * @param dn The DataNode that finishes the replication
    */
-  void decrement(Block block) {
+  void decrement(Block block, DatanodeDescriptor dn) {
     synchronized (pendingReplications) {
       PendingBlockInfo found = pendingReplications.get(block);
       if (found != null) {
         if(LOG.isDebugEnabled()) {
           LOG.debug("Removing pending replication for " + block);
         }
-        found.decrementReplicas();
+        found.decrementReplicas(dn);
         if (found.getNumReplicas() <= 0) {
           pendingReplications.remove(block);
         }
@@ -153,7 +159,7 @@ class PendingReplicationBlocks {
         return null;
       }
       Block[] blockList = timedOutItems.toArray(
-                                                new Block[timedOutItems.size()]);
+          new Block[timedOutItems.size()]);
       timedOutItems.clear();
       return blockList;
     }
@@ -163,16 +169,17 @@ class PendingReplicationBlocks {
    * An object that contains information about a block that 
    * is being replicated. It records the timestamp when the 
    * system started replicating the most recent copy of this
-   * block. It also records the number of replication
-   * requests that are in progress.
+   * block. It also records the list of Datanodes where the 
+   * replication requests are in progress.
    */
   static class PendingBlockInfo {
     private long timeStamp;
-    private int numReplicasInProgress;
+    private final List<DatanodeDescriptor> targets;
 
-    PendingBlockInfo(int numReplicas) {
+    PendingBlockInfo(DatanodeDescriptor[] targets) {
       this.timeStamp = now();
-      this.numReplicasInProgress = numReplicas;
+      this.targets = targets == null ? new ArrayList<DatanodeDescriptor>()
+          : new ArrayList<DatanodeDescriptor>(Arrays.asList(targets));
     }
 
     long getTimeStamp() {
@@ -183,17 +190,20 @@ class PendingReplicationBlocks {
       timeStamp = now();
     }
 
-    void incrementReplicas(int increment) {
-      numReplicasInProgress += increment;
+    void incrementReplicas(DatanodeDescriptor... newTargets) {
+      if (newTargets != null) { // a null array adds no targets
+        for (DatanodeDescriptor dn : newTargets) {
+          targets.add(dn); // appended as-is; duplicates are not filtered
+        }
+      }
     }
 
-    void decrementReplicas() {
-      numReplicasInProgress--;
-      assert(numReplicasInProgress >= 0);
+    void decrementReplicas(DatanodeDescriptor dn) {
+      targets.remove(dn); // no-op if dn is absent (e.g. a retried report)
     }
 
     int getNumReplicas() {
-      return numReplicasInProgress;
+      return targets.size(); // targets still outstanding for this block
     }
   }
 
@@ -274,7 +284,7 @@ class PendingReplicationBlocks {
         out.println(block + 
                     " StartTime: " + new Time(pendingBlock.timeStamp) +
                     " NumReplicaInProgress: " + 
-                    pendingBlock.numReplicasInProgress);
+                    pendingBlock.getNumReplicas());
       }
     }
   }

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1506425&r1=1506424&r2=1506425&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Wed Jul 24 07:36:32 2013
@@ -977,7 +977,8 @@ public class DataNode extends Configured
    * @return BP registration object
    * @throws IOException
    */
-  DatanodeRegistration getDNRegistrationForBP(String bpid) 
+  @VisibleForTesting
+  public DatanodeRegistration getDNRegistrationForBP(String bpid) 
   throws IOException {
     BPOfferService bpos = blockPoolManager.get(bpid);
     if(bpos==null || bpos.bpRegistration==null) {

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java?rev=1506425&r1=1506424&r2=1506425&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java Wed Jul 24 07:36:32 2013
@@ -138,7 +138,7 @@ public interface DatanodeProtocol {
    * writes a new Block here, or another DataNode copies a Block to
    * this DataNode, it will call blockReceived().
    */
-  @AtMostOnce
+  @Idempotent
   public void blockReceivedAndDeleted(DatanodeRegistration registration,
                             String poolId,
                             StorageReceivedDeletedBlocks[] rcvdAndDeletedBlocks)

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReplication.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReplication.java?rev=1506425&r1=1506424&r2=1506425&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReplication.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReplication.java Wed Jul 24 07:36:32 2013
@@ -20,6 +20,8 @@ package org.apache.hadoop.hdfs.server.bl
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
+import java.util.ArrayList;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -28,13 +30,21 @@ import org.apache.hadoop.hdfs.Distribute
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.INodeFile;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
+import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus;
+import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
 import org.junit.Test;
 
+import com.google.common.base.Preconditions;
+
 /**
  * This class tests the internals of PendingReplicationBlocks.java,
  * as well as how PendingReplicationBlocks acts in BlockManager
@@ -44,7 +54,22 @@ public class TestPendingReplication {
   private static final int DFS_REPLICATION_INTERVAL = 1;
   // Number of datanodes in the cluster
   private static final int DATANODE_COUNT = 5;
+  
+  private DatanodeDescriptor genDatanodeId(int seed) {
+    final int octet = seed % 256; // 0..255 for the seeds used here
+    final String ip = octet + "." + octet + "." + octet + "." + octet;
+    return DFSTestUtil.getDatanodeDescriptor(ip, null);
+  }
 
+  private DatanodeDescriptor[] genDatanodes(int number) {
+    Preconditions.checkArgument(number >= 0);
+    final DatanodeDescriptor[] result = new DatanodeDescriptor[number];
+    for (int seed = 0; seed < result.length; seed++) {
+      result[seed] = genDatanodeId(seed); // node i gets IP i.i.i.i (mod 256)
+    }
+    return result;
+  }
+  
   @Test
   public void testPendingReplication() {
     PendingReplicationBlocks pendingReplications;
@@ -56,7 +81,7 @@ public class TestPendingReplication {
     //
     for (int i = 0; i < 10; i++) {
       Block block = new Block(i, i, 0);
-      pendingReplications.increment(block, i);
+      pendingReplications.increment(block, genDatanodes(i));
     }
     assertEquals("Size of pendingReplications ",
                  10, pendingReplications.size());
@@ -66,15 +91,16 @@ public class TestPendingReplication {
     // remove one item and reinsert it
     //
     Block blk = new Block(8, 8, 0);
-    pendingReplications.decrement(blk);             // removes one replica
+    pendingReplications.decrement(blk, genDatanodeId(7)); // removes one replica
     assertEquals("pendingReplications.getNumReplicas ",
                  7, pendingReplications.getNumReplicas(blk));
 
     for (int i = 0; i < 7; i++) {
-      pendingReplications.decrement(blk);           // removes all replicas
+      // removes all replicas
+      pendingReplications.decrement(blk, genDatanodeId(i));
     }
     assertTrue(pendingReplications.size() == 9);
-    pendingReplications.increment(blk, 8);
+    pendingReplications.increment(blk, genDatanodes(8));
     assertTrue(pendingReplications.size() == 10);
 
     //
@@ -102,7 +128,7 @@ public class TestPendingReplication {
 
     for (int i = 10; i < 15; i++) {
       Block block = new Block(i, i, 0);
-      pendingReplications.increment(block, i);
+      pendingReplications.increment(block, genDatanodes(i));
     }
     assertTrue(pendingReplications.size() == 15);
 
@@ -134,6 +160,101 @@ public class TestPendingReplication {
   }
   
   /**
+   * Test that DatanodeProtocol#blockReceivedAndDeleted correctly updates the
+   * pending replications, and that repeating a blockReceivedAndDeleted call
+   * is idempotent with respect to the pending replications.
+   */
+  @Test
+  public void testBlockReceived() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 1024);
+    MiniDFSCluster cluster = null;
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(
+          DATANODE_COUNT).build();
+      cluster.waitActive();
+
+      DistributedFileSystem hdfs = cluster.getFileSystem();
+      FSNamesystem fsn = cluster.getNamesystem();
+      BlockManager blkManager = fsn.getBlockManager();
+
+      final String file = "/tmp.txt";
+      final Path filePath = new Path(file);
+      short replFactor = 1;
+      DFSTestUtil.createFile(hdfs, filePath, 1024L, replFactor, 0);
+
+      // temporarily stop the heartbeats of all datanodes
+      ArrayList<DataNode> datanodes = cluster.getDataNodes();
+      for (int i = 0; i < DATANODE_COUNT; i++) {
+        DataNodeTestUtils.setHeartbeatsDisabledForTests(datanodes.get(i), true);
+      }
+
+      hdfs.setReplication(filePath, (short) DATANODE_COUNT);
+      BlockManagerTestUtil.computeAllPendingWork(blkManager);
+
+      assertEquals(1, blkManager.pendingReplications.size());
+      INodeFile fileNode = fsn.getFSDirectory().getINode4Write(file).asFile();
+      Block[] blocks = fileNode.getBlocks();
+      assertEquals(DATANODE_COUNT - 1,
+          blkManager.pendingReplications.getNumReplicas(blocks[0]));
+
+      LocatedBlock locatedBlock = hdfs.getClient().getLocatedBlocks(file, 0)
+          .get(0);
+      DatanodeInfo existingDn = locatedBlock.getLocations()[0];
+      int reportDnNum = 0;
+      String poolId = fsn.getBlockPoolId();
+      // let two datanodes (other than the one that already has the data)
+      // report to the NN
+      for (int i = 0; i < DATANODE_COUNT && reportDnNum < 2; i++) {
+        if (!datanodes.get(i).getDatanodeId().equals(existingDn)) {
+          DatanodeRegistration dnR = datanodes.get(i).getDNRegistrationForBP(
+              poolId);
+          StorageReceivedDeletedBlocks[] report = {
+              new StorageReceivedDeletedBlocks(dnR.getStorageID(),
+              new ReceivedDeletedBlockInfo[] { new ReceivedDeletedBlockInfo(
+                  blocks[0], BlockStatus.RECEIVED_BLOCK, "") }) };
+          cluster.getNameNodeRpc().blockReceivedAndDeleted(dnR, poolId, report);
+          reportDnNum++;
+        }
+      }
+
+      assertEquals(DATANODE_COUNT - 3,
+          blkManager.pendingReplications.getNumReplicas(blocks[0]));
+
+      // let the same datanodes report again (reportDnNum is already 2)
+      for (int i = 0, resent = 0; i < DATANODE_COUNT && resent < 2; i++) {
+        if (!datanodes.get(i).getDatanodeId().equals(existingDn)) {
+          DatanodeRegistration dnR = datanodes.get(i).getDNRegistrationForBP(
+              poolId);
+          StorageReceivedDeletedBlocks[] report = {
+              new StorageReceivedDeletedBlocks(dnR.getStorageID(),
+              new ReceivedDeletedBlockInfo[] { new ReceivedDeletedBlockInfo(
+                  blocks[0], BlockStatus.RECEIVED_BLOCK, "") }) };
+          cluster.getNameNodeRpc().blockReceivedAndDeleted(dnR, poolId, report);
+          resent++;
+        }
+      }
+
+      assertEquals(DATANODE_COUNT - 3,
+          blkManager.pendingReplications.getNumReplicas(blocks[0]));
+
+      // re-enable and trigger the heartbeats of all datanodes
+      for (int i = 0; i < DATANODE_COUNT; i++) {
+        DataNodeTestUtils
+            .setHeartbeatsDisabledForTests(datanodes.get(i), false);
+        DataNodeTestUtils.triggerHeartbeat(datanodes.get(i));
+      }
+
+      Thread.sleep(5000); // give the remaining replications time to finish
+      assertEquals(0, blkManager.pendingReplications.size());
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+  
+  /**
    * Test if BlockManager can correctly remove corresponding pending records
    * when a file is deleted
    *