You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by sz...@apache.org on 2013/06/03 06:35:04 UTC
svn commit: r1488844 - in
/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs: ./
src/main/java/ src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/
src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/
Author: szetszwo
Date: Mon Jun 3 04:35:04 2013
New Revision: 1488844
URL: http://svn.apache.org/r1488844
Log:
svn merge -c 1353807 from trunk for HDFS-3498. Support replica removal in BlockPlacementPolicy and make BlockPlacementPolicyDefault extensible for reusing code in subclasses.
Modified:
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/ (props changed)
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/ (props changed)
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
Propchange: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs:r1353807
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1488844&r1=1488843&r2=1488844&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Mon Jun 3 04:35:04 2013
@@ -1032,6 +1032,10 @@ Release 2.0.3-alpha - 2013-02-06
HDFS-3131. Improve TestStorageRestore. (Brandon Li via atm)
+ HDFS-3498. Support replica removal in BlockPlacementPolicy and make
+ BlockPlacementPolicyDefault extensible for reusing code in subclasses.
+ (Junping Du via szetszwo)
+
OPTIMIZATIONS
HDFS-3429. DataNode reads checksums even if client does not need them (todd)
Propchange: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java:r1353807
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1488844&r1=1488843&r2=1488844&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Mon Jun 3 04:35:04 2013
@@ -2422,30 +2422,14 @@ assert storedBlock.findDatanode(dn) < 0
BlockCollection bc = getBlockCollection(b);
final Map<String, List<DatanodeDescriptor>> rackMap
= new HashMap<String, List<DatanodeDescriptor>>();
- for(final Iterator<DatanodeDescriptor> iter = nonExcess.iterator();
- iter.hasNext(); ) {
- final DatanodeDescriptor node = iter.next();
- final String rackName = node.getNetworkLocation();
- List<DatanodeDescriptor> datanodeList = rackMap.get(rackName);
- if (datanodeList == null) {
- datanodeList = new ArrayList<DatanodeDescriptor>();
- rackMap.put(rackName, datanodeList);
- }
- datanodeList.add(node);
- }
+ final List<DatanodeDescriptor> moreThanOne = new ArrayList<DatanodeDescriptor>();
+ final List<DatanodeDescriptor> exactlyOne = new ArrayList<DatanodeDescriptor>();
// split nodes into two sets
- // priSet contains nodes on rack with more than one replica
- // remains contains the remaining nodes
- final List<DatanodeDescriptor> priSet = new ArrayList<DatanodeDescriptor>();
- final List<DatanodeDescriptor> remains = new ArrayList<DatanodeDescriptor>();
- for(List<DatanodeDescriptor> datanodeList : rackMap.values()) {
- if (datanodeList.size() == 1 ) {
- remains.add(datanodeList.get(0));
- } else {
- priSet.addAll(datanodeList);
- }
- }
+ // moreThanOne contains nodes on rack with more than one replica
+ // exactlyOne contains the remaining nodes
+ replicator.splitNodesWithRack(nonExcess, rackMap, moreThanOne,
+ exactlyOne);
// pick one node to delete that favors the delete hint
// otherwise pick one with least space from priSet if it is not empty
@@ -2455,30 +2439,18 @@ assert storedBlock.findDatanode(dn) < 0
// check if we can delete delNodeHint
final DatanodeInfo cur;
if (firstOne && delNodeHint !=null && nonExcess.contains(delNodeHint)
- && (priSet.contains(delNodeHint)
- || (addedNode != null && !priSet.contains(addedNode))) ) {
+ && (moreThanOne.contains(delNodeHint)
+ || (addedNode != null && !moreThanOne.contains(addedNode))) ) {
cur = delNodeHint;
} else { // regular excessive replica removal
cur = replicator.chooseReplicaToDelete(bc, b, replication,
- priSet, remains);
+ moreThanOne, exactlyOne);
}
firstOne = false;
- // adjust rackmap, priSet, and remains
- String rack = cur.getNetworkLocation();
- final List<DatanodeDescriptor> datanodes = rackMap.get(rack);
- datanodes.remove(cur);
- if (datanodes.isEmpty()) {
- rackMap.remove(rack);
- }
- if (priSet.remove(cur)) {
- if (datanodes.size() == 1) {
- priSet.remove(datanodes.get(0));
- remains.add(datanodes.get(0));
- }
- } else {
- remains.remove(cur);
- }
+ // adjust rackmap, moreThanOne, and exactlyOne
+ replicator.adjustSetsWithChosenReplica(rackMap, moreThanOne,
+ exactlyOne, cur);
nonExcess.remove(cur);
addToExcessReplicate(cur, b);
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java?rev=1488844&r1=1488843&r2=1488844&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java Mon Jun 3 04:35:04 2013
@@ -21,12 +21,14 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.namenode.FSClusterStats;
import org.apache.hadoop.net.NetworkTopology;
@@ -201,4 +203,79 @@ public abstract class BlockPlacementPoli
return replicator;
}
+ /**
+ * Adjust rackmap, moreThanOne, and exactlyOne after removing replica on cur.
+ *
+ * @param rackMap a map from rack to replica
+ * @param moreThanOne The List of replica nodes on rack which has more than
+ * one replica
+ * @param exactlyOne The List of replica nodes on rack with only one replica
+ * @param cur current replica to remove
+ */
+ public void adjustSetsWithChosenReplica(final Map<String,
+ List<DatanodeDescriptor>> rackMap,
+ final List<DatanodeDescriptor> moreThanOne,
+ final List<DatanodeDescriptor> exactlyOne, final DatanodeInfo cur) {
+
+ String rack = getRack(cur);
+ final List<DatanodeDescriptor> datanodes = rackMap.get(rack);
+ datanodes.remove(cur);
+ if (datanodes.isEmpty()) {
+ rackMap.remove(rack);
+ }
+ if (moreThanOne.remove(cur)) {
+ if (datanodes.size() == 1) {
+ moreThanOne.remove(datanodes.get(0));
+ exactlyOne.add(datanodes.get(0));
+ }
+ } else {
+ exactlyOne.remove(cur);
+ }
+ }
+
+ /**
+ * Get rack string from a data node
+ * @param datanode
+ * @return rack of data node
+ */
+ protected String getRack(final DatanodeInfo datanode) {
+ return datanode.getNetworkLocation();
+ }
+
+ /**
+ * Split data nodes into two sets, one set includes nodes on rack with
+ * more than one replica, the other set contains the remaining nodes.
+ *
+ * @param dataNodes
+ * @param rackMap a map from rack to datanodes
+ * @param moreThanOne contains nodes on rack with more than one replica
+ * @param exactlyOne remains contains the remaining nodes
+ */
+ public void splitNodesWithRack(
+ Collection<DatanodeDescriptor> dataNodes,
+ final Map<String, List<DatanodeDescriptor>> rackMap,
+ final List<DatanodeDescriptor> moreThanOne,
+ final List<DatanodeDescriptor> exactlyOne) {
+ for(DatanodeDescriptor node : dataNodes) {
+ final String rackName = getRack(node);
+ List<DatanodeDescriptor> datanodeList = rackMap.get(rackName);
+ if (datanodeList == null) {
+ datanodeList = new ArrayList<DatanodeDescriptor>();
+ rackMap.put(rackName, datanodeList);
+ }
+ datanodeList.add(node);
+ }
+
+ // split nodes into two sets
+ for(List<DatanodeDescriptor> datanodeList : rackMap.values()) {
+ if (datanodeList.size() == 1) {
+ // exactlyOne contains nodes on rack with only one replica
+ exactlyOne.add(datanodeList.get(0));
+ } else {
+ // moreThanOne contains nodes on rack with more than one replica
+ moreThanOne.addAll(datanodeList);
+ }
+ }
+ }
+
}
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java?rev=1488844&r1=1488843&r2=1488844&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java Mon Jun 3 04:35:04 2013
@@ -57,17 +57,17 @@ public class BlockPlacementPolicyDefault
"For more information, please enable DEBUG log level on "
+ LOG.getClass().getName();
- private boolean considerLoad;
+ protected boolean considerLoad;
private boolean preferLocalNode = true;
- private NetworkTopology clusterMap;
+ protected NetworkTopology clusterMap;
private FSClusterStats stats;
- private long heartbeatInterval; // interval for DataNode heartbeats
+ protected long heartbeatInterval; // interval for DataNode heartbeats
private long staleInterval; // interval used to identify stale DataNodes
/**
* A miss of that many heartbeats is tolerated for replica deletion policy.
*/
- private int tolerateHeartbeatMultiplier;
+ protected int tolerateHeartbeatMultiplier;
BlockPlacementPolicyDefault(Configuration conf, FSClusterStats stats,
NetworkTopology clusterMap) {
@@ -95,7 +95,7 @@ public class BlockPlacementPolicyDefault
DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT);
}
- private ThreadLocal<StringBuilder> threadLocalBuilder =
+ protected ThreadLocal<StringBuilder> threadLocalBuilder =
new ThreadLocal<StringBuilder>() {
@Override
protected StringBuilder initialValue() {
@@ -319,7 +319,7 @@ public class BlockPlacementPolicyDefault
* choose a node on the same rack
* @return the chosen node
*/
- private DatanodeDescriptor chooseLocalNode(
+ protected DatanodeDescriptor chooseLocalNode(
DatanodeDescriptor localMachine,
HashMap<Node, Node> excludedNodes,
long blocksize,
@@ -354,7 +354,7 @@ public class BlockPlacementPolicyDefault
* in the cluster.
* @return the chosen node
*/
- private DatanodeDescriptor chooseLocalRack(
+ protected DatanodeDescriptor chooseLocalRack(
DatanodeDescriptor localMachine,
HashMap<Node, Node> excludedNodes,
long blocksize,
@@ -406,7 +406,7 @@ public class BlockPlacementPolicyDefault
* from the local rack
*/
- private void chooseRemoteRack(int numOfReplicas,
+ protected void chooseRemoteRack(int numOfReplicas,
DatanodeDescriptor localMachine,
HashMap<Node, Node> excludedNodes,
long blocksize,
@@ -430,7 +430,7 @@ public class BlockPlacementPolicyDefault
/* Randomly choose one target from <i>nodes</i>.
* @return the chosen node
*/
- private DatanodeDescriptor chooseRandom(
+ protected DatanodeDescriptor chooseRandom(
String nodes,
HashMap<Node, Node> excludedNodes,
long blocksize,
@@ -476,7 +476,7 @@ public class BlockPlacementPolicyDefault
/* Randomly choose <i>numOfReplicas</i> targets from <i>nodes</i>.
*/
- private void chooseRandom(int numOfReplicas,
+ protected void chooseRandom(int numOfReplicas,
String nodes,
HashMap<Node, Node> excludedNodes,
long blocksize,
@@ -551,7 +551,7 @@ public class BlockPlacementPolicyDefault
* does not have too much load,
* and the rack does not have too many nodes.
*/
- private boolean isGoodTarget(DatanodeDescriptor node,
+ protected boolean isGoodTarget(DatanodeDescriptor node,
long blockSize, int maxTargetPerRack,
boolean considerLoad,
List<DatanodeDescriptor> results,
@@ -699,8 +699,7 @@ public class BlockPlacementPolicyDefault
// pick replica from the first Set. If first is empty, then pick replicas
// from second set.
- Iterator<DatanodeDescriptor> iter =
- first.isEmpty() ? second.iterator() : first.iterator();
+ Iterator<DatanodeDescriptor> iter = pickupReplicaSet(first, second);
// Pick the node with the oldest heartbeat or with the least free space,
// if all hearbeats are within the tolerable heartbeat interval
@@ -719,6 +718,20 @@ public class BlockPlacementPolicyDefault
}
return oldestHeartbeatNode != null ? oldestHeartbeatNode : minSpaceNode;
}
+
+ /**
+ * Pick up replica node set for deleting replica as over-replicated.
+ * First set contains replica nodes on rack with more than one
+ * replica while second set contains remaining replica nodes.
+ * So pick up first set if not empty. If first is empty, then pick second.
+ */
+ protected Iterator<DatanodeDescriptor> pickupReplicaSet(
+ Collection<DatanodeDescriptor> first,
+ Collection<DatanodeDescriptor> second) {
+ Iterator<DatanodeDescriptor> iter =
+ first.isEmpty() ? second.iterator() : first.iterator();
+ return iter;
+ }
@VisibleForTesting
void setPreferLocalNode(boolean prefer) {
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java?rev=1488844&r1=1488843&r2=1488844&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java Mon Jun 3 04:35:04 2013
@@ -27,6 +27,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
@@ -78,7 +79,7 @@ public class TestReplicationPolicy {
DFSTestUtil.getDatanodeDescriptor("3.3.3.3", "/d1/r2"),
DFSTestUtil.getDatanodeDescriptor("4.4.4.4", "/d1/r2"),
DFSTestUtil.getDatanodeDescriptor("5.5.5.5", "/d2/r3"),
- DFSTestUtil.getDatanodeDescriptor("6.6.6.6", "/d2/r3")
+ DFSTestUtil.getDatanodeDescriptor("6.6.6.6", "/d2/r3")
};
FileSystem.setDefaultUri(conf, "hdfs://localhost:0");
@@ -952,4 +953,50 @@ public class TestReplicationPolicy {
exception.expect(IllegalArgumentException.class);
blocksReplWorkMultiplier = DFSUtil.getReplWorkMultiplier(conf);
}
+
+ /**
+ * Test for the chooseReplicaToDelete are processed based on
+ * block locality and free space
+ */
+ @Test
+ public void testChooseReplicaToDelete() throws Exception {
+ List<DatanodeDescriptor> replicaNodeList = new
+ ArrayList<DatanodeDescriptor>();
+ final Map<String, List<DatanodeDescriptor>> rackMap
+ = new HashMap<String, List<DatanodeDescriptor>>();
+
+ dataNodes[0].setRemaining(4*1024*1024);
+ replicaNodeList.add(dataNodes[0]);
+
+ dataNodes[1].setRemaining(3*1024*1024);
+ replicaNodeList.add(dataNodes[1]);
+
+ dataNodes[2].setRemaining(2*1024*1024);
+ replicaNodeList.add(dataNodes[2]);
+
+ dataNodes[5].setRemaining(1*1024*1024);
+ replicaNodeList.add(dataNodes[5]);
+
+ List<DatanodeDescriptor> first = new ArrayList<DatanodeDescriptor>();
+ List<DatanodeDescriptor> second = new ArrayList<DatanodeDescriptor>();
+ replicator.splitNodesWithRack(
+ replicaNodeList, rackMap, first, second);
+ // dataNodes[0] and dataNodes[1] are in first set as their rack has two
+ // replica nodes, while datanodes[2] and dataNodes[5] are in second set.
+ assertEquals(2, first.size());
+ assertEquals(2, second.size());
+ DatanodeDescriptor chosenNode = replicator.chooseReplicaToDelete(
+ null, null, (short)3, first, second);
+ // Within first set, dataNodes[1] with less free space
+ assertEquals(chosenNode, dataNodes[1]);
+
+ replicator.adjustSetsWithChosenReplica(
+ rackMap, first, second, chosenNode);
+ assertEquals(0, first.size());
+ assertEquals(3, second.size());
+ // Within second set, dataNodes[5] with less free space
+ chosenNode = replicator.chooseReplicaToDelete(
+ null, null, (short)2, first, second);
+ assertEquals(chosenNode, dataNodes[5]);
+ }
}