You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by to...@apache.org on 2012/09/10 20:46:59 UTC

svn commit: r1383033 - in /hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/qjournal/client/ src/main/java/org/apache/hadoop/hdfs/qjournal/server/ src/test/java/org/apache/hadoop/hdfs/qjournal/se...

Author: todd
Date: Mon Sep 10 18:46:58 2012
New Revision: 1383033

URL: http://svn.apache.org/viewvc?rev=1383033&view=rev
Log:
HDFS-3898. QJM: enable TCP_NODELAY for IPC. Contributed by Todd Lipcon.

Modified:
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt?rev=1383033&r1=1383032&r2=1383033&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt Mon Sep 10 18:46:58 2012
@@ -52,3 +52,5 @@ HDFS-3726. If a logger misses an RPC, do
 HDFS-3893. QJM: Make QJM work with security enabled. (atm)
 
 HDFS-3897. QJM: TestBlockToken fails after HDFS-3893. (atm)
+
+HDFS-3898. QJM: enable TCP_NODELAY for IPC (todd)

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java?rev=1383033&r1=1383032&r2=1383033&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java Mon Sep 10 18:46:58 2012
@@ -29,6 +29,7 @@ import java.util.concurrent.Executors;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocolPB.PBHelper;
@@ -166,16 +167,26 @@ public class IPCLoggerChannel implements
   }
   
   protected QJournalProtocol createProxy() throws IOException {
+    final Configuration confCopy = new Configuration(conf);
+    
+    // Need to set NODELAY or else batches larger than MTU can trigger 
+    // 40ms nagling delays.
+    confCopy.setBoolean(
+        CommonConfigurationKeysPublic.IPC_CLIENT_TCPNODELAY_KEY,
+        true);
+    
+    RPC.setProtocolEngine(confCopy,
+        QJournalProtocolPB.class, ProtobufRpcEngine.class);
     return SecurityUtil.doAsLoginUser(
         new PrivilegedExceptionAction<QJournalProtocol>() {
           @Override
           public QJournalProtocol run() throws IOException {
-            RPC.setProtocolEngine(conf,
+            RPC.setProtocolEngine(confCopy,
                 QJournalProtocolPB.class, ProtobufRpcEngine.class);
             QJournalProtocolPB pbproxy = RPC.getProxy(
                 QJournalProtocolPB.class,
                 RPC.getProtocolVersion(QJournalProtocolPB.class),
-                addr, conf);
+                addr, confCopy);
             return new QJournalProtocolTranslatorPB(pbproxy);
           }
         });

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java?rev=1383033&r1=1383032&r2=1383033&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java Mon Sep 10 18:46:58 2012
@@ -23,6 +23,7 @@ import java.net.URL;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.HDFSPolicyProvider;
 import org.apache.hadoop.hdfs.protocolPB.PBHelper;
@@ -54,8 +55,15 @@ class JournalNodeRpcServer implements QJ
   JournalNodeRpcServer(Configuration conf, JournalNode jn) throws IOException {
     this.jn = jn;
     
-    InetSocketAddress addr = getAddress(conf);
-    RPC.setProtocolEngine(conf, QJournalProtocolPB.class,
+    Configuration confCopy = new Configuration(conf);
+    
+    // Ensure that nagling doesn't kick in, which could cause latency issues.
+    confCopy.setBoolean(
+        CommonConfigurationKeysPublic.IPC_SERVER_TCPNODELAY_KEY,
+        true);
+    
+    InetSocketAddress addr = getAddress(confCopy);
+    RPC.setProtocolEngine(confCopy, QJournalProtocolPB.class,
         ProtobufRpcEngine.class);
     QJournalProtocolServerSideTranslatorPB translator =
         new QJournalProtocolServerSideTranslatorPB(this);
@@ -65,13 +73,13 @@ class JournalNodeRpcServer implements QJ
     this.server = RPC.getServer(
         QJournalProtocolPB.class,
         service, addr.getHostName(),
-            addr.getPort(), HANDLER_COUNT, false, conf,
+            addr.getPort(), HANDLER_COUNT, false, confCopy,
             null /*secretManager*/);
 
     // set service-level authorization security policy
-    if (conf.getBoolean(
+    if (confCopy.getBoolean(
       CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false)) {
-          server.refreshServiceAcl(conf, new HDFSPolicyProvider());
+          server.refreshServiceAcl(confCopy, new HDFSPolicyProvider());
     }
   }
 

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java?rev=1383033&r1=1383032&r2=1383033&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java Mon Sep 10 18:46:58 2012
@@ -23,15 +23,17 @@ import static org.junit.Assert.assertFal
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-import java.io.IOException;
+import java.io.File;
 import java.net.HttpURLConnection;
 import java.net.InetSocketAddress;
 import java.net.URL;
 import java.util.concurrent.ExecutionException;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.qjournal.QJMTestUtil;
 import org.apache.hadoop.hdfs.qjournal.client.IPCLoggerChannel;
@@ -49,6 +51,7 @@ import org.junit.Before;
 import org.junit.Test;
 
 import com.google.common.base.Charsets;
+import com.google.common.base.Stopwatch;
 import com.google.common.primitives.Bytes;
 import com.google.common.primitives.Ints;
 
@@ -70,6 +73,12 @@ public class TestJournalNode {
   
   @Before
   public void setup() throws Exception {
+    File editsDir = new File(MiniDFSCluster.getBaseDirectory() +
+        File.separator + "TestJournalNode");
+    FileUtil.fullyDelete(editsDir);
+    
+    conf.set(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_KEY,
+        editsDir.getAbsolutePath());
     conf.set(DFSConfigKeys.DFS_JOURNALNODE_RPC_ADDRESS_KEY,
         "0.0.0.0:0");
     jn = new JournalNode();
@@ -276,6 +285,51 @@ public class TestJournalNode {
     }
   }
   
+  /**
+   * Simple test of how fast the code path is to write edits.
+   * This isn't a true unit test, but can be run manually to
+   * check performance.
+   * 
+   * At the time of development, this test ran in ~4sec on an
+   * SSD-enabled laptop (1.8ms/batch).
+   */
+  @Test(timeout=100000)
+  public void testPerformance() throws Exception {
+    doPerfTest(8192, 1024); // 8MB
+  }
+  
+  /**
+   * Sends {@code numEdits} batches of {@code editsSize} bytes each through
+   * the logger channel, waiting for every batch to complete before sending
+   * the next, and prints the elapsed time, the mean time per batch and the
+   * throughput to stderr.
+   *
+   * @param editsSize size in bytes of the payload sent with each batch
+   * @param numEdits number of batches to send
+   */
+  private void doPerfTest(int editsSize, int numEdits) throws Exception {
+    byte[] data = new byte[editsSize];
+    ch.newEpoch(1).get();
+    ch.setEpoch(1);
+    ch.startLogSegment(1).get();
+    
+    Stopwatch sw = new Stopwatch().start();
+    // Inclusive upper bound so that exactly numEdits batches are sent,
+    // matching the count used in the statistics printed below.
+    for (int i = 1; i <= numEdits; i++) {
+      ch.sendEdits(1L, i, 1, data).get();
+    }
+    // Clamp to 1ms so that a very fast run cannot divide by zero below.
+    long time = Math.max(1L, sw.elapsedMillis());
+    
+    System.err.println("Wrote " + numEdits + " batches of " + editsSize +
+        " bytes in " + time + "ms");
+    float avgRtt = (float)time/(float)numEdits;
+    long throughput = ((long)numEdits * editsSize * 1000L)/time;
+    System.err.println("Time per batch: " + avgRtt);
+    System.err.println("Throughput: " + throughput + " bytes/sec");
+  }
+  
   // TODO:
   // - add test that checks formatting behavior
   // - add test that checks rejects newEpoch if nsinfo doesn't match