You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by yc...@apache.org on 2017/04/24 14:50:18 UTC
hive git commit: HIVE-16426: Query cancel: improve the way to handle
files.
Repository: hive
Updated Branches:
refs/heads/master eaa439e39 -> 19a1a4b42
HIVE-16426: Query cancel: improve the way to handle files.
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/19a1a4b4
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/19a1a4b4
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/19a1a4b4
Branch: refs/heads/master
Commit: 19a1a4b4280b017c20b5123232aefb1fb4fa6053
Parents: eaa439e
Author: Yongzhi Chen <yc...@apache.org>
Authored: Thu Apr 13 16:25:24 2017 -0400
Committer: Yongzhi Chen <yc...@apache.org>
Committed: Mon Apr 24 10:46:40 2017 -0400
----------------------------------------------------------------------
.../java/org/apache/hadoop/hive/ql/Driver.java | 24 ++++++++++++++++++++
.../apache/hadoop/hive/ql/exec/Utilities.java | 11 +++++++++
.../hive/ql/io/CombineHiveInputFormat.java | 7 ++++++
.../service/cli/operation/SQLOperation.java | 4 +++-
4 files changed, 45 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/19a1a4b4/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 6a8cc60..16b8101 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -201,6 +201,25 @@ public class Driver implements CommandProcessor {
// resource releases
public final ReentrantLock stateLock = new ReentrantLock();
public DriverState driverState = DriverState.INITIALIZED;
+ private static ThreadLocal<LockedDriverState> lds = new ThreadLocal<LockedDriverState>() {
+ @Override
+ protected LockedDriverState initialValue() {
+ return new LockedDriverState();
+ }
+ };
+
+ public static void setLockedDriverState(LockedDriverState lDrv) {
+ lds.set(lDrv);
+ }
+
+ public static LockedDriverState getLockedDriverState() {
+ return lds.get();
+ }
+
+ public static void removeLockedDriverState() {
+ if (lds != null)
+ lds.remove();
+ }
}
private boolean checkConcurrency() {
@@ -429,6 +448,8 @@ public class Driver implements CommandProcessor {
TaskFactory.resetId();
}
+ LockedDriverState.setLockedDriverState(lDrvState);
+
String queryId = conf.getVar(HiveConf.ConfVars.HIVEQUERYID);
//save some info for webUI for use after plan is freed
@@ -1407,6 +1428,8 @@ public class Driver implements CommandProcessor {
errorMessage = null;
SQLState = null;
downstreamError = null;
+ LockedDriverState.setLockedDriverState(lDrvState);
+
lDrvState.stateLock.lock();
try {
if (alreadyCompiled) {
@@ -2337,6 +2360,7 @@ public class Driver implements CommandProcessor {
lDrvState.driverState = DriverState.CLOSED;
} finally {
lDrvState.stateLock.unlock();
+ LockedDriverState.removeLockedDriverState();
}
if (SessionState.get() != null) {
SessionState.get().getLineageState().clear();
http://git-wip-us.apache.org/repos/asf/hive/blob/19a1a4b4/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index b0657f0..9036d9e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -24,6 +24,7 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.WordUtils;
@@ -52,6 +53,8 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Order;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.Driver.DriverState;
+import org.apache.hadoop.hive.ql.Driver.LockedDriverState;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.QueryPlan;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
@@ -3018,6 +3021,7 @@ public final class Utilities {
Set<Path> pathsProcessed = new HashSet<Path>();
List<Path> pathsToAdd = new LinkedList<Path>();
+ LockedDriverState lDrvStat = LockedDriverState.getLockedDriverState();
// AliasToWork contains all the aliases
for (String alias : work.getAliasToWork().keySet()) {
LOG.info("Processing alias " + alias);
@@ -3027,6 +3031,9 @@ public final class Utilities {
boolean hasLogged = false;
// Note: this copies the list because createDummyFileForEmptyPartition may modify the map.
for (Path file : new LinkedList<Path>(work.getPathToAliases().keySet())) {
+ if (lDrvStat != null && lDrvStat.driverState == DriverState.INTERRUPT)
+ throw new IOException("Operation is Canceled. ");
+
List<String> aliases = work.getPathToAliases().get(file);
if (aliases.contains(alias)) {
if (file != null) {
@@ -3079,6 +3086,8 @@ public final class Utilities {
List<Path> finalPathsToAdd = new LinkedList<>();
List<Future<Path>> futures = new LinkedList<>();
for (final Path path : pathsToAdd) {
+ if (lDrvStat != null && lDrvStat.driverState == DriverState.INTERRUPT)
+ throw new IOException("Operation is Canceled. ");
if (pool == null) {
finalPathsToAdd.add(new GetInputPathsCallable(path, job, work, hiveScratchDir, ctx, skipDummy).call());
} else {
@@ -3088,6 +3097,8 @@ public final class Utilities {
if (pool != null) {
for (Future<Path> future : futures) {
+ if (lDrvStat != null && lDrvStat.driverState == DriverState.INTERRUPT)
+ throw new IOException("Operation is Canceled. ");
finalPathsToAdd.add(future.get());
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/19a1a4b4/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
index 7a113bf..9a7e9d9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
@@ -35,6 +35,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import com.google.common.annotations.VisibleForTesting;
+
import org.apache.hadoop.hive.common.StringInternUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,6 +43,8 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hive.ql.Driver.DriverState;
+import org.apache.hadoop.hive.ql.Driver.LockedDriverState;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.log.PerfLogger;
@@ -348,8 +351,12 @@ public class CombineHiveInputFormat<K extends WritableComparable, V extends Writ
Map<CombinePathInputFormat, CombineFilter> poolMap =
new HashMap<CombinePathInputFormat, CombineFilter>();
Set<Path> poolSet = new HashSet<Path>();
+ LockedDriverState lDrvStat = LockedDriverState.getLockedDriverState();
for (Path path : paths) {
+ if (lDrvStat != null && lDrvStat.driverState == DriverState.INTERRUPT)
+ throw new IOException("Operation is Canceled. ");
+
PartitionDesc part = HiveFileFormatUtils.getPartitionDescFromPathRecursively(
pathToPartitionInfo, path, IOPrepareCache.get().allocatePartitionDescMap());
TableDesc tableDesc = part.getTableDesc();
http://git-wip-us.apache.org/repos/asf/hive/blob/19a1a4b4/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
index d9bfba87..d8718b3 100644
--- a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
+++ b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
@@ -394,7 +394,9 @@ public class SQLOperation extends ExecuteStatementOperation {
private synchronized void cleanup(OperationState state) throws HiveSQLException {
setState(state);
- if (shouldRunAsync()) {
+ //Need shut down background thread gracefully, driver.close will inform background thread
+ //a cancel request is sent.
+ if (shouldRunAsync() && state != OperationState.CANCELED && state != OperationState.TIMEDOUT) {
Future<?> backgroundHandle = getBackgroundHandle();
if (backgroundHandle != null) {
boolean success = backgroundHandle.cancel(true);