You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by sz...@apache.org on 2013/01/09 03:39:16 UTC
svn commit: r1430662 - in
/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./
src/main/java/org/apache/hadoop/hdfs/
src/main/java/org/apache/hadoop/hdfs/net/
src/main/java/org/apache/hadoop/hdfs/server/common/
src/main/java/org/apache/hadoop/hdfs/...
Author: szetszwo
Date: Wed Jan 9 02:39:15 2013
New Revision: 1430662
URL: http://svn.apache.org/viewvc?rev=1430662&view=rev
Log:
svn merge -c -1430507 . for reverting HDFS-4353. Encapsulate connections to peers in Peer and PeerServer classes
Added:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/SocketCache.java
- copied unchanged from r1430506, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/SocketCache.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestSocketCache.java
- copied unchanged from r1430506, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestSocketCache.java
Removed:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/PeerCache.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDisableConnCache.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPeerCache.java
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientBlockVerification.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1430662&r1=1430661&r2=1430662&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Wed Jan 9 02:39:15 2013
@@ -180,9 +180,6 @@ Trunk (Unreleased)
HDFS-4352. Encapsulate arguments to BlockReaderFactory in a class
(Colin Patrick McCabe via todd)
- HDFS-4353. Encapsulate connections to peers in Peer and PeerServer classes
- (Colin Patrick McCabe via todd)
-
OPTIMIZATIONS
BUG FIXES
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java?rev=1430662&r1=1430661&r2=1430662&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java Wed Jan 9 02:39:15 2013
@@ -18,8 +18,10 @@
package org.apache.hadoop.hdfs;
import java.io.IOException;
+import java.net.Socket;
import org.apache.hadoop.fs.ByteBufferReadable;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
/**
* A BlockReader is responsible for reading a single block
@@ -41,18 +43,7 @@ public interface BlockReader extends Byt
*/
long skip(long n) throws IOException;
- /**
- * Close the block reader.
- *
- * @param peerCache The PeerCache to put the Peer we're using back
- * into, or null if we should simply close the Peer
- * we're using (along with its Socket).
- * Some block readers, like BlockReaderLocal, may
- * not make use of this parameter.
- *
- * @throws IOException
- */
- void close(PeerCache peerCache) throws IOException;
+ void close() throws IOException;
/**
* Read exactly the given amount of data, throwing an exception
@@ -69,4 +60,20 @@ public interface BlockReader extends Byt
* filled or the next call will return EOF.
*/
int readAll(byte[] buf, int offset, int len) throws IOException;
+
+ /**
+ * Take the socket used to talk to the DN.
+ */
+ Socket takeSocket();
+
+ /**
+ * Whether the BlockReader has reached the end of its input stream
+ * and successfully sent a status code back to the datanode.
+ */
+ boolean hasSentStatusCode();
+
+ /**
+ * @return a reference to the streams this block reader is using.
+ */
+ IOStreamPair getStreams();
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java?rev=1430662&r1=1430661&r2=1430662&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java Wed Jan 9 02:39:15 2013
@@ -19,18 +19,19 @@ package org.apache.hadoop.hdfs;
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.net.Socket;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.DFSClient.Conf;
-import org.apache.hadoop.hdfs.net.Peer;
-import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
+import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.token.Token;
-import com.google.common.base.Preconditions;
-
/**
* Utility class to create BlockReader implementations.
@@ -46,73 +47,18 @@ public class BlockReaderFactory {
@InterfaceAudience.Private
public static class Params {
private final Conf conf;
- /**
- * The peer that this BlockReader will be connected to.
- * You must set this.
- */
- private Peer peer = null;
-
- /**
- * The file name that this BlockReader pertains to.
- * This is optional and only used for display and logging purposes.
- */
+ private Socket socket = null;
private String file = null;
-
- /**
- * The block that this BlockReader is reading.
- * You must set this.
- */
private ExtendedBlock block = null;
-
- /**
- * The BlockTokenIdentifier to use, or null to use none.
- */
private Token<BlockTokenIdentifier> blockToken = null;
-
- /**
- * The offset in the block to start reading at.
- */
private long startOffset = 0;
-
- /**
- * The total number of bytes we might want to read, or -1 to assume no
- * limit.
- */
private long len = -1;
-
- /**
- * The buffer size to use.
- *
- * If this is not set, we will use the default from the Conf.
- */
private int bufferSize;
-
- /**
- * Whether or not we should verify the checksum.
- *
- * This is used instead of conf.verifyChecksum, because there are some
- * cases when we may want to explicitly turn off checksum verification,
- * such as when the caller has explicitly asked for a file to be opened
- * without checksum verification.
- */
private boolean verifyChecksum = true;
-
- /**
- * Whether or not we should try to use short circuit local reads.
- */
private boolean shortCircuitLocalReads = false;
-
- /**
- * The name of the client using this BlockReader, for logging and
- * debugging purposes.
- */
private String clientName = "";
-
- /**
- * The DataNode on which this Block resides.
- * You must set this.
- */
- private DatanodeID datanodeID = null;
+ private DataEncryptionKey encryptionKey = null;
+ private IOStreamPair ioStreamPair = null;
public Params(Conf conf) {
this.conf = conf;
@@ -121,11 +67,11 @@ public class BlockReaderFactory {
public Conf getConf() {
return conf;
}
- public Peer getPeer() {
- return peer;
+ public Socket getSocket() {
+ return socket;
}
- public Params setPeer(Peer peer) {
- this.peer = peer;
+ public Params setSocket(Socket socket) {
+ this.socket = socket;
return this;
}
public String getFile() {
@@ -191,12 +137,19 @@ public class BlockReaderFactory {
this.clientName = clientName;
return this;
}
- public Params setDatanodeID(DatanodeID datanodeID) {
- this.datanodeID = datanodeID;
+ public Params setEncryptionKey(DataEncryptionKey encryptionKey) {
+ this.encryptionKey = encryptionKey;
return this;
}
- public DatanodeID getDatanodeID() {
- return datanodeID;
+ public DataEncryptionKey getEncryptionKey() {
+ return encryptionKey;
+ }
+ public IOStreamPair getIoStreamPair() {
+ return ioStreamPair;
+ }
+ public Params setIoStreamPair(IOStreamPair ioStreamPair) {
+ this.ioStreamPair = ioStreamPair;
+ return this;
}
}
@@ -211,27 +164,24 @@ public class BlockReaderFactory {
*/
@SuppressWarnings("deprecation")
public static BlockReader newBlockReader(Params params) throws IOException {
- Preconditions.checkNotNull(params.getPeer());
- Preconditions.checkNotNull(params.getBlock());
- Preconditions.checkNotNull(params.getDatanodeID());
- // First, let's set the read and write timeouts appropriately.
- // This will keep us from blocking forever if something goes wrong during
- // network communication.
- Peer peer = params.getPeer();
- peer.setReadTimeout(params.getConf().socketTimeout);
- peer.setWriteTimeout(HdfsServerConstants.WRITE_TIMEOUT);
-
if (params.getConf().useLegacyBlockReader) {
- // The legacy BlockReader doesn't require that the Peers it uses
- // have associated ReadableByteChannels. This makes it easier to use
- // with some older Socket classes like, say, SocksSocketImpl.
- //
- // TODO: create a wrapper class that makes channel-less sockets look like
- // they have a channel, so that we can finally remove the legacy
- // RemoteBlockReader. See HDFS-2534.
+ if (params.getEncryptionKey() != null) {
+ throw new RuntimeException("Encryption is not supported with the legacy block reader.");
+ }
return RemoteBlockReader.newBlockReader(params);
} else {
- // The usual block reader.
+ Socket sock = params.getSocket();
+ if (params.getIoStreamPair() == null) {
+ params.setIoStreamPair(new IOStreamPair(NetUtils.getInputStream(sock),
+ NetUtils.getOutputStream(sock, HdfsServerConstants.WRITE_TIMEOUT)));
+ if (params.getEncryptionKey() != null) {
+ IOStreamPair encryptedStreams =
+ DataTransferEncryptor.getEncryptedStreams(
+ params.getIoStreamPair().out, params.getIoStreamPair().in,
+ params.getEncryptionKey());
+ params.setIoStreamPair(encryptedStreams);
+ }
+ }
return RemoteBlockReader2.newBlockReader(params);
}
}
@@ -247,4 +197,4 @@ public class BlockReaderFactory {
final String poolId, final long blockId) {
return s.toString() + ":" + poolId + ":" + blockId;
}
-}
\ No newline at end of file
+}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java?rev=1430662&r1=1430661&r2=1430662&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java Wed Jan 9 02:39:15 2013
@@ -649,7 +649,7 @@ class BlockReaderLocal implements BlockR
}
@Override
- public synchronized void close(PeerCache peerCache) throws IOException {
+ public synchronized void close() throws IOException {
dataIn.close();
if (checksumIn != null) {
checksumIn.close();
@@ -675,4 +675,19 @@ class BlockReaderLocal implements BlockR
public void readFully(byte[] buf, int off, int len) throws IOException {
BlockReaderUtil.readFully(this, buf, off, len);
}
+
+ @Override
+ public Socket takeSocket() {
+ return null;
+ }
+
+ @Override
+ public boolean hasSentStatusCode() {
+ return false;
+ }
+
+ @Override
+ public IOStreamPair getStreams() {
+ return null;
+ }
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1430662&r1=1430661&r2=1430662&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java Wed Jan 9 02:39:15 2013
@@ -191,7 +191,7 @@ public class DFSClient implements java.i
final FileSystem.Statistics stats;
final int hdfsTimeout; // timeout value for a DFS operation.
private final String authority;
- final PeerCache peerCache;
+ final SocketCache socketCache;
final Conf dfsClientConf;
private Random r = new Random();
private SocketAddress[] localInterfaceAddrs;
@@ -433,7 +433,7 @@ public class DFSClient implements java.i
Joiner.on(',').join(localInterfaceAddrs) + "]");
}
- this.peerCache = PeerCache.getInstance(dfsClientConf.socketCacheCapacity, dfsClientConf.socketCacheExpiry);
+ this.socketCache = SocketCache.getInstance(dfsClientConf.socketCacheCapacity, dfsClientConf.socketCacheExpiry);
}
/**
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java?rev=1430662&r1=1430661&r2=1430662&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java Wed Jan 9 02:39:15 2013
@@ -32,15 +32,12 @@ import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import org.apache.commons.io.IOUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.ByteBufferReadable;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.UnresolvedLinkException;
-import org.apache.hadoop.hdfs.net.EncryptedPeer;
-import org.apache.hadoop.hdfs.net.Peer;
-import org.apache.hadoop.hdfs.net.TcpPeerServer;
+import org.apache.hadoop.hdfs.SocketCache.SocketAndStreams;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -49,7 +46,6 @@ import org.apache.hadoop.hdfs.protocol.L
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
import org.apache.hadoop.ipc.RPC;
@@ -64,7 +60,7 @@ import org.apache.hadoop.security.token.
****************************************************************/
@InterfaceAudience.Private
public class DFSInputStream extends FSInputStream implements ByteBufferReadable {
- private final PeerCache peerCache;
+ private final SocketCache socketCache;
private final DFSClient dfsClient;
private boolean closed = false;
@@ -114,7 +110,7 @@ public class DFSInputStream extends FSIn
this.verifyChecksum = verifyChecksum;
this.buffersize = buffersize;
this.src = src;
- this.peerCache = dfsClient.peerCache;
+ this.socketCache = dfsClient.socketCache;
prefetchSize = dfsClient.getConf().prefetchSize;
timeWindow = dfsClient.getConf().timeWindow;
nCachedConnRetry = dfsClient.getConf().nCachedConnRetry;
@@ -428,7 +424,7 @@ public class DFSInputStream extends FSIn
// Will be getting a new BlockReader.
if (blockReader != null) {
- blockReader.close(peerCache);
+ closeBlockReader(blockReader);
blockReader = null;
}
@@ -510,7 +506,7 @@ public class DFSInputStream extends FSIn
dfsClient.checkOpen();
if (blockReader != null) {
- blockReader.close(peerCache);
+ closeBlockReader(blockReader);
blockReader = null;
}
super.close();
@@ -837,7 +833,7 @@ public class DFSInputStream extends FSIn
}
} finally {
if (reader != null) {
- reader.close(peerCache);
+ closeBlockReader(reader);
}
}
// Put chosen node into dead list, continue
@@ -845,30 +841,16 @@ public class DFSInputStream extends FSIn
}
}
- private Peer newPeer(InetSocketAddress addr) throws IOException {
- Peer peer = null;
- boolean success = false;
- Socket sock = null;
- try {
- sock = dfsClient.socketFactory.createSocket();
- NetUtils.connect(sock, addr,
- dfsClient.getRandomLocalInterfaceAddr(),
- dfsClient.getConf().socketTimeout);
- peer = TcpPeerServer.peerFromSocket(sock);
-
- // Add encryption if configured.
- DataEncryptionKey key = dfsClient.getDataEncryptionKey();
- if (key != null) {
- peer = new EncryptedPeer(peer, key);
- }
- success = true;
- return peer;
- } finally {
- if (!success) {
- IOUtils.closeQuietly(peer);
- IOUtils.closeQuietly(sock);
- }
+ /**
+ * Close the given BlockReader and cache its socket.
+ */
+ private void closeBlockReader(BlockReader reader) throws IOException {
+ if (reader.hasSentStatusCode()) {
+ IOStreamPair ioStreams = reader.getStreams();
+ Socket oldSock = reader.takeSocket();
+ socketCache.put(oldSock, ioStreams);
}
+ reader.close();
}
/**
@@ -914,16 +896,40 @@ public class DFSInputStream extends FSIn
// Allow retry since there is no way of knowing whether the cached socket
// is good until we actually use it.
for (int retries = 0; retries <= nCachedConnRetry && fromCache; ++retries) {
- Peer peer = null;
+ SocketAndStreams sockAndStreams = null;
// Don't use the cache on the last attempt - it's possible that there
// are arbitrarily many unusable sockets in the cache, but we don't
// want to fail the read.
if (retries < nCachedConnRetry) {
- peer = peerCache.get(chosenNode);
+ sockAndStreams = socketCache.get(dnAddr);
}
- if (peer == null) {
- peer = newPeer(dnAddr);
+ Socket sock;
+ if (sockAndStreams == null) {
fromCache = false;
+
+ sock = dfsClient.socketFactory.createSocket();
+
+ // TCP_NODELAY is crucial here because of bad interactions between
+ // Nagle's Algorithm and Delayed ACKs. With connection keepalive
+ // between the client and DN, the conversation looks like:
+ // 1. Client -> DN: Read block X
+ // 2. DN -> Client: data for block X
+ // 3. Client -> DN: Status OK (successful read)
+ // 4. Client -> DN: Read block Y
+ // The fact that step #3 and #4 are both in the client->DN direction
+ // triggers Nagling. If the DN is using delayed ACKs, this results
+ // in a delay of 40ms or more.
+ //
+ // TCP_NODELAY disables nagling and thus avoids this performance
+ // disaster.
+ sock.setTcpNoDelay(true);
+
+ NetUtils.connect(sock, dnAddr,
+ dfsClient.getRandomLocalInterfaceAddr(),
+ dfsClient.getConf().socketTimeout);
+ sock.setSoTimeout(dfsClient.getConf().socketTimeout);
+ } else {
+ sock = sockAndStreams.sock;
}
try {
@@ -933,13 +939,19 @@ public class DFSInputStream extends FSIn
setFile(file).setBlock(block).setBlockToken(blockToken).
setStartOffset(startOffset).setLen(len).
setBufferSize(bufferSize).setVerifyChecksum(verifyChecksum).
- setClientName(clientName).setDatanodeID(chosenNode).
- setPeer(peer));
+ setClientName(clientName).
+ setEncryptionKey(dfsClient.getDataEncryptionKey()).
+ setIoStreamPair(sockAndStreams == null ? null : sockAndStreams.ioStreams).
+ setSocket(sock));
return reader;
} catch (IOException ex) {
// Our socket is no good.
- DFSClient.LOG.debug("Error making BlockReader. Closing stale " + peer, ex);
- IOUtils.closeQuietly(peer);
+ DFSClient.LOG.debug("Error making BlockReader. Closing stale " + sock, ex);
+ if (sockAndStreams != null) {
+ sockAndStreams.close();
+ } else {
+ sock.close();
+ }
err = ex;
}
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java?rev=1430662&r1=1430661&r2=1430662&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java Wed Jan 9 02:39:15 2013
@@ -25,20 +25,25 @@ import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.net.Socket;
import java.nio.ByteBuffer;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.FSInputChecker;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.net.Peer;
-import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;
@@ -51,8 +56,7 @@ import org.apache.hadoop.util.DataChecks
@Deprecated
public class RemoteBlockReader extends FSInputChecker implements BlockReader {
- private final Peer peer;
- private final DatanodeID datanodeID;
+ Socket dnSock; //for now just sending the status code (e.g. checksumOk) after the read.
private final DataInputStream in;
private DataChecksum checksum;
@@ -122,9 +126,9 @@ public class RemoteBlockReader extends F
// if eos was set in the previous read, send a status code to the DN
if (eos && !eosBefore && nRead >= 0) {
if (needChecksum()) {
- sendReadResult(peer, Status.CHECKSUM_OK);
+ sendReadResult(dnSock, Status.CHECKSUM_OK);
} else {
- sendReadResult(peer, Status.SUCCESS);
+ sendReadResult(dnSock, Status.SUCCESS);
}
}
return nRead;
@@ -318,8 +322,7 @@ public class RemoteBlockReader extends F
private RemoteBlockReader(String file, String bpid, long blockId,
DataInputStream in, DataChecksum checksum, boolean verifyChecksum,
- long startOffset, long firstChunkOffset, long bytesToRead,
- Peer peer, DatanodeID datanodeID) {
+ long startOffset, long firstChunkOffset, long bytesToRead, Socket dnSock) {
// Path is used only for printing block and file information in debug
super(new Path("/blk_" + blockId + ":" + bpid + ":of:"+ file)/*too non path-like?*/,
1, verifyChecksum,
@@ -327,8 +330,7 @@ public class RemoteBlockReader extends F
checksum.getBytesPerChecksum(),
checksum.getChecksumSize());
- this.peer = peer;
- this.datanodeID = datanodeID;
+ this.dnSock = dnSock;
this.in = in;
this.checksum = checksum;
this.startOffset = Math.max( startOffset, 0 );
@@ -365,8 +367,9 @@ public class RemoteBlockReader extends F
public static RemoteBlockReader newBlockReader(BlockReaderFactory.Params params)
throws IOException {
// in and out will be closed when sock is closed (by the caller)
+ Socket sock = params.getSocket();
final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
- params.getPeer().getOutputStream()));
+ NetUtils.getOutputStream(sock, HdfsServerConstants.WRITE_TIMEOUT)));
new Sender(out).readBlock(params.getBlock(), params.getBlockToken(),
params.getClientName(), params.getStartOffset(), params.getLen());
@@ -374,13 +377,13 @@ public class RemoteBlockReader extends F
// Get bytes in block, set streams
//
DataInputStream in = new DataInputStream(
- new BufferedInputStream(params.getPeer().getInputStream(),
+ new BufferedInputStream(NetUtils.getInputStream(sock),
params.getBufferSize()));
BlockOpResponseProto status = BlockOpResponseProto.parseFrom(
vintPrefixed(in));
- RemoteBlockReader2.checkSuccess(status, params.getPeer(),
- params.getBlock(), params.getFile());
+ RemoteBlockReader2.checkSuccess(status, sock, params.getBlock(),
+ params.getFile());
ReadOpChecksumInfoProto checksumInfo =
status.getReadOpChecksumInfo();
DataChecksum checksum = DataTransferProtoUtil.fromProto(
@@ -399,20 +402,18 @@ public class RemoteBlockReader extends F
return new RemoteBlockReader(params.getFile(), params.getBlock().getBlockPoolId(),
params.getBlock().getBlockId(), in, checksum, params.getVerifyChecksum(),
- params.getStartOffset(), firstChunkOffset, params.getLen(),
- params.getPeer(), params.getDatanodeID());
+ params.getStartOffset(), firstChunkOffset, params.getLen(), sock);
}
@Override
- public synchronized void close(PeerCache peerCache) throws IOException {
+ public synchronized void close() throws IOException {
startOffset = -1;
checksum = null;
- if (peerCache != null && sentStatusCode) {
- peerCache.put(datanodeID, peer);
- } else {
- peer.close();
+ if (dnSock != null) {
+ dnSock.close();
}
- // in will be closed when its Peer is closed.
+
+ // in will be closed when its Socket is closed.
}
@Override
@@ -426,21 +427,37 @@ public class RemoteBlockReader extends F
return readFully(this, buf, offset, len);
}
+ @Override
+ public Socket takeSocket() {
+ assert hasSentStatusCode() :
+ "BlockReader shouldn't give back sockets mid-read";
+ Socket res = dnSock;
+ dnSock = null;
+ return res;
+ }
+
+ @Override
+ public boolean hasSentStatusCode() {
+ return sentStatusCode;
+ }
+
/**
* When the reader reaches end of the read, it sends a status response
* (e.g. CHECKSUM_OK) to the DN. Failure to do so could lead to the DN
* closing our connection (which we will re-open), but won't affect
* data correctness.
*/
- void sendReadResult(Peer peer, Status statusCode) {
- assert !sentStatusCode : "already sent status code to " + peer;
+ void sendReadResult(Socket sock, Status statusCode) {
+ assert !sentStatusCode : "already sent status code to " + sock;
try {
- RemoteBlockReader2.writeReadResult(peer.getOutputStream(), statusCode);
+ RemoteBlockReader2.writeReadResult(
+ NetUtils.getOutputStream(sock, HdfsServerConstants.WRITE_TIMEOUT),
+ statusCode);
sentStatusCode = true;
} catch (IOException e) {
// It's ok not to be able to send this. But something is probably wrong.
LOG.info("Could not send read status (" + statusCode + ") to datanode " +
- datanodeID + ": " + e.getMessage());
+ sock.getInetAddress() + ": " + e.getMessage());
}
}
@@ -460,4 +477,12 @@ public class RemoteBlockReader extends F
public int read(ByteBuffer buf) throws IOException {
throw new UnsupportedOperationException("readDirect unsupported in RemoteBlockReader");
}
+
+ @Override
+ public IOStreamPair getStreams() {
+ // This class doesn't support encryption, which is the only thing this
+ // method is used for. See HDFS-3637.
+ return null;
+ }
+
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java?rev=1430662&r1=1430661&r2=1430662&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java Wed Jan 9 02:39:15 2013
@@ -25,15 +25,16 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
+import java.net.Socket;
import java.nio.ByteBuffer;
+import java.nio.channels.ReadableByteChannel;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
-import org.apache.hadoop.hdfs.net.Peer;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
@@ -41,11 +42,13 @@ import org.apache.hadoop.hdfs.protocol.p
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
+import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
+import org.apache.hadoop.net.SocketInputWrapper;
+import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;
-import com.google.common.annotations.VisibleForTesting;
-
/**
* This is a wrapper around connection to datanode
* and understands checksum, offset etc.
@@ -76,8 +79,11 @@ import com.google.common.annotations.Vis
public class RemoteBlockReader2 implements BlockReader {
static final Log LOG = LogFactory.getLog(RemoteBlockReader2.class);
- private final DatanodeID datanodeID;
- private final Peer peer;
+
+ Socket dnSock;
+ // for now just sending the status code (e.g. checksumOk) after the read.
+ private IOStreamPair ioStreams;
+ private final ReadableByteChannel in;
private DataChecksum checksum;
private PacketReceiver packetReceiver = new PacketReceiver(true);
@@ -109,11 +115,6 @@ public class RemoteBlockReader2 impleme
/** Amount of unread data in the current received packet */
int dataLeft = 0;
- @VisibleForTesting
- public Peer getPeer() {
- return peer;
- }
-
@Override
public synchronized int read(byte[] buf, int off, int len)
throws IOException {
@@ -154,7 +155,7 @@ public class RemoteBlockReader2 impleme
private void readNextPacket() throws IOException {
//Read packet headers.
- packetReceiver.receiveNextPacket(peer.getInputStreamChannel());
+ packetReceiver.receiveNextPacket(in);
PacketHeader curHeader = packetReceiver.getHeader();
curDataSlice = packetReceiver.getDataSlice();
@@ -235,7 +236,7 @@ public class RemoteBlockReader2 impleme
LOG.trace("Reading empty packet at end of read");
}
- packetReceiver.receiveNextPacket(peer.getInputStreamChannel());
+ packetReceiver.receiveNextPacket(in);
PacketHeader trailer = packetReceiver.getHeader();
if (!trailer.isLastPacketInBlock() ||
@@ -246,10 +247,11 @@ public class RemoteBlockReader2 impleme
}
protected RemoteBlockReader2(BlockReaderFactory.Params params,
- DataChecksum checksum, long firstChunkOffset) {
+ DataChecksum checksum, long firstChunkOffset, ReadableByteChannel in) {
// Path is used only for printing block and file information in debug
- this.datanodeID = params.getDatanodeID();
- this.peer = params.getPeer();
+ this.dnSock = params.getSocket();
+ this.ioStreams = params.getIoStreamPair();
+ this.in = in;
this.checksum = checksum;
this.verifyChecksum = params.getVerifyChecksum();
this.startOffset = Math.max( params.getStartOffset(), 0 );
@@ -266,19 +268,38 @@ public class RemoteBlockReader2 impleme
@Override
- public synchronized void close(PeerCache peerCache) throws IOException {
+ public synchronized void close() throws IOException {
packetReceiver.close();
startOffset = -1;
checksum = null;
- if (peerCache != null && sentStatusCode) {
- peerCache.put(datanodeID, peer);
- } else {
- peer.close();
+ if (dnSock != null) {
+ dnSock.close();
}
// in will be closed when its Socket is closed.
}
+
+ /**
+ * Take the socket used to talk to the DN.
+ */
+ @Override
+ public Socket takeSocket() {
+ assert hasSentStatusCode() :
+ "BlockReader shouldn't give back sockets mid-read";
+ Socket res = dnSock;
+ dnSock = null;
+ return res;
+ }
+
+ /**
+ * Whether the BlockReader has reached the end of its input stream
+ * and successfully sent a status code back to the datanode.
+ */
+ @Override
+ public boolean hasSentStatusCode() {
+ return sentStatusCode;
+ }
/**
* When the reader reaches end of the read, it sends a status response
@@ -287,14 +308,14 @@ public class RemoteBlockReader2 impleme
* data correctness.
*/
void sendReadResult(Status statusCode) {
- assert !sentStatusCode : "already sent status code to " + peer;
+ assert !sentStatusCode : "already sent status code to " + dnSock;
try {
- writeReadResult(peer.getOutputStream(), statusCode);
+ writeReadResult(ioStreams.out, statusCode);
sentStatusCode = true;
} catch (IOException e) {
// It's ok not to be able to send this. But something is probably wrong.
LOG.info("Could not send read status (" + statusCode + ") to datanode " +
- peer.getRemoteAddressString() + ": " + e.getMessage());
+ dnSock.getInetAddress() + ": " + e.getMessage());
}
}
@@ -352,20 +373,29 @@ public class RemoteBlockReader2 impleme
*/
public static BlockReader newBlockReader(BlockReaderFactory.Params params)
throws IOException {
+ IOStreamPair ioStreams = params.getIoStreamPair();
+ ReadableByteChannel ch;
+ if (ioStreams.in instanceof SocketInputWrapper) {
+ ch = ((SocketInputWrapper)ioStreams.in).getReadableByteChannel();
+ } else {
+ ch = (ReadableByteChannel) ioStreams.in;
+ }
+
// in and out will be closed when sock is closed (by the caller)
final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
- params.getPeer().getOutputStream()));
+ ioStreams.out));
new Sender(out).readBlock(params.getBlock(), params.getBlockToken(),
params.getClientName(), params.getStartOffset(), params.getLen());
//
// Get bytes in block
//
- DataInputStream in = new DataInputStream(params.getPeer().getInputStream());
+ DataInputStream in = new DataInputStream(ioStreams.in);
BlockOpResponseProto status = BlockOpResponseProto.parseFrom(
vintPrefixed(in));
- checkSuccess(status, params.getPeer(), params.getBlock(), params.getFile());
+ checkSuccess(status, params.getSocket(), params.getBlock(),
+ params.getFile());
ReadOpChecksumInfoProto checksumInfo =
status.getReadOpChecksumInfo();
DataChecksum checksum = DataTransferProtoUtil.fromProto(
@@ -382,28 +412,33 @@ public class RemoteBlockReader2 impleme
params.getStartOffset() + " for file " + params.getFile());
}
- return new RemoteBlockReader2(params, checksum, firstChunkOffset);
+ return new RemoteBlockReader2(params, checksum, firstChunkOffset, ch);
}
static void checkSuccess(
- BlockOpResponseProto status, Peer peer,
+ BlockOpResponseProto status, Socket sock,
ExtendedBlock block, String file)
throws IOException {
if (status.getStatus() != Status.SUCCESS) {
if (status.getStatus() == Status.ERROR_ACCESS_TOKEN) {
throw new InvalidBlockTokenException(
"Got access token error for OP_READ_BLOCK, self="
- + peer.getLocalAddressString() + ", remote="
- + peer.getRemoteAddressString() + ", for file " + file
+ + sock.getLocalSocketAddress() + ", remote="
+ + sock.getRemoteSocketAddress() + ", for file " + file
+ ", for pool " + block.getBlockPoolId() + " block "
+ block.getBlockId() + "_" + block.getGenerationStamp());
} else {
throw new IOException("Got error for OP_READ_BLOCK, self="
- + peer.getLocalAddressString() + ", remote="
- + peer.getRemoteAddressString() + ", for file " + file
+ + sock.getLocalSocketAddress() + ", remote="
+ + sock.getRemoteSocketAddress() + ", for file " + file
+ ", for pool " + block.getBlockPoolId() + " block "
+ block.getBlockId() + "_" + block.getGenerationStamp());
}
}
}
+
+ @Override
+ public IOStreamPair getStreams() {
+ return ioStreams;
+ }
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java?rev=1430662&r1=1430661&r2=1430662&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java Wed Jan 9 02:39:15 2013
@@ -46,8 +46,6 @@ import org.apache.hadoop.hdfs.BlockReade
import org.apache.hadoop.hdfs.BlockReaderFactory;
import org.apache.hadoop.hdfs.DFSClient.Conf;
import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.hdfs.net.TcpPeerServer;
-import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -209,14 +207,12 @@ public class JspHelper {
// Use the block name for file name.
BlockReader blockReader = BlockReaderFactory.newBlockReader(
new BlockReaderFactory.Params(new Conf(conf)).
- setPeer(TcpPeerServer.peerFromSocketAndKey(s, encryptionKey)).
+ setSocket(s).
setBlockToken(blockToken).setStartOffset(offsetIntoBlock).
setLen(amtToRead).
+ setEncryptionKey(encryptionKey).
setFile(BlockReaderFactory.getFileName(addr, poolId, blockId)).
- setBlock(new ExtendedBlock(poolId, blockId, 0, genStamp)).
- setDatanodeID(new DatanodeID(addr.getAddress().toString(),
- addr.getHostName(), poolId, addr.getPort(), 0, 0)));
-
+ setBlock(new ExtendedBlock(poolId, blockId, 0, genStamp)));
byte[] buf = new byte[(int)amtToRead];
int readOffset = 0;
int retries = 2;
@@ -234,7 +230,8 @@ public class JspHelper {
amtToRead -= numRead;
readOffset += numRead;
}
- blockReader.close(null);
+ blockReader = null;
+ s.close();
out.print(HtmlQuoting.quoteHtmlChars(new String(buf)));
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1430662&r1=1430661&r2=1430662&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Wed Jan 9 02:39:15 2013
@@ -90,7 +90,6 @@ import org.apache.hadoop.hdfs.DFSConfigK
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HDFSPolicyProvider;
import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.hdfs.net.TcpPeerServer;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
@@ -523,19 +522,24 @@ public class DataNode extends Configured
private void initDataXceiver(Configuration conf) throws IOException {
// find free port or use privileged port provided
- TcpPeerServer tcpPeerServer;
- if (secureResources != null) {
- tcpPeerServer = new TcpPeerServer(secureResources);
+ ServerSocket ss;
+ if (secureResources == null) {
+ InetSocketAddress addr = DataNode.getStreamingAddr(conf);
+ ss = (dnConf.socketWriteTimeout > 0) ?
+ ServerSocketChannel.open().socket() : new ServerSocket();
+ Server.bind(ss, addr, 0);
} else {
- tcpPeerServer = new TcpPeerServer(dnConf.socketWriteTimeout,
- DataNode.getStreamingAddr(conf));
+ ss = secureResources.getStreamingSocket();
}
- tcpPeerServer.setReceiveBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE);
- streamingAddr = tcpPeerServer.getStreamingAddr();
+ ss.setReceiveBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE);
+
+ streamingAddr = new InetSocketAddress(ss.getInetAddress().getHostAddress(),
+ ss.getLocalPort());
+
LOG.info("Opened streaming server at " + streamingAddr);
this.threadGroup = new ThreadGroup("dataXceiverServer");
this.dataXceiverServer = new Daemon(threadGroup,
- new DataXceiverServer(tcpPeerServer, conf, this));
+ new DataXceiverServer(ss, conf, this));
this.threadGroup.setDaemon(true); // auto destroy when empty
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=1430662&r1=1430661&r2=1430662&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Wed Jan 9 02:39:15 2013
@@ -39,7 +39,6 @@ import java.nio.channels.ClosedChannelEx
import java.util.Arrays;
import org.apache.commons.logging.Log;
-import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
@@ -65,6 +64,7 @@ import org.apache.hadoop.hdfs.server.pro
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.net.SocketInputWrapper;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;
@@ -79,7 +79,8 @@ class DataXceiver extends Receiver imple
public static final Log LOG = DataNode.LOG;
static final Log ClientTraceLog = DataNode.ClientTraceLog;
- private final Peer peer;
+ private final Socket s;
+ private final boolean isLocal; //is a local connection?
private final String remoteAddress; // address of remote side
private final String localAddress; // local address of this daemon
private final DataNode datanode;
@@ -87,7 +88,7 @@ class DataXceiver extends Receiver imple
private final DataXceiverServer dataXceiverServer;
private final boolean connectToDnViaHostname;
private long opStartTime; //the start time of receiving an Op
- private final InputStream socketIn;
+ private final SocketInputWrapper socketIn;
private OutputStream socketOut;
/**
@@ -96,23 +97,25 @@ class DataXceiver extends Receiver imple
*/
private String previousOpClientName;
- public static DataXceiver create(Peer peer, DataNode dn,
+ public static DataXceiver create(Socket s, DataNode dn,
DataXceiverServer dataXceiverServer) throws IOException {
- return new DataXceiver(peer, dn, dataXceiverServer);
+ return new DataXceiver(s, dn, dataXceiverServer);
}
- private DataXceiver(Peer peer, DataNode datanode,
+ private DataXceiver(Socket s,
+ DataNode datanode,
DataXceiverServer dataXceiverServer) throws IOException {
- this.peer = peer;
+ this.s = s;
this.dnConf = datanode.getDnConf();
- this.socketIn = peer.getInputStream();
- this.socketOut = peer.getOutputStream();
+ this.socketIn = NetUtils.getInputStream(s);
+ this.socketOut = NetUtils.getOutputStream(s, dnConf.socketWriteTimeout);
+ this.isLocal = s.getInetAddress().equals(s.getLocalAddress());
this.datanode = datanode;
this.dataXceiverServer = dataXceiverServer;
this.connectToDnViaHostname = datanode.getDnConf().connectToDnViaHostname;
- remoteAddress = peer.getRemoteAddressString();
- localAddress = peer.getLocalAddressString();
+ remoteAddress = s.getRemoteSocketAddress().toString();
+ localAddress = s.getLocalSocketAddress().toString();
if (LOG.isDebugEnabled()) {
LOG.debug("Number of active connections is: "
@@ -152,10 +155,11 @@ class DataXceiver extends Receiver imple
public void run() {
int opsProcessed = 0;
Op op = null;
-
- dataXceiverServer.addPeer(peer);
+
+ dataXceiverServer.childSockets.add(s);
+
try {
- peer.setWriteTimeout(datanode.getDnConf().socketWriteTimeout);
+
InputStream input = socketIn;
if (dnConf.encryptDataTransfer) {
IOStreamPair encryptedStreams = null;
@@ -165,9 +169,8 @@ class DataXceiver extends Receiver imple
dnConf.encryptionAlgorithm);
} catch (InvalidMagicNumberException imne) {
LOG.info("Failed to read expected encryption handshake from client " +
- "at " + peer.getRemoteAddressString() + ". Perhaps the client " +
- "is running an older version of Hadoop which does not support " +
- "encryption");
+ "at " + s.getInetAddress() + ". Perhaps the client is running an " +
+ "older version of Hadoop which does not support encryption");
return;
}
input = encryptedStreams.in;
@@ -186,9 +189,9 @@ class DataXceiver extends Receiver imple
try {
if (opsProcessed != 0) {
assert dnConf.socketKeepaliveTimeout > 0;
- peer.setReadTimeout(dnConf.socketKeepaliveTimeout);
+ socketIn.setTimeout(dnConf.socketKeepaliveTimeout);
} else {
- peer.setReadTimeout(dnConf.socketTimeout);
+ socketIn.setTimeout(dnConf.socketTimeout);
}
op = readOp();
} catch (InterruptedIOException ignored) {
@@ -199,7 +202,7 @@ class DataXceiver extends Receiver imple
if (opsProcessed > 0 &&
(err instanceof EOFException || err instanceof ClosedChannelException)) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Cached " + peer + " closing after " + opsProcessed + " ops");
+ LOG.debug("Cached " + s.toString() + " closing after " + opsProcessed + " ops");
}
} else {
throw err;
@@ -209,13 +212,13 @@ class DataXceiver extends Receiver imple
// restore normal timeout
if (opsProcessed != 0) {
- peer.setReadTimeout(dnConf.socketTimeout);
+ s.setSoTimeout(dnConf.socketTimeout);
}
opStartTime = now();
processOp(op);
++opsProcessed;
- } while (!peer.isClosed() && dnConf.socketKeepaliveTimeout > 0);
+ } while (!s.isClosed() && dnConf.socketKeepaliveTimeout > 0);
} catch (Throwable t) {
LOG.error(datanode.getDisplayName() + ":DataXceiver error processing " +
((op == null) ? "unknown" : op.name()) + " operation " +
@@ -227,8 +230,9 @@ class DataXceiver extends Receiver imple
+ datanode.getXceiverCount());
}
updateCurrentThreadName("Cleaning up");
- dataXceiverServer.closePeer(peer);
IOUtils.closeStream(in);
+ IOUtils.closeSocket(s);
+ dataXceiverServer.childSockets.remove(s);
}
}
@@ -282,9 +286,8 @@ class DataXceiver extends Receiver imple
ClientReadStatusProto stat = ClientReadStatusProto.parseFrom(
HdfsProtoUtil.vintPrefixed(in));
if (!stat.hasStatus()) {
- LOG.warn("Client " + peer.getRemoteAddressString() +
- " did not send a valid status code after reading. " +
- "Will close connection.");
+ LOG.warn("Client " + s.getInetAddress() + " did not send a valid status " +
+ "code after reading. Will close connection.");
IOUtils.closeStream(out);
}
} catch (IOException ioe) {
@@ -317,7 +320,7 @@ class DataXceiver extends Receiver imple
//update metrics
datanode.metrics.addReadBlockOp(elapsed());
- datanode.metrics.incrReadsFromClient(peer.isLocal());
+ datanode.metrics.incrReadsFromClient(isLocal);
}
@Override
@@ -355,8 +358,8 @@ class DataXceiver extends Receiver imple
LOG.debug("isDatanode=" + isDatanode
+ ", isClient=" + isClient
+ ", isTransfer=" + isTransfer);
- LOG.debug("writeBlock receive buf size " + peer.getReceiveBufferSize() +
- " tcp no delay " + peer.getTcpNoDelay());
+ LOG.debug("writeBlock receive buf size " + s.getReceiveBufferSize() +
+ " tcp no delay " + s.getTcpNoDelay());
}
// We later mutate block's generation stamp and length, but we need to
@@ -387,8 +390,8 @@ class DataXceiver extends Receiver imple
stage != BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
// open a block receiver
blockReceiver = new BlockReceiver(block, in,
- peer.getRemoteAddressString(),
- peer.getLocalAddressString(),
+ s.getRemoteSocketAddress().toString(),
+ s.getLocalSocketAddress().toString(),
stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
clientname, srcDataNode, datanode, requestedChecksum);
} else {
@@ -543,7 +546,7 @@ class DataXceiver extends Receiver imple
//update metrics
datanode.metrics.addWriteBlockOp(elapsed());
- datanode.metrics.incrWritesFromClient(peer.isLocal());
+ datanode.metrics.incrWritesFromClient(isLocal);
}
@Override
@@ -551,7 +554,7 @@ class DataXceiver extends Receiver imple
final Token<BlockTokenIdentifier> blockToken,
final String clientName,
final DatanodeInfo[] targets) throws IOException {
- checkAccess(socketOut, true, blk, blockToken,
+ checkAccess(null, true, blk, blockToken,
Op.TRANSFER_BLOCK, BlockTokenSecretManager.AccessMode.COPY);
previousOpClientName = clientName;
updateCurrentThreadName(Op.TRANSFER_BLOCK + " " + blk);
@@ -638,9 +641,8 @@ class DataXceiver extends Receiver imple
}
if (!dataXceiverServer.balanceThrottler.acquire()) { // not able to start
- String msg = "Not able to copy block " + block.getBlockId() + " " +
- "to " + peer.getRemoteAddressString() + " because threads " +
- "quota is exceeded.";
+ String msg = "Not able to copy block " + block.getBlockId() + " to "
+ + s.getRemoteSocketAddress() + " because threads quota is exceeded.";
LOG.info(msg);
sendResponse(ERROR, msg);
return;
@@ -669,7 +671,7 @@ class DataXceiver extends Receiver imple
datanode.metrics.incrBytesRead((int) read);
datanode.metrics.incrBlocksRead();
- LOG.info("Copied " + block + " to " + peer.getRemoteAddressString());
+ LOG.info("Copied " + block + " to " + s.getRemoteSocketAddress());
} catch (IOException ioe) {
isOpSuccess = false;
LOG.info("opCopyBlock " + block + " received exception " + ioe);
@@ -714,9 +716,8 @@ class DataXceiver extends Receiver imple
}
if (!dataXceiverServer.balanceThrottler.acquire()) { // not able to start
- String msg = "Not able to receive block " + block.getBlockId() +
- " from " + peer.getRemoteAddressString() + " because threads " +
- "quota is exceeded.";
+ String msg = "Not able to receive block " + block.getBlockId() + " from "
+ + s.getRemoteSocketAddress() + " because threads quota is exceeded.";
LOG.warn(msg);
sendResponse(ERROR, msg);
return;
@@ -793,7 +794,7 @@ class DataXceiver extends Receiver imple
// notify name node
datanode.notifyNamenodeReceivedBlock(block, delHint);
- LOG.info("Moved " + block + " from " + peer.getRemoteAddressString());
+ LOG.info("Moved " + block + " from " + s.getRemoteSocketAddress());
} catch (IOException ioe) {
opStatus = ERROR;
@@ -816,7 +817,7 @@ class DataXceiver extends Receiver imple
try {
sendResponse(opStatus, errMsg);
} catch (IOException ioe) {
- LOG.warn("Error writing reply back to " + peer.getRemoteAddressString());
+ LOG.warn("Error writing reply back to " + s.getRemoteSocketAddress());
}
IOUtils.closeStream(proxyOut);
IOUtils.closeStream(blockReceiver);
@@ -870,7 +871,7 @@ class DataXceiver extends Receiver imple
}
- private void checkAccess(OutputStream out, final boolean reply,
+ private void checkAccess(DataOutputStream out, final boolean reply,
final ExtendedBlock blk,
final Token<BlockTokenIdentifier> t,
final Op op,
@@ -885,6 +886,11 @@ class DataXceiver extends Receiver imple
} catch(InvalidToken e) {
try {
if (reply) {
+ if (out == null) {
+ out = new DataOutputStream(
+ NetUtils.getOutputStream(s, dnConf.socketWriteTimeout));
+ }
+
BlockOpResponseProto.Builder resp = BlockOpResponseProto.newBuilder()
.setStatus(ERROR_ACCESS_TOKEN);
if (mode == BlockTokenSecretManager.AccessMode.WRITE) {
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java?rev=1430662&r1=1430661&r2=1430662&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java Wed Jan 9 02:39:15 2013
@@ -18,16 +18,18 @@
package org.apache.hadoop.hdfs.server.datanode;
import java.io.IOException;
+import java.net.ServerSocket;
+import java.net.Socket;
import java.net.SocketTimeoutException;
import java.nio.channels.AsynchronousCloseException;
+import java.util.Collections;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.net.Peer;
-import org.apache.hadoop.hdfs.net.PeerServer;
import org.apache.hadoop.hdfs.server.balancer.Balancer;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.io.IOUtils;
@@ -43,9 +45,11 @@ import org.apache.hadoop.util.Daemon;
class DataXceiverServer implements Runnable {
public static final Log LOG = DataNode.LOG;
- private final PeerServer peerServer;
- private final DataNode datanode;
- private final Set<Peer> peers = new HashSet<Peer>();
+ ServerSocket ss;
+ DataNode datanode;
+ // Record all sockets opened for data transfer
+ Set<Socket> childSockets = Collections.synchronizedSet(
+ new HashSet<Socket>());
/**
* Maximal number of concurrent xceivers per node.
@@ -105,10 +109,10 @@ class DataXceiverServer implements Runna
long estimateBlockSize;
- DataXceiverServer(PeerServer peerServer, Configuration conf,
+ DataXceiverServer(ServerSocket ss, Configuration conf,
DataNode datanode) {
- this.peerServer = peerServer;
+ this.ss = ss;
this.datanode = datanode;
this.maxXceiverCount =
@@ -126,10 +130,12 @@ class DataXceiverServer implements Runna
@Override
public void run() {
- Peer peer = null;
while (datanode.shouldRun) {
+ Socket s = null;
try {
- peer = peerServer.accept();
+ s = ss.accept();
+ s.setTcpNoDelay(true);
+ // Timeouts are set within DataXceiver.run()
// Make sure the xceiver count is not exceeded
int curXceiverCount = datanode.getXceiverCount();
@@ -140,7 +146,7 @@ class DataXceiverServer implements Runna
}
new Daemon(datanode.threadGroup,
- DataXceiver.create(peer, datanode, this))
+ DataXceiver.create(s, datanode, this))
.start();
} catch (SocketTimeoutException ignored) {
// wake up to see if should continue to run
@@ -151,10 +157,10 @@ class DataXceiverServer implements Runna
LOG.warn(datanode.getDisplayName() + ":DataXceiverServer: ", ace);
}
} catch (IOException ie) {
- IOUtils.cleanup(null, peer);
+ IOUtils.closeSocket(s);
LOG.warn(datanode.getDisplayName() + ":DataXceiverServer: ", ie);
} catch (OutOfMemoryError ie) {
- IOUtils.cleanup(null, peer);
+ IOUtils.closeSocket(s);
// DataNode can run out of memory if there is too many transfers.
// Log the event, Sleep for 30 seconds, other transfers may complete by
// then.
@@ -170,35 +176,33 @@ class DataXceiverServer implements Runna
datanode.shouldRun = false;
}
}
- synchronized (this) {
- for (Peer p : peers) {
- IOUtils.cleanup(LOG, p);
- }
- }
try {
- peerServer.close();
+ ss.close();
} catch (IOException ie) {
LOG.warn(datanode.getDisplayName()
+ " :DataXceiverServer: close exception", ie);
}
}
-
+
void kill() {
assert datanode.shouldRun == false :
"shoudRun should be set to false before killing";
try {
- this.peerServer.close();
+ this.ss.close();
} catch (IOException ie) {
LOG.warn(datanode.getDisplayName() + ":DataXceiverServer.kill(): ", ie);
}
- }
-
- synchronized void addPeer(Peer peer) {
- peers.add(peer);
- }
- synchronized void closePeer(Peer peer) {
- peers.remove(peer);
- IOUtils.cleanup(null, peer);
+ // close all the sockets that were accepted earlier
+ synchronized (childSockets) {
+ for (Iterator<Socket> it = childSockets.iterator();
+ it.hasNext();) {
+ Socket thissock = it.next();
+ try {
+ thissock.close();
+ } catch (IOException e) {
+ }
+ }
+ }
}
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java?rev=1430662&r1=1430661&r2=1430662&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java Wed Jan 9 02:39:15 2013
@@ -43,7 +43,6 @@ import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSClient.Conf;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.hdfs.net.TcpPeerServer;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -560,13 +559,13 @@ public class NamenodeFsck {
blockReader = BlockReaderFactory.newBlockReader(
new BlockReaderFactory.Params(new Conf(conf)).
- setPeer(TcpPeerServer.peerFromSocketAndKey(s,
- namenode.getRpcServer().getDataEncryptionKey())).
- setBlock(block).
+ setSocket(s).setBlock(block).
setFile(BlockReaderFactory.getFileName(targetAddr,
block.getBlockPoolId(), block.getBlockId())).
setBlockToken(lblock.getBlockToken()).
- setDatanodeID(chosenNode));
+ setEncryptionKey(namenode.getRpcServer().getDataEncryptionKey()).
+ setLen(-1));
+
} catch (IOException ex) {
// Put chosen node into dead list, continue
LOG.info("Failed to connect to " + targetAddr + ":" + ex);
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java?rev=1430662&r1=1430661&r2=1430662&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java Wed Jan 9 02:39:15 2013
@@ -32,7 +32,6 @@ import org.apache.hadoop.fs.CommonConfig
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSClient.Conf;
-import org.apache.hadoop.hdfs.net.TcpPeerServer;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -153,13 +152,13 @@ public class BlockReaderTestUtil {
return BlockReaderFactory.newBlockReader(
new BlockReaderFactory.Params(new Conf(conf)).
- setPeer(TcpPeerServer.peerFromSocket(sock)).
+ setSocket(sock).
setFile(targetAddr.toString() + ":" + block.getBlockId()).
setBlock(block).setBlockToken(testBlock.getBlockToken()).
setStartOffset(offset).setLen(lenToRead).
setBufferSize(conf.getInt(
CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096)).
- setVerifyChecksum(true).setDatanodeID(nodes[0]));
+ setVerifyChecksum(true));
}
/**
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientBlockVerification.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientBlockVerification.java?rev=1430662&r1=1430661&r2=1430662&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientBlockVerification.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientBlockVerification.java Wed Jan 9 02:39:15 2013
@@ -61,7 +61,7 @@ public class TestClientBlockVerification
util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024));
util.readAndCheckEOS(reader, FILE_SIZE_K * 1024, true);
verify(reader).sendReadResult(Status.CHECKSUM_OK);
- reader.close(null);
+ reader.close();
}
/**
@@ -76,7 +76,7 @@ public class TestClientBlockVerification
// We asked the blockreader for the whole file, and only read
// half of it, so no CHECKSUM_OK
verify(reader, never()).sendReadResult(Status.CHECKSUM_OK);
- reader.close(null);
+ reader.close();
}
/**
@@ -92,7 +92,7 @@ public class TestClientBlockVerification
// And read half the file
util.readAndCheckEOS(reader, FILE_SIZE_K * 1024 / 2, true);
verify(reader).sendReadResult(Status.CHECKSUM_OK);
- reader.close(null);
+ reader.close();
}
/**
@@ -111,7 +111,7 @@ public class TestClientBlockVerification
util.getBlockReader(testBlock, startOffset, length));
util.readAndCheckEOS(reader, length, true);
verify(reader).sendReadResult(Status.CHECKSUM_OK);
- reader.close(null);
+ reader.close();
}
}
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java?rev=1430662&r1=1430661&r2=1430662&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java Wed Jan 9 02:39:15 2013
@@ -18,20 +18,28 @@
package org.apache.hadoop.hdfs;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.spy;
import java.io.IOException;
import java.net.InetSocketAddress;
-
-import junit.framework.Assert;
+import java.net.Socket;
+import java.security.PrivilegedExceptionAction;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.hdfs.net.Peer;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.security.token.Token;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
@@ -47,31 +55,59 @@ public class TestConnCache {
static final int BLOCK_SIZE = 4096;
static final int FILE_SIZE = 3 * BLOCK_SIZE;
+ final static int CACHE_SIZE = 4;
+ final static long CACHE_EXPIRY_MS = 200;
+ static Configuration conf = null;
+ static MiniDFSCluster cluster = null;
+ static FileSystem fs = null;
+ static SocketCache cache;
+
+ static final Path testFile = new Path("/testConnCache.dat");
+ static byte authenticData[] = null;
+
+ static BlockReaderTestUtil util = null;
+
/**
* A mock Answer to remember the BlockReader used.
*
* It verifies that all invocation to DFSInputStream.getBlockReader()
- * use the same peer.
+ * use the same socket.
*/
private class MockGetBlockReader implements Answer<RemoteBlockReader2> {
public RemoteBlockReader2 reader = null;
- private Peer peer = null;
+ private Socket sock = null;
@Override
public RemoteBlockReader2 answer(InvocationOnMock invocation) throws Throwable {
RemoteBlockReader2 prevReader = reader;
reader = (RemoteBlockReader2) invocation.callRealMethod();
- if (peer == null) {
- peer = reader.getPeer();
+ if (sock == null) {
+ sock = reader.dnSock;
} else if (prevReader != null) {
- Assert.assertSame("DFSInputStream should use the same peer",
- peer, reader.getPeer());
+ assertSame("DFSInputStream should use the same socket",
+ sock, reader.dnSock);
}
return reader;
}
}
+ @BeforeClass
+ public static void setupCluster() throws Exception {
+ final int REPLICATION_FACTOR = 1;
+
+ /* create a socket cache. There is only one socket cache per jvm */
+ cache = SocketCache.getInstance(CACHE_SIZE, CACHE_EXPIRY_MS);
+
+ util = new BlockReaderTestUtil(REPLICATION_FACTOR);
+ cluster = util.getCluster();
+ conf = util.getConf();
+ fs = cluster.getFileSystem();
+
+ authenticData = util.writeFile(testFile, FILE_SIZE / 1024);
+ }
+
+
/**
* (Optionally) seek to position, read and verify data.
*
@@ -81,10 +117,9 @@ public class TestConnCache {
long pos,
byte[] buffer,
int offset,
- int length,
- byte[] authenticData)
+ int length)
throws IOException {
- Assert.assertTrue("Test buffer too small", buffer.length >= offset + length);
+ assertTrue("Test buffer too small", buffer.length >= offset + length);
if (pos >= 0)
in.seek(pos);
@@ -94,7 +129,7 @@ public class TestConnCache {
while (length > 0) {
int cnt = in.read(buffer, offset, length);
- Assert.assertTrue("Error in read", cnt > 0);
+ assertTrue("Error in read", cnt > 0);
offset += cnt;
length -= cnt;
}
@@ -110,22 +145,115 @@ public class TestConnCache {
}
/**
+ * Test the SocketCache itself.
+ */
+ @Test
+ public void testSocketCache() throws Exception {
+ // Make a client
+ InetSocketAddress nnAddr =
+ new InetSocketAddress("localhost", cluster.getNameNodePort());
+ DFSClient client = new DFSClient(nnAddr, conf);
+
+ // Find out the DN addr
+ LocatedBlock block =
+ client.getNamenode().getBlockLocations(
+ testFile.toString(), 0, FILE_SIZE)
+ .getLocatedBlocks().get(0);
+ DataNode dn = util.getDataNode(block);
+ InetSocketAddress dnAddr = dn.getXferAddress();
+
+
+ // Make some sockets to the DN
+ Socket[] dnSockets = new Socket[CACHE_SIZE];
+ for (int i = 0; i < dnSockets.length; ++i) {
+ dnSockets[i] = client.socketFactory.createSocket(
+ dnAddr.getAddress(), dnAddr.getPort());
+ }
+
+
+ // Insert a socket to the NN
+ Socket nnSock = new Socket(nnAddr.getAddress(), nnAddr.getPort());
+ cache.put(nnSock, null);
+ assertSame("Read the write", nnSock, cache.get(nnAddr).sock);
+ cache.put(nnSock, null);
+
+ // Insert DN socks
+ for (Socket dnSock : dnSockets) {
+ cache.put(dnSock, null);
+ }
+
+ assertEquals("NN socket evicted", null, cache.get(nnAddr));
+ assertTrue("Evicted socket closed", nnSock.isClosed());
+
+ // Lookup the DN socks
+ for (Socket dnSock : dnSockets) {
+ assertEquals("Retrieve cached sockets", dnSock, cache.get(dnAddr).sock);
+ dnSock.close();
+ }
+
+ assertEquals("Cache is empty", 0, cache.size());
+ }
+
+
+ /**
+ * Test the SocketCache expiry.
+ * Verify that socket cache entries expire after the set
+ * expiry time.
+ */
+ @Test
+ public void testSocketCacheExpiry() throws Exception {
+ // Make a client
+ InetSocketAddress nnAddr =
+ new InetSocketAddress("localhost", cluster.getNameNodePort());
+ DFSClient client = new DFSClient(nnAddr, conf);
+
+ // Find out the DN addr
+ LocatedBlock block =
+ client.getNamenode().getBlockLocations(
+ testFile.toString(), 0, FILE_SIZE)
+ .getLocatedBlocks().get(0);
+ DataNode dn = util.getDataNode(block);
+ InetSocketAddress dnAddr = dn.getXferAddress();
+
+
+ // Make some sockets to the DN and put in cache
+ Socket[] dnSockets = new Socket[CACHE_SIZE];
+ for (int i = 0; i < dnSockets.length; ++i) {
+ dnSockets[i] = client.socketFactory.createSocket(
+ dnAddr.getAddress(), dnAddr.getPort());
+ cache.put(dnSockets[i], null);
+ }
+
+ // Client side still has the sockets cached
+ assertEquals(CACHE_SIZE, client.socketCache.size());
+
+ //sleep for a second and see if it expired
+ Thread.sleep(CACHE_EXPIRY_MS + 1000);
+
+ // Client side has no sockets cached
+ assertEquals(0, client.socketCache.size());
+
+ //sleep for another second and see if
+ //the daemon thread runs fine on empty cache
+ Thread.sleep(CACHE_EXPIRY_MS + 1000);
+ }
+
+
+ /**
* Read a file served entirely from one DN. Seek around and read from
* different offsets. And verify that they all use the same socket.
- * @throws Exception
+ *
+ * @throws java.io.IOException
*/
@Test
@SuppressWarnings("unchecked")
- public void testReadFromOneDN() throws Exception {
- BlockReaderTestUtil util = new BlockReaderTestUtil(1,
- new HdfsConfiguration());
- final Path testFile = new Path("/testConnCache.dat");
- byte authenticData[] = util.writeFile(testFile, FILE_SIZE / 1024);
+ public void testReadFromOneDN() throws IOException {
+ LOG.info("Starting testReadFromOneDN()");
DFSClient client = new DFSClient(
- new InetSocketAddress("localhost",
- util.getCluster().getNameNodePort()), util.getConf());
- DFSInputStream in = Mockito.spy(client.open(testFile.toString()));
+ new InetSocketAddress("localhost", cluster.getNameNodePort()), conf);
+ DFSInputStream in = spy(client.open(testFile.toString()));
LOG.info("opened " + testFile.toString());
+
byte[] dataBuf = new byte[BLOCK_SIZE];
MockGetBlockReader answer = new MockGetBlockReader();
@@ -142,15 +270,18 @@ public class TestConnCache {
Matchers.anyString());
// Initial read
- pread(in, 0, dataBuf, 0, dataBuf.length, authenticData);
+ pread(in, 0, dataBuf, 0, dataBuf.length);
// Read again and verify that the socket is the same
- pread(in, FILE_SIZE - dataBuf.length, dataBuf, 0, dataBuf.length,
- authenticData);
- pread(in, 1024, dataBuf, 0, dataBuf.length, authenticData);
- // No seek; just read
- pread(in, -1, dataBuf, 0, dataBuf.length, authenticData);
- pread(in, 64, dataBuf, 0, dataBuf.length / 2, authenticData);
+ pread(in, FILE_SIZE - dataBuf.length, dataBuf, 0, dataBuf.length);
+ pread(in, 1024, dataBuf, 0, dataBuf.length);
+ pread(in, -1, dataBuf, 0, dataBuf.length); // No seek; just read
+ pread(in, 64, dataBuf, 0, dataBuf.length / 2);
in.close();
}
+
+ @AfterClass
+ public static void teardownCluster() throws Exception {
+ util.shutdown();
+ }
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java?rev=1430662&r1=1430661&r2=1430662&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java Wed Jan 9 02:39:15 2013
@@ -35,7 +35,6 @@ import org.apache.hadoop.fs.FSDataInputS
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
-import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -93,13 +92,13 @@ public class TestDataTransferKeepalive {
DFSTestUtil.createFile(fs, TEST_FILE, 1L, (short)1, 0L);
// Clients that write aren't currently re-used.
- assertEquals(0, dfsClient.peerCache.size());
+ assertEquals(0, dfsClient.socketCache.size());
assertXceiverCount(0);
// Reads the file, so we should get a
// cached socket, and should have an xceiver on the other side.
DFSTestUtil.readFile(fs, TEST_FILE);
- assertEquals(1, dfsClient.peerCache.size());
+ assertEquals(1, dfsClient.socketCache.size());
assertXceiverCount(1);
// Sleep for a bit longer than the keepalive timeout
@@ -110,13 +109,13 @@ public class TestDataTransferKeepalive {
// The socket is still in the cache, because we don't
// notice that it's closed until we try to read
// from it again.
- assertEquals(1, dfsClient.peerCache.size());
+ assertEquals(1, dfsClient.socketCache.size());
// Take it out of the cache - reading should
// give an EOF.
- Peer peer = dfsClient.peerCache.get(dn.getDatanodeId());
- assertNotNull(peer);
- assertEquals(-1, peer.getInputStream().read());
+ Socket s = dfsClient.socketCache.get(dnAddr).sock;
+ assertNotNull(s);
+ assertEquals(-1, NetUtils.getInputStream(s).read());
}
/**
@@ -175,14 +174,14 @@ public class TestDataTransferKeepalive {
}
DFSClient client = ((DistributedFileSystem)fs).dfs;
- assertEquals(5, client.peerCache.size());
+ assertEquals(5, client.socketCache.size());
// Let all the xceivers timeout
Thread.sleep(1500);
assertXceiverCount(0);
// Client side still has the sockets cached
- assertEquals(5, client.peerCache.size());
+ assertEquals(5, client.socketCache.size());
// Reading should not throw an exception.
DFSTestUtil.readFile(fs, TEST_FILE);
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java?rev=1430662&r1=1430661&r2=1430662&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java Wed Jan 9 02:39:15 2013
@@ -43,7 +43,6 @@ import org.apache.hadoop.hdfs.DFSClient.
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.net.TcpPeerServer;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -148,10 +147,9 @@ public class TestBlockTokenWithDFS {
"test-blockpoolid", block.getBlockId());
blockReader = BlockReaderFactory.newBlockReader(
new BlockReaderFactory.Params(new Conf(conf)).
- setPeer(TcpPeerServer.peerFromSocket(s)).
- setBlock(block).setFile(file).
+ setSocket(s).setBlock(block).setFile(file).
setBlockToken(lblock.getBlockToken()).setStartOffset(0).
- setLen(-1).setDatanodeID(nodes[0]));
+ setLen(-1));
} catch (IOException ex) {
if (ex instanceof InvalidBlockTokenException) {
assertFalse("OP_READ_BLOCK: access token is invalid, "
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java?rev=1430662&r1=1430661&r2=1430662&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java Wed Jan 9 02:39:15 2013
@@ -39,7 +39,6 @@ import org.apache.hadoop.hdfs.DFSConfigK
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.net.TcpPeerServer;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -285,9 +284,8 @@ public class TestDataNodeVolumeFailure {
setFile(BlockReaderFactory.getFileName(targetAddr,
"test-blockpoolid", block.getBlockId())).
setBlock(block).setBlockToken(lblock.getBlockToken()).
- setPeer(TcpPeerServer.peerFromSocket(s)).
- setDatanodeID(datanode));
- blockReader.close(null);
+ setSocket(s));
+ blockReader.close();
}
/**