You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by mi...@apache.org on 2017/05/02 13:53:50 UTC

[2/2] hadoop git commit: HDFS-9005. Provide configuration support for upgrade domain.

HDFS-9005. Provide configuration support for upgrade domain.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c4c55332
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c4c55332
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c4c55332

Branch: refs/heads/branch-2.8
Commit: c4c5533216eaaa64731a6f0c1bc9be9b1e91f7d6
Parents: c9bf21b
Author: Ming Ma <mi...@twitter.com>
Authored: Tue May 2 06:53:32 2017 -0700
Committer: Ming Ma <mi...@twitter.com>
Committed: Tue May 2 06:53:32 2017 -0700

----------------------------------------------------------------------
 .../hdfs/protocol/DatanodeAdminProperties.java  | 100 ++++++++
 .../apache/hadoop/hdfs/protocol/DatanodeID.java |   6 +
 .../hdfs/util/CombinedHostsFileReader.java      |  76 ++++++
 .../hdfs/util/CombinedHostsFileWriter.java      |  69 +++++
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |   4 +-
 .../CombinedHostFileManager.java                | 250 +++++++++++++++++++
 .../server/blockmanagement/DatanodeManager.java |  59 +++--
 .../blockmanagement/HostConfigManager.java      |  80 ++++++
 .../server/blockmanagement/HostFileManager.java | 147 +++--------
 .../hdfs/server/blockmanagement/HostSet.java    | 114 +++++++++
 .../src/main/resources/hdfs-default.xml         |  15 ++
 .../src/site/markdown/HdfsUserGuide.md          |   6 +-
 .../apache/hadoop/hdfs/TestDatanodeReport.java  |  56 ++++-
 .../TestBlocksWithNotEnoughRacks.java           |  33 +--
 .../blockmanagement/TestDatanodeManager.java    |   8 +-
 .../blockmanagement/TestHostFileManager.java    |  10 +-
 .../hdfs/server/namenode/TestHostsFiles.java    |  70 +++---
 .../server/namenode/TestNameNodeMXBean.java     |  25 +-
 .../hdfs/server/namenode/TestStartup.java       |  53 +---
 .../TestUpgradeDomainBlockPlacementPolicy.java  | 169 +++++++++++++
 .../hadoop/hdfs/util/HostsFileWriter.java       | 122 +++++++++
 .../hdfs/util/TestCombinedHostsFileReader.java  |  79 ++++++
 .../src/test/resources/dfs.hosts.json           |   5 +
 23 files changed, 1291 insertions(+), 265 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4c55332/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeAdminProperties.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeAdminProperties.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeAdminProperties.java
