You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ar...@apache.org on 2016/02/22 20:47:27 UTC
[08/50] [abbrv] hadoop git commit: HDFS-9456.
BlockPlacementPolicyWithNodeGroup should override verifyBlockPlacement().
Contributed by Xiaobing Zhou.
HDFS-9456. BlockPlacementPolicyWithNodeGroup should override verifyBlockPlacement(). Contributed by Xiaobing Zhou.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/77ba5add
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/77ba5add
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/77ba5add
Branch: refs/heads/HDFS-1312
Commit: 77ba5add0d9cb10d45ca9122bca48baa7c8fb3b8
Parents: 4b0e59f
Author: Junping Du <ju...@apache.org>
Authored: Tue Feb 16 18:55:55 2016 -0800
Committer: Junping Du <ju...@apache.org>
Committed: Tue Feb 16 18:55:55 2016 -0800
----------------------------------------------------------------------
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 +
.../BlockPlacementPolicyWithNodeGroup.java | 46 +++++++++
.../BlockPlacementStatusWithNodeGroup.java | 81 +++++++++++++++
.../TestReplicationPolicyWithNodeGroup.java | 100 +++++++++++++++++++
4 files changed, 230 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/77ba5add/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index e9160c9..0b220bf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -2768,6 +2768,9 @@ Release 2.8.0 - UNRELEASED
HDFS-9801. ReconfigurableBase should update the cached configuration.
(Arpit Agarwal)
+ HDFS-9456. BlockPlacementPolicyWithNodeGroup should override
+ verifyBlockPlacement(). (Xiaobing Zhou via junping_du)
+
Release 2.7.3 - UNRELEASED
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/77ba5add/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java
index c62d977..194f6ba 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java
@@ -389,4 +389,50 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau
}
return true;
}
+
+
+  /**
+   * Verifies replica placement against both the rack-level rules of the
+   * parent policy and the node-group spread required by this policy.
+   *
+   * <p>The parent check counts distinct racks from each location's network
+   * path, so the node-group part is stripped for the duration of that call
+   * (e.g. "/d1/r1/n1" --> "/d1/r1") and put back afterwards. The original
+   * paths are restored even if the parent check throws, so the caller's
+   * {@code DatanodeInfo} objects are never left with a truncated location.
+   *
+   * @param locs locations of the block's replicas; {@code null} is treated
+   *          as an empty array
+   * @param numberOfReplicas replication factor of the block; also used as
+   *          the minimum number of distinct node groups
+   * @return a status combining the parent policy's verdict with the
+   *         node-group check
+   */
+  @Override
+  public BlockPlacementStatus verifyBlockPlacement(DatanodeInfo[] locs,
+      int numberOfReplicas) {
+    if (locs == null) {
+      locs = DatanodeDescriptor.EMPTY_ARRAY;
+    }
+
+    // Snapshot every full path before any of them is rewritten. Doing this
+    // in a separate pass keeps the snapshot correct even if the same object
+    // were to appear more than once in locs.
+    final String[] fullLocations = new String[locs.length];
+    for (int i = 0; i < locs.length; i++) {
+      fullLocations[i] = locs[i].getNetworkLocation();
+    }
+
+    final BlockPlacementStatus defaultStatus;
+    try {
+      // Remove the node-group part so that BlockPlacementPolicyDefault
+      // counts distinct racks, e.g. "/d1/r1/n1" --> "/d1/r1".
+      for (int i = 0; i < locs.length; i++) {
+        locs[i].setNetworkLocation(
+            NetworkTopology.getFirstHalf(fullLocations[i]));
+      }
+      defaultStatus = super.verifyBlockPlacement(locs, numberOfReplicas);
+    } finally {
+      // Restore the node-group part on every exit path.
+      for (int i = 0; i < locs.length; i++) {
+        locs[i].setNetworkLocation(fullLocations[i]);
+      }
+    }
+
+    // One distinct node group is required per replica.
+    return new BlockPlacementStatusWithNodeGroup(
+        defaultStatus, getNodeGroupsFromNode(locs), numberOfReplicas);
+  }
+
+  /**
+   * Collects the distinct node groups that host the given nodes.
+   *
+   * <p>A node group is identified by the node's full network location (e.g.
+   * "/d1/r1/n1") rather than by its last path component alone, so that
+   * identically named node groups under different racks are counted
+   * separately.
+   *
+   * @param nodes the nodes to inspect; {@code null} yields an empty set
+   * @return the set of distinct node-group paths, never {@code null}
+   */
+  private Set<String> getNodeGroupsFromNode(DatanodeInfo[] nodes) {
+    Set<String> nodeGroups = new HashSet<>();
+    if (nodes == null) {
+      return nodeGroups;
+    }
+
+    for (DatanodeInfo node : nodes) {
+      nodeGroups.add(node.getNetworkLocation());
+    }
+    return nodeGroups;
+  }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/77ba5add/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementStatusWithNodeGroup.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementStatusWithNodeGroup.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementStatusWithNodeGroup.java
new file mode 100644
index 0000000..b98b3da
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementStatusWithNodeGroup.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import java.util.Set;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * A {@link BlockPlacementStatus} for
+ * {@link BlockPlacementPolicyWithNodeGroup}. It combines the status reported
+ * by the parent policy with a check on how many distinct node groups hold
+ * replicas of the block.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class BlockPlacementStatusWithNodeGroup implements BlockPlacementStatus {
+
+  private final BlockPlacementStatus parentBlockPlacementStatus;
+  private final Set<String> currentNodeGroups;
+  private final int requiredNodeGroups;
+
+  /**
+   * @param parentBlockPlacementStatus the status computed by the parent
+   *          placement policy
+   * @param currentNodeGroups the node groups currently holding replicas
+   * @param requiredNodeGroups the minimum number of distinct node groups
+   */
+  public BlockPlacementStatusWithNodeGroup(
+      BlockPlacementStatus parentBlockPlacementStatus,
+      Set<String> currentNodeGroups, int requiredNodeGroups) {
+    this.parentBlockPlacementStatus = parentBlockPlacementStatus;
+    this.currentNodeGroups = currentNodeGroups;
+    this.requiredNodeGroups = requiredNodeGroups;
+  }
+
+  /**
+   * @return true only if the parent status is satisfied and enough distinct
+   *         node groups are in use
+   */
+  @Override
+  public boolean isPlacementPolicySatisfied() {
+    if (!parentBlockPlacementStatus.isPlacementPolicySatisfied()) {
+      return false;
+    }
+    return isNodeGroupPolicySatisfied();
+  }
+
+  /** @return whether at least {@code requiredNodeGroups} groups are used */
+  private boolean isNodeGroupPolicySatisfied() {
+    return currentNodeGroups.size() >= requiredNodeGroups;
+  }
+
+  /**
+   * @return null when the placement is satisfied; otherwise the parent's
+   *         error description (if the parent is unsatisfied) followed by a
+   *         node-group message (if too few node groups are used)
+   */
+  @Override
+  public String getErrorDescription() {
+    final boolean parentOk =
+        parentBlockPlacementStatus.isPlacementPolicySatisfied();
+    final boolean nodeGroupsOk = isNodeGroupPolicySatisfied();
+    if (parentOk && nodeGroupsOk) {
+      return null;
+    }
+
+    final StringBuilder message = new StringBuilder();
+    if (!parentOk) {
+      message.append(parentBlockPlacementStatus.getErrorDescription());
+    }
+
+    if (!nodeGroupsOk) {
+      // Separate the two parts only when the parent contributed text.
+      if (message.length() > 0) {
+        message.append(" ");
+      }
+      message.append("The block has ").append(requiredNodeGroups)
+          .append(" replicas. But it only has ")
+          .append(currentNodeGroups.size())
+          .append(" node groups ").append(currentNodeGroups).append(".");
+    }
+    return message.toString();
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/77ba5add/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java
index 7a00a3b..edcab10 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java
@@ -34,6 +34,9 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.TestBlockStoragePolicy;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.NetworkTopologyWithNodeGroup;
@@ -130,6 +133,103 @@ public class TestReplicationPolicyWithNodeGroup extends BaseReplicationPolicyTes
};
/**
+ * Test block placement verification.
+ * @throws Exception
+ */
+  @Test
+  public void testVerifyBlockPlacement() throws Exception {
+    final ExtendedBlock block =
+        new ExtendedBlock("fake-pool", new Block(12345L));
+    BlockPlacementStatus status;
+
+    // 2 node groups (not enough), 2 racks (enough)
+    status = verifyPlacementOn(block, storages[0], storages[1], storages[4]);
+    assertFalse(status.isPlacementPolicySatisfied());
+
+    // 3 node groups (enough), 2 racks (enough)
+    status = verifyPlacementOn(block, storages[0], storages[2], storages[5]);
+    assertTrue(status.isPlacementPolicySatisfied());
+
+    // 2 node groups (not enough), 1 rack (not enough)
+    status = verifyPlacementOn(block, storages[0], storages[1], storages[2]);
+    assertFalse(status.isPlacementPolicySatisfied());
+    assertTrue(status.getErrorDescription().contains("node group"));
+    assertTrue(status.getErrorDescription().contains("more rack(s)"));
+
+    // 3 node groups (enough), 3 racks (enough)
+    status = verifyPlacementOn(block, storages[0], storages[5], storages[7]);
+    assertTrue(status.isPlacementPolicySatisfied());
+
+    // 3 node groups (not enough), 3 racks (enough), 4 replicas
+    status = verifyPlacementOn(block,
+        storages[0], storages[1], storages[5], storages[7]);
+    assertFalse(status.isPlacementPolicySatisfied());
+    assertTrue(status.getErrorDescription().contains("node group"));
+    assertFalse(status.getErrorDescription().contains("more rack(s)"));
+
+    // 2 node groups (not enough), 1 rack (not enough);
+    // same storages as the third scenario above
+    status = verifyPlacementOn(block, storages[0], storages[1], storages[2]);
+    assertFalse(status.isPlacementPolicySatisfied());
+    assertTrue(status.getErrorDescription().contains("node group"));
+    assertTrue(status.getErrorDescription().contains("more rack(s)"));
+
+    // 1 node group (not enough), 1 rack (not enough)
+    status = verifyPlacementOn(block, storages[0], storages[1]);
+    assertFalse(status.isPlacementPolicySatisfied());
+    assertTrue(status.getErrorDescription().contains("node group"));
+    assertTrue(status.getErrorDescription().contains("more rack(s)"));
+  }
+
+  /**
+   * Builds a located block over the given storages and asks the policy under
+   * test to verify it, using the number of storages as the replica count.
+   */
+  private BlockPlacementStatus verifyPlacementOn(ExtendedBlock block,
+      DatanodeStorageInfo... replicas) throws Exception {
+    LocatedBlock located =
+        BlockManager.newLocatedBlock(block, replicas, 0, false);
+    return replicator.verifyBlockPlacement(located.getLocations(),
+        replicas.length);
+  }
+
+ /**
* Scan the targets list: all targets should be on different NodeGroups.
* Return false if two targets are found on the same NodeGroup.
*/