Posted to commits@hive.apache.org by se...@apache.org on 2018/01/10 02:40:34 UTC
hive git commit: HIVE-18269 : LLAP: Fast llap io with slow processing pipeline can lead to OOM (Sergey Shelukhin, reviewed by Jason Dere and Prasanth Jayachandran)
Repository: hive
Updated Branches:
refs/heads/master cc50d62a0 -> e29aea6fa
HIVE-18269 : LLAP: Fast llap io with slow processing pipeline can lead to OOM (Sergey Shelukhin, reviewed by Jason Dere and Prasanth Jayachandran)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/e29aea6f
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/e29aea6f
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/e29aea6f
Branch: refs/heads/master
Commit: e29aea6fa20c1096ac9449ca04669f927a8a7101
Parents: cc50d62
Author: sergey <se...@apache.org>
Authored: Tue Jan 9 17:13:19 2018 -0800
Committer: sergey <se...@apache.org>
Committed: Tue Jan 9 18:40:21 2018 -0800
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 7 +
.../hive/llap/io/api/impl/LlapInputFormat.java | 7 +-
.../hive/llap/io/api/impl/LlapIoImpl.java | 2 +-
.../hive/llap/io/api/impl/LlapRecordReader.java | 214 ++++++++++++-------
.../llap/io/decode/EncodedDataConsumer.java | 10 +-
.../llap/io/decode/OrcColumnVectorProducer.java | 4 +
.../llap/io/decode/OrcEncodedDataConsumer.java | 2 +-
.../llap/io/encoded/OrcEncodedDataReader.java | 4 +-
.../llap/io/encoded/SerDeEncodedDataReader.java | 14 +-
.../hadoop/hive/ql/io/orc/encoded/Consumer.java | 10 +-
.../ql/io/orc/encoded/EncodedReaderImpl.java | 51 +++--
11 files changed, 206 insertions(+), 119 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/e29aea6f/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 6529da6..1a2c22d 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -3185,6 +3185,13 @@ public class HiveConf extends Configuration {
"MR LineRecordRedader into LLAP cache, if this feature is enabled. Safety flag."),
LLAP_ORC_ENABLE_TIME_COUNTERS("hive.llap.io.orc.time.counters", true,
"Whether to enable time counters for LLAP IO layer (time spent in HDFS, etc.)"),
+ LLAP_IO_VRB_QUEUE_LIMIT_BASE("hive.llap.io.vrb.queue.limit.base", 10000,
+ "The default queue size for VRBs produced by a LLAP IO thread when the processing is\n" +
+ "slower than the IO. The actual queue size is set per fragment, and is adjusted down\n" +
+ "from the base, depending on the schema."),
+ LLAP_IO_VRB_QUEUE_LIMIT_MIN("hive.llap.io.vrb.queue.limit.min", 10,
+ "The minimum queue size for VRBs produced by a LLAP IO thread when the processing is\n" +
+ "slower than the IO (used when determining the size from base size)."),
LLAP_AUTO_ALLOW_UBER("hive.llap.auto.allow.uber", false,
"Whether or not to allow the planner to run vertices in the AM."),
LLAP_AUTO_ENFORCE_TREE("hive.llap.auto.enforce.tree", true,
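The two new settings above can also be overridden per query through the job configuration; getQueueVar() in LlapRecordReader (below) prefers a job-level value over the daemon default. A minimal sketch of such an override, with purely illustrative values:

import org.apache.hadoop.mapred.JobConf;

public class VrbQueueLimitOverride {
  public static void main(String[] args) {
    JobConf job = new JobConf();
    // Hypothetical per-job values; the daemon defaults are 10000 (base) and 10 (min).
    job.setInt("hive.llap.io.vrb.queue.limit.base", 4000);
    job.setInt("hive.llap.io.vrb.queue.limit.min", 20);
    // Mirrors getQueueVar(): -1 means "not set in the job conf", so fall back.
    int base = job.getInt("hive.llap.io.vrb.queue.limit.base", -1);
    System.out.println(base != -1 ? base : 10000);
  }
}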
http://git-wip-us.apache.org/repos/asf/hive/blob/e29aea6f/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
index 4dc107a..d51d3fc 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
@@ -74,11 +74,14 @@ public class LlapInputFormat implements InputFormat<NullWritable, VectorizedRowB
final ExecutorService executor;
private final String hostName;
+ private final Configuration daemonConf;
+
@SuppressWarnings({ "rawtypes", "unchecked" })
LlapInputFormat(InputFormat sourceInputFormat, Deserializer sourceSerDe,
- ColumnVectorProducer cvp, ExecutorService executor) {
+ ColumnVectorProducer cvp, ExecutorService executor, Configuration daemonConf) {
this.executor = executor;
this.cvp = cvp;
+ this.daemonConf = daemonConf;
this.sourceInputFormat = sourceInputFormat;
this.sourceASC = (sourceInputFormat instanceof AvoidSplitCombination)
? (AvoidSplitCombination)sourceInputFormat : null;
@@ -100,7 +103,7 @@ public class LlapInputFormat implements InputFormat<NullWritable, VectorizedRowB
List<Integer> includedCols = ColumnProjectionUtils.isReadAllColumns(job)
? null : ColumnProjectionUtils.getReadColumnIDs(job);
LlapRecordReader rr = LlapRecordReader.create(job, fileSplit, includedCols, hostName,
- cvp, executor, sourceInputFormat, sourceSerDe, reporter);
+ cvp, executor, sourceInputFormat, sourceSerDe, reporter, daemonConf);
if (rr == null) {
// Reader-specific incompatibility like SMB or schema evolution.
return sourceInputFormat.getRecordReader(split, job, reporter);
http://git-wip-us.apache.org/repos/asf/hive/blob/e29aea6f/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
index 77c8ade..eba84c3 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
@@ -216,7 +216,7 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch> {
LOG.warn("LLAP encode is disabled; cannot use for " + sourceInputFormat.getClass());
return null;
}
- return new LlapInputFormat(sourceInputFormat, sourceSerDe, cvp, executor);
+ return new LlapInputFormat(sourceInputFormat, sourceSerDe, cvp, executor, daemonConf);
}
@Override
http://git-wip-us.apache.org/repos/asf/hive/blob/e29aea6f/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
index 52a9c23..8ec5d97 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
@@ -20,11 +20,14 @@ package org.apache.hadoop.hive.llap.io.api.impl;
import java.util.ArrayList;
import java.io.IOException;
-import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.llap.ConsumerFeedback;
@@ -50,6 +53,9 @@ import org.apache.hadoop.hive.ql.plan.BaseWork;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputFormat;
@@ -68,6 +74,7 @@ import com.google.common.collect.Lists;
class LlapRecordReader
implements RecordReader<NullWritable, VectorizedRowBatch>, Consumer<ColumnVectorBatch> {
private static final Logger LOG = LoggerFactory.getLogger(LlapRecordReader.class);
+ private static final Object DONE_OBJECT = new Object();
private final FileSplit split;
private List<Integer> columnIds;
@@ -76,15 +83,15 @@ class LlapRecordReader
private final VectorizedRowBatchCtx rbCtx;
private final Object[] partitionValues;
- private final LinkedList<ColumnVectorBatch> pendingData = new LinkedList<ColumnVectorBatch>();
+ private final LinkedBlockingQueue<Object> queue;
+ private final AtomicReference<Throwable> pendingError = new AtomicReference<>(null);
+
+ /** Vector that is currently being processed by our user. */
private ColumnVectorBatch lastCvb = null;
private boolean isFirst = true;
private int maxQueueSize = 0;
- private Throwable pendingError = null;
- /** Vector that is currently being processed by our user. */
- private boolean isDone = false;
- private final boolean isClosed = false;
+ private boolean isClosed = false, isInterrupted = false;
private final ConsumerFeedback<ColumnVectorBatch> feedback;
private final QueryFragmentCounters counters;
private long firstReturnTime;
@@ -101,12 +108,13 @@ class LlapRecordReader
*/
public static LlapRecordReader create(JobConf job, FileSplit split, List<Integer> includedCols,
String hostName, ColumnVectorProducer cvp, ExecutorService executor,
- InputFormat<?, ?> sourceInputFormat, Deserializer sourceSerDe, Reporter reporter)
+ InputFormat<?, ?> sourceInputFormat, Deserializer sourceSerDe, Reporter reporter,
+ Configuration daemonConf)
throws IOException, HiveException {
MapWork mapWork = findMapWork(job);
if (mapWork == null) return null; // No compatible MapWork.
LlapRecordReader rr = new LlapRecordReader(mapWork, job, split, includedCols, hostName,
- cvp, executor, sourceInputFormat, sourceSerDe, reporter);
+ cvp, executor, sourceInputFormat, sourceSerDe, reporter, daemonConf);
if (!rr.checkOrcSchemaEvolution()) {
rr.close();
return null;
@@ -117,10 +125,11 @@ class LlapRecordReader
private LlapRecordReader(MapWork mapWork, JobConf job, FileSplit split,
List<Integer> includedCols, String hostName, ColumnVectorProducer cvp,
ExecutorService executor, InputFormat<?, ?> sourceInputFormat, Deserializer sourceSerDe,
- Reporter reporter) throws IOException, HiveException {
+ Reporter reporter, Configuration daemonConf) throws IOException, HiveException {
this.executor = executor;
this.jobConf = job;
this.split = split;
+
this.sarg = ConvertAstToSearchArg.createFromConf(job);
this.columnNames = ColumnProjectionUtils.getReadColumnNames(job);
final String fragmentId = LlapTezUtils.getFragmentId(job);
@@ -139,6 +148,18 @@ class LlapRecordReader
this.counters = new QueryFragmentCounters(job, taskCounters);
this.counters.setDesc(QueryFragmentCounters.Desc.MACHINE, hostName);
+ VectorizedRowBatchCtx ctx = mapWork.getVectorizedRowBatchCtx();
+ rbCtx = ctx != null ? ctx : LlapInputFormat.createFakeVrbCtx(mapWork);
+
+ // Note: columnIds below is adjusted further for ACID. Don't use this var directly.
+ if (includedCols == null) {
+ // Assume including everything means the VRB will have everything.
+ includedCols = new ArrayList<>(rbCtx.getRowColumnTypeInfos().length);
+ for (int i = 0; i < rbCtx.getRowColumnTypeInfos().length; ++i) {
+ includedCols.add(i);
+ }
+ }
+
isAcidScan = HiveConf.getBoolVar(jobConf, ConfVars.HIVE_ACID_TABLE_SCAN);
TypeDescription schema = OrcInputFormat.getDesiredRowTypeDescr(
job, isAcidScan, Integer.MAX_VALUE);
@@ -157,15 +178,12 @@ class LlapRecordReader
this.columnCount = columnIds.size();
}
- VectorizedRowBatchCtx ctx = mapWork.getVectorizedRowBatchCtx();
- rbCtx = ctx != null ? ctx : LlapInputFormat.createFakeVrbCtx(mapWork);
- if (includedCols == null) {
- // Assume including everything means the VRB will have everything.
- includedCols = new ArrayList<>(rbCtx.getRowColumnTypeInfos().length);
- for (int i = 0; i < rbCtx.getRowColumnTypeInfos().length; ++i) {
- includedCols.add(i);
- }
- }
+ int queueLimitBase = getQueueVar(ConfVars.LLAP_IO_VRB_QUEUE_LIMIT_BASE, job, daemonConf);
+ int queueLimitMin = getQueueVar(ConfVars.LLAP_IO_VRB_QUEUE_LIMIT_MIN, job, daemonConf);
+ int limit = determineQueueLimit(queueLimitBase, queueLimitMin, rbCtx.getRowColumnTypeInfos());
+ LOG.info("Queue limit for LlapRecordReader is " + limit);
+ this.queue = new LinkedBlockingQueue<>(limit);
+
int partitionColumnCount = rbCtx.getPartitionColumnCount();
if (partitionColumnCount > 0) {
@@ -181,6 +199,48 @@ class LlapRecordReader
mapWork.getPathToPartitionInfo());
}
+ private static int getQueueVar(ConfVars var, JobConf jobConf, Configuration daemonConf) {
+ // Check job config for overrides, otherwise use the default server value.
+ int jobVal = jobConf.getInt(var.varname, -1);
+ return (jobVal != -1) ? jobVal : HiveConf.getIntVar(daemonConf, var);
+ }
+
+ // For queue size estimation purposes, we assume all columns have weight one, and the following
+ // types are counted as multiple columns. This is very primitive; if we wanted to make it better,
+ // we'd increase the base limit, and adjust dynamically based on IO and processing perf delays.
+ private static final int COL_WEIGHT_COMPLEX = 16, COL_WEIGHT_HIVEDECIMAL = 4,
+ COL_WEIGHT_STRING = 8;
+ private static int determineQueueLimit(
+ int queueLimitBase, int queueLimitMin, TypeInfo[] typeInfos) {
+ // If the values are equal, the queue limit is fixed.
+ if (queueLimitBase == queueLimitMin) return queueLimitBase;
+ // If there are no columns (projection only join?) just assume no weight.
+ if (typeInfos == null || typeInfos.length == 0) return queueLimitBase;
+ double totalWeight = 0;
+ for (TypeInfo ti : typeInfos) {
+ int colWeight = 1;
+ if (ti.getCategory() != Category.PRIMITIVE) {
+ colWeight = COL_WEIGHT_COMPLEX;
+ } else {
+ PrimitiveTypeInfo pti = (PrimitiveTypeInfo)ti;
+ switch (pti.getPrimitiveCategory()) {
+ case BINARY:
+ case CHAR:
+ case VARCHAR:
+ case STRING:
colWeight = COL_WEIGHT_STRING;
break;
case DECIMAL:
colWeight = COL_WEIGHT_HIVEDECIMAL;
break;
default:
colWeight = 1;
+ }
+ }
+ totalWeight += colWeight;
+ }
+ return Math.max(queueLimitMin, (int)(queueLimitBase / totalWeight));
+ }
+
+
private static MapWork findMapWork(JobConf job) throws HiveException {
String inputName = job.get(Utilities.INPUT_NAME, null);
if (LOG.isDebugEnabled()) {
@@ -259,6 +319,7 @@ class LlapRecordReader
} catch (InterruptedException e) {
// Query might have been canceled. Stop the background processing.
feedback.stop();
+ isInterrupted = true; // In case we are stuck in consume.
throw new IOException(e);
}
if (cvb == null) {
@@ -340,7 +401,11 @@ class LlapRecordReader
public void uncaughtException(final Thread t, final Throwable e) {
LlapIoImpl.LOG.error("Unhandled error from reader thread. threadName: {} threadId: {}" +
" Message: {}", t.getName(), t.getId(), e.getMessage());
- setError(e);
+ try {
+ setError(e);
+ } catch (InterruptedException e1) {
+ LOG.info("IOUncaughtExceptionHandler interrupted; ignoring");
+ }
}
}
@@ -349,41 +414,40 @@ class LlapRecordReader
if (!isFirst) {
feedback.returnData(lastCvb);
}
- synchronized (pendingData) {
- // We are waiting for next block. Either we will get it, or be told we are done.
- boolean doLogBlocking = LlapIoImpl.LOG.isTraceEnabled() && isNothingToReport();
- if (doLogBlocking) {
- LlapIoImpl.LOG.trace("next will block");
- }
- boolean didWait = false;
- while (isNothingToReport()) {
- didWait = true;
- pendingData.wait(100);
- }
- // If we didn't wait, check for interruption explicitly.
- // TODO: given that we also want a queue limit, might make sense to rely on a blocking queue;
- // or a more advanced lock. But do double check that they will ALWAYS check interrupt.
- // Hive operators don't, so if we don't either, everything goes to hell.
- if (!didWait && Thread.interrupted()) {
- throw new InterruptedException("Thread interrupted");
- }
- if (doLogBlocking) {
- LlapIoImpl.LOG.trace("next is unblocked");
- }
- rethrowErrorIfAny();
- maxQueueSize = Math.max(pendingData.size(), maxQueueSize);
- lastCvb = pendingData.poll();
+
+ // We are waiting for next block. Either we will get it, or be told we are done.
+ int queueSize = queue.size();
+ maxQueueSize = Math.max(queueSize, maxQueueSize);
+ boolean doLogBlocking = LlapIoImpl.LOG.isTraceEnabled() && queueSize == 0;
+ if (doLogBlocking) {
+ LlapIoImpl.LOG.trace("next will block");
}
- if (LlapIoImpl.LOG.isTraceEnabled() && lastCvb != null) {
+ // We rely on the fact that poll() checks interrupt even when there's something in the queue.
+ // If the structure is replaced with something that doesn't, we MUST check interrupt here because
+ // Hive operators rely on the record reader to handle task interruption, and unlike most RRs we
+ // do not do any blocking IO ops on this thread.
+ Object next = null;
+ do {
+ rethrowErrorIfAny(pendingError.get()); // Best-effort check; see the comment in the method.
+ next = queue.poll(100, TimeUnit.MILLISECONDS);
+ } while (next == null);
+ if (doLogBlocking) {
+ LlapIoImpl.LOG.trace("next is unblocked");
+ }
+ if (next == DONE_OBJECT) {
+ return null; // We are done.
+ }
+ if (next instanceof Throwable) {
+ rethrowErrorIfAny((Throwable) next);
+ throw new AssertionError("Unreachable");
+ }
+ lastCvb = (ColumnVectorBatch) next;
+ if (LlapIoImpl.LOG.isTraceEnabled()) {
LlapIoImpl.LOG.trace("Processing will receive vector {}", lastCvb);
}
return lastCvb;
}
- private boolean isNothingToReport() {
- return !isDone && pendingData.isEmpty() && pendingError == null;
- }
-
@Override
public NullWritable createKey() {
return NullWritable.get();
@@ -402,17 +466,21 @@ class LlapRecordReader
@Override
public void close() throws IOException {
if (LlapIoImpl.LOG.isTraceEnabled()) {
- LlapIoImpl.LOG.trace("close called; closed {}, done {}, err {}, pending {}",
- isClosed, isDone, pendingError, pendingData.size());
+ LlapIoImpl.LOG.trace("close called; closed {}, interrupted {}, err {}, pending {}",
+ isClosed, isInterrupted, pendingError.get(), queue.size());
}
LlapIoImpl.LOG.info("Maximum queue length observed " + maxQueueSize);
LlapIoImpl.LOG.info("Llap counters: {}" ,counters); // This is where counters are logged!
feedback.stop();
- rethrowErrorIfAny();
+ isClosed = true;
+ rethrowErrorIfAny(pendingError.get());
MDC.clear();
}
- private void rethrowErrorIfAny() throws IOException {
+ private static void rethrowErrorIfAny(Throwable pendingError) throws IOException {
+ // This is called either with an error that was queued, or an error that was set into the
+ // atomic reference in this class. The latter is best-effort and is used to opportunistically
+ // skip processing of a long queue when the error happens.
if (pendingError == null) return;
if (pendingError instanceof IOException) {
throw (IOException)pendingError;
@@ -421,43 +489,37 @@ class LlapRecordReader
}
@Override
- public void setDone() {
+ public void setDone() throws InterruptedException {
if (LlapIoImpl.LOG.isDebugEnabled()) {
- LlapIoImpl.LOG.debug("setDone called; closed {}, done {}, err {}, pending {}",
- isClosed, isDone, pendingError, pendingData.size());
- }
- synchronized (pendingData) {
- isDone = true;
- pendingData.notifyAll();
+ LlapIoImpl.LOG.debug("setDone called; closed {}, interrupted {}, err {}, pending {}",
+ isClosed, isInterrupted, pendingError.get(), queue.size());
}
+ enqueueInternal(DONE_OBJECT);
}
@Override
- public void consumeData(ColumnVectorBatch data) {
+ public void consumeData(ColumnVectorBatch data) throws InterruptedException {
if (LlapIoImpl.LOG.isTraceEnabled()) {
- LlapIoImpl.LOG.trace("consume called; closed {}, done {}, err {}, pending {}",
- isClosed, isDone, pendingError, pendingData.size());
- }
- synchronized (pendingData) {
- if (isClosed) {
- return;
- }
- pendingData.add(data);
- pendingData.notifyAll();
+ LlapIoImpl.LOG.trace("consume called; closed {}, interrupted {}, err {}, pending {}",
+ isClosed, isInterrupted, pendingError.get(), queue.size());
}
+ enqueueInternal(data);
}
@Override
- public void setError(Throwable t) {
+ public void setError(Throwable t) throws InterruptedException {
counters.incrCounter(LlapIOCounters.NUM_ERRORS);
- LlapIoImpl.LOG.debug("setError called; current state closed {}, done {}, err {}, pending {}",
- isClosed, isDone, pendingError, pendingData.size());
+ LlapIoImpl.LOG.debug("setError called; closed {}, interrupted {}, err {}, pending {}",
+ isClosed, isInterrupted, pendingError.get(), queue.size());
LlapIoImpl.LOG.warn("setError called with an error", t);
assert t != null;
- synchronized (pendingData) {
- pendingError = t;
- pendingData.notifyAll();
- }
+ pendingError.compareAndSet(null, t);
+ enqueueInternal(t);
+ }
+
+ private void enqueueInternal(Object o) throws InterruptedException {
+ // We need to loop here to handle the case where the consumer goes away.
+ do {} while (!isClosed && !isInterrupted && !queue.offer(o, 100, TimeUnit.MILLISECONDS));
}
@Override
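The core of the change above: the unbounded pendingData list with wait/notify is replaced by a bounded LinkedBlockingQueue shared between the IO producer and the record reader, with a DONE sentinel and errors delivered in-band. A self-contained sketch of that handoff pattern (class and member names are illustrative, not the patch's own):

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class BoundedHandoffSketch {
  private static final Object DONE = new Object();
  private final LinkedBlockingQueue<Object> queue = new LinkedBlockingQueue<>(512);
  private volatile boolean closed = false;

  // Producer (IO thread): blocks in 100ms slices when the queue is full, which
  // is what caps memory when processing is slower than IO; re-checks closed so
  // a departed consumer cannot wedge the IO thread forever.
  public void enqueue(Object item) throws InterruptedException {
    do {} while (!closed && !queue.offer(item, 100, TimeUnit.MILLISECONDS));
  }

  public void done() throws InterruptedException {
    enqueue(DONE);
  }

  // Consumer (record reader): poll with a timeout so a task interrupt surfaces
  // promptly as InterruptedException, even while waiting on an empty queue.
  public Object next() throws InterruptedException {
    Object next;
    do {
      next = queue.poll(100, TimeUnit.MILLISECONDS);
    } while (next == null);
    if (next == DONE) return null;      // no more data
    if (next instanceof Throwable) {    // in-band error from the producer
      throw new RuntimeException((Throwable) next);
    }
    return next;
  }
}

With the queue bounded, determineQueueLimit() above scales the limit down by schema weight: for example, a schema with four string columns and sixteen numeric columns weighs 4*8 + 16*1 = 48, so the default base of 10000 yields a limit of max(10, 10000/48) = 208 queued batches.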
http://git-wip-us.apache.org/repos/asf/hive/blob/e29aea6f/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java
index 1f3f4d2..3f01a17 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java
@@ -39,7 +39,7 @@ public abstract class EncodedDataConsumer<BatchKey, BatchType extends EncodedCol
private Callable<Void> readCallable;
private final LlapDaemonIOMetrics ioMetrics;
// Note that the pool is per EDC - within EDC, CVBs are expected to have the same schema.
- private final static int CVB_POOL_SIZE = 128;
+ private static final int CVB_POOL_SIZE = 128;
protected final FixedSizedObjectPool<ColumnVectorBatch> cvbPool;
public EncodedDataConsumer(Consumer<ColumnVectorBatch> consumer, final int colCount,
@@ -71,7 +71,7 @@ public abstract class EncodedDataConsumer<BatchKey, BatchType extends EncodedCol
}
@Override
- public void consumeData(BatchType data) {
+ public void consumeData(BatchType data) throws InterruptedException {
if (isStopped) {
returnSourceData(data);
return;
@@ -100,15 +100,15 @@ public abstract class EncodedDataConsumer<BatchKey, BatchType extends EncodedCol
}
protected abstract void decodeBatch(BatchType batch,
- Consumer<ColumnVectorBatch> downstreamConsumer);
+ Consumer<ColumnVectorBatch> downstreamConsumer) throws InterruptedException;
@Override
- public void setDone() {
+ public void setDone() throws InterruptedException {
downstreamConsumer.setDone();
}
@Override
- public void setError(Throwable t) {
+ public void setError(Throwable t) throws InterruptedException {
downstreamConsumer.setError(t);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/e29aea6f/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
index 373af76..7939375 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
@@ -79,6 +79,10 @@ public class OrcColumnVectorProducer implements ColumnVectorProducer {
this.tracePool = tracePool;
}
+ public Configuration getConf() {
+ return conf;
+ }
+
@Override
public ReadPipeline createReadPipeline(
Consumer<ColumnVectorBatch> consumer, FileSplit split, List<Integer> columnIds,
http://git-wip-us.apache.org/repos/asf/hive/blob/e29aea6f/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
index 2930497..afb7da7 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
@@ -101,7 +101,7 @@ public class OrcEncodedDataConsumer
@Override
protected void decodeBatch(OrcEncodedColumnBatch batch,
- Consumer<ColumnVectorBatch> downstreamConsumer) {
+ Consumer<ColumnVectorBatch> downstreamConsumer) throws InterruptedException {
long startTime = counters.startTimeCounter();
int currentStripeIndex = batch.getBatchKey().stripeIx;
http://git-wip-us.apache.org/repos/asf/hive/blob/e29aea6f/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
index 21f90a7..1e0eccf 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
@@ -275,7 +275,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
});
}
- protected Void performDataRead() throws IOException {
+ protected Void performDataRead() throws IOException, InterruptedException {
long startTime = counters.startTimeCounter();
LlapIoImpl.LOG.info("Processing data for file {}: {}", fileKey, split.getPath());
if (processStop()) {
@@ -431,7 +431,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
return null;
}
- private void handleReaderError(long startTime, Throwable t) {
+ private void handleReaderError(long startTime, Throwable t) throws InterruptedException {
recordReaderTime(startTime);
consumer.setError(t);
trace.dumpLog(LOG);
http://git-wip-us.apache.org/repos/asf/hive/blob/e29aea6f/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
index 4b0a1ce..2626f3e 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
@@ -657,7 +657,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
}
}
- protected Void performDataRead() throws IOException {
+ protected Void performDataRead() throws IOException, InterruptedException {
boolean isOk = false;
try {
try {
@@ -760,7 +760,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
}
}
- public Boolean readFileWithCache(long startTime) throws IOException {
+ public Boolean readFileWithCache(long startTime) throws IOException, InterruptedException {
if (fileKey == null) return false;
BooleanRef gotAllData = new BooleanRef();
long endOfSplit = split.getStart() + split.getLength();
@@ -820,7 +820,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
}
public boolean processOneFileSplit(FileSplit split, long startTime,
- Ref<Integer> stripeIxRef, StripeData slice) throws IOException {
+ Ref<Integer> stripeIxRef, StripeData slice) throws IOException, InterruptedException {
LlapIoImpl.LOG.info("Processing one split {" + split.getPath() + ", "
+ split.getStart() + ", " + split.getLength() + "}");
if (LlapIoImpl.CACHE_LOGGER.isTraceEnabled()) {
@@ -920,7 +920,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
}
private boolean processOneSlice(CacheWriter.CacheStripeData diskData, boolean[] splitIncludes,
- int stripeIx, StripeData cacheData, long startTime) throws IOException {
+ int stripeIx, StripeData cacheData, long startTime) throws IOException, InterruptedException {
logProcessOneSlice(stripeIx, diskData, cacheData);
ColumnEncoding[] cacheEncodings = cacheData == null ? null : cacheData.getEncodings();
@@ -1001,8 +1001,8 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
/** Unlike the other overload of processOneSlice, doesn't cache data. */
- private boolean processOneSlice(Vectors diskData, boolean[] splitIncludes,
- int stripeIx, StripeData cacheData, long startTime) throws IOException {
+ private boolean processOneSlice(Vectors diskData, boolean[] splitIncludes, int stripeIx,
+ StripeData cacheData, long startTime) throws IOException, InterruptedException {
if (diskData == null) {
throw new AssertionError(); // The other overload should have been used.
}
@@ -1601,7 +1601,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
}
private boolean sendEcbToConsumer(OrcEncodedColumnBatch ecb,
- boolean hasCachedSlice, CacheWriter.CacheStripeData diskData) {
+ boolean hasCachedSlice, CacheWriter.CacheStripeData diskData) throws InterruptedException {
if (ecb == null) { // This basically means stop has been called.
cleanup(true);
return false;
http://git-wip-us.apache.org/repos/asf/hive/blob/e29aea6f/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Consumer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Consumer.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Consumer.java
index 43c5647..f5deff0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Consumer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Consumer.java
@@ -22,9 +22,9 @@ package org.apache.hadoop.hive.ql.io.orc.encoded;
*/
public interface Consumer<T> {
/** Some data has been produced. */
- public void consumeData(T data);
- /** No more data will be produced; done */
- public void setDone();
- /** No more data will be produced; error during production */
- public void setError(Throwable t);
+ public void consumeData(T data) throws InterruptedException;
+ /** No more data will be produced; done. */
+ public void setDone() throws InterruptedException;
+ /** No more data will be produced; error during production. */
+ public void setError(Throwable t) throws InterruptedException;
}
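Since the interface methods now declare InterruptedException, implementations are free to block on a bounded handoff and let an interrupt propagate to the producing IO thread instead of swallowing it. A hypothetical implementation (this class is illustrative, not part of the patch):

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical Consumer<T> that forwards produced items into a bounded queue.
class QueueingConsumer<T> implements Consumer<T> {
  private static final Object DONE = new Object();
  private final BlockingQueue<Object> out = new ArrayBlockingQueue<>(16);

  @Override
  public void consumeData(T data) throws InterruptedException {
    out.put(data); // may block when full; an interrupt unblocks the IO thread
  }

  @Override
  public void setDone() throws InterruptedException {
    out.put(DONE); // sentinel marking end of data
  }

  @Override
  public void setError(Throwable t) throws InterruptedException {
    out.put(t); // deliver the error in-band so the reader observes it in order
  }
}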
http://git-wip-us.apache.org/repos/asf/hive/blob/e29aea6f/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
index 29cef30..8c37a65 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
@@ -353,7 +353,12 @@ class EncodedReaderImpl implements EncodedReader {
if (hasIndexOnlyCols && (rgs == null)) {
OrcEncodedColumnBatch ecb = POOLS.ecbPool.take();
ecb.init(fileKey, stripeIx, OrcEncodedColumnBatch.ALL_RGS, included.length);
- consumer.consumeData(ecb);
+ try {
+ consumer.consumeData(ecb);
+ } catch (InterruptedException e) {
+ LOG.error("IO thread interrupted while queueing data");
+ throw new IOException(e);
+ }
} else {
LOG.warn("Nothing to read for stripe [" + stripe + "]");
}
@@ -466,15 +471,17 @@ class EncodedReaderImpl implements EncodedReader {
hasErrorForEcb = false;
} finally {
if (hasErrorForEcb) {
- try {
- releaseEcbRefCountsOnError(ecb);
- } catch (Throwable t) {
- LOG.error("Error during the cleanup of an error; ignoring", t);
- }
+ releaseEcbRefCountsOnError(ecb);
}
}
- // After this, the non-initial refcounts are the responsibility of the consumer.
- consumer.consumeData(ecb);
+ try {
+ consumer.consumeData(ecb);
+ // After this, the non-initial refcounts are the responsibility of the consumer.
+ } catch (InterruptedException e) {
+ LOG.error("IO thread interrupted while queueing data");
+ releaseEcbRefCountsOnError(ecb);
+ throw new IOException(e);
+ }
}
if (isTracingEnabled) {
@@ -584,20 +591,24 @@ class EncodedReaderImpl implements EncodedReader {
}
private void releaseEcbRefCountsOnError(OrcEncodedColumnBatch ecb) {
- if (isTracingEnabled) {
- LOG.trace("Unlocking the batch not sent to consumer, on error");
- }
- // We cannot send the ecb to consumer. Discard whatever is already there.
- for (int colIx = 0; colIx < ecb.getTotalColCount(); ++colIx) {
- if (!ecb.hasData(colIx)) continue;
- ColumnStreamData[] datas = ecb.getColumnData(colIx);
- for (ColumnStreamData data : datas) {
- if (data == null || data.decRef() != 0) continue;
- for (MemoryBuffer buf : data.getCacheBuffers()) {
- if (buf == null) continue;
- cacheWrapper.releaseBuffer(buf);
+ try {
+ if (isTracingEnabled) {
+ LOG.trace("Unlocking the batch not sent to consumer, on error");
+ }
+ // We cannot send the ecb to consumer. Discard whatever is already there.
+ for (int colIx = 0; colIx < ecb.getTotalColCount(); ++colIx) {
+ if (!ecb.hasData(colIx)) continue;
+ ColumnStreamData[] datas = ecb.getColumnData(colIx);
+ for (ColumnStreamData data : datas) {
+ if (data == null || data.decRef() != 0) continue;
+ for (MemoryBuffer buf : data.getCacheBuffers()) {
+ if (buf == null) continue;
+ cacheWrapper.releaseBuffer(buf);
+ }
}
}
+ } catch (Throwable t) {
+ LOG.error("Error during the cleanup of an error; ignoring", t);
}
}
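The EncodedReaderImpl changes follow one ownership rule throughout: if the consumer handoff is interrupted, the IO thread still owns the batch, so it must release the cache buffer refcounts itself before surfacing the interrupt as an IOException. A generic sketch of that rule (types and helpers are stand-ins, not the real ORC classes):

import java.io.IOException;

class HandoffOwnershipSketch<B> {
  interface BatchConsumer<T> {
    void consumeData(T batch) throws InterruptedException;
  }

  private final BatchConsumer<B> consumer;

  HandoffOwnershipSketch(BatchConsumer<B> consumer) {
    this.consumer = consumer;
  }

  // Stand-in for releaseEcbRefCountsOnError(): decrement refcounts and return
  // cache buffers that were never handed over to the consumer.
  void releaseOnError(B batch) { /* release buffers held by the batch */ }

  void handOff(B batch) throws IOException {
    try {
      consumer.consumeData(batch);
      // From here on, the non-initial refcounts are the consumer's to release.
    } catch (InterruptedException e) {
      releaseOnError(batch);     // still ours: give the buffers back
      throw new IOException(e);  // surface the interrupt to the read pipeline
    }
  }
}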