You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2017/12/21 03:22:32 UTC
hive git commit: HIVE-18318 : LLAP record reader should check interrupt even when not blocking (Sergey Shelukhin, reviewed by Gopal Vijayaraghavan)
Repository: hive
Updated Branches:
refs/heads/master 4e43ec7c4 -> ed1cf112a
HIVE-18318 : LLAP record reader should check interrupt even when not blocking (Sergey Shelukhin, reviewed by Gopal Vijayaraghavan)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/ed1cf112
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/ed1cf112
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/ed1cf112
Branch: refs/heads/master
Commit: ed1cf112a5a9e022f12c0f28e953435e1248b876
Parents: 4e43ec7
Author: sergey <se...@apache.org>
Authored: Wed Dec 20 19:19:07 2017 -0800
Committer: sergey <se...@apache.org>
Committed: Wed Dec 20 19:19:07 2017 -0800
----------------------------------------------------------------------
.../hadoop/hive/llap/io/api/impl/LlapRecordReader.java | 13 ++++++++++++-
1 file changed, 12 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/ed1cf112/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
index a4b877b..52a9c23 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
@@ -19,7 +19,6 @@
package org.apache.hadoop.hive.llap.io.api.impl;
import java.util.ArrayList;
-
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
@@ -80,6 +79,7 @@ class LlapRecordReader
private final LinkedList<ColumnVectorBatch> pendingData = new LinkedList<ColumnVectorBatch>();
private ColumnVectorBatch lastCvb = null;
private boolean isFirst = true;
+ private int maxQueueSize = 0;
private Throwable pendingError = null;
/** Vector that is currently being processed by our user. */
@@ -355,13 +355,23 @@ class LlapRecordReader
if (doLogBlocking) {
LlapIoImpl.LOG.trace("next will block");
}
+ boolean didWait = false;
while (isNothingToReport()) {
+ didWait = true;
pendingData.wait(100);
}
+ // If we didn't wait, check for interruption explicitly.
+ // TODO: given that we also want a queue limit, might make sense to rely on a blocking queue;
+ // or a more advanced lock. But do double check that they will ALWAYS check interrupt.
+ // Hive operators don't, so if we don't either, everything goes to hell.
+ if (!didWait && Thread.interrupted()) {
+ throw new InterruptedException("Thread interrupted");
+ }
if (doLogBlocking) {
LlapIoImpl.LOG.trace("next is unblocked");
}
rethrowErrorIfAny();
+ maxQueueSize = Math.max(pendingData.size(), maxQueueSize);
lastCvb = pendingData.poll();
}
if (LlapIoImpl.LOG.isTraceEnabled() && lastCvb != null) {
@@ -395,6 +405,7 @@ class LlapRecordReader
LlapIoImpl.LOG.trace("close called; closed {}, done {}, err {}, pending {}",
isClosed, isDone, pendingError, pendingData.size());
}
+ LlapIoImpl.LOG.info("Maximum queue length observed " + maxQueueSize);
LlapIoImpl.LOG.info("Llap counters: {}" ,counters); // This is where counters are logged!
feedback.stop();
rethrowErrorIfAny();