Posted to hdfs-commits@hadoop.apache.org by el...@apache.org on 2011/06/28 17:59:31 UTC

svn commit: r1140694 - in /hadoop/common/trunk/hdfs: ./ src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/server/namenode/ src/test/hdfs/org/apache/hadoop/hdfs/ src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/

Author: eli
Date: Tue Jun 28 15:59:31 2011
New Revision: 1140694

URL: http://svn.apache.org/viewvc?rev=1140694&view=rev
Log:
HDFS-2110. StreamFile and ByteRangeInputStream cleanup. Contributed by Eli Collins

Modified:
    hadoop/common/trunk/hdfs/CHANGES.txt
    hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/ByteRangeInputStream.java
    hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/HftpFileSystem.java
    hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/StreamFile.java
    hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestByteRangeInputStream.java
    hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestStreamFile.java

Modified: hadoop/common/trunk/hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/CHANGES.txt?rev=1140694&r1=1140693&r2=1140694&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hdfs/CHANGES.txt Tue Jun 28 15:59:31 2011
@@ -532,6 +532,8 @@ Trunk (unreleased changes)
     HDFS-1723. quota errors messages should use the same scale. (Jim Plush via
     atm)
 
+    HDFS-2110. StreamFile and ByteRangeInputStream cleanup. (eli)
+
   OPTIMIZATIONS
 
     HDFS-1458. Improve checkpoint performance by avoiding unnecessary image

Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/ByteRangeInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/ByteRangeInputStream.java?rev=1140694&r1=1140693&r2=1140694&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/ByteRangeInputStream.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/ByteRangeInputStream.java Tue Jun 28 15:59:31 2011
@@ -26,7 +26,6 @@ import java.net.URL;
 import org.apache.hadoop.fs.FSInputStream;
 import org.apache.hadoop.hdfs.server.namenode.StreamFile;
 
-
 /**
  * To support HTTP byte streams, a new connection to an HTTP server needs to be
  * created each time. This class hides the complexity of those multiple 
@@ -60,7 +59,9 @@ class ByteRangeInputStream extends FSInp
     }  
   }
   
-  
+  enum StreamStatus {
+    NORMAL, SEEK
+  }
   protected InputStream in;
   protected URLOpener originalURL;
   protected URLOpener resolvedURL;
@@ -68,9 +69,7 @@ class ByteRangeInputStream extends FSInp
   protected long currentPos = 0;
   protected long filelength;
 
-  protected int status = STATUS_SEEK;
-  protected static final int STATUS_NORMAL = 0;
-  protected static final int STATUS_SEEK = 1;
+  StreamStatus status = StreamStatus.SEEK;
 
   ByteRangeInputStream(final URL url) {
     this(new URLOpener(url), new URLOpener(null));
@@ -82,18 +81,19 @@ class ByteRangeInputStream extends FSInp
   }
   
   private InputStream getInputStream() throws IOException {
-    if (status != STATUS_NORMAL) {
+    if (status != StreamStatus.NORMAL) {
       
       if (in != null) {
         in.close();
         in = null;
       }
       
-      // use the original url  if no resolved url exists (e.g., if it's 
-      // the first time a request is made)
-      final URLOpener o = resolvedURL.getURL() == null? originalURL: resolvedURL;
+      // Use the original url if no resolved url exists, eg. if
+      // it's the first time a request is made.
+      final URLOpener opener =
+        (resolvedURL.getURL() == null) ? originalURL : resolvedURL;
 
-      final HttpURLConnection connection = o.openConnection();
+      final HttpURLConnection connection = opener.openConnection();
       try {
         connection.setRequestMethod("GET");
         if (startPos != 0) {
@@ -101,36 +101,35 @@ class ByteRangeInputStream extends FSInp
         }
         connection.connect();
         final String cl = connection.getHeaderField(StreamFile.CONTENT_LENGTH);
-        filelength = cl == null? -1: Long.parseLong(cl);
+        filelength = (cl == null) ? -1 : Long.parseLong(cl);
         if (HftpFileSystem.LOG.isDebugEnabled()) {
           HftpFileSystem.LOG.debug("filelength = " + filelength);
         }
         in = connection.getInputStream();
-      } catch(IOException ioe) {
+      } catch (IOException ioe) {
         HftpFileSystem.throwIOExceptionFromConnection(connection, ioe);
       }
       
-      if (startPos != 0 && connection.getResponseCode() != 206) {
-        // we asked for a byte range but did not receive a partial content
+      int respCode = connection.getResponseCode();
+      if (startPos != 0 && respCode != HttpURLConnection.HTTP_PARTIAL) {
+        // We asked for a byte range but did not receive a partial content
         // response...
-        throw new IOException("206 expected, but received "
-                              + connection.getResponseCode());
-      } else if(startPos == 0 && connection.getResponseCode() != 200) {
-        // we asked for all bytes from the beginning but didn't receive a 200
+        throw new IOException("HTTP_PARTIAL expected, received " + respCode);
+      } else if (startPos == 0 && respCode != HttpURLConnection.HTTP_OK) {
+        // We asked for all bytes from the beginning but didn't receive a 200
         // response (none of the other 2xx codes are valid here)
-        throw new IOException("200 expected, but received "
-                              + connection.getResponseCode());
+        throw new IOException("HTTP_OK expected, received " + respCode);
       }
-      
+
       resolvedURL.setURL(connection.getURL());
-      status = STATUS_NORMAL;
+      status = StreamStatus.NORMAL;
     }
     
     return in;
   }
   
-  private void update(final boolean isEOF, final int n
-      ) throws IOException {
+  private void update(final boolean isEOF, final int n)
+      throws IOException {
     if (!isEOF) {
       currentPos += n;
     } else if (currentPos < filelength) {
@@ -154,7 +153,7 @@ class ByteRangeInputStream extends FSInp
     if (pos != currentPos) {
       startPos = pos;
       currentPos = pos;
-      status = STATUS_SEEK;
+      status = StreamStatus.SEEK;
     }
   }
 
@@ -162,7 +161,7 @@ class ByteRangeInputStream extends FSInp
    * Return the current offset from the start of the file
    */
   public long getPos() throws IOException {
-    return currentPos; // keep total count?
+    return currentPos;
   }
 
   /**
@@ -172,7 +171,4 @@ class ByteRangeInputStream extends FSInp
   public boolean seekToNewSource(long targetPos) throws IOException {
     return false;
   }
-
-}
-
-
+}
\ No newline at end of file
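
For readers following the ByteRangeInputStream changes above: the stream re-opens its connection whenever a seek invalidates it, sending a Range header for nonzero offsets and validating the response code (HTTP_PARTIAL for a ranged request, HTTP_OK otherwise). Below is a minimal standalone sketch of that ranged-GET pattern. The URL is hypothetical, and the header-setting line is an assumption since it falls outside the hunks shown; treat this as an illustration, not the HDFS class itself.

  import java.io.IOException;
  import java.io.InputStream;
  import java.net.HttpURLConnection;
  import java.net.URL;

  public class RangedGetSketch {
    /**
     * Open a stream positioned at startPos, mirroring the response-code
     * checks in ByteRangeInputStream.getInputStream(). The Range header
     * below is standard HTTP; the corresponding line in the HDFS class
     * is not visible in the hunks above, so it is an assumption here.
     */
    static InputStream openAt(URL url, long startPos) throws IOException {
      HttpURLConnection conn = (HttpURLConnection) url.openConnection();
      conn.setRequestMethod("GET");
      if (startPos != 0) {
        // Ask for bytes from startPos through the end of the resource.
        conn.setRequestProperty("Range", "bytes=" + startPos + "-");
      }
      conn.connect();
      int respCode = conn.getResponseCode();
      if (startPos != 0 && respCode != HttpURLConnection.HTTP_PARTIAL) {
        // A byte range was requested but the server served full content.
        throw new IOException("HTTP_PARTIAL expected, received " + respCode);
      } else if (startPos == 0 && respCode != HttpURLConnection.HTTP_OK) {
        // Full content was requested; only 200 is acceptable here.
        throw new IOException("HTTP_OK expected, received " + respCode);
      }
      return conn.getInputStream();
    }
  }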

Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/HftpFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/HftpFileSystem.java?rev=1140694&r1=1140693&r2=1140694&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/HftpFileSystem.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/HftpFileSystem.java Tue Jun 28 15:59:31 2011
@@ -245,17 +245,26 @@ public class HftpFileSystem extends File
     } 
   }
 
-
-  /* 
-    Construct URL pointing to file on namenode
-  */
-  URL getNamenodeFileURL(Path f) throws IOException {
-    return getNamenodeURL("/data" + f.toUri().getPath(), "ugi=" + getUgiParameter());
+  /**
+   * Return a URL pointing to given path on the namenode.
+   *
+   * @param p path to obtain the URL for
+   * @return namenode URL referring to the given path
+   * @throws IOException on error constructing the URL
+   */
+  URL getNamenodeFileURL(Path p) throws IOException {
+    return getNamenodeURL("/data" + p.toUri().getPath(),
+                          "ugi=" + getUgiParameter());
   }
 
-  /* 
-    Construct URL pointing to namenode. 
-  */
+  /**
+   * Return a URL pointing to given path on the namenode.
+   *
+   * @param path to obtain the URL for
+   * @param query string to append to the path
+   * @return namenode URL referring to the given path
+   * @throws IOException on error constructing the URL
+   */
   URL getNamenodeURL(String path, String query) throws IOException {
     try {
       final URL url = new URI("http", null, nnAddr.getHostName(),
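
The getNamenodeURL hunk above is cut off mid-constructor. For illustration, a hedged sketch of the seven-argument java.net.URI construction it begins; the port handling and catch clause here are assumptions, and the host and query values in the usage comment are made up:

  import java.io.IOException;
  import java.net.URI;
  import java.net.URISyntaxException;
  import java.net.URL;

  public class NamenodeUrlSketch {
    // Build an http URL from its parts, letting URI percent-escape the
    // path and query. Hypothetical usage:
    //   toNamenodeURL("nn.example.com", 50070, "/data/user/f", "ugi=hdfs")
    static URL toNamenodeURL(String host, int port, String path, String query)
        throws IOException {
      try {
        // URI(scheme, userInfo, host, port, path, query, fragment)
        return new URI("http", null, host, port, path, query, null).toURL();
      } catch (URISyntaxException e) {
        throw new IOException("Unable to construct namenode URL", e);
      }
    }
  }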

Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/StreamFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/StreamFile.java?rev=1140694&r1=1140693&r2=1140694&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/StreamFile.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/StreamFile.java Tue Jun 28 15:59:31 2011
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.PrintWriter;
 import java.util.Enumeration;
@@ -36,6 +37,7 @@ import org.apache.hadoop.hdfs.DFSInputSt
 import org.apache.hadoop.hdfs.server.common.JspHelper;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DatanodeJspHelper;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.mortbay.jetty.InclusiveByteRange;
 
@@ -96,7 +98,7 @@ public class StreamFile extends DfsServl
                            filename + "\"");
         response.setContentType("application/octet-stream");
         response.setHeader(CONTENT_LENGTH, "" + fileLen);
-        StreamFile.writeTo(in, os, 0L, fileLen);
+        StreamFile.copyFromOffset(in, os, 0L, fileLen);
       }
     } catch(IOException e) {
       if (LOG.isDebugEnabled()) {
@@ -113,75 +115,46 @@ public class StreamFile extends DfsServl
     }      
   }
   
+  /**
+   * Send a partial content response with the given range. If there are
+   * no satisfiable ranges, or if multiple ranges are requested, which
+   * is unsupported, respond with range not satisfiable.
+   *
+   * @param in stream to read from
+   * @param out stream to write to
+   * @param response http response to use
+   * @param contentLength for the response header
+   * @param ranges to write to respond with
+   * @throws IOException on error sending the response
+   */
   static void sendPartialData(FSInputStream in,
-                              OutputStream os,
+                              OutputStream out,
                               HttpServletResponse response,
                               long contentLength,
                               List<?> ranges)
-  throws IOException {
-
+      throws IOException {
     if (ranges == null || ranges.size() != 1) {
-      //  if there are no satisfiable ranges, or if multiple ranges are
-      // requested (we don't support multiple range requests), send 416 response
       response.setContentLength(0);
-      int status = HttpServletResponse.SC_REQUESTED_RANGE_NOT_SATISFIABLE;
-      response.setStatus(status);
-      response.setHeader("Content-Range", 
+      response.setStatus(HttpServletResponse.SC_REQUESTED_RANGE_NOT_SATISFIABLE);
+      response.setHeader("Content-Range",
                 InclusiveByteRange.to416HeaderRangeString(contentLength));
     } else {
-      //  if there is only a single valid range (must be satisfiable 
-      //  since were here now), send that range with a 206 response
       InclusiveByteRange singleSatisfiableRange =
         (InclusiveByteRange)ranges.get(0);
       long singleLength = singleSatisfiableRange.getSize(contentLength);
       response.setStatus(HttpServletResponse.SC_PARTIAL_CONTENT);
       response.setHeader("Content-Range", 
         singleSatisfiableRange.toHeaderRangeString(contentLength));
-      System.out.println("first: "+singleSatisfiableRange.getFirst(contentLength));
-      System.out.println("singleLength: "+singleLength);
-      
-      StreamFile.writeTo(in,
-                         os,
-                         singleSatisfiableRange.getFirst(contentLength),
-                         singleLength);
+      copyFromOffset(in, out,
+                     singleSatisfiableRange.getFirst(contentLength),
+                     singleLength);
     }
   }
-  
-  static void writeTo(FSInputStream in,
-                      OutputStream os,
-                      long start,
-                      long count) 
-  throws IOException {
-    byte buf[] = new byte[4096];
-    long bytesRemaining = count;
-    int bytesRead;
-    int bytesToRead;
-
-    in.seek(start);
-
-    while (true) {
-      // number of bytes to read this iteration
-      bytesToRead = (int)(bytesRemaining<buf.length ? 
-                                                      bytesRemaining:
-                                                      buf.length);
-      
-      // number of bytes actually read this iteration
-      bytesRead = in.read(buf, 0, bytesToRead);
-
-      // if we can't read anymore, break
-      if (bytesRead == -1) {
-        break;
-      } 
-      
-      os.write(buf, 0, bytesRead);
-
-      bytesRemaining -= bytesRead;
-
-      // if we don't need to read anymore, break
-      if (bytesRemaining <= 0) {
-        break;
-      }
 
-    } 
+  /* Copy count bytes at the given offset from one stream to another */
+  static void copyFromOffset(FSInputStream in, OutputStream out, long offset,
+      long count) throws IOException {
+    in.seek(offset);
+    IOUtils.copyBytes(in, out, count);
   }
 }
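
The copyFromOffset refactor above delegates the bounded copy to IOUtils.copyBytes. For readers without the Hadoop source at hand, here is a sketch approximating the behavior being delegated; it is essentially the removed writeTo loop tidied up, not Hadoop's IOUtils implementation:

  import java.io.IOException;
  import java.io.InputStream;
  import java.io.OutputStream;

  public class BoundedCopySketch {
    // Copy up to count bytes from in to out, stopping early at EOF.
    // This approximates what the old StreamFile.writeTo loop did and
    // what the new code uses IOUtils.copyBytes(in, out, count) for.
    static void copy(InputStream in, OutputStream out, long count)
        throws IOException {
      byte[] buf = new byte[4096];
      long remaining = count;
      while (remaining > 0) {
        // Read at most a buffer's worth, never more than remaining.
        int toRead = (int) Math.min(remaining, buf.length);
        int read = in.read(buf, 0, toRead);
        if (read == -1) {
          break; // EOF before count bytes were copied
        }
        out.write(buf, 0, read);
        remaining -= read;
      }
    }
  }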

Modified: hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestByteRangeInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestByteRangeInputStream.java?rev=1140694&r1=1140693&r2=1140694&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestByteRangeInputStream.java (original)
+++ hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestByteRangeInputStream.java Tue Jun 28 15:59:31 2011
@@ -23,12 +23,13 @@ import java.io.InputStream;
 import java.net.HttpURLConnection;
 import java.net.MalformedURLException;
 import java.net.URL;
-import junit.framework.TestCase;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+
 import org.apache.hadoop.hdfs.ByteRangeInputStream;
 import org.apache.hadoop.hdfs.ByteRangeInputStream.URLOpener;
 
+import org.junit.Test;
+import static org.junit.Assert.*;
+
 class MockHttpURLConnection extends HttpURLConnection {
   MockURL m;
   
@@ -101,13 +102,9 @@ class MockURL extends URLOpener {
   }
 }
 
-
-
-public class TestByteRangeInputStream extends TestCase {
-  
-  private static final Log LOG = 
-                           LogFactory.getLog(TestByteRangeInputStream.class);
+public class TestByteRangeInputStream {
   
+  @Test
   public void testByteRange() throws IOException, InterruptedException {
     MockURL o = new MockURL("http://test/");
     MockURL r =  new MockURL((URL)null);
@@ -168,7 +165,7 @@ public class TestByteRangeInputStream ex
            + "but 206 is expected");
     } catch (IOException e) {
       assertEquals("Should fail because incorrect response code was sent",
-                   "206 expected, but received 200", e.getMessage());
+                   "HTTP_PARTIAL expected, received 200", e.getMessage());
     }
 
     r.responseCode = 206;
@@ -180,10 +177,7 @@ public class TestByteRangeInputStream ex
            + "but 200 is expected");
     } catch (IOException e) {
       assertEquals("Should fail because incorrect response code was sent",
-                   "200 expected, but received 206", e.getMessage());
+                   "HTTP_OK expected, received 206", e.getMessage());
     }
-
-
-
   }
 }
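
The test diff above also migrates TestByteRangeInputStream from JUnit 3 (extending junit.framework.TestCase, with test-prefixed method names) to JUnit 4 (@Test annotations and static Assert imports), dropping the unused commons-logging Log along the way. A minimal sketch of the JUnit 4 pattern, with a made-up test body:

  import org.junit.Test;
  import static org.junit.Assert.*;

  public class JUnit4StyleSketch {
    @Test  // discovered by annotation; no "test" name prefix required
    public void rangeHeaderFormatting() {
      long startPos = 100;
      assertEquals("bytes=100-", "bytes=" + startPos + "-");
    }
  }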

Modified: hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestStreamFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestStreamFile.java?rev=1140694&r1=1140693&r2=1140694&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestStreamFile.java (original)
+++ hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestStreamFile.java Tue Jun 28 15:59:31 2011
@@ -219,7 +219,7 @@ public class TestStreamFile extends Test
     assertTrue("Pairs array must be even", pairs.length % 2 == 0);
     
     for (int i = 0; i < pairs.length; i+=2) {
-      StreamFile.writeTo(fsin, os, pairs[i], pairs[i+1]);
+      StreamFile.copyFromOffset(fsin, os, pairs[i], pairs[i+1]);
       assertArrayEquals("Reading " + pairs[i+1]
                         + " bytes from offset " + pairs[i],
                         getOutputArray(pairs[i], pairs[i+1]),