Posted to hdfs-commits@hadoop.apache.org by cm...@apache.org on 2013/06/03 19:14:28 UTC
svn commit: r1489065 - in
/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./
src/main/java/org/apache/hadoop/hdfs/
src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/
src/main/java/org/apache/hadoop/hdfs/server/datanode/ src/test/java/or...
Author: cmccabe
Date: Mon Jun 3 17:14:27 2013
New Revision: 1489065
URL: http://svn.apache.org/r1489065
Log:
HDFS-3934. duplicative dfs_hosts entries handled wrong. (cmccabe)
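For context: before this change, the NameNode matched datanodes against the
dfs.hosts / dfs.hosts.exclude files by comparing raw strings (see the removed
HostsFileReader / checkInList code below), so the same machine listed as both
"localhost:1234" and "127.0.0.1:1234" was treated as two distinct entries and
could appear twice in the dead-node report. The new HostFileManager instead
normalizes entries before comparing them. HostFileManager itself is added by
this commit but its source is not shown in this mail, so the following is an
illustrative sketch of the normalization idea only, not the actual code:

    import java.net.InetAddress;
    import java.net.UnknownHostException;

    // Normalize a hosts-file entry to resolved-IP:port. Entries with equal
    // keys ("localhost:1234" vs "127.0.0.1:1234") are duplicates.
    static String normalizedKey(String host, int port)
        throws UnknownHostException {
      String ip = InetAddress.getByName(host).getHostAddress();
      return ip + ":" + port;
    }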
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeRegistration.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1489065&r1=1489064&r2=1489065&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Mon Jun 3 17:14:27 2013
@@ -238,6 +238,9 @@ Trunk (Unreleased)
HDFS-4687. TestDelegationTokenForProxyUser#testWebHdfsDoAs is flaky with
JDK7. (Andrew Wang via atm)
+ HDFS-3934. duplicative dfs_hosts entries handled wrong. (Colin Patrick
+ McCabe)
+
Release 2.1.0-beta - UNRELEASED
INCOMPATIBLE CHANGES
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1489065&r1=1489064&r2=1489065&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Mon Jun 3 17:14:27 2013
@@ -251,6 +251,9 @@ public class DFSConfigKeys extends Commo
public static final String DFS_DATANODE_HOST_NAME_KEY = "dfs.datanode.hostname";
public static final String DFS_NAMENODE_HOSTS_KEY = "dfs.namenode.hosts";
public static final String DFS_NAMENODE_HOSTS_EXCLUDE_KEY = "dfs.namenode.hosts.exclude";
+ public static final String DFS_NAMENODE_HOSTS_DNS_RESOLUTION_INTERVAL_SECONDS =
+ "dfs.namenode.hosts.dns.resolution.interval.seconds";
+ public static final int DFS_NAMENODE_HOSTS_DNS_RESOLUTION_INTERVAL_SECONDS_DEFAULT = 300;
public static final String DFS_CLIENT_SOCKET_TIMEOUT_KEY = "dfs.client.socket-timeout";
public static final String DFS_NAMENODE_CHECKPOINT_DIR_KEY = "dfs.namenode.checkpoint.dir";
public static final String DFS_NAMENODE_CHECKPOINT_EDITS_DIR_KEY = "dfs.namenode.checkpoint.edits.dir";
@@ -324,7 +327,7 @@ public class DFSConfigKeys extends Commo
public static final String DFS_DATANODE_HTTPS_ADDRESS_DEFAULT = "0.0.0.0:" + DFS_DATANODE_HTTPS_DEFAULT_PORT;
public static final String DFS_DATANODE_IPC_ADDRESS_KEY = "dfs.datanode.ipc.address";
public static final int DFS_DATANODE_IPC_DEFAULT_PORT = 50020;
- public static final String DFS_DATANODE_IPC_ADDRESS_DEFAULT = "0.0.0.0" + DFS_DATANODE_IPC_DEFAULT_PORT;
+ public static final String DFS_DATANODE_IPC_ADDRESS_DEFAULT = "0.0.0.0:" + DFS_DATANODE_IPC_DEFAULT_PORT;
public static final String DFS_DATANODE_MIN_SUPPORTED_NAMENODE_VERSION_KEY = "dfs.datanode.min.supported.namenode.version";
public static final String DFS_DATANODE_MIN_SUPPORTED_NAMENODE_VERSION_DEFAULT = "3.0.0-SNAPSHOT";
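The new dfs.namenode.hosts.dns.resolution.interval.seconds key above controls
how often the NameNode re-resolves hostnames found in the hosts files; per the
constant, it defaults to 300 seconds. A minimal sketch of overriding it (the
key and classes are from this diff; the Java Configuration API is used here,
though setting it in hdfs-site.xml works the same way):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    Configuration conf = new HdfsConfiguration();
    // Re-resolve include/exclude entries every 60 seconds instead of 300.
    conf.setInt(
        DFSConfigKeys.DFS_NAMENODE_HOSTS_DNS_RESOLUTION_INTERVAL_SECONDS,
        60);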
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java?rev=1489065&r1=1489064&r2=1489065&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java Mon Jun 3 17:14:27 2013
@@ -26,11 +26,9 @@ import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.NavigableMap;
-import java.util.Set;
import java.util.TreeMap;
import org.apache.commons.logging.Log;
@@ -50,6 +48,10 @@ import org.apache.hadoop.hdfs.protocol.H
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockTargetPair;
+import org.apache.hadoop.hdfs.server.namenode.HostFileManager;
+import org.apache.hadoop.hdfs.server.namenode.HostFileManager.Entry;
+import org.apache.hadoop.hdfs.server.namenode.HostFileManager.EntrySet;
+import org.apache.hadoop.hdfs.server.namenode.HostFileManager.MutableEntrySet;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
import org.apache.hadoop.hdfs.server.protocol.BalancerBandwidthCommand;
@@ -62,16 +64,17 @@ import org.apache.hadoop.hdfs.server.pro
import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException;
import org.apache.hadoop.hdfs.server.protocol.RegisterCommand;
import org.apache.hadoop.hdfs.util.CyclicIteration;
+import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.CachedDNSToSwitchMapping;
import org.apache.hadoop.net.DNSToSwitchMapping;
+import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.NetworkTopology.InvalidTopologyException;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.net.ScriptBasedMapping;
import org.apache.hadoop.util.Daemon;
-import org.apache.hadoop.util.HostsFileReader;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Time;
@@ -120,8 +123,14 @@ public class DatanodeManager {
private final DNSToSwitchMapping dnsToSwitchMapping;
+ private final int defaultXferPort;
+
+ private final int defaultInfoPort;
+
+ private final int defaultIpcPort;
+
/** Read include/exclude files*/
- private final HostsFileReader hostsReader;
+ private final HostFileManager hostFileManager;
/** The period to wait for datanode heartbeat.*/
private final long heartbeatExpireInterval;
@@ -162,13 +171,30 @@ public class DatanodeManager {
this.namesystem = namesystem;
this.blockManager = blockManager;
+ int dnsResolutionSeconds = conf.getInt(
+ DFSConfigKeys.DFS_NAMENODE_HOSTS_DNS_RESOLUTION_INTERVAL_SECONDS,
+ DFSConfigKeys.DFS_NAMENODE_HOSTS_DNS_RESOLUTION_INTERVAL_SECONDS_DEFAULT);
+ this.hostFileManager = new HostFileManager(dnsResolutionSeconds);
+
networktopology = NetworkTopology.getInstance(conf);
this.heartbeatManager = new HeartbeatManager(namesystem, blockManager, conf);
- this.hostsReader = new HostsFileReader(
- conf.get(DFSConfigKeys.DFS_HOSTS, ""),
+ this.defaultXferPort = NetUtils.createSocketAddr(
+ conf.get(DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY,
+ DFSConfigKeys.DFS_DATANODE_ADDRESS_DEFAULT)).getPort();
+ this.defaultInfoPort = NetUtils.createSocketAddr(
+ conf.get(DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY,
+ DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_DEFAULT)).getPort();
+ this.defaultIpcPort = NetUtils.createSocketAddr(
+ conf.get(DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY,
+ DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_DEFAULT)).getPort();
+ try {
+ this.hostFileManager.refresh(conf.get(DFSConfigKeys.DFS_HOSTS, ""),
conf.get(DFSConfigKeys.DFS_HOSTS_EXCLUDE, ""));
+ } catch (IOException e) {
+ LOG.error("error reading hosts files: ", e);
+ }
this.dnsToSwitchMapping = ReflectionUtils.newInstance(
conf.getClass(DFSConfigKeys.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
@@ -178,9 +204,15 @@ public class DatanodeManager {
// locations of those hosts in the include list and store the mapping
// in the cache; so future calls to resolve will be fast.
if (dnsToSwitchMapping instanceof CachedDNSToSwitchMapping) {
- dnsToSwitchMapping.resolve(new ArrayList<String>(hostsReader.getHosts()));
- }
-
+ final ArrayList<String> locations = new ArrayList<String>();
+ for (Entry entry : hostFileManager.getIncludes()) {
+ if (!entry.getIpAddress().isEmpty()) {
+ locations.add(entry.getIpAddress());
+ }
+ }
+ dnsToSwitchMapping.resolve(locations);
+ }
+
final long heartbeatIntervalSeconds = conf.getLong(
DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT);
@@ -264,6 +296,7 @@ public class DatanodeManager {
}
void close() {
+ IOUtils.cleanup(LOG, hostFileManager);
if (decommissionthread != null) {
decommissionthread.interrupt();
try {
@@ -533,14 +566,6 @@ public class DatanodeManager {
return networkLocation;
}
- private boolean inHostsList(DatanodeID node) {
- return checkInList(node, hostsReader.getHosts(), false);
- }
-
- private boolean inExcludedHostsList(DatanodeID node) {
- return checkInList(node, hostsReader.getExcludedHosts(), true);
- }
-
/**
* Remove an already decommissioned data node that is neither in the include
* nor the exclude hosts lists from the list of live or dead nodes. This is used
@@ -570,13 +595,13 @@ public class DatanodeManager {
private void removeDecomNodeFromList(final List<DatanodeDescriptor> nodeList) {
// If the include list is empty, all nodes are welcome and it does not
// make sense to exclude any node from the cluster. Therefore, nothing is removed.
- if (hostsReader.getHosts().isEmpty()) {
+ if (!hostFileManager.hasIncludes()) {
return;
}
-
+
for (Iterator<DatanodeDescriptor> it = nodeList.iterator(); it.hasNext();) {
DatanodeDescriptor node = it.next();
- if ((!inHostsList(node)) && (!inExcludedHostsList(node))
+ if ((!hostFileManager.isIncluded(node)) && (!hostFileManager.isExcluded(node))
&& node.isDecommissioned()) {
// Include list is not empty, an existing datanode does not appear
// in both include or exclude lists and it has been decommissioned.
@@ -587,34 +612,11 @@ public class DatanodeManager {
}
/**
- * Check if the given DatanodeID is in the given (include or exclude) list.
- *
- * @param node the DatanodeID to check
- * @param hostsList the list of hosts in the include/exclude file
- * @param isExcludeList true if this is the exclude list
- * @return true if the node is in the list, false otherwise
- */
- private static boolean checkInList(final DatanodeID node,
- final Set<String> hostsList,
- final boolean isExcludeList) {
- // if include list is empty, host is in include list
- if ( (!isExcludeList) && (hostsList.isEmpty()) ){
- return true;
- }
- for (String name : getNodeNamesForHostFiltering(node)) {
- if (hostsList.contains(name)) {
- return true;
- }
- }
- return false;
- }
-
- /**
* Decommission the node if it is in exclude list.
*/
private void checkDecommissioning(DatanodeDescriptor nodeReg) {
// If the registered node is in exclude list, then decommission it
- if (inExcludedHostsList(nodeReg)) {
+ if (hostFileManager.isExcluded(nodeReg)) {
startDecommission(nodeReg);
}
}
@@ -710,7 +712,7 @@ public class DatanodeManager {
// Checks if the node is not on the hosts list. If it is not, then
// it will be disallowed from registering.
- if (!inHostsList(nodeReg)) {
+ if (!hostFileManager.isIncluded(nodeReg)) {
throw new DisallowedDatanodeException(nodeReg);
}
@@ -844,9 +846,8 @@ public class DatanodeManager {
if (conf == null) {
conf = new HdfsConfiguration();
}
- hostsReader.updateFileNames(conf.get(DFSConfigKeys.DFS_HOSTS, ""),
- conf.get(DFSConfigKeys.DFS_HOSTS_EXCLUDE, ""));
- hostsReader.refresh();
+ this.hostFileManager.refresh(conf.get(DFSConfigKeys.DFS_HOSTS, ""),
+ conf.get(DFSConfigKeys.DFS_HOSTS_EXCLUDE, ""));
}
/**
@@ -858,10 +859,10 @@ public class DatanodeManager {
private void refreshDatanodes() {
for(DatanodeDescriptor node : datanodeMap.values()) {
// Check if not include.
- if (!inHostsList(node)) {
+ if (!hostFileManager.isIncluded(node)) {
node.setDisallowed(true); // case 2.
} else {
- if (inExcludedHostsList(node)) {
+ if (hostFileManager.isExcluded(node)) {
startDecommission(node); // case 3.
} else {
stopDecommission(node); // case 4.
@@ -1076,25 +1077,10 @@ public class DatanodeManager {
boolean listDeadNodes = type == DatanodeReportType.ALL ||
type == DatanodeReportType.DEAD;
- HashMap<String, String> mustList = new HashMap<String, String>();
-
- if (listDeadNodes) {
- // Put all nodes referenced in the hosts files in the map
- Iterator<String> it = hostsReader.getHosts().iterator();
- while (it.hasNext()) {
- mustList.put(it.next(), "");
- }
- it = hostsReader.getExcludedHosts().iterator();
- while (it.hasNext()) {
- mustList.put(it.next(), "");
- }
- }
-
ArrayList<DatanodeDescriptor> nodes = null;
-
+ final MutableEntrySet foundNodes = new MutableEntrySet();
synchronized(datanodeMap) {
- nodes = new ArrayList<DatanodeDescriptor>(datanodeMap.size() +
- mustList.size());
+ nodes = new ArrayList<DatanodeDescriptor>(datanodeMap.size());
Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator();
while (it.hasNext()) {
DatanodeDescriptor dn = it.next();
@@ -1102,47 +1088,45 @@ public class DatanodeManager {
if ( (isDead && listDeadNodes) || (!isDead && listLiveNodes) ) {
nodes.add(dn);
}
- for (String name : getNodeNamesForHostFiltering(dn)) {
- mustList.remove(name);
- }
+ foundNodes.add(dn);
}
}
-
+
if (listDeadNodes) {
- Iterator<String> it = mustList.keySet().iterator();
- while (it.hasNext()) {
- // The remaining nodes are ones that are referenced by the hosts
- // files but that we do not know about, ie that we have never
- // head from. Eg. a host that is no longer part of the cluster
- // or a bogus entry was given in the hosts files
- DatanodeID dnId = parseDNFromHostsEntry(it.next());
- DatanodeDescriptor dn = new DatanodeDescriptor(dnId);
- dn.setLastUpdate(0); // Consider this node dead for reporting
- nodes.add(dn);
+ final EntrySet includedNodes = hostFileManager.getIncludes();
+ final EntrySet excludedNodes = hostFileManager.getExcludes();
+ for (Entry entry : includedNodes) {
+ if ((foundNodes.find(entry) == null) &&
+ (excludedNodes.find(entry) == null)) {
+ // The remaining nodes are ones that are referenced by the hosts
+ // files but that we do not know about, i.e. that we have never
+ // heard from, e.g. an entry that is no longer part of the cluster
+ // or a bogus entry that was given in the hosts files
+ //
+ // If the host file entry specified the xferPort, we use that.
+ // Otherwise, we guess that it is the default xfer port.
+ // We can't ask the DataNode what it had configured, because it's
+ // dead.
+ DatanodeDescriptor dn =
+ new DatanodeDescriptor(new DatanodeID(entry.getIpAddress(),
+ entry.getPrefix(), "",
+ entry.getPort() == 0 ? defaultXferPort : entry.getPort(),
+ defaultInfoPort, defaultIpcPort));
+ dn.setLastUpdate(0); // Consider this node dead for reporting
+ nodes.add(dn);
+ }
}
}
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("getDatanodeListForReport with " +
+ "includedNodes = " + hostFileManager.getIncludes() +
+ ", excludedNodes = " + hostFileManager.getExcludes() +
+ ", foundNodes = " + foundNodes +
+ ", nodes = " + nodes);
+ }
return nodes;
}
- private static List<String> getNodeNamesForHostFiltering(DatanodeID node) {
- String ip = node.getIpAddr();
- String regHostName = node.getHostName();
- int xferPort = node.getXferPort();
-
- List<String> names = new ArrayList<String>();
- names.add(ip);
- names.add(ip + ":" + xferPort);
- names.add(regHostName);
- names.add(regHostName + ":" + xferPort);
-
- String peerHostName = node.getPeerHostName();
- if (peerHostName != null) {
- names.add(peerHostName);
- names.add(peerHostName + ":" + xferPort);
- }
- return names;
- }
-
/**
* Checks if name resolution was successful for the given address. If IP
* address and host name are the same, then it means name resolution has
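One subtlety in the getDatanodeListForReport() changes above: a hosts-file
entry that names a datanode the NameNode has never heard from carries no
authoritative transfer port (an entry without an explicit port parses to
port 0), so the report backfills the configured default. A restatement of
that rule as a standalone helper (the defaultXferPort field is from this
diff; the helper itself is hypothetical):

    // An entry without an explicit port parses to port 0; the datanode is
    // dead, so it cannot be asked what it had configured -- fall back to
    // the cluster's default transfer port.
    static int effectiveXferPort(int entryPort, int defaultXferPort) {
      return (entryPort == 0) ? defaultXferPort : entryPort;
    }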
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1489065&r1=1489064&r2=1489065&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Mon Jun 3 17:14:27 2013
@@ -936,7 +936,8 @@ public class DataNode extends Configured
MBeans.register("DataNode", "DataNodeInfo", this);
}
- int getXferPort() {
+ @VisibleForTesting
+ public int getXferPort() {
return streamingAddr.getPort();
}
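Widening getXferPort() to public (with @VisibleForTesting) is what lets the
new TestDecommission cases later in this mail read a live datanode's actual
streaming port directly, e.g.:

    int dnPort = cluster.getDataNodes().get(0).getXferPort();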
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeRegistration.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeRegistration.java?rev=1489065&r1=1489064&r2=1489065&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeRegistration.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeRegistration.java Mon Jun 3 17:14:27 2013
@@ -222,6 +222,7 @@ public class TestDatanodeRegistration {
DatanodeRegistration mockDnReg = mock(DatanodeRegistration.class);
doReturn(HdfsConstants.LAYOUT_VERSION).when(mockDnReg).getVersion();
+ doReturn(123).when(mockDnReg).getXferPort();
doReturn("fake-storage-id").when(mockDnReg).getStorageID();
doReturn(mockStorageInfo).when(mockDnReg).getStorageInfo();
@@ -275,6 +276,7 @@ public class TestDatanodeRegistration {
// Should succeed when software versions are the same and CTimes are the
// same.
doReturn(VersionInfo.getVersion()).when(mockDnReg).getSoftwareVersion();
+ doReturn(123).when(mockDnReg).getXferPort();
rpcServer.registerDatanode(mockDnReg);
// Should succeed when software versions are the same and CTimes are
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java?rev=1489065&r1=1489064&r2=1489065&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java Mon Jun 3 17:14:27 2013
@@ -24,7 +24,9 @@ import static org.junit.Assert.assertTru
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashMap;
import java.util.Iterator;
+import java.util.Map;
import java.util.Random;
import org.apache.commons.logging.Log;
@@ -41,9 +43,11 @@ import org.apache.hadoop.hdfs.protocol.D
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.HostFileManager;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -75,6 +79,7 @@ public class TestDecommission {
Path dir = new Path(workingDir, System.getProperty("test.build.data", "target/test/data") + "/work-dir/decommission");
hostsFile = new Path(dir, "hosts");
excludeFile = new Path(dir, "exclude");
+ HostFileManager.dnsResolutionDisabledForTesting = false;
// Setup conf
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, false);
@@ -96,6 +101,7 @@ public class TestDecommission {
if (cluster != null) {
cluster.shutdown();
}
+ HostFileManager.dnsResolutionDisabledForTesting = false;
}
private void writeConfigFile(Path name, ArrayList<String> nodes)
@@ -327,7 +333,7 @@ public class TestDecommission {
/**
* Tests decommission for non federated cluster
*/
- @Test
+ @Test(timeout=360000)
public void testDecommission() throws IOException {
testDecommission(1, 6);
}
@@ -335,7 +341,7 @@ public class TestDecommission {
/**
* Tests recommission for non federated cluster
*/
- @Test
+ @Test(timeout=360000)
public void testRecommission() throws IOException {
testRecommission(1, 6);
}
@@ -343,7 +349,7 @@ public class TestDecommission {
/**
* Tests decommission for federated cluster
*/
- @Test
+ @Test(timeout=360000)
public void testDecommissionFederation() throws IOException {
testDecommission(2, 2);
}
@@ -445,7 +451,7 @@ public class TestDecommission {
* Tests cluster storage statistics during decommissioning for non
* federated cluster
*/
- @Test
+ @Test(timeout=360000)
public void testClusterStats() throws Exception {
testClusterStats(1);
}
@@ -454,7 +460,7 @@ public class TestDecommission {
* Tests cluster storage statistics during decommissioning for
* federated cluster
*/
- @Test
+ @Test(timeout=360000)
public void testClusterStatsFederation() throws Exception {
testClusterStats(3);
}
@@ -491,7 +497,7 @@ public class TestDecommission {
* in the include file are allowed to connect to the namenode in a non
* federated cluster.
*/
- @Test
+ @Test(timeout=360000)
public void testHostsFile() throws IOException, InterruptedException {
// Test for a single namenode cluster
testHostsFile(1);
@@ -502,7 +508,7 @@ public class TestDecommission {
* in the include file are allowed to connect to the namenode in a
* federated cluster.
*/
- @Test
+ @Test(timeout=360000)
public void testHostsFileFederation() throws IOException, InterruptedException {
// Test for 3 namenode federated cluster
testHostsFile(3);
@@ -519,8 +525,8 @@ public class TestDecommission {
// Now empty hosts file and ensure the datanode is disallowed
// from talking to namenode, resulting in its shutdown.
ArrayList<String>list = new ArrayList<String>();
- final String badHostname = "BOGUSHOST";
- list.add(badHostname);
+ final String bogusIp = "127.0.30.1";
+ list.add(bogusIp);
writeConfigFile(hostsFile, list);
for (int j = 0; j < numNameNodes; j++) {
@@ -544,7 +550,199 @@ public class TestDecommission {
assertEquals("There should be 2 dead nodes", 2, info.length);
DatanodeID id = cluster.getDataNodes().get(0).getDatanodeId();
assertEquals(id.getHostName(), info[0].getHostName());
- assertEquals(badHostname, info[1].getHostName());
+ assertEquals(bogusIp, info[1].getHostName());
+ }
+ }
+
+ @Test(timeout=360000)
+ public void testDuplicateHostsEntries() throws IOException,
+ InterruptedException {
+ Configuration hdfsConf = new Configuration(conf);
+ cluster = new MiniDFSCluster.Builder(hdfsConf)
+ .numDataNodes(1).setupHostsFile(true).build();
+ cluster.waitActive();
+ int dnPort = cluster.getDataNodes().get(0).getXferPort();
+
+ // pick some random ports that don't overlap with our DN's port
+ // or with each other.
+ Random random = new Random(System.currentTimeMillis());
+ int port1 = dnPort;
+ while (port1 == dnPort) {
+ port1 = random.nextInt(6000) + 1000;
+ }
+ int port2 = dnPort;
+ while ((port2 == dnPort) || (port2 == port1)) {
+ port2 = random.nextInt(6000) + 1000;
+ }
+
+ // Write a hosts file that does not include the real DataNode, so it is
+ // disallowed from talking to the namenode and eventually marked dead.
+ ArrayList<String> nodes = new ArrayList<String>();
+
+ // These entries will be de-duped by the NameNode, since they refer
+ // to the same IP address + port combo.
+ nodes.add("127.0.0.1:" + port1);
+ nodes.add("localhost:" + port1);
+ nodes.add("127.0.0.1:" + port1);
+
+ // The following entries should not be de-duped.
+ nodes.add("127.0.0.1:" + port2);
+ nodes.add("127.0.30.1:" + port1);
+ writeConfigFile(hostsFile, nodes);
+
+ refreshNodes(cluster.getNamesystem(0), hdfsConf);
+
+ DFSClient client = getDfsClient(cluster.getNameNode(0), hdfsConf);
+ DatanodeInfo[] info = client.datanodeReport(DatanodeReportType.LIVE);
+ for (int i = 0 ; i < 5 && info.length != 0; i++) {
+ LOG.info("Waiting for datanode to be marked dead");
+ Thread.sleep(HEARTBEAT_INTERVAL * 1000);
+ info = client.datanodeReport(DatanodeReportType.LIVE);
+ }
+ assertEquals("Number of live nodes should be 0", 0, info.length);
+
+ // Test that non-live and bogus hostnames are considered "dead".
+ // The dead report should have an entry for (1) the DN that is
+ // now considered dead because it is no longer allowed to connect
+ // and (2) the bogus entries in the hosts file.
+ DatanodeInfo deadDns[] = client.datanodeReport(DatanodeReportType.DEAD);
+ HashMap<String, DatanodeInfo> deadByXferAddr =
+ new HashMap<String, DatanodeInfo>();
+ for (DatanodeInfo dn : deadDns) {
+ LOG.info("DEAD DatanodeInfo: xferAddr = " + dn.getXferAddr() +
+ ", ipAddr = " + dn.getIpAddr() +
+ ", hostname = " + dn.getHostName());
+ deadByXferAddr.put(dn.getXferAddr(), dn);
+ }
+ // The real DataNode should be included in the list.
+ String realDnIpPort = cluster.getDataNodes().get(0).
+ getXferAddress().getAddress().getHostAddress() + ":" +
+ cluster.getDataNodes().get(0).getXferPort();
+ Assert.assertNotNull("failed to find real datanode IP " + realDnIpPort,
+ deadByXferAddr.remove(realDnIpPort));
+ // The fake datanode with address 127.0.30.1 should be included in this list.
+ Assert.assertNotNull(deadByXferAddr.remove(
+ "127.0.30.1:" + port1));
+ // Now look for the two copies of 127.0.0.1 with port1 and port2.
+ Iterator<Map.Entry<String, DatanodeInfo>> iter =
+ deadByXferAddr.entrySet().iterator();
+ boolean foundPort1 = false, foundPort2 = false;
+ while (iter.hasNext()) {
+ Map.Entry<String, DatanodeInfo> entry = iter.next();
+ DatanodeInfo dn = entry.getValue();
+ if (dn.getXferPort() == port1) {
+ foundPort1 = true;
+ iter.remove();
+ } else if (dn.getXferPort() == port2) {
+ foundPort2 = true;
+ iter.remove();
+ }
+ }
+ Assert.assertTrue("did not find a dead entry with port " + port1,
+ foundPort1);
+ Assert.assertTrue("did not find a dead entry with port " + port2,
+ foundPort2);
+ Assert.assertTrue(deadByXferAddr.isEmpty());
+ }
+
+ @Test(timeout=360000)
+ public void testIncludeByRegistrationName() throws IOException,
+ InterruptedException {
+ Configuration hdfsConf = new Configuration(conf);
+ final String registrationName = "--registration-name--";
+ final String nonExistentDn = "127.0.0.40";
+ hdfsConf.set(DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY, registrationName);
+ cluster = new MiniDFSCluster.Builder(hdfsConf)
+ .numDataNodes(1).checkDataNodeHostConfig(true)
+ .setupHostsFile(true).build();
+ cluster.waitActive();
+
+ // Set up an includes file that doesn't have our datanode.
+ ArrayList<String> nodes = new ArrayList<String>();
+ nodes.add(nonExistentDn);
+ writeConfigFile(hostsFile, nodes);
+ refreshNodes(cluster.getNamesystem(0), hdfsConf);
+
+ // Wait for the DN to be marked dead.
+ DFSClient client = getDfsClient(cluster.getNameNode(0), hdfsConf);
+ while (true) {
+ DatanodeInfo info[] = client.datanodeReport(DatanodeReportType.DEAD);
+ if (info.length == 1) {
+ break;
+ }
+ LOG.info("Waiting for datanode to be marked dead");
+ Thread.sleep(HEARTBEAT_INTERVAL * 1000);
+ }
+
+ // Use a non-empty include file with our registration name.
+ // It should work.
+ int dnPort = cluster.getDataNodes().get(0).getXferPort();
+ nodes = new ArrayList<String>();
+ nodes.add(registrationName + ":" + dnPort);
+ writeConfigFile(hostsFile, nodes);
+ refreshNodes(cluster.getNamesystem(0), hdfsConf);
+ cluster.restartDataNode(0);
+
+ // Wait for the DN to come back.
+ while (true) {
+ DatanodeInfo info[] = client.datanodeReport(DatanodeReportType.LIVE);
+ if (info.length == 1) {
+ Assert.assertFalse(info[0].isDecommissioned());
+ Assert.assertFalse(info[0].isDecommissionInProgress());
+ assertEquals(registrationName, info[0].getHostName());
+ break;
+ }
+ LOG.info("Waiting for datanode to come back");
+ Thread.sleep(HEARTBEAT_INTERVAL * 1000);
+ }
+ }
+
+ @Test(timeout=360000)
+ public void testBackgroundDnsResolution() throws IOException,
+ InterruptedException {
+ // Create cluster
+ Configuration hdfsConf = new Configuration(conf);
+ hdfsConf.setInt(
+ DFSConfigKeys.DFS_NAMENODE_HOSTS_DNS_RESOLUTION_INTERVAL_SECONDS, 1);
+ cluster = new MiniDFSCluster.Builder(hdfsConf)
+ .numDataNodes(1).checkDataNodeHostConfig(true)
+ .setupHostsFile(true).build();
+ cluster.waitActive();
+
+ // Set up an includes file containing just the local hostname
+ ArrayList<String> nodes = new ArrayList<String>();
+ String localhost = java.net.InetAddress.getLocalHost().getHostName();
+ nodes.add(localhost);
+ writeConfigFile(hostsFile, nodes);
+ HostFileManager.dnsResolutionDisabledForTesting = true;
+ refreshNodes(cluster.getNamesystem(0), hdfsConf);
+
+ // The DN will be marked dead because DNS resolution was turned off,
+ // and we are in the include file by hostname only.
+ DFSClient client = getDfsClient(cluster.getNameNode(0), hdfsConf);
+ while (true) {
+ DatanodeInfo info[] = client.datanodeReport(DatanodeReportType.DEAD);
+ if (info.length == 1) {
+ break;
+ }
+ LOG.info("Waiting for datanode to be marked dead");
+ Thread.sleep(HEARTBEAT_INTERVAL * 1000);
+ }
+
+ // Allow hostname resolution
+ HostFileManager.dnsResolutionDisabledForTesting = false;
+ cluster.restartDataNode(0);
+
+ // Wait for the DN to come back.
+ while (true) {
+ DatanodeInfo info[] = client.datanodeReport(DatanodeReportType.LIVE);
+ if (info.length == 1) {
+ Assert.assertFalse(info[0].isDecommissioned());
+ Assert.assertFalse(info[0].isDecommissionInProgress());
+ break;
+ }
+ LOG.info("Waiting for datanode to come back");
+ Thread.sleep(HEARTBEAT_INTERVAL * 1000);
}
}
}
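In the tests above, refreshNodes(...) is a test helper that asks the
FSNamesystem to re-read the hosts files. On a running cluster the same
re-read is triggered administratively; operators typically run
"hdfs dfsadmin -refreshNodes" from the shell. A rough sketch of the
equivalent call through the DFSAdmin tool:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.HdfsConfiguration;
    import org.apache.hadoop.hdfs.tools.DFSAdmin;

    Configuration conf = new HdfsConfiguration();
    new DFSAdmin(conf).run(new String[] { "-refreshNodes" });  // may throw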