You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by to...@apache.org on 2011/02/04 22:13:46 UTC

svn commit: r1067287 - in /hadoop/hdfs/branches/branch-0.22: CHANGES.txt src/java/org/apache/hadoop/hdfs/DFSOutputStream.java src/test/hdfs/org/apache/hadoop/hdfs/TestHFlush.java

Author: todd
Date: Fri Feb  4 21:13:41 2011
New Revision: 1067287

URL: http://svn.apache.org/viewvc?rev=1067287&view=rev
Log:
HDFS-1529. Incorrect handling of interrupts in waitForAckedSeqno can cause deadlock. Contributed by Todd Lipcon.

Modified:
    hadoop/hdfs/branches/branch-0.22/CHANGES.txt
    hadoop/hdfs/branches/branch-0.22/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java
    hadoop/hdfs/branches/branch-0.22/src/test/hdfs/org/apache/hadoop/hdfs/TestHFlush.java

Modified: hadoop/hdfs/branches/branch-0.22/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.22/CHANGES.txt?rev=1067287&r1=1067286&r2=1067287&view=diff
==============================================================================
--- hadoop/hdfs/branches/branch-0.22/CHANGES.txt (original)
+++ hadoop/hdfs/branches/branch-0.22/CHANGES.txt Fri Feb  4 21:13:41 2011
@@ -439,6 +439,9 @@ Release 0.22.0 - Unreleased
 
     HDFS-900. Corrupt replicas are not processed correctly in block report (shv)
 
+    HDFS-1529. Incorrect handling of interrupts in waitForAckedSeqno can cause
+    deadlock (todd)
+
 Release 0.21.1 - Unreleased
 
   IMPROVEMENTS

Modified: hadoop/hdfs/branches/branch-0.22/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.22/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java?rev=1067287&r1=1067286&r2=1067287&view=diff
==============================================================================
--- hadoop/hdfs/branches/branch-0.22/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java (original)
+++ hadoop/hdfs/branches/branch-0.22/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java Fri Feb  4 21:13:41 2011
@@ -24,6 +24,7 @@ import java.io.BufferedOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.FileNotFoundException;
+import java.io.InterruptedIOException;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.Socket;
@@ -1167,7 +1168,16 @@ class DFSOutputStream extends FSOutputSu
       while (!closed && dataQueue.size() + ackQueue.size()  > MAX_PACKETS) {
         try {
           dataQueue.wait();
-        } catch (InterruptedException  e) {
+        } catch (InterruptedException e) {
+          // If we get interrupted while waiting to queue data, we still need to get rid
+          // of the current packet. This is because we have an invariant that if
+          // currentPacket gets full, it will get queued before the next writeChunk.
+          //
+          // Rather than wait around for space in the queue, we should instead try to
+          // return to the caller as soon as possible, even though we slightly overrun
+          // the MAX_PACKETS length.
+          Thread.currentThread().interrupt();
+          break;
         }
       }
       isClosed();
@@ -1339,6 +1349,11 @@ class DFSOutputStream extends FSOutputSu
           throw ioe;
         }
       }
+    } catch (InterruptedIOException interrupt) {
+      // This kind of error doesn't mean that the stream itself is broken - just the
+      // flushing thread got interrupted. So, we shouldn't close down the writer,
+      // but instead just propagate the error
+      throw interrupt;
     } catch (IOException e) {
       DFSClient.LOG.warn("Error while syncing", e);
       synchronized (this) {
@@ -1416,7 +1431,8 @@ class DFSOutputStream extends FSOutputSu
         try {
           dataQueue.wait(1000); // when we receive an ack, we notify on dataQueue
         } catch (InterruptedException ie) {
-          Thread.currentThread().interrupt();
+          throw new InterruptedIOException(
+            "Interrupted while waiting for data to be acknowledged by pipeline");
         }
       }
     }

Modified: hadoop/hdfs/branches/branch-0.22/src/test/hdfs/org/apache/hadoop/hdfs/TestHFlush.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.22/src/test/hdfs/org/apache/hadoop/hdfs/TestHFlush.java?rev=1067287&r1=1067286&r2=1067287&view=diff
==============================================================================
--- hadoop/hdfs/branches/branch-0.22/src/test/hdfs/org/apache/hadoop/hdfs/TestHFlush.java (original)
+++ hadoop/hdfs/branches/branch-0.22/src/test/hdfs/org/apache/hadoop/hdfs/TestHFlush.java Fri Feb  4 21:13:41 2011
@@ -26,9 +26,12 @@ import org.apache.hadoop.hdfs.server.dat
 import org.apache.log4j.Level;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 import org.junit.Test;
 
