Posted to common-commits@hadoop.apache.org by sz...@apache.org on 2011/10/19 23:40:20 UTC

svn commit: r1186509 - in /hadoop/common/branches/branch-0.20-security: ./ src/hdfs/org/apache/hadoop/hdfs/ src/hdfs/org/apache/hadoop/hdfs/server/datanode/web/resources/ src/hdfs/org/apache/hadoop/hdfs/web/ src/hdfs/org/apache/hadoop/hdfs/web/resource...

Author: szetszwo
Date: Wed Oct 19 21:40:20 2011
New Revision: 1186509

URL: http://svn.apache.org/viewvc?rev=1186509&view=rev
Log:
HDFS-2453. Fix http response code for partial content in webhdfs, added getDefaultBlockSize() and getDefaultReplication() in WebHdfsFileSystem and cleared content type in ExceptionHandler. 

Added:
    hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestByteRangeInputStream.java
Modified:
    hadoop/common/branches/branch-0.20-security/CHANGES.txt
    hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/ByteRangeInputStream.java
    hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java
    hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
    hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/web/resources/ExceptionHandler.java
    hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/web/TestWebHdfsFileSystemContract.java

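In short: when ByteRangeInputStream opens a connection at a non-zero offset, it now sends both an offset query parameter and an HTTP Range header, and the datanode replies 206 (Partial Content) instead of 200. A client-side sketch; host, port and path are illustrative (50075 is the usual datanode HTTP port):

    // Assumes java.net.URL and java.net.HttpURLConnection.
    URL url = new URL(
        "http://datanode:50075/webhdfs/v1/user/foo/bar?op=OPEN&offset=100");
    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    conn.setRequestMethod("GET");
    conn.setRequestProperty("Range", "bytes=100-");
    conn.connect();
    System.out.println(conn.getResponseCode());
    // 200 before this patch, 206 after, which is what
    // ByteRangeInputStream expects for a ranged read.
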
Modified: hadoop/common/branches/branch-0.20-security/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/CHANGES.txt?rev=1186509&r1=1186508&r2=1186509&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.20-security/CHANGES.txt Wed Oct 19 21:40:20 2011
@@ -61,6 +61,10 @@ Release 0.20.205.1 - unreleased
     file or creating a file without specifying the replication parameter.
     (szetszwo)
 
+    HDFS-2453. Fix http response code for partial content in webhdfs, added
+    getDefaultBlockSize() and getDefaultReplication() in WebHdfsFileSystem
+    and cleared content type in ExceptionHandler. (szetszwo)
+
 Release 0.20.205.0 - 2011.10.06
 
   NEW FEATURES

Modified: hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/ByteRangeInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/ByteRangeInputStream.java?rev=1186509&r1=1186508&r2=1186509&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/ByteRangeInputStream.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/ByteRangeInputStream.java Wed Oct 19 21:40:20 2011
@@ -22,10 +22,13 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.HttpURLConnection;
+import java.net.MalformedURLException;
 import java.net.URL;
+import java.util.StringTokenizer;
 
 import org.apache.hadoop.fs.FSInputStream;
 import org.apache.hadoop.hdfs.server.namenode.StreamFile;
+import org.apache.hadoop.hdfs.web.resources.OffsetParam;
 
 /**
  * To support HTTP byte streams, a new connection to an HTTP server needs to be
@@ -42,6 +45,8 @@ public class ByteRangeInputStream extend
    */
   static class URLOpener {
     protected URL url;
+    /** The url with offset parameter */
+    private URL offsetUrl;
   
     public URLOpener(URL u) {
       url = u;
@@ -54,12 +59,55 @@ public class ByteRangeInputStream extend
     public URL getURL() {
       return url;
     }
-  
-    public HttpURLConnection openConnection() throws IOException {
-      return (HttpURLConnection)url.openConnection();
+
+    HttpURLConnection openConnection() throws IOException {
+      return (HttpURLConnection)offsetUrl.openConnection();
+    }
+
+    private HttpURLConnection openConnection(final long offset) throws IOException {
+      offsetUrl = offset == 0L? url: new URL(url + "&" + new OffsetParam(offset));
+      final HttpURLConnection conn = openConnection();
+      conn.setRequestMethod("GET");
+      if (offset != 0L) {
+        conn.setRequestProperty("Range", "bytes=" + offset + "-");
+      }
+      return conn;
     }  
   }
   
+  static private final String OFFSET_PARAM_PREFIX = OffsetParam.NAME + "=";
+
+  /** Remove offset parameter, if there is any, from the url */
+  static URL removeOffsetParam(final URL url) throws MalformedURLException {
+    String query = url.getQuery();
+    if (query == null) {
+      return url;
+    }
+    final String lower = query.toLowerCase();
+    if (!lower.startsWith(OFFSET_PARAM_PREFIX)
+        && !lower.contains("&" + OFFSET_PARAM_PREFIX)) {
+      return url;
+    }
+
+    //rebuild query
+    StringBuilder b = null;
+    for(final StringTokenizer st = new StringTokenizer(query, "&");
+        st.hasMoreTokens();) {
+      final String token = st.nextToken();
+      if (!token.toLowerCase().startsWith(OFFSET_PARAM_PREFIX)) {
+        if (b == null) {
+          b = new StringBuilder("?").append(token);
+        } else {
+          b.append('&').append(token);
+        }
+      }
+    }
+    query = b == null? "": b.toString();
+
+    final String urlStr = url.toString();
+    return new URL(urlStr.substring(0, urlStr.indexOf('?')) + query);
+  }
+
   enum StreamStatus {
     NORMAL, SEEK
   }
@@ -95,12 +143,8 @@ public class ByteRangeInputStream extend
       final URLOpener opener =
         (resolvedURL.getURL() == null) ? originalURL : resolvedURL;
 
-      final HttpURLConnection connection = opener.openConnection();
+      final HttpURLConnection connection = opener.openConnection(startPos);
       try {
-        connection.setRequestMethod("GET");
-        if (startPos != 0) {
-          connection.setRequestProperty("Range", "bytes="+startPos+"-");
-        }
         connection.connect();
         final String cl = connection.getHeaderField(StreamFile.CONTENT_LENGTH);
         filelength = (cl == null) ? -1 : Long.parseLong(cl);
@@ -125,7 +169,7 @@ public class ByteRangeInputStream extend
         throw new IOException("HTTP_OK expected, received " + respCode);
       }
 
-      resolvedURL.setURL(connection.getURL());
+      resolvedURL.setURL(removeOffsetParam(connection.getURL()));
       status = StreamStatus.NORMAL;
     }
     

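A note on removeOffsetParam(): after the redirect, connection.getURL() is the resolved datanode URL and still carries the offset parameter of the current read. That URL is cached in resolvedURL for later connections, and openConnection(offset) appends a fresh "&offset=N" each time, so the stale one must be stripped before caching. A minimal sketch with illustrative URLs (the method is package-private; the new test calls it from the same package):

    // Assumes java.net.URL plus the OffsetParam import added above.
    URL resolved = new URL("http://dn:50075/webhdfs/v1/f?op=OPEN&offset=100");
    URL cached = ByteRangeInputStream.removeOffsetParam(resolved);
    // cached:  http://dn:50075/webhdfs/v1/f?op=OPEN
    URL next = new URL(cached + "&" + new OffsetParam(2500L));
    // next:    http://dn:50075/webhdfs/v1/f?op=OPEN&offset=2500
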
Modified: hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java?rev=1186509&r1=1186508&r2=1186509&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java Wed Oct 19 21:40:20 2011
@@ -100,7 +100,7 @@ public class DatanodeWebHdfsMethods {
           final ReplicationParam replication,
       @QueryParam(BlockSizeParam.NAME) @DefaultValue(BlockSizeParam.DEFAULT)
           final BlockSizeParam blockSize
-      ) throws IOException, URISyntaxException, InterruptedException {
+      ) throws IOException, InterruptedException {
 
     if (LOG.isTraceEnabled()) {
       LOG.trace(op + ": " + path + ", ugi=" + ugi
@@ -156,7 +156,7 @@ public class DatanodeWebHdfsMethods {
           final PostOpParam op,
       @QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT)
           final BufferSizeParam bufferSize
-      ) throws IOException, URISyntaxException, InterruptedException {
+      ) throws IOException, InterruptedException {
 
     if (LOG.isTraceEnabled()) {
       LOG.trace(op + ": " + path + ", ugi=" + ugi
@@ -209,7 +209,7 @@ public class DatanodeWebHdfsMethods {
           final LengthParam length,
       @QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT)
           final BufferSizeParam bufferSize
-      ) throws IOException, URISyntaxException, InterruptedException {
+      ) throws IOException, InterruptedException {
 
     if (LOG.isTraceEnabled()) {
       LOG.trace(op + ": " + path + ", ugi=" + ugi
@@ -248,7 +248,11 @@ public class DatanodeWebHdfsMethods {
           }
         }
       };
-      return Response.ok(streaming).type(MediaType.APPLICATION_OCTET_STREAM).build();
+
+      final int status = offset.getValue() == 0?
+          HttpServletResponse.SC_OK: HttpServletResponse.SC_PARTIAL_CONTENT;
+      return Response.status(status).entity(streaming).type(
+          MediaType.APPLICATION_OCTET_STREAM).build();
     }
     case GETFILECHECKSUM:
     {

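The status above keys off the offset query parameter, not the Range header. It pairs with the validation in ByteRangeInputStream, whose error messages the new TestByteRangeInputStream asserts; a reduced sketch of that check, consistent with those messages:

    // respCode is connection.getResponseCode(), startPos the seek
    // position: a full read must come back 200, a ranged read 206.
    if (startPos != 0L && respCode != HttpURLConnection.HTTP_PARTIAL) {
      throw new IOException("HTTP_PARTIAL expected, received " + respCode);
    } else if (startPos == 0L && respCode != HttpURLConnection.HTTP_OK) {
      throw new IOException("HTTP_OK expected, received " + respCode);
    }
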
Modified: hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java?rev=1186509&r1=1186508&r2=1186509&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java Wed Oct 19 21:40:20 2011
@@ -31,6 +31,8 @@ import java.net.URL;
 import java.security.PrivilegedExceptionAction;
 import java.util.Map;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.ContentSummary;
@@ -45,6 +47,7 @@ import org.apache.hadoop.hdfs.ByteRangeI
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
+import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
@@ -91,6 +94,7 @@ import org.mortbay.util.ajax.JSON;
 /** A FileSystem for HDFS over the web. */
 public class WebHdfsFileSystem extends FileSystem
     implements DelegationTokenRenewer.Renewable {
+  public static final Log LOG = LogFactory.getLog(WebHdfsFileSystem.class);
   /** File System URI: {SCHEME}://namenode:port/path/to/file */
   public static final String SCHEME = "webhdfs";
   /** Http URI: http://namenode:port/{PATH_PREFIX}/path/to/file */
@@ -413,6 +417,17 @@ public class WebHdfsFileSystem extends F
     run(op, p, new ModificationTimeParam(mtime), new AccessTimeParam(atime));
   }
 
+  @Override
+  public long getDefaultBlockSize() {
+    return getConf().getLong("dfs.block.size", FSConstants.DEFAULT_BLOCK_SIZE);
+  }
+
+  @Override
+  public short getDefaultReplication() {
+    return (short)getConf().getInt(DFSConfigKeys.DFS_REPLICATION_KEY,
+        DFSConfigKeys.DFS_REPLICATION_DEFAULT);
+  }
+
   private FSDataOutputStream write(final HttpOpParam.Op op,
       final HttpURLConnection conn, final int bufferSize) throws IOException {
     return new FSDataOutputStream(new BufferedOutputStream(

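Why the two overrides: without them WebHdfsFileSystem inherits the generic FileSystem defaults rather than the HDFS ones, so a create() over webhdfs that omits blockSize or replication can pick up the wrong values. A minimal sketch with an illustrative namenode URI:

    // Assumes org.apache.hadoop.conf.Configuration,
    // org.apache.hadoop.fs.FileSystem and java.net.URI.
    Configuration conf = new Configuration();
    conf.setLong("dfs.block.size", 128L * 1024 * 1024);
    conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 2);  // "dfs.replication"
    FileSystem fs = FileSystem.get(URI.create("webhdfs://nn:50070/"), conf);
    fs.getDefaultBlockSize();    // 134217728, from dfs.block.size
    fs.getDefaultReplication();  // 2, from dfs.replication
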
Modified: hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/web/resources/ExceptionHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/web/resources/ExceptionHandler.java?rev=1186509&r1=1186508&r2=1186509&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/web/resources/ExceptionHandler.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/web/resources/ExceptionHandler.java Wed Oct 19 21:40:20 2011
@@ -20,6 +20,8 @@ package org.apache.hadoop.hdfs.web.resou
 import java.io.FileNotFoundException;
 import java.io.IOException;
 
+import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.ext.ExceptionMapper;
@@ -36,12 +38,17 @@ import com.sun.jersey.api.ParamException
 public class ExceptionHandler implements ExceptionMapper<Exception> {
   public static final Log LOG = LogFactory.getLog(ExceptionHandler.class);
 
+  private @Context HttpServletResponse response;
+
   @Override
   public Response toResponse(Exception e) {
     if (LOG.isTraceEnabled()) {
       LOG.trace("GOT EXCEPITION", e);
     }
 
+    //clear content type
+    response.setContentType(null);
+
     //Convert exception
     if (e instanceof ParamException) {
       final ParamException paramexception = (ParamException)e;

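The setContentType(null) call matters because an exception can surface after a content type was already set on the servlet response (for example the octet-stream type of a file read), in which case the JSON error body would go out under the stale type. A standalone sketch of the pattern; the mapper class and error body below are made up, not the handler's actual code:

    import java.io.IOException;
    import javax.servlet.http.HttpServletResponse;
    import javax.ws.rs.core.Context;
    import javax.ws.rs.core.MediaType;
    import javax.ws.rs.core.Response;
    import javax.ws.rs.ext.ExceptionMapper;
    import javax.ws.rs.ext.Provider;

    @Provider
    public class SketchExceptionMapper implements ExceptionMapper<IOException> {
      private @Context HttpServletResponse response;

      @Override
      public Response toResponse(IOException e) {
        response.setContentType(null);  // drop any previously set type
        return Response.status(Response.Status.INTERNAL_SERVER_ERROR)
            .type(MediaType.APPLICATION_JSON)
            .entity("{\"RemoteException\":{\"message\":\""
                + e.getMessage() + "\"}}")
            .build();
      }
    }
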
Added: hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestByteRangeInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestByteRangeInputStream.java?rev=1186509&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestByteRangeInputStream.java (added)
+++ hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestByteRangeInputStream.java Wed Oct 19 21:40:20 2011
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+
+import org.apache.hadoop.hdfs.ByteRangeInputStream.URLOpener;
+import org.junit.Test;
+
+class MockHttpURLConnection extends HttpURLConnection {
+  public MockHttpURLConnection(URL u) {
+    super(u);
+  }
+  
+  @Override
+  public boolean usingProxy(){
+    return false;
+  }
+  
+  @Override
+  public void disconnect() {
+  }
+  
+  @Override
+  public void connect() {
+  }
+  
+  @Override
+  public InputStream getInputStream() throws IOException {
+    return new ByteArrayInputStream("asdf".getBytes());
+  } 
+
+  @Override
+  public URL getURL() {
+    URL u = null;
+    try {
+      u = new URL("http://resolvedurl/");
+    } catch (Exception e) {
+      System.out.println(e.getMessage());
+    }
+    return u;
+  }
+  
+  @Override
+  public int getResponseCode() {
+    if (responseCode != -1) {
+      return responseCode;
+    } else {
+      if (getRequestProperty("Range") == null) {
+        return 200;
+      } else {
+        return 206;
+      }
+    }
+  }
+
+  public void setResponseCode(int resCode) {
+    responseCode = resCode;
+  }
+}
+
+public class TestByteRangeInputStream {
+  @Test
+  public void testRemoveOffset() throws IOException {
+    { //no offset
+      String s = "http://test/Abc?Length=99";
+      assertEquals(s, ByteRangeInputStream.removeOffsetParam(new URL(s)).toString());
+    }
+
+    { //no parameters
+      String s = "http://test/Abc";
+      assertEquals(s, ByteRangeInputStream.removeOffsetParam(new URL(s)).toString());
+    }
+
+    { //offset as first parameter
+      String s = "http://test/Abc?offset=10&Length=99";
+      assertEquals("http://test/Abc?Length=99",
+          ByteRangeInputStream.removeOffsetParam(new URL(s)).toString());
+    }
+
+    { //offset as second parameter
+      String s = "http://test/Abc?op=read&OFFset=10&Length=99";
+      assertEquals("http://test/Abc?op=read&Length=99",
+          ByteRangeInputStream.removeOffsetParam(new URL(s)).toString());
+    }
+
+    { //offset as last parameter
+      String s = "http://test/Abc?Length=99&offset=10";
+      assertEquals("http://test/Abc?Length=99",
+          ByteRangeInputStream.removeOffsetParam(new URL(s)).toString());
+    }
+
+    { //offset as the only parameter
+      String s = "http://test/Abc?offset=10";
+      assertEquals("http://test/Abc",
+          ByteRangeInputStream.removeOffsetParam(new URL(s)).toString());
+    }
+  }
+  
+  @Test
+  public void testByteRange() throws IOException {
+    URLOpener ospy = spy(new URLOpener(new URL("http://test/")));
+    doReturn(new MockHttpURLConnection(ospy.getURL())).when(ospy)
+        .openConnection();
+    URLOpener rspy = spy(new URLOpener((URL) null));
+    doReturn(new MockHttpURLConnection(rspy.getURL())).when(rspy)
+        .openConnection();
+    ByteRangeInputStream is = new ByteRangeInputStream(ospy, rspy);
+
+    assertEquals("getPos wrong", 0, is.getPos());
+
+    is.read();
+
+    assertNull("Initial call made incorrectly (Range Check)", ospy
+        .openConnection().getRequestProperty("Range"));
+
+    assertEquals("getPos should be 1 after reading one byte", 1, is.getPos());
+
+    is.read();
+
+    assertEquals("getPos should be 2 after reading two bytes", 2, is.getPos());
+
+    // No additional connections should have been made (no seek)
+
+    rspy.setURL(new URL("http://resolvedurl/"));
+
+    is.seek(100);
+    is.read();
+
+    assertEquals("Seek to 100 bytes made incorrectly (Range Check)",
+        "bytes=100-", rspy.openConnection().getRequestProperty("Range"));
+
+    assertEquals("getPos should be 101 after reading one byte", 101,
+        is.getPos());
+
+    verify(rspy, times(2)).openConnection();
+
+    is.seek(101);
+    is.read();
+
+    verify(rspy, times(2)).openConnection();
+
+    // Seek to 101 should not result in another request"
+
+    is.seek(2500);
+    is.read();
+
+    assertEquals("Seek to 2500 bytes made incorrectly (Range Check)",
+        "bytes=2500-", rspy.openConnection().getRequestProperty("Range"));
+
+    ((MockHttpURLConnection) rspy.openConnection()).setResponseCode(200);
+    is.seek(500);
+    
+    try {
+      is.read();
+      fail("Exception should be thrown when 200 response is given "
+           + "but 206 is expected");
+    } catch (IOException e) {
+      assertEquals("Should fail because incorrect response code was sent",
+                   "HTTP_PARTIAL expected, received 200", e.getMessage());
+    }
+
+    ((MockHttpURLConnection) rspy.openConnection()).setResponseCode(206);
+    is.seek(0);
+
+    try {
+      is.read();
+      fail("Exception should be thrown when 206 response is given "
+           + "but 200 is expected");
+    } catch (IOException e) {
+      assertEquals("Should fail because incorrect response code was sent",
+                   "HTTP_OK expected, received 206", e.getMessage());
+    }
+  }
+}

Modified: hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/web/TestWebHdfsFileSystemContract.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/web/TestWebHdfsFileSystemContract.java?rev=1186509&r1=1186508&r2=1186509&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/web/TestWebHdfsFileSystemContract.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/web/TestWebHdfsFileSystemContract.java Wed Oct 19 21:40:20 2011
@@ -129,6 +129,7 @@ public class TestWebHdfsFileSystemContra
   //For WebHdfsFileSystem,
   //disable testListStatusReturnsNullForNonExistentFile
   //and add testListStatusThrowsExceptionForNonExistentFile below.
+  @Override
   public void testListStatusReturnsNullForNonExistentFile() {}
 
   public void testListStatusThrowsExceptionForNonExistentFile() throws Exception {
@@ -140,6 +141,8 @@ public class TestWebHdfsFileSystemContra
     }
   }
   
+  //the following are new tests (i.e. not over-riding the super class methods)
+
   public void testGetFileBlockLocations() throws IOException {
     final String f = "/test/testGetFileBlockLocations";
     final Path p = path(f);
@@ -192,4 +195,45 @@ public class TestWebHdfsFileSystemContra
       WebHdfsFileSystem.LOG.info("This is expected.", fnfe);
     }
   }
+
+  public void testSeek() throws IOException {
+    final Path p = new Path("/test/testSeek");
+    createFile(p);
+
+    final int one_third = data.length/3;
+    final int two_third = one_third*2;
+
+    { //test seek
+      final int offset = one_third; 
+      final int len = data.length - offset;
+      final byte[] buf = new byte[len];
+
+      final FSDataInputStream in = fs.open(p);
+      in.seek(offset);
+      
+      //read all remaining data
+      in.readFully(buf);
+      in.close();
+  
+      for (int i = 0; i < buf.length; i++) {
+        assertEquals("Position " + i + ", offset=" + offset + ", length=" + len,
+            data[i + offset], buf[i]);
+      }
+    }
+
+    { //test position read (read the data after the two_third location)
+      final int offset = two_third; 
+      final int len = data.length - offset;
+      final byte[] buf = new byte[len];
+
+      final FSDataInputStream in = fs.open(p);
+      in.readFully(offset, buf);
+      in.close();
+  
+      for (int i = 0; i < buf.length; i++) {
+        assertEquals("Position " + i + ", offset=" + offset + ", length=" + len,
+            data[i + offset], buf[i]);
+      }
+    }
+  }
 }