You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ab...@apache.org on 2021/12/13 09:22:16 UTC

[hive] branch master updated: HIVE-25751: Ignore exceptions related to interruption when the limit is reached (#2828) (Laszlo Bodor reviewed by Rajesh Balamohan and Panagiotis Garefalakis)

This is an automated email from the ASF dual-hosted git repository.

abstractdog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new a83446a  HIVE-25751: Ignore exceptions related to interruption when the limit is reached (#2828) (Laszlo Bodor reviewed by Rajesh Balamohan and Panagiotis Garefalakis)
a83446a is described below

commit a83446a2b48213c542c88ae59b339b759d06f4ef
Author: Bodor Laszlo <bo...@gmail.com>
AuthorDate: Mon Dec 13 10:21:49 2021 +0100

    HIVE-25751: Ignore exceptions related to interruption when the limit is reached (#2828) (Laszlo Bodor reviewed by Rajesh Balamohan and Panagiotis Garefalakis)
---
 .../org/apache/hadoop/hive/common/JavaUtils.java   |  8 +++++
 .../test/resources/testconfiguration.properties    |  2 ++
 .../hive/llap/io/api/impl/LlapInputFormat.java     | 24 ++++++++++++-
 .../apache/hadoop/hive/ql/exec/LimitOperator.java  | 31 +++++++++++++++-
 .../hadoop/hive/ql/exec/tez/TezProcessor.java      | 19 +---------
 .../apache/hadoop/hive/ql/io/HiveInputFormat.java  | 25 +++++++++++--
 ql/src/test/queries/clientpositive/limit_bailout.q |  7 ++++
 .../clientpositive/llap/limit_bailout.q.out        | 42 ++++++++++++++++++++++
 .../results/clientpositive/tez/limit_bailout.q.out | 42 ++++++++++++++++++++++
 9 files changed, 178 insertions(+), 22 deletions(-)

diff --git a/common/src/java/org/apache/hadoop/hive/common/JavaUtils.java b/common/src/java/org/apache/hadoop/hive/common/JavaUtils.java
index e5c9a4f..8d0d6c5 100644
--- a/common/src/java/org/apache/hadoop/hive/common/JavaUtils.java
+++ b/common/src/java/org/apache/hadoop/hive/common/JavaUtils.java
@@ -121,4 +121,12 @@ public final class JavaUtils {
   private JavaUtils() {
     // prevent instantiation
   }
+
+  public static Throwable findRootCause(Throwable throwable) {
+    Throwable rootCause = throwable;
+    while (rootCause.getCause() != null && rootCause.getCause() != rootCause) {
+      rootCause = rootCause.getCause();
+    }
+    return rootCause;
+  }
 }
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index f5f50e5..99901ac 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -23,6 +23,7 @@ minitez.query.files=\
   explainanalyze_4.q,\
   explainanalyze_5.q,\
   explainuser_3.q,\
+  limit_bailout.q,\
   mapjoin_addjar.q,\
   orc_merge12.q,\
   orc_vectorization_ppd.q,\
@@ -91,6 +92,7 @@ minillap.query.files=\
   intersect_all.q,\
   intersect_distinct.q,\
   intersect_merge.q,\
+  limit_bailout.q,\
   llap_nullscan.q,\
   llap_stats.q,\
   llap_udf.q,\
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
index 6cbf1eb..789b637 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
@@ -21,10 +21,12 @@ package org.apache.hadoop.hive.llap.io.api.impl;
 
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedSupport;
 import org.apache.hadoop.hive.ql.io.BatchToRowInputFormat;
+import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 
 import java.io.IOException;
+import java.nio.channels.ClosedByInterruptException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -38,6 +40,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.llap.io.decode.ColumnVectorProducer;
 import org.apache.hadoop.hive.ql.exec.ColumnInfo;
+import org.apache.hadoop.hive.ql.exec.LimitOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.RowSchema;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
@@ -47,6 +50,7 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
 import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat.AvoidSplitCombination;
 import org.apache.hadoop.hive.ql.io.LlapAwareSplit;
+import org.apache.hadoop.hive.ql.io.NullRowsInputFormat.NullRowsRecordReader;
 import org.apache.hadoop.hive.ql.io.SelfDescribingInputFormatInterface;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
@@ -138,10 +142,28 @@ public class LlapInputFormat implements InputFormat<NullWritable, VectorizedRowB
       rr.start();
       return result;
     } catch (Exception ex) {
-      throw new IOException(ex);
+      Throwable rootCause = JavaUtils.findRootCause(ex);
+      if (checkLimitReached(job)
+          && (rootCause instanceof InterruptedException || rootCause instanceof ClosedByInterruptException)) {
+        LlapIoImpl.LOG.info("Ignoring exception while getting record reader as limit is reached", rootCause);
+        return new NullRowsRecordReader(job, split);
+      } else {
+        throw new IOException(ex);
+      }
     }
   }
 
+  private boolean checkLimitReached(JobConf job) {
+    /*
+     * 2 assumptions here when using "tez.mapreduce.vertex.name"
+     *
+     * 1. The execution engine is tez, which is valid in case of LLAP.
+     * 2. The property "tez.mapreduce.vertex.name" is present: it is handled in MRInputBase.initialize.
+     *    On Input codepaths we cannot use properties from TezProcessor.initTezAttributes.
+     */
+    return LimitOperator.checkLimitReachedForVertex(job, job.get("tez.mapreduce.vertex.name"));
+  }
+
   private RecordReader<NullWritable, VectorizedRowBatch> wrapLlapReader(
       List<Integer> includedCols, LlapRecordReader rr, InputSplit split) throws IOException {
     // vectorized row batch reader
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/LimitOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/LimitOperator.java
index 608a0fd..41bee8d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/LimitOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/LimitOperator.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hive.ql.exec.tez.TezProcessor;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.LimitDesc;
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
+import org.apache.hadoop.mapred.JobConf;
 
 /**
  * Limit operator implementation Limits the number of rows to be passed on.
@@ -38,6 +39,8 @@ import org.apache.hadoop.hive.ql.plan.api.OperatorType;
 public class LimitOperator extends Operator<LimitDesc> implements Serializable {
   private static final long serialVersionUID = 1L;
 
+  private static final String LIMIT_REACHED_KEY_SUFFIX = "_limit_reached";
+
   protected transient int limit;
   protected transient int offset;
   protected transient int leastRow;
@@ -161,6 +164,32 @@ public class LimitOperator extends Operator<LimitDesc> implements Serializable {
   }
 
   public static String getLimitReachedKey(Configuration conf) {
-    return conf.get(TezProcessor.HIVE_TEZ_VERTEX_NAME) + "_limit_reached";
+    return conf.get(TezProcessor.HIVE_TEZ_VERTEX_NAME) + LIMIT_REACHED_KEY_SUFFIX;
+  }
+
+  public static boolean checkLimitReached(JobConf jobConf) {
+    String queryId = HiveConf.getVar(jobConf, HiveConf.ConfVars.HIVEQUERYID);
+    String limitReachedKey = getLimitReachedKey(jobConf);
+
+    return checkLimitReached(jobConf, queryId, limitReachedKey);
+  }
+
+  public static boolean checkLimitReachedForVertex(JobConf jobConf, String vertexName) {
+    String queryId = HiveConf.getVar(jobConf, HiveConf.ConfVars.HIVEQUERYID);
+    return checkLimitReached(jobConf, queryId, vertexName + LIMIT_REACHED_KEY_SUFFIX);
+  }
+
+  private static boolean checkLimitReached(JobConf jobConf, String queryId, String limitReachedKey) {
+    try {
+      return ObjectCacheFactory.getCache(jobConf, queryId, false, true)
+          .retrieve(limitReachedKey, new Callable<AtomicBoolean>() {
+            @Override
+            public AtomicBoolean call() {
+              return new AtomicBoolean(false);
+            }
+          }).get();
+    } catch (HiveException e) {
+      throw new RuntimeException(e);
+    }
   }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
index 0cbcc26..2558d01 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
@@ -244,7 +244,7 @@ public class TezProcessor extends AbstractLogicalIOProcessor {
     }
 
     synchronized (this) {
-      boolean limitReached = checkLimitReached();
+      boolean limitReached = LimitOperator.checkLimitReached(jobConf);
       if (limitReached) {
         LOG.info(
             "TezProcessor exits early as query limit already reached, vertex: {}, task: {}, attempt: {}",
@@ -279,23 +279,6 @@ public class TezProcessor extends AbstractLogicalIOProcessor {
     // TODO HIVE-14042. In case of an abort request, throw an InterruptedException
   }
 
-  private boolean checkLimitReached() {
-    String queryId = HiveConf.getVar(jobConf, HiveConf.ConfVars.HIVEQUERYID);
-    String limitReachedKey = LimitOperator.getLimitReachedKey(jobConf);
-
-    try {
-      return ObjectCacheFactory.getCache(jobConf, queryId, false, true)
-          .retrieve(limitReachedKey, new Callable<AtomicBoolean>() {
-            @Override
-            public AtomicBoolean call() {
-              return new AtomicBoolean(false);
-            }
-          }).get();
-    } catch (HiveException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
   protected void initializeAndRunProcessor(Map<String, LogicalInput> inputs,
       Map<String, LogicalOutput> outputs)
       throws Exception {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
index e0787f5..3331228 100755
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.common.StringInternUtils;
 import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
@@ -35,12 +36,14 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil;
 import org.apache.hadoop.hive.llap.io.api.LlapIo;
 import org.apache.hadoop.hive.llap.io.api.LlapProxy;
+import org.apache.hadoop.hive.ql.exec.LimitOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.tez.HashableInputSplit;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.io.NullRowsInputFormat.NullRowsRecordReader;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.HiveStoragePredicateHandler;
@@ -80,6 +83,7 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.io.Serializable;
+import java.nio.channels.ClosedByInterruptException;
 import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -445,14 +449,31 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
       // Handle the special header/footer skipping cases here.
       innerReader = RecordReaderWrapper.create(inputFormat, hsplit, part.getTableDesc(), job, reporter);
     } catch (Exception e) {
-      innerReader = HiveIOExceptionHandlerUtil
-          .handleRecordReaderCreationException(e, job);
+      Throwable rootCause = JavaUtils.findRootCause(e);
+      if (checkLimitReached(job)
+          && (rootCause instanceof InterruptedException || rootCause instanceof ClosedByInterruptException)) {
+        LOG.info("Ignoring exception while getting record reader as limit is reached", rootCause);
+        innerReader = new NullRowsRecordReader(job, split);
+      } else {
+        innerReader = HiveIOExceptionHandlerUtil
+            .handleRecordReaderCreationException(e, job);
+      }
     }
     HiveRecordReader<K,V> rr = new HiveRecordReader(innerReader, job);
     rr.initIOContext(hsplit, job, inputFormatClass, innerReader);
     return rr;
   }
 
+  private boolean checkLimitReached(JobConf job) {
+    /*
+     * Assuming that "tez.mapreduce.vertex.name" is present in case of tez.
+     * If it's not present (e.g. different execution engine), then checkLimitReachedForVertex will return
+     * false due to an invalid cache key (like: "null_limit_reached"), so this will silently act as if the
+     * limit hasn't been reached, which is the proper behavior in case we don't support early bailout.
+     */
+    return LimitOperator.checkLimitReachedForVertex(job, job.get("tez.mapreduce.vertex.name"));
+  }
+
   protected void init(JobConf job) {
     if (mrwork == null || pathToPartitionInfo == null) {
       if (HiveConf.getVar(job, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) {
diff --git a/ql/src/test/queries/clientpositive/limit_bailout.q b/ql/src/test/queries/clientpositive/limit_bailout.q
new file mode 100644
index 0000000..3f6cd16
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/limit_bailout.q
@@ -0,0 +1,7 @@
+-- this test should run in TestMiniLlapCliDriver (not local) as it validates HIVE-25751,
+-- which typically reproduces on the DFSClient codepath
+create table limit_bailout_src_text(c string);
+load data local inpath '../../data/files/smbdata.txt' into table limit_bailout_src_text;
+create table limit_bailout(c string) clustered by (c) sorted by (c) into 5 buckets;
+insert overwrite table limit_bailout select * from limit_bailout_src_text;
+select 1 from limit_bailout join limit_bailout t1 on limit_bailout.c=t1.c limit 1;
diff --git a/ql/src/test/results/clientpositive/llap/limit_bailout.q.out b/ql/src/test/results/clientpositive/llap/limit_bailout.q.out
new file mode 100644
index 0000000..fa7c8d0
--- /dev/null
+++ b/ql/src/test/results/clientpositive/llap/limit_bailout.q.out
@@ -0,0 +1,42 @@
+PREHOOK: query: create table limit_bailout_src_text(c string)
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@limit_bailout_src_text
+POSTHOOK: query: create table limit_bailout_src_text(c string)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@limit_bailout_src_text
+PREHOOK: query: load data local inpath '../../data/files/smbdata.txt' into table limit_bailout_src_text
+PREHOOK: type: LOAD
+#### A masked pattern was here ####
+PREHOOK: Output: default@limit_bailout_src_text
+POSTHOOK: query: load data local inpath '../../data/files/smbdata.txt' into table limit_bailout_src_text
+POSTHOOK: type: LOAD
+#### A masked pattern was here ####
+POSTHOOK: Output: default@limit_bailout_src_text
+PREHOOK: query: create table limit_bailout(c string) clustered by (c) sorted by (c) into 5 buckets
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@limit_bailout
+POSTHOOK: query: create table limit_bailout(c string) clustered by (c) sorted by (c) into 5 buckets
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@limit_bailout
+PREHOOK: query: insert overwrite table limit_bailout select * from limit_bailout_src_text
+PREHOOK: type: QUERY
+PREHOOK: Input: default@limit_bailout_src_text
+PREHOOK: Output: default@limit_bailout
+POSTHOOK: query: insert overwrite table limit_bailout select * from limit_bailout_src_text
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@limit_bailout_src_text
+POSTHOOK: Output: default@limit_bailout
+POSTHOOK: Lineage: limit_bailout.c SIMPLE [(limit_bailout_src_text)limit_bailout_src_text.FieldSchema(name:c, type:string, comment:null), ]
+PREHOOK: query: select 1 from limit_bailout join limit_bailout t1 on limit_bailout.c=t1.c limit 1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@limit_bailout
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: select 1 from limit_bailout join limit_bailout t1 on limit_bailout.c=t1.c limit 1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@limit_bailout
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+1
diff --git a/ql/src/test/results/clientpositive/tez/limit_bailout.q.out b/ql/src/test/results/clientpositive/tez/limit_bailout.q.out
new file mode 100644
index 0000000..fa7c8d0
--- /dev/null
+++ b/ql/src/test/results/clientpositive/tez/limit_bailout.q.out
@@ -0,0 +1,42 @@
+PREHOOK: query: create table limit_bailout_src_text(c string)
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@limit_bailout_src_text
+POSTHOOK: query: create table limit_bailout_src_text(c string)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@limit_bailout_src_text
+PREHOOK: query: load data local inpath '../../data/files/smbdata.txt' into table limit_bailout_src_text
+PREHOOK: type: LOAD
+#### A masked pattern was here ####
+PREHOOK: Output: default@limit_bailout_src_text
+POSTHOOK: query: load data local inpath '../../data/files/smbdata.txt' into table limit_bailout_src_text
+POSTHOOK: type: LOAD
+#### A masked pattern was here ####
+POSTHOOK: Output: default@limit_bailout_src_text
+PREHOOK: query: create table limit_bailout(c string) clustered by (c) sorted by (c) into 5 buckets
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@limit_bailout
+POSTHOOK: query: create table limit_bailout(c string) clustered by (c) sorted by (c) into 5 buckets
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@limit_bailout
+PREHOOK: query: insert overwrite table limit_bailout select * from limit_bailout_src_text
+PREHOOK: type: QUERY
+PREHOOK: Input: default@limit_bailout_src_text
+PREHOOK: Output: default@limit_bailout
+POSTHOOK: query: insert overwrite table limit_bailout select * from limit_bailout_src_text
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@limit_bailout_src_text
+POSTHOOK: Output: default@limit_bailout
+POSTHOOK: Lineage: limit_bailout.c SIMPLE [(limit_bailout_src_text)limit_bailout_src_text.FieldSchema(name:c, type:string, comment:null), ]
+PREHOOK: query: select 1 from limit_bailout join limit_bailout t1 on limit_bailout.c=t1.c limit 1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@limit_bailout
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: select 1 from limit_bailout join limit_bailout t1 on limit_bailout.c=t1.c limit 1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@limit_bailout
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+1