You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ha...@apache.org on 2018/05/23 17:13:00 UTC
[12/18] hadoop git commit: HDFS-13540. DFSStripedInputStream should
only allocate new buffers when reading. Contributed by Xiao Chen.
HDFS-13540. DFSStripedInputStream should only allocate new buffers when reading. Contributed by Xiao Chen.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/34e8b9f9
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/34e8b9f9
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/34e8b9f9
Branch: refs/heads/HDDS-48
Commit: 34e8b9f9a86fb03156861482643fba11bdee1dd4
Parents: fed2bef
Author: Sammi Chen <sa...@intel.com>
Authored: Wed May 23 19:10:09 2018 +0800
Committer: Sammi Chen <sa...@intel.com>
Committed: Wed May 23 19:10:09 2018 +0800
----------------------------------------------------------------------
.../apache/hadoop/io/ElasticByteBufferPool.java | 12 ++++++
.../hadoop/hdfs/DFSStripedInputStream.java | 12 +++---
.../hadoop/hdfs/TestDFSStripedInputStream.java | 45 ++++++++++++++++++++
3 files changed, 64 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/34e8b9f9/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ElasticByteBufferPool.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ElasticByteBufferPool.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ElasticByteBufferPool.java
index 023f37f..9dd7771 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ElasticByteBufferPool.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ElasticByteBufferPool.java
@@ -116,4 +116,16 @@ public final class ElasticByteBufferPool implements ByteBufferPool {
// poor granularity.
}
}
+
+ /**
+ * Get the size of the buffer pool, for the specified buffer type.
+ *
+ * @param direct Whether the size is returned for direct buffers
+ * @return The size
+ */
+ @InterfaceAudience.Private
+ @InterfaceStability.Unstable
+ public int size(boolean direct) {
+ return getBufferTree(direct).size();
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/34e8b9f9/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
index f3b16e0..5557a50 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
@@ -116,12 +116,14 @@ public class DFSStripedInputStream extends DFSInputStream {
return decoder.preferDirectBuffer();
}
- void resetCurStripeBuffer() {
- if (curStripeBuf == null) {
+ private void resetCurStripeBuffer(boolean shouldAllocateBuf) {
+ if (shouldAllocateBuf && curStripeBuf == null) {
curStripeBuf = BUFFER_POOL.getBuffer(useDirectBuffer(),
cellSize * dataBlkNum);
}
- curStripeBuf.clear();
+ if (curStripeBuf != null) {
+ curStripeBuf.clear();
+ }
curStripeRange = new StripeRange(0, 0);
}
@@ -206,7 +208,7 @@ public class DFSStripedInputStream extends DFSInputStream {
*/
@Override
protected void closeCurrentBlockReaders() {
- resetCurStripeBuffer();
+ resetCurStripeBuffer(false);
if (blockReaders == null || blockReaders.length == 0) {
return;
}
@@ -296,7 +298,7 @@ public class DFSStripedInputStream extends DFSInputStream {
*/
private void readOneStripe(CorruptedBlocks corruptedBlocks)
throws IOException {
- resetCurStripeBuffer();
+ resetCurStripeBuffer(true);
// compute stripe range based on pos
final long offsetInBlockGroup = getOffsetInBlockGroup();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/34e8b9f9/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
index cdebee0..422746e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
+import org.apache.hadoop.io.ElasticByteBufferPool;
import org.apache.hadoop.io.erasurecode.CodecUtil;
import org.apache.hadoop.io.erasurecode.ErasureCodeNative;
import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
@@ -529,4 +530,48 @@ public class TestDFSStripedInputStream {
}
}
}
+
+ @Test
+ public void testCloseDoesNotAllocateNewBuffer() throws Exception {
+ final int numBlocks = 2;
+ DFSTestUtil.createStripedFile(cluster, filePath, null, numBlocks,
+ stripesPerBlock, false, ecPolicy);
+ try (DFSInputStream in = fs.getClient().open(filePath.toString())) {
+ assertTrue(in instanceof DFSStripedInputStream);
+ final DFSStripedInputStream stream = (DFSStripedInputStream) in;
+ final ElasticByteBufferPool ebbp =
+ (ElasticByteBufferPool) stream.getBufferPool();
+ // first clear existing pool
+ LOG.info("Current pool size: direct: " + ebbp.size(true) + ", indirect: "
+ + ebbp.size(false));
+ emptyBufferPoolForCurrentPolicy(ebbp, true);
+ emptyBufferPoolForCurrentPolicy(ebbp, false);
+ final int startSizeDirect = ebbp.size(true);
+ final int startSizeIndirect = ebbp.size(false);
+ // close should not allocate new buffers in the pool.
+ stream.close();
+ assertEquals(startSizeDirect, ebbp.size(true));
+ assertEquals(startSizeIndirect, ebbp.size(false));
+ }
+ }
+
+ /**
+ * Empties the pool for the specified buffer type, for the current ecPolicy.
+ * <p>
+ * Note that {@link #ecPolicy} may change for different test cases in
+ * {@link TestDFSStripedInputStreamWithRandomECPolicy}.
+ */
+ private void emptyBufferPoolForCurrentPolicy(ElasticByteBufferPool ebbp,
+ boolean direct) {
+ int size;
+ while ((size = ebbp.size(direct)) != 0) {
+ ebbp.getBuffer(direct,
+ ecPolicy.getCellSize() * ecPolicy.getNumDataUnits());
+ if (size == ebbp.size(direct)) {
+ // if getBuffer didn't decrease size, it means the pool for the buffer
+ // corresponding to current ecPolicy is empty
+ break;
+ }
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org