You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2018/01/10 02:40:34 UTC

hive git commit: HIVE-18269 : LLAP: Fast llap io with slow processing pipeline can lead to OOM (Sergey Shelukhin, reviewed by Jason Dere and Prasanth Jayachandran)

Repository: hive
Updated Branches:
  refs/heads/master cc50d62a0 -> e29aea6fa


HIVE-18269 : LLAP: Fast llap io with slow processing pipeline can lead to OOM (Sergey Shelukhin, reviewed by Jason Dere and Prasanth Jayachandran)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/e29aea6f
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/e29aea6f
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/e29aea6f

Branch: refs/heads/master
Commit: e29aea6fa20c1096ac9449ca04669f927a8a7101
Parents: cc50d62
Author: sergey <se...@apache.org>
Authored: Tue Jan 9 17:13:19 2018 -0800
Committer: sergey <se...@apache.org>
Committed: Tue Jan 9 18:40:21 2018 -0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   7 +
 .../hive/llap/io/api/impl/LlapInputFormat.java  |   7 +-
 .../hive/llap/io/api/impl/LlapIoImpl.java       |   2 +-
 .../hive/llap/io/api/impl/LlapRecordReader.java | 214 ++++++++++++-------
 .../llap/io/decode/EncodedDataConsumer.java     |  10 +-
 .../llap/io/decode/OrcColumnVectorProducer.java |   4 +
 .../llap/io/decode/OrcEncodedDataConsumer.java  |   2 +-
 .../llap/io/encoded/OrcEncodedDataReader.java   |   4 +-
 .../llap/io/encoded/SerDeEncodedDataReader.java |  14 +-
 .../hadoop/hive/ql/io/orc/encoded/Consumer.java |  10 +-
 .../ql/io/orc/encoded/EncodedReaderImpl.java    |  51 +++--
 11 files changed, 206 insertions(+), 119 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/e29aea6f/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 6529da6..1a2c22d 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -3185,6 +3185,13 @@ public class HiveConf extends Configuration {
         "MR LineRecordRedader into LLAP cache, if this feature is enabled. Safety flag."),
     LLAP_ORC_ENABLE_TIME_COUNTERS("hive.llap.io.orc.time.counters", true,
         "Whether to enable time counters for LLAP IO layer (time spent in HDFS, etc.)"),