+import java.io.InterruptedIOException;
 import java.io.IOException;
 
 /** Class contains a set of tests to verify the correctness of 
@@ -170,38 +173,107 @@ public class TestHFlush {
     System.out.println("p=" + p);
     
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(DATANODE_NUM).build();
-    DistributedFileSystem fs = (DistributedFileSystem)cluster.getFileSystem();
+    try {
+      DistributedFileSystem fs = (DistributedFileSystem)cluster.getFileSystem();
+
+      byte[] fileContents = AppendTestUtil.initBuffer(fileLen);
+
+      // create a new file.
+      FSDataOutputStream stm = AppendTestUtil.createFile(fs, p, DATANODE_NUM);
 
+      stm.write(fileContents, 0, 1);
+      Thread.sleep(timeout);
+      stm.hflush();
+      System.out.println("Wrote 1 byte and hflush " + p);
+
+      // write another byte
+      Thread.sleep(timeout);
+      stm.write(fileContents, 1, 1);
+      stm.hflush();
+
+      stm.write(fileContents, 2, 1);
+      Thread.sleep(timeout);
+      stm.hflush();
+
+      stm.write(fileContents, 3, 1);
+      Thread.sleep(timeout);
+      stm.write(fileContents, 4, 1);
+      stm.hflush();
+
+      stm.write(fileContents, 5, 1);
+      Thread.sleep(timeout);
+      stm.close();
+
+      // verify that entire file is good
+      AppendTestUtil.checkFullFile(fs, p, fileLen,
+          fileContents, "Failed to slowly write to a file");
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testHFlushInterrupted() throws Exception {
+    final int DATANODE_NUM = 2;
+    final int fileLen = 6;
     byte[] fileContents = AppendTestUtil.initBuffer(fileLen);
+    Configuration conf = new HdfsConfiguration();
+    final Path p = new Path("/hflush-interrupted");
 
-    // create a new file.
-    FSDataOutputStream stm = AppendTestUtil.createFile(fs, p, DATANODE_NUM);
+    System.out.println("p=" + p);
 
-    stm.write(fileContents, 0, 1);
-    Thread.sleep(timeout);
-    stm.hflush();
-    System.out.println("Wrote 1 byte and hflush " + p);
-
-    // write another byte
-    Thread.sleep(timeout);
-    stm.write(fileContents, 1, 1);
-    stm.hflush();
-    
-    stm.write(fileContents, 2, 1);
-    Thread.sleep(timeout);
-    stm.hflush();
-    
-    stm.write(fileContents, 3, 1);
-    Thread.sleep(timeout);
-    stm.write(fileContents, 4, 1);
-    stm.hflush();
-    
-    stm.write(fileContents, 5, 1);
-    Thread.sleep(timeout);
-    stm.close();
-
-    // verify that entire file is good
-    AppendTestUtil.checkFullFile(fs, p, fileLen,
-        fileContents, "Failed to slowly write to a file");
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(DATANODE_NUM).build();
+    try {
+      DistributedFileSystem fs = (DistributedFileSystem)cluster.getFileSystem();
+
+      // create a new file.
+      FSDataOutputStream stm = AppendTestUtil.createFile(fs, p, DATANODE_NUM);
+
+      stm.write(fileContents, 0, 2);
+      Thread.currentThread().interrupt();
+      try {
+        stm.hflush();
+        // If we made it past the hflush(), then that means that the ack made it back
+        // from the pipeline before we got to the wait() call. In that case we should
+        // still have interrupted status.
+        assertTrue(Thread.currentThread().interrupted());
+      } catch (InterruptedIOException ie) {
+        System.out.println("Got expected exception during flush");
+      }
+      assertFalse(Thread.currentThread().interrupted());
+
+      // Flushing again should succeed since we no longer have interrupt status
+      stm.hflush();
+
+      // Write some more data and flush
+      stm.write(fileContents, 2, 2);
+      stm.hflush();
+
+      // Write some data and close while interrupted
+
+      stm.write(fileContents, 4, 2);
+      Thread.currentThread().interrupt();
+      try {
+        stm.close();
+        // If we made it past the close(), then that means that the ack made it back
+        // from the pipeline before we got to the wait() call. In that case we should
+        // still have interrupted status.
+        assertTrue(Thread.currentThread().interrupted());
+      } catch (InterruptedIOException ioe) {
+        System.out.println("Got expected exception during close");
+        // If we got the exception, we shouldn't have interrupted status anymore.
+        assertFalse(Thread.currentThread().interrupted());
+
+        // Now do a successful close.
+        stm.close();
+      }
+
+
+      // verify that entire file is good
+      AppendTestUtil.checkFullFile(fs, p, fileLen,
+        fileContents, "Failed to deal with thread interruptions");
+    } finally {
+      cluster.shutdown();
+    }
   }
 }