You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sz...@apache.org on 2012/11/17 04:25:21 UTC
svn commit: r1410690 - in /hadoop/common/branches/branch-1: ./
src/core/org/apache/hadoop/net/
src/hdfs/org/apache/hadoop/hdfs/server/namenode/
src/test/org/apache/hadoop/hdfs/
src/test/org/apache/hadoop/hdfs/server/namenode/
Author: szetszwo
Date: Sat Nov 17 03:25:18 2012
New Revision: 1410690
URL: http://svn.apache.org/viewvc?rev=1410690&view=rev
Log:
HDFS-3941. Backport HDFS-3498 and HDFS-3601: Support replica removal in BlockPlacementPolicy and make BlockPlacementPolicyDefault extensible for reusing code in subclasses, and add BlockPlacementPolicyWithNodeGroup to support block placement with 4-layer network topology. Contributed by Junping Du and Jing Zhao
Added:
hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyWithNodeGroup.java
hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/server/namenode/TestReplicationPolicyWithNodeGroup.java
Modified:
hadoop/common/branches/branch-1/CHANGES.txt
hadoop/common/branches/branch-1/src/core/org/apache/hadoop/net/NetworkTopology.java
hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicy.java
hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyDefault.java
hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/DFSTestUtil.java
hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/server/namenode/TestReplicationPolicy.java
Modified: hadoop/common/branches/branch-1/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/CHANGES.txt?rev=1410690&r1=1410689&r2=1410690&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/CHANGES.txt (original)
+++ hadoop/common/branches/branch-1/CHANGES.txt Sat Nov 17 03:25:18 2012
@@ -36,6 +36,12 @@ Release 1.2.0 - unreleased
class pluggable and add NetworkTopologyWithNodeGroup, a 4-layer
implementation of NetworkTopology. (Junping Du and Jing Zhao via szetszwo)
+ HDFS-3941. Backport HDFS-3498 and HDFS-3601: Support replica removal in
+ BlockPlacementPolicy and make BlockPlacementPolicyDefault extensible for
+ reusing code in subclasses, and add BlockPlacementPolicyWithNodeGroup to
+ support block placement with 4-layer network topology. (Junping Du and
+ Jing Zhao via szetszwo)
+
IMPROVEMENTS
HDFS-3515. Port HDFS-1457 to branch-1. (eli)
Modified: hadoop/common/branches/branch-1/src/core/org/apache/hadoop/net/NetworkTopology.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/core/org/apache/hadoop/net/NetworkTopology.java?rev=1410690&r1=1410689&r2=1410690&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/core/org/apache/hadoop/net/NetworkTopology.java (original)
+++ hadoop/common/branches/branch-1/src/core/org/apache/hadoop/net/NetworkTopology.java Sat Nov 17 03:25:18 2012
@@ -732,6 +732,31 @@ public class NetworkTopology {
}
return tree.toString();
}
+
+  /**
+   * Divide a network location string into two parts at its last path
+   * separator and return the first part; e.g. with a "/" separator,
+   * "/rack1" for "/rack1/nodegroup1".
+   *
+   * @param networkLocation a network location containing at least one path
+   *          separator
+   * @return the part of {@code networkLocation} before its last separator
+   *         (empty if the only separator is the leading one)
+   * @throws IllegalArgumentException if {@code networkLocation} contains no
+   *           path separator
+   */
+  public static String getFirstHalf(String networkLocation) {
+    int index = networkLocation.lastIndexOf(NodeBase.PATH_SEPARATOR_STR);
+    if (index < 0) {
+      // without this check substring(0, -1) fails with an opaque
+      // StringIndexOutOfBoundsException
+      throw new IllegalArgumentException("Network location \""
+          + networkLocation + "\" contains no path separator \""
+          + NodeBase.PATH_SEPARATOR_STR + "\"");
+    }
+    return networkLocation.substring(0, index);
+  }
+
+  /**
+   * Divide a network location string into two parts at its last path
+   * separator and return the second part, including that separator; e.g.
+   * with a "/" separator, "/nodegroup1" for "/rack1/nodegroup1".
+   *
+   * @param networkLocation a network location containing at least one path
+   *          separator
+   * @return the suffix of {@code networkLocation} starting at its last
+   *         separator
+   * @throws IllegalArgumentException if {@code networkLocation} contains no
+   *           path separator
+   */
+  public static String getLastHalf(String networkLocation) {
+    int index = networkLocation.lastIndexOf(NodeBase.PATH_SEPARATOR_STR);
+    if (index < 0) {
+      // without this check substring(-1) fails with an opaque
+      // StringIndexOutOfBoundsException
+      throw new IllegalArgumentException("Network location \""
+          + networkLocation + "\" contains no path separator \""
+          + NodeBase.PATH_SEPARATOR_STR + "\"");
+    }
+    return networkLocation.substring(index);
+  }
+
/* swap two array items */
static protected void swap(Node[] nodes, int i, int j) {
Modified: hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicy.java?rev=1410690&r1=1410689&r2=1410690&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicy.java (original)
+++ hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicy.java Sat Nov 17 03:25:18 2012
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.na
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.Node;
@@ -215,5 +216,80 @@ public abstract class BlockPlacementPoli
excludedNodes,
blocksize);
}
+
+  /**
+   * Update the rack bookkeeping after the replica on {@code cur} has been
+   * chosen for removal.
+   *
+   * <p>{@code cur} is dropped from its rack's list in {@code rackMap}, and the
+   * rack entry is removed once it becomes empty. If {@code cur} was in
+   * {@code moreThanOne} and only one node is left on its rack, that last node
+   * is moved from {@code moreThanOne} to {@code exactlyOne}. Otherwise
+   * {@code cur} is removed from {@code exactlyOne}.
+   *
+   * @param rackMap map from rack (see {@link #getRack}) to the replica nodes
+   *          on that rack; updated in place
+   * @param moreThanOne replica nodes on racks holding more than one replica
+   * @param exactlyOne replica nodes that are alone on their rack
+   * @param cur the replica node being removed
+   */
+  public void adjustSetsWithChosenReplica(
+      final Map<String, List<DatanodeDescriptor>> rackMap,
+      final List<DatanodeDescriptor> moreThanOne,
+      final List<DatanodeDescriptor> exactlyOne, final DatanodeInfo cur) {
+    final String rackName = getRack(cur);
+    final List<DatanodeDescriptor> nodesOnRack = rackMap.get(rackName);
+    nodesOnRack.remove(cur);
+    if (nodesOnRack.isEmpty()) {
+      rackMap.remove(rackName);
+    }
+
+    if (!moreThanOne.remove(cur)) {
+      // cur was not in moreThanOne, so drop it from exactlyOne instead
+      exactlyOne.remove(cur);
+      return;
+    }
+    if (nodesOnRack.size() == 1) {
+      // the rack now holds a single replica: reclassify that survivor
+      final DatanodeDescriptor survivor = nodesOnRack.get(0);
+      moreThanOne.remove(survivor);
+      exactlyOne.add(survivor);
+    }
+  }
+
+ /**
+ * Get rack string from a data node
+ * @param datanode
+ * @return rack of data node
+ */
+ protected String getRack(final DatanodeInfo datanode) {
+ return datanode.getNetworkLocation();
+ }
+
+  /**
+   * Group replica nodes by rack (see {@link #getRack}) and split them into
+   * two lists. One list holds the nodes on racks with more than one replica.
+   * The other holds the nodes that are alone on their rack.
+   *
+   * @param dataNodes the nodes holding replicas
+   * @param rackMap receives a map from rack to the nodes on that rack
+   * @param moreThanOne receives nodes on racks with more than one replica
+   * @param exactlyOne receives nodes on racks with exactly one replica
+   */
+  public void splitNodesWithRack(
+      Collection<DatanodeDescriptor> dataNodes,
+      final Map<String, List<DatanodeDescriptor>> rackMap,
+      final List<DatanodeDescriptor> moreThanOne,
+      final List<DatanodeDescriptor> exactlyOne) {
+    // group the replica holders by rack
+    for (DatanodeDescriptor dn : dataNodes) {
+      final String rack = getRack(dn);
+      List<DatanodeDescriptor> sameRack = rackMap.get(rack);
+      if (sameRack == null) {
+        sameRack = new ArrayList<DatanodeDescriptor>();
+        rackMap.put(rack, sameRack);
+      }
+      sameRack.add(dn);
+    }
+
+    // classify each rack group by its size
+    for (List<DatanodeDescriptor> sameRack : rackMap.values()) {
+      if (sameRack.size() != 1) {
+        moreThanOne.addAll(sameRack);
+      } else {
+        exactlyOne.add(sameRack.get(0));
+      }
+    }
+  }
}
\ No newline at end of file
Modified: hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyDefault.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyDefault.java?rev=1410690&r1=1410689&r2=1410690&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyDefault.java (original)
+++ hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyDefault.java Sat Nov 17 03:25:18 2012
@@ -47,8 +47,8 @@ import org.apache.hadoop.net.NodeBase;
*/
@InterfaceAudience.Private
public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
- private boolean considerLoad;
- private NetworkTopology clusterMap;
+ protected boolean considerLoad;
+ protected NetworkTopology clusterMap;
private FSClusterStats stats;
private long staleInterval; // interval used to identify stale DataNodes
@@ -132,6 +132,7 @@ public class BlockPlacementPolicyDefault
new ArrayList<DatanodeDescriptor>(chosenNodes);
for (Node node:chosenNodes) {
excludedNodes.put(node, node);
+ adjustExcludedNodes(excludedNodes, node);
}
if (!clusterMap.contains(writer)) {
@@ -225,7 +226,7 @@ public class BlockPlacementPolicyDefault
* choose a node on the same rack
* @return the chosen node
*/
- private DatanodeDescriptor chooseLocalNode(DatanodeDescriptor localMachine,
+ protected DatanodeDescriptor chooseLocalNode(DatanodeDescriptor localMachine,
HashMap<Node, Node> excludedNodes, long blocksize, int maxNodesPerRack,
List<DatanodeDescriptor> results, boolean avoidStaleNodes)
throws NotEnoughReplicasException {
@@ -256,7 +257,7 @@ public class BlockPlacementPolicyDefault
* in the cluster.
* @return the chosen node
*/
- private DatanodeDescriptor chooseLocalRack(DatanodeDescriptor localMachine,
+ protected DatanodeDescriptor chooseLocalRack(DatanodeDescriptor localMachine,
HashMap<Node, Node> excludedNodes, long blocksize, int maxNodesPerRack,
List<DatanodeDescriptor> results, boolean avoidStaleNodes)
throws NotEnoughReplicasException {
@@ -303,8 +304,7 @@ public class BlockPlacementPolicyDefault
* if not enough nodes are available, choose the remaining ones
* from the local rack
*/
-
- private void chooseRemoteRack(int numOfReplicas,
+ protected void chooseRemoteRack(int numOfReplicas,
DatanodeDescriptor localMachine,
HashMap<Node, Node> excludedNodes,
long blocksize,
@@ -328,7 +328,7 @@ public class BlockPlacementPolicyDefault
/* Randomly choose one target from <i>nodes</i>.
* @return the chosen node
*/
- private DatanodeDescriptor chooseRandom(String nodes,
+ protected DatanodeDescriptor chooseRandom(String nodes,
HashMap<Node, Node> excludedNodes,
long blocksize,
int maxNodesPerRack,
@@ -342,11 +342,12 @@ public class BlockPlacementPolicyDefault
(DatanodeDescriptor)(clusterMap.chooseRandom(nodes));
Node oldNode = excludedNodes.put(chosenNode, chosenNode);
- if (oldNode == null) { // choosendNode was not in the excluded list
+ if (oldNode == null) { // chosendNode was not in the excluded list
numOfAvailableNodes--;
if (isGoodTarget(chosenNode, blocksize, maxNodesPerRack, results,
avoidStaleNodes)) {
results.add(chosenNode);
+ adjustExcludedNodes(excludedNodes, chosenNode);
return chosenNode;
}
}
@@ -358,7 +359,7 @@ public class BlockPlacementPolicyDefault
/* Randomly choose <i>numOfReplicas</i> targets from <i>nodes</i>.
*/
- private void chooseRandom(int numOfReplicas,
+ protected void chooseRandom(int numOfReplicas,
String nodes,
HashMap<Node, Node> excludedNodes,
long blocksize,
@@ -380,6 +381,7 @@ public class BlockPlacementPolicyDefault
avoidStaleNodes)) {
numOfReplicas--;
results.add(chosenNode);
+ adjustExcludedNodes(excludedNodes, chosenNode);
}
}
}
@@ -389,7 +391,22 @@ public class BlockPlacementPolicyDefault
"Not able to place enough replicas");
}
}
-
+
+  /**
+   * After a node has been chosen to hold a replica, adjust excludedNodes
+   * accordingly. This default implementation does nothing, because callers in
+   * this class have already put chosenNode into excludedNodes. Subclasses may
+   * override it to exclude further related nodes as well.
+   *
+   * @param excludedNodes nodes that must not be chosen as targets
+   * @param chosenNode the node that was just chosen to hold a replica
+   */
+  protected void adjustExcludedNodes(HashMap<Node, Node> excludedNodes,
+      Node chosenNode) {
+    // do nothing
+  }
+
/* judge if a node is a good target.
* return true if <i>node</i> has enough space,
* does not have too much load, and the rack does not have too many nodes
@@ -417,7 +434,7 @@ public class BlockPlacementPolicyDefault
* does not have too much load,
* and the rack does not have too many nodes.
*/
- private boolean isGoodTarget(DatanodeDescriptor node,
+ protected boolean isGoodTarget(DatanodeDescriptor node,
long blockSize, int maxTargetPerLoc,
boolean considerLoad,
List<DatanodeDescriptor> results,
@@ -548,8 +565,7 @@ public class BlockPlacementPolicyDefault
// pick replica from the first Set. If first is empty, then pick replicas
// from second set.
- Iterator<DatanodeDescriptor> iter =
- first.isEmpty() ? second.iterator() : first.iterator();
+ Iterator<DatanodeDescriptor> iter = pickupReplicaSet(first, second);
// pick node with least free space
while (iter.hasNext() ) {
@@ -562,5 +578,19 @@ public class BlockPlacementPolicyDefault
}
return cur;
}
+
+  /**
+   * Pick the replica node set from which an over-replicated replica is
+   * deleted. The first set is preferred whenever it is non-empty.
+   *
+   * @param first replica nodes on racks holding more than one replica
+   * @param second the remaining replica nodes
+   * @return an iterator over {@code first}, or over {@code second} if
+   *         {@code first} is empty
+   */
+  protected Iterator<DatanodeDescriptor> pickupReplicaSet(
+      Collection<DatanodeDescriptor> first,
+      Collection<DatanodeDescriptor> second) {
+    if (first.isEmpty()) {
+      return second.iterator();
+    }
+    return first.iterator();
+  }
}
Added: hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyWithNodeGroup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyWithNodeGroup.java?rev=1410690&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyWithNodeGroup.java (added)
+++ hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyWithNodeGroup.java Sat Nov 17 03:25:18 2012
@@ -0,0 +1,297 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.server.namenode.FSClusterStats;
+import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.net.NetworkTopologyWithNodeGroup;
+import org.apache.hadoop.net.Node;
+import org.apache.hadoop.net.NodeBase;
+
+/** The class is responsible for choosing the desired number of targets
+ * for placing block replicas on environment with node-group layer.
+ * The replica placement strategy is adjusted to:
+ * If the writer is on a datanode, the 1st replica is placed on the local
+ * node (or local node-group), otherwise a random datanode.
+ * The 2nd replica is placed on a datanode that is on a different rack with 1st
+ * replica node.
+ * The 3rd replica is placed on a datanode which is on a different node-group
+ * but the same rack as the second replica node.
+ *
+ * <p>A datanode's network location is treated as its node group, and the
+ * part of that location before the last separator
+ * ({@link NetworkTopology#getFirstHalf(String)}) is treated as its rack.
+ */
+public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefault {
+
+  BlockPlacementPolicyWithNodeGroup(Configuration conf, FSClusterStats stats,
+      NetworkTopology clusterMap) {
+    initialize(conf, stats, clusterMap);
+  }
+
+  BlockPlacementPolicyWithNodeGroup() {
+  }
+
+  @Override
+  public void initialize(Configuration conf, FSClusterStats stats,
+      NetworkTopology clusterMap) {
+    super.initialize(conf, stats, clusterMap);
+  }
+
+  /**
+   * Choose the local node of localMachine as the target. If localMachine is
+   * not available, choose a node on the same node group or rack instead.
+   * @return the chosen node
+   */
+  @Override
+  protected DatanodeDescriptor chooseLocalNode(DatanodeDescriptor localMachine,
+      HashMap<Node, Node> excludedNodes, long blocksize, int maxNodesPerRack,
+      List<DatanodeDescriptor> results, boolean avoidStaleNodes)
+      throws NotEnoughReplicasException {
+    // if no local machine, randomly choose one node
+    if (localMachine == null) {
+      return chooseRandom(NodeBase.ROOT, excludedNodes,
+          blocksize, maxNodesPerRack, results, avoidStaleNodes);
+    }
+
+    // otherwise try local machine first
+    Node oldNode = excludedNodes.put(localMachine, localMachine);
+    if (oldNode == null) { // was not in the excluded list
+      // considerLoad is passed as false, so the local node's load is not checked
+      if (isGoodTarget(localMachine, blocksize,
+          maxNodesPerRack, false, results, avoidStaleNodes)) {
+        results.add(localMachine);
+        // Nodes under same nodegroup should be excluded.
+        addNodeGroupToExcludedNodes(excludedNodes,
+            localMachine.getNetworkLocation());
+        return localMachine;
+      }
+    }
+
+    // try a node on local node group
+    DatanodeDescriptor chosenNode = chooseLocalNodeGroup(
+        (NetworkTopologyWithNodeGroup) clusterMap, localMachine, excludedNodes,
+        blocksize, maxNodesPerRack, results, avoidStaleNodes);
+    if (chosenNode != null) {
+      return chosenNode;
+    }
+    // try a node on local rack
+    return chooseLocalRack(localMachine, excludedNodes,
+        blocksize, maxNodesPerRack, results, avoidStaleNodes);
+  }
+
+  @Override
+  protected void adjustExcludedNodes(HashMap<Node, Node> excludedNodes,
+      Node chosenNode) {
+    // as node-group aware implementation, it should make sure no two replica
+    // are placing on the same node group.
+    addNodeGroupToExcludedNodes(excludedNodes, chosenNode.getNetworkLocation());
+  }
+
+  // add all nodes under specific nodegroup to excludedNodes.
+  private void addNodeGroupToExcludedNodes(HashMap<Node, Node> excludedNodes,
+      String nodeGroup) {
+    List<Node> leafNodes = clusterMap.getLeaves(nodeGroup);
+    for (Node node : leafNodes) {
+      excludedNodes.put(node, node);
+    }
+  }
+
+  /**
+   * Choose one node from localMachine's rack, outside excluded node groups.
+   * If none is available, try the rack of the second replica in results, and
+   * then fall back to a random node in the cluster.
+   * @return the chosen node
+   */
+  @Override
+  protected DatanodeDescriptor chooseLocalRack(DatanodeDescriptor localMachine,
+      HashMap<Node, Node> excludedNodes, long blocksize, int maxNodesPerRack,
+      List<DatanodeDescriptor> results, boolean avoidStaleNodes)
+      throws NotEnoughReplicasException {
+    // no local machine, so choose a random machine
+    if (localMachine == null) {
+      return chooseRandom(NodeBase.ROOT, excludedNodes,
+          blocksize, maxNodesPerRack, results, avoidStaleNodes);
+    }
+
+    // choose one from the local rack, but off-nodegroup
+    try {
+      return chooseRandom(
+          NetworkTopology.getFirstHalf(localMachine.getNetworkLocation()),
+          excludedNodes, blocksize, maxNodesPerRack, results, avoidStaleNodes);
+    } catch (NotEnoughReplicasException e1) {
+      // try the rack of the second replica, if there is one
+      DatanodeDescriptor newLocal = secondNode(localMachine, results);
+      if (newLocal != null) {
+        try {
+          return chooseRandom(
+              clusterMap.getRack(newLocal.getNetworkLocation()), excludedNodes,
+              blocksize, maxNodesPerRack, results, avoidStaleNodes);
+        } catch (NotEnoughReplicasException e2) {
+          // otherwise randomly choose one from the network
+          return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
+              maxNodesPerRack, results, avoidStaleNodes);
+        }
+      }
+      // otherwise randomly choose one from the network
+      return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
+          maxNodesPerRack, results, avoidStaleNodes);
+    }
+  }
+
+  /**
+   * Choose numOfReplicas nodes from racks other than localMachine's rack.
+   * If not enough such nodes are available, choose the remaining ones from
+   * localMachine's rack, not just from its node group. This matches the
+   * default policy's fallback.
+   */
+  @Override
+  protected void chooseRemoteRack(int numOfReplicas,
+      DatanodeDescriptor localMachine, HashMap<Node, Node> excludedNodes,
+      long blocksize, int maxReplicasPerRack, List<DatanodeDescriptor> results,
+      boolean avoidStaleNodes) throws NotEnoughReplicasException {
+    int oldNumOfReplicas = results.size();
+    // localMachine's network location is its node group; the rack is the part
+    // before the last separator
+    final String rackLocation =
+        NetworkTopology.getFirstHalf(localMachine.getNetworkLocation());
+    try {
+      // randomly choose nodes from remote racks
+      chooseRandom(numOfReplicas, "~" + rackLocation, excludedNodes,
+          blocksize, maxReplicasPerRack, results, avoidStaleNodes);
+    } catch (NotEnoughReplicasException e) {
+      // fall back to the local rack for the replicas still missing
+      chooseRandom(numOfReplicas - (results.size() - oldNumOfReplicas),
+          rackLocation, excludedNodes, blocksize,
+          maxReplicasPerRack, results, avoidStaleNodes);
+    }
+  }
+
+  /* choose one node from the nodegroup that <i>localMachine</i> is on.
+   * if no such node is available, choose one node from the nodegroup where
+   * a second replica is on.
+   * if still no such node is available, choose a random node in the cluster.
+   * @return the chosen node
+   */
+  private DatanodeDescriptor chooseLocalNodeGroup(
+      NetworkTopologyWithNodeGroup clusterMap, DatanodeDescriptor localMachine,
+      HashMap<Node, Node> excludedNodes, long blocksize, int maxNodesPerRack,
+      List<DatanodeDescriptor> results, boolean avoidStaleNodes)
+      throws NotEnoughReplicasException {
+    // no local machine, so choose a random machine
+    if (localMachine == null) {
+      return chooseRandom(NodeBase.ROOT, excludedNodes,
+          blocksize, maxNodesPerRack, results, avoidStaleNodes);
+    }
+
+    // choose one from the local node group
+    try {
+      return chooseRandom(
+          clusterMap.getNodeGroup(localMachine.getNetworkLocation()),
+          excludedNodes, blocksize, maxNodesPerRack, results, avoidStaleNodes);
+    } catch (NotEnoughReplicasException e1) {
+      // try the node group of the second replica, if there is one
+      DatanodeDescriptor newLocal = secondNode(localMachine, results);
+      if (newLocal != null) {
+        try {
+          return chooseRandom(
+              clusterMap.getNodeGroup(newLocal.getNetworkLocation()),
+              excludedNodes, blocksize, maxNodesPerRack, results,
+              avoidStaleNodes);
+        } catch (NotEnoughReplicasException e2) {
+          // otherwise randomly choose one from the network
+          return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
+              maxNodesPerRack, results, avoidStaleNodes);
+        }
+      }
+      // otherwise randomly choose one from the network
+      return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
+          maxNodesPerRack, results, avoidStaleNodes);
+    }
+  }
+
+  /**
+   * Return the first node in results that is not localMachine (compared by
+   * reference), or null if there is no such node.
+   */
+  private static DatanodeDescriptor secondNode(DatanodeDescriptor localMachine,
+      List<DatanodeDescriptor> results) {
+    for (DatanodeDescriptor nextNode : results) {
+      if (nextNode != localMachine) {
+        return nextNode;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * The rack of a datanode is the part of its network location (its node
+   * group) before the last separator.
+   */
+  @Override
+  protected String getRack(final DatanodeInfo cur) {
+    String nodeGroupString = cur.getNetworkLocation();
+    return NetworkTopology.getFirstHalf(nodeGroupString);
+  }
+
+  /**
+   * Pick up replica node set for deleting replica as over-replicated.
+   * First set contains replica nodes on rack with more than one
+   * replica while second set contains remaining replica nodes.
+   * If first is not empty, divide first set into two subsets:
+   * moreThanOne contains nodes on nodegroup with more than one replica
+   * exactlyOne contains the remaining nodes in first set
+   * then pickup moreThanOne if not empty, otherwise exactlyOne.
+   * If first is empty, then pick second.
+   */
+  @Override
+  public Iterator<DatanodeDescriptor> pickupReplicaSet(
+      Collection<DatanodeDescriptor> first,
+      Collection<DatanodeDescriptor> second) {
+    // If no replica within same rack, return directly.
+    if (first.isEmpty()) {
+      return second.iterator();
+    }
+    // Group the first set by node group. The full network location is the
+    // key: the first set can span several racks, and node groups on different
+    // racks may share the same last path component, so keying on that
+    // component alone would merge unrelated node groups.
+    Map<String, List<DatanodeDescriptor>> nodeGroupMap =
+        new HashMap<String, List<DatanodeDescriptor>>();
+    for (DatanodeDescriptor node : first) {
+      final String nodeGroupName = node.getNetworkLocation();
+      List<DatanodeDescriptor> datanodeList = nodeGroupMap.get(nodeGroupName);
+      if (datanodeList == null) {
+        datanodeList = new ArrayList<DatanodeDescriptor>();
+        nodeGroupMap.put(nodeGroupName, datanodeList);
+      }
+      datanodeList.add(node);
+    }
+
+    final List<DatanodeDescriptor> moreThanOne = new ArrayList<DatanodeDescriptor>();
+    final List<DatanodeDescriptor> exactlyOne = new ArrayList<DatanodeDescriptor>();
+    // split nodes into two sets
+    for (List<DatanodeDescriptor> datanodeList : nodeGroupMap.values()) {
+      if (datanodeList.size() == 1) {
+        // exactlyOne contains nodes on nodegroup with exactly one replica
+        exactlyOne.add(datanodeList.get(0));
+      } else {
+        // moreThanOne contains nodes on nodegroup with more than one replica
+        moreThanOne.addAll(datanodeList);
+      }
+    }
+
+    return moreThanOne.isEmpty() ? exactlyOne.iterator() : moreThanOne.iterator();
+  }
+}
\ No newline at end of file
Modified: hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1410690&r1=1410689&r2=1410690&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Sat Nov 17 03:25:18 2012
@@ -45,7 +45,6 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.Random;
import java.util.Set;
@@ -4142,35 +4141,15 @@ public class FSNamesystem implements FSC
DatanodeDescriptor delNodeHint) {
INodeFile inode = blocksMap.getINode(b);
// first form a rack to datanodes map and
- HashMap<String, ArrayList<DatanodeDescriptor>> rackMap =
- new HashMap<String, ArrayList<DatanodeDescriptor>>();
- for (Iterator<DatanodeDescriptor> iter = nonExcess.iterator();
- iter.hasNext();) {
- DatanodeDescriptor node = iter.next();
- String rackName = node.getNetworkLocation();
- ArrayList<DatanodeDescriptor> datanodeList = rackMap.get(rackName);
- if(datanodeList==null) {
- datanodeList = new ArrayList<DatanodeDescriptor>();
- }
- datanodeList.add(node);
- rackMap.put(rackName, datanodeList);
- }
-
+ final Map<String, List<DatanodeDescriptor>> rackMap =
+ new HashMap<String, List<DatanodeDescriptor>>();
+ final List<DatanodeDescriptor> moreThanOne = new ArrayList<DatanodeDescriptor>();
+ final List<DatanodeDescriptor> exactlyOne = new ArrayList<DatanodeDescriptor>();
+
// split nodes into two sets
- // priSet contains nodes on rack with more than one replica
- // remains contains the remaining nodes
- ArrayList<DatanodeDescriptor> priSet = new ArrayList<DatanodeDescriptor>();
- ArrayList<DatanodeDescriptor> remains = new ArrayList<DatanodeDescriptor>();
- for( Iterator<Entry<String, ArrayList<DatanodeDescriptor>>> iter =
- rackMap.entrySet().iterator(); iter.hasNext(); ) {
- Entry<String, ArrayList<DatanodeDescriptor>> rackEntry = iter.next();
- ArrayList<DatanodeDescriptor> datanodeList = rackEntry.getValue();
- if( datanodeList.size() == 1 ) {
- remains.add(datanodeList.get(0));
- } else {
- priSet.addAll(datanodeList);
- }
- }
+ // moreThanOne contains nodes on rack with more than one replica
+ // exactlyOne contains the remaining nodes
+ replicator.splitNodesWithRack(nonExcess, rackMap, moreThanOne, exactlyOne);
// pick one node to delete that favors the delete hint
// otherwise pick one with least space from priSet if it is not empty
@@ -4180,29 +4159,19 @@ public class FSNamesystem implements FSC
DatanodeInfo cur = null;
// check if we can del delNodeHint
- if (firstOne && delNodeHint !=null && nonExcess.contains(delNodeHint) &&
- (priSet.contains(delNodeHint) || (addedNode != null && !priSet.contains(addedNode))) ) {
+ if (firstOne && delNodeHint !=null && nonExcess.contains(delNodeHint)
+ && (moreThanOne.contains(delNodeHint)
+ || (addedNode != null && !moreThanOne.contains(addedNode))) ) {
cur = delNodeHint;
} else { // regular excessive replica removal
- cur = replicator.chooseReplicaToDelete(inode, b, replication, priSet, remains);
+ cur = replicator.chooseReplicaToDelete(inode, b, replication,
+ moreThanOne, exactlyOne);
}
firstOne = false;
- // adjust rackmap, priSet, and remains
- String rack = cur.getNetworkLocation();
- ArrayList<DatanodeDescriptor> datanodes = rackMap.get(rack);
- datanodes.remove(cur);
- if(datanodes.isEmpty()) {
- rackMap.remove(rack);
- }
- if( priSet.remove(cur) ) {
- if (datanodes.size() == 1) {
- priSet.remove(datanodes.get(0));
- remains.add(datanodes.get(0));
- }
- } else {
- remains.remove(cur);
- }
+ // adjust rackmap, moreThanOne, and exactlyOne
+ replicator.adjustSetsWithChosenReplica(rackMap, moreThanOne,
+ exactlyOne, cur);
nonExcess.remove(cur);
Modified: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/DFSTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/DFSTestUtil.java?rev=1410690&r1=1410689&r2=1410690&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/DFSTestUtil.java (original)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/DFSTestUtil.java Sat Nov 17 03:25:18 2012
@@ -31,21 +31,24 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileSystem.Statistics;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSClient.DFSDataInputStream;
import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.namenode.DatanodeDescriptor;
import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.FileSystem.Statistics;
-import org.apache.hadoop.fs.BlockLocation;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.ShellBasedUnixGroupsMapping;
-import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
/** Utilities for HDFS tests */
public class DFSTestUtil {
@@ -372,4 +375,9 @@ public class DFSTestUtil {
return result;
}
+
+  /**
+   * Build a DatanodeDescriptor for tests from a node identifier and a network
+   * location.
+   *
+   * @param ipAddr passed as the sole argument to the DatanodeID constructor
+   *          (presumably the node's address/name; confirm against DatanodeID)
+   * @param rackLocation the network location to assign to the descriptor
+   * @return a new descriptor for the given node and location
+   */
+  public static DatanodeDescriptor getDatanodeDescriptor(String ipAddr,
+      String rackLocation) {
+    final DatanodeID id = new DatanodeID(ipAddr);
+    return new DatanodeDescriptor(id, rackLocation);
+  }
}
Modified: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/server/namenode/TestReplicationPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/server/namenode/TestReplicationPolicy.java?rev=1410690&r1=1410689&r2=1410690&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/server/namenode/TestReplicationPolicy.java (original)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/server/namenode/TestReplicationPolicy.java Sat Nov 17 03:25:18 2012
@@ -24,6 +24,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import junit.framework.TestCase;
@@ -682,5 +683,46 @@ public class TestReplicationPolicy exten
blocksReplWorkMultiplier = DFSUtil.getReplWorkMultiplier(conf);
assertEquals(blocksReplWorkMultiplier, 3);
}
-
+
+  /**
+   * Tests that chooseReplicaToDelete picks the replica to remove based on
+   * block locality (rack membership) and then on free space.
+   *
+   * NOTE(review): this sets the remaining space of shared dataNodes entries
+   * and does not restore it; confirm no later test depends on the old values.
+   */
+  public void testChooseReplicaToDelete() throws Exception {
+    List<DatanodeDescriptor> replicaNodeList = new ArrayList<DatanodeDescriptor>();
+    final Map<String, List<DatanodeDescriptor>> rackMap =
+        new HashMap<String, List<DatanodeDescriptor>>();
+
+    dataNodes[0].setRemaining(4 * 1024 * 1024);
+    replicaNodeList.add(dataNodes[0]);
+
+    dataNodes[1].setRemaining(3 * 1024 * 1024);
+    replicaNodeList.add(dataNodes[1]);
+
+    dataNodes[2].setRemaining(2 * 1024 * 1024);
+    replicaNodeList.add(dataNodes[2]);
+
+    dataNodes[5].setRemaining(1 * 1024 * 1024);
+    replicaNodeList.add(dataNodes[5]);
+
+    List<DatanodeDescriptor> first = new ArrayList<DatanodeDescriptor>();
+    List<DatanodeDescriptor> second = new ArrayList<DatanodeDescriptor>();
+    replicator.splitNodesWithRack(replicaNodeList, rackMap, first, second);
+    // dataNodes[0] and dataNodes[1] are in the first set because their rack
+    // holds two of the replicas, while dataNodes[2] and dataNodes[5] are in
+    // the second set.
+    assertEquals(2, first.size());
+    assertEquals(2, second.size());
+    DatanodeDescriptor chosenNode = replicator.chooseReplicaToDelete(null,
+        null, (short) 3, first, second);
+    // Within the first set, dataNodes[1] has less free space.
+    assertEquals(dataNodes[1], chosenNode);
+
+    replicator.adjustSetsWithChosenReplica(rackMap, first, second, chosenNode);
+    // With dataNodes[1] gone, the first set is empty and the three remaining
+    // replicas are all in the second set.
+    assertEquals(0, first.size());
+    assertEquals(3, second.size());
+    // Within the second set, dataNodes[5] has the least free space.
+    chosenNode = replicator.chooseReplicaToDelete(null, null, (short) 2, first,
+        second);
+    assertEquals(dataNodes[5], chosenNode);
+  }
+
}
Added: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/server/namenode/TestReplicationPolicyWithNodeGroup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/server/namenode/TestReplicationPolicyWithNodeGroup.java?rev=1410690&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/server/namenode/TestReplicationPolicyWithNodeGroup.java (added)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/server/namenode/TestReplicationPolicyWithNodeGroup.java Sat Nov 17 03:25:18 2012
@@ -0,0 +1,485 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.net.Node;
+import org.junit.Test;
+
+/**
+ * Tests for {@link BlockPlacementPolicyWithNodeGroup}.
+ *
+ * The static initializer configures a NameNode to use
+ * BlockPlacementPolicyWithNodeGroup and NetworkTopologyWithNodeGroup.
+ * It then adds the eight {@link #dataNodes} directly to that NameNode's
+ * cluster map. Their network locations have the form /dX/rY/nZ. The full
+ * location string serves as the node-group identity (see
+ * verifyNoTwoTargetsOnSameNodeGroup).
+ *
+ * The datanode descriptors are static and shared by every test, and JUnit
+ * does not guarantee a method execution order. Each test therefore starts
+ * by calling {@link #setupDataNodeCapacity()} rather than relying on an
+ * earlier test to have restored the nodes it modified.
+ */
+public class TestReplicationPolicyWithNodeGroup {
+  private static final int BLOCK_SIZE = 1024;
+  private static final int NUM_OF_DATANODES = 8;
+  private static final Configuration CONF = new Configuration();
+  private static final NetworkTopology cluster;
+  private static final NameNode namenode;
+  private static final BlockPlacementPolicy replicator;
+  private static final String filename = "/dummyfile.txt";
+
+  // Layout: rack /d1/r1 = {n1: 0,1; n2: 2}, /d1/r2 = {n3: 3,4; n4: 5},
+  // /d2/r3 = {n5: 6; n6: 7}.
+  private static final DatanodeDescriptor[] dataNodes = new DatanodeDescriptor[] {
+    DFSTestUtil.getDatanodeDescriptor("1.1.1.1", "/d1/r1/n1"),
+    DFSTestUtil.getDatanodeDescriptor("2.2.2.2", "/d1/r1/n1"),
+    DFSTestUtil.getDatanodeDescriptor("3.3.3.3", "/d1/r1/n2"),
+    DFSTestUtil.getDatanodeDescriptor("4.4.4.4", "/d1/r2/n3"),
+    DFSTestUtil.getDatanodeDescriptor("5.5.5.5", "/d1/r2/n3"),
+    DFSTestUtil.getDatanodeDescriptor("6.6.6.6", "/d1/r2/n4"),
+    DFSTestUtil.getDatanodeDescriptor("7.7.7.7", "/d2/r3/n5"),
+    DFSTestUtil.getDatanodeDescriptor("8.8.8.8", "/d2/r3/n6")
+  };
+
+  // Writer that is never added to the cluster map, i.e. a client outside the
+  // file system (used by testChooseTarget5).
+  // NOTE(review): this wraps an already-built descriptor in a second
+  // DatanodeDescriptor. It is unclear from here whether the "/d2/r4/n7"
+  // location survives that copy; confirm, or use the DFSTestUtil result
+  // directly.
+  private static final DatanodeDescriptor NODE =
+      new DatanodeDescriptor(DFSTestUtil.getDatanodeDescriptor("9.9.9.9", "/d2/r4/n7"));
+
+  static {
+    try {
+      FileSystem.setDefaultUri(CONF, "hdfs://localhost:0");
+      CONF.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "0.0.0.0:0");
+      // Set properties to make HDFS aware of NodeGroup.
+      CONF.set("dfs.block.replicator.classname",
+          "org.apache.hadoop.hdfs.server.namenode.BlockPlacementPolicyWithNodeGroup");
+      CONF.set("net.topology.impl",
+          "org.apache.hadoop.net.NetworkTopologyWithNodeGroup");
+      NameNode.format(CONF);
+      namenode = new NameNode(CONF);
+    } catch (IOException e) {
+      // Fail class initialization, keeping the IOException as the cause.
+      throw new RuntimeException(e);
+    }
+    FSNamesystem fsNamesystem = FSNamesystem.getFSNamesystem();
+    replicator = fsNamesystem.replicator;
+    cluster = fsNamesystem.clusterMap;
+    // construct network topology
+    for (int i = 0; i < NUM_OF_DATANODES; i++) {
+      cluster.add(dataNodes[i]);
+    }
+    setupDataNodeCapacity();
+  }
+
+  /**
+   * Resets every datanode to the same baseline heartbeat: room for
+   * 2 * MIN_BLOCKS_FOR_WRITE blocks and zero load. Tests call this first so
+   * they do not depend on state left behind by other tests.
+   */
+  private static void setupDataNodeCapacity() {
+    for (int i = 0; i < NUM_OF_DATANODES; i++) {
+      dataNodes[i].updateHeartbeat(
+          2*FSConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
+          2*FSConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0);
+    }
+  }
+
+  /**
+   * The client is dataNodes[0], which is reported as overloaded (load 4).
+   * It is still expected to receive the 1st replica. The 2nd replica must
+   * be on a different rack. The 3rd must be on the same rack as the 2nd,
+   * but in a different node group. With 4 replicas, no two may share a
+   * node group. When only 2 replicas are requested, the 1st is on
+   * dataNodes[0] and the 2nd is on a different rack.
+   * @throws Exception
+   */
+  @Test
+  public void testChooseTarget1() throws Exception {
+    setupDataNodeCapacity();
+    dataNodes[0].updateHeartbeat(
+        2*FSConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
+        FSConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 4); // overloaded
+    try {
+      DatanodeDescriptor[] targets;
+      targets = replicator.chooseTarget(filename, 0, dataNodes[0],
+          new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+      assertEquals(0, targets.length);
+
+      targets = replicator.chooseTarget(filename, 1, dataNodes[0],
+          new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+      assertEquals(1, targets.length);
+      assertEquals(dataNodes[0], targets[0]);
+
+      targets = replicator.chooseTarget(filename, 2, dataNodes[0],
+          new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+      assertEquals(2, targets.length);
+      assertEquals(dataNodes[0], targets[0]);
+      assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
+
+      targets = replicator.chooseTarget(filename, 3, dataNodes[0],
+          new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+      assertEquals(3, targets.length);
+      assertEquals(dataNodes[0], targets[0]);
+      assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
+      assertTrue(cluster.isOnSameRack(targets[1], targets[2]));
+      assertFalse(cluster.isOnSameNodeGroup(targets[1], targets[2]));
+
+      targets = replicator.chooseTarget(filename, 4, dataNodes[0],
+          new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+      assertEquals(4, targets.length);
+      assertEquals(dataNodes[0], targets[0]);
+      assertTrue(cluster.isOnSameRack(targets[1], targets[2]) ||
+          cluster.isOnSameRack(targets[2], targets[3]));
+      assertFalse(cluster.isOnSameRack(targets[0], targets[2]));
+      // Make sure no two replicas are on the same nodegroup
+      verifyNoTwoTargetsOnSameNodeGroup(targets);
+    } finally {
+      // Clear the overload even if an assertion above failed.
+      setupDataNodeCapacity();
+    }
+  }
+
+  /**
+   * Asserts that all targets have distinct network locations, i.e. that no
+   * two targets were placed in the same node group.
+   */
+  private void verifyNoTwoTargetsOnSameNodeGroup(DatanodeDescriptor[] targets) {
+    Set<String> nodeGroupSet = new HashSet<String>();
+    for (DatanodeDescriptor target : targets) {
+      nodeGroupSet.add(target.getNetworkLocation());
+    }
+    assertEquals(targets.length, nodeGroupSet.size());
+  }
+
+  /**
+   * The client is dataNodes[0] and dataNodes[1] is excluded. The 1st replica
+   * must be on dataNodes[0], and no other replica may share its node group.
+   * The 3rd replica must share a rack with the 2nd or the 4th. The 2nd and
+   * 4th replicas must be on different racks.
+   * @throws Exception
+   */
+  @Test
+  public void testChooseTarget2() throws Exception {
+    setupDataNodeCapacity();
+    BlockPlacementPolicyDefault repl = (BlockPlacementPolicyDefault) replicator;
+    List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();
+    HashMap<Node, Node> excludedNodes = new HashMap<Node, Node>();
+    excludedNodes.put(dataNodes[1], dataNodes[1]);
+
+    DatanodeDescriptor[] targets = repl.chooseTarget(4, dataNodes[0],
+        chosenNodes, excludedNodes, BLOCK_SIZE);
+    assertEquals(4, targets.length);
+    assertEquals(dataNodes[0], targets[0]);
+    assertTrue(cluster.isNodeGroupAware());
+    // Make sure no other replica is on the same nodegroup as the 1st one
+    for (int i = 1; i < 4; i++) {
+      assertFalse(cluster.isOnSameNodeGroup(targets[0], targets[i]));
+    }
+    assertTrue(cluster.isOnSameRack(targets[1], targets[2]) ||
+        cluster.isOnSameRack(targets[2], targets[3]));
+    assertFalse(cluster.isOnSameRack(targets[1], targets[3]));
+  }
+
+  /**
+   * The client is dataNodes[0], but dataNodes[0] lacks space, so the 1st
+   * replica should be placed on dataNodes[1]. The 2nd replica goes on a
+   * different rack, and the 3rd on the same rack as the 2nd. With 4
+   * replicas, no two share a node group, and the 3rd shares a rack with the
+   * 2nd or the 4th.
+   * @throws Exception
+   */
+  @Test
+  public void testChooseTarget3() throws Exception {
+    setupDataNodeCapacity();
+    // make data node 0 to be not qualified to choose
+    dataNodes[0].updateHeartbeat(
+        2*FSConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
+        (FSConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0); // no space
+    try {
+      DatanodeDescriptor[] targets;
+      targets = replicator.chooseTarget(filename, 0, dataNodes[0],
+          new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+      assertEquals(0, targets.length);
+
+      targets = replicator.chooseTarget(filename, 1, dataNodes[0],
+          new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+      assertEquals(1, targets.length);
+      assertEquals(dataNodes[1], targets[0]);
+
+      targets = replicator.chooseTarget(filename, 2, dataNodes[0],
+          new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+      assertEquals(2, targets.length);
+      assertEquals(dataNodes[1], targets[0]);
+      assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
+
+      targets = replicator.chooseTarget(filename, 3, dataNodes[0],
+          new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+      assertEquals(3, targets.length);
+      assertEquals(dataNodes[1], targets[0]);
+      assertTrue(cluster.isOnSameRack(targets[1], targets[2]));
+      assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
+
+      targets = replicator.chooseTarget(filename, 4, dataNodes[0],
+          new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+      assertEquals(4, targets.length);
+      assertEquals(dataNodes[1], targets[0]);
+      assertTrue(cluster.isNodeGroupAware());
+      verifyNoTwoTargetsOnSameNodeGroup(targets);
+      assertTrue(cluster.isOnSameRack(targets[1], targets[2]) ||
+          cluster.isOnSameRack(targets[2], targets[3]));
+    } finally {
+      // Give dataNodes[0] its space back even if an assertion above failed.
+      setupDataNodeCapacity();
+    }
+  }
+
+  /**
+   * The client is dataNodes[0], but none of dataNodes[0..2] (the nodes on
+   * dataNodes[0]'s rack) has enough space. No replica may be placed on
+   * dataNodes[0]'s rack. With 2 replicas, they are on different racks. With
+   * 3 replicas:
+   * <ul>
+   * <li>no two share a node group;</li>
+   * <li>the 2nd shares a rack with the 1st or the 3rd;</li>
+   * <li>the 1st and 3rd are on different racks.</li>
+   * </ul>
+   * @throws Exception
+   */
+  @Test
+  public void testChooseTarget4() throws Exception {
+    setupDataNodeCapacity();
+    // make data node 0-2 to be not qualified to choose: not enough disk space
+    for (int i = 0; i < 3; i++) {
+      dataNodes[i].updateHeartbeat(
+          2*FSConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
+          (FSConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0);
+    }
+    try {
+      DatanodeDescriptor[] targets;
+      targets = replicator.chooseTarget(filename, 0, dataNodes[0],
+          new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+      assertEquals(0, targets.length);
+
+      targets = replicator.chooseTarget(filename, 1, dataNodes[0],
+          new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+      assertEquals(1, targets.length);
+      assertFalse(cluster.isOnSameRack(targets[0], dataNodes[0]));
+
+      targets = replicator.chooseTarget(filename, 2, dataNodes[0],
+          new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+      assertEquals(2, targets.length);
+      assertFalse(cluster.isOnSameRack(targets[0], dataNodes[0]));
+      assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
+
+      targets = replicator.chooseTarget(filename, 3, dataNodes[0],
+          new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+      assertEquals(3, targets.length);
+      for (int i = 0; i < 3; i++) {
+        assertFalse(cluster.isOnSameRack(targets[i], dataNodes[0]));
+      }
+      verifyNoTwoTargetsOnSameNodeGroup(targets);
+      assertTrue(cluster.isOnSameRack(targets[0], targets[1]) ||
+          cluster.isOnSameRack(targets[1], targets[2]));
+      assertFalse(cluster.isOnSameRack(targets[0], targets[2]));
+    } finally {
+      // Give dataNodes[0..2] their space back so later tests are unaffected.
+      setupDataNodeCapacity();
+    }
+  }
+
+  /**
+   * The client is NODE, which is not part of the cluster, so the 1st replica
+   * may be placed on any node. The 2nd replica must be on a different rack,
+   * and the 3rd on the same rack as the 2nd. No two replicas may share a
+   * node group.
+   * @throws Exception
+   */
+  @Test
+  public void testChooseTarget5() throws Exception {
+    setupDataNodeCapacity();
+    DatanodeDescriptor[] targets;
+    targets = replicator.chooseTarget(filename, 0, NODE,
+        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+    assertEquals(0, targets.length);
+
+    targets = replicator.chooseTarget(filename, 1, NODE,
+        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+    assertEquals(1, targets.length);
+
+    targets = replicator.chooseTarget(filename, 2, NODE,
+        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+    assertEquals(2, targets.length);
+    assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
+
+    targets = replicator.chooseTarget(filename, 3, NODE,
+        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+    assertEquals(3, targets.length);
+    assertTrue(cluster.isOnSameRack(targets[1], targets[2]));
+    assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
+    verifyNoTwoTargetsOnSameNodeGroup(targets);
+  }
+
+  /**
+   * Re-replication when dataNodes[0] already holds a replica.
+   * <ul>
+   * <li>A single new replica goes to a rack other than dataNodes[0]'s.</li>
+   * <li>With two new replicas, the 1st is on dataNodes[0]'s rack and the
+   * 2nd is on another rack.</li>
+   * <li>With three, the 1st is also in a different node group from
+   * dataNodes[0], and the 3rd is on a different rack from the 1st.</li>
+   * </ul>
+   * @throws Exception
+   */
+  @Test
+  public void testRereplicate1() throws Exception {
+    setupDataNodeCapacity();
+    List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();
+    chosenNodes.add(dataNodes[0]);
+    DatanodeDescriptor[] targets;
+
+    targets = replicator.chooseTarget(filename,
+        0, dataNodes[0], chosenNodes, BLOCK_SIZE);
+    assertEquals(0, targets.length);
+
+    targets = replicator.chooseTarget(filename,
+        1, dataNodes[0], chosenNodes, BLOCK_SIZE);
+    assertEquals(1, targets.length);
+    assertFalse(cluster.isOnSameRack(dataNodes[0], targets[0]));
+
+    targets = replicator.chooseTarget(filename,
+        2, dataNodes[0], chosenNodes, BLOCK_SIZE);
+    assertEquals(2, targets.length);
+    assertTrue(cluster.isOnSameRack(dataNodes[0], targets[0]));
+    assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
+
+    targets = replicator.chooseTarget(filename,
+        3, dataNodes[0], chosenNodes, BLOCK_SIZE);
+    assertEquals(3, targets.length);
+    assertTrue(cluster.isOnSameRack(dataNodes[0], targets[0]));
+    assertFalse(cluster.isOnSameNodeGroup(dataNodes[0], targets[0]));
+    assertFalse(cluster.isOnSameRack(targets[0], targets[2]));
+  }
+
+  /**
+   * Re-replication when dataNodes[0] and dataNodes[1] (same rack) already
+   * hold replicas. A single new replica goes to a different rack. With two
+   * new replicas, they must not both be on dataNodes[0]'s rack.
+   * @throws Exception
+   */
+  @Test
+  public void testRereplicate2() throws Exception {
+    setupDataNodeCapacity();
+    List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();
+    chosenNodes.add(dataNodes[0]);
+    chosenNodes.add(dataNodes[1]);
+
+    DatanodeDescriptor[] targets;
+    targets = replicator.chooseTarget(filename,
+        0, dataNodes[0], chosenNodes, BLOCK_SIZE);
+    assertEquals(0, targets.length);
+
+    targets = replicator.chooseTarget(filename,
+        1, dataNodes[0], chosenNodes, BLOCK_SIZE);
+    assertEquals(1, targets.length);
+    assertFalse(cluster.isOnSameRack(dataNodes[0], targets[0]));
+
+    targets = replicator.chooseTarget(filename,
+        2, dataNodes[0], chosenNodes, BLOCK_SIZE);
+    assertEquals(2, targets.length);
+    assertFalse(cluster.isOnSameRack(dataNodes[0], targets[0]) &&
+        cluster.isOnSameRack(dataNodes[0], targets[1]));
+  }
+
+  /**
+   * Re-replication when dataNodes[0] and dataNodes[3] (different racks)
+   * already hold replicas. The 1st new replica goes on the writer's rack.
+   * When the writer is dataNodes[3], it also goes in a different node group
+   * and off dataNodes[0]'s rack. When the writer is dataNodes[0] and two
+   * replicas are requested, it goes in a different node group from
+   * dataNodes[0].
+   * @throws Exception
+   */
+  @Test
+  public void testRereplicate3() throws Exception {
+    setupDataNodeCapacity();
+    List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();
+    chosenNodes.add(dataNodes[0]);
+    chosenNodes.add(dataNodes[3]);
+
+    DatanodeDescriptor[] targets;
+    targets = replicator.chooseTarget(filename,
+        0, dataNodes[0], chosenNodes, BLOCK_SIZE);
+    assertEquals(0, targets.length);
+
+    targets = replicator.chooseTarget(filename,
+        1, dataNodes[0], chosenNodes, BLOCK_SIZE);
+    assertEquals(1, targets.length);
+    assertTrue(cluster.isOnSameRack(dataNodes[0], targets[0]));
+    assertFalse(cluster.isOnSameRack(dataNodes[3], targets[0]));
+
+    targets = replicator.chooseTarget(filename,
+        1, dataNodes[3], chosenNodes, BLOCK_SIZE);
+    assertEquals(1, targets.length);
+    assertTrue(cluster.isOnSameRack(dataNodes[3], targets[0]));
+    assertFalse(cluster.isOnSameNodeGroup(dataNodes[3], targets[0]));
+    assertFalse(cluster.isOnSameRack(dataNodes[0], targets[0]));
+
+    targets = replicator.chooseTarget(filename,
+        2, dataNodes[0], chosenNodes, BLOCK_SIZE);
+    assertEquals(2, targets.length);
+    assertTrue(cluster.isOnSameRack(dataNodes[0], targets[0]));
+    assertFalse(cluster.isOnSameNodeGroup(dataNodes[0], targets[0]));
+
+    targets = replicator.chooseTarget(filename,
+        2, dataNodes[3], chosenNodes, BLOCK_SIZE);
+    assertEquals(2, targets.length);
+    assertTrue(cluster.isOnSameRack(dataNodes[3], targets[0]));
+  }
+
+  /**
+   * Tests that chooseReplicaToDelete picks replicas based on block locality
+   * and free space. The replicas are on dataNodes[0], [1] and [2] (rack
+   * /d1/r1) and on dataNodes[5] (rack /d1/r2).
+   */
+  @Test
+  public void testChooseReplicaToDelete() throws Exception {
+    setupDataNodeCapacity();
+    List<DatanodeDescriptor> replicaNodeList =
+        new ArrayList<DatanodeDescriptor>();
+    final Map<String, List<DatanodeDescriptor>> rackMap =
+        new HashMap<String, List<DatanodeDescriptor>>();
+    try {
+      dataNodes[0].setRemaining(4*1024*1024);
+      replicaNodeList.add(dataNodes[0]);
+
+      dataNodes[1].setRemaining(3*1024*1024);
+      replicaNodeList.add(dataNodes[1]);
+
+      dataNodes[2].setRemaining(2*1024*1024);
+      replicaNodeList.add(dataNodes[2]);
+
+      dataNodes[5].setRemaining(1*1024*1024);
+      replicaNodeList.add(dataNodes[5]);
+
+      List<DatanodeDescriptor> first = new ArrayList<DatanodeDescriptor>();
+      List<DatanodeDescriptor> second = new ArrayList<DatanodeDescriptor>();
+      replicator.splitNodesWithRack(
+          replicaNodeList, rackMap, first, second);
+      assertEquals(3, first.size());
+      assertEquals(1, second.size());
+      DatanodeDescriptor chosenNode = replicator.chooseReplicaToDelete(
+          null, null, (short)3, first, second);
+      // Within first set {dataNodes[0], dataNodes[1], dataNodes[2]},
+      // dataNodes[0] and dataNodes[1] are in the same nodegroup,
+      // but dataNodes[1] is chosen as it has less free space
+      assertEquals(dataNodes[1], chosenNode);
+
+      replicator.adjustSetsWithChosenReplica(rackMap, first, second, chosenNode);
+      assertEquals(2, first.size());
+      assertEquals(1, second.size());
+      // Within first set {dataNodes[0], dataNodes[2]}, dataNodes[2] is chosen
+      // as it has less free space
+      chosenNode = replicator.chooseReplicaToDelete(
+          null, null, (short)2, first, second);
+      assertEquals(dataNodes[2], chosenNode);
+
+      replicator.adjustSetsWithChosenReplica(rackMap, first, second, chosenNode);
+      assertEquals(0, first.size());
+      assertEquals(2, second.size());
+      // Within second set, dataNodes[5] has less free space
+      chosenNode = replicator.chooseReplicaToDelete(
+          null, null, (short)1, first, second);
+      assertEquals(dataNodes[5], chosenNode);
+    } finally {
+      // Restore the shared nodes' baseline state even if an assertion failed.
+      setupDataNodeCapacity();
+    }
+  }
+
+}