You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2018/07/31 21:17:20 UTC

hive git commit: HIVE-20244 : forward port HIVE-19704 to master (Sergey Shelukhin, reviewed by Prasanth Jayachandran)

Repository: hive
Updated Branches:
  refs/heads/master 87ce36b45 -> 5b2124b99


HIVE-20244 : forward port HIVE-19704 to master (Sergey Shelukhin, reviewed by Prasanth Jayachandran)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/5b2124b9
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/5b2124b9
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/5b2124b9

Branch: refs/heads/master
Commit: 5b2124b99fee059d3e8ce71e8ac646a9faed84d5
Parents: 87ce36b
Author: sergey <se...@apache.org>
Authored: Tue Jul 31 14:14:18 2018 -0700
Committer: sergey <se...@apache.org>
Committed: Tue Jul 31 14:14:18 2018 -0700

----------------------------------------------------------------------
 .../hadoop/hive/llap/cache/BuddyAllocator.java  | 17 +++--
 .../llap/cache/LowLevelCacheMemoryManager.java  | 51 +++++++------
 .../hadoop/hive/llap/cache/MemoryManager.java   |  4 +-
 .../llap/io/encoded/OrcEncodedDataReader.java   | 14 ++--
 .../llap/io/encoded/SerDeEncodedDataReader.java | 34 ++++++---
 .../hive/llap/io/metadata/MetadataCache.java    | 76 ++++++++++++--------
 .../hive/llap/cache/TestBuddyAllocator.java     |  3 +-
 .../llap/cache/TestLowLevelLrfuCachePolicy.java | 12 ++--
 .../hive/llap/cache/TestOrcMetadataCache.java   | 19 +++--
 .../hadoop/hive/llap/LlapCacheAwareFs.java      |  2 +
 .../hive/ql/io/orc/encoded/EncodedReader.java   |  3 +
 .../ql/io/orc/encoded/EncodedReaderImpl.java    | 31 +++++---
 .../ql/io/orc/encoded/StoppableAllocator.java   | 30 ++++++++
 .../vector/VectorizedParquetRecordReader.java   |  5 +-
 .../hive/common/io/FileMetadataCache.java       | 17 +++--
 15 files changed, 223 insertions(+), 95 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/5b2124b9/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
index e3ce2e7..a27964f 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hive.llap.cache;
 
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -45,9 +46,10 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
 import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics;
+import org.apache.hadoop.hive.ql.io.orc.encoded.StoppableAllocator;
 
 public final class BuddyAllocator
