You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text version).
Posted to hdfs-commits@hadoop.apache.org by to...@apache.org on 2011/02/04 22:13:46 UTC
svn commit: r1067287 - in /hadoop/hdfs/branches/branch-0.22: CHANGES.txt
src/java/org/apache/hadoop/hdfs/DFSOutputStream.java
src/test/hdfs/org/apache/hadoop/hdfs/TestHFlush.java
Author: todd
Date: Fri Feb 4 21:13:41 2011
New Revision: 1067287
URL: http://svn.apache.org/viewvc?rev=1067287&view=rev
Log:
HDFS-1529. Incorrect handling of interrupts in waitForAckedSeqno can cause deadlock. Contributed by Todd Lipcon.
Modified:
hadoop/hdfs/branches/branch-0.22/CHANGES.txt
hadoop/hdfs/branches/branch-0.22/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java
hadoop/hdfs/branches/branch-0.22/src/test/hdfs/org/apache/hadoop/hdfs/TestHFlush.java
Modified: hadoop/hdfs/branches/branch-0.22/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.22/CHANGES.txt?rev=1067287&r1=1067286&r2=1067287&view=diff
==============================================================================
--- hadoop/hdfs/branches/branch-0.22/CHANGES.txt (original)
+++ hadoop/hdfs/branches/branch-0.22/CHANGES.txt Fri Feb 4 21:13:41 2011
@@ -439,6 +439,9 @@ Release 0.22.0 - Unreleased
HDFS-900. Corrupt replicas are not processed correctly in block report (shv)
+ HDFS-1529. Incorrect handling of interrupts in waitForAckedSeqno can cause
+ deadlock (todd)
+
Release 0.21.1 - Unreleased
IMPROVEMENTS
Modified: hadoop/hdfs/branches/branch-0.22/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.22/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java?rev=1067287&r1=1067286&r2=1067287&view=diff
==============================================================================
--- hadoop/hdfs/branches/branch-0.22/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java (original)
+++ hadoop/hdfs/branches/branch-0.22/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java Fri Feb 4 21:13:41 2011
@@ -24,6 +24,7 @@ import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.FileNotFoundException;
+import java.io.InterruptedIOException;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
@@ -1167,7 +1168,16 @@ class DFSOutputStream extends FSOutputSu
while (!closed && dataQueue.size() + ackQueue.size() > MAX_PACKETS) {
try {
dataQueue.wait();
- } catch (InterruptedException e) {
+ } catch (InterruptedException e) {
+ // If we get interrupted while waiting to queue data, we still need to get rid
+ // of the current packet. This is because we have an invariant that if
+ // currentPacket gets full, it will get queued before the next writeChunk.
+ //
+ // Rather than wait around for space in the queue, we should instead try to
+ // return to the caller as soon as possible, even though we slightly overrun
+ // the MAX_PACKETS length.
+ Thread.currentThread().interrupt();
+ break;
}
}
isClosed();
@@ -1339,6 +1349,11 @@ class DFSOutputStream extends FSOutputSu
throw ioe;
}
}
+ } catch (InterruptedIOException interrupt) {
+ // This kind of error doesn't mean that the stream itself is broken - just the
+ // flushing thread got interrupted. So, we shouldn't close down the writer,
+ // but instead just propagate the error
+ throw interrupt;
} catch (IOException e) {
DFSClient.LOG.warn("Error while syncing", e);
synchronized (this) {
@@ -1416,7 +1431,8 @@ class DFSOutputStream extends FSOutputSu
try {
dataQueue.wait(1000); // when we receive an ack, we notify on dataQueue
} catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
+ throw new InterruptedIOException(
+ "Interrupted while waiting for data to be acknowledged by pipeline");
}
}
}
Modified: hadoop/hdfs/branches/branch-0.22/src/test/hdfs/org/apache/hadoop/hdfs/TestHFlush.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.22/src/test/hdfs/org/apache/hadoop/hdfs/TestHFlush.java?rev=1067287&r1=1067286&r2=1067287&view=diff
==============================================================================
--- hadoop/hdfs/branches/branch-0.22/src/test/hdfs/org/apache/hadoop/hdfs/TestHFlush.java (original)
+++ hadoop/hdfs/branches/branch-0.22/src/test/hdfs/org/apache/hadoop/hdfs/TestHFlush.java Fri Feb 4 21:13:41 2011
@@ -26,9 +26,12 @@ import org.apache.hadoop.hdfs.server.dat
import org.apache.log4j.Level;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import org.junit.Test;
+import java.io.InterruptedIOException;
import java.io.IOException;
/** Class contains a set of tests to verify the correctness of
@@ -170,38 +173,107 @@ public class TestHFlush {
System.out.println("p=" + p);
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(DATANODE_NUM).build();
- DistributedFileSystem fs = (DistributedFileSystem)cluster.getFileSystem();
+ try {
+ DistributedFileSystem fs = (DistributedFileSystem)cluster.getFileSystem();
+
+ byte[] fileContents = AppendTestUtil.initBuffer(fileLen);
+
+ // create a new file.
+ FSDataOutputStream stm = AppendTestUtil.createFile(fs, p, DATANODE_NUM);
+ stm.write(fileContents, 0, 1);
+ Thread.sleep(timeout);
+ stm.hflush();
+ System.out.println("Wrote 1 byte and hflush " + p);
+
+ // write another byte
+ Thread.sleep(timeout);
+ stm.write(fileContents, 1, 1);
+ stm.hflush();
+
+ stm.write(fileContents, 2, 1);
+ Thread.sleep(timeout);
+ stm.hflush();
+
+ stm.write(fileContents, 3, 1);
+ Thread.sleep(timeout);
+ stm.write(fileContents, 4, 1);
+ stm.hflush();
+
+ stm.write(fileContents, 5, 1);
+ Thread.sleep(timeout);
+ stm.close();
+
+ // verify that entire file is good
+ AppendTestUtil.checkFullFile(fs, p, fileLen,
+ fileContents, "Failed to slowly write to a file");
+ } finally {
+ cluster.shutdown();
+ }
+ }
+
+ @Test
+ public void testHFlushInterrupted() throws Exception {
+ final int DATANODE_NUM = 2;
+ final int fileLen = 6;
byte[] fileContents = AppendTestUtil.initBuffer(fileLen);
+ Configuration conf = new HdfsConfiguration();
+ final Path p = new Path("/hflush-interrupted");
- // create a new file.
- FSDataOutputStream stm = AppendTestUtil.createFile(fs, p, DATANODE_NUM);
+ System.out.println("p=" + p);
- stm.write(fileContents, 0, 1);
- Thread.sleep(timeout);
- stm.hflush();
- System.out.println("Wrote 1 byte and hflush " + p);
-
- // write another byte
- Thread.sleep(timeout);
- stm.write(fileContents, 1, 1);
- stm.hflush();
-
- stm.write(fileContents, 2, 1);
- Thread.sleep(timeout);
- stm.hflush();
-
- stm.write(fileContents, 3, 1);
- Thread.sleep(timeout);
- stm.write(fileContents, 4, 1);
- stm.hflush();
-
- stm.write(fileContents, 5, 1);
- Thread.sleep(timeout);
- stm.close();
-
- // verify that entire file is good
- AppendTestUtil.checkFullFile(fs, p, fileLen,
- fileContents, "Failed to slowly write to a file");
+ MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(DATANODE_NUM).build();
+ try {
+ DistributedFileSystem fs = (DistributedFileSystem)cluster.getFileSystem();
+
+ // create a new file.
+ FSDataOutputStream stm = AppendTestUtil.createFile(fs, p, DATANODE_NUM);
+
+ stm.write(fileContents, 0, 2);
+ Thread.currentThread().interrupt();
+ try {
+ stm.hflush();
+ // If we made it past the hflush(), then that means that the ack made it back
+ // from the pipeline before we got to the wait() call. In that case we should
+ // still have interrupted status.
+ assertTrue(Thread.currentThread().interrupted());
+ } catch (InterruptedIOException ie) {
+ System.out.println("Got expected exception during flush");
+ }
+ assertFalse(Thread.currentThread().interrupted());
+
+ // Try again to flush should succeed since we no longer have interrupt status
+ stm.hflush();
+
+ // Write some more data and flush
+ stm.write(fileContents, 2, 2);
+ stm.hflush();
+
+ // Write some data and close while interrupted
+
+ stm.write(fileContents, 4, 2);
+ Thread.currentThread().interrupt();
+ try {
+ stm.close();
+ // If we made it past the close(), then that means that the ack made it back
+ // from the pipeline before we got to the wait() call. In that case we should
+ // still have interrupted status.
+ assertTrue(Thread.currentThread().interrupted());
+ } catch (InterruptedIOException ioe) {
+ System.out.println("Got expected exception during close");
+ // If we got the exception, we shouldn't have interrupted status anymore.
+ assertFalse(Thread.currentThread().interrupted());
+
+ // Now do a successful close.
+ stm.close();
+ }
+
+
+ // verify that entire file is good
+ AppendTestUtil.checkFullFile(fs, p, fileLen,
+ fileContents, "Failed to deal with thread interruptions");
+ } finally {
+ cluster.shutdown();
+ }
}
}