Posted to common-commits@hadoop.apache.org by yq...@apache.org on 2017/05/05 03:56:26 UTC

hadoop git commit: HDFS-11530. Use HDFS specific network topology to choose datanode in BlockPlacementPolicyDefault. Contributed by Yiqun Lin and Chen Liang.

Repository: hadoop
Updated Branches:
  refs/heads/trunk 3082552b3 -> 97c2e576c


HDFS-11530. Use HDFS specific network topology to choose datanode in BlockPlacementPolicyDefault. Contributed by Yiqun Lin and Chen Liang.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/97c2e576
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/97c2e576
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/97c2e576

Branch: refs/heads/trunk
Commit: 97c2e576c91c2316c2b52bfc948bae9bff8ca49f
Parents: 3082552
Author: Yiqun Lin <yq...@apache.org>
Authored: Fri May 5 11:54:50 2017 +0800
Committer: Yiqun Lin <yq...@apache.org>
Committed: Fri May 5 11:54:50 2017 +0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |   4 +
 .../hadoop/hdfs/net/DFSNetworkTopology.java     |  24 +++-
 .../hadoop/hdfs/net/DFSTopologyNodeImpl.java    | 137 +++++++++++++++++++
 .../BlockPlacementPolicyDefault.java            |  36 ++++-
 .../blockmanagement/DatanodeDescriptor.java     |  36 ++++-
 .../server/blockmanagement/DatanodeManager.java |  19 ++-
 .../src/main/resources/hdfs-default.xml         |   8 ++
 .../TestDefaultBlockPlacementPolicy.java        |  46 +++++++
 8 files changed, 302 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/97c2e576/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 0ca344c..b95c7e6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -1085,6 +1085,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
       "httpfs.buffer.size";
   public static final int HTTP_BUFFER_SIZE_DEFAULT = 4096;
 
+  public static final String DFS_USE_DFS_NETWORK_TOPOLOGY_KEY =
+      "dfs.use.dfs.network.topology";
+  public static final boolean DFS_USE_DFS_NETWORK_TOPOLOGY_DEFAULT = false;
+
   // dfs.client.retry confs are moved to HdfsClientConfigKeys.Retry 
   @Deprecated
   public static final String  DFS_CLIENT_RETRY_POLICY_ENABLED_KEY

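The new key defaults to false. A minimal sketch of opting in programmatically,
mirroring the conf.setBoolean call the new test at the bottom of this commit
makes; it assumes the standard HdfsConfiguration entry point, and since the
flag is only read when the DatanodeManager is constructed, it must be set
before the namenode starts:

    // Hedged sketch: enable the HDFS-specific topology before startup.
    // Requires org.apache.hadoop.conf.Configuration,
    // org.apache.hadoop.hdfs.HdfsConfiguration and
    // org.apache.hadoop.hdfs.DFSConfigKeys.
    Configuration conf = new HdfsConfiguration();
    conf.setBoolean(DFSConfigKeys.DFS_USE_DFS_NETWORK_TOPOLOGY_KEY, true);
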
http://git-wip-us.apache.org/repos/asf/hadoop/blob/97c2e576/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DFSNetworkTopology.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DFSNetworkTopology.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DFSNetworkTopology.java
index 259e275..e74cdec 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DFSNetworkTopology.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DFSNetworkTopology.java
@@ -19,8 +19,10 @@ package org.apache.hadoop.hdfs.net;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.Node;
@@ -204,10 +206,24 @@ public class DFSNetworkTopology extends NetworkTopology {
     }
     if (excludedNodes != null) {
       for (Node excludedNode : excludedNodes) {
-        // all excluded nodes should be DatanodeDescriptor
-        Preconditions.checkArgument(excludedNode instanceof DatanodeDescriptor);
-        availableCount -= ((DatanodeDescriptor) excludedNode)
-            .hasStorageType(type) ? 1 : 0;
+        if (excludedNode instanceof DatanodeDescriptor) {
+          availableCount -= ((DatanodeDescriptor) excludedNode)
+              .hasStorageType(type) ? 1 : 0;
+        } else if (excludedNode instanceof DFSTopologyNodeImpl) {
+          availableCount -= ((DFSTopologyNodeImpl) excludedNode)
+              .getSubtreeStorageCount(type);
+        } else if (excludedNode instanceof DatanodeInfo) {
+          // find the corresponding DatanodeDescriptor object, because
+          // we need its storage type info.
+          // This could be an expensive operation, but fortunately the
+          // excluded-nodes set is expected to be very small.
+          String nodeLocation = excludedNode.getNetworkLocation()
+              + "/" + excludedNode.getName();
+          DatanodeDescriptor dn = (DatanodeDescriptor) getNode(nodeLocation);
+          availableCount -= dn.hasStorageType(type) ? 1 : 0;
+        } else {
+          LOG.error("Unexpected node type: {}.", excludedNode.getClass());
+        }
       }
     }
     if (availableCount <= 0) {

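A hedged sketch of how the counting above gets exercised; this is not part of
the commit. The helper name pickDiskNode is hypothetical, NodeBase.ROOT is
assumed as the whole-cluster scope, and chooseRandomWithStorageTypeTwoTrial is
the DFSNetworkTopology entry point this commit wires into
BlockPlacementPolicyDefault further down:

    // Hypothetical helper: pick a random node with a DISK storage,
    // skipping everything in `excluded`. The excluded entries may be
    // DatanodeDescriptor, DFSTopologyNodeImpl or DatanodeInfo instances;
    // the loop above subtracts each one's storage count before choosing.
    static Node pickDiskNode(DFSNetworkTopology topology,
        Collection<Node> excluded) {
      return topology.chooseRandomWithStorageTypeTwoTrial(
          NodeBase.ROOT, excluded, StorageType.DISK);
    }
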
http://git-wip-us.apache.org/repos/asf/hadoop/blob/97c2e576/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DFSTopologyNodeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DFSTopologyNodeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DFSTopologyNodeImpl.java
index 6d80db5..002f4fc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DFSTopologyNodeImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DFSTopologyNodeImpl.java
@@ -18,11 +18,14 @@
 package org.apache.hadoop.hdfs.net;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.net.InnerNode;
 import org.apache.hadoop.net.InnerNodeImpl;
 import org.apache.hadoop.net.Node;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.EnumMap;
 import java.util.EnumSet;
@@ -36,6 +39,9 @@ import java.util.HashMap;
  */
 public class DFSTopologyNodeImpl extends InnerNodeImpl {
 
+  public static final Logger LOG =
+      LoggerFactory.getLogger(DFSTopologyNodeImpl.class);
+
   static final InnerNodeImpl.Factory FACTORY
       = new DFSTopologyNodeImpl.Factory();
 
@@ -127,8 +133,68 @@ public class DFSTopologyNodeImpl extends InnerNodeImpl {
     }
   }
 
+  /**
+   * Called when add() is called to add a node that already exists.
+   *
+   * In normal execution, nodes are added only once and this should not happen.
+   * However, if a node restarts, we may run into the case where the same node
+   * tries to add itself again with potentially different storage type info.
+   * In this case, this method will update the metadata according to the new
+   * storage info.
+   *
+   * Note that it is important to also update all the ancestors whenever we
+   * update the local node's storage info.
+   *
+   * @param dnDescriptor the node that is being added again, with potentially
+   *                     different storage types.
+   */
+  private void updateExistingDatanode(DatanodeDescriptor dnDescriptor) {
+    if (childrenStorageInfo.containsKey(dnDescriptor.getName())) {
+      // all existing nodes should have an entry in childrenStorageInfo
+      boolean same = dnDescriptor.getStorageTypes().size()
+          == childrenStorageInfo.get(dnDescriptor.getName()).keySet().size();
+      for (StorageType type :
+          childrenStorageInfo.get(dnDescriptor.getName()).keySet()) {
+        same = same && dnDescriptor.hasStorageType(type);
+      }
+      if (same) {
+        // if the storage type hasn't been changed, do nothing.
+        return;
+      }
+      // not the same, so we need to update the storage info.
+      DFSTopologyNodeImpl parent = (DFSTopologyNodeImpl)getParent();
+      for (StorageType type :
+          childrenStorageInfo.get(dnDescriptor.getName()).keySet()) {
+        if (!dnDescriptor.hasStorageType(type)) {
+          // remove this type, because the new storage info does not have it.
+          // also need to decrement the count for all the ancestors.
+          // since this is the parent of n, where n is a datanode,
+          // the map must have 1 as the value for all keys.
+          childrenStorageInfo.get(dnDescriptor.getName()).remove(type);
+          decStorageTypeCount(type);
+          if (parent != null) {
+            parent.childRemoveStorage(getName(), type);
+          }
+        }
+      }
+      for (StorageType type : dnDescriptor.getStorageTypes()) {
+        if (!childrenStorageInfo.get(dnDescriptor.getName())
+            .containsKey(type)) {
+          // there is a new type in the new storage info; add it locally,
+          // as well as to all ancestors.
+          childrenStorageInfo.get(dnDescriptor.getName()).put(type, 1);
+          incStorageTypeCount(type);
+          if (parent != null) {
+            parent.childAddStorage(getName(), type);
+          }
+        }
+      }
+    }
+  }
+
   @Override
   public boolean add(Node n) {
+    LOG.debug("adding node {}", n.getName());
     if (!isAncestor(n)) {
       throw new IllegalArgumentException(n.getName()
           + ", which is located at " + n.getNetworkLocation()
@@ -149,6 +215,7 @@ public class DFSTopologyNodeImpl extends InnerNodeImpl {
         for(int i=0; i<children.size(); i++) {
           if (children.get(i).getName().equals(n.getName())) {
             children.set(i, n);
+            updateExistingDatanode(dnDescriptor);
             return false;
           }
         }
@@ -227,6 +294,7 @@ public class DFSTopologyNodeImpl extends InnerNodeImpl {
 
   @Override
   public boolean remove(Node n) {
+    LOG.debug("removing node {}", n.getName());
     if (!isAncestor(n)) {
       throw new IllegalArgumentException(n.getName()
           + ", which is located at " + n.getNetworkLocation()
@@ -299,4 +367,73 @@ public class DFSTopologyNodeImpl extends InnerNodeImpl {
       return isRemoved;
     }
   }
+
+  /**
+   * Called by a child node of the current node to increment a storage count.
+   *
+   * A lock is needed as different datanodes may make recursive calls that
+   * modify the same parent.
+   * TODO: this may not happen at all, depending on how the heartbeat is
+   * processed.
+   * @param childName the name of the child adding the storage type.
+   * @param type the type being incremented.
+   */
+  public synchronized void childAddStorage(
+      String childName, StorageType type) {
+    LOG.debug("child add storage: {}:{}", childName, type);
+    // childrenStorageInfo should definitely contain this node already
+    // because updateStorage is called after node added
+    Preconditions.checkArgument(childrenStorageInfo.containsKey(childName));
+    EnumMap<StorageType, Integer> typeCount =
+        childrenStorageInfo.get(childName);
+    if (typeCount.containsKey(type)) {
+      typeCount.put(type, typeCount.get(type) + 1);
+    } else {
+      // Please be aware that the counts are always "number of datanodes in
+      // this subtree" rather than "number of storages in this subtree".
+      // So if the caller is a datanode, it should always take this branch
+      // rather than the +1 branch above. This relies on the caller in
+      // DatanodeDescriptor calling this only when a *new* storage type is
+      // added (it should not call this when an already existing storage is
+      // added). There is no such restriction for inner nodes.
+      typeCount.put(type, 1);
+    }
+    if (storageTypeCounts.containsKey(type)) {
+      storageTypeCounts.put(type, storageTypeCounts.get(type) + 1);
+    } else {
+      storageTypeCounts.put(type, 1);
+    }
+    if (getParent() != null) {
+      ((DFSTopologyNodeImpl)getParent()).childAddStorage(getName(), type);
+    }
+  }
+
+  /**
+   * Called by a child node of the current node to decrement a storage count.
+   *
+   * @param childName the name of the child removing a storage type.
+   * @param type the type being removed.
+   */
+  public synchronized void childRemoveStorage(
+      String childName, StorageType type) {
+    LOG.debug("child remove storage: {}:{}", childName, type);
+    Preconditions.checkArgument(childrenStorageInfo.containsKey(childName));
+    EnumMap<StorageType, Integer> typeCount =
+        childrenStorageInfo.get(childName);
+    Preconditions.checkArgument(typeCount.containsKey(type));
+    if (typeCount.get(type) > 1) {
+      typeCount.put(type, typeCount.get(type) - 1);
+    } else {
+      typeCount.remove(type);
+    }
+    Preconditions.checkArgument(storageTypeCounts.containsKey(type));
+    if (storageTypeCounts.get(type) > 1) {
+      storageTypeCounts.put(type, storageTypeCounts.get(type) - 1);
+    } else {
+      storageTypeCounts.remove(type);
+    }
+    if (getParent() != null) {
+      ((DFSTopologyNodeImpl)getParent()).childRemoveStorage(getName(), type);
+    }
+  }
 }
\ No newline at end of file

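The two methods above maintain an invariant: an inner node's storageTypeCounts
entry for a type equals the number of datanodes in its subtree that have at
least one storage of that type, with every update walking recursively to the
root. A minimal, self-contained model of that propagation (illustration only,
not Hadoop code; the real implementation also tracks a per-child EnumMap and
guards with Preconditions):

    import java.util.HashMap;
    import java.util.Map;

    class CountingNode {
      final CountingNode parent;
      final Map<String, Integer> storageTypeCounts = new HashMap<>();

      CountingNode(CountingNode parent) {
        this.parent = parent;
      }

      void childAddStorage(String type) {
        // bump the local count, then walk up so every ancestor sees it
        storageTypeCounts.merge(type, 1, Integer::sum);
        if (parent != null) {
          parent.childAddStorage(type);
        }
      }

      void childRemoveStorage(String type) {
        // decrement, dropping the entry once the count reaches zero
        int count = storageTypeCounts.get(type);
        if (count > 1) {
          storageTypeCounts.put(type, count - 1);
        } else {
          storageTypeCounts.remove(type);
        }
        if (parent != null) {
          parent.childRemoveStorage(type);
        }
      }
    }
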
http://git-wip-us.apache.org/repos/asf/hadoop/blob/97c2e576/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
index 7676334..a245f0c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hdfs.AddBlockFlag;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.net.DFSNetworkTopology;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.Node;
@@ -713,7 +714,22 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
     boolean badTarget = false;
     DatanodeStorageInfo firstChosen = null;
     while (numOfReplicas > 0) {
-      DatanodeDescriptor chosenNode = chooseDataNode(scope, excludedNodes);
+      // the storage type that the chosen node has
+      StorageType includeType = null;
+      DatanodeDescriptor chosenNode = null;
+      if (clusterMap instanceof DFSNetworkTopology) {
+        for (StorageType type : storageTypes.keySet()) {
+          chosenNode = chooseDataNode(scope, excludedNodes, type);
+
+          if (chosenNode != null) {
+            includeType = type;
+            break;
+          }
+        }
+      } else {
+        chosenNode = chooseDataNode(scope, excludedNodes);
+      }
+
       if (chosenNode == null) {
         break;
       }
@@ -729,6 +745,13 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
         for (Iterator<Map.Entry<StorageType, Integer>> iter = storageTypes
             .entrySet().iterator(); iter.hasNext();) {
           Map.Entry<StorageType, Integer> entry = iter.next();
+
+          // If the node already contains one of the requested storage types,
+          // there is no need to loop through the other storage types.
+          if (includeType != null && entry.getKey() != includeType) {
+            continue;
+          }
+
           storage = chooseStorage4Block(
               chosenNode, blocksize, results, entry.getKey());
           if (storage != null) {
@@ -782,6 +805,17 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
   }
 
   /**
+   * Choose a datanode from the given <i>scope</i> with the specified
+   * storage type.
+   * @return the chosen node, if any.
+   */
+  protected DatanodeDescriptor chooseDataNode(final String scope,
+      final Collection<Node> excludedNodes, StorageType type) {
+    return (DatanodeDescriptor) ((DFSNetworkTopology) clusterMap)
+        .chooseRandomWithStorageTypeTwoTrial(scope, excludedNodes, type);
+  }
+
+  /**
    * Choose a good storage of given storage type from datanode, and add it to
    * the result list.
    *

http://git-wip-us.apache.org/repos/asf/hadoop/blob/97c2e576/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
index d0583b3..4b87fd4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
@@ -35,6 +35,7 @@ import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.net.DFSTopologyNodeImpl;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -494,7 +495,16 @@ public class DatanodeDescriptor extends DatanodeInfo {
       // blocks.
       for (final DatanodeStorageInfo storageInfo : excessStorages.values()) {
         if (storageInfo.numBlocks() == 0) {
-          storageMap.remove(storageInfo.getStorageID());
+          DatanodeStorageInfo info =
+              storageMap.remove(storageInfo.getStorageID());
+          if (!hasStorageType(info.getStorageType())) {
+            // we removed a storage, and as a result there is no more storage
+            // of this type; inform the parent about this.
+            if (getParent() instanceof DFSTopologyNodeImpl) {
+              ((DFSTopologyNodeImpl) getParent()).childRemoveStorage(getName(),
+                  info.getStorageType());
+            }
+          }
           LOG.info("Removed storage {} from DataNode {}", storageInfo, this);
         } else {
           // This can occur until all block reports are received.
@@ -911,9 +921,20 @@ public class DatanodeDescriptor extends DatanodeInfo {
   DatanodeStorageInfo updateStorage(DatanodeStorage s) {
     synchronized (storageMap) {
       DatanodeStorageInfo storage = storageMap.get(s.getStorageID());
+      DFSTopologyNodeImpl parent = null;
+      if (getParent() instanceof DFSTopologyNodeImpl) {
+        parent = (DFSTopologyNodeImpl) getParent();
+      }
+
       if (storage == null) {
         LOG.info("Adding new storage ID {} for DN {}", s.getStorageID(),
             getXferAddr());
+        StorageType type = s.getStorageType();
+        if (!hasStorageType(type) && parent != null) {
+          // we are about to add a type this node currently does not have;
+          // inform the parent that a new type has been added to this datanode.
+          parent.childAddStorage(getName(), s.getStorageType());
+        }
         storage = new DatanodeStorageInfo(this, s);
         storageMap.put(s.getStorageID(), storage);
       } else if (storage.getState() != s.getState() ||
@@ -921,8 +942,21 @@ public class DatanodeDescriptor extends DatanodeInfo {
         // For backwards compatibility, make sure that the type and
         // state are updated. Some reports from older datanodes do
         // not include these fields so we may have assumed defaults.
+        StorageType oldType = storage.getStorageType();
+        StorageType newType = s.getStorageType();
+        if (oldType != newType && !hasStorageType(newType) && parent != null) {
+          // we are about to add a type this node currently does not have;
+          // inform the parent that a new type has been added to this datanode.
+          // if old == new, nothing has changed, so don't bother.
+          parent.childAddStorage(getName(), newType);
+        }
         storage.updateFromStorage(s);
         storageMap.put(storage.getStorageID(), storage);
+        if (oldType != newType && !hasStorageType(oldType) && parent != null) {
+          // there is no more storage of the old type on this datanode;
+          // inform the parent about this change.
+          parent.childRemoveStorage(getName(), oldType);
+        }
       }
       return storage;
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/97c2e576/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index a61aa78..7dcc9fd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.net.DFSNetworkTopology;
 import org.apache.hadoop.hdfs.protocol.*;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
@@ -186,6 +187,11 @@ public class DatanodeManager {
    */
   private final boolean dataNodeDiskStatsEnabled;
 
+  /**
+   * Whether to use DFSNetworkTopology to choose datanodes for placing
+   * replicas.
+   */
+  private final boolean useDfsNetworkTopology;
+
   @Nullable
   private final SlowPeerTracker slowPeerTracker;
   @Nullable
@@ -205,8 +211,17 @@ public class DatanodeManager {
       final Configuration conf) throws IOException {
     this.namesystem = namesystem;
     this.blockManager = blockManager;
-     
-    networktopology = NetworkTopology.getInstance(conf);
+
+    // TODO: Enable DFSNetworkTopology by default after more stress
+    // testing/validation.
+    this.useDfsNetworkTopology = conf.getBoolean(
+        DFSConfigKeys.DFS_USE_DFS_NETWORK_TOPOLOGY_KEY,
+        DFSConfigKeys.DFS_USE_DFS_NETWORK_TOPOLOGY_DEFAULT);
+    if (useDfsNetworkTopology) {
+      networktopology = DFSNetworkTopology.getInstance(conf);
+    } else {
+      networktopology = NetworkTopology.getInstance(conf);
+    }
 
     this.heartbeatManager = new HeartbeatManager(namesystem, blockManager, conf);
     this.decomManager = new DecommissionManager(namesystem, blockManager,

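Because the choice is made once in the constructor, callers can confirm which
implementation is active with an instanceof check; this is the same assertion
the new test below makes. The namesystem handle here is assumed:

    // Hedged sketch: check which topology the DatanodeManager built.
    // True only when dfs.use.dfs.network.topology was set at startup.
    DatanodeManager dm = namesystem.getBlockManager().getDatanodeManager();
    boolean usingDfsTopology =
        dm.getNetworkTopology() instanceof DFSNetworkTopology;
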
http://git-wip-us.apache.org/repos/asf/hadoop/blob/97c2e576/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 0f33b70..f0f2220 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -4505,4 +4505,12 @@
     </description>
   </property>
 
+  <property>
+    <name>dfs.use.dfs.network.topology</name>
+    <value>false</value>
+    <description>
+      Enables DFSNetworkTopology for choosing datanodes when placing replicas.
+    </description>
+  </property>
+
 </configuration>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/97c2e576/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDefaultBlockPlacementPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDefaultBlockPlacementPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDefaultBlockPlacementPolicy.java
index 0931ff4..eab1199 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDefaultBlockPlacementPolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDefaultBlockPlacementPolicy.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.net.DFSNetworkTopology;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -98,6 +99,51 @@ public class TestDefaultBlockPlacementPolicy {
   }
 
   /**
+   * Verify local node selection when using DFSNetworkTopology.
+   */
+  @Test
+  public void testPlacementWithDFSNetworkTopology() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    final String[] racks = {"/RACK0", "/RACK0", "/RACK2", "/RACK3", "/RACK2"};
+    final String[] hosts = {"/host0", "/host1", "/host2", "/host3", "/host4"};
+
+    // enables DFSNetworkTopology
+    conf.setBoolean(DFSConfigKeys.DFS_USE_DFS_NETWORK_TOPOLOGY_KEY, true);
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
+    conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY,
+        DEFAULT_BLOCK_SIZE / 2);
+
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(5).racks(racks)
+        .hosts(hosts).build();
+    cluster.waitActive();
+    nameNodeRpc = cluster.getNameNodeRpc();
+    namesystem = cluster.getNamesystem();
+
+    DatanodeManager dm = namesystem.getBlockManager().getDatanodeManager();
+    assertTrue(dm.getNetworkTopology() instanceof DFSNetworkTopology);
+
+    String clientMachine = "/host3";
+    String clientRack = "/RACK3";
+    String src = "/test";
+    // Create the file from the client machine
+    HdfsFileStatus fileStatus = namesystem.startFile(src, perm, clientMachine,
+        clientMachine, EnumSet.of(CreateFlag.CREATE), true, REPLICATION_FACTOR,
+        DEFAULT_BLOCK_SIZE, null, null, false);
+    LocatedBlock locatedBlock = nameNodeRpc.addBlock(src, clientMachine, null,
+        null, fileStatus.getFileId(), null, null);
+
+    assertEquals("Block should be allocated sufficient locations",
+        REPLICATION_FACTOR, locatedBlock.getLocations().length);
+    assertEquals("First datanode should be rack local", clientRack,
+        locatedBlock.getLocations()[0].getNetworkLocation());
+    nameNodeRpc.abandonBlock(locatedBlock.getBlock(), fileStatus.getFileId(),
+        src, clientMachine);
+  }
+
+  /**
    * Verify decommissioned nodes should not be selected.
    */
   @Test

