You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cm...@apache.org on 2016/06/29 19:46:19 UTC

hadoop git commit: HDFS-9805. Add server-side configuration for enabling TCP_NODELAY for DataTransferProtocol and default it to true (Gary Helmling via cmccabe)

Repository: hadoop
Updated Branches:
  refs/heads/trunk abe7fc22c -> e4a254562


HDFS-9805. Add server-side configuration for enabling TCP_NODELAY for DataTransferProtocol and default it to true (Gary Helmling via cmccabe)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e4a25456
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e4a25456
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e4a25456

Branch: refs/heads/trunk
Commit: e4a25456202feeee9880d822a8e6f9c19cbcf24a
Parents: abe7fc2
Author: Colin Patrick Mccabe <cm...@cloudera.com>
Authored: Wed Jun 29 12:34:45 2016 -0700
Committer: Colin Patrick Mccabe <cm...@cloudera.com>
Committed: Wed Jun 29 12:41:42 2016 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hdfs/DFSUtilClient.java   |   9 +
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |   6 +
 .../hadoop/hdfs/server/datanode/DNConf.java     |   8 +
 .../hadoop/hdfs/server/datanode/DataNode.java   |  11 +-
 .../hdfs/server/datanode/DataXceiver.java       |   2 +
 .../erasurecode/StripedBlockWriter.java         |   2 +
 .../src/main/resources/hdfs-default.xml         |   8 +
 .../server/datanode/TestDataNodeTcpNoDelay.java | 460 +++++++++++++++++++
 8 files changed, 503 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/e4a25456/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
index 7ce93fd..d7faa68 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
@@ -82,6 +82,8 @@ import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
 
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_CLIENT_TCPNODELAY_DEFAULT;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_CLIENT_TCPNODELAY_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_NAMESERVICES;
 
@@ -735,6 +737,7 @@ public class DFSUtilClient {
       String dnAddr = dn.getXferAddr(connectToDnViaHostname);
       LOG.debug("Connecting to datanode {}", dnAddr);
       NetUtils.connect(sock, NetUtils.createSocketAddr(dnAddr), timeout);
+      sock.setTcpNoDelay(getClientDataTransferTcpNoDelay(conf));
       sock.setSoTimeout(timeout);
 
       OutputStream unbufOut = NetUtils.getOutputStream(sock);
@@ -756,4 +759,10 @@ public class DFSUtilClient {
       }
     }
   }
