You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by kg...@apache.org on 2017/11/21 16:11:07 UTC
hive git commit: HIVE-18063: Make CommandProcessorResponse an
exception instead of a return class (Zoltan Haindrich,
reviewed by Ashutosh Chauhan)
Repository: hive
Updated Branches:
refs/heads/master a5c2e15c7 -> ed4e75763
HIVE-18063: Make CommandProcessorResponse an exception instead of a return class (Zoltan Haindrich, reviewed by Ashutosh Chauhan)
Signed-off-by: Zoltan Haindrich <ki...@rxd.hu>
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/ed4e7576
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/ed4e7576
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/ed4e7576
Branch: refs/heads/master
Commit: ed4e757638981b03d0240edbf84d9e048cae9528
Parents: a5c2e15
Author: Zoltan Haindrich <ki...@rxd.hu>
Authored: Tue Nov 21 16:34:18 2017 +0100
Committer: Zoltan Haindrich <ki...@rxd.hu>
Committed: Tue Nov 21 16:34:18 2017 +0100
----------------------------------------------------------------------
.../apache/hadoop/hive/ql/TestAcidOnTez.java | 4 +-
.../java/org/apache/hadoop/hive/ql/Driver.java | 240 +++++++++----------
.../apache/hadoop/hive/ql/exec/Utilities.java | 11 +-
.../hive/ql/io/CombineHiveInputFormat.java | 7 +-
.../zookeeper/ZooKeeperHiveLockManager.java | 3 +-
.../ql/processors/CommandProcessorResponse.java | 2 +-
.../ql/udf/generic/GenericUDTFGetSplits.java | 13 +-
.../apache/hadoop/hive/ql/TestTxnCommands2.java | 14 +-
.../hadoop/hive/ql/TxnCommandsBaseForTests.java | 2 +-
.../hive/ql/lockmgr/TestDummyTxnManager.java | 7 +-
10 files changed, 143 insertions(+), 160 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/ed4e7576/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java
index 65a1ed1..26377e0 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java
@@ -150,8 +150,8 @@ public class TestAcidOnTez {
try {
if (d != null) {
dropTables();
- d.destroy();
d.close();
+ d.destroy();
d = null;
}
TxnDbUtil.cleanDb(hiveConf);
@@ -785,8 +785,8 @@ ekoifman:apache-hive-3.0.0-SNAPSHOT-bin ekoifman$ tree ~/dev/hiverwgit/itests/h
ss.close();
}
if (d != null) {
- d.destroy();
d.close();
+ d.destroy();
}
SessionState.start(conf);
http://git-wip-us.apache.org/repos/asf/hive/blob/ed4e7576/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index af9f193..450e3cd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -35,6 +35,7 @@ import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import com.google.common.annotations.VisibleForTesting;
@@ -188,15 +189,12 @@ public class Driver implements CommandProcessor {
// either initTxnMgr or from the SessionState, in that order.
private HiveTxnManager queryTxnMgr;
- public enum DriverState {
+ private enum DriverState {
INITIALIZED,
COMPILING,
COMPILED,
EXECUTING,
EXECUTED,
- // a state that the driver enters after close() has been called to interrupt its running
- // query in the query cancellation
- INTERRUPT,
// a state that the driver enters after close() has been called to clean the query results
// and release the resources after the query has been executed
CLOSED,
@@ -210,6 +208,7 @@ public class Driver implements CommandProcessor {
// resource releases
public final ReentrantLock stateLock = new ReentrantLock();
public DriverState driverState = DriverState.INITIALIZED;
+ public AtomicBoolean aborted = new AtomicBoolean();
private static ThreadLocal<LockedDriverState> lds = new ThreadLocal<LockedDriverState>() {
@Override
protected LockedDriverState initialValue() {
@@ -230,6 +229,19 @@ public class Driver implements CommandProcessor {
lds.remove();
}
}
+
+ public boolean isAborted() {
+ return aborted.get();
+ }
+
+ public void abort() {
+ aborted.set(true);
+ }
+
+ @Override
+ public String toString() {
+ return String.format("%s(aborted:%s)", driverState, aborted.get());
+ }
}
private boolean checkConcurrency() {
@@ -430,13 +442,18 @@ public class Driver implements CommandProcessor {
* @return 0 for ok
*/
public int compile(String command, boolean resetTaskIds) {
- return compile(command, resetTaskIds, false);
+ try {
+ compile(command, resetTaskIds, false);
+ return 0;
+ } catch (CommandProcessorResponse cpr) {
+ return cpr.getErrorCode();
+ }
}
// deferClose indicates if the close/destroy should be deferred when the process has been
// interrupted, it should be set to true if the compile is called within another method like
// runInternal, which defers the close to the called in that method.
- private int compile(String command, boolean resetTaskIds, boolean deferClose) {
+ private void compile(String command, boolean resetTaskIds, boolean deferClose) throws CommandProcessorResponse {
PerfLogger perfLogger = SessionState.getPerfLogger(true);
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.DRIVER_RUN);
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.COMPILE);
@@ -463,9 +480,7 @@ public class Driver implements CommandProcessor {
LOG.warn("WARNING! Query command could not be redacted." + e);
}
- if (isInterrupted()) {
- return handleInterruption("at beginning of compilation."); //indicate if need clean resource
- }
+ checkInterrupted("at beginning of compilation.", null, null);
if (ctx != null && ctx.getExplainAnalyze() != AnalyzeState.RUNNING) {
// close the existing ctx etc before compiling a new query, but does not destroy driver
@@ -522,9 +537,8 @@ public class Driver implements CommandProcessor {
};
ShutdownHookManager.addShutdownHook(shutdownRunner, SHUTDOWN_HOOK_PRIORITY);
- if (isInterrupted()) {
- return handleInterruption("before parsing and analysing the query");
- }
+ checkInterrupted("before parsing and analysing the query", null, null);
+
if (ctx == null) {
ctx = new Context(conf);
setTriggerContext(queryId);
@@ -567,7 +581,7 @@ public class Driver implements CommandProcessor {
String userFromUGI = getUserFromUGI();
if (!queryTxnMgr.isTxnOpen()) {
if(userFromUGI == null) {
- return 10;
+ throw createProcessorResponse(10);
}
long txnid = queryTxnMgr.openTxn(ctx, userFromUGI);
}
@@ -597,9 +611,7 @@ public class Driver implements CommandProcessor {
sem.validate();
perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.ANALYZE);
- if (isInterrupted()) {
- return handleInterruption("after analyzing query.");
- }
+ checkInterrupted("after analyzing query.", null, null);
// get the output schema
schema = getSchema(sem, conf);
@@ -628,7 +640,7 @@ public class Driver implements CommandProcessor {
+ ". Use SHOW GRANT to get more details.");
errorMessage = authExp.getMessage();
SQLState = "42000";
- return 403;
+ throw createProcessorResponse(403);
} finally {
perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.DO_AUTHORIZATION);
}
@@ -644,11 +656,8 @@ public class Driver implements CommandProcessor {
}
}
}
- return 0;
} catch (Exception e) {
- if (isInterrupted()) {
- return handleInterruption("during query compilation: " + e.getMessage());
- }
+ checkInterrupted("during query compilation: " + e.getMessage(), null, null);
compileError = true;
ErrorMsg error = ErrorMsg.getErrorMsg(e.getMessage());
@@ -672,8 +681,7 @@ public class Driver implements CommandProcessor {
downstreamError = e;
console.printError(errorMessage, "\n"
+ org.apache.hadoop.util.StringUtils.stringifyException(e));
- return error.getErrorCode();//todo: this is bad if returned as cmd shell exit
- // since it exceeds valid range of shell return values
+ throw createProcessorResponse(error.getErrorCode());
} finally {
// Trigger post compilation hook. Note that if the compilation fails here then
// before/after execution hook will never be executed.
@@ -689,7 +697,7 @@ public class Driver implements CommandProcessor {
ImmutableMap<String, Long> compileHMSTimings = dumpMetaCallTimingWithoutEx("compilation");
queryDisplay.setHmsTimings(QueryDisplay.Phase.COMPILATION, compileHMSTimings);
- boolean isInterrupted = isInterrupted();
+ boolean isInterrupted = lDrvState.isAborted();
if (isInterrupted && !deferClose) {
closeInProcess(true);
}
@@ -760,9 +768,6 @@ public class Driver implements CommandProcessor {
}
return shouldOpenImplicitTxn;
}
- private int handleInterruption(String msg) {
- return handleInterruptionWithHook(msg, null, null);
- }
private int handleInterruptionWithHook(String msg, HookContext hookContext,
PerfLogger perfLogger) {
@@ -779,16 +784,9 @@ public class Driver implements CommandProcessor {
return 1000;
}
- private boolean isInterrupted() {
- lDrvState.stateLock.lock();
- try {
- if (lDrvState.driverState == DriverState.INTERRUPT) {
- return true;
- } else {
- return false;
- }
- } finally {
- lDrvState.stateLock.unlock();
+ private void checkInterrupted(String msg, HookContext hookContext, PerfLogger perfLogger) throws CommandProcessorResponse {
+ if (lDrvState.isAborted()) {
+ throw createProcessorResponse(handleInterruptionWithHook(msg, hookContext, perfLogger));
}
}
@@ -1223,8 +1221,9 @@ public class Driver implements CommandProcessor {
*
* This method also records the list of valid transactions. This must be done after any
* transactions have been opened.
+ * @throws CommandProcessorResponse
**/
- private int acquireLocks() {
+ private void acquireLocks() throws CommandProcessorResponse {
PerfLogger perfLogger = SessionState.getPerfLogger();
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.ACQUIRE_READ_WRITE_LOCKS);
@@ -1233,12 +1232,12 @@ public class Driver implements CommandProcessor {
acid txn manager requires all locks to be associated with a txn so if we
end up here w/o an open txn it's because we are processing something like "use <database>
which by definition needs no locks*/
- return 0;
+ return;
}
try {
String userFromUGI = getUserFromUGI();
if(userFromUGI == null) {
- return 10;
+ throw createProcessorResponse(10);
}
// Set the transaction id in all of the acid file sinks
if (haveAcidWrite()) {
@@ -1260,14 +1259,13 @@ public class Driver implements CommandProcessor {
if(queryTxnMgr.recordSnapshot(plan)) {
recordValidTxns(queryTxnMgr);
}
- return 0;
} catch (Exception e) {
errorMessage = "FAILED: Error in acquiring locks: " + e.getMessage();
SQLState = ErrorMsg.findSQLState(e.getMessage());
downstreamError = e;
console.printError(errorMessage, "\n"
+ org.apache.hadoop.util.StringUtils.stringifyException(e));
- return 10;
+ throw createProcessorResponse(10);
} finally {
perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.ACQUIRE_READ_WRITE_LOCKS);
}
@@ -1353,11 +1351,12 @@ public class Driver implements CommandProcessor {
public CommandProcessorResponse run(String command, boolean alreadyCompiled)
throws CommandNeedRetryException {
- CommandProcessorResponse cpr = runInternal(command, alreadyCompiled);
- if(cpr.getResponseCode() == 0) {
- return cpr;
- }
+ try {
+ runInternal(command, alreadyCompiled);
+ return createProcessorResponse(0);
+ } catch (CommandProcessorResponse cpr) {
+
SessionState ss = SessionState.get();
if(ss == null) {
return cpr;
@@ -1408,31 +1407,38 @@ public class Driver implements CommandProcessor {
org.apache.hadoop.util.StringUtils.stringifyException(ex));
}
return cpr;
+ }
}
public CommandProcessorResponse compileAndRespond(String command) {
- return createProcessorResponse(compileInternal(command, false));
+ try {
+ compileInternal(command, false);
+ return createProcessorResponse(0);
+ } catch (CommandProcessorResponse e) {
+ return e;
+ }
}
- public CommandProcessorResponse lockAndRespond() {
+ public void lockAndRespond() throws CommandProcessorResponse {
// Assumes the query has already been compiled
if (plan == null) {
throw new IllegalStateException(
"No previously compiled query for driver - queryId=" + queryState.getQueryId());
}
- int ret = 0;
if (requiresLock()) {
- ret = acquireLocks();
- }
- if (ret != 0) {
- return rollback(createProcessorResponse(ret));
+ try {
+ acquireLocks();
+ } catch (CommandProcessorResponse cpr) {
+ rollback(cpr);
+ throw cpr;
+ }
}
- return createProcessorResponse(ret);
}
private static final ReentrantLock globalCompileLock = new ReentrantLock();
- private int compileInternal(String command, boolean deferClose) {
+
+ private void compileInternal(String command, boolean deferClose) throws CommandProcessorResponse {
int ret;
Metrics metrics = MetricsFactory.getInstance();
@@ -1450,22 +1456,21 @@ public class Driver implements CommandProcessor {
}
if (compileLock == null) {
- return ErrorMsg.COMPILE_LOCK_TIMED_OUT.getErrorCode();
+ throw createProcessorResponse(ErrorMsg.COMPILE_LOCK_TIMED_OUT.getErrorCode());
}
- try {
- ret = compile(command, true, deferClose);
- } finally {
- compileLock.unlock();
- }
- if (ret != 0) {
+ try {
+ compile(command, true, deferClose);
+ } catch (CommandProcessorResponse cpr) {
try {
releaseLocksAndCommitOrRollback(false);
} catch (LockException e) {
- LOG.warn("Exception in releasing locks. "
- + org.apache.hadoop.util.StringUtils.stringifyException(e));
+ LOG.warn("Exception in releasing locks. " + org.apache.hadoop.util.StringUtils.stringifyException(e));
}
+ throw cpr;
+ } finally {
+ compileLock.unlock();
}
//Save compile-time PerfLogging for WebUI.
@@ -1473,7 +1478,6 @@ public class Driver implements CommandProcessor {
//or a reset PerfLogger.
queryDisplay.setPerfLogStarts(QueryDisplay.Phase.COMPILATION, perfLogger.getStartTimes());
queryDisplay.setPerfLogEnds(QueryDisplay.Phase.COMPILATION, perfLogger.getEndTimes());
- return ret;
}
/**
@@ -1535,8 +1539,8 @@ public class Driver implements CommandProcessor {
return compileLock;
}
- private CommandProcessorResponse runInternal(String command, boolean alreadyCompiled)
- throws CommandNeedRetryException {
+ private void runInternal(String command, boolean alreadyCompiled)
+ throws CommandNeedRetryException, CommandProcessorResponse {
errorMessage = null;
SQLState = null;
downstreamError = null;
@@ -1550,7 +1554,7 @@ public class Driver implements CommandProcessor {
} else {
errorMessage = "FAILED: Precompiled query has been cancelled or closed.";
console.printError(errorMessage);
- return createProcessorResponse(12);
+ throw createProcessorResponse(12);
}
} else {
lDrvState.driverState = DriverState.COMPILING;
@@ -1578,20 +1582,16 @@ public class Driver implements CommandProcessor {
downstreamError = e;
console.printError(errorMessage + "\n"
+ org.apache.hadoop.util.StringUtils.stringifyException(e));
- return createProcessorResponse(12);
+ throw createProcessorResponse(12);
}
PerfLogger perfLogger = null;
- int ret;
if (!alreadyCompiled) {
// compile internal will automatically reset the perf logger
- ret = compileInternal(command, true);
+ compileInternal(command, true);
// then we continue to use this perf logger
perfLogger = SessionState.getPerfLogger();
- if (ret != 0) {
- return createProcessorResponse(ret);
- }
} else {
// reuse existing perf logger.
perfLogger = SessionState.getPerfLogger();
@@ -1603,21 +1603,18 @@ public class Driver implements CommandProcessor {
// same instance of Driver, which can run multiple queries.
ctx.setHiveTxnManager(queryTxnMgr);
- if (isInterrupted()) {
- return createProcessorResponse(handleInterruption("at acquiring the lock."));
- }
+ checkInterrupted("at acquiring the lock.", null, null);
- CommandProcessorResponse resp = lockAndRespond();
- if (resp.failed()) {
- return resp;
- }
+ lockAndRespond();
- ret = execute();
- if (ret != 0) {
- //if needRequireLock is false, the release here will do nothing because there is no lock
- return rollback(createProcessorResponse(ret));
+ try {
+ execute();
+ } catch (CommandProcessorResponse cpr) {
+ rollback(cpr);
+ throw cpr;
}
+
//if needRequireLock is false, the release here will do nothing because there is no lock
try {
//since set autocommit starts an implicit txn, close it
@@ -1631,7 +1628,7 @@ public class Driver implements CommandProcessor {
//txn (if there is one started) is not finished
}
} catch (LockException e) {
- return handleHiveException(e, 12);
+ throw handleHiveException(e, 12);
}
perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.DRIVER_RUN);
@@ -1649,31 +1646,27 @@ public class Driver implements CommandProcessor {
downstreamError = e;
console.printError(errorMessage + "\n"
+ org.apache.hadoop.util.StringUtils.stringifyException(e));
- return createProcessorResponse(12);
+ throw createProcessorResponse(12);
}
isFinishedWithError = false;
- return createProcessorResponse(ret);
} finally {
- if (isInterrupted()) {
+ if (lDrvState.isAborted()) {
closeInProcess(true);
} else {
// only release the related resources ctx, driverContext as normal
releaseResources();
}
+
lDrvState.stateLock.lock();
try {
- if (lDrvState.driverState == DriverState.INTERRUPT) {
- lDrvState.driverState = DriverState.ERROR;
- } else {
- lDrvState.driverState = isFinishedWithError ? DriverState.ERROR : DriverState.EXECUTED;
- }
+ lDrvState.driverState = isFinishedWithError ? DriverState.ERROR : DriverState.EXECUTED;
} finally {
lDrvState.stateLock.unlock();
}
}
}
- private CommandProcessorResponse rollback(CommandProcessorResponse cpr) {
+ private CommandProcessorResponse rollback(CommandProcessorResponse cpr) throws CommandProcessorResponse {
//console.printError(cpr.toString());
try {
@@ -1685,10 +1678,12 @@ public class Driver implements CommandProcessor {
}
return cpr;
}
- private CommandProcessorResponse handleHiveException(HiveException e, int ret) {
+
+ private CommandProcessorResponse handleHiveException(HiveException e, int ret) throws CommandProcessorResponse {
return handleHiveException(e, ret, null);
}
- private CommandProcessorResponse handleHiveException(HiveException e, int ret, String rootMsg) {
+
+ private CommandProcessorResponse handleHiveException(HiveException e, int ret, String rootMsg) throws CommandProcessorResponse {
errorMessage = "FAILED: Hive Internal Error: " + Utilities.getNameMessage(e);
if(rootMsg != null) {
errorMessage += "\n" + rootMsg;
@@ -1698,7 +1693,7 @@ public class Driver implements CommandProcessor {
downstreamError = e;
console.printError(errorMessage + "\n"
+ org.apache.hadoop.util.StringUtils.stringifyException(e));
- return createProcessorResponse(ret);
+ throw createProcessorResponse(ret);
}
private boolean requiresLock() {
if (!checkConcurrency()) {
@@ -1759,7 +1754,7 @@ public class Driver implements CommandProcessor {
return new CommandProcessorResponse(ret, errorMessage, SQLState, downstreamError);
}
- private int execute() throws CommandNeedRetryException {
+ private void execute() throws CommandNeedRetryException, CommandProcessorResponse {
PerfLogger perfLogger = SessionState.getPerfLogger();
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.DRIVER_EXECUTE);
@@ -1779,10 +1774,9 @@ public class Driver implements CommandProcessor {
if (lDrvState.driverState != DriverState.COMPILED &&
lDrvState.driverState != DriverState.EXECUTING) {
SQLState = "HY008";
- errorMessage = "FAILED: query " + queryStr + " has " +
- (lDrvState.driverState == DriverState.INTERRUPT ? "been cancelled" : "not been compiled.");
+ errorMessage = "FAILED: unexpected driverstate: " + lDrvState + ", for query " + queryStr;
console.printError(errorMessage);
- return 1000;
+ throw createProcessorResponse(1000);
} else {
lDrvState.driverState = DriverState.EXECUTING;
}
@@ -1852,9 +1846,8 @@ public class Driver implements CommandProcessor {
// At any time, at most maxthreads tasks can be running
// The main thread polls the TaskRunners to check if they have finished.
- if (isInterrupted()) {
- return handleInterruptionWithHook("before running tasks.", hookContext, perfLogger);
- }
+ checkInterrupted("before running tasks.", hookContext, perfLogger);
+
DriverContext driverCxt = new DriverContext(ctx);
driverCxt.prepare(plan);
@@ -1914,9 +1907,8 @@ public class Driver implements CommandProcessor {
TaskResult result = tskRun.getTaskResult();
int exitVal = result.getExitVal();
- if (isInterrupted()) {
- return handleInterruptionWithHook("when checking the execution result.", hookContext, perfLogger);
- }
+ checkInterrupted("when checking the execution result.", hookContext, perfLogger);
+
if (exitVal != 0) {
if (tsk.ifRetryCmdWhenFail()) {
driverCxt.shutdown();
@@ -1962,7 +1954,7 @@ public class Driver implements CommandProcessor {
// in case we decided to run everything in local mode, restore the
// the jobtracker setting to its initial value
ctx.restoreOriginalTracker();
- return exitVal;
+ throw createProcessorResponse(exitVal);
}
}
@@ -1993,7 +1985,7 @@ public class Driver implements CommandProcessor {
errorMessage = "FAILED: Operation cancelled";
invokeFailureHooks(perfLogger, hookContext, errorMessage, null);
console.printError(errorMessage);
- return 1000;
+ throw createProcessorResponse(1000);
}
// remove incomplete outputs.
@@ -2029,11 +2021,13 @@ public class Driver implements CommandProcessor {
} catch (CommandNeedRetryException e) {
executionError = true;
throw e;
+ } catch (CommandProcessorResponse cpr) {
+ executionError = true;
+ throw cpr;
} catch (Throwable e) {
executionError = true;
- if (isInterrupted()) {
- return handleInterruptionWithHook("during query execution: \n" + e.getMessage(), hookContext, perfLogger);
- }
+
+ checkInterrupted("during query execution: \n" + e.getMessage(), hookContext, perfLogger);
ctx.restoreOriginalTracker();
if (SessionState.get() != null) {
@@ -2053,7 +2047,7 @@ public class Driver implements CommandProcessor {
downstreamError = e;
console.printError(errorMessage + "\n"
+ org.apache.hadoop.util.StringUtils.stringifyException(e));
- return (12);
+ throw createProcessorResponse(12);
} finally {
// Trigger query hooks after query completes its execution.
try {
@@ -2083,17 +2077,13 @@ public class Driver implements CommandProcessor {
}
console.printInfo("Total MapReduce CPU Time Spent: " + Utilities.formatMsecToStr(totalCpu));
}
- boolean isInterrupted = isInterrupted();
lDrvState.stateLock.lock();
try {
- if (isInterrupted) {
- } else {
- lDrvState.driverState = executionError ? DriverState.ERROR : DriverState.EXECUTED;
- }
+ lDrvState.driverState = executionError ? DriverState.ERROR : DriverState.EXECUTED;
} finally {
lDrvState.stateLock.unlock();
}
- if (isInterrupted) {
+ if (lDrvState.isAborted()) {
LOG.info("Executing command(queryId=" + queryId + ") has been interrupted after " + duration + " seconds");
} else {
LOG.info("Completed executing command(queryId=" + queryId + "); Time taken: " + duration + " seconds");
@@ -2103,8 +2093,6 @@ public class Driver implements CommandProcessor {
if (console != null) {
console.printInfo("OK");
}
-
- return (0);
}
private void releasePlan(QueryPlan plan) {
@@ -2436,9 +2424,8 @@ public class Driver implements CommandProcessor {
try {
releaseDriverContext();
if (lDrvState.driverState == DriverState.COMPILING ||
- lDrvState.driverState == DriverState.EXECUTING ||
- lDrvState.driverState == DriverState.INTERRUPT) {
- lDrvState.driverState = DriverState.INTERRUPT;
+ lDrvState.driverState == DriverState.EXECUTING) {
+ lDrvState.abort();
return 0;
}
releasePlan();
@@ -2463,8 +2450,7 @@ public class Driver implements CommandProcessor {
try {
// in the cancel case where the driver state is INTERRUPTED, destroy will be deferred to
// the query process
- if (lDrvState.driverState == DriverState.DESTROYED ||
- lDrvState.driverState == DriverState.INTERRUPT) {
+ if (lDrvState.driverState == DriverState.DESTROYED) {
return;
} else {
lDrvState.driverState = DriverState.DESTROYED;
http://git-wip-us.apache.org/repos/asf/hive/blob/ed4e7576/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 29c33f9..d7397e0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -110,7 +110,6 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Order;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.Context;
-import org.apache.hadoop.hive.ql.Driver.DriverState;
import org.apache.hadoop.hive.ql.Driver.LockedDriverState;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.QueryPlan;
@@ -3283,7 +3282,7 @@ public final class Utilities {
boolean hasLogged = false;
for (Map.Entry<Path, ArrayList<String>> e : pathToAliases) {
- if (lDrvStat != null && lDrvStat.driverState == DriverState.INTERRUPT) {
+ if (lDrvStat != null && lDrvStat.isAborted()) {
throw new IOException("Operation is Canceled.");
}
@@ -3339,7 +3338,7 @@ public final class Utilities {
finalPathsToAdd.addAll(getInputPathsWithPool(job, work, hiveScratchDir, ctx, skipDummy, pathsToAdd, pool));
} else {
for (final Path path : pathsToAdd) {
- if (lDrvStat != null && lDrvStat.driverState == DriverState.INTERRUPT) {
+ if (lDrvStat != null && lDrvStat.isAborted()) {
throw new IOException("Operation is Canceled.");
}
Path newPath = new GetInputPathsCallable(path, job, work, hiveScratchDir, ctx, skipDummy).call();
@@ -3360,7 +3359,7 @@ public final class Utilities {
try {
Map<GetInputPathsCallable, Future<Path>> getPathsCallableToFuture = new LinkedHashMap<>();
for (final Path path : pathsToAdd) {
- if (lDrvStat != null && lDrvStat.driverState == DriverState.INTERRUPT) {
+ if (lDrvStat != null && lDrvStat.isAborted()) {
throw new IOException("Operation is Canceled.");
}
GetInputPathsCallable callable = new GetInputPathsCallable(path, job, work, hiveScratchDir, ctx, skipDummy);
@@ -3369,7 +3368,7 @@ public final class Utilities {
pool.shutdown();
for (Map.Entry<GetInputPathsCallable, Future<Path>> future : getPathsCallableToFuture.entrySet()) {
- if (lDrvStat != null && lDrvStat.driverState == DriverState.INTERRUPT) {
+ if (lDrvStat != null && lDrvStat.isAborted()) {
throw new IOException("Operation is Canceled.");
}
@@ -3960,7 +3959,7 @@ public final class Utilities {
String[] classNames = org.apache.hadoop.util.StringUtils.getStrings(HiveConf.getVar(hiveConf,
confVar));
if (classNames == null) {
- return Collections.emptyList();
+ return new ArrayList<>(0);
}
Collection<Class<?>> classList = new ArrayList<Class<?>>(classNames.length);
for (String className : classNames) {
http://git-wip-us.apache.org/repos/asf/hive/blob/ed4e7576/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
index 6a188ac..21238db 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
@@ -18,8 +18,6 @@
package org.apache.hadoop.hive.ql.io;
-import org.apache.hadoop.hive.ql.io.merge.MergeFileWork;
-
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
@@ -45,8 +43,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.hive.metastore.MetaStoreUtils;
-import org.apache.hadoop.hive.ql.Driver.DriverState;
import org.apache.hadoop.hive.ql.Driver.LockedDriverState;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.Utilities;
@@ -367,8 +363,9 @@ public class CombineHiveInputFormat<K extends WritableComparable, V extends Writ
LockedDriverState lDrvStat = LockedDriverState.getLockedDriverState();
for (Path path : paths) {
- if (lDrvStat != null && lDrvStat.driverState == DriverState.INTERRUPT)
+ if (lDrvStat != null && lDrvStat.isAborted()) {
throw new IOException("Operation is Canceled. ");
+ }
PartitionDesc part = HiveFileFormatUtils.getFromPathRecursively(
pathToPartitionInfo, path, IOPrepareCache.get().allocatePartitionDescMap());
http://git-wip-us.apache.org/repos/asf/hive/blob/ed4e7576/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java
index 42c0042..cbb46df 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java
@@ -24,7 +24,6 @@ import org.apache.hadoop.hive.common.metrics.common.Metrics;
import org.apache.hadoop.hive.common.metrics.common.MetricsConstant;
import org.apache.hadoop.hive.common.metrics.common.MetricsFactory;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.Driver.DriverState;
import org.apache.hadoop.hive.ql.Driver.LockedDriverState;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.lockmgr.*;
@@ -198,7 +197,7 @@ public class ZooKeeperHiveLockManager implements HiveLockManager {
boolean isInterrupted = false;
if (lDrvState != null) {
lDrvState.stateLock.lock();
- if (lDrvState.driverState == DriverState.INTERRUPT) {
+ if (lDrvState.isAborted()) {
isInterrupted = true;
}
lDrvState.stateLock.unlock();
http://git-wip-us.apache.org/repos/asf/hive/blob/ed4e7576/ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessorResponse.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessorResponse.java b/ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessorResponse.java
index a4687f2..63b563e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessorResponse.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessorResponse.java
@@ -30,7 +30,7 @@ import org.apache.hadoop.hive.ql.ErrorMsg;
* is not 0. Note that often {@code responseCode} ends up the exit value of
* command shell process so should keep it to < 127.
*/
-public class CommandProcessorResponse {
+public class CommandProcessorResponse extends Exception {
private final int responseCode;
private final String errorMessage;
private final int hiveErrorCode;
http://git-wip-us.apache.org/repos/asf/hive/blob/ed4e7576/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
index dc79283..4148a8a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
@@ -115,7 +115,7 @@ import com.google.common.base.Preconditions;
/**
* GenericUDTFGetSplits.
- *
+ *
*/
@Description(name = "get_splits", value = "_FUNC_(string,int) - "
+ "Returns an array of length int serialized splits for the referenced tables string.")
@@ -301,9 +301,10 @@ public class GenericUDTFGetSplits extends GenericUDTF {
// Table will be queried directly by LLAP
// Acquire locks if necessary - they will be released during session cleanup.
// The read will have READ_COMMITTED level semantics.
- cpr = driver.lockAndRespond();
- if (cpr.getResponseCode() != 0) {
- throw new HiveException("Failed to acquire locks: " + cpr.getException());
+ try {
+ driver.lockAndRespond();
+ } catch (CommandProcessorResponse cpr1) {
+ throw new HiveException("Failed to acquire locks", cpr1);
}
// Attach the resources to the session cleanup.
@@ -394,7 +395,7 @@ public class GenericUDTFGetSplits extends GenericUDTF {
LlapSigner signer = null;
if (UserGroupInformation.isSecurityEnabled()) {
signer = coordinator.getLlapSigner(job);
-
+
// 1. Generate the token for query user (applies to all splits).
queryUser = SessionState.getUserFromAuthenticator();
if (queryUser == null) {
@@ -563,7 +564,7 @@ public class GenericUDTFGetSplits extends GenericUDTF {
/**
* Returns a local resource representing a jar. This resource will be used to
* execute the plan on the cluster.
- *
+ *
* @param localJarPath
* Local path to the jar to be localized.
* @return LocalResource corresponding to the localized hive exec resource.
http://git-wip-us.apache.org/repos/asf/hive/blob/ed4e7576/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index b877253..2e73e48 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -169,8 +169,8 @@ public class TestTxnCommands2 {
try {
if (d != null) {
dropTables();
- d.destroy();
d.close();
+ d.destroy();
d = null;
}
TxnDbUtil.cleanDb(hiveConf);
@@ -364,7 +364,7 @@ public class TestTxnCommands2 {
Assert.assertEquals(536870912, BucketCodec.V1.encode(new AcidOutputFormat.Options(hiveConf).bucket(0)));
Assert.assertEquals(536936448, BucketCodec.V1.encode(new AcidOutputFormat.Options(hiveConf).bucket(1)));
/*
- * All ROW__IDs are unique on read after conversion to acid
+ * All ROW__IDs are unique on read after conversion to acid
* ROW__IDs are exactly the same before and after compaction
* Also check the file name (only) after compaction for completeness
* Note: order of rows in a file ends up being the reverse of order in values clause (why?!)
@@ -1523,7 +1523,7 @@ public class TestTxnCommands2 {
runStatementOnDriver("insert into " + Table.ACIDTBL + " " + makeValuesClause(vals));
List<String> r = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
Assert.assertEquals(stringifyValues(vals), r);
- String query = "merge into " + Table.ACIDTBL +
+ String query = "merge into " + Table.ACIDTBL +
" using " + Table.NONACIDPART2 + " source ON " + Table.ACIDTBL + ".a = a2 and b + 1 = source.b2 + 1 " +
"WHEN MATCHED THEN UPDATE set b = source.b2 " +
"WHEN NOT MATCHED THEN INSERT VALUES(source.a2, source.b2)";
@@ -1647,7 +1647,7 @@ public class TestTxnCommands2 {
// 1 ReadEntity: default@values__tmp__table__1
// 1 WriteEntity: default@acidtblpart Type=TABLE WriteType=INSERT isDP=false
runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition(p) values(1,1,'p1'),(2,2,'p1'),(3,3,'p1'),(4,4,'p2')");
-
+
List<String> r1 = runStatementOnDriver("select count(*) from " + Table.ACIDTBLPART);
Assert.assertEquals("4", r1.get(0));
//In DbTxnManager.acquireLocks() we have
@@ -1655,12 +1655,12 @@ public class TestTxnCommands2 {
// 1 WriteEntity: default@acidtblpart Type=TABLE WriteType=INSERT isDP=false
//todo: side note on the above: LockRequestBuilder combines the both default@acidtblpart entries to 1
runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition(p) select * from " + Table.ACIDTBLPART + " where p='p1'");
-
+
//In DbTxnManager.acquireLocks() we have
// 2 ReadEntity: [default@acidtblpart@p=p1, default@acidtblpart]
// 1 WriteEntity: default@acidtblpart@p=p2 Type=PARTITION WriteType=INSERT isDP=false
runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition(p='p2') select a,b from " + Table.ACIDTBLPART + " where p='p1'");
-
+
//In UpdateDeleteSemanticAnalyzer, after super analyze
// 3 ReadEntity: [default@acidtblpart, default@acidtblpart@p=p1, default@acidtblpart@p=p2]
// 1 WriteEntity: [default@acidtblpart TABLE/INSERT]
@@ -1670,7 +1670,7 @@ public class TestTxnCommands2 {
//todo: Why acquire per partition locks - if you have many partitions that's hugely inefficient.
//could acquire 1 table level Shared_write instead
runStatementOnDriver("update " + Table.ACIDTBLPART + " set b = 1");
-
+
//In UpdateDeleteSemanticAnalyzer, after super analyze
// Read [default@acidtblpart, default@acidtblpart@p=p1]
// Write default@acidtblpart TABLE/INSERT
http://git-wip-us.apache.org/repos/asf/hive/blob/ed4e7576/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java b/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
index 8737369..9f31eb1 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
@@ -111,8 +111,8 @@ public abstract class TxnCommandsBaseForTests {
try {
if (d != null) {
dropTables();
- d.destroy();
d.close();
+ d.destroy();
d = null;
}
} finally {
http://git-wip-us.apache.org/repos/asf/hive/blob/ed4e7576/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDummyTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDummyTxnManager.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDummyTxnManager.java
index 9d27d21..913b60c 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDummyTxnManager.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDummyTxnManager.java
@@ -27,7 +27,6 @@ import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.ErrorMsg;
-import org.apache.hadoop.hive.ql.Driver.DriverState;
import org.apache.hadoop.hive.ql.Driver.LockedDriverState;
import org.apache.hadoop.hive.ql.QueryPlan;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
@@ -83,7 +82,9 @@ public class TestDummyTxnManager {
@After
public void tearDown() throws Exception {
- if (txnMgr != null) txnMgr.closeTxnManager();
+ if (txnMgr != null) {
+ txnMgr.closeTxnManager();
+ }
}
/**
@@ -100,7 +101,7 @@ public class TestDummyTxnManager {
expectedLocks.add(new ZooKeeperHiveLock("default.table1", new HiveLockObject(), HiveLockMode.SHARED));
LockedDriverState lDrvState = new LockedDriverState();
LockedDriverState lDrvInp = new LockedDriverState();
- lDrvInp.driverState = DriverState.INTERRUPT;
+ lDrvInp.abort();
LockException lEx = new LockException(ErrorMsg.LOCK_ACQUIRE_CANCELLED.getMsg());
when(mockLockManager.lock(anyListOf(HiveLockObj.class), eq(false), eq(lDrvState))).thenReturn(expectedLocks);
when(mockLockManager.lock(anyListOf(HiveLockObj.class), eq(false), eq(lDrvInp))).thenThrow(lEx);