You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by pr...@apache.org on 2015/05/07 23:09:24 UTC

hive git commit: HIVE-10429: LLAP: Abort hive tez processor on interrupts (Prasanth Jayachandran reviewed by Gunther Hagleitner)

Repository: hive
Updated Branches:
  refs/heads/llap 7f21a4254 -> 20ac70b15


HIVE-10429: LLAP: Abort hive tez processor on interrupts (Prasanth Jayachandran reviewed by Gunther Hagleitner)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/20ac70b1
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/20ac70b1
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/20ac70b1

Branch: refs/heads/llap
Commit: 20ac70b1546d9508f050918b1cb85d6fba9da9c0
Parents: 7f21a42
Author: Prasanth Jayachandran <j....@gmail.com>
Authored: Thu May 7 14:08:50 2015 -0700
Committer: Prasanth Jayachandran <j....@gmail.com>
Committed: Thu May 7 14:08:50 2015 -0700

----------------------------------------------------------------------
 .../apache/hadoop/hive/ql/exec/Operator.java    | 15 ++++++++++-
 .../hive/ql/exec/tez/MapRecordProcessor.java    | 28 +++++++++++++++++---
 .../ql/exec/tez/MergeFileRecordProcessor.java   |  7 ++++-
 .../hive/ql/exec/tez/RecordProcessor.java       | 24 ++++++++---------
 .../hive/ql/exec/tez/ReduceRecordProcessor.java | 25 +++++++++++++++--
 .../hadoop/hive/ql/exec/tez/TezProcessor.java   |  4 +++
 6 files changed, 83 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/20ac70b1/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
index d7f1b42..3a6c5f1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
@@ -28,6 +28,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
@@ -68,6 +69,7 @@ public abstract class Operator<T extends OperatorDesc> implements Serializable,C
   protected List<Operator<? extends OperatorDesc>> childOperators;
   protected List<Operator<? extends OperatorDesc>> parentOperators;
   protected String operatorId;
+  protected AtomicBoolean abortOp;
   private transient ExecMapperContext execContext;
   private transient boolean rootInitializeCalled = false;
 
@@ -106,6 +108,7 @@ public abstract class Operator<T extends OperatorDesc> implements Serializable,C
     initOperatorId();
     childOperators = new ArrayList<Operator<? extends OperatorDesc>>();
     parentOperators = new ArrayList<Operator<? extends OperatorDesc>>();
+    abortOp = new AtomicBoolean(false);
   }
 
   public Operator() {
@@ -383,7 +386,11 @@ public abstract class Operator<T extends OperatorDesc> implements Serializable,C
     int i = 0;
     for (Future<?> f : fs) {
       try {
-        os[i++] = f.get();
+        if (abortOp.get()) {
+          f.cancel(true);
+        } else {
+          os[i++] = f.get();
+        }
       } catch (Exception e) {
         throw new HiveException(e);
       }
@@ -442,6 +449,10 @@ public abstract class Operator<T extends OperatorDesc> implements Serializable,C
     }
   }
 
+  public void abort() {
+    abortOp.set(true);
+  }
+
   /**
    * Pass the execContext reference to every child operator
    */
@@ -612,6 +623,8 @@ public abstract class Operator<T extends OperatorDesc> implements Serializable,C
       LOG.info(id + " finished. closing... ");
     }
 
+    abort |= abortOp.get();
+
     // call the operator specific close routine
     closeOp(abort);
 

http://git-wip-us.apache.org/repos/asf/hive/blob/20ac70b1/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
index 44d5418..2172fdb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.hive.ql.exec.tez.TezProcessor.TezKVOutputCollector;
 import org.apache.hadoop.hive.ql.exec.tez.tools.KeyValueInputMerger;
 import org.apache.hadoop.hive.ql.exec.vector.VectorMapOperator;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
