Posted to common-commits@hadoop.apache.org by sz...@apache.org on 2012/12/16 09:28:21 UTC

svn commit: r1422482 - in /hadoop/common/branches/branch-1: ./ src/core/org/apache/hadoop/net/ src/hdfs/org/apache/hadoop/hdfs/server/balancer/ src/hdfs/org/apache/hadoop/hdfs/server/namenode/ src/test/org/apache/hadoop/hdfs/ src/test/org/apache/hadoop...

Author: szetszwo
Date: Sun Dec 16 08:28:17 2012
New Revision: 1422482

URL: http://svn.apache.org/viewvc?rev=1422482&view=rev
Log:
HDFS-3942. Backport HDFS-3495 and HDFS-4234: Update Balancer to support new NetworkTopology with NodeGroup and use generic code for choosing datanode in Balancer.  Contributed by Junping Du

Added:
    hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/MiniDFSClusterWithNodeGroup.java
    hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java
Modified:
    hadoop/common/branches/branch-1/CHANGES.txt
    hadoop/common/branches/branch-1/src/core/org/apache/hadoop/net/NetworkTopology.java
    hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/balancer/Balancer.java
    hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java

Modified: hadoop/common/branches/branch-1/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/CHANGES.txt?rev=1422482&r1=1422481&r2=1422482&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/CHANGES.txt (original)
+++ hadoop/common/branches/branch-1/CHANGES.txt Sun Dec 16 08:28:17 2012
@@ -51,6 +51,10 @@ Release 1.2.0 - unreleased
     HADOOP-7096. Allow setting of end-of-record delimiter for TextInputFormat
     (Ahmed Radwan, backported by suresh)
 
+    HDFS-3942. Backport HDFS-3495 and HDFS-4234: Update Balancer to support new
+    NetworkTopology with NodeGroup and use generic code for choosing datanode
+    in Balancer.  (Junping Du via szetszwo)
+
   IMPROVEMENTS
 
     HDFS-3515. Port HDFS-1457 to branch-1. (eli)

Modified: hadoop/common/branches/branch-1/src/core/org/apache/hadoop/net/NetworkTopology.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/core/org/apache/hadoop/net/NetworkTopology.java?rev=1422482&r1=1422481&r2=1422482&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/core/org/apache/hadoop/net/NetworkTopology.java (original)
+++ hadoop/common/branches/branch-1/src/core/org/apache/hadoop/net/NetworkTopology.java Sun Dec 16 08:28:17 2012
@@ -26,6 +26,8 @@ import java.util.concurrent.locks.Reentr
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ReflectionUtils;
 
 /** The class represents a cluster of computer with a tree hierarchical
  * network topology.
@@ -341,6 +343,19 @@ public class NetworkTopology {
   public NetworkTopology() {
     clusterMap = new InnerNode(InnerNode.ROOT);
   }
+  
+  /**
+   * Get an instance of NetworkTopology based on the value of the configuration
+   * parameter net.topology.impl.
+   * 
+   * @param conf the configuration to be used
+   * @return an instance of NetworkTopology
+   */
+  public static NetworkTopology getInstance(Configuration conf) {
+    return (NetworkTopology) ReflectionUtils.newInstance(
+        conf.getClass("net.topology.impl", NetworkTopology.class,
+            NetworkTopology.class), conf);
+  }
 
   /** Add a leaf node
    * Update node counter & rack counter if neccessary

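[Editor's note: the new factory method centralizes the reflective topology
lookup that FSNamesystem previously did inline (see the FSNamesystem.java
hunk below). A minimal usage sketch, not part of the patch, using the
node-group topology class that the tests in this commit configure:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.net.NetworkTopology;

    Configuration conf = new Configuration();
    // "net.topology.impl" falls back to NetworkTopology when unset.
    conf.set("net.topology.impl",
        "org.apache.hadoop.net.NetworkTopologyWithNodeGroup");
    NetworkTopology cluster = NetworkTopology.getInstance(conf);
]
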
Modified: hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/balancer/Balancer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/balancer/Balancer.java?rev=1422482&r1=1422481&r2=1422482&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/balancer/Balancer.java (original)
+++ hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/balancer/Balancer.java Sun Dec 16 08:28:17 2012
@@ -60,7 +60,6 @@ import org.apache.hadoop.hdfs.protocol.F
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
 import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
-import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.BlockPlacementPolicy;
 import org.apache.hadoop.hdfs.server.namenode.BlockPlacementPolicyDefault;
@@ -79,6 +78,7 @@ import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.net.Node;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.Daemon;
@@ -173,7 +173,7 @@ import org.apache.hadoop.util.ToolRunner
  * <ol>
  * <li>The cluster is balanced. Exiting
  * <li>No block can be moved. Exiting...
- * <li>No block has been moved for 3 iterations. Exiting...
+ * <li>No block has been moved for 5 iterations. Exiting...
  * <li>Received an IO exception: failure reason. Exiting...
  * <li>Another balancer is running. Exiting...
  * </ol>
@@ -227,7 +227,7 @@ public class Balancer implements Tool {
   private Map<String, BalancerDatanode> datanodes
                  = new HashMap<String, BalancerDatanode>();
   
-  private NetworkTopology cluster = new NetworkTopology();
+  private NetworkTopology cluster;
   
   private double avgUtilization = 0.0D;
   
@@ -542,7 +542,7 @@ public class Balancer implements Tool {
     }
     
     /** Decide if still need to move more bytes */
-    protected boolean isMoveQuotaFull() {
+    protected boolean hasSpaceForScheduling() {
       return scheduledSize<maxSizeToMove;
     }
 
@@ -746,8 +746,8 @@ public class Balancer implements Tool {
       long startTime = Util.now();
       this.blocksToReceive = 2*scheduledSize;
       boolean isTimeUp = false;
-      while(!isTimeUp && scheduledSize>0 &&
-          (!srcBlockList.isEmpty() || blocksToReceive>0)) {
+      while(!isTimeUp && scheduledSize > 0 &&
+          (!srcBlockList.isEmpty() || blocksToReceive > 0)) {
         PendingBlockMove pendingBlock = chooseNextBlockToMove();
         if (pendingBlock != null) {
           // move the block
@@ -796,7 +796,8 @@ public class Balancer implements Tool {
    */
   private void checkReplicationPolicyCompatibility(Configuration conf)
       throws UnsupportedActionException {
-    if (BlockPlacementPolicy.getInstance(conf, null, null).getClass() != BlockPlacementPolicyDefault.class) {
+    if (!(BlockPlacementPolicy.getInstance(conf, null, null) 
+        instanceof BlockPlacementPolicyDefault)) {
       throw new UnsupportedActionException(
           "Balancer without BlockPlacementPolicyDefault");
     }
@@ -1065,6 +1066,36 @@ public class Balancer implements Tool {
     LOG.info(msg);
   }
   
+  /** A matcher interface for matching nodes. */
+  private interface Matcher {
+    /** Given the cluster topology, does the left node match the right node? */
+    boolean match(NetworkTopology cluster, Node left, Node right);
+  }
+
+  /** Match datanodes in the same node group. */
+  static final Matcher SAME_NODE_GROUP = new Matcher() {
+    @Override
+    public boolean match(NetworkTopology cluster, Node left, Node right) {
+      return cluster.isOnSameNodeGroup(left, right);
+    }
+  };
+
+  /** Match datanodes in the same rack. */
+  static final Matcher SAME_RACK = new Matcher() {
+    @Override
+    public boolean match(NetworkTopology cluster, Node left, Node right) {
+      return cluster.isOnSameRack(left, right);
+    }
+  };
+
+  /** Match any datanode with any other datanode. */
+  static final Matcher ANY_OTHER = new Matcher() {
+    @Override
+    public boolean match(NetworkTopology cluster, Node left, Node right) {
+      return left != right;
+    }
+  };
+  
   /* Decide all <source, target> pairs and
    * the number of bytes to move from a source to a target
    * Maximum bytes to be moved per node is
@@ -1072,10 +1103,15 @@ public class Balancer implements Tool {
    * Return total number of bytes to move in this iteration
    */
   private long chooseNodes() {
-    // Match nodes on the same rack first
-    chooseNodes(true);
-    // Then match nodes on different racks
-    chooseNodes(false);
+    // First, match nodes on the same node group if cluster is node group aware
+    if (cluster.isNodeGroupAware()) {
+      chooseNodes(SAME_NODE_GROUP);
+    }
+    
+    // Then, match nodes on the same rack
+    chooseNodes(SAME_RACK);
+    // At last, match all remaining nodes
+    chooseNodes(ANY_OTHER);
     
     assert (datanodes.size() == 
       overUtilizedDatanodes.size()+underUtilizedDatanodes.size()+
@@ -1089,167 +1125,99 @@ public class Balancer implements Tool {
     }
     return bytesToMove;
   }
-
-  /* if onRack is true, decide all <source, target> pairs
-   * where source and target are on the same rack; Otherwise
-   * decide all <source, target> pairs where source and target are
-   * on different racks
-   */
-  private void chooseNodes(boolean onRack) {
+  
+  /** Decide all <source, target> pairs according to the matcher. */
+  private void chooseNodes(final Matcher matcher) {
     /* first step: match each overUtilized datanode (source) to
      * one or more underUtilized datanodes (targets).
      */
-    chooseTargets(underUtilizedDatanodes.iterator(), onRack);
+    chooseDatanodes(overUtilizedDatanodes, underUtilizedDatanodes, matcher);
     
     /* match each remaining overutilized datanode (source) to 
      * below average utilized datanodes (targets).
      * Note only overutilized datanodes that haven't had that max bytes to move
      * satisfied in step 1 are selected
      */
-    chooseTargets(belowAvgUtilizedDatanodes.iterator(), onRack);
+    chooseDatanodes(overUtilizedDatanodes, belowAvgUtilizedDatanodes, matcher);
 
-    /* match each remaining underutilized datanode to 
-     * above average utilized datanodes.
+    /* match each remaining underutilized datanode (target) to 
+     * above average utilized datanodes (source).
      * Note only underutilized datanodes that have not had that max bytes to
      * move satisfied in step 1 are selected.
      */
-    chooseSources(aboveAvgUtilizedDatanodes.iterator(), onRack);
+    chooseDatanodes(underUtilizedDatanodes, aboveAvgUtilizedDatanodes, matcher);
   }
-   
-  /* choose targets from the target candidate list for each over utilized
-   * source datanode. OnRackTarget determines if the chosen target 
-   * should be on the same rack as the source
+  
+  /**
+   * For each datanode, choose matching nodes from the candidates. Either the
+   * datanodes or the candidates are source nodes with (utilization > Avg), and
+   * the others are target nodes with (utilization < Avg).
    */
-  private void chooseTargets(  
-      Iterator<BalancerDatanode> targetCandidates, boolean onRackTarget ) {
-    for (Iterator<Source> srcIterator = overUtilizedDatanodes.iterator();
-        srcIterator.hasNext();) {
-      Source source = srcIterator.next();
-      while (chooseTarget(source, targetCandidates, onRackTarget)) {
-      }
-      if (!source.isMoveQuotaFull()) {
-        srcIterator.remove();
+  private <D extends BalancerDatanode, C extends BalancerDatanode> void 
+      chooseDatanodes(Collection<D> datanodes, Collection<C> candidates,
+          Matcher matcher) {
+    for (Iterator<D> i = datanodes.iterator(); i.hasNext();) {
+      final D datanode = i.next();
+      for(; chooseForOneDatanode(datanode, candidates, matcher); );
+      if (!datanode.hasSpaceForScheduling()) {
+        i.remove();
       }
     }
-    return;
   }
   
-  /* choose sources from the source candidate list for each under utilized
-   * target datanode. onRackSource determines if the chosen source 
-   * should be on the same rack as the target
+  /**
+   * For the given datanode, choose a candidate and then schedule it.
+   * @return true if a candidate is chosen; false if no candidate is chosen.
    */
-  private void chooseSources(
-      Iterator<Source> sourceCandidates, boolean onRackSource) {
-    for (Iterator<BalancerDatanode> targetIterator = 
-      underUtilizedDatanodes.iterator(); targetIterator.hasNext();) {
-      BalancerDatanode target = targetIterator.next();
-      while (chooseSource(target, sourceCandidates, onRackSource)) {
-      }
-      if (!target.isMoveQuotaFull()) {
-        targetIterator.remove();
-      }
-    }
-    return;
-  }
+  private <C extends BalancerDatanode> boolean chooseForOneDatanode(
+      BalancerDatanode dn, Collection<C> candidates, Matcher matcher) {
+    final Iterator<C> i = candidates.iterator();
+    final C chosen = chooseCandidate(dn, i, matcher);
 
-  /* For the given source, choose targets from the target candidate list.
-   * OnRackTarget determines if the chosen target 
-   * should be on the same rack as the source
-   */
-  private boolean chooseTarget(Source source,
-      Iterator<BalancerDatanode> targetCandidates, boolean onRackTarget) {
-    if (!source.isMoveQuotaFull()) {
+    if (chosen == null) {
       return false;
     }
-    boolean foundTarget = false;
-    BalancerDatanode target = null;
-    while (!foundTarget && targetCandidates.hasNext()) {
-      target = targetCandidates.next();
-      if (!target.isMoveQuotaFull()) {
-        targetCandidates.remove();
-        continue;
-      }
-      if (onRackTarget) {
-        // choose from on-rack nodes
-        if (cluster.isOnSameRack(source.datanode, target.datanode)) {
-          foundTarget = true;
-        }
-      } else {
-        // choose from off-rack nodes
-        if (!cluster.isOnSameRack(source.datanode, target.datanode)) {
-          foundTarget = true;
-        }
-      }
+    if (dn instanceof Source) {
+      matchSourceWithTargetToMove((Source)dn, chosen);
+    } else {
+      matchSourceWithTargetToMove((Source)chosen, dn);
     }
-    if (foundTarget) {
-      assert(target != null):"Choose a null target";
-      long size = Math.min(source.availableSizeToMove(),
-          target.availableSizeToMove());
-      NodeTask nodeTask = new NodeTask(target, size);
-      source.addNodeTask(nodeTask);
-      target.incScheduledSize(nodeTask.getSize());
-      sources.add(source);
-      targets.add(target);
-      if (!target.isMoveQuotaFull()) {
-        targetCandidates.remove();
-      }
-      LOG.info("Decided to move "+StringUtils.byteDesc(size)+" bytes from "
-          +source.datanode.getName() + " to " + target.datanode.getName());
-      return true;
+    if (!chosen.hasSpaceForScheduling()) {
+      i.remove();
     }
-    return false;
+    return true;
   }
   
-  /* For the given target, choose sources from the source candidate list.
-   * OnRackSource determines if the chosen source 
-   * should be on the same rack as the target
-   */
-  private boolean chooseSource(BalancerDatanode target,
-      Iterator<Source> sourceCandidates, boolean onRackSource) {
-    if (!target.isMoveQuotaFull()) {
-      return false;
-    }
-    boolean foundSource = false;
-    Source source = null;
-    while (!foundSource && sourceCandidates.hasNext()) {
-      source = sourceCandidates.next();
-      if (!source.isMoveQuotaFull()) {
-        sourceCandidates.remove();
-        continue;
-      }
-      if (onRackSource) {
-        // choose from on-rack nodes
-        if ( cluster.isOnSameRack(source.getDatanode(), target.getDatanode())) {
-          foundSource = true;
-        }
-      } else {
-        // choose from off-rack nodes
-        if (!cluster.isOnSameRack(source.datanode, target.datanode)) {
-          foundSource = true;
+  private void matchSourceWithTargetToMove(
+      Source source, BalancerDatanode target) {
+    long size = Math.min(source.availableSizeToMove(), target.availableSizeToMove());
+    NodeTask nodeTask = new NodeTask(target, size);
+    source.addNodeTask(nodeTask);
+    target.incScheduledSize(nodeTask.getSize());
+    sources.add(source);
+    targets.add(target);
+    LOG.info("Decided to move "+StringUtils.byteDesc(size)+" bytes from "
+        +source.datanode.getName() + " to " + target.datanode.getName());
+  }
+  
+  /** Choose a candidate for the given datanode. */
+  private <D extends BalancerDatanode, C extends BalancerDatanode>
+      C chooseCandidate(D dn, Iterator<C> candidates, Matcher matcher) {
+    if (dn.hasSpaceForScheduling()) {
+      for(; candidates.hasNext(); ) {
+        final C c = candidates.next();
+        if (!c.hasSpaceForScheduling()) {
+          candidates.remove();
+        } else if (matcher.match(cluster, dn.getDatanode(), c.getDatanode())) {
+          return c;
         }
       }
     }
-    if (foundSource) {
-      assert(source != null):"Choose a null source";
-      long size = Math.min(source.availableSizeToMove(),
-          target.availableSizeToMove());
-      NodeTask nodeTask = new NodeTask(target, size);
-      source.addNodeTask(nodeTask);
-      target.incScheduledSize(nodeTask.getSize());
-      sources.add(source);
-      targets.add(target);
-      if ( !source.isMoveQuotaFull()) {
-        sourceCandidates.remove();
-      }
-      LOG.info("Decided to move "+StringUtils.byteDesc(size)+" bytes from "
-          +source.datanode.getName() + " to " + target.datanode.getName());
-      return true;
-    }
-    return false;
+    return null;
   }
 
   private static class BytesMoved {
-    private long bytesMoved = 0L;;
+    private long bytesMoved = 0L;
     private synchronized void inc( long bytes ) {
       bytesMoved += bytes;
     }
@@ -1391,6 +1359,11 @@ public class Balancer implements Tool {
     if (block.isLocatedOnDatanode(target)) {
       return false;
     }
+    
+    if (cluster.isNodeGroupAware() && 
+        isOnSameNodeGroupWithReplicas(target, block, source)) {
+      return false;
+    }
 
     boolean goodBlock = false;
     if (cluster.isOnSameRack(source.getDatanode(), target.getDatanode())) {
@@ -1423,16 +1396,38 @@ public class Balancer implements Tool {
     return goodBlock;
   }
   
+  /**
+   * Check if any replica (other than the source) is on the same node group
+   * as the target. If so, the target is not a good candidate for placing
+   * this block replica, since we don't want two replicas under the same
+   * node group after balancing.
+   * @param target the target datanode
+   * @param block the block to be moved
+   * @param source the source datanode
+   * @return true if any replica (other than the source) is on the same node
+   * group as the target
+   */
+  private boolean isOnSameNodeGroupWithReplicas(BalancerDatanode target,
+      BalancerBlock block, Source source) {
+    for (BalancerDatanode loc : block.locations) {
+      if (loc != source && 
+          cluster.isOnSameNodeGroup(loc.getDatanode(), target.getDatanode())) {
+        return true;
+      }
+    }
+    return false;
+  }
+
   /* reset all fields in a balancer preparing for the next iteration */
   private void resetData() {
-    this.cluster = new NetworkTopology();
+    this.cluster = NetworkTopology.getInstance(conf);
     this.overUtilizedDatanodes.clear();
     this.aboveAvgUtilizedDatanodes.clear();
     this.belowAvgUtilizedDatanodes.clear();
     this.underUtilizedDatanodes.clear();
     this.datanodes.clear();
     this.sources.clear();
-    this.targets.clear();  
+    this.targets.clear();
     this.avgUtilization = 0.0D;
     cleanGlobalBlockList();
     this.movedBlocks.cleanup();
@@ -1505,7 +1500,7 @@ public class Balancer implements Tool {
       Formatter formatter = new Formatter(System.out);
       System.out.println("Time Stamp               Iteration#  Bytes Already Moved  Bytes Left To Move  Bytes Being Moved");
       int iterations = 0;
-      while (true ) {
+      while (true) {
         /* get all live datanodes of a cluster and their disk usage
          * decide the number of bytes need to be moved
          */
@@ -1655,6 +1650,7 @@ public class Balancer implements Tool {
   /** set this balancer's configuration */
   public void setConf(Configuration conf) {
     this.conf = conf;
+    this.cluster = NetworkTopology.getInstance(conf);
     movedBlocks.setWinWidth(conf);
   }
 

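[Editor's note: the Matcher refactoring replaces the boolean onRack flag
with small strategy objects, collapsing the duplicated chooseTargets/
chooseSources paths into one generic path. As a rough illustration of the
extension point, a hypothetical matcher (not in the patch) for pairing
nodes on different racks would follow the same shape:

    /** Hypothetical example: match datanodes on different racks. */
    static final Matcher DIFFERENT_RACK = new Matcher() {
      @Override
      public boolean match(NetworkTopology cluster, Node left, Node right) {
        // Inverse of SAME_RACK above; uses only methods shown in this patch.
        return !cluster.isOnSameRack(left, right);
      }
    };
]
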
Modified: hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1422482&r1=1422481&r2=1422482&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Sun Dec 16 08:28:17 2012
@@ -524,9 +524,7 @@ public class FSNamesystem implements FSC
               DFSUtil.getInvalidateWorkPctPerIteration(conf);
 
     this.blocksReplWorkMultiplier = DFSUtil.getReplWorkMultiplier(conf);
-    this.clusterMap = (NetworkTopology) ReflectionUtils.newInstance(
-        conf.getClass("net.topology.impl", NetworkTopology.class,
-            NetworkTopology.class), conf);
+    this.clusterMap = NetworkTopology.getInstance(conf);
 
     this.maxCorruptFilesReturned = conf.getInt(
         DFSConfigKeys.DFS_MAX_CORRUPT_FILES_RETURNED_KEY,
@@ -3863,7 +3861,6 @@ public class FSNamesystem implements FSC
     // Is the block being reported the last block of an underconstruction file?
     boolean blockUnderConstruction = false;
     if (fileINode.isUnderConstruction()) {
-      INodeFileUnderConstruction cons = (INodeFileUnderConstruction) fileINode;
       Block last = fileINode.getLastBlock();
       if (last == null) {
         // This should never happen, but better to handle it properly than to throw

Modified: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java?rev=1422482&r1=1422481&r2=1422482&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java (original)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java Sun Dec 16 08:28:17 2012
@@ -27,6 +27,7 @@ import java.nio.channels.FileChannel;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.List;
 import java.util.Random;
 
 import org.apache.hadoop.conf.Configuration;
@@ -69,13 +70,12 @@ public class MiniDFSCluster {
   }
 
   private Configuration conf;
-  private NameNode nameNode;
-  private int numDataNodes;
-  private ArrayList<DataNodeProperties> dataNodes = 
+  protected NameNode nameNode;
+  protected int numDataNodes;
+  protected List<DataNodeProperties> dataNodes = 
                          new ArrayList<DataNodeProperties>();
   private File base_dir;
-  private File data_dir;
-  
+  protected File data_dir;
   
   /**
    * This null constructor is used only when wishing to start a data node cluster
@@ -439,8 +439,6 @@ public class MiniDFSCluster {
     waitActive();
   }
   
-  
-  
   /**
    * Modify the config and start up the DataNodes.  The info port for
    * DataNodes is guaranteed to use a free port.

Added: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/MiniDFSClusterWithNodeGroup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/MiniDFSClusterWithNodeGroup.java?rev=1422482&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/MiniDFSClusterWithNodeGroup.java (added)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/MiniDFSClusterWithNodeGroup.java Sun Dec 16 08:28:17 2012
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.net.StaticMapping;
+
+public class MiniDFSClusterWithNodeGroup extends MiniDFSCluster {
+
+  private static String[] NODE_GROUPS = null;
+  private static final Log LOG = LogFactory.getLog(MiniDFSClusterWithNodeGroup.class);
+  
+  public MiniDFSClusterWithNodeGroup(int nameNodePort, 
+          Configuration conf,
+          int numDataNodes,
+          boolean format,
+          boolean manageDfsDirs,
+          StartupOption operation,
+          String[] racks,
+          long[] simulatedCapacities) throws IOException {
+    super(nameNodePort, conf, numDataNodes, format, manageDfsDirs, 
+        manageDfsDirs, operation, racks, null, simulatedCapacities);
+  }
+  
+  public MiniDFSClusterWithNodeGroup(int nameNodePort, 
+          Configuration conf,
+          int numDataNodes,
+          boolean format,
+          boolean manageDfsDirs,
+          StartupOption operation,
+          String[] racks,
+          String[] hosts,
+          long[] simulatedCapacities) throws IOException {
+    super(nameNodePort, conf, numDataNodes, format, manageDfsDirs, 
+        manageDfsDirs, operation, racks, hosts, simulatedCapacities);
+  }
+
+  // NODE_GROUPS must be set before the constructor is executed.
+  public static void setNodeGroups(String[] nodeGroups) {
+    NODE_GROUPS = nodeGroups;
+  }
+
+  public synchronized void startDataNodes(Configuration conf, int numDataNodes,
+      boolean manageDfsDirs, StartupOption operation, 
+      String[] racks, String[] nodeGroups, String[] hosts,
+      long[] simulatedCapacities) throws IOException {
+    conf.set("slave.host.name", "127.0.0.1");
+
+    int curDatanodesNum = dataNodes.size();
+    // for miniclusters, the default initialDelay for block reports (BRs) is 0
+    if (conf.get(DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY) == null) {
+      conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY, 0);
+    }
+    // If the minicluster's name node is null, assume that the conf has
+    // been set with the right address:port of the name node.
+    //
+    if (nameNode != null) { // set conf from the name node
+      InetSocketAddress nnAddr = nameNode.getNameNodeAddress();
+      int nameNodePort = nnAddr.getPort();
+      FileSystem.setDefaultUri(conf, "hdfs://" + nnAddr.getHostName()
+          + ":" + Integer.toString(nameNodePort));
+    }
+    if (racks != null && numDataNodes > racks.length ) {
+      throw new IllegalArgumentException( "The length of racks [" + racks.length
+          + "] is less than the number of datanodes [" + numDataNodes + "].");
+    }
+
+    if (nodeGroups != null && numDataNodes > nodeGroups.length ) {
+      throw new IllegalArgumentException( "The length of nodeGroups [" + nodeGroups.length
+          + "] is less than the number of datanodes [" + numDataNodes + "].");
+    }
+
+    if (hosts != null && numDataNodes > hosts.length ) {
+      throw new IllegalArgumentException( "The length of hosts [" + hosts.length
+          + "] is less than the number of datanodes [" + numDataNodes + "].");
+    }
+    // Generate some hostnames if required
+    if (racks != null && hosts == null) {
+      hosts = new String[numDataNodes];
+      for (int i = curDatanodesNum; i < curDatanodesNum + numDataNodes; i++) {
+        hosts[i - curDatanodesNum] = "host" + i + ".foo.com";
+      }
+    }
+
+    if (simulatedCapacities != null 
+        && numDataNodes > simulatedCapacities.length) {
+      throw new IllegalArgumentException( "The length of simulatedCapacities [" 
+          + simulatedCapacities.length
+          + "] is less than the number of datanodes [" + numDataNodes + "].");
+    }
+
+    // Set up the right ports for the datanodes
+    conf.set("dfs.datanode.address", "127.0.0.1:0");
+    conf.set("dfs.datanode.http.address", "127.0.0.1:0");
+    conf.set("dfs.datanode.ipc.address", "127.0.0.1:0");
+
+    String[] dnArgs = (operation == null ||
+        operation != StartupOption.ROLLBACK) ?
+        null : new String[] {operation.getName()};
+
+    for (int i = curDatanodesNum; i < curDatanodesNum+numDataNodes; i++) {
+      Configuration dnConf = new Configuration(conf);
+      
+      if (manageDfsDirs) {
+        File dir1 = new File(data_dir, "data"+(2*i+1));
+        File dir2 = new File(data_dir, "data"+(2*i+2));
+        dir1.mkdirs();
+        dir2.mkdirs();
+        if (!dir1.isDirectory() || !dir2.isDirectory()) { 
+          throw new IOException("Mkdirs failed to create directory for DataNode "
+              + i + ": " + dir1 + " or " + dir2);
+        }
+        dnConf.set(DataNode.DATA_DIR_KEY, dir1.getPath() + "," + dir2.getPath());
+      }
+      if (simulatedCapacities != null) {
+        dnConf.setBoolean("dfs.datanode.simulateddatastorage", true);
+        dnConf.setLong(SimulatedFSDataset.CONFIG_PROPERTY_CAPACITY,
+        simulatedCapacities[i-curDatanodesNum]);
+      }
+      LOG.info("Starting DataNode " + i + " with "
+          + DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY + ": "
+          + dnConf.get(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY));
+      if (hosts != null) {
+        dnConf.set("slave.host.name", hosts[i - curDatanodesNum]);
+        LOG.info("Starting DataNode " + i + " with hostname set to: "
+            + dnConf.get("slave.host.name"));
+      }
+      if (racks != null) {
+        String name = hosts[i - curDatanodesNum];
+        if (nodeGroups == null) {
+          LOG.info("Adding node with hostname : " + name + " to rack " +
+             racks[i-curDatanodesNum]);
+          StaticMapping.addNodeToRack(name,racks[i-curDatanodesNum]);
+        } else {
+          LOG.info("Adding node with hostname : " + name + " to serverGroup " +
+              nodeGroups[i-curDatanodesNum] + " and rack " +
+              racks[i-curDatanodesNum]);
+          StaticMapping.addNodeToRack(name,racks[i-curDatanodesNum] + 
+              nodeGroups[i-curDatanodesNum]);
+        }
+      }
+      Configuration newconf = new Configuration(dnConf); // save config
+      if (hosts != null) {
+        NetUtils.addStaticResolution(hosts[i - curDatanodesNum], "localhost");
+      }
+      DataNode dn = DataNode.instantiateDataNode(dnArgs, dnConf);
+      if(dn == null)
+        throw new IOException("Cannot start DataNode in "
+          + dnConf.get(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY));
+      // since HDFS does things based on IP:port, we need to add the mapping
+      // for IP:port to rackId
+      String ipAddr = dn.getSelfAddr().getAddress().getHostAddress();
+      if (racks != null) {
+        int port = dn.getSelfAddr().getPort();
+        if (nodeGroups == null) {
+          LOG.info("Adding node with IP:port : " + ipAddr + ":" + port +
+              " to rack " + racks[i-curDatanodesNum]);
+          StaticMapping.addNodeToRack(ipAddr + ":" + port,
+              racks[i-curDatanodesNum]);
+        } else {
+          LOG.info("Adding node with IP:port : " + ipAddr + ":" + port + " to nodeGroup " +
+          nodeGroups[i-curDatanodesNum] + " and rack " + racks[i-curDatanodesNum]);
+          StaticMapping.addNodeToRack(ipAddr + ":" + port, racks[i-curDatanodesNum] + 
+              nodeGroups[i-curDatanodesNum]);
+        }
+      }
+      DataNode.runDatanodeDaemon(dn);
+      dataNodes.add(new DataNodeProperties(dn, newconf, dnArgs));
+    }
+    curDatanodesNum += numDataNodes;
+    this.numDataNodes += numDataNodes;
+    waitActive();
+  }
+
+  public synchronized void startDataNodes(Configuration conf, int numDataNodes, 
+      boolean manageDfsDirs, StartupOption operation, 
+      String[] racks, String[] nodeGroups, String[] hosts,
+      long[] simulatedCapacities,
+      boolean setupHostsFile) throws IOException {
+    startDataNodes(conf, numDataNodes, manageDfsDirs, operation, racks, nodeGroups, 
+        hosts, simulatedCapacities);
+  }
+
+  // This override is invoked during initialization from the parent class;
+  // it forwards to the node-group-aware version using NODE_GROUPS.
+  @Override
+  public synchronized void startDataNodes(Configuration conf, int numDataNodes, 
+          boolean manageDfsDirs, StartupOption operation, 
+          String[] racks, String[] hosts,
+          long[] simulatedCapacities) throws IOException {
+    startDataNodes(conf, numDataNodes, manageDfsDirs, operation, racks, NODE_GROUPS, hosts,
+        simulatedCapacities);
+  }
+  
+}
+

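[Editor's note: as the comment in the class says, setNodeGroups must be
called before the constructor runs, because the constructor reaches the
overridden startDataNodes, which reads the static NODE_GROUPS field. A
minimal usage sketch, mirroring the tests added below (values are
illustrative):

    String[] racks = {"/rack0", "/rack1"};
    String[] nodeGroups = {"/nodegroup0", "/nodegroup1"};
    // Register node groups first; the constructor starts the datanodes.
    MiniDFSClusterWithNodeGroup.setNodeGroups(nodeGroups);
    MiniDFSCluster cluster = new MiniDFSClusterWithNodeGroup(
        0, conf, 2, true, true, null, racks, new long[]{500L, 500L});
]
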
Added: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java?rev=1422482&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java (added)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java Sun Dec 16 08:28:17 2012
@@ -0,0 +1,367 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.balancer;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.MiniDFSClusterWithNodeGroup;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
+import org.apache.hadoop.net.NetworkTopology;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * This class tests if a balancer schedules tasks correctly.
+ */
+public class TestBalancerWithNodeGroup {
+  
+  final private static long CAPACITY = 500L;
+  final private static String RACK0 = "/rack0";
+  final private static String RACK1 = "/rack1";
+  final private static String NODEGROUP0 = "/nodegroup0";
+  final private static String NODEGROUP1 = "/nodegroup1";
+  final private static String NODEGROUP2 = "/nodegroup2";
+  final static private String fileName = "/tmp.txt";
+  final static private Path filePath = new Path(fileName);
+  private static final Log LOG = LogFactory.getLog(
+      "org.apache.hadoop.hdfs.TestBalancerWithNodeGroup");
+  MiniDFSClusterWithNodeGroup cluster;
+
+  ClientProtocol client;
+  private Balancer balancer;
+
+  static final long TIMEOUT = 20000L; //msec
+  static final double CAPACITY_ALLOWED_VARIANCE = 0.005;  // 0.5%
+  static final double BALANCE_ALLOWED_VARIANCE = 0.11;    // 10%+delta
+  static final int DEFAULT_BLOCK_SIZE = 5;
+  private static final Random r = new Random();
+
+  static {
+    Balancer.setBlockMoveWaitTime(1000L);
+  }
+  
+  private void initConf(Configuration conf) {
+    conf.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
+    conf.setInt("io.bytes.per.checksum", DEFAULT_BLOCK_SIZE);
+    conf.setLong("dfs.heartbeat.interval", 1L);
+    conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
+    conf.setLong("dfs.balancer.movedWinWidth", 2000L);
+    conf.set("net.topology.impl", "org.apache.hadoop.net.NetworkTopologyWithNodeGroup");
+    conf.set("dfs.block.replicator.classname",
+        "org.apache.hadoop.hdfs.server.namenode.BlockPlacementPolicyWithNodeGroup");
+  }
+
+  /* create a file with a length of <code>fileLen</code> */
+  private void createFile(long fileLen, short replicationFactor)
+      throws IOException {
+    FileSystem fs = cluster.getFileSystem();
+    DFSTestUtil.createFile(fs, filePath, fileLen, 
+        replicationFactor, r.nextLong());
+    DFSTestUtil.waitReplication(fs, filePath, replicationFactor);
+  }
+
+  // create and initialize conf for balancer
+  private Configuration createConf() {
+    Configuration conf = new Configuration();
+    initConf(conf);
+    return conf;
+  }
+  
+  /**
+   * Wait until heartbeat gives expected results, within CAPACITY_ALLOWED_VARIANCE, 
+   * summed over all nodes.  Times out after TIMEOUT msec.
+   * @param expectedUsedSpace
+   * @param expectedTotalSpace
+   * @throws IOException - if getStats() fails
+   * @throws TimeoutException
+   */
+  private void waitForHeartBeat(long expectedUsedSpace, long expectedTotalSpace)
+  throws IOException, TimeoutException {
+    long timeout = TIMEOUT;
+    long failtime = (timeout <= 0L) ? Long.MAX_VALUE
+             : System.currentTimeMillis() + timeout;
+
+    while (true) {
+      long[] status = client.getStats();
+      double totalSpaceVariance = Math.abs((double)status[0] - expectedTotalSpace) 
+          / expectedTotalSpace;
+      double usedSpaceVariance = Math.abs((double)status[1] - expectedUsedSpace) 
+          / expectedUsedSpace;
+      if (totalSpaceVariance < CAPACITY_ALLOWED_VARIANCE 
+          && usedSpaceVariance < CAPACITY_ALLOWED_VARIANCE)
+        break; //done
+
+      if (System.currentTimeMillis() > failtime) {
+        throw new TimeoutException("Cluster failed to reached expected values of "
+            + "totalSpace (current: " + status[0] 
+            + ", expected: " + expectedTotalSpace 
+            + "), or usedSpace (current: " + status[1] 
+            + ", expected: " + expectedUsedSpace
+            + "), in more than " + timeout + " msec.");
+      }
+      try {
+        Thread.sleep(100L);
+      } catch(InterruptedException ignored) {
+      }
+    }
+  }
+  
+  /**
+   * Wait until balanced: each datanode gives utilization within 
+   * BALANCE_ALLOWED_VARIANCE of average
+   * @throws IOException
+   * @throws TimeoutException
+   */
+  private void waitForBalancer(long totalUsedSpace, long totalCapacity) 
+      throws IOException, TimeoutException {
+    long timeout = TIMEOUT;
+    long failtime = (timeout <= 0L) ? Long.MAX_VALUE
+        : System.currentTimeMillis() + timeout;
+    final double avgUtilization = ((double)totalUsedSpace) / totalCapacity;
+    boolean balanced;
+    do {
+      DatanodeInfo[] datanodeReport = 
+          client.getDatanodeReport(DatanodeReportType.ALL);
+      assertEquals(datanodeReport.length, cluster.getDataNodes().size());
+      balanced = true;
+      for (DatanodeInfo datanode : datanodeReport) {
+        double nodeUtilization = ((double)datanode.getDfsUsed())
+            / datanode.getCapacity();
+        if (Math.abs(avgUtilization - nodeUtilization) >
+            BALANCE_ALLOWED_VARIANCE) {
+          balanced = false;
+          if (System.currentTimeMillis() > failtime) {
+            throw new TimeoutException(
+                "Rebalancing expected avg utilization to become "
+                + avgUtilization + ", but on datanode " + datanode
+                + " it remains at " + nodeUtilization
+                + " after more than " + TIMEOUT + " msec.");
+          }
+          try {
+            Thread.sleep(100);
+          } catch (InterruptedException ignored) {
+          }
+          break;
+        }
+      }
+    } while (!balanced);
+  }
+  
+  /** Start balancer and check if the cluster is balanced after the run */
+  private void runBalancer(Configuration conf,
+      long totalUsedSpace, long totalCapacity) throws Exception {
+    waitForHeartBeat(totalUsedSpace, totalCapacity);
+
+    // start rebalancing
+    balancer = new Balancer(conf);
+    
+    final int r = balancer.run(new String[0]);
+    
+    assertEquals(Balancer.SUCCESS, r);
+
+    waitForHeartBeat(totalUsedSpace, totalCapacity);
+    LOG.info("Rebalancing with default factor.");
+    waitForBalancer(totalUsedSpace, totalCapacity);
+  }
+  
+  private void runBalancerCanFinish(Configuration conf,
+      long totalUsedSpace, long totalCapacity) throws Exception {
+    waitForHeartBeat(totalUsedSpace, totalCapacity);
+
+    balancer = new Balancer(conf);
+    final int r = balancer.run(new String[0]);
+    Assert.assertTrue(r == Balancer.SUCCESS ||
+        (r == Balancer.NO_MOVE_PROGRESS));
+    waitForHeartBeat(totalUsedSpace, totalCapacity);
+    LOG.info("Rebalancing ends successful.");
+  }
+  
+  /**
+   * Create a 4-node cluster: 2 nodes (n0, n1) in RACK0/NODEGROUP0, 1 node (n2)
+   * in RACK1/NODEGROUP1, and 1 node (n3) in RACK1/NODEGROUP2. Fill the cluster
+   * to 60% with a replication factor of 3, so n2 and n3 hold a replica of every
+   * block under the NodeGroup-aware replica placement policy. As a result, n2
+   * and n3 are filled to 80% (60% x 4 / 3), and the node-group-aware balancer
+   * policy cannot migrate any block from n2 or n3 to n0 or n1. Thus, we expect
+   * the balancer to exit after 5 iterations without moving any block.
+   */
+  @Test
+  public void testBalancerEndInNoMoveProgress() throws Exception {
+    Configuration conf = createConf();
+    long[] capacities = new long[]{CAPACITY, CAPACITY, CAPACITY, CAPACITY};
+    String[] racks = new String[]{RACK0, RACK0, RACK1, RACK1};
+    String[] nodeGroups = new String[]{NODEGROUP0, NODEGROUP0, NODEGROUP1, NODEGROUP2};
+    
+    int numOfDatanodes = capacities.length;
+    assertEquals(numOfDatanodes, racks.length);
+    assertEquals(numOfDatanodes, nodeGroups.length);
+    MiniDFSClusterWithNodeGroup.setNodeGroups(nodeGroups);
+    cluster = new MiniDFSClusterWithNodeGroup(0, conf, capacities.length,
+        true, true, null, racks, capacities);
+    try {
+      cluster.waitActive();
+      client = DFSClient.createNamenode(conf);
+
+      long totalCapacity = 0L;
+      for(long capacity : capacities) {
+        totalCapacity += capacity;
+      }
+      
+      // fill up the cluster to be 60% full
+      long totalUsedSpace = totalCapacity * 6 / 10;
+      
+      createFile(totalUsedSpace / 3, (short) 3);
+
+      // run balancer which can finish in 5 iterations with no block movement.
+      runBalancerCanFinish(conf, totalUsedSpace, totalCapacity);
+
+    } finally {
+      cluster.shutdown();
+    }
+  }
+  
+  /**
+   * Create a cluster with an even distribution, add a new empty node to the
+   * cluster, and then test rack locality for the balancer policy.
+   */
+  @Test
+  public void testBalancerWithRackLocality() throws Exception {
+    Configuration conf = createConf();
+    long[] capacities = new long[]{CAPACITY, CAPACITY};
+    String[] racks = new String[]{RACK0, RACK1};
+    String[] nodeGroups = new String[]{NODEGROUP0, NODEGROUP1};
+    
+    int numOfDatanodes = capacities.length;
+    assertEquals(numOfDatanodes, racks.length);
+    MiniDFSClusterWithNodeGroup.setNodeGroups(nodeGroups);
+    cluster = new MiniDFSClusterWithNodeGroup(0, conf, capacities.length,
+        true, true, null, racks, capacities);
+    try {
+      cluster.waitActive();
+      
+      client = DFSClient.createNamenode(conf);
+      
+      long totalCapacity = 0L;
+      for(long capacity : capacities) {
+        totalCapacity += capacity;
+      }
+      
+      // fill up the cluster to be 30% full
+      long totalUsedSpace = totalCapacity * 3 / 10;
+      createFile(totalUsedSpace / numOfDatanodes, (short) numOfDatanodes);
+      
+      long newCapacity = CAPACITY;
+      String newRack = RACK1;
+      String newNodeGroup = NODEGROUP2;
+      
+      // start up an empty node with the same capacity and on the same rack
+      cluster.startDataNodes(conf, 1, true, null, new String[]{newRack},
+          new String[]{newNodeGroup}, new long[] {newCapacity});
+
+      totalCapacity += newCapacity;
+
+      // run balancer and validate results
+      runBalancer(conf, totalUsedSpace, totalCapacity);
+      
+      DatanodeInfo[] datanodeReport = 
+              client.getDatanodeReport(DatanodeReportType.ALL);
+      
+      Map<String, Integer> rackToUsedCapacity = new HashMap<String, Integer>();
+      for (DatanodeInfo datanode: datanodeReport) {
+        String rack = NetworkTopology.getFirstHalf(datanode.getNetworkLocation());
+        int usedCapacity = (int) datanode.getDfsUsed();
+         
+        if (rackToUsedCapacity.get(rack) != null) {
+          rackToUsedCapacity.put(rack, usedCapacity + rackToUsedCapacity.get(rack));
+        } else {
+          rackToUsedCapacity.put(rack, usedCapacity);
+        }
+      }
+      assertEquals(rackToUsedCapacity.size(), 2);
+      assertEquals(rackToUsedCapacity.get(RACK0), rackToUsedCapacity.get(RACK1));
+      
+    } finally {
+      cluster.shutdown();
+    }
+  }
+  
+  /** Create a cluster with an even distribution, add a new empty node to
+   *  the cluster, and then test node-group locality for the balancer policy.
+   */
+  @Test
+  public void testBalancerWithNodeGroup() throws Exception {
+    Configuration conf = createConf();
+    long[] capacities = new long[]{CAPACITY, CAPACITY};
+    String[] racks = new String[]{RACK0, RACK1};
+    String[] nodeGroups = new String[]{NODEGROUP0, NODEGROUP1};
+    
+    int numOfDatanodes = capacities.length;
+    assertEquals(numOfDatanodes, racks.length);
+    MiniDFSClusterWithNodeGroup.setNodeGroups(nodeGroups);
+    cluster = new MiniDFSClusterWithNodeGroup(0, conf, capacities.length,
+        true, true, null, racks, capacities);
+    try {
+      cluster.waitActive();
+      client = DFSClient.createNamenode(conf);
+      
+      long totalCapacity = 0L;
+      for(long capacity : capacities) {
+        totalCapacity += capacity;
+      }
+      
+      // fill up the cluster to be 30% full
+      long totalUsedSpace = totalCapacity * 3 / 10;
+      
+      createFile(totalUsedSpace / numOfDatanodes, (short) numOfDatanodes);
+      
+      long newCapacity = CAPACITY;
+      String newRack = RACK1;
+      String newNodeGroup = NODEGROUP2;
+      // start up an empty node with the same capacity and on the same rack
+      cluster.startDataNodes(conf, 1, true, null, new String[]{newRack},
+          new String[]{newNodeGroup}, new long[] {newCapacity});
+
+      totalCapacity += newCapacity;
+
+      // run balancer and validate results
+      runBalancer(conf, totalUsedSpace, totalCapacity);
+      
+    } finally {
+      cluster.shutdown();
+    }
+  }
+  
+}
\ No newline at end of file