You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2015/05/01 09:01:57 UTC

[40/50] [abbrv] hadoop git commit: HDFS-5574. Remove buffer copy in BlockReader.skip. Contributed by Binglin Chang.

HDFS-5574. Remove buffer copy in BlockReader.skip. Contributed by Binglin Chang.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e89fc53a
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e89fc53a
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e89fc53a

Branch: refs/heads/HDFS-7240
Commit: e89fc53a1d264fde407dd2c36defab5241cd0b52
Parents: f5b3847
Author: Akira Ajisaka <aa...@apache.org>
Authored: Thu Apr 30 19:09:57 2015 +0900
Committer: Akira Ajisaka <aa...@apache.org>
Committed: Thu Apr 30 19:11:22 2015 +0900

----------------------------------------------------------------------
 .../org/apache/hadoop/fs/FSInputChecker.java    |  25 +++-
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   3 +
 .../apache/hadoop/hdfs/RemoteBlockReader.java   |  18 +--
 .../apache/hadoop/hdfs/RemoteBlockReader2.java  |  37 +++---
 .../apache/hadoop/hdfs/TestBlockReaderBase.java |  94 +++++++++++++++
 .../apache/hadoop/hdfs/TestDFSInputStream.java  | 114 +++++++++++++++++++
 .../hadoop/hdfs/TestRemoteBlockReader.java      |  27 +++++
 .../hadoop/hdfs/TestRemoteBlockReader2.java     |  25 ++++
 8 files changed, 309 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/e89fc53a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSInputChecker.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSInputChecker.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSInputChecker.java
index 889ccc1..9b66c95 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSInputChecker.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSInputChecker.java
@@ -214,7 +214,30 @@ abstract public class FSInputChecker extends FSInputStream {
     count = readChecksumChunk(buf, 0, maxChunkSize);
     if (count < 0) count = 0;
   }
-  
+
+  /**
+   * Like read(byte[], int, int), but does not provide a dest buffer,
+   * so the read data is discarded.
+   * @param      len maximum number of bytes to read.
+   * @return     the number of bytes read.
+   * @throws     IOException  if an I/O error occurs.
+   */
+  final protected synchronized int readAndDiscard(int len) throws IOException {
+    int total = 0;
+    while (total < len) {
+      if (pos >= count) {
+        count = readChecksumChunk(buf, 0, maxChunkSize);
+        if (count <= 0) {
+          break;
+        }
+      }
+      int rd = Math.min(count - pos, len - total);
+      pos += rd;
+      total += rd;
+    }
+    return total;
+  }
+
   /*
    * Read characters into a portion of an array, reading from the underlying
    * stream at most once if necessary.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e89fc53a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index fbeb45d..c538b78 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -483,6 +483,9 @@ Release 2.8.0 - UNRELEASED
     HDFS-8283. DataStreamer cleanup and some minor improvement. (szetszwo via
     jing9)
 
+    HDFS-5574. Remove buffer copy in BlockReader.skip.
+    (Binglin Chang via aajisaka)
+
   OPTIMIZATIONS
 
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e89fc53a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
index ce96ac9..d70f419 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
@@ -97,7 +97,6 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
   private boolean eos = false;
   private boolean sentStatusCode = false;
   
-  byte[] skipBuf = null;
   ByteBuffer checksumBytes = null;
   /** Amount of unread data in the current received packet */
   int dataLeft = 0;
@@ -126,10 +125,7 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
     if (lastChunkLen < 0 && startOffset > firstChunkOffset && len > 0) {
       // Skip these bytes. But don't call this.skip()!
       int toSkip = (int)(startOffset - firstChunkOffset);
-      if ( skipBuf == null ) {
-        skipBuf = new byte[bytesPerChecksum];
-      }
-      if ( super.read(skipBuf, 0, toSkip) != toSkip ) {
+      if ( super.readAndDiscard(toSkip) != toSkip ) {
         // should never happen
         throw new IOException("Could not skip required number of bytes");
       }
@@ -152,15 +148,11 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
   public synchronized long skip(long n) throws IOException {
     /* How can we make sure we don't throw a ChecksumException, at least
      * in majority of the cases?. This one throws. */  
-    if ( skipBuf == null ) {
-      skipBuf = new byte[bytesPerChecksum]; 
-    }
-
     long nSkipped = 0;
-    while ( nSkipped < n ) {
-      int toSkip = (int)Math.min(n-nSkipped, skipBuf.length);
-      int ret = read(skipBuf, 0, toSkip);
-      if ( ret <= 0 ) {
+    while (nSkipped < n) {
+      int toSkip = (int)Math.min(n-nSkipped, Integer.MAX_VALUE);
+      int ret = readAndDiscard(toSkip);
+      if (ret <= 0) {
         return nSkipped;
       }
       nSkipped += ret;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e89fc53a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
index 9245a84..c368d65 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
@@ -122,12 +122,7 @@ public class RemoteBlockReader2  implements BlockReader {
   private final boolean verifyChecksum;
 
   private boolean sentStatusCode = false;
-  
-  byte[] skipBuf = null;
-  ByteBuffer checksumBytes = null;
-  /** Amount of unread data in the current received packet */
-  int dataLeft = 0;
-  
+
   @VisibleForTesting
   public Peer getPeer() {
     return peer;
@@ -172,7 +167,7 @@ public class RemoteBlockReader2  implements BlockReader {
 
 
   @Override
-  public int read(ByteBuffer buf) throws IOException {
+  public synchronized int read(ByteBuffer buf) throws IOException {
     if (curDataSlice == null || curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) {
       TraceScope scope = Trace.startSpan(
           "RemoteBlockReader2#readNextPacket(" + blockId + ")", Sampler.NEVER);
@@ -257,21 +252,23 @@ public class RemoteBlockReader2  implements BlockReader {
   @Override
   public synchronized long skip(long n) throws IOException {
     /* How can we make sure we don't throw a ChecksumException, at least
-     * in majority of the cases?. This one throws. */  
-    if ( skipBuf == null ) {
-      skipBuf = new byte[bytesPerChecksum]; 
-    }
-
-    long nSkipped = 0;
-    while ( nSkipped < n ) {
-      int toSkip = (int)Math.min(n-nSkipped, skipBuf.length);
-      int ret = read(skipBuf, 0, toSkip);
-      if ( ret <= 0 ) {
-        return nSkipped;
+     * in majority of the cases?. This one throws. */
+    long skipped = 0;
+    while (skipped < n) {
+      long needToSkip = n - skipped;
+      if (curDataSlice == null || curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) {
+        readNextPacket();
       }
-      nSkipped += ret;
+      if (curDataSlice.remaining() == 0) {
+        // we're at EOF now
+        break;
+      }
+
+      int skip = (int)Math.min(curDataSlice.remaining(), needToSkip);
+      curDataSlice.position(curDataSlice.position() + skip);
+      skipped += skip;
     }
-    return nSkipped;
+    return skipped;
   }
 
   private void readTrailingEmptyPacket() throws IOException {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e89fc53a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderBase.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderBase.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderBase.java
new file mode 100644
index 0000000..3d916a7
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderBase.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+abstract public class TestBlockReaderBase {
+  private BlockReaderTestUtil util;
+  private byte[] blockData;
+  private BlockReader reader;
+
+  /**
+   * if override this, make sure return array length is less than
+   * block size.
+   */
+  byte [] getBlockData() {
+    int length = 1 << 22;
+    byte[] data = new byte[length];
+    for (int i = 0; i < length; i++) {
+      data[i] = (byte) (i % 133);
+    }
+    return data;
+  }
+
+  private BlockReader getBlockReader(LocatedBlock block) throws Exception {
+    return util.getBlockReader(block, 0, blockData.length);
+  }
+
+  abstract HdfsConfiguration createConf();
+
+  @Before
+  public void setup() throws Exception {
+    util = new BlockReaderTestUtil(1, createConf());
+    blockData = getBlockData();
+    DistributedFileSystem fs = util.getCluster().getFileSystem();
+    Path testfile = new Path("/testfile");
+    FSDataOutputStream fout = fs.create(testfile);
+    fout.write(blockData);
+    fout.close();
+    LocatedBlock blk = util.getFileBlocks(testfile, blockData.length).get(0);
+    reader = getBlockReader(blk);
+  }
+
+  @After
+  public void shutdown() throws Exception {
+    util.shutdown();
+  }
+
+  @Test(timeout=60000)
+  public void testSkip() throws IOException {
+    Random random = new Random();
+    byte [] buf = new byte[1];
+    for (int pos = 0; pos < blockData.length;) {
+      long skip = random.nextInt(100) + 1;
+      long skipped = reader.skip(skip);
+      if (pos + skip >= blockData.length) {
+        assertEquals(blockData.length, pos + skipped);
+        break;
+      } else {
+        assertEquals(skip, skipped);
+        pos += skipped;
+        assertEquals(1, reader.read(buf, 0, 1));
+
+        assertEquals(blockData[pos], buf[0]);
+        pos += 1;
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e89fc53a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInputStream.java
new file mode 100644
index 0000000..b9ec2ce
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInputStream.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import static org.junit.Assert.assertEquals;
+import static org.hamcrest.CoreMatchers.equalTo;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.net.unix.DomainSocket;
+import org.apache.hadoop.net.unix.TemporarySocketDirectory;
+import org.junit.Assume;
+import org.junit.Test;
+
+public class TestDFSInputStream {
+  private void testSkipInner(MiniDFSCluster cluster) throws IOException {
+    DistributedFileSystem fs = cluster.getFileSystem();
+    DFSClient client = fs.dfs;
+    Path file = new Path("/testfile");
+    int fileLength = 1 << 22;
+    byte[] fileContent = new byte[fileLength];
+    for (int i = 0; i < fileLength; i++) {
+      fileContent[i] = (byte) (i % 133);
+    }
+    FSDataOutputStream fout = fs.create(file);
+    fout.write(fileContent);
+    fout.close();
+    Random random = new Random();
+    for (int i = 3; i < 18; i++) {
+      DFSInputStream fin = client.open("/testfile");
+      for (long pos = 0; pos < fileLength;) {
+        long skip = random.nextInt(1 << i) + 1;
+        long skipped = fin.skip(skip);
+        if (pos + skip >= fileLength) {
+          assertEquals(fileLength, pos + skipped);
+          break;
+        } else {
+          assertEquals(skip, skipped);
+          pos += skipped;
+          int data = fin.read();
+          assertEquals(pos % 133, data);
+          pos += 1;
+        }
+      }
+      fin.close();
+    }
+  }
+
+  @Test(timeout=60000)
+  public void testSkipWithRemoteBlockReader() throws IOException {
+    Configuration conf = new Configuration();
+    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER, true);
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
+    try {
+      testSkipInner(cluster);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  @Test(timeout=60000)
+  public void testSkipWithRemoteBlockReader2() throws IOException {
+    Configuration conf = new Configuration();
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
+    try {
+      testSkipInner(cluster);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  @Test(timeout=60000)
+  public void testSkipWithLocalBlockReader() throws IOException {
+    Assume.assumeThat(DomainSocket.getLoadingFailureReason(), equalTo(null));
+    TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
+    DomainSocket.disableBindPathValidation();
+    Configuration conf = new Configuration();
+    conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true);
+    conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
+        new File(sockDir.getDir(),
+          "TestShortCircuitLocalRead._PORT.sock").getAbsolutePath());
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
+    try {
+      DFSInputStream.tcpReadsDisabledForTesting = true;
+      testSkipInner(cluster);
+    } finally {
+      DFSInputStream.tcpReadsDisabledForTesting = false;
+      cluster.shutdown();
+      sockDir.close();
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e89fc53a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRemoteBlockReader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRemoteBlockReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRemoteBlockReader.java
new file mode 100644
index 0000000..8ab110d
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRemoteBlockReader.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+public class TestRemoteBlockReader extends TestBlockReaderBase {
+
+  HdfsConfiguration createConf() {
+    HdfsConfiguration conf = new HdfsConfiguration();
+    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER, true);
+    return conf;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e89fc53a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRemoteBlockReader2.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRemoteBlockReader2.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRemoteBlockReader2.java
new file mode 100644
index 0000000..c23b4b7
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRemoteBlockReader2.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+public class TestRemoteBlockReader2 extends TestBlockReaderBase {
+  HdfsConfiguration createConf() {
+    HdfsConfiguration conf = new HdfsConfiguration();
+    return conf;
+  }
+}