@@ -70,11 +71,11 @@ import org.apache.tez.runtime.library.api.KeyValueReader;
  * Just pump the records through the query plan.
  */
 public class MapRecordProcessor extends RecordProcessor {
-
+  public static final Log l4j = LogFactory.getLog(MapRecordProcessor.class);
+  protected static final String MAP_PLAN_KEY = "__MAP_PLAN__";
 
   private MapOperator mapOp;
   private final List<MapOperator> mergeMapOpList = new ArrayList<MapOperator>();
-  public static final Log l4j = LogFactory.getLog(MapRecordProcessor.class);
   private MapRecordSource[] sources;
   private final Map<String, MultiMRInput> multiMRInputMap = new HashMap<String, MultiMRInput>();
   private int position;
@@ -82,11 +83,11 @@ public class MapRecordProcessor extends RecordProcessor {
   MultiMRInput mainWorkMultiMRInput;
   private final ExecMapperContext execContext;
   private boolean abort;
-  protected static final String MAP_PLAN_KEY = "__MAP_PLAN__";
   private MapWork mapWork;
   List<MapWork> mergeWorkList;
   List<String> cacheKeys;
   ObjectCache cache;
+  private int nRows;
 
   private static Map<Integer, DummyStoreOperator> connectOps =
     new TreeMap<Integer, DummyStoreOperator>();
@@ -101,6 +102,7 @@ public class MapRecordProcessor extends RecordProcessor {
     execContext = new ExecMapperContext(jconf);
     execContext.setJc(jconf);
     cacheKeys = new ArrayList<String>();
+    nRows = 0;
   }
 
   @Override
@@ -311,7 +313,25 @@ public class MapRecordProcessor extends RecordProcessor {
 
   @Override
   void run() throws Exception {
-    while (sources[position].pushRecord()) {}
+    while (sources[position].pushRecord()) {
+      if (nRows++ == CHECK_INTERRUPTION_AFTER_ROWS) {
+        if (abort && Thread.interrupted()) {
+          throw new HiveException("Processing thread interrupted");
+        }
+        nRows = 0;
+      }
+    }
+  }
+
+  @Override
+  public void abort() {
+    // this will stop run() from pushing records
+    abort = true;
+
+    // this will abort initializeOp()
+    if (mapOp != null) {
+      mapOp.abort();
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/20ac70b1/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java
index d82a048..7c0eb89 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java
@@ -148,13 +148,18 @@ public class MergeFileRecordProcessor extends RecordProcessor {
     while (reader.next()) {
       boolean needMore = processRow(reader.getCurrentKey(),
           reader.getCurrentValue());
-      if (!needMore) {
+      if (!needMore || abort) {
         break;
       }
     }
   }
 
   @Override
+  void abort() {
+    abort = true;
+  }
+
+  @Override
   void close() {
 
     if (cache != null && cacheKey != null) {

http://git-wip-us.apache.org/repos/asf/hive/blob/20ac70b1/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
index c563d9d..0859dc4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
@@ -16,8 +16,14 @@
  * limitations under the License.
  */
 package org.apache.hadoop.hive.ql.exec.tez;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.Callable;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.ql.exec.ObjectCache;
@@ -26,7 +32,6 @@ import org.apache.hadoop.hive.ql.exec.tez.TezProcessor.TezKVOutputCollector;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
-import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.tez.mapreduce.processor.MRTaskReporter;
@@ -34,21 +39,15 @@ import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.api.LogicalOutput;
 import org.apache.tez.runtime.api.ProcessorContext;
 
-import java.lang.management.ManagementFactory;
-import java.lang.management.MemoryMXBean;
-import java.net.URLClassLoader;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.Callable;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
 
 /**
  * Process input from tez LogicalInput and write output
  * It has different subclasses for map and reduce processing
  */
 public abstract class RecordProcessor  {
+  protected static final int CHECK_INTERRUPTION_AFTER_ROWS = 1000;
 
   protected final JobConf jconf;
   protected Map<String, LogicalInput> inputs;
@@ -108,6 +107,7 @@ public abstract class RecordProcessor  {
    */
   abstract void run() throws Exception;
 
+  abstract void abort();
 
   abstract void close();
 

http://git-wip-us.apache.org/repos/asf/hive/blob/20ac70b1/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
index 23ef420..63c427f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.ReportStats;
 import org.apache.hadoop.hive.ql.exec.tez.TezProcessor.TezKVOutputCollector;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
@@ -83,6 +84,7 @@ public class ReduceRecordProcessor  extends RecordProcessor{
   private byte bigTablePosition = 0;
 
   private boolean abort;
+  private int nRows = 0;
 
   public ReduceRecordProcessor(final JobConf jconf, final ProcessorContext context) throws Exception {
     super(jconf, context);
@@ -246,12 +248,31 @@ public class ReduceRecordProcessor  extends RecordProcessor{
 
     for (Entry<String, LogicalOutput> outputEntry : outputs.entrySet()) {
       l4j.info("Starting Output: " + outputEntry.getKey());
-      outputEntry.getValue().start();
-      ((TezKVOutputCollector) outMap.get(outputEntry.getKey())).initialize();
+      if (!abort) {
+        outputEntry.getValue().start();
+        ((TezKVOutputCollector) outMap.get(outputEntry.getKey())).initialize();
+      }
     }
 
     // run the operator pipeline
     while (sources[bigTablePosition].pushRecord()) {
+      if (nRows++ == CHECK_INTERRUPTION_AFTER_ROWS) {
+        if (abort && Thread.interrupted()) {
+          throw new HiveException("Processing thread interrupted");
+        }
+        nRows = 0;
+      }
+    }
+  }
+
+  @Override
+  public void abort() {
+    // this will stop run() from pushing records
+    abort = true;
+
+    // this will abort initializeOp()
+    if (reducer != null) {
+      reducer.abort();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/20ac70b1/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
index 39f9db6..f8c5314 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
@@ -173,6 +173,10 @@ public class TezProcessor extends AbstractLogicalIOProcessor {
     }
   }
 
+  public void abort() {
+    rproc.abort();
+  }
+
   /**
    * KVOutputCollector. OutputCollector that writes using KVWriter.
    * Must be initialized before it is used.