You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by yc...@apache.org on 2017/04/24 14:50:18 UTC

hive git commit: HIVE-16426: Query cancel: improve the way to handle files.

Repository: hive
Updated Branches:
  refs/heads/master eaa439e39 -> 19a1a4b42


HIVE-16426: Query cancel: improve the way to handle files.


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/19a1a4b4
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/19a1a4b4
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/19a1a4b4

Branch: refs/heads/master
Commit: 19a1a4b4280b017c20b5123232aefb1fb4fa6053
Parents: eaa439e
Author: Yongzhi Chen <yc...@apache.org>
Authored: Thu Apr 13 16:25:24 2017 -0400
Committer: Yongzhi Chen <yc...@apache.org>
Committed: Mon Apr 24 10:46:40 2017 -0400

----------------------------------------------------------------------
 .../java/org/apache/hadoop/hive/ql/Driver.java  | 24 ++++++++++++++++++++
 .../apache/hadoop/hive/ql/exec/Utilities.java   | 11 +++++++++
 .../hive/ql/io/CombineHiveInputFormat.java      |  7 ++++++
 .../service/cli/operation/SQLOperation.java     |  4 +++-
 4 files changed, 45 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/19a1a4b4/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 6a8cc60..16b8101 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -201,6 +201,25 @@ public class Driver implements CommandProcessor {
     // resource releases
     public final ReentrantLock stateLock = new ReentrantLock();
     public DriverState driverState = DriverState.INITIALIZED;
+    private static ThreadLocal<LockedDriverState> lds = new ThreadLocal<LockedDriverState>() {
+      @Override
+      protected LockedDriverState initialValue() {
+        return new LockedDriverState();
+      }
+    };
+
+    public static void setLockedDriverState(LockedDriverState lDrv) {
+      lds.set(lDrv);
+    }
+
+    public static LockedDriverState getLockedDriverState() {
+      return lds.get();
+    }
+
+    public static void removeLockedDriverState() {
+      if (lds != null)
+        lds.remove();
+    }
   }
 
   private boolean checkConcurrency() {
@@ -429,6 +448,8 @@ public class Driver implements CommandProcessor {
       TaskFactory.resetId();
     }
 
+    LockedDriverState.setLockedDriverState(lDrvState);
+
     String queryId = conf.getVar(HiveConf.ConfVars.HIVEQUERYID);
 
     //save some info for webUI for use after plan is freed
@@ -1407,6 +1428,8 @@ public class Driver implements CommandProcessor {
     errorMessage = null;
     SQLState = null;
     downstreamError = null;
+    LockedDriverState.setLockedDriverState(lDrvState);
+
     lDrvState.stateLock.lock();
     try {
       if (alreadyCompiled) {
@@ -2337,6 +2360,7 @@ public class Driver implements CommandProcessor {
       lDrvState.driverState = DriverState.CLOSED;
     } finally {
       lDrvState.stateLock.unlock();
+      LockedDriverState.removeLockedDriverState();
     }
     if (SessionState.get() != null) {
       SessionState.get().getLineageState().clear();

http://git-wip-us.apache.org/repos/asf/hive/blob/19a1a4b4/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index b0657f0..9036d9e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -24,6 +24,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.WordUtils;
@@ -52,6 +53,8 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Order;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.Driver.DriverState;
+import org.apache.hadoop.hive.ql.Driver.LockedDriverState;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.QueryPlan;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
@@ -3018,6 +3021,7 @@ public final class Utilities {
 
     Set<Path> pathsProcessed = new HashSet<Path>();
     List<Path> pathsToAdd = new LinkedList<Path>();
+    LockedDriverState lDrvStat = LockedDriverState.getLockedDriverState();
     // AliasToWork contains all the aliases
     for (String alias : work.getAliasToWork().keySet()) {
       LOG.info("Processing alias " + alias);
@@ -3027,6 +3031,9 @@ public final class Utilities {
       boolean hasLogged = false;
       // Note: this copies the list because createDummyFileForEmptyPartition may modify the map.
       for (Path file : new LinkedList<Path>(work.getPathToAliases().keySet())) {
+        if (lDrvStat != null && lDrvStat.driverState == DriverState.INTERRUPT)
+          throw new IOException("Operation is Canceled. ");
+
         List<String> aliases = work.getPathToAliases().get(file);
         if (aliases.contains(alias)) {
           if (file != null) {
@@ -3079,6 +3086,8 @@ public final class Utilities {
     List<Path> finalPathsToAdd = new LinkedList<>();
     List<Future<Path>> futures = new LinkedList<>();
     for (final Path path : pathsToAdd) {
+      if (lDrvStat != null && lDrvStat.driverState == DriverState.INTERRUPT)
+        throw new IOException("Operation is Canceled. ");
       if (pool == null) {
         finalPathsToAdd.add(new GetInputPathsCallable(path, job, work, hiveScratchDir, ctx, skipDummy).call());
       } else {
@@ -3088,6 +3097,8 @@ public final class Utilities {
 
     if (pool != null) {
       for (Future<Path> future : futures) {
+        if (lDrvStat != null && lDrvStat.driverState == DriverState.INTERRUPT)
+          throw new IOException("Operation is Canceled. ");
         finalPathsToAdd.add(future.get());
       }
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/19a1a4b4/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
index 7a113bf..9a7e9d9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
@@ -35,6 +35,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
 import com.google.common.annotations.VisibleForTesting;
+
 import org.apache.hadoop.hive.common.StringInternUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -42,6 +43,8 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hive.ql.Driver.DriverState;
+import org.apache.hadoop.hive.ql.Driver.LockedDriverState;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
@@ -348,8 +351,12 @@ public class CombineHiveInputFormat<K extends WritableComparable, V extends Writ
     Map<CombinePathInputFormat, CombineFilter> poolMap =
       new HashMap<CombinePathInputFormat, CombineFilter>();
     Set<Path> poolSet = new HashSet<Path>();
+    LockedDriverState lDrvStat = LockedDriverState.getLockedDriverState();
 
     for (Path path : paths) {
+      if (lDrvStat != null && lDrvStat.driverState == DriverState.INTERRUPT)
+        throw new IOException("Operation is Canceled. ");
+
       PartitionDesc part = HiveFileFormatUtils.getPartitionDescFromPathRecursively(
           pathToPartitionInfo, path, IOPrepareCache.get().allocatePartitionDescMap());
       TableDesc tableDesc = part.getTableDesc();

http://git-wip-us.apache.org/repos/asf/hive/blob/19a1a4b4/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
index d9bfba87..d8718b3 100644
--- a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
+++ b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
@@ -394,7 +394,9 @@ public class SQLOperation extends ExecuteStatementOperation {
   private synchronized void cleanup(OperationState state) throws HiveSQLException {
     setState(state);
 
-    if (shouldRunAsync()) {
+    // On CANCELED/TIMEDOUT, let the background thread shut down gracefully instead of
+    // interrupting it here; driver.close will inform it that a cancel request was sent.
+    if (shouldRunAsync() && state != OperationState.CANCELED && state != OperationState.TIMEDOUT) {
       Future<?> backgroundHandle = getBackgroundHandle();
       if (backgroundHandle != null) {
         boolean success = backgroundHandle.cancel(true);