Posted to commits@hive.apache.org by se...@apache.org on 2015/11/02 22:29:15 UTC

hive git commit: HIVE-12171 : LLAP: BuddyAllocator failures when querying uncompressed data (Sergey Shelukhin, reviewed by Gopal V)

Repository: hive
Updated Branches:
  refs/heads/master 71da33a6a -> 2cf051687


HIVE-12171 : LLAP: BuddyAllocator failures when querying uncompressed data (Sergey Shelukhin, reviewed by Gopal V)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/2cf05168
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/2cf05168
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/2cf05168

Branch: refs/heads/master
Commit: 2cf05168711d081bb2c5cb2ec7ba7cca66260dd1
Parents: 71da33a
Author: Sergey Shelukhin <se...@apache.org>
Authored: Mon Nov 2 13:16:34 2015 -0800
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Mon Nov 2 13:16:34 2015 -0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   7 +-
 .../hadoop/hive/llap/cache/BuddyAllocator.java  |  89 +++++++++------
 .../llap/cache/LowLevelCacheMemoryManager.java  |  12 ++
 .../hadoop/hive/llap/cache/MemoryManager.java   |   1 +
 .../hive/llap/cache/TestBuddyAllocator.java     |   6 +-
 .../hive/llap/cache/TestOrcMetadataCache.java   |   4 +
 .../ql/io/orc/encoded/EncodedReaderImpl.java    | 109 ++++++++++---------
 7 files changed, 144 insertions(+), 84 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/2cf05168/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 5198bb5..3ab73ad 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2308,9 +2308,10 @@ public class HiveConf extends Configuration {
     LLAP_ORC_CACHE_MAX_ALLOC("hive.llap.io.cache.orc.alloc.max", 16 * 1024 * 1024,
         "Maximum allocation possible from LLAP low-level cache for ORC. Should be as large as\n" +
         "the largest expected ORC compression buffer size. Must be power of 2."),
-    LLAP_ORC_CACHE_ARENA_SIZE("hive.llap.io.cache.orc.arena.size", 128 * 1024 * 1024,
-        "Arena size for ORC low-level cache; cache will be allocated in arena-sized steps.\n" +
-        "Must presently be a power of two."),
+    LLAP_ORC_CACHE_ARENA_COUNT("hive.llap.io.cache.orc.arena.count", 8,
+        "Arena count for LLAP low-level cache; cache will be allocated in the steps of\n" +
+        "(size/arena_count) bytes. This size must be <= 1Gb and >= max allocation; if it is\n" +
+        "not the case, an adjusted size will be used. Using powers of 2 is recommended."),
     LLAP_ORC_CACHE_MAX_SIZE("hive.llap.io.cache.orc.size", 1024L * 1024 * 1024,
         "Maximum size for ORC low-level cache; must be a multiple of arena size."),
     LLAP_ORC_CACHE_ALLOCATE_DIRECT("hive.llap.io.cache.direct", true,

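For illustration only (not part of the commit): a minimal sketch of how the new arena-count setting translates into an arena size, using the default values from the config descriptions above. The 1Gb cap and the clamping/rounding mirror the BuddyAllocator constructor change below; the class name is hypothetical.

// Sketch, assuming the default configuration values shown above.
public class ArenaSizeExample {
  private static final int MAX_ARENA_SIZE = 1024 * 1024 * 1024; // 1Gb cap; fits any max allocation

  public static void main(String[] args) {
    long maxSizeVal = 1024L * 1024 * 1024;  // hive.llap.io.cache.orc.size default
    int arenaCount = 8;                     // hive.llap.io.cache.orc.arena.count default
    int maxAllocation = 16 * 1024 * 1024;   // hive.llap.io.cache.orc.alloc.max default

    int arenaSize = (arenaCount == 0) ? MAX_ARENA_SIZE : (int) (maxSizeVal / arenaCount);
    arenaSize = Math.max(maxAllocation, Math.min(arenaSize, MAX_ARENA_SIZE));
    if ((arenaSize % maxAllocation) > 0) {
      // Round down so the arena is divisible by the maximum allocation.
      arenaSize = (arenaSize / maxAllocation) * maxAllocation;
    }
    System.out.println(arenaSize); // 134217728, i.e. 128Mb arenas with the defaults above
  }
}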
http://git-wip-us.apache.org/repos/asf/hive/blob/2cf05168/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
index 2aca68d..485a145 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
@@ -40,33 +40,43 @@ public final class BuddyAllocator implements EvictionAwareAllocator, BuddyAlloca
   private final long maxSize;
   private final boolean isDirect;
   private final LlapDaemonCacheMetrics metrics;
-
+  
+  // We don't know the acceptable size for Java array, so we'll use 1Gb boundary.
+  // That is guaranteed to fit any maximum allocation.
+  private static final int MAX_ARENA_SIZE = 1024*1024*1024;
   public BuddyAllocator(Configuration conf, MemoryManager memoryManager,
       LlapDaemonCacheMetrics metrics) {
     isDirect = HiveConf.getBoolVar(conf, ConfVars.LLAP_ORC_CACHE_ALLOCATE_DIRECT);
     minAllocation = HiveConf.getIntVar(conf, ConfVars.LLAP_ORC_CACHE_MIN_ALLOC);
     maxAllocation = HiveConf.getIntVar(conf, ConfVars.LLAP_ORC_CACHE_MAX_ALLOC);
-    arenaSize = HiveConf.getIntVar(conf, ConfVars.LLAP_ORC_CACHE_ARENA_SIZE);
+    int arenaCount = HiveConf.getIntVar(conf, ConfVars.LLAP_ORC_CACHE_ARENA_COUNT);
     long maxSizeVal = HiveConf.getLongVar(conf, ConfVars.LLAP_ORC_CACHE_MAX_SIZE);
-    if (LlapIoImpl.LOGL.isInfoEnabled()) {
+    int arenaSizeVal = (arenaCount == 0) ? MAX_ARENA_SIZE : (int)(maxSizeVal / arenaCount);
+    arenaSizeVal = Math.max(maxAllocation, Math.min(arenaSizeVal, MAX_ARENA_SIZE));
+    if (LlapIoImpl.LOG.isInfoEnabled()) {
       LlapIoImpl.LOG.info("Buddy allocator with " + (isDirect ? "direct" : "byte")
           + " buffers; allocation sizes " + minAllocation + " - " + maxAllocation
-          + ", arena size " + arenaSize + ". total size " + maxSizeVal);
+          + ", arena size " + arenaSizeVal + ". total size " + maxSizeVal);
     }
 
     if (minAllocation < 8) {
       throw new AssertionError("Min allocation must be at least 8: " + minAllocation);
     }
-    if (maxSizeVal < arenaSize || arenaSize < maxAllocation || maxAllocation < minAllocation) {
+    if (maxSizeVal < arenaSizeVal || maxAllocation < minAllocation) {
       throw new AssertionError("Inconsistent sizes of cache, arena and allocations: "
-          + minAllocation + ", " + maxAllocation + ", " + arenaSize + ", " + maxSizeVal);
+          + minAllocation + ", " + maxAllocation + ", " + arenaSizeVal + ", " + maxSizeVal);
+    }
+    if ((Integer.bitCount(minAllocation) != 1) || (Integer.bitCount(maxAllocation) != 1)) {
+      throw new AssertionError("Allocation sizes must be powers of two: "
+          + minAllocation + ", " + maxAllocation);
     }
-    if ((Integer.bitCount(minAllocation) != 1) || (Integer.bitCount(maxAllocation) != 1)
-        || (Long.bitCount(arenaSize) != 1)) {
-      // Technically, arena size only needs to be divisible by maxAlloc
-      throw new AssertionError("Allocation and arena sizes must be powers of two: "
-          + minAllocation + ", " + maxAllocation + ", " + arenaSize);
+    if ((arenaSizeVal % maxAllocation) > 0) {
+      long oldArenaSize = arenaSizeVal;
+      arenaSizeVal = (arenaSizeVal / maxAllocation) * maxAllocation;
+      LlapIoImpl.LOG.warn("Rounding arena size to " + arenaSizeVal + " from " + oldArenaSize
+          + " to be divisible by allocation size " + maxAllocation);
     }
+    arenaSize = arenaSizeVal;
     if ((maxSizeVal % arenaSize) > 0) {
       long oldMaxSize = maxSizeVal;
       maxSizeVal = (maxSizeVal / arenaSize) * arenaSize;
@@ -111,7 +121,7 @@ public final class BuddyAllocator implements EvictionAwareAllocator, BuddyAlloca
     // TODO: reserving the entire thing is not ideal before we alloc anything. Interleave?
     memoryManager.reserveMemory(dest.length << allocLog2, true);
 
-    int ix = 0;
+    int destAllocIx = 0;
     for (int i = 0; i < dest.length; ++i) {
       if (dest[i] != null) continue;
       dest[i] = createUnallocated(); // TODO: pool of objects?
@@ -123,22 +133,29 @@ public final class BuddyAllocator implements EvictionAwareAllocator, BuddyAlloca
     }
     long threadId = arenaCount > 1 ? Thread.currentThread().getId() : 0;
     {
-      int startIndex = (int)(threadId % arenaCount), index = startIndex;
+      int startArenaIx = (int)(threadId % arenaCount), index = startArenaIx;
       do {
-        int newIx = arenas[index].allocateFast(index, freeListIx, dest, ix, allocationSize);
-        if (newIx == dest.length) return;
-        if (newIx != -1) {  // TODO: check if it can still happen; count should take care of this.
-          ix = newIx;
-        }
-        ix = newIx;
+        int newDestIx = arenas[index].allocateFast(
+            index, freeListIx, dest, destAllocIx, allocationSize);
+        if (newDestIx == dest.length) return;
+        assert newDestIx != -1;
+        destAllocIx = newDestIx;
         if ((++index) == arenaCount) {
           index = 0;
         }
-      } while (index != startIndex);
+      } while (index != startArenaIx);
     }
 
-    // TODO: this is very hacky.
-    // We called reserveMemory so we know that somewhere in there, there's memory waiting for us.
+    // 1) We can get fragmented on large blocks of uncompressed data. The memory might be
+    // in there, but it might be in separate small blocks. This is a complicated problem, and
+    // several solutions (in order of decreasing ugliness and increasing complexity) are: just
+    // ask to evict the exact-sized block (there may be no such block), evict from a particular
+    // arena (policy would know allocator internals somewhat), store buffer mapping and ask to
+    // evict from specific choice of blocks next to each other or next to already-evicted block,
+    // and finally do a compaction (requires a block mapping and complex sync). For now we'd just
+    // force-evict some memory and avoid both complexity and ugliness, since large blocks are rare.
+    // 2) Fragmentation aside (TODO: and this is a very hacky solution for that),
+    // we called reserveMemory so we know that there's memory waiting for us somewhere.
     // However, we have a class of rare race conditions related to the order of locking/checking of
     // different allocation areas. Simple case - say we have 2 arenas, 256Kb available in arena 2.
     // We look at arena 1; someone deallocs 256Kb from arena 1 and allocs the same from arena 2;
@@ -155,22 +172,32 @@ public final class BuddyAllocator implements EvictionAwareAllocator, BuddyAlloca
     // But for now we will just retry 5 times 0_o
     for (int attempt = 0; attempt < 5; ++attempt) {
       // Try to split bigger blocks. TODO: again, ideally we would tryLock at least once
-      for (int i = 0; i < arenaCount; ++i) {
-        int newIx = arenas[i].allocateWithSplit(i, freeListIx, dest, ix, allocationSize);
-        if (newIx == -1) break; // Shouldn't happen.
-        if (newIx == dest.length) return;
-        ix = newIx;
+      {
+        int startArenaIx = (int)((threadId + attempt) % arenaCount), arenaIx = startArenaIx;
+        do {
+          int newDestIx = arenas[arenaIx].allocateWithSplit(
+              arenaIx, freeListIx, dest, destAllocIx, allocationSize);
+          if (newDestIx == dest.length) return;
+          assert newDestIx != -1;
+          destAllocIx = newDestIx;
+          if ((++arenaIx) == arenaCount) {
+            arenaIx = 0;
+          }
+        } while (arenaIx != startArenaIx);
       }
+
       if (attempt == 0) {
         // Try to allocate memory if we haven't allocated all the way to maxSize yet; very rare.
-        for (int i = arenaCount; i < arenas.length; ++i) {
-          ix = arenas[i].allocateWithExpand(i, freeListIx, dest, ix, allocationSize);
-          if (ix == dest.length) return;
+        for (int arenaIx = arenaCount; arenaIx < arenas.length; ++arenaIx) {
+          destAllocIx = arenas[arenaIx].allocateWithExpand(
+              arenaIx, freeListIx, dest, destAllocIx, allocationSize);
+          if (destAllocIx == dest.length) return;
         }
       }
+      memoryManager.forceReservedMemory(allocationSize * (dest.length - destAllocIx));
       LlapIoImpl.LOG.warn("Failed to allocate despite reserved memory; will retry " + attempt);
     }
-    String msg = "Failed to allocate " + size + "; at " + ix + " out of " + dest.length;
+    String msg = "Failed to allocate " + size + "; at " + destAllocIx + " out of " + dest.length;
     LlapIoImpl.LOG.error(msg + "\nALLOCATOR STATE:\n" + debugDump()
         + "\nPARENT STATE:\n" + memoryManager.debugDumpForOom());
     throw new AllocatorOutOfMemoryException(msg);

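For illustration only (not part of the commit): a minimal sketch of the retry shape the patch gives the allocator, distilled from the hunks above. After an unsuccessful sweep over the arenas, it asks the memory manager to force-evict roughly the bytes still missing (the forceReservedMemory call added above, implemented in LowLevelCacheMemoryManager below) and sweeps again, up to five times. The ArenaSweep and MemoryManager types here are hypothetical stand-ins, not the real Hive interfaces.

// Sketch with hypothetical stand-ins; the real code sweeps buddy-allocator arenas and free lists.
public class ForceEvictRetryExample {
  interface MemoryManager { void forceReservedMemory(int memoryToEvict); }
  interface ArenaSweep { int allocate(int destAllocIx, int destLength, int allocationSize); }

  static void attemptAllocation(ArenaSweep arenas, MemoryManager memoryManager,
      int destLength, int allocationSize) {
    int destAllocIx = 0;
    for (int attempt = 0; attempt < 5; ++attempt) {
      destAllocIx = arenas.allocate(destAllocIx, destLength, allocationSize);
      if (destAllocIx == destLength) return; // Every requested buffer was allocated.
      // Memory was reserved, but it may be fragmented into blocks smaller than allocationSize,
      // or raced away between arenas; force-evict roughly what is still missing and retry.
      memoryManager.forceReservedMemory(allocationSize * (destLength - destAllocIx));
    }
    throw new RuntimeException("Failed to allocate despite reserved memory");
  }

  public static void main(String[] args) {
    // Toy stand-ins: the sweep allocates one buffer per call, the evictor just logs.
    ArenaSweep arenas = (ix, len, size) -> Math.min(ix + 1, len);
    MemoryManager mm = bytes -> System.out.println("force-evicting " + bytes + " bytes");
    attemptAllocation(arenas, mm, 3, 256 * 1024);
  }
}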
http://git-wip-us.apache.org/repos/asf/hive/blob/2cf05168/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java
index 4a256ee..d584ca8 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java
@@ -71,6 +71,8 @@ public class LowLevelCacheMemoryManager implements MemoryManager {
           try {
             Thread.sleep(Math.min(1000, nextLog));
           } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            return false;
           }
         }
         continue;
@@ -90,6 +92,16 @@ public class LowLevelCacheMemoryManager implements MemoryManager {
     return true;
   }
 
+
+  @Override
+  public void forceReservedMemory(int memoryToEvict) {
+    while (memoryToEvict > 0) {
+      long evicted = evictor.evictSomeBlocks(memoryToEvict);
+      if (evicted == 0) return;
+      memoryToEvict -= evicted;
+    }
+  }
+
   @Override
   public void releaseMemory(long memoryToRelease) {
     long oldV;

http://git-wip-us.apache.org/repos/asf/hive/blob/2cf05168/llap-server/src/java/org/apache/hadoop/hive/llap/cache/MemoryManager.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/MemoryManager.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/MemoryManager.java
index e1b0cb4..6cc262e 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/MemoryManager.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/MemoryManager.java
@@ -22,4 +22,5 @@ public interface MemoryManager extends LlapOomDebugDump {
   boolean reserveMemory(long memoryToReserve, boolean waitForEviction);
   void releaseMemory(long memUsage);
   void updateMaxSize(long maxSize);
+  void forceReservedMemory(int memoryToEvict);
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/2cf05168/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java
index 6d21997..6375996 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java
@@ -58,6 +58,10 @@ public class TestBuddyAllocator {
     @Override
     public void updateMaxSize(long maxSize) {
     }
+
+    @Override
+    public void forceReservedMemory(int memoryToEvict) {
+    }
   }
 
   @Test
@@ -280,7 +284,7 @@ public class TestBuddyAllocator {
     Configuration conf = new Configuration();
     conf.setInt(ConfVars.LLAP_ORC_CACHE_MIN_ALLOC.varname, min);
     conf.setInt(ConfVars.LLAP_ORC_CACHE_MAX_ALLOC.varname, max);
-    conf.setInt(ConfVars.LLAP_ORC_CACHE_ARENA_SIZE.varname, arena);
+    conf.setInt(ConfVars.LLAP_ORC_CACHE_ARENA_COUNT.varname, total/arena);
     conf.setLong(ConfVars.LLAP_ORC_CACHE_MAX_SIZE.varname, total);
     return conf;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/2cf05168/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java
index b886d77..901e58a 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java
@@ -78,6 +78,10 @@ public class TestOrcMetadataCache {
     @Override
     public void updateMaxSize(long maxSize) {
     }
+
+    @Override
+    public void forceReservedMemory(int memoryToEvict) {
+    }
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/hive/blob/2cf05168/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
index e0c0743..f789a4f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch.ColumnStreamD
 import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer;
 import org.apache.hadoop.hive.ql.io.orc.CompressionCodec;
 import org.apache.hadoop.hive.ql.io.orc.DataReader;
+import org.apache.hadoop.hive.ql.io.orc.OrcConf;
 import org.apache.hadoop.hive.ql.io.orc.OrcProto;
 import org.apache.hadoop.hive.ql.io.orc.OutStream;
 import org.apache.hadoop.hive.ql.io.orc.RecordReaderUtils;
@@ -751,7 +752,7 @@ class EncodedReaderImpl implements EncodedReader {
 
   /**
    * To achieve some sort of consistent cache boundaries, we will cache streams deterministically;
-   * in segments starting w/stream start, and going for either stream size or maximum allocation.
+   * in segments starting w/stream start, and going for either stream size or some fixed size.
    * If we are not reading the entire segment's worth of data, then we will not cache the partial
    * RGs; the breakage of cache assumptions (no interleaving blocks, etc.) is way too much PITA
    * to handle just for this case.
@@ -777,87 +778,87 @@ class EncodedReaderImpl implements EncodedReader {
     }
     // Account for maximum cache buffer size.
     long streamLen = streamEnd - streamOffset;
-    int partSize = cache.getAllocator().getMaxAllocation(),
-        partCount = (int)((streamLen / partSize) + (((streamLen % partSize) != 0) ? 1 : 0));
-    long partOffset = streamOffset, partEnd = Math.min(partOffset + partSize, streamEnd);
+    int partSize = determineUncompressedPartSize(), //
+        partCount = (int)(streamLen / partSize) + (((streamLen % partSize) != 0) ? 1 : 0);
 
     CacheChunk lastUncompressed = null;
     MemoryBuffer[] singleAlloc = new MemoryBuffer[1];
+    /*
+Starting pre-read for [12187411,17107411) at start: 12187411 end: 12449555 cache buffer: 0x5f64a8f6(2)
+Processing uncompressed file data at [12187411, 12449555)
+  */
     for (int i = 0; i < partCount; ++i) {
-      long hasEntirePartTo = -1;
-      if (partOffset == current.getOffset()) {
-        hasEntirePartTo = partOffset;
+      long partOffset = streamOffset + (i * partSize),
+           partEnd = Math.min(partOffset + partSize, streamEnd);
+      long hasEntirePartTo = partOffset; // We have 0 bytes of data for this part, for now.
+      assert partOffset <= current.getOffset();
+      if (partOffset == current.getOffset() && current instanceof CacheChunk) {
         // We assume cache chunks would always match the way we read, so check and skip it.
-        if (current instanceof CacheChunk) {
-          lastUncompressed = (CacheChunk)current;
-          assert current.getOffset() == partOffset && current.getEnd() == partEnd;
-          partOffset = partEnd;
-          partEnd = Math.min(partOffset + partSize, streamEnd);
-          continue;
-        }
+        assert current.getOffset() == partOffset && current.getEnd() == partEnd;
+        lastUncompressed = (CacheChunk)current;
+        current = current.next;
+        continue;
       }
       if (current.getOffset() >= partEnd) {
-        // We have no data at all for this part of the stream (could be unneeded), skip.
-        partOffset = partEnd;
-        partEnd = Math.min(partOffset + partSize, streamEnd);
-        continue;
+        continue; // We have no data at all for this part of the stream (could be unneeded), skip.
       }
       if (toRelease == null && dataReader.isTrackingDiskRanges()) {
         toRelease = new ArrayList<ByteBuffer>();
       }
       // We have some disk buffers... see if we have entire part, etc.
-      UncompressedCacheChunk candidateCached = null;
+      UncompressedCacheChunk candidateCached = null; // We will cache if we have the entire part.
       DiskRangeList next = current;
       while (true) {
-        if (next == null || next.getOffset() >= partEnd) {
-          if (hasEntirePartTo < partEnd && candidateCached != null) {
-            // We are missing a section at the end of the part...
-            lastUncompressed = copyAndReplaceCandidateToNonCached(
-                candidateCached, partOffset, hasEntirePartTo, cache, singleAlloc);
-            candidateCached = null;
-          }
-          break;
+        boolean noMoreDataForPart = (next == null || next.getOffset() >= partEnd);
+        if (noMoreDataForPart && hasEntirePartTo < partEnd && candidateCached != null) {
+          // We are missing a section at the end of the part... copy the start to non-cached.
+          lastUncompressed = copyAndReplaceCandidateToNonCached(
+              candidateCached, partOffset, hasEntirePartTo, cache, singleAlloc);
+          candidateCached = null;
         }
         current = next;
-        boolean wasSplit = (current.getEnd() > partEnd);
-        if (wasSplit) {
+        if (noMoreDataForPart) break; // Done with this part.
+
+        boolean wasSplit = false;
+        if (current.getEnd() > partEnd) {
+          // If the current buffer contains multiple parts, split it.
           current = current.split(partEnd);
+          wasSplit = true;
         }
         if (isDebugTracingEnabled) {
           LOG.info("Processing uncompressed file data at ["
               + current.getOffset() + ", " + current.getEnd() + ")");
         }
-        BufferChunk bc = (BufferChunk)current;
+        BufferChunk curBc = (BufferChunk)current;
         if (!wasSplit && toRelease != null) {
-          toRelease.add(bc.getChunk()); // TODO: is it valid to give zcr the modified 2nd part?
+          toRelease.add(curBc.getChunk()); // TODO: is it valid to give zcr the modified 2nd part?
         }
 
         // Track if we still have the entire part.
         long hadEntirePartTo = hasEntirePartTo;
-        if (hasEntirePartTo != -1) {
-          hasEntirePartTo = (hasEntirePartTo == current.getOffset()) ? current.getEnd() : -1;
-        }
-        if (candidateCached != null && hasEntirePartTo == -1) {
-          lastUncompressed = copyAndReplaceCandidateToNonCached(
-              candidateCached, partOffset, hadEntirePartTo, cache, singleAlloc);
-          candidateCached = null;
-        }
-
-        if (hasEntirePartTo != -1) {
+        // We have data until the end of current block if we had it until the beginning.
+        hasEntirePartTo = (hasEntirePartTo == current.getOffset()) ? current.getEnd() : -1;
+        if (hasEntirePartTo == -1) {
+          // We don't have the entire part; copy both whatever we intended to cache, and the rest,
+          // to an allocated buffer. We could try to optimize a bit if we have contiguous buffers
+          // with gaps, but it's probably not needed.
+          if (candidateCached != null) {
+            assert hadEntirePartTo != -1;
+            copyAndReplaceCandidateToNonCached(
+                candidateCached, partOffset, hadEntirePartTo, cache, singleAlloc);
+            candidateCached = null;
+          }
+          lastUncompressed = copyAndReplaceUncompressedToNonCached(curBc, cache, singleAlloc);
+          next = lastUncompressed.next; // There may be more data after the gap.
+        } else {
           // So far we have all the data from the beginning of the part.
           if (candidateCached == null) {
-            candidateCached = new UncompressedCacheChunk(bc);
+            candidateCached = new UncompressedCacheChunk(curBc);
           } else {
-            candidateCached.addChunk(bc);
+            candidateCached.addChunk(curBc);
           }
-          // We will take care of this at the end of the part, or if we find a gap.
           next = current.next;
-          continue;
         }
-        // We don't have the entire part; just copy to an allocated buffer. We could try to
-        // optimize a bit if we have contiguous buffers with gaps, but it's probably not needed.
-        lastUncompressed = copyAndReplaceUncompressedToNonCached(bc, cache, singleAlloc);
-        next = lastUncompressed.next;
       }
       if (candidateCached != null) {
         if (toCache == null) {
@@ -908,6 +909,16 @@ class EncodedReaderImpl implements EncodedReader {
     return lastUncompressed;
   }
 
+
+  private int determineUncompressedPartSize() {
+    // We will break the uncompressed data in the cache in the chunks that are the size
+    // of the prevalent ORC compression buffer (the default), or maximum allocation (since we
+    // cannot allocate bigger chunks), whichever is less.
+    long orcCbSizeDefault = ((Number)OrcConf.BUFFER_SIZE.getDefaultValue()).longValue();
+    int maxAllocSize = cache.getAllocator().getMaxAllocation();
+    return (int)Math.min(maxAllocSize, orcCbSizeDefault);
+  }
+
   private static void copyUncompressedChunk(ByteBuffer src, ByteBuffer dest) {
     int startPos = dest.position(), startLim = dest.limit();
     dest.put(src); // Copy uncompressed data to cache.
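
For illustration only (not part of the commit): a minimal sketch of how the new determineUncompressedPartSize() changes the cache segmentation of uncompressed streams. Before the patch the part size was always the cache's maximum allocation; the 256Kb figure below assumes the ORC default compression buffer size, and the class name and stream length are hypothetical.

// Sketch, assuming a 256Kb ORC buffer-size default and the default 16Mb max allocation.
public class PartSizeExample {
  public static void main(String[] args) {
    long orcCbSizeDefault = 256 * 1024;      // assumed OrcConf.BUFFER_SIZE default
    int maxAllocSize = 16 * 1024 * 1024;     // hive.llap.io.cache.orc.alloc.max default
    int partSize = (int) Math.min(maxAllocSize, orcCbSizeDefault); // 256Kb per cached part

    long streamLen = 5_000_000L;             // an arbitrary uncompressed stream length
    int partCount = (int) (streamLen / partSize) + (((streamLen % partSize) != 0) ? 1 : 0);
    System.out.println(partSize + " bytes per part, " + partCount + " parts"); // 262144, 20
  }
}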