Posted to common-commits@hadoop.apache.org by mi...@apache.org on 2017/05/02 13:53:50 UTC
[2/2] hadoop git commit: HDFS-9005. Provide configuration support for
upgrade domain.
HDFS-9005. Provide configuration support for upgrade domain.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c4c55332
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c4c55332
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c4c55332
Branch: refs/heads/branch-2.8
Commit: c4c5533216eaaa64731a6f0c1bc9be9b1e91f7d6
Parents: c9bf21b
Author: Ming Ma <mi...@twitter.com>
Authored: Tue May 2 06:53:32 2017 -0700
Committer: Ming Ma <mi...@twitter.com>
Committed: Tue May 2 06:53:32 2017 -0700
----------------------------------------------------------------------
.../hdfs/protocol/DatanodeAdminProperties.java | 100 ++++++++
.../apache/hadoop/hdfs/protocol/DatanodeID.java | 6 +
.../hdfs/util/CombinedHostsFileReader.java | 76 ++++++
.../hdfs/util/CombinedHostsFileWriter.java | 69 +++++
.../org/apache/hadoop/hdfs/DFSConfigKeys.java | 4 +-
.../CombinedHostFileManager.java | 250 +++++++++++++++++++
.../server/blockmanagement/DatanodeManager.java | 59 +++--
.../blockmanagement/HostConfigManager.java | 80 ++++++
.../server/blockmanagement/HostFileManager.java | 147 +++--------
.../hdfs/server/blockmanagement/HostSet.java | 114 +++++++++
.../src/main/resources/hdfs-default.xml | 15 ++
.../src/site/markdown/HdfsUserGuide.md | 6 +-
.../apache/hadoop/hdfs/TestDatanodeReport.java | 56 ++++-
.../TestBlocksWithNotEnoughRacks.java | 33 +--
.../blockmanagement/TestDatanodeManager.java | 8 +-
.../blockmanagement/TestHostFileManager.java | 10 +-
.../hdfs/server/namenode/TestHostsFiles.java | 70 +++---
.../server/namenode/TestNameNodeMXBean.java | 25 +-
.../hdfs/server/namenode/TestStartup.java | 53 +---
.../TestUpgradeDomainBlockPlacementPolicy.java | 169 +++++++++++++
.../hadoop/hdfs/util/HostsFileWriter.java | 122 +++++++++
.../hdfs/util/TestCombinedHostsFileReader.java | 79 ++++++
.../src/test/resources/dfs.hosts.json | 5 +
23 files changed, 1291 insertions(+), 265 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4c55332/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeAdminProperties.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeAdminProperties.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeAdminProperties.java
new file mode 100644
index 0000000..9f7b983
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeAdminProperties.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol;
+
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
+
+/**
+ * The class describes the configured admin properties for a datanode.
+ *
+ * It is the static configuration specified by administrators via the dfsadmin
+ * command, as opposed to the runtime state. CombinedHostFileManager uses
+ * this class to deserialize the configuration from the JSON-based file format.
+ *
+ * To decommission a node, use AdminStates.DECOMMISSIONED.
+ */
+public class DatanodeAdminProperties {
+ private String hostName;
+ private int port;
+ private String upgradeDomain;
+ private AdminStates adminState = AdminStates.NORMAL;
+
+ /**
+ * Return the host name of the datanode.
+ * @return the host name of the datanode.
+ */
+ public String getHostName() {
+ return hostName;
+ }
+
+ /**
+ * Set the host name of the datanode.
+ * @param hostName the host name of the datanode.
+ */
+ public void setHostName(final String hostName) {
+ this.hostName = hostName;
+ }
+
+ /**
+ * Get the port number of the datanode.
+ * @return the port number of the datanode.
+ */
+ public int getPort() {
+ return port;
+ }
+
+ /**
+ * Set the port number of the datanode.
+ * @param port the port number of the datanode.
+ */
+ public void setPort(final int port) {
+ this.port = port;
+ }
+
+ /**
+ * Get the upgrade domain of the datanode.
+ * @return the upgrade domain of the datanode.
+ */
+ public String getUpgradeDomain() {
+ return upgradeDomain;
+ }
+
+ /**
+ * Set the upgrade domain of the datanode.
+ * @param upgradeDomain the upgrade domain of the datanode.
+ */
+ public void setUpgradeDomain(final String upgradeDomain) {
+ this.upgradeDomain = upgradeDomain;
+ }
+
+ /**
+ * Get the admin state of the datanode.
+ * @return the admin state of the datanode.
+ */
+ public AdminStates getAdminState() {
+ return adminState;
+ }
+
+ /**
+ * Set the admin state of the datanode.
+ * @param adminState the admin state of the datanode.
+ */
+ public void setAdminState(final AdminStates adminState) {
+ this.adminState = adminState;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4c55332/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java
index 86782f2..e94c07d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java
@@ -23,6 +23,8 @@ import org.apache.hadoop.classification.InterfaceStability;
import com.google.common.annotations.VisibleForTesting;
+import java.net.InetSocketAddress;
+
/**
* This class represents the primary identifier for a Datanode.
* Datanodes are identified by how they can be contacted (hostname
@@ -272,4 +274,8 @@ public class DatanodeID implements Comparable<DatanodeID> {
public int compareTo(DatanodeID that) {
return getXferAddr().compareTo(that.getXferAddr());
}
+
+ public InetSocketAddress getResolvedAddress() {
+ return new InetSocketAddress(this.getIpAddr(), this.getXferPort());
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4c55332/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/CombinedHostsFileReader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/CombinedHostsFileReader.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/CombinedHostsFileReader.java
new file mode 100644
index 0000000..33acb91
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/CombinedHostsFileReader.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.util;
+
+import java.io.FileInputStream;
+import java.io.InputStreamReader;
+import java.io.IOException;
+import java.io.Reader;
+
+import java.util.Iterator;
+import java.util.Set;
+import java.util.HashSet;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.map.ObjectMapper;
+
+import org.apache.hadoop.hdfs.protocol.DatanodeAdminProperties;
+
+/**
+ * Reader support for JSON based datanode configuration, an alternative
+ * to the exclude/include files configuration.
+ * The JSON file format is an array of elements, where each element
+ * describes the properties of a datanode. The properties of
+ * a datanode are defined in {@link DatanodeAdminProperties}. For example,
+ *
+ * {"hostName": "host1"}
+ * {"hostName": "host2", "port": 50, "upgradeDomain": "ud0"}
+ * {"hostName": "host3", "port": 0, "adminState": "DECOMMISSIONED"}
+ */
+@InterfaceAudience.LimitedPrivate({"HDFS"})
+@InterfaceStability.Unstable
+public final class CombinedHostsFileReader {
+ private CombinedHostsFileReader() {
+ }
+
+ /**
+ * Deserialize a set of DatanodeAdminProperties from a json file.
+ * @param hostsFile the input json file to read from.
+ * @return the set of DatanodeAdminProperties
+ * @throws IOException
+ */
+ public static Set<DatanodeAdminProperties>
+ readFile(final String hostsFile) throws IOException {
+ HashSet<DatanodeAdminProperties> allDNs = new HashSet<>();
+ ObjectMapper mapper = new ObjectMapper();
+ try (Reader input =
+ new InputStreamReader(new FileInputStream(hostsFile), "UTF-8")) {
+ Iterator<DatanodeAdminProperties> iterator =
+ mapper.readValues(new JsonFactory().createJsonParser(input),
+ DatanodeAdminProperties.class);
+ while (iterator.hasNext()) {
+ DatanodeAdminProperties properties = iterator.next();
+ allDNs.add(properties);
+ }
+ }
+ return allDNs;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4c55332/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/CombinedHostsFileWriter.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/CombinedHostsFileWriter.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/CombinedHostsFileWriter.java
new file mode 100644
index 0000000..ea70be2
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/CombinedHostsFileWriter.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.util;
+
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+
+import java.util.Set;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.codehaus.jackson.map.ObjectMapper;
+
+import org.apache.hadoop.hdfs.protocol.DatanodeAdminProperties;
+
+/**
+ * Writer support for JSON based datanode configuration, an alternative
+ * to the exclude/include files configuration.
+ * The JSON file format is an array of elements, where each element
+ * describes the properties of a datanode. The properties of
+ * a datanode are defined in {@link DatanodeAdminProperties}. For example,
+ *
+ * {"hostName": "host1"}
+ * {"hostName": "host2", "port": 50, "upgradeDomain": "ud0"}
+ * {"hostName": "host3", "port": 0, "adminState": "DECOMMISSIONED"}
+ */
+@InterfaceAudience.LimitedPrivate({"HDFS"})
+@InterfaceStability.Unstable
+public final class CombinedHostsFileWriter {
+ private CombinedHostsFileWriter() {
+ }
+
+ /**
+ * Serialize a set of DatanodeAdminProperties to a json file.
+ * @param hostsFile the json file name.
+ * @param allDNs the set of DatanodeAdminProperties
+ * @throws IOException
+ */
+ public static void writeFile(final String hostsFile,
+ final Set<DatanodeAdminProperties> allDNs) throws IOException {
+ StringBuilder configs = new StringBuilder();
+ try (Writer output =
+ new OutputStreamWriter(new FileOutputStream(hostsFile), "UTF-8")) {
+ for (DatanodeAdminProperties datanodeAdminProperties: allDNs) {
+ ObjectMapper mapper = new ObjectMapper();
+ configs.append(mapper.writeValueAsString(datanodeAdminProperties));
+ }
+ output.write(configs.toString());
+ }
+ }
+}
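
The reader and writer above are symmetric. A minimal round-trip sketch, assuming the classes added by this commit are on the classpath (the file path and host names below are illustrative only):

import java.util.HashSet;
import java.util.Set;

import org.apache.hadoop.hdfs.protocol.DatanodeAdminProperties;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
import org.apache.hadoop.hdfs.util.CombinedHostsFileReader;
import org.apache.hadoop.hdfs.util.CombinedHostsFileWriter;

public class CombinedHostsFileRoundTrip {
  public static void main(String[] args) throws Exception {
    Set<DatanodeAdminProperties> allDNs = new HashSet<>();

    // In-service datanode with an upgrade domain.
    DatanodeAdminProperties d1 = new DatanodeAdminProperties();
    d1.setHostName("host1");
    d1.setUpgradeDomain("ud0");
    allDNs.add(d1);

    // Datanode that should be decommissioned.
    DatanodeAdminProperties d2 = new DatanodeAdminProperties();
    d2.setHostName("host2");
    d2.setAdminState(AdminStates.DECOMMISSIONED);
    allDNs.add(d2);

    String hostsFile = "/tmp/dfs.hosts.json";  // illustrative path
    CombinedHostsFileWriter.writeFile(hostsFile, allDNs);

    for (DatanodeAdminProperties p : CombinedHostsFileReader.readFile(hostsFile)) {
      System.out.println(p.getHostName() + " " + p.getAdminState());
    }
  }
}

Note that port defaults to 0, which the host managers treat as matching any port on that host.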
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4c55332/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index b62010b..51a75cd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -426,12 +426,12 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_METRICS_PERCENTILES_INTERVALS_KEY = "dfs.metrics.percentiles.intervals";
public static final String DFS_DATANODE_HOST_NAME_KEY =
HdfsClientConfigKeys.DeprecatedKeys.DFS_DATANODE_HOST_NAME_KEY;
- public static final String DFS_NAMENODE_HOSTS_KEY = "dfs.namenode.hosts";
- public static final String DFS_NAMENODE_HOSTS_EXCLUDE_KEY = "dfs.namenode.hosts.exclude";
public static final String DFS_NAMENODE_CHECKPOINT_DIR_KEY =
HdfsClientConfigKeys.DeprecatedKeys.DFS_NAMENODE_CHECKPOINT_DIR_KEY;
public static final String DFS_NAMENODE_CHECKPOINT_EDITS_DIR_KEY =
HdfsClientConfigKeys.DeprecatedKeys.DFS_NAMENODE_CHECKPOINT_EDITS_DIR_KEY;
+ public static final String DFS_NAMENODE_HOSTS_PROVIDER_CLASSNAME_KEY =
+ "dfs.namenode.hosts.provider.classname";
public static final String DFS_HOSTS = "dfs.hosts";
public static final String DFS_HOSTS_EXCLUDE = "dfs.hosts.exclude";
public static final String DFS_NAMENODE_AUDIT_LOGGERS_KEY = "dfs.namenode.audit.loggers";
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4c55332/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CombinedHostFileManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CombinedHostFileManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CombinedHostFileManager.java
new file mode 100644
index 0000000..3e913b9
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CombinedHostFileManager.java
@@ -0,0 +1,250 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.UnmodifiableIterator;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Collections2;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.DatanodeAdminProperties;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
+
+import java.io.IOException;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.base.Predicate;
+
+import org.apache.hadoop.hdfs.util.CombinedHostsFileReader;
+
+/**
+ * This class manages datanode configuration using a json file.
+ * Please refer to {@link CombinedHostsFileReader} for the json format.
+ * <p/>
+ * <p/>
+ * Entries may or may not specify a port. If they don't, we consider
+ * them to apply to every DataNode on that host. The code canonicalizes the
+ * entries into IP addresses.
+ * <p/>
+ * <p/>
+ * The code ignores all entries whose IP addresses the DNS fails to
+ * resolve. This is okay because by default the NN rejects the registrations
+ * of DNs when it fails to do a forward and reverse lookup. Note that DNS
+ * resolutions are only done during the loading time to minimize the latency.
+ */
+public class CombinedHostFileManager extends HostConfigManager {
+ private static final Logger LOG = LoggerFactory.getLogger(
+ CombinedHostFileManager.class);
+ private Configuration conf;
+ private HostProperties hostProperties = new HostProperties();
+
+ static class HostProperties {
+ private Multimap<InetAddress, DatanodeAdminProperties> allDNs =
+ HashMultimap.create();
+ // Optimization: if no node in the file is in service, it implies
+ // any node is allowed to register with the NN. This is equivalent to
+ // having an empty "include" file.
+ private boolean emptyInServiceNodeLists = true;
+ synchronized void add(InetAddress addr,
+ DatanodeAdminProperties properties) {
+ allDNs.put(addr, properties);
+ if (properties.getAdminState().equals(
+ AdminStates.NORMAL)) {
+ emptyInServiceNodeLists = false;
+ }
+ }
+
+ // If the includes list is empty, act as if everything is in the
+ // includes list.
+ synchronized boolean isIncluded(final InetSocketAddress address) {
+ return emptyInServiceNodeLists || Iterables.any(
+ allDNs.get(address.getAddress()),
+ new Predicate<DatanodeAdminProperties>() {
+ public boolean apply(DatanodeAdminProperties input) {
+ return input.getPort() == 0 ||
+ input.getPort() == address.getPort();
+ }
+ });
+ }
+
+ synchronized boolean isExcluded(final InetSocketAddress address) {
+ return Iterables.any(allDNs.get(address.getAddress()),
+ new Predicate<DatanodeAdminProperties>() {
+ public boolean apply(DatanodeAdminProperties input) {
+ return input.getAdminState().equals(
+ AdminStates.DECOMMISSIONED) &&
+ (input.getPort() == 0 ||
+ input.getPort() == address.getPort());
+ }
+ });
+ }
+
+ synchronized String getUpgradeDomain(final InetSocketAddress address) {
+ Iterable<DatanodeAdminProperties> datanode = Iterables.filter(
+ allDNs.get(address.getAddress()),
+ new Predicate<DatanodeAdminProperties>() {
+ public boolean apply(DatanodeAdminProperties input) {
+ return (input.getPort() == 0 ||
+ input.getPort() == address.getPort());
+ }
+ });
+ return datanode.iterator().hasNext() ?
+ datanode.iterator().next().getUpgradeDomain() : null;
+ }
+
+ Iterable<InetSocketAddress> getIncludes() {
+ return new Iterable<InetSocketAddress>() {
+ @Override
+ public Iterator<InetSocketAddress> iterator() {
+ return new HostIterator(allDNs.entries());
+ }
+ };
+ }
+
+ Iterable<InetSocketAddress> getExcludes() {
+ return new Iterable<InetSocketAddress>() {
+ @Override
+ public Iterator<InetSocketAddress> iterator() {
+ return new HostIterator(
+ Collections2.filter(allDNs.entries(),
+ new Predicate<java.util.Map.Entry<InetAddress,
+ DatanodeAdminProperties>>() {
+ public boolean apply(java.util.Map.Entry<InetAddress,
+ DatanodeAdminProperties> entry) {
+ return entry.getValue().getAdminState().equals(
+ AdminStates.DECOMMISSIONED);
+ }
+ }
+ ));
+ }
+ };
+ }
+
+ static class HostIterator extends UnmodifiableIterator<InetSocketAddress> {
+ private final Iterator<Map.Entry<InetAddress,
+ DatanodeAdminProperties>> it;
+ public HostIterator(Collection<java.util.Map.Entry<InetAddress,
+ DatanodeAdminProperties>> nodes) {
+ this.it = nodes.iterator();
+ }
+ @Override
+ public boolean hasNext() {
+ return it.hasNext();
+ }
+
+ @Override
+ public InetSocketAddress next() {
+ Map.Entry<InetAddress, DatanodeAdminProperties> e = it.next();
+ return new InetSocketAddress(e.getKey(), e.getValue().getPort());
+ }
+ }
+ }
+
+ @Override
+ public Iterable<InetSocketAddress> getIncludes() {
+ return hostProperties.getIncludes();
+ }
+
+ @Override
+ public Iterable<InetSocketAddress> getExcludes() {
+ return hostProperties.getExcludes();
+ }
+
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ @Override
+ public void refresh() throws IOException {
+ refresh(conf.get(DFSConfigKeys.DFS_HOSTS, ""));
+ }
+ private void refresh(final String hostsFile) throws IOException {
+ HostProperties hostProps = new HostProperties();
+ Set<DatanodeAdminProperties> all =
+ CombinedHostsFileReader.readFile(hostsFile);
+ for(DatanodeAdminProperties properties : all) {
+ InetSocketAddress addr = parseEntry(hostsFile,
+ properties.getHostName(), properties.getPort());
+ if (addr != null) {
+ hostProps.add(addr.getAddress(), properties);
+ }
+ }
+ refresh(hostProps);
+ }
+
+ @VisibleForTesting
+ static InetSocketAddress parseEntry(final String fn, final String hostName,
+ final int port) {
+ InetSocketAddress addr = new InetSocketAddress(hostName, port);
+ if (addr.isUnresolved()) {
+ LOG.warn("Failed to resolve {} in {}. ", hostName, fn);
+ return null;
+ }
+ return addr;
+ }
+
+ @Override
+ public synchronized boolean isIncluded(final DatanodeID dn) {
+ return hostProperties.isIncluded(dn.getResolvedAddress());
+ }
+
+ @Override
+ public synchronized boolean isExcluded(final DatanodeID dn) {
+ return isExcluded(dn.getResolvedAddress());
+ }
+
+ private boolean isExcluded(final InetSocketAddress address) {
+ return hostProperties.isExcluded(address);
+ }
+
+ @Override
+ public synchronized String getUpgradeDomain(final DatanodeID dn) {
+ return hostProperties.getUpgradeDomain(dn.getResolvedAddress());
+ }
+
+ /**
+ * Replace the properties list with the new instance. The
+ * old instance is discarded.
+ * @param hostProperties the new properties list
+ */
+ @VisibleForTesting
+ private void refresh(final HostProperties hostProperties) {
+ synchronized (this) {
+ this.hostProperties = hostProperties;
+ }
+ }
+}
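
A rough usage sketch for the manager above (not part of the commit): point dfs.hosts at the JSON file, call refresh(), then query per DatanodeID. The path and the DatanodeID field values are made up for illustration.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.server.blockmanagement.CombinedHostFileManager;

public class CombinedHostFileManagerSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.set(DFSConfigKeys.DFS_HOSTS, "/tmp/dfs.hosts.json");  // illustrative path

    CombinedHostFileManager manager = new CombinedHostFileManager();
    manager.setConf(conf);
    manager.refresh();  // host names are resolved to IPs once, at load time

    // DatanodeID(ipAddr, hostName, uuid, xferPort, infoPort, infoSecurePort, ipcPort)
    DatanodeID dn = new DatanodeID("127.0.0.1", "host1", "storage-1",
        50010, 50075, 50475, 50020);
    System.out.println("included: " + manager.isIncluded(dn));
    System.out.println("excluded: " + manager.isExcluded(dn));
    System.out.println("upgrade domain: " + manager.getUpgradeDomain(dn));
  }
}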
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4c55332/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index ef4a47a..eeda5b1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -105,7 +105,7 @@ public class DatanodeManager {
private final int defaultIpcPort;
/** Read include/exclude files*/
- private final HostFileManager hostFileManager = new HostFileManager();
+ private HostConfigManager hostConfigManager;
/** The period to wait for datanode heartbeat.*/
private long heartbeatExpireInterval;
@@ -198,9 +198,11 @@ public class DatanodeManager {
this.defaultIpcPort = NetUtils.createSocketAddr(
conf.getTrimmed(DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY,
DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_DEFAULT)).getPort();
+ this.hostConfigManager = ReflectionUtils.newInstance(
+ conf.getClass(DFSConfigKeys.DFS_NAMENODE_HOSTS_PROVIDER_CLASSNAME_KEY,
+ HostFileManager.class, HostConfigManager.class), conf);
try {
- this.hostFileManager.refresh(conf.get(DFSConfigKeys.DFS_HOSTS, ""),
- conf.get(DFSConfigKeys.DFS_HOSTS_EXCLUDE, ""));
+ this.hostConfigManager.refresh();
} catch (IOException e) {
LOG.error("error reading hosts files: ", e);
}
@@ -218,7 +220,7 @@ public class DatanodeManager {
// in the cache; so future calls to resolve will be fast.
if (dnsToSwitchMapping instanceof CachedDNSToSwitchMapping) {
final ArrayList<String> locations = new ArrayList<>();
- for (InetSocketAddress addr : hostFileManager.getIncludes()) {
+ for (InetSocketAddress addr : hostConfigManager.getIncludes()) {
locations.add(addr.getAddress().getHostAddress());
}
dnsToSwitchMapping.resolve(locations);
@@ -331,8 +333,8 @@ public class DatanodeManager {
return decomManager;
}
- HostFileManager getHostFileManager() {
- return hostFileManager;
+ public HostConfigManager getHostConfigManager() {
+ return hostConfigManager;
}
@VisibleForTesting
@@ -622,6 +624,7 @@ public class DatanodeManager {
networktopology.add(node); // may throw InvalidTopologyException
host2DatanodeMap.add(node);
checkIfClusterIsNowMultiRack(node);
+ resolveUpgradeDomain(node);
blockManager.getBlockReportLeaseManager().register(node);
if (LOG.isDebugEnabled()) {
@@ -706,7 +709,14 @@ public class DatanodeManager {
return new HashMap<> (this.datanodesSoftwareVersions);
}
}
-
+
+ void resolveUpgradeDomain(DatanodeDescriptor node) {
+ String upgradeDomain = hostConfigManager.getUpgradeDomain(node);
+ if (upgradeDomain != null && upgradeDomain.length() > 0) {
+ node.setUpgradeDomain(upgradeDomain);
+ }
+ }
+
/**
* Resolve a node's network location. If the DNS to switch mapping fails
* then this method guarantees default rack location.
@@ -836,7 +846,7 @@ public class DatanodeManager {
*/
void startDecommissioningIfExcluded(DatanodeDescriptor nodeReg) {
// If the registered node is in exclude list, then decommission it
- if (getHostFileManager().isExcluded(nodeReg)) {
+ if (getHostConfigManager().isExcluded(nodeReg)) {
decomManager.startDecommission(nodeReg);
}
}
@@ -876,7 +886,7 @@ public class DatanodeManager {
// Checks if the node is not on the hosts list. If it is not, then
// it will be disallowed from registering.
- if (!hostFileManager.isIncluded(nodeReg)) {
+ if (!hostConfigManager.isIncluded(nodeReg)) {
throw new DisallowedDatanodeException(nodeReg);
}
@@ -944,7 +954,8 @@ public class DatanodeManager {
getNetworkDependenciesWithDefault(nodeS));
}
getNetworkTopology().add(nodeS);
-
+ resolveUpgradeDomain(nodeS);
+
// also treat the registration message as a heartbeat
heartbeatManager.register(nodeS);
incrementVersionCount(nodeS.getSoftwareVersion());
@@ -976,7 +987,8 @@ public class DatanodeManager {
}
networktopology.add(nodeDescr);
nodeDescr.setSoftwareVersion(nodeReg.getSoftwareVersion());
-
+ resolveUpgradeDomain(nodeDescr);
+
// register new datanode
addDatanode(nodeDescr);
// also treat the registration message as a heartbeat
@@ -1030,9 +1042,9 @@ public class DatanodeManager {
// Update the file names and refresh internal includes and excludes list.
if (conf == null) {
conf = new HdfsConfiguration();
+ this.hostConfigManager.setConf(conf);
}
- this.hostFileManager.refresh(conf.get(DFSConfigKeys.DFS_HOSTS, ""),
- conf.get(DFSConfigKeys.DFS_HOSTS_EXCLUDE, ""));
+ this.hostConfigManager.refresh();
}
/**
@@ -1044,15 +1056,16 @@ public class DatanodeManager {
private void refreshDatanodes() {
for(DatanodeDescriptor node : datanodeMap.values()) {
// Check if not include.
- if (!hostFileManager.isIncluded(node)) {
+ if (!hostConfigManager.isIncluded(node)) {
node.setDisallowed(true); // case 2.
} else {
- if (hostFileManager.isExcluded(node)) {
+ if (hostConfigManager.isExcluded(node)) {
decomManager.startDecommission(node); // case 3.
} else {
decomManager.stopDecommission(node); // case 4.
}
}
+ node.setUpgradeDomain(hostConfigManager.getUpgradeDomain(node));
}
}
@@ -1260,9 +1273,9 @@ public class DatanodeManager {
type == DatanodeReportType.DECOMMISSIONING;
ArrayList<DatanodeDescriptor> nodes;
- final HostFileManager.HostSet foundNodes = new HostFileManager.HostSet();
- final HostFileManager.HostSet includedNodes = hostFileManager.getIncludes();
- final HostFileManager.HostSet excludedNodes = hostFileManager.getExcludes();
+ final HostSet foundNodes = new HostSet();
+ final Iterable<InetSocketAddress> includedNodes =
+ hostConfigManager.getIncludes();
synchronized(datanodeMap) {
nodes = new ArrayList<>(datanodeMap.size());
@@ -1273,11 +1286,11 @@ public class DatanodeManager {
if (((listLiveNodes && !isDead) ||
(listDeadNodes && isDead) ||
(listDecommissioningNodes && isDecommissioning)) &&
- hostFileManager.isIncluded(dn)) {
+ hostConfigManager.isIncluded(dn)) {
nodes.add(dn);
}
- foundNodes.add(HostFileManager.resolvedAddressFromDatanodeID(dn));
+ foundNodes.add(dn.getResolvedAddress());
}
}
Collections.sort(nodes);
@@ -1301,7 +1314,7 @@ public class DatanodeManager {
addr.getPort() == 0 ? defaultXferPort : addr.getPort(),
defaultInfoPort, defaultInfoSecurePort, defaultIpcPort));
setDatanodeDead(dn);
- if (excludedNodes.match(addr)) {
+ if (hostConfigManager.isExcluded(dn)) {
dn.setDecommissioned();
}
nodes.add(dn);
@@ -1310,8 +1323,8 @@ public class DatanodeManager {
if (LOG.isDebugEnabled()) {
LOG.debug("getDatanodeListForReport with " +
- "includedNodes = " + hostFileManager.getIncludes() +
- ", excludedNodes = " + hostFileManager.getExcludes() +
+ "includedNodes = " + hostConfigManager.getIncludes() +
+ ", excludedNodes = " + hostConfigManager.getExcludes() +
", foundNodes = " + foundNodes +
", nodes = " + nodes);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4c55332/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HostConfigManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HostConfigManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HostConfigManager.java
new file mode 100644
index 0000000..f28ed29
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HostConfigManager.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+/**
+ * This interface abstracts how datanode configuration is managed.
+ *
+ * Each implementation defines its own way to persist the configuration.
+ * For example, it can use one JSON file to store the configs for all
+ * datanodes; or it can use one file to store in-service datanodes and another
+ * file to store decommission-requested datanodes.
+ *
+ * These files control which DataNodes the NameNode expects to see in the
+ * cluster.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public abstract class HostConfigManager implements Configurable {
+
+ /**
+ * Return all the datanodes that are allowed to connect to the namenode.
+ * @return Iterable of all datanodes
+ */
+ public abstract Iterable<InetSocketAddress> getIncludes();
+
+ /**
+ * Return all datanodes that should be in decommissioned state.
+ * @return Iterable of those datanodes
+ */
+ public abstract Iterable<InetSocketAddress> getExcludes();
+
+ /**
+ * Check if a datanode is allowed to connect to the namenode.
+ * @param dn the DatanodeID of the datanode
+ * @return true if dn is allowed to connect to the namenode.
+ */
+ public abstract boolean isIncluded(DatanodeID dn);
+
+ /**
+ * Check if a datanode needs to be decommissioned.
+ * @param dn the DatanodeID of the datanode
+ * @return true if dn needs to be decommissioned.
+ */
+ public abstract boolean isExcluded(DatanodeID dn);
+
+ /**
+ * Reload the configuration.
+ */
+ public abstract void refresh() throws IOException;
+
+ /**
+ * Get the upgrade domain of a datanode.
+ * @param dn the DatanodeID of the datanode
+ * @return the upgrade domain of dn.
+ */
+ public abstract String getUpgradeDomain(DatanodeID dn);
+}
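
To illustrate the shape of this contract, a trivial, hypothetical implementation might admit every datanode and report no upgrade domains, as sketched below; it is not a useful policy, only an example of the methods an alternative provider has to supply. Such a class could then be selected via dfs.namenode.hosts.provider.classname.

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Collections;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.server.blockmanagement.HostConfigManager;

/** Hypothetical provider that allows every datanode to register. */
public class AllowAllHostConfigManager extends HostConfigManager {
  private Configuration conf;

  @Override
  public void setConf(Configuration conf) {
    this.conf = conf;
  }

  @Override
  public Configuration getConf() {
    return conf;
  }

  @Override
  public Iterable<InetSocketAddress> getIncludes() {
    return Collections.emptyList();  // empty include list acts as "allow all"
  }

  @Override
  public Iterable<InetSocketAddress> getExcludes() {
    return Collections.emptyList();
  }

  @Override
  public boolean isIncluded(DatanodeID dn) {
    return true;
  }

  @Override
  public boolean isExcluded(DatanodeID dn) {
    return false;
  }

  @Override
  public void refresh() throws IOException {
    // nothing to reload
  }

  @Override
  public String getUpgradeDomain(DatanodeID dn) {
    return null;  // no upgrade domain information
  }
}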
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4c55332/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HostFileManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HostFileManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HostFileManager.java
index e05ef9a..bcfebf2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HostFileManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HostFileManager.java
@@ -18,28 +18,18 @@
package org.apache.hadoop.hdfs.server.blockmanagement;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Iterators;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.UnmodifiableIterator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.util.HostsFileReader;
-import javax.annotation.Nullable;
import java.io.IOException;
-import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
-import java.util.Collection;
import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Map;
/**
* This class manages the include and exclude files for HDFS.
@@ -59,11 +49,27 @@ import java.util.Map;
* of DNs when it fails to do a forward and reverse lookup. Note that DNS
* resolutions are only done during the loading time to minimize the latency.
*/
-class HostFileManager {
+public class HostFileManager extends HostConfigManager {
private static final Log LOG = LogFactory.getLog(HostFileManager.class);
+ private Configuration conf;
private HostSet includes = new HostSet();
private HostSet excludes = new HostSet();
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ @Override
+ public void refresh() throws IOException {
+ refresh(conf.get(DFSConfigKeys.DFS_HOSTS, ""),
+ conf.get(DFSConfigKeys.DFS_HOSTS_EXCLUDE, ""));
+ }
private static HostSet readFile(String type, String filename)
throws IOException {
HostSet res = new HostSet();
@@ -99,31 +105,37 @@ class HostFileManager {
return null;
}
- static InetSocketAddress resolvedAddressFromDatanodeID(DatanodeID id) {
- return new InetSocketAddress(id.getIpAddr(), id.getXferPort());
- }
-
- synchronized HostSet getIncludes() {
+ @Override
+ public synchronized HostSet getIncludes() {
return includes;
}
- synchronized HostSet getExcludes() {
+ @Override
+ public synchronized HostSet getExcludes() {
return excludes;
}
// If the includes list is empty, act as if everything is in the
// includes list.
- synchronized boolean isIncluded(DatanodeID dn) {
- return includes.isEmpty() || includes.match
- (resolvedAddressFromDatanodeID(dn));
+ @Override
+ public synchronized boolean isIncluded(DatanodeID dn) {
+ return includes.isEmpty() || includes.match(dn.getResolvedAddress());
+ }
+
+ @Override
+ public synchronized boolean isExcluded(DatanodeID dn) {
+ return isExcluded(dn.getResolvedAddress());
}
- synchronized boolean isExcluded(DatanodeID dn) {
- return excludes.match(resolvedAddressFromDatanodeID(dn));
+ private boolean isExcluded(InetSocketAddress address) {
+ return excludes.match(address);
}
- synchronized boolean hasIncludes() {
- return !includes.isEmpty();
+ @Override
+ public synchronized String getUpgradeDomain(final DatanodeID dn) {
+ // The include/exclude files based config doesn't support upgrade domain
+ // config.
+ return null;
}
/**
@@ -133,7 +145,8 @@ class HostFileManager {
* @param excludeFile the path to the new excludes list
* @throws IOException thrown if there is a problem reading one of the files
*/
- void refresh(String includeFile, String excludeFile) throws IOException {
+ private void refresh(String includeFile, String excludeFile)
+ throws IOException {
HostSet newIncludes = readFile("included", includeFile);
HostSet newExcludes = readFile("excluded", excludeFile);
@@ -153,84 +166,4 @@ class HostFileManager {
excludes = newExcludes;
}
}
-
- /**
- * The HostSet allows efficient queries on matching wildcard addresses.
- * <p/>
- * For InetSocketAddress A and B with the same host address,
- * we define a partial order between A and B, A <= B iff A.getPort() == B
- * .getPort() || B.getPort() == 0.
- */
- static class HostSet implements Iterable<InetSocketAddress> {
- // Host -> lists of ports
- private final Multimap<InetAddress, Integer> addrs = HashMultimap.create();
-
- /**
- * The function that checks whether there exists an entry foo in the set
- * so that foo <= addr.
- */
- boolean matchedBy(InetSocketAddress addr) {
- Collection<Integer> ports = addrs.get(addr.getAddress());
- return addr.getPort() == 0 ? !ports.isEmpty() : ports.contains(addr
- .getPort());
- }
-
- /**
- * The function that checks whether there exists an entry foo in the set
- * so that addr <= foo.
- */
- boolean match(InetSocketAddress addr) {
- int port = addr.getPort();
- Collection<Integer> ports = addrs.get(addr.getAddress());
- boolean exactMatch = ports.contains(port);
- boolean genericMatch = ports.contains(0);
- return exactMatch || genericMatch;
- }
-
- boolean isEmpty() {
- return addrs.isEmpty();
- }
-
- int size() {
- return addrs.size();
- }
-
- void add(InetSocketAddress addr) {
- Preconditions.checkArgument(!addr.isUnresolved());
- addrs.put(addr.getAddress(), addr.getPort());
- }
-
- @Override
- public Iterator<InetSocketAddress> iterator() {
- return new UnmodifiableIterator<InetSocketAddress>() {
- private final Iterator<Map.Entry<InetAddress,
- Integer>> it = addrs.entries().iterator();
-
- @Override
- public boolean hasNext() {
- return it.hasNext();
- }
-
- @Override
- public InetSocketAddress next() {
- Map.Entry<InetAddress, Integer> e = it.next();
- return new InetSocketAddress(e.getKey(), e.getValue());
- }
- };
- }
-
- @Override
- public String toString() {
- StringBuilder sb = new StringBuilder("HostSet(");
- Joiner.on(",").appendTo(sb, Iterators.transform(iterator(),
- new Function<InetSocketAddress, String>() {
- @Override
- public String apply(@Nullable InetSocketAddress addr) {
- assert addr != null;
- return addr.getAddress().getHostAddress() + ":" + addr.getPort();
- }
- }));
- return sb.append(")").toString();
- }
- }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4c55332/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HostSet.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HostSet.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HostSet.java
new file mode 100644
index 0000000..958557b
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HostSet.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.UnmodifiableIterator;
+
+import javax.annotation.Nullable;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+
+
+/**
+ * The HostSet allows efficient queries on matching wildcard addresses.
+ * <p/>
+ * For InetSocketAddress A and B with the same host address,
+ * we define a partial order between A and B, A <= B iff A.getPort() == B
+ * .getPort() || B.getPort() == 0.
+ */
+public class HostSet implements Iterable<InetSocketAddress> {
+ // Host -> lists of ports
+ private final Multimap<InetAddress, Integer> addrs = HashMultimap.create();
+
+ /**
+ * The function that checks whether there exists an entry foo in the set
+ * so that foo <= addr.
+ */
+ boolean matchedBy(InetSocketAddress addr) {
+ Collection<Integer> ports = addrs.get(addr.getAddress());
+ return addr.getPort() == 0 ? !ports.isEmpty() : ports.contains(addr
+ .getPort());
+ }
+
+ /**
+ * The function that checks whether there exists an entry foo in the set
+ * so that addr <= foo.
+ */
+ boolean match(InetSocketAddress addr) {
+ int port = addr.getPort();
+ Collection<Integer> ports = addrs.get(addr.getAddress());
+ boolean exactMatch = ports.contains(port);
+ boolean genericMatch = ports.contains(0);
+ return exactMatch || genericMatch;
+ }
+
+ boolean isEmpty() {
+ return addrs.isEmpty();
+ }
+
+ int size() {
+ return addrs.size();
+ }
+
+ void add(InetSocketAddress addr) {
+ Preconditions.checkArgument(!addr.isUnresolved());
+ addrs.put(addr.getAddress(), addr.getPort());
+ }
+
+ @Override
+ public Iterator<InetSocketAddress> iterator() {
+ return new UnmodifiableIterator<InetSocketAddress>() {
+ private final Iterator<Map.Entry<InetAddress,
+ Integer>> it = addrs.entries().iterator();
+
+ @Override
+ public boolean hasNext() {
+ return it.hasNext();
+ }
+
+ @Override
+ public InetSocketAddress next() {
+ Map.Entry<InetAddress, Integer> e = it.next();
+ return new InetSocketAddress(e.getKey(), e.getValue());
+ }
+ };
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder("HostSet(");
+ Joiner.on(",").appendTo(sb, Iterators.transform(iterator(),
+ new Function<InetSocketAddress, String>() {
+ @Override
+ public String apply(@Nullable InetSocketAddress addr) {
+ assert addr != null;
+ return addr.getAddress().getHostAddress() + ":" + addr.getPort();
+ }
+ }));
+ return sb.append(")").toString();
+ }
+}
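
A small sketch of the match semantics described in the class comment, assuming the calling code sits in the same package (add/match/matchedBy are package-private); an entry with port 0 acts as a wildcard for its host:

package org.apache.hadoop.hdfs.server.blockmanagement;

import java.net.InetSocketAddress;

public class HostSetMatchSketch {
  public static void main(String[] args) {
    HostSet set = new HostSet();
    set.add(new InetSocketAddress("127.0.0.1", 50010)); // exact host:port entry
    set.add(new InetSocketAddress("127.0.0.2", 0));     // port 0 covers any port on this host

    System.out.println(set.match(new InetSocketAddress("127.0.0.1", 50010)));  // true: exact match
    System.out.println(set.match(new InetSocketAddress("127.0.0.1", 50020)));  // false: port differs
    System.out.println(set.match(new InetSocketAddress("127.0.0.2", 50010)));  // true: wildcard entry
    System.out.println(set.matchedBy(new InetSocketAddress("127.0.0.1", 0)));  // true: host has an entry
  }
}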
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4c55332/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 50d09c0..8ab785f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -3107,4 +3107,19 @@
The size buffer to be used when creating or opening httpfs filesystem IO stream.
</description>
</property>
+
+<property>
+ <name>dfs.namenode.hosts.provider.classname</name>
+ <value>org.apache.hadoop.hdfs.server.blockmanagement.HostFileManager</value>
+ <description>
+ The class that provides access to host files.
+ org.apache.hadoop.hdfs.server.blockmanagement.HostFileManager is used
+ by default, which loads files specified by dfs.hosts and dfs.hosts.exclude.
+ If org.apache.hadoop.hdfs.server.blockmanagement.CombinedHostFileManager is
+ used, it will load the JSON file defined in dfs.hosts.
+ Changing the class name requires a namenode restart. "dfsadmin -refreshNodes"
+ only reloads the configuration files used by the class.
+ </description>
+</property>
+
</configuration>
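
To opt in to the JSON-based configuration, an operator would override this property in hdfs-site.xml and point dfs.hosts at the JSON file, for example (path illustrative):

<property>
  <name>dfs.namenode.hosts.provider.classname</name>
  <value>org.apache.hadoop.hdfs.server.blockmanagement.CombinedHostFileManager</value>
</property>
<property>
  <name>dfs.hosts</name>
  <value>/etc/hadoop/conf/dfs.hosts.json</value>
</property>

A namenode restart is needed to pick up the new provider class; later edits to the JSON file only require "hdfs dfsadmin -refreshNodes".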
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4c55332/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HdfsUserGuide.md
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HdfsUserGuide.md b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HdfsUserGuide.md
index 9c6bd02..28957fb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HdfsUserGuide.md
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HdfsUserGuide.md
@@ -142,12 +142,16 @@ The `bin/hdfs dfsadmin` command supports a few HDFS administration related opera
during last upgrade.
* `-refreshNodes`: Updates the namenode with the set of datanodes
- allowed to connect to the namenode. Namenodes re-read datanode
+ allowed to connect to the namenode. By default, Namenodes re-read datanode
hostnames in the files defined by `dfs.hosts` and `dfs.hosts.exclude`.
Hosts defined in `dfs.hosts` are the datanodes that are part of the
cluster. If there are entries in `dfs.hosts`, only the hosts in it
are allowed to register with the namenode. Entries in
`dfs.hosts.exclude` are datanodes that need to be decommissioned.
+ Alternatively if `dfs.namenode.hosts.provider.classname` is set to
+ `org.apache.hadoop.hdfs.server.blockmanagement.CombinedHostFileManager`,
+ all include and exclude hosts are specified in the JSON file defined by
+ `dfs.hosts`.
Datanodes complete decommissioning when all the replicas from them
are replicated to other datanodes. Decommissioned nodes are not
automatically shutdown and are not chosen for writing for new
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4c55332/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeReport.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeReport.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeReport.java
index 8138298..c990e0f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeReport.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeReport.java
@@ -29,15 +29,19 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.protocol.DatanodeAdminProperties;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.server.blockmanagement.CombinedHostFileManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.HostConfigManager;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
+import org.apache.hadoop.hdfs.util.HostsFileWriter;
import org.junit.Assert;
import org.junit.Test;
@@ -48,7 +52,57 @@ public class TestDatanodeReport {
static final Log LOG = LogFactory.getLog(TestDatanodeReport.class);
final static private Configuration conf = new HdfsConfiguration();
final static private int NUM_OF_DATANODES = 4;
-
+
+ /**
+ * This test verifies upgrade domain is set according to the JSON host file.
+ */
+ @Test
+ public void testDatanodeReportWithUpgradeDomain() throws Exception {
+ conf.setInt(
+ DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 500); // 0.5s
+ conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
+ conf.setClass(DFSConfigKeys.DFS_NAMENODE_HOSTS_PROVIDER_CLASSNAME_KEY,
+ CombinedHostFileManager.class, HostConfigManager.class);
+ HostsFileWriter hostsFileWriter = new HostsFileWriter();
+ hostsFileWriter.initialize(conf, "temp/datanodeReport");
+
+ MiniDFSCluster cluster =
+ new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+ final DFSClient client = cluster.getFileSystem().dfs;
+ final String ud1 = "ud1";
+ final String ud2 = "ud2";
+
+ try {
+ //wait until the cluster is up
+ cluster.waitActive();
+
+ DatanodeAdminProperties datanode = new DatanodeAdminProperties();
+ datanode.setHostName(cluster.getDataNodes().get(0).getDatanodeId().getHostName());
+ datanode.setUpgradeDomain(ud1);
+ hostsFileWriter.initIncludeHosts(
+ new DatanodeAdminProperties[]{datanode});
+ client.refreshNodes();
+ DatanodeInfo[] all = client.datanodeReport(DatanodeReportType.ALL);
+ assertEquals(all[0].getUpgradeDomain(), ud1);
+
+ datanode.setUpgradeDomain(null);
+ hostsFileWriter.initIncludeHosts(
+ new DatanodeAdminProperties[]{datanode});
+ client.refreshNodes();
+ all = client.datanodeReport(DatanodeReportType.ALL);
+ assertEquals(all[0].getUpgradeDomain(), null);
+
+ datanode.setUpgradeDomain(ud2);
+ hostsFileWriter.initIncludeHosts(
+ new DatanodeAdminProperties[]{datanode});
+ client.refreshNodes();
+ all = client.datanodeReport(DatanodeReportType.ALL);
+ assertEquals(all[0].getUpgradeDomain(), ud2);
+ } finally {
+ cluster.shutdown();
+ }
+ }
+
/**
* This test attempts different types of datanode report.
*/
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4c55332/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlocksWithNotEnoughRacks.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlocksWithNotEnoughRacks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlocksWithNotEnoughRacks.java
index 3d00e26..0b403d1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlocksWithNotEnoughRacks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlocksWithNotEnoughRacks.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
+import org.apache.hadoop.hdfs.util.HostsFileWriter;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.log4j.Level;
import org.junit.Test;
@@ -384,17 +385,8 @@ public class TestBlocksWithNotEnoughRacks {
short REPLICATION_FACTOR = 2;
final Path filePath = new Path("/testFile");
- // Configure an excludes file
- FileSystem localFileSys = FileSystem.getLocal(conf);
- Path workingDir = new Path(MiniDFSCluster.getBaseDirectory());
- Path dir = new Path(workingDir, "temp/decommission");
- Path excludeFile = new Path(dir, "exclude");
- Path includeFile = new Path(dir, "include");
- assertTrue(localFileSys.mkdirs(dir));
- DFSTestUtil.writeFile(localFileSys, excludeFile, "");
- DFSTestUtil.writeFile(localFileSys, includeFile, "");
- conf.set(DFSConfigKeys.DFS_HOSTS_EXCLUDE, excludeFile.toUri().getPath());
- conf.set(DFSConfigKeys.DFS_HOSTS, includeFile.toUri().getPath());
+ HostsFileWriter hostsFileWriter = new HostsFileWriter();
+ hostsFileWriter.initialize(conf, "temp/decommission");
// Two blocks and four racks
String racks[] = {"/rack1", "/rack1", "/rack2", "/rack2"};
@@ -415,7 +407,7 @@ public class TestBlocksWithNotEnoughRacks {
BlockLocation locs[] = fs.getFileBlockLocations(
fs.getFileStatus(filePath), 0, Long.MAX_VALUE);
String name = locs[0].getNames()[0];
- DFSTestUtil.writeFile(localFileSys, excludeFile, name);
+ hostsFileWriter.initExcludeHost(name);
ns.getBlockManager().getDatanodeManager().refreshNodes(conf);
DFSTestUtil.waitForDecommission(fs, name);
@@ -423,6 +415,7 @@ public class TestBlocksWithNotEnoughRacks {
DFSTestUtil.waitForReplication(cluster, b, 2, REPLICATION_FACTOR, 0);
} finally {
cluster.shutdown();
+ hostsFileWriter.cleanup();
}
}
@@ -437,17 +430,8 @@ public class TestBlocksWithNotEnoughRacks {
short REPLICATION_FACTOR = 5;
final Path filePath = new Path("/testFile");
- // Configure an excludes file
- FileSystem localFileSys = FileSystem.getLocal(conf);
- Path workingDir = new Path(MiniDFSCluster.getBaseDirectory());
- Path dir = new Path(workingDir, "temp/decommission");
- Path excludeFile = new Path(dir, "exclude");
- Path includeFile = new Path(dir, "include");
- assertTrue(localFileSys.mkdirs(dir));
- DFSTestUtil.writeFile(localFileSys, excludeFile, "");
- DFSTestUtil.writeFile(localFileSys, includeFile, "");
- conf.set(DFSConfigKeys.DFS_HOSTS, includeFile.toUri().getPath());
- conf.set(DFSConfigKeys.DFS_HOSTS_EXCLUDE, excludeFile.toUri().getPath());
+ HostsFileWriter hostsFileWriter = new HostsFileWriter();
+ hostsFileWriter.initialize(conf, "temp/decommission");
// All hosts are on two racks, only one host on /rack2
String racks[] = {"/rack1", "/rack2", "/rack1", "/rack1", "/rack1"};
@@ -473,7 +457,7 @@ public class TestBlocksWithNotEnoughRacks {
for (String top : locs[0].getTopologyPaths()) {
if (!top.startsWith("/rack2")) {
String name = top.substring("/rack1".length()+1);
- DFSTestUtil.writeFile(localFileSys, excludeFile, name);
+ hostsFileWriter.initExcludeHost(name);
ns.getBlockManager().getDatanodeManager().refreshNodes(conf);
DFSTestUtil.waitForDecommission(fs, name);
break;
@@ -485,6 +469,7 @@ public class TestBlocksWithNotEnoughRacks {
DFSTestUtil.waitForReplication(cluster, b, 2, REPLICATION_FACTOR, 0);
} finally {
cluster.shutdown();
+ hostsFileWriter.cleanup();
}
}
}
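
[Editor's note, not part of the patch] The recurring change in the test hunks above is the replacement of hand-rolled include/exclude file boilerplate with the new HostsFileWriter test utility. A minimal sketch of that pattern, using only calls that appear in this patch (initialize, initExcludeHost, cleanup, refreshNodes) and assuming HostsFileWriter behaves like the boilerplate it replaces (creates the hosts files under the test base directory and wires dfs.hosts / dfs.hosts.exclude into the Configuration):

    // Sketch only -- not part of the patch.
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.HdfsConfiguration;
    import org.apache.hadoop.hdfs.MiniDFSCluster;
    import org.apache.hadoop.hdfs.util.HostsFileWriter;

    public class HostsFileWriterExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new HdfsConfiguration();
        HostsFileWriter hostsFileWriter = new HostsFileWriter();
        // Creates the hosts files under the test base dir and points
        // dfs.hosts / dfs.hosts.exclude at them.
        hostsFileWriter.initialize(conf, "temp/decommission");
        MiniDFSCluster cluster =
            new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
        try {
          cluster.waitActive();
          // Exclude one datanode and ask the namenode to re-read the files.
          String name = cluster.getDataNodes().get(0).getDisplayName();
          hostsFileWriter.initExcludeHost(name);
          cluster.getNamesystem().getBlockManager().getDatanodeManager()
              .refreshNodes(conf);
        } finally {
          cluster.shutdown();
          hostsFileWriter.cleanup();  // removes the generated files
        }
      }
    }

The same initialize / initExcludeHost (or initIncludeHosts) / cleanup sequence reappears in TestHostsFiles, TestNameNodeMXBean and TestStartup below.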
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4c55332/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java
index a8a1db9..30e2aaf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java
@@ -420,9 +420,9 @@ public class TestDatanodeManager {
DatanodeManager dm = mockDatanodeManager(fsn, new Configuration());
HostFileManager hm = new HostFileManager();
- HostFileManager.HostSet noNodes = new HostFileManager.HostSet();
- HostFileManager.HostSet oneNode = new HostFileManager.HostSet();
- HostFileManager.HostSet twoNodes = new HostFileManager.HostSet();
+ HostSet noNodes = new HostSet();
+ HostSet oneNode = new HostSet();
+ HostSet twoNodes = new HostSet();
DatanodeRegistration dr1 = new DatanodeRegistration(
new DatanodeID("127.0.0.1", "127.0.0.1", "someStorageID-123",
12345, 12345, 12345, 12345),
@@ -439,7 +439,7 @@ public class TestDatanodeManager {
oneNode.add(entry("127.0.0.1:23456"));
hm.refresh(twoNodes, noNodes);
- Whitebox.setInternalState(dm, "hostFileManager", hm);
+ Whitebox.setInternalState(dm, "hostConfigManager", hm);
// Register two data nodes to simulate them coming up.
// We need to add two nodes, because if we have only one node, removing it
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4c55332/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestHostFileManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestHostFileManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestHostFileManager.java
index 6f17040..e6be7cb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestHostFileManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestHostFileManager.java
@@ -40,7 +40,7 @@ public class TestHostFileManager {
@Test
public void testDeduplication() {
- HostFileManager.HostSet s = new HostFileManager.HostSet();
+ HostSet s = new HostSet();
// These entries will be de-duped, since they refer to the same IP
// address + port combo.
s.add(entry("127.0.0.1:12345"));
@@ -60,7 +60,7 @@ public class TestHostFileManager {
@Test
public void testRelation() {
- HostFileManager.HostSet s = new HostFileManager.HostSet();
+ HostSet s = new HostSet();
s.add(entry("127.0.0.1:123"));
Assert.assertTrue(s.match(entry("127.0.0.1:123")));
Assert.assertFalse(s.match(entry("127.0.0.1:12")));
@@ -105,8 +105,8 @@ public class TestHostFileManager {
FSNamesystem fsn = mock(FSNamesystem.class);
Configuration conf = new Configuration();
HostFileManager hm = new HostFileManager();
- HostFileManager.HostSet includedNodes = new HostFileManager.HostSet();
- HostFileManager.HostSet excludedNodes = new HostFileManager.HostSet();
+ HostSet includedNodes = new HostSet();
+ HostSet excludedNodes = new HostSet();
includedNodes.add(entry("127.0.0.1:12345"));
includedNodes.add(entry("localhost:12345"));
@@ -122,7 +122,7 @@ public class TestHostFileManager {
hm.refresh(includedNodes, excludedNodes);
DatanodeManager dm = new DatanodeManager(bm, fsn, conf);
- Whitebox.setInternalState(dm, "hostFileManager", hm);
+ Whitebox.setInternalState(dm, "hostConfigManager", hm);
Map<String, DatanodeDescriptor> dnMap = (Map<String,
DatanodeDescriptor>) Whitebox.getInternalState(dm, "datanodeMap");
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4c55332/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestHostsFiles.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestHostsFiles.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestHostsFiles.java
index bf4d8ff..0ac968a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestHostsFiles.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestHostsFiles.java
@@ -20,14 +20,13 @@ package org.apache.hadoop.hdfs.server.namenode;
import static org.junit.Assert.assertTrue;
import java.lang.management.ManagementFactory;
-import java.io.File;
+import java.util.Arrays;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileSystem;
@@ -37,15 +36,33 @@ import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.blockmanagement.CombinedHostFileManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.HostConfigManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.HostFileManager;
+import org.apache.hadoop.hdfs.util.HostsFileWriter;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
/**
* DFS_HOSTS and DFS_HOSTS_EXCLUDE tests
*
*/
+@RunWith(Parameterized.class)
public class TestHostsFiles {
private static final Log LOG =
LogFactory.getLog(TestHostsFiles.class.getName());
+ private Class hostFileMgrClass;
+
+ public TestHostsFiles(Class hostFileMgrClass) {
+ this.hostFileMgrClass = hostFileMgrClass;
+ }
+
+ @Parameterized.Parameters
+ public static Iterable<Object[]> data() {
+ return Arrays.asList(new Object[][]{
+ {HostFileManager.class}, {CombinedHostFileManager.class}});
+ }
/*
* Return a configuration object with low timeouts for testing and
@@ -72,6 +89,10 @@ public class TestHostsFiles {
// Indicates we have multiple racks
conf.set(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY, "xyz");
+
+ // Host file manager
+ conf.setClass(DFSConfigKeys.DFS_NAMENODE_HOSTS_PROVIDER_CLASSNAME_KEY,
+ hostFileMgrClass, HostConfigManager.class);
return conf;
}
@@ -80,18 +101,8 @@ public class TestHostsFiles {
Configuration conf = getConf();
short REPLICATION_FACTOR = 2;
final Path filePath = new Path("/testFile");
-
- // Configure an excludes file
- FileSystem localFileSys = FileSystem.getLocal(conf);
- Path workingDir = new Path(MiniDFSCluster.getBaseDirectory());
- Path dir = new Path(workingDir, "temp/decommission");
- Path excludeFile = new Path(dir, "exclude");
- Path includeFile = new Path(dir, "include");
- assertTrue(localFileSys.mkdirs(dir));
- DFSTestUtil.writeFile(localFileSys, excludeFile, "");
- DFSTestUtil.writeFile(localFileSys, includeFile, "");
- conf.set(DFSConfigKeys.DFS_HOSTS_EXCLUDE, excludeFile.toUri().getPath());
- conf.set(DFSConfigKeys.DFS_HOSTS, includeFile.toUri().getPath());
+ HostsFileWriter hostsFileWriter = new HostsFileWriter();
+ hostsFileWriter.initialize(conf, "temp/decommission");
// Two blocks and four racks
String racks[] = {"/rack1", "/rack1", "/rack2", "/rack2"};
@@ -112,9 +123,8 @@ public class TestHostsFiles {
BlockLocation locs[] = fs.getFileBlockLocations(
fs.getFileStatus(filePath), 0, Long.MAX_VALUE);
String name = locs[0].getNames()[0];
- String names = name + "\n" + "localhost:42\n";
- LOG.info("adding '" + names + "' to exclude file " + excludeFile.toUri().getPath());
- DFSTestUtil.writeFile(localFileSys, excludeFile, name);
+ LOG.info("adding '" + name + "' to decommission");
+ hostsFileWriter.initExcludeHost(name);
ns.getBlockManager().getDatanodeManager().refreshNodes(conf);
DFSTestUtil.waitForDecommission(fs, name);
@@ -131,9 +141,7 @@ public class TestHostsFiles {
if (cluster != null) {
cluster.shutdown();
}
- if (localFileSys.exists(dir)) {
- FileUtils.deleteQuietly(new File(dir.toUri().getPath()));
- }
+ hostsFileWriter.cleanup();
}
}
@@ -141,20 +149,10 @@ public class TestHostsFiles {
public void testHostsIncludeForDeadCount() throws Exception {
Configuration conf = getConf();
- // Configure an excludes file
- FileSystem localFileSys = FileSystem.getLocal(conf);
- Path workingDir = new Path(MiniDFSCluster.getBaseDirectory());
- Path dir = new Path(workingDir, "temp/decommission");
- Path excludeFile = new Path(dir, "exclude");
- Path includeFile = new Path(dir, "include");
- assertTrue(localFileSys.mkdirs(dir));
- StringBuilder includeHosts = new StringBuilder();
- includeHosts.append("localhost:52").append("\n").append("127.0.0.1:7777")
- .append("\n");
- DFSTestUtil.writeFile(localFileSys, excludeFile, "");
- DFSTestUtil.writeFile(localFileSys, includeFile, includeHosts.toString());
- conf.set(DFSConfigKeys.DFS_HOSTS_EXCLUDE, excludeFile.toUri().getPath());
- conf.set(DFSConfigKeys.DFS_HOSTS, includeFile.toUri().getPath());
+ HostsFileWriter hostsFileWriter = new HostsFileWriter();
+ hostsFileWriter.initialize(conf, "temp/decommission");
+ hostsFileWriter.initIncludeHosts(new String[]
+ {"localhost:52","127.0.0.1:7777"});
MiniDFSCluster cluster = null;
try {
@@ -174,9 +172,7 @@ public class TestHostsFiles {
if (cluster != null) {
cluster.shutdown();
}
- if (localFileSys.exists(dir)) {
- FileUtils.deleteQuietly(new File(dir.toUri().getPath()));
- }
+ hostsFileWriter.cleanup();
}
}
}
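
[Editor's note, not part of the patch] For operators, the switch exercised by the parameterized runs above maps to a NameNode configuration key. A rough hdfs-site.xml sketch, assuming the string behind DFS_NAMENODE_HOSTS_PROVIDER_CLASSNAME_KEY is dfs.namenode.hosts.provider.classname and using a hypothetical file path:

    <property>
      <name>dfs.namenode.hosts.provider.classname</name>
      <value>org.apache.hadoop.hdfs.server.blockmanagement.CombinedHostFileManager</value>
    </property>
    <property>
      <!-- With CombinedHostFileManager, dfs.hosts points at a single
           combined JSON file instead of the legacy include file. -->
      <name>dfs.hosts</name>
      <value>/etc/hadoop/conf/dfs.hosts.json</value>
    </property>

With the legacy HostFileManager, dfs.hosts and dfs.hosts.exclude keep their existing line-oriented format.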
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4c55332/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeMXBean.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeMXBean.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeMXBean.java
index e622576..c19b9cf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeMXBean.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeMXBean.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
import org.apache.hadoop.hdfs.server.namenode.top.TopConf;
+import org.apache.hadoop.hdfs.util.HostsFileWriter;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.io.nativeio.NativeIO.POSIX.NoMlockCacheManipulator;
import org.apache.hadoop.net.ServerSocketUtil;
@@ -44,10 +45,10 @@ import org.mortbay.util.ajax.JSON;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import java.io.File;
-import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.net.BindException;
import java.net.URI;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
@@ -238,8 +239,8 @@ public class TestNameNodeMXBean {
conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 1);
MiniDFSCluster cluster = null;
- FileSystem localFileSys = null;
- Path dir = null;
+ HostsFileWriter hostsFileWriter = new HostsFileWriter();
+ hostsFileWriter.initialize(conf, "temp/TestNameNodeMXBean");
try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
@@ -251,18 +252,12 @@ public class TestNameNodeMXBean {
ObjectName mxbeanName = new ObjectName(
"Hadoop:service=NameNode,name=NameNodeInfo");
- // Define include file to generate deadNodes metrics
- localFileSys = FileSystem.getLocal(conf);
- Path workingDir = localFileSys.getWorkingDirectory();
- dir = new Path(workingDir,"build/test/data/temp/TestNameNodeMXBean");
- Path includeFile = new Path(dir, "include");
- assertTrue(localFileSys.mkdirs(dir));
- StringBuilder includeHosts = new StringBuilder();
+ List<String> hosts = new ArrayList<>();
for(DataNode dn : cluster.getDataNodes()) {
- includeHosts.append(dn.getDisplayName()).append("\n");
+ hosts.add(dn.getDisplayName());
}
- DFSTestUtil.writeFile(localFileSys, includeFile, includeHosts.toString());
- conf.set(DFSConfigKeys.DFS_HOSTS, includeFile.toUri().getPath());
+ hostsFileWriter.initIncludeHosts(hosts.toArray(
+ new String[hosts.size()]));
fsn.getBlockManager().getDatanodeManager().refreshNodes(conf);
cluster.stopDataNode(0);
@@ -284,12 +279,10 @@ public class TestNameNodeMXBean {
assertTrue(deadNode.containsKey("xferaddr"));
}
} finally {
- if ((localFileSys != null) && localFileSys.exists(dir)) {
- FileUtils.deleteQuietly(new File(dir.toUri().getPath()));
- }
if (cluster != null) {
cluster.shutdown();
}
+ hostsFileWriter.cleanup();
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4c55332/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStartup.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStartup.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStartup.java
index 978eef8..2fe25e0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStartup.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStartup.java
@@ -33,7 +33,6 @@ import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.net.InetAddress;
import java.net.URI;
-import java.util.ArrayList;
import java.nio.file.Paths;
import java.util.Collection;
import java.util.Iterator;
@@ -65,6 +64,7 @@ import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
+import org.apache.hadoop.hdfs.util.HostsFileWriter;
import org.apache.hadoop.hdfs.util.MD5FileUtils;
import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.test.GenericTestUtils;
@@ -590,27 +590,15 @@ public class TestStartup {
@Test
public void testNNRestart() throws IOException, InterruptedException {
MiniDFSCluster cluster = null;
- FileSystem localFileSys;
- Path hostsFile;
- Path excludeFile;
int HEARTBEAT_INTERVAL = 1; // heartbeat interval in seconds
- // Set up the hosts/exclude files.
- localFileSys = FileSystem.getLocal(config);
- Path workingDir = localFileSys.getWorkingDirectory();
- Path dir = new Path(workingDir, "build/test/data/work-dir/restartnn");
- hostsFile = new Path(dir, "hosts");
- excludeFile = new Path(dir, "exclude");
-
- // Setup conf
- config.set(DFSConfigKeys.DFS_HOSTS_EXCLUDE, excludeFile.toUri().getPath());
- writeConfigFile(localFileSys, excludeFile, null);
- config.set(DFSConfigKeys.DFS_HOSTS, hostsFile.toUri().getPath());
- // write into hosts file
- ArrayList<String>list = new ArrayList<String>();
+
+ HostsFileWriter hostsFileWriter = new HostsFileWriter();
+ hostsFileWriter.initialize(config, "work-dir/restartnn");
+
byte b[] = {127, 0, 0, 1};
InetAddress inetAddress = InetAddress.getByAddress(b);
- list.add(inetAddress.getHostName());
- writeConfigFile(localFileSys, hostsFile, list);
+ hostsFileWriter.initIncludeHosts(new String[] {inetAddress.getHostName()});
+
int numDatanodes = 1;
try {
@@ -635,37 +623,12 @@ public class TestStartup {
fail(StringUtils.stringifyException(e));
throw e;
} finally {
- cleanupFile(localFileSys, excludeFile.getParent());
if (cluster != null) {
cluster.shutdown();
}
+ hostsFileWriter.cleanup();
}
}
-
- private void writeConfigFile(FileSystem localFileSys, Path name,
- ArrayList<String> nodes) throws IOException {
- // delete if it already exists
- if (localFileSys.exists(name)) {
- localFileSys.delete(name, true);
- }
-
- FSDataOutputStream stm = localFileSys.create(name);
- if (nodes != null) {
- for (Iterator<String> it = nodes.iterator(); it.hasNext();) {
- String node = it.next();
- stm.writeBytes(node);
- stm.writeBytes("\n");
- }
- }
- stm.close();
- }
-
- private void cleanupFile(FileSystem fileSys, Path name) throws IOException {
- assertTrue(fileSys.exists(name));
- fileSys.delete(name, true);
- assertTrue(!fileSys.exists(name));
- }
-
@Test(timeout = 120000)
public void testXattrConfiguration() throws Exception {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4c55332/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestUpgradeDomainBlockPlacementPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestUpgradeDomainBlockPlacementPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestUpgradeDomainBlockPlacementPolicy.java
new file mode 100644
index 0000000..f9a2503
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestUpgradeDomainBlockPlacementPolicy.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.permission.PermissionStatus;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.DatanodeAdminProperties;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyWithUpgradeDomain;
+import org.apache.hadoop.hdfs.server.blockmanagement.CombinedHostFileManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.HostConfigManager;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
+import org.apache.hadoop.hdfs.util.HostsFileWriter;
+import org.apache.hadoop.net.StaticMapping;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * End-to-end test case for upgrade domain.
+ * The test configures upgrade domains for datanodes via the admin JSON
+ * config file and puts some nodes into the decommissioned state.
+ * The test then verifies replicas are placed on the nodes that
+ * satisfy the upgrade domain policy.
+ *
+ */
+public class TestUpgradeDomainBlockPlacementPolicy {
+
+ private static final short REPLICATION_FACTOR = (short) 3;
+ private static final int DEFAULT_BLOCK_SIZE = 1024;
+ static final String[] racks =
+ { "/RACK1", "/RACK1", "/RACK1", "/RACK2", "/RACK2", "/RACK2" };
+ /**
+ * Use host names that can be resolved
+ * (InetSocketAddress#isUnresolved == false); otherwise,
+ * CombinedHostFileManager won't allow those hosts.
+ */
+ static final String[] hosts =
+ { "127.0.0.1", "127.0.0.1", "127.0.0.1", "127.0.0.1",
+ "127.0.0.1", "127.0.0.1" };
+ static final String[] upgradeDomains =
+ { "ud1", "ud2", "ud3", "ud1", "ud2", "ud3" };
+ static final Set<DatanodeID> expectedDatanodeIDs = new HashSet<>();
+ private MiniDFSCluster cluster = null;
+ private NamenodeProtocols nameNodeRpc = null;
+ private FSNamesystem namesystem = null;
+ private PermissionStatus perm = null;
+
+ @Before
+ public void setup() throws IOException {
+ StaticMapping.resetMap();
+ Configuration conf = new HdfsConfiguration();
+ conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
+ conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, DEFAULT_BLOCK_SIZE / 2);
+ conf.setClass(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
+ BlockPlacementPolicyWithUpgradeDomain.class,
+ BlockPlacementPolicy.class);
+ conf.setClass(DFSConfigKeys.DFS_NAMENODE_HOSTS_PROVIDER_CLASSNAME_KEY,
+ CombinedHostFileManager.class, HostConfigManager.class);
+ HostsFileWriter hostsFileWriter = new HostsFileWriter();
+ hostsFileWriter.initialize(conf, "temp/upgradedomainpolicy");
+
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(6).racks(racks)
+ .hosts(hosts).build();
+ cluster.waitActive();
+ nameNodeRpc = cluster.getNameNodeRpc();
+ namesystem = cluster.getNamesystem();
+ perm = new PermissionStatus("TestUpgradeDomainBlockPlacementPolicy", null,
+ FsPermission.getDefault());
+ refreshDatanodeAdminProperties(hostsFileWriter);
+ hostsFileWriter.cleanup();
+ }
+
+ @After
+ public void teardown() {
+ if (cluster != null) {
+ cluster.shutdown();
+ cluster = null;
+ }
+ }
+
+ /**
+ * Define admin properties for these datanodes as follows.
+ * dn0 and dn3 have upgrade domain ud1.
+ * dn1 and dn4 have upgrade domain ud2.
+ * dn2 and dn5 have upgrade domain ud3.
+ * dn0 and dn5 are decommissioned.
+ * Given that dn0, dn1 and dn2 are on rack1 and dn3, dn4 and dn5 are on
+ * rack2, any block's replicas should be on either
+ * {dn1, dn2, dn3} or {dn2, dn3, dn4}.
+ */
+ private void refreshDatanodeAdminProperties(HostsFileWriter hostsFileWriter)
+ throws IOException {
+ DatanodeAdminProperties[] datanodes = new DatanodeAdminProperties[
+ hosts.length];
+ for (int i = 0; i < hosts.length; i++) {
+ datanodes[i] = new DatanodeAdminProperties();
+ DatanodeID datanodeID = cluster.getDataNodes().get(i).getDatanodeId();
+ datanodes[i].setHostName(datanodeID.getHostName());
+ datanodes[i].setPort(datanodeID.getXferPort());
+ datanodes[i].setUpgradeDomain(upgradeDomains[i]);
+ }
+ datanodes[0].setAdminState(DatanodeInfo.AdminStates.DECOMMISSIONED);
+ datanodes[5].setAdminState(DatanodeInfo.AdminStates.DECOMMISSIONED);
+ hostsFileWriter.initIncludeHosts(datanodes);
+ cluster.getFileSystem().refreshNodes();
+
+ expectedDatanodeIDs.add(cluster.getDataNodes().get(2).getDatanodeId());
+ expectedDatanodeIDs.add(cluster.getDataNodes().get(3).getDatanodeId());
+ }
+
+ @Test
+ public void testPlacement() throws Exception {
+ String clientMachine = "127.0.0.1";
+ for (int i = 0; i < 5; i++) {
+ String src = "/test-" + i;
+ // Create the file with client machine
+ HdfsFileStatus fileStatus = namesystem.startFile(src, perm,
+ clientMachine, clientMachine, EnumSet.of(CreateFlag.CREATE), true,
+ REPLICATION_FACTOR, DEFAULT_BLOCK_SIZE, null, false);
+ LocatedBlock locatedBlock = nameNodeRpc.addBlock(src, clientMachine,
+ null, null, fileStatus.getFileId(), null, null);
+
+ assertEquals("Block should be allocated sufficient locations",
+ REPLICATION_FACTOR, locatedBlock.getLocations().length);
+ Set<DatanodeInfo> locs = new HashSet<>(Arrays.asList(
+ locatedBlock.getLocations()));
+ for (DatanodeID datanodeID : expectedDatanodeIDs) {
+ assertTrue(locs.contains(datanodeID));
+ }
+
+ nameNodeRpc.abandonBlock(locatedBlock.getBlock(), fileStatus.getFileId(),
+ src, clientMachine);
+ }
+ }
+}
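
[Editor's note, not part of the patch] The "admin JSON config file" this test refers to is the combined hosts file read by CombinedHostFileManager (see the dfs.hosts.json test resource added by this patch). A hedged sketch of what entries might look like, assuming the field names mirror the DatanodeAdminProperties bean used above (hostName, port, upgradeDomain, adminState), one JSON object per datanode, and with made-up hostnames and ports:

    { "hostName": "host1.example.com", "port": 50010, "upgradeDomain": "ud1" }
    { "hostName": "host2.example.com", "port": 50010, "upgradeDomain": "ud2", "adminState": "DECOMMISSIONED" }

Hosts listed in the file form the include list; adminState drives decommissioning, and upgradeDomain is the value BlockPlacementPolicyWithUpgradeDomain consumes when spreading replicas.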