You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by sz...@apache.org on 2013/06/03 07:28:34 UTC

svn commit: r1488848 - in /hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/ src/main/java/org/apache/hadoop/hdfs/server/balancer/ src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ src/test/java/org/apache/ha...

Author: szetszwo
Date: Mon Jun  3 05:28:34 2013
New Revision: 1488848

URL: http://svn.apache.org/r1488848
Log:
Merge r1414874 and r1414878 from trunk for HDFS-3495. Update Balancer to support new NetworkTopology with NodeGroup.

Added:
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSClusterWithNodeGroup.java
      - copied unchanged from r1414874, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSClusterWithNodeGroup.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java
      - copied unchanged from r1414874, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java
Modified:
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/   (props changed)
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/   (props changed)
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java

Propchange: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs:r1414874

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1488848&r1=1488847&r2=1488848&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Mon Jun  3 05:28:34 2013
@@ -43,6 +43,9 @@ Release 2.1.0-beta - UNRELEASED
     HDFS-3601. Add BlockPlacementPolicyWithNodeGroup to support block placement
     with 4-layer network topology.  (Junping Du via szetszwo)
 
+    HDFS-3495. Update Balancer to support new NetworkTopology with NodeGroup.
+    (Junping Du via szetszwo)
+
   IMPROVEMENTS
 
     HDFS-4222. NN is unresponsive and loses heartbeats from DNs when 

Propchange: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java:r1414874

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java?rev=1488848&r1=1488847&r2=1488848&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java Mon Jun  3 05:28:34 2013
@@ -169,7 +169,7 @@ import org.apache.hadoop.util.ToolRunner
  * <ol>
  * <li>The cluster is balanced. Exiting
  * <li>No block can be moved. Exiting...
- * <li>No block has been moved for 3 iterations. Exiting...
+ * <li>No block has been moved for 5 iterations. Exiting...
  * <li>Received an IO exception: failure reason. Exiting...
  * <li>Another balancer is running. Exiting...
  * </ol>
