Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2015/07/23 02:43:23 UTC
hadoop git commit: HDFS-8797. WebHdfsFileSystem creates too many connections for pread. Contributed by Jing Zhao.
Repository: hadoop
Updated Branches:
refs/heads/trunk 06e5dd2c8 -> e91ccfad0
HDFS-8797. WebHdfsFileSystem creates too many connections for pread. Contributed by Jing Zhao.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e91ccfad
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e91ccfad
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e91ccfad
Branch: refs/heads/trunk
Commit: e91ccfad07ec5b5674a84009772dd31a82b4e4de
Parents: 06e5dd2
Author: Jing Zhao <ji...@apache.org>
Authored: Wed Jul 22 17:42:31 2015 -0700
Committer: Jing Zhao <ji...@apache.org>
Committed: Wed Jul 22 17:42:31 2015 -0700
----------------------------------------------------------------------
.../hadoop/hdfs/web/ByteRangeInputStream.java | 57 +++++++++++++++++---
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 2 +
.../hdfs/web/TestByteRangeInputStream.java | 35 ++++++------
.../org/apache/hadoop/hdfs/web/TestWebHDFS.java | 41 ++++++++++++++
4 files changed, 113 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
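For context: the generic FSInputStream pread path implements read(position, ...) roughly as seek + read + seek back, and on WebHDFS each seek invalidates the current HTTP connection, so every positioned read forced the ByteRangeInputStream to drop and re-open connections. The change below overrides the positioned-read methods so each call opens a single short-lived connection instead. A minimal caller-side sketch of the pread pattern this targets (not part of this commit; the webhdfs URI, path, and sizes are illustrative):

  import java.net.URI;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FSDataInputStream;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;

  public class PreadSketch {
    public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration();
      // Assumed NameNode HTTP endpoint; replace with a real cluster address.
      FileSystem fs = FileSystem.get(URI.create("webhdfs://namenode:50070/"), conf);
      byte[] buf = new byte[128];
      try (FSDataInputStream in = fs.open(new Path("/foo"))) {
        // Positioned read: does not move the stream's current offset.
        // Before HDFS-8797 this went through seek() and re-opened the connection.
        in.readFully(1024, buf, 0, buf.length);
        // Stateful read: continues from the stream's current offset (still 0).
        int n = in.read(buf, 0, buf.length);
        System.out.println("stateful read returned " + n + " bytes");
      }
    }
  }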
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e91ccfad/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/ByteRangeInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/ByteRangeInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/ByteRangeInputStream.java
index 395c9f6..bb581db 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/ByteRangeInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/ByteRangeInputStream.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.web;
+import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
@@ -65,6 +66,16 @@ public abstract class ByteRangeInputStream extends FSInputStream {
final boolean resolved) throws IOException;
}
+ static class InputStreamAndFileLength {
+ final Long length;
+ final InputStream in;
+
+ InputStreamAndFileLength(Long length, InputStream in) {
+ this.length = length;
+ this.in = in;
+ }
+ }
+
enum StreamStatus {
NORMAL, SEEK, CLOSED
}
@@ -101,7 +112,9 @@ public abstract class ByteRangeInputStream extends FSInputStream {
if (in != null) {
in.close();
}
- in = openInputStream();
+ InputStreamAndFileLength fin = openInputStream(startPos);
+ in = fin.in;
+ fileLength = fin.length;
status = StreamStatus.NORMAL;
break;
case CLOSED:
@@ -111,20 +124,22 @@ public abstract class ByteRangeInputStream extends FSInputStream {
}
@VisibleForTesting
- protected InputStream openInputStream() throws IOException {
+ protected InputStreamAndFileLength openInputStream(long startOffset)
+ throws IOException {
// Use the original url if no resolved url exists, eg. if
// it's the first time a request is made.
final boolean resolved = resolvedURL.getURL() != null;
final URLOpener opener = resolved? resolvedURL: originalURL;
- final HttpURLConnection connection = opener.connect(startPos, resolved);
+ final HttpURLConnection connection = opener.connect(startOffset, resolved);
resolvedURL.setURL(getResolvedUrl(connection));
InputStream in = connection.getInputStream();
+ final Long length;
final Map<String, List<String>> headers = connection.getHeaderFields();
if (isChunkedTransferEncoding(headers)) {
// file length is not known
- fileLength = null;
+ length = null;
} else {
// for non-chunked transfer-encoding, get content-length
final String cl = connection.getHeaderField(HttpHeaders.CONTENT_LENGTH);
@@ -133,14 +148,14 @@ public abstract class ByteRangeInputStream extends FSInputStream {
+ headers);
}
final long streamlength = Long.parseLong(cl);
- fileLength = startPos + streamlength;
+ length = startOffset + streamlength;
// Java has a bug with >2GB request streams. It won't bounds check
// the reads so the transfer blocks until the server times out
in = new BoundedInputStream(in, streamlength);
}
- return in;
+ return new InputStreamAndFileLength(length, in);
}
private static boolean isChunkedTransferEncoding(
@@ -204,6 +219,36 @@ public abstract class ByteRangeInputStream extends FSInputStream {
}
}
+ @Override
+ public int read(long position, byte[] buffer, int offset, int length)
+ throws IOException {
+ try (InputStream in = openInputStream(position).in) {
+ return in.read(buffer, offset, length);
+ }
+ }
+
+ @Override
+ public void readFully(long position, byte[] buffer, int offset, int length)
+ throws IOException {
+ final InputStreamAndFileLength fin = openInputStream(position);
+ if (fin.length != null && length + position > fin.length) {
+ throw new EOFException("The length to read " + length
+ + " exceeds the file length " + fin.length);
+ }
+ try {
+ int nread = 0;
+ while (nread < length) {
+ int nbytes = fin.in.read(buffer, offset + nread, length - nread);
+ if (nbytes < 0) {
+ throw new EOFException("End of file reached before reading fully.");
+ }
+ nread += nbytes;
+ }
+ } finally {
+ fin.in.close();
+ }
+ }
+
/**
* Return the current offset from the start of the file
*/
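The positioned-read overrides above open one dedicated connection per call and close it when done, leaving the stateful stream untouched. In the non-chunked branch the connection stream is also wrapped in BoundedInputStream, capped at the advertised Content-Length, so reads never block past the requested range. A minimal standalone sketch of that capping behaviour (assumes Apache commons-io on the classpath; the byte counts are illustrative):

  import java.io.ByteArrayInputStream;
  import java.io.InputStream;
  import org.apache.commons.io.input.BoundedInputStream;

  public class BoundedSketch {
    public static void main(String[] args) throws Exception {
      InputStream raw = new ByteArrayInputStream(new byte[16]);
      long contentLength = 10L;                    // as parsed from the Content-Length header
      InputStream in = new BoundedInputStream(raw, contentLength);
      byte[] buf = new byte[16];
      int n = in.read(buf);                        // capped at 10 bytes even though buf holds 16
      System.out.println("read " + n + " bytes");  // prints: read 10 bytes
    }
  }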
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e91ccfad/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 66cb89e..c3eab70 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -742,6 +742,8 @@ Release 2.8.0 - UNRELEASED
HDFS-8795. Improve InvalidateBlocks#node2blocks. (yliu)
+ HDFS-8797. WebHdfsFileSystem creates too many connections for pread. (jing9)
+
OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e91ccfad/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestByteRangeInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestByteRangeInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestByteRangeInputStream.java
index 11deab8..40f2b9c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestByteRangeInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestByteRangeInputStream.java
@@ -35,7 +35,9 @@ import java.net.HttpURLConnection;
import java.net.URL;
import com.google.common.net.HttpHeaders;
+import org.apache.hadoop.hdfs.web.ByteRangeInputStream.InputStreamAndFileLength;
import org.junit.Test;
+import org.mockito.Mockito;
import org.mockito.internal.util.reflection.Whitebox;
public class TestByteRangeInputStream {
@@ -140,8 +142,9 @@ public class TestByteRangeInputStream {
public void testPropagatedClose() throws IOException {
ByteRangeInputStream bris =
mock(ByteRangeInputStream.class, CALLS_REAL_METHODS);
- InputStream mockStream = mock(InputStream.class);
- doReturn(mockStream).when(bris).openInputStream();
+ InputStreamAndFileLength mockStream = new InputStreamAndFileLength(1L,
+ mock(InputStream.class));
+ doReturn(mockStream).when(bris).openInputStream(Mockito.anyLong());
Whitebox.setInternalState(bris, "status",
ByteRangeInputStream.StreamStatus.SEEK);
@@ -151,46 +154,46 @@ public class TestByteRangeInputStream {
// first open, shouldn't close underlying stream
bris.getInputStream();
- verify(bris, times(++brisOpens)).openInputStream();
+ verify(bris, times(++brisOpens)).openInputStream(Mockito.anyLong());
verify(bris, times(brisCloses)).close();
- verify(mockStream, times(isCloses)).close();
+ verify(mockStream.in, times(isCloses)).close();
// stream is open, shouldn't close underlying stream
bris.getInputStream();
- verify(bris, times(brisOpens)).openInputStream();
+ verify(bris, times(brisOpens)).openInputStream(Mockito.anyLong());
verify(bris, times(brisCloses)).close();
- verify(mockStream, times(isCloses)).close();
+ verify(mockStream.in, times(isCloses)).close();
// seek forces a reopen, should close underlying stream
bris.seek(1);
bris.getInputStream();
- verify(bris, times(++brisOpens)).openInputStream();
+ verify(bris, times(++brisOpens)).openInputStream(Mockito.anyLong());
verify(bris, times(brisCloses)).close();
- verify(mockStream, times(++isCloses)).close();
+ verify(mockStream.in, times(++isCloses)).close();
// verify that the underlying stream isn't closed after a seek
// ie. the state was correctly updated
bris.getInputStream();
- verify(bris, times(brisOpens)).openInputStream();
+ verify(bris, times(brisOpens)).openInputStream(Mockito.anyLong());
verify(bris, times(brisCloses)).close();
- verify(mockStream, times(isCloses)).close();
+ verify(mockStream.in, times(isCloses)).close();
// seeking to same location should be a no-op
bris.seek(1);
bris.getInputStream();
- verify(bris, times(brisOpens)).openInputStream();
+ verify(bris, times(brisOpens)).openInputStream(Mockito.anyLong());
verify(bris, times(brisCloses)).close();
- verify(mockStream, times(isCloses)).close();
+ verify(mockStream.in, times(isCloses)).close();
// close should of course close
bris.close();
verify(bris, times(++brisCloses)).close();
- verify(mockStream, times(++isCloses)).close();
+ verify(mockStream.in, times(++isCloses)).close();
// it's already closed, underlying stream should not close
bris.close();
verify(bris, times(++brisCloses)).close();
- verify(mockStream, times(isCloses)).close();
+ verify(mockStream.in, times(isCloses)).close();
// it's closed, don't reopen it
boolean errored = false;
@@ -202,9 +205,9 @@ public class TestByteRangeInputStream {
} finally {
assertTrue("Read a closed steam", errored);
}
- verify(bris, times(brisOpens)).openInputStream();
+ verify(bris, times(brisOpens)).openInputStream(Mockito.anyLong());
verify(bris, times(brisCloses)).close();
- verify(mockStream, times(isCloses)).close();
+ verify(mockStream.in, times(isCloses)).close();
}
}
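Because openInputStream now takes the start offset, the stubs and verifications above switch to Mockito's anyLong() argument matcher. A minimal standalone sketch of that stub-and-verify pattern (the Reader class and its open method are hypothetical, not part of the patch):

  import static org.mockito.Mockito.*;
  import java.io.InputStream;

  public class MatcherSketch {
    // Hypothetical collaborator standing in for ByteRangeInputStream.
    public static class Reader {
      public InputStream open(long offset) { return null; }
    }

    public static void main(String[] args) throws Exception {
      Reader reader = mock(Reader.class);
      InputStream stream = mock(InputStream.class);
      // Stub for any offset, mirroring doReturn(...).when(bris).openInputStream(Mockito.anyLong()).
      doReturn(stream).when(reader).open(anyLong());
      reader.open(42L);
      // Verify the call count without caring which offset was requested.
      verify(reader, times(1)).open(anyLong());
    }
  }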
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e91ccfad/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHDFS.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHDFS.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHDFS.java
index 0563f12..8bba105 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHDFS.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHDFS.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.fail;
+import java.io.EOFException;
import java.io.IOException;
import java.io.OutputStream;
import java.net.HttpURLConnection;
@@ -45,6 +46,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.TestDFSClientRetries;
@@ -561,6 +563,45 @@ public class TestWebHDFS {
}
}
+ @Test
+ public void testWebHdfsPread() throws Exception {
+ final Configuration conf = WebHdfsTestUtil.createConf();
+ MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1)
+ .build();
+ byte[] content = new byte[1024];
+ RANDOM.nextBytes(content);
+ final Path foo = new Path("/foo");
+ FSDataInputStream in = null;
+ try {
+ final WebHdfsFileSystem fs = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
+ WebHdfsConstants.WEBHDFS_SCHEME);
+ try (OutputStream os = fs.create(foo)) {
+ os.write(content);
+ }
+
+ // pread
+ in = fs.open(foo, 1024);
+ byte[] buf = new byte[1024];
+ try {
+ in.readFully(1020, buf, 0, 5);
+ Assert.fail("EOF expected");
+ } catch (EOFException ignored) {}
+
+ // mix pread with stateful read
+ int length = in.read(buf, 0, 512);
+ in.readFully(100, new byte[1024], 0, 100);
+ int preadLen = in.read(200, new byte[1024], 0, 200);
+ Assert.assertTrue(preadLen > 0);
+ IOUtils.readFully(in, buf, length, 1024 - length);
+ Assert.assertArrayEquals(content, buf);
+ } finally {
+ if (in != null) {
+ in.close();
+ }
+ cluster.shutdown();
+ }
+ }
+
@Test(timeout = 30000)
public void testGetHomeDirectory() throws Exception {