-  implements EvictionAwareAllocator, BuddyAllocatorMXBean, LlapIoDebugDump {
+  implements EvictionAwareAllocator, StoppableAllocator, BuddyAllocatorMXBean, LlapIoDebugDump {
   private final Arena[] arenas;
   private final AtomicInteger allocatedArenas = new AtomicInteger(0);
 
@@ -224,16 +226,23 @@ public final class BuddyAllocator
     return (int)arenaSizeVal;
   }
 
+
+  @VisibleForTesting
   @Override
   public void allocateMultiple(MemoryBuffer[] dest, int size)
       throws AllocatorOutOfMemoryException {
-    allocateMultiple(dest, size, null);
+    allocateMultiple(dest, size, null, null);
   }
 
-  // TODO: would it make sense to return buffers asynchronously?
   @Override
   public void allocateMultiple(MemoryBuffer[] dest, int size, BufferObjectFactory factory)
       throws AllocatorOutOfMemoryException {
+    allocateMultiple(dest, size, factory, null);
+  }
+
+  @Override
+  public void allocateMultiple(MemoryBuffer[] dest, int size, BufferObjectFactory factory, AtomicBoolean isStopped)
+      throws AllocatorOutOfMemoryException {
     assert size > 0 : "size is " + size;
     if (size > maxAllocation) {
       throw new RuntimeException("Trying to allocate " + size + "; max is " + maxAllocation);
@@ -243,7 +252,7 @@ public final class BuddyAllocator
     int allocationSize = 1 << allocLog2;
 
     // If using async, we could also reserve one by one.
-    memoryManager.reserveMemory(dest.length << allocLog2);
+    memoryManager.reserveMemory(dest.length << allocLog2, isStopped);
     for (int i = 0; i < dest.length; ++i) {
       if (dest[i] != null) continue;
       // Note: this is backward compat only. Should be removed with createUnallocated.

http://git-wip-us.apache.org/repos/asf/hive/blob/5b2124b9/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java
index 4297cfc..c5b5bf2 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java
@@ -18,14 +18,14 @@
 
 package org.apache.hadoop.hive.llap.cache;
 
-import com.google.common.annotations.VisibleForTesting;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+
 import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
 import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * Implementation of memory manager for low level cache. Note that memory is released during
  * reserve most of the time, by calling the evictor to evict some memory. releaseMemory is
@@ -49,21 +49,28 @@ public class LowLevelCacheMemoryManager implements MemoryManager {
     }
   }
 
+  public static class ReserveFailedException extends RuntimeException {
+    private static final long serialVersionUID = 1L;
+    public ReserveFailedException(AtomicBoolean isStopped) {
+      super("Cannot reserve memory"
+          + (Thread.currentThread().isInterrupted() ? "; thread interrupted" : "")
+          + ((isStopped != null && isStopped.get()) ? "; thread stopped" : ""));
+    }
+  }
 
   @Override
-  public void reserveMemory(final long memoryToReserve) {
-    boolean result = reserveMemory(memoryToReserve, true);
+  public void reserveMemory(final long memoryToReserve, AtomicBoolean isStopped) {
+    boolean result = reserveMemory(memoryToReserve, true, isStopped);
     if (result) return;
     // Can only happen if there's no evictor, or if thread is interrupted.
-    throw new RuntimeException("Cannot reserve memory"
-        + (Thread.currentThread().isInterrupted() ? "; thread interrupted" : ""));
+    throw new ReserveFailedException(isStopped);
   }
 
   @VisibleForTesting
-  public boolean reserveMemory(final long memoryToReserve, boolean waitForEviction) {
+  public boolean reserveMemory(final long memoryToReserve,
+      boolean waitForEviction, AtomicBoolean isStopped) {
     // TODO: if this cannot evict enough, it will spin infinitely. Terminate at some point?
     int badCallCount = 0;
-    int nextLog = 4;
     long evictedTotalMetric = 0, reservedTotalMetric = 0, remainingToReserve = memoryToReserve;
     boolean result = true;
     while (remainingToReserve > 0) {
@@ -79,21 +86,23 @@ public class LowLevelCacheMemoryManager implements MemoryManager {
       // TODO: for one-block case, we could move notification for the last block out of the loop.
       long evicted = evictor.evictSomeBlocks(remainingToReserve);
       if (evicted == 0) {
+        ++badCallCount;
         if (!waitForEviction) {
           result = false;
           break;
         }
-        ++badCallCount;
-        if (badCallCount == nextLog) {
-          LlapIoImpl.LOG.warn("Cannot evict blocks for " + badCallCount + " calls; cache full?");
-          nextLog <<= 1;
-          try {
-            Thread.sleep(Math.min(1000, nextLog));
-          } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            result = false;
-            break;
-          }
+
+        if (isStopped != null && isStopped.get()) {
+          result = false;
+          break;
+        }
+        try {
+          Thread.sleep(badCallCount > 9 ? 1000 : (1 << badCallCount));
+        } catch (InterruptedException e) {
+          LlapIoImpl.LOG.warn("Thread interrupted"); // We currently don't expect this.
+          Thread.currentThread().interrupt();
+          result = false;
+          break;
         }
         continue;
       }

http://git-wip-us.apache.org/repos/asf/hive/blob/5b2124b9/llap-server/src/java/org/apache/hadoop/hive/llap/cache/MemoryManager.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/MemoryManager.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/MemoryManager.java
index 542041d..fedade5 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/MemoryManager.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/MemoryManager.java
@@ -18,8 +18,10 @@
 
 package org.apache.hadoop.hive.llap.cache;
 
+import java.util.concurrent.atomic.AtomicBoolean;
+
 public interface MemoryManager {
   void releaseMemory(long memUsage);
   void updateMaxSize(long maxSize);
-  void reserveMemory(long memoryToReserve);
+  void reserveMemory(long memoryToReserve, AtomicBoolean isStopped);
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/5b2124b9/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
index b76b0de..e8a3b40 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
@@ -23,6 +23,7 @@ import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -173,7 +174,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
    * Contains only stripes that are read, and only columns included. null => read all RGs.
    */
   private boolean[][] stripeRgs;
-  private volatile boolean isStopped = false;
+  private AtomicBoolean isStopped = new AtomicBoolean(false);
   @SuppressWarnings("unused")
   private volatile boolean isPaused = false;
 
@@ -240,7 +241,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
   @Override
   public void stop() {
     LOG.debug("Encoded reader is being stopped");
-    isStopped = true;
+    isStopped.set(true);
   }
 
   @Override
@@ -436,6 +437,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
     stripeReader = orcReader.encodedReader(
         fileKey, dw, dw, useObjectPools ? POOL_FACTORY : null, trace, useCodecPool, cacheTag);
     stripeReader.setTracing(LlapIoImpl.ORC_LOGGER.isTraceEnabled());
+    stripeReader.setStopped(isStopped);
   }
 
   private void recordReaderTime(long startTime) {
@@ -454,7 +456,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
   }
 
   private boolean processStop() {
-    if (!isStopped) return false;
+    if (!isStopped.get()) return false;
     LOG.info("Encoded data reader is stopping");
     tracePool.offer(trace);
     cleanupReaders();
@@ -584,7 +586,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
     ensureOrcReader();
     ByteBuffer tailBufferBb = orcReader.getSerializedFileFooter();
     if (hasCache) {
-      tailBuffers = metadataCache.putFileMetadata(fileKey, tailBufferBb, cacheTag);
+      tailBuffers = metadataCache.putFileMetadata(fileKey, tailBufferBb, cacheTag, isStopped);
       metadataCache.decRefBuffer(tailBuffers); // We don't use the cache's copy of the buffer.
     }
     FileTail ft = orcReader.getFileTail();
@@ -677,7 +679,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
     assert footerRange.next == null; // Can only happens w/zcr for a single input buffer.
     if (hasCache) {
       LlapBufferOrBuffers cacheBuf = metadataCache.putStripeTail(
-          stripeKey, footerRange.getData().duplicate(), cacheTag);
+          stripeKey, footerRange.getData().duplicate(), cacheTag, isStopped);
       metadataCache.decRefBuffer(cacheBuf); // We don't use this one.
     }
     ByteBuffer bb = footerRange.getData().duplicate();
@@ -918,7 +920,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
         return lowLevelCache.putFileData(
             fileKey, ranges, data, baseOffset, Priority.NORMAL, counters, tag);
       } else if (metadataCache != null) {
-        metadataCache.putIncompleteCbs(fileKey, ranges, baseOffset);
+        metadataCache.putIncompleteCbs(fileKey, ranges, baseOffset, isStopped);
       }
       return null;
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/5b2124b9/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
index 5b54af5..2576175 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
@@ -29,6 +29,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -68,6 +69,7 @@ import org.apache.hadoop.hive.ql.io.orc.Reader;
 import org.apache.hadoop.hive.ql.io.orc.Writer;
 import org.apache.hadoop.hive.ql.io.orc.encoded.CacheChunk;
 import org.apache.hadoop.hive.ql.io.orc.encoded.Reader.OrcEncodedColumnBatch;
+import org.apache.hadoop.hive.ql.io.orc.encoded.StoppableAllocator;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.serde2.Deserializer;
 import org.apache.hadoop.hive.serde2.SerDeException;
@@ -150,7 +152,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
   private final String cacheTag;
   private final FileSystem fs;
 
-  private volatile boolean isStopped = false;
+  private AtomicBoolean isStopped = new AtomicBoolean(false);
   private final Deserializer sourceSerDe;
   private final InputFormat<?, ?> sourceInputFormat;
   private final Reporter reporter;
@@ -245,7 +247,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
   @Override
   public void stop() {
     LlapIoImpl.LOG.debug("Encoded reader is being stopped");
-    isStopped = true;
+    isStopped.set(true);
   }
 
   @Override
@@ -344,16 +346,18 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
     private final Map<StreamName, OutputReceiver> streams = new HashMap<>();
     private final Map<Integer, List<CacheOutputReceiver>> colStreams = new HashMap<>();
     private final boolean doesSourceHaveIncludes;
+    private final AtomicBoolean isStopped;
 
     public CacheWriter(BufferUsageManager bufferManager, List<Integer> columnIds,
         boolean[] writerIncludes, boolean doesSourceHaveIncludes,
-        Allocator.BufferObjectFactory bufferFactory) {
+        Allocator.BufferObjectFactory bufferFactory, AtomicBoolean isStopped) {
       this.bufferManager = bufferManager;
       assert writerIncludes != null; // Taken care of on higher level.
       this.writerIncludes = writerIncludes;
       this.doesSourceHaveIncludes = doesSourceHaveIncludes;
       this.columnIds = columnIds;
       this.bufferFactory = bufferFactory;
+      this.isStopped = isStopped;
       startStripe();
     }
 
@@ -440,7 +444,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
         if (LlapIoImpl.LOG.isTraceEnabled()) {
           LlapIoImpl.LOG.trace("Creating cache receiver for " + name);
         }
-        CacheOutputReceiver cor = new CacheOutputReceiver(bufferManager, bufferFactory, name);
+        CacheOutputReceiver cor = new CacheOutputReceiver(bufferManager, name, bufferFactory, isStopped);
         or = cor;
         List<CacheOutputReceiver> list = colStreams.get(name.getColumn());
         if (list == null) {
@@ -597,12 +601,17 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
     private List<MemoryBuffer> buffers = null;
     private int lastBufferPos = -1;
     private boolean suppressed = false;
+    private final AtomicBoolean isStopped;
+    private final StoppableAllocator allocator;
 
     public CacheOutputReceiver(BufferUsageManager bufferManager,
-        BufferObjectFactory bufferFactory, StreamName name) {
+        StreamName name, BufferObjectFactory bufferFactory, AtomicBoolean isStopped) {
       this.bufferManager = bufferManager;
       this.bufferFactory = bufferFactory;
+      Allocator alloc = bufferManager.getAllocator();
+      this.allocator = alloc instanceof StoppableAllocator ? (StoppableAllocator) alloc : null;
       this.name = name;
+      this.isStopped = isStopped;
     }
 
     public void clear() {
@@ -617,6 +626,15 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
       lastBufferPos = -1;
     }
 
+    private void allocateMultiple(MemoryBuffer[] dest, int size) {
+      if (allocator != null) {
+        allocator.allocateMultiple(dest, size, bufferFactory, isStopped);
+      } else {
+        bufferManager.getAllocator().allocateMultiple(dest, size, bufferFactory);
+      }
+    }
+
+
     @Override
     public void output(ByteBuffer buffer) throws IOException {
       // TODO: avoid put() by working directly in OutStream?
@@ -640,7 +658,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
       boolean isNewBuffer = (lastBufferPos == -1);
       if (isNewBuffer) {
         MemoryBuffer[] dest = new MemoryBuffer[1];
-        bufferManager.getAllocator().allocateMultiple(dest, size, bufferFactory);
+        allocateMultiple(dest, size);
         LlapSerDeDataBuffer newBuffer = (LlapSerDeDataBuffer)dest[0];
         bb = newBuffer.getByteBufferRaw();
         lastBufferPos = bb.position();
@@ -1417,7 +1435,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
       // TODO: move this into ctor? EW would need to create CacheWriter then
       List<Integer> cwColIds = writer.isOnlyWritingIncludedColumns() ? splitColumnIds : columnIds;
       writer.init(new CacheWriter(bufferManager, cwColIds, splitIncludes,
-          writer.isOnlyWritingIncludedColumns(), bufferFactory), daemonConf, split.getPath());
+          writer.isOnlyWritingIncludedColumns(), bufferFactory, isStopped), daemonConf, split.getPath());
       if (writer instanceof VectorDeserializeOrcWriter) {
         VectorDeserializeOrcWriter asyncWriter = (VectorDeserializeOrcWriter)writer;
         asyncWriter.startAsync(new AsyncCacheDataCallback());
@@ -1673,7 +1691,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
   }
 
   private boolean processStop() {
-    if (!isStopped) return false;
+    if (!isStopped.get()) return false;
     LlapIoImpl.LOG.info("SerDe-based data reader is stopping");
     cleanup(true);
     return true;

http://git-wip-us.apache.org/repos/asf/hive/blob/5b2124b9/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/MetadataCache.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/MetadataCache.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/MetadataCache.java
index 426d599..2b3bca6 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/MetadataCache.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/MetadataCache.java
@@ -28,11 +28,13 @@ import org.apache.hadoop.hive.common.io.encoded.MemoryBufferOrBuffers;
 
 import java.nio.ByteBuffer;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.hive.common.io.DiskRange;
 import org.apache.hadoop.hive.common.io.DiskRangeList;
 import org.apache.hadoop.hive.common.io.DataCache.BooleanRef;
 import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer;
+import org.apache.hadoop.hive.llap.cache.BuddyAllocator;
 import org.apache.hadoop.hive.llap.cache.EvictionAwareAllocator;
 import org.apache.hadoop.hive.llap.cache.EvictionDispatcher;
 import org.apache.hadoop.hive.llap.cache.LlapAllocatorBuffer;
@@ -43,6 +45,7 @@ import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority;
 import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
 import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics;
 import org.apache.hadoop.hive.ql.io.orc.encoded.OrcBatchKey;
+import org.apache.hadoop.hive.ql.io.orc.encoded.StoppableAllocator;
 
 public class MetadataCache implements LlapIoDebugDump, FileMetadataCache {
   private final ConcurrentHashMap<Object, LlapBufferOrBuffers> metadata =
@@ -51,10 +54,10 @@ public class MetadataCache implements LlapIoDebugDump, FileMetadataCache {
   private final ConcurrentHashMap<Object, OrcFileEstimateErrors> estimateErrors;
   private final MemoryManager memoryManager;
   private final LowLevelCachePolicy policy;
-  private final EvictionAwareAllocator allocator;
+  private final BuddyAllocator allocator;
   private final LlapDaemonCacheMetrics metrics;
 
-  public MetadataCache(EvictionAwareAllocator allocator, MemoryManager memoryManager,
+  public MetadataCache(BuddyAllocator allocator, MemoryManager memoryManager,
       LowLevelCachePolicy policy, boolean useEstimateCache, LlapDaemonCacheMetrics metrics) {
     this.memoryManager = memoryManager;
     this.allocator = allocator;
@@ -64,7 +67,7 @@ public class MetadataCache implements LlapIoDebugDump, FileMetadataCache {
         ? new ConcurrentHashMap<Object, OrcFileEstimateErrors>() : null;
   }
 
-  public void putIncompleteCbs(Object fileKey, DiskRange[] ranges, long baseOffset) {
+  public void putIncompleteCbs(Object fileKey, DiskRange[] ranges, long baseOffset, AtomicBoolean isStopped) {
     if (estimateErrors == null) return;
     OrcFileEstimateErrors errorData = estimateErrors.get(fileKey);
     boolean isNew = false;
@@ -76,7 +79,7 @@ public class MetadataCache implements LlapIoDebugDump, FileMetadataCache {
         errorData.addError(range.getOffset(), range.getLength(), baseOffset);
       }
       long memUsage = errorData.estimateMemoryUsage();
-      memoryManager.reserveMemory(memUsage);
+      memoryManager.reserveMemory(memUsage, isStopped);
       OrcFileEstimateErrors old = estimateErrors.putIfAbsent(fileKey, errorData);
       if (old != null) {
         errorData = old;
@@ -150,34 +153,49 @@ public class MetadataCache implements LlapIoDebugDump, FileMetadataCache {
   }
 
   @Override
-  public LlapBufferOrBuffers putFileMetadata(Object fileKey, ByteBuffer tailBuffer) {
-    return putInternal(fileKey, tailBuffer, null);
+  public MemoryBufferOrBuffers putFileMetadata(Object fileKey,
+      ByteBuffer tailBuffer) {
+    return putInternal(fileKey, tailBuffer, null, null);
   }
 
   @Override
-  public LlapBufferOrBuffers putFileMetadata(Object fileKey, ByteBuffer tailBuffer, String tag) {
-    return putInternal(fileKey, tailBuffer, tag);
+  public MemoryBufferOrBuffers putFileMetadata(Object fileKey,
+      ByteBuffer tailBuffer, String tag) {
+    return putInternal(fileKey, tailBuffer, tag, null);
+  }
+
+  @Override
+  public MemoryBufferOrBuffers putFileMetadata(Object fileKey, int length,
+      InputStream is) throws IOException {
+    return putFileMetadata(fileKey, length, is, null, null);
   }
 
   public LlapBufferOrBuffers putStripeTail(
-      OrcBatchKey stripeKey, ByteBuffer tailBuffer, String tag) {
-    return putInternal(new StripeKey(stripeKey.fileKey, stripeKey.stripeIx), tailBuffer, tag);
+      OrcBatchKey stripeKey, ByteBuffer tailBuffer, String tag, AtomicBoolean isStopped) {
+    return putInternal(new StripeKey(stripeKey.fileKey, stripeKey.stripeIx), tailBuffer, tag, isStopped);
   }
 
   @Override
-  public LlapBufferOrBuffers putFileMetadata(
-      Object fileKey, int length, InputStream is) throws IOException {
-    return putFileMetadata(fileKey, length, is, null);
+  public MemoryBufferOrBuffers putFileMetadata(Object fileKey, int length,
+      InputStream is, String tag) throws IOException {
+    return putFileMetadata(fileKey, length, is, tag, null);
+  }
+
+
+  @Override
+  public LlapBufferOrBuffers putFileMetadata(Object fileKey,
+      ByteBuffer tailBuffer, String tag, AtomicBoolean isStopped) {
+    return putInternal(fileKey, tailBuffer, tag, isStopped);
   }
 
   @Override
-  public LlapBufferOrBuffers putFileMetadata(
-      Object fileKey, int length, InputStream is, String tag) throws IOException {
+  public LlapBufferOrBuffers putFileMetadata(Object fileKey, int length, InputStream is,
+      String tag, AtomicBoolean isStopped) throws IOException {
     LlapBufferOrBuffers result = null;
     while (true) { // Overwhelmingly executes once, or maybe twice (replacing stale value).
       LlapBufferOrBuffers oldVal = metadata.get(fileKey);
       if (oldVal == null) {
-        result = wrapBbForFile(result, fileKey, length, is, tag);
+        result = wrapBbForFile(result, fileKey, length, is, tag, isStopped);
         if (!lockBuffer(result, false)) {
           throw new AssertionError("Cannot lock a newly created value " + result);
         }
@@ -198,7 +216,7 @@ public class MetadataCache implements LlapIoDebugDump, FileMetadataCache {
 
   @SuppressWarnings({ "rawtypes", "unchecked" })
   private LlapBufferOrBuffers wrapBbForFile(LlapBufferOrBuffers result,
-      Object fileKey, int length, InputStream stream, String tag) throws IOException {
+      Object fileKey, int length, InputStream stream, String tag, AtomicBoolean isStopped) throws IOException {
     if (result != null) return result;
     int maxAlloc = allocator.getMaxAllocation();
     LlapMetadataBuffer<Object>[] largeBuffers = null;
@@ -207,7 +225,7 @@ public class MetadataCache implements LlapIoDebugDump, FileMetadataCache {
       for (int i = 0; i < largeBuffers.length; ++i) {
         largeBuffers[i] = new LlapMetadataBuffer<Object>(fileKey, tag);
       }
-      allocator.allocateMultiple(largeBuffers, maxAlloc, null);
+      allocator.allocateMultiple(largeBuffers, maxAlloc, null, isStopped);
       for (int i = 0; i < largeBuffers.length; ++i) {
         readIntoCacheBuffer(stream, maxAlloc, largeBuffers[i]);
       }
@@ -218,7 +236,7 @@ public class MetadataCache implements LlapIoDebugDump, FileMetadataCache {
     } else {
       LlapMetadataBuffer<Object>[] smallBuffer = new LlapMetadataBuffer[1];
       smallBuffer[0] = new LlapMetadataBuffer(fileKey, tag);
-      allocator.allocateMultiple(smallBuffer, length, null);
+      allocator.allocateMultiple(smallBuffer, length, null, isStopped);
       readIntoCacheBuffer(stream, smallSize, smallBuffer[0]);
       if (largeBuffers == null) {
         return smallBuffer[0]; // This is the overwhelmingly common case.
@@ -243,12 +261,12 @@ public class MetadataCache implements LlapIoDebugDump, FileMetadataCache {
     bb.position(pos);
   }
 
-  private <T> LlapBufferOrBuffers putInternal(T key, ByteBuffer tailBuffer, String tag) {
+  private <T> LlapBufferOrBuffers putInternal(T key, ByteBuffer tailBuffer, String tag, AtomicBoolean isStopped) {
     LlapBufferOrBuffers result = null;
     while (true) { // Overwhelmingly executes once, or maybe twice (replacing stale value).
       LlapBufferOrBuffers oldVal = metadata.get(key);
       if (oldVal == null) {
-        result = wrapBb(result, key, tailBuffer, tag);
+        result = wrapBb(result, key, tailBuffer, tag, isStopped);
         oldVal = metadata.putIfAbsent(key, result);
         if (oldVal == null) {
           cacheInPolicy(result); // Cached successfully, add to policy.
@@ -306,11 +324,11 @@ public class MetadataCache implements LlapIoDebugDump, FileMetadataCache {
   }
 
   private <T> LlapBufferOrBuffers wrapBb(
-      LlapBufferOrBuffers result, T key, ByteBuffer tailBuffer, String tag) {
+      LlapBufferOrBuffers result, T key, ByteBuffer tailBuffer, String tag, AtomicBoolean isStopped) {
     if (result != null) return result;
     if (tailBuffer.remaining() <= allocator.getMaxAllocation()) {
       // The common case by far.
-      return wrapSmallBb(new LlapMetadataBuffer<T>(key, tag), tailBuffer);
+      return wrapSmallBb(new LlapMetadataBuffer<T>(key, tag), tailBuffer, isStopped);
     } else {
       int allocCount = determineAllocCount(tailBuffer);
       @SuppressWarnings("unchecked")
@@ -318,22 +336,24 @@ public class MetadataCache implements LlapIoDebugDump, FileMetadataCache {
       for (int i = 0; i < allocCount; ++i) {
         results[i] = new LlapMetadataBuffer<T>(key, tag);
       }
-      wrapLargeBb(results, tailBuffer);
+      wrapLargeBb(results, tailBuffer, isStopped);
       return new LlapMetadataBuffers<T>(results);
     }
   }
 
-  private <T extends LlapAllocatorBuffer> T wrapSmallBb(T result, ByteBuffer tailBuffer) {
+  private <T extends LlapAllocatorBuffer> T wrapSmallBb(T result, ByteBuffer tailBuffer,
+      AtomicBoolean isStopped) {
     // Note: we pass in null factory because we allocate objects here. We could also pass a
     //       per-call factory that would set fileKey; or set it after put.
-    allocator.allocateMultiple(new MemoryBuffer[] { result }, tailBuffer.remaining(), null);
+    allocator.allocateMultiple(new MemoryBuffer[] { result }, tailBuffer.remaining(), null, isStopped);
     return putBufferToDest(tailBuffer.duplicate(), result);
   }
 
-  private <T extends LlapAllocatorBuffer> void wrapLargeBb(T[] results, ByteBuffer tailBuffer) {
+  private <T extends LlapAllocatorBuffer> void wrapLargeBb(T[] results, ByteBuffer tailBuffer,
+      AtomicBoolean isStopped) {
     // Note: we pass in null factory because we allocate objects here. We could also pass a
     //       per-call factory that would set fileKey; or set it after put.
-    allocator.allocateMultiple(results, allocator.getMaxAllocation(), null);
+    allocator.allocateMultiple(results, allocator.getMaxAllocation(), null, isStopped);
     ByteBuffer src = tailBuffer.duplicate();
     int pos = src.position(), remaining = src.remaining();
     for (int i = 0; i < results.length; ++i) {

http://git-wip-us.apache.org/repos/asf/hive/blob/5b2124b9/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java
index 1e6f3ac..b3179c0 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java
@@ -27,6 +27,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.FutureTask;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.hive.common.io.Allocator.AllocatorOutOfMemoryException;
 import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer;
@@ -58,7 +59,7 @@ public class TestBuddyAllocator {
 
   static class DummyMemoryManager implements MemoryManager {
     @Override
-    public void reserveMemory(long memoryToReserve) {
+    public void reserveMemory(long memoryToReserve, AtomicBoolean isStopped) {
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/5b2124b9/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java
index 6eb2eb5..923042d 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java
@@ -85,7 +85,7 @@ public class TestLowLevelLrfuCachePolicy {
       listLock.unlock();
     }
     // Now try to evict with locked buffer still in the list.
-    mm.reserveMemory(1, false);
+    mm.reserveMemory(1, false, null);
     assertSame(buffer2, et.evicted.get(0));
     unlock(lrfu, buffer1);
   }
@@ -237,7 +237,7 @@ public class TestLowLevelLrfuCachePolicy {
     // Lock the lowest priority buffer; try to evict - we'll evict some other buffer.
     LlapDataBuffer locked = inserted.get(0);
     lock(lrfu, locked);
-    mm.reserveMemory(1, false);
+    mm.reserveMemory(1, false, null);
     LlapDataBuffer evicted = et.evicted.get(0);
     assertNotNull(evicted);
     assertTrue(evicted.isInvalid());
@@ -248,7 +248,7 @@ public class TestLowLevelLrfuCachePolicy {
   // Buffers in test are fakes not linked to cache; notify cache policy explicitly.
   public boolean cache(LowLevelCacheMemoryManager mm,
       LowLevelLrfuCachePolicy lrfu, EvictionTracker et, LlapDataBuffer buffer) {
-    if (mm != null && !mm.reserveMemory(1, false)) {
+    if (mm != null && !mm.reserveMemory(1, false, null)) {
       return false;
     }
     buffer.incRef();
@@ -337,7 +337,7 @@ public class TestLowLevelLrfuCachePolicy {
       lock(lrfu, buf);
     }
     assertEquals(heapSize, m.cacheUsed.get());
-    assertFalse(mm.reserveMemory(1, false));
+    assertFalse(mm.reserveMemory(1, false, null));
     if (!et.evicted.isEmpty()) {
       assertTrue("Got " + et.evicted.get(0), et.evicted.isEmpty());
     }
@@ -362,13 +362,13 @@ public class TestLowLevelLrfuCachePolicy {
     // Evict all blocks.
     et.evicted.clear();
     for (int i = 0; i < inserted.size(); ++i) {
-      assertTrue(mm.reserveMemory(1, false));
+      assertTrue(mm.reserveMemory(1, false, null));
       if (cacheUsed != null) {
         assertEquals(inserted.size(), cacheUsed.get());
       }
     }
     // The map should now be empty.
-    assertFalse(mm.reserveMemory(1, false));
+    assertFalse(mm.reserveMemory(1, false, null));
     if (cacheUsed != null) {
       assertEquals(inserted.size(), cacheUsed.get());
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/5b2124b9/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java
index df20f20..aa9d6ed 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java
@@ -21,10 +21,14 @@ import static org.junit.Assert.*;
 
 import java.nio.ByteBuffer;
 import java.util.Random;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.hive.common.io.DataCache;
 import org.apache.hadoop.hive.common.io.DiskRange;
 import org.apache.hadoop.hive.common.io.DiskRangeList;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority;
 import org.apache.hadoop.hive.llap.io.metadata.MetadataCache;
 import org.apache.hadoop.hive.llap.io.metadata.MetadataCache.LlapBufferOrBuffers;
@@ -75,8 +79,11 @@ public class TestOrcMetadataCache {
   }
 
   private static class DummyMemoryManager implements MemoryManager {
+    private int allocs;
+
     @Override
-    public void reserveMemory(long memoryToReserve) {
+    public void reserveMemory(long memoryToReserve, AtomicBoolean isStopped) {
+      ++allocs;
     }
 
     @Override
@@ -102,11 +109,11 @@ public class TestOrcMetadataCache {
 
     ByteBuffer smallBuffer = ByteBuffer.allocate(MAX_ALLOC - 1);
     rdm.nextBytes(smallBuffer.array());
-    LlapBufferOrBuffers result = cache.putFileMetadata(fileKey1, smallBuffer);
+    LlapBufferOrBuffers result = cache.putFileMetadata(fileKey1, smallBuffer, null, null);
     cache.decRefBuffer(result);
     ByteBuffer cacheBuf = result.getSingleBuffer().getByteBufferDup();
     assertEquals(smallBuffer, cacheBuf);
-    result = cache.putFileMetadata(fileKey1, smallBuffer);
+    result = cache.putFileMetadata(fileKey1, smallBuffer, null, null);
     cache.decRefBuffer(result);
     cacheBuf = result.getSingleBuffer().getByteBufferDup();
     assertEquals(smallBuffer, cacheBuf);
@@ -120,7 +127,7 @@ public class TestOrcMetadataCache {
 
     ByteBuffer largeBuffer = ByteBuffer.allocate((int)(MAX_ALLOC * 2.5));
     rdm.nextBytes(largeBuffer.array());
-    result = cache.putFileMetadata(fileKey1, largeBuffer);
+    result = cache.putFileMetadata(fileKey1, largeBuffer, null, null);
     cache.decRefBuffer(result);
     assertNull(result.getSingleBuffer());
     assertEquals(largeBuffer, extractResultBbs(result));
@@ -162,13 +169,13 @@ public class TestOrcMetadataCache {
     Object fileKey1 = new Object();
 
     // Note: incomplete CBs are always an exact match.
-    cache.putIncompleteCbs(fileKey1, new DiskRange[] { new DiskRangeList(0, 3) }, 0);
+    cache.putIncompleteCbs(fileKey1, new DiskRange[] { new DiskRangeList(0, 3) }, 0, null);
     cp.verifyEquals(1);
     DiskRangeList result = cache.getIncompleteCbs(
         fileKey1, new DiskRangeList(0, 3), 0, gotAllData);
     assertTrue(gotAllData.value);
     verifyResult(result, INCOMPLETE, 0, 3);
-    cache.putIncompleteCbs(fileKey1, new DiskRange[] { new DiskRangeList(5, 6) }, 0);
+    cache.putIncompleteCbs(fileKey1, new DiskRange[] { new DiskRangeList(5, 6) }, 0, null);
     cp.verifyEquals(3);
     DiskRangeList ranges = new DiskRangeList(0, 3);
     ranges.insertAfter(new DiskRangeList(4, 6));

http://git-wip-us.apache.org/repos/asf/hive/blob/5b2124b9/ql/src/java/org/apache/hadoop/hive/llap/LlapCacheAwareFs.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapCacheAwareFs.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapCacheAwareFs.java
index dcb24b80..8370aa6 100644
--- a/ql/src/java/org/apache/hadoop/hive/llap/LlapCacheAwareFs.java
+++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapCacheAwareFs.java
@@ -284,6 +284,7 @@ public class LlapCacheAwareFs extends FileSystem {
             int extraOffsetInChunk = 0;
             if (maxAlloc < chunkLength) {
               largeBuffers = new MemoryBuffer[largeBufCount];
+              // Note: we don't use StoppableAllocator here - this is not on an IO thread.
               allocator.allocateMultiple(largeBuffers, maxAlloc, cache.getDataBufferFactory());
               for (int i = 0; i < largeBuffers.length; ++i) {
                 // By definition here we copy up to the limit of the buffer.
@@ -301,6 +302,7 @@ public class LlapCacheAwareFs extends FileSystem {
             largeBuffers = null;
             if (smallSize > 0) {
               smallBuffer = new MemoryBuffer[1];
+              // Note: we don't use StoppableAllocator here - this is not on an IO thread.
               allocator.allocateMultiple(smallBuffer, smallSize, cache.getDataBufferFactory());
               ByteBuffer bb = smallBuffer[0].getByteBufferRaw();
               copyDiskDataToCacheBuffer(array,

http://git-wip-us.apache.org/repos/asf/hive/blob/5b2124b9/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReader.java
index f6b949e..f3699f9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReader.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.io.orc.encoded;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.orc.StripeInformation;
 import org.apache.hadoop.hive.ql.io.orc.encoded.Reader.OrcEncodedColumnBatch;
@@ -68,4 +69,6 @@ public interface EncodedReader {
   void readIndexStreams(OrcIndex index, StripeInformation stripe,
       List<OrcProto.Stream> streams, boolean[] included, boolean[] sargColumns)
           throws IOException;
+
+  void setStopped(AtomicBoolean isStopped);
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/5b2124b9/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
index 63d1387..1b11e0e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
@@ -25,11 +25,13 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.IdentityHashMap;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.common.Pool;
 import org.apache.hadoop.hive.common.Pool.PoolObjectHelper;
+import org.apache.hadoop.hive.common.io.Allocator;
 import org.apache.hadoop.hive.common.io.DataCache;
 import org.apache.hadoop.hive.common.io.DiskRange;
 import org.apache.hadoop.hive.common.io.DiskRangeList;
@@ -146,6 +148,8 @@ class EncodedReaderImpl implements EncodedReader {
   private final TypeDescription fileSchema;
   private final WriterVersion version;
   private final String tag;
+  private AtomicBoolean isStopped;
+  private StoppableAllocator allocator;
 
   public EncodedReaderImpl(Object fileKey, List<OrcProto.Type> types,
       TypeDescription fileSchema, org.apache.orc.CompressionKind kind, WriterVersion version,
@@ -162,6 +166,8 @@ class EncodedReaderImpl implements EncodedReader {
     this.bufferSize = bufferSize;
     this.rowIndexStride = strideRate;
     this.cacheWrapper = cacheWrapper;
+    Allocator alloc = cacheWrapper.getAllocator();
+    this.allocator = alloc instanceof StoppableAllocator ? (StoppableAllocator) alloc : null;
     this.dataReader = dataReader;
     this.trace = trace;
     this.tag = tag;
@@ -897,8 +903,7 @@ class EncodedReaderImpl implements EncodedReader {
     }
     boolean isAllocated = false;
     try {
-      cacheWrapper.getAllocator().allocateMultiple(targetBuffers, bufferSize,
-          cacheWrapper.getDataBufferFactory());
+      allocateMultiple(targetBuffers, bufferSize);
       isAllocated = true;
     } finally {
       // toDecompress/targetBuffers contents are actually already added to some structures that
@@ -1206,8 +1211,7 @@ class EncodedReaderImpl implements EncodedReader {
       cacheKeys[ix] = chunk; // Relies on the fact that cache does not actually store these.
       ++ix;
     }
-    cacheWrapper.getAllocator().allocateMultiple(targetBuffers,
-        (int)(partCount == 1 ? streamLen : partSize), cacheWrapper.getDataBufferFactory());
+    allocateMultiple(targetBuffers, (int)(partCount == 1 ? streamLen : partSize));
 
     // 4. Now copy the data into cache buffers.
     ix = 0;
@@ -1260,8 +1264,7 @@ class EncodedReaderImpl implements EncodedReader {
     // non-cached. Since we are at the first gap, the previous stuff must be contiguous.
     singleAlloc[0] = null;
     trace.logPartialUncompressedData(partOffset, candidateEnd, true);
-    cacheWrapper.getAllocator().allocateMultiple(
-        singleAlloc, (int)(candidateEnd - partOffset), cacheWrapper.getDataBufferFactory());
+    allocateMultiple(singleAlloc, (int)(candidateEnd - partOffset));
     MemoryBuffer buffer = singleAlloc[0];
     cacheWrapper.reuseBuffer(buffer);
     ByteBuffer dest = buffer.getByteBufferRaw();
@@ -1270,12 +1273,19 @@ class EncodedReaderImpl implements EncodedReader {
     return tcc;
   }
 
+  private void allocateMultiple(MemoryBuffer[] dest, int size) {
+    if (allocator != null) {
+      allocator.allocateMultiple(dest, size, cacheWrapper.getDataBufferFactory(), isStopped);
+    } else {
+      cacheWrapper.getAllocator().allocateMultiple(dest, size, cacheWrapper.getDataBufferFactory());
+    }
+  }
+
   private CacheChunk copyAndReplaceUncompressedToNonCached(
       BufferChunk bc, DataCache cacheWrapper, MemoryBuffer[] singleAlloc) {
     singleAlloc[0] = null;
     trace.logPartialUncompressedData(bc.getOffset(), bc.getEnd(), false);
-    cacheWrapper.getAllocator().allocateMultiple(
-        singleAlloc, bc.getLength(), cacheWrapper.getDataBufferFactory());
+    allocateMultiple(singleAlloc, bc.getLength());
     MemoryBuffer buffer = singleAlloc[0];
     cacheWrapper.reuseBuffer(buffer);
     ByteBuffer dest = buffer.getByteBufferRaw();
@@ -2152,4 +2162,9 @@ class EncodedReaderImpl implements EncodedReader {
         return false;
     }
   }
+
+  @Override
+  public void setStopped(AtomicBoolean isStopped) {
+    this.isStopped = isStopped;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/5b2124b9/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/StoppableAllocator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/StoppableAllocator.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/StoppableAllocator.java
new file mode 100644
index 0000000..0806d78
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/StoppableAllocator.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.orc.encoded;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.hive.common.io.Allocator;
+import org.apache.hadoop.hive.common.io.Allocator.BufferObjectFactory;
+import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer;
+
+public interface StoppableAllocator extends Allocator {
+  /** Variant of allocateMultiple that also takes the caller's isStopped flag. */
+  void allocateMultiple(MemoryBuffer[] dest, int size, BufferObjectFactory factory,
+      AtomicBoolean isStopped) throws AllocatorOutOfMemoryException;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/5b2124b9/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
index f64efe2..8c49056 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
@@ -253,7 +253,7 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
     colsToInclude = ColumnProjectionUtils.getReadColumnIDs(configuration);
     requestedSchema = DataWritableReadSupport
       .getRequestedSchema(indexAccess, columnNamesList, columnTypesList, fileSchema, configuration);
- 
+
     Path path = wrapPathForCache(file, cacheKey, configuration, blocks, cacheTag);
     this.reader = new ParquetFileReader(
       configuration, footer.getFileMetaData(), path, blocks, requestedSchema.getColumns());
@@ -317,7 +317,8 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
       if (LOG.isInfoEnabled()) {
         LOG.info("Caching the footer of length " + footerLength + " for " + cacheKey);
       }
-      footerData = metadataCache.putFileMetadata(cacheKey, footerLength, stream, tag);
+      // Note: we don't pass in isStopped here - this is not on an IO thread.
+      footerData = metadataCache.putFileMetadata(cacheKey, footerLength, stream, tag, null);
       try {
         return ParquetFileReader.readFooter(new ParquetFooterInputFromCache(footerData), filter);
       } finally {

http://git-wip-us.apache.org/repos/asf/hive/blob/5b2124b9/storage-api/src/java/org/apache/hadoop/hive/common/io/FileMetadataCache.java
----------------------------------------------------------------------
diff --git a/storage-api/src/java/org/apache/hadoop/hive/common/io/FileMetadataCache.java b/storage-api/src/java/org/apache/hadoop/hive/common/io/FileMetadataCache.java
index d1da7f5..e4aa888 100644
--- a/storage-api/src/java/org/apache/hadoop/hive/common/io/FileMetadataCache.java
+++ b/storage-api/src/java/org/apache/hadoop/hive/common/io/FileMetadataCache.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.common.io;
 
 
 import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.io.IOException;
 import java.io.InputStream;
 
@@ -39,6 +40,13 @@ public interface FileMetadataCache {
   @Deprecated
   MemoryBufferOrBuffers putFileMetadata(Object fileKey, ByteBuffer tailBuffer);
 
+  @Deprecated
+  MemoryBufferOrBuffers putFileMetadata(
+      Object fileKey, int length, InputStream is, String tag) throws IOException;
+
+  @Deprecated
+  MemoryBufferOrBuffers putFileMetadata(Object fileKey, ByteBuffer tailBuffer, String tag);
+
   /**
    * Releases the buffer returned from getFileMetadata or putFileMetadata method.
    * @param buffer The buffer to release.
@@ -54,8 +62,9 @@ public interface FileMetadataCache {
    * @return The buffer or buffers representing the cached footer.
    *         The caller must decref this buffer when done.
    */
-  MemoryBufferOrBuffers putFileMetadata(
-      Object fileKey, int length, InputStream is, String tag) throws IOException;
+  MemoryBufferOrBuffers putFileMetadata(Object fileKey, ByteBuffer tailBuffer,
+      String tag, AtomicBoolean isStopped);
 
-  MemoryBufferOrBuffers putFileMetadata(Object fileKey, ByteBuffer tailBuffer, String tag);
-} 
\ No newline at end of file
+  MemoryBufferOrBuffers putFileMetadata(Object fileKey, int length,
+      InputStream is, String tag, AtomicBoolean isStopped) throws IOException;
+}