You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by el...@apache.org on 2012/04/02 21:15:51 UTC
svn commit: r1308478 - in /hadoop/common/branches/branch-1: ./ src/hdfs/
src/hdfs/org/apache/hadoop/hdfs/ src/hdfs/org/apache/hadoop/hdfs/protocol/
src/hdfs/org/apache/hadoop/hdfs/server/datanode/
src/test/org/apache/hadoop/hdfs/ src/test/org/apache/ha...
Author: eli
Date: Mon Apr 2 19:15:51 2012
New Revision: 1308478
URL: http://svn.apache.org/viewvc?rev=1308478&view=rev
Log:
HDFS-3150. Add option for clients to contact DNs via hostname. Contributed by Eli Collins
Modified:
hadoop/common/branches/branch-1/CHANGES.txt
hadoop/common/branches/branch-1/src/hdfs/hdfs-default.xml
hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/BlockReaderLocal.java
hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/DFSConfigKeys.java
hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/protocol/DatanodeID.java
hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java
hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestDFSClientRetries.java
hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestFileCreation.java
hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java
hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java
Modified: hadoop/common/branches/branch-1/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/CHANGES.txt?rev=1308478&r1=1308477&r2=1308478&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/CHANGES.txt (original)
+++ hadoop/common/branches/branch-1/CHANGES.txt Mon Apr 2 19:15:51 2012
@@ -14,6 +14,8 @@ Release 1.1.0 - unreleased
HADOOP-7806. Support binding to sub-interfaces. (harsh, eli via eli)
+ HDFS-3150. Add option for clients to contact DNs via hostname. (eli)
+
IMPROVEMENTS
MAPREDUCE-3597. [Rumen] Provide a way to access other info of history file
Modified: hadoop/common/branches/branch-1/src/hdfs/hdfs-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/hdfs/hdfs-default.xml?rev=1308478&r1=1308477&r2=1308478&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/hdfs/hdfs-default.xml (original)
+++ hadoop/common/branches/branch-1/src/hdfs/hdfs-default.xml Mon Apr 2 19:15:51 2012
@@ -28,7 +28,7 @@ creations/deletions), or "all".</descrip
<name>dfs.datanode.address</name>
<value>0.0.0.0:50010</value>
<description>
- The address where the datanode server will listen to.
+ The datanode server address and port for data transfer.
If the port is 0 then the server will start on a free port.
</description>
</property>
@@ -434,4 +434,20 @@ creations/deletions), or "all".</descrip
</description>
</property>
+<property>
+ <name>dfs.client.use.datanode.hostname</name>
+ <value>false</value>
+ <description>Whether clients should use datanode hostnames when
+ connecting to datanodes.
+ </description>
+</property>
+
+<property>
+ <name>dfs.datanode.use.datanode.hostname</name>
+ <value>false</value>
+ <description>Whether datanodes should use datanode hostnames when
+ connecting to other datanodes for data transfer.
+ </description>
+</property>
+
</configuration>
Modified: hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/BlockReaderLocal.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/BlockReaderLocal.java?rev=1308478&r1=1308477&r2=1308478&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/BlockReaderLocal.java (original)
+++ hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/BlockReaderLocal.java Mon Apr 2 19:15:51 2012
@@ -84,11 +84,11 @@ class BlockReaderLocal extends FSInputCh
}
private synchronized ClientDatanodeProtocol getDatanodeProxy(
- DatanodeInfo node, Configuration conf, int socketTimeout)
- throws IOException {
+ DatanodeInfo node, Configuration conf, int socketTimeout,
+ boolean connectToDnViaHostname) throws IOException {
if (proxy == null) {
proxy = DFSClient.createClientDatanodeProtocolProxy(node, conf,
- socketTimeout);
+ socketTimeout, connectToDnViaHostname);
}
return proxy;
}
@@ -135,13 +135,14 @@ class BlockReaderLocal extends FSInputCh
*/
static BlockReaderLocal newBlockReader(Configuration conf,
String file, Block blk, Token<BlockTokenIdentifier> token, DatanodeInfo node,
- int socketTimeout, long startOffset, long length) throws IOException {
+ int socketTimeout, long startOffset, long length, boolean connectToDnViaHostname)
+ throws IOException {
LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node.getIpcPort());
// check the cache first
BlockLocalPathInfo pathinfo = localDatanodeInfo.getBlockLocalPathInfo(blk);
if (pathinfo == null) {
- pathinfo = getBlockPathInfo(blk, node, conf, socketTimeout, token);
+ pathinfo = getBlockPathInfo(blk, node, conf, socketTimeout, token, connectToDnViaHostname);
}
// check to see if the file exists. It may so happen that the
@@ -216,11 +217,12 @@ class BlockReaderLocal extends FSInputCh
private static BlockLocalPathInfo getBlockPathInfo(Block blk,
DatanodeInfo node, Configuration conf, int timeout,
- Token<BlockTokenIdentifier> token) throws IOException {
+ Token<BlockTokenIdentifier> token, boolean connectToDnViaHostname)
+ throws IOException {
LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node.ipcPort);
BlockLocalPathInfo pathinfo = null;
ClientDatanodeProtocol proxy = localDatanodeInfo.getDatanodeProxy(node,
- conf, timeout);
+ conf, timeout, connectToDnViaHostname);
try {
// make RPC to local datanode to find local pathnames of blocks
pathinfo = proxy.getBlockLocalPathInfo(blk, token);
Modified: hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java?rev=1308478&r1=1308477&r2=1308478&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java Mon Apr 2 19:15:51 2012
@@ -94,6 +94,7 @@ public class DFSClient implements FSCons
private final FileSystem.Statistics stats;
private int maxBlockAcquireFailures;
private boolean shortCircuitLocalReads;
+ private boolean connectToDnViaHostname;
/**
* We assume we're talking to another CDH server, which supports
@@ -147,10 +148,12 @@ public class DFSClient implements FSCons
/** Create {@link ClientDatanodeProtocol} proxy with block/token */
static ClientDatanodeProtocol createClientDatanodeProtocolProxy (
- DatanodeID datanodeid, Configuration conf,
- Block block, Token<BlockTokenIdentifier> token, int socketTimeout) throws IOException {
- InetSocketAddress addr = NetUtils.makeSocketAddr(
- datanodeid.getHost(), datanodeid.getIpcPort());
+ DatanodeInfo di, Configuration conf,
+ Block block, Token<BlockTokenIdentifier> token, int socketTimeout,
+ boolean connectToDnViaHostname) throws IOException {
+ final String dnName = di.getNameWithIpcPort(connectToDnViaHostname);
+ LOG.debug("Connecting to " + dnName);
+ InetSocketAddress addr = NetUtils.createSocketAddr(dnName);
if (ClientDatanodeProtocol.LOG.isDebugEnabled()) {
ClientDatanodeProtocol.LOG.info("ClientDatanodeProtocol addr=" + addr);
}
@@ -164,10 +167,11 @@ public class DFSClient implements FSCons
/** Create {@link ClientDatanodeProtocol} proxy using kerberos ticket */
static ClientDatanodeProtocol createClientDatanodeProtocolProxy(
- DatanodeID datanodeid, Configuration conf, int socketTimeout)
- throws IOException {
- InetSocketAddress addr = NetUtils.createSocketAddr(
- datanodeid.getHost() + ":" + datanodeid.getIpcPort());
+ DatanodeInfo di, Configuration conf, int socketTimeout,
+ boolean connectToDnViaHostname) throws IOException {
+ final String dnName = di.getNameWithIpcPort(connectToDnViaHostname);
+ LOG.debug("Connecting to " + dnName);
+ InetSocketAddress addr = NetUtils.createSocketAddr(dnName);
if (ClientDatanodeProtocol.LOG.isDebugEnabled()) {
ClientDatanodeProtocol.LOG.info("ClientDatanodeProtocol addr=" + addr);
}
@@ -252,6 +256,12 @@ public class DFSClient implements FSCons
if (LOG.isDebugEnabled()) {
LOG.debug("Short circuit read is " + shortCircuitLocalReads);
}
+ this.connectToDnViaHostname = conf.getBoolean(
+ DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME,
+ DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Connect to datanode via hostname is " + connectToDnViaHostname);
+ }
}
static int getMaxBlockAcquireFailures(Configuration conf) {
@@ -350,14 +360,14 @@ public class DFSClient implements FSCons
/**
* Get {@link BlockReader} for short circuited local reads.
*/
- private static BlockReader getLocalBlockReader(Configuration conf,
+ private BlockReader getLocalBlockReader(Configuration conf,
String src, Block blk, Token<BlockTokenIdentifier> accessToken,
DatanodeInfo chosenNode, int socketTimeout, long offsetIntoBlock)
throws InvalidToken, IOException {
try {
return BlockReaderLocal.newBlockReader(conf, src, blk, accessToken,
chosenNode, socketTimeout, offsetIntoBlock, blk.getNumBytes()
- - offsetIntoBlock);
+ - offsetIntoBlock, connectToDnViaHostname);
} catch (RemoteException re) {
throw re.unwrapRemoteException(InvalidToken.class,
AccessControlException.class);
@@ -888,7 +898,7 @@ public class DFSClient implements FSCons
*/
public MD5MD5CRC32FileChecksum getFileChecksum(String src) throws IOException {
checkOpen();
- return getFileChecksum(src, namenode, socketFactory, socketTimeout);
+ return getFileChecksum(src, namenode, socketFactory, socketTimeout, connectToDnViaHostname);
}
/**
@@ -899,6 +909,12 @@ public class DFSClient implements FSCons
public static MD5MD5CRC32FileChecksum getFileChecksum(String src,
ClientProtocol namenode, SocketFactory socketFactory, int socketTimeout
) throws IOException {
+ return getFileChecksum(src, namenode, socketFactory, socketTimeout, false);
+ }
+
+ private static MD5MD5CRC32FileChecksum getFileChecksum(String src,
+ ClientProtocol namenode, SocketFactory socketFactory, int socketTimeout,
+ boolean connectToDnViaHostname) throws IOException {
//get all block locations
LocatedBlocks blockLocations = callGetBlockLocations(namenode, src, 0, Long.MAX_VALUE);
if (null == blockLocations) {
@@ -933,8 +949,10 @@ public class DFSClient implements FSCons
for(int j = 0; !done && j < datanodes.length; j++) {
//connect to a datanode
final Socket sock = socketFactory.createSocket();
+ final String dnName = datanodes[j].getName(connectToDnViaHostname);
+ LOG.debug("Connecting to " + dnName);
NetUtils.connect(sock,
- NetUtils.createSocketAddr(datanodes[j].getName()),
+ NetUtils.createSocketAddr(dnName),
timeout);
sock.setSoTimeout(timeout);
@@ -946,7 +964,7 @@ public class DFSClient implements FSCons
// get block MD5
try {
if (LOG.isDebugEnabled()) {
- LOG.debug("write to " + datanodes[j].getName() + ": "
+ LOG.debug("write to " + dnName + ": "
+ DataTransferProtocol.OP_BLOCK_CHECKSUM +
", block=" + block);
}
@@ -964,7 +982,7 @@ public class DFSClient implements FSCons
if (LOG.isDebugEnabled()) {
LOG.debug("Got access token error in response to OP_BLOCK_CHECKSUM "
+ "for file " + src + " for block " + block
- + " from datanode " + datanodes[j].getName()
+ + " from datanode " + dnName
+ ". Will retry the block once.");
}
lastRetriedIndex = i;
@@ -974,7 +992,7 @@ public class DFSClient implements FSCons
break;
} else {
throw new IOException("Bad response " + reply + " for block "
- + block + " from datanode " + datanodes[j].getName());
+ + block + " from datanode " + dnName);
}
}
@@ -1005,12 +1023,10 @@ public class DFSClient implements FSCons
LOG.debug("set bytesPerCRC=" + bytesPerCRC
+ ", crcPerBlock=" + crcPerBlock);
}
- LOG.debug("got reply from " + datanodes[j].getName()
- + ": md5=" + md5);
+ LOG.debug("got reply from " + dnName + ": md5=" + md5);
}
} catch (IOException ie) {
- LOG.warn("src=" + src + ", datanodes[" + j + "].getName()="
- + datanodes[j].getName(), ie);
+ LOG.warn("src=" + src + ", datanodes[" + j + "]=" + dnName, ie);
} finally {
IOUtils.closeStream(in);
IOUtils.closeStream(out);
@@ -1396,7 +1412,7 @@ public class DFSClient implements FSCons
}
}
- /** Utility class to encapsulate data node info and its ip address. */
+ /** Utility class to encapsulate data node info and its address. */
private static class DNAddrPair {
DatanodeInfo info;
InetSocketAddress addr;
@@ -1880,7 +1896,8 @@ public class DFSClient implements FSCons
DatanodeInfo primaryNode = last.getLocations()[0];
try {
primary = createClientDatanodeProtocolProxy(primaryNode, conf,
- last.getBlock(), last.getBlockToken(), socketTimeout);
+ last.getBlock(), last.getBlockToken(), socketTimeout,
+ connectToDnViaHostname);
Block newBlock = primary.getBlockInfo(last.getBlock());
long newBlockSize = newBlock.getNumBytes();
long delta = newBlockSize - last.getBlockSize();
@@ -2093,6 +2110,7 @@ public class DFSClient implements FSCons
try {
s = socketFactory.createSocket();
+ LOG.debug("Connecting to " + targetAddr);
NetUtils.connect(s, targetAddr, socketTimeout);
s.setSoTimeout(socketTimeout);
blockReader = RemoteBlockReader.newBlockReader(s, src, blk.getBlockId(),
@@ -2260,8 +2278,8 @@ public class DFSClient implements FSCons
DatanodeInfo[] nodes = block.getLocations();
try {
DatanodeInfo chosenNode = bestNode(nodes, deadNodes);
- InetSocketAddress targetAddr =
- NetUtils.createSocketAddr(chosenNode.getName());
+ InetSocketAddress targetAddr =
+ NetUtils.createSocketAddr(chosenNode.getName(connectToDnViaHostname));
return new DNAddrPair(chosenNode, targetAddr);
} catch (IOException ie) {
String blockInfo = block.getBlock() + " file=" + src;
@@ -2324,6 +2342,7 @@ public class DFSClient implements FSCons
} else {
// go to the datanode
dn = socketFactory.createSocket();
+ LOG.debug("Connecting to " + targetAddr);
NetUtils.connect(dn, targetAddr, socketTimeout);
dn.setSoTimeout(socketTimeout);
reader = RemoteBlockReader.newBlockReader(dn, src,
@@ -3122,7 +3141,7 @@ public class DFSClient implements FSCons
// to each DN and two rpcs to the NN.
int recoveryTimeout = (newnodes.length * 2 + 2) * socketTimeout;
primary = createClientDatanodeProtocolProxy(primaryNode, conf, block,
- accessToken, recoveryTimeout);
+ accessToken, recoveryTimeout, connectToDnViaHostname);
newBlock = primary.recoverBlock(block, isAppend, newnodes);
} catch (IOException e) {
LOG.warn("Failed recovery attempt #" + recoveryErrorCount +
@@ -3423,10 +3442,11 @@ public class DFSClient implements FSCons
boolean result = false;
try {
- LOG.debug("Connecting to " + nodes[0].getName());
- InetSocketAddress target = NetUtils.createSocketAddr(nodes[0].getName());
+ final String dnName = nodes[0].getName(connectToDnViaHostname);
+ InetSocketAddress target = NetUtils.createSocketAddr(dnName);
s = socketFactory.createSocket();
timeoutValue = 3000 * nodes.length + socketTimeout;
+ LOG.debug("Connecting to " + dnName);
NetUtils.connect(s, target, timeoutValue);
s.setSoTimeout(timeoutValue);
s.setSendBufferSize(DEFAULT_DATA_SOCKET_SIZE);
Modified: hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1308478&r1=1308477&r2=1308478&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/DFSConfigKeys.java Mon Apr 2 19:15:51 2012
@@ -38,7 +38,9 @@ public class DFSConfigKeys extends Commo
public static final int DFS_BYTES_PER_CHECKSUM_DEFAULT = 512;
public static final String DFS_CLIENT_WRITE_PACKET_SIZE_KEY = "dfs.client-write-packet-size";
public static final int DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT = 64*1024;
-
+ public static final String DFS_CLIENT_USE_DN_HOSTNAME = "dfs.client.use.datanode.hostname";
+ public static final boolean DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT = false;
+
public static final String DFS_NAMENODE_BACKUP_ADDRESS_KEY = "dfs.namenode.backup.address";
public static final String DFS_NAMENODE_BACKUP_ADDRESS_DEFAULT = "localhost:50100";
public static final String DFS_NAMENODE_BACKUP_HTTP_ADDRESS_KEY = "dfs.namenode.backup.http-address";
@@ -96,6 +98,8 @@ public class DFSConfigKeys extends Commo
public static final boolean DFS_NAMENODE_NAME_DIR_RESTORE_DEFAULT = false;
public static final String DFS_LIST_LIMIT = "dfs.ls.limit";
public static final int DFS_LIST_LIMIT_DEFAULT = 1000;
+ public static final String DFS_DATANODE_USE_DN_HOSTNAME = "dfs.datanode.use.datanode.hostname";
+ public static final boolean DFS_DATANODE_USE_DN_HOSTNAME_DEFAULT = false;
//Delegation token related keys
public static final String DFS_NAMENODE_DELEGATION_KEY_UPDATE_INTERVAL_KEY = "dfs.namenode.delegation.key.update-interval";
Modified: hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/protocol/DatanodeID.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/protocol/DatanodeID.java?rev=1308478&r1=1308477&r2=1308478&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/protocol/DatanodeID.java (original)
+++ hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/protocol/DatanodeID.java Mon Apr 2 19:15:51 2012
@@ -34,10 +34,10 @@ import org.apache.hadoop.io.WritableComp
public class DatanodeID implements WritableComparable<DatanodeID> {
public static final DatanodeID[] EMPTY_ARRAY = {};
- public String name; /// hostname:portNumber
- public String storageID; /// unique per cluster storageID
- protected int infoPort; /// the port where the infoserver is running
- public int ipcPort; /// the port where the ipc server is running
+ public String name; // hostname:port (data transfer port)
+ public String storageID; // unique per cluster storageID
+ protected int infoPort; // info server port
+ public int ipcPort; // ipc server port
/** Equivalent to DatanodeID(""). */
public DatanodeID() {this("");}
Modified: hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java?rev=1308478&r1=1308477&r2=1308478&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java (original)
+++ hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java Mon Apr 2 19:15:51 2012
@@ -181,7 +181,20 @@ public class DatanodeInfo extends Datano
public void setHostName(String host) {
hostName = host;
}
-
+
+ /** Return hostname:port if requested, ip:port otherwise */
+ public String getName(boolean useHostname) {
+ return useHostname ? getHostName() + ":" + getPort() : getName();
+ }
+
+ /** Return hostname:ipcPort if requested, ip:ipcPort otherwise */
+ public String getNameWithIpcPort(boolean useHostname) {
+ // NB: DatanodeID#getHost returns the IP, i.e. the name without
+ // the port, not the hostname as the method name implies
+ return useHostname ? getHostName() + ":" + getIpcPort()
+ : getHost() + ":" + getIpcPort();
+ }
+
/** A formatted string for reporting the status of the DataNode. */
public String getDatanodeReport() {
StringBuffer buffer = new StringBuffer();
Modified: hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1308478&r1=1308477&r2=1308478&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java Mon Apr 2 19:15:51 2012
@@ -232,6 +232,7 @@ public class DataNode extends Configured
BlockTokenSecretManager blockTokenSecretManager;
boolean isBlockTokenInitialized = false;
final String userWithLocalPathAccess;
+ private boolean connectToDnViaHostname;
/**
* Testing hook that allows tests to delay the sending of blockReceived RPCs
@@ -411,7 +412,7 @@ public class DataNode extends Configured
selfAddr = new InetSocketAddress(ss.getInetAddress().getHostAddress(),
tmpPort);
this.dnRegistration.setName(machineName + ":" + tmpPort);
- LOG.info("Opened info server at " + tmpPort);
+ LOG.info("Opened data transfer server at " + tmpPort);
this.threadGroup = new ThreadGroup("dataXceiverServer");
this.dataXceiverServer = new Daemon(threadGroup,
@@ -444,6 +445,11 @@ public class DataNode extends Configured
reason + ".");
}
+ this.connectToDnViaHostname = conf.getBoolean(
+ DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME,
+ DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME_DEFAULT);
+ LOG.debug("Connect to datanode via hostname is " + connectToDnViaHostname);
+
//create a servlet to serve full-file content
InetSocketAddress infoSocAddr = DataNode.getInfoAddr(conf);
String infoHost = infoSocAddr.getHostName();
@@ -579,9 +585,10 @@ public class DataNode extends Configured
}
public static InterDatanodeProtocol createInterDataNodeProtocolProxy(
- DatanodeID datanodeid, final Configuration conf, final int socketTimeout) throws IOException {
- final InetSocketAddress addr = NetUtils.createSocketAddr(
- datanodeid.getHost() + ":" + datanodeid.getIpcPort());
+ DatanodeInfo info, final Configuration conf, final int socketTimeout,
+ boolean connectToDnViaHostname) throws IOException {
+ final String dnName = info.getNameWithIpcPort(connectToDnViaHostname);
+ final InetSocketAddress addr = NetUtils.createSocketAddr(dnName);
if (InterDatanodeProtocol.LOG.isDebugEnabled()) {
InterDatanodeProtocol.LOG.info("InterDatanodeProtocol addr=" + addr);
}
@@ -1378,9 +1385,10 @@ public class DataNode extends Configured
BlockSender blockSender = null;
try {
- InetSocketAddress curTarget =
- NetUtils.createSocketAddr(targets[0].getName());
+ final String dnName = targets[0].getName(connectToDnViaHostname);
+ InetSocketAddress curTarget = NetUtils.createSocketAddr(dnName);
sock = newSocket();
+ LOG.debug("Connecting to " + dnName);
NetUtils.connect(sock, curTarget, socketTimeout);
sock.setSoTimeout(targets.length * socketTimeout);
@@ -1870,7 +1878,6 @@ public class DataNode extends Configured
private LocatedBlock recoverBlock(Block block, boolean keepLength,
DatanodeInfo[] targets, boolean closeFile) throws IOException {
- DatanodeID[] datanodeids = (DatanodeID[])targets;
// If the block is already being recovered, then skip recovering it.
// This can happen if the namenode and client start recovering the same
// file at the same time.
@@ -1896,10 +1903,11 @@ public class DataNode extends Configured
int rwrCount = 0;
List<BlockRecord> blockRecords = new ArrayList<BlockRecord>();
- for(DatanodeID id : datanodeids) {
+ for (DatanodeInfo id : targets) {
try {
- InterDatanodeProtocol datanode = dnRegistration.equals(id)?
- this: DataNode.createInterDataNodeProtocolProxy(id, getConf(), socketTimeout);
+ InterDatanodeProtocol datanode = dnRegistration.equals(id) ? this
+ : DataNode.createInterDataNodeProtocolProxy(
+ id, getConf(), socketTimeout, connectToDnViaHostname);
BlockRecoveryInfo info = datanode.startBlockRecovery(block);
if (info == null) {
LOG.info("No block metadata found for block " + block + " on datanode "
@@ -1957,7 +1965,7 @@ public class DataNode extends Configured
if (syncList.isEmpty() && errorCount > 0) {
throw new IOException("All datanodes failed: block=" + block
- + ", datanodeids=" + Arrays.asList(datanodeids));
+ + ", datanodeids=" + Arrays.asList(targets));
}
if (!keepLength) {
block.setNumBytes(minlength);
Modified: hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=1308478&r1=1308477&r2=1308478&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Mon Apr 2 19:15:51 2012
@@ -28,6 +28,7 @@ import java.net.Socket;
import java.net.SocketException;
import org.apache.commons.logging.Log;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -58,6 +59,7 @@ class DataXceiver implements Runnable, F
final String localAddress; // local address of this daemon
DataNode datanode;
DataXceiverServer dataXceiverServer;
+ private boolean connectToDnViaHostname;
public DataXceiver(Socket s, DataNode datanode,
DataXceiverServer dataXceiverServer) {
@@ -69,6 +71,9 @@ class DataXceiver implements Runnable, F
remoteAddress = s.getRemoteSocketAddress().toString();
localAddress = s.getLocalSocketAddress().toString();
LOG.debug("Number of active connections is: " + datanode.getXceiverCount());
+ this.connectToDnViaHostname = datanode.getConf().getBoolean(
+ DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME,
+ DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME_DEFAULT);
}
/**
@@ -308,14 +313,17 @@ class DataXceiver implements Runnable, F
if (targets.length > 0) {
InetSocketAddress mirrorTarget = null;
// Connect to backup machine
+ final String mirrorAddrString =
+ targets[0].getName(connectToDnViaHostname);
mirrorNode = targets[0].getName();
- mirrorTarget = NetUtils.createSocketAddr(mirrorNode);
+ mirrorTarget = NetUtils.createSocketAddr(mirrorAddrString);
mirrorSock = datanode.newSocket();
try {
int timeoutValue = datanode.socketTimeout +
(HdfsConstants.READ_TIMEOUT_EXTENSION * numTargets);
int writeTimeout = datanode.socketWriteTimeout +
(HdfsConstants.WRITE_TIMEOUT_EXTENSION * numTargets);
+ LOG.debug("Connecting to " + mirrorAddrString);
NetUtils.connect(mirrorSock, mirrorTarget, timeoutValue);
mirrorSock.setSoTimeout(timeoutValue);
mirrorSock.setSendBufferSize(DEFAULT_DATA_SOCKET_SIZE);
@@ -608,9 +616,11 @@ class DataXceiver implements Runnable, F
try {
// get the output stream to the proxy
- InetSocketAddress proxyAddr = NetUtils.createSocketAddr(
- proxySource.getName());
+ final String proxyAddrString =
+ proxySource.getName(connectToDnViaHostname);
+ InetSocketAddress proxyAddr = NetUtils.createSocketAddr(proxyAddrString);
proxySock = datanode.newSocket();
+ LOG.debug("Connecting to " + proxyAddrString);
NetUtils.connect(proxySock, proxyAddr, datanode.socketTimeout);
proxySock.setSoTimeout(datanode.socketTimeout);
Modified: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestDFSClientRetries.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestDFSClientRetries.java?rev=1308478&r1=1308477&r2=1308478&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestDFSClientRetries.java (original)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestDFSClientRetries.java Mon Apr 2 19:15:51 2012
@@ -466,7 +466,7 @@ public class TestDFSClientRetries extend
try {
proxy = DFSClient.createClientDatanodeProtocolProxy(dnInfo, conf,
- fakeBlock.getBlock(), fakeBlock.getBlockToken(), 500);
+ fakeBlock.getBlock(), fakeBlock.getBlockToken(), 500, false);
fail ("Did not get expected exception: SocketTimeoutException");
} catch (SocketTimeoutException e) {
Modified: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestFileCreation.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestFileCreation.java?rev=1308478&r1=1308477&r2=1308478&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestFileCreation.java (original)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestFileCreation.java Mon Apr 2 19:15:51 2012
@@ -170,11 +170,30 @@ public class TestFileCreation extends ju
stm.close();
}
+ public void testFileCreation() throws IOException {
+ checkFileCreation(false);
+ }
+
+ public void testFileCreationByHostname() throws IOException {
+ checkFileCreation(true);
+ }
+
/**
* Test that file data becomes available before file is closed.
+ * @param useDnHostname if clients should access DNs by hostname (vs IP)
*/
- public void testFileCreation() throws IOException {
+ public void checkFileCreation(boolean useDnHostname) throws IOException {
Configuration conf = new Configuration();
+
+ conf.setBoolean(DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME, useDnHostname);
+ if (useDnHostname) {
+ // Since the mini cluster only listens on the loopback we have to
+ // ensure the hostname used to access DNs maps to the loopback. We
+ // do this by telling the DN to advertise localhost as its hostname
+ // instead of the default hostname.
+ conf.set("slave.host.name", "localhost");
+ }
+
if (simulatedStorage) {
conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
}
Modified: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java?rev=1308478&r1=1308477&r2=1308478&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java (original)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java Mon Apr 2 19:15:51 2012
@@ -208,7 +208,7 @@ public class TestShortCircuitLocalRead {
@Override
public ClientDatanodeProtocol run() throws Exception {
return DFSClient.createClientDatanodeProtocolProxy(
- dnInfo, conf, 60000);
+ dnInfo, conf, 60000, false);
}
});
@@ -226,7 +226,7 @@ public class TestShortCircuitLocalRead {
@Override
public ClientDatanodeProtocol run() throws Exception {
return DFSClient.createClientDatanodeProtocolProxy(
- dnInfo, conf, 60000);
+ dnInfo, conf, 60000, false);
}
});
try {
Modified: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java?rev=1308478&r1=1308477&r2=1308478&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java (original)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java Mon Apr 2 19:15:51 2012
@@ -36,6 +36,7 @@ import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.DistributedFileSystem;
@@ -120,14 +121,34 @@ public class TestInterDatanodeProtocol e
return blocks.get(blocks.size() - 1);
}
+ /** Test block metadata access via a DN */
+ public void testBlockMetaDataInfo() throws Exception {
+ checkBlockMetaDataInfo(false);
+ }
+
+ /** The same as above, but use hostnames for DN<->DN communication */
+ public void testBlockMetaDataInfoWithHostname() throws Exception {
+ checkBlockMetaDataInfo(true);
+ }
+
/**
* The following test first creates a file.
* It verifies the block information from a datanode.
- * Then, it updates the block with new information and verifies again.
+ * Then, it updates the block with new information and verifies again.
+ * @param useDnHostname if DNs should access DNs by hostname (vs IP)
*/
- public void testBlockMetaDataInfo() throws Exception {
+ private void checkBlockMetaDataInfo(boolean useDnHostname) throws Exception {
MiniDFSCluster cluster = null;
+ conf.setBoolean(DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME, useDnHostname);
+ if (useDnHostname) {
+ // Since the mini cluster only listens on the loopback we have to
+ // ensure the hostname used to access DNs maps to the loopback. We
+ // do this by telling the DN to advertise localhost as its hostname
+ // instead of the default hostname.
+ conf.set("slave.host.name", "localhost");
+ }
+
try {
cluster = new MiniDFSCluster(conf, 3, true, null);
cluster.waitActive();
@@ -148,7 +169,7 @@ public class TestInterDatanodeProtocol e
DataNode datanode = cluster.getDataNode(datanodeinfo[0].getIpcPort());
assertTrue(datanode != null);
InterDatanodeProtocol idp = DataNode.createInterDataNodeProtocolProxy(
- datanodeinfo[0], conf, datanode.socketTimeout);
+ datanodeinfo[0], conf, datanode.socketTimeout, useDnHostname);
//stop block scanner, so we could compare lastScanTime
datanode.blockScannerThread.interrupt();
@@ -184,7 +205,7 @@ public class TestInterDatanodeProtocol e
try {
proxy = DataNode.createInterDataNodeProtocolProxy(
- dInfo, conf, 500);
+ dInfo, conf, 500, false);
fail ("Expected SocketTimeoutException exception, but did not get.");
} catch (SocketTimeoutException e) {
DataNode.LOG.info("Got expected Exception: SocketTimeoutException");