You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ar...@apache.org on 2016/02/22 20:47:27 UTC

[08/50] [abbrv] hadoop git commit: HDFS-9456. BlockPlacementPolicyWithNodeGroup should override verifyBlockPlacement(). Contributed by Xiaobing Zhou.

HDFS-9456. BlockPlacementPolicyWithNodeGroup should override verifyBlockPlacement(). Contributed by Xiaobing Zhou.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/77ba5add
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/77ba5add
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/77ba5add

Branch: refs/heads/HDFS-1312
Commit: 77ba5add0d9cb10d45ca9122bca48baa7c8fb3b8
Parents: 4b0e59f
Author: Junping Du <ju...@apache.org>
Authored: Tue Feb 16 18:55:55 2016 -0800
Committer: Junping Du <ju...@apache.org>
Committed: Tue Feb 16 18:55:55 2016 -0800

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   3 +
 .../BlockPlacementPolicyWithNodeGroup.java      |  46 +++++++++
 .../BlockPlacementStatusWithNodeGroup.java      |  81 +++++++++++++++
 .../TestReplicationPolicyWithNodeGroup.java     | 100 +++++++++++++++++++
 4 files changed, 230 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/77ba5add/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index e9160c9..0b220bf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -2768,6 +2768,9 @@ Release 2.8.0 - UNRELEASED
     HDFS-9801. ReconfigurableBase should update the cached configuration.
     (Arpit Agarwal)
 
+    HDFS-9456. BlockPlacementPolicyWithNodeGroup should override 
+    verifyBlockPlacement(). (Xiaobing Zhou via junping_du)
+
 Release 2.7.3 - UNRELEASED
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/77ba5add/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java
index c62d977..194f6ba 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java
@@ -389,4 +389,50 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau
     }
     return true;
   }
