Posted to common-commits@hadoop.apache.org by yq...@apache.org on 2017/05/05 03:56:26 UTC
hadoop git commit: HDFS-11530. Use HDFS specific network topology to choose datanode in BlockPlacementPolicyDefault. Contributed by Yiqun Lin and Chen Liang.
Repository: hadoop
Updated Branches:
refs/heads/trunk 3082552b3 -> 97c2e576c
HDFS-11530. Use HDFS specific network topology to choose datanode in BlockPlacementPolicyDefault. Contributed by Yiqun Lin and Chen Liang.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/97c2e576
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/97c2e576
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/97c2e576
Branch: refs/heads/trunk
Commit: 97c2e576c91c2316c2b52bfc948bae9bff8ca49f
Parents: 3082552
Author: Yiqun Lin <yq...@apache.org>
Authored: Fri May 5 11:54:50 2017 +0800
Committer: Yiqun Lin <yq...@apache.org>
Committed: Fri May 5 11:54:50 2017 +0800
----------------------------------------------------------------------
.../org/apache/hadoop/hdfs/DFSConfigKeys.java | 4 +
.../hadoop/hdfs/net/DFSNetworkTopology.java | 24 +++-
.../hadoop/hdfs/net/DFSTopologyNodeImpl.java | 137 +++++++++++++++++++
.../BlockPlacementPolicyDefault.java | 36 ++++-
.../blockmanagement/DatanodeDescriptor.java | 36 ++++-
.../server/blockmanagement/DatanodeManager.java | 19 ++-
.../src/main/resources/hdfs-default.xml | 8 ++
.../TestDefaultBlockPlacementPolicy.java | 46 +++++++
8 files changed, 302 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/97c2e576/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 0ca344c..b95c7e6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -1085,6 +1085,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
"httpfs.buffer.size";
public static final int HTTP_BUFFER_SIZE_DEFAULT = 4096;
+ public static final String DFS_USE_DFS_NETWORK_TOPOLOGY_KEY =
+ "dfs.use.dfs.network.topology";
+ public static final boolean DFS_USE_DFS_NETWORK_TOPOLOGY_DEFAULT = false;
+
// dfs.client.retry confs are moved to HdfsClientConfigKeys.Retry
@Deprecated
public static final String DFS_CLIENT_RETRY_POLICY_ENABLED_KEY
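[Editor's note: a minimal sketch, not part of this commit and with a hypothetical class name, of how the new key would be flipped on and read back from a Configuration; it mirrors the getBoolean() call that DatanodeManager gains further down.]

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    public class TopologyFlagDemo {
      public static void main(String[] args) {
        Configuration conf = new HdfsConfiguration();
        // opt in; the key defaults to false per the hunk above
        conf.setBoolean(DFSConfigKeys.DFS_USE_DFS_NETWORK_TOPOLOGY_KEY, true);
        boolean useDfsTopology = conf.getBoolean(
            DFSConfigKeys.DFS_USE_DFS_NETWORK_TOPOLOGY_KEY,
            DFSConfigKeys.DFS_USE_DFS_NETWORK_TOPOLOGY_DEFAULT);
        System.out.println(useDfsTopology); // prints "true"
      }
    }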
http://git-wip-us.apache.org/repos/asf/hadoop/blob/97c2e576/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DFSNetworkTopology.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DFSNetworkTopology.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DFSNetworkTopology.java
index 259e275..e74cdec 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DFSNetworkTopology.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DFSNetworkTopology.java
@@ -19,8 +19,10 @@ package org.apache.hadoop.hdfs.net;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.Node;
@@ -204,10 +206,24 @@ public class DFSNetworkTopology extends NetworkTopology {
}
if (excludedNodes != null) {
for (Node excludedNode : excludedNodes) {
- // all excluded nodes should be DatanodeDescriptor
- Preconditions.checkArgument(excludedNode instanceof DatanodeDescriptor);
- availableCount -= ((DatanodeDescriptor) excludedNode)
- .hasStorageType(type) ? 1 : 0;
+ if (excludedNode instanceof DatanodeDescriptor) {
+ availableCount -= ((DatanodeDescriptor) excludedNode)
+ .hasStorageType(type) ? 1 : 0;
+ } else if (excludedNode instanceof DFSTopologyNodeImpl) {
+ availableCount -= ((DFSTopologyNodeImpl) excludedNode)
+ .getSubtreeStorageCount(type);
+ } else if (excludedNode instanceof DatanodeInfo) {
+ // find out the corresponding DatanodeDescriptor object, because
+ // we need to get its storage type info.
+ // this could be an expensive operation, but fortunately the size of
+ // the excluded nodes set is supposed to be very small.
+ String nodeLocation = excludedNode.getNetworkLocation()
+ + "/" + excludedNode.getName();
+ DatanodeDescriptor dn = (DatanodeDescriptor)getNode(nodeLocation);
+ availableCount -= dn.hasStorageType(type) ? 1 : 0;
+ } else {
+ LOG.error("Unexpected node type: {}.", excludedNode.getClass());
+ }
}
}
if (availableCount <= 0) {
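[Editor's note: to make the new exclusion accounting concrete: an excluded datanode subtracts at most one (it either has the storage type or it does not), while an excluded inner node subtracts its entire subtree count for that type. Below is a self-contained toy model of the same arithmetic, using hypothetical stand-in types rather than the Hadoop classes.]

    import java.util.*;

    public class ExclusionCountDemo {
      interface TopoNode {}
      // stands in for DatanodeDescriptor: a leaf with a set of storage types
      static class Leaf implements TopoNode {
        final Set<String> types;
        Leaf(String... t) { types = new HashSet<>(Arrays.asList(t)); }
      }
      // stands in for DFSTopologyNodeImpl: an inner node with subtree counts
      static class Subtree implements TopoNode {
        final Map<String, Integer> counts;
        Subtree(Map<String, Integer> c) { counts = c; }
      }

      static int available(int total, String type, List<TopoNode> excluded) {
        int count = total;
        for (TopoNode n : excluded) {
          if (n instanceof Leaf) {
            count -= ((Leaf) n).types.contains(type) ? 1 : 0;
          } else if (n instanceof Subtree) {
            count -= ((Subtree) n).counts.getOrDefault(type, 0);
          }
        }
        return count;
      }

      public static void main(String[] args) {
        List<TopoNode> excluded = Arrays.asList(
            new Leaf("DISK"),                                  // one DISK datanode
            new Subtree(Collections.singletonMap("DISK", 2))); // rack with two DISK nodes
        System.out.println(available(5, "DISK", excluded));    // 5 - 1 - 2 = 2
      }
    }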
http://git-wip-us.apache.org/repos/asf/hadoop/blob/97c2e576/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DFSTopologyNodeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DFSTopologyNodeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DFSTopologyNodeImpl.java
index 6d80db5..002f4fc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DFSTopologyNodeImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DFSTopologyNodeImpl.java
@@ -18,11 +18,14 @@
package org.apache.hadoop.hdfs.net;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.net.InnerNode;
import org.apache.hadoop.net.InnerNodeImpl;
import org.apache.hadoop.net.Node;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.EnumMap;
import java.util.EnumSet;
@@ -36,6 +39,9 @@ import java.util.HashMap;
*/
public class DFSTopologyNodeImpl extends InnerNodeImpl {
+ public static final Logger LOG =
+ LoggerFactory.getLogger(DFSTopologyNodeImpl.class);
+
static final InnerNodeImpl.Factory FACTORY
= new DFSTopologyNodeImpl.Factory();
@@ -127,8 +133,68 @@ public class DFSTopologyNodeImpl extends InnerNodeImpl {
}
}
+ /**
+ * Called when add() is called to add a node that already exists.
+ *
+ * In normal execution, nodes are added only once and this should not happen.
+ * However, if a node restarts, we may run into the case where the same node
+ * tries to add itself again with potentially different storage type info.
+ * In this case this method will update the metadata according to the new
+ * storage info.
+ *
+ * Note that it is important to also update all the ancestors if we have
+ * updated the local node's storage info.
+ *
+ * @param dnDescriptor the node that is added another time, with potentially
+ * different storage types.
+ */
+ private void updateExistingDatanode(DatanodeDescriptor dnDescriptor) {
+ if (childrenStorageInfo.containsKey(dnDescriptor.getName())) {
+ // all existing nodes should have an entry in childrenStorageInfo
+ boolean same = dnDescriptor.getStorageTypes().size()
+ == childrenStorageInfo.get(dnDescriptor.getName()).keySet().size();
+ for (StorageType type :
+ childrenStorageInfo.get(dnDescriptor.getName()).keySet()) {
+ same = same && dnDescriptor.hasStorageType(type);
+ }
+ if (same) {
+ // if the storage type hasn't been changed, do nothing.
+ return;
+ }
+ // otherwise, we need to update the storage info.
+ DFSTopologyNodeImpl parent = (DFSTopologyNodeImpl)getParent();
+ for (StorageType type :
+ childrenStorageInfo.get(dnDescriptor.getName()).keySet()) {
+ if (!dnDescriptor.hasStorageType(type)) {
+ // remove this type, because the new storage info does not have it.
+ // also need to decrement the count for all the ancestors.
+ // since this is the parent of n, where n is a datanode,
+ // the map must have 1 as the value of all keys
+ childrenStorageInfo.get(dnDescriptor.getName()).remove(type);
+ decStorageTypeCount(type);
+ if (parent != null) {
+ parent.childRemoveStorage(getName(), type);
+ }
+ }
+ }
+ for (StorageType type : dnDescriptor.getStorageTypes()) {
+ if (!childrenStorageInfo.get(dnDescriptor.getName())
+ .containsKey(type)) {
+ // there is a new type in new storage info, add this locally,
+ // as well as all ancestors.
+ childrenStorageInfo.get(dnDescriptor.getName()).put(type, 1);
+ incStorageTypeCount(type);
+ if (parent != null) {
+ parent.childAddStorage(getName(), type);
+ }
+ }
+ }
+ }
+ }
+
@Override
public boolean add(Node n) {
+ LOG.debug("adding node {}", n.getName());
if (!isAncestor(n)) {
throw new IllegalArgumentException(n.getName()
+ ", which is located at " + n.getNetworkLocation()
@@ -149,6 +215,7 @@ public class DFSTopologyNodeImpl extends InnerNodeImpl {
for(int i=0; i<children.size(); i++) {
if (children.get(i).getName().equals(n.getName())) {
children.set(i, n);
+ updateExistingDatanode(dnDescriptor);
return false;
}
}
@@ -227,6 +294,7 @@ public class DFSTopologyNodeImpl extends InnerNodeImpl {
@Override
public boolean remove(Node n) {
+ LOG.debug("removing node {}", n.getName());
if (!isAncestor(n)) {
throw new IllegalArgumentException(n.getName()
+ ", which is located at " + n.getNetworkLocation()
@@ -299,4 +367,73 @@ public class DFSTopologyNodeImpl extends InnerNodeImpl {
return isRemoved;
}
}
+
+ /**
+ * Called by a child node of the current node to increment a storage count.
+ *
+ * A lock is needed, as different datanodes may recursively call this
+ * method to modify the same parent.
+ * TODO: this may not happen at all, depending on how the heartbeat is
+ * processed.
+ * @param childName the name of the child that tries to add the storage type
+ * @param type the type being incremented.
+ */
+ public synchronized void childAddStorage(
+ String childName, StorageType type) {
+ LOG.debug("child add storage: {}:{}", childName, type);
+ // childrenStorageInfo should definitely contain this node already
+ // because updateStorage is called after the node is added
+ Preconditions.checkArgument(childrenStorageInfo.containsKey(childName));
+ EnumMap<StorageType, Integer> typeCount =
+ childrenStorageInfo.get(childName);
+ if (typeCount.containsKey(type)) {
+ typeCount.put(type, typeCount.get(type) + 1);
+ } else {
+ // Please be aware that the counts are always the "number of datanodes
+ // in this subtree" rather than the "number of storages in this subtree".
+ // So if the caller is a datanode, it should always take this branch
+ // rather than the +1 branch above. This depends on the caller in
+ // DatanodeDescriptor to make sure this is only called when a *new*
+ // storage type is added (it should not be called when an already
+ // existing storage is added).
+ // But there is no such restriction for inner nodes.
+ typeCount.put(type, 1);
+ }
+ if (storageTypeCounts.containsKey(type)) {
+ storageTypeCounts.put(type, storageTypeCounts.get(type) + 1);
+ } else {
+ storageTypeCounts.put(type, 1);
+ }
+ if (getParent() != null) {
+ ((DFSTopologyNodeImpl)getParent()).childAddStorage(getName(), type);
+ }
+ }
+
+ /**
+ * Called by a child node of the current node to decrement a storage count.
+ *
+ * @param childName the name of the child removing a storage type.
+ * @param type the type being removed.
+ */
+ public synchronized void childRemoveStorage(
+ String childName, StorageType type) {
+ LOG.debug("child remove storage: {}:{}", childName, type);
+ Preconditions.checkArgument(childrenStorageInfo.containsKey(childName));
+ EnumMap<StorageType, Integer> typeCount =
+ childrenStorageInfo.get(childName);
+ Preconditions.checkArgument(typeCount.containsKey(type));
+ if (typeCount.get(type) > 1) {
+ typeCount.put(type, typeCount.get(type) - 1);
+ } else {
+ typeCount.remove(type);
+ }
+ Preconditions.checkArgument(storageTypeCounts.containsKey(type));
+ if (storageTypeCounts.get(type) > 1) {
+ storageTypeCounts.put(type, storageTypeCounts.get(type) - 1);
+ } else {
+ storageTypeCounts.remove(type);
+ }
+ if (getParent() != null) {
+ ((DFSTopologyNodeImpl)getParent()).childRemoveStorage(getName(), type);
+ }
+ }
}
\ No newline at end of file
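[Editor's note: the two new methods form a simple recursive count-propagation scheme: each inner node keeps per-type datanode counts and forwards every increment or decrement to its parent, so the root always knows how many datanodes under it carry each storage type. A self-contained toy sketch of the same idea follows; the class is hypothetical, not the Hadoop one.]

    import java.util.HashMap;
    import java.util.Map;

    public class CountingNode {
      private final CountingNode parent;
      private final Map<String, Integer> storageTypeCounts = new HashMap<>();

      public CountingNode(CountingNode parent) { this.parent = parent; }

      // mirrors childAddStorage: bump the local count, then propagate up
      public synchronized void childAddStorage(String type) {
        storageTypeCounts.merge(type, 1, Integer::sum);
        if (parent != null) {
          parent.childAddStorage(type);
        }
      }

      // mirrors childRemoveStorage: drop the entry once the count hits zero
      public synchronized void childRemoveStorage(String type) {
        int current = storageTypeCounts.getOrDefault(type, 0);
        if (current <= 1) {
          storageTypeCounts.remove(type);
        } else {
          storageTypeCounts.put(type, current - 1);
        }
        if (parent != null) {
          parent.childRemoveStorage(type);
        }
      }

      public int count(String type) {
        return storageTypeCounts.getOrDefault(type, 0);
      }
    }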
http://git-wip-us.apache.org/repos/asf/hadoop/blob/97c2e576/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
index 7676334..a245f0c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hdfs.AddBlockFlag;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.net.DFSNetworkTopology;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.Node;
@@ -713,7 +714,22 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
boolean badTarget = false;
DatanodeStorageInfo firstChosen = null;
while (numOfReplicas > 0) {
- DatanodeDescriptor chosenNode = chooseDataNode(scope, excludedNodes);
+ // the storage type that the current node has
+ StorageType includeType = null;
+ DatanodeDescriptor chosenNode = null;
+ if (clusterMap instanceof DFSNetworkTopology) {
+ for (StorageType type : storageTypes.keySet()) {
+ chosenNode = chooseDataNode(scope, excludedNodes, type);
+
+ if (chosenNode != null) {
+ includeType = type;
+ break;
+ }
+ }
+ } else {
+ chosenNode = chooseDataNode(scope, excludedNodes);
+ }
+
if (chosenNode == null) {
break;
}
@@ -729,6 +745,13 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
for (Iterator<Map.Entry<StorageType, Integer>> iter = storageTypes
.entrySet().iterator(); iter.hasNext();) {
Map.Entry<StorageType, Integer> entry = iter.next();
+
+ // If there is one storage type that the node already contains,
+ // then there is no need to loop through the other storage types.
+ if (includeType != null && entry.getKey() != includeType) {
+ continue;
+ }
+
storage = chooseStorage4Block(
chosenNode, blocksize, results, entry.getKey());
if (storage != null) {
@@ -782,6 +805,17 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
}
/**
+ * Choose a datanode from the given <i>scope</i> with the specified
+ * storage type.
+ * @return the chosen node, if there is any.
+ */
+ protected DatanodeDescriptor chooseDataNode(final String scope,
+ final Collection<Node> excludedNodes, StorageType type) {
+ return (DatanodeDescriptor) ((DFSNetworkTopology) clusterMap)
+ .chooseRandomWithStorageTypeTwoTrial(scope, excludedNodes, type);
+ }
+
+ /**
* Choose a good storage of given storage type from datanode, and add it to
* the result list.
*
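[Editor's note: taken together, the placement loop above first probes each requested storage type through the new overload, which hands off to the topology's two-trial chooser; the first type that yields a node becomes includeType and short-circuits the storage iteration. A hedged usage sketch follows; the scope constant and excluded set are illustrative, and it assumes the surrounding class context of BlockPlacementPolicyDefault.]

    // Hypothetical snippet inside a BlockPlacementPolicyDefault subclass
    // or test; chooseDataNode(scope, excluded, type) delegates to
    // DFSNetworkTopology#chooseRandomWithStorageTypeTwoTrial as shown above.
    // NodeBase.ROOT is from org.apache.hadoop.net.NodeBase.
    Collection<Node> excluded = new ArrayList<>();
    DatanodeDescriptor chosen =
        chooseDataNode(NodeBase.ROOT, excluded, StorageType.DISK);
    if (chosen == null) {
      // no datanode in scope has a DISK storage left to offer
    }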
http://git-wip-us.apache.org/repos/asf/hadoop/blob/97c2e576/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
index d0583b3..4b87fd4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
@@ -35,6 +35,7 @@ import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.net.DFSTopologyNodeImpl;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -494,7 +495,16 @@ public class DatanodeDescriptor extends DatanodeInfo {
// blocks.
for (final DatanodeStorageInfo storageInfo : excessStorages.values()) {
if (storageInfo.numBlocks() == 0) {
- storageMap.remove(storageInfo.getStorageID());
+ DatanodeStorageInfo info =
+ storageMap.remove(storageInfo.getStorageID());
+ if (!hasStorageType(info.getStorageType())) {
+ // we removed a storage, and as a result there is no more storage of
+ // this type, so inform the parent about this.
+ if (getParent() instanceof DFSTopologyNodeImpl) {
+ ((DFSTopologyNodeImpl) getParent()).childRemoveStorage(getName(),
+ info.getStorageType());
+ }
+ }
LOG.info("Removed storage {} from DataNode {}", storageInfo, this);
} else {
// This can occur until all block reports are received.
@@ -911,9 +921,20 @@ public class DatanodeDescriptor extends DatanodeInfo {
DatanodeStorageInfo updateStorage(DatanodeStorage s) {
synchronized (storageMap) {
DatanodeStorageInfo storage = storageMap.get(s.getStorageID());
+ DFSTopologyNodeImpl parent = null;
+ if (getParent() instanceof DFSTopologyNodeImpl) {
+ parent = (DFSTopologyNodeImpl) getParent();
+ }
+
if (storage == null) {
LOG.info("Adding new storage ID {} for DN {}", s.getStorageID(),
getXferAddr());
+ StorageType type = s.getStorageType();
+ if (!hasStorageType(type) && parent != null) {
+ // we are about to add a type this node currently does not have,
+ // inform the parent that a new type is added to this datanode
+ parent.childAddStorage(getName(), s.getStorageType());
+ }
storage = new DatanodeStorageInfo(this, s);
storageMap.put(s.getStorageID(), storage);
} else if (storage.getState() != s.getState() ||
@@ -921,8 +942,21 @@ public class DatanodeDescriptor extends DatanodeInfo {
// For backwards compatibility, make sure that the type and
// state are updated. Some reports from older datanodes do
// not include these fields so we may have assumed defaults.
+ StorageType oldType = storage.getStorageType();
+ StorageType newType = s.getStorageType();
+ if (oldType != newType && !hasStorageType(newType) && parent != null) {
+ // we are about to add a type this node currently does not have,
+ // so inform the parent that a new type is added to this datanode.
+ // if old == new, nothing has changed, so don't bother.
+ parent.childAddStorage(getName(), newType);
+ }
storage.updateFromStorage(s);
storageMap.put(storage.getStorageID(), storage);
+ if (oldType != newType && !hasStorageType(oldType) && parent != null) {
+ // there is no more storage of the old type on this datanode, so
+ // inform the parent about this change.
+ parent.childRemoveStorage(getName(), oldType);
+ }
}
return storage;
}
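[Editor's note: because updateStorage() both reads and rewrites the storage's type, the order of the two parent notifications matters: the add-check runs before updateFromStorage() swaps the new type in, and the remove-check runs after, once the old type may have vanished. A condensed, hypothetical restatement of those two guards, not the committed code:]

    // Hypothetical helpers isolating the two guards in updateStorage().
    // Call the first before storage.updateFromStorage(s), the second after.
    private void notifyParentBeforeUpdate(DFSTopologyNodeImpl parent,
        StorageType oldType, StorageType newType) {
      if (parent != null && oldType != newType && !hasStorageType(newType)) {
        parent.childAddStorage(getName(), newType);    // node gains a new type
      }
    }

    private void notifyParentAfterUpdate(DFSTopologyNodeImpl parent,
        StorageType oldType, StorageType newType) {
      if (parent != null && oldType != newType && !hasStorageType(oldType)) {
        parent.childRemoveStorage(getName(), oldType); // last old-type storage gone
      }
    }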
http://git-wip-us.apache.org/repos/asf/hadoop/blob/97c2e576/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index a61aa78..7dcc9fd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.net.DFSNetworkTopology;
import org.apache.hadoop.hdfs.protocol.*;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
@@ -186,6 +187,11 @@ public class DatanodeManager {
*/
private final boolean dataNodeDiskStatsEnabled;
+ /**
+ * Whether we use DFSNetworkTopology to choose datanodes for placing replicas.
+ */
+ private final boolean useDfsNetworkTopology;
+
@Nullable
private final SlowPeerTracker slowPeerTracker;
@Nullable
@@ -205,8 +211,17 @@ public class DatanodeManager {
final Configuration conf) throws IOException {
this.namesystem = namesystem;
this.blockManager = blockManager;
-
- networktopology = NetworkTopology.getInstance(conf);
+
+ // TODO: Enable DFSNetworkTopology by default after more stress
+ // testing/validation.
+ this.useDfsNetworkTopology = conf.getBoolean(
+ DFSConfigKeys.DFS_USE_DFS_NETWORK_TOPOLOGY_KEY,
+ DFSConfigKeys.DFS_USE_DFS_NETWORK_TOPOLOGY_DEFAULT);
+ if (useDfsNetworkTopology) {
+ networktopology = DFSNetworkTopology.getInstance(conf);
+ } else {
+ networktopology = NetworkTopology.getInstance(conf);
+ }
this.heartbeatManager = new HeartbeatManager(namesystem, blockManager, conf);
this.decomManager = new DecommissionManager(namesystem, blockManager,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/97c2e576/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 0f33b70..f0f2220 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -4505,4 +4505,12 @@
</description>
</property>
+ <property>
+ <name>dfs.use.dfs.network.topology</name>
+ <value>false</value>
+ <description>
+ Enables DFSNetworkTopology to choose nodes for placing replicas.
+ </description>
+ </property>
+
</configuration>
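[Editor's note: operators opt in by overriding this default in hdfs-site.xml; a minimal sketch of the override, with the property name taken from the hunk above:]

    <property>
      <name>dfs.use.dfs.network.topology</name>
      <value>true</value>
    </property>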
http://git-wip-us.apache.org/repos/asf/hadoop/blob/97c2e576/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDefaultBlockPlacementPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDefaultBlockPlacementPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDefaultBlockPlacementPolicy.java
index 0931ff4..eab1199 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDefaultBlockPlacementPolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDefaultBlockPlacementPolicy.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.net.DFSNetworkTopology;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -98,6 +99,51 @@ public class TestDefaultBlockPlacementPolicy {
}
/**
+ * Verify local node selection when using DFSNetworkTopology.
+ */
+ @Test
+ public void testPlacementWithDFSNetworkTopology() throws Exception {
+ Configuration conf = new HdfsConfiguration();
+ final String[] racks = {"/RACK0", "/RACK0", "/RACK2", "/RACK3", "/RACK2"};
+ final String[] hosts = {"/host0", "/host1", "/host2", "/host3", "/host4"};
+
+ // enables DFSNetworkTopology
+ conf.setBoolean(DFSConfigKeys.DFS_USE_DFS_NETWORK_TOPOLOGY_KEY, true);
+ conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
+ conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY,
+ DEFAULT_BLOCK_SIZE / 2);
+
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(5).racks(racks)
+ .hosts(hosts).build();
+ cluster.waitActive();
+ nameNodeRpc = cluster.getNameNodeRpc();
+ namesystem = cluster.getNamesystem();
+
+ DatanodeManager dm = namesystem.getBlockManager().getDatanodeManager();
+ assertTrue(dm.getNetworkTopology() instanceof DFSNetworkTopology);
+
+ String clientMachine = "/host3";
+ String clientRack = "/RACK3";
+ String src = "/test";
+ // Create the file with client machine
+ HdfsFileStatus fileStatus = namesystem.startFile(src, perm, clientMachine,
+ clientMachine, EnumSet.of(CreateFlag.CREATE), true, REPLICATION_FACTOR,
+ DEFAULT_BLOCK_SIZE, null, null, false);
+ LocatedBlock locatedBlock = nameNodeRpc.addBlock(src, clientMachine, null,
+ null, fileStatus.getFileId(), null, null);
+
+ assertEquals("Block should be allocated sufficient locations",
+ REPLICATION_FACTOR, locatedBlock.getLocations().length);
+ assertEquals("First datanode should be rack local", clientRack,
+ locatedBlock.getLocations()[0].getNetworkLocation());
+ nameNodeRpc.abandonBlock(locatedBlock.getBlock(), fileStatus.getFileId(),
+ src, clientMachine);
+ }
+
+ /**
* Verify decommissioned nodes should not be selected.
*/
@Test