Posted to commits@hive.apache.org by se...@apache.org on 2017/10/04 21:58:25 UTC
hive git commit: HIVE-17613 : remove object pools for short, same-thread allocations (Sergey Shelukhin, reviewed by Gopal Vijayaraghavan)
Repository: hive
Updated Branches:
refs/heads/master 31077be9b -> 4df092674
HIVE-17613 : remove object pools for short, same-thread allocations (Sergey Shelukhin, reviewed by Gopal Vijayaraghavan)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/4df09267
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/4df09267
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/4df09267
Branch: refs/heads/master
Commit: 4df09267441ef6e65108f69f9ac6b3ba18768ab2
Parents: 31077be
Author: sergey <se...@apache.org>
Authored: Wed Oct 4 14:56:33 2017 -0700
Committer: sergey <se...@apache.org>
Committed: Wed Oct 4 14:57:46 2017 -0700
----------------------------------------------------------------------
.../llap/io/encoded/SerDeEncodedDataReader.java | 15 +----
.../hive/llap/cache/TestLowLevelCacheImpl.java | 4 +-
.../hadoop/hive/llap/LlapCacheAwareFs.java | 4 +-
.../hive/ql/io/orc/encoded/CacheChunk.java | 12 +---
.../ql/io/orc/encoded/EncodedReaderImpl.java | 69 ++++----------------
5 files changed, 16 insertions(+), 88 deletions(-)
----------------------------------------------------------------------
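For context, the pattern this commit removes is pool-recycled allocation: a FixedSizedObjectPool hands out CacheChunk instances via take(), callers re-initialize them with init(), and a PoolObjectHelper resets them on offer(). The patch replaces that dance with a plain constructor call. A self-contained sketch of the two styles (SimplePool, Chunk, and PoolVsNew are illustrative stand-ins, not Hive classes):

// Illustrative stand-ins, not Hive classes: a minimal pool and a poolable chunk.
import java.util.ArrayDeque;

final class Chunk {
  long offset, end;
  Chunk(long offset, long end) { this.offset = offset; this.end = end; }
  void init(long offset, long end) { this.offset = offset; this.end = end; }
  void reset() { init(-1, -1); }
}

final class SimplePool {
  private final ArrayDeque<Chunk> pool = new ArrayDeque<>();
  Chunk take() {
    Chunk c = pool.poll();
    return c != null ? c : new Chunk(-1, -1); // allocate on a pool miss
  }
  void offer(Chunk c) {
    c.reset();     // must wipe every field, or state leaks to the next take()
    pool.push(c);
  }
}

public class PoolVsNew {
  public static void main(String[] args) {
    SimplePool pool = new SimplePool();
    // Pooled style (what this commit removes): take, init, use, remember to offer back.
    Chunk pooled = pool.take();
    pooled.init(0, 4096);
    pool.offer(pooled);
    // Direct style (what this commit switches to): allocate and let GC handle it.
    Chunk direct = new Chunk(0, 4096);
    System.out.println(direct.offset + ".." + direct.end);
  }
}

The pooled style obliges every caller to remember offer() and every class to keep reset() exhaustive; the direct style carries neither obligation.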
http://git-wip-us.apache.org/repos/asf/hive/blob/4df09267/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
index 943ac6e..599b519 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
@@ -129,23 +129,10 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
t.reset();
}
});
- public static final FixedSizedObjectPool<CacheChunk> TCC_POOL =
- new FixedSizedObjectPool<>(1024, new PoolObjectHelper<CacheChunk>() {
- @Override
- public CacheChunk create() {
- return new CacheChunk();
- }
- @Override
- public void resetBeforeOffer(CacheChunk t) {
- t.reset();
- }
- });
private final static DiskRangeListFactory CC_FACTORY = new DiskRangeListFactory() {
@Override
public DiskRangeList createCacheChunk(MemoryBuffer buffer, long offset, long end) {
- CacheChunk tcc = TCC_POOL.take();
- tcc.init(buffer, offset, end);
- return tcc;
+ return new CacheChunk(buffer, offset, end);
}
};
http://git-wip-us.apache.org/repos/asf/hive/blob/4df09267/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java
index ab10285..3320351 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java
@@ -48,9 +48,7 @@ public class TestLowLevelCacheImpl {
private static final DiskRangeListFactory testFactory = new DiskRangeListFactory() {
public DiskRangeList createCacheChunk(MemoryBuffer buffer, long offset, long end) {
- CacheChunk cc = new CacheChunk();
- cc.init(buffer, offset, end);
- return cc;
+ return new CacheChunk(buffer, offset, end);
}
};
http://git-wip-us.apache.org/repos/asf/hive/blob/4df09267/ql/src/java/org/apache/hadoop/hive/llap/LlapCacheAwareFs.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapCacheAwareFs.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapCacheAwareFs.java
index 5c1eed3..626aeb9 100644
--- a/ql/src/java/org/apache/hadoop/hive/llap/LlapCacheAwareFs.java
+++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapCacheAwareFs.java
@@ -204,9 +204,7 @@ public class LlapCacheAwareFs extends FileSystem {
@Override
public DiskRangeList createCacheChunk(
MemoryBuffer buffer, long startOffset, long endOffset) {
- CacheChunk result = new CacheChunk(); // TODO: pool?
- result.init(buffer, startOffset, endOffset);
- return result;
+ return new CacheChunk(buffer, startOffset, endOffset);
}
}, gotAllData);
if (LOG.isInfoEnabled()) {
http://git-wip-us.apache.org/repos/asf/hive/blob/4df09267/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/CacheChunk.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/CacheChunk.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/CacheChunk.java
index 16fdbf7..4eedca1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/CacheChunk.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/CacheChunk.java
@@ -35,15 +35,11 @@ import com.google.common.annotations.VisibleForTesting;
public class CacheChunk extends DiskRangeList {
protected MemoryBuffer buffer;
- public CacheChunk() {
- super(-1, -1);
- }
-
- public void init(MemoryBuffer buffer, long offset, long end) {
+ public CacheChunk(MemoryBuffer buffer, long offset, long end) {
+ super(offset, end);
this.buffer = buffer;
this.offset = offset;
this.end = end;
- this.next = this.prev = null; // Just in case.
}
@Override
@@ -81,10 +77,6 @@ public class CacheChunk extends DiskRangeList {
throw new UnsupportedOperationException();
}
- public void reset() {
- init(null, -1, -1);
- }
-
public void adjustEnd(long l) {
this.end += l;
}
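Note what disappears along with the pool here: the no-arg constructor, init(), reset(), and the defensive "this.next = this.prev = null; // Just in case." line. That defensiveness is the classic cost of recycling: any field that reset() forgets to clear leaks state into the next borrower. Continuing the SimplePool sketch above (hypothetical names):

// Continuing the sketch above: the bug class that recycling invites.
SimplePool pool = new SimplePool();
Chunk c = pool.take();
c.init(10, 20);
pool.offer(c);          // suppose reset() had forgotten to clear a field...
Chunk d = pool.take();  // ...d is the same instance as c, and silently
                        // observes the stale value.
// With 'new Chunk(...)' per use, this failure mode cannot occur.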
http://git-wip-us.apache.org/repos/asf/hive/blob/4df09267/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
index 31d5dd3..80b7be8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
@@ -112,19 +112,15 @@ class EncodedReaderImpl implements EncodedReader {
private static final Object POOLS_CREATION_LOCK = new Object();
private static Pools POOLS;
private static class Pools {
- Pool<CacheChunk> tccPool;
- Pool<ProcCacheChunk> pccPool;
Pool<OrcEncodedColumnBatch> ecbPool;
Pool<ColumnStreamData> csdPool;
}
private final static DiskRangeListFactory CC_FACTORY = new DiskRangeListFactory() {
- @Override
- public DiskRangeList createCacheChunk(MemoryBuffer buffer, long offset, long end) {
- CacheChunk tcc = POOLS.tccPool.take();
- tcc.init(buffer, offset, end);
- return tcc;
- }
- };
+ @Override
+ public DiskRangeList createCacheChunk(MemoryBuffer buffer, long offset, long end) {
+ return new CacheChunk(buffer, offset, end);
+ }
+ };
private final Object fileKey;
private final DataReader dataReader;
private boolean isDataReaderOpen = false;
@@ -517,7 +513,6 @@ class EncodedReaderImpl implements EncodedReader {
LOG.error("Error during the cleanup after another error; ignoring", t);
}
}
- releaseCacheChunksIntoObjectPool(toRead.next);
}
private static int countMaxStreams(Area area) {
@@ -677,8 +672,7 @@ class EncodedReaderImpl implements EncodedReader {
private int count;
public UncompressedCacheChunk(BufferChunk bc) {
- super();
- init(null, bc.getOffset(), bc.getEnd());
+ super(null, bc.getOffset(), bc.getEnd());
chunk = bc;
count = 1;
}
@@ -720,21 +714,15 @@ class EncodedReaderImpl implements EncodedReader {
* the DiskRange list created for the request, and everyone treats it like regular CacheChunk.
*/
private static class ProcCacheChunk extends CacheChunk {
- public void init(long cbStartOffset, long cbEndOffset, boolean isCompressed,
+ public ProcCacheChunk(long cbStartOffset, long cbEndOffset, boolean isCompressed,
ByteBuffer originalData, MemoryBuffer targetBuffer, int originalCbIndex) {
- super.init(targetBuffer, cbStartOffset, cbEndOffset);
+ super(targetBuffer, cbStartOffset, cbEndOffset);
this.isOriginalDataCompressed = isCompressed;
this.originalData = originalData;
this.originalCbIndex = originalCbIndex;
}
@Override
- public void reset() {
- super.reset();
- this.originalData = null;
- }
-
- @Override
public String toString() {
return super.toString() + ", original is set " + (this.originalData != null)
+ ", buffer was replaced " + (originalCbIndex == -1);
@@ -1177,8 +1165,7 @@ class EncodedReaderImpl implements EncodedReader {
MemoryBuffer buffer = singleAlloc[0];
cacheWrapper.reuseBuffer(buffer);
ByteBuffer dest = buffer.getByteBufferRaw();
- CacheChunk tcc = POOLS.tccPool.take();
- tcc.init(buffer, partOffset, candidateEnd);
+ CacheChunk tcc = new CacheChunk(buffer, partOffset, candidateEnd);
copyAndReplaceUncompressedChunks(candidateCached, dest, tcc, false);
return tcc;
}
@@ -1192,8 +1179,7 @@ class EncodedReaderImpl implements EncodedReader {
MemoryBuffer buffer = singleAlloc[0];
cacheWrapper.reuseBuffer(buffer);
ByteBuffer dest = buffer.getByteBufferRaw();
- CacheChunk tcc = POOLS.tccPool.take();
- tcc.init(buffer, bc.getOffset(), bc.getEnd());
+ CacheChunk tcc = new CacheChunk(buffer, bc.getOffset(), bc.getEnd());
copyUncompressedChunk(bc.getChunk(), dest);
bc.replaceSelfWith(tcc);
return tcc;
@@ -1238,17 +1224,6 @@ class EncodedReaderImpl implements EncodedReader {
}
}
- public static void releaseCacheChunksIntoObjectPool(DiskRangeList current) {
- while (current != null) {
- if (current instanceof ProcCacheChunk) {
- POOLS.pccPool.offer((ProcCacheChunk)current);
- } else if (current instanceof CacheChunk) {
- POOLS.tccPool.offer((CacheChunk)current);
- }
- current = current.next;
- }
- }
-
private void ponderReleaseInitialRefcount(
long unlockUntilCOffset, long streamStartOffset, CacheChunk cc) {
// Don't release if the buffer contains any data beyond the acceptable boundary.
@@ -1655,8 +1630,7 @@ class EncodedReaderImpl implements EncodedReader {
// Add it to result in order we are processing.
cacheBuffers.add(futureAlloc);
// Add it to the list of work to decompress.
- ProcCacheChunk cc = POOLS.pccPool.take();
- cc.init(cbStartOffset, cbEndOffset, !isUncompressed,
+ ProcCacheChunk cc = new ProcCacheChunk(cbStartOffset, cbEndOffset, !isUncompressed,
fullCompressionBlock, futureAlloc, cacheBuffers.size() - 1);
toDecompress.add(cc);
// Adjust the compression block position.
@@ -1690,26 +1664,6 @@ class EncodedReaderImpl implements EncodedReader {
private static Pools createPools(PoolFactory pf) {
Pools pools = new Pools();
- pools.pccPool = pf.createPool(1024, new PoolObjectHelper<ProcCacheChunk>() {
- @Override
- public ProcCacheChunk create() {
- return new ProcCacheChunk();
- }
- @Override
- public void resetBeforeOffer(ProcCacheChunk t) {
- t.reset();
- }
- });
- pools.tccPool = pf.createPool(1024, new PoolObjectHelper<CacheChunk>() {
- @Override
- public CacheChunk create() {
- return new CacheChunk();
- }
- @Override
- public void resetBeforeOffer(CacheChunk t) {
- t.reset();
- }
- });
pools.ecbPool = pf.createEncodedColumnBatchPool();
pools.csdPool = pf.createColumnStreamDataPool();
return pools;
@@ -1937,7 +1891,6 @@ class EncodedReaderImpl implements EncodedReader {
LOG.error("Error during the cleanup after another error; ignoring", t);
}
}
- releaseCacheChunksIntoObjectPool(toRead.next);
}
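The tradeoff behind the whole patch, as the subject line suggests: for objects that live only for the span of one read on one thread, a pool's take()/offer() bookkeeping and the reset discipline it demands tend to cost more than ordinary allocation, which on a modern JVM is a thread-local bump of a pointer, with the objects dying young in the nursery. A deliberately naive timing sketch, for intuition only (names and numbers are illustrative; a real comparison would use JMH):

// Naive comparison of pooled take/offer vs. plain allocation of a small object.
public class AllocIntuition {
  static final class Small {
    long a, b;
    Small(long a, long b) { this.a = a; this.b = b; }
  }
  public static void main(String[] args) {
    final int N = 10_000_000;
    java.util.ArrayDeque<Small> pool = new java.util.ArrayDeque<>();
    pool.push(new Small(0, 0));
    long sink = 0;
    long t0 = System.nanoTime();
    for (int i = 0; i < N; i++) {
      Small s = pool.poll();     // take(): always a pool hit in this toy loop
      s.a = i; s.b = i;          // init()
      sink += s.a;
      pool.push(s);              // offer(): the step callers must never forget
    }
    long t1 = System.nanoTime();
    for (int i = 0; i < N; i++) {
      Small s = new Small(i, i); // direct: TLAB bump; escape analysis may elide it entirely
      sink += s.a;
    }
    long t2 = System.nanoTime();
    System.out.println("pooled: " + (t1 - t0) / 1e6 + " ms, direct: " + (t2 - t1) / 1e6
        + " ms (sink=" + sink + ")");
  }
}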