You are viewing a plain text version of this content. The canonical link for it is on the original archive page (the hyperlink was lost in this plain-text rendering).
Posted to commits@hive.apache.org by pr...@apache.org on 2015/05/07 23:09:24 UTC
hive git commit: HIVE-10429: LLAP: Abort hive tez processor on interrupts (Prasanth Jayachandran reviewed by Gunther Hagleitner)
Repository: hive
Updated Branches:
refs/heads/llap 7f21a4254 -> 20ac70b15
HIVE-10429: LLAP: Abort hive tez processor on interrupts (Prasanth Jayachandran reviewed by Gunther Hagleitner)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/20ac70b1
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/20ac70b1
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/20ac70b1
Branch: refs/heads/llap
Commit: 20ac70b1546d9508f050918b1cb85d6fba9da9c0
Parents: 7f21a42
Author: Prasanth Jayachandran <j....@gmail.com>
Authored: Thu May 7 14:08:50 2015 -0700
Committer: Prasanth Jayachandran <j....@gmail.com>
Committed: Thu May 7 14:08:50 2015 -0700
----------------------------------------------------------------------
.../apache/hadoop/hive/ql/exec/Operator.java | 15 ++++++++++-
.../hive/ql/exec/tez/MapRecordProcessor.java | 28 +++++++++++++++++---
.../ql/exec/tez/MergeFileRecordProcessor.java | 7 ++++-
.../hive/ql/exec/tez/RecordProcessor.java | 24 ++++++++---------
.../hive/ql/exec/tez/ReduceRecordProcessor.java | 25 +++++++++++++++--
.../hadoop/hive/ql/exec/tez/TezProcessor.java | 4 +++
6 files changed, 83 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/20ac70b1/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
index d7f1b42..3a6c5f1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
@@ -28,6 +28,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
@@ -68,6 +69,7 @@ public abstract class Operator<T extends OperatorDesc> implements Serializable,C
protected List<Operator<? extends OperatorDesc>> childOperators;
protected List<Operator<? extends OperatorDesc>> parentOperators;
protected String operatorId;
+ protected AtomicBoolean abortOp;
private transient ExecMapperContext execContext;
private transient boolean rootInitializeCalled = false;
@@ -106,6 +108,7 @@ public abstract class Operator<T extends OperatorDesc> implements Serializable,C
initOperatorId();
childOperators = new ArrayList<Operator<? extends OperatorDesc>>();
parentOperators = new ArrayList<Operator<? extends OperatorDesc>>();
+ abortOp = new AtomicBoolean(false);
}
public Operator() {
@@ -383,7 +386,11 @@ public abstract class Operator<T extends OperatorDesc> implements Serializable,C
int i = 0;
for (Future<?> f : fs) {
try {
- os[i++] = f.get();
+ if (abortOp.get()) {
+ f.cancel(true);
+ } else {
+ os[i++] = f.get();
+ }
} catch (Exception e) {
throw new HiveException(e);
}
@@ -442,6 +449,10 @@ public abstract class Operator<T extends OperatorDesc> implements Serializable,C
}
}
+ public void abort() {
+ abortOp.set(true);
+ }
+
/**
* Pass the execContext reference to every child operator
*/
@@ -612,6 +623,8 @@ public abstract class Operator<T extends OperatorDesc> implements Serializable,C
LOG.info(id + " finished. closing... ");
}
+ abort |= abortOp.get();
+
// call the operator specific close routine
closeOp(abort);
http://git-wip-us.apache.org/repos/asf/hive/blob/20ac70b1/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
index 44d5418..2172fdb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.hive.ql.exec.tez.TezProcessor.TezKVOutputCollector;
import org.apache.hadoop.hive.ql.exec.tez.tools.KeyValueInputMerger;
import org.apache.hadoop.hive.ql.exec.vector.VectorMapOperator;
import org.apache.hadoop.hive.ql.log.PerfLogger;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.BaseWork;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
@@ -70,11 +71,11 @@ import org.apache.tez.runtime.library.api.KeyValueReader;
* Just pump the records through the query plan.
*/
public class MapRecordProcessor extends RecordProcessor {
-
+ public static final Log l4j = LogFactory.getLog(MapRecordProcessor.class);
+ protected static final String MAP_PLAN_KEY = "__MAP_PLAN__";
private MapOperator mapOp;
private final List<MapOperator> mergeMapOpList = new ArrayList<MapOperator>();
- public static final Log l4j = LogFactory.getLog(MapRecordProcessor.class);
private MapRecordSource[] sources;
private final Map<String, MultiMRInput> multiMRInputMap = new HashMap<String, MultiMRInput>();
private int position;
@@ -82,11 +83,11 @@ public class MapRecordProcessor extends RecordProcessor {
MultiMRInput mainWorkMultiMRInput;
private final ExecMapperContext execContext;
private boolean abort;
- protected static final String MAP_PLAN_KEY = "__MAP_PLAN__";
private MapWork mapWork;
List<MapWork> mergeWorkList;
List<String> cacheKeys;
ObjectCache cache;
+ private int nRows;
private static Map<Integer, DummyStoreOperator> connectOps =
new TreeMap<Integer, DummyStoreOperator>();
@@ -101,6 +102,7 @@ public class MapRecordProcessor extends RecordProcessor {
execContext = new ExecMapperContext(jconf);
execContext.setJc(jconf);
cacheKeys = new ArrayList<String>();
+ nRows = 0;
}
@Override
@@ -311,7 +313,25 @@ public class MapRecordProcessor extends RecordProcessor {
@Override
void run() throws Exception {
- while (sources[position].pushRecord()) {}
+ while (sources[position].pushRecord()) {
+ if (nRows++ == CHECK_INTERRUPTION_AFTER_ROWS) {
+ if (abort && Thread.interrupted()) {
+ throw new HiveException("Processing thread interrupted");
+ }
+ nRows = 0;
+ }
+ }
+ }
+
+ @Override
+ public void abort() {
+ // this will stop run() from pushing records
+ abort = true;
+
+ // this will abort initializeOp()
+ if (mapOp != null) {
+ mapOp.abort();
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/hive/blob/20ac70b1/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java
index d82a048..7c0eb89 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java
@@ -148,13 +148,18 @@ public class MergeFileRecordProcessor extends RecordProcessor {
while (reader.next()) {
boolean needMore = processRow(reader.getCurrentKey(),
reader.getCurrentValue());
- if (!needMore) {
+ if (!needMore || abort) {
break;
}
}
}
@Override
+ void abort() {
+ abort = true;
+ }
+
+ @Override
void close() {
if (cache != null && cacheKey != null) {
http://git-wip-us.apache.org/repos/asf/hive/blob/20ac70b1/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
index c563d9d..0859dc4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
@@ -16,8 +16,14 @@
* limitations under the License.
*/
package org.apache.hadoop.hive.ql.exec.tez;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.Callable;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.ql.exec.ObjectCache;
@@ -26,7 +32,6 @@ import org.apache.hadoop.hive.ql.exec.tez.TezProcessor.TezKVOutputCollector;
import org.apache.hadoop.hive.ql.log.PerfLogger;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.BaseWork;
-import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.tez.mapreduce.processor.MRTaskReporter;
@@ -34,21 +39,15 @@ import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.api.LogicalOutput;
import org.apache.tez.runtime.api.ProcessorContext;
-import java.lang.management.ManagementFactory;
-import java.lang.management.MemoryMXBean;
-import java.net.URLClassLoader;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.Callable;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
/**
* Process input from tez LogicalInput and write output
* It has different subclasses for map and reduce processing
*/
public abstract class RecordProcessor {
+ protected static final int CHECK_INTERRUPTION_AFTER_ROWS = 1000;
protected final JobConf jconf;
protected Map<String, LogicalInput> inputs;
@@ -108,6 +107,7 @@ public abstract class RecordProcessor {
*/
abstract void run() throws Exception;
+ abstract void abort();
abstract void close();
http://git-wip-us.apache.org/repos/asf/hive/blob/20ac70b1/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
index 23ef420..63c427f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.ReportStats;
import org.apache.hadoop.hive.ql.exec.tez.TezProcessor.TezKVOutputCollector;
import org.apache.hadoop.hive.ql.log.PerfLogger;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.BaseWork;
import org.apache.hadoop.hive.ql.plan.ReduceWork;
import org.apache.hadoop.hive.ql.plan.TableDesc;
@@ -83,6 +84,7 @@ public class ReduceRecordProcessor extends RecordProcessor{
private byte bigTablePosition = 0;
private boolean abort;
+ private int nRows = 0;
public ReduceRecordProcessor(final JobConf jconf, final ProcessorContext context) throws Exception {
super(jconf, context);
@@ -246,12 +248,31 @@ public class ReduceRecordProcessor extends RecordProcessor{
for (Entry<String, LogicalOutput> outputEntry : outputs.entrySet()) {
l4j.info("Starting Output: " + outputEntry.getKey());
- outputEntry.getValue().start();
- ((TezKVOutputCollector) outMap.get(outputEntry.getKey())).initialize();
+ if (!abort) {
+ outputEntry.getValue().start();
+ ((TezKVOutputCollector) outMap.get(outputEntry.getKey())).initialize();
+ }
}
// run the operator pipeline
while (sources[bigTablePosition].pushRecord()) {
+ if (nRows++ == CHECK_INTERRUPTION_AFTER_ROWS) {
+ if (abort && Thread.interrupted()) {
+ throw new HiveException("Processing thread interrupted");
+ }
+ nRows = 0;
+ }
+ }
+ }
+
+ @Override
+ public void abort() {
+ // this will stop run() from pushing records
+ abort = true;
+
+ // this will abort initializeOp()
+ if (reducer != null) {
+ reducer.abort();
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/20ac70b1/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
index 39f9db6..f8c5314 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
@@ -173,6 +173,10 @@ public class TezProcessor extends AbstractLogicalIOProcessor {
}
}
+ public void abort() {
+ rproc.abort();
+ }
+
/**
* KVOutputCollector. OutputCollector that writes using KVWriter.
* Must be initialized before it is used.