You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by sz...@apache.org on 2010/07/19 22:35:54 UTC

svn commit: r965621 - in /hadoop/hdfs/trunk: ./ src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/server/namenode/ src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/

Author: szetszwo
Date: Mon Jul 19 20:35:54 2010
New Revision: 965621

URL: http://svn.apache.org/viewvc?rev=965621&view=rev
Log:
HDFS-1085. HFTP read may fail silently on the client side if there is an exception on the server side.

Modified:
    hadoop/hdfs/trunk/CHANGES.txt
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/ByteRangeInputStream.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSInputStream.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/StreamFile.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestStreamFile.java

Modified: hadoop/hdfs/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=965621&r1=965620&r2=965621&view=diff
==============================================================================
--- hadoop/hdfs/trunk/CHANGES.txt (original)
+++ hadoop/hdfs/trunk/CHANGES.txt Mon Jul 19 20:35:54 2010
@@ -145,6 +145,9 @@ Trunk (unreleased changes)
 
     HDFS-1007. HFTP needs to be updated to use delegation tokens (boryas)
 
+    HDFS-1085. HFTP read may fail silently on the client side if there is an
+    exception on the server side.  (szetszwo)
+
 Release 0.21.0 - Unreleased
 
   INCOMPATIBLE CHANGES

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/ByteRangeInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/ByteRangeInputStream.java?rev=965621&r1=965620&r2=965621&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/ByteRangeInputStream.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/ByteRangeInputStream.java Mon Jul 19 20:35:54 2010
@@ -22,7 +22,9 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.net.HttpURLConnection;
 import java.net.URL;
+
 import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.hdfs.server.namenode.StreamFile;
 
 
 /**
@@ -64,6 +66,8 @@ class ByteRangeInputStream extends FSInp
   protected URLOpener resolvedURL;
   protected long startPos = 0;
   protected long currentPos = 0;
+  protected long filelength;
+
   protected int status = STATUS_SEEK;
   protected static final int STATUS_NORMAL = 0;
   protected static final int STATUS_SEEK = 1;
@@ -102,6 +106,11 @@ class ByteRangeInputStream extends FSInp
         connection.setRequestProperty("Range", "bytes="+startPos+"-");
       }
       connection.connect();
+      final String cl = connection.getHeaderField(StreamFile.CONTENT_LENGTH);
+      filelength = cl == null? -1: Long.parseLong(cl);
+      if (HftpFileSystem.LOG.isDebugEnabled()) {
+        HftpFileSystem.LOG.debug("filelength = " + filelength);
+      }
       in = connection.getInputStream();
       
       if (startPos != 0 && connection.getResponseCode() != 206) {
@@ -123,12 +132,20 @@ class ByteRangeInputStream extends FSInp
     return in;
   }
   
-  public int read() throws IOException {
-    int ret = getInputStream().read();
-    if (ret != -1) {
-     currentPos++;
+  private void update(final boolean isEOF, final int n
+      ) throws IOException {
+    if (!isEOF) {
+      currentPos += n;
+    } else if (currentPos < filelength) {
+      throw new IOException("Got EOF but currentPos = " + currentPos
+          + " < filelength = " + filelength);
     }
-    return ret;
+  }
+
+  public int read() throws IOException {
+    final int b = getInputStream().read();
+    update(b == -1, 1);
+    return b;
   }
   
   /**

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSInputStream.java?rev=965621&r1=965620&r2=965621&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSInputStream.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSInputStream.java Mon Jul 19 20:35:54 2010
@@ -26,6 +26,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.fs.FSInputStream;
 import org.apache.hadoop.fs.UnresolvedLinkException;
@@ -47,7 +48,8 @@ import org.apache.hadoop.util.StringUtil
  * DFSInputStream provides bytes from a named file.  It handles 
  * negotiation of the namenode and various datanodes as necessary.
  ****************************************************************/
