You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by we...@apache.org on 2017/05/08 20:42:59 UTC
[09/51] [partial] hive git commit: Revert "HIVE-14671 : merge master
into hive-14535 (Wei Zheng)"
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 3230c61..0b615cd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -37,13 +37,10 @@ import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
-import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Iterables;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
-
import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.hive.common.JavaUtils;
-import org.apache.hadoop.hive.common.ValidReadTxnList;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.common.ValidWriteIds;
import org.apache.hadoop.hive.common.metrics.common.Metrics;
@@ -74,9 +71,12 @@ import org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext;
import org.apache.hadoop.hive.ql.hooks.Hook;
import org.apache.hadoop.hive.ql.hooks.HookContext;
import org.apache.hadoop.hive.ql.hooks.HookUtils;
-import org.apache.hadoop.hive.ql.hooks.HooksLoader;
+import org.apache.hadoop.hive.ql.hooks.MetricsQueryLifeTimeHook;
import org.apache.hadoop.hive.ql.hooks.PostExecute;
import org.apache.hadoop.hive.ql.hooks.PreExecute;
+import org.apache.hadoop.hive.ql.hooks.QueryLifeTimeHook;
+import org.apache.hadoop.hive.ql.hooks.QueryLifeTimeHookContext;
+import org.apache.hadoop.hive.ql.hooks.QueryLifeTimeHookContextImpl;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.io.AcidUtils;
@@ -101,7 +101,7 @@ import org.apache.hadoop.hive.ql.parse.HiveSemanticAnalyzerHookContext;
import org.apache.hadoop.hive.ql.parse.HiveSemanticAnalyzerHookContextImpl;
import org.apache.hadoop.hive.ql.parse.ImportSemanticAnalyzer;
import org.apache.hadoop.hive.ql.parse.ParseContext;
-import org.apache.hadoop.hive.ql.parse.ParseException;
+import org.apache.hadoop.hive.ql.parse.ParseDriver;
import org.apache.hadoop.hive.ql.parse.ParseUtils;
import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
@@ -119,6 +119,8 @@ import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeObje
import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeObject.HivePrivObjectActionType;
import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeObject.HivePrivilegeObjectType;
import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAuthzContext;
+import org.apache.hadoop.hive.ql.session.OperationLog;
+import org.apache.hadoop.hive.ql.session.OperationLog.LoggingLevel;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
import org.apache.hadoop.hive.serde2.ByteStream;
@@ -127,9 +129,7 @@ import org.apache.hadoop.mapred.ClusterStatus;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.MRJobConfig;
-
import org.apache.hive.common.util.ShutdownHookManager;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -137,7 +137,6 @@ import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
-
public class Driver implements CommandProcessor {
static final private String CLASS_NAME = Driver.class.getName();
@@ -163,6 +162,11 @@ public class Driver implements CommandProcessor {
private FetchTask fetchTask;
List<HiveLock> hiveLocks = new ArrayList<HiveLock>();
+ // A list of FileSinkOperators writing in an ACID compliant manner
+ private Set<FileSinkDesc> acidSinks;
+ // whether any ACID table is involved in a query
+ private boolean acidInQuery;
+
// A limit on the number of threads that can be launched
private int maxthreads;
private int tryCount = Integer.MAX_VALUE;
@@ -180,8 +184,7 @@ public class Driver implements CommandProcessor {
private QueryState queryState;
// Query hooks that execute before compilation and after execution
- private QueryLifeTimeHookRunner queryLifeTimeHookRunner;
- private final HooksLoader hooksLoader;
+ private List<QueryLifeTimeHook> queryHooks;
public enum DriverState {
INITIALIZED,
@@ -205,25 +208,6 @@ public class Driver implements CommandProcessor {
// resource releases
public final ReentrantLock stateLock = new ReentrantLock();
public DriverState driverState = DriverState.INITIALIZED;
- private static ThreadLocal<LockedDriverState> lds = new ThreadLocal<LockedDriverState>() {
- @Override
- protected LockedDriverState initialValue() {
- return new LockedDriverState();
- }
- };
-
- public static void setLockedDriverState(LockedDriverState lDrv) {
- lds.set(lDrv);
- }
-
- public static LockedDriverState getLockedDriverState() {
- return lds.get();
- }
-
- public static void removeLockedDriverState() {
- if (lds != null)
- lds.remove();
- }
}
private boolean checkConcurrency() {
@@ -370,21 +354,11 @@ public class Driver implements CommandProcessor {
}
public Driver(QueryState queryState, String userName) {
- this(queryState, userName, new HooksLoader(queryState.getConf()));
- }
-
- public Driver(HiveConf conf, HooksLoader hooksLoader) {
- this(new QueryState(conf), null, hooksLoader);
- }
-
- private Driver(QueryState queryState, String userName, HooksLoader hooksLoader) {
this.queryState = queryState;
this.conf = queryState.getConf();
isParallelEnabled = (conf != null)
&& HiveConf.getBoolVar(conf, ConfVars.HIVE_SERVER2_PARALLEL_COMPILATION);
this.userName = userName;
- this.hooksLoader = hooksLoader;
- this.queryLifeTimeHookRunner = new QueryLifeTimeHookRunner(conf, hooksLoader, console);
}
/**
@@ -412,7 +386,7 @@ public class Driver implements CommandProcessor {
// deferClose indicates if the close/destroy should be deferred when the process has been
// interrupted, it should be set to true if the compile is called within another method like
// runInternal, which defers the close to the called in that method.
- private int compile(String command, boolean resetTaskIds, boolean deferClose) {
+ public int compile(String command, boolean resetTaskIds, boolean deferClose) {
PerfLogger perfLogger = SessionState.getPerfLogger(true);
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.DRIVER_RUN);
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.COMPILE);
@@ -452,8 +426,6 @@ public class Driver implements CommandProcessor {
TaskFactory.resetId();
}
- LockedDriverState.setLockedDriverState(lDrvState);
-
String queryId = conf.getVar(HiveConf.ConfVars.HIVEQUERYID);
//save some info for webUI for use after plan is freed
@@ -466,8 +438,6 @@ public class Driver implements CommandProcessor {
// Whether any error occurred during query compilation. Used for query lifetime hook.
boolean compileError = false;
- boolean parseError = false;
-
try {
// Initialize the transaction manager. This must be done before analyze is called.
@@ -501,27 +471,26 @@ public class Driver implements CommandProcessor {
ctx.setHDFSCleanup(true);
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.PARSE);
+ ASTNode tree = ParseUtils.parse(command, ctx);
+ perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.PARSE);
// Trigger query hook before compilation
- queryLifeTimeHookRunner.runBeforeParseHook(command);
-
- ASTNode tree;
- try {
- tree = ParseUtils.parse(command, ctx);
- } catch (ParseException e) {
- parseError = true;
- throw e;
- } finally {
- queryLifeTimeHookRunner.runAfterParseHook(command, parseError);
+ queryHooks = loadQueryHooks();
+ if (queryHooks != null && !queryHooks.isEmpty()) {
+ QueryLifeTimeHookContext qhc = new QueryLifeTimeHookContextImpl();
+ qhc.setHiveConf(conf);
+ qhc.setCommand(command);
+
+ for (QueryLifeTimeHook hook : queryHooks) {
+ hook.beforeCompile(qhc);
+ }
}
- perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.PARSE);
-
- queryLifeTimeHookRunner.runBeforeCompileHook(command);
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.ANALYZE);
BaseSemanticAnalyzer sem = SemanticAnalyzerFactory.get(queryState, tree);
List<HiveSemanticAnalyzerHook> saHooks =
- hooksLoader.getHooks(HiveConf.ConfVars.SEMANTIC_ANALYZER_HOOK, console);
+ getHooks(HiveConf.ConfVars.SEMANTIC_ANALYZER_HOOK,
+ HiveSemanticAnalyzerHook.class);
// Flush the metastore cache. This assures that we don't pick up objects from a previous
// query running in this same thread. This has to be done after we get our semantic
@@ -529,15 +498,6 @@ public class Driver implements CommandProcessor {
// because at that point we need access to the objects.
Hive.get().getMSC().flushCache();
- if(checkConcurrency() && startImplicitTxn(txnManager)) {
- String userFromUGI = getUserFromUGI();
- if (!txnManager.isTxnOpen()) {
- if(userFromUGI == null) {
- return 10;
- }
- long txnid = txnManager.openTxn(ctx, userFromUGI);
- }
- }
// Do semantic analysis and plan generation
if (saHooks != null && !saHooks.isEmpty()) {
HiveSemanticAnalyzerHookContext hookCtx = new HiveSemanticAnalyzerHookContextImpl();
@@ -556,10 +516,15 @@ public class Driver implements CommandProcessor {
} else {
sem.analyze(tree, ctx);
}
+ // Record any ACID compliant FileSinkOperators we saw so we can add our transaction ID to
+ // them later.
+ acidSinks = sem.getAcidFileSinks();
+
LOG.info("Semantic Analysis Completed");
// validate the plan
sem.validate();
+ acidInQuery = sem.hasAcidInQuery();
perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.ANALYZE);
if (isInterrupted()) {
@@ -602,8 +567,10 @@ public class Driver implements CommandProcessor {
if (conf.getBoolVar(ConfVars.HIVE_LOG_EXPLAIN_OUTPUT)) {
String explainOutput = getExplainOutput(sem, plan, tree);
if (explainOutput != null) {
- LOG.info("EXPLAIN output for queryid " + queryId + " : "
- + explainOutput);
+ if (conf.getBoolVar(ConfVars.HIVE_LOG_EXPLAIN_OUTPUT)) {
+ LOG.info("EXPLAIN output for queryid " + queryId + " : "
+ + explainOutput);
+ }
if (conf.isWebUiQueryInfoCacheEnabled()) {
queryDisplay.setExplainPlan(explainOutput);
}
@@ -642,12 +609,17 @@ public class Driver implements CommandProcessor {
} finally {
// Trigger post compilation hook. Note that if the compilation fails here then
// before/after execution hook will never be executed.
- if (!parseError) {
- try {
- queryLifeTimeHookRunner.runAfterCompilationHook(command, compileError);
- } catch (Exception e) {
- LOG.warn("Failed when invoking query after-compilation hook.", e);
+ try {
+ if (queryHooks != null && !queryHooks.isEmpty()) {
+ QueryLifeTimeHookContext qhc = new QueryLifeTimeHookContextImpl();
+ qhc.setHiveConf(conf);
+ qhc.setCommand(command);
+ for (QueryLifeTimeHook hook : queryHooks) {
+ hook.afterCompile(qhc, compileError);
+ }
}
+ } catch (Exception e) {
+ LOG.warn("Failed when invoking query after-compilation hook.", e);
}
double duration = perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.COMPILE)/1000.00;
@@ -677,55 +649,11 @@ public class Driver implements CommandProcessor {
}
}
- private boolean startImplicitTxn(HiveTxnManager txnManager) throws LockException {
- boolean shouldOpenImplicitTxn = !ctx.isExplainPlan();
- //this is dumb. HiveOperation is not always set. see HIVE-16447/HIVE-16443
- switch (queryState.getHiveOperation() == null ? HiveOperation.QUERY : queryState.getHiveOperation()) {
- case COMMIT:
- case ROLLBACK:
- if(!txnManager.isTxnOpen()) {
- throw new LockException(null, ErrorMsg.OP_NOT_ALLOWED_WITHOUT_TXN, queryState.getHiveOperation().getOperationName());
- }
- case SWITCHDATABASE:
- case SET_AUTOCOMMIT:
- /**
- * autocommit is here for completeness. TM doesn't use it. If we want to support JDBC
- * semantics (or any other definition of autocommit) it should be done at session level.
- */
- case SHOWDATABASES:
- case SHOWTABLES:
- case SHOWCOLUMNS:
- case SHOWFUNCTIONS:
- case SHOWINDEXES:
- case SHOWPARTITIONS:
- case SHOWLOCKS:
- case SHOWVIEWS:
- case SHOW_ROLES:
- case SHOW_ROLE_PRINCIPALS:
- case SHOW_COMPACTIONS:
- case SHOW_TRANSACTIONS:
- case ABORT_TRANSACTIONS:
- shouldOpenImplicitTxn = false;
- //this implies that no locks are needed for such a command
- }
- return shouldOpenImplicitTxn;
- }
- private int handleInterruption(String msg) {
- return handleInterruptionWithHook(msg, null, null);
- }
- private int handleInterruptionWithHook(String msg, HookContext hookContext,
- PerfLogger perfLogger) {
+ private int handleInterruption(String msg) {
SQLState = "HY008"; //SQLState for cancel operation
errorMessage = "FAILED: command has been interrupted: " + msg;
console.printError(errorMessage);
- if (hookContext != null) {
- try {
- invokeFailureHooks(perfLogger, hookContext, errorMessage, null);
- } catch (Exception e) {
- LOG.warn("Caught exception attempting to invoke Failure Hooks", e);
- }
- }
return 1000;
}
@@ -742,6 +670,19 @@ public class Driver implements CommandProcessor {
}
}
+ private List<QueryLifeTimeHook> loadQueryHooks() throws Exception {
+ List<QueryLifeTimeHook> hooks = new ArrayList<>();
+
+ if (conf.getBoolVar(ConfVars.HIVE_SERVER2_METRICS_ENABLED)) {
+ hooks.add(new MetricsQueryLifeTimeHook());
+ }
+ List<QueryLifeTimeHook> propertyDefinedHoooks = getHooks(ConfVars.HIVE_QUERY_LIFETIME_HOOKS, QueryLifeTimeHook.class);
+ if (propertyDefinedHoooks != null) {
+ Iterables.addAll(hooks, propertyDefinedHoooks);
+ }
+ return hooks;
+ }
+
private ImmutableMap<String, Long> dumpMetaCallTimingWithoutEx(String phase) {
try {
return Hive.get().dumpAndClearMetaCallTiming(phase);
@@ -1123,17 +1064,8 @@ public class Driver implements CommandProcessor {
// Write the current set of valid transactions into the conf file so that it can be read by
// the input format.
private void recordValidTxns() throws LockException {
- ValidTxnList oldList = null;
- String s = conf.get(ValidTxnList.VALID_TXNS_KEY);
- if(s != null && s.length() > 0) {
- oldList = new ValidReadTxnList(s);
- }
HiveTxnManager txnMgr = SessionState.get().getTxnMgr();
ValidTxnList txns = txnMgr.getValidTxns();
- if(oldList != null) {
- throw new IllegalStateException("calling recordValidTxn() more than once in the same " +
- JavaUtils.txnIdToString(txnMgr.getCurrentTxnId()));
- }
String txnStr = txns.toString();
conf.set(ValidTxnList.VALID_TXNS_KEY, txnStr);
if(plan.getFetchTask() != null) {
@@ -1147,61 +1079,79 @@ public class Driver implements CommandProcessor {
LOG.debug("Encoding valid txns info " + txnStr + " txnid:" + txnMgr.getCurrentTxnId());
}
- private String getUserFromUGI() {
- // Don't use the userName member, as it may or may not have been set. Get the value from
- // conf, which calls into getUGI to figure out who the process is running as.
- try {
- return conf.getUser();
- } catch (IOException e) {
- errorMessage = "FAILED: Error in determining user while acquiring locks: " + e.getMessage();
- SQLState = ErrorMsg.findSQLState(e.getMessage());
- downstreamError = e;
- console.printError(errorMessage,
- "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e));
- }
- return null;
- }
/**
* Acquire read and write locks needed by the statement. The list of objects to be locked are
- * obtained from the inputs and outputs populated by the compiler. Locking strategy depends on
- * HiveTxnManager and HiveLockManager configured
+ * obtained from the inputs and outputs populated by the compiler. The lock acquisition scheme is
+ * pretty simple. If all the locks cannot be obtained, error out. Deadlock is avoided by making
+ * sure that the locks are lexicographically sorted.
*
* This method also records the list of valid transactions. This must be done after any
- * transactions have been opened.
+ * transactions have been opened and locks acquired.
+ * @param startTxnImplicitly in AC=false, the 1st DML starts a txn
**/
- private int acquireLocks() {
+ private int acquireLocksAndOpenTxn(boolean startTxnImplicitly) {
PerfLogger perfLogger = SessionState.getPerfLogger();
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.ACQUIRE_READ_WRITE_LOCKS);
SessionState ss = SessionState.get();
HiveTxnManager txnMgr = ss.getTxnMgr();
- if(!txnMgr.isTxnOpen() && txnMgr.supportsAcid()) {
- /*non acid txn managers don't support txns but fwd lock requests to lock managers
- acid txn manager requires all locks to be associated with a txn so if we
- end up here w/o an open txn it's because we are processing something like "use <database>
- which by definition needs no locks*/
- return 0;
+ if(startTxnImplicitly) {
+ assert !txnMgr.getAutoCommit();
}
+
try {
- String userFromUGI = getUserFromUGI();
- if(userFromUGI == null) {
+ // Don't use the userName member, as it may or may not have been set. Get the value from
+ // conf, which calls into getUGI to figure out who the process is running as.
+ String userFromUGI;
+ try {
+ userFromUGI = conf.getUser();
+ } catch (IOException e) {
+ errorMessage = "FAILED: Error in determining user while acquiring locks: " + e.getMessage();
+ SQLState = ErrorMsg.findSQLState(e.getMessage());
+ downstreamError = e;
+ console.printError(errorMessage,
+ "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e));
return 10;
}
+
+ boolean initiatingTransaction = false;
+ boolean readOnlyQueryInAutoCommit = false;
+ if((txnMgr.getAutoCommit() && haveAcidWrite()) || plan.getOperation() == HiveOperation.START_TRANSACTION ||
+ (!txnMgr.getAutoCommit() && startTxnImplicitly)) {
+ if(txnMgr.isTxnOpen()) {
+ throw new RuntimeException("Already have an open transaction txnid:" + txnMgr.getCurrentTxnId());
+ }
+ // We are writing to tables in an ACID compliant way, so we need to open a transaction
+ txnMgr.openTxn(ctx, userFromUGI);
+ initiatingTransaction = true;
+ }
+ else {
+ readOnlyQueryInAutoCommit = txnMgr.getAutoCommit() && plan.getOperation() == HiveOperation.QUERY && !haveAcidWrite();
+ }
// Set the transaction id in all of the acid file sinks
if (haveAcidWrite()) {
- for (FileSinkDesc desc : plan.getAcidSinks()) {
+ for (FileSinkDesc desc : acidSinks) {
desc.setTransactionId(txnMgr.getCurrentTxnId());
//it's possible to have > 1 FileSink writing to the same table/partition
//e.g. Merge stmt, multi-insert stmt when mixing DP and SP writes
desc.setStatementId(txnMgr.getWriteIdAndIncrement());
}
}
- /*It's imperative that {@code acquireLocks()} is called for all commands so that
- HiveTxnManager can transition its state machine correctly*/
+ /*Note, we have to record snapshot after lock acquisition to prevent lost update problem
+ consider 2 concurrent "update table T set x = x + 1". 1st will get the locks and the
+ 2nd will block until 1st one commits and only then lock in the snapshot, i.e. it will
+ see the changes made by 1st one. This takes care of autoCommit=true case.
+ For multi-stmt txns this is not sufficient and will be managed via WriteSet tracking
+ in the lock manager.*/
txnMgr.acquireLocks(plan, ctx, userFromUGI, lDrvState);
- if(txnMgr.recordSnapshot(plan)) {
+ if(initiatingTransaction || (readOnlyQueryInAutoCommit && acidInQuery)) {
+ //For multi-stmt txns we should record the snapshot when txn starts but
+ // don't update it after that until txn completes. Thus the check for {@code initiatingTransaction}
+ //For autoCommit=true, Read-only statements, txn is implicit, i.e. lock in the snapshot
+ //for each statement.
recordValidTxns();
}
+
return 0;
} catch (Exception e) {
errorMessage = "FAILED: Error in acquiring locks: " + e.getMessage();
@@ -1216,7 +1166,7 @@ public class Driver implements CommandProcessor {
}
private boolean haveAcidWrite() {
- return !plan.getAcidSinks().isEmpty();
+ return acidSinks != null && !acidSinks.isEmpty();
}
/**
* @param commit if there is an open transaction and if true, commit,
@@ -1224,11 +1174,11 @@ public class Driver implements CommandProcessor {
* @param txnManager an optional existing transaction manager retrieved earlier from the session
*
**/
- @VisibleForTesting
- public void releaseLocksAndCommitOrRollback(boolean commit, HiveTxnManager txnManager)
+ private void releaseLocksAndCommitOrRollback(boolean commit, HiveTxnManager txnManager)
throws LockException {
PerfLogger perfLogger = SessionState.getPerfLogger();
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.RELEASE_LOCKS);
+
HiveTxnManager txnMgr;
if (txnManager == null) {
SessionState ss = SessionState.get();
@@ -1238,7 +1188,6 @@ public class Driver implements CommandProcessor {
}
// If we've opened a transaction we need to commit or rollback rather than explicitly
// releasing the locks.
- conf.unset(ValidTxnList.VALID_TXNS_KEY);
if (txnMgr.isTxnOpen()) {
if (commit) {
if(conf.getBoolVar(ConfVars.HIVE_IN_TEST) && conf.getBoolVar(ConfVars.HIVETESTMODEROLLBACKTXN)) {
@@ -1360,20 +1309,16 @@ public class Driver implements CommandProcessor {
metrics.incrementCounter(MetricsConstant.WAITING_COMPILE_OPS, 1);
}
- PerfLogger perfLogger = SessionState.getPerfLogger();
- perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.WAIT_COMPILE);
final ReentrantLock compileLock = tryAcquireCompileLock(isParallelEnabled,
command);
- perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.WAIT_COMPILE);
- if (metrics != null) {
- metrics.decrementCounter(MetricsConstant.WAITING_COMPILE_OPS, 1);
- }
-
if (compileLock == null) {
return ErrorMsg.COMPILE_LOCK_TIMED_OUT.getErrorCode();
}
try {
+ if (metrics != null) {
+ metrics.decrementCounter(MetricsConstant.WAITING_COMPILE_OPS, 1);
+ }
ret = compile(command, true, deferClose);
} finally {
compileLock.unlock();
@@ -1391,6 +1336,7 @@ public class Driver implements CommandProcessor {
//Save compile-time PerfLogging for WebUI.
//Execution-time Perf logs are done by either another thread's PerfLogger
//or a reset PerfLogger.
+ PerfLogger perfLogger = SessionState.getPerfLogger();
queryDisplay.setPerfLogStarts(QueryDisplay.Phase.COMPILATION, perfLogger.getStartTimes());
queryDisplay.setPerfLogEnds(QueryDisplay.Phase.COMPILATION, perfLogger.getEndTimes());
return ret;
@@ -1433,6 +1379,11 @@ public class Driver implements CommandProcessor {
LOG.debug("Waiting to acquire compile lock: " + command);
}
+ OperationLog ol = OperationLog.getCurrentOperationLog();
+ if (ol != null) {
+ ol.writeOperationLog(LoggingLevel.EXECUTION, "Waiting to acquire compile lock.\n");
+ }
+
if (maxCompileLockWaitTime > 0) {
try {
if(!compileLock.tryLock(maxCompileLockWaitTime, TimeUnit.SECONDS)) {
@@ -1452,6 +1403,9 @@ public class Driver implements CommandProcessor {
}
LOG.debug(lockAcquiredMsg);
+ if (ol != null) {
+ ol.writeOperationLog(LoggingLevel.EXECUTION, lockAcquiredMsg + "\n");
+ }
return compileLock;
}
@@ -1460,8 +1414,6 @@ public class Driver implements CommandProcessor {
errorMessage = null;
SQLState = null;
downstreamError = null;
- LockedDriverState.setLockedDriverState(lDrvState);
-
lDrvState.stateLock.lock();
try {
if (alreadyCompiled) {
@@ -1488,7 +1440,8 @@ public class Driver implements CommandProcessor {
// Get all the driver run hooks and pre-execute them.
List<HiveDriverRunHook> driverRunHooks;
try {
- driverRunHooks = hooksLoader.getHooks(HiveConf.ConfVars.HIVE_DRIVER_RUN_HOOKS, console);
+ driverRunHooks = getHooks(HiveConf.ConfVars.HIVE_DRIVER_RUN_HOOKS,
+ HiveDriverRunHook.class);
for (HiveDriverRunHook driverRunHook : driverRunHooks) {
driverRunHook.preDriverRun(hookContext);
}
@@ -1524,12 +1477,52 @@ public class Driver implements CommandProcessor {
HiveTxnManager txnManager = SessionState.get().getTxnMgr();
ctx.setHiveTxnManager(txnManager);
+ boolean startTxnImplicitly = false;
+ {
+ //this block ensures op makes sense in given context, e.g. COMMIT is valid only if txn is open
+ //DDL is not allowed in a txn, etc.
+ //an error in an open txn does a rollback of the txn
+ if (txnManager.isTxnOpen() && !plan.getOperation().isAllowedInTransaction()) {
+ assert !txnManager.getAutoCommit() : "didn't expect AC=true";
+ return rollback(new CommandProcessorResponse(12, ErrorMsg.OP_NOT_ALLOWED_IN_TXN, null,
+ plan.getOperationName(), Long.toString(txnManager.getCurrentTxnId())));
+ }
+ if(!txnManager.isTxnOpen() && plan.getOperation().isRequiresOpenTransaction()) {
+ return rollback(new CommandProcessorResponse(12, ErrorMsg.OP_NOT_ALLOWED_WITHOUT_TXN, null, plan.getOperationName()));
+ }
+ if(!txnManager.isTxnOpen() && plan.getOperation() == HiveOperation.QUERY && !txnManager.getAutoCommit()) {
+ //this effectively makes START TRANSACTION optional and supports JDBC setAutoCommit(false) semantics
+ //also, indirectly allows DDL to be executed outside a txn context
+ startTxnImplicitly = true;
+ }
+ if(txnManager.getAutoCommit() && plan.getOperation() == HiveOperation.START_TRANSACTION) {
+ return rollback(new CommandProcessorResponse(12, ErrorMsg.OP_NOT_ALLOWED_IN_AUTOCOMMIT, null, plan.getOperationName()));
+ }
+ }
+ if(plan.getOperation() == HiveOperation.SET_AUTOCOMMIT) {
+ try {
+ if(plan.getAutoCommitValue() && !txnManager.getAutoCommit()) {
+ /*here, if there is an open txn, we want to commit it; this behavior matches
+ * https://docs.oracle.com/javase/6/docs/api/java/sql/Connection.html#setAutoCommit(boolean)*/
+ releaseLocksAndCommitOrRollback(true, null);
+ txnManager.setAutoCommit(true);
+ }
+ else if(!plan.getAutoCommitValue() && txnManager.getAutoCommit()) {
+ txnManager.setAutoCommit(false);
+ }
+ else {/*didn't change autoCommit value - no-op*/}
+ }
+ catch(LockException e) {
+ return handleHiveException(e, 12);
+ }
+ }
+
if (requiresLock()) {
// a checkpoint to see if the thread is interrupted or not before an expensive operation
if (isInterrupted()) {
ret = handleInterruption("at acquiring the lock.");
} else {
- ret = acquireLocks();
+ ret = acquireLocksAndOpenTxn(startTxnImplicitly);
}
if (ret != 0) {
return rollback(createProcessorResponse(ret));
@@ -1550,8 +1543,7 @@ public class Driver implements CommandProcessor {
//if needRequireLock is false, the release here will do nothing because there is no lock
try {
- //since set autocommit starts an implicit txn, close it
- if(txnManager.isImplicitTransactionOpen() || plan.getOperation() == HiveOperation.COMMIT) {
+ if(txnManager.getAutoCommit() || plan.getOperation() == HiveOperation.COMMIT) {
releaseLocksAndCommitOrRollback(true, null);
}
else if(plan.getOperation() == HiveOperation.ROLLBACK) {
@@ -1720,16 +1712,37 @@ public class Driver implements CommandProcessor {
private CommandProcessorResponse createProcessorResponse(int ret) {
SessionState.getPerfLogger().cleanupPerfLogMetrics();
queryDisplay.setErrorMessage(errorMessage);
- if(downstreamError != null && downstreamError instanceof HiveException) {
- ErrorMsg em = ((HiveException)downstreamError).getCanonicalErrorMsg();
- if(em != null) {
- return new CommandProcessorResponse(ret, errorMessage, SQLState,
- schema, downstreamError, em.getErrorCode(), null);
- }
- }
return new CommandProcessorResponse(ret, errorMessage, SQLState, downstreamError);
}
+ /**
+ * Returns a set of hooks specified in a configuration variable.
+ * See getHooks(HiveConf.ConfVars hookConfVar, Class<T> clazz)
+ */
+ private List<Hook> getHooks(HiveConf.ConfVars hookConfVar) throws Exception {
+ return getHooks(hookConfVar, Hook.class);
+ }
+
+ /**
+ * Returns the hooks specified in a configuration variable.
+ *
+ * @param hookConfVar The configuration variable specifying a comma separated list of the hook
+ * class names.
+ * @param clazz The super type of the hooks.
+ * @return A list of the hooks cast as the type specified in clazz, in the order
+ * they are listed in the value of hookConfVar
+ * @throws Exception
+ */
+ private <T extends Hook> List<T> getHooks(ConfVars hookConfVar,
+ Class<T> clazz) throws Exception {
+ try {
+ return HookUtils.getHooks(conf, hookConfVar, clazz);
+ } catch (ClassNotFoundException e) {
+ console.printError(hookConfVar.varname + " Class not found:" + e.getMessage());
+ throw e;
+ }
+ }
+
public int execute() throws CommandNeedRetryException {
return execute(false);
}
@@ -1794,7 +1807,7 @@ public class Driver implements CommandProcessor {
ss.getSessionId(), Thread.currentThread().getName(), ss.isHiveServerQuery(), perfLogger);
hookContext.setHookType(HookContext.HookType.PRE_EXEC_HOOK);
- for (Hook peh : hooksLoader.getHooks(HiveConf.ConfVars.PREEXECHOOKS, console)) {
+ for (Hook peh : getHooks(HiveConf.ConfVars.PREEXECHOOKS)) {
if (peh instanceof ExecuteWithHookContext) {
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.PRE_HOOK + peh.getClass().getName());
@@ -1812,7 +1825,16 @@ public class Driver implements CommandProcessor {
}
// Trigger query hooks before query execution.
- queryLifeTimeHookRunner.runBeforeExecutionHook(queryStr, hookContext);
+ if (queryHooks != null && !queryHooks.isEmpty()) {
+ QueryLifeTimeHookContext qhc = new QueryLifeTimeHookContextImpl();
+ qhc.setHiveConf(conf);
+ qhc.setCommand(queryStr);
+ qhc.setHookContext(hookContext);
+
+ for (QueryLifeTimeHook hook : queryHooks) {
+ hook.beforeExecution(qhc);
+ }
+ }
setQueryDisplays(plan.getRootTasks());
int mrJobs = Utilities.getMRTasks(plan.getRootTasks()).size();
@@ -1837,7 +1859,7 @@ public class Driver implements CommandProcessor {
// The main thread polls the TaskRunners to check if they have finished.
if (isInterrupted()) {
- return handleInterruptionWithHook("before running tasks.", hookContext, perfLogger);
+ return handleInterruption("before running tasks.");
}
DriverContext driverCxt = new DriverContext(ctx);
driverCxt.prepare(plan);
@@ -1887,7 +1909,7 @@ public class Driver implements CommandProcessor {
int exitVal = result.getExitVal();
if (isInterrupted()) {
- return handleInterruptionWithHook("when checking the execution result.", hookContext, perfLogger);
+ return handleInterruption("when checking the execution result.");
}
if (exitVal != 0) {
if (tsk.ifRetryCmdWhenFail()) {
@@ -1912,9 +1934,6 @@ public class Driver implements CommandProcessor {
} else {
setErrorMsgAndDetail(exitVal, result.getTaskError(), tsk);
- if (driverCxt.isShutdown()) {
- errorMessage = "FAILED: Operation cancelled. " + errorMessage;
- }
invokeFailureHooks(perfLogger, hookContext,
errorMessage + Strings.nullToEmpty(tsk.getDiagnosticsMessage()), result.getTaskError());
SQLState = "08S01";
@@ -1973,7 +1992,7 @@ public class Driver implements CommandProcessor {
hookContext.setHookType(HookContext.HookType.POST_EXEC_HOOK);
// Get all the post execution hooks and execute them.
- for (Hook peh : hooksLoader.getHooks(HiveConf.ConfVars.POSTEXECHOOKS, console)) {
+ for (Hook peh : getHooks(HiveConf.ConfVars.POSTEXECHOOKS)) {
if (peh instanceof ExecuteWithHookContext) {
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.POST_HOOK + peh.getClass().getName());
@@ -2003,7 +2022,7 @@ public class Driver implements CommandProcessor {
} catch (Throwable e) {
executionError = true;
if (isInterrupted()) {
- return handleInterruptionWithHook("during query execution: \n" + e.getMessage(), hookContext, perfLogger);
+ return handleInterruption("during query execution: \n" + e.getMessage());
}
ctx.restoreOriginalTracker();
@@ -2028,7 +2047,16 @@ public class Driver implements CommandProcessor {
} finally {
// Trigger query hooks after query completes its execution.
try {
- queryLifeTimeHookRunner.runAfterExecutionHook(queryStr, hookContext, executionError);
+ if (queryHooks != null && !queryHooks.isEmpty()) {
+ QueryLifeTimeHookContext qhc = new QueryLifeTimeHookContextImpl();
+ qhc.setHiveConf(conf);
+ qhc.setCommand(queryStr);
+ qhc.setHookContext(hookContext);
+
+ for (QueryLifeTimeHook hook : queryHooks) {
+ hook.afterExecution(qhc, executionError);
+ }
+ }
} catch (Exception e) {
LOG.warn("Failed when invoking query after execution hook", e);
}
@@ -2119,6 +2147,13 @@ public class Driver implements CommandProcessor {
}
String warning = HiveConf.generateMrDeprecationWarning();
LOG.warn(warning);
+ warning = "WARNING: " + warning;
+ console.printInfo(warning);
+ // Propagate warning to beeline via operation log.
+ OperationLog ol = OperationLog.getCurrentOperationLog();
+ if (ol != null) {
+ ol.writeOperationLog(LoggingLevel.EXECUTION, warning + "\n");
+ }
}
private void setErrorMsgAndDetail(int exitVal, Throwable downstreamError, Task tsk) {
@@ -2143,7 +2178,7 @@ public class Driver implements CommandProcessor {
hookContext.setErrorMessage(errorMessage);
hookContext.setException(exception);
// Get all the failure execution hooks and execute them.
- for (Hook ofh : hooksLoader.getHooks(HiveConf.ConfVars.ONFAILUREHOOKS, console)) {
+ for (Hook ofh : getHooks(HiveConf.ConfVars.ONFAILUREHOOKS)) {
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.FAILURE_HOOK + ofh.getClass().getName());
((ExecuteWithHookContext) ofh).run(hookContext);
@@ -2193,6 +2228,7 @@ public class Driver implements CommandProcessor {
if (LOG.isInfoEnabled()){
LOG.info("Starting task [" + tsk + "] in parallel");
}
+ tskRun.setOperationLog(OperationLog.getCurrentOperationLog());
tskRun.start();
} else {
if (LOG.isInfoEnabled()){
@@ -2409,7 +2445,6 @@ public class Driver implements CommandProcessor {
lDrvState.driverState = DriverState.CLOSED;
} finally {
lDrvState.stateLock.unlock();
- LockedDriverState.removeLockedDriverState();
}
if (SessionState.get() != null) {
SessionState.get().getLineageState().clear();
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
index d01a203..6a43385 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
@@ -27,7 +27,6 @@ import java.util.regex.Pattern;
import org.antlr.runtime.tree.Tree;
import org.apache.hadoop.hive.ql.parse.ASTNode;
import org.apache.hadoop.hive.ql.parse.ASTNodeOrigin;
-import org.apache.hadoop.hive.ql.plan.AlterTableDesc.AlterTableTypes;
/**
* List of all error messages.
@@ -218,7 +217,7 @@ public enum ErrorMsg {
ALTER_COMMAND_FOR_VIEWS(10131, "To alter a view you need to use the ALTER VIEW command."),
ALTER_COMMAND_FOR_TABLES(10132, "To alter a base table you need to use the ALTER TABLE command."),
ALTER_VIEW_DISALLOWED_OP(10133, "Cannot use this form of ALTER on a view"),
- ALTER_TABLE_NON_NATIVE(10134, "ALTER TABLE can only be used for " + AlterTableTypes.nonNativeTableAllowedTypes + " to a non-native table "),
+ ALTER_TABLE_NON_NATIVE(10134, "ALTER TABLE cannot be used for a non-native table"),
SORTMERGE_MAPJOIN_FAILED(10135,
"Sort merge bucketed join could not be performed. " +
"If you really want to perform the operation, either set " +
@@ -411,8 +410,8 @@ public enum ErrorMsg {
INSERT_CANNOT_CREATE_TEMP_FILE(10293, "Unable to create temp file for insert values "),
ACID_OP_ON_NONACID_TXNMGR(10294, "Attempt to do update or delete using transaction manager that" +
" does not support these operations."),
- NO_INSERT_OVERWRITE_WITH_ACID(10295, "INSERT OVERWRITE not allowed on table {0} with OutputFormat " +
- "that implements AcidOutputFormat while transaction manager that supports ACID is in use", true),
+ NO_INSERT_OVERWRITE_WITH_ACID(10295, "INSERT OVERWRITE not allowed on table with OutputFormat " +
+ "that implements AcidOutputFormat while transaction manager that supports ACID is in use"),
VALUES_TABLE_CONSTRUCTOR_NOT_SUPPORTED(10296,
"Values clause with table constructor not yet supported"),
ACID_OP_ON_NONACID_TABLE(10297, "Attempt to do update or delete on table {0} that does not use " +
@@ -482,17 +481,9 @@ public enum ErrorMsg {
"is controlled by hive.exec.max.dynamic.partitions and hive.exec.max.dynamic.partitions.pernode. "),
PARTITION_SCAN_LIMIT_EXCEEDED(20005, "Number of partitions scanned (={0}) on table {1} exceeds limit" +
" (={2}). This is controlled by hive.limit.query.max.table.partition.", true),
- /**
- * {1} is the transaction id;
- * use {@link org.apache.hadoop.hive.common.JavaUtils#txnIdToString(long)} to format
- */
- OP_NOT_ALLOWED_IN_IMPLICIT_TXN(20006, "Operation {0} is not allowed in an implicit transaction ({1}).", true),
- /**
- * {1} is the transaction id;
- * use {@link org.apache.hadoop.hive.common.JavaUtils#txnIdToString(long)} to format
- */
- OP_NOT_ALLOWED_IN_TXN(20007, "Operation {0} is not allowed in a transaction ({1},queryId={2}).", true),
- OP_NOT_ALLOWED_WITHOUT_TXN(20008, "Operation {0} is not allowed without an active transaction", true),
+ OP_NOT_ALLOWED_IN_AUTOCOMMIT(20006, "Operation {0} is not allowed when autoCommit=true.", true),//todo: better SQLState?
+ OP_NOT_ALLOWED_IN_TXN(20007, "Operation {0} is not allowed in a transaction. TransactionID={1}.", true),
+ OP_NOT_ALLOWED_WITHOUT_TXN(20008, "Operation {0} is not allowed since autoCommit=false and there is no active transaction", true),
//========================== 30000 range starts here ========================//
STATSPUBLISHER_NOT_OBTAINED(30000, "StatsPublisher cannot be obtained. " +
"There was a error to retrieve the StatsPublisher, and retrying " +
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/ql/src/java/org/apache/hadoop/hive/ql/QueryLifeTimeHookRunner.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/QueryLifeTimeHookRunner.java b/ql/src/java/org/apache/hadoop/hive/ql/QueryLifeTimeHookRunner.java
deleted file mode 100644
index 85e038c..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/QueryLifeTimeHookRunner.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.hadoop.hive.ql;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import com.google.common.collect.Iterables;
-
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.hooks.HookContext;
-import org.apache.hadoop.hive.ql.hooks.HooksLoader;
-import org.apache.hadoop.hive.ql.hooks.MetricsQueryLifeTimeHook;
-import org.apache.hadoop.hive.ql.hooks.QueryLifeTimeHook;
-import org.apache.hadoop.hive.ql.hooks.QueryLifeTimeHookContext;
-import org.apache.hadoop.hive.ql.hooks.QueryLifeTimeHookContextImpl;
-import org.apache.hadoop.hive.ql.hooks.QueryLifeTimeHookWithParseHooks;
-import org.apache.hadoop.hive.ql.session.SessionState;
-
-
-/**
- * A runner class for {@link QueryLifeTimeHook}s and {@link QueryLifeTimeHookWithParseHooks}. The class has run methods
- * for each phase of a {@link QueryLifeTimeHook} and {@link QueryLifeTimeHookWithParseHooks}. Each run method checks if
- * a list of hooks has be specified, and if so invokes the appropriate callback method of each hook. Each method
- * constructs a {@link QueryLifeTimeHookContext} object and pass it to the callback functions.
- */
-class QueryLifeTimeHookRunner {
-
- private final HiveConf conf;
- private final List<QueryLifeTimeHook> queryHooks;
-
- /**
- * Constructs a {@link QueryLifeTimeHookRunner} that loads all hooks to be run via a {@link HooksLoader}.
- *
- * @param conf the {@link HiveConf} to use when creating {@link QueryLifeTimeHookContext} objects
- * @param hooksLoader the {@link HooksLoader} to use when loading all hooks to be run
- * @param console the {@link SessionState.LogHelper} to use when running {@link HooksLoader#getHooks(HiveConf.ConfVars)}
- */
- QueryLifeTimeHookRunner(HiveConf conf, HooksLoader hooksLoader, SessionState.LogHelper console) {
- this.conf = conf;
- this.queryHooks = new ArrayList<>();
-
- if (conf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_METRICS_ENABLED)) {
- queryHooks.add(new MetricsQueryLifeTimeHook());
- }
- List<QueryLifeTimeHook> propertyDefinedHoooks;
- try {
- propertyDefinedHoooks = hooksLoader.getHooks(
- HiveConf.ConfVars.HIVE_QUERY_LIFETIME_HOOKS, console);
- } catch (IllegalAccessException | InstantiationException | ClassNotFoundException e) {
- throw new IllegalArgumentException(e);
- }
- if (propertyDefinedHoooks != null) {
- Iterables.addAll(queryHooks, propertyDefinedHoooks);
- }
- }
-
- /**
- * If {@link QueryLifeTimeHookWithParseHooks} have been loaded via the {@link HooksLoader} then invoke the
- * {@link QueryLifeTimeHookWithParseHooks#beforeParse(QueryLifeTimeHookContext)} method for each
- * {@link QueryLifeTimeHookWithParseHooks}.
- *
- * @param command the Hive command that is being run
- */
- void runBeforeParseHook(String command) {
- if (containsHooks()) {
- QueryLifeTimeHookContext qhc = new QueryLifeTimeHookContextImpl.Builder().withHiveConf(conf).withCommand(
- command).build();
-
- for (QueryLifeTimeHook hook : queryHooks) {
- if (hook instanceof QueryLifeTimeHookWithParseHooks) {
- ((QueryLifeTimeHookWithParseHooks) hook).beforeParse(qhc);
- }
- }
- }
- }
-
- /**
- * If {@link QueryLifeTimeHookWithParseHooks} have been loaded via the {@link HooksLoader} then invoke the
- * {@link QueryLifeTimeHookWithParseHooks#afterParse(QueryLifeTimeHookContext, boolean)} method for each
- * {@link QueryLifeTimeHookWithParseHooks}.
- *
- * @param command the Hive command that is being run
- * @param parseError true if there was an error while parsing the command, false otherwise
- */
- void runAfterParseHook(String command, boolean parseError) {
- if (containsHooks()) {
- QueryLifeTimeHookContext qhc = new QueryLifeTimeHookContextImpl.Builder().withHiveConf(conf).withCommand(
- command).build();
-
- for (QueryLifeTimeHook hook : queryHooks) {
- if (hook instanceof QueryLifeTimeHookWithParseHooks) {
- ((QueryLifeTimeHookWithParseHooks) hook).afterParse(qhc, parseError);
- }
- }
- }
- }
-
- /**
- * Invoke the {@link QueryLifeTimeHook#beforeCompile(QueryLifeTimeHookContext)} method for each {@link QueryLifeTimeHook}
- *
- * @param command the Hive command that is being run
- */
- void runBeforeCompileHook(String command) {
- if (containsHooks()) {
- QueryLifeTimeHookContext qhc = new QueryLifeTimeHookContextImpl.Builder().withHiveConf(conf).withCommand(
- command).build();
-
- for (QueryLifeTimeHook hook : queryHooks) {
- hook.beforeCompile(qhc);
- }
- }
- }
-
- /**
- * Invoke the {@link QueryLifeTimeHook#afterCompile(QueryLifeTimeHookContext, boolean)} method for each {@link QueryLifeTimeHook}
- *
- * @param command the Hive command that is being run
- * @param compileError true if there was an error while compiling the command, false otherwise
- */
- void runAfterCompilationHook(String command, boolean compileError) {
- if (containsHooks()) {
- QueryLifeTimeHookContext qhc = new QueryLifeTimeHookContextImpl.Builder().withHiveConf(conf).withCommand(
- command).build();
-
- for (QueryLifeTimeHook hook : queryHooks) {
- hook.afterCompile(qhc, compileError);
- }
- }
- }
-
- /**
- * Invoke the {@link QueryLifeTimeHook#beforeExecution(QueryLifeTimeHookContext)} method for each {@link QueryLifeTimeHook}
- *
- * @param command the Hive command that is being run
- * @param hookContext the {@link HookContext} of the command being run
- */
- void runBeforeExecutionHook(String command, HookContext hookContext) {
- if (containsHooks()) {
- QueryLifeTimeHookContext qhc = new QueryLifeTimeHookContextImpl.Builder().withHiveConf(conf).withCommand(
- command).withHookContext(hookContext).build();
-
- for (QueryLifeTimeHook hook : queryHooks) {
- hook.beforeExecution(qhc);
- }
- }
- }
-
- /**
- * Invoke the {@link QueryLifeTimeHook#afterExecution(QueryLifeTimeHookContext, boolean)} method for each {@link QueryLifeTimeHook}
- *
- * @param command the Hive command that is being run
- * @param hookContext the {@link HookContext} of the command being run
- * @param executionError true if there was an error while executing the command, false otherwise
- */
- void runAfterExecutionHook(String command, HookContext hookContext, boolean executionError) {
- if (containsHooks()) {
- QueryLifeTimeHookContext qhc = new QueryLifeTimeHookContextImpl.Builder().withHiveConf(conf).withCommand(
- command).withHookContext(hookContext).build();
-
- for (QueryLifeTimeHook hook : queryHooks) {
- hook.afterExecution(qhc, executionError);
- }
- }
- }
-
- private boolean containsHooks() {
- return queryHooks != null && !queryHooks.isEmpty();
- }
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java b/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java
index 2ddabd9..e8c8ae6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java
@@ -35,7 +35,6 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
-import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hive.metastore.api.Schema;
import org.apache.hadoop.hive.ql.exec.ConditionalTask;
import org.apache.hadoop.hive.ql.exec.ExplainTask;
@@ -49,7 +48,6 @@ import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer;
import org.apache.hadoop.hive.ql.parse.ColumnAccessInfo;
import org.apache.hadoop.hive.ql.parse.TableAccessInfo;
-import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
import org.apache.hadoop.hive.ql.plan.HiveOperation;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.ReducerTimeStatsPerJob;
@@ -107,19 +105,11 @@ public class QueryPlan implements Serializable {
private transient Long queryStartTime;
private final HiveOperation operation;
- private final boolean acidResourcesInQuery;
- private final Set<FileSinkDesc> acidSinks;
private Boolean autoCommitValue;
public QueryPlan() {
- this(null);
- }
- @VisibleForTesting
- protected QueryPlan(HiveOperation command) {
- this.reducerTimeStatsPerJobList = new ArrayList<>();
- this.operation = command;
- this.acidResourcesInQuery = false;
- this.acidSinks = Collections.emptySet();
+ this.reducerTimeStatsPerJobList = new ArrayList<ReducerTimeStatsPerJob>();
+ operation = null;
}
public QueryPlan(String queryString, BaseSemanticAnalyzer sem, Long startTime, String queryId,
@@ -146,22 +136,8 @@ public class QueryPlan implements Serializable {
this.operation = operation;
this.autoCommitValue = sem.getAutoCommitValue();
this.resultSchema = resultSchema;
- this.acidResourcesInQuery = sem.hasAcidInQuery();
- this.acidSinks = sem.getAcidFileSinks();
}
- /**
- * @return true if any acid resources are read/written
- */
- public boolean hasAcidResourcesInQuery() {
- return acidResourcesInQuery;
- }
- /**
- * @return Collection of FileSinkDesc representing writes to Acid resources
- */
- Set<FileSinkDesc> getAcidSinks() {
- return acidSinks;
- }
public String getQueryStr() {
return queryString;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/ql/src/java/org/apache/hadoop/hive/ql/exec/ArchiveUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ArchiveUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ArchiveUtils.java
index f7fad94..6381a21 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ArchiveUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ArchiveUtils.java
@@ -28,6 +28,8 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
@@ -38,6 +40,7 @@ import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.shims.HadoopShims;
/**
* ArchiveUtils.
@@ -45,7 +48,9 @@ import org.apache.hadoop.hive.ql.metadata.Table;
*/
@SuppressWarnings("nls")
public final class ArchiveUtils {
- public static final String ARCHIVING_LEVEL = "archiving_level";
+ private static final Logger LOG = LoggerFactory.getLogger(ArchiveUtils.class.getName());
+
+ public static String ARCHIVING_LEVEL = "archiving_level";
/**
* PartSpecInfo keeps fields and values extracted from partial partition info
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnInfo.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnInfo.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnInfo.java
index bb8dcbb..e3da7f0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnInfo.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnInfo.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hive.ql.exec;
import java.io.Serializable;
-import org.apache.hadoop.hive.common.StringInternUtils;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
@@ -97,7 +96,7 @@ public class ColumnInfo implements Serializable {
this.tabAlias = tabAlias;
this.isVirtualCol = isVirtualCol;
this.isHiddenVirtualCol = isHiddenVirtualCol;
- setTypeName(getType().getTypeName());
+ this.typeName = getType().getTypeName();
}
public ColumnInfo(ColumnInfo columnInfo) {
@@ -115,7 +114,7 @@ public class ColumnInfo implements Serializable {
}
public void setTypeName(String typeName) {
- this.typeName = StringInternUtils.internIfNotNull(typeName);
+ this.typeName = typeName;
}
public TypeInfo getType() {
@@ -161,7 +160,7 @@ public class ColumnInfo implements Serializable {
}
public void setAlias(String col_alias) {
- alias = StringInternUtils.internIfNotNull(col_alias);
+ alias = col_alias;
}
public String getAlias() {
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsTask.java
index d96f432..a899964 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsTask.java
@@ -110,12 +110,8 @@ public class ColumnStatsTask extends Task<ColumnStatsWork> implements Serializab
}
}
- @SuppressWarnings("serial")
- class UnsupportedDoubleException extends Exception {
- }
-
private void unpackDoubleStats(ObjectInspector oi, Object o, String fName,
- ColumnStatisticsObj statsObj) throws UnsupportedDoubleException {
+ ColumnStatisticsObj statsObj) {
if (fName.equals("countnulls")) {
long v = ((LongObjectInspector) oi).get(o);
statsObj.getStatsData().getDoubleStats().setNumNulls(v);
@@ -124,15 +120,9 @@ public class ColumnStatsTask extends Task<ColumnStatsWork> implements Serializab
statsObj.getStatsData().getDoubleStats().setNumDVs(v);
} else if (fName.equals("max")) {
double d = ((DoubleObjectInspector) oi).get(o);
- if (Double.isInfinite(d) || Double.isNaN(d)) {
- throw new UnsupportedDoubleException();
- }
statsObj.getStatsData().getDoubleStats().setHighValue(d);
} else if (fName.equals("min")) {
double d = ((DoubleObjectInspector) oi).get(o);
- if (Double.isInfinite(d) || Double.isNaN(d)) {
- throw new UnsupportedDoubleException();
- }
statsObj.getStatsData().getDoubleStats().setLowValue(d);
} else if (fName.equals("ndvbitvector")) {
PrimitiveObjectInspector poi = (PrimitiveObjectInspector) oi;
@@ -244,7 +234,7 @@ public class ColumnStatsTask extends Task<ColumnStatsWork> implements Serializab
}
private void unpackPrimitiveObject (ObjectInspector oi, Object o, String fieldName,
- ColumnStatisticsObj statsObj) throws UnsupportedDoubleException {
+ ColumnStatisticsObj statsObj) {
if (o == null) {
return;
}
@@ -304,7 +294,7 @@ public class ColumnStatsTask extends Task<ColumnStatsWork> implements Serializab
}
private void unpackStructObject(ObjectInspector oi, Object o, String fName,
- ColumnStatisticsObj cStatsObj) throws UnsupportedDoubleException {
+ ColumnStatisticsObj cStatsObj) {
if (oi.getCategory() != ObjectInspector.Category.STRUCT) {
throw new RuntimeException("Invalid object datatype : " + oi.getCategory().toString());
}
@@ -361,13 +351,8 @@ public class ColumnStatsTask extends Task<ColumnStatsWork> implements Serializab
ColumnStatisticsObj statsObj = new ColumnStatisticsObj();
statsObj.setColName(colName.get(i));
statsObj.setColType(colType.get(i));
- try {
- unpackStructObject(foi, f, fieldName, statsObj);
- statsObjs.add(statsObj);
- } catch (UnsupportedDoubleException e) {
- // due to infinity or nan.
- LOG.info("Because " + colName.get(i) + " is infinite or NaN, we skip stats.");
- }
+ unpackStructObject(foi, f, fieldName, statsObj);
+ statsObjs.add(statsObj);
}
if (!isTblLevel) {
@@ -386,9 +371,7 @@ public class ColumnStatsTask extends Task<ColumnStatsWork> implements Serializab
ColumnStatistics colStats = new ColumnStatistics();
colStats.setStatsDesc(statsDesc);
colStats.setStatsObj(statsObjs);
- if (!statsObjs.isEmpty()) {
- stats.add(colStats);
- }
+ stats.add(colStats);
}
ftOp.clearFetchContext();
return stats;
@@ -415,9 +398,6 @@ public class ColumnStatsTask extends Task<ColumnStatsWork> implements Serializab
List<ColumnStatistics> colStats = constructColumnStatsFromPackedRows(db);
// Persist the column statistics object to the metastore
// Note, this function is shared for both table and partition column stats.
- if (colStats.isEmpty()) {
- return 0;
- }
SetPartitionsStatsRequest request = new SetPartitionsStatsRequest(colStats);
if (work.getColStats() != null && work.getColStats().getNumBitVector() > 0) {
request.setNeedMerge(true);
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java
index 82f6074..e8526f6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java
@@ -80,7 +80,8 @@ public class CopyTask extends Task<CopyWork> implements Serializable {
}
}
- if (!FileUtils.mkdir(dstFs, toPath, conf)) {
+ boolean inheritPerms = conf.getBoolVar(HiveConf.ConfVars.HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS);
+ if (!FileUtils.mkdir(dstFs, toPath, inheritPerms, conf)) {
console.printError("Cannot make target directory: " + toPath.toString());
return 2;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
index 81e4744..a1a0862 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
@@ -3330,30 +3330,20 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
if (tbl.isPartitioned() && part == null) {
// No partitioned specified for partitioned table, lets fetch all.
Map<String,String> tblProps = tbl.getParameters() == null ? new HashMap<String,String>() : tbl.getParameters();
- Map<String, Long> valueMap = new HashMap<>();
- Map<String, Boolean> stateMap = new HashMap<>();
- for (String stat : StatsSetupConst.supportedStats) {
- valueMap.put(stat, 0L);
- stateMap.put(stat, true);
- }
PartitionIterable parts = new PartitionIterable(db, tbl, null, conf.getIntVar(HiveConf.ConfVars.METASTORE_BATCH_RETRIEVE_MAX));
- int numParts = 0;
- for (Partition partition : parts) {
- Map<String, String> props = partition.getParameters();
- Boolean state = StatsSetupConst.areBasicStatsUptoDate(props);
- for (String stat : StatsSetupConst.supportedStats) {
- stateMap.put(stat, stateMap.get(stat) && state);
+ for (String stat : StatsSetupConst.supportedStats) {
+ boolean state = true;
+ long statVal = 0l;
+ for (Partition partition : parts) {
+ Map<String,String> props = partition.getParameters();
+ state &= StatsSetupConst.areBasicStatsUptoDate(props);
if (props != null && props.get(stat) != null) {
- valueMap.put(stat, valueMap.get(stat) + Long.parseLong(props.get(stat)));
+ statVal += Long.parseLong(props.get(stat));
}
}
- numParts++;
+ StatsSetupConst.setBasicStatsState(tblProps, Boolean.toString(state));
+ tblProps.put(stat, String.valueOf(statVal));
}
- for (String stat : StatsSetupConst.supportedStats) {
- StatsSetupConst.setBasicStatsState(tblProps, Boolean.toString(stateMap.get(stat)));
- tblProps.put(stat, valueMap.get(stat).toString());
- }
- tblProps.put(StatsSetupConst.NUM_PARTITIONS, Integer.toString(numParts));
tbl.setParameters(tblProps);
}
} else {
@@ -4876,8 +4866,32 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
String tableName = truncateTableDesc.getTableName();
Map<String, String> partSpec = truncateTableDesc.getPartSpec();
+ Table table = db.getTable(tableName, true);
+
try {
- db.truncateTable(tableName, partSpec);
+ // this is not transactional
+ for (Path location : getLocations(db, table, partSpec)) {
+ FileSystem fs = location.getFileSystem(conf);
+ HadoopShims.HdfsEncryptionShim shim
+ = ShimLoader.getHadoopShims().createHdfsEncryptionShim(fs, conf);
+ if (!shim.isPathEncrypted(location)) {
+ HdfsUtils.HadoopFileStatus status = new HdfsUtils.HadoopFileStatus(conf, fs, location);
+ FileStatus targetStatus = fs.getFileStatus(location);
+ String targetGroup = targetStatus == null ? null : targetStatus.getGroup();
+ FileUtils.moveToTrash(fs, location, conf);
+ fs.mkdirs(location);
+ HdfsUtils.setFullFileStatus(conf, status, targetGroup, fs, location, false);
+ } else {
+ FileStatus[] statuses = fs.listStatus(location, FileUtils.HIDDEN_FILES_PATH_FILTER);
+ if (statuses == null || statuses.length == 0) {
+ continue;
+ }
+ boolean success = Hive.trashFiles(fs, statuses, conf);
+ if (!success) {
+ throw new HiveException("Error in deleting the contents of " + location.toString());
+ }
+ }
+ }
} catch (Exception e) {
throw new HiveException(e, ErrorMsg.GENERIC_ERROR);
}
@@ -4908,6 +4922,58 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
return 0;
}
+ private List<Path> getLocations(Hive db, Table table, Map<String, String> partSpec)
+ throws HiveException, InvalidOperationException {
+ List<Path> locations = new ArrayList<Path>();
+ if (partSpec == null) {
+ if (table.isPartitioned()) {
+ for (Partition partition : db.getPartitions(table)) {
+ locations.add(partition.getDataLocation());
+ EnvironmentContext environmentContext = new EnvironmentContext();
+ if (needToUpdateStats(partition.getParameters(), environmentContext)) {
+ db.alterPartition(table.getDbName(), table.getTableName(), partition, environmentContext);
+ }
+ }
+ } else {
+ locations.add(table.getPath());
+ EnvironmentContext environmentContext = new EnvironmentContext();
+ if (needToUpdateStats(table.getParameters(), environmentContext)) {
+ db.alterTable(table.getDbName()+"."+table.getTableName(), table, environmentContext);
+ }
+ }
+ } else {
+ for (Partition partition : db.getPartitionsByNames(table, partSpec)) {
+ locations.add(partition.getDataLocation());
+ EnvironmentContext environmentContext = new EnvironmentContext();
+ if (needToUpdateStats(partition.getParameters(), environmentContext)) {
+ db.alterPartition(table.getDbName(), table.getTableName(), partition, environmentContext);
+ }
+ }
+ }
+ return locations;
+ }
+
+ private boolean needToUpdateStats(Map<String,String> props, EnvironmentContext environmentContext) {
+ if (null == props) {
+ return false;
+ }
+ boolean statsPresent = false;
+ for (String stat : StatsSetupConst.supportedStats) {
+ String statVal = props.get(stat);
+ if (statVal != null && Long.parseLong(statVal) > 0) {
+ statsPresent = true;
+ //In the case of truncate table, we set the stats to be 0.
+ props.put(stat, "0");
+ }
+ }
+ //first set basic stats to true
+ StatsSetupConst.setBasicStatsState(props, StatsSetupConst.TRUE);
+ environmentContext.putToProperties(StatsSetupConst.STATS_GENERATED, StatsSetupConst.TASK);
+ //then invalidate column stats
+ StatsSetupConst.clearColumnStatsState(props);
+ return statsPresent;
+ }
+
@Override
public StageType getType() {
return StageType.DDL;
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java
index 4c24ab4..d35e3ba 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java
@@ -54,11 +54,9 @@ import org.apache.hadoop.hive.ql.DriverContext;
import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
import org.apache.hadoop.hive.ql.exec.tez.TezTask;
import org.apache.hadoop.hive.ql.exec.vector.VectorGroupByOperator;
-import org.apache.hadoop.hive.ql.exec.vector.VectorReduceSinkOperator;
import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
-import org.apache.hadoop.hive.ql.exec.vector.reducesink.VectorReduceSinkCommonOperator;
import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
@@ -797,12 +795,9 @@ public class ExplainTask extends Task<ExplainWork> implements Serializable {
if (jsonOut != null && jsonOut.length() > 0) {
((JSONObject) jsonOut.get(JSONObject.getNames(jsonOut)[0])).put("OperatorId:",
operator.getOperatorId());
- if (!this.work.isUserLevelExplain()
- && this.work.isFormatted()
- && (operator instanceof ReduceSinkOperator
- || operator instanceof VectorReduceSinkOperator || operator instanceof VectorReduceSinkCommonOperator)) {
- List<String> outputOperators = ((ReduceSinkDesc) operator.getConf())
- .getOutputOperators();
+ if (!this.work.isUserLevelExplain() && this.work.isFormatted()
+ && operator instanceof ReduceSinkOperator) {
+ List<String> outputOperators = ((ReduceSinkOperator) operator).getConf().getOutputOperators();
if (outputOperators != null) {
((JSONObject) jsonOut.get(JSONObject.getNames(jsonOut)[0])).put(OUTPUT_OPERATORS,
Arrays.toString(outputOperators.toArray()));
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeConstantDefaultEvaluator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeConstantDefaultEvaluator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeConstantDefaultEvaluator.java
new file mode 100644
index 0000000..f53c3e3
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeConstantDefaultEvaluator.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDefaultDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
+import org.apache.hadoop.hive.serde2.objectinspector.ConstantObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+
+/**
+ * ExprNodeConstantDefaultEvaluator: evaluator for the special 'default'
+ * constant node; returns the descriptor itself as a sentinel value.
+ */
+public class ExprNodeConstantDefaultEvaluator extends ExprNodeEvaluator<ExprNodeConstantDefaultDesc> {
+
+ transient ObjectInspector writableObjectInspector;
+
+ public ExprNodeConstantDefaultEvaluator(ExprNodeConstantDefaultDesc expr) {
+ this(expr, null);
+ }
+
+ public ExprNodeConstantDefaultEvaluator(ExprNodeConstantDefaultDesc expr, Configuration conf) {
+ super(expr, conf);
+ writableObjectInspector = expr.getWritableObjectInspector();
+ }
+
+ @Override
+ public ObjectInspector initialize(ObjectInspector rowInspector) throws HiveException {
+ return writableObjectInspector;
+ }
+
+ @Override
+ protected Object _evaluate(Object row, int version) throws HiveException {
+ return expr;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeEvaluatorFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeEvaluatorFactory.java
index cc40cae..34aec55 100755
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeEvaluatorFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeEvaluatorFactory.java
@@ -24,6 +24,7 @@ import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDefaultDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDynamicValueDesc;
@@ -49,6 +50,11 @@ public final class ExprNodeEvaluatorFactory {
return new ExprNodeConstantEvaluator((ExprNodeConstantDesc) desc, conf);
}
+ // Special 'default' constant node
+ if (desc instanceof ExprNodeConstantDefaultDesc) {
+ return new ExprNodeConstantDefaultEvaluator((ExprNodeConstantDefaultDesc) desc);
+ }
+
// Column-reference node, e.g. a column in the input row
if (desc instanceof ExprNodeColumnDesc) {
return new ExprNodeColumnEvaluator((ExprNodeColumnDesc) desc, conf);
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
index a3e4c9f..4102d02 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
@@ -22,7 +22,6 @@ import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -394,9 +393,6 @@ public class FetchOperator implements Serializable {
inputSplits = splitSampling(work.getSplitSample(), inputSplits);
}
if (inputSplits.length > 0) {
- if (HiveConf.getBoolVar(job, HiveConf.ConfVars.HIVE_IN_TEST)) {
- Arrays.sort(inputSplits, new FetchInputFormatSplitComparator());
- }
return inputSplits;
}
}
@@ -742,18 +738,6 @@ public class FetchOperator implements Serializable {
}
}
- private static class FetchInputFormatSplitComparator implements Comparator<FetchInputFormatSplit> {
- @Override
- public int compare(FetchInputFormatSplit a, FetchInputFormatSplit b) {
- final Path ap = a.getPath();
- final Path bp = b.getPath();
- if (ap != null) {
- return (ap.compareTo(bp));
- }
- return Long.signum(a.getLength() - b.getLength());
- }
- }
-
public Configuration getJobConf() {
return job;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index 8e74b2e..3ad1733 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -44,7 +44,6 @@ import org.apache.hadoop.hive.common.HiveStatsUtils;
import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.common.ValidWriteIds;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.conf.HiveConfUtil;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.ql.CompilationOpContext;
import org.apache.hadoop.hive.ql.ErrorMsg;
@@ -148,6 +147,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
protected transient long cntr = 1;
protected transient long logEveryNRows = 0;
protected transient int rowIndex = 0;
+ private transient boolean inheritPerms = false;
/**
* Counters.
*/
@@ -256,7 +256,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
if ((bDynParts || isSkewedStoredAsSubDirectories)
&& !fs.exists(finalPaths[idx].getParent())) {
Utilities.LOG14535.info("commit making path for dyn/skew: " + finalPaths[idx].getParent());
- FileUtils.mkdir(fs, finalPaths[idx].getParent(), hconf);
+ FileUtils.mkdir(fs, finalPaths[idx].getParent(), inheritPerms, hconf);
}
// If we're updating or deleting there may be no file to close. This can happen
// because the where clause strained out all of the records for a given bucket. So
@@ -501,6 +501,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
serializer = (Serializer) conf.getTableInfo().getDeserializerClass().newInstance();
serializer.initialize(unsetNestedColumnPaths(hconf), conf.getTableInfo().getProperties());
outputClass = serializer.getSerializedClass();
+ inheritPerms = HiveConf.getBoolVar(hconf, ConfVars.HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS);
if (isLogInfoEnabled) {
LOG.info("Using serializer : " + serializer + " and formatter : " + hiveOutputFormat +
@@ -600,10 +601,13 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
}
private void logOutputFormatError(Configuration hconf, HiveException ex) {
- StringBuilder errorWriter = new StringBuilder();
+ StringWriter errorWriter = new StringWriter();
errorWriter.append("Failed to create output format; configuration: ");
- // redact sensitive information before logging
- HiveConfUtil.dumpConfig(hconf, errorWriter);
+ try {
+ Configuration.dumpConfiguration(hconf, errorWriter);
+ } catch (IOException ex2) {
+ errorWriter.append("{ failed to dump configuration: " + ex2.getMessage() + " }");
+ }
Properties tdp = null;
if (this.conf.getTableInfo() != null
&& (tdp = this.conf.getTableInfo().getProperties()) != null) {
@@ -735,7 +739,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
conf.getWriteType() == AcidUtils.Operation.INSERT_ONLY) {
Path outPath = fsp.outPaths[filesIdx];
if ((conf.getWriteType() == AcidUtils.Operation.INSERT_ONLY || conf.isMmTable())
- && !FileUtils.mkdir(fs, outPath.getParent(), hconf)) {
+ && inheritPerms && !FileUtils.mkdir(fs, outPath.getParent(), inheritPerms, hconf)) {
LOG.warn("Unable to create directory with inheritPerms: " + outPath);
}
fsp.outWriters[filesIdx] = HiveFileFormatUtils.getHiveRecordWriter(jc, conf.getTableInfo(),
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
index 1b556ac..4ac25c2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
@@ -67,7 +67,7 @@ import org.apache.hadoop.hive.ql.udf.UDFFromUnixTime;
import org.apache.hadoop.hive.ql.udf.UDFHex;
import org.apache.hadoop.hive.ql.udf.UDFHour;
import org.apache.hadoop.hive.ql.udf.UDFJson;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDFLength;
+import org.apache.hadoop.hive.ql.udf.UDFLength;
import org.apache.hadoop.hive.ql.udf.UDFLike;
import org.apache.hadoop.hive.ql.udf.UDFLn;
import org.apache.hadoop.hive.ql.udf.UDFLog;
@@ -262,18 +262,13 @@ public final class FunctionRegistry {
system.registerGenericUDF("trim", GenericUDFTrim.class);
system.registerGenericUDF("ltrim", GenericUDFLTrim.class);
system.registerGenericUDF("rtrim", GenericUDFRTrim.class);
- system.registerGenericUDF("length", GenericUDFLength.class);
- system.registerGenericUDF("character_length", GenericUDFCharacterLength.class);
- system.registerGenericUDF("char_length", GenericUDFCharacterLength.class);
- system.registerGenericUDF("octet_length", GenericUDFOctetLength.class);
+ system.registerUDF("length", UDFLength.class, false);
system.registerUDF("reverse", UDFReverse.class, false);
system.registerGenericUDF("field", GenericUDFField.class);
system.registerUDF("find_in_set", UDFFindInSet.class, false);
system.registerGenericUDF("initcap", GenericUDFInitCap.class);
system.registerUDF("like", UDFLike.class, true);
- system.registerGenericUDF("likeany", GenericUDFLikeAny.class);
- system.registerGenericUDF("likeall", GenericUDFLikeAll.class);
system.registerGenericUDF("rlike", GenericUDFRegExp.class);
system.registerGenericUDF("regexp", GenericUDFRegExp.class);
system.registerUDF("regexp_replace", UDFRegExpReplace.class, false);
@@ -423,16 +418,6 @@ public final class FunctionRegistry {
system.registerGenericUDAF("covar_pop", new GenericUDAFCovariance());
system.registerGenericUDAF("covar_samp", new GenericUDAFCovarianceSample());
system.registerGenericUDAF("corr", new GenericUDAFCorrelation());
- system.registerGenericUDAF("regr_slope", new GenericUDAFBinarySetFunctions.RegrSlope());
- system.registerGenericUDAF("regr_intercept", new GenericUDAFBinarySetFunctions.RegrIntercept());
- system.registerGenericUDAF("regr_r2", new GenericUDAFBinarySetFunctions.RegrR2());
- system.registerGenericUDAF("regr_sxx", new GenericUDAFBinarySetFunctions.RegrSXX());
- system.registerGenericUDAF("regr_syy", new GenericUDAFBinarySetFunctions.RegrSYY());
- system.registerGenericUDAF("regr_sxy", new GenericUDAFBinarySetFunctions.RegrSXY());
- system.registerGenericUDAF("regr_avgx", new GenericUDAFBinarySetFunctions.RegrAvgX());
- system.registerGenericUDAF("regr_avgy", new GenericUDAFBinarySetFunctions.RegrAvgY());
- system.registerGenericUDAF("regr_count", new GenericUDAFBinarySetFunctions.RegrCount());
-
system.registerGenericUDAF("histogram_numeric", new GenericUDAFHistogramNumeric());
system.registerGenericUDAF("percentile_approx", new GenericUDAFPercentileApprox());
system.registerGenericUDAF("collect_set", new GenericUDAFCollectSet());
@@ -459,7 +444,6 @@ public final class FunctionRegistry {
system.registerGenericUDF("struct", GenericUDFStruct.class);
system.registerGenericUDF("named_struct", GenericUDFNamedStruct.class);
system.registerGenericUDF("create_union", GenericUDFUnion.class);
- system.registerGenericUDF("extract_union", GenericUDFExtractUnion.class);
system.registerGenericUDF("case", GenericUDFCase.class);
system.registerGenericUDF("when", GenericUDFWhen.class);
@@ -483,7 +467,6 @@ public final class FunctionRegistry {
system.registerGenericUDF("greatest", GenericUDFGreatest.class);
system.registerGenericUDF("least", GenericUDFLeast.class);
system.registerGenericUDF("cardinality_violation", GenericUDFCardinalityViolation.class);
- system.registerGenericUDF("width_bucket", GenericUDFWidthBucket.class);
system.registerGenericUDF("from_utc_timestamp", GenericUDFFromUtcTimestamp.class);
system.registerGenericUDF("to_utc_timestamp", GenericUDFToUtcTimestamp.class);
@@ -781,7 +764,7 @@ public final class FunctionRegistry {
*
* @return null if no common class could be found.
*/
- public static synchronized TypeInfo getCommonClassForComparison(TypeInfo a, TypeInfo b) {
+ public static TypeInfo getCommonClassForComparison(TypeInfo a, TypeInfo b) {
// If same return one of them
if (a.equals(b)) {
return a;
@@ -1492,20 +1475,6 @@ public final class FunctionRegistry {
}
/**
- * Returns whether the fn is an exact equality comparison.
- */
- public static boolean isEq(GenericUDF fn) {
- return fn instanceof GenericUDFOPEqual;
- }
-
- /**
- * Returns whether the fn is an exact non-equality comparison.
- */
- public static boolean isNeq(GenericUDF fn) {
- return fn instanceof GenericUDFOPNotEqual;
- }
-
- /**
* Returns whether the exprNodeDesc is a node of "positive".
*/
public static boolean isOpPositive(ExprNodeDesc desc) {
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
index f8b55da..6d6c608 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
@@ -33,7 +33,6 @@ import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.llap.LlapDaemonInfo;
import org.apache.hadoop.hive.ql.CompilationOpContext;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.OpParseContext;
@@ -403,8 +402,8 @@ public class GroupByOperator extends Operator<GroupByDesc> {
newKeys = keyWrapperFactory.getKeyWrapper();
isTez = HiveConf.getVar(hconf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez");
- isLlap = LlapDaemonInfo.INSTANCE.isLlap();
- numExecutors = isLlap ? LlapDaemonInfo.INSTANCE.getNumExecutors() : 1;
+ isLlap = isTez && HiveConf.getVar(hconf, HiveConf.ConfVars.HIVE_EXECUTION_MODE).equals("llap");
+ numExecutors = isLlap ? HiveConf.getIntVar(hconf, HiveConf.ConfVars.LLAP_DAEMON_NUM_EXECUTORS) : 1;
firstRow = true;
// estimate the number of hash table entries based on the size of each
// entry. Since the size of a entry
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
index 56be518..29b72a0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
@@ -153,7 +153,7 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
throw new HiveException("Target " + targetPath + " is not a local directory.");
}
} else {
- if (!FileUtils.mkdir(dstFs, targetPath, conf)) {
+ if (!FileUtils.mkdir(dstFs, targetPath, false, conf)) {
throw new HiveException("Failed to create local target directory " + targetPath);
}
}
@@ -182,6 +182,9 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
actualPath = actualPath.getParent();
}
fs.mkdirs(mkDirPath);
+ if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS)) {
+ HdfsUtils.setFullFileStatus(conf, new HdfsUtils.HadoopFileStatus(conf, fs, actualPath), fs, mkDirPath, true);
+ }
}
return deletePath;
}
@@ -415,7 +418,7 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
Utilities.LOG14535.info("loadPartition called from " + tbd.getSourcePath()
+ " into " + tbd.getTable().getTableName());
boolean isCommitMmWrite = tbd.isCommitMmWrite();
- db.loadPartition(tbd.getSourcePath(), tbd.getTable().getTableName(),
+ db.loadSinglePartition(tbd.getSourcePath(), tbd.getTable().getTableName(),
tbd.getPartitionSpec(), tbd.getReplace(),
tbd.getInheritTableSpecs(), isSkewedStoredAsDirs(tbd), work.isSrcLocal(),
(work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID &&