You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by sz...@apache.org on 2012/04/27 22:13:17 UTC
svn commit: r1331570 - in
/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./
src/main/java/org/apache/hadoop/hdfs/ src/test/java/org/apache/hadoop/hdfs/
Author: szetszwo
Date: Fri Apr 27 20:13:17 2012
New Revision: 1331570
URL: http://svn.apache.org/viewvc?rev=1331570&view=rev
Log:
HDFS-3334. Fix ByteRangeInputStream stream leakage. Contributed by Daryn Sharp.
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ByteRangeInputStream.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestByteRangeInputStream.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHftpFileSystem.java
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1331570&r1=1331569&r2=1331570&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Fri Apr 27 20:13:17 2012
@@ -917,6 +917,8 @@ Release 0.23.3 - UNRELEASED
HDFS-3321. Fix safe mode turn off tip message. (Ravi Prakash via szetszwo)
+ HDFS-3334. Fix ByteRangeInputStream stream leakage. (Daryn Sharp via szetszwo)
+
Release 0.23.2 - UNRELEASED
INCOMPATIBLE CHANGES
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ByteRangeInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ByteRangeInputStream.java?rev=1331570&r1=1331569&r2=1331570&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ByteRangeInputStream.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ByteRangeInputStream.java Fri Apr 27 20:13:17 2012
@@ -27,6 +27,8 @@ import org.apache.commons.io.input.Bound
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.hdfs.server.namenode.StreamFile;
+import com.google.common.annotations.VisibleForTesting;
+
/**
* To support HTTP byte streams, a new connection to an HTTP server needs to be
* created each time. This class hides the complexity of those multiple
@@ -61,7 +63,7 @@ public abstract class ByteRangeInputStre
}
enum StreamStatus {
- NORMAL, SEEK
+ NORMAL, SEEK, CLOSED
}
protected InputStream in;
protected URLOpener originalURL;
@@ -89,40 +91,51 @@ public abstract class ByteRangeInputStre
protected abstract URL getResolvedUrl(final HttpURLConnection connection
) throws IOException;
- private InputStream getInputStream() throws IOException {
- if (status != StreamStatus.NORMAL) {
-
- if (in != null) {
- in.close();
- in = null;
- }
-
- // Use the original url if no resolved url exists, eg. if
- // it's the first time a request is made.
- final URLOpener opener =
- (resolvedURL.getURL() == null) ? originalURL : resolvedURL;
-
- final HttpURLConnection connection = opener.openConnection(startPos);
- connection.connect();
- checkResponseCode(connection);
-
- final String cl = connection.getHeaderField(StreamFile.CONTENT_LENGTH);
- if (cl == null) {
- throw new IOException(StreamFile.CONTENT_LENGTH+" header is missing");
- }
- final long streamlength = Long.parseLong(cl);
- filelength = startPos + streamlength;
- // Java has a bug with >2GB request streams. It won't bounds check
- // the reads so the transfer blocks until the server times out
- in = new BoundedInputStream(connection.getInputStream(), streamlength);
-
- resolvedURL.setURL(getResolvedUrl(connection));
- status = StreamStatus.NORMAL;
+ @VisibleForTesting
+ protected InputStream getInputStream() throws IOException {
+ switch (status) {
+ case NORMAL:
+ break;
+ case SEEK:
+ if (in != null) {
+ in.close();
+ }
+ in = openInputStream();
+ status = StreamStatus.NORMAL;
+ break;
+ case CLOSED:
+ throw new IOException("Stream closed");
}
-
return in;
}
+ @VisibleForTesting
+ protected InputStream openInputStream() throws IOException {
+ // Use the original url if no resolved url exists, eg. if
+ // it's the first time a request is made.
+ final URLOpener opener =
+ (resolvedURL.getURL() == null) ? originalURL : resolvedURL;
+
+ final HttpURLConnection connection = opener.openConnection(startPos);
+ connection.connect();
+ checkResponseCode(connection);
+
+ final String cl = connection.getHeaderField(StreamFile.CONTENT_LENGTH);
+ if (cl == null) {
+ throw new IOException(StreamFile.CONTENT_LENGTH+" header is missing");
+ }
+ final long streamlength = Long.parseLong(cl);
+ filelength = startPos + streamlength;
+ // Java has a bug with >2GB request streams. It won't bounds check
+ // the reads so the transfer blocks until the server times out
+ InputStream is =
+ new BoundedInputStream(connection.getInputStream(), streamlength);
+
+ resolvedURL.setURL(getResolvedUrl(connection));
+
+ return is;
+ }
+
private int update(final int n) throws IOException {
if (n != -1) {
currentPos += n;
@@ -150,17 +163,21 @@ public abstract class ByteRangeInputStre
* The next read() will be from that location. Can't
* seek past the end of the file.
*/
+ @Override
public void seek(long pos) throws IOException {
if (pos != currentPos) {
startPos = pos;
currentPos = pos;
- status = StreamStatus.SEEK;
+ if (status != StreamStatus.CLOSED) {
+ status = StreamStatus.SEEK;
+ }
}
}
/**
* Return the current offset from the start of the file
*/
+ @Override
public long getPos() throws IOException {
return currentPos;
}
@@ -169,7 +186,17 @@ public abstract class ByteRangeInputStre
* Seeks a different copy of the data. Returns true if
* found a new source, false otherwise.
*/
+ @Override
public boolean seekToNewSource(long targetPos) throws IOException {
return false;
}
-}
\ No newline at end of file
+
+ @Override
+ public void close() throws IOException {
+ if (in != null) {
+ in.close();
+ in = null;
+ }
+ status = StreamStatus.CLOSED;
+ }
+}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestByteRangeInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestByteRangeInputStream.java?rev=1331570&r1=1331569&r2=1331570&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestByteRangeInputStream.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestByteRangeInputStream.java Fri Apr 27 20:13:17 2012
@@ -19,8 +19,10 @@ package org.apache.hadoop.hdfs;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -169,4 +171,74 @@ public static class MockHttpURLConnectio
"HTTP_OK expected, received 206", e.getMessage());
}
}
+
+ @Test
+ public void testPropagatedClose() throws IOException {
+ ByteRangeInputStream brs = spy(
+ new HftpFileSystem.RangeHeaderInputStream(new URL("http://test/")));
+
+ InputStream mockStream = mock(InputStream.class);
+ doReturn(mockStream).when(brs).openInputStream();
+
+ int brisOpens = 0;
+ int brisCloses = 0;
+ int isCloses = 0;
+
+ // first open, shouldn't close underlying stream
+ brs.getInputStream();
+ verify(brs, times(++brisOpens)).openInputStream();
+ verify(brs, times(brisCloses)).close();
+ verify(mockStream, times(isCloses)).close();
+
+ // stream is open, shouldn't close underlying stream
+ brs.getInputStream();
+ verify(brs, times(brisOpens)).openInputStream();
+ verify(brs, times(brisCloses)).close();
+ verify(mockStream, times(isCloses)).close();
+
+ // seek forces a reopen, should close underlying stream
+ brs.seek(1);
+ brs.getInputStream();
+ verify(brs, times(++brisOpens)).openInputStream();
+ verify(brs, times(brisCloses)).close();
+ verify(mockStream, times(++isCloses)).close();
+
+ // verify that the underlying stream isn't closed after a seek
+ // ie. the state was correctly updated
+ brs.getInputStream();
+ verify(brs, times(brisOpens)).openInputStream();
+ verify(brs, times(brisCloses)).close();
+ verify(mockStream, times(isCloses)).close();
+
+ // seeking to same location should be a no-op
+ brs.seek(1);
+ brs.getInputStream();
+ verify(brs, times(brisOpens)).openInputStream();
+ verify(brs, times(brisCloses)).close();
+ verify(mockStream, times(isCloses)).close();
+
+ // close should of course close
+ brs.close();
+ verify(brs, times(++brisCloses)).close();
+ verify(mockStream, times(++isCloses)).close();
+
+ // it's already closed, underlying stream should not close
+ brs.close();
+ verify(brs, times(++brisCloses)).close();
+ verify(mockStream, times(isCloses)).close();
+
+ // it's closed, don't reopen it
+ boolean errored = false;
+ try {
+ brs.getInputStream();
+ } catch (IOException e) {
+ errored = true;
+ assertEquals("Stream closed", e.getMessage());
+ } finally {
+ assertTrue("Read a closed steam", errored);
+ }
+ verify(brs, times(brisOpens)).openInputStream();
+ verify(brs, times(brisCloses)).close();
+ verify(mockStream, times(isCloses)).close();
+ }
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHftpFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHftpFileSystem.java?rev=1331570&r1=1331569&r2=1331570&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHftpFileSystem.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHftpFileSystem.java Fri Apr 27 20:13:17 2012
@@ -19,6 +19,7 @@
package org.apache.hadoop.hdfs;
import java.io.IOException;
+import java.io.InputStream;
import java.net.URISyntaxException;
import java.net.URI;
import java.net.URL;
@@ -234,6 +235,45 @@ public class TestHftpFileSystem {
assertEquals('7', in.read());
}
+ @Test
+ public void testReadClosedStream() throws IOException {
+ final Path testFile = new Path("/testfile+2");
+ FSDataOutputStream os = hdfs.create(testFile, true);
+ os.writeBytes("0123456789");
+ os.close();
+
+ // ByteRangeInputStream delays opens until reads. Make sure it doesn't
+ // open a closed stream that has never been opened
+ FSDataInputStream in = hftpFs.open(testFile);
+ in.close();
+ checkClosedStream(in);
+ checkClosedStream(in.getWrappedStream());
+
+ // force the stream to connect and then close it
+ in = hftpFs.open(testFile);
+ int ch = in.read();
+ assertEquals('0', ch);
+ in.close();
+ checkClosedStream(in);
+ checkClosedStream(in.getWrappedStream());
+
+ // make sure seeking doesn't automagically reopen the stream
+ in.seek(4);
+ checkClosedStream(in);
+ checkClosedStream(in.getWrappedStream());
+ }
+
+ private void checkClosedStream(InputStream is) {
+ IOException ioe = null;
+ try {
+ is.read();
+ } catch (IOException e) {
+ ioe = e;
+ }
+ assertNotNull("No exception on closed read", ioe);
+ assertEquals("Stream closed", ioe.getMessage());
+ }
+
public void resetFileSystem() throws IOException {
// filesystem caching has a quirk/bug that it caches based on the user's
// given uri. the result is if a filesystem is instantiated with no port,