Posted to hdfs-commits@hadoop.apache.org by cm...@apache.org on 2013/06/14 22:17:42 UTC

svn commit: r1493233 - in /hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ src/main/java/org/apache/hadoop/hdfs/server/datanod...

Author: cmccabe
Date: Fri Jun 14 20:17:41 2013
New Revision: 1493233

URL: http://svn.apache.org/r1493233
Log:
HDFS-3934. duplicative dfs_hosts entries handled wrong (all files) (cmccabe)
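
As the HostFileManager javadoc below explains, include and exclude entries are
now resolved to IP addresses when the hosts files are (re)read, so the same
node written as a hostname in one place and as an IP address in another is
recognized as a single entry instead of two.  A minimal sketch of that
resolution step, assuming a host where localhost maps to 127.0.0.1 (the class
below is illustrative only, not part of the patch):

    import java.net.InetAddress;

    public class DuplicateHostsEntrySketch {
      public static void main(String[] args) throws Exception {
        // Both spellings resolve to the same address, so keyed by
        // resolved ip:port they collapse into a single entry.
        String a = InetAddress.getByName("localhost").getHostAddress();
        String b = InetAddress.getByName("127.0.0.1").getHostAddress();
        System.out.println(a + ":5000");                       // 127.0.0.1:5000
        System.out.println((a + ":5000").equals(b + ":5000")); // true
      }
    }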


Added:
    hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/HostFileManager.java
Modified:
    hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
    hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
    hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
    hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeRegistration.java
    hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1493233&r1=1493232&r2=1493233&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Fri Jun 14 20:17:41 2013
@@ -151,6 +151,9 @@ Release 2.1.0-beta - UNRELEASED
   OPTIMIZATIONS
 
   BUG FIXES
+
+    HDFS-3934. duplicative dfs_hosts entries handled wrong. (Colin Patrick
+    McCabe)
     
     HDFS-4470. Several HDFS tests attempt file operations on invalid HDFS
     paths when running on Windows. (Chris Nauroth via suresh)

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1493233&r1=1493232&r2=1493233&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Fri Jun 14 20:17:41 2013
@@ -327,7 +327,7 @@ public class DFSConfigKeys extends Commo
   public static final String  DFS_DATANODE_HTTPS_ADDRESS_DEFAULT = "0.0.0.0:" + DFS_DATANODE_HTTPS_DEFAULT_PORT;
   public static final String  DFS_DATANODE_IPC_ADDRESS_KEY = "dfs.datanode.ipc.address";
   public static final int     DFS_DATANODE_IPC_DEFAULT_PORT = 50020;
-  public static final String  DFS_DATANODE_IPC_ADDRESS_DEFAULT = "0.0.0.0" + DFS_DATANODE_IPC_DEFAULT_PORT;
+  public static final String  DFS_DATANODE_IPC_ADDRESS_DEFAULT = "0.0.0.0:" + DFS_DATANODE_IPC_DEFAULT_PORT;
   public static final String  DFS_DATANODE_MIN_SUPPORTED_NAMENODE_VERSION_KEY = "dfs.datanode.min.supported.namenode.version";
   public static final String  DFS_DATANODE_MIN_SUPPORTED_NAMENODE_VERSION_DEFAULT = "2.0.0-SNAPSHOT";
 

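The DFSConfigKeys change above fixes a missing ':' in the default datanode IPC
address: concatenating "0.0.0.0" directly with the port number produced
"0.0.0.050020", which is not a parseable host:port pair.  A minimal sketch of
the before/after values (the class below is illustrative only and inlines the
constant; it is not part of the patch):

    public class IpcAddressDefaultSketch {
      public static void main(String[] args) {
        final int DFS_DATANODE_IPC_DEFAULT_PORT = 50020;
        // Before the fix: no separator, so host and port run together.
        String broken = "0.0.0.0" + DFS_DATANODE_IPC_DEFAULT_PORT;
        // After the fix: a valid host:port default.
        String fixed = "0.0.0.0:" + DFS_DATANODE_IPC_DEFAULT_PORT;
        System.out.println(broken);  // 0.0.0.050020
        System.out.println(fixed);   // 0.0.0.0:50020
      }
    }
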
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java?rev=1493233&r1=1493232&r2=1493233&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java Fri Jun 14 20:17:41 2013
@@ -26,11 +26,9 @@ import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Comparator;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.NavigableMap;
-import java.util.Set;
 import java.util.TreeMap;
 
 import org.apache.commons.logging.Log;
@@ -50,6 +48,17 @@ import org.apache.hadoop.hdfs.protocol.H
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockTargetPair;
+import org.apache.hadoop.hdfs.server.namenode.HostFileManager;
+import org.apache.hadoop.hdfs.server.namenode.HostFileManager.Entry;
+import org.apache.hadoop.hdfs.server.namenode.HostFileManager.EntrySet;
+import org.apache.hadoop.hdfs.server.namenode.HostFileManager.MutableEntrySet;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.namenode.Namesystem;
+import org.apache.hadoop.hdfs.server.protocol.BalancerBandwidthCommand;
+import org.apache.hadoop.hdfs.server.namenode.HostFileManager;
+import org.apache.hadoop.hdfs.server.namenode.HostFileManager.Entry;
+import org.apache.hadoop.hdfs.server.namenode.HostFileManager.EntrySet;
+import org.apache.hadoop.hdfs.server.namenode.HostFileManager.MutableEntrySet;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.Namesystem;
 import org.apache.hadoop.hdfs.server.protocol.BalancerBandwidthCommand;
@@ -62,16 +71,17 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException;
 import org.apache.hadoop.hdfs.server.protocol.RegisterCommand;
 import org.apache.hadoop.hdfs.util.CyclicIteration;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.net.CachedDNSToSwitchMapping;
 import org.apache.hadoop.net.DNSToSwitchMapping;
+import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.NetworkTopology.InvalidTopologyException;
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.net.ScriptBasedMapping;
 import org.apache.hadoop.util.Daemon;
-import org.apache.hadoop.util.HostsFileReader;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.Time;
 
@@ -120,8 +130,14 @@ public class DatanodeManager {
 
   private final DNSToSwitchMapping dnsToSwitchMapping;
 
+  private final int defaultXferPort;
+  
+  private final int defaultInfoPort;
+
+  private final int defaultIpcPort;
+
   /** Read include/exclude files*/
-  private final HostsFileReader hostsReader;
+  private final HostFileManager hostFileManager = new HostFileManager();
 
   /** The period to wait for datanode heartbeat.*/
   private final long heartbeatExpireInterval;
@@ -162,13 +178,25 @@ public class DatanodeManager {
     this.namesystem = namesystem;
     this.blockManager = blockManager;
     
-    networktopology = NetworkTopology.getInstance(conf);
-
     this.heartbeatManager = new HeartbeatManager(namesystem, blockManager, conf);
 
-    this.hostsReader = new HostsFileReader(
-        conf.get(DFSConfigKeys.DFS_HOSTS, ""),
+    networktopology = NetworkTopology.getInstance(conf);
+
+    this.defaultXferPort = NetUtils.createSocketAddr(
+          conf.get(DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY,
+              DFSConfigKeys.DFS_DATANODE_ADDRESS_DEFAULT)).getPort();
+    this.defaultInfoPort = NetUtils.createSocketAddr(
+          conf.get(DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY,
+              DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_DEFAULT)).getPort();
+    this.defaultIpcPort = NetUtils.createSocketAddr(
+          conf.get(DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY,
+              DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_DEFAULT)).getPort();
+    try {
+      this.hostFileManager.refresh(conf.get(DFSConfigKeys.DFS_HOSTS, ""),
         conf.get(DFSConfigKeys.DFS_HOSTS_EXCLUDE, ""));
+    } catch (IOException e) {
+      LOG.error("error reading hosts files: ", e);
+    }
 
     this.dnsToSwitchMapping = ReflectionUtils.newInstance(
         conf.getClass(DFSConfigKeys.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, 
@@ -178,9 +206,15 @@ public class DatanodeManager {
     // locations of those hosts in the include list and store the mapping
     // in the cache; so future calls to resolve will be fast.
     if (dnsToSwitchMapping instanceof CachedDNSToSwitchMapping) {
-      dnsToSwitchMapping.resolve(new ArrayList<String>(hostsReader.getHosts()));
-    }
-    
+      final ArrayList<String> locations = new ArrayList<String>();
+      for (Entry entry : hostFileManager.getIncludes()) {
+        if (!entry.getIpAddress().isEmpty()) {
+          locations.add(entry.getIpAddress());
+        }
+      }
+      dnsToSwitchMapping.resolve(locations);
+    };
+
     final long heartbeatIntervalSeconds = conf.getLong(
         DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
         DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT);
@@ -533,14 +567,6 @@ public class DatanodeManager {
     return networkLocation;
   }
 
-  private boolean inHostsList(DatanodeID node) {
-     return checkInList(node, hostsReader.getHosts(), false);
-  }
-  
-  private boolean inExcludedHostsList(DatanodeID node) {
-    return checkInList(node, hostsReader.getExcludedHosts(), true);
-  }
-
   /**
    * Remove an already decommissioned data node who is neither in include nor
    * exclude hosts lists from the the list of live or dead nodes.  This is used
@@ -570,51 +596,27 @@ public class DatanodeManager {
   private void removeDecomNodeFromList(final List<DatanodeDescriptor> nodeList) {
     // If the include list is empty, any nodes are welcomed and it does not
     // make sense to exclude any nodes from the cluster. Therefore, no remove.
-    if (hostsReader.getHosts().isEmpty()) {
+    if (!hostFileManager.hasIncludes()) {
       return;
     }
-    
+
     for (Iterator<DatanodeDescriptor> it = nodeList.iterator(); it.hasNext();) {
       DatanodeDescriptor node = it.next();
-      if ((!inHostsList(node)) && (!inExcludedHostsList(node))
+      if ((!hostFileManager.isIncluded(node)) && (!hostFileManager.isExcluded(node))
           && node.isDecommissioned()) {
         // Include list is not empty, an existing datanode does not appear
         // in both include or exclude lists and it has been decommissioned.
-        // Remove it from the node list.
         it.remove();
       }
     }
   }
 
   /**
-   * Check if the given DatanodeID is in the given (include or exclude) list.
-   * 
-   * @param node the DatanodeID to check
-   * @param hostsList the list of hosts in the include/exclude file
-   * @param isExcludeList true if this is the exclude list
-   * @return true if the node is in the list, false otherwise
-   */
-  private static boolean checkInList(final DatanodeID node,
-      final Set<String> hostsList,
-      final boolean isExcludeList) {
-    // if include list is empty, host is in include list
-    if ( (!isExcludeList) && (hostsList.isEmpty()) ){
-      return true;
-    }
-    for (String name : getNodeNamesForHostFiltering(node)) {
-      if (hostsList.contains(name)) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  /**
    * Decommission the node if it is in exclude list.
    */
   private void checkDecommissioning(DatanodeDescriptor nodeReg) { 
     // If the registered node is in exclude list, then decommission it
-    if (inExcludedHostsList(nodeReg)) {
+    if (hostFileManager.isExcluded(nodeReg)) {
       startDecommission(nodeReg);
     }
   }
@@ -710,7 +712,7 @@ public class DatanodeManager {
   
       // Checks if the node is not on the hosts list.  If it is not, then
       // it will be disallowed from registering. 
-      if (!inHostsList(nodeReg)) {
+      if (!hostFileManager.isIncluded(nodeReg)) {
         throw new DisallowedDatanodeException(nodeReg);
       }
         
@@ -844,9 +846,8 @@ public class DatanodeManager {
     if (conf == null) {
       conf = new HdfsConfiguration();
     }
-    hostsReader.updateFileNames(conf.get(DFSConfigKeys.DFS_HOSTS, ""), 
-                                conf.get(DFSConfigKeys.DFS_HOSTS_EXCLUDE, ""));
-    hostsReader.refresh();
+    this.hostFileManager.refresh(conf.get(DFSConfigKeys.DFS_HOSTS, ""),
+      conf.get(DFSConfigKeys.DFS_HOSTS_EXCLUDE, ""));
   }
   
   /**
@@ -858,10 +859,10 @@ public class DatanodeManager {
   private void refreshDatanodes() {
     for(DatanodeDescriptor node : datanodeMap.values()) {
       // Check if not include.
-      if (!inHostsList(node)) {
+      if (!hostFileManager.isIncluded(node)) {
         node.setDisallowed(true); // case 2.
       } else {
-        if (inExcludedHostsList(node)) {
+        if (hostFileManager.isExcluded(node)) {
           startDecommission(node); // case 3.
         } else {
           stopDecommission(node); // case 4.
@@ -1076,25 +1077,10 @@ public class DatanodeManager {
     boolean listDeadNodes = type == DatanodeReportType.ALL ||
                             type == DatanodeReportType.DEAD;
 
-    HashMap<String, String> mustList = new HashMap<String, String>();
-
-    if (listDeadNodes) {
-      // Put all nodes referenced in the hosts files in the map
-      Iterator<String> it = hostsReader.getHosts().iterator();
-      while (it.hasNext()) {
-        mustList.put(it.next(), "");
-      }
-      it = hostsReader.getExcludedHosts().iterator(); 
-      while (it.hasNext()) {
-        mustList.put(it.next(), "");
-      }
-    }
-
     ArrayList<DatanodeDescriptor> nodes = null;
-    
+    final MutableEntrySet foundNodes = new MutableEntrySet();
     synchronized(datanodeMap) {
-      nodes = new ArrayList<DatanodeDescriptor>(datanodeMap.size() + 
-                                                mustList.size());
+      nodes = new ArrayList<DatanodeDescriptor>(datanodeMap.size());
       Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator();
       while (it.hasNext()) { 
         DatanodeDescriptor dn = it.next();
@@ -1102,47 +1088,45 @@ public class DatanodeManager {
         if ( (isDead && listDeadNodes) || (!isDead && listLiveNodes) ) {
           nodes.add(dn);
         }
-        for (String name : getNodeNamesForHostFiltering(dn)) {
-          mustList.remove(name);
-        }
+        foundNodes.add(dn);
       }
     }
-    
+
     if (listDeadNodes) {
-      Iterator<String> it = mustList.keySet().iterator();
-      while (it.hasNext()) {
-        // The remaining nodes are ones that are referenced by the hosts
-        // files but that we do not know about, ie that we have never
-        // head from. Eg. a host that is no longer part of the cluster
-        // or a bogus entry was given in the hosts files
-        DatanodeID dnId = parseDNFromHostsEntry(it.next());
-        DatanodeDescriptor dn = new DatanodeDescriptor(dnId); 
-        dn.setLastUpdate(0); // Consider this node dead for reporting
-        nodes.add(dn);
+      final EntrySet includedNodes = hostFileManager.getIncludes();
+      final EntrySet excludedNodes = hostFileManager.getExcludes();
+      for (Entry entry : includedNodes) {
+        if ((foundNodes.find(entry) == null) &&
+            (excludedNodes.find(entry) == null)) {
+          // The remaining nodes are ones that are referenced by the hosts
+          // files but that we do not know about, i.e. that we have never
+          // heard from.  E.g. an entry that is no longer part of the cluster,
+          // or a bogus entry that was given in the hosts files.
+          //
+          // If the host file entry specified the xferPort, we use that.
+          // Otherwise, we guess that it is the default xfer port.
+          // We can't ask the DataNode what it had configured, because it's
+          // dead.
+          DatanodeDescriptor dn =
+              new DatanodeDescriptor(new DatanodeID(entry.getIpAddress(),
+                  entry.getPrefix(), "",
+                  entry.getPort() == 0 ? defaultXferPort : entry.getPort(),
+                  defaultInfoPort, defaultIpcPort));
+          dn.setLastUpdate(0); // Consider this node dead for reporting
+          nodes.add(dn);
+        }
       }
     }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("getDatanodeListForReport with " +
+          "includedNodes = " + hostFileManager.getIncludes() +
+          ", excludedNodes = " + hostFileManager.getExcludes() +
+          ", foundNodes = " + foundNodes +
+          ", nodes = " + nodes);
+    }
     return nodes;
   }
   
-  private static List<String> getNodeNamesForHostFiltering(DatanodeID node) {
-    String ip = node.getIpAddr();
-    String regHostName = node.getHostName();
-    int xferPort = node.getXferPort();
-    
-    List<String> names = new ArrayList<String>(); 
-    names.add(ip);
-    names.add(ip + ":" + xferPort);
-    names.add(regHostName);
-    names.add(regHostName + ":" + xferPort);
-
-    String peerHostName = node.getPeerHostName();
-    if (peerHostName != null) {
-      names.add(peerHostName);
-      names.add(peerHostName + ":" + xferPort);
-    }
-    return names;
-  }
-
   /**
    * Checks if name resolution was successful for the given address.  If IP
    * address and host name are the same, then it means name resolution has

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1493233&r1=1493232&r2=1493233&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Fri Jun 14 20:17:41 2013
@@ -936,7 +936,8 @@ public class DataNode extends Configured
     MBeans.register("DataNode", "DataNodeInfo", this);
   }
   
-  int getXferPort() {
+  @VisibleForTesting
+  public int getXferPort() {
     return streamingAddr.getPort();
   }
   

Added: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/HostFileManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/HostFileManager.java?rev=1493233&view=auto
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/HostFileManager.java (added)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/HostFileManager.java Fri Jun 14 20:17:41 2013
@@ -0,0 +1,358 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.util.HostsFileReader;
+
+/**
+ * This class manages the include and exclude files for HDFS.
+ * 
+ * These files control which DataNodes the NameNode expects to see in the
+ * cluster.  Loosely speaking, the include file, if it exists and is not
+ * empty, is a list of everything we expect to see.  The exclude file is 
+ * a list of everything we want to ignore if we do see it.
+ *
+ * Entries may or may not specify a port.  If they don't, we consider
+ * them to apply to every DataNode on that host.  For example, putting 
+ * 192.168.0.100 in the excludes file blacklists both 192.168.0.100:5000 and
+ * 192.168.0.100:6000.  This case comes up in unit tests.
+ *
+ * When reading the hosts files, we try to find the IP address for each
+ * entry.  This is important because it allows us to de-duplicate entries.
+ * If the user specifies a node as foo.bar.com in the include file, but
+ * 192.168.0.100 in the exclude file, we need to realize that these are 
+ * the same node.  Resolving the IP address also allows us to give more
+ * information back to getDatanodeListForReport, which makes the web UI 
+ * look nicer (among other things.)  See HDFS-3934 for more details.
+ *
+ * DNS resolution can be slow.  For this reason, we ONLY do it when (re)reading
+ * the hosts files.  In all other cases, we rely on the cached values either
+ * in the DatanodeID objects, or in HostFileManager#Entry.
+ * We also don't want to be holding locks when doing this.
+ * See HDFS-3990 for more discussion of DNS overheads.
+ * 
+ * Not all entries in the hosts files will have an associated IP address. 
+ * Some entries may be "registration names."  The "registration name" of 
+ * a DataNode is either the actual hostname, or an arbitrary string configured
+ * by dfs.datanode.hostname.  It's possible to add registration names to the
+ * include or exclude files.  If we can't find an IP address associated with
+ * a host file entry, we assume it's a registered hostname and act accordingly.
+ * The "registration name" feature is a little odd and it may be removed in the
+ * future (I hope?)
+ */
+public class HostFileManager {
+  private static final Log LOG = LogFactory.getLog(HostFileManager.class);
+
+  public static class Entry {
+    /**
+     * This is what the user put on the line before the colon, or the whole line
+     * if there is no colon.
+     */
+    private final String prefix;
+    
+    /**
+     * This is the port which was specified after the colon.  It is 0 if no
+     * port was given.
+     */
+    private final int port;
+
+    /**
+     * If we can resolve the IP address, this is it.  Otherwise, it is the 
+     * empty string.
+     */
+    private final String ipAddress;
+
+    /**
+     * Parse a hosts file Entry.
+     */
+    static Entry parse(String fileName, String entry) throws IOException {
+      final String prefix;
+      final int port;
+      String ipAddress = "";
+      
+      int idx = entry.indexOf(':');
+      if (-1 == idx) {
+        prefix = entry;
+        port = 0;
+      } else {
+        prefix = entry.substring(0, idx);
+        String portStr = entry.substring(idx + 1);
+        try {
+          port = Integer.valueOf(portStr);
+        } catch (NumberFormatException e) {
+          throw new IOException("unable to parse port number for " +
+              "'" + entry + "'", e);
+        }
+      }
+      try {
+        // Let's see if we can resolve this prefix to an IP address.
+        // This may fail; one example is with a registered hostname
+        // which is not actually a real DNS name.
+        InetAddress addr = InetAddress.getByName(prefix);
+        ipAddress = addr.getHostAddress();
+      } catch (UnknownHostException e) {
+        LOG.info("When reading " + fileName + ", could not look up " +
+            "IP address for " + prefix + ".  We will assume this is a " +
+            "registration name.", e);
+      }
+      return new Entry(prefix, port, ipAddress);
+    }
+
+    public String getIdentifier() {
+      return ipAddress.isEmpty() ? prefix : ipAddress;
+    }
+
+    public Entry(String prefix, int port, String ipAddress) {
+      this.prefix = prefix;
+      this.port = port;
+      this.ipAddress = ipAddress;
+    }
+
+    public String getPrefix() {
+      return prefix;
+    }
+
+    public int getPort() {
+      return port;
+    }
+
+    public String getIpAddress() {
+      return ipAddress;
+    }
+
+    public String toString() {
+      StringBuilder bld = new StringBuilder();
+      bld.append("Entry{").append(prefix).append(", port=").
+          append(port).append(", ipAddress=").append(ipAddress).append("}");
+      return bld.toString();
+    }
+  }
+
+  public static class EntrySet implements Iterable<Entry> {
+    /**
+     * The index.  Each Entry appears in here exactly once.
+     *
+     * It may be indexed by one of:
+     *     ipAddress:port
+     *     ipAddress
+     *     registeredHostname:port
+     *     registeredHostname
+     *     
+     * The different indexing strategies reflect the fact that we may or may
+     * not have a port or IP address for each entry.
+     */
+    TreeMap<String, Entry> index = new TreeMap<String, Entry>();
+
+    public boolean isEmpty() {
+      return index.isEmpty();
+    }
+
+    public Entry find(DatanodeID datanodeID) {
+      Entry entry;
+      int xferPort = datanodeID.getXferPort();
+      assert(xferPort > 0);
+      String datanodeIpAddr = datanodeID.getIpAddr();
+      if (datanodeIpAddr != null) {
+        entry = index.get(datanodeIpAddr + ":" + xferPort);
+        if (entry != null) {
+          return entry;
+        }
+        entry = index.get(datanodeIpAddr);
+        if (entry != null) {
+          return entry;
+        }
+      }
+      String registeredHostName = datanodeID.getHostName();
+      if (registeredHostName != null) {
+        entry = index.get(registeredHostName + ":" + xferPort);
+        if (entry != null) {
+          return entry;
+        }
+        entry = index.get(registeredHostName);
+        if (entry != null) {
+          return entry;
+        }
+      }
+      return null;
+    }
+
+    public Entry find(Entry toFind) {
+      int port = toFind.getPort();
+      if (port != 0) {
+        return index.get(toFind.getIdentifier() + ":" + port);
+      } else {
+        // An Entry with no port matches any entry with the same identifier.
+        // In other words, we treat 0 as "any port."
+        Map.Entry<String, Entry> ceil =
+            index.ceilingEntry(toFind.getIdentifier());
+        if ((ceil != null) &&
+            (ceil.getValue().getIdentifier().equals(
+                toFind.getIdentifier()))) {
+          return ceil.getValue();
+        }
+        return null;
+      }
+    }
+
+    public String toString() {
+      StringBuilder bld = new StringBuilder();
+      
+      bld.append("HostSet(");
+      for (Map.Entry<String, Entry> entry : index.entrySet()) {
+        bld.append("\n\t");
+        bld.append(entry.getKey()).append("->").
+            append(entry.getValue().toString());
+      }
+      bld.append("\n)");
+      return bld.toString();
+    }
+
+    @Override
+    public Iterator<Entry> iterator() {
+      return index.values().iterator();
+    }
+  }
+
+  public static class MutableEntrySet extends EntrySet {
+    public void add(DatanodeID datanodeID) {
+      Entry entry = new Entry(datanodeID.getHostName(),
+          datanodeID.getXferPort(), datanodeID.getIpAddr());
+      index.put(datanodeID.getIpAddr() + ":" + datanodeID.getXferPort(),
+          entry);
+    }
+
+    public void add(Entry entry) {
+      int port = entry.getPort();
+      if (port != 0) {
+        index.put(entry.getIdentifier() + ":" + port, entry);
+      } else {
+        index.put(entry.getIdentifier(), entry);
+      }
+    }
+
+    void readFile(String type, String filename) throws IOException {
+      if (filename.isEmpty()) {
+        return;
+      }
+      HashSet<String> entrySet = new HashSet<String>();
+      HostsFileReader.readFileToSet(type, filename, entrySet);
+      for (String str : entrySet) {
+        Entry entry = Entry.parse(filename, str);
+        add(entry);
+      }
+    }
+  }
+
+  private EntrySet includes = new EntrySet();
+  private EntrySet excludes = new EntrySet();
+
+  public HostFileManager() {
+  }
+
+  public void refresh(String includeFile, String excludeFile)
+      throws IOException {
+    MutableEntrySet newIncludes = new MutableEntrySet();
+    IOException includeException = null;
+    try {
+      newIncludes.readFile("included", includeFile);
+    } catch (IOException e) {
+      includeException = e;
+    }
+    MutableEntrySet newExcludes = new MutableEntrySet();
+    IOException excludeException = null;
+    try {
+      newExcludes.readFile("excluded", excludeFile);
+    } catch (IOException e) {
+      excludeException = e;
+    }
+    synchronized(this) {
+      if (includeException == null) {
+        includes = newIncludes;
+      }
+      if (excludeException == null) {
+        excludes = newExcludes;
+      }
+    }
+    if (includeException == null) {
+      LOG.info("read includes:\n" + newIncludes);
+    } else {
+      LOG.error("failed to read include file '" + includeFile + "'. " +
+          "Continuing to use previous include list.",
+          includeException);
+    }
+    if (excludeException == null) {
+      LOG.info("read excludes:\n" + newExcludes);
+    } else {
+      LOG.error("failed to read exclude file '" + excludeFile + "'." +
+          "Continuing to use previous exclude list.",
+          excludeException);
+    }
+    if (includeException != null) {
+      throw new IOException("error reading hosts file " + includeFile,
+          includeException);
+    }
+    if (excludeException != null) {
+      throw new IOException("error reading exclude file " + excludeFile,
+          excludeException);
+    }
+  }
+
+  public synchronized boolean isIncluded(DatanodeID dn) {
+    if (includes.isEmpty()) {
+      // If the includes list is empty, act as if everything is in the
+      // includes list.
+      return true;
+    } else {
+      return includes.find(dn) != null;
+    }
+  }
+
+  public synchronized boolean isExcluded(DatanodeID dn) {
+    return excludes.find(dn) != null;
+  }
+
+  public synchronized boolean hasIncludes() {
+    return !includes.isEmpty();
+  }
+
+  /**
+   * @return          the includes as an immutable set.
+   */
+  public synchronized EntrySet getIncludes() {
+    return includes;
+  }
+
+  /**
+   * @return          the excludes as an immutable set.
+   */
+  public synchronized EntrySet getExcludes() {
+    return excludes;
+  }
+}
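
A minimal usage sketch of the matching semantics implemented above, built from
the public Entry, MutableEntrySet, and DatanodeID types (the addresses, ports,
and hostname are made up; the DatanodeID constructor arguments mirror the ones
used in DatanodeManager#getDatanodeListForReport):

    import org.apache.hadoop.hdfs.protocol.DatanodeID;
    import org.apache.hadoop.hdfs.server.namenode.HostFileManager.Entry;
    import org.apache.hadoop.hdfs.server.namenode.HostFileManager.MutableEntrySet;

    public class HostFileManagerSketch {
      public static void main(String[] args) {
        MutableEntrySet excludes = new MutableEntrySet();
        // A port-less entry; per the class javadoc it applies to every
        // DataNode on that host.
        excludes.add(new Entry("192.168.0.100", 0, "192.168.0.100"));

        // Two DataNodes on the same host, different transfer ports.
        DatanodeID dnA = new DatanodeID("192.168.0.100", "host-a", "", 5000, 50075, 50020);
        DatanodeID dnB = new DatanodeID("192.168.0.100", "host-a", "", 6000, 50075, 50020);

        System.out.println(excludes.find(dnA) != null);  // true
        System.out.println(excludes.find(dnB) != null);  // true
      }
    }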

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeRegistration.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeRegistration.java?rev=1493233&r1=1493232&r2=1493233&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeRegistration.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeRegistration.java Fri Jun 14 20:17:41 2013
@@ -223,6 +223,7 @@ public class TestDatanodeRegistration {
       
       DatanodeRegistration mockDnReg = mock(DatanodeRegistration.class);
       doReturn(HdfsConstants.LAYOUT_VERSION).when(mockDnReg).getVersion();
+      doReturn(123).when(mockDnReg).getXferPort();
       doReturn("fake-storage-id").when(mockDnReg).getStorageID();
       doReturn(mockStorageInfo).when(mockDnReg).getStorageInfo();
       
@@ -270,12 +271,14 @@ public class TestDatanodeRegistration {
       
       DatanodeRegistration mockDnReg = mock(DatanodeRegistration.class);
       doReturn(HdfsConstants.LAYOUT_VERSION).when(mockDnReg).getVersion();
+      doReturn(123).when(mockDnReg).getXferPort();
       doReturn("fake-storage-id").when(mockDnReg).getStorageID();
       doReturn(mockStorageInfo).when(mockDnReg).getStorageInfo();
       
       // Should succeed when software versions are the same and CTimes are the
       // same.
       doReturn(VersionInfo.getVersion()).when(mockDnReg).getSoftwareVersion();
+      doReturn(123).when(mockDnReg).getXferPort();
       rpcServer.registerDatanode(mockDnReg);
       
       // Should succeed when software versions are the same and CTimes are

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java?rev=1493233&r1=1493232&r2=1493233&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java Fri Jun 14 20:17:41 2013
@@ -24,7 +24,9 @@ import static org.junit.Assert.assertTru
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.Iterator;
+import java.util.Map;
 import java.util.Random;
 
 import org.apache.commons.logging.Log;
@@ -41,9 +43,11 @@ import org.apache.hadoop.hdfs.protocol.D
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.HostFileManager;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -327,7 +331,7 @@ public class TestDecommission {
   /**
    * Tests decommission for non federated cluster
    */
-  @Test
+  @Test(timeout=360000)
   public void testDecommission() throws IOException {
     testDecommission(1, 6);
   }
@@ -335,7 +339,7 @@ public class TestDecommission {
   /**
    * Tests recommission for non federated cluster
    */
-  @Test
+  @Test(timeout=360000)
   public void testRecommission() throws IOException {
     testRecommission(1, 6);
   }
@@ -343,7 +347,7 @@ public class TestDecommission {
   /**
    * Test decommission for federeated cluster
    */
-  @Test
+  @Test(timeout=360000)
   public void testDecommissionFederation() throws IOException {
     testDecommission(2, 2);
   }
@@ -445,7 +449,7 @@ public class TestDecommission {
    * Tests cluster storage statistics during decommissioning for non
    * federated cluster
    */
-  @Test
+  @Test(timeout=360000)
   public void testClusterStats() throws Exception {
     testClusterStats(1);
   }
@@ -454,7 +458,7 @@ public class TestDecommission {
    * Tests cluster storage statistics during decommissioning for
    * federated cluster
    */
-  @Test
+  @Test(timeout=360000)
   public void testClusterStatsFederation() throws Exception {
     testClusterStats(3);
   }
@@ -491,7 +495,7 @@ public class TestDecommission {
    * in the include file are allowed to connect to the namenode in a non
    * federated cluster.
    */
-  @Test
+  @Test(timeout=360000)
   public void testHostsFile() throws IOException, InterruptedException {
     // Test for a single namenode cluster
     testHostsFile(1);
@@ -502,7 +506,7 @@ public class TestDecommission {
    * in the include file are allowed to connect to the namenode in a 
    * federated cluster.
    */
-  @Test
+  @Test(timeout=360000)
   public void testHostsFileFederation() throws IOException, InterruptedException {
     // Test for 3 namenode federated cluster
     testHostsFile(3);
@@ -519,8 +523,8 @@ public class TestDecommission {
     // Now empty hosts file and ensure the datanode is disallowed
     // from talking to namenode, resulting in it's shutdown.
     ArrayList<String>list = new ArrayList<String>();
-    final String badHostname = "BOGUSHOST";
-    list.add(badHostname);
+    final String bogusIp = "127.0.30.1";
+    list.add(bogusIp);
     writeConfigFile(hostsFile, list);
     
     for (int j = 0; j < numNameNodes; j++) {
@@ -544,7 +548,150 @@ public class TestDecommission {
       assertEquals("There should be 2 dead nodes", 2, info.length);
       DatanodeID id = cluster.getDataNodes().get(0).getDatanodeId();
       assertEquals(id.getHostName(), info[0].getHostName());
-      assertEquals(badHostname, info[1].getHostName());
+      assertEquals(bogusIp, info[1].getHostName());
+    }
+  }
+
+  @Test(timeout=360000)
+  public void testDuplicateHostsEntries() throws IOException,
+      InterruptedException {
+    Configuration hdfsConf = new Configuration(conf);
+    cluster = new MiniDFSCluster.Builder(hdfsConf)
+        .numDataNodes(1).setupHostsFile(true).build();
+    cluster.waitActive();
+    int dnPort = cluster.getDataNodes().get(0).getXferPort();
+
+    // pick some random ports that don't overlap with our DN's port
+    // or with each other.
+    Random random = new Random(System.currentTimeMillis());
+    int port1 = dnPort;
+    while (port1 == dnPort) {
+      port1 = random.nextInt(6000) + 1000;
+    }
+    int port2 = dnPort;
+    while ((port2 == dnPort) || (port2 == port1)) {
+      port2 = random.nextInt(6000) + 1000;
+    }
+
+    // Write a hosts file whose entries do not match the datanode's transfer
+    // port, so the datanode is disallowed from talking to the namenode,
+    // resulting in its shutdown.
+    ArrayList<String> nodes = new ArrayList<String>();
+
+    // These entries will be de-duped by the NameNode, since they refer
+    // to the same IP address + port combo.
+    nodes.add("127.0.0.1:" + port1);
+    nodes.add("localhost:" + port1);
+    nodes.add("127.0.0.1:" + port1);
+
+    // The following entries should not be de-duped.
+    nodes.add("127.0.0.1:" + port2);
+    nodes.add("127.0.30.1:" + port1);
+    writeConfigFile(hostsFile,  nodes);
+
+    refreshNodes(cluster.getNamesystem(0), hdfsConf);
+
+    DFSClient client = getDfsClient(cluster.getNameNode(0), hdfsConf);
+    DatanodeInfo[] info = client.datanodeReport(DatanodeReportType.LIVE);
+    for (int i = 0 ; i < 5 && info.length != 0; i++) {
+      LOG.info("Waiting for datanode to be marked dead");
+      Thread.sleep(HEARTBEAT_INTERVAL * 1000);
+      info = client.datanodeReport(DatanodeReportType.LIVE);
+    }
+    assertEquals("Number of live nodes should be 0", 0, info.length);
+
+    // Test that non-live and bogus hostnames are considered "dead".
+    // The dead report should have an entry for (1) the DN  that is
+    // now considered dead because it is no longer allowed to connect
+    // and (2) the bogus entries in the hosts file.
+    DatanodeInfo deadDns[] = client.datanodeReport(DatanodeReportType.DEAD);
+    HashMap<String, DatanodeInfo> deadByXferAddr =
+        new HashMap<String, DatanodeInfo>();
+    for (DatanodeInfo dn : deadDns) {
+      LOG.info("DEAD DatanodeInfo: xferAddr = " + dn.getXferAddr() +
+          ", ipAddr = " + dn.getIpAddr() +
+          ", hostname = " + dn.getHostName());
+      deadByXferAddr.put(dn.getXferAddr(), dn);
+    }
+    // The real DataNode should be included in the list.
+    String realDnIpPort = cluster.getDataNodes().get(0).
+        getXferAddress().getAddress().getHostAddress() + ":" +
+        cluster.getDataNodes().get(0).getXferPort();
+    Assert.assertNotNull("failed to find real datanode IP " + realDnIpPort,
+        deadByXferAddr.remove(realDnIpPort));
+    // The fake datanode with address 127.0.30.1 should be included in this list.
+    Assert.assertNotNull(deadByXferAddr.remove(
+        "127.0.30.1:" + port1));
+    // Now look for the two copies of 127.0.0.1 with port1 and port2.
+    Iterator<Map.Entry<String, DatanodeInfo>> iter =
+            deadByXferAddr.entrySet().iterator();
+    boolean foundPort1 = false, foundPort2 = false;
+    while (iter.hasNext()) {
+      Map.Entry<String, DatanodeInfo> entry = iter.next();
+      DatanodeInfo dn = entry.getValue();
+      if (dn.getXferPort() == port1) {
+        foundPort1 = true;
+        iter.remove();
+      } else if (dn.getXferPort() == port2) {
+        foundPort2 = true;
+        iter.remove();
+      }
+    }
+    Assert.assertTrue("did not find a dead entry with port " + port1,
+        foundPort1);
+    Assert.assertTrue("did not find a dead entry with port " + port2,
+        foundPort2);
+    Assert.assertTrue(deadByXferAddr.isEmpty());
+  }
+
+  @Test(timeout=360000)
+  public void testIncludeByRegistrationName() throws IOException,
+      InterruptedException {
+    Configuration hdfsConf = new Configuration(conf);
+    final String registrationName = "--registration-name--";
+    final String nonExistentDn = "127.0.0.40";
+    hdfsConf.set(DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY, registrationName);
+    cluster = new MiniDFSCluster.Builder(hdfsConf)
+        .numDataNodes(1).checkDataNodeHostConfig(true)
+        .setupHostsFile(true).build();
+    cluster.waitActive();
+
+    // Set up an includes file that doesn't have our datanode.
+    ArrayList<String> nodes = new ArrayList<String>();
+    nodes.add(nonExistentDn);
+    writeConfigFile(hostsFile,  nodes);
+    refreshNodes(cluster.getNamesystem(0), hdfsConf);
+
+    // Wait for the DN to be marked dead.
+    DFSClient client = getDfsClient(cluster.getNameNode(0), hdfsConf);
+    while (true) {
+      DatanodeInfo info[] = client.datanodeReport(DatanodeReportType.DEAD);
+      if (info.length == 1) {
+        break;
+      }
+      LOG.info("Waiting for datanode to be marked dead");
+      Thread.sleep(HEARTBEAT_INTERVAL * 1000);
+    }
+
+    // Use a non-empty include file with our registration name.
+    // It should work.
+    int dnPort = cluster.getDataNodes().get(0).getXferPort();
+    nodes = new ArrayList<String>();
+    nodes.add(registrationName + ":" + dnPort);
+    writeConfigFile(hostsFile,  nodes);
+    refreshNodes(cluster.getNamesystem(0), hdfsConf);
+    cluster.restartDataNode(0);
+
+    // Wait for the DN to come back.
+    while (true) {
+      DatanodeInfo info[] = client.datanodeReport(DatanodeReportType.LIVE);
+      if (info.length == 1) {
+        Assert.assertFalse(info[0].isDecommissioned());
+        Assert.assertFalse(info[0].isDecommissionInProgress());
+        assertEquals(registrationName, info[0].getHostName());
+        break;
+      }
+      LOG.info("Waiting for datanode to come back");
+      Thread.sleep(HEARTBEAT_INTERVAL * 1000);
     }
   }
 }