You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by to...@apache.org on 2012/10/16 02:55:30 UTC
svn commit: r1398591 - in
/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./
src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/
src/test/java/org/apache/hadoop/hdfs/
src/test/java/org/apache/hadoop/hdfs/protocol/datatransfer/
Author: todd
Date: Tue Oct 16 00:55:29 2012
New Revision: 1398591
URL: http://svn.apache.org/viewvc?rev=1398591&view=rev
Log:
HDFS-4049. Fix hflush performance regression due to nagling delays. Contributed by Todd Lipcon.
Added:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/datatransfer/
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/datatransfer/TestPacketReceiver.java
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMultiThreadedHflush.java
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1398591&r1=1398590&r2=1398591&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Tue Oct 16 00:55:29 2012
@@ -440,6 +440,9 @@ Release 2.0.3-alpha - Unreleased
HDFS-4044. Duplicate ChecksumType definition in HDFS .proto files.
(Binglin Chang via suresh)
+ HDFS-4049. Fix hflush performance regression due to nagling delays.
+ (todd)
+
Release 2.0.2-alpha - 2012-09-07
INCOMPATIBLE CHANGES
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java?rev=1398591&r1=1398590&r2=1398591&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java Tue Oct 16 00:55:29 2012
@@ -53,14 +53,8 @@ public class PacketReceiver implements C
private final boolean useDirectBuffers;
/**
- * Internal buffer for reading the length prefixes at the start of
- * the packet.
- */
- private final ByteBuffer lengthPrefixBuf = ByteBuffer.allocate(
- PacketHeader.PKT_LENGTHS_LEN);
-
- /**
- * The entirety of the most recently read packet, excepting the
+ * The entirety of the most recently read packet.
+ * The first PKT_LENGTHS_LEN bytes of this buffer are the
* length prefixes.
*/
private ByteBuffer curPacketBuf = null;
@@ -82,6 +76,7 @@ public class PacketReceiver implements C
public PacketReceiver(boolean useDirectBuffers) {
this.useDirectBuffers = useDirectBuffers;
+ reallocPacketBuf(PacketHeader.PKT_LENGTHS_LEN);
}
public PacketHeader getHeader() {
@@ -133,11 +128,12 @@ public class PacketReceiver implements C
// checksums were not requested
// DATA the actual block data
Preconditions.checkState(curHeader == null || !curHeader.isLastPacketInBlock());
-
- lengthPrefixBuf.clear();
- doReadFully(ch, in, lengthPrefixBuf);
- lengthPrefixBuf.flip();
- int payloadLen = lengthPrefixBuf.getInt();
+
+ curPacketBuf.clear();
+ curPacketBuf.limit(PacketHeader.PKT_LENGTHS_LEN);
+ doReadFully(ch, in, curPacketBuf);
+ curPacketBuf.flip();
+ int payloadLen = curPacketBuf.getInt();
if (payloadLen < Ints.BYTES) {
// The "payload length" includes its own length. Therefore it
@@ -146,7 +142,7 @@ public class PacketReceiver implements C
payloadLen);
}
int dataPlusChecksumLen = payloadLen - Ints.BYTES;
- int headerLen = lengthPrefixBuf.getShort();
+ int headerLen = curPacketBuf.getShort();
if (headerLen < 0) {
throw new IOException("Invalid header length " + headerLen);
}
@@ -166,13 +162,17 @@ public class PacketReceiver implements C
// Make sure we have space for the whole packet, and
// read it.
- reallocPacketBuf(dataPlusChecksumLen + headerLen);
+ reallocPacketBuf(PacketHeader.PKT_LENGTHS_LEN +
+ dataPlusChecksumLen + headerLen);
curPacketBuf.clear();
- curPacketBuf.limit(dataPlusChecksumLen + headerLen);
+ curPacketBuf.position(PacketHeader.PKT_LENGTHS_LEN);
+ curPacketBuf.limit(PacketHeader.PKT_LENGTHS_LEN +
+ dataPlusChecksumLen + headerLen);
doReadFully(ch, in, curPacketBuf);
curPacketBuf.flip();
+ curPacketBuf.position(PacketHeader.PKT_LENGTHS_LEN);
- // Extract the header from the front of the buffer.
+ // Extract the header from the front of the buffer (after the length prefixes)
byte[] headerBuf = new byte[headerLen];
curPacketBuf.get(headerBuf);
if (curHeader == null) {
@@ -197,10 +197,6 @@ public class PacketReceiver implements C
public void mirrorPacketTo(DataOutputStream mirrorOut) throws IOException {
Preconditions.checkState(!useDirectBuffers,
"Currently only supported for non-direct buffers");
- assert lengthPrefixBuf.capacity() == PacketHeader.PKT_LENGTHS_LEN;
- mirrorOut.write(lengthPrefixBuf.array(),
- lengthPrefixBuf.arrayOffset(),
- lengthPrefixBuf.capacity());
mirrorOut.write(curPacketBuf.array(),
curPacketBuf.arrayOffset(),
curPacketBuf.remaining());
@@ -223,23 +219,36 @@ public class PacketReceiver implements C
private void reslicePacket(
int headerLen, int checksumsLen, int dataLen) {
+ // Packet structure (refer to doRead() for details):
+ // PLEN HLEN HEADER CHECKSUMS DATA
+ // 32-bit 16-bit <protobuf> <variable length>
+ // |--- lenThroughHeader ----|
+ // |----------- lenThroughChecksums ----|
+ // |------------------- lenThroughData ------|
+ int lenThroughHeader = PacketHeader.PKT_LENGTHS_LEN + headerLen;
+ int lenThroughChecksums = lenThroughHeader + checksumsLen;
+ int lenThroughData = lenThroughChecksums + dataLen;
+
assert dataLen >= 0 : "invalid datalen: " + dataLen;
-
- assert curPacketBuf.position() == headerLen;
- assert checksumsLen + dataLen == curPacketBuf.remaining() :
+ assert curPacketBuf.position() == lenThroughHeader;
+ assert curPacketBuf.limit() == lenThroughData :
"headerLen= " + headerLen + " clen=" + checksumsLen + " dlen=" + dataLen +
" rem=" + curPacketBuf.remaining();
-
- curPacketBuf.position(headerLen);
- curPacketBuf.limit(headerLen + checksumsLen);
+
+ // Slice the checksums.
+ curPacketBuf.position(lenThroughHeader);
+ curPacketBuf.limit(lenThroughChecksums);
curChecksumSlice = curPacketBuf.slice();
- curPacketBuf.position(headerLen + checksumsLen);
- curPacketBuf.limit(headerLen + checksumsLen + dataLen);
+ // Slice the data.
+ curPacketBuf.position(lenThroughChecksums);
+ curPacketBuf.limit(lenThroughData);
curDataSlice = curPacketBuf.slice();
+ // Reset buffer to point to the entirety of the packet (including
+ // length prefixes)
curPacketBuf.position(0);
- curPacketBuf.limit(headerLen + checksumsLen + dataLen);
+ curPacketBuf.limit(lenThroughData);
}
@@ -258,12 +267,21 @@ public class PacketReceiver implements C
// one.
if (curPacketBuf == null ||
curPacketBuf.capacity() < atLeastCapacity) {
- returnPacketBufToPool();
+ ByteBuffer newBuf;
if (useDirectBuffers) {
- curPacketBuf = bufferPool.getBuffer(atLeastCapacity);
+ newBuf = bufferPool.getBuffer(atLeastCapacity);
} else {
- curPacketBuf = ByteBuffer.allocate(atLeastCapacity);
+ newBuf = ByteBuffer.allocate(atLeastCapacity);
}
+ // If reallocing an existing buffer, copy the old packet length
+ // prefixes over
+ if (curPacketBuf != null) {
+ curPacketBuf.flip();
+ newBuf.put(curPacketBuf);
+ }
+
+ returnPacketBufToPool();
+ curPacketBuf = newBuf;
}
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMultiThreadedHflush.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMultiThreadedHflush.java?rev=1398591&r1=1398590&r2=1398591&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMultiThreadedHflush.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMultiThreadedHflush.java Tue Oct 16 00:55:29 2012
@@ -20,45 +20,40 @@ package org.apache.hadoop.hdfs;
import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
-import org.apache.commons.logging.LogFactory;
-import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.server.datanode.DataNode;
-import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
-import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
-import org.apache.hadoop.hdfs.server.namenode.NameNode;
-import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
-import org.apache.log4j.Level;
+import org.apache.hadoop.metrics2.util.Quantile;
+import org.apache.hadoop.metrics2.util.SampleQuantiles;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
import org.junit.Test;
+import com.google.common.base.Stopwatch;
+
/**
* This class tests hflushing concurrently from many threads.
*/
public class TestMultiThreadedHflush {
static final int blockSize = 1024*1024;
- static final int numBlocks = 10;
- static final int fileSize = numBlocks * blockSize + 1;
private static final int NUM_THREADS = 10;
private static final int WRITE_SIZE = 517;
private static final int NUM_WRITES_PER_THREAD = 1000;
private byte[] toWrite = null;
-
- {
- ((Log4JLogger)NameNode.stateChangeLog).getLogger().setLevel(Level.ALL);
- ((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL);
- ((Log4JLogger)LogFactory.getLog(FSNamesystem.class)).getLogger().setLevel(Level.ALL);
- ((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
- ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
- ((Log4JLogger)InterDatanodeProtocol.LOG).getLogger().setLevel(Level.ALL);
- }
+
+ private final SampleQuantiles quantiles = new SampleQuantiles(
+ new Quantile[] {
+ new Quantile(0.50, 0.050),
+ new Quantile(0.75, 0.025), new Quantile(0.90, 0.010),
+ new Quantile(0.95, 0.005), new Quantile(0.99, 0.001) });
/*
* creates a file but does not close it
@@ -104,8 +99,11 @@ public class TestMultiThreadedHflush {
}
private void doAWrite() throws IOException {
+ Stopwatch sw = new Stopwatch().start();
stm.write(toWrite);
stm.hflush();
+ long micros = sw.elapsedTime(TimeUnit.MICROSECONDS);
+ quantiles.insert(micros);
}
}
@@ -115,14 +113,28 @@ public class TestMultiThreadedHflush {
* They all finish before the file is closed.
*/
@Test
- public void testMultipleHflushers() throws Exception {
+ public void testMultipleHflushersRepl1() throws Exception {
+ doTestMultipleHflushers(1);
+ }
+
+ @Test
+ public void testMultipleHflushersRepl3() throws Exception {
+ doTestMultipleHflushers(3);
+ }
+
+ private void doTestMultipleHflushers(int repl) throws Exception {
Configuration conf = new Configuration();
- MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
+ MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+ .numDataNodes(repl)
+ .build();
FileSystem fs = cluster.getFileSystem();
Path p = new Path("/multiple-hflushers.dat");
try {
- doMultithreadedWrites(conf, p, NUM_THREADS, WRITE_SIZE, NUM_WRITES_PER_THREAD);
+ doMultithreadedWrites(conf, p, NUM_THREADS, WRITE_SIZE,
+ NUM_WRITES_PER_THREAD, repl);
+ System.out.println("Latency quantiles (in microseconds):\n" +
+ quantiles);
} finally {
fs.close();
cluster.shutdown();
@@ -200,13 +212,13 @@ public class TestMultiThreadedHflush {
}
public void doMultithreadedWrites(
- Configuration conf, Path p, int numThreads, int bufferSize, int numWrites)
- throws Exception {
+ Configuration conf, Path p, int numThreads, int bufferSize, int numWrites,
+ int replication) throws Exception {
initBuffer(bufferSize);
// create a new file.
FileSystem fs = p.getFileSystem(conf);
- FSDataOutputStream stm = createFile(fs, p, 1);
+ FSDataOutputStream stm = createFile(fs, p, replication);
System.out.println("Created file simpleFlush.dat");
// There have been a couple issues with flushing empty buffers, so do
@@ -240,20 +252,41 @@ public class TestMultiThreadedHflush {
}
public static void main(String args[]) throws Exception {
- if (args.length != 1) {
- System.err.println(
- "usage: " + TestMultiThreadedHflush.class.getSimpleName() +
- " <path to test file> ");
- System.exit(1);
+ System.exit(ToolRunner.run(new CLIBenchmark(), args));
+ }
+
+ private static class CLIBenchmark extends Configured implements Tool {
+ public int run(String args[]) throws Exception {
+ if (args.length != 1) {
+ System.err.println(
+ "usage: " + TestMultiThreadedHflush.class.getSimpleName() +
+ " <path to test file> ");
+ System.err.println(
+ "Configurations settable by -D options:\n" +
+ " num.threads [default 10] - how many threads to run\n" +
+ " write.size [default 511] - bytes per write\n" +
+ " num.writes [default 50000] - how many writes to perform");
+ System.exit(1);
+ }
+ TestMultiThreadedHflush test = new TestMultiThreadedHflush();
+ Configuration conf = getConf();
+ Path p = new Path(args[0]);
+
+ int numThreads = conf.getInt("num.threads", 10);
+ int writeSize = conf.getInt("write.size", 511);
+ int numWrites = conf.getInt("num.writes", 50000);
+ int replication = conf.getInt(DFSConfigKeys.DFS_REPLICATION_KEY,
+ DFSConfigKeys.DFS_REPLICATION_DEFAULT);
+
+ Stopwatch sw = new Stopwatch().start();
+ test.doMultithreadedWrites(conf, p, numThreads, writeSize, numWrites,
+ replication);
+ sw.stop();
+
+ System.out.println("Finished in " + sw.elapsedMillis() + "ms");
+ System.out.println("Latency quantiles (in microseconds):\n" +
+ test.quantiles);
+ return 0;
}
- TestMultiThreadedHflush test = new TestMultiThreadedHflush();
- Configuration conf = new Configuration();
- Path p = new Path(args[0]);
- long st = System.nanoTime();
- test.doMultithreadedWrites(conf, p, 10, 511, 50000);
- long et = System.nanoTime();
-
- System.out.println("Finished in " + ((et - st) / 1000000) + "ms");
}
-
}
Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/datatransfer/TestPacketReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/datatransfer/TestPacketReceiver.java?rev=1398591&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/datatransfer/TestPacketReceiver.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/datatransfer/TestPacketReceiver.java Tue Oct 16 00:55:29 2012
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol.datatransfer;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hdfs.AppendTestUtil;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import static org.junit.Assert.*;
+
+public class TestPacketReceiver {
+
+ private static long OFFSET_IN_BLOCK = 12345L;
+ private static int SEQNO = 54321;
+
+ private byte[] prepareFakePacket(byte[] data, byte[] sums) throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(baos);
+
+ int packetLen = data.length + sums.length + 4;
+ PacketHeader header = new PacketHeader(
+ packetLen, OFFSET_IN_BLOCK, SEQNO, false, data.length, false);
+ header.write(dos);
+
+ dos.write(sums);
+ dos.write(data);
+ dos.flush();
+ return baos.toByteArray();
+ }
+
+ private static byte[] remainingAsArray(ByteBuffer buf) {
+ byte[] b = new byte[buf.remaining()];
+ buf.get(b);
+ return b;
+ }
+
+ @Test
+ public void testReceiveAndMirror() throws IOException {
+ PacketReceiver pr = new PacketReceiver(false);
+
+ // Test three different lengths, to force reallocing
+ // the buffer as it grows.
+ doTestReceiveAndMirror(pr, 100, 10);
+ doTestReceiveAndMirror(pr, 50, 10);
+ doTestReceiveAndMirror(pr, 150, 10);
+
+ pr.close();
+ }
+
+ private void doTestReceiveAndMirror(PacketReceiver pr,
+ int dataLen, int checksumsLen) throws IOException {
+ final byte[] DATA = AppendTestUtil.initBuffer(dataLen);
+ final byte[] CHECKSUMS = AppendTestUtil.initBuffer(checksumsLen);
+
+ byte[] packet = prepareFakePacket(DATA, CHECKSUMS);
+ ByteArrayInputStream in = new ByteArrayInputStream(packet);
+
+ pr.receiveNextPacket(in);
+
+ ByteBuffer parsedData = pr.getDataSlice();
+ assertArrayEquals(DATA, remainingAsArray(parsedData));
+
+ ByteBuffer parsedChecksums = pr.getChecksumSlice();
+ assertArrayEquals(CHECKSUMS, remainingAsArray(parsedChecksums));
+
+ PacketHeader header = pr.getHeader();
+ assertEquals(SEQNO, header.getSeqno());
+ assertEquals(OFFSET_IN_BLOCK, header.getOffsetInBlock());
+
+ // Mirror the packet to an output stream and make sure it matches
+ // the packet we sent.
+ ByteArrayOutputStream mirrored = new ByteArrayOutputStream();
+ mirrored = Mockito.spy(mirrored);
+
+ pr.mirrorPacketTo(new DataOutputStream(mirrored));
+ // The write should be done in a single call. Otherwise we may hit
+ // nasty interactions with nagling (eg HDFS-4049).
+ Mockito.verify(mirrored, Mockito.times(1))
+ .write(Mockito.<byte[]>any(), Mockito.anyInt(),
+ Mockito.eq(packet.length));
+ Mockito.verifyNoMoreInteractions(mirrored);
+
+ assertArrayEquals(packet, mirrored.toByteArray());
+ }
+}