You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ki...@apache.org on 2020/10/23 00:49:08 UTC

[hadoop] branch branch-3.2 updated: HDFS-15622. Deleted blocks linger in the replications queue. Contributed by Ahmed Hussein.

This is an automated email from the ASF dual-hosted git repository.

kihwal pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new f363c3b  HDFS-15622. Deleted blocks linger in the replications queue. Contributed by Ahmed Hussein.
f363c3b is described below

commit f363c3b315472e45fd1d0f8225e31184b6c353a1
Author: Kihwal Lee <ki...@apache.org>
AuthorDate: Thu Oct 22 19:48:42 2020 -0500

    HDFS-15622. Deleted blocks linger in the replications queue. Contributed by Ahmed Hussein.
    
    (cherry picked from commit da1b6e3cc286db00b385f3280627d2b2063b4e59)
---
 .../blockmanagement/LowRedundancyBlocks.java       | 30 ++++++++++----
 .../TestLowRedundancyBlockQueues.java              | 47 +++++++++++++++++++++-
 .../blockmanagement/TestReplicationPolicy.java     | 17 ++++++--
 3 files changed, 81 insertions(+), 13 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/LowRedundancyBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/LowRedundancyBlocks.java
index f6ef248..d719e93 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/LowRedundancyBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/LowRedundancyBlocks.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
@@ -500,6 +501,8 @@ class LowRedundancyBlocks implements Iterable<BlockInfo> {
    * the block count is met or iteration reaches the end of the lowest priority
    * list, in which case bookmarks for each block list are reset to the heads
    * of their respective lists.
+   * If a block is deleted (has invalid bcId), it will be removed from the low
+   * redundancy queues.
    *
    * @param blocksToProcess - number of blocks to fetch from low redundancy
    *          blocks.
@@ -515,21 +518,32 @@ class LowRedundancyBlocks implements Iterable<BlockInfo> {
 
     int count = 0;
     int priority = 0;
+    HashSet<BlockInfo> toRemove = new HashSet<>();
     for (; count < blocksToProcess && priority < LEVEL; priority++) {
-      if (priority == QUEUE_WITH_CORRUPT_BLOCKS) {
-        // do not choose corrupted blocks.
-        continue;
-      }
-
       // Go through all blocks that need reconstructions with current priority.
       // Set the iterator to the first unprocessed block at this priority level
+      // We do not want to skip QUEUE_WITH_CORRUPT_BLOCKS because we still need
+      // to look for deleted blocks if any.
+      final boolean inCorruptLevel = (QUEUE_WITH_CORRUPT_BLOCKS == priority);
       final Iterator<BlockInfo> i = priorityQueues.get(priority).getBookmark();
       final List<BlockInfo> blocks = new LinkedList<>();
-      blocksToReconstruct.add(blocks);
-      // Loop through all remaining blocks in the list.
+      if (!inCorruptLevel) {
+        blocksToReconstruct.add(blocks);
+      }
       for(; count < blocksToProcess && i.hasNext(); count++) {
-        blocks.add(i.next());
+        BlockInfo block = i.next();
+        if (block.isDeleted()) {
+          toRemove.add(block);
+          continue;
+        }
+        if (!inCorruptLevel) {
+          blocks.add(block);
+        }
+      }
+      for (BlockInfo bInfo : toRemove) {
+        remove(bInfo, priority);
       }
+      toRemove.clear();
     }
 
     if (priority == LEVEL || resetIterators) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestLowRedundancyBlockQueues.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestLowRedundancyBlockQueues.java
index e63a8d8..ef614fb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestLowRedundancyBlockQueues.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestLowRedundancyBlockQueues.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hdfs.server.blockmanagement;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.hadoop.hdfs.StripedFileTestUtil;
 import org.apache.hadoop.hdfs.protocol.Block;
@@ -41,6 +42,7 @@ import static org.junit.Assert.fail;
 public class TestLowRedundancyBlockQueues {
 
   private final ErasureCodingPolicy ecPolicy;
+  private static AtomicLong mockINodeId = new AtomicLong(0);
 
   public TestLowRedundancyBlockQueues(ErasureCodingPolicy policy) {
     ecPolicy = policy;
@@ -52,7 +54,15 @@ public class TestLowRedundancyBlockQueues {
   }
 
   private BlockInfo genBlockInfo(long id) {
-    return new BlockInfoContiguous(new Block(id), (short) 3);
+    return genBlockInfo(id, false);
+  }
+
+  private BlockInfo genBlockInfo(long id, boolean isCorruptBlock) {
+    BlockInfo bInfo = new BlockInfoContiguous(new Block(id), (short) 3);
+    if (!isCorruptBlock) {
+      bInfo.setBlockCollectionId(mockINodeId.incrementAndGet());
+    }
+    return bInfo;
   }
 
   private BlockInfo genStripedBlockInfo(long id, long numBytes) {
@@ -93,6 +103,41 @@ public class TestLowRedundancyBlockQueues {
         queues.getHighestPriorityECBlockCount());
   }
 
+  /**
+   * Tests that deleted blocks should not be returned by
+   * {@link LowRedundancyBlocks#chooseLowRedundancyBlocks(int, boolean)}.
+   * @throws Exception
+   */
+  @Test
+  public void testDeletedBlocks() throws Exception {
+    int numBlocks = 5;
+    LowRedundancyBlocks queues = new LowRedundancyBlocks();
+    // create 5 blockinfos. The first one is corrupt.
+    for (int ind = 0; ind < numBlocks; ind++) {
+      BlockInfo blockInfo = genBlockInfo(ind, ind == 0);
+      queues.add(blockInfo, 2, 0, 0, 3);
+    }
+    List<List<BlockInfo>> blocks;
+    // Get two blocks from the queue, but we should only get one because first
+    // block is deleted.
+    blocks = queues.chooseLowRedundancyBlocks(2, false);
+
+    assertEquals(1, blocks.get(2).size());
+    assertEquals(1, blocks.get(2).get(0).getBlockId());
+
+    // Get the next blocks - should be ID 2
+    blocks = queues.chooseLowRedundancyBlocks(1, false);
+    assertEquals(2, blocks.get(2).get(0).getBlockId());
+
+    // Get the next block, but also reset this time - should be ID 3 returned
+    blocks = queues.chooseLowRedundancyBlocks(1, true);
+    assertEquals(3, blocks.get(2).get(0).getBlockId());
+
+    // Get one more block and due to resetting the queue it will be block id 1
+    blocks = queues.chooseLowRedundancyBlocks(1, false);
+    assertEquals(1, blocks.get(2).get(0).getBlockId());
+  }
+
   @Test
   public void testQueuePositionCanBeReset() throws Throwable {
     LowRedundancyBlocks queues = new LowRedundancyBlocks();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
index a6db359..0980c2f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
@@ -38,6 +38,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.AddBlockFlag;
@@ -82,7 +83,7 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
   // The interval for marking a datanode as stale,
   private static final long staleInterval =
       DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT;
-
+  private static AtomicLong mockINodeId = new AtomicLong(0);
   @Rule
   public ExpectedException exception = ExpectedException.none();
 
@@ -825,7 +826,15 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
   }
 
   private BlockInfo genBlockInfo(long id) {
-    return new BlockInfoContiguous(new Block(id), (short) 3);
+    return genBlockInfo(id, false);
+  }
+
+  private BlockInfo genBlockInfo(long id, boolean isBlockCorrupted) {
+    BlockInfo bInfo = new BlockInfoContiguous(new Block(id), (short) 3);
+    if (!isBlockCorrupted) {
+      bInfo.setBlockCollectionId(mockINodeId.incrementAndGet());
+    }
+    return bInfo;
   }
 
   /**
@@ -848,7 +857,7 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
         // Adding the blocks directly to normal priority
 
         neededReconstruction.add(genBlockInfo(ThreadLocalRandom.current().
-            nextLong()), 2, 0, 0, 3);
+            nextLong(), true), 2, 0, 0, 3);
       }
       // Lets wait for the replication interval, to start process normal
       // priority blocks
@@ -856,7 +865,7 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
       
       // Adding the block directly to high priority list
       neededReconstruction.add(genBlockInfo(ThreadLocalRandom.current().
-          nextLong()), 1, 0, 0, 3);
+          nextLong(), true), 1, 0, 0, 3);
 
       // Lets wait for the replication interval
       Thread.sleep(DFS_NAMENODE_REPLICATION_INTERVAL);


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org