You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by to...@apache.org on 2013/01/09 22:34:14 UTC
svn commit: r1431097 [2/2] - in
/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs: ./
src/main/java/org/apache/hadoop/hdfs/
src/main/java/org/apache/hadoop/hdfs/net/
src/main/java/org/apache/hadoop/hdfs/server/common/ src/main/java/org/a...
Modified: hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=1431097&r1=1431096&r2=1431097&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Wed Jan 9 21:34:13 2013
@@ -39,6 +39,7 @@ import java.nio.channels.ClosedChannelEx
import java.util.Arrays;
import org.apache.commons.logging.Log;
+import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
@@ -64,7 +65,6 @@ import org.apache.hadoop.hdfs.server.pro
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.net.SocketInputWrapper;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;
@@ -79,8 +79,7 @@ class DataXceiver extends Receiver imple
public static final Log LOG = DataNode.LOG;
static final Log ClientTraceLog = DataNode.ClientTraceLog;
- private final Socket s;
- private final boolean isLocal; //is a local connection?
+ private final Peer peer;
private final String remoteAddress; // address of remote side
private final String localAddress; // local address of this daemon
private final DataNode datanode;
@@ -88,7 +87,7 @@ class DataXceiver extends Receiver imple
private final DataXceiverServer dataXceiverServer;
private final boolean connectToDnViaHostname;
private long opStartTime; //the start time of receiving an Op
- private final SocketInputWrapper socketIn;
+ private final InputStream socketIn;
private OutputStream socketOut;
/**
@@ -97,25 +96,23 @@ class DataXceiver extends Receiver imple
*/
private String previousOpClientName;
- public static DataXceiver create(Socket s, DataNode dn,
+ public static DataXceiver create(Peer peer, DataNode dn,
DataXceiverServer dataXceiverServer) throws IOException {
- return new DataXceiver(s, dn, dataXceiverServer);
+ return new DataXceiver(peer, dn, dataXceiverServer);
}
- private DataXceiver(Socket s,
- DataNode datanode,
+ private DataXceiver(Peer peer, DataNode datanode,
DataXceiverServer dataXceiverServer) throws IOException {
- this.s = s;
+ this.peer = peer;
this.dnConf = datanode.getDnConf();
- this.socketIn = NetUtils.getInputStream(s);
- this.socketOut = NetUtils.getOutputStream(s, dnConf.socketWriteTimeout);
- this.isLocal = s.getInetAddress().equals(s.getLocalAddress());
+ this.socketIn = peer.getInputStream();
+ this.socketOut = peer.getOutputStream();
this.datanode = datanode;
this.dataXceiverServer = dataXceiverServer;
this.connectToDnViaHostname = datanode.getDnConf().connectToDnViaHostname;
- remoteAddress = s.getRemoteSocketAddress().toString();
- localAddress = s.getLocalSocketAddress().toString();
+ remoteAddress = peer.getRemoteAddressString();
+ localAddress = peer.getLocalAddressString();
if (LOG.isDebugEnabled()) {
LOG.debug("Number of active connections is: "
@@ -155,11 +152,10 @@ class DataXceiver extends Receiver imple
public void run() {
int opsProcessed = 0;
Op op = null;
-
- dataXceiverServer.childSockets.add(s);
-
+
+ dataXceiverServer.addPeer(peer);
try {
-
+ peer.setWriteTimeout(datanode.getDnConf().socketWriteTimeout);
InputStream input = socketIn;
if (dnConf.encryptDataTransfer) {
IOStreamPair encryptedStreams = null;
@@ -169,8 +165,9 @@ class DataXceiver extends Receiver imple
dnConf.encryptionAlgorithm);
} catch (InvalidMagicNumberException imne) {
LOG.info("Failed to read expected encryption handshake from client " +
- "at " + s.getInetAddress() + ". Perhaps the client is running an " +
- "older version of Hadoop which does not support encryption");
+ "at " + peer.getRemoteAddressString() + ". Perhaps the client " +
+ "is running an older version of Hadoop which does not support " +
+ "encryption");
return;
}
input = encryptedStreams.in;
@@ -189,9 +186,9 @@ class DataXceiver extends Receiver imple
try {
if (opsProcessed != 0) {
assert dnConf.socketKeepaliveTimeout > 0;
- socketIn.setTimeout(dnConf.socketKeepaliveTimeout);
+ peer.setReadTimeout(dnConf.socketKeepaliveTimeout);
} else {
- socketIn.setTimeout(dnConf.socketTimeout);
+ peer.setReadTimeout(dnConf.socketTimeout);
}
op = readOp();
} catch (InterruptedIOException ignored) {
@@ -202,7 +199,7 @@ class DataXceiver extends Receiver imple
if (opsProcessed > 0 &&
(err instanceof EOFException || err instanceof ClosedChannelException)) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Cached " + s.toString() + " closing after " + opsProcessed + " ops");
+ LOG.debug("Cached " + peer + " closing after " + opsProcessed + " ops");
}
} else {
throw err;
@@ -212,13 +209,13 @@ class DataXceiver extends Receiver imple
// restore normal timeout
if (opsProcessed != 0) {
- s.setSoTimeout(dnConf.socketTimeout);
+ peer.setReadTimeout(dnConf.socketTimeout);
}
opStartTime = now();
processOp(op);
++opsProcessed;
- } while (!s.isClosed() && dnConf.socketKeepaliveTimeout > 0);
+ } while (!peer.isClosed() && dnConf.socketKeepaliveTimeout > 0);
} catch (Throwable t) {
LOG.error(datanode.getDisplayName() + ":DataXceiver error processing " +
((op == null) ? "unknown" : op.name()) + " operation " +
@@ -230,9 +227,8 @@ class DataXceiver extends Receiver imple
+ datanode.getXceiverCount());
}
updateCurrentThreadName("Cleaning up");
+ dataXceiverServer.closePeer(peer);
IOUtils.closeStream(in);
- IOUtils.closeSocket(s);
- dataXceiverServer.childSockets.remove(s);
}
}
@@ -286,8 +282,9 @@ class DataXceiver extends Receiver imple
ClientReadStatusProto stat = ClientReadStatusProto.parseFrom(
HdfsProtoUtil.vintPrefixed(in));
if (!stat.hasStatus()) {
- LOG.warn("Client " + s.getInetAddress() + " did not send a valid status " +
- "code after reading. Will close connection.");
+ LOG.warn("Client " + peer.getRemoteAddressString() +
+ " did not send a valid status code after reading. " +
+ "Will close connection.");
IOUtils.closeStream(out);
}
} catch (IOException ioe) {
@@ -320,7 +317,7 @@ class DataXceiver extends Receiver imple
//update metrics
datanode.metrics.addReadBlockOp(elapsed());
- datanode.metrics.incrReadsFromClient(isLocal);
+ datanode.metrics.incrReadsFromClient(peer.isLocal());
}
@Override
@@ -358,8 +355,8 @@ class DataXceiver extends Receiver imple
LOG.debug("isDatanode=" + isDatanode
+ ", isClient=" + isClient
+ ", isTransfer=" + isTransfer);
- LOG.debug("writeBlock receive buf size " + s.getReceiveBufferSize() +
- " tcp no delay " + s.getTcpNoDelay());
+ LOG.debug("writeBlock receive buf size " + peer.getReceiveBufferSize() +
+ " tcp no delay " + peer.getTcpNoDelay());
}
// We later mutate block's generation stamp and length, but we need to
@@ -390,8 +387,8 @@ class DataXceiver extends Receiver imple
stage != BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
// open a block receiver
blockReceiver = new BlockReceiver(block, in,
- s.getRemoteSocketAddress().toString(),
- s.getLocalSocketAddress().toString(),
+ peer.getRemoteAddressString(),
+ peer.getLocalAddressString(),
stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
clientname, srcDataNode, datanode, requestedChecksum);
} else {
@@ -546,7 +543,7 @@ class DataXceiver extends Receiver imple
//update metrics
datanode.metrics.addWriteBlockOp(elapsed());
- datanode.metrics.incrWritesFromClient(isLocal);
+ datanode.metrics.incrWritesFromClient(peer.isLocal());
}
@Override
@@ -554,7 +551,7 @@ class DataXceiver extends Receiver imple
final Token<BlockTokenIdentifier> blockToken,
final String clientName,
final DatanodeInfo[] targets) throws IOException {
- checkAccess(null, true, blk, blockToken,
+ checkAccess(socketOut, true, blk, blockToken,
Op.TRANSFER_BLOCK, BlockTokenSecretManager.AccessMode.COPY);
previousOpClientName = clientName;
updateCurrentThreadName(Op.TRANSFER_BLOCK + " " + blk);
@@ -641,8 +638,9 @@ class DataXceiver extends Receiver imple
}
if (!dataXceiverServer.balanceThrottler.acquire()) { // not able to start
- String msg = "Not able to copy block " + block.getBlockId() + " to "
- + s.getRemoteSocketAddress() + " because threads quota is exceeded.";
+ String msg = "Not able to copy block " + block.getBlockId() + " " +
+ "to " + peer.getRemoteAddressString() + " because threads " +
+ "quota is exceeded.";
LOG.info(msg);
sendResponse(ERROR, msg);
return;
@@ -671,7 +669,7 @@ class DataXceiver extends Receiver imple
datanode.metrics.incrBytesRead((int) read);
datanode.metrics.incrBlocksRead();
- LOG.info("Copied " + block + " to " + s.getRemoteSocketAddress());
+ LOG.info("Copied " + block + " to " + peer.getRemoteAddressString());
} catch (IOException ioe) {
isOpSuccess = false;
LOG.info("opCopyBlock " + block + " received exception " + ioe);
@@ -716,8 +714,9 @@ class DataXceiver extends Receiver imple
}
if (!dataXceiverServer.balanceThrottler.acquire()) { // not able to start
- String msg = "Not able to receive block " + block.getBlockId() + " from "
- + s.getRemoteSocketAddress() + " because threads quota is exceeded.";
+ String msg = "Not able to receive block " + block.getBlockId() +
+ " from " + peer.getRemoteAddressString() + " because threads " +
+ "quota is exceeded.";
LOG.warn(msg);
sendResponse(ERROR, msg);
return;
@@ -794,7 +793,7 @@ class DataXceiver extends Receiver imple
// notify name node
datanode.notifyNamenodeReceivedBlock(block, delHint);
- LOG.info("Moved " + block + " from " + s.getRemoteSocketAddress());
+ LOG.info("Moved " + block + " from " + peer.getRemoteAddressString());
} catch (IOException ioe) {
opStatus = ERROR;
@@ -817,7 +816,7 @@ class DataXceiver extends Receiver imple
try {
sendResponse(opStatus, errMsg);
} catch (IOException ioe) {
- LOG.warn("Error writing reply back to " + s.getRemoteSocketAddress());
+ LOG.warn("Error writing reply back to " + peer.getRemoteAddressString());
}
IOUtils.closeStream(proxyOut);
IOUtils.closeStream(blockReceiver);
@@ -871,7 +870,7 @@ class DataXceiver extends Receiver imple
}
- private void checkAccess(DataOutputStream out, final boolean reply,
+ private void checkAccess(OutputStream out, final boolean reply,
final ExtendedBlock blk,
final Token<BlockTokenIdentifier> t,
final Op op,
@@ -886,11 +885,6 @@ class DataXceiver extends Receiver imple
} catch(InvalidToken e) {
try {
if (reply) {
- if (out == null) {
- out = new DataOutputStream(
- NetUtils.getOutputStream(s, dnConf.socketWriteTimeout));
- }
-
BlockOpResponseProto.Builder resp = BlockOpResponseProto.newBuilder()
.setStatus(ERROR_ACCESS_TOKEN);
if (mode == BlockTokenSecretManager.AccessMode.WRITE) {
Modified: hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java?rev=1431097&r1=1431096&r2=1431097&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java Wed Jan 9 21:34:13 2013
@@ -18,18 +18,16 @@
package org.apache.hadoop.hdfs.server.datanode;
import java.io.IOException;
-import java.net.ServerSocket;
-import java.net.Socket;
import java.net.SocketTimeoutException;
import java.nio.channels.AsynchronousCloseException;
-import java.util.Collections;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.net.Peer;
+import org.apache.hadoop.hdfs.net.PeerServer;
import org.apache.hadoop.hdfs.server.balancer.Balancer;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.io.IOUtils;
@@ -45,11 +43,9 @@ import org.apache.hadoop.util.Daemon;
class DataXceiverServer implements Runnable {
public static final Log LOG = DataNode.LOG;
- ServerSocket ss;
- DataNode datanode;
- // Record all sockets opened for data transfer
- Set<Socket> childSockets = Collections.synchronizedSet(
- new HashSet<Socket>());
+ private final PeerServer peerServer;
+ private final DataNode datanode;
+ private final Set<Peer> peers = new HashSet<Peer>();
/**
* Maximal number of concurrent xceivers per node.
@@ -109,10 +105,10 @@ class DataXceiverServer implements Runna
long estimateBlockSize;
- DataXceiverServer(ServerSocket ss, Configuration conf,
+ DataXceiverServer(PeerServer peerServer, Configuration conf,
DataNode datanode) {
- this.ss = ss;
+ this.peerServer = peerServer;
this.datanode = datanode;
this.maxXceiverCount =
@@ -130,12 +126,10 @@ class DataXceiverServer implements Runna
@Override
public void run() {
+ Peer peer = null;
while (datanode.shouldRun) {
- Socket s = null;
try {
- s = ss.accept();
- s.setTcpNoDelay(true);
- // Timeouts are set within DataXceiver.run()
+ peer = peerServer.accept();
// Make sure the xceiver count is not exceeded
int curXceiverCount = datanode.getXceiverCount();
@@ -146,7 +140,7 @@ class DataXceiverServer implements Runna
}
new Daemon(datanode.threadGroup,
- DataXceiver.create(s, datanode, this))
+ DataXceiver.create(peer, datanode, this))
.start();
} catch (SocketTimeoutException ignored) {
// wake up to see if should continue to run
@@ -157,10 +151,10 @@ class DataXceiverServer implements Runna
LOG.warn(datanode.getDisplayName() + ":DataXceiverServer: ", ace);
}
} catch (IOException ie) {
- IOUtils.closeSocket(s);
+ IOUtils.cleanup(null, peer);
LOG.warn(datanode.getDisplayName() + ":DataXceiverServer: ", ie);
} catch (OutOfMemoryError ie) {
- IOUtils.closeSocket(s);
+ IOUtils.cleanup(null, peer);
// DataNode can run out of memory if there is too many transfers.
// Log the event, Sleep for 30 seconds, other transfers may complete by
// then.
@@ -176,33 +170,35 @@ class DataXceiverServer implements Runna
datanode.shouldRun = false;
}
}
+ synchronized (this) {
+ for (Peer p : peers) {
+ IOUtils.cleanup(LOG, p);
+ }
+ }
try {
- ss.close();
+ peerServer.close();
} catch (IOException ie) {
LOG.warn(datanode.getDisplayName()
+ " :DataXceiverServer: close exception", ie);
}
}
-
+
void kill() {
assert datanode.shouldRun == false :
"shoudRun should be set to false before killing";
try {
- this.ss.close();
+ this.peerServer.close();
} catch (IOException ie) {
LOG.warn(datanode.getDisplayName() + ":DataXceiverServer.kill(): ", ie);
}
+ }
+
+ synchronized void addPeer(Peer peer) {
+ peers.add(peer);
+ }
- // close all the sockets that were accepted earlier
- synchronized (childSockets) {
- for (Iterator<Socket> it = childSockets.iterator();
- it.hasNext();) {
- Socket thissock = it.next();
- try {
- thissock.close();
- } catch (IOException e) {
- }
- }
- }
+ synchronized void closePeer(Peer peer) {
+ peers.remove(peer);
+ IOUtils.cleanup(null, peer);
}
}
Modified: hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java?rev=1431097&r1=1431096&r2=1431097&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java Wed Jan 9 21:34:13 2013
@@ -42,6 +42,7 @@ import org.apache.hadoop.hdfs.BlockReade
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.net.TcpPeerServer;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -559,9 +560,10 @@ public class NamenodeFsck {
String file = BlockReaderFactory.getFileName(targetAddr, block.getBlockPoolId(),
block.getBlockId());
blockReader = BlockReaderFactory.newBlockReader(
- conf, s, file, block, lblock
- .getBlockToken(), 0, -1,
- namenode.getRpcServer().getDataEncryptionKey());
+ conf, file, block, lblock.getBlockToken(), 0, -1, true, "fsck",
+ TcpPeerServer.peerFromSocketAndKey(s, namenode.getRpcServer().
+ getDataEncryptionKey()),
+ chosenNode);
} catch (IOException ex) {
// Put chosen node into dead list, continue
Modified: hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java?rev=1431097&r1=1431096&r2=1431097&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java Wed Jan 9 21:34:13 2013
@@ -31,6 +31,7 @@ import java.util.Random;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.net.TcpPeerServer;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -150,12 +151,12 @@ public class BlockReaderTestUtil {
sock.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);
return BlockReaderFactory.newBlockReader(
- new DFSClient.Conf(conf),
- sock, targetAddr.toString()+ ":" + block.getBlockId(), block,
+ conf,
+ targetAddr.toString()+ ":" + block.getBlockId(), block,
testBlock.getBlockToken(),
offset, lenToRead,
- conf.getInt(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096),
- true, "", null, null);
+ true, "BlockReaderTestUtil", TcpPeerServer.peerFromSocket(sock),
+ nodes[0]);
}
/**
Modified: hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientBlockVerification.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientBlockVerification.java?rev=1431097&r1=1431096&r2=1431097&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientBlockVerification.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientBlockVerification.java Wed Jan 9 21:34:13 2013
@@ -61,7 +61,7 @@ public class TestClientBlockVerification
util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024));
util.readAndCheckEOS(reader, FILE_SIZE_K * 1024, true);
verify(reader).sendReadResult(Status.CHECKSUM_OK);
- reader.close();
+ reader.close(null);
}
/**
@@ -76,7 +76,7 @@ public class TestClientBlockVerification
// We asked the blockreader for the whole file, and only read
// half of it, so no CHECKSUM_OK
verify(reader, never()).sendReadResult(Status.CHECKSUM_OK);
- reader.close();
+ reader.close(null);
}
/**
@@ -92,7 +92,7 @@ public class TestClientBlockVerification
// And read half the file
util.readAndCheckEOS(reader, FILE_SIZE_K * 1024 / 2, true);
verify(reader).sendReadResult(Status.CHECKSUM_OK);
- reader.close();
+ reader.close(null);
}
/**
@@ -111,7 +111,7 @@ public class TestClientBlockVerification
util.getBlockReader(testBlock, startOffset, length));
util.readAndCheckEOS(reader, length, true);
verify(reader).sendReadResult(Status.CHECKSUM_OK);
- reader.close();
+ reader.close(null);
}
}
}
Modified: hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java?rev=1431097&r1=1431096&r2=1431097&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java Wed Jan 9 21:34:13 2013
@@ -18,28 +18,20 @@
package org.apache.hadoop.hdfs;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.spy;
import java.io.IOException;
import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.security.PrivilegedExceptionAction;
+
+import junit.framework.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.security.token.Token;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
@@ -55,59 +47,31 @@ public class TestConnCache {
static final int BLOCK_SIZE = 4096;
static final int FILE_SIZE = 3 * BLOCK_SIZE;
- final static int CACHE_SIZE = 4;
- final static long CACHE_EXPIRY_MS = 200;
- static Configuration conf = null;
- static MiniDFSCluster cluster = null;
- static FileSystem fs = null;
- static SocketCache cache;
-
- static final Path testFile = new Path("/testConnCache.dat");
- static byte authenticData[] = null;
-
- static BlockReaderTestUtil util = null;
-
/**
* A mock Answer to remember the BlockReader used.
*
* It verifies that all invocation to DFSInputStream.getBlockReader()
- * use the same socket.
+ * use the same peer.
*/
private class MockGetBlockReader implements Answer<RemoteBlockReader2> {
public RemoteBlockReader2 reader = null;
- private Socket sock = null;
+ private Peer peer = null;
@Override
public RemoteBlockReader2 answer(InvocationOnMock invocation) throws Throwable {
RemoteBlockReader2 prevReader = reader;
reader = (RemoteBlockReader2) invocation.callRealMethod();
- if (sock == null) {
- sock = reader.dnSock;
+ if (peer == null) {
+ peer = reader.getPeer();
} else if (prevReader != null) {
- assertSame("DFSInputStream should use the same socket",
- sock, reader.dnSock);
+ Assert.assertSame("DFSInputStream should use the same peer",
+ peer, reader.getPeer());
}
return reader;
}
}
- @BeforeClass
- public static void setupCluster() throws Exception {
- final int REPLICATION_FACTOR = 1;
-
- /* create a socket cache. There is only one socket cache per jvm */
- cache = SocketCache.getInstance(CACHE_SIZE, CACHE_EXPIRY_MS);
-
- util = new BlockReaderTestUtil(REPLICATION_FACTOR);
- cluster = util.getCluster();
- conf = util.getConf();
- fs = cluster.getFileSystem();
-
- authenticData = util.writeFile(testFile, FILE_SIZE / 1024);
- }
-
-
/**
* (Optionally) seek to position, read and verify data.
*
@@ -117,9 +81,10 @@ public class TestConnCache {
long pos,
byte[] buffer,
int offset,
- int length)
+ int length,
+ byte[] authenticData)
throws IOException {
- assertTrue("Test buffer too small", buffer.length >= offset + length);
+ Assert.assertTrue("Test buffer too small", buffer.length >= offset + length);
if (pos >= 0)
in.seek(pos);
@@ -129,7 +94,7 @@ public class TestConnCache {
while (length > 0) {
int cnt = in.read(buffer, offset, length);
- assertTrue("Error in read", cnt > 0);
+ Assert.assertTrue("Error in read", cnt > 0);
offset += cnt;
length -= cnt;
}
@@ -145,115 +110,22 @@ public class TestConnCache {
}
/**
- * Test the SocketCache itself.
- */
- @Test
- public void testSocketCache() throws Exception {
- // Make a client
- InetSocketAddress nnAddr =
- new InetSocketAddress("localhost", cluster.getNameNodePort());
- DFSClient client = new DFSClient(nnAddr, conf);
-
- // Find out the DN addr
- LocatedBlock block =
- client.getNamenode().getBlockLocations(
- testFile.toString(), 0, FILE_SIZE)
- .getLocatedBlocks().get(0);
- DataNode dn = util.getDataNode(block);
- InetSocketAddress dnAddr = dn.getXferAddress();
-
-
- // Make some sockets to the DN
- Socket[] dnSockets = new Socket[CACHE_SIZE];
- for (int i = 0; i < dnSockets.length; ++i) {
- dnSockets[i] = client.socketFactory.createSocket(
- dnAddr.getAddress(), dnAddr.getPort());
- }
-
-
- // Insert a socket to the NN
- Socket nnSock = new Socket(nnAddr.getAddress(), nnAddr.getPort());
- cache.put(nnSock, null);
- assertSame("Read the write", nnSock, cache.get(nnAddr).sock);
- cache.put(nnSock, null);
-
- // Insert DN socks
- for (Socket dnSock : dnSockets) {
- cache.put(dnSock, null);
- }
-
- assertEquals("NN socket evicted", null, cache.get(nnAddr));
- assertTrue("Evicted socket closed", nnSock.isClosed());
-
- // Lookup the DN socks
- for (Socket dnSock : dnSockets) {
- assertEquals("Retrieve cached sockets", dnSock, cache.get(dnAddr).sock);
- dnSock.close();
- }
-
- assertEquals("Cache is empty", 0, cache.size());
- }
-
-
- /**
- * Test the SocketCache expiry.
- * Verify that socket cache entries expire after the set
- * expiry time.
- */
- @Test
- public void testSocketCacheExpiry() throws Exception {
- // Make a client
- InetSocketAddress nnAddr =
- new InetSocketAddress("localhost", cluster.getNameNodePort());
- DFSClient client = new DFSClient(nnAddr, conf);
-
- // Find out the DN addr
- LocatedBlock block =
- client.getNamenode().getBlockLocations(
- testFile.toString(), 0, FILE_SIZE)
- .getLocatedBlocks().get(0);
- DataNode dn = util.getDataNode(block);
- InetSocketAddress dnAddr = dn.getXferAddress();
-
-
- // Make some sockets to the DN and put in cache
- Socket[] dnSockets = new Socket[CACHE_SIZE];
- for (int i = 0; i < dnSockets.length; ++i) {
- dnSockets[i] = client.socketFactory.createSocket(
- dnAddr.getAddress(), dnAddr.getPort());
- cache.put(dnSockets[i], null);
- }
-
- // Client side still has the sockets cached
- assertEquals(CACHE_SIZE, client.socketCache.size());
-
- //sleep for a second and see if it expired
- Thread.sleep(CACHE_EXPIRY_MS + 1000);
-
- // Client side has no sockets cached
- assertEquals(0, client.socketCache.size());
-
- //sleep for another second and see if
- //the daemon thread runs fine on empty cache
- Thread.sleep(CACHE_EXPIRY_MS + 1000);
- }
-
-
- /**
* Read a file served entirely from one DN. Seek around and read from
* different offsets. And verify that they all use the same socket.
- *
- * @throws java.io.IOException
+ * @throws Exception
*/
@Test
@SuppressWarnings("unchecked")
- public void testReadFromOneDN() throws IOException {
- LOG.info("Starting testReadFromOneDN()");
+ public void testReadFromOneDN() throws Exception {
+ BlockReaderTestUtil util = new BlockReaderTestUtil(1,
+ new HdfsConfiguration());
+ final Path testFile = new Path("/testConnCache.dat");
+ byte authenticData[] = util.writeFile(testFile, FILE_SIZE / 1024);
DFSClient client = new DFSClient(
- new InetSocketAddress("localhost", cluster.getNameNodePort()), conf);
- DFSInputStream in = spy(client.open(testFile.toString()));
+ new InetSocketAddress("localhost",
+ util.getCluster().getNameNodePort()), util.getConf());
+ DFSInputStream in = Mockito.spy(client.open(testFile.toString()));
LOG.info("opened " + testFile.toString());
-
byte[] dataBuf = new byte[BLOCK_SIZE];
MockGetBlockReader answer = new MockGetBlockReader();
@@ -270,18 +142,15 @@ public class TestConnCache {
Matchers.anyString());
// Initial read
- pread(in, 0, dataBuf, 0, dataBuf.length);
+ pread(in, 0, dataBuf, 0, dataBuf.length, authenticData);
// Read again and verify that the socket is the same
- pread(in, FILE_SIZE - dataBuf.length, dataBuf, 0, dataBuf.length);
- pread(in, 1024, dataBuf, 0, dataBuf.length);
- pread(in, -1, dataBuf, 0, dataBuf.length); // No seek; just read
- pread(in, 64, dataBuf, 0, dataBuf.length / 2);
+ pread(in, FILE_SIZE - dataBuf.length, dataBuf, 0, dataBuf.length,
+ authenticData);
+ pread(in, 1024, dataBuf, 0, dataBuf.length, authenticData);
+ // No seek; just read
+ pread(in, -1, dataBuf, 0, dataBuf.length, authenticData);
+ pread(in, 64, dataBuf, 0, dataBuf.length / 2, authenticData);
in.close();
}
-
- @AfterClass
- public static void teardownCluster() throws Exception {
- util.shutdown();
- }
}
Modified: hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java?rev=1431097&r1=1431096&r2=1431097&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java Wed Jan 9 21:34:13 2013
@@ -35,6 +35,7 @@ import org.apache.hadoop.fs.FSDataInputS
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
+import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -92,13 +93,13 @@ public class TestDataTransferKeepalive {
DFSTestUtil.createFile(fs, TEST_FILE, 1L, (short)1, 0L);
// Clients that write aren't currently re-used.
- assertEquals(0, dfsClient.socketCache.size());
+ assertEquals(0, dfsClient.peerCache.size());
assertXceiverCount(0);
// Reads the file, so we should get a
// cached socket, and should have an xceiver on the other side.
DFSTestUtil.readFile(fs, TEST_FILE);
- assertEquals(1, dfsClient.socketCache.size());
+ assertEquals(1, dfsClient.peerCache.size());
assertXceiverCount(1);
// Sleep for a bit longer than the keepalive timeout
@@ -109,13 +110,13 @@ public class TestDataTransferKeepalive {
// The socket is still in the cache, because we don't
// notice that it's closed until we try to read
// from it again.
- assertEquals(1, dfsClient.socketCache.size());
+ assertEquals(1, dfsClient.peerCache.size());
// Take it out of the cache - reading should
// give an EOF.
- Socket s = dfsClient.socketCache.get(dnAddr).sock;
- assertNotNull(s);
- assertEquals(-1, NetUtils.getInputStream(s).read());
+ Peer peer = dfsClient.peerCache.get(dn.getDatanodeId());
+ assertNotNull(peer);
+ assertEquals(-1, peer.getInputStream().read());
}
/**
@@ -174,14 +175,14 @@ public class TestDataTransferKeepalive {
}
DFSClient client = ((DistributedFileSystem)fs).dfs;
- assertEquals(5, client.socketCache.size());
+ assertEquals(5, client.peerCache.size());
// Let all the xceivers timeout
Thread.sleep(1500);
assertXceiverCount(0);
// Client side still has the sockets cached
- assertEquals(5, client.socketCache.size());
+ assertEquals(5, client.peerCache.size());
// Reading should not throw an exception.
DFSTestUtil.readFile(fs, TEST_FILE);
Added: hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDisableConnCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDisableConnCache.java?rev=1431097&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDisableConnCache.java (added)
+++ hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDisableConnCache.java Wed Jan 9 21:34:13 2013
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.Test;
+
+/**
+ * This class tests disabling client connection caching in a single node
+ * mini-cluster.
+ */
+public class TestDisableConnCache {
+ static final Log LOG = LogFactory.getLog(TestDisableConnCache.class);
+
+ static final int BLOCK_SIZE = 4096;
+ static final int FILE_SIZE = 3 * BLOCK_SIZE;
+
+ /**
+ * Test that the socket cache can be disabled by setting the capacity to
+ * 0. Regression test for HDFS-3365.
+ * @throws Exception
+ */
+ @Test
+ public void testDisableCache() throws Exception {
+ HdfsConfiguration confWithoutCache = new HdfsConfiguration();
+ // Configure a new instance with no peer caching, ensure that it doesn't
+ // cache anything
+ confWithoutCache.setInt(
+ DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY, 0);
+ BlockReaderTestUtil util = new BlockReaderTestUtil(1, confWithoutCache);
+ final Path testFile = new Path("/testConnCache.dat");
+ util.writeFile(testFile, FILE_SIZE / 1024);
+ FileSystem fsWithoutCache = FileSystem.newInstance(util.getConf());
+ try {
+ DFSTestUtil.readFile(fsWithoutCache, testFile);
+ assertEquals(0, ((DistributedFileSystem)fsWithoutCache).dfs.peerCache.size());
+ } finally {
+ fsWithoutCache.close();
+ util.shutdown();
+ }
+ }
+}
\ No newline at end of file
Added: hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPeerCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPeerCache.java?rev=1431097&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPeerCache.java (added)
+++ hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPeerCache.java Wed Jan 9 21:34:13 2013
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.channels.ReadableByteChannel;
+import java.util.HashSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.net.Peer;
+import org.junit.Test;
+
+public class TestPeerCache {
+ static final Log LOG = LogFactory.getLog(TestPeerCache.class);
+
+ private static final int CAPACITY = 3;
+ private static final int EXPIRY_PERIOD = 20;
+ private static PeerCache cache =
+ PeerCache.getInstance(CAPACITY, EXPIRY_PERIOD);
+
+ private static class FakePeer implements Peer {
+ private boolean closed = false;
+
+ private DatanodeID dnId;
+
+ public FakePeer(DatanodeID dnId) {
+ this.dnId = dnId;
+ }
+
+ @Override
+ public ReadableByteChannel getInputStreamChannel() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setReadTimeout(int timeoutMs) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int getReceiveBufferSize() throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean getTcpNoDelay() throws IOException {
+ return false;
+ }
+
+ @Override
+ public void setWriteTimeout(int timeoutMs) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean isClosed() {
+ return closed;
+ }
+
+ @Override
+ public void close() throws IOException {
+ closed = true;
+ }
+
+ @Override
+ public String getRemoteAddressString() {
+ return dnId.getInfoAddr();
+ }
+
+ @Override
+ public String getLocalAddressString() {
+ return "127.0.0.1:123";
+ }
+
+ @Override
+ public InputStream getInputStream() throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public OutputStream getOutputStream() throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean isLocal() {
+ return true;
+ }
+
+ @Override
+ public String toString() {
+ return "FakePeer(dnId=" + dnId + ")";
+ }
+ }
+
+ @Test
+ public void testAddAndRetrieve() throws Exception {
+ DatanodeID dnId = new DatanodeID("192.168.0.1",
+ "fakehostname", "fake_storage_id",
+ 100, 101, 102);
+ FakePeer peer = new FakePeer(dnId);
+ cache.put(dnId, peer);
+ assertTrue(!peer.isClosed());
+ assertEquals(1, cache.size());
+ assertEquals(peer, cache.get(dnId));
+ assertEquals(0, cache.size());
+ cache.clear();
+ }
+
+ @Test
+ public void testExpiry() throws Exception {
+ DatanodeID dnIds[] = new DatanodeID[CAPACITY];
+ FakePeer peers[] = new FakePeer[CAPACITY];
+ for (int i = 0; i < CAPACITY; ++i) {
+ dnIds[i] = new DatanodeID("192.168.0.1",
+ "fakehostname_" + i, "fake_storage_id",
+ 100, 101, 102);
+ peers[i] = new FakePeer(dnIds[i]);
+ }
+ for (int i = 0; i < CAPACITY; ++i) {
+ cache.put(dnIds[i], peers[i]);
+ }
+ // Check that the peers are cached
+ assertEquals(CAPACITY, cache.size());
+
+ // Wait for the peers to expire
+ Thread.sleep(EXPIRY_PERIOD * 50);
+ assertEquals(0, cache.size());
+
+ // make sure that the peers were closed when they were expired
+ for (int i = 0; i < CAPACITY; ++i) {
+ assertTrue(peers[i].isClosed());
+ }
+
+ // sleep for another second and see if
+ // the daemon thread runs fine on empty cache
+ Thread.sleep(EXPIRY_PERIOD * 50);
+ cache.clear();
+ }
+
+ @Test
+ public void testEviction() throws Exception {
+ DatanodeID dnIds[] = new DatanodeID[CAPACITY + 1];
+ FakePeer peers[] = new FakePeer[CAPACITY + 1];
+ for (int i = 0; i < dnIds.length; ++i) {
+ dnIds[i] = new DatanodeID("192.168.0.1",
+ "fakehostname_" + i, "fake_storage_id_" + i,
+ 100, 101, 102);
+ peers[i] = new FakePeer(dnIds[i]);
+ }
+ for (int i = 0; i < CAPACITY; ++i) {
+ cache.put(dnIds[i], peers[i]);
+ }
+ // Check that the peers are cached
+ assertEquals(CAPACITY, cache.size());
+
+ // Add another entry and check that the first entry was evicted
+ cache.put(dnIds[CAPACITY], peers[CAPACITY]);
+ assertEquals(CAPACITY, cache.size());
+ assertSame(null, cache.get(dnIds[0]));
+
+ // Make sure that the other entries are still there
+ for (int i = 1; i < CAPACITY; ++i) {
+ Peer peer = cache.get(dnIds[i]);
+ assertSame(peers[i], peer);
+ assertTrue(!peer.isClosed());
+ peer.close();
+ }
+ assertEquals(1, cache.size());
+ cache.clear();
+ }
+
+ @Test
+ public void testMultiplePeersWithSameDnId() throws Exception {
+ DatanodeID dnId = new DatanodeID("192.168.0.1",
+ "fakehostname", "fake_storage_id",
+ 100, 101, 102);
+ HashSet<FakePeer> peers = new HashSet<FakePeer>(CAPACITY);
+ for (int i = 0; i < CAPACITY; ++i) {
+ FakePeer peer = new FakePeer(dnId);
+ peers.add(peer);
+ cache.put(dnId, peer);
+ }
+ // Check that all of the peers ended up in the cache
+ assertEquals(CAPACITY, cache.size());
+ while (!peers.isEmpty()) {
+ Peer peer = cache.get(dnId);
+ assertTrue(peer != null);
+ assertTrue(!peer.isClosed());
+ peers.remove(peer);
+ }
+ assertEquals(0, cache.size());
+ cache.clear();
+ }
+}
Modified: hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java?rev=1431097&r1=1431096&r2=1431097&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java Wed Jan 9 21:34:13 2013
@@ -42,6 +42,7 @@ import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.net.TcpPeerServer;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -145,8 +146,9 @@ public class TestBlockTokenWithDFS {
String file = BlockReaderFactory.getFileName(targetAddr,
"test-blockpoolid", block.getBlockId());
blockReader = BlockReaderFactory.newBlockReader(
- conf, s, file, block,
- lblock.getBlockToken(), 0, -1, null);
+ conf, file, block, lblock.getBlockToken(), 0, -1,
+ true, "TestBlockTokenWithDFS", TcpPeerServer.peerFromSocket(s),
+ nodes[0]);
} catch (IOException ex) {
if (ex instanceof InvalidBlockTokenException) {
Modified: hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java?rev=1431097&r1=1431096&r2=1431097&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java Wed Jan 9 21:34:13 2013
@@ -37,6 +37,7 @@ import org.apache.hadoop.hdfs.DFSConfigK
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.net.TcpPeerServer;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -280,8 +281,9 @@ public class TestDataNodeVolumeFailure {
String file = BlockReaderFactory.getFileName(targetAddr,
"test-blockpoolid",
block.getBlockId());
- BlockReaderFactory.newBlockReader(conf, s, file, block, lblock
- .getBlockToken(), 0, -1, null);
+ BlockReaderFactory.newBlockReader(conf, file, block,
+ lblock.getBlockToken(), 0, -1, true, "TestDataNodeVolumeFailure",
+ TcpPeerServer.peerFromSocket(s), datanode);
// nothing - if it fails - it will throw and exception
}