Posted to common-commits@hadoop.apache.org by zj...@apache.org on 2015/05/04 22:04:29 UTC
[10/50] [abbrv] hadoop git commit: HDFS-5574. Remove buffer copy in BlockReader.skip. Contributed by Binglin Chang.
HDFS-5574. Remove buffer copy in BlockReader.skip. Contributed by Binglin Chang.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/69f6468e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/69f6468e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/69f6468e
Branch: refs/heads/YARN-2928
Commit: 69f6468e5a9db3620e933207e18e3c8f37b234ad
Parents: 26fee33
Author: Akira Ajisaka <aa...@apache.org>
Authored: Thu Apr 30 19:09:57 2015 +0900
Committer: Zhijie Shen <zj...@apache.org>
Committed: Mon May 4 12:58:54 2015 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/fs/FSInputChecker.java | 25 +++-
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 +
.../apache/hadoop/hdfs/RemoteBlockReader.java | 18 +--
.../apache/hadoop/hdfs/RemoteBlockReader2.java | 37 +++---
.../apache/hadoop/hdfs/TestBlockReaderBase.java | 94 +++++++++++++++
.../apache/hadoop/hdfs/TestDFSInputStream.java | 114 +++++++++++++++++++
.../hadoop/hdfs/TestRemoteBlockReader.java | 27 +++++
.../hadoop/hdfs/TestRemoteBlockReader2.java | 25 ++++
8 files changed, 309 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
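In short: both remote block readers used to implement skip() by allocating a
throwaway skipBuf of bytesPerChecksum bytes and reading into it purely to
discard the data. This commit removes that copy. FSInputChecker gains
readAndDiscard(int), which advances its internal chunk cursor directly
(RemoteBlockReader builds skip() on top of it), and RemoteBlockReader2 skips
by moving the position of the current packet's ByteBuffer slice. The
standalone sketch below (hypothetical names, not the Hadoop classes
themselves) contrasts the two approaches:

    import java.io.IOException;
    import java.io.InputStream;
    import java.nio.ByteBuffer;

    // Illustrative sketch only; contrasts copy-based and position-based skipping.
    class SkipSketch {
      // Old style: pull bytes into a scratch buffer just to throw them away.
      static long skipByCopy(InputStream in, long n) throws IOException {
        byte[] scratch = new byte[4096];            // stand-in for skipBuf
        long skipped = 0;
        while (skipped < n) {
          int toRead = (int) Math.min(n - skipped, scratch.length);
          int ret = in.read(scratch, 0, toRead);    // copies data nobody will use
          if (ret <= 0) break;                      // EOF
          skipped += ret;
        }
        return skipped;
      }

      // New style (as in RemoteBlockReader2): advance the packet buffer's position.
      static int skipByPosition(ByteBuffer curDataSlice, long needToSkip) {
        int skip = (int) Math.min(curDataSlice.remaining(), needToSkip);
        curDataSlice.position(curDataSlice.position() + skip);  // no copy at all
        return skip;
      }
    }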
http://git-wip-us.apache.org/repos/asf/hadoop/blob/69f6468e/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSInputChecker.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSInputChecker.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSInputChecker.java
index 889ccc1..9b66c95 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSInputChecker.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSInputChecker.java
@@ -214,7 +214,30 @@ abstract public class FSInputChecker extends FSInputStream {
count = readChecksumChunk(buf, 0, maxChunkSize);
if (count < 0) count = 0;
}
-
+
+ /**
+ * Like read(byte[], int, int), but does not provide a dest buffer,
+ * so the read data is discarded.
+ * @param len maximum number of bytes to read.
+ * @return the number of bytes read.
+ * @throws IOException if an I/O error occurs.
+ */
+ final protected synchronized int readAndDiscard(int len) throws IOException {
+ int total = 0;
+ while (total < len) {
+ if (pos >= count) {
+ count = readChecksumChunk(buf, 0, maxChunkSize);
+ if (count <= 0) {
+ break;
+ }
+ }
+ int rd = Math.min(count - pos, len - total);
+ pos += rd;
+ total += rd;
+ }
+ return total;
+ }
+
/*
* Read characters into a portion of an array, reading from the underlying
* stream at most once if necessary.
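Note that readAndDiscard still verifies checksums: the discarded bytes flow
through readChecksumChunk into the checker's internal buf exactly as before,
and only the copy out to a caller-supplied array is eliminated. Its internal
loop stops early only at EOF, so a single call either discards the full len
or hits end of stream. A hedged usage sketch (hypothetical subclass method;
readAndDiscard is protected, so only subclasses can call it):

    // Hypothetical FSInputChecker subclass method, for illustration only.
    // A short return from readAndDiscard means the stream ended early.
    protected void discardExactly(int toSkip) throws IOException {
      if (readAndDiscard(toSkip) != toSkip) {
        throw new IOException("Could not discard " + toSkip + " bytes");
      }
    }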
http://git-wip-us.apache.org/repos/asf/hadoop/blob/69f6468e/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index fbeb45d..c538b78 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -483,6 +483,9 @@ Release 2.8.0 - UNRELEASED
HDFS-8283. DataStreamer cleanup and some minor improvement. (szetszwo via
jing9)
+ HDFS-5574. Remove buffer copy in BlockReader.skip.
+ (Binglin Chang via aajisaka)
+
OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
http://git-wip-us.apache.org/repos/asf/hadoop/blob/69f6468e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
index ce96ac9..d70f419 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
@@ -97,7 +97,6 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
private boolean eos = false;
private boolean sentStatusCode = false;
- byte[] skipBuf = null;
ByteBuffer checksumBytes = null;
/** Amount of unread data in the current received packet */
int dataLeft = 0;
@@ -126,10 +125,7 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
if (lastChunkLen < 0 && startOffset > firstChunkOffset && len > 0) {
// Skip these bytes. But don't call this.skip()!
int toSkip = (int)(startOffset - firstChunkOffset);
- if ( skipBuf == null ) {
- skipBuf = new byte[bytesPerChecksum];
- }
- if ( super.read(skipBuf, 0, toSkip) != toSkip ) {
+ if ( super.readAndDiscard(toSkip) != toSkip ) {
// should never happen
throw new IOException("Could not skip required number of bytes");
}
@@ -152,15 +148,11 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
public synchronized long skip(long n) throws IOException {
/* How can we make sure we don't throw a ChecksumException, at least
* in majority of the cases?. This one throws. */
- if ( skipBuf == null ) {
- skipBuf = new byte[bytesPerChecksum];
- }
-
long nSkipped = 0;
- while ( nSkipped < n ) {
- int toSkip = (int)Math.min(n-nSkipped, skipBuf.length);
- int ret = read(skipBuf, 0, toSkip);
- if ( ret <= 0 ) {
+ while (nSkipped < n) {
+ int toSkip = (int)Math.min(n-nSkipped, Integer.MAX_VALUE);
+ int ret = readAndDiscard(toSkip);
+ if (ret <= 0) {
return nSkipped;
}
nSkipped += ret;
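The Integer.MAX_VALUE clamp above is what lets the loop drop skipBuf.length
as the per-iteration bound without overflowing the int cast: a long remainder
above 2^31-1 cast straight to int can go negative and stall the loop. A quick
illustration (the values are just examples):

    long remaining = 3L * 1024 * 1024 * 1024;                 // 3 GiB still to skip
    int bad  = (int) remaining;                               // -1073741824: truncated, negative
    int good = (int) Math.min(remaining, Integer.MAX_VALUE);  // 2147483647: safe per-call bound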
http://git-wip-us.apache.org/repos/asf/hadoop/blob/69f6468e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
index 9245a84..c368d65 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
@@ -122,12 +122,7 @@ public class RemoteBlockReader2 implements BlockReader {
private final boolean verifyChecksum;
private boolean sentStatusCode = false;
-
- byte[] skipBuf = null;
- ByteBuffer checksumBytes = null;
- /** Amount of unread data in the current received packet */
- int dataLeft = 0;
-
+
@VisibleForTesting
public Peer getPeer() {
return peer;
@@ -172,7 +167,7 @@ public class RemoteBlockReader2 implements BlockReader {
@Override
- public int read(ByteBuffer buf) throws IOException {
+ public synchronized int read(ByteBuffer buf) throws IOException {
if (curDataSlice == null || curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) {
TraceScope scope = Trace.startSpan(
"RemoteBlockReader2#readNextPacket(" + blockId + ")", Sampler.NEVER);
@@ -257,21 +252,23 @@ public class RemoteBlockReader2 implements BlockReader {
@Override
public synchronized long skip(long n) throws IOException {
/* How can we make sure we don't throw a ChecksumException, at least
- * in majority of the cases?. This one throws. */
- if ( skipBuf == null ) {
- skipBuf = new byte[bytesPerChecksum];
- }
-
- long nSkipped = 0;
- while ( nSkipped < n ) {
- int toSkip = (int)Math.min(n-nSkipped, skipBuf.length);
- int ret = read(skipBuf, 0, toSkip);
- if ( ret <= 0 ) {
- return nSkipped;
+ * in majority of the cases?. This one throws. */
+ long skipped = 0;
+ while (skipped < n) {
+ long needToSkip = n - skipped;
+ if (curDataSlice == null || curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) {
+ readNextPacket();
}
- nSkipped += ret;
+ if (curDataSlice.remaining() == 0) {
+ // we're at EOF now
+ break;
+ }
+
+ int skip = (int)Math.min(curDataSlice.remaining(), needToSkip);
+ curDataSlice.position(curDataSlice.position() + skip);
+ skipped += skip;
}
- return nSkipped;
+ return skipped;
}
private void readTrailingEmptyPacket() throws IOException {
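RemoteBlockReader2 keeps each packet's payload in a ByteBuffer slice
(curDataSlice), so skipping is just a position bump; nothing is read into a
scratch array. That is also why read(ByteBuffer) becomes synchronized above:
skip(n) and read now both mutate curDataSlice. The buffer mechanics in
isolation (standalone demo, not Hadoop code):

    import java.nio.ByteBuffer;

    public class PositionSkipDemo {
      public static void main(String[] args) {
        ByteBuffer slice = ByteBuffer.wrap(new byte[] {10, 20, 30, 40, 50});
        long needToSkip = 3;
        int skip = (int) Math.min(slice.remaining(), needToSkip);
        slice.position(slice.position() + skip);  // O(1) skip, no data copied
        System.out.println(slice.get());          // prints 40
      }
    }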
http://git-wip-us.apache.org/repos/asf/hadoop/blob/69f6468e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderBase.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderBase.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderBase.java
new file mode 100644
index 0000000..3d916a7
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderBase.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+abstract public class TestBlockReaderBase {
+ private BlockReaderTestUtil util;
+ private byte[] blockData;
+ private BlockReader reader;
+
+ /**
+ * if override this, make sure return array length is less than
+ * block size.
+ */
+ byte [] getBlockData() {
+ int length = 1 << 22;
+ byte[] data = new byte[length];
+ for (int i = 0; i < length; i++) {
+ data[i] = (byte) (i % 133);
+ }
+ return data;
+ }
+
+ private BlockReader getBlockReader(LocatedBlock block) throws Exception {
+ return util.getBlockReader(block, 0, blockData.length);
+ }
+
+ abstract HdfsConfiguration createConf();
+
+ @Before
+ public void setup() throws Exception {
+ util = new BlockReaderTestUtil(1, createConf());
+ blockData = getBlockData();
+ DistributedFileSystem fs = util.getCluster().getFileSystem();
+ Path testfile = new Path("/testfile");
+ FSDataOutputStream fout = fs.create(testfile);
+ fout.write(blockData);
+ fout.close();
+ LocatedBlock blk = util.getFileBlocks(testfile, blockData.length).get(0);
+ reader = getBlockReader(blk);
+ }
+
+ @After
+ public void shutdown() throws Exception {
+ util.shutdown();
+ }
+
+ @Test(timeout=60000)
+ public void testSkip() throws IOException {
+ Random random = new Random();
+ byte [] buf = new byte[1];
+ for (int pos = 0; pos < blockData.length;) {
+ long skip = random.nextInt(100) + 1;
+ long skipped = reader.skip(skip);
+ if (pos + skip >= blockData.length) {
+ assertEquals(blockData.length, pos + skipped);
+ break;
+ } else {
+ assertEquals(skip, skipped);
+ pos += skipped;
+ assertEquals(1, reader.read(buf, 0, 1));
+
+ assertEquals(blockData[pos], buf[0]);
+ pos += 1;
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/69f6468e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInputStream.java
new file mode 100644
index 0000000..b9ec2ce
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInputStream.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import static org.junit.Assert.assertEquals;
+import static org.hamcrest.CoreMatchers.equalTo;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.net.unix.DomainSocket;
+import org.apache.hadoop.net.unix.TemporarySocketDirectory;
+import org.junit.Assume;
+import org.junit.Test;
+
+public class TestDFSInputStream {
+ private void testSkipInner(MiniDFSCluster cluster) throws IOException {
+ DistributedFileSystem fs = cluster.getFileSystem();
+ DFSClient client = fs.dfs;
+ Path file = new Path("/testfile");
+ int fileLength = 1 << 22;
+ byte[] fileContent = new byte[fileLength];
+ for (int i = 0; i < fileLength; i++) {
+ fileContent[i] = (byte) (i % 133);
+ }
+ FSDataOutputStream fout = fs.create(file);
+ fout.write(fileContent);
+ fout.close();
+ Random random = new Random();
+ for (int i = 3; i < 18; i++) {
+ DFSInputStream fin = client.open("/testfile");
+ for (long pos = 0; pos < fileLength;) {
+ long skip = random.nextInt(1 << i) + 1;
+ long skipped = fin.skip(skip);
+ if (pos + skip >= fileLength) {
+ assertEquals(fileLength, pos + skipped);
+ break;
+ } else {
+ assertEquals(skip, skipped);
+ pos += skipped;
+ int data = fin.read();
+ assertEquals(pos % 133, data);
+ pos += 1;
+ }
+ }
+ fin.close();
+ }
+ }
+
+ @Test(timeout=60000)
+ public void testSkipWithRemoteBlockReader() throws IOException {
+ Configuration conf = new Configuration();
+ conf.setBoolean(DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER, true);
+ MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
+ try {
+ testSkipInner(cluster);
+ } finally {
+ cluster.shutdown();
+ }
+ }
+
+ @Test(timeout=60000)
+ public void testSkipWithRemoteBlockReader2() throws IOException {
+ Configuration conf = new Configuration();
+ MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
+ try {
+ testSkipInner(cluster);
+ } finally {
+ cluster.shutdown();
+ }
+ }
+
+ @Test(timeout=60000)
+ public void testSkipWithLocalBlockReader() throws IOException {
+ Assume.assumeThat(DomainSocket.getLoadingFailureReason(), equalTo(null));
+ TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
+ DomainSocket.disableBindPathValidation();
+ Configuration conf = new Configuration();
+ conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true);
+ conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
+ new File(sockDir.getDir(),
+ "TestShortCircuitLocalRead._PORT.sock").getAbsolutePath());
+ MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
+ try {
+ DFSInputStream.tcpReadsDisabledForTesting = true;
+ testSkipInner(cluster);
+ } finally {
+ DFSInputStream.tcpReadsDisabledForTesting = false;
+ cluster.shutdown();
+ sockDir.close();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/69f6468e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRemoteBlockReader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRemoteBlockReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRemoteBlockReader.java
new file mode 100644
index 0000000..8ab110d
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRemoteBlockReader.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+public class TestRemoteBlockReader extends TestBlockReaderBase {
+
+ HdfsConfiguration createConf() {
+ HdfsConfiguration conf = new HdfsConfiguration();
+ conf.setBoolean(DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER, true);
+ return conf;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/69f6468e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRemoteBlockReader2.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRemoteBlockReader2.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRemoteBlockReader2.java
new file mode 100644
index 0000000..c23b4b7
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRemoteBlockReader2.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+public class TestRemoteBlockReader2 extends TestBlockReaderBase {
+ HdfsConfiguration createConf() {
+ HdfsConfiguration conf = new HdfsConfiguration();
+ return conf;
+ }
+}