Posted to commits@hive.apache.org by ab...@apache.org on 2021/12/13 09:22:16 UTC
[hive] branch master updated: HIVE-25751: Ignore exceptions related to interruption when the limit is reached (#2828) (Laszlo Bodor reviewed by Rajesh Balamohan and Panagiotis Garefalakis)
This is an automated email from the ASF dual-hosted git repository.
abstractdog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new a83446a HIVE-25751: Ignore exceptions related to interruption when the limit is reached (#2828) (Laszlo Bodor reviewed by Rajesh Balamohan and Panagiotis Garefalakis)
a83446a is described below
commit a83446a2b48213c542c88ae59b339b759d06f4ef
Author: Bodor Laszlo <bo...@gmail.com>
AuthorDate: Mon Dec 13 10:21:49 2021 +0100
HIVE-25751: Ignore exceptions related to interruption when the limit is reached (#2828) (Laszlo Bodor reviewed by Rajesh Balamohan and Panagiotis Garefalakis)
---
.../org/apache/hadoop/hive/common/JavaUtils.java | 8 +++++
.../test/resources/testconfiguration.properties | 2 ++
.../hive/llap/io/api/impl/LlapInputFormat.java | 24 ++++++++++++-
.../apache/hadoop/hive/ql/exec/LimitOperator.java | 31 +++++++++++++++-
.../hadoop/hive/ql/exec/tez/TezProcessor.java | 19 +---------
.../apache/hadoop/hive/ql/io/HiveInputFormat.java | 25 +++++++++++--
ql/src/test/queries/clientpositive/limit_bailout.q | 7 ++++
.../clientpositive/llap/limit_bailout.q.out | 42 ++++++++++++++++++++++
.../results/clientpositive/tez/limit_bailout.q.out | 42 ++++++++++++++++++++++
9 files changed, 178 insertions(+), 22 deletions(-)
diff --git a/common/src/java/org/apache/hadoop/hive/common/JavaUtils.java b/common/src/java/org/apache/hadoop/hive/common/JavaUtils.java
index e5c9a4f..8d0d6c5 100644
--- a/common/src/java/org/apache/hadoop/hive/common/JavaUtils.java
+++ b/common/src/java/org/apache/hadoop/hive/common/JavaUtils.java
@@ -121,4 +121,12 @@ public final class JavaUtils {
private JavaUtils() {
// prevent instantiation
}
+
+ public static Throwable findRootCause(Throwable throwable) {
+ Throwable rootCause = throwable;
+ while (rootCause.getCause() != null && rootCause.getCause() != rootCause) {
+ rootCause = rootCause.getCause();
+ }
+ return rootCause;
+ }
}
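For context, findRootCause simply walks the getCause() chain to its end, guarding against self-referential causes. A minimal usage sketch follows; the wrapper chain is illustrative (the kind the DFSClient codepath tends to produce), not taken from the commit:

import java.io.IOException;
import java.nio.channels.ClosedByInterruptException;
import org.apache.hadoop.hive.common.JavaUtils;

public class FindRootCauseSketch {
  public static void main(String[] args) {
    // An interruption buried under two wrapper layers, as often seen on read codepaths
    Throwable leaf = new ClosedByInterruptException();
    Throwable wrapped = new RuntimeException(new IOException("stream closed", leaf));
    // findRootCause walks getCause() down to the innermost exception
    System.out.println(JavaUtils.findRootCause(wrapped) == leaf); // prints true
  }
}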
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index f5f50e5..99901ac 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -23,6 +23,7 @@ minitez.query.files=\
explainanalyze_4.q,\
explainanalyze_5.q,\
explainuser_3.q,\
+ limit_bailout.q,\
mapjoin_addjar.q,\
orc_merge12.q,\
orc_vectorization_ppd.q,\
@@ -91,6 +92,7 @@ minillap.query.files=\
intersect_all.q,\
intersect_distinct.q,\
intersect_merge.q,\
+ limit_bailout.q,\
llap_nullscan.q,\
llap_stats.q,\
llap_udf.q,\
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
index 6cbf1eb..789b637 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
@@ -21,10 +21,12 @@ package org.apache.hadoop.hive.llap.io.api.impl;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedSupport;
import org.apache.hadoop.hive.ql.io.BatchToRowInputFormat;
+import org.apache.hadoop.hive.common.JavaUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import java.io.IOException;
+import java.nio.channels.ClosedByInterruptException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -38,6 +40,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.llap.io.decode.ColumnVectorProducer;
import org.apache.hadoop.hive.ql.exec.ColumnInfo;
+import org.apache.hadoop.hive.ql.exec.LimitOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.RowSchema;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
@@ -47,6 +50,7 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat.AvoidSplitCombination;
import org.apache.hadoop.hive.ql.io.LlapAwareSplit;
+import org.apache.hadoop.hive.ql.io.NullRowsInputFormat.NullRowsRecordReader;
import org.apache.hadoop.hive.ql.io.SelfDescribingInputFormatInterface;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
@@ -138,10 +142,28 @@ public class LlapInputFormat implements InputFormat<NullWritable, VectorizedRowB
rr.start();
return result;
} catch (Exception ex) {
- throw new IOException(ex);
+ Throwable rootCause = JavaUtils.findRootCause(ex);
+ if (checkLimitReached(job)
+ && (rootCause instanceof InterruptedException || rootCause instanceof ClosedByInterruptException)) {
+ LlapIoImpl.LOG.info("Ignoring exception while getting record reader as limit is reached", rootCause);
+ return new NullRowsRecordReader(job, split);
+ } else {
+ throw new IOException(ex);
+ }
}
}
+ private boolean checkLimitReached(JobConf job) {
+ /*
+ * Two assumptions are made here when using "tez.mapreduce.vertex.name":
+ *
+ * 1. The execution engine is tez, which always holds in the LLAP case.
+ * 2. The property "tez.mapreduce.vertex.name" is present: it is handled in MRInputBase.initialize.
+ * On input codepaths we cannot use properties from TezProcessor.initTezAttributes.
+ */
+ return LimitOperator.checkLimitReachedForVertex(job, job.get("tez.mapreduce.vertex.name"));
+ }
+
private RecordReader<NullWritable, VectorizedRowBatch> wrapLlapReader(
List<Integer> includedCols, LlapRecordReader rr, InputSplit split) throws IOException {
// vectorized row batch reader
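The root-cause unwrapping in the catch block above matters because the interruption rarely surfaces as the thrown exception itself; it usually arrives wrapped. A condensed sketch of the decision the new code makes (shouldSwallow is a hypothetical name; the real code inlines this logic and calls JavaUtils.findRootCause):

import java.nio.channels.ClosedByInterruptException;

final class BailoutCheckSketch {
  // Swallow the failure only when the limit is already reached AND the root
  // cause is an interruption; anything else is still rethrown as an IOException.
  static boolean shouldSwallow(Exception ex, boolean limitReached) {
    Throwable root = ex;
    while (root.getCause() != null && root.getCause() != root) {
      root = root.getCause(); // same walk as JavaUtils.findRootCause
    }
    return limitReached
        && (root instanceof InterruptedException || root instanceof ClosedByInterruptException);
  }
}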
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/LimitOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/LimitOperator.java
index 608a0fd..41bee8d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/LimitOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/LimitOperator.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hive.ql.exec.tez.TezProcessor;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.LimitDesc;
import org.apache.hadoop.hive.ql.plan.api.OperatorType;
+import org.apache.hadoop.mapred.JobConf;
/**
* Limit operator implementation. Limits the number of rows to be passed on.
@@ -38,6 +39,8 @@ import org.apache.hadoop.hive.ql.plan.api.OperatorType;
public class LimitOperator extends Operator<LimitDesc> implements Serializable {
private static final long serialVersionUID = 1L;
+ private static final String LIMIT_REACHED_KEY_SUFFIX = "_limit_reached";
+
protected transient int limit;
protected transient int offset;
protected transient int leastRow;
@@ -161,6 +164,32 @@ public class LimitOperator extends Operator<LimitDesc> implements Serializable {
}
public static String getLimitReachedKey(Configuration conf) {
- return conf.get(TezProcessor.HIVE_TEZ_VERTEX_NAME) + "_limit_reached";
+ return conf.get(TezProcessor.HIVE_TEZ_VERTEX_NAME) + LIMIT_REACHED_KEY_SUFFIX;
+ }
+
+ public static boolean checkLimitReached(JobConf jobConf) {
+ String queryId = HiveConf.getVar(jobConf, HiveConf.ConfVars.HIVEQUERYID);
+ String limitReachedKey = getLimitReachedKey(jobConf);
+
+ return checkLimitReached(jobConf, queryId, limitReachedKey);
+ }
+
+ public static boolean checkLimitReachedForVertex(JobConf jobConf, String vertexName) {
+ String queryId = HiveConf.getVar(jobConf, HiveConf.ConfVars.HIVEQUERYID);
+ return checkLimitReached(jobConf, queryId, vertexName + LIMIT_REACHED_KEY_SUFFIX);
+ }
+
+ private static boolean checkLimitReached(JobConf jobConf, String queryId, String limitReachedKey) {
+ try {
+ return ObjectCacheFactory.getCache(jobConf, queryId, false, true)
+ .retrieve(limitReachedKey, new Callable<AtomicBoolean>() {
+ @Override
+ public AtomicBoolean call() {
+ return new AtomicBoolean(false);
+ }
+ }).get();
+ } catch (HiveException e) {
+ throw new RuntimeException(e);
+ }
}
}
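The retrieve call above lazily creates one shared AtomicBoolean per vertex in a query-scoped cache, so every task of a vertex observes the same flag. A simplified stand-in for that behavior (the ConcurrentHashMap and the class below are illustrative only; Hive's ObjectCacheFactory is the real mechanism):

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;

public class LimitFlagSketch {
  // Stand-in for the per-query ObjectCache: one flag per "<vertex>_limit_reached" key
  private static final ConcurrentMap<String, AtomicBoolean> CACHE = new ConcurrentHashMap<>();

  static AtomicBoolean flagFor(String vertexName) {
    return CACHE.computeIfAbsent(vertexName + "_limit_reached", k -> new AtomicBoolean(false));
  }

  public static void main(String[] args) {
    flagFor("Map 1").set(true);                  // a LimitOperator task hits its limit
    System.out.println(flagFor("Map 1").get());  // true: peer tasks of Map 1 can bail out
    System.out.println(flagFor("Map 2").get());  // false: other vertices are unaffected
  }
}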
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
index 0cbcc26..2558d01 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
@@ -244,7 +244,7 @@ public class TezProcessor extends AbstractLogicalIOProcessor {
}
synchronized (this) {
- boolean limitReached = checkLimitReached();
+ boolean limitReached = LimitOperator.checkLimitReached(jobConf);
if (limitReached) {
LOG.info(
"TezProcessor exits early as query limit already reached, vertex: {}, task: {}, attempt: {}",
@@ -279,23 +279,6 @@ public class TezProcessor extends AbstractLogicalIOProcessor {
// TODO HIVE-14042. In case of an abort request, throw an InterruptedException
}
- private boolean checkLimitReached() {
- String queryId = HiveConf.getVar(jobConf, HiveConf.ConfVars.HIVEQUERYID);
- String limitReachedKey = LimitOperator.getLimitReachedKey(jobConf);
-
- try {
- return ObjectCacheFactory.getCache(jobConf, queryId, false, true)
- .retrieve(limitReachedKey, new Callable<AtomicBoolean>() {
- @Override
- public AtomicBoolean call() {
- return new AtomicBoolean(false);
- }
- }).get();
- } catch (HiveException e) {
- throw new RuntimeException(e);
- }
- }
-
protected void initializeAndRunProcessor(Map<String, LogicalInput> inputs,
Map<String, LogicalOutput> outputs)
throws Exception {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
index e0787f5..3331228 100755
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.common.JavaUtils;
import org.apache.hadoop.hive.common.StringInternUtils;
import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
import org.apache.hadoop.hive.common.ValidWriteIdList;
@@ -35,12 +36,14 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil;
import org.apache.hadoop.hive.llap.io.api.LlapIo;
import org.apache.hadoop.hive.llap.io.api.LlapProxy;
+import org.apache.hadoop.hive.ql.exec.LimitOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.tez.HashableInputSplit;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.io.NullRowsInputFormat.NullRowsRecordReader;
import org.apache.hadoop.hive.ql.log.PerfLogger;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.HiveStoragePredicateHandler;
@@ -80,6 +83,7 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.Serializable;
+import java.nio.channels.ClosedByInterruptException;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Collections;
@@ -445,14 +449,31 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
// Handle the special header/footer skipping cases here.
innerReader = RecordReaderWrapper.create(inputFormat, hsplit, part.getTableDesc(), job, reporter);
} catch (Exception e) {
- innerReader = HiveIOExceptionHandlerUtil
- .handleRecordReaderCreationException(e, job);
+ Throwable rootCause = JavaUtils.findRootCause(e);
+ if (checkLimitReached(job)
+ && (rootCause instanceof InterruptedException || rootCause instanceof ClosedByInterruptException)) {
+ LOG.info("Ignoring exception while getting record reader as limit is reached", rootCause);
+ innerReader = new NullRowsRecordReader(job, split);
+ } else {
+ innerReader = HiveIOExceptionHandlerUtil
+ .handleRecordReaderCreationException(e, job);
+ }
}
HiveRecordReader<K,V> rr = new HiveRecordReader(innerReader, job);
rr.initIOContext(hsplit, job, inputFormatClass, innerReader);
return rr;
}
+ private boolean checkLimitReached(JobConf job) {
+ /*
+ * Assuming that "tez.mapreduce.vertex.name" is present when the execution engine is tez.
+ * If it's not present (e.g. a different execution engine), then checkLimitReachedForVertex will return
+ * false due to an invalid cache key (like: "null_limit_reached"), so this silently acts as if the
+ * limit hasn't been reached, which is the proper behavior when early bailout isn't supported.
+ */
+ return LimitOperator.checkLimitReachedForVertex(job, job.get("tez.mapreduce.vertex.name"));
+ }
+
protected void init(JobConf job) {
if (mrwork == null || pathToPartitionInfo == null) {
if (HiveConf.getVar(job, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) {
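Substituting NullRowsRecordReader lets the interrupted task drain cleanly instead of failing: a reader that immediately reports end-of-input yields zero rows downstream. A minimal illustration of that contract (a generic stand-in for the idea, not Hive's actual NullRowsRecordReader):

import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.RecordReader;

// A reader whose first next() already signals end-of-input: the consuming
// task sees an empty split and finishes without error.
final class EmptyReaderSketch implements RecordReader<NullWritable, NullWritable> {
  @Override public boolean next(NullWritable key, NullWritable value) { return false; }
  @Override public NullWritable createKey() { return NullWritable.get(); }
  @Override public NullWritable createValue() { return NullWritable.get(); }
  @Override public long getPos() { return 0L; }
  @Override public void close() throws IOException { }
  @Override public float getProgress() { return 1.0f; }
}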
diff --git a/ql/src/test/queries/clientpositive/limit_bailout.q b/ql/src/test/queries/clientpositive/limit_bailout.q
new file mode 100644
index 0000000..3f6cd16
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/limit_bailout.q
@@ -0,0 +1,7 @@
+-- this test should run in TestMiniLlapCliDriver (not local) as it validates HIVE-25751,
+-- which typically reproduces on the DFSClient codepath
+create table limit_bailout_src_text(c string);
+load data local inpath '../../data/files/smbdata.txt' into table limit_bailout_src_text;
+create table limit_bailout(c string) clustered by (c) sorted by (c) into 5 buckets;
+insert overwrite table limit_bailout select * from limit_bailout_src_text;
+select 1 from limit_bailout join limit_bailout t1 on limit_bailout.c=t1.c limit 1;
diff --git a/ql/src/test/results/clientpositive/llap/limit_bailout.q.out b/ql/src/test/results/clientpositive/llap/limit_bailout.q.out
new file mode 100644
index 0000000..fa7c8d0
--- /dev/null
+++ b/ql/src/test/results/clientpositive/llap/limit_bailout.q.out
@@ -0,0 +1,42 @@
+PREHOOK: query: create table limit_bailout_src_text(c string)
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@limit_bailout_src_text
+POSTHOOK: query: create table limit_bailout_src_text(c string)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@limit_bailout_src_text
+PREHOOK: query: load data local inpath '../../data/files/smbdata.txt' into table limit_bailout_src_text
+PREHOOK: type: LOAD
+#### A masked pattern was here ####
+PREHOOK: Output: default@limit_bailout_src_text
+POSTHOOK: query: load data local inpath '../../data/files/smbdata.txt' into table limit_bailout_src_text
+POSTHOOK: type: LOAD
+#### A masked pattern was here ####
+POSTHOOK: Output: default@limit_bailout_src_text
+PREHOOK: query: create table limit_bailout(c string) clustered by (c) sorted by (c) into 5 buckets
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@limit_bailout
+POSTHOOK: query: create table limit_bailout(c string) clustered by (c) sorted by (c) into 5 buckets
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@limit_bailout
+PREHOOK: query: insert overwrite table limit_bailout select * from limit_bailout_src_text
+PREHOOK: type: QUERY
+PREHOOK: Input: default@limit_bailout_src_text
+PREHOOK: Output: default@limit_bailout
+POSTHOOK: query: insert overwrite table limit_bailout select * from limit_bailout_src_text
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@limit_bailout_src_text
+POSTHOOK: Output: default@limit_bailout
+POSTHOOK: Lineage: limit_bailout.c SIMPLE [(limit_bailout_src_text)limit_bailout_src_text.FieldSchema(name:c, type:string, comment:null), ]
+PREHOOK: query: select 1 from limit_bailout join limit_bailout t1 on limit_bailout.c=t1.c limit 1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@limit_bailout
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: select 1 from limit_bailout join limit_bailout t1 on limit_bailout.c=t1.c limit 1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@limit_bailout
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+1
diff --git a/ql/src/test/results/clientpositive/tez/limit_bailout.q.out b/ql/src/test/results/clientpositive/tez/limit_bailout.q.out
new file mode 100644
index 0000000..fa7c8d0
--- /dev/null
+++ b/ql/src/test/results/clientpositive/tez/limit_bailout.q.out
@@ -0,0 +1,42 @@
+PREHOOK: query: create table limit_bailout_src_text(c string)
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@limit_bailout_src_text
+POSTHOOK: query: create table limit_bailout_src_text(c string)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@limit_bailout_src_text
+PREHOOK: query: load data local inpath '../../data/files/smbdata.txt' into table limit_bailout_src_text
+PREHOOK: type: LOAD
+#### A masked pattern was here ####
+PREHOOK: Output: default@limit_bailout_src_text
+POSTHOOK: query: load data local inpath '../../data/files/smbdata.txt' into table limit_bailout_src_text
+POSTHOOK: type: LOAD
+#### A masked pattern was here ####
+POSTHOOK: Output: default@limit_bailout_src_text
+PREHOOK: query: create table limit_bailout(c string) clustered by (c) sorted by (c) into 5 buckets
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@limit_bailout
+POSTHOOK: query: create table limit_bailout(c string) clustered by (c) sorted by (c) into 5 buckets
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@limit_bailout
+PREHOOK: query: insert overwrite table limit_bailout select * from limit_bailout_src_text
+PREHOOK: type: QUERY
+PREHOOK: Input: default@limit_bailout_src_text
+PREHOOK: Output: default@limit_bailout
+POSTHOOK: query: insert overwrite table limit_bailout select * from limit_bailout_src_text
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@limit_bailout_src_text
+POSTHOOK: Output: default@limit_bailout
+POSTHOOK: Lineage: limit_bailout.c SIMPLE [(limit_bailout_src_text)limit_bailout_src_text.FieldSchema(name:c, type:string, comment:null), ]
+PREHOOK: query: select 1 from limit_bailout join limit_bailout t1 on limit_bailout.c=t1.c limit 1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@limit_bailout
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: select 1 from limit_bailout join limit_bailout t1 on limit_bailout.c=t1.c limit 1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@limit_bailout
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+1