@@ -223,7 +223,7 @@ public class Balancer {
   private Map<String, BalancerDatanode> datanodes
                  = new HashMap<String, BalancerDatanode>();
   
-  private NetworkTopology cluster = new NetworkTopology();
+  private NetworkTopology cluster;
   
   final static private int MOVER_THREAD_POOL_SIZE = 1000;
   final private ExecutorService moverExecutor = 
@@ -250,7 +250,7 @@ public class Balancer {
      * Return true if a block and its proxy are chosen; false otherwise
      */
     private boolean chooseBlockAndProxy() {
-      // iterate all source's blocks until find a good one    
+      // iterate all source's blocks until find a good one
       for (Iterator<BalancerBlock> blocks=
         source.getBlockIterator(); blocks.hasNext();) {
         if (markMovedIfGoodBlock(blocks.next())) {
@@ -294,22 +294,35 @@ public class Balancer {
      * @return true if a proxy is found; otherwise false
      */
     private boolean chooseProxySource() {
-      // check if there is replica which is on the same rack with the target
+      final DatanodeInfo targetDN = target.getDatanode();
+      boolean find = false;
       for (BalancerDatanode loc : block.getLocations()) {
-        if (cluster.isOnSameRack(loc.getDatanode(), target.getDatanode())) {
-          if (loc.addPendingBlock(this)) {
-            proxySource = loc;
+        // check if there is replica which is on the same rack with the target
+        if (cluster.isOnSameRack(loc.getDatanode(), targetDN) && addTo(loc)) {
+          find = true;
+          // if cluster is not nodegroup aware or the proxy is on the same 
+          // nodegroup with target, then we already find the nearest proxy
+          if (!cluster.isNodeGroupAware() 
+              || cluster.isOnSameNodeGroup(loc.getDatanode(), targetDN)) {
             return true;
           }
         }
-      }
-      // find out a non-busy replica
-      for (BalancerDatanode loc : block.getLocations()) {
-        if (loc.addPendingBlock(this)) {
-          proxySource = loc;
-          return true;
+        
+        if (!find) {
+          // find out a non-busy replica out of rack of target
+          find = addTo(loc);
         }
       }
+      
+      return find;
+    }
+    
+    // add a BalancerDatanode as proxy source for specific block movement
+    private boolean addTo(BalancerDatanode bdn) {
+      if (bdn.addPendingBlock(this)) {
+        proxySource = bdn;
+        return true;
+      }
       return false;
     }
     
@@ -687,7 +700,7 @@ public class Balancer {
         NodeTask task = tasks.next();
         BalancerDatanode target = task.getDatanode();
         PendingBlockMove pendingBlock = new PendingBlockMove();
-        if ( target.addPendingBlock(pendingBlock) ) { 
+        if (target.addPendingBlock(pendingBlock)) { 
           // target is not busy, so do a tentative block allocation
           pendingBlock.source = this;
           pendingBlock.target = target;
@@ -788,9 +801,10 @@ public class Balancer {
    */
   private static void checkReplicationPolicyCompatibility(Configuration conf
       ) throws UnsupportedActionException {
-    if (BlockPlacementPolicy.getInstance(conf, null, null).getClass() != 
-        BlockPlacementPolicyDefault.class) {
-      throw new UnsupportedActionException("Balancer without BlockPlacementPolicyDefault");
+    if (BlockPlacementPolicy.getInstance(conf, null, null) instanceof 
+        BlockPlacementPolicyDefault) {
+      throw new UnsupportedActionException(
+          "Balancer without BlockPlacementPolicyDefault");
     }
   }
 
@@ -805,6 +819,7 @@ public class Balancer {
     this.threshold = p.threshold;
     this.policy = p.policy;
     this.nnc = theblockpool;
+    cluster = NetworkTopology.getInstance(conf);
   }
   
   /* Shuffle datanode array */
@@ -915,9 +930,15 @@ public class Balancer {
    * Return total number of bytes to move in this iteration
    */
   private long chooseNodes() {
-    // Match nodes on the same rack first
+    // First, match nodes on the same node group if cluster has nodegroup
+    // awareness
+    if (cluster.isNodeGroupAware()) {
+      chooseNodesOnSameNodeGroup();
+    }
+    
+    // Then, match nodes on the same rack
     chooseNodes(true);
-    // Then match nodes on different racks
+    // At last, match nodes on different racks
     chooseNodes(false);
     
     assert (datanodes.size() >= sources.size()+targets.size())
@@ -932,6 +953,102 @@ public class Balancer {
     }
     return bytesToMove;
   }
+  
+  /**
+   * Decide all <source, target> pairs where source and target are 
+   * on the same NodeGroup
+   */
+  private void chooseNodesOnSameNodeGroup() {
+
+    /* first step: match each overUtilized datanode (source) to
+     * one or more underUtilized datanodes within same NodeGroup(targets).
+     */
+    chooseOnSameNodeGroup(overUtilizedDatanodes, underUtilizedDatanodes);
+
+    /* match each remaining overutilized datanode (source) to below average 
+     * utilized datanodes within the same NodeGroup(targets).
+     * Note only overutilized datanodes that haven't had that max bytes to move
+     * satisfied in step 1 are selected
+     */
+    chooseOnSameNodeGroup(overUtilizedDatanodes, belowAvgUtilizedDatanodes);
+
+    /* match each remaining underutilized datanode to above average utilized 
+     * datanodes within the same NodeGroup.
+     * Note only underutilized datanodes that have not had that max bytes to
+     * move satisfied in step 1 are selected.
+     */
+    chooseOnSameNodeGroup(underUtilizedDatanodes, aboveAvgUtilizedDatanodes);
+  }
+  
+  /**
+   * Match two sets of nodes within the same NodeGroup, one should be source
+   * nodes (utilization > Avg), and the other should be destination nodes 
+   * (utilization < Avg).
+   * @param datanodes
+   * @param candidates
+   */
+  private <D extends BalancerDatanode, C extends BalancerDatanode> void 
+      chooseOnSameNodeGroup(Collection<D> datanodes, Collection<C> candidates) {
+    for (Iterator<D> i = datanodes.iterator(); i.hasNext();) {
+      final D datanode = i.next();
+      for(; chooseOnSameNodeGroup(datanode, candidates.iterator()); );
+      if (!datanode.isMoveQuotaFull()) {
+        i.remove();
+      }
+    }
+  }
+  
+  /**
+   * Match one datanode with a set of candidates nodes within the same NodeGroup.
+   */
+  private <T extends BalancerDatanode> boolean chooseOnSameNodeGroup(
+      BalancerDatanode dn, Iterator<T> candidates) {
+    final T chosen = chooseCandidateOnSameNodeGroup(dn, candidates);
+    if (chosen == null) {
+      return false;
+    }
+    if (dn instanceof Source) {
+      matchSourceWithTargetToMove((Source)dn, chosen);
+    } else {
+      matchSourceWithTargetToMove((Source)chosen, dn);
+    }
+    if (!chosen.isMoveQuotaFull()) {
+      candidates.remove();
+    }
+    return true;
+  }
+  
+  private void matchSourceWithTargetToMove(
+      Source source, BalancerDatanode target) {
+    long size = Math.min(source.availableSizeToMove(), target.availableSizeToMove());
+    NodeTask nodeTask = new NodeTask(target, size);
+    source.addNodeTask(nodeTask);
+    target.incScheduledSize(nodeTask.getSize());
+    sources.add(source);
+    targets.add(target);
+    LOG.info("Decided to move "+StringUtils.byteDesc(size)+" bytes from "
+        +source.datanode.getName() + " to " + target.datanode.getName());
+  }
+  
+  /** choose a datanode from <code>candidates</code> within the same NodeGroup 
+   * of <code>dn</code>.
+   */
+  private <T extends BalancerDatanode> T chooseCandidateOnSameNodeGroup(
+      BalancerDatanode dn, Iterator<T> candidates) {
+    if (dn.isMoveQuotaFull()) {
+      for(; candidates.hasNext(); ) {
+        final T c = candidates.next();
+        if (!c.isMoveQuotaFull()) {
+          candidates.remove();
+          continue;
+        }
+        if (cluster.isOnSameNodeGroup(dn.getDatanode(), c.getDatanode())) {
+          return c;
+        }
+      }
+    }
+    return null;
+  }
 
   /* if onRack is true, decide all <source, target> pairs
    * where source and target are on the same rack; Otherwise
@@ -942,33 +1059,33 @@ public class Balancer {
     /* first step: match each overUtilized datanode (source) to
      * one or more underUtilized datanodes (targets).
      */
-    chooseTargets(underUtilizedDatanodes.iterator(), onRack);
+    chooseTargets(underUtilizedDatanodes, onRack);
     
     /* match each remaining overutilized datanode (source) to 
      * below average utilized datanodes (targets).
      * Note only overutilized datanodes that haven't had that max bytes to move
      * satisfied in step 1 are selected
      */
-    chooseTargets(belowAvgUtilizedDatanodes.iterator(), onRack);
+    chooseTargets(belowAvgUtilizedDatanodes, onRack);
 
-    /* match each remaining underutilized datanode to 
-     * above average utilized datanodes.
+    /* match each remaining underutilized datanode (target) to 
+     * above average utilized datanodes (source).
      * Note only underutilized datanodes that have not had that max bytes to
      * move satisfied in step 1 are selected.
      */
-    chooseSources(aboveAvgUtilizedDatanodes.iterator(), onRack);
+    chooseSources(aboveAvgUtilizedDatanodes, onRack);
   }
    
   /* choose targets from the target candidate list for each over utilized
    * source datanode. OnRackTarget determines if the chosen target 
    * should be on the same rack as the source
    */
-  private void chooseTargets(  
-      Iterator<BalancerDatanode> targetCandidates, boolean onRackTarget ) {
+  private void chooseTargets(
+      Collection<BalancerDatanode> targetCandidates, boolean onRackTarget ) {
     for (Iterator<Source> srcIterator = overUtilizedDatanodes.iterator();
         srcIterator.hasNext();) {
       Source source = srcIterator.next();
-      while (chooseTarget(source, targetCandidates, onRackTarget)) {
+      while (chooseTarget(source, targetCandidates.iterator(), onRackTarget)) {
       }
       if (!source.isMoveQuotaFull()) {
         srcIterator.remove();
@@ -982,11 +1099,11 @@ public class Balancer {
    * should be on the same rack as the target
    */
   private void chooseSources(
-      Iterator<Source> sourceCandidates, boolean onRackSource) {
+      Collection<Source> sourceCandidates, boolean onRackSource) {
     for (Iterator<BalancerDatanode> targetIterator = 
       underUtilizedDatanodes.iterator(); targetIterator.hasNext();) {
       BalancerDatanode target = targetIterator.next();
-      while (chooseSource(target, sourceCandidates, onRackSource)) {
+      while (chooseSource(target, sourceCandidates.iterator(), onRackSource)) {
       }
       if (!target.isMoveQuotaFull()) {
         targetIterator.remove();
@@ -1026,23 +1143,15 @@ public class Balancer {
     }
     if (foundTarget) {
       assert(target != null):"Choose a null target";
-      long size = Math.min(source.availableSizeToMove(),
-          target.availableSizeToMove());
-      NodeTask nodeTask = new NodeTask(target, size);
-      source.addNodeTask(nodeTask);
-      target.incScheduledSize(nodeTask.getSize());
-      sources.add(source);
-      targets.add(target);
+      matchSourceWithTargetToMove(source, target);
       if (!target.isMoveQuotaFull()) {
         targetCandidates.remove();
       }
-      LOG.info("Decided to move "+StringUtils.byteDesc(size)+" bytes from "
-          +source.datanode + " to " + target.datanode);
       return true;
     }
     return false;
   }
-  
+
   /* For the given target, choose sources from the source candidate list.
    * OnRackSource determines if the chosen source 
    * should be on the same rack as the target
@@ -1074,18 +1183,10 @@ public class Balancer {
     }
     if (foundSource) {
       assert(source != null):"Choose a null source";
-      long size = Math.min(source.availableSizeToMove(),
-          target.availableSizeToMove());
-      NodeTask nodeTask = new NodeTask(target, size);
-      source.addNodeTask(nodeTask);
-      target.incScheduledSize(nodeTask.getSize());
-      sources.add(source);
-      targets.add(target);
+      matchSourceWithTargetToMove(source, target);
       if ( !source.isMoveQuotaFull()) {
-        sourceCandidates.remove();
-      }
-      LOG.info("Decided to move "+StringUtils.byteDesc(size)+" bytes from "
-          +source.datanode + " to " + target.datanode);
+          sourceCandidates.remove();
+        }
       return true;
     }
     return false;
@@ -1227,6 +1328,10 @@ public class Balancer {
     if (block.isLocatedOnDatanode(target)) {
       return false;
     }
+    if (cluster.isNodeGroupAware() && 
+        isOnSameNodeGroupWithReplicas(target, block, source)) {
+      return false;
+    }
 
     boolean goodBlock = false;
     if (cluster.isOnSameRack(source.getDatanode(), target.getDatanode())) {
@@ -1258,10 +1363,32 @@ public class Balancer {
     }
     return goodBlock;
   }
-  
+
+  /**
+   * Check if there are any replica (other than source) on the same node group
+   * with target. If true, then target is not a good candidate for placing 
+   * specific block replica as we don't want 2 replicas under the same nodegroup 
+   * after balance.
+   * @param target targetDataNode
+   * @param block dataBlock
+   * @param source sourceDataNode
+   * @return true if there are any replica (other than source) on the same node
+   * group with target
+   */
+  private boolean isOnSameNodeGroupWithReplicas(BalancerDatanode target,
+      BalancerBlock block, Source source) {
+    for (BalancerDatanode loc : block.locations) {
+      if (loc != source && 
+        cluster.isOnSameNodeGroup(loc.getDatanode(), target.getDatanode())) {
+          return true;
+        }
+      }
+    return false;
+  }
+
   /* reset all fields in a balancer preparing for the next iteration */
-  private void resetData() {
-    this.cluster = new NetworkTopology();
+  private void resetData(Configuration conf) {
+    this.cluster = NetworkTopology.getInstance(conf);
     this.overUtilizedDatanodes.clear();
     this.aboveAvgUtilizedDatanodes.clear();
     this.belowAvgUtilizedDatanodes.clear();
@@ -1333,7 +1460,8 @@ public class Balancer {
   }
 
   /** Run an iteration for all datanodes. */
-  private ReturnStatus run(int iteration, Formatter formatter) {
+  private ReturnStatus run(int iteration, Formatter formatter,
+      Configuration conf) {
     try {
       /* get all live datanodes of a cluster and their disk usage
        * decide the number of bytes need to be moved
@@ -1387,7 +1515,7 @@ public class Balancer {
       }
 
       // clean all lists
-      resetData();
+      resetData(conf);
       return ReturnStatus.IN_PROGRESS;
     } catch (IllegalArgumentException e) {
       System.out.println(e + ".  Exiting ...");
@@ -1435,7 +1563,7 @@ public class Balancer {
         Collections.shuffle(connectors);
         for(NameNodeConnector nnc : connectors) {
           final Balancer b = new Balancer(nnc, p, conf);
-          final ReturnStatus r = b.run(iteration, formatter);
+          final ReturnStatus r = b.run(iteration, formatter, conf);
           if (r == ReturnStatus.IN_PROGRESS) {
             done = false;
           } else if (r != ReturnStatus.SUCCESS) {

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java?rev=1488848&r1=1488847&r2=1488848&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java Mon Jun  3 05:28:34 2013
@@ -39,7 +39,6 @@ import org.apache.hadoop.HadoopIllegalAr
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
@@ -163,11 +162,7 @@ public class DatanodeManager {
     this.namesystem = namesystem;
     this.blockManager = blockManager;
     
-    Class<? extends NetworkTopology> networkTopologyClass =
-        conf.getClass(CommonConfigurationKeysPublic.NET_TOPOLOGY_IMPL_KEY,
-            NetworkTopology.class, NetworkTopology.class);
-    networktopology = (NetworkTopology) ReflectionUtils.newInstance(
-        networkTopologyClass, conf);
+    networktopology = NetworkTopology.getInstance(conf);
 
     this.heartbeatManager = new HeartbeatManager(namesystem, blockManager, conf);
 

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java?rev=1488848&r1=1488847&r2=1488848&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java Mon Jun  3 05:28:34 2013
@@ -320,7 +320,7 @@ public class MiniDFSCluster {
   /**
    * Used by builder to create and return an instance of MiniDFSCluster
    */
-  private MiniDFSCluster(Builder builder) throws IOException {
+  protected MiniDFSCluster(Builder builder) throws IOException {
     if (builder.nnTopology == null) {
       // If no topology is specified, build a single NN. 
       builder.nnTopology = MiniDFSNNTopology.simpleSingleNN(
@@ -368,7 +368,7 @@ public class MiniDFSCluster {
 
   private Configuration conf;
   private NameNodeInfo[] nameNodes;
-  private int numDataNodes;
+  protected int numDataNodes;
   protected ArrayList<DataNodeProperties> dataNodes = 
                          new ArrayList<DataNodeProperties>();
   private File base_dir;
@@ -2318,7 +2318,7 @@ public class MiniDFSCluster {
     return nameNodes[nnIndex].nameNode;
   }
   
-  private void setupDatanodeAddress(Configuration conf, boolean setupHostsFile,
+  protected void setupDatanodeAddress(Configuration conf, boolean setupHostsFile,
                            boolean checkDataNodeAddrConfig) throws IOException {
     if (setupHostsFile) {
       String hostsFile = conf.get(DFS_HOSTS, "").trim();