You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by jd...@apache.org on 2016/04/26 19:54:02 UTC

[39/50] [abbrv] hive git commit: HIVE-13241 : LLAP: Incremental Caching marks some small chunks as "incomplete CB" (Sergey Shelukhin, reviewed by Prasanth Jayachandran)

HIVE-13241 : LLAP: Incremental Caching marks some small chunks as "incomplete CB" (Sergey Shelukhin, reviewed by Prasanth Jayachandran)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/eead54c9
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/eead54c9
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/eead54c9

Branch: refs/heads/llap
Commit: eead54c981984d946ef0d9f932667ea48650a2c3
Parents: 53249a3
Author: Sergey Shelukhin <se...@apache.org>
Authored: Mon Apr 25 18:20:53 2016 -0700
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Mon Apr 25 18:20:53 2016 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   4 +
 .../hive/llap/cache/EvictionDispatcher.java     |   5 +
 .../hive/llap/io/api/impl/LlapIoImpl.java       |   4 +-
 .../llap/io/encoded/OrcEncodedDataReader.java   |  16 ++-
 .../llap/io/metadata/OrcFileEstimateErrors.java | 121 +++++++++++++++++++
 .../hive/llap/io/metadata/OrcMetadataCache.java |  58 ++++++++-
 .../hive/llap/cache/TestOrcMetadataCache.java   |   2 +-
 .../ql/io/orc/encoded/EncodedReaderImpl.java    |  98 ++++++++-------
 8 files changed, 256 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/eead54c9/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 8ccc262..bae3999 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2603,6 +2603,10 @@ public class HiveConf extends Configuration {
         "modification time, which is almost certain to identify file uniquely. However, if you\n" +
         "use a FS without file IDs and rewrite files a lot (or are paranoid), you might want\n" +
         "to avoid this setting."),
+    LLAP_CACHE_ENABLE_ORC_GAP_CACHE("hive.llap.orc.gap.cache", true,
+        "Whether LLAP cache for ORC should remember gaps in ORC RG read estimates, to avoid\n" +
+        "re-reading the data that was read once and discarded because it is unneeded. This is\n" +
+        "only necessary for ORC files written before HIVE-9660 (Hive 2.1?)."),
     LLAP_IO_USE_FILEID_PATH("hive.llap.io.use.fileid.path", true,
         "Whether LLAP should use fileId (inode)-based path to ensure better consistency for the\n" +
         "cases of file overwrites. This is supported on HDFS."),

http://git-wip-us.apache.org/repos/asf/hive/blob/eead54c9/llap-server/src/java/org/apache/hadoop/hive/llap/cache/EvictionDispatcher.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/EvictionDispatcher.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/EvictionDispatcher.java
index bae571e..91932e2 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/EvictionDispatcher.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/EvictionDispatcher.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hive.llap.cache;
 
+import org.apache.hadoop.hive.llap.io.metadata.OrcFileEstimateErrors;
 import org.apache.hadoop.hive.llap.io.metadata.OrcFileMetadata;
 import org.apache.hadoop.hive.llap.io.metadata.OrcMetadataCache;
 import org.apache.hadoop.hive.llap.io.metadata.OrcStripeMetadata;
@@ -49,4 +50,8 @@ public final class EvictionDispatcher implements EvictionListener {
   public void notifyEvicted(OrcStripeMetadata buffer) {
     metadataCache.notifyEvicted(buffer);
   }
+
+  public void notifyEvicted(OrcFileEstimateErrors buffer) {
+    metadataCache.notifyEvicted(buffer);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/eead54c9/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
index d43ff15..6a72b4c 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
@@ -31,6 +31,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.common.io.Allocator;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.llap.cache.BuddyAllocator;
 import org.apache.hadoop.hive.llap.cache.BufferUsageManager;
 import org.apache.hadoop.hive.llap.cache.EvictionAwareAllocator;
@@ -117,7 +118,8 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch> {
       EvictionAwareAllocator allocator = new BuddyAllocator(conf, memManager, cacheMetrics);
       this.allocator = allocator;
       orcCache = new LowLevelCacheImpl(cacheMetrics, cachePolicy, allocator, true);
-      metadataCache = new OrcMetadataCache(memManager, cachePolicy);
+      boolean useGapCache = HiveConf.getBoolVar(conf, ConfVars.LLAP_CACHE_ENABLE_ORC_GAP_CACHE);
+      metadataCache = new OrcMetadataCache(memManager, cachePolicy, useGapCache);
       // And finally cache policy uses cache to notify it of eviction. The cycle is complete!
       cachePolicy.setEvictionListener(new EvictionDispatcher(orcCache, metadataCache));
       cachePolicy.setParentDebugDumper(orcCache);

http://git-wip-us.apache.org/repos/asf/hive/blob/eead54c9/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
index eb953c7..406f8f6 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
@@ -817,15 +817,23 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
     @Override
     public DiskRangeList getFileData(Object fileKey, DiskRangeList range,
         long baseOffset, DiskRangeListFactory factory, BooleanRef gotAllData) {
-      return (lowLevelCache == null) ? range : lowLevelCache.getFileData(
-          fileKey, range, baseOffset, factory, counters, gotAllData);
+      DiskRangeList result = (lowLevelCache == null) ? range
+          : lowLevelCache.getFileData(fileKey, range, baseOffset, factory, counters, gotAllData);
+      if (gotAllData.value) return result;
+      return (metadataCache == null) ? range
+          : metadataCache.getIncompleteCbs(fileKey, range, baseOffset, factory, gotAllData);
     }
 
     @Override
     public long[] putFileData(Object fileKey, DiskRange[] ranges,
         MemoryBuffer[] data, long baseOffset) {
-      return (lowLevelCache == null) ? null : lowLevelCache.putFileData(
-          fileKey, ranges, data, baseOffset, Priority.NORMAL, counters);
+      if (data != null) {
+        return (lowLevelCache == null) ? null : lowLevelCache.putFileData(
+            fileKey, ranges, data, baseOffset, Priority.NORMAL, counters);
+      } else if (metadataCache != null) {
+        metadataCache.putIncompleteCbs(fileKey, ranges, baseOffset);
+      }
+      return null;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/eead54c9/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileEstimateErrors.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileEstimateErrors.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileEstimateErrors.java
new file mode 100644
index 0000000..ad88b98
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileEstimateErrors.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.io.metadata;
+
+import java.util.HashMap;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.io.DiskRangeList;
+import org.apache.hadoop.hive.common.io.DataCache.BooleanRef;
+import org.apache.hadoop.hive.common.io.DataCache.DiskRangeListFactory;
+import org.apache.hadoop.hive.common.io.DiskRangeList.MutateHelper;
+import org.apache.hadoop.hive.llap.IncrementalObjectSizeEstimator;
+import org.apache.hadoop.hive.llap.IncrementalObjectSizeEstimator.ObjectEstimator;
+import org.apache.hadoop.hive.llap.cache.EvictionDispatcher;
+import org.apache.hadoop.hive.llap.cache.LlapCacheableBuffer;
+import org.apache.hadoop.hive.ql.io.SyntheticFileId;
+
+public class OrcFileEstimateErrors extends LlapCacheableBuffer {
+  private final Object fileKey;
+  private int estimatedMemUsage;
+  private final ConcurrentHashMap<Long, Integer> cache = new ConcurrentHashMap<>();
+
+  private final static HashMap<Class<?>, ObjectEstimator> SIZE_ESTIMATORS;
+  private final static ObjectEstimator SIZE_ESTIMATOR;
+  static {
+    SIZE_ESTIMATORS = IncrementalObjectSizeEstimator.createEstimators(createDummy(
+        new SyntheticFileId(new Path("/"), 0, 0)));
+    SIZE_ESTIMATOR = SIZE_ESTIMATORS.get(OrcFileEstimateErrors.class);
+  }
+
+  public OrcFileEstimateErrors(Object fileKey) {
+    this.fileKey = fileKey;
+  }
+
+  public void addError(long offset, int length, long baseOffset) {
+    Long key = Long.valueOf(offset + baseOffset);
+    Integer existingLength = cache.get(key);
+    if (existingLength != null && existingLength >= length) return;
+    Integer value = Integer.valueOf(length);
+    while (true) {
+      existingLength = cache.putIfAbsent(key, value);
+      if (existingLength == null || existingLength >= length) return;
+      cache.remove(key, existingLength);
+    }
+  }
+
+  /** Unlinks data-less ranges recorded as bad estimates; gotAllData tells if any gap remains. */
+  public DiskRangeList getIncompleteCbs(DiskRangeList ranges, long baseOffset,
+      DiskRangeListFactory factory, BooleanRef gotAllData) {
+    DiskRangeList prev = ranges.prev;
+    if (prev == null) prev = new MutateHelper(ranges);
+    DiskRangeList current = ranges;
+    gotAllData.value = true; // Optimistic; reset below on the first range we cannot account for.
+    while (current != null) {
+      // We assume ranges in "ranges" are non-overlapping; thus, we will save next in advance.
+      DiskRangeList check = current;
+      current = current.next;
+      if (check.hasData()) continue;
+      Integer badLength = cache.get(Long.valueOf(check.getOffset() + baseOffset));
+      if (badLength == null || badLength < check.getLength()) {
+        gotAllData.value = false;
+        continue;
+      }
+      check.removeSelf();
+    }
+    return prev.next;
+  }
+
+  public Object getFileKey() {
+    return fileKey;
+  }
+
+  public long estimateMemoryUsage() {
+    // Since we won't be able to update this as we add, for now, estimate 10x usage.
+    // This shouldn't be much and this cache should be removed later anyway.
+    estimatedMemUsage = 10 * SIZE_ESTIMATOR.estimate(this, SIZE_ESTIMATORS);
+    return estimatedMemUsage;
+  }
+
+  private static OrcFileEstimateErrors createDummy(Object fileKey) {
+    OrcFileEstimateErrors dummy = new OrcFileEstimateErrors(fileKey);
+    dummy.addError(0L, 0, 0L);
+    return dummy;
+  }
+
+  @Override
+  protected boolean invalidate() {
+    return true;
+  }
+
+  @Override
+  public long getMemoryUsage() {
+    return estimatedMemUsage;
+  }
+
+  @Override
+  public void notifyEvicted(EvictionDispatcher evictionDispatcher) {
+    evictionDispatcher.notifyEvicted(this);
+  }
+
+  @Override
+  protected boolean isLocked() {
+    return false;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/eead54c9/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataCache.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataCache.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataCache.java
index e970137..66713d3 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataCache.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataCache.java
@@ -21,22 +21,29 @@ package org.apache.hadoop.hive.llap.io.metadata;
 import java.io.IOException;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.hadoop.hive.common.io.DiskRange;
+import org.apache.hadoop.hive.common.io.DiskRangeList;
+import org.apache.hadoop.hive.common.io.DataCache.BooleanRef;
+import org.apache.hadoop.hive.common.io.DataCache.DiskRangeListFactory;
 import org.apache.hadoop.hive.llap.cache.LowLevelCachePolicy;
 import org.apache.hadoop.hive.llap.cache.MemoryManager;
 import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority;
 import org.apache.hadoop.hive.ql.io.orc.encoded.OrcBatchKey;
 
 public class OrcMetadataCache {
-  private final ConcurrentHashMap<Object, OrcFileMetadata> metadata =
-      new ConcurrentHashMap<Object, OrcFileMetadata>();
+  private final ConcurrentHashMap<Object, OrcFileMetadata> metadata = new ConcurrentHashMap<>();
   private final ConcurrentHashMap<OrcBatchKey, OrcStripeMetadata> stripeMetadata =
-      new ConcurrentHashMap<OrcBatchKey, OrcStripeMetadata>();
+      new ConcurrentHashMap<>();
+  private final ConcurrentHashMap<Object, OrcFileEstimateErrors> estimateErrors;
   private final MemoryManager memoryManager;
   private final LowLevelCachePolicy policy;
 
-  public OrcMetadataCache(MemoryManager memoryManager, LowLevelCachePolicy policy) {
+  public OrcMetadataCache(MemoryManager memoryManager, LowLevelCachePolicy policy,
+      boolean useEstimateCache) {
     this.memoryManager = memoryManager;
     this.policy = policy;
+    this.estimateErrors = useEstimateCache
+        ? new ConcurrentHashMap<Object, OrcFileEstimateErrors>() : null;
   }
 
   public OrcFileMetadata putFileMetadata(OrcFileMetadata metaData) {
@@ -71,6 +78,37 @@ public class OrcMetadataCache {
     return val;
   }
 
+  public void putIncompleteCbs(Object fileKey, DiskRange[] ranges, long baseOffset) {
+    if (estimateErrors == null) return;
+    OrcFileEstimateErrors errorData = estimateErrors.get(fileKey);
+    boolean isNew = false;
+    // We should technically update memory usage if updating the old object, but we don't do it
+    // for now; there is no mechanism to properly notify the cache policy/etc. wrt parallel evicts.
+    if (errorData == null) {
+      errorData = new OrcFileEstimateErrors(fileKey);
+      for (DiskRange range : ranges) {
+        errorData.addError(range.getOffset(), range.getLength(), baseOffset);
+      }
+      long memUsage = errorData.estimateMemoryUsage();
+      memoryManager.reserveMemory(memUsage, false);
+      OrcFileEstimateErrors old = estimateErrors.putIfAbsent(fileKey, errorData);
+      if (old != null) {
+        errorData = old;
+        memoryManager.releaseMemory(memUsage);
+        policy.notifyLock(errorData);
+      } else {
+        isNew = true;
+        policy.cache(errorData, Priority.NORMAL);
+      }
+    }
+    if (!isNew) {
+      for (DiskRange range : ranges) {
+        errorData.addError(range.getOffset(), range.getLength(), baseOffset);
+      }
+    }
+    policy.notifyUnlock(errorData);
+  }
+
   public OrcStripeMetadata getStripeMetadata(OrcBatchKey stripeKey) throws IOException {
     return stripeMetadata.get(stripeKey);
   }
@@ -79,6 +117,14 @@ public class OrcMetadataCache {
     return metadata.get(fileKey);
   }
 
+  public DiskRangeList getIncompleteCbs(Object fileKey, DiskRangeList ranges, long baseOffset,
+      DiskRangeListFactory factory, BooleanRef gotAllData) {
+    if (estimateErrors == null) return ranges;
+    OrcFileEstimateErrors errors = estimateErrors.get(fileKey);
+    if (errors == null) return ranges;
+    return errors.getIncompleteCbs(ranges, baseOffset, factory, gotAllData);
+  }
+
   public void notifyEvicted(OrcFileMetadata buffer) {
     metadata.remove(buffer.getFileKey());
     // See OrcFileMetadata - we don't clear the object, it will be GCed when released by users.
@@ -88,4 +134,8 @@ public class OrcMetadataCache {
     stripeMetadata.remove(buffer.getKey());
     // See OrcStripeMetadata - we don't clear the object, it will be GCed when released by users.
   }
+
+  public void notifyEvicted(OrcFileEstimateErrors buffer) {
+    estimateErrors.remove(buffer.getFileKey());
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/eead54c9/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java
index 901e58a..3f2e750 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java
@@ -88,7 +88,7 @@ public class TestOrcMetadataCache {
   public void testGetPut() throws Exception {
     DummyMemoryManager mm = new DummyMemoryManager();
     DummyCachePolicy cp = new DummyCachePolicy();
-    OrcMetadataCache cache = new OrcMetadataCache(mm, cp);
+    OrcMetadataCache cache = new OrcMetadataCache(mm, cp, false);
     OrcFileMetadata ofm1 = OrcFileMetadata.createDummy(1), ofm2 = OrcFileMetadata.createDummy(2);
     assertSame(ofm1, cache.putFileMetadata(ofm1));
     assertEquals(1, mm.allocs);

http://git-wip-us.apache.org/repos/asf/hive/blob/eead54c9/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
index 8ccedb7..40cc86f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
@@ -101,18 +101,18 @@ class EncodedReaderImpl implements EncodedReader {
   private final int bufferSize;
   private final List<OrcProto.Type> types;
   private final long rowIndexStride;
-  private final DataCache cache;
+  private final DataCache cacheWrapper;
   private boolean isTracingEnabled;
 
   public EncodedReaderImpl(Object fileKey, List<OrcProto.Type> types, CompressionCodec codec,
-      int bufferSize, long strideRate, DataCache cache, DataReader dataReader, PoolFactory pf)
-          throws IOException {
+      int bufferSize, long strideRate, DataCache cacheWrapper, DataReader dataReader,
+      PoolFactory pf) throws IOException {
     this.fileKey = fileKey;
     this.codec = codec;
     this.types = types;
     this.bufferSize = bufferSize;
     this.rowIndexStride = strideRate;
-    this.cache = cache;
+    this.cacheWrapper = cacheWrapper;
     this.dataReader = dataReader;
     if (POOLS != null) return;
     if (pf == null) {
@@ -291,7 +291,7 @@ class EncodedReaderImpl implements EncodedReader {
     }
     BooleanRef isAllInCache = new BooleanRef();
     if (hasFileId) {
-      cache.getFileData(fileKey, toRead.next, stripeOffset, CC_FACTORY, isAllInCache);
+      cacheWrapper.getFileData(fileKey, toRead.next, stripeOffset, CC_FACTORY, isAllInCache);
       if (isTracingEnabled && LOG.isInfoEnabled()) {
         LOG.trace("Disk ranges after cache (file " + fileKey + ", base offset " + stripeOffset
             + "): " + RecordReaderUtils.stringifyDiskRanges(toRead.next));
@@ -303,7 +303,7 @@ class EncodedReaderImpl implements EncodedReader {
         this.dataReader.open();
         isDataReaderOpen = true;
       }
-      dataReader.readFileData(toRead.next, stripeOffset, cache.getAllocator().isDirectAlloc());
+      dataReader.readFileData(toRead.next, stripeOffset, cacheWrapper.getAllocator().isDirectAlloc());
     }
 
     // 3. For uncompressed case, we need some special processing before read.
@@ -452,7 +452,7 @@ class EncodedReaderImpl implements EncodedReader {
       CacheChunk cc = (CacheChunk)toFree;
       if (cc.getBuffer() == null) continue;
       MemoryBuffer buffer = cc.getBuffer();
-      cache.releaseBuffer(buffer);
+      cacheWrapper.releaseBuffer(buffer);
       cc.setBuffer(null);
     }
   }
@@ -501,11 +501,11 @@ class EncodedReaderImpl implements EncodedReader {
     }
 
     @Override
-    public void handleCacheCollision(DataCache cache,
+    public void handleCacheCollision(DataCache cacheWrapper,
         MemoryBuffer replacementBuffer, List<MemoryBuffer> cacheBuffers) {
       assert cacheBuffers == null;
       // This is done at pre-read stage where there's nothing special w/refcounts. Just release.
-      cache.getAllocator().deallocate(getBuffer());
+      cacheWrapper.getAllocator().deallocate(getBuffer());
       // Replace the buffer in our big range list, as well as in current results.
       this.setBuffer(replacementBuffer);
     }
@@ -544,14 +544,14 @@ class EncodedReaderImpl implements EncodedReader {
     }
 
     @Override
-    public void handleCacheCollision(DataCache cache, MemoryBuffer replacementBuffer,
+    public void handleCacheCollision(DataCache cacheWrapper, MemoryBuffer replacementBuffer,
         List<MemoryBuffer> cacheBuffers) {
       assert originalCbIndex >= 0;
       // Had the put succeeded for our new buffer, it would have refcount of 2 - 1 from put,
       // and 1 from notifyReused call above. "Old" buffer now has the 1 from put; new buffer
       // is not in cache.
-      cache.getAllocator().deallocate(getBuffer());
-      cache.reuseBuffer(replacementBuffer);
+      cacheWrapper.getAllocator().deallocate(getBuffer());
+      cacheWrapper.reuseBuffer(replacementBuffer);
       // Replace the buffer in our big range list, as well as in current results.
       this.buffer = replacementBuffer;
       cacheBuffers.set(originalCbIndex, replacementBuffer);
@@ -594,9 +594,11 @@ class EncodedReaderImpl implements EncodedReader {
     boolean isCompressed = codec != null;
     List<ProcCacheChunk> toDecompress = null;
     List<ByteBuffer> toRelease = null;
+    List<IncompleteCb> badEstimates = null;
     if (isCompressed) {
       toRelease = !dataReader.isTrackingDiskRanges() ? null : new ArrayList<ByteBuffer>();
-      toDecompress = new ArrayList<ProcCacheChunk>();
+      toDecompress = new ArrayList<>();
+      badEstimates = new ArrayList<>();
     }
 
     // 1. Find our bearings in the stream. Normally, iter will already point either to where we
@@ -612,17 +614,25 @@ class EncodedReaderImpl implements EncodedReader {
     // 2. Go thru the blocks; add stuff to results and prepare the decompression work (see below).
     lastUncompressed = isCompressed ?
         prepareRangesForCompressedRead(cOffset, endCOffset, streamOffset,
-            unlockUntilCOffset, current, csd, toRelease, toDecompress)
+            unlockUntilCOffset, current, csd, toRelease, toDecompress, badEstimates)
       : prepareRangesForUncompressedRead(
           cOffset, endCOffset, streamOffset, unlockUntilCOffset, current, csd);
 
+    // 2.5. Remember the bad estimates for future reference.
+    if (badEstimates != null && !badEstimates.isEmpty()) {
+      // Relies on the fact that cache does not actually store these.
+      DiskRange[] cacheKeys = badEstimates.toArray(new DiskRange[badEstimates.size()]);
+      long[] result = cacheWrapper.putFileData(fileKey, cacheKeys, null, baseOffset);
+      assert result == null; // We don't expect conflicts from bad estimates.
+    }
+
+    if (toDecompress == null || toDecompress.isEmpty()) return lastUncompressed; // Nothing to do.
+
     // 3. Allocate the buffers, prepare cache keys.
     // At this point, we have read all the CBs we need to read. cacheBuffers contains some cache
     // data and some unallocated membufs for decompression. toDecompress contains all the work we
     // need to do, and each item points to one of the membufs in cacheBuffers as target. The iter
     // has also been adjusted to point to these buffers instead of compressed data for the ranges.
-    if (toDecompress == null) return lastUncompressed; // Nothing to decompress.
-
     MemoryBuffer[] targetBuffers = new MemoryBuffer[toDecompress.size()];
     DiskRange[] cacheKeys = new DiskRange[toDecompress.size()];
     int ix = 0;
@@ -631,7 +641,7 @@ class EncodedReaderImpl implements EncodedReader {
       targetBuffers[ix] = chunk.getBuffer();
       ++ix;
     }
-    cache.getAllocator().allocateMultiple(targetBuffers, bufferSize);
+    cacheWrapper.getAllocator().allocateMultiple(targetBuffers, bufferSize);
 
     // 4. Now decompress (or copy) the data into cache buffers.
     for (ProcCacheChunk chunk : toDecompress) {
@@ -646,7 +656,7 @@ class EncodedReaderImpl implements EncodedReader {
       if (isTracingEnabled) {
         LOG.trace("Locking " + chunk.getBuffer() + " due to reuse (after decompression)");
       }
-      cache.reuseBuffer(chunk.getBuffer());
+      cacheWrapper.reuseBuffer(chunk.getBuffer());
     }
 
     // 5. Release original compressed buffers to zero-copy reader if needed.
@@ -658,8 +668,9 @@ class EncodedReaderImpl implements EncodedReader {
     }
 
     // 6. Finally, put uncompressed data to cache.
+
     if (fileKey != null) {
-      long[] collisionMask = cache.putFileData(fileKey, cacheKeys, targetBuffers, baseOffset);
+      long[] collisionMask = cacheWrapper.putFileData(fileKey, cacheKeys, targetBuffers, baseOffset);
       processCacheCollisions(collisionMask, toDecompress, targetBuffers, csd.getCacheBuffers());
     }
 
@@ -674,7 +685,8 @@ class EncodedReaderImpl implements EncodedReader {
 
   private CacheChunk prepareRangesForCompressedRead(long cOffset, long endCOffset,
       long streamOffset, long unlockUntilCOffset, DiskRangeList current, ColumnStreamData columnStreamData,
-      List<ByteBuffer> toRelease, List<ProcCacheChunk> toDecompress) throws IOException {
+      List<ByteBuffer> toRelease, List<ProcCacheChunk> toDecompress,
+      List<IncompleteCb> badEstimates) throws IOException {
     if (cOffset > current.getOffset()) {
       // Target compression block is in the middle of the range; slice the range in two.
       current = current.split(cOffset).next;
@@ -689,7 +701,7 @@ class EncodedReaderImpl implements EncodedReader {
         if (isTracingEnabled) {
           LOG.trace("Locking " + cc.getBuffer() + " due to reuse");
         }
-        cache.reuseBuffer(cc.getBuffer());
+        cacheWrapper.reuseBuffer(cc.getBuffer());
         columnStreamData.getCacheBuffers().add(cc.getBuffer());
         currentOffset = cc.getEnd();
         if (isTracingEnabled) {
@@ -710,7 +722,7 @@ class EncodedReaderImpl implements EncodedReader {
         // several disk ranges, so we might need to combine them.
         BufferChunk bc = (BufferChunk)current;
         ProcCacheChunk newCached = addOneCompressionBuffer(
-            bc, columnStreamData.getCacheBuffers(), toDecompress, toRelease);
+            bc, columnStreamData.getCacheBuffers(), toDecompress, toRelease, badEstimates);
         lastUncompressed = (newCached == null) ? lastUncompressed : newCached;
         next = (newCached != null) ? newCached.next : null;
         currentOffset = (next != null) ? next.getOffset() : -1;
@@ -737,7 +749,7 @@ class EncodedReaderImpl implements EncodedReader {
       if (isTracingEnabled) {
         LOG.trace("Locking " + lastUncompressed.getBuffer() + " due to reuse");
       }
-      cache.reuseBuffer(lastUncompressed.getBuffer());
+      cacheWrapper.reuseBuffer(lastUncompressed.getBuffer());
       if (isFirst) {
         columnStreamData.setIndexBaseOffset((int)(lastUncompressed.getOffset() - streamOffset));
         isFirst = false;
@@ -818,7 +830,7 @@ class EncodedReaderImpl implements EncodedReader {
         if (noMoreDataForPart && hasEntirePartTo < partEnd && candidateCached != null) {
           // We are missing a section at the end of the part... copy the start to non-cached.
           lastUncompressed = copyAndReplaceCandidateToNonCached(
-              candidateCached, partOffset, hasEntirePartTo, cache, singleAlloc);
+              candidateCached, partOffset, hasEntirePartTo, cacheWrapper, singleAlloc);
           candidateCached = null;
         }
         current = next;
@@ -850,10 +862,10 @@ class EncodedReaderImpl implements EncodedReader {
           if (candidateCached != null) {
             assert hadEntirePartTo != -1;
             copyAndReplaceCandidateToNonCached(
-                candidateCached, partOffset, hadEntirePartTo, cache, singleAlloc);
+                candidateCached, partOffset, hadEntirePartTo, cacheWrapper, singleAlloc);
             candidateCached = null;
           }
-          lastUncompressed = copyAndReplaceUncompressedToNonCached(curBc, cache, singleAlloc);
+          lastUncompressed = copyAndReplaceUncompressedToNonCached(curBc, cacheWrapper, singleAlloc);
           next = lastUncompressed.next; // There may be more data after the gap.
         } else {
           // So far we have all the data from the beginning of the part.
@@ -885,7 +897,7 @@ class EncodedReaderImpl implements EncodedReader {
       cacheKeys[ix] = chunk; // Relies on the fact that cache does not actually store these.
       ++ix;
     }
-    cache.getAllocator().allocateMultiple(
+    cacheWrapper.getAllocator().allocateMultiple(
         targetBuffers, (int)(partCount == 1 ? streamLen : partSize));
 
     // 4. Now copy the data into cache buffers.
@@ -909,7 +921,7 @@ class EncodedReaderImpl implements EncodedReader {
 
     // 6. Finally, put uncompressed data to cache.
     if (fileKey != null) {
-      long[] collisionMask = cache.putFileData(fileKey, cacheKeys, targetBuffers, baseOffset);
+      long[] collisionMask = cacheWrapper.putFileData(fileKey, cacheKeys, targetBuffers, baseOffset);
       processCacheCollisions(collisionMask, toCache, targetBuffers, null);
     }
 
@@ -922,7 +934,7 @@ class EncodedReaderImpl implements EncodedReader {
     // of the prevalent ORC compression buffer (the default), or maximum allocation (since we
     // cannot allocate bigger chunks), whichever is less.
     long orcCbSizeDefault = ((Number)OrcConf.BUFFER_SIZE.getDefaultValue()).longValue();
-    int maxAllocSize = cache.getAllocator().getMaxAllocation();
+    int maxAllocSize = cacheWrapper.getAllocator().getMaxAllocation();
     return (int)Math.min(maxAllocSize, orcCbSizeDefault);
   }
 
@@ -942,14 +954,14 @@ class EncodedReaderImpl implements EncodedReader {
 
   private static CacheChunk copyAndReplaceCandidateToNonCached(
       UncompressedCacheChunk candidateCached, long partOffset,
-      long candidateEnd, DataCache cache, MemoryBuffer[] singleAlloc) {
+      long candidateEnd, DataCache cacheWrapper, MemoryBuffer[] singleAlloc) {
     // We thought we had the entire part to cache, but we don't; convert start to
     // non-cached. Since we are at the first gap, the previous stuff must be contiguous.
     singleAlloc[0] = null;
-    cache.getAllocator().allocateMultiple(singleAlloc, (int)(candidateEnd - partOffset));
+    cacheWrapper.getAllocator().allocateMultiple(singleAlloc, (int)(candidateEnd - partOffset));
 
     MemoryBuffer buffer = singleAlloc[0];
-    cache.reuseBuffer(buffer);
+    cacheWrapper.reuseBuffer(buffer);
     ByteBuffer dest = buffer.getByteBufferRaw();
     CacheChunk tcc = POOLS.tccPool.take();
     tcc.init(buffer, partOffset, candidateEnd);
@@ -958,11 +970,11 @@ class EncodedReaderImpl implements EncodedReader {
   }
 
   private static CacheChunk copyAndReplaceUncompressedToNonCached(
-      BufferChunk bc, DataCache cache, MemoryBuffer[] singleAlloc) {
+      BufferChunk bc, DataCache cacheWrapper, MemoryBuffer[] singleAlloc) {
     singleAlloc[0] = null;
-    cache.getAllocator().allocateMultiple(singleAlloc, bc.getLength());
+    cacheWrapper.getAllocator().allocateMultiple(singleAlloc, bc.getLength());
     MemoryBuffer buffer = singleAlloc[0];
-    cache.reuseBuffer(buffer);
+    cacheWrapper.reuseBuffer(buffer);
     ByteBuffer dest = buffer.getByteBufferRaw();
     CacheChunk tcc = POOLS.tccPool.take();
     tcc.init(buffer, bc.getOffset(), bc.getEnd());
@@ -1056,7 +1068,7 @@ class EncodedReaderImpl implements EncodedReader {
       LOG.trace("Unlocking " + cc.getBuffer() + " for the fetching thread"
           + (isBacktracking ? "; backtracking" : ""));
     }
-    cache.releaseBuffer(cc.getBuffer());
+    cacheWrapper.releaseBuffer(cc.getBuffer());
     cc.setBuffer(null);
   }
 
@@ -1081,7 +1093,7 @@ class EncodedReaderImpl implements EncodedReader {
         }
         assert replacedChunk.getBuffer() != replacementBuffer : i + " was not replaced in the results "
             + "even though mask is [" + Long.toBinaryString(maskVal) + "]";
-        replacedChunk.handleCacheCollision(cache, replacementBuffer, cacheBuffers);
+        replacedChunk.handleCacheCollision(cacheWrapper, replacementBuffer, cacheBuffers);
       }
       maskVal >>= 1;
     }
@@ -1131,11 +1143,12 @@ class EncodedReaderImpl implements EncodedReader {
    * @param toDecompress The list of work to decompress - pairs of compressed buffers and the
    *                     target buffers (same as the ones added to cacheBuffers).
    * @param toRelease The list of buffers to release to zcr because they are no longer in use.
+   * @param badEstimates The list of bad estimates that cannot be decompressed.
    * @return The resulting cache chunk.
    */
   private ProcCacheChunk addOneCompressionBuffer(BufferChunk current,
       List<MemoryBuffer> cacheBuffers, List<ProcCacheChunk> toDecompress,
-      List<ByteBuffer> toRelease) throws IOException {
+      List<ByteBuffer> toRelease, List<IncompleteCb> badEstimates) throws IOException {
     ByteBuffer slice = null;
     ByteBuffer compressed = current.getChunk();
     long cbStartOffset = current.getOffset();
@@ -1166,7 +1179,7 @@ class EncodedReaderImpl implements EncodedReader {
       return cc;
     }
     if (current.getEnd() < cbEndOffset && !current.hasContiguousNext()) {
-      addIncompleteCompressionBuffer(cbStartOffset, current, 0);
+      badEstimates.add(addIncompleteCompressionBuffer(cbStartOffset, current, 0));
       return null; // This is impossible to read from this chunk.
     }
 
@@ -1221,13 +1234,13 @@ class EncodedReaderImpl implements EncodedReader {
         }
         tmp.removeSelf();
       } else {
-        addIncompleteCompressionBuffer(cbStartOffset, tmp, extraChunkCount);
+        badEstimates.add(addIncompleteCompressionBuffer(cbStartOffset, tmp, extraChunkCount));
         return null; // This is impossible to read from this chunk.
       }
     }
   }
 
-  private void addIncompleteCompressionBuffer(
+  private IncompleteCb addIncompleteCompressionBuffer(
       long cbStartOffset, DiskRangeList target, int extraChunkCount) {
     IncompleteCb icb = new IncompleteCb(cbStartOffset, target.getEnd());
     if (isTracingEnabled) {
@@ -1235,6 +1248,7 @@ class EncodedReaderImpl implements EncodedReader {
           + icb + " in the buffers");
     }
     target.replaceSelfWith(icb);
+    return icb;
   }
 
   /**
@@ -1253,7 +1267,7 @@ class EncodedReaderImpl implements EncodedReader {
       boolean isUncompressed, long cbStartOffset, long cbEndOffset, int lastChunkLength,
       BufferChunk lastChunk, List<ProcCacheChunk> toDecompress, List<MemoryBuffer> cacheBuffers) {
     // Prepare future cache buffer.
-    MemoryBuffer futureAlloc = cache.getAllocator().createUnallocated();
+    MemoryBuffer futureAlloc = cacheWrapper.getAllocator().createUnallocated();
     // Add it to result in order we are processing.
     cacheBuffers.add(futureAlloc);
     // Add it to the list of work to decompress.