+
+
+  @Override
+  public BlockPlacementStatus verifyBlockPlacement(DatanodeInfo[] locs,
+      int numberOfReplicas) {
+    if (locs == null) {
+      locs = DatanodeDescriptor.EMPTY_ARRAY;
+    }
+
+    List<String> locList = new ArrayList<String>();
+    /*
+     * Temporarily drop the node group part so BlockPlacementPolicyDefault
+     * counts distinct racks, e.g. "/d1/r1/n1" --> "/d1/r1"
+     */
+    for (int i = 0; i < locs.length; i++) {
+      locList.add(locs[i].getNetworkLocation());
+      locs[i].setNetworkLocation(NetworkTopology.getFirstHalf(locs[i]
+          .getNetworkLocation()));
+    }
+
+    BlockPlacementStatus defaultStatus = super.verifyBlockPlacement(locs,
+        numberOfReplicas);
+
+    // restore the original locations, including the node group part
+    for (int i = 0; i < locs.length; i++) {
+      locs[i].setNetworkLocation(locList.get(i));
+    }
+
+    int minNodeGroups = numberOfReplicas;
+    BlockPlacementStatusWithNodeGroup nodeGroupStatus =
+        new BlockPlacementStatusWithNodeGroup(
+            defaultStatus, getNodeGroupsFromNode(locs), minNodeGroups);
+    return nodeGroupStatus;
+  }
+
+  private Set<String> getNodeGroupsFromNode(DatanodeInfo[] nodes) {
+    Set<String> nodeGroups = new HashSet<>();
+    if (nodes == null) {
+      return nodeGroups;
+    }
+
+    for (DatanodeInfo node : nodes) {
+      nodeGroups.add(NetworkTopology.getLastHalf(node.getNetworkLocation()));
+    }
+    return nodeGroups;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/77ba5add/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementStatusWithNodeGroup.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementStatusWithNodeGroup.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementStatusWithNodeGroup.java
new file mode 100644
index 0000000..b98b3da
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementStatusWithNodeGroup.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import java.util.Set;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * An implementation of {@link BlockPlacementStatus} for
+ * {@link BlockPlacementPolicyWithNodeGroup}.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class BlockPlacementStatusWithNodeGroup implements BlockPlacementStatus {
+
+  private final BlockPlacementStatus parentBlockPlacementStatus;
+  private final Set<String> currentNodeGroups;
+  private final int requiredNodeGroups;
+
+  /**
+   * @param parentBlockPlacementStatus the parent class' status
+   * @param currentNodeGroups the current set of node groups of the replicas
+   * @param requiredNodeGroups the number of required node groups
+   */
+  public BlockPlacementStatusWithNodeGroup(
+      BlockPlacementStatus parentBlockPlacementStatus,
+      Set<String> currentNodeGroups, int requiredNodeGroups) {
+    this.parentBlockPlacementStatus = parentBlockPlacementStatus;
+    this.currentNodeGroups = currentNodeGroups;
+    this.requiredNodeGroups = requiredNodeGroups;
+  }
+
+  @Override
+  public boolean isPlacementPolicySatisfied() {
+    return parentBlockPlacementStatus.isPlacementPolicySatisfied()
+        && isNodeGroupPolicySatisfied();
+  }
+
+  private boolean isNodeGroupPolicySatisfied() {
+    return requiredNodeGroups <= currentNodeGroups.size();
+  }
+
+  @Override
+  public String getErrorDescription() {
+    if (isPlacementPolicySatisfied()) {
+      return null;
+    }
+
+    StringBuilder errorDescription = new StringBuilder();
+    if (!parentBlockPlacementStatus.isPlacementPolicySatisfied()) {
+      errorDescription.append(parentBlockPlacementStatus.getErrorDescription());
+    }
+
+    if (!isNodeGroupPolicySatisfied()) {
+      if (errorDescription.length() != 0) {
+        errorDescription.append(" ");
+      }
+      errorDescription.append("The block has " + requiredNodeGroups
+          + " replicas. But it only has " + currentNodeGroups.size()
+          + " node groups " + currentNodeGroups + ".");
+    }
+    return errorDescription.toString();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/77ba5add/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java
index 7a00a3b..edcab10 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java
@@ -34,6 +34,9 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.TestBlockStoragePolicy;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.NetworkTopologyWithNodeGroup;
@@ -130,6 +133,103 @@ public class TestReplicationPolicyWithNodeGroup extends BaseReplicationPolicyTes
   };
 
   /**
+   * Test block placement verification.
+   * @throws Exception
+   */
+  @Test
+  public void testVerifyBlockPlacement() throws Exception {
+    LocatedBlock locatedBlock;
+    BlockPlacementStatus status;
+    ExtendedBlock b = new ExtendedBlock("fake-pool", new Block(12345L));
+    List<DatanodeStorageInfo> set = new ArrayList<>();
+
+    // 2 node groups (not enough), 2 racks (enough)
+    set.clear();
+    set.add(storages[0]);
+    set.add(storages[1]);
+    set.add(storages[4]);
+    locatedBlock = BlockManager.newLocatedBlock(b,
+        set.toArray(new DatanodeStorageInfo[set.size()]), 0, false);
+    status = replicator.verifyBlockPlacement(locatedBlock.getLocations(),
+        set.size());
+    assertFalse(status.isPlacementPolicySatisfied());
+
+    // 3 node groups (enough), 2 racks (enough)
+    set.clear();
+    set.add(storages[0]);
+    set.add(storages[2]);
+    set.add(storages[5]);
+    locatedBlock = BlockManager.newLocatedBlock(b,
+        set.toArray(new DatanodeStorageInfo[set.size()]), 0, false);
+    status = replicator.verifyBlockPlacement(locatedBlock.getLocations(),
+        set.size());
+    assertTrue(status.isPlacementPolicySatisfied());
+
+    // 2 node groups (not enough), 1 rack (not enough)
+    set.clear();
+    set.add(storages[0]);
+    set.add(storages[1]);
+    set.add(storages[2]);
+    locatedBlock = BlockManager.newLocatedBlock(b,
+        set.toArray(new DatanodeStorageInfo[set.size()]), 0, false);
+    status = replicator.verifyBlockPlacement(locatedBlock.getLocations(),
+        set.size());
+    assertFalse(status.isPlacementPolicySatisfied());
+    assertTrue(status.getErrorDescription().contains("node group"));
+    assertTrue(status.getErrorDescription().contains("more rack(s)"));
+
+    // 3 node groups (enough), 3 racks (enough)
+    set.clear();
+    set.add(storages[0]);
+    set.add(storages[5]);
+    set.add(storages[7]);
+    locatedBlock = BlockManager.newLocatedBlock(b,
+        set.toArray(new DatanodeStorageInfo[set.size()]), 0, false);
+    status = replicator.verifyBlockPlacement(locatedBlock.getLocations(),
+        set.size());
+    assertTrue(status.isPlacementPolicySatisfied());
+
+    // 3 node groups (not enough), 3 racks (enough), 4 replicas
+    set.clear();
+    set.add(storages[0]);
+    set.add(storages[1]);
+    set.add(storages[5]);
+    set.add(storages[7]);
+    locatedBlock = BlockManager.newLocatedBlock(b,
+        set.toArray(new DatanodeStorageInfo[set.size()]), 0, false);
+    status = replicator.verifyBlockPlacement(locatedBlock.getLocations(),
+        set.size());
+    assertFalse(status.isPlacementPolicySatisfied());
+    assertTrue(status.getErrorDescription().contains("node group"));
+    assertFalse(status.getErrorDescription().contains("more rack(s)"));
+
+    // 2 node groups (not enough), 1 rack (not enough)
+    set.clear();
+    set.add(storages[0]);
+    set.add(storages[1]);
+    set.add(storages[2]);
+    locatedBlock = BlockManager.newLocatedBlock(b,
+        set.toArray(new DatanodeStorageInfo[set.size()]), 0, false);
+    status = replicator.verifyBlockPlacement(locatedBlock.getLocations(),
+        set.size());
+    assertFalse(status.isPlacementPolicySatisfied());
+    assertTrue(status.getErrorDescription().contains("node group"));
+    assertTrue(status.getErrorDescription().contains("more rack(s)"));
+
+    // 1 node group (not enough), 1 rack (not enough)
+    set.clear();
+    set.add(storages[0]);
+    set.add(storages[1]);
+    locatedBlock = BlockManager.newLocatedBlock(b,
+        set.toArray(new DatanodeStorageInfo[set.size()]), 0, false);
+    status = replicator.verifyBlockPlacement(locatedBlock.getLocations(),
+        set.size());
+    assertFalse(status.isPlacementPolicySatisfied());
+    assertTrue(status.getErrorDescription().contains("node group"));
+    assertTrue(status.getErrorDescription().contains("more rack(s)"));
+  }
+
+  /**
    * Scan the targets list: all targets should be on different NodeGroups.
    * Return false if two targets are found on the same NodeGroup.
    */