You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by we...@apache.org on 2019/10/04 05:27:43 UTC
[hadoop] branch branch-3.2 updated: HDFS-14637. Namenode may not
replicate blocks to meet the policy after enabling upgradeDomain.
Contributed by Stephen O'Donnell.
This is an automated email from the ASF dual-hosted git repository.
weichiu pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new 9661931 HDFS-14637. Namenode may not replicate blocks to meet the policy after enabling upgradeDomain. Contributed by Stephen O'Donnell.
9661931 is described below
commit 966193153f9cd75e009c8db3502e1b3ba2cdfa25
Author: Stephen O'Donnell <so...@cloudera.com>
AuthorDate: Thu Oct 3 22:12:27 2019 -0700
HDFS-14637. Namenode may not replicate blocks to meet the policy after enabling upgradeDomain. Contributed by Stephen O'Donnell.
Reviewed-by: Ayush Saxena <ay...@apache.org>
Signed-off-by: Wei-Chiu Chuang <we...@apache.org>
(cherry picked from commit c99a12167ff9566012ef32104a3964887d62c899)
Conflicts:
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlocksWithNotEnoughRacks.java
hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-infra/src/main/java/org/apache/hadoop/tools/dynamometer/BlockPlacementPolicyAlwaysSatisfied.java
---
.../hdfs/server/blockmanagement/BlockManager.java | 69 ++++++++++----
.../blockmanagement/BlockPlacementStatus.java | 8 ++
.../BlockPlacementStatusDefault.java | 8 ++
.../BlockPlacementStatusWithNodeGroup.java | 11 +++
.../BlockPlacementStatusWithUpgradeDomain.java | 22 ++++-
.../java/org/apache/hadoop/hdfs/DFSTestUtil.java | 17 +++-
.../blockmanagement/BlockManagerTestUtil.java | 30 +++++-
.../TestBlockPlacementStatusDefault.java | 57 +++++++++++
.../TestBlockPlacementStatusWithUpgradeDomain.java | 59 +++++++++++-
.../TestBlocksWithNotEnoughRacks.java | 106 ++++++++++++++++++++-
10 files changed, 357 insertions(+), 30 deletions(-)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 929e9b2..55d06a6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -2013,6 +2013,7 @@ public class BlockManager implements BlockStatsMXBean {
(pendingReplicaNum > 0 || isPlacementPolicySatisfied(block));
}
+ @VisibleForTesting
BlockReconstructionWork scheduleReconstruction(BlockInfo block,
int priority) {
// skip abandoned block or block reopened for append
@@ -2057,7 +2058,9 @@ public class BlockManager implements BlockStatsMXBean {
additionalReplRequired = requiredRedundancy - numReplicas.liveReplicas()
- pendingNum;
} else {
- additionalReplRequired = 1; // Needed on a new rack
+ // Violates placement policy. Needed on a new rack or domain etc.
+ BlockPlacementStatus placementStatus = getBlockPlacementStatus(block);
+ additionalReplRequired = placementStatus.getAdditionalReplicasRequired();
}
final BlockCollection bc = getBlockCollection(block);
@@ -2090,20 +2093,6 @@ public class BlockManager implements BlockStatsMXBean {
}
}
- private boolean isInNewRack(DatanodeDescriptor[] srcs,
- DatanodeDescriptor target) {
- LOG.debug("check if target {} increases racks, srcs={}", target,
- Arrays.asList(srcs));
- for (DatanodeDescriptor src : srcs) {
- if (!src.isDecommissionInProgress() &&
- src.getNetworkLocation().equals(target.getNetworkLocation())) {
- LOG.debug("the target {} is in the same rack with src {}", target, src);
- return false;
- }
- }
- return true;
- }
-
private boolean validateReconstructionWork(BlockReconstructionWork rw) {
BlockInfo block = rw.getBlock();
int priority = rw.getPriority();
@@ -2129,10 +2118,16 @@ public class BlockManager implements BlockStatsMXBean {
}
DatanodeStorageInfo[] targets = rw.getTargets();
+ BlockPlacementStatus placementStatus = getBlockPlacementStatus(block);
if ((numReplicas.liveReplicas() >= requiredRedundancy) &&
- (!isPlacementPolicySatisfied(block)) ) {
- if (!isInNewRack(rw.getSrcNodes(), targets[0].getDatanodeDescriptor())) {
- // No use continuing, unless a new rack in this case
+ (!placementStatus.isPlacementPolicySatisfied())) {
+ BlockPlacementStatus newPlacementStatus =
+ getBlockPlacementStatus(block, targets);
+ if (!newPlacementStatus.isPlacementPolicySatisfied() &&
+ (newPlacementStatus.getAdditionalReplicasRequired() >=
+ placementStatus.getAdditionalReplicasRequired())) {
+ // If the new targets neither satisfy the placement policy nor at least
+ // reduce the number of additional replicas required, there is no use
+ // continuing.
return false;
}
// mark that the reconstruction work is to replicate internal block to a
@@ -4526,7 +4521,25 @@ public class BlockManager implements BlockStatsMXBean {
}
boolean isPlacementPolicySatisfied(BlockInfo storedBlock) {
+ return getBlockPlacementStatus(storedBlock, null)
+ .isPlacementPolicySatisfied();
+ }
+
+ BlockPlacementStatus getBlockPlacementStatus(BlockInfo storedBlock) {
+ return getBlockPlacementStatus(storedBlock, null);
+ }
+
+ BlockPlacementStatus getBlockPlacementStatus(BlockInfo storedBlock,
+ DatanodeStorageInfo[] additionalStorage) {
List<DatanodeDescriptor> liveNodes = new ArrayList<>();
+ if (additionalStorage != null) {
+ // additionalStorage holds potential new targets for the block. If any
+ // are passed, include them when checking the placement policy, as the
+ // policy may be met with these nodes when it was not met without them.
+ for (DatanodeStorageInfo s : additionalStorage) {
+ liveNodes.add(getDatanodeDescriptorFromStorage(s));
+ }
+ }
Collection<DatanodeDescriptor> corruptNodes = corruptReplicas
.getNodes(storedBlock);
for (DatanodeStorageInfo storage : blocksMap.getStorages(storedBlock)) {
@@ -4534,7 +4547,22 @@ public class BlockManager implements BlockStatsMXBean {
&& storage.getState() == State.NORMAL) {
// assume the policy is satisfied for blocks on PROVIDED storage
// as long as the storage is in normal state.
- return true;
+ return new BlockPlacementStatus() {
+ @Override
+ public boolean isPlacementPolicySatisfied() {
+ return true;
+ }
+
+ @Override
+ public String getErrorDescription() {
+ return null;
+ }
+
+ @Override
+ public int getAdditionalReplicasRequired() {
+ return 0;
+ }
+ };
}
final DatanodeDescriptor cur = getDatanodeDescriptorFromStorage(storage);
// Nodes under maintenance should be counted as valid replicas from
@@ -4550,8 +4578,7 @@ public class BlockManager implements BlockStatsMXBean {
.getPolicy(blockType);
int numReplicas = blockType == STRIPED ? ((BlockInfoStriped) storedBlock)
.getRealTotalBlockNum() : storedBlock.getReplication();
- return placementPolicy.verifyBlockPlacement(locs, numReplicas)
- .isPlacementPolicySatisfied();
+ return placementPolicy.verifyBlockPlacement(locs, numReplicas);
}
boolean isNeededReconstructionForMaintenance(BlockInfo storedBlock,
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementStatus.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementStatus.java
index e2ac54a..a227666 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementStatus.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementStatus.java
@@ -39,4 +39,12 @@ public interface BlockPlacementStatus {
*/
public String getErrorDescription();
+ /**
+ * Return the number of additional replicas needed to ensure the block
+ * placement policy is satisfied.
+ * @return The number of new replicas needed to satisfy the placement policy
+ * or zero if no extra are needed
+ */
+ int getAdditionalReplicasRequired();
+
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementStatusDefault.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementStatusDefault.java
index 75bb65d..7612142 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementStatusDefault.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementStatusDefault.java
@@ -45,4 +45,12 @@ public class BlockPlacementStatusDefault implements BlockPlacementStatus {
" more rack(s). Total number of racks in the cluster: " + totalRacks;
}
+ @Override
+ public int getAdditionalReplicasRequired() {
+ if (isPlacementPolicySatisfied()) {
+ return 0;
+ } else {
+ return requiredRacks - currentRacks;
+ }
+ }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementStatusWithNodeGroup.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementStatusWithNodeGroup.java
index b98b3da..ac5a5b5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementStatusWithNodeGroup.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementStatusWithNodeGroup.java
@@ -78,4 +78,15 @@ public class BlockPlacementStatusWithNodeGroup implements BlockPlacementStatus {
}
return errorDescription.toString();
}
+
+ @Override
+ public int getAdditionalReplicasRequired() {
+ if (isPlacementPolicySatisfied()) {
+ return 0;
+ } else {
+ int parent = parentBlockPlacementStatus.getAdditionalReplicasRequired();
+ int child = requiredNodeGroups - currentNodeGroups.size();
+ return Math.max(parent, child);
+ }
+ }
}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementStatusWithUpgradeDomain.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementStatusWithUpgradeDomain.java
index 4b3c3cc..b839ced 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementStatusWithUpgradeDomain.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementStatusWithUpgradeDomain.java
@@ -85,4 +85,24 @@ public class BlockPlacementStatusWithUpgradeDomain implements
}
return errorDescription.toString();
}
-}
\ No newline at end of file
+
+ @Override
+ public int getAdditionalReplicasRequired() {
+ if (isPlacementPolicySatisfied()) {
+ return 0;
+ } else {
+ // It is possible for a block to have the correct number of upgrade
+ // domains, but only a single rack, or be on multiple racks, but only in
+ // one upgrade domain.
+ int parent = parentBlockPlacementStatus.getAdditionalReplicasRequired();
+ int child;
+
+ if (numberOfReplicas <= upgradeDomainFactor) {
+ child = numberOfReplicas - upgradeDomains.size();
+ } else {
+ child = upgradeDomainFactor - upgradeDomains.size();
+ }
+ return Math.max(parent, child);
+ }
+ }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
index e4458b6..2068e41 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -536,17 +536,24 @@ public class DFSTestUtil {
}
}
+ public static void waitForReplication(MiniDFSCluster cluster, ExtendedBlock b,
+ int racks, int replicas, int neededReplicas)
+ throws TimeoutException, InterruptedException {
+ waitForReplication(cluster, b, racks, replicas, neededReplicas, 0);
+ }
+
/*
* Wait up to 20s for the given block to be replicated across
* the requested number of racks, with the requested number of
* replicas, and the requested number of replicas still needed.
*/
public static void waitForReplication(MiniDFSCluster cluster, ExtendedBlock b,
- int racks, int replicas, int neededReplicas)
+ int racks, int replicas, int neededReplicas, int neededDomains)
throws TimeoutException, InterruptedException {
int curRacks = 0;
int curReplicas = 0;
int curNeededReplicas = 0;
+ int curDomains = 0;
int count = 0;
final int ATTEMPTS = 20;
@@ -557,17 +564,21 @@ public class DFSTestUtil {
curRacks = r[0];
curReplicas = r[1];
curNeededReplicas = r[2];
+ curDomains = r[3];
count++;
} while ((curRacks != racks ||
curReplicas != replicas ||
- curNeededReplicas != neededReplicas) && count < ATTEMPTS);
+ curNeededReplicas != neededReplicas ||
+ (neededDomains != 0 && curDomains != neededDomains))
+ && count < ATTEMPTS);
if (count == ATTEMPTS) {
throw new TimeoutException("Timed out waiting for replication."
+ " Needed replicas = "+neededReplicas
+ " Cur needed replicas = "+curNeededReplicas
+ " Replicas = "+replicas+" Cur replicas = "+curReplicas
- + " Racks = "+racks+" Cur racks = "+curRacks);
+ + " Racks = "+racks+" Cur racks = "+curRacks
+ + " Domains = "+neededDomains+" Cur domains = "+curDomains);
}
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
index ae61f8c..4d6f202 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
@@ -81,7 +81,8 @@ public class BlockManagerTestUtil {
/**
* @return a tuple of the replica state (number racks, number live
- * replicas, and number needed replicas) for the given block.
+ * replicas, number needed replicas and number of UpgradeDomains) for the
+ * given block.
*/
public static int[] getReplicaInfo(final FSNamesystem namesystem, final Block b) {
final BlockManager bm = namesystem.getBlockManager();
@@ -90,7 +91,8 @@ public class BlockManagerTestUtil {
final BlockInfo storedBlock = bm.getStoredBlock(b);
return new int[]{getNumberOfRacks(bm, b),
bm.countNodes(storedBlock).liveReplicas(),
- bm.neededReconstruction.contains(storedBlock) ? 1 : 0};
+ bm.neededReconstruction.contains(storedBlock) ? 1 : 0,
+ getNumberOfDomains(bm, b)};
} finally {
namesystem.readUnlock();
}
@@ -121,6 +123,30 @@ public class BlockManagerTestUtil {
}
/**
+ * @return the number of UpgradeDomains over which a given block is
+ * replicated. Decommissioning/decommissioned nodes are not counted, and
+ * corrupt replicas are also ignored.
+ */
+ private static int getNumberOfDomains(final BlockManager blockManager,
+ final Block b) {
+ final Set<String> domSet = new HashSet<String>(0);
+ final Collection<DatanodeDescriptor> corruptNodes =
+ getCorruptReplicas(blockManager).getNodes(b);
+ for(DatanodeStorageInfo storage : blockManager.blocksMap.getStorages(b)) {
+ final DatanodeDescriptor cur = storage.getDatanodeDescriptor();
+ if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()) {
+ if ((corruptNodes == null) || !corruptNodes.contains(cur)) {
+ String domain = cur.getUpgradeDomain();
+ if (domain != null && !domSet.contains(domain)) {
+ domSet.add(domain);
+ }
+ }
+ }
+ }
+ return domSet.size();
+ }
+
+ /**
* @return redundancy monitor thread instance from block manager.
*/
public static Daemon getRedundancyThread(final BlockManager blockManager) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockPlacementStatusDefault.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockPlacementStatusDefault.java
new file mode 100644
index 0000000..6b07334
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockPlacementStatusDefault.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+import org.junit.Test;
+
+/**
+ * Unit tests to validate the BlockPlacementStatusDefault policy, focusing on
+ * the getAdditionAlReplicasRequired method.
+ */
+public class TestBlockPlacementStatusDefault {
+
+ @Test
+ public void testIsPolicySatisfiedCorrectly() {
+ // 2 current racks and 2 expected
+ BlockPlacementStatusDefault bps =
+ new BlockPlacementStatusDefault(2, 2, 5);
+ assertTrue(bps.isPlacementPolicySatisfied());
+ assertEquals(0, bps.getAdditionalReplicasRequired());
+
+ // 1 current rack and 2 expected
+ bps =
+ new BlockPlacementStatusDefault(1, 2, 5);
+ assertFalse(bps.isPlacementPolicySatisfied());
+ assertEquals(1, bps.getAdditionalReplicasRequired());
+
+ // 3 current racks and 2 expected
+ bps =
+ new BlockPlacementStatusDefault(3, 2, 5);
+ assertTrue(bps.isPlacementPolicySatisfied());
+ assertEquals(0, bps.getAdditionalReplicasRequired());
+
+ // 1 current rack and 2 expected, but only 1 rack on the cluster
+ bps =
+ new BlockPlacementStatusDefault(1, 2, 1);
+ assertTrue(bps.isPlacementPolicySatisfied());
+ assertEquals(0, bps.getAdditionalReplicasRequired());
+ }
+}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockPlacementStatusWithUpgradeDomain.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockPlacementStatusWithUpgradeDomain.java
index bfff932..1e0fb76 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockPlacementStatusWithUpgradeDomain.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockPlacementStatusWithUpgradeDomain.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.blockmanagement;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -49,11 +50,13 @@ public class TestBlockPlacementStatusWithUpgradeDomain {
@Test
public void testIsPolicySatisfiedParentFalse() {
when(bpsd.isPlacementPolicySatisfied()).thenReturn(false);
+ when(bpsd.getAdditionalReplicasRequired()).thenReturn(1);
BlockPlacementStatusWithUpgradeDomain bps =
new BlockPlacementStatusWithUpgradeDomain(bpsd, upgradeDomains, 3, 3);
// Parent policy is not satisfied but upgrade domain policy is
assertFalse(bps.isPlacementPolicySatisfied());
+ assertEquals(1, bps.getAdditionalReplicasRequired());
}
@Test
@@ -63,21 +66,73 @@ public class TestBlockPlacementStatusWithUpgradeDomain {
// Number of domains, replicas and upgradeDomainFactor is equal and parent
// policy is satisfied
assertTrue(bps.isPlacementPolicySatisfied());
+ assertEquals(0, bps.getAdditionalReplicasRequired());
}
@Test
- public void testIsPolicySatisifedSmallDomains() {
+ public void testIsPolicySatisfiedSmallDomains() {
// Number of domains is less than replicas but equal to factor
BlockPlacementStatusWithUpgradeDomain bps =
new BlockPlacementStatusWithUpgradeDomain(bpsd, upgradeDomains, 4, 3);
assertTrue(bps.isPlacementPolicySatisfied());
+ assertEquals(0, bps.getAdditionalReplicasRequired());
// Same as above but replicas is greater than factor
bps = new BlockPlacementStatusWithUpgradeDomain(bpsd, upgradeDomains, 4, 2);
assertTrue(bps.isPlacementPolicySatisfied());
+ assertEquals(0, bps.getAdditionalReplicasRequired());
// Number of domains is less than replicas and factor
bps = new BlockPlacementStatusWithUpgradeDomain(bpsd, upgradeDomains, 4, 4);
assertFalse(bps.isPlacementPolicySatisfied());
+ assertEquals(1, bps.getAdditionalReplicasRequired());
}
-}
\ No newline at end of file
+
+ @Test
+ public void testIsPolicySatisfiedSmallReplicas() {
+ // Replication factor 1 file
+ upgradeDomains.clear();
+ upgradeDomains.add("1");
+ BlockPlacementStatusWithUpgradeDomain bps =
+ new BlockPlacementStatusWithUpgradeDomain(bpsd, upgradeDomains, 1, 3);
+ assertTrue(bps.isPlacementPolicySatisfied());
+ assertEquals(0, bps.getAdditionalReplicasRequired());
+
+ // Replication factor 2 file, but one domain
+ bps =
+ new BlockPlacementStatusWithUpgradeDomain(bpsd, upgradeDomains, 2, 3);
+ assertFalse(bps.isPlacementPolicySatisfied());
+ assertEquals(1, bps.getAdditionalReplicasRequired());
+
+ // Replication factor 2 file, but two domains
+ upgradeDomains.add("2");
+ bps =
+ new BlockPlacementStatusWithUpgradeDomain(bpsd, upgradeDomains, 2, 3);
+ assertTrue(bps.isPlacementPolicySatisfied());
+ assertEquals(0, bps.getAdditionalReplicasRequired());
+ }
+
+ @Test
+ public void testPolicyIsNotSatisfiedInsufficientDomains() {
+ // Insufficient Domains - 1 domain, replication factor 3
+ upgradeDomains.clear();
+ upgradeDomains.add("1");
+ BlockPlacementStatusWithUpgradeDomain bps =
+ new BlockPlacementStatusWithUpgradeDomain(bpsd, upgradeDomains, 3, 3);
+ assertFalse(bps.isPlacementPolicySatisfied());
+ assertEquals(2, bps.getAdditionalReplicasRequired());
+
+ // One domain, replication factor 2 file
+ bps =
+ new BlockPlacementStatusWithUpgradeDomain(bpsd, upgradeDomains, 2, 3);
+ assertFalse(bps.isPlacementPolicySatisfied());
+ assertEquals(1, bps.getAdditionalReplicasRequired());
+
+ // 2 domains, replication factor 3
+ upgradeDomains.add("2");
+ bps =
+ new BlockPlacementStatusWithUpgradeDomain(bpsd, upgradeDomains, 3, 3);
+ assertFalse(bps.isPlacementPolicySatisfied());
+ assertEquals(1, bps.getAdditionalReplicasRequired());
+ }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlocksWithNotEnoughRacks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlocksWithNotEnoughRacks.java
index 5e59443..b773b0c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlocksWithNotEnoughRacks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlocksWithNotEnoughRacks.java
@@ -21,8 +21,9 @@ package org.apache.hadoop.hdfs.server.blockmanagement;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertArrayEquals;
+import java.io.IOException;
import java.util.ArrayList;
-
+import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
@@ -43,6 +44,8 @@ import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Test;
import org.slf4j.event.Level;
+import static org.junit.Assert.*;
+
public class TestBlocksWithNotEnoughRacks {
public static final Logger LOG =
LoggerFactory.getLogger(TestBlocksWithNotEnoughRacks.class);
@@ -473,4 +476,105 @@ public class TestBlocksWithNotEnoughRacks {
hostsFileWriter.cleanup();
}
}
+
+ @Test
+ public void testMultipleReplicasScheduledForUpgradeDomain() throws Exception {
+ Configuration conf = getConf();
+ final short replicationFactor = 3;
+ final Path filePath = new Path("/testFile");
+
+ conf.set("dfs.block.replicator.classname",
+ "org.apache.hadoop.hdfs.server.blockmanagement." +
+ "BlockPlacementPolicyWithUpgradeDomain");
+ MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+ .numDataNodes(6).build();
+ cluster.waitClusterUp();
+
+ List<DatanodeDescriptor> dnDescriptors = getDnDescriptors(cluster);
+
+ try {
+ // Create a file with one block with a replication factor of 3
+ // No upgrade domains are set.
+ final FileSystem fs = cluster.getFileSystem();
+ DFSTestUtil.createFile(fs, filePath, 1L, replicationFactor, 1L);
+ ExtendedBlock b = DFSTestUtil.getFirstBlock(fs, filePath);
+
+ BlockManager bm = cluster.getNamesystem().getBlockManager();
+ BlockInfo storedBlock = bm.getStoredBlock(b.getLocalBlock());
+
+ // The block should be replicated OK - so Reconstruction Work will be null
+ BlockReconstructionWork work = bm.scheduleReconstruction(storedBlock, 2);
+ assertNull(work);
+ // Set the upgradeDomain to "3" for the 3 nodes hosting the block.
+ // Then alternately set the remaining 3 nodes to have an upgradeDomain
+ // of 0 or 1 giving a total of 3 upgradeDomains.
+ for (int i=0; i<storedBlock.getReplication(); i++) {
+ storedBlock.getDatanode(i).setUpgradeDomain("3");
+ }
+ int udInd = 0;
+ for (DatanodeDescriptor d : dnDescriptors) {
+ if (d.getUpgradeDomain() == null) {
+ d.setUpgradeDomain(Integer.toString(udInd % 2));
+ udInd++;
+ }
+ }
+ // Now reconWork is non-null and 2 extra targets are needed
+ work = bm.scheduleReconstruction(storedBlock, 2);
+ assertEquals(2, work.getAdditionalReplRequired());
+
+ // Add the block to the replication queue and ensure it is replicated
+ // correctly.
+ bm.neededReconstruction.add(storedBlock, 3, 0, 0, replicationFactor);
+ DFSTestUtil.waitForReplication(cluster, b, 1, replicationFactor, 0, 3);
+ } finally {
+ cluster.shutdown();
+ }
+ }
+
+ @Test
+ public void testUnderReplicatedRespectsRacksAndUpgradeDomain()
+ throws Exception {
+ Configuration conf = getConf();
+ final short replicationFactor = 3;
+ final Path filePath = new Path("/testFile");
+
+ conf.set("dfs.block.replicator.classname",
+ "org.apache.hadoop.hdfs.server.blockmanagement." +
+ "BlockPlacementPolicyWithUpgradeDomain");
+
+ // All hosts are on two racks
+ String[] racks = {"/r1", "/r1", "/r1", "/r2", "/r2", "/r2"};
+ MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+ .numDataNodes(6).racks(racks).build();
+ cluster.waitClusterUp();
+ List<DatanodeDescriptor> dnDescriptors = getDnDescriptors(cluster);
+ for (int i=0; i < dnDescriptors.size(); i++) {
+ dnDescriptors.get(i).setUpgradeDomain(Integer.toString(i%3));
+ }
+ try {
+ final FileSystem fs = cluster.getFileSystem();
+ DFSTestUtil.createFile(fs, filePath, 1L, (short)1, 1L);
+ ExtendedBlock b = DFSTestUtil.getFirstBlock(fs, filePath);
+ fs.setReplication(filePath, replicationFactor);
+ DFSTestUtil.waitForReplication(cluster, b, 2, replicationFactor, 0, 3);
+ } finally {
+ cluster.shutdown();
+ }
+ }
+
+ private List<DatanodeDescriptor> getDnDescriptors(MiniDFSCluster cluster)
+ throws IOException {
+ List<DatanodeDescriptor> dnDesc = new ArrayList<>();
+ DatanodeManager dnManager = cluster.getNamesystem().getBlockManager()
+ .getDatanodeManager();
+ for (DataNode dn : cluster.getDataNodes()) {
+ DatanodeDescriptor d = dnManager.getDatanode(dn.getDatanodeUuid());
+ if (d == null) {
+ throw new IOException("DatanodeDescriptor not found for DN "+
+ dn.getDatanodeUuid());
+ }
+ dnDesc.add(d);
+ }
+ return dnDesc;
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org