You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sz...@apache.org on 2012/11/13 22:23:10 UTC

svn commit: r1408968 - in /hadoop/common/branches/branch-1: ./ src/core/ src/core/org/apache/hadoop/net/ src/hdfs/org/apache/hadoop/hdfs/protocol/ src/hdfs/org/apache/hadoop/hdfs/server/namenode/ src/test/org/apache/hadoop/net/

Author: szetszwo
Date: Tue Nov 13 21:23:09 2012
New Revision: 1408968

URL: http://svn.apache.org/viewvc?rev=1408968&view=rev
Log:
HADOOP-8820. Backport HADOOP-8469 and HADOOP-8470: Make NetworkTopology class pluggable and add NetworkTopologyWithNodeGroup, a 4-layer implementation of NetworkTopology.  Contributed by Junping Du and Jing Zhao

Added:
    hadoop/common/branches/branch-1/src/core/org/apache/hadoop/net/NetworkTopologyWithNodeGroup.java
    hadoop/common/branches/branch-1/src/test/org/apache/hadoop/net/TestNetworkTopologyWithNodeGroup.java
Modified:
    hadoop/common/branches/branch-1/CHANGES.txt
    hadoop/common/branches/branch-1/src/core/core-default.xml
    hadoop/common/branches/branch-1/src/core/org/apache/hadoop/net/NetworkTopology.java
    hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java
    hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

Modified: hadoop/common/branches/branch-1/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/CHANGES.txt?rev=1408968&r1=1408967&r2=1408968&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/CHANGES.txt (original)
+++ hadoop/common/branches/branch-1/CHANGES.txt Tue Nov 13 21:23:09 2012
@@ -32,6 +32,10 @@ Release 1.2.0 - unreleased
     HADOOP-8988. Allow configuration of authorization for JmxJsonServlet and 
     MetricsServlet. (tucu, Jing Zhao via suresh)
 
+    HADOOP-8820. Backport HADOOP-8469 and HADOOP-8470: Make NetworkTopology
+    class pluggable and add NetworkTopologyWithNodeGroup, a 4-layer
+    implementation of NetworkTopology.  (Junping Du and Jing Zhao via szetszwo)
+
   IMPROVEMENTS
 
     HDFS-3515. Port HDFS-1457 to branch-1. (eli)

Modified: hadoop/common/branches/branch-1/src/core/core-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/core/core-default.xml?rev=1408968&r1=1408967&r2=1408968&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/core/core-default.xml (original)
+++ hadoop/common/branches/branch-1/src/core/core-default.xml Tue Nov 13 21:23:09 2012
@@ -466,7 +466,8 @@
   </description>
 </property>
 
-<!-- Rack Configuration -->
+
+<!-- Topology Configuration -->
 
 <property>
   <name>topology.node.switch.mapping.impl</name>
@@ -478,6 +479,13 @@
   </description>
 </property>
 
+ <property>
+  <name>net.topology.impl</name>
+  <value>org.apache.hadoop.net.NetworkTopology</value>
+  <description> The default implementation of NetworkTopology which is classic three layer one.
+  </description>
+</property>
+
 <property>
   <name>topology.script.file.name</name>
   <value></value>

