You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by ha...@apache.org on 2009/08/24 19:40:59 UTC

svn commit: r807315 - in /hadoop/hdfs/branches/HDFS-265: CHANGES.txt src/java/org/apache/hadoop/hdfs/DFSClient.java

Author: hairong
Date: Mon Aug 24 17:40:58 2009
New Revision: 807315

URL: http://svn.apache.org/viewvc?rev=807315&view=rev
Log:
HDFS-547. TestHDFSFileSystemContract#testOutputStreamClosedTwice sometimes fails with ClosedByInterruptException. Contributed by Hairong Kuang.

Modified:
    hadoop/hdfs/branches/HDFS-265/CHANGES.txt
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/DFSClient.java

Modified: hadoop/hdfs/branches/HDFS-265/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/CHANGES.txt?rev=807315&r1=807314&r2=807315&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/CHANGES.txt (original)
+++ hadoop/hdfs/branches/HDFS-265/CHANGES.txt Mon Aug 24 17:40:58 2009
@@ -161,6 +161,9 @@
 
     HDFS-534. Include avro in ivy.  (szetszwo)
 
+    HDFS-547. TestHDFSFileSystemContract#testOutputStreamClosedTwice
+    sometimes fails with ClosedByInterruptException. (hairong)
+
 Release 0.20.1 - Unreleased
 
   IMPROVEMENTS

Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/DFSClient.java?rev=807315&r1=807314&r2=807315&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/DFSClient.java Mon Aug 24 17:40:58 2009
@@ -2433,17 +2433,15 @@
                   }
                 }
               }
-              if (streamerClosed || hasError || !clientRunning) {
-                continue;
+              
+              if (ackQueue.isEmpty()) { // done receiving all acks
+                if (response != null) {
+                  response.close(); // notify responder to close
+                }
+                // indicate end-of-block
+                blockStream.writeInt(0);
+                blockStream.flush();
               }
-
-              // done receiving all acks
-              if (response != null) {
-                response.close(); // notify responder to close
-              }
-              // indicate end-of-block
-              blockStream.writeInt(0);
-              blockStream.flush();
             }
             if (LOG.isDebugEnabled()) {
               LOG.debug("DataStreamer block " + block +
@@ -2500,10 +2498,16 @@
        * close both streamer and DFSOutputStream, should be called only 
        * by an external thread and only after all data to be sent has 
        * been flushed to datanode.
+       * 
+       * The streamer thread is interrupted only if force is true.
+       * 
+       * @param force whether to interrupt this data streamer to force it to close
        */
-      void close() {
+      void close(boolean force) {
         streamerClosed = true;
-        this.interrupt();
+        if (force) {
+          this.interrupt();
+        }
       }
 
       private void closeResponder() {
@@ -3267,7 +3271,7 @@
         }
       } catch (IOException e) {
           lastException = new IOException("IOException flush:" + e);
-          closeThreads();
+          closeThreads(true);
           throw e;
       }
     }
@@ -3308,13 +3312,14 @@
       }
       streamer.setLastException(new IOException("Lease timeout of " +
                                (hdfsTimeout/1000) + " seconds expired."));
-      closeThreads();
+      closeThreads(true);
     }
  
     // shutdown datastreamer and responseprocessor threads.
-    private void closeThreads() throws IOException {
+    // interrupt datastreamer if force is true
+    private void closeThreads(boolean force) throws IOException {
       try {
-        streamer.close();
+        streamer.close(force);
         streamer.join();
         if (s != null) {
           s.close();
@@ -3358,7 +3363,7 @@
         }
 
         flushInternal();             // flush all data to Datanodes
-        closeThreads();
+        closeThreads(false);
         completeFile();
         leasechecker.remove(src);
       } finally {