You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2017/12/21 03:22:32 UTC

hive git commit: HIVE-18318 : LLAP record reader should check interrupt even when not blocking (Sergey Shelukhin, reviewed by Gopal Vijayaraghavan)

Repository: hive
Updated Branches:
  refs/heads/master 4e43ec7c4 -> ed1cf112a


HIVE-18318 : LLAP record reader should check interrupt even when not blocking (Sergey Shelukhin, reviewed by Gopal Vijayaraghavan)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/ed1cf112
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/ed1cf112
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/ed1cf112

Branch: refs/heads/master
Commit: ed1cf112a5a9e022f12c0f28e953435e1248b876
Parents: 4e43ec7
Author: sergey <se...@apache.org>
Authored: Wed Dec 20 19:19:07 2017 -0800
Committer: sergey <se...@apache.org>
Committed: Wed Dec 20 19:19:07 2017 -0800

----------------------------------------------------------------------
 .../hadoop/hive/llap/io/api/impl/LlapRecordReader.java | 13 ++++++++++++-
 1 file changed, 12 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/ed1cf112/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
index a4b877b..52a9c23 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
@@ -19,7 +19,6 @@
 package org.apache.hadoop.hive.llap.io.api.impl;
 
 import java.util.ArrayList;
-
 import java.io.IOException;
 import java.util.LinkedList;
 import java.util.List;
@@ -80,6 +79,7 @@ class LlapRecordReader
   private final LinkedList<ColumnVectorBatch> pendingData = new LinkedList<ColumnVectorBatch>();
   private ColumnVectorBatch lastCvb = null;
   private boolean isFirst = true;
+  private int maxQueueSize = 0;
 
   private Throwable pendingError = null;
   /** Vector that is currently being processed by our user. */
@@ -355,13 +355,23 @@ class LlapRecordReader
       if (doLogBlocking) {
         LlapIoImpl.LOG.trace("next will block");
       }
+      boolean didWait = false;
       while (isNothingToReport()) {
+        didWait = true;
         pendingData.wait(100);
       }
+      // If we didn't wait, check for interruption explicitly.
+      // TODO: given that we also want a queue limit, might make sense to rely on a blocking queue;
+      //       or a more advanced lock. But do double check that they will ALWAYS check interrupt.
+      //       Hive operators don't, so if we don't either, everything goes to hell.
+      if (!didWait && Thread.interrupted()) {
+        throw new InterruptedException("Thread interrupted");
+      }
       if (doLogBlocking) {
         LlapIoImpl.LOG.trace("next is unblocked");
       }
       rethrowErrorIfAny();
+      maxQueueSize = Math.max(pendingData.size(), maxQueueSize);
       lastCvb = pendingData.poll();
     }
     if (LlapIoImpl.LOG.isTraceEnabled() && lastCvb != null) {
@@ -395,6 +405,7 @@ class LlapRecordReader
       LlapIoImpl.LOG.trace("close called; closed {}, done {}, err {}, pending {}",
           isClosed, isDone, pendingError, pendingData.size());
     }
+    LlapIoImpl.LOG.info("Maximum queue length observed " + maxQueueSize);
     LlapIoImpl.LOG.info("Llap counters: {}" ,counters); // This is where counters are logged!
     feedback.stop();
     rethrowErrorIfAny();