Posted to common-commits@hadoop.apache.org by ha...@apache.org on 2010/12/03 08:54:55 UTC
svn commit: r1041718 - in /hadoop/common/branches/branch-0.20-append:
CHANGES.txt src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
src/test/org/apache/hadoop/hdfs/TestMultiThreadedSync.java
Author: hairong
Date: Fri Dec 3 07:54:55 2010
New Revision: 1041718
URL: http://svn.apache.org/viewvc?rev=1041718&view=rev
Log:
HDFS-895. Allow hflush/sync to occur in parallel with new writes to the file. Contributed by Todd Lipcon.
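
For readers skimming the diff below, the key idea is that the output stream now tracks packet sequence numbers (lastQueuedSeqno / lastAckedSeqno), so a sync()/hflush caller waits only for the packets that were already queued when it started, instead of holding the stream lock until both queues drain. The following is a minimal, self-contained sketch of that pattern; the class and method names are hypothetical and this is not the actual DFSClient code:

    class SeqnoFlushSketch {
      private long lastQueuedSeqno = -1;   // highest seqno handed to the pipeline
      private long lastAckedSeqno = -1;    // highest seqno acknowledged so far

      // Writer thread: queue one packet and remember its sequence number.
      // (The real code also appends the packet to dataQueue; omitted here.)
      synchronized long enqueuePacket() {
        long seqno = ++lastQueuedSeqno;
        notifyAll();
        return seqno;
      }

      // Responder thread: record that a packet has been acknowledged.
      synchronized void ackPacket(long seqno) {
        assert seqno == lastAckedSeqno + 1 : "acks arrive in order, densely";
        lastAckedSeqno = seqno;
        notifyAll();             // wake any thread blocked in waitForAckedSeqno
      }

      // sync() caller: capture lastQueuedSeqno under the lock, then wait for it.
      // New writes can keep bumping lastQueuedSeqno while this thread waits.
      synchronized void waitForAckedSeqno(long seqnoToWaitFor)
          throws InterruptedException {
        while (lastAckedSeqno < seqnoToWaitFor) {
          wait();
        }
      }
    }

Because the wait depends only on lastAckedSeqno reaching a captured value, other threads can keep enqueueing new packets while a sync is outstanding.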
Added:
hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/TestMultiThreadedSync.java
Modified:
hadoop/common/branches/branch-0.20-append/CHANGES.txt
hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
Modified: hadoop/common/branches/branch-0.20-append/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/CHANGES.txt?rev=1041718&r1=1041717&r2=1041718&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-append/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.20-append/CHANGES.txt Fri Dec 3 07:54:55 2010
@@ -25,6 +25,9 @@ Release 0.20-append - Unreleased
HDFS-1202. DataBlockScanner throws NPE when updated before
initialized. (Todd Lipcon via dhruba)
+ HDFS-895. Allow hflush/sync to occur in parallel with new writes to
+ the file. (Todd Lipcon via hairong)
+
IMPROVEMENTS
BUG FIXES
Modified: hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java?rev=1041718&r1=1041717&r2=1041718&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java Fri Dec 3 07:54:55 2010
@@ -2171,6 +2171,8 @@ public class DFSClient implements FSCons
private DataStreamer streamer = new DataStreamer();;
private ResponseProcessor response = null;
private long currentSeqno = 0;
+ private long lastQueuedSeqno = -1;
+ private long lastAckedSeqno = -1;
private long bytesCurBlock = 0; // bytes writen in current block
private int packetSize = 0; // write packet size, including the header.
private int chunksPerPacket = 0;
@@ -2180,7 +2182,7 @@ public class DFSClient implements FSCons
private volatile int errorIndex = 0;
private volatile IOException lastException = null;
private long artificialSlowdown = 0;
- private long lastFlushOffset = -1; // offset when flush was invoked
+ private long lastFlushOffset = 0; // offset when flush was invoked
private boolean persistBlocks = false; // persist blocks on namenode
private int recoveryErrorCount = 0; // number of times block recovery failed
private int maxRecoveryErrorCount = 5; // try block recovery 5 times
@@ -2563,6 +2565,8 @@ public class DFSClient implements FSCons
lastPacketInBlock = one.lastPacketInBlock;
synchronized (ackQueue) {
+ assert ack.getSeqno() == lastAckedSeqno + 1;
+ lastAckedSeqno = ack.getSeqno();
ackQueue.removeFirst();
ackQueue.notifyAll();
}
@@ -3178,11 +3182,9 @@ public class DFSClient implements FSCons
if (bytesCurBlock == blockSize) {
currentPacket.lastPacketInBlock = true;
bytesCurBlock = 0;
- lastFlushOffset = -1;
+ lastFlushOffset = 0;
}
- dataQueue.addLast(currentPacket);
- dataQueue.notifyAll();
- currentPacket = null;
+ enqueueCurrentPacket();
// If this was the first write after reopening a file, then the above
// write filled up any partial chunk. Tell the summer to generate full
@@ -3198,58 +3200,102 @@ public class DFSClient implements FSCons
//LOG.debug("DFSClient writeChunk done length " + len +
// " checksum length " + cklen);
}
-
+
+ private synchronized void enqueueCurrentPacket() {
+ synchronized (dataQueue) {
+ if (currentPacket == null) return;
+ dataQueue.addLast(currentPacket);
+ dataQueue.notifyAll();
+ lastQueuedSeqno = currentPacket.seqno;
+ currentPacket = null;
+ }
+ }
+
/**
* All data is written out to datanodes. It is not guaranteed
* that data has been flushed to persistent store on the
* datanode. Block allocations are persisted on namenode.
*/
- public synchronized void sync() throws IOException {
+ public void sync() throws IOException {
+ checkOpen();
+ if (closed) {
+ throw new IOException("DFSOutputStream is closed");
+ }
try {
- /* Record current blockOffset. This might be changed inside
- * flushBuffer() where a partial checksum chunk might be flushed.
- * After the flush, reset the bytesCurBlock back to its previous value,
- * any partial checksum chunk will be sent now and in next packet.
- */
- long saveOffset = bytesCurBlock;
-
- // flush checksum buffer, but keep checksum buffer intact
- flushBuffer(true);
-
- LOG.debug("DFSClient flush() : saveOffset " + saveOffset +
- " bytesCurBlock " + bytesCurBlock +
- " lastFlushOffset " + lastFlushOffset);
-
- // Flush only if we haven't already flushed till this offset.
- if (lastFlushOffset != bytesCurBlock) {
-
- // record the valid offset of this flush
- lastFlushOffset = bytesCurBlock;
+ long toWaitFor;
+ synchronized (this) {
+ /* Record current blockOffset. This might be changed inside
+ * flushBuffer() where a partial checksum chunk might be flushed.
+ * After the flush, reset the bytesCurBlock back to its previous value,
+ * any partial checksum chunk will be sent now and in next packet.
+ */
+ long saveOffset = bytesCurBlock;
+ Packet oldCurrentPacket = currentPacket;
- // wait for all packets to be sent and acknowledged
- flushInternal();
- } else {
- // just discard the current packet since it is already been sent.
- currentPacket = null;
+ // flush checksum buffer, but keep checksum buffer intact
+ flushBuffer(true);
+ // bytesCurBlock potentially incremented if there was buffered data
+
+ // Flush only if we haven't already flushed till this offset.
+ if (lastFlushOffset != bytesCurBlock) {
+ assert bytesCurBlock > lastFlushOffset;
+ // record the valid offset of this flush
+ lastFlushOffset = bytesCurBlock;
+ enqueueCurrentPacket();
+ } else {
+ // just discard the current packet since it is already been sent.
+ if (oldCurrentPacket == null && currentPacket != null) {
+ // If we didn't previously have a packet queued, and now we do,
+ // but we don't plan on sending it, then we should not
+ // skip a sequence number for it!
+ currentSeqno--;
+ }
+ currentPacket = null;
+ }
+ // Restore state of stream. Record the last flush offset
+ // of the last full chunk that was flushed.
+ //
+ bytesCurBlock = saveOffset;
+ toWaitFor = lastQueuedSeqno;
}
-
- // Restore state of stream. Record the last flush offset
- // of the last full chunk that was flushed.
- //
- bytesCurBlock = saveOffset;
+ waitForAckedSeqno(toWaitFor);
// If any new blocks were allocated since the last flush,
// then persist block locations on namenode.
//
- if (persistBlocks) {
- namenode.fsync(src, clientName);
+ boolean willPersist;
+ synchronized (this) {
+ willPersist = persistBlocks && !closed;
persistBlocks = false;
}
+ if (willPersist) {
+ try {
+ namenode.fsync(src, clientName);
+ } catch (IOException ioe) {
+ DFSClient.LOG.warn("Unable to persist blocks in hflush for " + src, ioe);
+ // If we got an error here, it might be because some other thread called
+ // close before our hflush completed. In that case, we should throw an
+ // exception that the stream is closed.
+ isClosed();
+ if (closed) {
+ throw new IOException("DFSOutputStream is closed");
+ }
+
+ // If we aren't closed but failed to sync, we should expose that to the
+ // caller.
+ throw ioe;
+ }
+ }
} catch (IOException e) {
- lastException = new IOException("IOException flush:" + e);
- closed = true;
- closeThreads();
- throw e;
+ LOG.warn("Error while syncing", e);
+ synchronized (this) {
+ if (!closed) {
+ lastException = new IOException("IOException flush:" + e);
+ closed = true;
+ closeThreads();
+ }
+ }
+ throw e;
}
}
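
One subtle point in the rewritten sync() above is the currentSeqno-- branch: flushBuffer(true) may create a packet that sync() then decides not to send (because that offset was already flushed), and the ResponseProcessor now asserts that acks arrive with consecutive sequence numbers, so an allocated but never enqueued packet must give its number back. A toy illustration of that invariant, with hypothetical names rather than the actual DFSClient code:

    class SeqnoInvariantSketch {
      private long currentSeqno = 0;
      private long lastAckedSeqno = -1;

      // Allocating a packet consumes the next sequence number.
      long allocatePacketSeqno() {
        return currentSeqno++;
      }

      // If the packet is discarded without ever being enqueued, hand the
      // number back; otherwise the next enqueued packet would skip a seqno
      // and the dense-ack assertion below would trip.
      void discardUnsentPacket() {
        currentSeqno--;
      }

      // Mirrors the assertion added to the ResponseProcessor in this commit.
      void onAck(long seqno) {
        assert seqno == lastAckedSeqno + 1 : "acks must be consecutive";
        lastAckedSeqno = seqno;
      }
    }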
@@ -3275,57 +3321,34 @@ public class DFSClient implements FSCons
* Waits till all existing data is flushed and confirmations
* received from datanodes.
*/
- private synchronized void flushInternal() throws IOException {
- checkOpen();
+ private void flushInternal() throws IOException {
isClosed();
+ checkOpen();
- while (!closed) {
- synchronized (dataQueue) {
- isClosed();
- //
- // If there is data in the current buffer, send it across
- //
- if (currentPacket != null) {
- dataQueue.addLast(currentPacket);
- dataQueue.notifyAll();
- currentPacket = null;
- }
-
- // wait for all buffers to be flushed to datanodes
- if (!closed && dataQueue.size() != 0) {
- try {
- dataQueue.wait();
- } catch (InterruptedException e) {
- }
- continue;
- }
- }
+ long toWaitFor;
+ synchronized (this) {
+ enqueueCurrentPacket();
+ toWaitFor = lastQueuedSeqno;
+ }
- // wait for all acks to be received back from datanodes
- synchronized (ackQueue) {
- if (!closed && ackQueue.size() != 0) {
- try {
- ackQueue.wait();
- } catch (InterruptedException e) {
- }
- continue;
- }
- }
+ waitForAckedSeqno(toWaitFor);
+ }
- // acquire both the locks and verify that we are
- // *really done*. In the case of error recovery,
- // packets might move back from ackQueue to dataQueue.
- //
- synchronized (dataQueue) {
- synchronized (ackQueue) {
- if (dataQueue.size() + ackQueue.size() == 0) {
- break; // we are done
- }
+ private void waitForAckedSeqno(long seqnumToWaitFor) throws IOException {
+ synchronized (ackQueue) {
+ while (!closed) {
+ isClosed();
+ if (lastAckedSeqno >= seqnumToWaitFor) {
+ break;
}
+ try {
+ ackQueue.wait();
+ } catch (InterruptedException ie) {}
}
}
+ isClosed();
}
-
+
/**
* Closes this output stream and releases any system
* resources associated with this stream.
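
From a caller's point of view, the practical effect of the DFSClient changes above is that sync() no longer holds the stream lock for the whole wait, so one thread can keep writing while another syncs the same stream; the new test added below exercises exactly that. A hedged usage sketch follows (the path, record size, and loop counts are made up, and the Configuration/FileSystem setup is assumed to point at a running cluster):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class ParallelSyncExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        final FSDataOutputStream out = fs.create(new Path("/tmp/parallel-sync.dat"));

        Thread syncer = new Thread() {
          public void run() {
            try {
              for (int i = 0; i < 100; i++) {
                out.sync();        // waits only for packets queued so far
              }
            } catch (IOException ioe) {
              // a concurrent close() surfaces as "DFSOutputStream is closed"
            }
          }
        };
        syncer.start();

        byte[] record = new byte[517];
        for (int i = 0; i < 1000; i++) {
          out.write(record);       // no longer blocked behind a long sync()
        }

        syncer.join();
        out.close();
        fs.close();
      }
    }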
Added: hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/TestMultiThreadedSync.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/TestMultiThreadedSync.java?rev=1041718&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/TestMultiThreadedSync.java (added)
+++ hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/TestMultiThreadedSync.java Fri Dec 3 07:54:55 2010
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import org.junit.Test;
+
+import java.io.*;
+import java.util.ArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
+import org.apache.log4j.Level;
+
+/**
+ * This class tests the building blocks that are needed to
+ * support HDFS appends.
+ */
+public class TestMultiThreadedSync {
+ static final int blockSize = 1024*1024;
+ static final int numBlocks = 10;
+ static final int fileSize = numBlocks * blockSize + 1;
+
+ private static final int NUM_THREADS = 10;
+ private static final int WRITE_SIZE = 517;
+ private static final int NUM_WRITES_PER_THREAD = 1000;
+
+ private byte[] toWrite = null;
+
+ {
+ ((Log4JLogger)NameNode.stateChangeLog).getLogger().setLevel(Level.ALL);
+ ((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL);
+ ((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL);
+ ((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
+ ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
+ ((Log4JLogger)InterDatanodeProtocol.LOG).getLogger().setLevel(Level.ALL);
+ }
+
+ /*
+ * creates a file but does not close it
+ */
+ private FSDataOutputStream createFile(FileSystem fileSys, Path name, int repl)
+ throws IOException {
+ FSDataOutputStream stm = fileSys.create(name, true,
+ fileSys.getConf().getInt("io.file.buffer.size", 4096),
+ (short)repl, (long)blockSize);
+ return stm;
+ }
+
+ private void initBuffer(int size) {
+ long seed = AppendTestUtil.nextLong();
+ toWrite = AppendTestUtil.randomBytes(seed, size);
+ }
+
+ private class WriterThread extends Thread {
+ private final FSDataOutputStream stm;
+ private final AtomicReference<Throwable> thrown;
+ private final int numWrites;
+ private final CountDownLatch countdown;
+
+ public WriterThread(FSDataOutputStream stm,
+ AtomicReference<Throwable> thrown,
+ CountDownLatch countdown, int numWrites) {
+ this.stm = stm;
+ this.thrown = thrown;
+ this.numWrites = numWrites;
+ this.countdown = countdown;
+ }
+
+ public void run() {
+ try {
+ countdown.await();
+ for (int i = 0; i < numWrites && thrown.get() == null; i++) {
+ doAWrite();
+ }
+ } catch (Throwable t) {
+ thrown.compareAndSet(null, t);
+ }
+ }
+
+ private void doAWrite() throws IOException {
+ stm.write(toWrite);
+ stm.sync();
+ }
+ }
+
+
+ @Test
+ public void testMultipleSyncers() throws Exception {
+ Configuration conf = new Configuration();
+ MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null);
+ FileSystem fs = cluster.getFileSystem();
+ Path p = new Path("/multiple-syncers.dat");
+ try {
+ doMultithreadedWrites(conf, p, NUM_THREADS, WRITE_SIZE, NUM_WRITES_PER_THREAD);
+ } finally {
+ fs.close();
+ cluster.shutdown();
+ }
+ }
+
+ /**
+ * Test case where a bunch of threads are continuously calling sync() while another
+ * thread appends some data and then closes the file.
+ *
+ * The syncing threads should eventually catch an IOException stating that the stream
+ * was closed -- and not an NPE or anything like that.
+ */
+ @Test
+ public void testSyncWhileClosing() throws Throwable {
+ Configuration conf = new Configuration();
+ MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null);
+ FileSystem fs = cluster.getFileSystem();
+ Path p = new Path("/sync-and-close.dat");
+
+ final FSDataOutputStream stm = createFile(fs, p, 1);
+
+
+ ArrayList<Thread> flushers = new ArrayList<Thread>();
+ final AtomicReference<Throwable> thrown = new AtomicReference<Throwable>();
+ try {
+ for (int i = 0; i < 10; i++) {
+ Thread flusher = new Thread() {
+ public void run() {
+ try {
+ while (true) {
+ try {
+ stm.sync();
+ } catch (IOException ioe) {
+ if (!ioe.toString().contains("DFSOutputStream is closed")) {
+ throw ioe;
+ } else {
+ return;
+ }
+ }
+ }
+ } catch (Throwable t) {
+ thrown.set(t);
+ }
+ }
+ };
+ flusher.start();
+ flushers.add(flusher);
+ }
+
+ // Write some data
+ for (int i = 0; i < 10000; i++) {
+ stm.write(1);
+ }
+
+ // Close it while the flushing threads are still flushing
+ stm.close();
+
+ // Wait for the flushers to all die.
+ for (Thread t : flushers) {
+ t.join();
+ }
+
+ // They should have all gotten the expected exception, not anything
+ // else.
+ if (thrown.get() != null) {
+ throw thrown.get();
+ }
+
+ } finally {
+ fs.close();
+ cluster.shutdown();
+ }
+ }
+
+ public void doMultithreadedWrites(
+ Configuration conf, Path p, int numThreads, int bufferSize, int numWrites)
+ throws Exception {
+ initBuffer(bufferSize);
+
+ // create a new file.
+ FileSystem fs = p.getFileSystem(conf);
+ FSDataOutputStream stm = createFile(fs, p, 1);
+ System.out.println("Created file simpleFlush.dat");
+
+ // TODO move this bit to another test case
+ // There have been a couple issues with flushing empty buffers, so do
+ // some empty flushes first.
+ stm.sync();
+ stm.sync();
+ stm.write(1);
+ stm.sync();
+ stm.sync();
+
+ CountDownLatch countdown = new CountDownLatch(1);
+ ArrayList<Thread> threads = new ArrayList<Thread>();
+ AtomicReference<Throwable> thrown = new AtomicReference<Throwable>();
+ for (int i = 0; i < numThreads; i++) {
+ Thread t = new WriterThread(stm, thrown, countdown, numWrites);
+ threads.add(t);
+ t.start();
+ }
+
+ // Start all the threads at the same time for maximum raciness!
+ countdown.countDown();
+
+ for (Thread t : threads) {
+ t.join();
+ }
+ if (thrown.get() != null) {
+
+ throw new RuntimeException("Deferred", thrown.get());
+ }
+ stm.close();
+ System.out.println("Closed file.");
+ }
+
+ public static void main(String args[]) throws Exception {
+ TestMultiThreadedSync test = new TestMultiThreadedSync();
+ Configuration conf = new Configuration();
+ Path p = new Path("/user/todd/test.dat");
+ long st = System.nanoTime();
+ test.doMultithreadedWrites(conf, p, 10, 511, 50000);
+ long et = System.nanoTime();
+
+ System.out.println("Finished in " + ((et - st) / 1000000) + "ms");
+ }
+
+}