You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by sz...@apache.org on 2011/10/19 23:38:59 UTC

svn commit: r1186508 - in /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/ src/main/java/org/apache/hadoop/hdfs/web/ src/main/java/org/apache/hadoop/hdfs/web/resources/ src/test/java/org/apache/hadoop/hdfs/ src/test/java/org/apache/hadoop/hdfs/web/

Author: szetszwo
Date: Wed Oct 19 21:38:58 2011
New Revision: 1186508

URL: http://svn.apache.org/viewvc?rev=1186508&view=rev
Log:
HDFS-2453. Fix http response code for partial content in webhdfs, added getDefaultBlockSize() and getDefaultReplication() in WebHdfsFileSystem and cleared content type in ExceptionHandler. 

Modified:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ByteRangeInputStream.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/ExceptionHandler.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestByteRangeInputStream.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsFileSystemContract.java

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1186508&r1=1186507&r2=1186508&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Wed Oct 19 21:38:58 2011
@@ -132,6 +132,10 @@ Trunk (unreleased changes)
     HDFS-2188. Make FSEditLog create its journals from a list of URIs rather 
     than NNStorage. (Ivan Kelly via jitendra)
 
+    HDFS-2453. Fix http response code for partial content in webhdfs, added
+    getDefaultBlockSize() and getDefaultReplication() in WebHdfsFileSystem
+    and cleared content type in ExceptionHandler.  (szetszwo)
+
 Release 0.23.0 - Unreleased
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ByteRangeInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ByteRangeInputStream.java?rev=1186508&r1=1186507&r2=1186508&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ByteRangeInputStream.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ByteRangeInputStream.java Wed Oct 19 21:38:58 2011
@@ -22,10 +22,13 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.HttpURLConnection;
+import java.net.MalformedURLException;
 import java.net.URL;
+import java.util.StringTokenizer;
 
 import org.apache.hadoop.fs.FSInputStream;
 import org.apache.hadoop.hdfs.server.namenode.StreamFile;