Modified: hadoop/common/branches/branch-1/src/core/org/apache/hadoop/net/NetworkTopology.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/core/org/apache/hadoop/net/NetworkTopology.java?rev=1408968&r1=1408967&r2=1408968&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/core/org/apache/hadoop/net/NetworkTopology.java (original)
+++ hadoop/common/branches/branch-1/src/core/org/apache/hadoop/net/NetworkTopology.java Tue Nov 13 21:23:09 2012
@@ -52,8 +52,8 @@ public class NetworkTopology {
   /* Inner Node represent a switch/router of a data center or rack.
    * Different from a leave node, it has non-null children.
    */
-  private class InnerNode extends NodeBase {
-    private ArrayList<Node> children=new ArrayList<Node>();
+  static class InnerNode extends NodeBase {
+    protected List<Node> children=new ArrayList<Node>();
     private int numOfLeaves;
         
     /** Construct an InnerNode from a path-like string */
@@ -73,7 +73,7 @@ public class NetworkTopology {
     }
         
     /** Get its children */
-    Collection<Node> getChildren() {return children;}
+    List<Node> getChildren() {return children;}
         
     /** Return the number of children this node has */
     int getNumOfChildren() {
@@ -166,8 +166,7 @@ public class NetworkTopology {
         }
         if (parentNode == null) {
           // create a new InnerNode
-          parentNode = new InnerNode(parentName, getPath(this),
-                                     this, this.getLevel()+1);
+          parentNode = createParentNode(parentName);
           children.add(parentNode);
         }
         // add n to the subtree of the next ancestor node
@@ -179,7 +178,23 @@ public class NetworkTopology {
         }
       }
     }
-        
+
+    /**
+     * Creates a parent node to be added to the list of children.  
+     * Creates a node using the InnerNode four argument constructor specifying 
+     * the name, location, parent, and level of this node.
+     * 
+     * <p>To be overridden in subclasses for specific InnerNode implementations,
+     * as alternative to overriding the full {@link #add(Node)} method.
+     * 
+     * @param parentName The name of the parent node
+     * @return A new inner node
+     * @see InnerNode#InnerNode(String, String, InnerNode, int)
+     */
+    protected InnerNode createParentNode(String parentName) {
+      return new InnerNode(parentName, getPath(this), this, this.getLevel()+1);
+    }
+
     /** Remove node <i>n</i> from the subtree of this node
      * @param n node to be deleted 
      * @return true if the node is deleted; false otherwise
@@ -251,7 +266,7 @@ public class NetworkTopology {
         
     /** get <i>leafIndex</i> leaf of this subtree 
      * if it is not in the <i>excludedNode</i>*/
-    private Node getLeaf(int leafIndex, Node excludedNode) {
+    protected Node getLeaf(int leafIndex, Node excludedNode) {
       int count=0;
       // check if the excluded node a leaf
       boolean isLeaf =
@@ -259,7 +274,7 @@ public class NetworkTopology {
       // calculate the total number of excluded leaf nodes
       int numOfExcludedLeaves =
         isLeaf ? 1 : ((InnerNode)excludedNode).getNumOfLeaves();
-      if (isRack()) { // children are leaves
+      if (isLeafParent()) { // children are leaves
         if (isLeaf) { // excluded node is a leaf node
           int excludedIndex = children.indexOf(excludedNode);
           if (excludedIndex != -1 && leafIndex >= 0) {
@@ -296,22 +311,37 @@ public class NetworkTopology {
         return null;
       }
     }
-        
+
+    protected boolean isLeafParent() {
+      return isRack();
+    }
+
+    /**
+     * Determine if children are leaves; the default implementation calls {@link #isRack()}.
+     * <p>To be overridden in subclasses for specific InnerNode implementations,
+     * as alternative to overriding the full {@link #getLeaf(int, Node)} method.
+     * 
+     * @return true if children are leaves, false otherwise
+     */
+    protected boolean areChildrenLeaves() {
+      return isRack();
+    }
+
     int getNumOfLeaves() {
       return numOfLeaves;
     }
   } // end of InnerNode
     
-  InnerNode clusterMap = new InnerNode(InnerNode.ROOT); // the root
+  InnerNode clusterMap;
   /** Depth of all leaf nodes */
   private int depthOfAllLeaves = -1;
-  private int numOfRacks = 0;  // rack counter
-  private ReadWriteLock netlock;
+  protected int numOfRacks = 0;  // rack counter
+  protected ReadWriteLock netlock = new ReentrantReadWriteLock();
     
   public NetworkTopology() {
-    netlock = new ReentrantReadWriteLock();
+    clusterMap = new InnerNode(InnerNode.ROOT);
   }
-    
+
   /** Add a leaf node
    * Update node counter & rack counter if neccessary
    * @param node
@@ -328,7 +358,7 @@ public class NetworkTopology {
     }
     netlock.writeLock().lock();
     try {
-      Node rack = getNode(node.getNetworkLocation());
+      Node rack = getNodeForNetworkLocation(node);
       if (rack != null && !(rack instanceof InnerNode)) {
         throw new IllegalArgumentException("Unexpected data node " 
                                            + node.toString() 
@@ -358,7 +388,26 @@ public class NetworkTopology {
       netlock.writeLock().unlock();
     }
   }
-    
+
+  /**
+   * Return a reference to the node given its string representation.
+   * Default implementation delegates to {@link #getNode(String)}.
+   * 
+   * <p>To be overridden in subclasses for specific NetworkTopology 
+   * implementations, as alternative to overriding the full {@link #add(Node)}
+   *  method.
+   * 
+   * @param node The string representation of this node's network location is
+   * used to retrieve a Node object. 
+   * @return a reference to the node; null if the node is not in the tree
+   * 
+   * @see #add(Node)
+   * @see #getNode(String)
+   */
+  protected Node getNodeForNetworkLocation(Node node) {
+    return getNode(node.getNetworkLocation());
+  }
+  
   /** Remove a node
    * Update node counter & rack counter if neccessary
    * @param node
@@ -424,7 +473,21 @@ public class NetworkTopology {
       netlock.readLock().unlock();
     }
   }
-    
+  
+  /** Return the string representation of the rack for a specific network
+   *  location. The default implementation returns <i>loc</i> unchanged.
+   * 
+   * To be overridden in subclasses for specific NetworkTopology 
+   * implementations that resolve the rack of a location differently
+   * (for example, a topology with an additional node group layer).
+   * @param loc
+   *          a path-like string representation of a network location
+   * @return a rack string
+   */
+  public String getRack(String loc) {
+    return loc;
+  }
+
   /** Return the total number of racks */
   public int getNumOfRacks() {
     netlock.readLock().lock();
@@ -506,13 +569,44 @@ public class NetworkTopology {
       
     netlock.readLock().lock();
     try {
-      return node1.getParent()==node2.getParent();
+      return isSameParents(node1, node2);
     } finally {
       netlock.readLock().unlock();
     }
   }
+  
+  /**
+   * Check if network topology is aware of NodeGroup
+   */
+  public boolean isNodeGroupAware() {
+    return false;
+  }
+  
+  /** 
+   * Return false directly, as this topology is not aware of NodeGroup; to be overridden in subclasses
+   */
+  public boolean isOnSameNodeGroup(Node node1, Node node2) {
+    return false;
+  }
+
+  /**
+   * Compare the parents of each node for equality
+   * 
+   * <p>To be overridden in subclasses for specific NetworkTopology 
+   * implementations, as alternative to overriding the full 
+   * {@link #isOnSameRack(Node, Node)} method.
+   * 
+   * @param node1 the first node to compare
+   * @param node2 the second node to compare
+   * @return true if their parents are equal, false otherwise
+   * 
+   * @see #isOnSameRack(Node, Node)
+   */
+  protected boolean isSameParents(Node node1, Node node2) {
+    return node1.getParent()==node2.getParent();
+  }
     
-  final private static Random r = new Random();
+  final protected static Random r = new Random();
   /** randomly choose one node from <i>scope</i>
    * if scope starts with ~, choose one from the all nodes except for the
    * ones in <i>scope</i>; otherwise, choose one from <i>scope</i>
@@ -560,7 +654,25 @@ public class NetworkTopology {
     int leaveIndex = r.nextInt(numOfDatanodes);
     return innerNode.getLeaf(leaveIndex, node);
   }
-       
+
+  /** return leaves in <i>scope</i>
+   * @param scope a path string
+   * @return the leaf nodes under the specified scope
+   */
+  public List<Node> getLeaves(String scope) {
+    Node node = getNode(scope);
+    List<Node> leafNodes = new ArrayList<Node>();
+    if (!(node instanceof InnerNode)) {
+      leafNodes.add(node);
+    } else {
+      InnerNode innerNode = (InnerNode) node;
+      for (int i = 0; i < innerNode.getNumOfLeaves(); i++) {
+        leafNodes.add(innerNode.getLeaf(i, null));
+      }
+    }
+    return leafNodes;
+  }
+  
   /** return the number of leaves in <i>scope</i> but not in <i>excludedNodes</i>
    * if scope starts with ~, return the number of nodes that are not
    * in <i>scope</i> and <i>excludedNodes</i>; 
@@ -622,7 +734,7 @@ public class NetworkTopology {
   }
 
   /* swap two array items */
-  static private void swap(Node[] nodes, int i, int j) {
+  static protected void swap(Node[] nodes, int i, int j) {
     Node tempNode;
     tempNode = nodes[j];
     nodes[j] = nodes[i];

Added: hadoop/common/branches/branch-1/src/core/org/apache/hadoop/net/NetworkTopologyWithNodeGroup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/core/org/apache/hadoop/net/NetworkTopologyWithNodeGroup.java?rev=1408968&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/core/org/apache/hadoop/net/NetworkTopologyWithNodeGroup.java (added)
+++ hadoop/common/branches/branch-1/src/core/org/apache/hadoop/net/NetworkTopologyWithNodeGroup.java Tue Nov 13 21:23:09 2012
@@ -0,0 +1,402 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.net;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * The class extends NetworkTopology to represent a cluster of computers with
+ *  a 4-layer hierarchical network topology.
+ * In this network topology, leaves represent data nodes (computers) and inner
+ * nodes represent switches/routers that manage traffic in/out of data centers,
+ * racks or physical hosts (with virtual switches).
+ * 
+ * @see NetworkTopology
+ */
+@InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"})
+@InterfaceStability.Unstable
+public class NetworkTopologyWithNodeGroup extends NetworkTopology {
+
+  public final static String DEFAULT_NODEGROUP = "/default-nodegroup";
+
+  public NetworkTopologyWithNodeGroup() {
+    clusterMap = new InnerNodeWithNodeGroup(InnerNode.ROOT);
+  }
+
+  @Override
+  protected Node getNodeForNetworkLocation(Node node) {
+    // if node only with default rack info, here we need to add default
+    // nodegroup info
+    if (NetworkTopology.DEFAULT_RACK.equals(node.getNetworkLocation())) {
+      node.setNetworkLocation(node.getNetworkLocation()
+          + DEFAULT_NODEGROUP);
+    }
+    Node nodeGroup = getNode(node.getNetworkLocation());
+    if (nodeGroup == null) {
+      nodeGroup = new InnerNodeWithNodeGroup(node.getNetworkLocation());
+    }
+    return getNode(nodeGroup.getNetworkLocation());
+  }
+
+  @Override
+  public String getRack(String loc) {
+    netlock.readLock().lock();
+    try {
+      loc = InnerNode.normalize(loc);
+      Node locNode = getNode(loc);
+      if (locNode instanceof InnerNodeWithNodeGroup) {
+        InnerNodeWithNodeGroup node = (InnerNodeWithNodeGroup) locNode;
+        if (node.isRack()) {
+          return loc;
+        } else if (node.isNodeGroup()) {
+          return node.getNetworkLocation();
+        } else {
+          // may be a data center
+          return null;
+        }
+      } else {
+        // not in cluster map, don't handle it
+        return loc;
+      }
+    } finally {
+      netlock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Return the string representation of the node group for a specific
+   * network location.
+   * 
+   * @param loc
+   *            a path-like string representation of a network location
+   * @return a node group string
+   */
+  public String getNodeGroup(String loc) {
+    netlock.readLock().lock();
+    try {
+      loc = InnerNode.normalize(loc);
+      Node locNode = getNode(loc);
+      if (locNode instanceof InnerNodeWithNodeGroup) {
+        InnerNodeWithNodeGroup node = (InnerNodeWithNodeGroup) locNode;
+        if (node.isNodeGroup()) {
+          return loc;
+        } else if (node.isRack()) {
+          // not sure the node group for a rack
+          return null;
+        } else {
+          // may be a leaf node
+          return getNodeGroup(node.getNetworkLocation());
+        }
+      } else {
+        // not in cluster map, don't handle it
+        return loc;
+      }
+    } finally {
+      netlock.readLock().unlock();
+    }
+  }
+
+  @Override
+  public boolean isOnSameRack( Node node1,  Node node2) {
+    if (node1 == null || node2 == null || 
+        node1.getParent() == null || node2.getParent() == null) {
+      return false;
+    }
+    netlock.readLock().lock();
+    try {
+      return isSameParents(node1.getParent(), node2.getParent());
+    } finally {
+      netlock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Check if two nodes are on the same node group (hypervisor). The
+   * assumption here is that both nodes are leaf nodes.
+   * 
+   * @param node1
+   *            one node (can be null)
+   * @param node2
+   *            another node (can be null)
+   * @return true if node1 and node2 are on the same node group; false
+   *         otherwise, including when either node1 or node2 is null.
+   *         NOTE: this override does not itself throw
+   *         IllegalArgumentException for null arguments; it returns false
+   *         instead.
+   */
+  @Override
+  public boolean isOnSameNodeGroup(Node node1, Node node2) {
+    if (node1 == null || node2 == null) {
+      return false;
+    }
+    netlock.readLock().lock();
+    try {
+      return isSameParents(node1, node2);
+    } finally {
+      netlock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Check if network topology is aware of NodeGroup
+   */
+  @Override
+  public boolean isNodeGroupAware() {
+    return true;
+  }
+
+  /** Add a leaf node
+   * Update node counter & rack counter if necessary
+   * @param node node to be added; can be null
+   * @exception IllegalArgumentException if adding a node to a leaf
+   *                                     or the node to be added is not a leaf
+   */
+  @Override
+  public void add(Node node) {
+    if (node==null) return;
+    if( node instanceof InnerNode ) {
+      throw new IllegalArgumentException(
+        "Not allow to add an inner node: "+NodeBase.getPath(node));
+    }
+    netlock.writeLock().lock();
+    try {
+      Node rack = null;
+
+      // if node only with default rack info, here we need to add default 
+      // nodegroup info
+      if (NetworkTopology.DEFAULT_RACK.equals(node.getNetworkLocation())) {
+        node.setNetworkLocation(node.getNetworkLocation() + 
+            NetworkTopologyWithNodeGroup.DEFAULT_NODEGROUP);
+      }
+      Node nodeGroup = getNode(node.getNetworkLocation());
+      if (nodeGroup == null) {
+        nodeGroup = new InnerNodeWithNodeGroup(node.getNetworkLocation());
+      }
+      rack = getNode(nodeGroup.getNetworkLocation());
+
+      if (rack != null && !(rack instanceof InnerNode)) {
+        throw new IllegalArgumentException("Unexpected data node " 
+            + node.toString() 
+            + " at an illegal network location");
+      }
+      if (clusterMap.add(node)) {
+        LOG.info("Adding a new node: " + NodeBase.getPath(node));
+        if (rack == null) {
+          // We only track rack number here
+          numOfRacks++;
+        }
+      }
+      if(LOG.isDebugEnabled()) {
+        LOG.debug("NetworkTopology became:\n" + this.toString());
+      }
+    } finally {
+      netlock.writeLock().unlock();
+    }
+  }
+
+  /** Remove a node
+   * Update node counter and rack counter if necessary
+   * @param node node to be removed; can be null
+   */
+  @Override
+  public void remove(Node node) {
+    if (node==null) return;
+    if( node instanceof InnerNode ) {
+      throw new IllegalArgumentException(
+          "Not allow to remove an inner node: "+NodeBase.getPath(node));
+    }
+    LOG.info("Removing a node: "+NodeBase.getPath(node));
+    netlock.writeLock().lock();
+    try {
+      if (clusterMap.remove(node)) {
+        Node nodeGroup = getNode(node.getNetworkLocation());
+        if (nodeGroup == null) {
+          nodeGroup = new InnerNode(node.getNetworkLocation());
+        }
+        InnerNode rack = (InnerNode)getNode(nodeGroup.getNetworkLocation());
+        if (rack == null) {
+          numOfRacks--;
+        }
+      }
+      if(LOG.isDebugEnabled()) {
+        LOG.debug("NetworkTopology became:\n" + this.toString());
+      }
+    } finally {
+      netlock.writeLock().unlock();
+    }
+  }
+
+  /** Sort nodes array by their distances to <i>reader</i>
+   * It linearly scans the array, if a local node is found, swap it with
+   * the first element of the array.
+   * If a local node group node is found, swap it with the first element 
+   * following the local node.
+   * If a local rack node is found, swap it with the first element following
+   * the local node group node.
+   * If no local node, local node group node or local rack node is found, put a 
+   * random replica location at position 0.
+   * It leaves the rest of the nodes untouched.
+   * @param reader the node that wishes to read a block from one of the nodes
+   * @param nodes the list of nodes containing data for the reader
+   */
+  @Override
+  public void pseudoSortByDistance( Node reader, Node[] nodes ) {
+
+    if (reader != null && !this.contains(reader)) {
+      // if reader is not a datanode (not in NetworkTopology tree), we will 
+      // replace this reader with a sibling leaf node in tree.
+      Node nodeGroup = getNode(reader.getNetworkLocation());
+      if (nodeGroup != null && nodeGroup instanceof InnerNode) {
+        InnerNode parentNode = (InnerNode) nodeGroup;
+        // replace reader with the first children of its parent in tree
+        reader = parentNode.getLeaf(0, null);
+      } else {
+        return;
+      }
+    }
+    int tempIndex = 0;
+    int localRackNode = -1;
+    int localNodeGroupNode = -1;
+    if (reader != null) {  
+      //scan the array to find the local node & local rack node
+      for (int i = 0; i < nodes.length; i++) {
+        if (tempIndex == 0 && reader == nodes[i]) { //local node
+          //swap the local node and the node at position 0
+          if (i != 0) {
+            swap(nodes, tempIndex, i);
+          }
+          tempIndex=1;
+
+          if (localRackNode != -1 && localNodeGroupNode != -1) {
+            if (localRackNode == 0) {
+              localRackNode = i;
+            }
+            if (localNodeGroupNode == 0) {
+              localNodeGroupNode = i;
+            }
+            break;
+          }
+        } else if (localNodeGroupNode == -1 && isOnSameNodeGroup(reader, 
+            nodes[i])) {
+          //local node group
+          localNodeGroupNode = i;
+          // node local and rack local are already found
+          if(tempIndex != 0 && localRackNode != -1) break;
+        } else if (localRackNode == -1 && isOnSameRack(reader, nodes[i])) {
+          localRackNode = i;
+          if (tempIndex != 0 && localNodeGroupNode != -1) break;
+        }
+      }
+
+      // swap the local nodegroup node and the node at position tempIndex
+      if(localNodeGroupNode != -1 && localNodeGroupNode != tempIndex) {
+        swap(nodes, tempIndex, localNodeGroupNode);
+        if (localRackNode == tempIndex) {
+          localRackNode = localNodeGroupNode;
+        }
+        tempIndex++;
+      }
+
+      // swap the local rack node and the node at position tempIndex
+      if(localRackNode != -1 && localRackNode != tempIndex) {
+        swap(nodes, tempIndex, localRackNode);
+        tempIndex++;
+      }
+    }
+
+    // put a random node at position 0 if there is not a local/local-nodegroup/
+    // local-rack node
+    if (tempIndex == 0 && localNodeGroupNode == -1 && localRackNode == -1
+        && nodes.length != 0) {
+      swap(nodes, 0, r.nextInt(nodes.length));
+    }
+  }
+
+  /** InnerNodeWithNodeGroup represents a switch/router of a data center, rack
+   * or physical host. Different from a leaf node, it has non-null children.
+   */
+  static class InnerNodeWithNodeGroup extends InnerNode {
+    public InnerNodeWithNodeGroup(String name, String location, 
+        InnerNode parent, int level) {
+      super(name, location, parent, level);
+    }
+
+    public InnerNodeWithNodeGroup(String name, String location) {
+      super(name, location);
+    }
+
+    public InnerNodeWithNodeGroup(String path) {
+      super(path);
+    }
+
+    @Override
+    boolean isRack() {
+      // it is node group
+      if (getChildren().isEmpty()) {
+        return false;
+      }
+
+      Node firstChild = children.get(0);
+
+      if (firstChild instanceof InnerNode) {
+        Node firstGrandChild = (((InnerNode) firstChild).children).get(0);
+        if (firstGrandChild instanceof InnerNode) {
+          // it is datacenter
+          return false;
+        } else {
+          return true;
+        }
+      }
+      return false;
+    }
+    
+    @Override
+    protected boolean isLeafParent() {
+      return isNodeGroup();
+    }
+
+    /**
+     * Judge if this node represents a node group
+     * 
+     * @return true if it has no child or its children are not InnerNodes
+     */
+    boolean isNodeGroup() {
+      if (children.isEmpty()) {
+        return true;
+      }
+      Node firstChild = children.get(0);
+      if (firstChild instanceof InnerNode) {
+        // it is rack or datacenter
+        return false;
+      }
+      return true;
+    }
+
+    @Override
+    protected InnerNode createParentNode(String parentName) {
+      return new InnerNodeWithNodeGroup(parentName, getPath(this), this,
+          this.getLevel() + 1);
+    }
+
+    @Override
+    protected boolean areChildrenLeaves() {
+      return isNodeGroup();
+    }
+  }
+}

Modified: hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java?rev=1408968&r1=1408967&r2=1408968&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java (original)
+++ hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java Tue Nov 13 21:23:09 2012
@@ -166,10 +166,10 @@ public class DatanodeInfo extends Datano
     this.xceiverCount = xceiverCount; 
   }
 
-  /** rack name **/
+  /** network location **/
   public synchronized String getNetworkLocation() {return location;}
     
-  /** Sets the rack name */
+  /** Sets the network location */
   public synchronized void setNetworkLocation(String location) {
     this.location = NodeBase.normalize(location);
   }

Modified: hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1408968&r1=1408967&r2=1408968&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Tue Nov 13 21:23:09 2012
@@ -342,7 +342,7 @@ public class FSNamesystem implements FSC
   private Host2NodesMap host2DataNodeMap = new Host2NodesMap();
     
   // datanode networktoplogy
-  NetworkTopology clusterMap = new NetworkTopology();
+  NetworkTopology clusterMap;
   private DNSToSwitchMapping dnsToSwitchMapping;
   
   // for block replicas placement
@@ -521,7 +521,11 @@ public class FSNamesystem implements FSC
 
     this.blocksInvalidateWorkPct = 
               DFSUtil.getInvalidateWorkPctPerIteration(conf);
+
     this.blocksReplWorkMultiplier = DFSUtil.getReplWorkMultiplier(conf);
+    this.clusterMap = (NetworkTopology) ReflectionUtils.newInstance(
+        conf.getClass("net.topology.impl", NetworkTopology.class,
+            NetworkTopology.class), conf);
 
     this.replicator = BlockPlacementPolicy.getInstance(conf, this, clusterMap);
     this.defaultReplication = conf.getInt("dfs.replication", 3);
@@ -1026,8 +1030,20 @@ public class FSNamesystem implements FSC
         true);
     if (blocks != null) {
       //sort the blocks
-      DatanodeDescriptor client = host2DataNodeMap.getDatanodeByHost(
+      // In some deployments, task trackers and datanodes run on separate
+      // machines, so a client machine is not always a known datanode.
+      // Here we therefore try to resolve the client to a node (not
+      // necessarily a datanode) for the locality-based sort.
+      Node client = host2DataNodeMap.getDatanodeByHost(
           clientMachine);
+      if (client == null) {
+        List<String> hosts = new ArrayList<String> (1);
+        hosts.add(clientMachine);
+        String rName = dnsToSwitchMapping.resolve(hosts).get(0);
+        if (rName != null)
+          client = new NodeBase(clientMachine, rName);
+      }   
+
       DFSUtil.StaleComparator comparator = null;
       if (checkForStaleDataNodes) {
         comparator = new DFSUtil.StaleComparator(staleInterval);

Added: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/net/TestNetworkTopologyWithNodeGroup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/net/TestNetworkTopologyWithNodeGroup.java?rev=1408968&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/net/TestNetworkTopologyWithNodeGroup.java (added)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/net/TestNetworkTopologyWithNodeGroup.java Tue Nov 13 21:23:09 2012
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.net;
+
+import java.util.HashMap;
+import java.util.Map;
+import junit.framework.TestCase;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.server.namenode.DatanodeDescriptor;
+
+/**
+ * Tests {@link NetworkTopologyWithNodeGroup} against a small fixed cluster.
+ * Node locations are three levels deep ("/d1/r1/n1"): the second level is the
+ * rack and the third level is the node group, as the assertions below expect.
+ */
+public class TestNetworkTopologyWithNodeGroup extends TestCase {
+  private static final NetworkTopologyWithNodeGroup cluster =
+      new NetworkTopologyWithNodeGroup();
+
+  // Eight data nodes spread over three racks (r1-r3) and six node groups
+  // (n1-n6).
+  private static final DatanodeDescriptor[] dataNodes = {
+      new DatanodeDescriptor(new DatanodeID("h1:5020"), "/d1/r1/n1"),
+      new DatanodeDescriptor(new DatanodeID("h2:5020"), "/d1/r1/n1"),
+      new DatanodeDescriptor(new DatanodeID("h3:5020"), "/d1/r1/n2"),
+      new DatanodeDescriptor(new DatanodeID("h4:5020"), "/d1/r2/n3"),
+      new DatanodeDescriptor(new DatanodeID("h5:5020"), "/d1/r2/n3"),
+      new DatanodeDescriptor(new DatanodeID("h6:5020"), "/d1/r2/n4"),
+      new DatanodeDescriptor(new DatanodeID("h7:5020"), "/d2/r3/n5"),
+      new DatanodeDescriptor(new DatanodeID("h8:5020"), "/d2/r3/n6")
+  };
+
+  // A node that is not in dataNodes; its path places it under /d1/r1/n1.
+  private static final NodeBase computeNode = new NodeBase("/d1/r1/n1/h9");
+
+  // Register every data node with the topology once for all tests.
+  static {
+    for (DatanodeDescriptor dataNode : dataNodes) {
+      cluster.add(dataNode);
+    }
+  }
+
+  /** The topology must report one leaf per registered data node. */
+  public void testNumOfChildren() throws Exception {
+    assertEquals(dataNodes.length, cluster.getNumOfLeaves());
+  }
+
+  /** The fixture uses three distinct racks. */
+  public void testNumOfRacks() throws Exception {
+    assertEquals(3, cluster.getNumOfRacks());
+  }
+
+  /** Checks isOnSameRack for neighbouring pairs of the fixture. */
+  public void testRacks() throws Exception {
+    assertEquals(3, cluster.getNumOfRacks());
+    assertTrue(cluster.isOnSameRack(dataNodes[0], dataNodes[1]));
+    assertTrue(cluster.isOnSameRack(dataNodes[1], dataNodes[2]));
+    assertFalse(cluster.isOnSameRack(dataNodes[2], dataNodes[3]));
+    assertTrue(cluster.isOnSameRack(dataNodes[3], dataNodes[4]));
+    assertTrue(cluster.isOnSameRack(dataNodes[4], dataNodes[5]));
+    assertFalse(cluster.isOnSameRack(dataNodes[5], dataNodes[6]));
+    assertTrue(cluster.isOnSameRack(dataNodes[6], dataNodes[7]));
+  }
+
+  /** Checks isOnSameNodeGroup for neighbouring pairs of the fixture. */
+  public void testNodeGroups() throws Exception {
+    assertEquals(3, cluster.getNumOfRacks());
+    assertTrue(cluster.isOnSameNodeGroup(dataNodes[0], dataNodes[1]));
+    assertFalse(cluster.isOnSameNodeGroup(dataNodes[1], dataNodes[2]));
+    assertFalse(cluster.isOnSameNodeGroup(dataNodes[2], dataNodes[3]));
+    assertTrue(cluster.isOnSameNodeGroup(dataNodes[3], dataNodes[4]));
+    assertFalse(cluster.isOnSameNodeGroup(dataNodes[4], dataNodes[5]));
+    assertFalse(cluster.isOnSameNodeGroup(dataNodes[5], dataNodes[6]));
+    assertFalse(cluster.isOnSameNodeGroup(dataNodes[6], dataNodes[7]));
+  }
+
+  /**
+   * Checks getDistance: 0 to itself, 2 within a node group, 4 within a rack,
+   * 6 across racks and 8 across the top-level d1/d2 split.
+   */
+  public void testGetDistance() throws Exception {
+    assertEquals(0, cluster.getDistance(dataNodes[0], dataNodes[0]));
+    assertEquals(2, cluster.getDistance(dataNodes[0], dataNodes[1]));
+    assertEquals(4, cluster.getDistance(dataNodes[0], dataNodes[2]));
+    assertEquals(6, cluster.getDistance(dataNodes[0], dataNodes[3]));
+    assertEquals(8, cluster.getDistance(dataNodes[0], dataNodes[6]));
+  }
+
+  /** Checks that pseudoSortByDistance puts the closest nodes first. */
+  public void testPseudoSortByDistance() throws Exception {
+    DatanodeDescriptor[] testNodes = new DatanodeDescriptor[4];
+
+    // array contains both local node, local node group & local rack node
+    testNodes[0] = dataNodes[1];
+    testNodes[1] = dataNodes[2];
+    testNodes[2] = dataNodes[3];
+    testNodes[3] = dataNodes[0];
+    cluster.pseudoSortByDistance(dataNodes[0], testNodes);
+    assertSame(dataNodes[0], testNodes[0]);
+    assertSame(dataNodes[1], testNodes[1]);
+    assertSame(dataNodes[2], testNodes[2]);
+    assertSame(dataNodes[3], testNodes[3]);
+
+    // array contains local node & local node group
+    testNodes[0] = dataNodes[3];
+    testNodes[1] = dataNodes[4];
+    testNodes[2] = dataNodes[1];
+    testNodes[3] = dataNodes[0];
+    cluster.pseudoSortByDistance(dataNodes[0], testNodes);
+    assertSame(dataNodes[0], testNodes[0]);
+    assertSame(dataNodes[1], testNodes[1]);
+
+    // array contains local node & rack node
+    testNodes[0] = dataNodes[5];
+    testNodes[1] = dataNodes[3];
+    testNodes[2] = dataNodes[2];
+    testNodes[3] = dataNodes[0];
+    cluster.pseudoSortByDistance(dataNodes[0], testNodes);
+    assertSame(dataNodes[0], testNodes[0]);
+    assertSame(dataNodes[2], testNodes[1]);
+
+    // array contains local-nodegroup node (not a data node also) & rack node
+    testNodes[0] = dataNodes[6];
+    testNodes[1] = dataNodes[7];
+    testNodes[2] = dataNodes[2];
+    testNodes[3] = dataNodes[0];
+    cluster.pseudoSortByDistance(computeNode, testNodes);
+    assertSame(dataNodes[0], testNodes[0]);
+    assertSame(dataNodes[2], testNodes[1]);
+  }
+
+  /**
+   * Picks a large number of nodes at random in order to ensure coverage.
+   *
+   * @param numNodes the number of nodes
+   * @param excludedScope the excluded scope
+   * @return the frequency that nodes were chosen
+   */
+  private Map<Node, Integer> pickNodesAtRandom(int numNodes,
+      String excludedScope) {
+    Map<Node, Integer> frequency = new HashMap<Node, Integer>();
+    for (DatanodeDescriptor dnd : dataNodes) {
+      frequency.put(dnd, 0);
+    }
+
+    for (int j = 0; j < numNodes; j++) {
+      Node random = cluster.chooseRandom(excludedScope);
+      Integer count = frequency.get(random);
+      // Fail with a readable message instead of a NullPointerException from
+      // unboxing when the chosen node is null or not one of dataNodes.
+      assertNotNull("chooseRandom returned unexpected node: " + random, count);
+      frequency.put(random, count + 1);
+    }
+    return frequency;
+  }
+
+  /**
+   * This test checks that chooseRandom never returns the excluded node while
+   * every other node is still chosen at least once.
+   */
+  public void testChooseRandomExcludedNode() {
+    String scope = "~" + NodeBase.getPath(dataNodes[0]);
+    Map<Node, Integer> frequency = pickNodesAtRandom(100, scope);
+
+    // the excluded node must never be chosen
+    assertEquals("excluded node was chosen", 0,
+        frequency.get(dataNodes[0]).intValue());
+    for (int i = 1; i < dataNodes.length; i++) {
+      // every other node should be chosen at least once
+      assertTrue(dataNodes[i] + " was never chosen",
+          frequency.get(dataNodes[i]) > 0);
+    }
+  }
+
+}