You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by el...@apache.org on 2011/06/28 17:59:31 UTC
svn commit: r1140694 - in /hadoop/common/trunk/hdfs: ./
src/java/org/apache/hadoop/hdfs/
src/java/org/apache/hadoop/hdfs/server/namenode/
src/test/hdfs/org/apache/hadoop/hdfs/
src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/
Author: eli
Date: Tue Jun 28 15:59:31 2011
New Revision: 1140694
URL: http://svn.apache.org/viewvc?rev=1140694&view=rev
Log:
HDFS-2110. StreamFile and ByteRangeInputStream cleanup. Contributed by Eli Collins
Modified:
hadoop/common/trunk/hdfs/CHANGES.txt
hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/ByteRangeInputStream.java
hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/HftpFileSystem.java
hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/StreamFile.java
hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestByteRangeInputStream.java
hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestStreamFile.java
Modified: hadoop/common/trunk/hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/CHANGES.txt?rev=1140694&r1=1140693&r2=1140694&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hdfs/CHANGES.txt Tue Jun 28 15:59:31 2011
@@ -532,6 +532,8 @@ Trunk (unreleased changes)
HDFS-1723. quota errors messages should use the same scale. (Jim Plush via
atm)
+ HDFS-2110. StreamFile and ByteRangeInputStream cleanup. (eli)
+
OPTIMIZATIONS
HDFS-1458. Improve checkpoint performance by avoiding unnecessary image
Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/ByteRangeInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/ByteRangeInputStream.java?rev=1140694&r1=1140693&r2=1140694&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/ByteRangeInputStream.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/ByteRangeInputStream.java Tue Jun 28 15:59:31 2011
@@ -26,7 +26,6 @@ import java.net.URL;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.hdfs.server.namenode.StreamFile;
-
/**
* To support HTTP byte streams, a new connection to an HTTP server needs to be
* created each time. This class hides the complexity of those multiple
@@ -60,7 +59,9 @@ class ByteRangeInputStream extends FSInp
}
}
-
+ enum StreamStatus {
+ NORMAL, SEEK
+ }
protected InputStream in;
protected URLOpener originalURL;
protected URLOpener resolvedURL;
@@ -68,9 +69,7 @@ class ByteRangeInputStream extends FSInp
protected long currentPos = 0;
protected long filelength;
- protected int status = STATUS_SEEK;
- protected static final int STATUS_NORMAL = 0;
- protected static final int STATUS_SEEK = 1;
+ StreamStatus status = StreamStatus.SEEK;
ByteRangeInputStream(final URL url) {
this(new URLOpener(url), new URLOpener(null));
@@ -82,18 +81,19 @@ class ByteRangeInputStream extends FSInp
}
private InputStream getInputStream() throws IOException {
- if (status != STATUS_NORMAL) {
+ if (status != StreamStatus.NORMAL) {
if (in != null) {
in.close();
in = null;
}
- // use the original url if no resolved url exists (e.g., if it's
- // the first time a request is made)
- final URLOpener o = resolvedURL.getURL() == null? originalURL: resolvedURL;
+ // Use the original URL if no resolved URL exists, e.g. if
+ // it's the first time a request is made.
+ final URLOpener opener =
+ (resolvedURL.getURL() == null) ? originalURL : resolvedURL;
- final HttpURLConnection connection = o.openConnection();
+ final HttpURLConnection connection = opener.openConnection();
try {
connection.setRequestMethod("GET");
if (startPos != 0) {
@@ -101,36 +101,35 @@ class ByteRangeInputStream extends FSInp
}
connection.connect();
final String cl = connection.getHeaderField(StreamFile.CONTENT_LENGTH);
- filelength = cl == null? -1: Long.parseLong(cl);
+ filelength = (cl == null) ? -1 : Long.parseLong(cl);
if (HftpFileSystem.LOG.isDebugEnabled()) {
HftpFileSystem.LOG.debug("filelength = " + filelength);
}
in = connection.getInputStream();
- } catch(IOException ioe) {
+ } catch (IOException ioe) {
HftpFileSystem.throwIOExceptionFromConnection(connection, ioe);
}
- if (startPos != 0 && connection.getResponseCode() != 206) {
- // we asked for a byte range but did not receive a partial content
+ int respCode = connection.getResponseCode();
+ if (startPos != 0 && respCode != HttpURLConnection.HTTP_PARTIAL) {
+ // We asked for a byte range but did not receive a partial content
// response...
- throw new IOException("206 expected, but received "
- + connection.getResponseCode());
- } else if(startPos == 0 && connection.getResponseCode() != 200) {
- // we asked for all bytes from the beginning but didn't receive a 200
+ throw new IOException("HTTP_PARTIAL expected, received " + respCode);
+ } else if (startPos == 0 && respCode != HttpURLConnection.HTTP_OK) {
+ // We asked for all bytes from the beginning but didn't receive a 200
// response (none of the other 2xx codes are valid here)
- throw new IOException("200 expected, but received "
- + connection.getResponseCode());
+ throw new IOException("HTTP_OK expected, received " + respCode);
}
-
+
resolvedURL.setURL(connection.getURL());
- status = STATUS_NORMAL;
+ status = StreamStatus.NORMAL;
}
return in;
}
- private void update(final boolean isEOF, final int n
- ) throws IOException {
+ private void update(final boolean isEOF, final int n)
+ throws IOException {
if (!isEOF) {
currentPos += n;
} else if (currentPos < filelength) {
@@ -154,7 +153,7 @@ class ByteRangeInputStream extends FSInp
if (pos != currentPos) {
startPos = pos;
currentPos = pos;
- status = STATUS_SEEK;
+ status = StreamStatus.SEEK;
}
}
@@ -162,7 +161,7 @@ class ByteRangeInputStream extends FSInp
* Return the current offset from the start of the file
*/
public long getPos() throws IOException {
- return currentPos; // keep total count?
+ return currentPos;
}
/**
@@ -172,7 +171,4 @@ class ByteRangeInputStream extends FSInp
public boolean seekToNewSource(long targetPos) throws IOException {
return false;
}
-
-}
-
-
+}
\ No newline at end of file
Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/HftpFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/HftpFileSystem.java?rev=1140694&r1=1140693&r2=1140694&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/HftpFileSystem.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/HftpFileSystem.java Tue Jun 28 15:59:31 2011
@@ -245,17 +245,26 @@ public class HftpFileSystem extends File
}
}
-
- /*
- Construct URL pointing to file on namenode
- */
- URL getNamenodeFileURL(Path f) throws IOException {
- return getNamenodeURL("/data" + f.toUri().getPath(), "ugi=" + getUgiParameter());
+ /**
+ * Return a URL pointing to the given path on the namenode.
+ *
+ * @param p path to obtain the URL for
+ * @return namenode URL referring to the given path
+ * @throws IOException on error constructing the URL
+ */
+ URL getNamenodeFileURL(Path p) throws IOException {
+ return getNamenodeURL("/data" + p.toUri().getPath(),
+ "ugi=" + getUgiParameter());
}
- /*
- Construct URL pointing to namenode.
- */
+ /**
+ * Return a URL for the given path and query string on the namenode.
+ *
+ * @param path path to obtain the URL for
+ * @param query query string to append to the path
+ * @return namenode URL referring to the given path
+ * @throws IOException on error constructing the URL
+ */
URL getNamenodeURL(String path, String query) throws IOException {
try {
final URL url = new URI("http", null, nnAddr.getHostName(),
Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/StreamFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/StreamFile.java?rev=1140694&r1=1140693&r2=1140694&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/StreamFile.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/StreamFile.java Tue Jun 28 15:59:31 2011
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.server.namenode;
import java.io.IOException;
+import java.io.InputStream;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.util.Enumeration;
@@ -36,6 +37,7 @@ import org.apache.hadoop.hdfs.DFSInputSt
import org.apache.hadoop.hdfs.server.common.JspHelper;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DatanodeJspHelper;
+import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.mortbay.jetty.InclusiveByteRange;
@@ -96,7 +98,7 @@ public class StreamFile extends DfsServl
filename + "\"");
response.setContentType("application/octet-stream");
response.setHeader(CONTENT_LENGTH, "" + fileLen);
- StreamFile.writeTo(in, os, 0L, fileLen);
+ StreamFile.copyFromOffset(in, os, 0L, fileLen);
}
} catch(IOException e) {
if (LOG.isDebugEnabled()) {
@@ -113,75 +115,46 @@ public class StreamFile extends DfsServl
}
}
+ /**
+ * Send a partial content response with the given range. If there are
+ * no satisfiable ranges, or if multiple ranges are requested, which
+ * is unsupported, respond with range not satisfiable.
+ *
+ * @param in stream to read from
+ * @param out stream to write to
+ * @param response http response to use
+ * @param contentLength for the response header
+ * @param ranges requested byte ranges to respond with
+ * @throws IOException on error sending the response
+ */
static void sendPartialData(FSInputStream in,
- OutputStream os,
+ OutputStream out,
HttpServletResponse response,
long contentLength,
List<?> ranges)
- throws IOException {
-
+ throws IOException {
if (ranges == null || ranges.size() != 1) {
- // if there are no satisfiable ranges, or if multiple ranges are
- // requested (we don't support multiple range requests), send 416 response
response.setContentLength(0);
- int status = HttpServletResponse.SC_REQUESTED_RANGE_NOT_SATISFIABLE;
- response.setStatus(status);
- response.setHeader("Content-Range",
+ response.setStatus(HttpServletResponse.SC_REQUESTED_RANGE_NOT_SATISFIABLE);
+ response.setHeader("Content-Range",
InclusiveByteRange.to416HeaderRangeString(contentLength));
} else {
- // if there is only a single valid range (must be satisfiable
- // since were here now), send that range with a 206 response
InclusiveByteRange singleSatisfiableRange =
(InclusiveByteRange)ranges.get(0);
long singleLength = singleSatisfiableRange.getSize(contentLength);
response.setStatus(HttpServletResponse.SC_PARTIAL_CONTENT);
response.setHeader("Content-Range",
singleSatisfiableRange.toHeaderRangeString(contentLength));
- System.out.println("first: "+singleSatisfiableRange.getFirst(contentLength));
- System.out.println("singleLength: "+singleLength);
-
- StreamFile.writeTo(in,
- os,
- singleSatisfiableRange.getFirst(contentLength),
- singleLength);
+ copyFromOffset(in, out,
+ singleSatisfiableRange.getFirst(contentLength),
+ singleLength);
}
}
-
- static void writeTo(FSInputStream in,
- OutputStream os,
- long start,
- long count)
- throws IOException {
- byte buf[] = new byte[4096];
- long bytesRemaining = count;
- int bytesRead;
- int bytesToRead;
-
- in.seek(start);
-
- while (true) {
- // number of bytes to read this iteration
- bytesToRead = (int)(bytesRemaining<buf.length ?
- bytesRemaining:
- buf.length);
-
- // number of bytes actually read this iteration
- bytesRead = in.read(buf, 0, bytesToRead);
-
- // if we can't read anymore, break
- if (bytesRead == -1) {
- break;
- }
-
- os.write(buf, 0, bytesRead);
-
- bytesRemaining -= bytesRead;
-
- // if we don't need to read anymore, break
- if (bytesRemaining <= 0) {
- break;
- }
- }
+ /* Copy count bytes at the given offset from one stream to another */
+ static void copyFromOffset(FSInputStream in, OutputStream out, long offset,
+ long count) throws IOException {
+ in.seek(offset);
+ IOUtils.copyBytes(in, out, count);
}
}
Modified: hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestByteRangeInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestByteRangeInputStream.java?rev=1140694&r1=1140693&r2=1140694&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestByteRangeInputStream.java (original)
+++ hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestByteRangeInputStream.java Tue Jun 28 15:59:31 2011
@@ -23,12 +23,13 @@ import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;
-import junit.framework.TestCase;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+
import org.apache.hadoop.hdfs.ByteRangeInputStream;
import org.apache.hadoop.hdfs.ByteRangeInputStream.URLOpener;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
class MockHttpURLConnection extends HttpURLConnection {
MockURL m;
@@ -101,13 +102,9 @@ class MockURL extends URLOpener {
}
}
-
-
-public class TestByteRangeInputStream extends TestCase {
-
- private static final Log LOG =
- LogFactory.getLog(TestByteRangeInputStream.class);
+public class TestByteRangeInputStream {
+ @Test
public void testByteRange() throws IOException, InterruptedException {
MockURL o = new MockURL("http://test/");
MockURL r = new MockURL((URL)null);
@@ -168,7 +165,7 @@ public class TestByteRangeInputStream ex
+ "but 206 is expected");
} catch (IOException e) {
assertEquals("Should fail because incorrect response code was sent",
- "206 expected, but received 200", e.getMessage());
+ "HTTP_PARTIAL expected, received 200", e.getMessage());
}
r.responseCode = 206;
@@ -180,10 +177,7 @@ public class TestByteRangeInputStream ex
+ "but 200 is expected");
} catch (IOException e) {
assertEquals("Should fail because incorrect response code was sent",
- "200 expected, but received 206", e.getMessage());
+ "HTTP_OK expected, received 206", e.getMessage());
}
-
-
-
}
}
Modified: hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestStreamFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestStreamFile.java?rev=1140694&r1=1140693&r2=1140694&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestStreamFile.java (original)
+++ hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestStreamFile.java Tue Jun 28 15:59:31 2011
@@ -219,7 +219,7 @@ public class TestStreamFile extends Test
assertTrue("Pairs array must be even", pairs.length % 2 == 0);
for (int i = 0; i < pairs.length; i+=2) {
- StreamFile.writeTo(fsin, os, pairs[i], pairs[i+1]);
+ StreamFile.copyFromOffset(fsin, os, pairs[i], pairs[i+1]);
assertArrayEquals("Reading " + pairs[i+1]
+ " bytes from offset " + pairs[i],
getOutputArray(pairs[i], pairs[i+1]),