+    LLAP_IO_VRB_QUEUE_LIMIT_BASE("hive.llap.io.vrb.queue.limit.base", 10000,
+        "The default queue size for VRBs produced by a LLAP IO thread when the processing is\n" +
+        "slower than the IO. The actual queue size is set per fragment, and is adjusted down\n" +
+        "from the base, depending on the schema."),
+    LLAP_IO_VRB_QUEUE_LIMIT_MIN("hive.llap.io.vrb.queue.limit.min", 10,
+        "The minimum queue size for VRBs produced by a LLAP IO thread when the processing is\n" +
+        "slower than the IO (used when determining the size from base size)."),
     LLAP_AUTO_ALLOW_UBER("hive.llap.auto.allow.uber", false,
         "Whether or not to allow the planner to run vertices in the AM."),
     LLAP_AUTO_ENFORCE_TREE("hive.llap.auto.enforce.tree", true,

http://git-wip-us.apache.org/repos/asf/hive/blob/e29aea6f/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
index 4dc107a..d51d3fc 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
@@ -74,11 +74,14 @@ public class LlapInputFormat implements InputFormat<NullWritable, VectorizedRowB
   final ExecutorService executor;
   private final String hostName;
 
+  private final Configuration daemonConf;
+
   @SuppressWarnings({ "rawtypes", "unchecked" })
   LlapInputFormat(InputFormat sourceInputFormat, Deserializer sourceSerDe,
-      ColumnVectorProducer cvp, ExecutorService executor) {
+      ColumnVectorProducer cvp, ExecutorService executor, Configuration daemonConf) {
     this.executor = executor;
     this.cvp = cvp;
+    this.daemonConf = daemonConf;
     this.sourceInputFormat = sourceInputFormat;
     this.sourceASC = (sourceInputFormat instanceof AvoidSplitCombination)
         ? (AvoidSplitCombination)sourceInputFormat : null;
@@ -100,7 +103,7 @@ public class LlapInputFormat implements InputFormat<NullWritable, VectorizedRowB
       List<Integer> includedCols = ColumnProjectionUtils.isReadAllColumns(job)
           ? null : ColumnProjectionUtils.getReadColumnIDs(job);
       LlapRecordReader rr = LlapRecordReader.create(job, fileSplit, includedCols, hostName,
-          cvp, executor, sourceInputFormat, sourceSerDe, reporter);
+          cvp, executor, sourceInputFormat, sourceSerDe, reporter, daemonConf);
       if (rr == null) {
         // Reader-specific incompatibility like SMB or schema evolution.
         return sourceInputFormat.getRecordReader(split, job, reporter);

http://git-wip-us.apache.org/repos/asf/hive/blob/e29aea6f/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
index 77c8ade..eba84c3 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
@@ -216,7 +216,7 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch> {
       LOG.warn("LLAP encode is disabled; cannot use for " + sourceInputFormat.getClass());
       return null;
     }
-    return new LlapInputFormat(sourceInputFormat, sourceSerDe, cvp, executor);
+    return new LlapInputFormat(sourceInputFormat, sourceSerDe, cvp, executor, daemonConf);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/e29aea6f/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
index 52a9c23..8ec5d97 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
@@ -20,11 +20,14 @@ package org.apache.hadoop.hive.llap.io.api.impl;
 
 import java.util.ArrayList;
 import java.io.IOException;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.llap.ConsumerFeedback;
@@ -50,6 +53,9 @@ import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.InputFormat;
@@ -68,6 +74,7 @@ import com.google.common.collect.Lists;
 class LlapRecordReader
     implements RecordReader<NullWritable, VectorizedRowBatch>, Consumer<ColumnVectorBatch> {
   private static final Logger LOG = LoggerFactory.getLogger(LlapRecordReader.class);
+  private static final Object DONE_OBJECT = new Object();
 
   private final FileSplit split;
   private List<Integer> columnIds;
@@ -76,15 +83,15 @@ class LlapRecordReader
   private final VectorizedRowBatchCtx rbCtx;
   private final Object[] partitionValues;
 
-  private final LinkedList<ColumnVectorBatch> pendingData = new LinkedList<ColumnVectorBatch>();
+  private final LinkedBlockingQueue<Object> queue;
+  private final AtomicReference<Throwable> pendingError = new AtomicReference<>(null);
+
+  /** Vector that is currently being processed by our user. */
   private ColumnVectorBatch lastCvb = null;
   private boolean isFirst = true;
   private int maxQueueSize = 0;
 
-  private Throwable pendingError = null;
-  /** Vector that is currently being processed by our user. */
-  private boolean isDone = false;
-  private final boolean isClosed = false;
+  private boolean isClosed = false, isInterrupted = false;
   private final ConsumerFeedback<ColumnVectorBatch> feedback;
   private final QueryFragmentCounters counters;
   private long firstReturnTime;
@@ -101,12 +108,13 @@ class LlapRecordReader
    */
   public static LlapRecordReader create(JobConf job, FileSplit split, List<Integer> includedCols,
       String hostName, ColumnVectorProducer cvp, ExecutorService executor,
-      InputFormat<?, ?> sourceInputFormat, Deserializer sourceSerDe, Reporter reporter)
+      InputFormat<?, ?> sourceInputFormat, Deserializer sourceSerDe, Reporter reporter,
+      Configuration daemonConf)
           throws IOException, HiveException {
     MapWork mapWork = findMapWork(job);
     if (mapWork == null) return null; // No compatible MapWork.
     LlapRecordReader rr = new LlapRecordReader(mapWork, job, split, includedCols, hostName,
-        cvp, executor, sourceInputFormat, sourceSerDe, reporter);
+        cvp, executor, sourceInputFormat, sourceSerDe, reporter, daemonConf);
     if (!rr.checkOrcSchemaEvolution()) {
       rr.close();
       return null;
@@ -117,10 +125,11 @@ class LlapRecordReader
   private LlapRecordReader(MapWork mapWork, JobConf job, FileSplit split,
       List<Integer> includedCols, String hostName, ColumnVectorProducer cvp,
       ExecutorService executor, InputFormat<?, ?> sourceInputFormat, Deserializer sourceSerDe,
-      Reporter reporter) throws IOException, HiveException {
+      Reporter reporter, Configuration daemonConf) throws IOException, HiveException {
     this.executor = executor;
     this.jobConf = job;
     this.split = split;
+
     this.sarg = ConvertAstToSearchArg.createFromConf(job);
     this.columnNames = ColumnProjectionUtils.getReadColumnNames(job);
     final String fragmentId = LlapTezUtils.getFragmentId(job);
@@ -139,6 +148,18 @@ class LlapRecordReader
     this.counters = new QueryFragmentCounters(job, taskCounters);
     this.counters.setDesc(QueryFragmentCounters.Desc.MACHINE, hostName);
 
+    VectorizedRowBatchCtx ctx = mapWork.getVectorizedRowBatchCtx();
+    rbCtx = ctx != null ? ctx : LlapInputFormat.createFakeVrbCtx(mapWork);
+
+    // Note: columnIds below makes additional changes for ACID. Don't use this var directly.
+    if (includedCols == null) {
+      // Assume including everything means the VRB will have everything.
+      includedCols = new ArrayList<>(rbCtx.getRowColumnTypeInfos().length);
+      for (int i = 0; i < rbCtx.getRowColumnTypeInfos().length; ++i) {
+        includedCols.add(i);
+      }
+    }
+
     isAcidScan = HiveConf.getBoolVar(jobConf, ConfVars.HIVE_ACID_TABLE_SCAN);
     TypeDescription schema = OrcInputFormat.getDesiredRowTypeDescr(
         job, isAcidScan, Integer.MAX_VALUE);
@@ -157,15 +178,12 @@ class LlapRecordReader
       this.columnCount = columnIds.size();
     }
 
-    VectorizedRowBatchCtx ctx = mapWork.getVectorizedRowBatchCtx();
-    rbCtx = ctx != null ? ctx : LlapInputFormat.createFakeVrbCtx(mapWork);
-    if (includedCols == null) {
-      // Assume including everything means the VRB will have everything.
-      includedCols = new ArrayList<>(rbCtx.getRowColumnTypeInfos().length);
-      for (int i = 0; i < rbCtx.getRowColumnTypeInfos().length; ++i) {
-        includedCols.add(i);
-      }
-    }
+    int queueLimitBase = getQueueVar(ConfVars.LLAP_IO_VRB_QUEUE_LIMIT_BASE, job, daemonConf);
+    int queueLimitMin =  getQueueVar(ConfVars.LLAP_IO_VRB_QUEUE_LIMIT_MIN, job, daemonConf);
+    int limit = determineQueueLimit(queueLimitBase, queueLimitMin, rbCtx.getRowColumnTypeInfos());
+    LOG.info("Queue limit for LlapRecordReader is " + limit);
+    this.queue = new LinkedBlockingQueue<>(limit);
+
 
     int partitionColumnCount = rbCtx.getPartitionColumnCount();
     if (partitionColumnCount > 0) {
@@ -181,6 +199,48 @@ class LlapRecordReader
         mapWork.getPathToPartitionInfo());
   }
 
+  private static int getQueueVar(ConfVars var, JobConf jobConf, Configuration daemonConf) {
+    // Check job config for overrides, otherwise use the default server value.
+    int jobVal = jobConf.getInt(var.varname, -1);
+    return (jobVal != -1) ? jobVal : HiveConf.getIntVar(daemonConf, var);
+  }
+
+  // For queue size estimation purposes, we assume all columns have weight one, and the following
+  // types are counted as multiple columns. This is very primitive; if we wanted to make it better,
+  // we'd increase the base limit, and adjust dynamically based on IO and processing perf delays.
+  private static final int COL_WEIGHT_COMPLEX = 16, COL_WEIGHT_HIVEDECIMAL = 4,
+      COL_WEIGHT_STRING = 8;
+  private static int determineQueueLimit(
+      int queueLimitBase, int queueLimitMin, TypeInfo[] typeInfos) {
+    // If the values are equal, the queue limit is fixed.
+    if (queueLimitBase == queueLimitMin) return queueLimitBase;
+    // If there are no columns (projection only join?) just assume no weight.
+    if (typeInfos == null || typeInfos.length == 0) return queueLimitBase;
+    double totalWeight = 0;
+    for (TypeInfo ti : typeInfos) {
+      int colWeight = 1;
+      if (ti.getCategory() != Category.PRIMITIVE) {
+        colWeight = COL_WEIGHT_COMPLEX;
+      } else {
+        PrimitiveTypeInfo pti = (PrimitiveTypeInfo)ti;
+        switch (pti.getPrimitiveCategory()) {
+        case BINARY:
+        case CHAR:
+        case VARCHAR:
+        case STRING:
+          colWeight = COL_WEIGHT_STRING;
+        case DECIMAL:
+          colWeight = COL_WEIGHT_HIVEDECIMAL;
+        default:
+          colWeight = 1;
+        }
+      }
+      totalWeight += colWeight;
+    }
+    return Math.max(queueLimitMin, (int)(queueLimitBase / totalWeight));
+  }
+
+
   private static MapWork findMapWork(JobConf job) throws HiveException {
     String inputName = job.get(Utilities.INPUT_NAME, null);
     if (LOG.isDebugEnabled()) {
@@ -259,6 +319,7 @@ class LlapRecordReader
     } catch (InterruptedException e) {
       // Query might have been canceled. Stop the background processing.
       feedback.stop();
+      isInterrupted = true; // In case we are stuck in consume.
       throw new IOException(e);
     }
     if (cvb == null) {
@@ -340,7 +401,11 @@ class LlapRecordReader
     public void uncaughtException(final Thread t, final Throwable e) {
       LlapIoImpl.LOG.error("Unhandled error from reader thread. threadName: {} threadId: {}" +
           " Message: {}", t.getName(), t.getId(), e.getMessage());
-      setError(e);
+      try {
+        setError(e);
+      } catch (InterruptedException e1) {
+        LOG.info("IOUncaughtExceptionHandler interrupted; ignoring");
+      }
     }
   }
 
@@ -349,41 +414,40 @@ class LlapRecordReader
     if (!isFirst) {
       feedback.returnData(lastCvb);
     }
-    synchronized (pendingData) {
-      // We are waiting for next block. Either we will get it, or be told we are done.
-      boolean doLogBlocking = LlapIoImpl.LOG.isTraceEnabled() && isNothingToReport();
-      if (doLogBlocking) {
-        LlapIoImpl.LOG.trace("next will block");
-      }
-      boolean didWait = false;
-      while (isNothingToReport()) {
-        didWait = true;
-        pendingData.wait(100);
-      }
-      // If we didn't wait, check for interruption explicitly.
-      // TODO: given that we also want a queue limit, might make sense to rely on a blocking queue;
-      //       or a more advanced lock. But do double check that they will ALWAYS check interrupt.
-      //       Hive operators don't, so if we don't either, everything goes to hell.
-      if (!didWait && Thread.interrupted()) {
-        throw new InterruptedException("Thread interrupted");
-      }
-      if (doLogBlocking) {
-        LlapIoImpl.LOG.trace("next is unblocked");
-      }
-      rethrowErrorIfAny();
-      maxQueueSize = Math.max(pendingData.size(), maxQueueSize);
-      lastCvb = pendingData.poll();
+
+    // We are waiting for next block. Either we will get it, or be told we are done.
+    int queueSize = queue.size();
+    maxQueueSize = Math.max(queueSize, maxQueueSize);
+    boolean doLogBlocking = LlapIoImpl.LOG.isTraceEnabled() && queueSize == 0;
+    if (doLogBlocking) {
+      LlapIoImpl.LOG.trace("next will block");
     }
-    if (LlapIoImpl.LOG.isTraceEnabled() && lastCvb != null) {
+    // We rely on the fact that poll() checks interrupt even when there's something in the queue.
+    // If the structure is replaced with smth that doesn't, we MUST check interrupt here because
+    // Hive operators rely on recordreader to handle task interruption, and unlike most RRs we
+    // do not do any blocking IO ops on this thread.
+    Object next = null;
+    do {
+      rethrowErrorIfAny(pendingError.get()); // Best-effort check; see the comment in the method.
+      next = queue.poll(100, TimeUnit.MILLISECONDS);
+    } while (next == null);
+    if (doLogBlocking) {
+      LlapIoImpl.LOG.trace("next is unblocked");
+    }
+    if (next == DONE_OBJECT) {
+      return null; // We are done.
+    }
+    if (next instanceof Throwable) {
+      rethrowErrorIfAny((Throwable) next);
+      throw new AssertionError("Unreachable");
+    }
+    lastCvb = (ColumnVectorBatch) next;
+    if (LlapIoImpl.LOG.isTraceEnabled()) {
       LlapIoImpl.LOG.trace("Processing will receive vector {}", lastCvb);
     }
     return lastCvb;
   }
 
-  private boolean isNothingToReport() {
-    return !isDone && pendingData.isEmpty() && pendingError == null;
-  }
-
   @Override
   public NullWritable createKey() {
     return NullWritable.get();
@@ -402,17 +466,21 @@ class LlapRecordReader
   @Override
   public void close() throws IOException {
     if (LlapIoImpl.LOG.isTraceEnabled()) {
-      LlapIoImpl.LOG.trace("close called; closed {}, done {}, err {}, pending {}",
-          isClosed, isDone, pendingError, pendingData.size());
+      LlapIoImpl.LOG.trace("close called; closed {}, interrupted {}, err {}, pending {}",
+          isClosed, isInterrupted, pendingError.get(), queue.size());
     }
     LlapIoImpl.LOG.info("Maximum queue length observed " + maxQueueSize);
     LlapIoImpl.LOG.info("Llap counters: {}" ,counters); // This is where counters are logged!
     feedback.stop();
-    rethrowErrorIfAny();
+    isClosed = true;
+    rethrowErrorIfAny(pendingError.get());
     MDC.clear();
   }
 
-  private void rethrowErrorIfAny() throws IOException {
+  private static void rethrowErrorIfAny(Throwable pendingError) throws IOException {
+    // This is called either with an error that was queued, or an error that was set into the
+    // atomic reference in this class. The latter is best-effort and is used to opportunistically
+    // skip processing of a long queue when the error happens.
     if (pendingError == null) return;
     if (pendingError instanceof IOException) {
       throw (IOException)pendingError;
@@ -421,43 +489,37 @@ class LlapRecordReader
   }
 
   @Override
-  public void setDone() {
+  public void setDone() throws InterruptedException {
     if (LlapIoImpl.LOG.isDebugEnabled()) {
-      LlapIoImpl.LOG.debug("setDone called; closed {}, done {}, err {}, pending {}",
-          isClosed, isDone, pendingError, pendingData.size());
-    }
-    synchronized (pendingData) {
-      isDone = true;
-      pendingData.notifyAll();
+      LlapIoImpl.LOG.debug("setDone called; closed {}, interrupted {}, err {}, pending {}",
+          isClosed, isInterrupted, pendingError.get(), queue.size());
     }
+    enqueueInternal(DONE_OBJECT);
   }
 
   @Override
-  public void consumeData(ColumnVectorBatch data) {
+  public void consumeData(ColumnVectorBatch data) throws InterruptedException {
     if (LlapIoImpl.LOG.isTraceEnabled()) {
-      LlapIoImpl.LOG.trace("consume called; closed {}, done {}, err {}, pending {}",
-          isClosed, isDone, pendingError, pendingData.size());
-    }
-    synchronized (pendingData) {
-      if (isClosed) {
-        return;
-      }
-      pendingData.add(data);
-      pendingData.notifyAll();
+      LlapIoImpl.LOG.trace("consume called; closed {}, interrupted {}, err {}, pending {}",
+          isClosed, isInterrupted, pendingError.get(), queue.size());
     }
+    enqueueInternal(data);
   }
 
   @Override
-  public void setError(Throwable t) {
+  public void setError(Throwable t) throws InterruptedException {
     counters.incrCounter(LlapIOCounters.NUM_ERRORS);
-    LlapIoImpl.LOG.debug("setError called; current state closed {}, done {}, err {}, pending {}",
-        isClosed, isDone, pendingError, pendingData.size());
+    LlapIoImpl.LOG.debug("setError called; closed {}, interrupted {},  err {}, pending {}",
+        isClosed, isInterrupted, pendingError.get(), queue.size());
     LlapIoImpl.LOG.warn("setError called with an error", t);
     assert t != null;
-    synchronized (pendingData) {
-      pendingError = t;
-      pendingData.notifyAll();
-    }
+    pendingError.compareAndSet(null, t);
+    enqueueInternal(t);
+  }
+
+  private void enqueueInternal(Object o) throws InterruptedException {
+    // We need to loop here to handle the case where consumer goes away.
+    do {} while (!isClosed && !isInterrupted && !queue.offer(o, 100, TimeUnit.MILLISECONDS));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/e29aea6f/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java
index 1f3f4d2..3f01a17 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java
@@ -39,7 +39,7 @@ public abstract class EncodedDataConsumer<BatchKey, BatchType extends EncodedCol
   private Callable<Void> readCallable;
   private final LlapDaemonIOMetrics ioMetrics;
   // Note that the pool is per EDC - within EDC, CVBs are expected to have the same schema.
-  private final static int CVB_POOL_SIZE = 128;
+  private static final int CVB_POOL_SIZE = 128;
   protected final FixedSizedObjectPool<ColumnVectorBatch> cvbPool;
 
   public EncodedDataConsumer(Consumer<ColumnVectorBatch> consumer, final int colCount,
@@ -71,7 +71,7 @@ public abstract class EncodedDataConsumer<BatchKey, BatchType extends EncodedCol
   }
 
   @Override
-  public void consumeData(BatchType data) {
+  public void consumeData(BatchType data) throws InterruptedException {
     if (isStopped) {
       returnSourceData(data);
       return;
@@ -100,15 +100,15 @@ public abstract class EncodedDataConsumer<BatchKey, BatchType extends EncodedCol
   }
 
   protected abstract void decodeBatch(BatchType batch,
-      Consumer<ColumnVectorBatch> downstreamConsumer);
+      Consumer<ColumnVectorBatch> downstreamConsumer) throws InterruptedException;
 
   @Override
-  public void setDone() {
+  public void setDone() throws InterruptedException {
     downstreamConsumer.setDone();
   }
 
   @Override
-  public void setError(Throwable t) {
+  public void setError(Throwable t) throws InterruptedException {
     downstreamConsumer.setError(t);
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/e29aea6f/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
index 373af76..7939375 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
@@ -79,6 +79,10 @@ public class OrcColumnVectorProducer implements ColumnVectorProducer {
     this.tracePool = tracePool;
   }
 
+  public Configuration getConf() {
+    return conf;
+  }
+
   @Override
   public ReadPipeline createReadPipeline(
       Consumer<ColumnVectorBatch> consumer, FileSplit split, List<Integer> columnIds,

http://git-wip-us.apache.org/repos/asf/hive/blob/e29aea6f/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
index 2930497..afb7da7 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
@@ -101,7 +101,7 @@ public class OrcEncodedDataConsumer
 
   @Override
   protected void decodeBatch(OrcEncodedColumnBatch batch,
-      Consumer<ColumnVectorBatch> downstreamConsumer) {
+      Consumer<ColumnVectorBatch> downstreamConsumer) throws InterruptedException {
     long startTime = counters.startTimeCounter();
     int currentStripeIndex = batch.getBatchKey().stripeIx;
 

http://git-wip-us.apache.org/repos/asf/hive/blob/e29aea6f/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
index 21f90a7..1e0eccf 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
@@ -275,7 +275,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
     });
   }
 
-  protected Void performDataRead() throws IOException {
+  protected Void performDataRead() throws IOException, InterruptedException {
     long startTime = counters.startTimeCounter();
     LlapIoImpl.LOG.info("Processing data for file {}: {}", fileKey, split.getPath());
     if (processStop()) {
@@ -431,7 +431,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
     return null;
   }
 
-  private void handleReaderError(long startTime, Throwable t) {
+  private void handleReaderError(long startTime, Throwable t) throws InterruptedException {
     recordReaderTime(startTime);
     consumer.setError(t);
     trace.dumpLog(LOG);

http://git-wip-us.apache.org/repos/asf/hive/blob/e29aea6f/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
index 4b0a1ce..2626f3e 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
@@ -657,7 +657,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
     }
   }
 
-  protected Void performDataRead() throws IOException {
+  protected Void performDataRead() throws IOException, InterruptedException {
     boolean isOk = false;
     try {
       try {
@@ -760,7 +760,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
     }
   }
 
-  public Boolean readFileWithCache(long startTime) throws IOException {
+  public Boolean readFileWithCache(long startTime) throws IOException, InterruptedException {
     if (fileKey == null) return false;
     BooleanRef gotAllData = new BooleanRef();
     long endOfSplit = split.getStart() + split.getLength();
@@ -820,7 +820,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
   }
 
   public boolean processOneFileSplit(FileSplit split, long startTime,
-      Ref<Integer> stripeIxRef, StripeData slice) throws IOException {
+      Ref<Integer> stripeIxRef, StripeData slice) throws IOException, InterruptedException {
     LlapIoImpl.LOG.info("Processing one split {" + split.getPath() + ", "
         + split.getStart() + ", " + split.getLength() + "}");
     if (LlapIoImpl.CACHE_LOGGER.isTraceEnabled()) {
@@ -920,7 +920,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
   }
 
   private boolean processOneSlice(CacheWriter.CacheStripeData diskData, boolean[] splitIncludes,
-      int stripeIx, StripeData cacheData, long startTime) throws IOException {
+      int stripeIx, StripeData cacheData, long startTime) throws IOException, InterruptedException {
     logProcessOneSlice(stripeIx, diskData, cacheData);
 
     ColumnEncoding[] cacheEncodings = cacheData == null ? null : cacheData.getEncodings();
@@ -1001,8 +1001,8 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
 
 
   /** Unlike the other overload of processOneSlice, doesn't cache data. */
-  private boolean processOneSlice(Vectors diskData, boolean[] splitIncludes,
-      int stripeIx, StripeData cacheData, long startTime) throws IOException {
+  private boolean processOneSlice(Vectors diskData, boolean[] splitIncludes, int stripeIx,
+      StripeData cacheData, long startTime) throws IOException, InterruptedException {
     if (diskData == null) {
       throw new AssertionError(); // The other overload should have been used.
     }
@@ -1601,7 +1601,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
   }
 
   private boolean sendEcbToConsumer(OrcEncodedColumnBatch ecb,
-      boolean hasCachedSlice, CacheWriter.CacheStripeData diskData) {
+      boolean hasCachedSlice, CacheWriter.CacheStripeData diskData) throws InterruptedException {
     if (ecb == null) { // This basically means stop has been called.
       cleanup(true);
       return false;

http://git-wip-us.apache.org/repos/asf/hive/blob/e29aea6f/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Consumer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Consumer.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Consumer.java
index 43c5647..f5deff0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Consumer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Consumer.java
@@ -22,9 +22,9 @@ package org.apache.hadoop.hive.ql.io.orc.encoded;
  */
 public interface Consumer<T> {
   /** Some data has been produced. */
-  public void consumeData(T data);
-  /** No more data will be produced; done */
-  public void setDone();
-  /** No more data will be produced; error during production */
-  public void setError(Throwable t);
+  public void consumeData(T data) throws InterruptedException;
+  /** No more data will be produced; done. */
+  public void setDone() throws InterruptedException;
+  /** No more data will be produced; error during production. */
+  public void setError(Throwable t) throws InterruptedException;
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/e29aea6f/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
index 29cef30..8c37a65 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
@@ -353,7 +353,12 @@ class EncodedReaderImpl implements EncodedReader {
       if (hasIndexOnlyCols && (rgs == null)) {
         OrcEncodedColumnBatch ecb = POOLS.ecbPool.take();
         ecb.init(fileKey, stripeIx, OrcEncodedColumnBatch.ALL_RGS, included.length);
-        consumer.consumeData(ecb);
+        try {
+          consumer.consumeData(ecb);
+        } catch (InterruptedException e) {
+          LOG.error("IO thread interrupted while queueing data");
+          throw new IOException(e);
+        }
       } else {
         LOG.warn("Nothing to read for stripe [" + stripe + "]");
       }
@@ -466,15 +471,17 @@ class EncodedReaderImpl implements EncodedReader {
           hasErrorForEcb = false;
         } finally {
           if (hasErrorForEcb) {
-            try {
-              releaseEcbRefCountsOnError(ecb);
-            } catch (Throwable t) {
-              LOG.error("Error during the cleanup of an error; ignoring", t);
-            }
+            releaseEcbRefCountsOnError(ecb);
           }
         }
-        // After this, the non-initial refcounts are the responsibility of the consumer.
-        consumer.consumeData(ecb);
+        try {
+          consumer.consumeData(ecb);
+          // After this, the non-initial refcounts are the responsibility of the consumer.
+        } catch (InterruptedException e) {
+          LOG.error("IO thread interrupted while queueing data");
+          releaseEcbRefCountsOnError(ecb);
+          throw new IOException(e);
+        }
       }
 
       if (isTracingEnabled) {
@@ -584,20 +591,24 @@ class EncodedReaderImpl implements EncodedReader {
   }
 
   private void releaseEcbRefCountsOnError(OrcEncodedColumnBatch ecb) {
-    if (isTracingEnabled) {
-      LOG.trace("Unlocking the batch not sent to consumer, on error");
-    }
-    // We cannot send the ecb to consumer. Discard whatever is already there.
-    for (int colIx = 0; colIx < ecb.getTotalColCount(); ++colIx) {
-      if (!ecb.hasData(colIx)) continue;
-      ColumnStreamData[] datas = ecb.getColumnData(colIx);
-      for (ColumnStreamData data : datas) {
-        if (data == null || data.decRef() != 0) continue;
-        for (MemoryBuffer buf : data.getCacheBuffers()) {
-          if (buf == null) continue;
-          cacheWrapper.releaseBuffer(buf);
+    try {
+      if (isTracingEnabled) {
+        LOG.trace("Unlocking the batch not sent to consumer, on error");
+      }
+      // We cannot send the ecb to consumer. Discard whatever is already there.
+      for (int colIx = 0; colIx < ecb.getTotalColCount(); ++colIx) {
+        if (!ecb.hasData(colIx)) continue;
+        ColumnStreamData[] datas = ecb.getColumnData(colIx);
+        for (ColumnStreamData data : datas) {
+          if (data == null || data.decRef() != 0) continue;
+          for (MemoryBuffer buf : data.getCacheBuffers()) {
+            if (buf == null) continue;
+            cacheWrapper.releaseBuffer(buf);
+          }
         }
       }
+    } catch (Throwable t) {
+      LOG.error("Error during the cleanup of an error; ignoring", t);
     }
   }