-class DFSInputStream extends FSInputStream {
+@InterfaceAudience.Private
+public class DFSInputStream extends FSInputStream {
   private final DFSClient dfsClient;
   private Socket s = null;
   private boolean closed = false;

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/StreamFile.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/StreamFile.java?rev=965621&r1=965620&r2=965621&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/StreamFile.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/StreamFile.java Mon Jul 19 20:35:54 2010
@@ -24,6 +24,7 @@ import java.net.InetSocketAddress;
 import java.security.PrivilegedExceptionAction;
 import java.util.Enumeration;
 import java.util.List;
+
 import javax.servlet.ServletException;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
@@ -32,6 +33,7 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSInputStream;
 import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSInputStream;
 import org.apache.hadoop.hdfs.server.common.JspHelper;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -42,6 +44,8 @@ public class StreamFile extends DfsServl
   /** for java.io.Serializable */
   private static final long serialVersionUID = 1L;
 
+  public static final String CONTENT_LENGTH = "Content-Length";
+
   static InetSocketAddress nameNodeAddr;
   static DataNode datanode = null;
   static {
@@ -79,7 +83,7 @@ public class StreamFile extends DfsServl
       return;
     }
     
-    Enumeration reqRanges = request.getHeaders("Range");
+    Enumeration<?> reqRanges = request.getHeaders("Range");
     if (reqRanges != null && !reqRanges.hasMoreElements())
       reqRanges = null;
 
@@ -91,13 +95,13 @@ public class StreamFile extends DfsServl
       return;
     }
     
-    long fileLen = dfs.getFileInfo(filename).getLen();
-    FSInputStream in = dfs.open(filename);
+    final DFSInputStream in = dfs.open(filename);
+    final long fileLen = in.getFileLength();
     OutputStream os = response.getOutputStream();
 
     try {
       if (reqRanges != null) {
-        List ranges = InclusiveByteRange.satisfiableRanges(reqRanges,
+        List<?> ranges = InclusiveByteRange.satisfiableRanges(reqRanges,
                                                            fileLen);
         StreamFile.sendPartialData(in, os, response, fileLen, ranges);
       } else {
@@ -105,12 +109,21 @@ public class StreamFile extends DfsServl
         response.setHeader("Content-Disposition", "attachment; filename=\"" + 
                            filename + "\"");
         response.setContentType("application/octet-stream");
+        response.setHeader(CONTENT_LENGTH, "" + fileLen);
         StreamFile.writeTo(in, os, 0L, fileLen);
       }
+    } catch(IOException e) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("response.isCommitted()=" + response.isCommitted(), e);
+      }
+      throw e;
     } finally {
-      in.close();
-      os.close();
-      dfs.close();
+      try {
+        in.close();
+        os.close();
+      } finally {
+        dfs.close();
+      }
     }      
   }
   
@@ -118,7 +131,7 @@ public class StreamFile extends DfsServl
                               OutputStream os,
                               HttpServletResponse response,
                               long contentLength,
-                              List ranges)
+                              List<?> ranges)
   throws IOException {
 
     if (ranges == null || ranges.size() != 1) {

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestStreamFile.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestStreamFile.java?rev=965621&r1=965620&r2=965621&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestStreamFile.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestStreamFile.java Mon Jul 19 20:35:54 2010
@@ -17,24 +17,21 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
-import java.io.ByteArrayInputStream;
+import static org.junit.Assert.assertArrayEquals;
+
 import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
 import java.util.Arrays;
 import java.util.Enumeration;
 import java.util.List;
 import java.util.Vector;
+
 import javax.servlet.http.HttpServletResponse;
+
 import junit.framework.TestCase;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+
 import org.apache.hadoop.fs.FSInputStream;
-import org.apache.hadoop.hdfs.server.namenode.StreamFile;
 import org.mortbay.jetty.InclusiveByteRange;
-import static org.junit.Assert.*;
 
 /*
   Mock input stream class that always outputs the current position of the stream
@@ -78,7 +75,6 @@ class MockHttpServletResponse implements
     status = sc;
   }
   
-  @SuppressWarnings("deprecation")
   public void setStatus(int sc, java.lang.String sm) {
   }
   
@@ -189,8 +185,6 @@ class MockHttpServletResponse implements
 
 public class TestStreamFile extends TestCase {
   
-  private static final Log LOG =  LogFactory.getLog(TestStreamFile.class);
-
   // return an array matching the output of mockfsinputstream
   private static byte[] getOutputArray(int start, int count) {
     byte[] a = new byte[count];
@@ -231,9 +225,9 @@ public class TestStreamFile extends Test
     
   }
   
-  private List strToRanges(String s, int contentLength) {
+  private List<?> strToRanges(String s, int contentLength) {
     List<String> l = Arrays.asList(new String[]{"bytes="+s});
-    Enumeration e = (new Vector<String>(l)).elements();
+    Enumeration<?> e = (new Vector<String>(l)).elements();
     return InclusiveByteRange.satisfiableRanges(e, contentLength);
   }
   
@@ -243,7 +237,7 @@ public class TestStreamFile extends Test
 
     // test if multiple ranges, then 416
     { 
-      List ranges = strToRanges("0-,10-300", 500);
+      List<?> ranges = strToRanges("0-,10-300", 500);
       MockHttpServletResponse response = new MockHttpServletResponse();
       StreamFile.sendPartialData(in, os, response, 500, ranges);
       assertEquals("Multiple ranges should result in a 416 error",
@@ -261,7 +255,7 @@ public class TestStreamFile extends Test
 
     // test if invalid single range (out of bounds), then 416
     { 
-      List ranges = strToRanges("600-800", 500);
+      List<?> ranges = strToRanges("600-800", 500);
       MockHttpServletResponse response = new MockHttpServletResponse();
       StreamFile.sendPartialData(in, os, response, 500, ranges);
       assertEquals("Single (but invalid) range should result in a 416",
@@ -271,7 +265,7 @@ public class TestStreamFile extends Test
       
     // test if one (valid) range, then 206
     { 
-      List ranges = strToRanges("100-300", 500);
+      List<?> ranges = strToRanges("100-300", 500);
       MockHttpServletResponse response = new MockHttpServletResponse();
       StreamFile.sendPartialData(in, os, response, 500, ranges);
       assertEquals("Single (valid) range should result in a 206",