You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sz...@apache.org on 2011/10/19 23:40:20 UTC
svn commit: r1186509 - in /hadoop/common/branches/branch-0.20-security: ./
src/hdfs/org/apache/hadoop/hdfs/
src/hdfs/org/apache/hadoop/hdfs/server/datanode/web/resources/
src/hdfs/org/apache/hadoop/hdfs/web/
src/hdfs/org/apache/hadoop/hdfs/web/resource...
Author: szetszwo
Date: Wed Oct 19 21:40:20 2011
New Revision: 1186509
URL: http://svn.apache.org/viewvc?rev=1186509&view=rev
Log:
HDFS-2453. Fix http response code for partial content in webhdfs, added getDefaultBlockSize() and getDefaultReplication() in WebHdfsFileSystem and cleared content type in ExceptionHandler.
Added:
hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestByteRangeInputStream.java
Modified:
hadoop/common/branches/branch-0.20-security/CHANGES.txt
hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/ByteRangeInputStream.java
hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java
hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/web/resources/ExceptionHandler.java
hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/web/TestWebHdfsFileSystemContract.java
Modified: hadoop/common/branches/branch-0.20-security/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/CHANGES.txt?rev=1186509&r1=1186508&r2=1186509&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.20-security/CHANGES.txt Wed Oct 19 21:40:20 2011
@@ -61,6 +61,10 @@ Release 0.20.205.1 - unreleased
file or creating a file without specifying the replication parameter.
(szetszwo)
+ HDFS-2453. Fix http response code for partial content in webhdfs, added
+ getDefaultBlockSize() and getDefaultReplication() in WebHdfsFileSystem
+ and cleared content type in ExceptionHandler. (szetszwo)
+
Release 0.20.205.0 - 2011.10.06
NEW FEATURES
Modified: hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/ByteRangeInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/ByteRangeInputStream.java?rev=1186509&r1=1186508&r2=1186509&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/ByteRangeInputStream.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/ByteRangeInputStream.java Wed Oct 19 21:40:20 2011
@@ -22,10 +22,13 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
+import java.net.MalformedURLException;
import java.net.URL;
+import java.util.StringTokenizer;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.hdfs.server.namenode.StreamFile;
+import org.apache.hadoop.hdfs.web.resources.OffsetParam;
/**
* To support HTTP byte streams, a new connection to an HTTP server needs to be
@@ -42,6 +45,8 @@ public class ByteRangeInputStream extend
*/
static class URLOpener {
protected URL url;
+ /** The url with offset parameter */
+ private URL offsetUrl;
public URLOpener(URL u) {
url = u;
@@ -54,12 +59,55 @@ public class ByteRangeInputStream extend
public URL getURL() {
return url;
}
-
- public HttpURLConnection openConnection() throws IOException {
- return (HttpURLConnection)url.openConnection();
+
+ HttpURLConnection openConnection() throws IOException {
+ return (HttpURLConnection)offsetUrl.openConnection();
+ }
+
+ private HttpURLConnection openConnection(final long offset) throws IOException {
+ offsetUrl = offset == 0L? url: new URL(url + "&" + new OffsetParam(offset));
+ final HttpURLConnection conn = openConnection();
+ conn.setRequestMethod("GET");
+ if (offset != 0L) {
+ conn.setRequestProperty("Range", "bytes=" + offset + "-");
+ }
+ return conn;
}
}
+ static private final String OFFSET_PARAM_PREFIX = OffsetParam.NAME + "=";
+
+ /** Remove offset parameter, if there is any, from the url */
+ static URL removeOffsetParam(final URL url) throws MalformedURLException {
+ String query = url.getQuery();
+ if (query == null) {
+ return url;
+ }
+ final String lower = query.toLowerCase();
+ if (!lower.startsWith(OFFSET_PARAM_PREFIX)
+ && !lower.contains("&" + OFFSET_PARAM_PREFIX)) {
+ return url;
+ }
+
+ //rebuild query
+ StringBuilder b = null;
+ for(final StringTokenizer st = new StringTokenizer(query, "&");
+ st.hasMoreTokens();) {
+ final String token = st.nextToken();
+ if (!token.toLowerCase().startsWith(OFFSET_PARAM_PREFIX)) {
+ if (b == null) {
+ b = new StringBuilder("?").append(token);
+ } else {
+ b.append('&').append(token);
+ }
+ }
+ }
+ query = b == null? "": b.toString();
+
+ final String urlStr = url.toString();
+ return new URL(urlStr.substring(0, urlStr.indexOf('?')) + query);
+ }
+
enum StreamStatus {
NORMAL, SEEK
}
@@ -95,12 +143,8 @@ public class ByteRangeInputStream extend
final URLOpener opener =
(resolvedURL.getURL() == null) ? originalURL : resolvedURL;
- final HttpURLConnection connection = opener.openConnection();
+ final HttpURLConnection connection = opener.openConnection(startPos);
try {
- connection.setRequestMethod("GET");
- if (startPos != 0) {
- connection.setRequestProperty("Range", "bytes="+startPos+"-");
- }
connection.connect();
final String cl = connection.getHeaderField(StreamFile.CONTENT_LENGTH);
filelength = (cl == null) ? -1 : Long.parseLong(cl);
@@ -125,7 +169,7 @@ public class ByteRangeInputStream extend
throw new IOException("HTTP_OK expected, received " + respCode);
}
- resolvedURL.setURL(connection.getURL());
+ resolvedURL.setURL(removeOffsetParam(connection.getURL()));
status = StreamStatus.NORMAL;
}
Modified: hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java?rev=1186509&r1=1186508&r2=1186509&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java Wed Oct 19 21:40:20 2011
@@ -100,7 +100,7 @@ public class DatanodeWebHdfsMethods {
final ReplicationParam replication,
@QueryParam(BlockSizeParam.NAME) @DefaultValue(BlockSizeParam.DEFAULT)
final BlockSizeParam blockSize
- ) throws IOException, URISyntaxException, InterruptedException {
+ ) throws IOException, InterruptedException {
if (LOG.isTraceEnabled()) {
LOG.trace(op + ": " + path + ", ugi=" + ugi
@@ -156,7 +156,7 @@ public class DatanodeWebHdfsMethods {
final PostOpParam op,
@QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT)
final BufferSizeParam bufferSize
- ) throws IOException, URISyntaxException, InterruptedException {
+ ) throws IOException, InterruptedException {
if (LOG.isTraceEnabled()) {
LOG.trace(op + ": " + path + ", ugi=" + ugi
@@ -209,7 +209,7 @@ public class DatanodeWebHdfsMethods {
final LengthParam length,
@QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT)
final BufferSizeParam bufferSize
- ) throws IOException, URISyntaxException, InterruptedException {
+ ) throws IOException, InterruptedException {
if (LOG.isTraceEnabled()) {
LOG.trace(op + ": " + path + ", ugi=" + ugi
@@ -248,7 +248,11 @@ public class DatanodeWebHdfsMethods {
}
}
};
- return Response.ok(streaming).type(MediaType.APPLICATION_OCTET_STREAM).build();
+
+ final int status = offset.getValue() == 0?
+ HttpServletResponse.SC_OK: HttpServletResponse.SC_PARTIAL_CONTENT;
+ return Response.status(status).entity(streaming).type(
+ MediaType.APPLICATION_OCTET_STREAM).build();
}
case GETFILECHECKSUM:
{
Modified: hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java?rev=1186509&r1=1186508&r2=1186509&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java Wed Oct 19 21:40:20 2011
@@ -31,6 +31,8 @@ import java.net.URL;
import java.security.PrivilegedExceptionAction;
import java.util.Map;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.ContentSummary;
@@ -45,6 +47,7 @@ import org.apache.hadoop.hdfs.ByteRangeI
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
+import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
@@ -91,6 +94,7 @@ import org.mortbay.util.ajax.JSON;
/** A FileSystem for HDFS over the web. */
public class WebHdfsFileSystem extends FileSystem
implements DelegationTokenRenewer.Renewable {
+ public static final Log LOG = LogFactory.getLog(WebHdfsFileSystem.class);
/** File System URI: {SCHEME}://namenode:port/path/to/file */
public static final String SCHEME = "webhdfs";
/** Http URI: http://namenode:port/{PATH_PREFIX}/path/to/file */
@@ -413,6 +417,17 @@ public class WebHdfsFileSystem extends F
run(op, p, new ModificationTimeParam(mtime), new AccessTimeParam(atime));
}
+ @Override
+ public long getDefaultBlockSize() {
+ return getConf().getLong("dfs.block.size", FSConstants.DEFAULT_BLOCK_SIZE);
+ }
+
+ @Override
+ public short getDefaultReplication() {
+ return (short)getConf().getInt(DFSConfigKeys.DFS_REPLICATION_KEY,
+ DFSConfigKeys.DFS_REPLICATION_DEFAULT);
+ }
+
private FSDataOutputStream write(final HttpOpParam.Op op,
final HttpURLConnection conn, final int bufferSize) throws IOException {
return new FSDataOutputStream(new BufferedOutputStream(
Modified: hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/web/resources/ExceptionHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/web/resources/ExceptionHandler.java?rev=1186509&r1=1186508&r2=1186509&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/web/resources/ExceptionHandler.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/web/resources/ExceptionHandler.java Wed Oct 19 21:40:20 2011
@@ -20,6 +20,8 @@ package org.apache.hadoop.hdfs.web.resou
import java.io.FileNotFoundException;
import java.io.IOException;
+import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.ext.ExceptionMapper;
@@ -36,12 +38,17 @@ import com.sun.jersey.api.ParamException
public class ExceptionHandler implements ExceptionMapper<Exception> {
public static final Log LOG = LogFactory.getLog(ExceptionHandler.class);
+ private @Context HttpServletResponse response;
+
@Override
public Response toResponse(Exception e) {
if (LOG.isTraceEnabled()) {
LOG.trace("GOT EXCEPITION", e);
}
+ //clear content type
+ response.setContentType(null);
+
//Convert exception
if (e instanceof ParamException) {
final ParamException paramexception = (ParamException)e;
Added: hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestByteRangeInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestByteRangeInputStream.java?rev=1186509&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestByteRangeInputStream.java (added)
+++ hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestByteRangeInputStream.java Wed Oct 19 21:40:20 2011
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+
+import org.apache.hadoop.hdfs.ByteRangeInputStream.URLOpener;
+import org.junit.Test;
+
+class MockHttpURLConnection extends HttpURLConnection {
+ public MockHttpURLConnection(URL u) {
+ super(u);
+ }
+
+ @Override
+ public boolean usingProxy(){
+ return false;
+ }
+
+ @Override
+ public void disconnect() {
+ }
+
+ @Override
+ public void connect() {
+ }
+
+ @Override
+ public InputStream getInputStream() throws IOException {
+ return new ByteArrayInputStream("asdf".getBytes());
+ }
+
+ @Override
+ public URL getURL() {
+ URL u = null;
+ try {
+ u = new URL("http://resolvedurl/");
+ } catch (Exception e) {
+ System.out.println(e.getMessage());
+ }
+ return u;
+ }
+
+ @Override
+ public int getResponseCode() {
+ if (responseCode != -1) {
+ return responseCode;
+ } else {
+ if (getRequestProperty("Range") == null) {
+ return 200;
+ } else {
+ return 206;
+ }
+ }
+ }
+
+ public void setResponseCode(int resCode) {
+ responseCode = resCode;
+ }
+}
+
+public class TestByteRangeInputStream {
+ @Test
+ public void testRemoveOffset() throws IOException {
+ { //no offset
+ String s = "http://test/Abc?Length=99";
+ assertEquals(s, ByteRangeInputStream.removeOffsetParam(new URL(s)).toString());
+ }
+
+ { //no parameters
+ String s = "http://test/Abc";
+ assertEquals(s, ByteRangeInputStream.removeOffsetParam(new URL(s)).toString());
+ }
+
+ { //offset as first parameter
+ String s = "http://test/Abc?offset=10&Length=99";
+ assertEquals("http://test/Abc?Length=99",
+ ByteRangeInputStream.removeOffsetParam(new URL(s)).toString());
+ }
+
+ { //offset as second parameter
+ String s = "http://test/Abc?op=read&OFFset=10&Length=99";
+ assertEquals("http://test/Abc?op=read&Length=99",
+ ByteRangeInputStream.removeOffsetParam(new URL(s)).toString());
+ }
+
+ { //offset as last parameter
+ String s = "http://test/Abc?Length=99&offset=10";
+ assertEquals("http://test/Abc?Length=99",
+ ByteRangeInputStream.removeOffsetParam(new URL(s)).toString());
+ }
+
+ { //offset as the only parameter
+ String s = "http://test/Abc?offset=10";
+ assertEquals("http://test/Abc",
+ ByteRangeInputStream.removeOffsetParam(new URL(s)).toString());
+ }
+ }
+
+ @Test
+ public void testByteRange() throws IOException {
+ URLOpener ospy = spy(new URLOpener(new URL("http://test/")));
+ doReturn(new MockHttpURLConnection(ospy.getURL())).when(ospy)
+ .openConnection();
+ URLOpener rspy = spy(new URLOpener((URL) null));
+ doReturn(new MockHttpURLConnection(rspy.getURL())).when(rspy)
+ .openConnection();
+ ByteRangeInputStream is = new ByteRangeInputStream(ospy, rspy);
+
+ assertEquals("getPos wrong", 0, is.getPos());
+
+ is.read();
+
+ assertNull("Initial call made incorrectly (Range Check)", ospy
+ .openConnection().getRequestProperty("Range"));
+
+ assertEquals("getPos should be 1 after reading one byte", 1, is.getPos());
+
+ is.read();
+
+ assertEquals("getPos should be 2 after reading two bytes", 2, is.getPos());
+
+ // No additional connections should have been made (no seek)
+
+ rspy.setURL(new URL("http://resolvedurl/"));
+
+ is.seek(100);
+ is.read();
+
+ assertEquals("Seek to 100 bytes made incorrectly (Range Check)",
+ "bytes=100-", rspy.openConnection().getRequestProperty("Range"));
+
+ assertEquals("getPos should be 101 after reading one byte", 101,
+ is.getPos());
+
+ verify(rspy, times(2)).openConnection();
+
+ is.seek(101);
+ is.read();
+
+ verify(rspy, times(2)).openConnection();
+
+ // Seek to 101 should not result in another request"
+
+ is.seek(2500);
+ is.read();
+
+ assertEquals("Seek to 2500 bytes made incorrectly (Range Check)",
+ "bytes=2500-", rspy.openConnection().getRequestProperty("Range"));
+
+ ((MockHttpURLConnection) rspy.openConnection()).setResponseCode(200);
+ is.seek(500);
+
+ try {
+ is.read();
+ fail("Exception should be thrown when 200 response is given "
+ + "but 206 is expected");
+ } catch (IOException e) {
+ assertEquals("Should fail because incorrect response code was sent",
+ "HTTP_PARTIAL expected, received 200", e.getMessage());
+ }
+
+ ((MockHttpURLConnection) rspy.openConnection()).setResponseCode(206);
+ is.seek(0);
+
+ try {
+ is.read();
+ fail("Exception should be thrown when 206 response is given "
+ + "but 200 is expected");
+ } catch (IOException e) {
+ assertEquals("Should fail because incorrect response code was sent",
+ "HTTP_OK expected, received 206", e.getMessage());
+ }
+ }
+}
Modified: hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/web/TestWebHdfsFileSystemContract.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/web/TestWebHdfsFileSystemContract.java?rev=1186509&r1=1186508&r2=1186509&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/web/TestWebHdfsFileSystemContract.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/web/TestWebHdfsFileSystemContract.java Wed Oct 19 21:40:20 2011
@@ -129,6 +129,7 @@ public class TestWebHdfsFileSystemContra
//For WebHdfsFileSystem,
//disable testListStatusReturnsNullForNonExistentFile
//and add testListStatusThrowsExceptionForNonExistentFile below.
+ @Override
public void testListStatusReturnsNullForNonExistentFile() {}
public void testListStatusThrowsExceptionForNonExistentFile() throws Exception {
@@ -140,6 +141,8 @@ public class TestWebHdfsFileSystemContra
}
}
+ //the following are new tests (i.e. not over-riding the super class methods)
+
public void testGetFileBlockLocations() throws IOException {
final String f = "/test/testGetFileBlockLocations";
final Path p = path(f);
@@ -192,4 +195,45 @@ public class TestWebHdfsFileSystemContra
WebHdfsFileSystem.LOG.info("This is expected.", fnfe);
}
}
+
+ public void testSeek() throws IOException {
+ final Path p = new Path("/test/testSeek");
+ createFile(p);
+
+ final int one_third = data.length/3;
+ final int two_third = one_third*2;
+
+ { //test seek
+ final int offset = one_third;
+ final int len = data.length - offset;
+ final byte[] buf = new byte[len];
+
+ final FSDataInputStream in = fs.open(p);
+ in.seek(offset);
+
+ //read all remaining data
+ in.readFully(buf);
+ in.close();
+
+ for (int i = 0; i < buf.length; i++) {
+ assertEquals("Position " + i + ", offset=" + offset + ", length=" + len,
+ data[i + offset], buf[i]);
+ }
+ }
+
+ { //test position read (read the data after the two_third location)
+ final int offset = two_third;
+ final int len = data.length - offset;
+ final byte[] buf = new byte[len];
+
+ final FSDataInputStream in = fs.open(p);
+ in.readFully(offset, buf);
+ in.close();
+
+ for (int i = 0; i < buf.length; i++) {
+ assertEquals("Position " + i + ", offset=" + offset + ", length=" + len,
+ data[i + offset], buf[i]);
+ }
+ }
+ }
}