Posted to commits@hive.apache.org by se...@apache.org on 2015/11/02 22:29:15 UTC
hive git commit: HIVE-12171 : LLAP: BuddyAllocator failures when querying uncompressed data (Sergey Shelukhin, reviewed by Gopal V)
Repository: hive
Updated Branches:
refs/heads/master 71da33a6a -> 2cf051687
HIVE-12171 : LLAP: BuddyAllocator failures when querying uncompressed data (Sergey Shelukhin, reviewed by Gopal V)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/2cf05168
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/2cf05168
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/2cf05168
Branch: refs/heads/master
Commit: 2cf05168711d081bb2c5cb2ec7ba7cca66260dd1
Parents: 71da33a
Author: Sergey Shelukhin <se...@apache.org>
Authored: Mon Nov 2 13:16:34 2015 -0800
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Mon Nov 2 13:16:34 2015 -0800
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 7 +-
.../hadoop/hive/llap/cache/BuddyAllocator.java | 89 +++++++++------
.../llap/cache/LowLevelCacheMemoryManager.java | 12 ++
.../hadoop/hive/llap/cache/MemoryManager.java | 1 +
.../hive/llap/cache/TestBuddyAllocator.java | 6 +-
.../hive/llap/cache/TestOrcMetadataCache.java | 4 +
.../ql/io/orc/encoded/EncodedReaderImpl.java | 109 ++++++++++---------
7 files changed, 144 insertions(+), 84 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/2cf05168/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 5198bb5..3ab73ad 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2308,9 +2308,10 @@ public class HiveConf extends Configuration {
LLAP_ORC_CACHE_MAX_ALLOC("hive.llap.io.cache.orc.alloc.max", 16 * 1024 * 1024,
"Maximum allocation possible from LLAP low-level cache for ORC. Should be as large as\n" +
"the largest expected ORC compression buffer size. Must be power of 2."),
- LLAP_ORC_CACHE_ARENA_SIZE("hive.llap.io.cache.orc.arena.size", 128 * 1024 * 1024,
- "Arena size for ORC low-level cache; cache will be allocated in arena-sized steps.\n" +
- "Must presently be a power of two."),
+ LLAP_ORC_CACHE_ARENA_COUNT("hive.llap.io.cache.orc.arena.count", 8,
+ "Arena count for LLAP low-level cache; cache will be allocated in the steps of\n" +
+ "(size/arena_count) bytes. This size must be <= 1Gb and >= max allocation; if it is\n" +
+ "not the case, an adjusted size will be used. Using powers of 2 is recommended."),
LLAP_ORC_CACHE_MAX_SIZE("hive.llap.io.cache.orc.size", 1024L * 1024 * 1024,
"Maximum size for ORC low-level cache; must be a multiple of arena size."),
LLAP_ORC_CACHE_ALLOCATE_DIRECT("hive.llap.io.cache.direct", true,
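To make the new setting concrete, the sizing rule described above boils down to a few lines. A condensed, self-contained sketch of the derivation (the method name is illustrative; the real logic lives in the BuddyAllocator constructor in the next file):

    // Derive the arena size from the cache size and arena count, as this patch does.
    // With the defaults (1Gb cache, count 8, 16Mb max alloc): 1Gb / 8 = 128Mb,
    // which is already within [16Mb, 1Gb] and divisible by 16Mb, so it is used as-is.
    static int deriveArenaSize(long maxSizeVal, int arenaCount, int maxAllocation) {
      final int MAX_ARENA_SIZE = 1024 * 1024 * 1024; // 1Gb; fits any Java array
      int arenaSizeVal = (arenaCount == 0) ? MAX_ARENA_SIZE : (int) (maxSizeVal / arenaCount);
      arenaSizeVal = Math.max(maxAllocation, Math.min(arenaSizeVal, MAX_ARENA_SIZE));
      return (arenaSizeVal / maxAllocation) * maxAllocation; // divisible by max alloc
    }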
http://git-wip-us.apache.org/repos/asf/hive/blob/2cf05168/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
index 2aca68d..485a145 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
@@ -40,33 +40,43 @@ public final class BuddyAllocator implements EvictionAwareAllocator, BuddyAlloca
private final long maxSize;
private final boolean isDirect;
private final LlapDaemonCacheMetrics metrics;
-
+
+ // We don't know the acceptable size for Java array, so we'll use 1Gb boundary.
+ // That is guaranteed to fit any maximum allocation.
+ private static final int MAX_ARENA_SIZE = 1024*1024*1024;
public BuddyAllocator(Configuration conf, MemoryManager memoryManager,
LlapDaemonCacheMetrics metrics) {
isDirect = HiveConf.getBoolVar(conf, ConfVars.LLAP_ORC_CACHE_ALLOCATE_DIRECT);
minAllocation = HiveConf.getIntVar(conf, ConfVars.LLAP_ORC_CACHE_MIN_ALLOC);
maxAllocation = HiveConf.getIntVar(conf, ConfVars.LLAP_ORC_CACHE_MAX_ALLOC);
- arenaSize = HiveConf.getIntVar(conf, ConfVars.LLAP_ORC_CACHE_ARENA_SIZE);
+ int arenaCount = HiveConf.getIntVar(conf, ConfVars.LLAP_ORC_CACHE_ARENA_COUNT);
long maxSizeVal = HiveConf.getLongVar(conf, ConfVars.LLAP_ORC_CACHE_MAX_SIZE);
- if (LlapIoImpl.LOGL.isInfoEnabled()) {
+ int arenaSizeVal = (arenaCount == 0) ? MAX_ARENA_SIZE : (int)(maxSizeVal / arenaCount);
+ arenaSizeVal = Math.max(maxAllocation, Math.min(arenaSizeVal, MAX_ARENA_SIZE));
+ if (LlapIoImpl.LOG.isInfoEnabled()) {
LlapIoImpl.LOG.info("Buddy allocator with " + (isDirect ? "direct" : "byte")
+ " buffers; allocation sizes " + minAllocation + " - " + maxAllocation
- + ", arena size " + arenaSize + ". total size " + maxSizeVal);
+ + ", arena size " + arenaSizeVal + ". total size " + maxSizeVal);
}
if (minAllocation < 8) {
throw new AssertionError("Min allocation must be at least 8: " + minAllocation);
}
- if (maxSizeVal < arenaSize || arenaSize < maxAllocation || maxAllocation < minAllocation) {
+ if (maxSizeVal < arenaSizeVal || maxAllocation < minAllocation) {
throw new AssertionError("Inconsistent sizes of cache, arena and allocations: "
- + minAllocation + ", " + maxAllocation + ", " + arenaSize + ", " + maxSizeVal);
+ + minAllocation + ", " + maxAllocation + ", " + arenaSizeVal + ", " + maxSizeVal);
+ }
+ if ((Integer.bitCount(minAllocation) != 1) || (Integer.bitCount(maxAllocation) != 1)) {
+ throw new AssertionError("Allocation sizes must be powers of two: "
+ + minAllocation + ", " + maxAllocation);
}
- if ((Integer.bitCount(minAllocation) != 1) || (Integer.bitCount(maxAllocation) != 1)
- || (Long.bitCount(arenaSize) != 1)) {
- // Technically, arena size only needs to be divisible by maxAlloc
- throw new AssertionError("Allocation and arena sizes must be powers of two: "
- + minAllocation + ", " + maxAllocation + ", " + arenaSize);
+ if ((arenaSizeVal % maxAllocation) > 0) {
+ long oldArenaSize = arenaSizeVal;
+ arenaSizeVal = (arenaSizeVal / maxAllocation) * maxAllocation;
+ LlapIoImpl.LOG.warn("Rounding arena size to " + arenaSizeVal + " from " + oldArenaSize
+ + " to be divisible by allocation size " + maxAllocation);
}
+ arenaSize = arenaSizeVal;
if ((maxSizeVal % arenaSize) > 0) {
long oldMaxSize = maxSizeVal;
maxSizeVal = (maxSizeVal / arenaSize) * arenaSize;
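The interaction of the clamp and the two rounding steps above is easiest to see with non-default numbers; a worked example, assuming the default 16Mb max allocation:

    maxSizeVal = 200Mb (209,715,200), arenaCount = 3:
      209,715,200 / 3                           = 69,905,066 bytes
      clamp to [16Mb, 1Gb]                      = unchanged
      round down to a multiple of 16Mb          = 67,108,864 (64Mb arena size)
      maxSizeVal rounds to a multiple of 64Mb   = 201,326,592 (192Mb total)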
@@ -111,7 +121,7 @@ public final class BuddyAllocator implements EvictionAwareAllocator, BuddyAlloca
// TODO: reserving the entire thing is not ideal before we alloc anything. Interleave?
memoryManager.reserveMemory(dest.length << allocLog2, true);
- int ix = 0;
+ int destAllocIx = 0;
for (int i = 0; i < dest.length; ++i) {
if (dest[i] != null) continue;
dest[i] = createUnallocated(); // TODO: pool of objects?
@@ -123,22 +133,29 @@ public final class BuddyAllocator implements EvictionAwareAllocator, BuddyAlloca
}
long threadId = arenaCount > 1 ? Thread.currentThread().getId() : 0;
{
- int startIndex = (int)(threadId % arenaCount), index = startIndex;
+ int startArenaIx = (int)(threadId % arenaCount), index = startArenaIx;
do {
- int newIx = arenas[index].allocateFast(index, freeListIx, dest, ix, allocationSize);
- if (newIx == dest.length) return;
- if (newIx != -1) { // TODO: check if it can still happen; count should take care of this.
- ix = newIx;
- }
- ix = newIx;
+ int newDestIx = arenas[index].allocateFast(
+ index, freeListIx, dest, destAllocIx, allocationSize);
+ if (newDestIx == dest.length) return;
+ assert newDestIx != -1;
+ destAllocIx = newDestIx;
if ((++index) == arenaCount) {
index = 0;
}
- } while (index != startIndex);
+ } while (index != startArenaIx);
}
- // TODO: this is very hacky.
- // We called reserveMemory so we know that somewhere in there, there's memory waiting for us.
+ // 1) We can get fragmented on large blocks of uncompressed data. The memory might be
+ // in there, but it might be in separate small blocks. This is a complicated problem, and
+ // several solutions (in order of decreasing ugliness and increasing complexity) are: just
+ // ask to evict the exact-sized block (there may be no such block), evict from a particular
+ // arena (policy would know allocator internals somewhat), store buffer mapping and ask to
+ // evict from specific choice of blocks next to each other or next to already-evicted block,
+ // and finally do a compaction (requires a block mapping and complex sync). For now we'd just
+ // force-evict some memory and avoid both complexity and ugliness, since large blocks are rare.
+ // 2) Fragmentation aside (TODO: and this is a very hacky solution for that),
+ // we called reserveMemory so we know that there's memory waiting for us somewhere.
// However, we have a class of rare race conditions related to the order of locking/checking of
// different allocation areas. Simple case - say we have 2 arenas, 256Kb available in arena 2.
// We look at arena 1; someone deallocs 256Kb from arena 1 and allocs the same from arena 2;
@@ -155,22 +172,32 @@ public final class BuddyAllocator implements EvictionAwareAllocator, BuddyAlloca
// But for now we will just retry 5 times 0_o
for (int attempt = 0; attempt < 5; ++attempt) {
// Try to split bigger blocks. TODO: again, ideally we would tryLock at least once
- for (int i = 0; i < arenaCount; ++i) {
- int newIx = arenas[i].allocateWithSplit(i, freeListIx, dest, ix, allocationSize);
- if (newIx == -1) break; // Shouldn't happen.
- if (newIx == dest.length) return;
- ix = newIx;
+ {
+ int startArenaIx = (int)((threadId + attempt) % arenaCount), arenaIx = startArenaIx;
+ do {
+ int newDestIx = arenas[arenaIx].allocateWithSplit(
+ arenaIx, freeListIx, dest, destAllocIx, allocationSize);
+ if (newDestIx == dest.length) return;
+ assert newDestIx != -1;
+ destAllocIx = newDestIx;
+ if ((++arenaIx) == arenaCount) {
+ arenaIx = 0;
+ }
+ } while (arenaIx != startArenaIx);
}
+
if (attempt == 0) {
// Try to allocate memory if we haven't allocated all the way to maxSize yet; very rare.
- for (int i = arenaCount; i < arenas.length; ++i) {
- ix = arenas[i].allocateWithExpand(i, freeListIx, dest, ix, allocationSize);
- if (ix == dest.length) return;
+ for (int arenaIx = arenaCount; arenaIx < arenas.length; ++arenaIx) {
+ destAllocIx = arenas[arenaIx].allocateWithExpand(
+ arenaIx, freeListIx, dest, destAllocIx, allocationSize);
+ if (destAllocIx == dest.length) return;
}
}
+ memoryManager.forceReservedMemory(allocationSize * (dest.length - destAllocIx));
LlapIoImpl.LOG.warn("Failed to allocate despite reserved memory; will retry " + attempt);
}
- String msg = "Failed to allocate " + size + "; at " + ix + " out of " + dest.length;
+ String msg = "Failed to allocate " + size + "; at " + destAllocIx + " out of " + dest.length;
LlapIoImpl.LOG.error(msg + "\nALLOCATOR STATE:\n" + debugDump()
+ "\nPARENT STATE:\n" + memoryManager.debugDumpForOom());
throw new AllocatorOutOfMemoryException(msg);
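Condensed, the rewritten allocate path now works as follows. This is a self-contained sketch, not the patch's actual code: the Arena interface and helper names are illustrative stand-ins, and the arena-expansion step tried on the first attempt is omitted for brevity.

    final class AllocSketch {
      interface Arena { boolean allocateFast(); boolean allocateWithSplit(); }

      static void allocate(Arena[] arenas, long threadId, Runnable forceEvict) {
        int start = (int) (threadId % arenas.length), ix = start;
        do { // 1) Fast path: exact-sized free blocks, round-robin from a per-thread start.
          if (arenas[ix].allocateFast()) return;
          ix = (ix + 1) % arenas.length;
        } while (ix != start);
        for (int attempt = 0; attempt < 5; ++attempt) {
          // 2) Split larger free blocks, rotating the starting arena each attempt.
          int s = (int) ((threadId + attempt) % arenas.length), j = s;
          do {
            if (arenas[j].allocateWithSplit()) return;
            j = (j + 1) % arenas.length;
          } while (j != s);
          // 3) New in this patch: force-evict enough cache to cover what is still
          // missing, then retry; fragmentation may otherwise starve large requests.
          forceEvict.run();
        }
        throw new RuntimeException("Failed to allocate despite reserved memory");
      }
    }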
http://git-wip-us.apache.org/repos/asf/hive/blob/2cf05168/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java
index 4a256ee..d584ca8 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java
@@ -71,6 +71,8 @@ public class LowLevelCacheMemoryManager implements MemoryManager {
try {
Thread.sleep(Math.min(1000, nextLog));
} catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return false;
}
}
continue;
@@ -90,6 +92,16 @@ public class LowLevelCacheMemoryManager implements MemoryManager {
return true;
}
+
+ @Override
+ public void forceReservedMemory(int memoryToEvict) {
+ while (memoryToEvict > 0) {
+ long evicted = evictor.evictSomeBlocks(memoryToEvict);
+ if (evicted == 0) return;
+ memoryToEvict -= evicted;
+ }
+ }
+
@Override
public void releaseMemory(long memoryToRelease) {
long oldV;
http://git-wip-us.apache.org/repos/asf/hive/blob/2cf05168/llap-server/src/java/org/apache/hadoop/hive/llap/cache/MemoryManager.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/MemoryManager.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/MemoryManager.java
index e1b0cb4..6cc262e 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/MemoryManager.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/MemoryManager.java
@@ -22,4 +22,5 @@ public interface MemoryManager extends LlapOomDebugDump {
boolean reserveMemory(long memoryToReserve, boolean waitForEviction);
void releaseMemory(long memUsage);
void updateMaxSize(long maxSize);
+ void forceReservedMemory(int memoryToEvict);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/2cf05168/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java
index 6d21997..6375996 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java
@@ -58,6 +58,10 @@ public class TestBuddyAllocator {
@Override
public void updateMaxSize(long maxSize) {
}
+
+ @Override
+ public void forceReservedMemory(int memoryToEvict) {
+ }
}
@Test
@@ -280,7 +284,7 @@ public class TestBuddyAllocator {
Configuration conf = new Configuration();
conf.setInt(ConfVars.LLAP_ORC_CACHE_MIN_ALLOC.varname, min);
conf.setInt(ConfVars.LLAP_ORC_CACHE_MAX_ALLOC.varname, max);
- conf.setInt(ConfVars.LLAP_ORC_CACHE_ARENA_SIZE.varname, arena);
+ conf.setInt(ConfVars.LLAP_ORC_CACHE_ARENA_COUNT.varname, total/arena);
conf.setLong(ConfVars.LLAP_ORC_CACHE_MAX_SIZE.varname, total);
return conf;
}
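The test now derives the count from the old size-based parameters, and the same arithmetic applies to anyone migrating a size-based configuration (a hypothetical helper, for illustration only):

    // Old hive.llap.io.cache.orc.arena.size -> new hive.llap.io.cache.orc.arena.count.
    // E.g. a 1Gb cache with the old default 128Mb arena size maps to a count of 8.
    static int arenaCountFromOldSize(long cacheSize, long oldArenaSize) {
      return (int) (cacheSize / oldArenaSize);
    }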
http://git-wip-us.apache.org/repos/asf/hive/blob/2cf05168/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java
index b886d77..901e58a 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java
@@ -78,6 +78,10 @@ public class TestOrcMetadataCache {
@Override
public void updateMaxSize(long maxSize) {
}
+
+ @Override
+ public void forceReservedMemory(int memoryToEvict) {
+ }
}
@Test
http://git-wip-us.apache.org/repos/asf/hive/blob/2cf05168/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
index e0c0743..f789a4f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch.ColumnStreamD
import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer;
import org.apache.hadoop.hive.ql.io.orc.CompressionCodec;
import org.apache.hadoop.hive.ql.io.orc.DataReader;
+import org.apache.hadoop.hive.ql.io.orc.OrcConf;
import org.apache.hadoop.hive.ql.io.orc.OrcProto;
import org.apache.hadoop.hive.ql.io.orc.OutStream;
import org.apache.hadoop.hive.ql.io.orc.RecordReaderUtils;
@@ -751,7 +752,7 @@ class EncodedReaderImpl implements EncodedReader {
/**
* To achieve some sort of consistent cache boundaries, we will cache streams deterministically;
- * in segments starting w/stream start, and going for either stream size or maximum allocation.
+ * in segments starting w/stream start, and going for either stream size or some fixed size.
* If we are not reading the entire segment's worth of data, then we will not cache the partial
* RGs; the breakage of cache assumptions (no interleaving blocks, etc.) is way too much PITA
* to handle just for this case.
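The deterministic segmenting described above amounts to cutting the stream into fixed-size parts from its start; a sketch with illustrative names (the same arithmetic appears in the hunk below):

    // Compute [start, end) boundaries of the fixed-size parts of a stream.
    static long[][] partBoundaries(long streamOffset, long streamEnd, int partSize) {
      long streamLen = streamEnd - streamOffset;
      int partCount = (int) (streamLen / partSize) + (((streamLen % partSize) != 0) ? 1 : 0);
      long[][] parts = new long[partCount][2];
      for (int i = 0; i < partCount; ++i) {
        parts[i][0] = streamOffset + i * (long) partSize;          // part start
        parts[i][1] = Math.min(parts[i][0] + partSize, streamEnd); // part end
      }
      return parts; // each part is either fully cached or copied out uncached
    }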
@@ -777,87 +778,87 @@ class EncodedReaderImpl implements EncodedReader {
}
// Account for maximum cache buffer size.
long streamLen = streamEnd - streamOffset;
- int partSize = cache.getAllocator().getMaxAllocation(),
- partCount = (int)((streamLen / partSize) + (((streamLen % partSize) != 0) ? 1 : 0));
- long partOffset = streamOffset, partEnd = Math.min(partOffset + partSize, streamEnd);
int partSize = determineUncompressedPartSize(),
+ partCount = (int)(streamLen / partSize) + (((streamLen % partSize) != 0) ? 1 : 0);
CacheChunk lastUncompressed = null;
MemoryBuffer[] singleAlloc = new MemoryBuffer[1];
for (int i = 0; i < partCount; ++i) {
- long hasEntirePartTo = -1;
- if (partOffset == current.getOffset()) {
- hasEntirePartTo = partOffset;
+ long partOffset = streamOffset + (i * partSize),
+ partEnd = Math.min(partOffset + partSize, streamEnd);
+ long hasEntirePartTo = partOffset; // We have 0 bytes of data for this part, for now.
+ assert partOffset <= current.getOffset();
+ if (partOffset == current.getOffset() && current instanceof CacheChunk) {
// We assume cache chunks would always match the way we read, so check and skip it.
- if (current instanceof CacheChunk) {
- lastUncompressed = (CacheChunk)current;
- assert current.getOffset() == partOffset && current.getEnd() == partEnd;
- partOffset = partEnd;
- partEnd = Math.min(partOffset + partSize, streamEnd);
- continue;
- }
+ assert current.getOffset() == partOffset && current.getEnd() == partEnd;
+ lastUncompressed = (CacheChunk)current;
+ current = current.next;
+ continue;
}
if (current.getOffset() >= partEnd) {
- // We have no data at all for this part of the stream (could be unneeded), skip.
- partOffset = partEnd;
- partEnd = Math.min(partOffset + partSize, streamEnd);
- continue;
+ continue; // We have no data at all for this part of the stream (could be unneeded), skip.
}
if (toRelease == null && dataReader.isTrackingDiskRanges()) {
toRelease = new ArrayList<ByteBuffer>();
}
// We have some disk buffers... see if we have entire part, etc.
- UncompressedCacheChunk candidateCached = null;
+ UncompressedCacheChunk candidateCached = null; // We will cache if we have the entire part.
DiskRangeList next = current;
while (true) {
- if (next == null || next.getOffset() >= partEnd) {
- if (hasEntirePartTo < partEnd && candidateCached != null) {
- // We are missing a section at the end of the part...
- lastUncompressed = copyAndReplaceCandidateToNonCached(
- candidateCached, partOffset, hasEntirePartTo, cache, singleAlloc);
- candidateCached = null;
- }
- break;
+ boolean noMoreDataForPart = (next == null || next.getOffset() >= partEnd);
+ if (noMoreDataForPart && hasEntirePartTo < partEnd && candidateCached != null) {
+ // We are missing a section at the end of the part... copy the start to non-cached.
+ lastUncompressed = copyAndReplaceCandidateToNonCached(
+ candidateCached, partOffset, hasEntirePartTo, cache, singleAlloc);
+ candidateCached = null;
}
current = next;
- boolean wasSplit = (current.getEnd() > partEnd);
- if (wasSplit) {
+ if (noMoreDataForPart) break; // Done with this part.
+
+ boolean wasSplit = false;
+ if (current.getEnd() > partEnd) {
+ // If the current buffer contains multiple parts, split it.
current = current.split(partEnd);
+ wasSplit = true;
}
if (isDebugTracingEnabled) {
LOG.info("Processing uncompressed file data at ["
+ current.getOffset() + ", " + current.getEnd() + ")");
}
- BufferChunk bc = (BufferChunk)current;
+ BufferChunk curBc = (BufferChunk)current;
if (!wasSplit && toRelease != null) {
- toRelease.add(bc.getChunk()); // TODO: is it valid to give zcr the modified 2nd part?
+ toRelease.add(curBc.getChunk()); // TODO: is it valid to give zcr the modified 2nd part?
}
// Track if we still have the entire part.
long hadEntirePartTo = hasEntirePartTo;
- if (hasEntirePartTo != -1) {
- hasEntirePartTo = (hasEntirePartTo == current.getOffset()) ? current.getEnd() : -1;
- }
- if (candidateCached != null && hasEntirePartTo == -1) {
- lastUncompressed = copyAndReplaceCandidateToNonCached(
- candidateCached, partOffset, hadEntirePartTo, cache, singleAlloc);
- candidateCached = null;
- }
-
- if (hasEntirePartTo != -1) {
+ // We have data until the end of current block if we had it until the beginning.
+ hasEntirePartTo = (hasEntirePartTo == current.getOffset()) ? current.getEnd() : -1;
+ if (hasEntirePartTo == -1) {
+ // We don't have the entire part; copy both whatever we intended to cache, and the rest,
+ // to an allocated buffer. We could try to optimize a bit if we have contiguous buffers
+ // with gaps, but it's probably not needed.
+ if (candidateCached != null) {
+ assert hadEntirePartTo != -1;
+ copyAndReplaceCandidateToNonCached(
+ candidateCached, partOffset, hadEntirePartTo, cache, singleAlloc);
+ candidateCached = null;
+ }
+ lastUncompressed = copyAndReplaceUncompressedToNonCached(curBc, cache, singleAlloc);
+ next = lastUncompressed.next; // There may be more data after the gap.
+ } else {
// So far we have all the data from the beginning of the part.
if (candidateCached == null) {
- candidateCached = new UncompressedCacheChunk(bc);
+ candidateCached = new UncompressedCacheChunk(curBc);
} else {
- candidateCached.addChunk(bc);
+ candidateCached.addChunk(curBc);
}
- // We will take care of this at the end of the part, or if we find a gap.
next = current.next;
- continue;
}
- // We don't have the entire part; just copy to an allocated buffer. We could try to
- // optimize a bit if we have contiguous buffers with gaps, but it's probably not needed.
- lastUncompressed = copyAndReplaceUncompressedToNonCached(bc, cache, singleAlloc);
- next = lastUncompressed.next;
}
if (candidateCached != null) {
if (toCache == null) {
@@ -908,6 +909,16 @@ class EncodedReaderImpl implements EncodedReader {
return lastUncompressed;
}
+
+ private int determineUncompressedPartSize() {
+ // We will break the uncompressed data in the cache in the chunks that are the size
+ // of the prevalent ORC compression buffer (the default), or maximum allocation (since we
+ // cannot allocate bigger chunks), whichever is less.
+ long orcCbSizeDefault = ((Number)OrcConf.BUFFER_SIZE.getDefaultValue()).longValue();
+ int maxAllocSize = cache.getAllocator().getMaxAllocation();
+ return (int)Math.min(maxAllocSize, orcCbSizeDefault);
+ }
+
private static void copyUncompressedChunk(ByteBuffer src, ByteBuffer dest) {
int startPos = dest.position(), startLim = dest.limit();
dest.put(src); // Copy uncompressed data to cache.
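With stock defaults, determineUncompressedPartSize above works out to the ORC compression buffer size. A worked example, assuming the usual defaults of 256Kb for OrcConf.BUFFER_SIZE and 16Mb for hive.llap.io.cache.orc.alloc.max:

    long orcCbSizeDefault = 256 * 1024;   // assumed OrcConf.BUFFER_SIZE default
    int maxAllocSize = 16 * 1024 * 1024;  // LLAP_ORC_CACHE_MAX_ALLOC default above
    int partSize = (int) Math.min(maxAllocSize, orcCbSizeDefault); // = 262144 (256Kb)

So uncompressed streams are cached in 256Kb segments unless the maximum allocation is configured below that.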