+import org.apache.hadoop.hdfs.web.resources.OffsetParam;
 
 /**
  * To support HTTP byte streams, a new connection to an HTTP server needs to be
@@ -42,6 +45,8 @@ public class ByteRangeInputStream extend
    */
   static class URLOpener {
     protected URL url;
+    /** The url with offset parameter */
+    private URL offsetUrl;
   
     public URLOpener(URL u) {
       url = u;
@@ -54,12 +59,55 @@ public class ByteRangeInputStream extend
     public URL getURL() {
       return url;
     }
-  
-    public HttpURLConnection openConnection() throws IOException {
-      return (HttpURLConnection)url.openConnection();
+
+    HttpURLConnection openConnection() throws IOException {
+      return (HttpURLConnection)offsetUrl.openConnection();
+    }
+
+    private HttpURLConnection openConnection(final long offset) throws IOException {
+      offsetUrl = offset == 0L? url: new URL(url + "&" + new OffsetParam(offset));
+      final HttpURLConnection conn = openConnection();
+      conn.setRequestMethod("GET");
+      if (offset != 0L) {
+        conn.setRequestProperty("Range", "bytes=" + offset + "-");
+      }
+      return conn;
     }  
   }
   
+  static private final String OFFSET_PARAM_PREFIX = OffsetParam.NAME + "=";
+
+  /** Remove offset parameter, if there is any, from the url */
+  static URL removeOffsetParam(final URL url) throws MalformedURLException {
+    String query = url.getQuery();
+    if (query == null) {
+      return url;
+    }
+    final String lower = query.toLowerCase();
+    if (!lower.startsWith(OFFSET_PARAM_PREFIX)
+        && !lower.contains("&" + OFFSET_PARAM_PREFIX)) {
+      return url;
+    }
+
+    //rebuild query
+    StringBuilder b = null;
+    for(final StringTokenizer st = new StringTokenizer(query, "&");
+        st.hasMoreTokens();) {
+      final String token = st.nextToken();
+      if (!token.toLowerCase().startsWith(OFFSET_PARAM_PREFIX)) {
+        if (b == null) {
+          b = new StringBuilder("?").append(token);
+        } else {
+          b.append('&').append(token);
+        }
+      }
+    }
+    query = b == null? "": b.toString();
+
+    final String urlStr = url.toString();
+    return new URL(urlStr.substring(0, urlStr.indexOf('?')) + query);
+  }
+
   enum StreamStatus {
     NORMAL, SEEK
   }
@@ -95,12 +143,8 @@ public class ByteRangeInputStream extend
       final URLOpener opener =
         (resolvedURL.getURL() == null) ? originalURL : resolvedURL;
 
-      final HttpURLConnection connection = opener.openConnection();
+      final HttpURLConnection connection = opener.openConnection(startPos);
       try {
-        connection.setRequestMethod("GET");
-        if (startPos != 0) {
-          connection.setRequestProperty("Range", "bytes="+startPos+"-");
-        }
         connection.connect();
         final String cl = connection.getHeaderField(StreamFile.CONTENT_LENGTH);
         filelength = (cl == null) ? -1 : Long.parseLong(cl);
@@ -125,7 +169,7 @@ public class ByteRangeInputStream extend
         throw new IOException("HTTP_OK expected, received " + respCode);
       }
 
-      resolvedURL.setURL(connection.getURL());
+      resolvedURL.setURL(removeOffsetParam(connection.getURL()));
       status = StreamStatus.NORMAL;
     }
     

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java?rev=1186508&r1=1186507&r2=1186508&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java Wed Oct 19 21:38:58 2011
@@ -102,7 +102,7 @@ public class DatanodeWebHdfsMethods {
           final ReplicationParam replication,
       @QueryParam(BlockSizeParam.NAME) @DefaultValue(BlockSizeParam.DEFAULT)
           final BlockSizeParam blockSize
-      ) throws IOException, URISyntaxException, InterruptedException {
+      ) throws IOException, InterruptedException {
 
     if (LOG.isTraceEnabled()) {
       LOG.trace(op + ": " + path + ", ugi=" + ugi
@@ -162,7 +162,7 @@ public class DatanodeWebHdfsMethods {
           final PostOpParam op,
       @QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT)
           final BufferSizeParam bufferSize
-      ) throws IOException, URISyntaxException, InterruptedException {
+      ) throws IOException, InterruptedException {
 
     if (LOG.isTraceEnabled()) {
       LOG.trace(op + ": " + path + ", ugi=" + ugi
@@ -216,7 +216,7 @@ public class DatanodeWebHdfsMethods {
           final LengthParam length,
       @QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT)
           final BufferSizeParam bufferSize
-      ) throws IOException, URISyntaxException, InterruptedException {
+      ) throws IOException, InterruptedException {
 
     if (LOG.isTraceEnabled()) {
       LOG.trace(op + ": " + path + ", ugi=" + ugi
@@ -255,7 +255,11 @@ public class DatanodeWebHdfsMethods {
           }
         }
       };
-      return Response.ok(streaming).type(MediaType.APPLICATION_OCTET_STREAM).build();
+
+      final int status = offset.getValue() == 0?
+          HttpServletResponse.SC_OK: HttpServletResponse.SC_PARTIAL_CONTENT;
+      return Response.status(status).entity(streaming).type(
+          MediaType.APPLICATION_OCTET_STREAM).build();
     }
     case GETFILECHECKSUM:
     {

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java?rev=1186508&r1=1186507&r2=1186508&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java Wed Oct 19 21:38:58 2011
@@ -31,6 +31,8 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.ContentSummary;
@@ -44,6 +46,7 @@ import org.apache.hadoop.fs.ParentNotDir
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.ByteRangeInputStream;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HftpFileSystem;
 import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
@@ -86,6 +89,7 @@ import org.mortbay.util.ajax.JSON;
 
 /** A FileSystem for HDFS over the web. */
 public class WebHdfsFileSystem extends HftpFileSystem {
+  public static final Log LOG = LogFactory.getLog(WebHdfsFileSystem.class);
   /** File System URI: {SCHEME}://namenode:port/path/to/file */
   public static final String SCHEME = "webhdfs";
   /** Http URI: http://namenode:port/{PATH_PREFIX}/path/to/file */
@@ -340,6 +344,18 @@ public class WebHdfsFileSystem extends H
     run(op, p, new ModificationTimeParam(mtime), new AccessTimeParam(atime));
   }
 
+  @Override
+  public long getDefaultBlockSize() {
+    return getConf().getLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY,
+        DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT);
+  }
+
+  @Override
+  public short getDefaultReplication() {
+    return (short)getConf().getInt(DFSConfigKeys.DFS_REPLICATION_KEY,
+        DFSConfigKeys.DFS_REPLICATION_DEFAULT);
+  }
+
   private FSDataOutputStream write(final HttpOpParam.Op op,
       final HttpURLConnection conn, final int bufferSize) throws IOException {
     return new FSDataOutputStream(new BufferedOutputStream(

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/ExceptionHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/ExceptionHandler.java?rev=1186508&r1=1186507&r2=1186508&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/ExceptionHandler.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/ExceptionHandler.java Wed Oct 19 21:38:58 2011
@@ -20,6 +20,8 @@ package org.apache.hadoop.hdfs.web.resou
 import java.io.FileNotFoundException;
 import java.io.IOException;
 
+import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.ext.ExceptionMapper;
@@ -36,12 +38,17 @@ import com.sun.jersey.api.ParamException
 public class ExceptionHandler implements ExceptionMapper<Exception> {
   public static final Log LOG = LogFactory.getLog(ExceptionHandler.class);
 
+  private @Context HttpServletResponse response;
+
   @Override
   public Response toResponse(Exception e) {
     if (LOG.isTraceEnabled()) {
       LOG.trace("GOT EXCEPITION", e);
     }
 
+    //clear content type
+    response.setContentType(null);
+
     //Convert exception
     if (e instanceof ParamException) {
       final ParamException paramexception = (ParamException)e;

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestByteRangeInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestByteRangeInputStream.java?rev=1186508&r1=1186507&r2=1186508&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestByteRangeInputStream.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestByteRangeInputStream.java Wed Oct 19 21:38:58 2011
@@ -35,28 +35,29 @@ import org.apache.hadoop.hdfs.ByteRangeI
 import org.junit.Test;
 
 class MockHttpURLConnection extends HttpURLConnection {
-  private int responseCode = -1;
-  URL m;
-
   public MockHttpURLConnection(URL u) {
     super(u);
-    m = u;
   }
   
+  @Override
   public boolean usingProxy(){
     return false;
   }
   
+  @Override
   public void disconnect() {
   }
   
-  public void connect() throws IOException {
+  @Override
+  public void connect() {
   }
   
+  @Override
   public InputStream getInputStream() throws IOException {
     return new ByteArrayInputStream("asdf".getBytes());
   } 
 
+  @Override
   public URL getURL() {
     URL u = null;
     try {
@@ -67,6 +68,7 @@ class MockHttpURLConnection extends Http
     return u;
   }
   
+  @Override
   public int getResponseCode() {
     if (responseCode != -1) {
       return responseCode;
@@ -82,10 +84,45 @@ class MockHttpURLConnection extends Http
   public void setResponseCode(int resCode) {
     responseCode = resCode;
   }
-
 }
 
 public class TestByteRangeInputStream {
+  @Test
+  public void testRemoveOffset() throws IOException {
+    { //no offset
+      String s = "http://test/Abc?Length=99";
+      assertEquals(s, ByteRangeInputStream.removeOffsetParam(new URL(s)).toString());
+    }
+
+    { //no parameters
+      String s = "http://test/Abc";
+      assertEquals(s, ByteRangeInputStream.removeOffsetParam(new URL(s)).toString());
+    }
+
+    { //offset as first parameter
+      String s = "http://test/Abc?offset=10&Length=99";
+      assertEquals("http://test/Abc?Length=99",
+          ByteRangeInputStream.removeOffsetParam(new URL(s)).toString());
+    }
+
+    { //offset as second parameter
+      String s = "http://test/Abc?op=read&OFFset=10&Length=99";
+      assertEquals("http://test/Abc?op=read&Length=99",
+          ByteRangeInputStream.removeOffsetParam(new URL(s)).toString());
+    }
+
+    { //offset as last parameter
+      String s = "http://test/Abc?Length=99&offset=10";
+      assertEquals("http://test/Abc?Length=99",
+          ByteRangeInputStream.removeOffsetParam(new URL(s)).toString());
+    }
+
+    { //offset as the only parameter
+      String s = "http://test/Abc?offset=10";
+      assertEquals("http://test/Abc",
+          ByteRangeInputStream.removeOffsetParam(new URL(s)).toString());
+    }
+  }
   
   @Test
   public void testByteRange() throws IOException {

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsFileSystemContract.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsFileSystemContract.java?rev=1186508&r1=1186507&r2=1186508&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsFileSystemContract.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsFileSystemContract.java Wed Oct 19 21:38:58 2011
@@ -123,6 +123,8 @@ public class TestWebHdfsFileSystemContra
     }    
   }
   
+  //the following are new tests (i.e. not overriding the superclass methods)
+
   public void testGetFileBlockLocations() throws IOException {
     final String f = "/test/testGetFileBlockLocations";
     createFile(path(f));
@@ -172,4 +174,45 @@ public class TestWebHdfsFileSystemContra
       WebHdfsFileSystem.LOG.info("This is expected.", fnfe);
     }
   }
+
+  public void testSeek() throws IOException {
+    final Path p = new Path("/test/testSeek");
+    createFile(p);
+
+    final int one_third = data.length/3;
+    final int two_third = one_third*2;
+
+    { //test seek
+      final int offset = one_third; 
+      final int len = data.length - offset;
+      final byte[] buf = new byte[len];
+
+      final FSDataInputStream in = fs.open(p);
+      in.seek(offset);
+      
+      //read all remaining data
+      in.readFully(buf);
+      in.close();
+  
+      for (int i = 0; i < buf.length; i++) {
+        assertEquals("Position " + i + ", offset=" + offset + ", length=" + len,
+            data[i + offset], buf[i]);
+      }
+    }
+
+    { //test position read (read the data after the two_third location)
+      final int offset = two_third; 
+      final int len = data.length - offset;
+      final byte[] buf = new byte[len];
+
+      final FSDataInputStream in = fs.open(p);
+      in.readFully(offset, buf);
+      in.close();
+  
+      for (int i = 0; i < buf.length; i++) {
+        assertEquals("Position " + i + ", offset=" + offset + ", length=" + len,
+            data[i + offset], buf[i]);
+      }
+    }
+  }
 }