Posted to common-commits@hadoop.apache.org by su...@apache.org on 2020/02/13 17:58:51 UTC

[hadoop] branch branch-3.1 updated: HDFS-15086. Block scheduled counter never gets decremented if the block is deleted before replication. Contributed by hemanthboyina.

This is an automated email from the ASF dual-hosted git repository.

surendralilhore pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 6a4297a  HDFS-15086. Block scheduled counter never gets decremented if the block is deleted before replication. Contributed by hemanthboyina.
6a4297a is described below

commit 6a4297adc47befe87bef275e1f853df42fa70515
Author: Surendra Singh Lilhore <su...@apache.org>
AuthorDate: Thu Feb 13 16:30:19 2020 +0530

    HDFS-15086. Block scheduled counter never gets decremented if the block is deleted before replication. Contributed by hemanthboyina.
---
 .../hdfs/server/blockmanagement/BlockManager.java  | 18 ++++--
 .../server/blockmanagement/DatanodeManager.java    | 20 +++++-
 .../PendingReconstructionBlocks.java               | 42 ++++++++----
 .../hadoop/hdfs/TestBlocksScheduledCounter.java    | 75 +++++++++++++++++++++-
 .../blockmanagement/TestPendingReconstruction.java | 26 ++++----
 5 files changed, 146 insertions(+), 35 deletions(-)
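
The leak being fixed is easy to picture with a simplified model: scheduling a replication increments a per-DataNode counter, and only a completed transfer decrements it, so a block deleted between scheduling and transfer inflates the counter forever. Below is a minimal, self-contained sketch of that lifecycle; the class and method names are illustrative stand-ins, not the actual HDFS types.

    import java.util.HashMap;
    import java.util.Map;

    /** Toy model of the blocks-scheduled counter (illustrative only). */
    public class ScheduledCounterSketch {
      private final Map<String, Integer> blocksScheduled = new HashMap<>();

      // Called when a replication is scheduled to a target DataNode.
      void incrementScheduled(String node) {
        blocksScheduled.merge(node, 1, Integer::sum);
      }

      // Before HDFS-15086 this only ran when the transfer completed, so a
      // block deleted first left the counter permanently inflated; the patch
      // also decrements when the block leaves the pending queue.
      void decrementScheduled(String node) {
        blocksScheduled.merge(node, -1, Integer::sum);
      }

      public static void main(String[] args) {
        ScheduledCounterSketch c = new ScheduledCounterSketch();
        c.incrementScheduled("dn1"); // replication scheduled to dn1
        c.decrementScheduled("dn1"); // what the fix adds on block removal
        System.out.println(c.blocksScheduled.get("dn1")); // 0
      }
    }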

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 9edf104..571a62c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -83,6 +83,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.CorruptReplicasMap.Reason;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo.AddBlockResult;
 import org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas.StoredReplicaState;
 import org.apache.hadoop.hdfs.server.blockmanagement.PendingDataNodeMessages.ReportedBlockInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.PendingReconstructionBlocks.PendingBlockInfo;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
@@ -1097,15 +1098,15 @@ public class BlockManager implements BlockStatsMXBean {
       DatanodeStorageInfo[] expectedStorages =
           blk.getUnderConstructionFeature().getExpectedStorageLocations();
       if (expectedStorages.length - blk.numNodes() > 0) {
-        ArrayList<DatanodeDescriptor> pendingNodes = new ArrayList<>();
+        ArrayList<DatanodeStorageInfo> pendingNodes = new ArrayList<>();
         for (DatanodeStorageInfo storage : expectedStorages) {
           DatanodeDescriptor dnd = storage.getDatanodeDescriptor();
           if (blk.findStorageInfo(dnd) == null) {
-            pendingNodes.add(dnd);
+            pendingNodes.add(storage);
           }
         }
         pendingReconstruction.increment(blk,
-            pendingNodes.toArray(new DatanodeDescriptor[pendingNodes.size()]));
+            pendingNodes.toArray(new DatanodeStorageInfo[pendingNodes.size()]));
       }
     }
   }
@@ -2155,8 +2156,7 @@ public class BlockManager implements BlockStatsMXBean {
     // Move the block-replication into a "pending" state.
     // The reason we use 'pending' is so we can retry
     // reconstructions that fail after an appropriate amount of time.
-    pendingReconstruction.increment(block,
-        DatanodeStorageInfo.toDatanodeDescriptors(targets));
+    pendingReconstruction.increment(block, targets);
     blockLog.debug("BLOCK* block {} is moved from neededReconstruction to "
         + "pendingReconstruction", block);
 
@@ -4023,7 +4023,7 @@ public class BlockManager implements BlockStatsMXBean {
     BlockInfo storedBlock = getStoredBlock(block);
     if (storedBlock != null &&
         block.getGenerationStamp() == storedBlock.getGenerationStamp()) {
-      if (pendingReconstruction.decrement(storedBlock, node)) {
+      if (pendingReconstruction.decrement(storedBlock, storageInfo)) {
         NameNode.getNameNodeMetrics().incSuccessfulReReplications();
       }
     }
@@ -4438,7 +4438,11 @@ public class BlockManager implements BlockStatsMXBean {
     addToInvalidates(block);
     removeBlockFromMap(block);
     // Remove the block from pendingReconstruction and neededReconstruction
-    pendingReconstruction.remove(block);
+    PendingBlockInfo remove = pendingReconstruction.remove(block);
+    if (remove != null) {
+      DatanodeStorageInfo.decrementBlocksScheduled(remove.getTargets()
+          .toArray(new DatanodeStorageInfo[remove.getTargets().size()]));
+    }
     neededReconstruction.remove(block, LowRedundancyBlocks.LEVEL);
     postponedMisreplicatedBlocks.remove(block);
   }
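
The removeBlock() hunk above carries the core of the fix: PendingReconstructionBlocks.remove() now returns the pending entry instead of void, so the caller can release the scheduled slot of every target that was still waiting. A hedged sketch of that return-and-undo pattern, using hypothetical stand-ins for PendingBlockInfo and DatanodeStorageInfo:

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class RemoveBlockSketch {
      /** Hypothetical stand-in for DatanodeStorageInfo. */
      static class Storage {
        int blocksScheduled;
      }

      /** Hypothetical stand-in for PendingBlockInfo. */
      static class PendingEntry {
        final List<Storage> targets = new ArrayList<>();
      }

      private final Map<Long, PendingEntry> pending = new HashMap<>();

      /** Mirrors the patched removeBlock(): undo scheduling on delete. */
      void removeBlock(long blockId) {
        PendingEntry removed = pending.remove(blockId);
        if (removed != null) {
          // Every target still awaiting this block gives back its slot.
          for (Storage s : removed.targets) {
            s.blocksScheduled--;
          }
        }
      }
    }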
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index 539d898..a8de424 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -1687,8 +1687,24 @@ public class DatanodeManager {
       List<BlockTargetPair> pendingList = nodeinfo.getReplicationCommand(
           numReplicationTasks);
       if (pendingList != null && !pendingList.isEmpty()) {
-        cmds.add(new BlockCommand(DatanodeProtocol.DNA_TRANSFER, blockPoolId,
-            pendingList));
+        // If the block is deleted, its size will become
+        // BlockCommand.NO_ACK (Long.MAX_VALUE). Such blocks don't need
+        // to be sent for replication or
+        // reconstruction.
+        Iterator<BlockTargetPair> iterator = pendingList.iterator();
+        while (iterator.hasNext()) {
+          BlockTargetPair cmd = iterator.next();
+          if (cmd.block != null
+              && cmd.block.getNumBytes() == BlockCommand.NO_ACK) {
+            // block deleted
+            DatanodeStorageInfo.decrementBlocksScheduled(cmd.targets);
+            iterator.remove();
+          }
+        }
+        if (!pendingList.isEmpty()) {
+          cmds.add(new BlockCommand(DatanodeProtocol.DNA_TRANSFER, blockPoolId,
+              pendingList));
+        }
       }
       // check pending erasure coding tasks
       List<BlockECReconstructionInfo> pendingECList = nodeinfo
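
The DatanodeManager hunk relies on the standard iterator-removal idiom: walk the pending list, and for any block already marked deleted (size equals BlockCommand.NO_ACK, i.e. Long.MAX_VALUE), release its scheduled slots and drop it before the transfer command is built. A generic sketch of that idiom, with hypothetical task and constant names:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;

    public class FilterPendingSketch {
      // Sentinel meaning "block already deleted"; BlockCommand.NO_ACK
      // is Long.MAX_VALUE in HDFS.
      static final long NO_ACK = Long.MAX_VALUE;

      /** Hypothetical replication task. */
      static class Task {
        final long numBytes;
        Task(long numBytes) { this.numBytes = numBytes; }
      }

      public static void main(String[] args) {
        List<Task> pendingList = new ArrayList<>(Arrays.asList(
            new Task(1024), new Task(NO_ACK), new Task(2048)));

        // Iterator.remove() is the only safe way to drop elements while
        // iterating; calling pendingList.remove() inside the loop would
        // throw ConcurrentModificationException.
        Iterator<Task> it = pendingList.iterator();
        while (it.hasNext()) {
          if (it.next().numBytes == NO_ACK) {
            // In the real patch, the targets' scheduled counters are
            // decremented here before the task is dropped.
            it.remove();
          }
        }
        System.out.println(pendingList.size()); // 2
      }
    }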
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReconstructionBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReconstructionBlocks.java
index fb93d82..6e1af57 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReconstructionBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReconstructionBlocks.java
@@ -81,7 +81,7 @@ class PendingReconstructionBlocks {
    * @param block The corresponding block
    * @param targets The DataNodes where replicas of the block should be placed
    */
-  void increment(BlockInfo block, DatanodeDescriptor... targets) {
+  void increment(BlockInfo block, DatanodeStorageInfo... targets) {
     synchronized (pendingReconstructions) {
       PendingBlockInfo found = pendingReconstructions.get(block);
       if (found == null) {
@@ -101,7 +101,7 @@ class PendingReconstructionBlocks {
    * @param dn The DataNode that finishes the reconstruction
    * @return true if the block is decremented to 0 and got removed.
    */
-  boolean decrement(BlockInfo block, DatanodeDescriptor dn) {
+  boolean decrement(BlockInfo block, DatanodeStorageInfo dn) {
     boolean removed = false;
     synchronized (pendingReconstructions) {
       PendingBlockInfo found = pendingReconstructions.get(block);
@@ -124,9 +124,9 @@ class PendingReconstructionBlocks {
    *          The given block whose pending reconstruction requests need to be
    *          removed
    */
-  void remove(BlockInfo block) {
+  PendingBlockInfo remove(BlockInfo block) {
     synchronized (pendingReconstructions) {
-      pendingReconstructions.remove(block);
+      return pendingReconstructions.remove(block);
     }
   }
 
@@ -200,11 +200,11 @@ class PendingReconstructionBlocks {
    */
   static class PendingBlockInfo {
     private long timeStamp;
-    private final List<DatanodeDescriptor> targets;
+    private final List<DatanodeStorageInfo> targets;
 
-    PendingBlockInfo(DatanodeDescriptor[] targets) {
+    PendingBlockInfo(DatanodeStorageInfo[] targets) {
       this.timeStamp = monotonicNow();
-      this.targets = targets == null ? new ArrayList<DatanodeDescriptor>()
+      this.targets = targets == null ? new ArrayList<DatanodeStorageInfo>()
           : new ArrayList<>(Arrays.asList(targets));
     }
 
@@ -216,9 +216,9 @@ class PendingReconstructionBlocks {
       timeStamp = monotonicNow();
     }
 
-    void incrementReplicas(DatanodeDescriptor... newTargets) {
+    void incrementReplicas(DatanodeStorageInfo... newTargets) {
       if (newTargets != null) {
-        for (DatanodeDescriptor newTarget : newTargets) {
+        for (DatanodeStorageInfo newTarget : newTargets) {
           if (!targets.contains(newTarget)) {
             targets.add(newTarget);
           }
@@ -226,13 +226,23 @@ class PendingReconstructionBlocks {
       }
     }
 
-    void decrementReplicas(DatanodeDescriptor dn) {
-      targets.remove(dn);
+    void decrementReplicas(DatanodeStorageInfo dn) {
+      Iterator<DatanodeStorageInfo> iterator = targets.iterator();
+      while (iterator.hasNext()) {
+        DatanodeStorageInfo next = iterator.next();
+        if (next.getDatanodeDescriptor() == dn.getDatanodeDescriptor()) {
+          iterator.remove();
+        }
+      }
     }
 
     int getNumReplicas() {
       return targets.size();
     }
+
+    List<DatanodeStorageInfo> getTargets() {
+      return targets;
+    }
   }
 
   /*
@@ -318,4 +328,14 @@ class PendingReconstructionBlocks {
       }
     }
   }
+
+  List<DatanodeStorageInfo> getTargets(BlockInfo block) {
+    synchronized (pendingReconstructions) {
+      PendingBlockInfo found = pendingReconstructions.get(block);
+      if (found != null) {
+        return found.targets;
+      }
+    }
+    return null;
+  }
 }
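
Note the shape of the new decrementReplicas(): rather than List.remove(dn), it scans the targets and removes any storage owned by the same DatanodeDescriptor, because the storage that reports a finished replica need not be the same DatanodeStorageInfo instance that was chosen as the target. A small sketch of that match-by-owner logic, with hypothetical node/storage types:

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;

    public class DecrementByOwnerSketch {
      /** Hypothetical DataNode; a node may own several storages. */
      static class Node {}

      /** Hypothetical storage; each belongs to exactly one node. */
      static class Storage {
        final Node owner;
        Storage(Node owner) { this.owner = owner; }
      }

      // Drop every pending target on the same node as the reporting
      // storage, even when the two are different Storage instances.
      static void decrementReplicas(List<Storage> targets, Storage reported) {
        Iterator<Storage> it = targets.iterator();
        while (it.hasNext()) {
          // Reference identity on the owning node, as in the patch.
          if (it.next().owner == reported.owner) {
            it.remove();
          }
        }
      }

      public static void main(String[] args) {
        Node dn = new Node();
        List<Storage> targets = new ArrayList<>();
        targets.add(new Storage(dn));          // storage picked as target
        Storage reporting = new Storage(dn);   // different storage, same node
        decrementReplicas(targets, reporting);
        System.out.println(targets.isEmpty()); // true
      }
    }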
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlocksScheduledCounter.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlocksScheduledCounter.java
index 1894278..95f4e2c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlocksScheduledCounter.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlocksScheduledCounter.java
@@ -22,11 +22,19 @@ import static org.junit.Assert.assertEquals;
 import java.io.IOException;
 import java.util.ArrayList;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.junit.After;
 import org.junit.Test;
 
@@ -129,4 +137,69 @@ public class TestBlocksScheduledCounter {
           0, descriptor.getBlocksScheduled());
     }
   }
-}
+
+  /**
+   * Test that the scheduled blocks counter is decremented when the file
+   * backing the scheduled blocks is deleted.
+   * @throws Exception
+   */
+  @Test
+  public void testScheduledBlocksCounterDecrementOnDeletedBlock()
+      throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 1024);
+    conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY, 1);
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(5).build();
+    cluster.waitActive();
+    BlockManager bm = cluster.getNamesystem().getBlockManager();
+    try {
+      DistributedFileSystem dfs = cluster.getFileSystem();
+      // 1. create a file
+      Path filePath = new Path("/tmp.txt");
+      DFSTestUtil.createFile(dfs, filePath, 1024, (short) 3, 0L);
+
+      // 2. disable the heartbeats
+      for (DataNode dn : cluster.getDataNodes()) {
+        DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true);
+      }
+
+      DatanodeManager datanodeManager =
+          cluster.getNamesystem().getBlockManager().getDatanodeManager();
+      ArrayList<DatanodeDescriptor> dnList =
+          new ArrayList<DatanodeDescriptor>();
+      datanodeManager.fetchDatanodes(dnList, dnList, false);
+
+      // 3. mark a couple of blocks as corrupt
+      LocatedBlock block = NameNodeAdapter
+          .getBlockLocations(cluster.getNameNode(), filePath.toString(), 0, 1)
+          .get(0);
+      DatanodeInfo[] locs = block.getLocations();
+      cluster.getNamesystem().writeLock();
+      try {
+        bm.findAndMarkBlockAsCorrupt(block.getBlock(), locs[0], "STORAGE_ID",
+            "TEST");
+        bm.findAndMarkBlockAsCorrupt(block.getBlock(), locs[1], "STORAGE_ID",
+            "TEST");
+        BlockManagerTestUtil.computeAllPendingWork(bm);
+        BlockManagerTestUtil.updateState(bm);
+        assertEquals(1L, bm.getPendingReconstructionBlocksCount());
+      } finally {
+        cluster.getNamesystem().writeUnlock();
+      }
+
+      // 4. delete the file
+      dfs.delete(filePath, true);
+      int blocksScheduled = 0;
+      for (DatanodeDescriptor descriptor : dnList) {
+        if (descriptor.getBlocksScheduled() != 0) {
+          blocksScheduled += descriptor.getBlocksScheduled();
+        }
+      }
+      assertEquals(0, blocksScheduled);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReconstruction.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReconstruction.java
index dc37ec06..340f467 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReconstruction.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReconstruction.java
@@ -85,8 +85,7 @@ public class TestPendingReconstruction {
       BlockInfo block = genBlockInfo(i, i, 0);
       DatanodeStorageInfo[] targets = new DatanodeStorageInfo[i];
       System.arraycopy(storages, 0, targets, 0, i);
-      pendingReconstructions.increment(block,
-          DatanodeStorageInfo.toDatanodeDescriptors(targets));
+      pendingReconstructions.increment(block, targets);
     }
     assertEquals("Size of pendingReconstruction ",
                  10, pendingReconstructions.size());
@@ -96,25 +95,24 @@ public class TestPendingReconstruction {
     // remove one item
     //
     BlockInfo blk = genBlockInfo(8, 8, 0);
-    pendingReconstructions.decrement(blk, storages[7].getDatanodeDescriptor()); // removes one replica
+    pendingReconstructions.decrement(blk, storages[7]); // removes one replica
     assertEquals("pendingReconstructions.getNumReplicas ",
                  7, pendingReconstructions.getNumReplicas(blk));
 
     //
     // insert the same item twice should be counted as once
     //
-    pendingReconstructions.increment(blk, storages[0].getDatanodeDescriptor());
+    pendingReconstructions.increment(blk, storages[0]);
     assertEquals("pendingReconstructions.getNumReplicas ",
         7, pendingReconstructions.getNumReplicas(blk));
 
     for (int i = 0; i < 7; i++) {
       // removes all replicas
-      pendingReconstructions.decrement(blk, storages[i].getDatanodeDescriptor());
+      pendingReconstructions.decrement(blk, storages[i]);
     }
     assertTrue(pendingReconstructions.size() == 9);
     pendingReconstructions.increment(blk,
-        DatanodeStorageInfo.toDatanodeDescriptors(
-            DFSTestUtil.createDatanodeStorageInfos(8)));
+        DFSTestUtil.createDatanodeStorageInfos(8));
     assertTrue(pendingReconstructions.size() == 10);
 
     //
@@ -144,8 +142,7 @@ public class TestPendingReconstruction {
     for (int i = 10; i < 15; i++) {
       BlockInfo block = genBlockInfo(i, i, 0);
       pendingReconstructions.increment(block,
-          DatanodeStorageInfo.toDatanodeDescriptors(
-              DFSTestUtil.createDatanodeStorageInfos(i)));
+          DFSTestUtil.createDatanodeStorageInfos(i));
     }
     assertEquals(15, pendingReconstructions.size());
     assertEquals(0L, pendingReconstructions.getNumTimedOuts());
@@ -213,8 +210,7 @@ public class TestPendingReconstruction {
       blockInfo = new BlockInfoContiguous(block, (short) 3);
 
       pendingReconstruction.increment(blockInfo,
-          DatanodeStorageInfo.toDatanodeDescriptors(
-              DFSTestUtil.createDatanodeStorageInfos(1)));
+          DFSTestUtil.createDatanodeStorageInfos(1));
       BlockCollection bc = Mockito.mock(BlockCollection.class);
       // Place into blocksmap with GenerationStamp = 1
       blockInfo.setGenerationStamp(1);
@@ -230,8 +226,7 @@ public class TestPendingReconstruction {
       block = new Block(2, 2, 0);
       blockInfo = new BlockInfoContiguous(block, (short) 3);
       pendingReconstruction.increment(blockInfo,
-          DatanodeStorageInfo.toDatanodeDescriptors(
-              DFSTestUtil.createDatanodeStorageInfos(1)));
+          DFSTestUtil.createDatanodeStorageInfos(1));
 
       // verify 2 blocks in pendingReconstructions
       assertEquals("Size of pendingReconstructions ", 2,
@@ -277,7 +272,8 @@ public class TestPendingReconstruction {
           getDatanodes().iterator().next() };
 
       // Add a stored block to the pendingReconstruction.
-      pendingReconstruction.increment(storedBlock, desc);
+      pendingReconstruction.increment(blockInfo,
+          DFSTestUtil.createDatanodeStorageInfos(1));
       assertEquals("Size of pendingReconstructions ", 1,
           pendingReconstruction.size());
 
@@ -306,6 +302,8 @@ public class TestPendingReconstruction {
         fsn.writeUnlock();
       }
 
+      GenericTestUtils.waitFor(() -> pendingReconstruction.size() == 0, 500,
+          10000);
       // The pending queue should be empty.
       assertEquals("Size of pendingReconstructions ", 0,
           pendingReconstruction.size());
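
The final hunk replaces an immediate assertion with a poll: the pending queue is drained asynchronously by the redundancy monitor, so asserting right after writeUnlock() can race. GenericTestUtils.waitFor handles the polling in Hadoop's test tree; the hypothetical helper below shows the same wait-until pattern in plain Java:

    import java.util.concurrent.TimeoutException;
    import java.util.function.BooleanSupplier;

    public class WaitForSketch {
      /** Poll check every intervalMs until it holds or timeoutMs elapses. */
      static void waitFor(BooleanSupplier check, long intervalMs, long timeoutMs)
          throws InterruptedException, TimeoutException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (!check.getAsBoolean()) {
          if (System.currentTimeMillis() > deadline) {
            throw new TimeoutException(
                "condition not met within " + timeoutMs + " ms");
          }
          Thread.sleep(intervalMs);
        }
      }

      public static void main(String[] args) throws Exception {
        long start = System.currentTimeMillis();
        // Wait until ~200 ms have passed, checking every 50 ms.
        waitFor(() -> System.currentTimeMillis() - start >= 200, 50, 10000);
        System.out.println("condition met");
      }
    }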