+
+  private static boolean getClientDataTransferTcpNoDelay(Configuration conf) {
+    return conf.getBoolean(
+        DFS_DATA_TRANSFER_CLIENT_TCPNODELAY_KEY,
+        DFS_DATA_TRANSFER_CLIENT_TCPNODELAY_DEFAULT);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e4a25456/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 6640ec6..08365cd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -930,6 +930,12 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
       DFS_DATANODE_TRANSFER_SOCKET_RECV_BUFFER_SIZE_DEFAULT =
       HdfsConstants.DEFAULT_DATA_SOCKET_SIZE;
 
+  public static final String
+      DFS_DATA_TRANSFER_SERVER_TCPNODELAY =
+      "dfs.data.transfer.server.tcpnodelay";
+  public static final boolean
+      DFS_DATA_TRANSFER_SERVER_TCPNODELAY_DEFAULT = true;
+
   // Disk Balancer Keys
   public static final String DFS_DISK_BALANCER_ENABLED =
       "dfs.disk.balancer.enabled";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e4a25456/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
index b616414..942672e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
@@ -76,6 +76,7 @@ public class DNConf {
   final int socketKeepaliveTimeout;
   private final int transferSocketSendBufferSize;
   private final int transferSocketRecvBufferSize;
+  private final boolean tcpNoDelay;
 
   final boolean transferToAllowed;
   final boolean dropCacheBehindWrites;
@@ -132,6 +133,9 @@ public class DNConf {
     this.transferSocketRecvBufferSize = conf.getInt(
         DFSConfigKeys.DFS_DATANODE_TRANSFER_SOCKET_RECV_BUFFER_SIZE_KEY,
         DFSConfigKeys.DFS_DATANODE_TRANSFER_SOCKET_RECV_BUFFER_SIZE_DEFAULT);
+    this.tcpNoDelay = conf.getBoolean(
+        DFSConfigKeys.DFS_DATA_TRANSFER_SERVER_TCPNODELAY,
+        DFSConfigKeys.DFS_DATA_TRANSFER_SERVER_TCPNODELAY_DEFAULT);
 
     /* Based on results on different platforms, we might need set the default
      * to false on some of them. */
@@ -361,6 +365,10 @@ public class DNConf {
     return transferSocketSendBufferSize;
   }
 
+  /**
+   * @return the TCP_NODELAY setting that was read from
+   *         {@code DFSConfigKeys.DFS_DATA_TRANSFER_SERVER_TCPNODELAY} when
+   *         this DNConf was initialized
+   */
+  public boolean getDataTransferServerTcpNoDelay() {
+    return this.tcpNoDelay;
+  }
+
   public long getBpReadyTimeout() {
     return bpReadyTimeout;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e4a25456/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index bb63074..32de34c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -71,7 +71,6 @@ import java.net.Socket;
 import java.net.URI;
 import java.net.UnknownHostException;
 import java.nio.channels.ServerSocketChannel;
-import java.nio.channels.SocketChannel;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -96,6 +95,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.management.ObjectName;
+import javax.net.SocketFactory;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -390,6 +390,8 @@ public class DataNode extends ReconfigurableBase
   private DiskBalancer diskBalancer;
 
 
+  private final SocketFactory socketFactory;
+
   private static Tracer createTracer(Configuration conf) {
     return new Tracer.Builder("DataNode").
         conf(TraceUtils.wrapHadoopConf(DATANODE_HTRACE_PREFIX , conf)).
@@ -419,6 +421,7 @@ public class DataNode extends ReconfigurableBase
     this.pipelineSupportECN = false;
     this.checkDiskErrorInterval =
         ThreadLocalRandom.current().nextInt(5000, (int) (5000 * 1.25));
+    this.socketFactory = NetUtils.getDefaultSocketFactory(conf);
     initOOBTimeout();
   }
 
@@ -477,6 +480,8 @@ public class DataNode extends ReconfigurableBase
       LOG.debug(this.fileDescriptorPassingDisabledReason);
     }
 
+    this.socketFactory = NetUtils.getDefaultSocketFactory(conf);
+
     try {
       hostName = getHostName(conf);
       LOG.info("Configured hostname is " + hostName);
@@ -1676,8 +1681,7 @@ public class DataNode extends ReconfigurableBase
    * Creates either NIO or regular depending on socketWriteTimeout.
    */
   public Socket newSocket() throws IOException {
-    return (dnConf.socketWriteTimeout > 0) ? 
-           SocketChannel.open().socket() : new Socket();                                   
+    return socketFactory.createSocket();
   }
 
   /**
@@ -2327,6 +2331,7 @@ public class DataNode extends ReconfigurableBase
         }
         sock = newSocket();
         NetUtils.connect(sock, curTarget, dnConf.socketTimeout);
+        sock.setTcpNoDelay(dnConf.getDataTransferServerTcpNoDelay());
         sock.setSoTimeout(targets.length * dnConf.socketTimeout);
 
         //

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e4a25456/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
index 9236a19..c2cf76e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
@@ -747,6 +747,7 @@ class DataXceiver extends Receiver implements Runnable {
           int writeTimeout = dnConf.socketWriteTimeout +
               (HdfsConstants.WRITE_TIMEOUT_EXTENSION * targets.length);
           NetUtils.connect(mirrorSock, mirrorTarget, timeoutValue);
+          mirrorSock.setTcpNoDelay(dnConf.getDataTransferServerTcpNoDelay());
           mirrorSock.setSoTimeout(timeoutValue);
           mirrorSock.setKeepAlive(true);
           if (dnConf.getTransferSocketSendBufferSize() > 0) {
@@ -1118,6 +1119,7 @@ class DataXceiver extends Receiver implements Runnable {
         InetSocketAddress proxyAddr = NetUtils.createSocketAddr(dnAddr);
         proxySock = datanode.newSocket();
         NetUtils.connect(proxySock, proxyAddr, dnConf.socketTimeout);
+        proxySock.setTcpNoDelay(dnConf.getDataTransferServerTcpNoDelay());
         proxySock.setSoTimeout(dnConf.socketTimeout);
         proxySock.setKeepAlive(true);
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e4a25456/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockWriter.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockWriter.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockWriter.java
index a62f3c2..32e8843 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockWriter.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockWriter.java
@@ -102,6 +102,8 @@ class StripedBlockWriter {
       socket = datanode.newSocket();
       NetUtils.connect(socket, targetAddr,
           datanode.getDnConf().getSocketTimeout());
+      socket.setTcpNoDelay(
+          datanode.getDnConf().getDataTransferServerTcpNoDelay());
       socket.setSoTimeout(datanode.getDnConf().getSocketTimeout());
 
       Token<BlockTokenIdentifier> blockToken =

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e4a25456/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 856e6b4..a198b71 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -3431,6 +3431,14 @@
 </property>
 
 <property>
+  <name>dfs.data.transfer.server.tcpnodelay</name>
+  <value>true</value>
+  <description>
+    If true, set TCP_NODELAY on sockets used for transferring data between DataNodes.
+  </description>
+</property>
+
+<property>
   <name>dfs.datanode.balance.max.concurrent.moves</name>
   <value>50</value>
   <description>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e4a25456/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeTcpNoDelay.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeTcpNoDelay.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeTcpNoDelay.java
new file mode 100644
index 0000000..4d9f119
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeTcpNoDelay.java
@@ -0,0 +1,460 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_RPC_SOCKET_FACTORY_CLASS_DEFAULT_KEY;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.net.StandardSocketFactory;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import javax.net.SocketFactory;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetAddress;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.net.SocketException;
+import java.net.UnknownHostException;
+import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Checks that used sockets have TCP_NODELAY set when configured.
+ */
+public class TestDataNodeTcpNoDelay {
+  private static final Log LOG =
+      LogFactory.getLog(TestDataNodeTcpNoDelay.class);
+  private static Configuration baseConf;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    baseConf = new HdfsConfiguration();
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    // Intentionally empty: each test cleans up in its own finally block.
+  }
+
+  /**
+   * Verifies that, without any explicit TCP_NODELAY configuration, every
+   * socket handed out by {@link SocketFactoryWrapper} ends up with
+   * TCP_NODELAY enabled.
+   */
+  @Test
+  public void testTcpNoDelayEnabled() throws Exception {
+    Configuration testConf = new Configuration(baseConf);
+    // here we do not have to configure TCP_NODELAY settings, since they
+    // should be active by default
+    testConf.set(HADOOP_RPC_SOCKET_FACTORY_CLASS_DEFAULT_KEY,
+        SocketFactoryWrapper.class.getName());
+
+    SocketFactory defaultFactory = NetUtils.getDefaultSocketFactory(testConf);
+    LOG.info("Socket factory is " + defaultFactory.getClass().getName());
+    MiniDFSCluster dfsCluster =
+        new MiniDFSCluster.Builder(testConf).numDataNodes(3).build();
+
+    try {
+      // Kept inside the try so a failure while waiting for the cluster to
+      // become active still shuts the cluster down.
+      dfsCluster.waitActive();
+      DistributedFileSystem dfs = dfsCluster.getFileSystem();
+
+      createData(dfs);
+      transferBlock(dfs);
+
+      // check that TCP_NODELAY has been set on all sockets
+      assertTrue(SocketFactoryWrapper.wasTcpNoDelayActive());
+    } finally {
+      // Shut down first and reset afterwards, so that sockets created while
+      // the cluster is stopping are discarded as well and cannot leak into
+      // the check of another test.
+      try {
+        dfsCluster.shutdown();
+      } finally {
+        SocketFactoryWrapper.reset();
+      }
+    }
+  }
+
+  /**
+   * Verifies that, with all known TCP_NODELAY settings switched off, at
+   * least one socket handed out by {@link SocketFactoryWrapper} does not have
+   * TCP_NODELAY enabled.
+   */
+  @Test
+  public void testTcpNoDelayDisabled() throws Exception {
+    Configuration testConf = new Configuration(baseConf);
+    // disable TCP_NODELAY in settings
+    setTcpNoDelay(testConf, false);
+    testConf.set(HADOOP_RPC_SOCKET_FACTORY_CLASS_DEFAULT_KEY,
+        SocketFactoryWrapper.class.getName());
+
+    SocketFactory defaultFactory = NetUtils.getDefaultSocketFactory(testConf);
+    LOG.info("Socket factory is " + defaultFactory.getClass().getName());
+    MiniDFSCluster dfsCluster =
+        new MiniDFSCluster.Builder(testConf).numDataNodes(3).build();
+
+    try {
+      // Kept inside the try so a failure while waiting for the cluster to
+      // become active still shuts the cluster down.
+      dfsCluster.waitActive();
+      DistributedFileSystem dfs = dfsCluster.getFileSystem();
+
+      createData(dfs);
+      transferBlock(dfs);
+
+      // we can only check that TCP_NODELAY was disabled on some sockets,
+      // since part of the client write path always enables TCP_NODELAY
+      // by necessity
+      assertFalse(SocketFactoryWrapper.wasTcpNoDelayActive());
+    } finally {
+      // Shut down first and reset afterwards, so that sockets created while
+      // the cluster is stopping are discarded as well and cannot leak into
+      // the check of another test.
+      try {
+        dfsCluster.shutdown();
+      } finally {
+        SocketFactoryWrapper.reset();
+      }
+    }
+  }
+
+
+  private void createData(DistributedFileSystem dfs) throws Exception {
+    Path dir = new Path("test-dir");
+    for (int i = 0; i < 3; i++) {
+      Path f = new Path(dir, "file" + i);
+      DFSTestUtil.createFile(dfs, f, 10240, (short) 3, 0);
+    }
+  }
+
+  /**
+   * Tests the {@code DataNode#transferBlocks()} path by re-replicating an
+   * existing block.
+   */
+  private void transferBlock(DistributedFileSystem dfs) throws Exception {
+    Path dir = new Path("test-block-transfer");
+    Path f = new Path(dir, "testfile");
+    DFSTestUtil.createFile(dfs, f, 10240, (short) 1, 0);
+
+    // force a block transfer to another DN
+    dfs.setReplication(f, (short) 2);
+    DFSTestUtil.waitForReplication(dfs, f, (short) 2, 20000);
+  }
+
+  /**
+   * Sets known TCP_NODELAY configs to the given value.
+   */
+  private void setTcpNoDelay(Configuration conf, boolean value) {
+    conf.setBoolean(
+        HdfsClientConfigKeys.DFS_DATA_TRANSFER_CLIENT_TCPNODELAY_KEY, value);
+    conf.setBoolean(
+        DFSConfigKeys.DFS_DATA_TRANSFER_SERVER_TCPNODELAY, value);
+    conf.setBoolean(
+        CommonConfigurationKeysPublic.IPC_CLIENT_TCPNODELAY_KEY, value);
+    conf.setBoolean(
+        CommonConfigurationKeysPublic.IPC_SERVER_TCPNODELAY_KEY, value);
+  }
+
+  /**
+   * Socket factory that wraps every socket it creates in a
+   * {@link SocketWrapper} and records the wrapper in a static list, so that
+   * a test can check afterwards whether TCP_NODELAY was enabled on each one.
+   */
+  public static class SocketFactoryWrapper extends StandardSocketFactory {
+    // Shared by all factory instances. A synchronized list is used because
+    // sockets may be created on threads other than the one running the test
+    // while that thread inspects or resets the list.
+    private static final List<SocketWrapper> sockets =
+        java.util.Collections.synchronizedList(
+            new ArrayList<SocketWrapper>());
+
+    /**
+     * @return false if any recorded socket never had TCP_NODELAY set or had
+     *         it switched off by its most recent setTcpNoDelay() call; true
+     *         otherwise (including when no socket has been recorded)
+     */
+    public static boolean wasTcpNoDelayActive() {
+      // Iterating over a synchronized list requires holding its lock.
+      synchronized (sockets) {
+        LOG.info("Checking " + sockets.size() + " sockets for TCP_NODELAY");
+        for (SocketWrapper sw : sockets) {
+          if (!sw.getLastTcpNoDelay()) {
+            return false;
+          }
+        }
+        return true;
+      }
+    }
+
+    /** Forgets all sockets recorded so far. */
+    public static void reset() {
+      sockets.clear();
+    }
+
+    /** Wraps the given socket, records the wrapper and returns it. */
+    private static Socket track(Socket socket) {
+      SocketWrapper wrapper = new SocketWrapper(socket);
+      sockets.add(wrapper);
+      return wrapper;
+    }
+
+    @Override
+    public Socket createSocket() throws IOException {
+      LOG.info("Creating new socket");
+      return track(super.createSocket());
+    }
+
+    @Override
+    public Socket createSocket(String host, int port)
+        throws IOException, UnknownHostException {
+      LOG.info("Creating socket for " + host);
+      return track(super.createSocket(host, port));
+    }
+
+    @Override
+    public Socket createSocket(String host, int port,
+                               InetAddress localHostAddr, int localPort)
+        throws IOException, UnknownHostException {
+      LOG.info("Creating socket for " + host);
+      return track(super.createSocket(host, port, localHostAddr, localPort));
+    }
+
+    @Override
+    public Socket createSocket(InetAddress addr, int port) throws IOException {
+      LOG.info("Creating socket for " + addr);
+      return track(super.createSocket(addr, port));
+    }
+
+    @Override
+    public Socket createSocket(InetAddress addr, int port,
+                               InetAddress localHostAddr, int localPort)
+        throws IOException {
+      LOG.info("Creating socket for " + addr);
+      return track(super.createSocket(addr, port, localHostAddr, localPort));
+    }
+  }
+
+  public static class SocketWrapper extends Socket {
+    private final Socket wrapped;
+    private boolean tcpNoDelay;
+
+    public SocketWrapper(Socket socket) {
+      this.wrapped = socket;
+    }
+
+    // Override methods, check whether tcpnodelay has been set for each socket
+    // created. This isn't perfect, as we could still send before tcpnodelay
+    // is set, but should at least trigger when tcpnodelay is never set at all.
+
+    @Override
+    public void connect(SocketAddress endpoint) throws IOException {
+      wrapped.connect(endpoint);
+    }
+
+    @Override
+    public void connect(SocketAddress endpoint, int timeout)
+        throws IOException {
+      wrapped.connect(endpoint, timeout);
+    }
+
+    @Override
+    public void bind(SocketAddress bindpoint) throws IOException {
+      wrapped.bind(bindpoint);
+    }
+
+    @Override
+    public InetAddress getInetAddress() {
+      return wrapped.getInetAddress();
+    }
+
+    @Override
+    public InetAddress getLocalAddress() {
+      return wrapped.getLocalAddress();
+    }
+
+    @Override
+    public int getPort() {
+      return wrapped.getPort();
+    }
+
+    @Override
+    public int getLocalPort() {
+      return wrapped.getLocalPort();
+    }
+
+    @Override
+    public SocketAddress getRemoteSocketAddress() {
+      return wrapped.getRemoteSocketAddress();
+    }
+
+    @Override
+    public SocketAddress getLocalSocketAddress() {
+      return wrapped.getLocalSocketAddress();
+    }
+
+    @Override
+    public SocketChannel getChannel() {
+      return wrapped.getChannel();
+    }
+
+    @Override
+    public InputStream getInputStream() throws IOException {
+      return wrapped.getInputStream();
+    }
+
+    @Override
+    public OutputStream getOutputStream() throws IOException {
+      return wrapped.getOutputStream();
+    }
+
+    @Override
+    public void setTcpNoDelay(boolean on) throws SocketException {
+      wrapped.setTcpNoDelay(on);
+      this.tcpNoDelay = on;
+    }
+
+    @Override
+    public boolean getTcpNoDelay() throws SocketException {
+      return wrapped.getTcpNoDelay();
+    }
+
+    @Override
+    public void setSoLinger(boolean on, int linger) throws SocketException {
+      wrapped.setSoLinger(on, linger);
+    }
+
+    @Override
+    public int getSoLinger() throws SocketException {
+      return wrapped.getSoLinger();
+    }
+
+    @Override
+    public void sendUrgentData(int data) throws IOException {
+      wrapped.sendUrgentData(data);
+    }
+
+    @Override
+    public void setOOBInline(boolean on) throws SocketException {
+      wrapped.setOOBInline(on);
+    }
+
+    @Override
+    public boolean getOOBInline() throws SocketException {
+      return wrapped.getOOBInline();
+    }
+
+    @Override
+    public synchronized void setSoTimeout(int timeout) throws SocketException {
+      wrapped.setSoTimeout(timeout);
+    }
+
+    @Override
+    public synchronized int getSoTimeout() throws SocketException {
+      return wrapped.getSoTimeout();
+    }
+
+    @Override
+    public synchronized void setSendBufferSize(int size)
+        throws SocketException {
+      wrapped.setSendBufferSize(size);
+    }
+
+    @Override
+    public synchronized int getSendBufferSize() throws SocketException {
+      return wrapped.getSendBufferSize();
+    }
+
+    @Override
+    public synchronized void setReceiveBufferSize(int size)
+        throws SocketException {
+      wrapped.setReceiveBufferSize(size);
+    }
+
+    @Override
+    public synchronized int getReceiveBufferSize() throws SocketException {
+      return wrapped.getReceiveBufferSize();
+    }
+
+    @Override
+    public void setKeepAlive(boolean on) throws SocketException {
+      wrapped.setKeepAlive(on);
+    }
+
+    @Override
+    public boolean getKeepAlive() throws SocketException {
+      return wrapped.getKeepAlive();
+    }
+
+    @Override
+    public void setTrafficClass(int tc) throws SocketException {
+      wrapped.setTrafficClass(tc);
+    }
+
+    @Override
+    public int getTrafficClass() throws SocketException {
+      return wrapped.getTrafficClass();
+    }
+
+    @Override
+    public void setReuseAddress(boolean on) throws SocketException {
+      wrapped.setReuseAddress(on);
+    }
+
+    @Override
+    public boolean getReuseAddress() throws SocketException {
+      return wrapped.getReuseAddress();
+    }
+
+    @Override
+    public synchronized void close() throws IOException {
+      wrapped.close();
+    }
+
+    @Override
+    public void shutdownInput() throws IOException {
+      wrapped.shutdownInput();
+    }
+
+    @Override
+    public void shutdownOutput() throws IOException {
+      wrapped.shutdownOutput();
+    }
+
+    @Override
+    public String toString() {
+      return wrapped.toString();
+    }
+
+    @Override
+    public boolean isConnected() {
+      return wrapped.isConnected();
+    }
+
+    @Override
+    public boolean isBound() {
+      return wrapped.isBound();
+    }
+
+    @Override
+    public boolean isClosed() {
+      return wrapped.isClosed();
+    }
+
+    @Override
+    public boolean isInputShutdown() {
+      return wrapped.isInputShutdown();
+    }
+
+    @Override
+    public boolean isOutputShutdown() {
+      return wrapped.isOutputShutdown();
+    }
+
+    @Override
+    public void setPerformancePreferences(int connectionTime, int latency,
+                                          int bandwidth) {
+      wrapped.setPerformancePreferences(connectionTime, latency, bandwidth);
+    }
+
+    public boolean getLastTcpNoDelay() {
+      return tcpNoDelay;
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org