You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cm...@apache.org on 2016/06/29 19:46:19 UTC
hadoop git commit: HDFS-9805. Add server-side configuration for
enabling TCP_NODELAY for DataTransferProtocol and default it to true (Gary
Helmling via cmccabe)
Repository: hadoop
Updated Branches:
refs/heads/trunk abe7fc22c -> e4a254562
HDFS-9805. Add server-side configuration for enabling TCP_NODELAY for DataTransferProtocol and default it to true (Gary Helmling via cmccabe)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e4a25456
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e4a25456
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e4a25456
Branch: refs/heads/trunk
Commit: e4a25456202feeee9880d822a8e6f9c19cbcf24a
Parents: abe7fc2
Author: Colin Patrick Mccabe <cm...@cloudera.com>
Authored: Wed Jun 29 12:34:45 2016 -0700
Committer: Colin Patrick Mccabe <cm...@cloudera.com>
Committed: Wed Jun 29 12:41:42 2016 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hdfs/DFSUtilClient.java | 9 +
.../org/apache/hadoop/hdfs/DFSConfigKeys.java | 6 +
.../hadoop/hdfs/server/datanode/DNConf.java | 8 +
.../hadoop/hdfs/server/datanode/DataNode.java | 11 +-
.../hdfs/server/datanode/DataXceiver.java | 2 +
.../erasurecode/StripedBlockWriter.java | 2 +
.../src/main/resources/hdfs-default.xml | 8 +
.../server/datanode/TestDataNodeTcpNoDelay.java | 460 +++++++++++++++++++
8 files changed, 503 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e4a25456/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
index 7ce93fd..d7faa68 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
@@ -82,6 +82,8 @@ import java.util.Locale;
import java.util.Map;
import java.util.Set;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_CLIENT_TCPNODELAY_DEFAULT;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_CLIENT_TCPNODELAY_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_NAMESERVICES;
@@ -735,6 +737,7 @@ public class DFSUtilClient {
String dnAddr = dn.getXferAddr(connectToDnViaHostname);
LOG.debug("Connecting to datanode {}", dnAddr);
NetUtils.connect(sock, NetUtils.createSocketAddr(dnAddr), timeout);
+ sock.setTcpNoDelay(getClientDataTransferTcpNoDelay(conf));
sock.setSoTimeout(timeout);
OutputStream unbufOut = NetUtils.getOutputStream(sock);
@@ -756,4 +759,10 @@ public class DFSUtilClient {
}
}
}
+
+ private static boolean getClientDataTransferTcpNoDelay(Configuration conf) {
+ return conf.getBoolean(
+ DFS_DATA_TRANSFER_CLIENT_TCPNODELAY_KEY,
+ DFS_DATA_TRANSFER_CLIENT_TCPNODELAY_DEFAULT);
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e4a25456/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 6640ec6..08365cd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -930,6 +930,12 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
DFS_DATANODE_TRANSFER_SOCKET_RECV_BUFFER_SIZE_DEFAULT =
HdfsConstants.DEFAULT_DATA_SOCKET_SIZE;
+ public static final String
+ DFS_DATA_TRANSFER_SERVER_TCPNODELAY =
+ "dfs.data.transfer.server.tcpnodelay";
+ public static final boolean
+ DFS_DATA_TRANSFER_SERVER_TCPNODELAY_DEFAULT = true;
+
// Disk Balancer Keys
public static final String DFS_DISK_BALANCER_ENABLED =
"dfs.disk.balancer.enabled";
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e4a25456/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
index b616414..942672e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
@@ -76,6 +76,7 @@ public class DNConf {
final int socketKeepaliveTimeout;
private final int transferSocketSendBufferSize;
private final int transferSocketRecvBufferSize;
+ private final boolean tcpNoDelay;
final boolean transferToAllowed;
final boolean dropCacheBehindWrites;
@@ -132,6 +133,9 @@ public class DNConf {
this.transferSocketRecvBufferSize = conf.getInt(
DFSConfigKeys.DFS_DATANODE_TRANSFER_SOCKET_RECV_BUFFER_SIZE_KEY,
DFSConfigKeys.DFS_DATANODE_TRANSFER_SOCKET_RECV_BUFFER_SIZE_DEFAULT);
+ this.tcpNoDelay = conf.getBoolean(
+ DFSConfigKeys.DFS_DATA_TRANSFER_SERVER_TCPNODELAY,
+ DFSConfigKeys.DFS_DATA_TRANSFER_SERVER_TCPNODELAY_DEFAULT);
/* Based on results on different platforms, we might need set the default
* to false on some of them. */
@@ -361,6 +365,10 @@ public class DNConf {
return transferSocketSendBufferSize;
}
+ public boolean getDataTransferServerTcpNoDelay() {
+ return tcpNoDelay;
+ }
+
public long getBpReadyTimeout() {
return bpReadyTimeout;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e4a25456/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index bb63074..32de34c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -71,7 +71,6 @@ import java.net.Socket;
import java.net.URI;
import java.net.UnknownHostException;
import java.nio.channels.ServerSocketChannel;
-import java.nio.channels.SocketChannel;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
@@ -96,6 +95,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.management.ObjectName;
+import javax.net.SocketFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -390,6 +390,8 @@ public class DataNode extends ReconfigurableBase
private DiskBalancer diskBalancer;
+ private final SocketFactory socketFactory;
+
private static Tracer createTracer(Configuration conf) {
return new Tracer.Builder("DataNode").
conf(TraceUtils.wrapHadoopConf(DATANODE_HTRACE_PREFIX , conf)).
@@ -419,6 +421,7 @@ public class DataNode extends ReconfigurableBase
this.pipelineSupportECN = false;
this.checkDiskErrorInterval =
ThreadLocalRandom.current().nextInt(5000, (int) (5000 * 1.25));
+ this.socketFactory = NetUtils.getDefaultSocketFactory(conf);
initOOBTimeout();
}
@@ -477,6 +480,8 @@ public class DataNode extends ReconfigurableBase
LOG.debug(this.fileDescriptorPassingDisabledReason);
}
+ this.socketFactory = NetUtils.getDefaultSocketFactory(conf);
+
try {
hostName = getHostName(conf);
LOG.info("Configured hostname is " + hostName);
@@ -1676,8 +1681,7 @@ public class DataNode extends ReconfigurableBase
* Creates either NIO or regular depending on socketWriteTimeout.
*/
public Socket newSocket() throws IOException {
- return (dnConf.socketWriteTimeout > 0) ?
- SocketChannel.open().socket() : new Socket();
+ return socketFactory.createSocket();
}
/**
@@ -2327,6 +2331,7 @@ public class DataNode extends ReconfigurableBase
}
sock = newSocket();
NetUtils.connect(sock, curTarget, dnConf.socketTimeout);
+ sock.setTcpNoDelay(dnConf.getDataTransferServerTcpNoDelay());
sock.setSoTimeout(targets.length * dnConf.socketTimeout);
//
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e4a25456/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
index 9236a19..c2cf76e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
@@ -747,6 +747,7 @@ class DataXceiver extends Receiver implements Runnable {
int writeTimeout = dnConf.socketWriteTimeout +
(HdfsConstants.WRITE_TIMEOUT_EXTENSION * targets.length);
NetUtils.connect(mirrorSock, mirrorTarget, timeoutValue);
+ mirrorSock.setTcpNoDelay(dnConf.getDataTransferServerTcpNoDelay());
mirrorSock.setSoTimeout(timeoutValue);
mirrorSock.setKeepAlive(true);
if (dnConf.getTransferSocketSendBufferSize() > 0) {
@@ -1118,6 +1119,7 @@ class DataXceiver extends Receiver implements Runnable {
InetSocketAddress proxyAddr = NetUtils.createSocketAddr(dnAddr);
proxySock = datanode.newSocket();
NetUtils.connect(proxySock, proxyAddr, dnConf.socketTimeout);
+ proxySock.setTcpNoDelay(dnConf.getDataTransferServerTcpNoDelay());
proxySock.setSoTimeout(dnConf.socketTimeout);
proxySock.setKeepAlive(true);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e4a25456/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockWriter.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockWriter.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockWriter.java
index a62f3c2..32e8843 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockWriter.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockWriter.java
@@ -102,6 +102,8 @@ class StripedBlockWriter {
socket = datanode.newSocket();
NetUtils.connect(socket, targetAddr,
datanode.getDnConf().getSocketTimeout());
+ socket.setTcpNoDelay(
+ datanode.getDnConf().getDataTransferServerTcpNoDelay());
socket.setSoTimeout(datanode.getDnConf().getSocketTimeout());
Token<BlockTokenIdentifier> blockToken =
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e4a25456/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 856e6b4..a198b71 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -3431,6 +3431,14 @@
</property>
<property>
+ <name>dfs.data.transfer.server.tcpnodelay</name>
+ <value>true</value>
+ <description>
+ If true, set TCP_NODELAY on sockets used for transferring data between DataNodes.
+ </description>
+</property>
+
+<property>
<name>dfs.datanode.balance.max.concurrent.moves</name>
<value>50</value>
<description>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e4a25456/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeTcpNoDelay.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeTcpNoDelay.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeTcpNoDelay.java
new file mode 100644
index 0000000..4d9f119
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeTcpNoDelay.java
@@ -0,0 +1,460 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_RPC_SOCKET_FACTORY_CLASS_DEFAULT_KEY;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.net.StandardSocketFactory;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import javax.net.SocketFactory;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetAddress;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.net.SocketException;
+import java.net.UnknownHostException;
+import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Checks that used sockets have TCP_NODELAY set when configured.
+ */
+public class TestDataNodeTcpNoDelay {
+ private static final Log LOG =
+ LogFactory.getLog(TestDataNodeTcpNoDelay.class);
+ private static Configuration baseConf;
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ baseConf = new HdfsConfiguration();
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+
+ }
+
+ @Test
+ public void testTcpNoDelayEnabled() throws Exception {
+ Configuration testConf = new Configuration(baseConf);
+ // Here we do not have to configure the TCP_NODELAY settings, since they
+ // should be active by default
+ testConf.set(HADOOP_RPC_SOCKET_FACTORY_CLASS_DEFAULT_KEY,
+ SocketFactoryWrapper.class.getName());
+
+ SocketFactory defaultFactory = NetUtils.getDefaultSocketFactory(testConf);
+ LOG.info("Socket factory is " + defaultFactory.getClass().getName());
+ MiniDFSCluster dfsCluster =
+ new MiniDFSCluster.Builder(testConf).numDataNodes(3).build();
+ dfsCluster.waitActive();
+
+ DistributedFileSystem dfs = dfsCluster.getFileSystem();
+
+ try {
+ createData(dfs);
+ transferBlock(dfs);
+
+ // check that TCP_NODELAY has been set on all sockets
+ assertTrue(SocketFactoryWrapper.wasTcpNoDelayActive());
+ } finally {
+ SocketFactoryWrapper.reset();
+ dfsCluster.shutdown();
+ }
+ }
+
+ @Test
+ public void testTcpNoDelayDisabled() throws Exception {
+ Configuration testConf = new Configuration(baseConf);
+ // disable TCP_NODELAY in settings
+ setTcpNoDelay(testConf, false);
+ testConf.set(HADOOP_RPC_SOCKET_FACTORY_CLASS_DEFAULT_KEY,
+ SocketFactoryWrapper.class.getName());
+
+ SocketFactory defaultFactory = NetUtils.getDefaultSocketFactory(testConf);
+ LOG.info("Socket factory is " + defaultFactory.getClass().getName());
+ MiniDFSCluster dfsCluster =
+ new MiniDFSCluster.Builder(testConf).numDataNodes(3).build();
+ dfsCluster.waitActive();
+
+ DistributedFileSystem dfs = dfsCluster.getFileSystem();
+
+ try {
+ createData(dfs);
+ transferBlock(dfs);
+
+ // we can only check that TCP_NODELAY was disabled on some sockets,
+ // since part of the client write path always enables TCP_NODELAY
+ // by necessity
+ assertFalse(SocketFactoryWrapper.wasTcpNoDelayActive());
+ } finally {
+ SocketFactoryWrapper.reset();
+ dfsCluster.shutdown();
+ }
+ }
+
+
+ private void createData(DistributedFileSystem dfs) throws Exception {
+ Path dir = new Path("test-dir");
+ for (int i = 0; i < 3; i++) {
+ Path f = new Path(dir, "file" + i);
+ DFSTestUtil.createFile(dfs, f, 10240, (short) 3, 0);
+ }
+ }
+
+ /**
+ * Tests the {@code DataNode#transferBlocks()} path by re-replicating an
+ * existing block.
+ */
+ private void transferBlock(DistributedFileSystem dfs) throws Exception {
+ Path dir = new Path("test-block-transfer");
+ Path f = new Path(dir, "testfile");
+ DFSTestUtil.createFile(dfs, f, 10240, (short) 1, 0);
+
+ // force a block transfer to another DN
+ dfs.setReplication(f, (short) 2);
+ DFSTestUtil.waitForReplication(dfs, f, (short) 2, 20000);
+ }
+
+ /**
+ * Sets known TCP_NODELAY configs to the given value.
+ */
+ private void setTcpNoDelay(Configuration conf, boolean value) {
+ conf.setBoolean(
+ HdfsClientConfigKeys.DFS_DATA_TRANSFER_CLIENT_TCPNODELAY_KEY, value);
+ conf.setBoolean(
+ DFSConfigKeys.DFS_DATA_TRANSFER_SERVER_TCPNODELAY, value);
+ conf.setBoolean(
+ CommonConfigurationKeysPublic.IPC_CLIENT_TCPNODELAY_KEY, value);
+ conf.setBoolean(
+ CommonConfigurationKeysPublic.IPC_SERVER_TCPNODELAY_KEY, value);
+ }
+
+ public static class SocketFactoryWrapper extends StandardSocketFactory {
+ private static List<SocketWrapper> sockets = new ArrayList<SocketWrapper>();
+
+ public static boolean wasTcpNoDelayActive() {
+ LOG.info("Checking " + sockets.size() + " sockets for TCP_NODELAY");
+ for (SocketWrapper sw : sockets) {
+ if (!sw.getLastTcpNoDelay()) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ public static void reset() {
+ sockets = new ArrayList<>();
+ }
+
+ @Override
+ public Socket createSocket() throws IOException {
+ LOG.info("Creating new socket");
+ SocketWrapper wrapper = new SocketWrapper(super.createSocket());
+ sockets.add(wrapper);
+ return wrapper;
+ }
+
+ @Override
+ public Socket createSocket(String host, int port)
+ throws IOException, UnknownHostException {
+ LOG.info("Creating socket for " + host);
+ SocketWrapper wrapper =
+ new SocketWrapper(super.createSocket(host, port));
+ sockets.add(wrapper);
+ return wrapper;
+ }
+
+ @Override
+ public Socket createSocket(String host, int port,
+ InetAddress localHostAddr, int localPort)
+ throws IOException, UnknownHostException {
+ LOG.info("Creating socket for " + host);
+ SocketWrapper wrapper = new SocketWrapper(
+ super.createSocket(host, port, localHostAddr, localPort));
+ sockets.add(wrapper);
+ return wrapper;
+ }
+
+ @Override
+ public Socket createSocket(InetAddress addr, int port) throws IOException {
+ LOG.info("Creating socket for " + addr);
+ SocketWrapper wrapper =
+ new SocketWrapper(super.createSocket(addr, port));
+ sockets.add(wrapper);
+ return wrapper;
+ }
+
+ @Override
+ public Socket createSocket(InetAddress addr, int port,
+ InetAddress localHostAddr, int localPort)
+ throws IOException {
+ LOG.info("Creating socket for " + addr);
+ SocketWrapper wrapper = new SocketWrapper(
+ super.createSocket(addr, port, localHostAddr, localPort));
+ sockets.add(wrapper);
+ return wrapper;
+ }
+ }
+
+ public static class SocketWrapper extends Socket {
+ private final Socket wrapped;
+ private boolean tcpNoDelay;
+
+ public SocketWrapper(Socket socket) {
+ this.wrapped = socket;
+ }
+
+ // Delegating overrides; setTcpNoDelay() records the last value set on each
+ // socket created. This isn't perfect, as we could still send before
+ // TCP_NODELAY is set, but should at least trigger when it is never set at all.
+
+ @Override
+ public void connect(SocketAddress endpoint) throws IOException {
+ wrapped.connect(endpoint);
+ }
+
+ @Override
+ public void connect(SocketAddress endpoint, int timeout)
+ throws IOException {
+ wrapped.connect(endpoint, timeout);
+ }
+
+ @Override
+ public void bind(SocketAddress bindpoint) throws IOException {
+ wrapped.bind(bindpoint);
+ }
+
+ @Override
+ public InetAddress getInetAddress() {
+ return wrapped.getInetAddress();
+ }
+
+ @Override
+ public InetAddress getLocalAddress() {
+ return wrapped.getLocalAddress();
+ }
+
+ @Override
+ public int getPort() {
+ return wrapped.getPort();
+ }
+
+ @Override
+ public int getLocalPort() {
+ return wrapped.getLocalPort();
+ }
+
+ @Override
+ public SocketAddress getRemoteSocketAddress() {
+ return wrapped.getRemoteSocketAddress();
+ }
+
+ @Override
+ public SocketAddress getLocalSocketAddress() {
+ return wrapped.getLocalSocketAddress();
+ }
+
+ @Override
+ public SocketChannel getChannel() {
+ return wrapped.getChannel();
+ }
+
+ @Override
+ public InputStream getInputStream() throws IOException {
+ return wrapped.getInputStream();
+ }
+
+ @Override
+ public OutputStream getOutputStream() throws IOException {
+ return wrapped.getOutputStream();
+ }
+
+ @Override
+ public void setTcpNoDelay(boolean on) throws SocketException {
+ wrapped.setTcpNoDelay(on);
+ this.tcpNoDelay = on;
+ }
+
+ @Override
+ public boolean getTcpNoDelay() throws SocketException {
+ return wrapped.getTcpNoDelay();
+ }
+
+ @Override
+ public void setSoLinger(boolean on, int linger) throws SocketException {
+ wrapped.setSoLinger(on, linger);
+ }
+
+ @Override
+ public int getSoLinger() throws SocketException {
+ return wrapped.getSoLinger();
+ }
+
+ @Override
+ public void sendUrgentData(int data) throws IOException {
+ wrapped.sendUrgentData(data);
+ }
+
+ @Override
+ public void setOOBInline(boolean on) throws SocketException {
+ wrapped.setOOBInline(on);
+ }
+
+ @Override
+ public boolean getOOBInline() throws SocketException {
+ return wrapped.getOOBInline();
+ }
+
+ @Override
+ public synchronized void setSoTimeout(int timeout) throws SocketException {
+ wrapped.setSoTimeout(timeout);
+ }
+
+ @Override
+ public synchronized int getSoTimeout() throws SocketException {
+ return wrapped.getSoTimeout();
+ }
+
+ @Override
+ public synchronized void setSendBufferSize(int size)
+ throws SocketException {
+ wrapped.setSendBufferSize(size);
+ }
+
+ @Override
+ public synchronized int getSendBufferSize() throws SocketException {
+ return wrapped.getSendBufferSize();
+ }
+
+ @Override
+ public synchronized void setReceiveBufferSize(int size)
+ throws SocketException {
+ wrapped.setReceiveBufferSize(size);
+ }
+
+ @Override
+ public synchronized int getReceiveBufferSize() throws SocketException {
+ return wrapped.getReceiveBufferSize();
+ }
+
+ @Override
+ public void setKeepAlive(boolean on) throws SocketException {
+ wrapped.setKeepAlive(on);
+ }
+
+ @Override
+ public boolean getKeepAlive() throws SocketException {
+ return wrapped.getKeepAlive();
+ }
+
+ @Override
+ public void setTrafficClass(int tc) throws SocketException {
+ wrapped.setTrafficClass(tc);
+ }
+
+ @Override
+ public int getTrafficClass() throws SocketException {
+ return wrapped.getTrafficClass();
+ }
+
+ @Override
+ public void setReuseAddress(boolean on) throws SocketException {
+ wrapped.setReuseAddress(on);
+ }
+
+ @Override
+ public boolean getReuseAddress() throws SocketException {
+ return wrapped.getReuseAddress();
+ }
+
+ @Override
+ public synchronized void close() throws IOException {
+ wrapped.close();
+ }
+
+ @Override
+ public void shutdownInput() throws IOException {
+ wrapped.shutdownInput();
+ }
+
+ @Override
+ public void shutdownOutput() throws IOException {
+ wrapped.shutdownOutput();
+ }
+
+ @Override
+ public String toString() {
+ return wrapped.toString();
+ }
+
+ @Override
+ public boolean isConnected() {
+ return wrapped.isConnected();
+ }
+
+ @Override
+ public boolean isBound() {
+ return wrapped.isBound();
+ }
+
+ @Override
+ public boolean isClosed() {
+ return wrapped.isClosed();
+ }
+
+ @Override
+ public boolean isInputShutdown() {
+ return wrapped.isInputShutdown();
+ }
+
+ @Override
+ public boolean isOutputShutdown() {
+ return wrapped.isOutputShutdown();
+ }
+
+ @Override
+ public void setPerformancePreferences(int connectionTime, int latency,
+ int bandwidth) {
+ wrapped.setPerformancePreferences(connectionTime, latency, bandwidth);
+ }
+
+ public boolean getLastTcpNoDelay() {
+ return tcpNoDelay;
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org