Posted to commits@hive.apache.org by we...@apache.org on 2017/05/05 17:31:56 UTC
[09/51] [partial] hive git commit: HIVE-14671 : merge master into hive-14535 (Wei Zheng)
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 0b615cd..3230c61 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -37,10 +37,13 @@ import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
-import com.google.common.collect.Iterables;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
+
import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.hive.common.JavaUtils;
+import org.apache.hadoop.hive.common.ValidReadTxnList;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.common.ValidWriteIds;
import org.apache.hadoop.hive.common.metrics.common.Metrics;
@@ -71,12 +74,9 @@ import org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext;
import org.apache.hadoop.hive.ql.hooks.Hook;
import org.apache.hadoop.hive.ql.hooks.HookContext;
import org.apache.hadoop.hive.ql.hooks.HookUtils;
-import org.apache.hadoop.hive.ql.hooks.MetricsQueryLifeTimeHook;
+import org.apache.hadoop.hive.ql.hooks.HooksLoader;
import org.apache.hadoop.hive.ql.hooks.PostExecute;
import org.apache.hadoop.hive.ql.hooks.PreExecute;
-import org.apache.hadoop.hive.ql.hooks.QueryLifeTimeHook;
-import org.apache.hadoop.hive.ql.hooks.QueryLifeTimeHookContext;
-import org.apache.hadoop.hive.ql.hooks.QueryLifeTimeHookContextImpl;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.io.AcidUtils;
@@ -101,7 +101,7 @@ import org.apache.hadoop.hive.ql.parse.HiveSemanticAnalyzerHookContext;
import org.apache.hadoop.hive.ql.parse.HiveSemanticAnalyzerHookContextImpl;
import org.apache.hadoop.hive.ql.parse.ImportSemanticAnalyzer;
import org.apache.hadoop.hive.ql.parse.ParseContext;
-import org.apache.hadoop.hive.ql.parse.ParseDriver;
+import org.apache.hadoop.hive.ql.parse.ParseException;
import org.apache.hadoop.hive.ql.parse.ParseUtils;
import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
@@ -119,8 +119,6 @@ import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeObje
import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeObject.HivePrivObjectActionType;
import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeObject.HivePrivilegeObjectType;
import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAuthzContext;
-import org.apache.hadoop.hive.ql.session.OperationLog;
-import org.apache.hadoop.hive.ql.session.OperationLog.LoggingLevel;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
import org.apache.hadoop.hive.serde2.ByteStream;
@@ -129,7 +127,9 @@ import org.apache.hadoop.mapred.ClusterStatus;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.MRJobConfig;
+
import org.apache.hive.common.util.ShutdownHookManager;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -137,6 +137,7 @@ import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
+
public class Driver implements CommandProcessor {
static final private String CLASS_NAME = Driver.class.getName();
@@ -162,11 +163,6 @@ public class Driver implements CommandProcessor {
private FetchTask fetchTask;
List<HiveLock> hiveLocks = new ArrayList<HiveLock>();
- // A list of FileSinkOperators writing in an ACID compliant manner
- private Set<FileSinkDesc> acidSinks;
- // whether any ACID table is involved in a query
- private boolean acidInQuery;
-
// A limit on the number of threads that can be launched
private int maxthreads;
private int tryCount = Integer.MAX_VALUE;
@@ -184,7 +180,8 @@ public class Driver implements CommandProcessor {
private QueryState queryState;
// Query hooks that execute before compilation and after execution
- private List<QueryLifeTimeHook> queryHooks;
+ private QueryLifeTimeHookRunner queryLifeTimeHookRunner;
+ private final HooksLoader hooksLoader;
public enum DriverState {
INITIALIZED,
@@ -208,6 +205,25 @@ public class Driver implements CommandProcessor {
// resource releases
public final ReentrantLock stateLock = new ReentrantLock();
public DriverState driverState = DriverState.INITIALIZED;
+ private static ThreadLocal<LockedDriverState> lds = new ThreadLocal<LockedDriverState>() {
+ @Override
+ protected LockedDriverState initialValue() {
+ return new LockedDriverState();
+ }
+ };
+
+ public static void setLockedDriverState(LockedDriverState lDrv) {
+ lds.set(lDrv);
+ }
+
+ public static LockedDriverState getLockedDriverState() {
+ return lds.get();
+ }
+
+ public static void removeLockedDriverState() {
+ if (lds != null)
+ lds.remove();
+ }
}
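
A note on the ThreadLocal plumbing added here: setLockedDriverState publishes the state of the query running on the current thread, and removeLockedDriverState has to run when the query finishes because server threads are pooled and reused. A minimal self-contained sketch of the same idiom (class and field names are illustrative, not Hive's):

    import java.util.concurrent.atomic.AtomicBoolean;

    // Illustrative stand-in for Driver.LockedDriverState.
    class PerThreadState {
        final AtomicBoolean aborted = new AtomicBoolean(false);

        private static final ThreadLocal<PerThreadState> TL =
            ThreadLocal.withInitial(PerThreadState::new);

        static void set(PerThreadState s) { TL.set(s); }
        static PerThreadState get()       { return TL.get(); }

        // Call from a finally block: pooled threads outlive a single query,
        // so a stale value would otherwise leak into the next query run on
        // the same thread.
        static void remove()              { TL.remove(); }
    }

With this shape, code deep in the execution path can consult PerThreadState.get() without a Driver reference being threaded through every call.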
private boolean checkConcurrency() {
@@ -354,11 +370,21 @@ public class Driver implements CommandProcessor {
}
public Driver(QueryState queryState, String userName) {
+ this(queryState, userName, new HooksLoader(queryState.getConf()));
+ }
+
+ public Driver(HiveConf conf, HooksLoader hooksLoader) {
+ this(new QueryState(conf), null, hooksLoader);
+ }
+
+ private Driver(QueryState queryState, String userName, HooksLoader hooksLoader) {
this.queryState = queryState;
this.conf = queryState.getConf();
isParallelEnabled = (conf != null)
&& HiveConf.getBoolVar(conf, ConfVars.HIVE_SERVER2_PARALLEL_COMPILATION);
this.userName = userName;
+ this.hooksLoader = hooksLoader;
+ this.queryLifeTimeHookRunner = new QueryLifeTimeHookRunner(conf, hooksLoader, console);
}
/**
@@ -386,7 +412,7 @@ public class Driver implements CommandProcessor {
// deferClose indicates if the close/destroy should be deferred when the process has been
// interrupted, it should be set to true if the compile is called within another method like
// runInternal, which defers the close to the called in that method.
- public int compile(String command, boolean resetTaskIds, boolean deferClose) {
+ private int compile(String command, boolean resetTaskIds, boolean deferClose) {
PerfLogger perfLogger = SessionState.getPerfLogger(true);
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.DRIVER_RUN);
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.COMPILE);
@@ -426,6 +452,8 @@ public class Driver implements CommandProcessor {
TaskFactory.resetId();
}
+ LockedDriverState.setLockedDriverState(lDrvState);
+
String queryId = conf.getVar(HiveConf.ConfVars.HIVEQUERYID);
//save some info for webUI for use after plan is freed
@@ -438,6 +466,8 @@ public class Driver implements CommandProcessor {
// Whether any error occurred during query compilation. Used for query lifetime hook.
boolean compileError = false;
+ boolean parseError = false;
+
try {
// Initialize the transaction manager. This must be done before analyze is called.
@@ -471,26 +501,27 @@ public class Driver implements CommandProcessor {
ctx.setHDFSCleanup(true);
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.PARSE);
- ASTNode tree = ParseUtils.parse(command, ctx);
- perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.PARSE);
// Trigger query hook before compilation
- queryHooks = loadQueryHooks();
- if (queryHooks != null && !queryHooks.isEmpty()) {
- QueryLifeTimeHookContext qhc = new QueryLifeTimeHookContextImpl();
- qhc.setHiveConf(conf);
- qhc.setCommand(command);
-
- for (QueryLifeTimeHook hook : queryHooks) {
- hook.beforeCompile(qhc);
- }
+ queryLifeTimeHookRunner.runBeforeParseHook(command);
+
+ ASTNode tree;
+ try {
+ tree = ParseUtils.parse(command, ctx);
+ } catch (ParseException e) {
+ parseError = true;
+ throw e;
+ } finally {
+ queryLifeTimeHookRunner.runAfterParseHook(command, parseError);
}
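
The restructured parse step above is the flag-in-catch, act-in-finally idiom: a finally block cannot tell success from failure on its own, so the catch records why the stack is unwinding before rethrowing, and the finally fires the after-parse hook exactly once on both paths. The idiom in isolation (the Hooks interface is a stand-in, not Hive's API):

    class ParsePhase {
        interface Hooks { void afterParse(boolean failed); }

        static String parse(String command, Hooks hooks) {
            boolean parseError = false;
            try {
                if (command == null || command.isEmpty()) {
                    throw new IllegalArgumentException("nothing to parse");
                }
                return command.trim();
            } catch (IllegalArgumentException e) {
                parseError = true;  // record why we are unwinding
                throw e;            // rethrow so the caller still sees the failure
            } finally {
                hooks.afterParse(parseError);  // runs on success and failure alike
            }
        }
    }

This also explains the parseError check later in the diff: when parsing fails, compilation never started, so the after-compilation hook is deliberately skipped.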
+ perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.PARSE);
+
+ queryLifeTimeHookRunner.runBeforeCompileHook(command);
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.ANALYZE);
BaseSemanticAnalyzer sem = SemanticAnalyzerFactory.get(queryState, tree);
List<HiveSemanticAnalyzerHook> saHooks =
- getHooks(HiveConf.ConfVars.SEMANTIC_ANALYZER_HOOK,
- HiveSemanticAnalyzerHook.class);
+ hooksLoader.getHooks(HiveConf.ConfVars.SEMANTIC_ANALYZER_HOOK, console);
// Flush the metastore cache. This assures that we don't pick up objects from a previous
// query running in this same thread. This has to be done after we get our semantic
@@ -498,6 +529,15 @@ public class Driver implements CommandProcessor {
// because at that point we need access to the objects.
Hive.get().getMSC().flushCache();
+ if(checkConcurrency() && startImplicitTxn(txnManager)) {
+ String userFromUGI = getUserFromUGI();
+ if (!txnManager.isTxnOpen()) {
+ if(userFromUGI == null) {
+ return 10;
+ }
+        txnManager.openTxn(ctx, userFromUGI);
+ }
+ }
// Do semantic analysis and plan generation
if (saHooks != null && !saHooks.isEmpty()) {
HiveSemanticAnalyzerHookContext hookCtx = new HiveSemanticAnalyzerHookContextImpl();
@@ -516,15 +556,10 @@ public class Driver implements CommandProcessor {
} else {
sem.analyze(tree, ctx);
}
- // Record any ACID compliant FileSinkOperators we saw so we can add our transaction ID to
- // them later.
- acidSinks = sem.getAcidFileSinks();
-
LOG.info("Semantic Analysis Completed");
// validate the plan
sem.validate();
- acidInQuery = sem.hasAcidInQuery();
perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.ANALYZE);
if (isInterrupted()) {
@@ -567,10 +602,8 @@ public class Driver implements CommandProcessor {
if (conf.getBoolVar(ConfVars.HIVE_LOG_EXPLAIN_OUTPUT)) {
String explainOutput = getExplainOutput(sem, plan, tree);
if (explainOutput != null) {
- if (conf.getBoolVar(ConfVars.HIVE_LOG_EXPLAIN_OUTPUT)) {
- LOG.info("EXPLAIN output for queryid " + queryId + " : "
- + explainOutput);
- }
+ LOG.info("EXPLAIN output for queryid " + queryId + " : "
+ + explainOutput);
if (conf.isWebUiQueryInfoCacheEnabled()) {
queryDisplay.setExplainPlan(explainOutput);
}
@@ -609,17 +642,12 @@ public class Driver implements CommandProcessor {
} finally {
// Trigger post compilation hook. Note that if the compilation fails here then
// before/after execution hook will never be executed.
- try {
- if (queryHooks != null && !queryHooks.isEmpty()) {
- QueryLifeTimeHookContext qhc = new QueryLifeTimeHookContextImpl();
- qhc.setHiveConf(conf);
- qhc.setCommand(command);
- for (QueryLifeTimeHook hook : queryHooks) {
- hook.afterCompile(qhc, compileError);
- }
+ if (!parseError) {
+ try {
+ queryLifeTimeHookRunner.runAfterCompilationHook(command, compileError);
+ } catch (Exception e) {
+ LOG.warn("Failed when invoking query after-compilation hook.", e);
}
- } catch (Exception e) {
- LOG.warn("Failed when invoking query after-compilation hook.", e);
}
double duration = perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.COMPILE)/1000.00;
@@ -649,11 +677,55 @@ public class Driver implements CommandProcessor {
}
}
-
+ private boolean startImplicitTxn(HiveTxnManager txnManager) throws LockException {
+ boolean shouldOpenImplicitTxn = !ctx.isExplainPlan();
+ //this is dumb. HiveOperation is not always set. see HIVE-16447/HIVE-16443
+ switch (queryState.getHiveOperation() == null ? HiveOperation.QUERY : queryState.getHiveOperation()) {
+ case COMMIT:
+ case ROLLBACK:
+ if(!txnManager.isTxnOpen()) {
+ throw new LockException(null, ErrorMsg.OP_NOT_ALLOWED_WITHOUT_TXN, queryState.getHiveOperation().getOperationName());
+ }
+ case SWITCHDATABASE:
+ case SET_AUTOCOMMIT:
+ /**
+ * autocommit is here for completeness. TM doesn't use it. If we want to support JDBC
+ * semantics (or any other definition of autocommit) it should be done at session level.
+ */
+ case SHOWDATABASES:
+ case SHOWTABLES:
+ case SHOWCOLUMNS:
+ case SHOWFUNCTIONS:
+ case SHOWINDEXES:
+ case SHOWPARTITIONS:
+ case SHOWLOCKS:
+ case SHOWVIEWS:
+ case SHOW_ROLES:
+ case SHOW_ROLE_PRINCIPALS:
+ case SHOW_COMPACTIONS:
+ case SHOW_TRANSACTIONS:
+ case ABORT_TRANSACTIONS:
+ shouldOpenImplicitTxn = false;
+ //this implies that no locks are needed for such a command
+ }
+ return shouldOpenImplicitTxn;
+ }
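
The switch above leans on deliberate fall-through: COMMIT and ROLLBACK first verify that a transaction is actually open, then drop into the same no-implicit-txn bucket as the read-only SHOW commands. The control flow in miniature (operations reduced to a toy enum):

    enum Op { COMMIT, ROLLBACK, SHOWTABLES, QUERY }

    class ImplicitTxn {
        static boolean shouldOpen(Op op, boolean txnOpen) {
            boolean open = true;  // default: a query gets an implicit txn
            switch (op) {
                case COMMIT:
                case ROLLBACK:
                    if (!txnOpen) {
                        throw new IllegalStateException(op + " without an active transaction");
                    }
                    // deliberate fall-through: COMMIT/ROLLBACK never *open* a txn
                case SHOWTABLES:
                    open = false;
                    break;
                default:
                    break;
            }
            return open;
        }
    }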
private int handleInterruption(String msg) {
+ return handleInterruptionWithHook(msg, null, null);
+ }
+
+ private int handleInterruptionWithHook(String msg, HookContext hookContext,
+ PerfLogger perfLogger) {
SQLState = "HY008"; //SQLState for cancel operation
errorMessage = "FAILED: command has been interrupted: " + msg;
console.printError(errorMessage);
+ if (hookContext != null) {
+ try {
+ invokeFailureHooks(perfLogger, hookContext, errorMessage, null);
+ } catch (Exception e) {
+ LOG.warn("Caught exception attempting to invoke Failure Hooks", e);
+ }
+ }
return 1000;
}
@@ -670,19 +742,6 @@ public class Driver implements CommandProcessor {
}
}
- private List<QueryLifeTimeHook> loadQueryHooks() throws Exception {
- List<QueryLifeTimeHook> hooks = new ArrayList<>();
-
- if (conf.getBoolVar(ConfVars.HIVE_SERVER2_METRICS_ENABLED)) {
- hooks.add(new MetricsQueryLifeTimeHook());
- }
- List<QueryLifeTimeHook> propertyDefinedHoooks = getHooks(ConfVars.HIVE_QUERY_LIFETIME_HOOKS, QueryLifeTimeHook.class);
- if (propertyDefinedHoooks != null) {
- Iterables.addAll(hooks, propertyDefinedHoooks);
- }
- return hooks;
- }
-
private ImmutableMap<String, Long> dumpMetaCallTimingWithoutEx(String phase) {
try {
return Hive.get().dumpAndClearMetaCallTiming(phase);
@@ -1064,8 +1123,17 @@ public class Driver implements CommandProcessor {
// Write the current set of valid transactions into the conf file so that it can be read by
// the input format.
private void recordValidTxns() throws LockException {
+ ValidTxnList oldList = null;
+ String s = conf.get(ValidTxnList.VALID_TXNS_KEY);
+ if(s != null && s.length() > 0) {
+ oldList = new ValidReadTxnList(s);
+ }
HiveTxnManager txnMgr = SessionState.get().getTxnMgr();
ValidTxnList txns = txnMgr.getValidTxns();
+ if(oldList != null) {
+      throw new IllegalStateException("calling recordValidTxns() more than once in the same " +
+ JavaUtils.txnIdToString(txnMgr.getCurrentTxnId()));
+ }
String txnStr = txns.toString();
conf.set(ValidTxnList.VALID_TXNS_KEY, txnStr);
if(plan.getFetchTask() != null) {
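
The new guard turns a double recording of the transaction snapshot into a hard failure rather than a silent overwrite; re-recording mid-query could change which rows the query sees. The shape of the check, reduced to a sketch (the key "hive.txn.valid.txns" is the value of ValidTxnList.VALID_TXNS_KEY; the Map stands in for HiveConf):

    import java.util.HashMap;
    import java.util.Map;

    class SnapshotGuard {
        private final Map<String, String> conf = new HashMap<>();  // stand-in for HiveConf

        void recordValidTxns(String freshSnapshot, long txnId) {
            String old = conf.get("hive.txn.valid.txns");
            if (old != null && !old.isEmpty()) {
                // Recording twice in one txn would swap the snapshot under a running query.
                throw new IllegalStateException(
                    "calling recordValidTxns() more than once in the same txnid:" + txnId);
            }
            conf.put("hive.txn.valid.txns", freshSnapshot);
        }
    }

This pairs with the conf.unset(ValidTxnList.VALID_TXNS_KEY) added to releaseLocksAndCommitOrRollback later in the diff, which clears the key so the next statement starts clean.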
@@ -1079,79 +1147,61 @@ public class Driver implements CommandProcessor {
LOG.debug("Encoding valid txns info " + txnStr + " txnid:" + txnMgr.getCurrentTxnId());
}
+ private String getUserFromUGI() {
+ // Don't use the userName member, as it may or may not have been set. Get the value from
+ // conf, which calls into getUGI to figure out who the process is running as.
+ try {
+ return conf.getUser();
+ } catch (IOException e) {
+ errorMessage = "FAILED: Error in determining user while acquiring locks: " + e.getMessage();
+ SQLState = ErrorMsg.findSQLState(e.getMessage());
+ downstreamError = e;
+ console.printError(errorMessage,
+ "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e));
+ }
+ return null;
+ }
/**
* Acquire read and write locks needed by the statement. The list of objects to be locked are
- * obtained from the inputs and outputs populated by the compiler. The lock acquisition scheme is
- * pretty simple. If all the locks cannot be obtained, error out. Deadlock is avoided by making
- * sure that the locks are lexicographically sorted.
+   * obtained from the inputs and outputs populated by the compiler. The locking strategy depends on
+   * the configured HiveTxnManager and HiveLockManager.
*
* This method also records the list of valid transactions. This must be done after any
- * transactions have been opened and locks acquired.
- * @param startTxnImplicitly in AC=false, the 1st DML starts a txn
+ * transactions have been opened.
**/
- private int acquireLocksAndOpenTxn(boolean startTxnImplicitly) {
+ private int acquireLocks() {
PerfLogger perfLogger = SessionState.getPerfLogger();
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.ACQUIRE_READ_WRITE_LOCKS);
SessionState ss = SessionState.get();
HiveTxnManager txnMgr = ss.getTxnMgr();
- if(startTxnImplicitly) {
- assert !txnMgr.getAutoCommit();
+ if(!txnMgr.isTxnOpen() && txnMgr.supportsAcid()) {
+      /*Non-ACID txn managers don't support txns but forward lock requests to lock managers.
+        The ACID txn manager requires all locks to be associated with a txn, so if we
+        end up here without an open txn it's because we are processing something like
+        "use <database>", which by definition needs no locks.*/
+ return 0;
}
-
try {
- // Don't use the userName member, as it may or may not have been set. Get the value from
- // conf, which calls into getUGI to figure out who the process is running as.
- String userFromUGI;
- try {
- userFromUGI = conf.getUser();
- } catch (IOException e) {
- errorMessage = "FAILED: Error in determining user while acquiring locks: " + e.getMessage();
- SQLState = ErrorMsg.findSQLState(e.getMessage());
- downstreamError = e;
- console.printError(errorMessage,
- "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e));
+ String userFromUGI = getUserFromUGI();
+ if(userFromUGI == null) {
return 10;
}
-
- boolean initiatingTransaction = false;
- boolean readOnlyQueryInAutoCommit = false;
- if((txnMgr.getAutoCommit() && haveAcidWrite()) || plan.getOperation() == HiveOperation.START_TRANSACTION ||
- (!txnMgr.getAutoCommit() && startTxnImplicitly)) {
- if(txnMgr.isTxnOpen()) {
- throw new RuntimeException("Already have an open transaction txnid:" + txnMgr.getCurrentTxnId());
- }
- // We are writing to tables in an ACID compliant way, so we need to open a transaction
- txnMgr.openTxn(ctx, userFromUGI);
- initiatingTransaction = true;
- }
- else {
- readOnlyQueryInAutoCommit = txnMgr.getAutoCommit() && plan.getOperation() == HiveOperation.QUERY && !haveAcidWrite();
- }
// Set the transaction id in all of the acid file sinks
if (haveAcidWrite()) {
- for (FileSinkDesc desc : acidSinks) {
+ for (FileSinkDesc desc : plan.getAcidSinks()) {
desc.setTransactionId(txnMgr.getCurrentTxnId());
//it's possible to have > 1 FileSink writing to the same table/partition
//e.g. Merge stmt, multi-insert stmt when mixing DP and SP writes
desc.setStatementId(txnMgr.getWriteIdAndIncrement());
}
}
- /*Note, we have to record snapshot after lock acquisition to prevent lost update problem
- consider 2 concurrent "update table T set x = x + 1". 1st will get the locks and the
- 2nd will block until 1st one commits and only then lock in the snapshot, i.e. it will
- see the changes made by 1st one. This takes care of autoCommit=true case.
- For multi-stmt txns this is not sufficient and will be managed via WriteSet tracking
- in the lock manager.*/
+ /*It's imperative that {@code acquireLocks()} is called for all commands so that
+ HiveTxnManager can transition its state machine correctly*/
txnMgr.acquireLocks(plan, ctx, userFromUGI, lDrvState);
- if(initiatingTransaction || (readOnlyQueryInAutoCommit && acidInQuery)) {
- //For multi-stmt txns we should record the snapshot when txn starts but
- // don't update it after that until txn completes. Thus the check for {@code initiatingTransaction}
- //For autoCommit=true, Read-only statements, txn is implicit, i.e. lock in the snapshot
- //for each statement.
+ if(txnMgr.recordSnapshot(plan)) {
recordValidTxns();
}
-
return 0;
} catch (Exception e) {
errorMessage = "FAILED: Error in acquiring locks: " + e.getMessage();
@@ -1166,7 +1216,7 @@ public class Driver implements CommandProcessor {
}
private boolean haveAcidWrite() {
- return acidSinks != null && !acidSinks.isEmpty();
+ return !plan.getAcidSinks().isEmpty();
}
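
Note the two ids stamped onto each acid sink in acquireLocks(): every FileSinkDesc in the statement shares the transaction id, but each gets its own incrementing statement id, because several sinks can write the same table/partition in one statement (MERGE, or a multi-insert mixing dynamic and static partitioning) and their delta directories must not collide. Schematically (types are illustrative):

    import java.util.List;

    class AcidSink {
        long transactionId;
        long statementId;
    }

    class SinkStamper {
        private long nextStatementId = 0;

        void stamp(List<AcidSink> sinks, long txnId) {
            for (AcidSink s : sinks) {
                s.transactionId = txnId;             // shared: one txn per statement
                s.statementId = nextStatementId++;   // unique per sink: keeps delta dirs distinct
            }
        }
    }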
/**
* @param commit if there is an open transaction and if true, commit,
@@ -1174,11 +1224,11 @@ public class Driver implements CommandProcessor {
* @param txnManager an optional existing transaction manager retrieved earlier from the session
*
**/
- private void releaseLocksAndCommitOrRollback(boolean commit, HiveTxnManager txnManager)
+ @VisibleForTesting
+ public void releaseLocksAndCommitOrRollback(boolean commit, HiveTxnManager txnManager)
throws LockException {
PerfLogger perfLogger = SessionState.getPerfLogger();
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.RELEASE_LOCKS);
-
HiveTxnManager txnMgr;
if (txnManager == null) {
SessionState ss = SessionState.get();
@@ -1188,6 +1238,7 @@ public class Driver implements CommandProcessor {
}
// If we've opened a transaction we need to commit or rollback rather than explicitly
// releasing the locks.
+ conf.unset(ValidTxnList.VALID_TXNS_KEY);
if (txnMgr.isTxnOpen()) {
if (commit) {
if(conf.getBoolVar(ConfVars.HIVE_IN_TEST) && conf.getBoolVar(ConfVars.HIVETESTMODEROLLBACKTXN)) {
@@ -1309,16 +1360,20 @@ public class Driver implements CommandProcessor {
metrics.incrementCounter(MetricsConstant.WAITING_COMPILE_OPS, 1);
}
+ PerfLogger perfLogger = SessionState.getPerfLogger();
+ perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.WAIT_COMPILE);
final ReentrantLock compileLock = tryAcquireCompileLock(isParallelEnabled,
command);
+ perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.WAIT_COMPILE);
+ if (metrics != null) {
+ metrics.decrementCounter(MetricsConstant.WAITING_COMPILE_OPS, 1);
+ }
+
if (compileLock == null) {
return ErrorMsg.COMPILE_LOCK_TIMED_OUT.getErrorCode();
}
try {
- if (metrics != null) {
- metrics.decrementCounter(MetricsConstant.WAITING_COMPILE_OPS, 1);
- }
ret = compile(command, true, deferClose);
} finally {
compileLock.unlock();
@@ -1336,7 +1391,6 @@ public class Driver implements CommandProcessor {
//Save compile-time PerfLogging for WebUI.
//Execution-time Perf logs are done by either another thread's PerfLogger
//or a reset PerfLogger.
- PerfLogger perfLogger = SessionState.getPerfLogger();
queryDisplay.setPerfLogStarts(QueryDisplay.Phase.COMPILATION, perfLogger.getStartTimes());
queryDisplay.setPerfLogEnds(QueryDisplay.Phase.COMPILATION, perfLogger.getEndTimes());
return ret;
@@ -1379,11 +1433,6 @@ public class Driver implements CommandProcessor {
LOG.debug("Waiting to acquire compile lock: " + command);
}
- OperationLog ol = OperationLog.getCurrentOperationLog();
- if (ol != null) {
- ol.writeOperationLog(LoggingLevel.EXECUTION, "Waiting to acquire compile lock.\n");
- }
-
if (maxCompileLockWaitTime > 0) {
try {
if(!compileLock.tryLock(maxCompileLockWaitTime, TimeUnit.SECONDS)) {
@@ -1403,9 +1452,6 @@ public class Driver implements CommandProcessor {
}
LOG.debug(lockAcquiredMsg);
- if (ol != null) {
- ol.writeOperationLog(LoggingLevel.EXECUTION, lockAcquiredMsg + "\n");
- }
return compileLock;
}
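
The bounded wait in tryAcquireCompileLock is plain java.util.concurrent: ReentrantLock.tryLock with a timeout returns false instead of blocking forever, and the caller maps a null result to COMPILE_LOCK_TIMED_OUT. A minimal reproduction of the pattern (names illustrative):

    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.ReentrantLock;

    class CompileGate {
        private final ReentrantLock lock = new ReentrantLock();

        /** Returns the held lock, or null on timeout or interrupt. */
        ReentrantLock tryAcquire(long maxWaitSeconds) {
            try {
                if (!lock.tryLock(maxWaitSeconds, TimeUnit.SECONDS)) {
                    return null;  // caller reports a compile-lock timeout
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();  // preserve the interrupt flag
                return null;
            }
            return lock;
        }
    }

Moving the WAIT_COMPILE perf span and the WAITING_COMPILE_OPS decrement to before the null check, as the hunk above does, means timed-out compilations are measured and counted too.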
@@ -1414,6 +1460,8 @@ public class Driver implements CommandProcessor {
errorMessage = null;
SQLState = null;
downstreamError = null;
+ LockedDriverState.setLockedDriverState(lDrvState);
+
lDrvState.stateLock.lock();
try {
if (alreadyCompiled) {
@@ -1440,8 +1488,7 @@ public class Driver implements CommandProcessor {
// Get all the driver run hooks and pre-execute them.
List<HiveDriverRunHook> driverRunHooks;
try {
- driverRunHooks = getHooks(HiveConf.ConfVars.HIVE_DRIVER_RUN_HOOKS,
- HiveDriverRunHook.class);
+ driverRunHooks = hooksLoader.getHooks(HiveConf.ConfVars.HIVE_DRIVER_RUN_HOOKS, console);
for (HiveDriverRunHook driverRunHook : driverRunHooks) {
driverRunHook.preDriverRun(hookContext);
}
@@ -1477,52 +1524,12 @@ public class Driver implements CommandProcessor {
HiveTxnManager txnManager = SessionState.get().getTxnMgr();
ctx.setHiveTxnManager(txnManager);
- boolean startTxnImplicitly = false;
- {
- //this block ensures op makes sense in given context, e.g. COMMIT is valid only if txn is open
- //DDL is not allowed in a txn, etc.
- //an error in an open txn does a rollback of the txn
- if (txnManager.isTxnOpen() && !plan.getOperation().isAllowedInTransaction()) {
- assert !txnManager.getAutoCommit() : "didn't expect AC=true";
- return rollback(new CommandProcessorResponse(12, ErrorMsg.OP_NOT_ALLOWED_IN_TXN, null,
- plan.getOperationName(), Long.toString(txnManager.getCurrentTxnId())));
- }
- if(!txnManager.isTxnOpen() && plan.getOperation().isRequiresOpenTransaction()) {
- return rollback(new CommandProcessorResponse(12, ErrorMsg.OP_NOT_ALLOWED_WITHOUT_TXN, null, plan.getOperationName()));
- }
- if(!txnManager.isTxnOpen() && plan.getOperation() == HiveOperation.QUERY && !txnManager.getAutoCommit()) {
- //this effectively makes START TRANSACTION optional and supports JDBC setAutoCommit(false) semantics
- //also, indirectly allows DDL to be executed outside a txn context
- startTxnImplicitly = true;
- }
- if(txnManager.getAutoCommit() && plan.getOperation() == HiveOperation.START_TRANSACTION) {
- return rollback(new CommandProcessorResponse(12, ErrorMsg.OP_NOT_ALLOWED_IN_AUTOCOMMIT, null, plan.getOperationName()));
- }
- }
- if(plan.getOperation() == HiveOperation.SET_AUTOCOMMIT) {
- try {
- if(plan.getAutoCommitValue() && !txnManager.getAutoCommit()) {
- /*here, if there is an open txn, we want to commit it; this behavior matches
- * https://docs.oracle.com/javase/6/docs/api/java/sql/Connection.html#setAutoCommit(boolean)*/
- releaseLocksAndCommitOrRollback(true, null);
- txnManager.setAutoCommit(true);
- }
- else if(!plan.getAutoCommitValue() && txnManager.getAutoCommit()) {
- txnManager.setAutoCommit(false);
- }
- else {/*didn't change autoCommit value - no-op*/}
- }
- catch(LockException e) {
- return handleHiveException(e, 12);
- }
- }
-
if (requiresLock()) {
// a checkpoint to see if the thread is interrupted or not before an expensive operation
if (isInterrupted()) {
ret = handleInterruption("at acquiring the lock.");
} else {
- ret = acquireLocksAndOpenTxn(startTxnImplicitly);
+ ret = acquireLocks();
}
if (ret != 0) {
return rollback(createProcessorResponse(ret));
@@ -1543,7 +1550,8 @@ public class Driver implements CommandProcessor {
//if needRequireLock is false, the release here will do nothing because there is no lock
try {
- if(txnManager.getAutoCommit() || plan.getOperation() == HiveOperation.COMMIT) {
+ //since set autocommit starts an implicit txn, close it
+ if(txnManager.isImplicitTransactionOpen() || plan.getOperation() == HiveOperation.COMMIT) {
releaseLocksAndCommitOrRollback(true, null);
}
else if(plan.getOperation() == HiveOperation.ROLLBACK) {
@@ -1712,35 +1720,14 @@ public class Driver implements CommandProcessor {
private CommandProcessorResponse createProcessorResponse(int ret) {
SessionState.getPerfLogger().cleanupPerfLogMetrics();
queryDisplay.setErrorMessage(errorMessage);
- return new CommandProcessorResponse(ret, errorMessage, SQLState, downstreamError);
- }
-
- /**
- * Returns a set of hooks specified in a configuration variable.
- * See getHooks(HiveConf.ConfVars hookConfVar, Class<T> clazz)
- */
- private List<Hook> getHooks(HiveConf.ConfVars hookConfVar) throws Exception {
- return getHooks(hookConfVar, Hook.class);
- }
-
- /**
- * Returns the hooks specified in a configuration variable.
- *
- * @param hookConfVar The configuration variable specifying a comma separated list of the hook
- * class names.
- * @param clazz The super type of the hooks.
- * @return A list of the hooks cast as the type specified in clazz, in the order
- * they are listed in the value of hookConfVar
- * @throws Exception
- */
- private <T extends Hook> List<T> getHooks(ConfVars hookConfVar,
- Class<T> clazz) throws Exception {
- try {
- return HookUtils.getHooks(conf, hookConfVar, clazz);
- } catch (ClassNotFoundException e) {
- console.printError(hookConfVar.varname + " Class not found:" + e.getMessage());
- throw e;
+ if(downstreamError != null && downstreamError instanceof HiveException) {
+ ErrorMsg em = ((HiveException)downstreamError).getCanonicalErrorMsg();
+ if(em != null) {
+ return new CommandProcessorResponse(ret, errorMessage, SQLState,
+ schema, downstreamError, em.getErrorCode(), null);
+ }
}
+ return new CommandProcessorResponse(ret, errorMessage, SQLState, downstreamError);
}
public int execute() throws CommandNeedRetryException {
@@ -1807,7 +1794,7 @@ public class Driver implements CommandProcessor {
ss.getSessionId(), Thread.currentThread().getName(), ss.isHiveServerQuery(), perfLogger);
hookContext.setHookType(HookContext.HookType.PRE_EXEC_HOOK);
- for (Hook peh : getHooks(HiveConf.ConfVars.PREEXECHOOKS)) {
+ for (Hook peh : hooksLoader.getHooks(HiveConf.ConfVars.PREEXECHOOKS, console)) {
if (peh instanceof ExecuteWithHookContext) {
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.PRE_HOOK + peh.getClass().getName());
@@ -1825,16 +1812,7 @@ public class Driver implements CommandProcessor {
}
// Trigger query hooks before query execution.
- if (queryHooks != null && !queryHooks.isEmpty()) {
- QueryLifeTimeHookContext qhc = new QueryLifeTimeHookContextImpl();
- qhc.setHiveConf(conf);
- qhc.setCommand(queryStr);
- qhc.setHookContext(hookContext);
-
- for (QueryLifeTimeHook hook : queryHooks) {
- hook.beforeExecution(qhc);
- }
- }
+ queryLifeTimeHookRunner.runBeforeExecutionHook(queryStr, hookContext);
setQueryDisplays(plan.getRootTasks());
int mrJobs = Utilities.getMRTasks(plan.getRootTasks()).size();
@@ -1859,7 +1837,7 @@ public class Driver implements CommandProcessor {
// The main thread polls the TaskRunners to check if they have finished.
if (isInterrupted()) {
- return handleInterruption("before running tasks.");
+ return handleInterruptionWithHook("before running tasks.", hookContext, perfLogger);
}
DriverContext driverCxt = new DriverContext(ctx);
driverCxt.prepare(plan);
@@ -1909,7 +1887,7 @@ public class Driver implements CommandProcessor {
int exitVal = result.getExitVal();
if (isInterrupted()) {
- return handleInterruption("when checking the execution result.");
+ return handleInterruptionWithHook("when checking the execution result.", hookContext, perfLogger);
}
if (exitVal != 0) {
if (tsk.ifRetryCmdWhenFail()) {
@@ -1934,6 +1912,9 @@ public class Driver implements CommandProcessor {
} else {
setErrorMsgAndDetail(exitVal, result.getTaskError(), tsk);
+ if (driverCxt.isShutdown()) {
+ errorMessage = "FAILED: Operation cancelled. " + errorMessage;
+ }
invokeFailureHooks(perfLogger, hookContext,
errorMessage + Strings.nullToEmpty(tsk.getDiagnosticsMessage()), result.getTaskError());
SQLState = "08S01";
@@ -1992,7 +1973,7 @@ public class Driver implements CommandProcessor {
hookContext.setHookType(HookContext.HookType.POST_EXEC_HOOK);
// Get all the post execution hooks and execute them.
- for (Hook peh : getHooks(HiveConf.ConfVars.POSTEXECHOOKS)) {
+ for (Hook peh : hooksLoader.getHooks(HiveConf.ConfVars.POSTEXECHOOKS, console)) {
if (peh instanceof ExecuteWithHookContext) {
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.POST_HOOK + peh.getClass().getName());
@@ -2022,7 +2003,7 @@ public class Driver implements CommandProcessor {
} catch (Throwable e) {
executionError = true;
if (isInterrupted()) {
- return handleInterruption("during query execution: \n" + e.getMessage());
+ return handleInterruptionWithHook("during query execution: \n" + e.getMessage(), hookContext, perfLogger);
}
ctx.restoreOriginalTracker();
@@ -2047,16 +2028,7 @@ public class Driver implements CommandProcessor {
} finally {
// Trigger query hooks after query completes its execution.
try {
- if (queryHooks != null && !queryHooks.isEmpty()) {
- QueryLifeTimeHookContext qhc = new QueryLifeTimeHookContextImpl();
- qhc.setHiveConf(conf);
- qhc.setCommand(queryStr);
- qhc.setHookContext(hookContext);
-
- for (QueryLifeTimeHook hook : queryHooks) {
- hook.afterExecution(qhc, executionError);
- }
- }
+ queryLifeTimeHookRunner.runAfterExecutionHook(queryStr, hookContext, executionError);
} catch (Exception e) {
LOG.warn("Failed when invoking query after execution hook", e);
}
@@ -2147,13 +2119,6 @@ public class Driver implements CommandProcessor {
}
String warning = HiveConf.generateMrDeprecationWarning();
LOG.warn(warning);
- warning = "WARNING: " + warning;
- console.printInfo(warning);
- // Propagate warning to beeline via operation log.
- OperationLog ol = OperationLog.getCurrentOperationLog();
- if (ol != null) {
- ol.writeOperationLog(LoggingLevel.EXECUTION, warning + "\n");
- }
}
private void setErrorMsgAndDetail(int exitVal, Throwable downstreamError, Task tsk) {
@@ -2178,7 +2143,7 @@ public class Driver implements CommandProcessor {
hookContext.setErrorMessage(errorMessage);
hookContext.setException(exception);
// Get all the failure execution hooks and execute them.
- for (Hook ofh : getHooks(HiveConf.ConfVars.ONFAILUREHOOKS)) {
+ for (Hook ofh : hooksLoader.getHooks(HiveConf.ConfVars.ONFAILUREHOOKS, console)) {
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.FAILURE_HOOK + ofh.getClass().getName());
((ExecuteWithHookContext) ofh).run(hookContext);
@@ -2228,7 +2193,6 @@ public class Driver implements CommandProcessor {
if (LOG.isInfoEnabled()){
LOG.info("Starting task [" + tsk + "] in parallel");
}
- tskRun.setOperationLog(OperationLog.getCurrentOperationLog());
tskRun.start();
} else {
if (LOG.isInfoEnabled()){
@@ -2445,6 +2409,7 @@ public class Driver implements CommandProcessor {
lDrvState.driverState = DriverState.CLOSED;
} finally {
lDrvState.stateLock.unlock();
+ LockedDriverState.removeLockedDriverState();
}
if (SessionState.get() != null) {
SessionState.get().getLineageState().clear();
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
index 6a43385..d01a203 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
@@ -27,6 +27,7 @@ import java.util.regex.Pattern;
import org.antlr.runtime.tree.Tree;
import org.apache.hadoop.hive.ql.parse.ASTNode;
import org.apache.hadoop.hive.ql.parse.ASTNodeOrigin;
+import org.apache.hadoop.hive.ql.plan.AlterTableDesc.AlterTableTypes;
/**
* List of all error messages.
@@ -217,7 +218,7 @@ public enum ErrorMsg {
ALTER_COMMAND_FOR_VIEWS(10131, "To alter a view you need to use the ALTER VIEW command."),
ALTER_COMMAND_FOR_TABLES(10132, "To alter a base table you need to use the ALTER TABLE command."),
ALTER_VIEW_DISALLOWED_OP(10133, "Cannot use this form of ALTER on a view"),
- ALTER_TABLE_NON_NATIVE(10134, "ALTER TABLE cannot be used for a non-native table"),
+  ALTER_TABLE_NON_NATIVE(10134, "ALTER TABLE can only be used for " + AlterTableTypes.nonNativeTableAllowedTypes + " on a non-native table"),
SORTMERGE_MAPJOIN_FAILED(10135,
"Sort merge bucketed join could not be performed. " +
"If you really want to perform the operation, either set " +
@@ -410,8 +411,8 @@ public enum ErrorMsg {
INSERT_CANNOT_CREATE_TEMP_FILE(10293, "Unable to create temp file for insert values "),
ACID_OP_ON_NONACID_TXNMGR(10294, "Attempt to do update or delete using transaction manager that" +
" does not support these operations."),
- NO_INSERT_OVERWRITE_WITH_ACID(10295, "INSERT OVERWRITE not allowed on table with OutputFormat " +
- "that implements AcidOutputFormat while transaction manager that supports ACID is in use"),
+ NO_INSERT_OVERWRITE_WITH_ACID(10295, "INSERT OVERWRITE not allowed on table {0} with OutputFormat " +
+ "that implements AcidOutputFormat while transaction manager that supports ACID is in use", true),
VALUES_TABLE_CONSTRUCTOR_NOT_SUPPORTED(10296,
"Values clause with table constructor not yet supported"),
ACID_OP_ON_NONACID_TABLE(10297, "Attempt to do update or delete on table {0} that does not use " +
@@ -481,9 +482,17 @@ public enum ErrorMsg {
"is controlled by hive.exec.max.dynamic.partitions and hive.exec.max.dynamic.partitions.pernode. "),
PARTITION_SCAN_LIMIT_EXCEEDED(20005, "Number of partitions scanned (={0}) on table {1} exceeds limit" +
" (={2}). This is controlled by hive.limit.query.max.table.partition.", true),
- OP_NOT_ALLOWED_IN_AUTOCOMMIT(20006, "Operation {0} is not allowed when autoCommit=true.", true),//todo: better SQLState?
- OP_NOT_ALLOWED_IN_TXN(20007, "Operation {0} is not allowed in a transaction. TransactionID={1}.", true),
- OP_NOT_ALLOWED_WITHOUT_TXN(20008, "Operation {0} is not allowed since autoCommit=false and there is no active transaction", true),
+ /**
+ * {1} is the transaction id;
+ * use {@link org.apache.hadoop.hive.common.JavaUtils#txnIdToString(long)} to format
+ */
+ OP_NOT_ALLOWED_IN_IMPLICIT_TXN(20006, "Operation {0} is not allowed in an implicit transaction ({1}).", true),
+ /**
+ * {1} is the transaction id;
+ * use {@link org.apache.hadoop.hive.common.JavaUtils#txnIdToString(long)} to format
+ */
+ OP_NOT_ALLOWED_IN_TXN(20007, "Operation {0} is not allowed in a transaction ({1},queryId={2}).", true),
+ OP_NOT_ALLOWED_WITHOUT_TXN(20008, "Operation {0} is not allowed without an active transaction", true),
//========================== 30000 range starts here ========================//
STATSPUBLISHER_NOT_OBTAINED(30000, "StatsPublisher cannot be obtained. " +
"There was a error to retrieve the StatsPublisher, and retrying " +
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/QueryLifeTimeHookRunner.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/QueryLifeTimeHookRunner.java b/ql/src/java/org/apache/hadoop/hive/ql/QueryLifeTimeHookRunner.java
new file mode 100644
index 0000000..85e038c
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/QueryLifeTimeHookRunner.java
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.hadoop.hive.ql;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.common.collect.Iterables;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.hooks.HookContext;
+import org.apache.hadoop.hive.ql.hooks.HooksLoader;
+import org.apache.hadoop.hive.ql.hooks.MetricsQueryLifeTimeHook;
+import org.apache.hadoop.hive.ql.hooks.QueryLifeTimeHook;
+import org.apache.hadoop.hive.ql.hooks.QueryLifeTimeHookContext;
+import org.apache.hadoop.hive.ql.hooks.QueryLifeTimeHookContextImpl;
+import org.apache.hadoop.hive.ql.hooks.QueryLifeTimeHookWithParseHooks;
+import org.apache.hadoop.hive.ql.session.SessionState;
+
+
+/**
+ * A runner class for {@link QueryLifeTimeHook}s and {@link QueryLifeTimeHookWithParseHooks}. The class has run methods
+ * for each phase of a {@link QueryLifeTimeHook} and {@link QueryLifeTimeHookWithParseHooks}. Each run method checks if
+ * a list of hooks has been specified, and if so invokes the appropriate callback method of each hook. Each method
+ * constructs a {@link QueryLifeTimeHookContext} object and passes it to the callback functions.
+ */
+class QueryLifeTimeHookRunner {
+
+ private final HiveConf conf;
+ private final List<QueryLifeTimeHook> queryHooks;
+
+ /**
+ * Constructs a {@link QueryLifeTimeHookRunner} that loads all hooks to be run via a {@link HooksLoader}.
+ *
+ * @param conf the {@link HiveConf} to use when creating {@link QueryLifeTimeHookContext} objects
+ * @param hooksLoader the {@link HooksLoader} to use when loading all hooks to be run
+ * @param console the {@link SessionState.LogHelper} to use when running {@link HooksLoader#getHooks(HiveConf.ConfVars)}
+ */
+ QueryLifeTimeHookRunner(HiveConf conf, HooksLoader hooksLoader, SessionState.LogHelper console) {
+ this.conf = conf;
+ this.queryHooks = new ArrayList<>();
+
+ if (conf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_METRICS_ENABLED)) {
+ queryHooks.add(new MetricsQueryLifeTimeHook());
+ }
+    List<QueryLifeTimeHook> propertyDefinedHooks;
+    try {
+      propertyDefinedHooks = hooksLoader.getHooks(
+              HiveConf.ConfVars.HIVE_QUERY_LIFETIME_HOOKS, console);
+    } catch (IllegalAccessException | InstantiationException | ClassNotFoundException e) {
+      throw new IllegalArgumentException(e);
+    }
+    if (propertyDefinedHooks != null) {
+      Iterables.addAll(queryHooks, propertyDefinedHooks);
+    }
+ }
+
+ /**
+ * If {@link QueryLifeTimeHookWithParseHooks} have been loaded via the {@link HooksLoader} then invoke the
+ * {@link QueryLifeTimeHookWithParseHooks#beforeParse(QueryLifeTimeHookContext)} method for each
+ * {@link QueryLifeTimeHookWithParseHooks}.
+ *
+ * @param command the Hive command that is being run
+ */
+ void runBeforeParseHook(String command) {
+ if (containsHooks()) {
+ QueryLifeTimeHookContext qhc = new QueryLifeTimeHookContextImpl.Builder().withHiveConf(conf).withCommand(
+ command).build();
+
+ for (QueryLifeTimeHook hook : queryHooks) {
+ if (hook instanceof QueryLifeTimeHookWithParseHooks) {
+ ((QueryLifeTimeHookWithParseHooks) hook).beforeParse(qhc);
+ }
+ }
+ }
+ }
+
+ /**
+ * If {@link QueryLifeTimeHookWithParseHooks} have been loaded via the {@link HooksLoader} then invoke the
+ * {@link QueryLifeTimeHookWithParseHooks#afterParse(QueryLifeTimeHookContext, boolean)} method for each
+ * {@link QueryLifeTimeHookWithParseHooks}.
+ *
+ * @param command the Hive command that is being run
+ * @param parseError true if there was an error while parsing the command, false otherwise
+ */
+ void runAfterParseHook(String command, boolean parseError) {
+ if (containsHooks()) {
+ QueryLifeTimeHookContext qhc = new QueryLifeTimeHookContextImpl.Builder().withHiveConf(conf).withCommand(
+ command).build();
+
+ for (QueryLifeTimeHook hook : queryHooks) {
+ if (hook instanceof QueryLifeTimeHookWithParseHooks) {
+ ((QueryLifeTimeHookWithParseHooks) hook).afterParse(qhc, parseError);
+ }
+ }
+ }
+ }
+
+ /**
+ * Invoke the {@link QueryLifeTimeHook#beforeCompile(QueryLifeTimeHookContext)} method for each {@link QueryLifeTimeHook}
+ *
+ * @param command the Hive command that is being run
+ */
+ void runBeforeCompileHook(String command) {
+ if (containsHooks()) {
+ QueryLifeTimeHookContext qhc = new QueryLifeTimeHookContextImpl.Builder().withHiveConf(conf).withCommand(
+ command).build();
+
+ for (QueryLifeTimeHook hook : queryHooks) {
+ hook.beforeCompile(qhc);
+ }
+ }
+ }
+
+ /**
+ * Invoke the {@link QueryLifeTimeHook#afterCompile(QueryLifeTimeHookContext, boolean)} method for each {@link QueryLifeTimeHook}
+ *
+ * @param command the Hive command that is being run
+ * @param compileError true if there was an error while compiling the command, false otherwise
+ */
+ void runAfterCompilationHook(String command, boolean compileError) {
+ if (containsHooks()) {
+ QueryLifeTimeHookContext qhc = new QueryLifeTimeHookContextImpl.Builder().withHiveConf(conf).withCommand(
+ command).build();
+
+ for (QueryLifeTimeHook hook : queryHooks) {
+ hook.afterCompile(qhc, compileError);
+ }
+ }
+ }
+
+ /**
+ * Invoke the {@link QueryLifeTimeHook#beforeExecution(QueryLifeTimeHookContext)} method for each {@link QueryLifeTimeHook}
+ *
+ * @param command the Hive command that is being run
+ * @param hookContext the {@link HookContext} of the command being run
+ */
+ void runBeforeExecutionHook(String command, HookContext hookContext) {
+ if (containsHooks()) {
+ QueryLifeTimeHookContext qhc = new QueryLifeTimeHookContextImpl.Builder().withHiveConf(conf).withCommand(
+ command).withHookContext(hookContext).build();
+
+ for (QueryLifeTimeHook hook : queryHooks) {
+ hook.beforeExecution(qhc);
+ }
+ }
+ }
+
+ /**
+ * Invoke the {@link QueryLifeTimeHook#afterExecution(QueryLifeTimeHookContext, boolean)} method for each {@link QueryLifeTimeHook}
+ *
+ * @param command the Hive command that is being run
+ * @param hookContext the {@link HookContext} of the command being run
+ * @param executionError true if there was an error while executing the command, false otherwise
+ */
+ void runAfterExecutionHook(String command, HookContext hookContext, boolean executionError) {
+ if (containsHooks()) {
+ QueryLifeTimeHookContext qhc = new QueryLifeTimeHookContextImpl.Builder().withHiveConf(conf).withCommand(
+ command).withHookContext(hookContext).build();
+
+ for (QueryLifeTimeHook hook : queryHooks) {
+ hook.afterExecution(qhc, executionError);
+ }
+ }
+ }
+
+ private boolean containsHooks() {
+ return queryHooks != null && !queryHooks.isEmpty();
+ }
+}
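
For hook authors, here is a sketch of what the runner dispatches to: a timing hook implementing QueryLifeTimeHookWithParseHooks, with method signatures taken from the javadoc above (the class itself is hypothetical; hooks of this kind are registered through the property behind HIVE_QUERY_LIFETIME_HOOKS):

    import org.apache.hadoop.hive.ql.hooks.QueryLifeTimeHookContext;
    import org.apache.hadoop.hive.ql.hooks.QueryLifeTimeHookWithParseHooks;

    public class TimingLifeTimeHook implements QueryLifeTimeHookWithParseHooks {
        private long parseStart, compileStart, execStart;

        @Override public void beforeParse(QueryLifeTimeHookContext ctx) { parseStart = System.nanoTime(); }
        @Override public void afterParse(QueryLifeTimeHookContext ctx, boolean hasError) {
            report("parse", parseStart, hasError);
        }
        @Override public void beforeCompile(QueryLifeTimeHookContext ctx) { compileStart = System.nanoTime(); }
        @Override public void afterCompile(QueryLifeTimeHookContext ctx, boolean hasError) {
            report("compile", compileStart, hasError);
        }
        @Override public void beforeExecution(QueryLifeTimeHookContext ctx) { execStart = System.nanoTime(); }
        @Override public void afterExecution(QueryLifeTimeHookContext ctx, boolean hasError) {
            report("execution", execStart, hasError);
        }

        private void report(String phase, long startNanos, boolean hasError) {
            System.out.printf("%s took %d ms (error=%b)%n",
                phase, (System.nanoTime() - startNanos) / 1_000_000, hasError);
        }
    }

Plain QueryLifeTimeHook implementations get the compile and execution callbacks only; the instanceof checks in the runner are what gate the parse callbacks to the wider interface.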
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java b/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java
index e8c8ae6..2ddabd9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java
@@ -35,6 +35,7 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hive.metastore.api.Schema;
import org.apache.hadoop.hive.ql.exec.ConditionalTask;
import org.apache.hadoop.hive.ql.exec.ExplainTask;
@@ -48,6 +49,7 @@ import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer;
import org.apache.hadoop.hive.ql.parse.ColumnAccessInfo;
import org.apache.hadoop.hive.ql.parse.TableAccessInfo;
+import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
import org.apache.hadoop.hive.ql.plan.HiveOperation;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.ReducerTimeStatsPerJob;
@@ -105,11 +107,19 @@ public class QueryPlan implements Serializable {
private transient Long queryStartTime;
private final HiveOperation operation;
+ private final boolean acidResourcesInQuery;
+ private final Set<FileSinkDesc> acidSinks;
private Boolean autoCommitValue;
public QueryPlan() {
- this.reducerTimeStatsPerJobList = new ArrayList<ReducerTimeStatsPerJob>();
- operation = null;
+ this(null);
+ }
+ @VisibleForTesting
+ protected QueryPlan(HiveOperation command) {
+ this.reducerTimeStatsPerJobList = new ArrayList<>();
+ this.operation = command;
+ this.acidResourcesInQuery = false;
+ this.acidSinks = Collections.emptySet();
}
public QueryPlan(String queryString, BaseSemanticAnalyzer sem, Long startTime, String queryId,
@@ -136,8 +146,22 @@ public class QueryPlan implements Serializable {
this.operation = operation;
this.autoCommitValue = sem.getAutoCommitValue();
this.resultSchema = resultSchema;
+ this.acidResourcesInQuery = sem.hasAcidInQuery();
+ this.acidSinks = sem.getAcidFileSinks();
}
+ /**
+ * @return true if any acid resources are read/written
+ */
+ public boolean hasAcidResourcesInQuery() {
+ return acidResourcesInQuery;
+ }
+ /**
+ * @return Collection of FileSinkDesc representing writes to Acid resources
+ */
+ Set<FileSinkDesc> getAcidSinks() {
+ return acidSinks;
+ }
public String getQueryStr() {
return queryString;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/ArchiveUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ArchiveUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ArchiveUtils.java
index 6381a21..f7fad94 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ArchiveUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ArchiveUtils.java
@@ -28,8 +28,6 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
@@ -40,7 +38,6 @@ import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.Table;
-import org.apache.hadoop.hive.shims.HadoopShims;
/**
* ArchiveUtils.
@@ -48,9 +45,7 @@ import org.apache.hadoop.hive.shims.HadoopShims;
*/
@SuppressWarnings("nls")
public final class ArchiveUtils {
- private static final Logger LOG = LoggerFactory.getLogger(ArchiveUtils.class.getName());
-
- public static String ARCHIVING_LEVEL = "archiving_level";
+ public static final String ARCHIVING_LEVEL = "archiving_level";
/**
* PartSpecInfo keeps fields and values extracted from partial partition info
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnInfo.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnInfo.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnInfo.java
index e3da7f0..bb8dcbb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnInfo.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnInfo.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.exec;
import java.io.Serializable;
+import org.apache.hadoop.hive.common.StringInternUtils;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
@@ -96,7 +97,7 @@ public class ColumnInfo implements Serializable {
this.tabAlias = tabAlias;
this.isVirtualCol = isVirtualCol;
this.isHiddenVirtualCol = isHiddenVirtualCol;
- this.typeName = getType().getTypeName();
+ setTypeName(getType().getTypeName());
}
public ColumnInfo(ColumnInfo columnInfo) {
@@ -114,7 +115,7 @@ public class ColumnInfo implements Serializable {
}
public void setTypeName(String typeName) {
- this.typeName = typeName;
+ this.typeName = StringInternUtils.internIfNotNull(typeName);
}
public TypeInfo getType() {
@@ -160,7 +161,7 @@ public class ColumnInfo implements Serializable {
}
public void setAlias(String col_alias) {
- alias = col_alias;
+ alias = StringInternUtils.internIfNotNull(col_alias);
}
public String getAlias() {
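
Context for the interning change: a large plan holds many ColumnInfo objects that carry the same handful of type names and aliases, so interning keeps one canonical copy of each distinct string. StringInternUtils is Hive's own helper; a rough equivalent built on Guava's weak interner might look like:

    import com.google.common.collect.Interner;
    import com.google.common.collect.Interners;

    class InternDemo {
        // Weak interner: one pooled copy per distinct value, reclaimable by GC
        // once nothing references it.
        private static final Interner<String> POOL = Interners.newWeakInterner();

        static String internIfNotNull(String s) {
            return s == null ? null : POOL.intern(s);
        }

        public static void main(String[] args) {
            String a = internIfNotNull(new String("bigint"));
            String b = internIfNotNull(new String("bigint"));
            System.out.println(a == b);  // true: both point at the pooled copy
        }
    }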
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsTask.java
index a899964..d96f432 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsTask.java
@@ -110,8 +110,12 @@ public class ColumnStatsTask extends Task<ColumnStatsWork> implements Serializab
}
}
+ @SuppressWarnings("serial")
+ class UnsupportedDoubleException extends Exception {
+ }
+
private void unpackDoubleStats(ObjectInspector oi, Object o, String fName,
- ColumnStatisticsObj statsObj) {
+ ColumnStatisticsObj statsObj) throws UnsupportedDoubleException {
if (fName.equals("countnulls")) {
long v = ((LongObjectInspector) oi).get(o);
statsObj.getStatsData().getDoubleStats().setNumNulls(v);
@@ -120,9 +124,15 @@ public class ColumnStatsTask extends Task<ColumnStatsWork> implements Serializab
statsObj.getStatsData().getDoubleStats().setNumDVs(v);
} else if (fName.equals("max")) {
double d = ((DoubleObjectInspector) oi).get(o);
+ if (Double.isInfinite(d) || Double.isNaN(d)) {
+ throw new UnsupportedDoubleException();
+ }
statsObj.getStatsData().getDoubleStats().setHighValue(d);
} else if (fName.equals("min")) {
double d = ((DoubleObjectInspector) oi).get(o);
+ if (Double.isInfinite(d) || Double.isNaN(d)) {
+ throw new UnsupportedDoubleException();
+ }
statsObj.getStatsData().getDoubleStats().setLowValue(d);
} else if (fName.equals("ndvbitvector")) {
PrimitiveObjectInspector poi = (PrimitiveObjectInspector) oi;
@@ -234,7 +244,7 @@ public class ColumnStatsTask extends Task<ColumnStatsWork> implements Serializab
}
private void unpackPrimitiveObject (ObjectInspector oi, Object o, String fieldName,
- ColumnStatisticsObj statsObj) {
+ ColumnStatisticsObj statsObj) throws UnsupportedDoubleException {
if (o == null) {
return;
}
@@ -294,7 +304,7 @@ public class ColumnStatsTask extends Task<ColumnStatsWork> implements Serializab
}
private void unpackStructObject(ObjectInspector oi, Object o, String fName,
- ColumnStatisticsObj cStatsObj) {
+ ColumnStatisticsObj cStatsObj) throws UnsupportedDoubleException {
if (oi.getCategory() != ObjectInspector.Category.STRUCT) {
throw new RuntimeException("Invalid object datatype : " + oi.getCategory().toString());
}
@@ -351,8 +361,13 @@ public class ColumnStatsTask extends Task<ColumnStatsWork> implements Serializab
ColumnStatisticsObj statsObj = new ColumnStatisticsObj();
statsObj.setColName(colName.get(i));
statsObj.setColType(colType.get(i));
- unpackStructObject(foi, f, fieldName, statsObj);
- statsObjs.add(statsObj);
+ try {
+ unpackStructObject(foi, f, fieldName, statsObj);
+ statsObjs.add(statsObj);
+ } catch (UnsupportedDoubleException e) {
+        // Skip this column's stats: its min/max is infinite or NaN.
+        LOG.info("Skipping stats for column " + colName.get(i) + " because a value is infinite or NaN.");
+ }
}
if (!isTblLevel) {
@@ -371,7 +386,9 @@ public class ColumnStatsTask extends Task<ColumnStatsWork> implements Serializab
ColumnStatistics colStats = new ColumnStatistics();
colStats.setStatsDesc(statsDesc);
colStats.setStatsObj(statsObjs);
- stats.add(colStats);
+ if (!statsObjs.isEmpty()) {
+ stats.add(colStats);
+ }
}
ftOp.clearFetchContext();
return stats;
@@ -398,6 +415,9 @@ public class ColumnStatsTask extends Task<ColumnStatsWork> implements Serializab
List<ColumnStatistics> colStats = constructColumnStatsFromPackedRows(db);
// Persist the column statistics object to the metastore
// Note, this function is shared for both table and partition column stats.
+ if (colStats.isEmpty()) {
+ return 0;
+ }
SetPartitionsStatsRequest request = new SetPartitionsStatsRequest(colStats);
if (work.getColStats() != null && work.getColStats().getNumBitVector() > 0) {
request.setNeedMerge(true);
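
The checked UnsupportedDoubleException exists because metastore double statistics cannot represent NaN or infinity; making it checked forces every unpack path to handle it, and the caller reacts by skipping that column's stats rather than failing the whole task. The core guard, in isolation:

    class FiniteStats {
        @SuppressWarnings("serial")
        static class UnsupportedDoubleException extends Exception { }

        // Reject values the metastore cannot store as a double min/max.
        static double requireFinite(double d) throws UnsupportedDoubleException {
            if (Double.isInfinite(d) || Double.isNaN(d)) {
                throw new UnsupportedDoubleException();
            }
            return d;
        }
    }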
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java
index e8526f6..82f6074 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java
@@ -80,8 +80,7 @@ public class CopyTask extends Task<CopyWork> implements Serializable {
}
}
- boolean inheritPerms = conf.getBoolVar(HiveConf.ConfVars.HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS);
- if (!FileUtils.mkdir(dstFs, toPath, inheritPerms, conf)) {
+ if (!FileUtils.mkdir(dstFs, toPath, conf)) {
console.printError("Cannot make target directory: " + toPath.toString());
return 2;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
index a1a0862..81e4744 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
@@ -3330,20 +3330,30 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
if (tbl.isPartitioned() && part == null) {
// No partitioned specified for partitioned table, lets fetch all.
Map<String,String> tblProps = tbl.getParameters() == null ? new HashMap<String,String>() : tbl.getParameters();
- PartitionIterable parts = new PartitionIterable(db, tbl, null, conf.getIntVar(HiveConf.ConfVars.METASTORE_BATCH_RETRIEVE_MAX));
+ Map<String, Long> valueMap = new HashMap<>();
+ Map<String, Boolean> stateMap = new HashMap<>();
for (String stat : StatsSetupConst.supportedStats) {
- boolean state = true;
- long statVal = 0l;
- for (Partition partition : parts) {
- Map<String,String> props = partition.getParameters();
- state &= StatsSetupConst.areBasicStatsUptoDate(props);
+ valueMap.put(stat, 0L);
+ stateMap.put(stat, true);
+ }
+ PartitionIterable parts = new PartitionIterable(db, tbl, null, conf.getIntVar(HiveConf.ConfVars.METASTORE_BATCH_RETRIEVE_MAX));
+ int numParts = 0;
+ for (Partition partition : parts) {
+ Map<String, String> props = partition.getParameters();
+ Boolean state = StatsSetupConst.areBasicStatsUptoDate(props);
+ for (String stat : StatsSetupConst.supportedStats) {
+ stateMap.put(stat, stateMap.get(stat) && state);
if (props != null && props.get(stat) != null) {
- statVal += Long.parseLong(props.get(stat));
+ valueMap.put(stat, valueMap.get(stat) + Long.parseLong(props.get(stat)));
}
}
- StatsSetupConst.setBasicStatsState(tblProps, Boolean.toString(state));
- tblProps.put(stat, String.valueOf(statVal));
+ numParts++;
}
+ for (String stat : StatsSetupConst.supportedStats) {
+ StatsSetupConst.setBasicStatsState(tblProps, Boolean.toString(stateMap.get(stat)));
+ tblProps.put(stat, valueMap.get(stat).toString());
+ }
+ tblProps.put(StatsSetupConst.NUM_PARTITIONS, Integer.toString(numParts));
tbl.setParameters(tblProps);
}
} else {
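
The rewrite above replaces one full scan of the partitions per statistic with a single pass that accumulates all supported statistics into a value map and an accuracy-state map, and it records the partition count under StatsSetupConst.NUM_PARTITIONS. That matters because PartitionIterable fetches partitions from the metastore in batches, so iterating it once per stat multiplied the metastore traffic. A self-contained sketch of the one-pass accumulation (stat keys and the accuracy check are simplified placeholders):

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class OnePassStatsAggregation {
      static final List<String> SUPPORTED_STATS = Arrays.asList("numRows", "totalSize");

      static Map<String, String> aggregate(List<Map<String, String>> partitions) {
        Map<String, Long> valueMap = new HashMap<>();
        Map<String, Boolean> stateMap = new HashMap<>();
        for (String stat : SUPPORTED_STATS) {
          valueMap.put(stat, 0L);
          stateMap.put(stat, true);
        }
        int numParts = 0;
        for (Map<String, String> props : partitions) { // single pass over all partitions
          boolean upToDate = "true".equals(props.get("statsAccurate"));
          for (String stat : SUPPORTED_STATS) {
            // A stat is accurate at table level only if every partition is accurate.
            stateMap.put(stat, stateMap.get(stat) && upToDate);
            String v = props.get(stat);
            if (v != null) {
              valueMap.put(stat, valueMap.get(stat) + Long.parseLong(v));
            }
          }
          numParts++;
        }
        Map<String, String> tblProps = new HashMap<>();
        for (String stat : SUPPORTED_STATS) {
          tblProps.put(stat, valueMap.get(stat).toString());
        }
        tblProps.put("numPartitions", Integer.toString(numParts));
        return tblProps;
      }
    }
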
@@ -4866,32 +4876,8 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
String tableName = truncateTableDesc.getTableName();
Map<String, String> partSpec = truncateTableDesc.getPartSpec();
- Table table = db.getTable(tableName, true);
-
try {
- // this is not transactional
- for (Path location : getLocations(db, table, partSpec)) {
- FileSystem fs = location.getFileSystem(conf);
- HadoopShims.HdfsEncryptionShim shim
- = ShimLoader.getHadoopShims().createHdfsEncryptionShim(fs, conf);
- if (!shim.isPathEncrypted(location)) {
- HdfsUtils.HadoopFileStatus status = new HdfsUtils.HadoopFileStatus(conf, fs, location);
- FileStatus targetStatus = fs.getFileStatus(location);
- String targetGroup = targetStatus == null ? null : targetStatus.getGroup();
- FileUtils.moveToTrash(fs, location, conf);
- fs.mkdirs(location);
- HdfsUtils.setFullFileStatus(conf, status, targetGroup, fs, location, false);
- } else {
- FileStatus[] statuses = fs.listStatus(location, FileUtils.HIDDEN_FILES_PATH_FILTER);
- if (statuses == null || statuses.length == 0) {
- continue;
- }
- boolean success = Hive.trashFiles(fs, statuses, conf);
- if (!success) {
- throw new HiveException("Error in deleting the contents of " + location.toString());
- }
- }
- }
+ db.truncateTable(tableName, partSpec);
} catch (Exception e) {
throw new HiveException(e, ErrorMsg.GENERIC_ERROR);
}
@@ -4922,58 +4908,6 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
return 0;
}
- private List<Path> getLocations(Hive db, Table table, Map<String, String> partSpec)
- throws HiveException, InvalidOperationException {
- List<Path> locations = new ArrayList<Path>();
- if (partSpec == null) {
- if (table.isPartitioned()) {
- for (Partition partition : db.getPartitions(table)) {
- locations.add(partition.getDataLocation());
- EnvironmentContext environmentContext = new EnvironmentContext();
- if (needToUpdateStats(partition.getParameters(), environmentContext)) {
- db.alterPartition(table.getDbName(), table.getTableName(), partition, environmentContext);
- }
- }
- } else {
- locations.add(table.getPath());
- EnvironmentContext environmentContext = new EnvironmentContext();
- if (needToUpdateStats(table.getParameters(), environmentContext)) {
- db.alterTable(table.getDbName()+"."+table.getTableName(), table, environmentContext);
- }
- }
- } else {
- for (Partition partition : db.getPartitionsByNames(table, partSpec)) {
- locations.add(partition.getDataLocation());
- EnvironmentContext environmentContext = new EnvironmentContext();
- if (needToUpdateStats(partition.getParameters(), environmentContext)) {
- db.alterPartition(table.getDbName(), table.getTableName(), partition, environmentContext);
- }
- }
- }
- return locations;
- }
-
- private boolean needToUpdateStats(Map<String,String> props, EnvironmentContext environmentContext) {
- if (null == props) {
- return false;
- }
- boolean statsPresent = false;
- for (String stat : StatsSetupConst.supportedStats) {
- String statVal = props.get(stat);
- if (statVal != null && Long.parseLong(statVal) > 0) {
- statsPresent = true;
- //In the case of truncate table, we set the stats to be 0.
- props.put(stat, "0");
- }
- }
- //first set basic stats to true
- StatsSetupConst.setBasicStatsState(props, StatsSetupConst.TRUE);
- environmentContext.putToProperties(StatsSetupConst.STATS_GENERATED, StatsSetupConst.TASK);
- //then invalidate column stats
- StatsSetupConst.clearColumnStatsState(props);
- return statsPresent;
- }
-
@Override
public StageType getType() {
return StageType.DDL;
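
The truncate path above collapses into a single call, db.truncateTable(tableName, partSpec), which takes over the filesystem work (trash-or-recreate per location, with the encrypted-path special case) that DDLTask used to do inline. The deleted needToUpdateStats helper captures the statistics rule that truncation implies: zero out every supported stat that was present, mark basic stats accurate (zero is now the true row count), and invalidate column stats. A compact sketch of that rule on its own (the property keys are illustrative placeholders, not Hive's constants):

    import java.util.HashMap;
    import java.util.Map;

    public class TruncateStatsReset {
      static final String[] SUPPORTED_STATS = {"numRows", "totalSize", "rawDataSize"};

      // Returns true if any stat had been present and was reset to zero.
      static boolean resetStats(Map<String, String> props) {
        if (props == null) {
          return false;
        }
        boolean statsPresent = false;
        for (String stat : SUPPORTED_STATS) {
          String val = props.get(stat);
          if (val != null && Long.parseLong(val) > 0) {
            statsPresent = true;
            props.put(stat, "0"); // a truncated table genuinely has zero rows and bytes
          }
        }
        props.put("basicStatsAccurate", "true"); // zero is an accurate basic stat
        props.remove("columnStatsAccurate");     // column stats are no longer valid
        return statsPresent;
      }

      public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("numRows", "42");
        System.out.println(resetStats(props) + " -> " + props);
      }
    }
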
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java
index d35e3ba..4c24ab4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java
@@ -54,9 +54,11 @@ import org.apache.hadoop.hive.ql.DriverContext;
import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
import org.apache.hadoop.hive.ql.exec.tez.TezTask;
import org.apache.hadoop.hive.ql.exec.vector.VectorGroupByOperator;
+import org.apache.hadoop.hive.ql.exec.vector.VectorReduceSinkOperator;
import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
+import org.apache.hadoop.hive.ql.exec.vector.reducesink.VectorReduceSinkCommonOperator;
import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
@@ -795,9 +797,12 @@ public class ExplainTask extends Task<ExplainWork> implements Serializable {
if (jsonOut != null && jsonOut.length() > 0) {
((JSONObject) jsonOut.get(JSONObject.getNames(jsonOut)[0])).put("OperatorId:",
operator.getOperatorId());
- if (!this.work.isUserLevelExplain() && this.work.isFormatted()
- && operator instanceof ReduceSinkOperator) {
- List<String> outputOperators = ((ReduceSinkOperator) operator).getConf().getOutputOperators();
+ if (!this.work.isUserLevelExplain()
+ && this.work.isFormatted()
+ && (operator instanceof ReduceSinkOperator
+ || operator instanceof VectorReduceSinkOperator || operator instanceof VectorReduceSinkCommonOperator)) {
+ List<String> outputOperators = ((ReduceSinkDesc) operator.getConf())
+ .getOutputOperators();
if (outputOperators != null) {
((JSONObject) jsonOut.get(JSONObject.getNames(jsonOut)[0])).put(OUTPUT_OPERATORS,
Arrays.toString(outputOperators.toArray()));
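
The widened condition above lets formatted explain report output operators for vectorized reduce sinks as well as the row-mode one. The key point is that the vectorized common sink does not extend ReduceSinkOperator, which is why three instanceof checks are needed, yet all three operator types carry a ReduceSinkDesc as their conf, so a single cast on getConf() serves every branch. A stripped-down sketch of the dispatch (these types are minimal stand-ins for the Hive operator hierarchy):

    import java.util.Arrays;
    import java.util.List;

    public class ExplainOutputOps {
      static class ReduceSinkDesc {
        final List<String> outputOperators;
        ReduceSinkDesc(List<String> ops) { this.outputOperators = ops; }
      }

      interface Operator { Object getConf(); }

      // Row-mode and vectorized common sinks do not share a class hierarchy,
      // but both hand back a ReduceSinkDesc from getConf().
      static class ReduceSinkOperator implements Operator {
        public Object getConf() { return new ReduceSinkDesc(Arrays.asList("GBY_4")); }
      }
      static class VectorReduceSinkCommonOperator implements Operator {
        public Object getConf() { return new ReduceSinkDesc(Arrays.asList("GBY_4")); }
      }

      static String outputOperators(Operator op) {
        if (op instanceof ReduceSinkOperator || op instanceof VectorReduceSinkCommonOperator) {
          List<String> out = ((ReduceSinkDesc) op.getConf()).outputOperators;
          return out == null ? null : out.toString();
        }
        return null;
      }

      public static void main(String[] args) {
        System.out.println(outputOperators(new VectorReduceSinkCommonOperator())); // [GBY_4]
      }
    }
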
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeConstantDefaultEvaluator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeConstantDefaultEvaluator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeConstantDefaultEvaluator.java
deleted file mode 100644
index f53c3e3..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeConstantDefaultEvaluator.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.exec;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDefaultDesc;
-import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
-import org.apache.hadoop.hive.serde2.objectinspector.ConstantObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-
-/**
- * ExprNodeConstantEvaluator.
- *
- */
-public class ExprNodeConstantDefaultEvaluator extends ExprNodeEvaluator<ExprNodeConstantDefaultDesc> {
-
- transient ObjectInspector writableObjectInspector;
-
- public ExprNodeConstantDefaultEvaluator(ExprNodeConstantDefaultDesc expr) {
- this(expr, null);
- }
-
- public ExprNodeConstantDefaultEvaluator(ExprNodeConstantDefaultDesc expr, Configuration conf) {
- super(expr, conf);
- writableObjectInspector = expr.getWritableObjectInspector();
- }
-
- @Override
- public ObjectInspector initialize(ObjectInspector rowInspector) throws HiveException {
- return writableObjectInspector;
- }
-
- @Override
- protected Object _evaluate(Object row, int version) throws HiveException {
- return expr;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeEvaluatorFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeEvaluatorFactory.java
index 34aec55..cc40cae 100755
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeEvaluatorFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeEvaluatorFactory.java
@@ -24,7 +24,6 @@ import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
-import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDefaultDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDynamicValueDesc;
@@ -50,11 +49,6 @@ public final class ExprNodeEvaluatorFactory {
return new ExprNodeConstantEvaluator((ExprNodeConstantDesc) desc, conf);
}
- // Special 'default' constant node
- if (desc instanceof ExprNodeConstantDefaultDesc) {
- return new ExprNodeConstantDefaultEvaluator((ExprNodeConstantDefaultDesc) desc);
- }
-
// Column-reference node, e.g. a column in the input row
if (desc instanceof ExprNodeColumnDesc) {
return new ExprNodeColumnEvaluator((ExprNodeColumnDesc) desc, conf);
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
index 4102d02..a3e4c9f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -393,6 +394,9 @@ public class FetchOperator implements Serializable {
inputSplits = splitSampling(work.getSplitSample(), inputSplits);
}
if (inputSplits.length > 0) {
+ if (HiveConf.getBoolVar(job, HiveConf.ConfVars.HIVE_IN_TEST)) {
+ Arrays.sort(inputSplits, new FetchInputFormatSplitComparator());
+ }
return inputSplits;
}
}
@@ -738,6 +742,18 @@ public class FetchOperator implements Serializable {
}
}
+ private static class FetchInputFormatSplitComparator implements Comparator<FetchInputFormatSplit> {
+ @Override
+ public int compare(FetchInputFormatSplit a, FetchInputFormatSplit b) {
+ final Path ap = a.getPath();
+ final Path bp = b.getPath();
+ if (ap != null) {
+ return (ap.compareTo(bp));
+ }
+ return Long.signum(a.getLength() - b.getLength());
+ }
+ }
+
public Configuration getJobConf() {
return job;
}
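
The comparator above gives splits a deterministic order when HIVE_IN_TEST is set, so fetch results and test output are stable across runs; outside of tests the extra sort is skipped. As written it assumes that when one split has a non-null path the other does too, since comparing against a null path would fail. A null-safe variant of the same ordering, as a sketch rather than the committed code:

    import java.util.Comparator;

    public class SplitOrdering {
      static class Split {
        final String path;   // stand-in for org.apache.hadoop.fs.Path
        final long length;
        Split(String path, long length) { this.path = path; this.length = length; }
      }

      // Order by path when present (nulls sort first), falling back to length.
      static final Comparator<Split> BY_PATH_THEN_LENGTH =
          Comparator.comparing((Split s) -> s.path,
                  Comparator.nullsFirst(Comparator.<String>naturalOrder()))
              .thenComparingLong(s -> s.length);

      public static void main(String[] args) {
        Split a = new Split(null, 10);
        Split b = new Split("part-00000", 5);
        System.out.println(BY_PATH_THEN_LENGTH.compare(a, b)); // negative: null path first
      }
    }
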
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index 3ad1733..8e74b2e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.hive.common.HiveStatsUtils;
import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.common.ValidWriteIds;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConfUtil;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.ql.CompilationOpContext;
import org.apache.hadoop.hive.ql.ErrorMsg;
@@ -147,7 +148,6 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
protected transient long cntr = 1;
protected transient long logEveryNRows = 0;
protected transient int rowIndex = 0;
- private transient boolean inheritPerms = false;
/**
* Counters.
*/
@@ -256,7 +256,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
if ((bDynParts || isSkewedStoredAsSubDirectories)
&& !fs.exists(finalPaths[idx].getParent())) {
Utilities.LOG14535.info("commit making path for dyn/skew: " + finalPaths[idx].getParent());
- FileUtils.mkdir(fs, finalPaths[idx].getParent(), inheritPerms, hconf);
+ FileUtils.mkdir(fs, finalPaths[idx].getParent(), hconf);
}
// If we're updating or deleting there may be no file to close. This can happen
// because the where clause strained out all of the records for a given bucket. So
@@ -501,7 +501,6 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
serializer = (Serializer) conf.getTableInfo().getDeserializerClass().newInstance();
serializer.initialize(unsetNestedColumnPaths(hconf), conf.getTableInfo().getProperties());
outputClass = serializer.getSerializedClass();
- inheritPerms = HiveConf.getBoolVar(hconf, ConfVars.HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS);
if (isLogInfoEnabled) {
LOG.info("Using serializer : " + serializer + " and formatter : " + hiveOutputFormat +
@@ -601,13 +600,10 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
}
private void logOutputFormatError(Configuration hconf, HiveException ex) {
- StringWriter errorWriter = new StringWriter();
+ StringBuilder errorWriter = new StringBuilder();
errorWriter.append("Failed to create output format; configuration: ");
- try {
- Configuration.dumpConfiguration(hconf, errorWriter);
- } catch (IOException ex2) {
- errorWriter.append("{ failed to dump configuration: " + ex2.getMessage() + " }");
- }
+ // redact sensitive information before logging
+ HiveConfUtil.dumpConfig(hconf, errorWriter);
Properties tdp = null;
if (this.conf.getTableInfo() != null
&& (tdp = this.conf.getTableInfo().getProperties()) != null) {
@@ -739,7 +735,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
conf.getWriteType() == AcidUtils.Operation.INSERT_ONLY) {
Path outPath = fsp.outPaths[filesIdx];
if ((conf.getWriteType() == AcidUtils.Operation.INSERT_ONLY || conf.isMmTable())
- && inheritPerms && !FileUtils.mkdir(fs, outPath.getParent(), inheritPerms, hconf)) {
+ && !FileUtils.mkdir(fs, outPath.getParent(), hconf)) {
LOG.warn("Unable to create directory with inheritPerms: " + outPath);
}
fsp.outWriters[filesIdx] = HiveFileFormatUtils.getHiveRecordWriter(jc, conf.getTableInfo(),
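
Two independent changes land in FileSinkOperator: the inheritPerms field disappears because FileUtils.mkdir now decides permission inheritance from the conf itself, and logOutputFormatError switches from Configuration.dumpConfiguration to HiveConfUtil.dumpConfig, which redacts sensitive entries before they reach the log (and drops the IOException handling the old StringWriter path needed). A generic sketch of the redact-before-log idea (the key names and hidden set are illustrative, not Hive's actual list):

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.LinkedHashMap;
    import java.util.Map;
    import java.util.Set;

    public class RedactedConfigDump {
      static final Set<String> HIDDEN = new HashSet<>(Arrays.asList(
          "javax.jdo.option.ConnectionPassword", "fs.s3a.secret.key"));

      static String dump(Map<String, String> conf) {
        StringBuilder sb = new StringBuilder("Failed to create output format; configuration: ");
        for (Map.Entry<String, String> e : conf.entrySet()) {
          // Mask anything on the hidden list so secrets never reach the log.
          String value = HIDDEN.contains(e.getKey()) ? "***redacted***" : e.getValue();
          sb.append(e.getKey()).append('=').append(value).append("; ");
        }
        return sb.toString();
      }

      public static void main(String[] args) {
        Map<String, String> conf = new LinkedHashMap<>();
        conf.put("hive.exec.scratchdir", "/tmp/hive");
        conf.put("javax.jdo.option.ConnectionPassword", "hunter2");
        System.out.println(dump(conf)); // password printed as ***redacted***
      }
    }
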
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
index 4ac25c2..1b556ac 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
@@ -67,7 +67,7 @@ import org.apache.hadoop.hive.ql.udf.UDFFromUnixTime;
import org.apache.hadoop.hive.ql.udf.UDFHex;
import org.apache.hadoop.hive.ql.udf.UDFHour;
import org.apache.hadoop.hive.ql.udf.UDFJson;
-import org.apache.hadoop.hive.ql.udf.UDFLength;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFLength;
import org.apache.hadoop.hive.ql.udf.UDFLike;
import org.apache.hadoop.hive.ql.udf.UDFLn;
import org.apache.hadoop.hive.ql.udf.UDFLog;
@@ -262,13 +262,18 @@ public final class FunctionRegistry {
system.registerGenericUDF("trim", GenericUDFTrim.class);
system.registerGenericUDF("ltrim", GenericUDFLTrim.class);
system.registerGenericUDF("rtrim", GenericUDFRTrim.class);
- system.registerUDF("length", UDFLength.class, false);
+ system.registerGenericUDF("length", GenericUDFLength.class);
+ system.registerGenericUDF("character_length", GenericUDFCharacterLength.class);
+ system.registerGenericUDF("char_length", GenericUDFCharacterLength.class);
+ system.registerGenericUDF("octet_length", GenericUDFOctetLength.class);
system.registerUDF("reverse", UDFReverse.class, false);
system.registerGenericUDF("field", GenericUDFField.class);
system.registerUDF("find_in_set", UDFFindInSet.class, false);
system.registerGenericUDF("initcap", GenericUDFInitCap.class);
system.registerUDF("like", UDFLike.class, true);
+ system.registerGenericUDF("likeany", GenericUDFLikeAny.class);
+ system.registerGenericUDF("likeall", GenericUDFLikeAll.class);
system.registerGenericUDF("rlike", GenericUDFRegExp.class);
system.registerGenericUDF("regexp", GenericUDFRegExp.class);
system.registerUDF("regexp_replace", UDFRegExpReplace.class, false);
@@ -418,6 +423,16 @@ public final class FunctionRegistry {
system.registerGenericUDAF("covar_pop", new GenericUDAFCovariance());
system.registerGenericUDAF("covar_samp", new GenericUDAFCovarianceSample());
system.registerGenericUDAF("corr", new GenericUDAFCorrelation());
+ system.registerGenericUDAF("regr_slope", new GenericUDAFBinarySetFunctions.RegrSlope());
+ system.registerGenericUDAF("regr_intercept", new GenericUDAFBinarySetFunctions.RegrIntercept());
+ system.registerGenericUDAF("regr_r2", new GenericUDAFBinarySetFunctions.RegrR2());
+ system.registerGenericUDAF("regr_sxx", new GenericUDAFBinarySetFunctions.RegrSXX());
+ system.registerGenericUDAF("regr_syy", new GenericUDAFBinarySetFunctions.RegrSYY());
+ system.registerGenericUDAF("regr_sxy", new GenericUDAFBinarySetFunctions.RegrSXY());
+ system.registerGenericUDAF("regr_avgx", new GenericUDAFBinarySetFunctions.RegrAvgX());
+ system.registerGenericUDAF("regr_avgy", new GenericUDAFBinarySetFunctions.RegrAvgY());
+ system.registerGenericUDAF("regr_count", new GenericUDAFBinarySetFunctions.RegrCount());
+
system.registerGenericUDAF("histogram_numeric", new GenericUDAFHistogramNumeric());
system.registerGenericUDAF("percentile_approx", new GenericUDAFPercentileApprox());
system.registerGenericUDAF("collect_set", new GenericUDAFCollectSet());
@@ -444,6 +459,7 @@ public final class FunctionRegistry {
system.registerGenericUDF("struct", GenericUDFStruct.class);
system.registerGenericUDF("named_struct", GenericUDFNamedStruct.class);
system.registerGenericUDF("create_union", GenericUDFUnion.class);
+ system.registerGenericUDF("extract_union", GenericUDFExtractUnion.class);
system.registerGenericUDF("case", GenericUDFCase.class);
system.registerGenericUDF("when", GenericUDFWhen.class);
@@ -467,6 +483,7 @@ public final class FunctionRegistry {
system.registerGenericUDF("greatest", GenericUDFGreatest.class);
system.registerGenericUDF("least", GenericUDFLeast.class);
system.registerGenericUDF("cardinality_violation", GenericUDFCardinalityViolation.class);
+ system.registerGenericUDF("width_bucket", GenericUDFWidthBucket.class);
system.registerGenericUDF("from_utc_timestamp", GenericUDFFromUtcTimestamp.class);
system.registerGenericUDF("to_utc_timestamp", GenericUDFToUtcTimestamp.class);
@@ -764,7 +781,7 @@ public final class FunctionRegistry {
*
* @return null if no common class could be found.
*/
- public static TypeInfo getCommonClassForComparison(TypeInfo a, TypeInfo b) {
+ public static synchronized TypeInfo getCommonClassForComparison(TypeInfo a, TypeInfo b) {
// If same return one of them
if (a.equals(b)) {
return a;
@@ -1475,6 +1492,20 @@ public final class FunctionRegistry {
}
/**
+ * Returns whether the fn is an exact equality comparison.
+ */
+ public static boolean isEq(GenericUDF fn) {
+ return fn instanceof GenericUDFOPEqual;
+ }
+
+ /**
+ * Returns whether the fn is an exact non-equality comparison.
+ */
+ public static boolean isNeq(GenericUDF fn) {
+ return fn instanceof GenericUDFOPNotEqual;
+ }
+
+ /**
* Returns whether the exprNodeDesc is a node of "positive".
*/
public static boolean isOpPositive(ExprNodeDesc desc) {
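
The registry additions above bring in the SQL length functions (length, character_length/char_length, octet_length), LIKE ANY/ALL, extract_union, width_bucket, and the regr_* family of binary set aggregates; getCommonClassForComparison also becomes synchronized, presumably because the shared type-resolution path can be reached from concurrent query compilations. A minimal sketch of the name-to-implementation registry shape, including alias registration (this is not Hive's Registry class, just the pattern):

    import java.util.Locale;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    public class MiniFunctionRegistry {
      // Case-insensitive name -> implementation class, safe for concurrent readers.
      private static final Map<String, Class<?>> FUNCTIONS = new ConcurrentHashMap<>();

      static void registerGenericUDF(String name, Class<?> udfClass) {
        FUNCTIONS.put(name.toLowerCase(Locale.ROOT), udfClass);
      }

      static Class<?> lookup(String name) {
        return FUNCTIONS.get(name.toLowerCase(Locale.ROOT));
      }

      public static void main(String[] args) {
        class CharacterLengthUdf {} // placeholder implementation class
        registerGenericUDF("character_length", CharacterLengthUdf.class);
        registerGenericUDF("char_length", CharacterLengthUdf.class); // alias, same class
        System.out.println(lookup("CHAR_LENGTH").getSimpleName());
      }
    }
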
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
index 6d6c608..f8b55da 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
@@ -33,6 +33,7 @@ import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.LlapDaemonInfo;
import org.apache.hadoop.hive.ql.CompilationOpContext;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.OpParseContext;
@@ -402,8 +403,8 @@ public class GroupByOperator extends Operator<GroupByDesc> {
newKeys = keyWrapperFactory.getKeyWrapper();
isTez = HiveConf.getVar(hconf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez");
- isLlap = isTez && HiveConf.getVar(hconf, HiveConf.ConfVars.HIVE_EXECUTION_MODE).equals("llap");
- numExecutors = isLlap ? HiveConf.getIntVar(hconf, HiveConf.ConfVars.LLAP_DAEMON_NUM_EXECUTORS) : 1;
+ isLlap = LlapDaemonInfo.INSTANCE.isLlap();
+ numExecutors = isLlap ? LlapDaemonInfo.INSTANCE.getNumExecutors() : 1;
firstRow = true;
// estimate the number of hash table entries based on the size of each
// entry. Since the size of an entry
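
GroupByOperator stops inferring LLAP mode from two configuration values and instead asks LlapDaemonInfo.INSTANCE, a process-local singleton that the LLAP daemon initializes at startup, so the answer cannot drift from the actual runtime. The enum-singleton shape, sketched with illustrative field names:

    public class LlapModeDemo {
      enum LlapDaemonInfo {
        INSTANCE;

        private volatile boolean llap = false;
        private volatile int numExecutors = 1;

        // Called once by the daemon at startup; all other code only reads.
        void initialize(int numExecutors) {
          this.llap = true;
          this.numExecutors = numExecutors;
        }

        boolean isLlap() { return llap; }
        int getNumExecutors() { return numExecutors; }
      }

      public static void main(String[] args) {
        // In a non-LLAP container nothing calls initialize(), so this stays false.
        boolean isLlap = LlapDaemonInfo.INSTANCE.isLlap();
        int numExecutors = isLlap ? LlapDaemonInfo.INSTANCE.getNumExecutors() : 1;
        System.out.println(isLlap + " / " + numExecutors);
      }
    }
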
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
index 29b72a0..56be518 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
@@ -153,7 +153,7 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
throw new HiveException("Target " + targetPath + " is not a local directory.");
}
} else {
- if (!FileUtils.mkdir(dstFs, targetPath, false, conf)) {
+ if (!FileUtils.mkdir(dstFs, targetPath, conf)) {
throw new HiveException("Failed to create local target directory " + targetPath);
}
}
@@ -182,9 +182,6 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
actualPath = actualPath.getParent();
}
fs.mkdirs(mkDirPath);
- if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS)) {
- HdfsUtils.setFullFileStatus(conf, new HdfsUtils.HadoopFileStatus(conf, fs, actualPath), fs, mkDirPath, true);
- }
}
return deletePath;
}
@@ -418,7 +415,7 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
Utilities.LOG14535.info("loadPartition called from " + tbd.getSourcePath()
+ " into " + tbd.getTable().getTableName());
boolean isCommitMmWrite = tbd.isCommitMmWrite();
- db.loadSinglePartition(tbd.getSourcePath(), tbd.getTable().getTableName(),
+ db.loadPartition(tbd.getSourcePath(), tbd.getTable().getTableName(),
tbd.getPartitionSpec(), tbd.getReplace(),
tbd.getInheritTableSpecs(), isSkewedStoredAsDirs(tbd), work.isSrcLocal(),
(work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID &&