You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by xy...@apache.org on 2018/05/23 22:56:22 UTC

[30/50] [abbrv] hadoop git commit: HDFS-13540. DFSStripedInputStream should only allocate new buffers when reading. Contributed by Xiao Chen.

HDFS-13540. DFSStripedInputStream should only allocate new buffers when reading. Contributed by Xiao Chen.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/34e8b9f9
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/34e8b9f9
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/34e8b9f9

Branch: refs/heads/HDDS-4
Commit: 34e8b9f9a86fb03156861482643fba11bdee1dd4
Parents: fed2bef
Author: Sammi Chen <sa...@intel.com>
Authored: Wed May 23 19:10:09 2018 +0800
Committer: Sammi Chen <sa...@intel.com>
Committed: Wed May 23 19:10:09 2018 +0800

----------------------------------------------------------------------
 .../apache/hadoop/io/ElasticByteBufferPool.java | 12 ++++++
 .../hadoop/hdfs/DFSStripedInputStream.java      | 12 +++---
 .../hadoop/hdfs/TestDFSStripedInputStream.java  | 45 ++++++++++++++++++++
 3 files changed, 64 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/34e8b9f9/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ElasticByteBufferPool.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ElasticByteBufferPool.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ElasticByteBufferPool.java
index 023f37f..9dd7771 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ElasticByteBufferPool.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ElasticByteBufferPool.java
@@ -116,4 +116,16 @@ public final class ElasticByteBufferPool implements ByteBufferPool {
       // poor granularity.
     }
   }
+
+  /**
+   * Get the size of the buffer pool, for the specified buffer type.
+   *
+   * @param direct Whether the size is returned for direct buffers
+   * @return The size
+   */
+  @InterfaceAudience.Private
+  @InterfaceStability.Unstable
+  public int size(boolean direct) {
+    return getBufferTree(direct).size();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/34e8b9f9/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
index f3b16e0..5557a50 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
@@ -116,12 +116,14 @@ public class DFSStripedInputStream extends DFSInputStream {
     return decoder.preferDirectBuffer();
   }
 
-  void resetCurStripeBuffer() {
-    if (curStripeBuf == null) {
+  private void resetCurStripeBuffer(boolean shouldAllocateBuf) {
+    if (shouldAllocateBuf && curStripeBuf == null) {
       curStripeBuf = BUFFER_POOL.getBuffer(useDirectBuffer(),
           cellSize * dataBlkNum);
     }
-    curStripeBuf.clear();
+    if (curStripeBuf != null) {
+      curStripeBuf.clear();
+    }
     curStripeRange = new StripeRange(0, 0);
   }
 
@@ -206,7 +208,7 @@ public class DFSStripedInputStream extends DFSInputStream {
    */
   @Override
   protected void closeCurrentBlockReaders() {
-    resetCurStripeBuffer();
+    resetCurStripeBuffer(false);
     if (blockReaders ==  null || blockReaders.length == 0) {
       return;
     }
@@ -296,7 +298,7 @@ public class DFSStripedInputStream extends DFSInputStream {
    */
   private void readOneStripe(CorruptedBlocks corruptedBlocks)
       throws IOException {
-    resetCurStripeBuffer();
+    resetCurStripeBuffer(true);
 
     // compute stripe range based on pos
     final long offsetInBlockGroup = getOffsetInBlockGroup();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/34e8b9f9/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
index cdebee0..422746e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil;
+import org.apache.hadoop.io.ElasticByteBufferPool;
 import org.apache.hadoop.io.erasurecode.CodecUtil;
 import org.apache.hadoop.io.erasurecode.ErasureCodeNative;
 import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
@@ -529,4 +530,48 @@ public class TestDFSStripedInputStream {
       }
     }
   }
+
+  @Test
+  public void testCloseDoesNotAllocateNewBuffer() throws Exception {
+    final int numBlocks = 2;
+    DFSTestUtil.createStripedFile(cluster, filePath, null, numBlocks,
+        stripesPerBlock, false, ecPolicy);
+    try (DFSInputStream in = fs.getClient().open(filePath.toString())) {
+      assertTrue(in instanceof DFSStripedInputStream);
+      final DFSStripedInputStream stream = (DFSStripedInputStream) in;
+      final ElasticByteBufferPool ebbp =
+          (ElasticByteBufferPool) stream.getBufferPool();
+      // first clear existing pool
+      LOG.info("Current pool size: direct: " + ebbp.size(true) + ", indirect: "
+          + ebbp.size(false));
+      emptyBufferPoolForCurrentPolicy(ebbp, true);
+      emptyBufferPoolForCurrentPolicy(ebbp, false);
+      final int startSizeDirect = ebbp.size(true);
+      final int startSizeIndirect = ebbp.size(false);
+      // close should not allocate new buffers in the pool.
+      stream.close();
+      assertEquals(startSizeDirect, ebbp.size(true));
+      assertEquals(startSizeIndirect, ebbp.size(false));
+    }
+  }
+
+  /**
+   * Empties the pool for the specified buffer type, for the current ecPolicy.
+   * <p>
+   * Note that {@link #ecPolicy} may change for different test cases in
+   * {@link TestDFSStripedInputStreamWithRandomECPolicy}.
+   */
+  private void emptyBufferPoolForCurrentPolicy(ElasticByteBufferPool ebbp,
+      boolean direct) {
+    int size;
+    while ((size = ebbp.size(direct)) != 0) {
+      ebbp.getBuffer(direct,
+          ecPolicy.getCellSize() * ecPolicy.getNumDataUnits());
+      if (size == ebbp.size(direct)) {
+        // if getBuffer didn't decrease size, it means the pool for the buffer
+        // corresponding to current ecPolicy is empty
+        break;
+      }
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org