Posted to hdfs-commits@hadoop.apache.org by to...@apache.org on 2012/10/16 02:54:48 UTC

svn commit: r1398590 - in /hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/ src/test/java/org/apache/hadoop/hdfs/ src/test/java/org/apache/hadoop/hdfs/protocol/datatransfer/

Author: todd
Date: Tue Oct 16 00:54:47 2012
New Revision: 1398590

URL: http://svn.apache.org/viewvc?rev=1398590&view=rev
Log:
HDFS-4049. Fix hflush performance regression due to nagling delays. Contributed by Todd Lipcon.
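
The regression, in brief: PacketReceiver kept the packet's length
prefixes (a 4-byte payload length plus a 2-byte header length) in a
ByteBuffer separate from the packet body, so mirroring a packet to the
downstream datanode took two socket writes. With Nagle's algorithm on
the sender and delayed ACKs on the receiver, the second small write can
be held back until the first is acknowledged, adding up to ~40 ms (a
typical delayed-ACK timeout) to every hflush round trip. A minimal
illustrative sketch of the difference -- not part of the commit, and the
method names here are hypothetical:

    import java.io.DataOutputStream;
    import java.io.IOException;

    public class NagleSketch {
      // Two small writes -> potentially two small TCP segments. Nagle
      // holds the second back until the first is ACKed; if the peer
      // delays its ACKs, each mirrored packet can stall for up to a
      // delayed-ACK timeout.
      static void mirrorInTwoWrites(DataOutputStream out,
          byte[] prefixes, byte[] body) throws IOException {
        out.write(prefixes);
        out.write(body);
        out.flush();
      }

      // One contiguous buffer -> one write. This is the shape of the
      // fix below: the length prefixes now live at the front of the
      // same buffer as the header, checksums, and data.
      static void mirrorInOneWrite(DataOutputStream out,
          byte[] wholePacket) throws IOException {
        out.write(wholePacket);
        out.flush();
      }
    }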

Added:
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/datatransfer/
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/datatransfer/TestPacketReceiver.java
Modified:
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMultiThreadedHflush.java

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1398590&r1=1398589&r2=1398590&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Tue Oct 16 00:54:47 2012
@@ -106,6 +106,9 @@ Release 2.0.3-alpha - Unreleased
     HDFS-4044. Duplicate ChecksumType definition in HDFS .proto files.
     (Binglin Chang via suresh)
 
+    HDFS-4049. Fix hflush performance regression due to nagling delays
+    (todd)
+
 Release 2.0.2-alpha - 2012-09-07 
   
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java?rev=1398590&r1=1398589&r2=1398590&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java Tue Oct 16 00:54:47 2012
@@ -53,14 +53,8 @@ public class PacketReceiver implements C
   private final boolean useDirectBuffers;
 
   /**
-   * Internal buffer for reading the length prefixes at the start of
-   * the packet.
-   */
-  private final ByteBuffer lengthPrefixBuf = ByteBuffer.allocate(
-      PacketHeader.PKT_LENGTHS_LEN);
-
-  /**
-   * The entirety of the most recently read packet, excepting the
+   * The entirety of the most recently read packet.
+   * The first PKT_LENGTHS_LEN bytes of this buffer are the
    * length prefixes.
    */
   private ByteBuffer curPacketBuf = null;
@@ -82,6 +76,7 @@ public class PacketReceiver implements C
   
   public PacketReceiver(boolean useDirectBuffers) {
     this.useDirectBuffers = useDirectBuffers;
+    reallocPacketBuf(PacketHeader.PKT_LENGTHS_LEN);
   }
 
   public PacketHeader getHeader() {
@@ -133,11 +128,12 @@ public class PacketReceiver implements C
     //            checksums were not requested
     // DATA       the actual block data
     Preconditions.checkState(curHeader == null || !curHeader.isLastPacketInBlock());
-    
-    lengthPrefixBuf.clear();
-    doReadFully(ch, in, lengthPrefixBuf);
-    lengthPrefixBuf.flip();
-    int payloadLen = lengthPrefixBuf.getInt();
+
+    curPacketBuf.clear();
+    curPacketBuf.limit(PacketHeader.PKT_LENGTHS_LEN);
+    doReadFully(ch, in, curPacketBuf);
+    curPacketBuf.flip();
+    int payloadLen = curPacketBuf.getInt();
     
     if (payloadLen < Ints.BYTES) {
       // The "payload length" includes its own length. Therefore it
@@ -146,7 +142,7 @@ public class PacketReceiver implements C
           payloadLen);
     }
     int dataPlusChecksumLen = payloadLen - Ints.BYTES;
-    int headerLen = lengthPrefixBuf.getShort();
+    int headerLen = curPacketBuf.getShort();
     if (headerLen < 0) {
       throw new IOException("Invalid header length " + headerLen);
     }
@@ -166,13 +162,17 @@ public class PacketReceiver implements C
 
     // Make sure we have space for the whole packet, and
     // read it.
-    reallocPacketBuf(dataPlusChecksumLen + headerLen);
+    reallocPacketBuf(PacketHeader.PKT_LENGTHS_LEN +
+        dataPlusChecksumLen + headerLen);
     curPacketBuf.clear();
-    curPacketBuf.limit(dataPlusChecksumLen + headerLen);
+    curPacketBuf.position(PacketHeader.PKT_LENGTHS_LEN);
+    curPacketBuf.limit(PacketHeader.PKT_LENGTHS_LEN +
+        dataPlusChecksumLen + headerLen);
     doReadFully(ch, in, curPacketBuf);
     curPacketBuf.flip();
+    curPacketBuf.position(PacketHeader.PKT_LENGTHS_LEN);
 
-    // Extract the header from the front of the buffer.
+    // Extract the header from the front of the buffer (after the length prefixes)
     byte[] headerBuf = new byte[headerLen];
     curPacketBuf.get(headerBuf);
     if (curHeader == null) {
@@ -197,10 +197,6 @@ public class PacketReceiver implements C
   public void mirrorPacketTo(DataOutputStream mirrorOut) throws IOException {
     Preconditions.checkState(!useDirectBuffers,
         "Currently only supported for non-direct buffers");
-    assert lengthPrefixBuf.capacity() == PacketHeader.PKT_LENGTHS_LEN;
-    mirrorOut.write(lengthPrefixBuf.array(),
-        lengthPrefixBuf.arrayOffset(),
-        lengthPrefixBuf.capacity());
     mirrorOut.write(curPacketBuf.array(),
         curPacketBuf.arrayOffset(),
         curPacketBuf.remaining());
@@ -223,23 +219,36 @@ public class PacketReceiver implements C
 
   private void reslicePacket(
       int headerLen, int checksumsLen, int dataLen) {
+    // Packet structure (refer to doRead() for details):
+    //   PLEN    HLEN      HEADER     CHECKSUMS  DATA
+    //   32-bit  16-bit   <protobuf>  <variable length>
+    //   |--- lenThroughHeader ----|
+    //   |----------- lenThroughChecksums   ----|
+    //   |------------------- lenThroughData    ------| 
+    int lenThroughHeader = PacketHeader.PKT_LENGTHS_LEN + headerLen;
+    int lenThroughChecksums = lenThroughHeader + checksumsLen;
+    int lenThroughData = lenThroughChecksums + dataLen;
+
     assert dataLen >= 0 : "invalid datalen: " + dataLen;
-    
-    assert curPacketBuf.position() == headerLen;
-    assert checksumsLen + dataLen == curPacketBuf.remaining() :
+    assert curPacketBuf.position() == lenThroughHeader;
+    assert curPacketBuf.limit() == lenThroughData :
       "headerLen= " + headerLen + " clen=" + checksumsLen + " dlen=" + dataLen +
       " rem=" + curPacketBuf.remaining();
-    
-    curPacketBuf.position(headerLen);
-    curPacketBuf.limit(headerLen + checksumsLen);
+
+    // Slice the checksums.
+    curPacketBuf.position(lenThroughHeader);
+    curPacketBuf.limit(lenThroughChecksums);
     curChecksumSlice = curPacketBuf.slice();
 
-    curPacketBuf.position(headerLen + checksumsLen);
-    curPacketBuf.limit(headerLen + checksumsLen + dataLen);
+    // Slice the data.
+    curPacketBuf.position(lenThroughChecksums);
+    curPacketBuf.limit(lenThroughData);
     curDataSlice = curPacketBuf.slice();
     
+    // Reset buffer to point to the entirety of the packet (including
+    // length prefixes)
     curPacketBuf.position(0);
-    curPacketBuf.limit(headerLen + checksumsLen + dataLen);
+    curPacketBuf.limit(lenThroughData);
   }
 
   
@@ -258,12 +267,21 @@ public class PacketReceiver implements C
     // one.
     if (curPacketBuf == null ||
         curPacketBuf.capacity() < atLeastCapacity) {
-      returnPacketBufToPool();
+      ByteBuffer newBuf;
       if (useDirectBuffers) {
-        curPacketBuf = bufferPool.getBuffer(atLeastCapacity);
+        newBuf = bufferPool.getBuffer(atLeastCapacity);
       } else {
-        curPacketBuf = ByteBuffer.allocate(atLeastCapacity);
+        newBuf = ByteBuffer.allocate(atLeastCapacity);
       }
+      // If reallocing an existing buffer, copy the old packet length
+      // prefixes over
+      if (curPacketBuf != null) {
+        curPacketBuf.flip();
+        newBuf.put(curPacketBuf);
+      }
+      
+      returnPacketBufToPool();
+      curPacketBuf = newBuf;
     }
   }
   

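To restate the core of the change above: the whole packet now lives in a
single curPacketBuf whose first PacketHeader.PKT_LENGTHS_LEN bytes are
the length prefixes, reslicePacket() exposes the checksums and data as
zero-copy slices of that one buffer, and mirrorPacketTo() can therefore
emit the entire packet in a single write. A minimal standalone sketch of
the position/limit/slice idiom -- the lengths are illustrative values,
not taken from the code:

    import java.nio.ByteBuffer;

    public class SliceSketch {
      public static void main(String[] args) {
        int prefixLen = 6;   // 4-byte payload len + 2-byte header len
        int headerLen = 10, checksumsLen = 8, dataLen = 32;
        ByteBuffer buf = ByteBuffer.allocate(
            prefixLen + headerLen + checksumsLen + dataLen);

        // position/limit bound the region; slice() shares the backing
        // storage, so no bytes are copied.
        buf.position(prefixLen + headerLen);
        buf.limit(prefixLen + headerLen + checksumsLen);
        ByteBuffer checksums = buf.slice();

        buf.position(prefixLen + headerLen + checksumsLen);
        buf.limit(prefixLen + headerLen + checksumsLen + dataLen);
        ByteBuffer data = buf.slice();

        // Reset to span the whole packet so it could be mirrored
        // downstream with one write.
        buf.position(0);
        buf.limit(prefixLen + headerLen + checksumsLen + dataLen);
        System.out.println(checksums.remaining() + " checksum bytes, "
            + data.remaining() + " data bytes, "
            + buf.remaining() + " total");
      }
    }
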
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMultiThreadedHflush.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMultiThreadedHflush.java?rev=1398590&r1=1398589&r2=1398590&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMultiThreadedHflush.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMultiThreadedHflush.java Tue Oct 16 00:54:47 2012
@@ -20,45 +20,40 @@ package org.apache.hadoop.hdfs;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
-import org.apache.commons.logging.LogFactory;
-import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.server.datanode.DataNode;
-import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
-import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
-import org.apache.hadoop.hdfs.server.namenode.NameNode;
-import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
-import org.apache.log4j.Level;
+import org.apache.hadoop.metrics2.util.Quantile;
+import org.apache.hadoop.metrics2.util.SampleQuantiles;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
 import org.junit.Test;
 
+import com.google.common.base.Stopwatch;
+
 /**
  * This class tests hflushing concurrently from many threads.
  */
 public class TestMultiThreadedHflush {
   static final int blockSize = 1024*1024;
-  static final int numBlocks = 10;
-  static final int fileSize = numBlocks * blockSize + 1;
 
   private static final int NUM_THREADS = 10;
   private static final int WRITE_SIZE = 517;
   private static final int NUM_WRITES_PER_THREAD = 1000;
   
   private byte[] toWrite = null;
-
-  {
-    ((Log4JLogger)NameNode.stateChangeLog).getLogger().setLevel(Level.ALL);
-    ((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL);
-    ((Log4JLogger)LogFactory.getLog(FSNamesystem.class)).getLogger().setLevel(Level.ALL);
-    ((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
-    ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
-    ((Log4JLogger)InterDatanodeProtocol.LOG).getLogger().setLevel(Level.ALL);
-  }
+  
+  private final SampleQuantiles quantiles = new SampleQuantiles(
+      new Quantile[] {
+        new Quantile(0.50, 0.050),
+        new Quantile(0.75, 0.025), new Quantile(0.90, 0.010),
+        new Quantile(0.95, 0.005), new Quantile(0.99, 0.001) });
 
   /*
    * creates a file but does not close it
@@ -104,8 +99,11 @@ public class TestMultiThreadedHflush {
     }
 
     private void doAWrite() throws IOException {
+      Stopwatch sw = new Stopwatch().start();
       stm.write(toWrite);
       stm.hflush();
+      long micros = sw.elapsedTime(TimeUnit.MICROSECONDS);
+      quantiles.insert(micros);
     }
   }
 
@@ -115,14 +113,28 @@ public class TestMultiThreadedHflush {
    * They all finish before the file is closed.
    */
   @Test
-  public void testMultipleHflushers() throws Exception {
+  public void testMultipleHflushersRepl1() throws Exception {
+    doTestMultipleHflushers(1);
+  }
+  
+  @Test
+  public void testMultipleHflushersRepl3() throws Exception {
+    doTestMultipleHflushers(3);
+  }
+  
+  private void doTestMultipleHflushers(int repl) throws Exception {
     Configuration conf = new Configuration();
-    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(repl)
+        .build();
 
     FileSystem fs = cluster.getFileSystem();
     Path p = new Path("/multiple-hflushers.dat");
     try {
-      doMultithreadedWrites(conf, p, NUM_THREADS, WRITE_SIZE, NUM_WRITES_PER_THREAD);
+      doMultithreadedWrites(conf, p, NUM_THREADS, WRITE_SIZE,
+          NUM_WRITES_PER_THREAD, repl);
+      System.out.println("Latency quantiles (in microseconds):\n" +
+          quantiles);
     } finally {
       fs.close();
       cluster.shutdown();
@@ -200,13 +212,13 @@ public class TestMultiThreadedHflush {
   }
 
   public void doMultithreadedWrites(
-    Configuration conf, Path p, int numThreads, int bufferSize, int numWrites)
-    throws Exception {
+      Configuration conf, Path p, int numThreads, int bufferSize, int numWrites,
+      int replication) throws Exception {
     initBuffer(bufferSize);
 
     // create a new file.
     FileSystem fs = p.getFileSystem(conf);
-    FSDataOutputStream stm = createFile(fs, p, 1);
+    FSDataOutputStream stm = createFile(fs, p, replication);
     System.out.println("Created file simpleFlush.dat");
 
     // There have been a couple issues with flushing empty buffers, so do
@@ -240,20 +252,41 @@ public class TestMultiThreadedHflush {
   }
 
   public static void main(String args[]) throws Exception {
-    if (args.length != 1) {
-      System.err.println(
-        "usage: " + TestMultiThreadedHflush.class.getSimpleName() +
-        " <path to test file> ");
-      System.exit(1);
+    System.exit(ToolRunner.run(new CLIBenchmark(), args));
+  }
+  
+  private static class CLIBenchmark extends Configured implements Tool {
+    public int run(String args[]) throws Exception {
+      if (args.length != 1) {
+        System.err.println(
+          "usage: " + TestMultiThreadedHflush.class.getSimpleName() +
+          " <path to test file> ");
+        System.err.println(
+            "Configurations settable by -D options:\n" +
+            "  num.threads [default 10] - how many threads to run\n" +
+            "  write.size [default 511] - bytes per write\n" +
+            "  num.writes [default 50000] - how many writes to perform");
+        System.exit(1);
+      }
+      TestMultiThreadedHflush test = new TestMultiThreadedHflush();
+      Configuration conf = getConf();
+      Path p = new Path(args[0]);
+      
+      int numThreads = conf.getInt("num.threads", 10);
+      int writeSize = conf.getInt("write.size", 511);
+      int numWrites = conf.getInt("num.writes", 50000);
+      int replication = conf.getInt(DFSConfigKeys.DFS_REPLICATION_KEY,
+          DFSConfigKeys.DFS_REPLICATION_DEFAULT);
+      
+      Stopwatch sw = new Stopwatch().start();
+      test.doMultithreadedWrites(conf, p, numThreads, writeSize, numWrites,
+          replication);
+      sw.stop();
+  
+      System.out.println("Finished in " + sw.elapsedMillis() + "ms");
+      System.out.println("Latency quantiles (in microseconds):\n" +
+          test.quantiles);
+      return 0;
     }
-    TestMultiThreadedHflush test = new TestMultiThreadedHflush();
-    Configuration conf = new Configuration();
-    Path p = new Path(args[0]);
-    long st = System.nanoTime();
-    test.doMultithreadedWrites(conf, p, 10, 511, 50000);
-    long et = System.nanoTime();
-
-    System.out.println("Finished in " + ((et - st) / 1000000) + "ms");
   }
-
 }
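
The test above now doubles as a command-line benchmark via ToolRunner,
and reports latency quantiles (p50 through p99) instead of only a total
elapsed time, which makes tail-latency regressions like this one
visible. A hypothetical invocation, assuming the HDFS test classes are
on the classpath -- the path and values are examples only:

    hadoop org.apache.hadoop.hdfs.TestMultiThreadedHflush \
        -Dnum.threads=10 -Dwrite.size=511 -Dnum.writes=50000 \
        -Ddfs.replication=3 \
        /benchmarks/hflush.dat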

Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/datatransfer/TestPacketReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/datatransfer/TestPacketReceiver.java?rev=1398590&view=auto
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/datatransfer/TestPacketReceiver.java (added)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/datatransfer/TestPacketReceiver.java Tue Oct 16 00:54:47 2012
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol.datatransfer;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hdfs.AppendTestUtil;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import static org.junit.Assert.*;
+
+public class TestPacketReceiver {
+
+  private static long OFFSET_IN_BLOCK = 12345L;
+  private static int SEQNO = 54321;
+
+  private byte[] prepareFakePacket(byte[] data, byte[] sums) throws IOException {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    DataOutputStream dos = new DataOutputStream(baos);
+    
+    int packetLen = data.length + sums.length + 4;
+    PacketHeader header = new PacketHeader(
+        packetLen, OFFSET_IN_BLOCK, SEQNO, false, data.length, false);
+    header.write(dos);
+    
+    dos.write(sums);
+    dos.write(data);
+    dos.flush();
+    return baos.toByteArray();
+  }
+  
+  private static byte[] remainingAsArray(ByteBuffer buf) {
+    byte[] b = new byte[buf.remaining()];
+    buf.get(b);
+    return b;
+  }
+  
+  @Test
+  public void testReceiveAndMirror() throws IOException {
+    PacketReceiver pr = new PacketReceiver(false);
+
+    // Test three different lengths, to force reallocing
+    // the buffer as it grows.
+    doTestReceiveAndMirror(pr, 100, 10);
+    doTestReceiveAndMirror(pr, 50, 10);
+    doTestReceiveAndMirror(pr, 150, 10);
+
+    pr.close();
+  }
+  
+  private void doTestReceiveAndMirror(PacketReceiver pr,
+      int dataLen, int checksumsLen) throws IOException {
+    final byte[] DATA = AppendTestUtil.initBuffer(dataLen);
+    final byte[] CHECKSUMS = AppendTestUtil.initBuffer(checksumsLen);
+
+    byte[] packet = prepareFakePacket(DATA, CHECKSUMS);
+    ByteArrayInputStream in = new ByteArrayInputStream(packet);
+    
+    pr.receiveNextPacket(in);
+    
+    ByteBuffer parsedData = pr.getDataSlice();
+    assertArrayEquals(DATA, remainingAsArray(parsedData));
+
+    ByteBuffer parsedChecksums = pr.getChecksumSlice();
+    assertArrayEquals(CHECKSUMS, remainingAsArray(parsedChecksums));
+    
+    PacketHeader header = pr.getHeader();
+    assertEquals(SEQNO, header.getSeqno());
+    assertEquals(OFFSET_IN_BLOCK, header.getOffsetInBlock());
+    
+    // Mirror the packet to an output stream and make sure it matches
+    // the packet we sent.
+    ByteArrayOutputStream mirrored = new ByteArrayOutputStream();
+    mirrored = Mockito.spy(mirrored);
+
+    pr.mirrorPacketTo(new DataOutputStream(mirrored));
+    // The write should be done in a single call. Otherwise we may hit
+    // nasty interactions with nagling (eg HDFS-4049).
+    Mockito.verify(mirrored, Mockito.times(1))
+      .write(Mockito.<byte[]>any(), Mockito.anyInt(),
+          Mockito.eq(packet.length));
+    Mockito.verifyNoMoreInteractions(mirrored);
+
+    assertArrayEquals(packet, mirrored.toByteArray());
+  }
+}