new file mode 100644
index 0000000..9f7b983
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeAdminProperties.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol;
+
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
+
+/**
+ * The class describes the configured admin properties for a datanode.
+ *
+ * It is the static configuration specified by administrators via dfsadmin
+ * command; different from the runtime state. CombinedHostFileManager uses
+ * the class to deserialize the configurations from json-based file format.
+ *
+ * To decommission a node, use AdminStates.DECOMMISSIONED.
+ */
+public class DatanodeAdminProperties {
+  private String hostName;
+  private int port;
+  private String upgradeDomain;
+  private AdminStates adminState = AdminStates.NORMAL;
+
+  /**
+   * Return the host name of the datanode.
+   * @return the host name of the datanode.
+   */
+  public String getHostName() {
+    return hostName;
+  }
+
+  /**
+   * Set the host name of the datanode.
+   * @param hostName the host name of the datanode.
+   */
+  public void setHostName(final String hostName) {
+    this.hostName = hostName;
+  }
+
+  /**
+   * Get the port number of the datanode.
+   * @return the port number of the datanode.
+   */
+  public int getPort() {
+    return port;
+  }
+
+  /**
+   * Set the port number of the datanode.
+   * @param port the port number of the datanode.
+   */
+  public void setPort(final int port) {
+    this.port = port;
+  }
+
+  /**
+   * Get the upgrade domain of the datanode.
+   * @return the upgrade domain of the datanode.
+   */
+  public String getUpgradeDomain() {
+    return upgradeDomain;
+  }
+
+  /**
+   * Set the upgrade domain of the datanode.
+   * @param upgradeDomain the upgrade domain of the datanode.
+   */
+  public void setUpgradeDomain(final String upgradeDomain) {
+    this.upgradeDomain = upgradeDomain;
+  }
+
+  /**
+   * Get the admin state of the datanode.
+   * @return the admin state of the datanode.
+   */
+  public AdminStates getAdminState() {
+    return adminState;
+  }
+
+  /**
+   * Set the admin state of the datanode.
+   * @param adminState the admin state of the datanode.
+   */
+  public void setAdminState(final AdminStates adminState) {
+    this.adminState = adminState;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4c55332/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java
index 86782f2..e94c07d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java
@@ -23,6 +23,8 @@ import org.apache.hadoop.classification.InterfaceStability;
 
 import com.google.common.annotations.VisibleForTesting;
 
+import java.net.InetSocketAddress;
+
 /**
  * This class represents the primary identifier for a Datanode.
  * Datanodes are identified by how they can be contacted (hostname
@@ -272,4 +274,8 @@ public class DatanodeID implements Comparable<DatanodeID> {
   public int compareTo(DatanodeID that) {
     return getXferAddr().compareTo(that.getXferAddr());
   }
+
+  public InetSocketAddress getResolvedAddress() {
+    return new InetSocketAddress(this.getIpAddr(), this.getXferPort());
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4c55332/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/CombinedHostsFileReader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/CombinedHostsFileReader.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/CombinedHostsFileReader.java
new file mode 100644
index 0000000..33acb91
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/CombinedHostsFileReader.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.util;
+
+import java.io.FileInputStream;
+import java.io.InputStreamReader;
+import java.io.IOException;
+import java.io.Reader;
+
+import java.util.Iterator;
+import java.util.Set;
+import java.util.HashSet;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.map.ObjectMapper;
+
+import org.apache.hadoop.hdfs.protocol.DatanodeAdminProperties;
+
+/**
+ * Reader support for JSON based datanode configuration, an alternative
+ * to the exclude/include files configuration.
+ * The JSON file format is the array of elements where each element
+ * in the array describes the properties of a datanode. The properties of
+ * a datanode is defined in {@link DatanodeAdminProperties}. For example,
+ *
+ * {"hostName": "host1"}
+ * {"hostName": "host2", "port": 50, "upgradeDomain": "ud0"}
+ * {"hostName": "host3", "port": 0, "adminState": "DECOMMISSIONED"}
+ */
+@InterfaceAudience.LimitedPrivate({"HDFS"})
+@InterfaceStability.Unstable
+public final class CombinedHostsFileReader {
+  private CombinedHostsFileReader() {
+  }
+
+  /**
+   * Deserialize a set of DatanodeAdminProperties from a json file.
+   * @param hostsFile the input json file to read from.
+   * @return the set of DatanodeAdminProperties
+   * @throws IOException
+   */
+  public static Set<DatanodeAdminProperties>
+      readFile(final String hostsFile) throws IOException {
+    HashSet<DatanodeAdminProperties> allDNs = new HashSet<>();
+    ObjectMapper mapper = new ObjectMapper();
+    try (Reader input =
+         new InputStreamReader(new FileInputStream(hostsFile), "UTF-8")) {
+      Iterator<DatanodeAdminProperties> iterator =
+          mapper.readValues(new JsonFactory().createJsonParser(input),
+              DatanodeAdminProperties.class);
+      while (iterator.hasNext()) {
+        DatanodeAdminProperties properties = iterator.next();
+        allDNs.add(properties);
+      }
+    }
+    return allDNs;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4c55332/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/CombinedHostsFileWriter.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/CombinedHostsFileWriter.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/CombinedHostsFileWriter.java
new file mode 100644
index 0000000..ea70be2
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/CombinedHostsFileWriter.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.util;
+
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+
+import java.util.Set;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.codehaus.jackson.map.ObjectMapper;
+
+import org.apache.hadoop.hdfs.protocol.DatanodeAdminProperties;
+
+/**
+ * Writer support for JSON based datanode configuration, an alternative
+ * to the exclude/include files configuration.
+ * The JSON file format is the array of elements where each element
+ * in the array describes the properties of a datanode. The properties of
+ * a datanode is defined in {@link DatanodeAdminProperties}. For example,
+ *
+ * {"hostName": "host1"}
+ * {"hostName": "host2", "port": 50, "upgradeDomain": "ud0"}
+ * {"hostName": "host3", "port": 0, "adminState": "DECOMMISSIONED"}
+ */
+@InterfaceAudience.LimitedPrivate({"HDFS"})
+@InterfaceStability.Unstable
+public final class CombinedHostsFileWriter {
+  private CombinedHostsFileWriter() {
+  }
+
+  /**
+   * Serialize a set of DatanodeAdminProperties to a json file.
+   * @param hostsFile the json file name.
+   * @param allDNs the set of DatanodeAdminProperties
+   * @throws IOException
+   */
+  public static void writeFile(final String hostsFile,
+      final Set<DatanodeAdminProperties> allDNs) throws IOException {
+    StringBuilder configs = new StringBuilder();
+    try (Writer output =
+       new OutputStreamWriter(new FileOutputStream(hostsFile), "UTF-8")) {
+      for (DatanodeAdminProperties datanodeAdminProperties: allDNs) {
+        ObjectMapper mapper = new ObjectMapper();
+        configs.append(mapper.writeValueAsString(datanodeAdminProperties));
+      }
+      output.write(configs.toString());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4c55332/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index b62010b..51a75cd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -426,12 +426,12 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String  DFS_METRICS_PERCENTILES_INTERVALS_KEY = "dfs.metrics.percentiles.intervals";
   public static final String  DFS_DATANODE_HOST_NAME_KEY =
       HdfsClientConfigKeys.DeprecatedKeys.DFS_DATANODE_HOST_NAME_KEY;
-  public static final String  DFS_NAMENODE_HOSTS_KEY = "dfs.namenode.hosts";
-  public static final String  DFS_NAMENODE_HOSTS_EXCLUDE_KEY = "dfs.namenode.hosts.exclude";
   public static final String  DFS_NAMENODE_CHECKPOINT_DIR_KEY =
       HdfsClientConfigKeys.DeprecatedKeys.DFS_NAMENODE_CHECKPOINT_DIR_KEY;
   public static final String  DFS_NAMENODE_CHECKPOINT_EDITS_DIR_KEY =
       HdfsClientConfigKeys.DeprecatedKeys.DFS_NAMENODE_CHECKPOINT_EDITS_DIR_KEY;
+  public static final String  DFS_NAMENODE_HOSTS_PROVIDER_CLASSNAME_KEY =
+      "dfs.namenode.hosts.provider.classname";
   public static final String  DFS_HOSTS = "dfs.hosts";
   public static final String  DFS_HOSTS_EXCLUDE = "dfs.hosts.exclude";
   public static final String  DFS_NAMENODE_AUDIT_LOGGERS_KEY = "dfs.namenode.audit.loggers";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4c55332/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CombinedHostFileManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CombinedHostFileManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CombinedHostFileManager.java
new file mode 100644
index 0000000..3e913b9
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CombinedHostFileManager.java
@@ -0,0 +1,250 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.UnmodifiableIterator;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Collections2;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.DatanodeAdminProperties;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
+
+import java.io.IOException;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.base.Predicate;
+
+import org.apache.hadoop.hdfs.util.CombinedHostsFileReader;
+
+/**
+ * This class manages datanode configuration using a json file.
+ * Please refer to {@link CombinedHostsFileReader} for the json format.
+ * <p/>
+ * <p/>
+ * Entries may or may not specify a port.  If they don't, we consider
+ * them to apply to every DataNode on that host. The code canonicalizes the
+ * entries into IP addresses.
+ * <p/>
+ * <p/>
+ * The code ignores all entries that the DNS fails to resolve their IP
+ * addresses. This is okay because by default the NN rejects the registrations
+ * of DNs when it fails to do a forward and reverse lookup. Note that DNS
+ * resolutions are only done during the loading time to minimize the latency.
+ */
+public class CombinedHostFileManager extends HostConfigManager {
+  private static final Logger LOG = LoggerFactory.getLogger(
+      CombinedHostFileManager.class);
+  private Configuration conf;
+  private HostProperties hostProperties = new HostProperties();
+
+  static class HostProperties {
+    private Multimap<InetAddress, DatanodeAdminProperties> allDNs =
+        HashMultimap.create();
+    // optimization. If every node in the file isn't in service, it implies
+    // any node is allowed to register with nn. This is equivalent to having
+    // an empty "include" file.
+    private boolean emptyInServiceNodeLists = true;
+    synchronized void add(InetAddress addr,
+        DatanodeAdminProperties properties) {
+      allDNs.put(addr, properties);
+      if (properties.getAdminState().equals(
+          AdminStates.NORMAL)) {
+        emptyInServiceNodeLists = false;
+      }
+    }
+
+    // If the includes list is empty, act as if everything is in the
+    // includes list.
+    synchronized boolean isIncluded(final InetSocketAddress address) {
+      return emptyInServiceNodeLists || Iterables.any(
+          allDNs.get(address.getAddress()),
+          new Predicate<DatanodeAdminProperties>() {
+            public boolean apply(DatanodeAdminProperties input) {
+              return input.getPort() == 0 ||
+                  input.getPort() == address.getPort();
+            }
+          });
+    }
+
+    synchronized boolean isExcluded(final InetSocketAddress address) {
+      return Iterables.any(allDNs.get(address.getAddress()),
+          new Predicate<DatanodeAdminProperties>() {
+            public boolean apply(DatanodeAdminProperties input) {
+              return input.getAdminState().equals(
+                  AdminStates.DECOMMISSIONED) &&
+                  (input.getPort() == 0 ||
+                      input.getPort() == address.getPort());
+            }
+          });
+    }
+
+    synchronized String getUpgradeDomain(final InetSocketAddress address) {
+      Iterable<DatanodeAdminProperties> datanode = Iterables.filter(
+          allDNs.get(address.getAddress()),
+          new Predicate<DatanodeAdminProperties>() {
+            public boolean apply(DatanodeAdminProperties input) {
+              return (input.getPort() == 0 ||
+                  input.getPort() == address.getPort());
+            }
+          });
+      return datanode.iterator().hasNext() ?
+          datanode.iterator().next().getUpgradeDomain() : null;
+    }
+
+    Iterable<InetSocketAddress> getIncludes() {
+      return new Iterable<InetSocketAddress>() {
+        @Override
+        public Iterator<InetSocketAddress> iterator() {
+            return new HostIterator(allDNs.entries());
+        }
+      };
+    }
+
+    Iterable<InetSocketAddress> getExcludes() {
+      return new Iterable<InetSocketAddress>() {
+        @Override
+        public Iterator<InetSocketAddress> iterator() {
+          return new HostIterator(
+              Collections2.filter(allDNs.entries(),
+                  new Predicate<java.util.Map.Entry<InetAddress,
+                      DatanodeAdminProperties>>() {
+                    public boolean apply(java.util.Map.Entry<InetAddress,
+                        DatanodeAdminProperties> entry) {
+                      return entry.getValue().getAdminState().equals(
+                          AdminStates.DECOMMISSIONED);
+                    }
+                  }
+              ));
+        }
+      };
+    }
+
+    static class HostIterator extends UnmodifiableIterator<InetSocketAddress> {
+      private final Iterator<Map.Entry<InetAddress,
+          DatanodeAdminProperties>> it;
+      public HostIterator(Collection<java.util.Map.Entry<InetAddress,
+          DatanodeAdminProperties>> nodes) {
+        this.it = nodes.iterator();
+      }
+      @Override
+      public boolean hasNext() {
+        return it.hasNext();
+      }
+
+      @Override
+      public InetSocketAddress next() {
+        Map.Entry<InetAddress, DatanodeAdminProperties> e = it.next();
+        return new InetSocketAddress(e.getKey(), e.getValue().getPort());
+      }
+    }
+  }
+
+  @Override
+  public Iterable<InetSocketAddress> getIncludes() {
+    return hostProperties.getIncludes();
+  }
+
+  @Override
+  public Iterable<InetSocketAddress> getExcludes() {
+    return hostProperties.getExcludes();
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void refresh() throws IOException {
+    refresh(conf.get(DFSConfigKeys.DFS_HOSTS, ""));
+  }
+  private void refresh(final String hostsFile) throws IOException {
+    HostProperties hostProps = new HostProperties();
+    Set<DatanodeAdminProperties> all =
+        CombinedHostsFileReader.readFile(hostsFile);
+    for(DatanodeAdminProperties properties : all) {
+      InetSocketAddress addr = parseEntry(hostsFile,
+          properties.getHostName(), properties.getPort());
+      if (addr != null) {
+        hostProps.add(addr.getAddress(), properties);
+      }
+    }
+    refresh(hostProps);
+  }
+
+  @VisibleForTesting
+  static InetSocketAddress parseEntry(final String fn, final String hostName,
+      final int port) {
+    InetSocketAddress addr = new InetSocketAddress(hostName, port);
+    if (addr.isUnresolved()) {
+      LOG.warn("Failed to resolve {} in {}. ", hostName, fn);
+      return null;
+    }
+    return addr;
+  }
+
+  @Override
+  public synchronized boolean isIncluded(final DatanodeID dn) {
+    return hostProperties.isIncluded(dn.getResolvedAddress());
+  }
+
+  @Override
+  public synchronized boolean isExcluded(final DatanodeID dn) {
+    return isExcluded(dn.getResolvedAddress());
+  }
+
+  private boolean isExcluded(final InetSocketAddress address) {
+    return hostProperties.isExcluded(address);
+  }
+
+  @Override
+  public synchronized String getUpgradeDomain(final DatanodeID dn) {
+    return hostProperties.getUpgradeDomain(dn.getResolvedAddress());
+  }
+
+  /**
+   * Set the properties lists by the new instances. The
+   * old instance is discarded.
+   * @param hostProperties the new properties list
+   */
+  @VisibleForTesting
+  private void refresh(final HostProperties hostProperties) {
+    synchronized (this) {
+      this.hostProperties = hostProperties;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4c55332/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index ef4a47a..eeda5b1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -105,7 +105,7 @@ public class DatanodeManager {
   private final int defaultIpcPort;
 
   /** Read include/exclude files*/
-  private final HostFileManager hostFileManager = new HostFileManager();
+  private HostConfigManager hostConfigManager;
 
   /** The period to wait for datanode heartbeat.*/
   private long heartbeatExpireInterval;
@@ -198,9 +198,11 @@ public class DatanodeManager {
     this.defaultIpcPort = NetUtils.createSocketAddr(
           conf.getTrimmed(DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY,
               DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_DEFAULT)).getPort();
+    this.hostConfigManager = ReflectionUtils.newInstance(
+        conf.getClass(DFSConfigKeys.DFS_NAMENODE_HOSTS_PROVIDER_CLASSNAME_KEY,
+            HostFileManager.class, HostConfigManager.class), conf);
     try {
-      this.hostFileManager.refresh(conf.get(DFSConfigKeys.DFS_HOSTS, ""),
-        conf.get(DFSConfigKeys.DFS_HOSTS_EXCLUDE, ""));
+      this.hostConfigManager.refresh();
     } catch (IOException e) {
       LOG.error("error reading hosts files: ", e);
     }
@@ -218,7 +220,7 @@ public class DatanodeManager {
     // in the cache; so future calls to resolve will be fast.
     if (dnsToSwitchMapping instanceof CachedDNSToSwitchMapping) {
       final ArrayList<String> locations = new ArrayList<>();
-      for (InetSocketAddress addr : hostFileManager.getIncludes()) {
+      for (InetSocketAddress addr : hostConfigManager.getIncludes()) {
         locations.add(addr.getAddress().getHostAddress());
       }
       dnsToSwitchMapping.resolve(locations);
@@ -331,8 +333,8 @@ public class DatanodeManager {
     return decomManager;
   }
 
-  HostFileManager getHostFileManager() {
-    return hostFileManager;
+  public HostConfigManager getHostConfigManager() {
+    return hostConfigManager;
   }
 
   @VisibleForTesting
@@ -622,6 +624,7 @@ public class DatanodeManager {
     networktopology.add(node); // may throw InvalidTopologyException
     host2DatanodeMap.add(node);
     checkIfClusterIsNowMultiRack(node);
+    resolveUpgradeDomain(node);
     blockManager.getBlockReportLeaseManager().register(node);
 
     if (LOG.isDebugEnabled()) {
@@ -706,7 +709,14 @@ public class DatanodeManager {
       return new HashMap<> (this.datanodesSoftwareVersions);
     }
   }
-  
+
+  void resolveUpgradeDomain(DatanodeDescriptor node) {
+    String upgradeDomain = hostConfigManager.getUpgradeDomain(node);
+    if (upgradeDomain != null && upgradeDomain.length() > 0) {
+      node.setUpgradeDomain(upgradeDomain);
+    }
+  }
+
   /**
    *  Resolve a node's network location. If the DNS to switch mapping fails 
    *  then this method guarantees default rack location. 
@@ -836,7 +846,7 @@ public class DatanodeManager {
    */
   void startDecommissioningIfExcluded(DatanodeDescriptor nodeReg) {
     // If the registered node is in exclude list, then decommission it
-    if (getHostFileManager().isExcluded(nodeReg)) {
+    if (getHostConfigManager().isExcluded(nodeReg)) {
       decomManager.startDecommission(nodeReg);
     }
   }
@@ -876,7 +886,7 @@ public class DatanodeManager {
   
       // Checks if the node is not on the hosts list.  If it is not, then
       // it will be disallowed from registering. 
-      if (!hostFileManager.isIncluded(nodeReg)) {
+      if (!hostConfigManager.isIncluded(nodeReg)) {
         throw new DisallowedDatanodeException(nodeReg);
       }
         
@@ -944,7 +954,8 @@ public class DatanodeManager {
                 getNetworkDependenciesWithDefault(nodeS));
           }
           getNetworkTopology().add(nodeS);
-            
+          resolveUpgradeDomain(nodeS);
+
           // also treat the registration message as a heartbeat
           heartbeatManager.register(nodeS);
           incrementVersionCount(nodeS.getSoftwareVersion());
@@ -976,7 +987,8 @@ public class DatanodeManager {
         }
         networktopology.add(nodeDescr);
         nodeDescr.setSoftwareVersion(nodeReg.getSoftwareVersion());
-  
+        resolveUpgradeDomain(nodeDescr);
+
         // register new datanode
         addDatanode(nodeDescr);
         // also treat the registration message as a heartbeat
@@ -1030,9 +1042,9 @@ public class DatanodeManager {
     // Update the file names and refresh internal includes and excludes list.
     if (conf == null) {
       conf = new HdfsConfiguration();
+      this.hostConfigManager.setConf(conf);
     }
-    this.hostFileManager.refresh(conf.get(DFSConfigKeys.DFS_HOSTS, ""),
-      conf.get(DFSConfigKeys.DFS_HOSTS_EXCLUDE, ""));
+    this.hostConfigManager.refresh();
   }
   
   /**
@@ -1044,15 +1056,16 @@ public class DatanodeManager {
   private void refreshDatanodes() {
     for(DatanodeDescriptor node : datanodeMap.values()) {
       // Check if not include.
-      if (!hostFileManager.isIncluded(node)) {
+      if (!hostConfigManager.isIncluded(node)) {
         node.setDisallowed(true); // case 2.
       } else {
-        if (hostFileManager.isExcluded(node)) {
+        if (hostConfigManager.isExcluded(node)) {
           decomManager.startDecommission(node); // case 3.
         } else {
           decomManager.stopDecommission(node); // case 4.
         }
       }
+      node.setUpgradeDomain(hostConfigManager.getUpgradeDomain(node));
     }
   }
 
@@ -1260,9 +1273,9 @@ public class DatanodeManager {
         type == DatanodeReportType.DECOMMISSIONING;
 
     ArrayList<DatanodeDescriptor> nodes;
-    final HostFileManager.HostSet foundNodes = new HostFileManager.HostSet();
-    final HostFileManager.HostSet includedNodes = hostFileManager.getIncludes();
-    final HostFileManager.HostSet excludedNodes = hostFileManager.getExcludes();
+    final HostSet foundNodes = new HostSet();
+    final Iterable<InetSocketAddress> includedNodes =
+        hostConfigManager.getIncludes();
 
     synchronized(datanodeMap) {
       nodes = new ArrayList<>(datanodeMap.size());
@@ -1273,11 +1286,11 @@ public class DatanodeManager {
         if (((listLiveNodes && !isDead) ||
             (listDeadNodes && isDead) ||
             (listDecommissioningNodes && isDecommissioning)) &&
-            hostFileManager.isIncluded(dn)) {
+            hostConfigManager.isIncluded(dn)) {
           nodes.add(dn);
         }
 
-        foundNodes.add(HostFileManager.resolvedAddressFromDatanodeID(dn));
+        foundNodes.add(dn.getResolvedAddress());
       }
     }
     Collections.sort(nodes);
@@ -1301,7 +1314,7 @@ public class DatanodeManager {
                 addr.getPort() == 0 ? defaultXferPort : addr.getPort(),
                 defaultInfoPort, defaultInfoSecurePort, defaultIpcPort));
         setDatanodeDead(dn);
-        if (excludedNodes.match(addr)) {
+        if (hostConfigManager.isExcluded(dn)) {
           dn.setDecommissioned();
         }
         nodes.add(dn);
@@ -1310,8 +1323,8 @@ public class DatanodeManager {
 
     if (LOG.isDebugEnabled()) {
       LOG.debug("getDatanodeListForReport with " +
-          "includedNodes = " + hostFileManager.getIncludes() +
-          ", excludedNodes = " + hostFileManager.getExcludes() +
+          "includedNodes = " + hostConfigManager.getIncludes() +
+          ", excludedNodes = " + hostConfigManager.getExcludes() +
           ", foundNodes = " + foundNodes +
           ", nodes = " + nodes);
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4c55332/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HostConfigManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HostConfigManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HostConfigManager.java
new file mode 100644
index 0000000..f28ed29
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HostConfigManager.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+/**
+ * This interface abstracts how datanode configuration is managed.
+ *
+ * Each implementation defines its own way to persist the configuration.
+ * For example, it can use one JSON file to store the configs for all
+ * datanodes; or it can use one file to store in-service datanodes and another
+ * file to store decommission-requested datanodes.
+ *
+ * These files control which DataNodes the NameNode expects to see in the
+ * cluster.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public abstract class HostConfigManager implements Configurable {
+
+  /**
+   * Return all the datanodes that are allowed to connect to the namenode.
+   * @return Iterable of all datanodes
+   */
+  public abstract Iterable<InetSocketAddress> getIncludes();
+
+  /**
+   * Return all datanodes that should be in decommissioned state.
+   * @return Iterable of those datanodes
+   */
+  public abstract Iterable<InetSocketAddress> getExcludes();
+
+  /**
+   * Check if a datanode is allowed to connect the namenode.
+   * @param dn the DatanodeID of the datanode
+   * @return boolean if dn is allowed to connect the namenode.
+   */
+  public abstract boolean isIncluded(DatanodeID dn);
+
+  /**
+   * Check if a datanode needs to be decommissioned.
+   * @param dn the DatanodeID of the datanode
+   * @return boolean if dn needs to be decommissioned.
+   */
+  public abstract boolean isExcluded(DatanodeID dn);
+
+  /**
+   * Reload the configuration.
+   */
+  public abstract void refresh() throws IOException;
+
+  /**
+   * Get the upgrade domain of a datanode.
+   * @param dn the DatanodeID of the datanode
+   * @return the upgrade domain of dn.
+   */
+  public abstract String getUpgradeDomain(DatanodeID dn);
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4c55332/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HostFileManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HostFileManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HostFileManager.java
index e05ef9a..bcfebf2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HostFileManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HostFileManager.java
@@ -18,28 +18,18 @@
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Iterators;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.UnmodifiableIterator;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.util.HostsFileReader;
 
-import javax.annotation.Nullable;
 import java.io.IOException;
-import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
-import java.util.Collection;
 import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Map;
 
 /**
  * This class manages the include and exclude files for HDFS.
@@ -59,11 +49,27 @@ import java.util.Map;
  * of DNs when it fails to do a forward and reverse lookup. Note that DNS
  * resolutions are only done during the loading time to minimize the latency.
  */
-class HostFileManager {
+public class HostFileManager extends HostConfigManager {
   private static final Log LOG = LogFactory.getLog(HostFileManager.class);
+  private Configuration conf;
   private HostSet includes = new HostSet();
   private HostSet excludes = new HostSet();
 
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void refresh() throws IOException {
+    refresh(conf.get(DFSConfigKeys.DFS_HOSTS, ""),
+        conf.get(DFSConfigKeys.DFS_HOSTS_EXCLUDE, ""));
+  }
   private static HostSet readFile(String type, String filename)
           throws IOException {
     HostSet res = new HostSet();
@@ -99,31 +105,37 @@ class HostFileManager {
     return null;
   }
 
-  static InetSocketAddress resolvedAddressFromDatanodeID(DatanodeID id) {
-    return new InetSocketAddress(id.getIpAddr(), id.getXferPort());
-  }
-
-  synchronized HostSet getIncludes() {
+  @Override
+  public synchronized HostSet getIncludes() {
     return includes;
   }
 
-  synchronized HostSet getExcludes() {
+  @Override
+  public synchronized HostSet getExcludes() {
     return excludes;
   }
 
   // If the includes list is empty, act as if everything is in the
   // includes list.
-  synchronized boolean isIncluded(DatanodeID dn) {
-    return includes.isEmpty() || includes.match
-            (resolvedAddressFromDatanodeID(dn));
+  @Override
+  public synchronized boolean isIncluded(DatanodeID dn) {
+    return includes.isEmpty() || includes.match(dn.getResolvedAddress());
+  }
+
+  @Override
+  public synchronized boolean isExcluded(DatanodeID dn) {
+    return isExcluded(dn.getResolvedAddress());
   }
 
-  synchronized boolean isExcluded(DatanodeID dn) {
-    return excludes.match(resolvedAddressFromDatanodeID(dn));
+  private boolean isExcluded(InetSocketAddress address) {
+    return excludes.match(address);
   }
 
-  synchronized boolean hasIncludes() {
-    return !includes.isEmpty();
+  @Override
+  public synchronized String getUpgradeDomain(final DatanodeID dn) {
+    // The include/exclude files based config doesn't support upgrade domain
+    // config.
+    return null;
   }
 
   /**
@@ -133,7 +145,8 @@ class HostFileManager {
    * @param excludeFile the path to the new excludes list
    * @throws IOException thrown if there is a problem reading one of the files
    */
-  void refresh(String includeFile, String excludeFile) throws IOException {
+  private void refresh(String includeFile, String excludeFile)
+      throws IOException {
     HostSet newIncludes = readFile("included", includeFile);
     HostSet newExcludes = readFile("excluded", excludeFile);
 
@@ -153,84 +166,4 @@ class HostFileManager {
       excludes = newExcludes;
     }
   }
-
-  /**
-   * The HostSet allows efficient queries on matching wildcard addresses.
-   * <p/>
-   * For InetSocketAddress A and B with the same host address,
-   * we define a partial order between A and B, A <= B iff A.getPort() == B
-   * .getPort() || B.getPort() == 0.
-   */
-  static class HostSet implements Iterable<InetSocketAddress> {
-    // Host -> lists of ports
-    private final Multimap<InetAddress, Integer> addrs = HashMultimap.create();
-
-    /**
-     * The function that checks whether there exists an entry foo in the set
-     * so that foo <= addr.
-     */
-    boolean matchedBy(InetSocketAddress addr) {
-      Collection<Integer> ports = addrs.get(addr.getAddress());
-      return addr.getPort() == 0 ? !ports.isEmpty() : ports.contains(addr
-              .getPort());
-    }
-
-    /**
-     * The function that checks whether there exists an entry foo in the set
-     * so that addr <= foo.
-     */
-    boolean match(InetSocketAddress addr) {
-      int port = addr.getPort();
-      Collection<Integer> ports = addrs.get(addr.getAddress());
-      boolean exactMatch = ports.contains(port);
-      boolean genericMatch = ports.contains(0);
-      return exactMatch || genericMatch;
-    }
-
-    boolean isEmpty() {
-      return addrs.isEmpty();
-    }
-
-    int size() {
-      return addrs.size();
-    }
-
-    void add(InetSocketAddress addr) {
-      Preconditions.checkArgument(!addr.isUnresolved());
-      addrs.put(addr.getAddress(), addr.getPort());
-    }
-
-    @Override
-    public Iterator<InetSocketAddress> iterator() {
-      return new UnmodifiableIterator<InetSocketAddress>() {
-        private final Iterator<Map.Entry<InetAddress,
-                Integer>> it = addrs.entries().iterator();
-
-        @Override
-        public boolean hasNext() {
-          return it.hasNext();
-        }
-
-        @Override
-        public InetSocketAddress next() {
-          Map.Entry<InetAddress, Integer> e = it.next();
-          return new InetSocketAddress(e.getKey(), e.getValue());
-        }
-      };
-    }
-
-    @Override
-    public String toString() {
-      StringBuilder sb = new StringBuilder("HostSet(");
-      Joiner.on(",").appendTo(sb, Iterators.transform(iterator(),
-              new Function<InetSocketAddress, String>() {
-        @Override
-        public String apply(@Nullable InetSocketAddress addr) {
-          assert addr != null;
-          return addr.getAddress().getHostAddress() + ":" + addr.getPort();
-        }
-      }));
-      return sb.append(")").toString();
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4c55332/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HostSet.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HostSet.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HostSet.java
new file mode 100644
index 0000000..958557b
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HostSet.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.UnmodifiableIterator;
+
+import javax.annotation.Nullable;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+
+
+/**
+ * The HostSet allows efficient queries on matching wildcard addresses.
+ * <p/>
+ * For InetSocketAddress A and B with the same host address,
+ * we define a partial order between A and B, A <= B iff A.getPort() == B
+ * .getPort() || B.getPort() == 0.
+ */
+public class HostSet implements Iterable<InetSocketAddress> {
+  // Host -> lists of ports
+  private final Multimap<InetAddress, Integer> addrs = HashMultimap.create();
+
+  /**
+   * The function that checks whether there exists an entry foo in the set
+   * so that foo <= addr.
+   */
+  boolean matchedBy(InetSocketAddress addr) {
+    Collection<Integer> ports = addrs.get(addr.getAddress());
+    return addr.getPort() == 0 ? !ports.isEmpty() : ports.contains(addr
+        .getPort());
+  }
+
+  /**
+   * The function that checks whether there exists an entry foo in the set
+   * so that addr <= foo.
+   */
+  boolean match(InetSocketAddress addr) {
+    int port = addr.getPort();
+    Collection<Integer> ports = addrs.get(addr.getAddress());
+    boolean exactMatch = ports.contains(port);
+    boolean genericMatch = ports.contains(0);
+    return exactMatch || genericMatch;
+  }
+
+  boolean isEmpty() {
+    return addrs.isEmpty();
+  }
+
+  int size() {
+    return addrs.size();
+  }
+
+  void add(InetSocketAddress addr) {
+    Preconditions.checkArgument(!addr.isUnresolved());
+    addrs.put(addr.getAddress(), addr.getPort());
+  }
+
+  @Override
+  public Iterator<InetSocketAddress> iterator() {
+    return new UnmodifiableIterator<InetSocketAddress>() {
+      private final Iterator<Map.Entry<InetAddress,
+          Integer>> it = addrs.entries().iterator();
+
+      @Override
+      public boolean hasNext() {
+        return it.hasNext();
+      }
+
+      @Override
+      public InetSocketAddress next() {
+        Map.Entry<InetAddress, Integer> e = it.next();
+        return new InetSocketAddress(e.getKey(), e.getValue());
+      }
+    };
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder("HostSet(");
+    Joiner.on(",").appendTo(sb, Iterators.transform(iterator(),
+        new Function<InetSocketAddress, String>() {
+          @Override
+          public String apply(@Nullable InetSocketAddress addr) {
+            assert addr != null;
+            return addr.getAddress().getHostAddress() + ":" + addr.getPort();
+          }
+        }));
+    return sb.append(")").toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4c55332/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 50d09c0..8ab785f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -3107,4 +3107,19 @@
     The size buffer to be used when creating or opening httpfs filesystem IO stream.
     </description>
   </property>
+
+<property>
+  <name>dfs.namenode.hosts.provider.classname</name>
+  <value>org.apache.hadoop.hdfs.server.blockmanagement.HostFileManager</value>
+  <description>
+    The class that provides access for host files.
+    org.apache.hadoop.hdfs.server.blockmanagement.HostFileManager is used
+    by default which loads files specified by dfs.hosts and dfs.hosts.exclude.
+    If org.apache.hadoop.hdfs.server.blockmanagement.CombinedHostFileManager is
+    used, it will load the JSON file defined in dfs.hosts.
+    To change class name, nn restart is required. "dfsadmin -refreshNodes" only
+    refreshes the configuration files used by the class.
+  </description>
+</property>
+
 </configuration>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4c55332/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HdfsUserGuide.md
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HdfsUserGuide.md b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HdfsUserGuide.md
index 9c6bd02..28957fb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HdfsUserGuide.md
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HdfsUserGuide.md
@@ -142,12 +142,16 @@ The `bin/hdfs dfsadmin` command supports a few HDFS administration related opera
   during last upgrade.
 
 * `-refreshNodes`: Updates the namenode with the set of datanodes
-  allowed to connect to the namenode. Namenodes re-read datanode
+  allowed to connect to the namenode. By default, Namenodes re-read datanode
   hostnames in the file defined by `dfs.hosts`, `dfs.hosts.exclude`
    Hosts defined in `dfs.hosts` are the datanodes that are part of the
    cluster. If there are entries in `dfs.hosts`, only the hosts in it
    are allowed to register with the namenode. Entries in
    `dfs.hosts.exclude` are datanodes that need to be decommissioned.
+   Alternatively if `dfs.namenode.hosts.provider.classname` is set to
+   `org.apache.hadoop.hdfs.server.blockmanagement.CombinedHostFileManager`,
+   all include and exclude hosts are specified in the JSON file defined by
+   `dfs.hosts`.
    Datanodes complete decommissioning when all the replicas from them
    are replicated to other datanodes. Decommissioned nodes are not
    automatically shutdown and are not chosen for writing for new

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4c55332/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeReport.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeReport.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeReport.java
index 8138298..c990e0f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeReport.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeReport.java
@@ -29,15 +29,19 @@ import java.util.List;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.protocol.DatanodeAdminProperties;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.server.blockmanagement.CombinedHostFileManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.HostConfigManager;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
+import org.apache.hadoop.hdfs.util.HostsFileWriter;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -48,7 +52,57 @@ public class TestDatanodeReport {
   static final Log LOG = LogFactory.getLog(TestDatanodeReport.class);
   final static private Configuration conf = new HdfsConfiguration();
   final static private int NUM_OF_DATANODES = 4;
-    
+
+  /**
+   * This test verifies upgrade domain is set according to the JSON host file.
+   */
+  @Test
+  public void testDatanodeReportWithUpgradeDomain() throws Exception {
+    conf.setInt(
+        DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 500); // 0.5s
+    conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
+    conf.setClass(DFSConfigKeys.DFS_NAMENODE_HOSTS_PROVIDER_CLASSNAME_KEY,
+        CombinedHostFileManager.class, HostConfigManager.class);
+    HostsFileWriter hostsFileWriter = new HostsFileWriter();
+    hostsFileWriter.initialize(conf, "temp/datanodeReport");
+
+    MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+    final DFSClient client = cluster.getFileSystem().dfs;
+    final String ud1 = "ud1";
+    final String ud2 = "ud2";
+
+    try {
+      //wait until the cluster is up
+      cluster.waitActive();
+
+      DatanodeAdminProperties datanode = new DatanodeAdminProperties();
+      datanode.setHostName(cluster.getDataNodes().get(0).getDatanodeId().getHostName());
+      datanode.setUpgradeDomain(ud1);
+      hostsFileWriter.initIncludeHosts(
+          new DatanodeAdminProperties[]{datanode});
+      client.refreshNodes();
+      DatanodeInfo[] all = client.datanodeReport(DatanodeReportType.ALL);
+      assertEquals(all[0].getUpgradeDomain(), ud1);
+
+      datanode.setUpgradeDomain(null);
+      hostsFileWriter.initIncludeHosts(
+          new DatanodeAdminProperties[]{datanode});
+      client.refreshNodes();
+      all = client.datanodeReport(DatanodeReportType.ALL);
+      assertEquals(all[0].getUpgradeDomain(), null);
+
+      datanode.setUpgradeDomain(ud2);
+      hostsFileWriter.initIncludeHosts(
+          new DatanodeAdminProperties[]{datanode});
+      client.refreshNodes();
+      all = client.datanodeReport(DatanodeReportType.ALL);
+      assertEquals(all[0].getUpgradeDomain(), ud2);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
   /**
    * This test attempts to different types of datanode report.
    */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4c55332/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlocksWithNotEnoughRacks.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlocksWithNotEnoughRacks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlocksWithNotEnoughRacks.java
index 3d00e26..0b403d1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlocksWithNotEnoughRacks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlocksWithNotEnoughRacks.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
+import org.apache.hadoop.hdfs.util.HostsFileWriter;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.log4j.Level;
 import org.junit.Test;
@@ -384,17 +385,8 @@ public class TestBlocksWithNotEnoughRacks {
     short REPLICATION_FACTOR = 2;
     final Path filePath = new Path("/testFile");
 
-    // Configure an excludes file
-    FileSystem localFileSys = FileSystem.getLocal(conf);
-    Path workingDir = new Path(MiniDFSCluster.getBaseDirectory());
-    Path dir = new Path(workingDir, "temp/decommission");
-    Path excludeFile = new Path(dir, "exclude");
-    Path includeFile = new Path(dir, "include");
-    assertTrue(localFileSys.mkdirs(dir));
-    DFSTestUtil.writeFile(localFileSys, excludeFile, "");
-    DFSTestUtil.writeFile(localFileSys, includeFile, "");
-    conf.set(DFSConfigKeys.DFS_HOSTS_EXCLUDE, excludeFile.toUri().getPath());
-    conf.set(DFSConfigKeys.DFS_HOSTS, includeFile.toUri().getPath());
+    HostsFileWriter hostsFileWriter = new HostsFileWriter();
+    hostsFileWriter.initialize(conf, "temp/decommission");
 
     // Two blocks and four racks
     String racks[] = {"/rack1", "/rack1", "/rack2", "/rack2"};
@@ -415,7 +407,7 @@ public class TestBlocksWithNotEnoughRacks {
       BlockLocation locs[] = fs.getFileBlockLocations(
           fs.getFileStatus(filePath), 0, Long.MAX_VALUE);
       String name = locs[0].getNames()[0];
-      DFSTestUtil.writeFile(localFileSys, excludeFile, name);
+      hostsFileWriter.initExcludeHost(name);
       ns.getBlockManager().getDatanodeManager().refreshNodes(conf);
       DFSTestUtil.waitForDecommission(fs, name);
 
@@ -423,6 +415,7 @@ public class TestBlocksWithNotEnoughRacks {
       DFSTestUtil.waitForReplication(cluster, b, 2, REPLICATION_FACTOR, 0);
     } finally {
       cluster.shutdown();
+      hostsFileWriter.cleanup();
     }
   }
 
@@ -437,17 +430,8 @@ public class TestBlocksWithNotEnoughRacks {
     short REPLICATION_FACTOR = 5;
     final Path filePath = new Path("/testFile");
 
-    // Configure an excludes file
-    FileSystem localFileSys = FileSystem.getLocal(conf);
-    Path workingDir = new Path(MiniDFSCluster.getBaseDirectory());
-    Path dir = new Path(workingDir, "temp/decommission");
-    Path excludeFile = new Path(dir, "exclude");
-    Path includeFile = new Path(dir, "include");
-    assertTrue(localFileSys.mkdirs(dir));
-    DFSTestUtil.writeFile(localFileSys, excludeFile, "");
-    DFSTestUtil.writeFile(localFileSys, includeFile, "");
-    conf.set(DFSConfigKeys.DFS_HOSTS, includeFile.toUri().getPath());
-    conf.set(DFSConfigKeys.DFS_HOSTS_EXCLUDE, excludeFile.toUri().getPath());
+    HostsFileWriter hostsFileWriter = new HostsFileWriter();
+    hostsFileWriter.initialize(conf, "temp/decommission");
 
     // All hosts are on two racks, only one host on /rack2
     String racks[] = {"/rack1", "/rack2", "/rack1", "/rack1", "/rack1"};
@@ -473,7 +457,7 @@ public class TestBlocksWithNotEnoughRacks {
       for (String top : locs[0].getTopologyPaths()) {
         if (!top.startsWith("/rack2")) {
           String name = top.substring("/rack1".length()+1);
-          DFSTestUtil.writeFile(localFileSys, excludeFile, name);
+          hostsFileWriter.initExcludeHost(name);
           ns.getBlockManager().getDatanodeManager().refreshNodes(conf);
           DFSTestUtil.waitForDecommission(fs, name);
           break;
@@ -485,6 +469,7 @@ public class TestBlocksWithNotEnoughRacks {
       DFSTestUtil.waitForReplication(cluster, b, 2, REPLICATION_FACTOR, 0);
     } finally {
       cluster.shutdown();
+      hostsFileWriter.cleanup();
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4c55332/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java
index a8a1db9..30e2aaf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java
@@ -420,9 +420,9 @@ public class TestDatanodeManager {
 
     DatanodeManager dm = mockDatanodeManager(fsn, new Configuration());
     HostFileManager hm = new HostFileManager();
-    HostFileManager.HostSet noNodes = new HostFileManager.HostSet();
-    HostFileManager.HostSet oneNode = new HostFileManager.HostSet();
-    HostFileManager.HostSet twoNodes = new HostFileManager.HostSet();
+    HostSet noNodes = new HostSet();
+    HostSet oneNode = new HostSet();
+    HostSet twoNodes = new HostSet();
     DatanodeRegistration dr1 = new DatanodeRegistration(
       new DatanodeID("127.0.0.1", "127.0.0.1", "someStorageID-123",
           12345, 12345, 12345, 12345),
@@ -439,7 +439,7 @@ public class TestDatanodeManager {
     oneNode.add(entry("127.0.0.1:23456"));
 
     hm.refresh(twoNodes, noNodes);
-    Whitebox.setInternalState(dm, "hostFileManager", hm);
+    Whitebox.setInternalState(dm, "hostConfigManager", hm);
 
     // Register two data nodes to simulate them coming up.
     // We need to add two nodes, because if we have only one node, removing it

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4c55332/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestHostFileManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestHostFileManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestHostFileManager.java
index 6f17040..e6be7cb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestHostFileManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestHostFileManager.java
@@ -40,7 +40,7 @@ public class TestHostFileManager {
 
   @Test
   public void testDeduplication() {
-    HostFileManager.HostSet s = new HostFileManager.HostSet();
+    HostSet s = new HostSet();
     // These entries will be de-duped, since they refer to the same IP
     // address + port combo.
     s.add(entry("127.0.0.1:12345"));
@@ -60,7 +60,7 @@ public class TestHostFileManager {
 
   @Test
   public void testRelation() {
-    HostFileManager.HostSet s = new HostFileManager.HostSet();
+    HostSet s = new HostSet();
     s.add(entry("127.0.0.1:123"));
     Assert.assertTrue(s.match(entry("127.0.0.1:123")));
     Assert.assertFalse(s.match(entry("127.0.0.1:12")));
@@ -105,8 +105,8 @@ public class TestHostFileManager {
     FSNamesystem fsn = mock(FSNamesystem.class);
     Configuration conf = new Configuration();
     HostFileManager hm = new HostFileManager();
-    HostFileManager.HostSet includedNodes = new HostFileManager.HostSet();
-    HostFileManager.HostSet excludedNodes = new HostFileManager.HostSet();
+    HostSet includedNodes = new HostSet();
+    HostSet excludedNodes = new HostSet();
 
     includedNodes.add(entry("127.0.0.1:12345"));
     includedNodes.add(entry("localhost:12345"));
@@ -122,7 +122,7 @@ public class TestHostFileManager {
     hm.refresh(includedNodes, excludedNodes);
 
     DatanodeManager dm = new DatanodeManager(bm, fsn, conf);
-    Whitebox.setInternalState(dm, "hostFileManager", hm);
+    Whitebox.setInternalState(dm, "hostConfigManager", hm);
     Map<String, DatanodeDescriptor> dnMap = (Map<String,
             DatanodeDescriptor>) Whitebox.getInternalState(dm, "datanodeMap");
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4c55332/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestHostsFiles.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestHostsFiles.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestHostsFiles.java
index bf4d8ff..0ac968a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestHostsFiles.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestHostsFiles.java
@@ -20,14 +20,13 @@ package org.apache.hadoop.hdfs.server.namenode;
 import static org.junit.Assert.assertTrue;
 
 import java.lang.management.ManagementFactory;
-import java.io.File;
+import java.util.Arrays;
 
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileSystem;
@@ -37,15 +36,33 @@ import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.blockmanagement.CombinedHostFileManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.HostConfigManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.HostFileManager;
+import org.apache.hadoop.hdfs.util.HostsFileWriter;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 /**
  * DFS_HOSTS and DFS_HOSTS_EXCLUDE tests
  * 
  */
+@RunWith(Parameterized.class)
 public class TestHostsFiles {
   private static final Log LOG =
     LogFactory.getLog(TestHostsFiles.class.getName());
+  private Class hostFileMgrClass;
+
+  public TestHostsFiles(Class hostFileMgrClass) {
+    this.hostFileMgrClass = hostFileMgrClass;
+  }
+
+  @Parameterized.Parameters
+  public static Iterable<Object[]> data() {
+    return Arrays.asList(new Object[][]{
+        {HostFileManager.class}, {CombinedHostFileManager.class}});
+  }
 
   /*
    * Return a configuration object with low timeouts for testing and 
@@ -72,6 +89,10 @@ public class TestHostsFiles {
 
     // Indicates we have multiple racks
     conf.set(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY, "xyz");
+
+    // Host file manager
+    conf.setClass(DFSConfigKeys.DFS_NAMENODE_HOSTS_PROVIDER_CLASSNAME_KEY,
+        hostFileMgrClass, HostConfigManager.class);
     return conf;
   }
 
@@ -80,18 +101,8 @@ public class TestHostsFiles {
     Configuration conf = getConf();
     short REPLICATION_FACTOR = 2;
     final Path filePath = new Path("/testFile");
-
-    // Configure an excludes file
-    FileSystem localFileSys = FileSystem.getLocal(conf);
-    Path workingDir = new Path(MiniDFSCluster.getBaseDirectory());
-    Path dir = new Path(workingDir, "temp/decommission");
-    Path excludeFile = new Path(dir, "exclude");
-    Path includeFile = new Path(dir, "include");
-    assertTrue(localFileSys.mkdirs(dir));
-    DFSTestUtil.writeFile(localFileSys, excludeFile, "");
-    DFSTestUtil.writeFile(localFileSys, includeFile, "");
-    conf.set(DFSConfigKeys.DFS_HOSTS_EXCLUDE, excludeFile.toUri().getPath());
-    conf.set(DFSConfigKeys.DFS_HOSTS, includeFile.toUri().getPath());
+    HostsFileWriter hostsFileWriter = new HostsFileWriter();
+    hostsFileWriter.initialize(conf, "temp/decommission");
 
     // Two blocks and four racks
     String racks[] = {"/rack1", "/rack1", "/rack2", "/rack2"};
@@ -112,9 +123,8 @@ public class TestHostsFiles {
       BlockLocation locs[] = fs.getFileBlockLocations(
           fs.getFileStatus(filePath), 0, Long.MAX_VALUE);
       String name = locs[0].getNames()[0];
-      String names = name + "\n" + "localhost:42\n";
-      LOG.info("adding '" + names + "' to exclude file " + excludeFile.toUri().getPath());
-      DFSTestUtil.writeFile(localFileSys, excludeFile, name);
+      LOG.info("adding '" + name + "' to decommission");
+      hostsFileWriter.initExcludeHost(name);
       ns.getBlockManager().getDatanodeManager().refreshNodes(conf);
       DFSTestUtil.waitForDecommission(fs, name);
 
@@ -131,9 +141,7 @@ public class TestHostsFiles {
       if (cluster != null) {
         cluster.shutdown();
       }
-      if (localFileSys.exists(dir)) {
-        FileUtils.deleteQuietly(new File(dir.toUri().getPath()));
-      }
+      hostsFileWriter.cleanup();
     }
   }
 
@@ -141,20 +149,10 @@ public class TestHostsFiles {
   public void testHostsIncludeForDeadCount() throws Exception {
     Configuration conf = getConf();
 
-    // Configure an excludes file
-    FileSystem localFileSys = FileSystem.getLocal(conf);
-    Path workingDir = new Path(MiniDFSCluster.getBaseDirectory());
-    Path dir = new Path(workingDir, "temp/decommission");
-    Path excludeFile = new Path(dir, "exclude");
-    Path includeFile = new Path(dir, "include");
-    assertTrue(localFileSys.mkdirs(dir));
-    StringBuilder includeHosts = new StringBuilder();
-    includeHosts.append("localhost:52").append("\n").append("127.0.0.1:7777")
-        .append("\n");
-    DFSTestUtil.writeFile(localFileSys, excludeFile, "");
-    DFSTestUtil.writeFile(localFileSys, includeFile, includeHosts.toString());
-    conf.set(DFSConfigKeys.DFS_HOSTS_EXCLUDE, excludeFile.toUri().getPath());
-    conf.set(DFSConfigKeys.DFS_HOSTS, includeFile.toUri().getPath());
+    HostsFileWriter hostsFileWriter = new HostsFileWriter();
+    hostsFileWriter.initialize(conf, "temp/decommission");
+    hostsFileWriter.initIncludeHosts(new String[]
+        {"localhost:52","127.0.0.1:7777"});
 
     MiniDFSCluster cluster = null;
     try {
@@ -174,9 +172,7 @@ public class TestHostsFiles {
       if (cluster != null) {
         cluster.shutdown();
       }
-      if (localFileSys.exists(dir)) {
-        FileUtils.deleteQuietly(new File(dir.toUri().getPath()));
-      }
+      hostsFileWriter.cleanup();
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4c55332/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeMXBean.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeMXBean.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeMXBean.java
index e622576..c19b9cf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeMXBean.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeMXBean.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
 import org.apache.hadoop.hdfs.server.namenode.top.TopConf;
+import org.apache.hadoop.hdfs.util.HostsFileWriter;
 import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.io.nativeio.NativeIO.POSIX.NoMlockCacheManipulator;
 import org.apache.hadoop.net.ServerSocketUtil;
@@ -44,10 +45,10 @@ import org.mortbay.util.ajax.JSON;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 import java.io.File;
-import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.net.BindException;
 import java.net.URI;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
@@ -238,8 +239,8 @@ public class TestNameNodeMXBean {
     conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 1);
     MiniDFSCluster cluster = null;
-    FileSystem localFileSys = null;
-    Path dir = null;
+    HostsFileWriter hostsFileWriter = new HostsFileWriter();
+    hostsFileWriter.initialize(conf, "temp/TestNameNodeMXBean");
 
     try {
       cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
@@ -251,18 +252,12 @@ public class TestNameNodeMXBean {
       ObjectName mxbeanName = new ObjectName(
         "Hadoop:service=NameNode,name=NameNodeInfo");
 
-      // Define include file to generate deadNodes metrics
-      localFileSys = FileSystem.getLocal(conf);
-      Path workingDir = localFileSys.getWorkingDirectory();
-      dir = new Path(workingDir,"build/test/data/temp/TestNameNodeMXBean");
-      Path includeFile = new Path(dir, "include");
-      assertTrue(localFileSys.mkdirs(dir));
-      StringBuilder includeHosts = new StringBuilder();
+      List<String> hosts = new ArrayList<>();
       for(DataNode dn : cluster.getDataNodes()) {
-        includeHosts.append(dn.getDisplayName()).append("\n");
+        hosts.add(dn.getDisplayName());
       }
-      DFSTestUtil.writeFile(localFileSys, includeFile, includeHosts.toString());
-      conf.set(DFSConfigKeys.DFS_HOSTS, includeFile.toUri().getPath());
+      hostsFileWriter.initIncludeHosts(hosts.toArray(
+          new String[hosts.size()]));
       fsn.getBlockManager().getDatanodeManager().refreshNodes(conf);
 
       cluster.stopDataNode(0);
@@ -284,12 +279,10 @@ public class TestNameNodeMXBean {
         assertTrue(deadNode.containsKey("xferaddr"));
       }
     } finally {
-      if ((localFileSys != null) && localFileSys.exists(dir)) {
-        FileUtils.deleteQuietly(new File(dir.toUri().getPath()));
-      }
       if (cluster != null) {
         cluster.shutdown();
       }
+      hostsFileWriter.cleanup();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4c55332/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStartup.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStartup.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStartup.java
index 978eef8..2fe25e0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStartup.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStartup.java
@@ -33,7 +33,6 @@ import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.net.InetAddress;
 import java.net.URI;
-import java.util.ArrayList;
 import java.nio.file.Paths;
 import java.util.Collection;
 import java.util.Iterator;
@@ -65,6 +64,7 @@ import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
+import org.apache.hadoop.hdfs.util.HostsFileWriter;
 import org.apache.hadoop.hdfs.util.MD5FileUtils;
 import org.apache.hadoop.io.MD5Hash;
 import org.apache.hadoop.test.GenericTestUtils;
@@ -590,27 +590,15 @@ public class TestStartup {
   @Test
   public void testNNRestart() throws IOException, InterruptedException {
     MiniDFSCluster cluster = null;
-    FileSystem localFileSys;
-    Path hostsFile;
-    Path excludeFile;
     int HEARTBEAT_INTERVAL = 1; // heartbeat interval in seconds
-    // Set up the hosts/exclude files.
-    localFileSys = FileSystem.getLocal(config);
-    Path workingDir = localFileSys.getWorkingDirectory();
-    Path dir = new Path(workingDir, "build/test/data/work-dir/restartnn");
-    hostsFile = new Path(dir, "hosts");
-    excludeFile = new Path(dir, "exclude");
-
-    // Setup conf
-    config.set(DFSConfigKeys.DFS_HOSTS_EXCLUDE, excludeFile.toUri().getPath());
-    writeConfigFile(localFileSys, excludeFile, null);
-    config.set(DFSConfigKeys.DFS_HOSTS, hostsFile.toUri().getPath());
-    // write into hosts file
-    ArrayList<String>list = new ArrayList<String>();
+
+    HostsFileWriter hostsFileWriter = new HostsFileWriter();
+    hostsFileWriter.initialize(config, "work-dir/restartnn");
+
     byte b[] = {127, 0, 0, 1};
     InetAddress inetAddress = InetAddress.getByAddress(b);
-    list.add(inetAddress.getHostName());
-    writeConfigFile(localFileSys, hostsFile, list);
+    hostsFileWriter.initIncludeHosts(new String[] {inetAddress.getHostName()});
+
     int numDatanodes = 1;
     
     try {
@@ -635,37 +623,12 @@ public class TestStartup {
       fail(StringUtils.stringifyException(e));
       throw e;
     } finally {
-      cleanupFile(localFileSys, excludeFile.getParent());
       if (cluster != null) {
         cluster.shutdown();
       }
+      hostsFileWriter.cleanup();
     }
   }
-  
-  private void writeConfigFile(FileSystem localFileSys, Path name,
-      ArrayList<String> nodes) throws IOException {
-    // delete if it already exists
-    if (localFileSys.exists(name)) {
-      localFileSys.delete(name, true);
-    }
-
-    FSDataOutputStream stm = localFileSys.create(name);
-    if (nodes != null) {
-      for (Iterator<String> it = nodes.iterator(); it.hasNext();) {
-        String node = it.next();
-        stm.writeBytes(node);
-        stm.writeBytes("\n");
-      }
-    }
-    stm.close();
-  }
-  
-  private void cleanupFile(FileSystem fileSys, Path name) throws IOException {
-    assertTrue(fileSys.exists(name));
-    fileSys.delete(name, true);
-    assertTrue(!fileSys.exists(name));
-  }
-
 
   @Test(timeout = 120000)
   public void testXattrConfiguration() throws Exception {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4c55332/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestUpgradeDomainBlockPlacementPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestUpgradeDomainBlockPlacementPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestUpgradeDomainBlockPlacementPolicy.java
new file mode 100644
index 0000000..f9a2503
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestUpgradeDomainBlockPlacementPolicy.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.permission.PermissionStatus;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.DatanodeAdminProperties;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyWithUpgradeDomain;
+import org.apache.hadoop.hdfs.server.blockmanagement.CombinedHostFileManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.HostConfigManager;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
+import org.apache.hadoop.hdfs.util.HostsFileWriter;
+import org.apache.hadoop.net.StaticMapping;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * End-to-end test case for upgrade domain
+ * The test configs upgrade domain for nodes via admin json
+ * config file and put some nodes to decommission state.
+ * The test then verifies replicas are placed on the nodes that
+ * satisfy the upgrade domain policy.
+ *
+ */
+public class TestUpgradeDomainBlockPlacementPolicy {
+
+  private static final short REPLICATION_FACTOR = (short) 3;
+  private static final int DEFAULT_BLOCK_SIZE = 1024;
+  static final String[] racks =
+      { "/RACK1", "/RACK1", "/RACK1", "/RACK2", "/RACK2", "/RACK2" };
+  /**
+   *  Use host names that can be resolved (
+   *  InetSocketAddress#isUnresolved == false). Otherwise,
+   *  CombinedHostFileManager won't allow those hosts.
+   */
+  static final String[] hosts =
+      { "127.0.0.1", "127.0.0.1", "127.0.0.1", "127.0.0.1",
+          "127.0.0.1", "127.0.0.1" };
+  static final String[] upgradeDomains =
+      { "ud1", "ud2", "ud3", "ud1", "ud2", "ud3" };
+  static final Set<DatanodeID> expectedDatanodeIDs = new HashSet<>();
+  private MiniDFSCluster cluster = null;
+  private NamenodeProtocols nameNodeRpc = null;
+  private FSNamesystem namesystem = null;
+  private PermissionStatus perm = null;
+
+  @Before
+  public void setup() throws IOException {
+    StaticMapping.resetMap();
+    Configuration conf = new HdfsConfiguration();
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
+    conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, DEFAULT_BLOCK_SIZE / 2);
+    conf.setClass(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
+        BlockPlacementPolicyWithUpgradeDomain.class,
+        BlockPlacementPolicy.class);
+    conf.setClass(DFSConfigKeys.DFS_NAMENODE_HOSTS_PROVIDER_CLASSNAME_KEY,
+        CombinedHostFileManager.class, HostConfigManager.class);
+    HostsFileWriter hostsFileWriter = new HostsFileWriter();
+    hostsFileWriter.initialize(conf, "temp/upgradedomainpolicy");
+
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(6).racks(racks)
+        .hosts(hosts).build();
+    cluster.waitActive();
+    nameNodeRpc = cluster.getNameNodeRpc();
+    namesystem = cluster.getNamesystem();
+    perm = new PermissionStatus("TestDefaultBlockPlacementPolicy", null,
+        FsPermission.getDefault());
+    refreshDatanodeAdminProperties(hostsFileWriter);
+    hostsFileWriter.cleanup();
+  }
+
+  @After
+  public void teardown() {
+    if (cluster != null) {
+      cluster.shutdown();
+      cluster = null;
+    }
+  }
+
+  /**
+   * Define admin properties for these datanodes as follows.
+   * dn0 and dn3 have upgrade domain ud1.
+   * dn1 and dn4 have upgrade domain ud2.
+   * dn2 and dn5 have upgrade domain ud3.
+   * dn0 and dn5 are decommissioned.
+   * Given dn0, dn1 and dn2 are on rack1 and dn3, dn4 and dn5 are on
+   * rack2. Then any block's replicas should be on either
+   * {dn1, dn2, d3} or {dn2, dn3, dn4}.
+   */
+  private void refreshDatanodeAdminProperties(HostsFileWriter hostsFileWriter)
+      throws IOException {
+    DatanodeAdminProperties[] datanodes = new DatanodeAdminProperties[
+        hosts.length];
+    for (int i = 0; i < hosts.length; i++) {
+      datanodes[i] = new DatanodeAdminProperties();
+      DatanodeID datanodeID = cluster.getDataNodes().get(i).getDatanodeId();
+      datanodes[i].setHostName(datanodeID.getHostName());
+      datanodes[i].setPort(datanodeID.getXferPort());
+      datanodes[i].setUpgradeDomain(upgradeDomains[i]);
+    }
+    datanodes[0].setAdminState(DatanodeInfo.AdminStates.DECOMMISSIONED);
+    datanodes[5].setAdminState(DatanodeInfo.AdminStates.DECOMMISSIONED);
+    hostsFileWriter.initIncludeHosts(datanodes);
+    cluster.getFileSystem().refreshNodes();
+
+    expectedDatanodeIDs.add(cluster.getDataNodes().get(2).getDatanodeId());
+    expectedDatanodeIDs.add(cluster.getDataNodes().get(3).getDatanodeId());
+  }
+
+  @Test
+  public void testPlacement() throws Exception {
+    String clientMachine = "127.0.0.1";
+    for (int i = 0; i < 5; i++) {
+      String src = "/test-" + i;
+      // Create the file with client machine
+      HdfsFileStatus fileStatus = namesystem.startFile(src, perm,
+          clientMachine, clientMachine, EnumSet.of(CreateFlag.CREATE), true,
+          REPLICATION_FACTOR, DEFAULT_BLOCK_SIZE, null, false);
+      LocatedBlock locatedBlock = nameNodeRpc.addBlock(src, clientMachine,
+          null, null, fileStatus.getFileId(), null, null);
+
+      assertEquals("Block should be allocated sufficient locations",
+          REPLICATION_FACTOR, locatedBlock.getLocations().length);
+      Set<DatanodeInfo> locs = new HashSet<>(Arrays.asList(
+          locatedBlock.getLocations()));
+      for (DatanodeID datanodeID : expectedDatanodeIDs) {
+        locs.contains(datanodeID);
+      }
+
+      nameNodeRpc.abandonBlock(locatedBlock.getBlock(), fileStatus.getFileId(),
+          src, clientMachine);
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org