You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by xu...@apache.org on 2015/07/31 02:43:16 UTC
[16/43] hive git commit: HIVE-11077 Add support in parser and wire up
to txn manager (Eugene Koifman, reviewed by Alan Gates)
HIVE-11077 Add support in parser and wire up to txn manager (Eugene Koifman, reviewed by Alan Gates)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/e57c3602
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/e57c3602
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/e57c3602
Branch: refs/heads/spark
Commit: e57c3602b831340519d5d004cf4119da2f3e7ef8
Parents: 2240dbd
Author: Eugene Koifman <ek...@hortonworks.com>
Authored: Wed Jul 22 12:44:40 2015 -0700
Committer: Eugene Koifman <ek...@hortonworks.com>
Committed: Wed Jul 22 12:44:40 2015 -0700
----------------------------------------------------------------------
.../hadoop/hive/cli/TestOptionsProcessor.java | 1 -
.../hadoop/hive/common/ValidReadTxnList.java | 2 +-
.../hadoop/hive/metastore/txn/TxnHandler.java | 9 +-
.../metastore/txn/ValidCompactorTxnList.java | 2 +-
.../java/org/apache/hadoop/hive/ql/Context.java | 1 -
.../java/org/apache/hadoop/hive/ql/Driver.java | 196 +++++---
.../org/apache/hadoop/hive/ql/ErrorMsg.java | 2 +-
.../org/apache/hadoop/hive/ql/QueryPlan.java | 18 +-
.../apache/hadoop/hive/ql/exec/MoveTask.java | 2 +-
.../hadoop/hive/ql/lockmgr/DbTxnManager.java | 36 +-
.../hadoop/hive/ql/lockmgr/DummyTxnManager.java | 8 +
.../hadoop/hive/ql/lockmgr/HiveTxnManager.java | 21 +
.../hive/ql/lockmgr/HiveTxnManagerImpl.java | 10 +
.../hadoop/hive/ql/lockmgr/LockException.java | 8 +-
.../hadoop/hive/ql/metadata/HiveException.java | 3 +
.../hive/ql/parse/BaseSemanticAnalyzer.java | 13 +
.../org/apache/hadoop/hive/ql/parse/HiveLexer.g | 11 +
.../apache/hadoop/hive/ql/parse/HiveParser.g | 70 +++
.../hadoop/hive/ql/parse/IdentifiersParser.g | 19 +-
.../hadoop/hive/ql/parse/SemanticAnalyzer.java | 25 +-
.../hive/ql/parse/SemanticAnalyzerFactory.java | 12 +
.../hadoop/hive/ql/plan/HiveOperation.java | 32 +-
.../ql/processors/CommandProcessorResponse.java | 21 +-
.../hadoop/hive/ql/processors/HiveCommand.java | 3 +
.../authorization/plugin/HiveOperationType.java | 5 +
.../plugin/sqlstd/Operation2Privilege.java | 11 +
.../hadoop/hive/ql/session/SessionState.java | 34 +-
.../apache/hadoop/hive/ql/TestTxnCommands.java | 473 +++++++++++++++++++
.../positive/TestTransactionStatement.java | 102 ++++
.../hive/ql/session/TestSessionState.java | 2 +-
.../clientnegative/exchange_partition.q.out | 2 +-
.../clientpositive/exchange_partition.q.out | 4 +-
.../clientpositive/exchange_partition2.q.out | 4 +-
.../clientpositive/exchange_partition3.q.out | 4 +-
34 files changed, 1020 insertions(+), 146 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/e57c3602/cli/src/test/org/apache/hadoop/hive/cli/TestOptionsProcessor.java
----------------------------------------------------------------------
diff --git a/cli/src/test/org/apache/hadoop/hive/cli/TestOptionsProcessor.java b/cli/src/test/org/apache/hadoop/hive/cli/TestOptionsProcessor.java
index 9d0399a..ac22ab1 100644
--- a/cli/src/test/org/apache/hadoop/hive/cli/TestOptionsProcessor.java
+++ b/cli/src/test/org/apache/hadoop/hive/cli/TestOptionsProcessor.java
@@ -56,7 +56,6 @@ public class TestOptionsProcessor {
assertEquals("execString", sessionState.execString);
assertEquals(0, sessionState.initFiles.size());
assertTrue(sessionState.getIsVerbose());
- sessionState.setConf(null);
assertTrue(sessionState.getIsSilent());
}
http://git-wip-us.apache.org/repos/asf/hive/blob/e57c3602/common/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java b/common/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java
index 479e0df..fda242d 100644
--- a/common/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java
+++ b/common/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java
@@ -105,7 +105,7 @@ public class ValidReadTxnList implements ValidTxnList {
@Override
public void readFromString(String src) {
- if (src == null) {
+ if (src == null || src.length() == 0) {
highWatermark = Long.MAX_VALUE;
exceptions = new long[0];
} else {
http://git-wip-us.apache.org/repos/asf/hive/blob/e57c3602/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index fd9c275..c0e83c6 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -380,8 +380,9 @@ public class TxnHandler {
"tc_partition from TXN_COMPONENTS where tc_txnid = " + txnid;
LOG.debug("Going to execute insert <" + s + ">");
if (stmt.executeUpdate(s) < 1) {
- LOG.warn("Expected to move at least one record from txn_components to " +
- "completed_txn_components when committing txn!");
+ //this can be reasonable for an empty txn START/COMMIT
+ LOG.info("Expected to move at least one record from txn_components to " +
+ "completed_txn_components when committing txn! txnid:" + txnid);
}
// Always access TXN_COMPONENTS before HIVE_LOCKS;
@@ -1351,7 +1352,7 @@ public class TxnHandler {
throws NoSuchTxnException, TxnAbortedException, MetaException, SQLException {
// We want to minimize the number of concurrent lock requests being issued. If we do not we
// get a large number of deadlocks in the database, since this method has to both clean
- // timedout locks and insert new locks. This synchronization barrier will not eliminiate all
+ // timedout locks and insert new locks. This synchronization barrier will not eliminate all
// deadlocks, and the code is still resilient in the face of a database deadlock. But it
// will reduce the number. This could have been done via a lock table command in the
// underlying database, but was not for two reasons. One, different databases have different
@@ -1452,7 +1453,7 @@ public class TxnHandler {
long extLockId,
boolean alwaysCommit)
throws NoSuchLockException, NoSuchTxnException, TxnAbortedException, MetaException, SQLException {
- List<LockInfo> locksBeingChecked = getLockInfoFromLockId(dbConn, extLockId);
+ List<LockInfo> locksBeingChecked = getLockInfoFromLockId(dbConn, extLockId);//being acquired now
LockResponse response = new LockResponse();
response.setLockid(extLockId);
http://git-wip-us.apache.org/repos/asf/hive/blob/e57c3602/metastore/src/java/org/apache/hadoop/hive/metastore/txn/ValidCompactorTxnList.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/ValidCompactorTxnList.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/ValidCompactorTxnList.java
index 71f14e5..67631ba 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/ValidCompactorTxnList.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/ValidCompactorTxnList.java
@@ -88,7 +88,7 @@ public class ValidCompactorTxnList extends ValidReadTxnList {
@Override
public void readFromString(String src) {
- if (src == null) {
+ if (src == null || src.length() == 0) {
highWatermark = Long.MAX_VALUE;
exceptions = new long[0];
} else {
http://git-wip-us.apache.org/repos/asf/hive/blob/e57c3602/ql/src/java/org/apache/hadoop/hive/ql/Context.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Context.java b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
index a74bbbe..ca0d487 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Context.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
@@ -96,7 +96,6 @@ public class Context {
// List of Locks for this query
protected List<HiveLock> hiveLocks;
- protected HiveLockManager hiveLockMgr;
// Transaction manager for this query
protected HiveTxnManager hiveTxnManager;
http://git-wip-us.apache.org/repos/asf/hive/blob/e57c3602/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 6ad3f49..424f4fa 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -387,7 +387,10 @@ public class Driver implements CommandProcessor {
SessionState.get().setupQueryCurrentTimestamp();
try {
- command = new VariableSubstitution().substitute(conf,command);
+ // Initialize the transaction manager. This must be done before analyze is called.
+ SessionState.get().initTxnMgr(conf);
+
+ command = new VariableSubstitution().substitute(conf, command);
ctx = new Context(conf);
ctx.setTryCount(getTryCount());
ctx.setCmd(command);
@@ -399,13 +402,6 @@ public class Driver implements CommandProcessor {
tree = ParseUtils.findRootNonNullToken(tree);
perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.PARSE);
- // Initialize the transaction manager. This must be done before analyze is called. Also
- // record the valid transactions for this query. We have to do this at compile time
- // because we use the information in planning the query. Also,
- // we want to record it at this point so that users see data valid at the point that they
- // submit the query.
- SessionState.get().initTxnMgr(conf);
- recordValidTxns();
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.ANALYZE);
BaseSemanticAnalyzer sem = SemanticAnalyzerFactory.get(conf, tree);
@@ -445,10 +441,8 @@ public class Driver implements CommandProcessor {
// to avoid returning sensitive data
String queryStr = HookUtils.redactLogString(conf, command);
- String operationName = ctx.getExplain() ?
- HiveOperation.EXPLAIN.getOperationName() : SessionState.get().getCommandType();
plan = new QueryPlan(queryStr, sem, perfLogger.getStartTime(PerfLogger.DRIVER_RUN), queryId,
- operationName, getSchema(sem, conf));
+ SessionState.get().getHiveOperation(), getSchema(sem, conf));
conf.setVar(HiveConf.ConfVars.HIVEQUERYSTRING, queryStr);
@@ -507,7 +501,8 @@ public class Driver implements CommandProcessor {
downstreamError = e;
console.printError(errorMessage, "\n"
+ org.apache.hadoop.util.StringUtils.stringifyException(e));
- return error.getErrorCode();
+ return error.getErrorCode();//todo: this is bad if returned as cmd shell exit
+ // since it exceeds valid range of shell return values
} finally {
perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.COMPILE);
dumpMetaCallTimingWithoutEx("compilation");
@@ -937,30 +932,32 @@ public class Driver implements CommandProcessor {
// Write the current set of valid transactions into the conf file so that it can be read by
// the input format.
private void recordValidTxns() throws LockException {
- ValidTxnList txns = SessionState.get().getTxnMgr().getValidTxns();
+ HiveTxnManager txnMgr = SessionState.get().getTxnMgr();
+ ValidTxnList txns = txnMgr.getValidTxns();
String txnStr = txns.toString();
conf.set(ValidTxnList.VALID_TXNS_KEY, txnStr);
- LOG.debug("Encoding valid txns info " + txnStr);
- // TODO I think when we switch to cross query transactions we need to keep this list in
- // session state rather than agressively encoding it in the conf like this. We can let the
- // TableScanOperators then encode it in the conf before calling the input formats.
+ LOG.debug("Encoding valid txns info " + txnStr + " txnid:" + txnMgr.getCurrentTxnId());
}
/**
* Acquire read and write locks needed by the statement. The list of objects to be locked are
- * obtained from the inputs and outputs populated by the compiler. The lock acuisition scheme is
+ * obtained from the inputs and outputs populated by the compiler. The lock acquisition scheme is
* pretty simple. If all the locks cannot be obtained, error out. Deadlock is avoided by making
* sure that the locks are lexicographically sorted.
*
* This method also records the list of valid transactions. This must be done after any
* transactions have been opened and locks acquired.
+ * @param startTxnImplicitly in AC=false, the 1st DML starts a txn
**/
- private int acquireLocksAndOpenTxn() {
+ private int acquireLocksAndOpenTxn(boolean startTxnImplicitly) {
PerfLogger perfLogger = PerfLogger.getPerfLogger();
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.ACQUIRE_READ_WRITE_LOCKS);
SessionState ss = SessionState.get();
HiveTxnManager txnMgr = ss.getTxnMgr();
+ if(startTxnImplicitly) {
+ assert !txnMgr.getAutoCommit();
+ }
try {
// Don't use the userName member, as it may or may not have been set. Get the value from
@@ -976,27 +973,34 @@ public class Driver implements CommandProcessor {
"\n" + org.apache.hadoop.util.StringUtils.stringifyException(e));
return 10;
}
- if (acidSinks != null && acidSinks.size() > 0) {
+
+ boolean existingTxn = txnMgr.isTxnOpen();
+ if((txnMgr.getAutoCommit() && haveAcidWrite()) || plan.getOperation() == HiveOperation.START_TRANSACTION ||
+ (!txnMgr.getAutoCommit() && startTxnImplicitly)) {
// We are writing to tables in an ACID compliant way, so we need to open a transaction
- long txnId = ss.getCurrentTxn();
- if (txnId == SessionState.NO_CURRENT_TXN) {
- txnId = txnMgr.openTxn(userFromUGI);
- ss.setCurrentTxn(txnId);
- LOG.debug("Setting current transaction to " + txnId);
- }
- // Set the transaction id in all of the acid file sinks
- if (acidSinks != null) {
- for (FileSinkDesc desc : acidSinks) {
- desc.setTransactionId(txnId);
- desc.setStatementId(txnMgr.getStatementId());
- }
+ txnMgr.openTxn(userFromUGI);
+ }
+ // Set the transaction id in all of the acid file sinks
+ if (haveAcidWrite()) {
+ for (FileSinkDesc desc : acidSinks) {
+ desc.setTransactionId(txnMgr.getCurrentTxnId());
+ desc.setStatementId(txnMgr.getStatementId());
}
-
- // TODO Once we move to cross query transactions we need to add the open transaction to
- // our list of valid transactions. We don't have a way to do that right now.
}
-
+ /*Note, we have to record snapshot after lock acquisition to prevent lost update problem
+ consider 2 concurrent "update table T set x = x + 1". 1st will get the locks and the
+ 2nd will block until 1st one commits and only then lock in the snapshot, i.e. it will
+ see the changes made by 1st one. This takes care of autoCommit=true case.
+ For multi-stmt txns this is not sufficient and will be managed via WriteSet tracking
+ in the lock manager.*/
txnMgr.acquireLocks(plan, ctx, userFromUGI);
+ if(!existingTxn) {
+ //For multi-stmt txns we should record the snapshot when txn starts but
+ // don't update it after that until txn completes. Thus the check for {@code existingTxn}
+ //For autoCommit=true, Read-only statements, txn is implicit, i.e. lock in the snapshot
+ //for each statement.
+ recordValidTxns();
+ }
return 0;
} catch (LockException e) {
@@ -1011,6 +1015,9 @@ public class Driver implements CommandProcessor {
}
}
+ private boolean haveAcidWrite() {
+ return acidSinks != null && !acidSinks.isEmpty();
+ }
/**
* @param hiveLocks
* list of hive locks to be released Release all the locks specified. If some of the
@@ -1028,17 +1035,14 @@ public class Driver implements CommandProcessor {
HiveTxnManager txnMgr = ss.getTxnMgr();
// If we've opened a transaction we need to commit or rollback rather than explicitly
// releasing the locks.
- if (ss.getCurrentTxn() != SessionState.NO_CURRENT_TXN && ss.isAutoCommit()) {
- try {
- if (commit) {
- txnMgr.commitTxn();
- } else {
- txnMgr.rollbackTxn();
- }
- } finally {
- ss.setCurrentTxn(SessionState.NO_CURRENT_TXN);
+ if (txnMgr.isTxnOpen()) {
+ if (commit) {
+ txnMgr.commitTxn();//both commit & rollback clear ALL locks for this tx
+ } else {
+ txnMgr.rollbackTxn();
}
} else {
+ //since there is no tx, we only have locks for current query (if any)
if (hiveLocks != null) {
txnMgr.getLockManager().releaseLocks(hiveLocks);
}
@@ -1193,44 +1197,77 @@ public class Driver implements CommandProcessor {
// Since we're reusing the compiled plan, we need to update its start time for current run
plan.setQueryStartTime(perfLogger.getStartTime(PerfLogger.DRIVER_RUN));
}
-
// the reason that we set the txn manager for the cxt here is because each
// query has its own ctx object. The txn mgr is shared across the
// same instance of Driver, which can run multiple queries.
- ctx.setHiveTxnManager(SessionState.get().getTxnMgr());
+ HiveTxnManager txnManager = SessionState.get().getTxnMgr();
+ ctx.setHiveTxnManager(txnManager);
+
+ boolean startTxnImplicitly = false;
+ {
+ //this block ensures op makes sense in given context, e.g. COMMIT is valid only if txn is open
+ //DDL is not allowed in a txn, etc.
+ //an error in an open txn does a rollback of the txn
+ if (txnManager.isTxnOpen() && !plan.getOperation().isAllowedInTransaction()) {
+ assert !txnManager.getAutoCommit() : "didn't expect AC=true";
+ return rollback(new CommandProcessorResponse(12, ErrorMsg.OP_NOT_ALLOWED_IN_TXN, null,
+ plan.getOperationName(), Long.toString(txnManager.getCurrentTxnId())));
+ }
+ if(!txnManager.isTxnOpen() && plan.getOperation().isRequiresOpenTransaction()) {
+ return rollback(new CommandProcessorResponse(12, ErrorMsg.OP_NOT_ALLOWED_WITHOUT_TXN, null, plan.getOperationName()));
+ }
+ if(!txnManager.isTxnOpen() && plan.getOperation() == HiveOperation.QUERY && !txnManager.getAutoCommit()) {
+ //this effectively makes START TRANSACTION optional and supports JDBC setAutoCommit(false) semantics
+ //also, indirectly allows DDL to be executed outside a txn context
+ startTxnImplicitly = true;
+ }
+ if(txnManager.getAutoCommit() && plan.getOperation() == HiveOperation.START_TRANSACTION) {
+ return rollback(new CommandProcessorResponse(12, ErrorMsg.OP_NOT_ALLOWED_IN_AUTOCOMMIT, null, plan.getOperationName()));
+ }
+ }
+ if(plan.getOperation() == HiveOperation.SET_AUTOCOMMIT) {
+ try {
+ if(plan.getAutoCommitValue() && !txnManager.getAutoCommit()) {
+ /*here, if there is an open txn, we want to commit it; this behavior matches
+ * https://docs.oracle.com/javase/6/docs/api/java/sql/Connection.html#setAutoCommit(boolean)*/
+ releaseLocksAndCommitOrRollback(ctx.getHiveLocks(), true);
+ txnManager.setAutoCommit(true);
+ }
+ else if(!plan.getAutoCommitValue() && txnManager.getAutoCommit()) {
+ txnManager.setAutoCommit(false);
+ }
+ else {/*didn't change autoCommit value - no-op*/}
+ }
+ catch(LockException e) {
+ return handleHiveException(e, 12);
+ }
+ }
if (requiresLock()) {
- ret = acquireLocksAndOpenTxn();
+ ret = acquireLocksAndOpenTxn(startTxnImplicitly);
if (ret != 0) {
- try {
- releaseLocksAndCommitOrRollback(ctx.getHiveLocks(), false);
- } catch (LockException e) {
- // Not much to do here
- }
- return createProcessorResponse(ret);
+ return rollback(createProcessorResponse(ret));
}
}
ret = execute();
if (ret != 0) {
//if needRequireLock is false, the release here will do nothing because there is no lock
- try {
- releaseLocksAndCommitOrRollback(ctx.getHiveLocks(), false);
- } catch (LockException e) {
- // Nothing to do here
- }
- return createProcessorResponse(ret);
+ return rollback(createProcessorResponse(ret));
}
//if needRequireLock is false, the release here will do nothing because there is no lock
try {
- releaseLocksAndCommitOrRollback(ctx.getHiveLocks(), true);
+ if(txnManager.getAutoCommit() || plan.getOperation() == HiveOperation.COMMIT) {
+ releaseLocksAndCommitOrRollback(ctx.getHiveLocks(), true);
+ }
+ else if(plan.getOperation() == HiveOperation.ROLLBACK) {
+ releaseLocksAndCommitOrRollback(ctx.getHiveLocks(), false);
+ }
+ else {
+ //txn (if there is one started) is not finished
+ }
} catch (LockException e) {
- errorMessage = "FAILED: Hive Internal Error: " + Utilities.getNameMessage(e);
- SQLState = ErrorMsg.findSQLState(e.getMessage());
- downstreamError = e;
- console.printError(errorMessage + "\n"
- + org.apache.hadoop.util.StringUtils.stringifyException(e));
- return createProcessorResponse(12);
+ return handleHiveException(e, 12);
}
perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.DRIVER_RUN);
@@ -1253,6 +1290,31 @@ public class Driver implements CommandProcessor {
return createProcessorResponse(ret);
}
+ private CommandProcessorResponse rollback(CommandProcessorResponse cpr) {
+ try {
+ releaseLocksAndCommitOrRollback(ctx.getHiveLocks(), false);
+ }
+ catch (LockException e) {
+ LOG.error("rollback() FAILED: " + cpr);//make sure not to loose
+ handleHiveException(e, 12, "Additional info in hive.log at \"rollback() FAILED\"");
+ }
+ return cpr;
+ }
+ private CommandProcessorResponse handleHiveException(HiveException e, int ret) {
+ return handleHiveException(e, ret, null);
+ }
+ private CommandProcessorResponse handleHiveException(HiveException e, int ret, String rootMsg) {
+ errorMessage = "FAILED: Hive Internal Error: " + Utilities.getNameMessage(e);
+ if(rootMsg != null) {
+ errorMessage += "\n" + rootMsg;
+ }
+ SQLState = e.getCanonicalErrorMsg() != null ?
+ e.getCanonicalErrorMsg().getSQLState() : ErrorMsg.findSQLState(e.getMessage());
+ downstreamError = e;
+ console.printError(errorMessage + "\n"
+ + org.apache.hadoop.util.StringUtils.stringifyException(e));
+ return createProcessorResponse(ret);
+ }
private boolean requiresLock() {
if (!checkConcurrency()) {
return false;
http://git-wip-us.apache.org/repos/asf/hive/blob/e57c3602/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
index fbdd66a..39b287a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
@@ -447,7 +447,7 @@ public enum ErrorMsg {
" (={2}). This is controlled by hive.limit.query.max.table.partition.", true),
OP_NOT_ALLOWED_IN_AUTOCOMMIT(20006, "Operation {0} is not allowed when autoCommit=true.", true),//todo: better SQLState?
OP_NOT_ALLOWED_IN_TXN(20007, "Operation {0} is not allowed in a transaction. TransactionID={1}.", true),
- OP_NOT_ALLOWED_WITHOUT_TXN(2008, "Operation {0} is not allowed since autoCommit=false and there is no active transaction", true),
+ OP_NOT_ALLOWED_WITHOUT_TXN(20008, "Operation {0} is not allowed since autoCommit=false and there is no active transaction", true),
//========================== 30000 range starts here ========================//
STATSPUBLISHER_NOT_OBTAINED(30000, "StatsPublisher cannot be obtained. " +
http://git-wip-us.apache.org/repos/asf/hive/blob/e57c3602/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java b/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java
index 29a3939..b9776ea 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer;
import org.apache.hadoop.hive.ql.parse.ColumnAccessInfo;
import org.apache.hadoop.hive.ql.parse.TableAccessInfo;
+import org.apache.hadoop.hive.ql.plan.HiveOperation;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.ReducerTimeStatsPerJob;
import org.apache.hadoop.hive.ql.plan.api.AdjacencyType;
@@ -106,14 +107,16 @@ public class QueryPlan implements Serializable {
private QueryProperties queryProperties;
private transient Long queryStartTime;
- private String operationName;
+ private final HiveOperation operation;
+ private Boolean autoCommitValue;
public QueryPlan() {
this.reducerTimeStatsPerJobList = new ArrayList<ReducerTimeStatsPerJob>();
+ operation = null;
}
public QueryPlan(String queryString, BaseSemanticAnalyzer sem, Long startTime, String queryId,
- String operationName, Schema resultSchema) {
+ HiveOperation operation, Schema resultSchema) {
this.queryString = queryString;
rootTasks = new ArrayList<Task<? extends Serializable>>();
@@ -134,7 +137,8 @@ public class QueryPlan implements Serializable {
query.putToQueryAttributes("queryString", this.queryString);
queryProperties = sem.getQueryProperties();
queryStartTime = startTime;
- this.operationName = operationName;
+ this.operation = operation;
+ this.autoCommitValue = sem.getAutoCommitValue();
this.resultSchema = resultSchema;
}
@@ -794,6 +798,12 @@ public class QueryPlan implements Serializable {
}
public String getOperationName() {
- return operationName;
+ return operation == null ? null : operation.getOperationName();
+ }
+ public HiveOperation getOperation() {
+ return operation;
+ }
+ public Boolean getAutoCommitValue() {
+ return autoCommitValue;
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/e57c3602/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
index b07a37a..0a466e4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
@@ -380,7 +380,7 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
tbd.getHoldDDLTime(),
isSkewedStoredAsDirs(tbd),
work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID,
- SessionState.get().getCurrentTxn());
+ SessionState.get().getTxnMgr().getCurrentTxnId());
console.printInfo("\t Time taken for load dynamic partitions : " +
(System.currentTimeMillis() - startTime));
http://git-wip-us.apache.org/repos/asf/hive/blob/e57c3602/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
index 445f606..4813d5b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
@@ -21,7 +21,6 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.LockComponentBuilder;
import org.apache.hadoop.hive.metastore.LockRequestBuilder;
@@ -51,6 +50,10 @@ public class DbTxnManager extends HiveTxnManagerImpl {
private DbLockManager lockMgr = null;
private IMetaStoreClient client = null;
+ /**
+ * The Metastore NEXT_TXN_ID.NTXN_NEXT is initialized to 1; it contains the next available
+ * transaction id. Thus is 1 is first transaction id.
+ */
private long txnId = 0;
/**
* assigns a unique monotonically increasing ID to each statement
@@ -75,14 +78,16 @@ public class DbTxnManager extends HiveTxnManagerImpl {
@Override
public long openTxn(String user) throws LockException {
init();
+ if(isTxnOpen()) {
+ throw new LockException("Transaction already opened. txnId=" + txnId);//ToDo: ErrorMsg
+ }
try {
txnId = client.openTxn(user);
statementId = 0;
LOG.debug("Opened txn " + txnId);
return txnId;
} catch (TException e) {
- throw new LockException(ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg(),
- e);
+ throw new LockException(e, ErrorMsg.METASTORE_COMMUNICATION_FAILED);
}
}
@@ -232,7 +237,7 @@ public class DbTxnManager extends HiveTxnManagerImpl {
}
List<HiveLock> locks = new ArrayList<HiveLock>(1);
- if(txnId > 0) {
+ if(isTxnOpen()) {
statementId++;
}
LockState lockState = lockMgr.lock(rqstBuilder.build(), plan.getQueryId(), isBlocking, locks);
@@ -242,9 +247,8 @@ public class DbTxnManager extends HiveTxnManagerImpl {
@Override
public void commitTxn() throws LockException {
- if (txnId == 0) {
- throw new RuntimeException("Attempt to commit before opening a " +
- "transaction");
+ if (!isTxnOpen()) {
+ throw new RuntimeException("Attempt to commit before opening a transaction");
}
try {
lockMgr.clearLocalLockRecords();
@@ -267,9 +271,8 @@ public class DbTxnManager extends HiveTxnManagerImpl {
@Override
public void rollbackTxn() throws LockException {
- if (txnId == 0) {
- throw new RuntimeException("Attempt to rollback before opening a " +
- "transaction");
+ if (!isTxnOpen()) {
+ throw new RuntimeException("Attempt to rollback before opening a transaction");
}
try {
lockMgr.clearLocalLockRecords();
@@ -292,7 +295,7 @@ public class DbTxnManager extends HiveTxnManagerImpl {
LOG.debug("Heartbeating lock and transaction " + txnId);
List<HiveLock> locks = lockMgr.getLocks(false, false);
if (locks.size() == 0) {
- if (txnId == 0) {
+ if (!isTxnOpen()) {
// No locks, no txn, we outta here.
return;
} else {
@@ -350,7 +353,7 @@ public class DbTxnManager extends HiveTxnManagerImpl {
@Override
protected void destruct() {
try {
- if (txnId > 0) rollbackTxn();
+ if (isTxnOpen()) rollbackTxn();
if (lockMgr != null) lockMgr.close();
} catch (Exception e) {
LOG.error("Caught exception " + e.getClass().getName() + " with message <" + e.getMessage()
@@ -376,8 +379,15 @@ public class DbTxnManager extends HiveTxnManagerImpl {
}
}
@Override
+ public boolean isTxnOpen() {
+ return txnId > 0;
+ }
+ @Override
+ public long getCurrentTxnId() {
+ return txnId;
+ }
+ @Override
public int getStatementId() {
return statementId;
}
-
}
http://git-wip-us.apache.org/repos/asf/hive/blob/e57c3602/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
index 1906982..be5a593 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
@@ -52,6 +52,14 @@ class DummyTxnManager extends HiveTxnManagerImpl {
// No-op
return 0L;
}
+ @Override
+ public boolean isTxnOpen() {
+ return false;
+ }
+ @Override
+ public long getCurrentTxnId() {
+ return 0L;
+ }
@Override
public int getStatementId() {
http://git-wip-us.apache.org/repos/asf/hive/blob/e57c3602/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
index c900548..74512d7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
@@ -173,6 +173,27 @@ public interface HiveTxnManager {
*/
boolean supportsAcid();
+ /**
+ * This behaves exactly as
+ * https://docs.oracle.com/javase/6/docs/api/java/sql/Connection.html#setAutoCommit(boolean)
+ */
+ void setAutoCommit(boolean autoCommit) throws LockException;
+
+ /**
+ * This behaves exactly as
+ * https://docs.oracle.com/javase/6/docs/api/java/sql/Connection.html#getAutoCommit()
+ */
+ boolean getAutoCommit();
+
+ boolean isTxnOpen();
+ /**
+ * if {@code isTxnOpen()}, returns the currently active transaction ID
+ */
+ long getCurrentTxnId();
+
+ /**
+ * 0..N Id of current statement within currently opened transaction
+ */
int getStatementId();
}
http://git-wip-us.apache.org/repos/asf/hive/blob/e57c3602/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManagerImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManagerImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManagerImpl.java
index ceeae68..ed022d9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManagerImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManagerImpl.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hive.ql.plan.UnlockTableDesc;
abstract class HiveTxnManagerImpl implements HiveTxnManager {
protected HiveConf conf;
+ private boolean isAutoCommit = true;//true by default; matches JDBC spec
void setHiveConf(HiveConf c) {
conf = c;
@@ -58,6 +59,15 @@ abstract class HiveTxnManagerImpl implements HiveTxnManager {
protected void finalize() throws Throwable {
destruct();
}
+ @Override
+ public void setAutoCommit(boolean autoCommit) throws LockException {
+ isAutoCommit = autoCommit;
+ }
+
+ @Override
+ public boolean getAutoCommit() {
+ return isAutoCommit;
+ }
@Override
public int lockTable(Hive db, LockTableDesc lockTbl) throws HiveException {
http://git-wip-us.apache.org/repos/asf/hive/blob/e57c3602/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/LockException.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/LockException.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/LockException.java
index 9894a70..8ea457e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/LockException.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/LockException.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hive.ql.lockmgr;
+import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.metadata.HiveException;
/**
@@ -43,5 +44,10 @@ public class LockException extends HiveException {
public LockException(String message, Throwable cause) {
super(message, cause);
}
-
+ public LockException(Throwable cause, ErrorMsg errorMsg, String... msgArgs) {
+ super(cause, errorMsg, msgArgs);
+ }
+ public LockException(Throwable cause, ErrorMsg errorMsg) {
+ super(cause, errorMsg);
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/e57c3602/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveException.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveException.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveException.java
index 1d895ca..d017705 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveException.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveException.java
@@ -60,6 +60,9 @@ public class HiveException extends Exception {
canonicalErrorMsg = errorMsg;
}
+ public HiveException(Throwable cause, ErrorMsg errorMsg) {
+ this(cause, errorMsg, new String[0]);
+ }
/**
* @return {@link ErrorMsg#GENERIC_ERROR} by default
*/
http://git-wip-us.apache.org/repos/asf/hive/blob/e57c3602/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
index d72991f..fbe93f9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
@@ -125,6 +125,19 @@ public abstract class BaseSemanticAnalyzer {
* Columns accessed by updates
*/
protected ColumnAccessInfo updateColumnAccessInfo;
+ /**
+ * the value of set autocommit true|false
+ * It's an object to make sure it's {@code null} if the parsed statement is
+ * not 'set autocommit...'
+ */
+ private Boolean autoCommitValue;
+
+ public Boolean getAutoCommitValue() {
+ return autoCommitValue;
+ }
+ void setAutoCommitValue(Boolean autoCommit) {
+ autoCommitValue = autoCommit;
+ }
public boolean skipAuthorization() {
http://git-wip-us.apache.org/repos/asf/hive/blob/e57c3602/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g
index 85c0ae6..9f8cfd1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g
@@ -301,6 +301,17 @@ KW_DAY: 'DAY';
KW_HOUR: 'HOUR';
KW_MINUTE: 'MINUTE';
KW_SECOND: 'SECOND';
+KW_START: 'START';
+KW_TRANSACTION: 'TRANSACTION';
+KW_COMMIT: 'COMMIT';
+KW_ROLLBACK: 'ROLLBACK';
+KW_WORK: 'WORK';
+KW_ONLY: 'ONLY';
+KW_WRITE: 'WRITE';
+KW_ISOLATION: 'ISOLATION';
+KW_LEVEL: 'LEVEL';
+KW_SNAPSHOT: 'SNAPSHOT';
+KW_AUTOCOMMIT: 'AUTOCOMMIT';
// Operators
// NOTE: if you add a new function/operator, add it to sysFuncNames so that describe function _FUNC_ will work.
http://git-wip-us.apache.org/repos/asf/hive/blob/e57c3602/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
index 3f95bb8..cf7ab3a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
@@ -354,6 +354,15 @@ TOK_ANONYMOUS;
TOK_COL_NAME;
TOK_URI_TYPE;
TOK_SERVER_TYPE;
+TOK_START_TRANSACTION;
+TOK_ISOLATION_LEVEL;
+TOK_ISOLATION_SNAPSHOT;
+TOK_TXN_ACCESS_MODE;
+TOK_TXN_READ_ONLY;
+TOK_TXN_READ_WRITE;
+TOK_COMMIT;
+TOK_ROLLBACK;
+TOK_SET_AUTOCOMMIT;
}
@@ -375,6 +384,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
private static HashMap<String, String> xlateMap;
static {
+ //this is used to support auto completion in CLI
xlateMap = new HashMap<String, String>();
// Keywords
@@ -693,6 +703,7 @@ execStatement
| ddlStatement
| deleteStatement
| updateStatement
+ | sqlTransactionStatement
;
loadStatement
@@ -2363,3 +2374,62 @@ updateStatement
:
KW_UPDATE tableName setColumnsClause whereClause? -> ^(TOK_UPDATE_TABLE tableName setColumnsClause whereClause?)
;
+
+/*
+BEGIN user defined transaction boundaries; follows SQL 2003 standard exactly except for addition of
+"setAutoCommitStatement" which is not in the standard doc but is supported by most SQL engines.
+*/
+sqlTransactionStatement
+@init { pushMsg("transaction statement", state); }
+@after { popMsg(state); }
+ :
+ startTransactionStatement
+ | commitStatement
+ | rollbackStatement
+ | setAutoCommitStatement
+ ;
+
+startTransactionStatement
+ :
+ KW_START KW_TRANSACTION ( transactionMode ( COMMA transactionMode )* )? -> ^(TOK_START_TRANSACTION transactionMode*)
+ ;
+
+transactionMode
+ :
+ isolationLevel
+ | transactionAccessMode -> ^(TOK_TXN_ACCESS_MODE transactionAccessMode)
+ ;
+
+transactionAccessMode
+ :
+ KW_READ KW_ONLY -> TOK_TXN_READ_ONLY
+ | KW_READ KW_WRITE -> TOK_TXN_READ_WRITE
+ ;
+
+isolationLevel
+ :
+ KW_ISOLATION KW_LEVEL levelOfIsolation -> ^(TOK_ISOLATION_LEVEL levelOfIsolation)
+ ;
+
+/*READ UNCOMMITTED | READ COMMITTED | REPEATABLE READ | SERIALIZABLE may be supported later*/
+levelOfIsolation
+ :
+ KW_SNAPSHOT -> TOK_ISOLATION_SNAPSHOT
+ ;
+
+commitStatement
+ :
+ KW_COMMIT ( KW_WORK )? -> TOK_COMMIT
+ ;
+
+rollbackStatement
+ :
+ KW_ROLLBACK ( KW_WORK )? -> TOK_ROLLBACK
+ ;
+setAutoCommitStatement
+ :
+ KW_SET KW_AUTOCOMMIT booleanValueTok -> ^(TOK_SET_AUTOCOMMIT booleanValueTok)
+ ;
+/*
+END user defined transaction boundaries
+*/
http://git-wip-us.apache.org/repos/asf/hive/blob/e57c3602/ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g b/ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g
index 4f8be52..501287d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g
@@ -499,6 +499,12 @@ booleanValue
KW_TRUE^ | KW_FALSE^
;
+booleanValueTok
+ :
+ KW_TRUE -> TOK_TRUE
+ | KW_FALSE -> TOK_FALSE
+ ;
+
tableOrPartition
:
tableName partitionSpec? -> ^(TOK_TAB tableName partitionSpec?)
@@ -629,7 +635,18 @@ nonReserved
| KW_STREAMTABLE | KW_STRING | KW_STRUCT | KW_TABLES | KW_TBLPROPERTIES | KW_TEMPORARY | KW_TERMINATED
| KW_TINYINT | KW_TOUCH | KW_TRANSACTIONS | KW_UNARCHIVE | KW_UNDO | KW_UNIONTYPE | KW_UNLOCK | KW_UNSET
| KW_UNSIGNED | KW_URI | KW_USE | KW_UTC | KW_UTCTIMESTAMP | KW_VALUE_TYPE | KW_VIEW | KW_WHILE | KW_YEAR
- ;
+ | KW_WORK
+ | KW_START
+ | KW_TRANSACTION
+ | KW_COMMIT
+ | KW_ROLLBACK
+ | KW_ONLY
+ | KW_WRITE
+ | KW_ISOLATION
+ | KW_LEVEL
+ | KW_SNAPSHOT
+ | KW_AUTOCOMMIT
+;
//The following SQL2011 reserved keywords are used as cast function name only, it is a subset of the sql11ReservedKeywordsUsedAsIdentifier.
sql11ReservedKeywordsUsedAsCastFunctionName
http://git-wip-us.apache.org/repos/asf/hive/blob/e57c3602/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 0c191da..ad4efef 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -10042,6 +10042,25 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
viewsExpanded.add(createVwDesc.getViewName());
}
+ switch(ast.getToken().getType()) {
+ case HiveParser.TOK_SET_AUTOCOMMIT:
+ assert ast.getChildCount() == 1;
+ if(ast.getChild(0).getType() == HiveParser.TOK_TRUE) {
+ setAutoCommitValue(true);
+ }
+ else if(ast.getChild(0).getType() == HiveParser.TOK_FALSE) {
+ setAutoCommitValue(false);
+ }
+ else {
+ assert false : "Unexpected child of TOK_SET_AUTOCOMMIT: " + ast.getChild(0).getType();
+ }
+ //fall through
+ case HiveParser.TOK_START_TRANSACTION:
+ case HiveParser.TOK_COMMIT:
+ case HiveParser.TOK_ROLLBACK:
+ SessionState.get().setCommandType(SemanticAnalyzerFactory.getOperation(ast.getToken().getType()));
+ return false;
+ }
// 4. continue analyzing from the child ASTNode.
Phase1Ctx ctx_1 = initPhase1Ctx();
preProcessForInsert(child, qb);
@@ -10164,7 +10183,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
}
// 6. Generate table access stats if required
- if (HiveConf.getBoolVar(this.conf, HiveConf.ConfVars.HIVE_STATS_COLLECT_TABLEKEYS) == true) {
+ if (HiveConf.getBoolVar(this.conf, HiveConf.ConfVars.HIVE_STATS_COLLECT_TABLEKEYS)) {
TableAccessAnalyzer tableAccessAnalyzer = new TableAccessAnalyzer(pCtx);
setTableAccessInfo(tableAccessAnalyzer.analyzeTableAccess());
}
@@ -10187,7 +10206,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
boolean isColumnInfoNeedForAuth = SessionState.get().isAuthorizationModeV2()
&& HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_AUTHORIZATION_ENABLED);
if (isColumnInfoNeedForAuth
- || HiveConf.getBoolVar(this.conf, HiveConf.ConfVars.HIVE_STATS_COLLECT_SCANCOLS) == true) {
+ || HiveConf.getBoolVar(this.conf, HiveConf.ConfVars.HIVE_STATS_COLLECT_SCANCOLS)) {
ColumnAccessAnalyzer columnAccessAnalyzer = new ColumnAccessAnalyzer(pCtx);
setColumnAccessInfo(columnAccessAnalyzer.analyzeColumnAccess());
}
@@ -10657,7 +10676,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
* Add default properties for table property. If a default parameter exists
* in the tblProp, the value in tblProp will be kept.
*
- * @param table
+ * @param tblProp
* property map
* @return Modified table property map
*/
http://git-wip-us.apache.org/repos/asf/hive/blob/e57c3602/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java
index 2fdf1e7..a2fbc11 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java
@@ -59,6 +59,7 @@ public final class SemanticAnalyzerFactory {
commandType.put(HiveParser.TOK_ALTERTABLE_UNARCHIVE, HiveOperation.ALTERTABLE_UNARCHIVE);
commandType.put(HiveParser.TOK_ALTERTABLE_PROPERTIES, HiveOperation.ALTERTABLE_PROPERTIES);
commandType.put(HiveParser.TOK_ALTERTABLE_DROPPROPERTIES, HiveOperation.ALTERTABLE_PROPERTIES);
+ commandType.put(HiveParser.TOK_ALTERTABLE_EXCHANGEPARTITION, HiveOperation.ALTERTABLE_EXCHANGEPARTITION);
commandType.put(HiveParser.TOK_SHOWDATABASES, HiveOperation.SHOWDATABASES);
commandType.put(HiveParser.TOK_SHOWTABLES, HiveOperation.SHOWTABLES);
commandType.put(HiveParser.TOK_SHOWCOLUMNS, HiveOperation.SHOWCOLUMNS);
@@ -111,6 +112,10 @@ public final class SemanticAnalyzerFactory {
commandType.put(HiveParser.TOK_ALTERTABLE_PARTCOLTYPE, HiveOperation.ALTERTABLE_PARTCOLTYPE);
commandType.put(HiveParser.TOK_SHOW_COMPACTIONS, HiveOperation.SHOW_COMPACTIONS);
commandType.put(HiveParser.TOK_SHOW_TRANSACTIONS, HiveOperation.SHOW_TRANSACTIONS);
+ commandType.put(HiveParser.TOK_START_TRANSACTION, HiveOperation.START_TRANSACTION);
+ commandType.put(HiveParser.TOK_COMMIT, HiveOperation.COMMIT);
+ commandType.put(HiveParser.TOK_ROLLBACK, HiveOperation.ROLLBACK);
+ commandType.put(HiveParser.TOK_SET_AUTOCOMMIT, HiveOperation.SET_AUTOCOMMIT);
}
static {
@@ -270,6 +275,10 @@ public final class SemanticAnalyzerFactory {
case HiveParser.TOK_DELETE_FROM:
return new UpdateDeleteSemanticAnalyzer(conf);
+ case HiveParser.TOK_START_TRANSACTION:
+ case HiveParser.TOK_COMMIT:
+ case HiveParser.TOK_ROLLBACK:
+ case HiveParser.TOK_SET_AUTOCOMMIT:
default: {
SemanticAnalyzer semAnalyzer = HiveConf
.getBoolVar(conf, HiveConf.ConfVars.HIVE_CBO_ENABLED) ? new CalcitePlanner(conf)
@@ -289,4 +298,7 @@ public final class SemanticAnalyzerFactory {
private SemanticAnalyzerFactory() {
// prevent instantiation
}
+ static HiveOperation getOperation(int hiveParserToken) {
+ return commandType.get(hiveParserToken);
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/e57c3602/ql/src/java/org/apache/hadoop/hive/ql/plan/HiveOperation.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/HiveOperation.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/HiveOperation.java
index df37832..dee2136 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/HiveOperation.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/HiveOperation.java
@@ -100,7 +100,7 @@ public enum HiveOperation {
CREATETABLE("CREATETABLE", null, new Privilege[]{Privilege.CREATE}),
TRUNCATETABLE("TRUNCATETABLE", null, new Privilege[]{Privilege.DROP}),
CREATETABLE_AS_SELECT("CREATETABLE_AS_SELECT", new Privilege[]{Privilege.SELECT}, new Privilege[]{Privilege.CREATE}),
- QUERY("QUERY", new Privilege[]{Privilege.SELECT}, new Privilege[]{Privilege.ALTER_DATA, Privilege.CREATE}),
+ QUERY("QUERY", new Privilege[]{Privilege.SELECT}, new Privilege[]{Privilege.ALTER_DATA, Privilege.CREATE}, true, false),
ALTERINDEX_PROPS("ALTERINDEX_PROPS",null, null),
ALTERDATABASE("ALTERDATABASE", null, null),
ALTERDATABASE_OWNER("ALTERDATABASE_OWNER", null, null),
@@ -111,11 +111,16 @@ public enum HiveOperation {
ALTERTBLPART_SKEWED_LOCATION("ALTERTBLPART_SKEWED_LOCATION",
new Privilege[] {Privilege.ALTER_DATA}, null),
ALTERTABLE_PARTCOLTYPE("ALTERTABLE_PARTCOLTYPE", new Privilege[] { Privilege.SELECT }, new Privilege[] { Privilege.ALTER_DATA }),
+ ALTERTABLE_EXCHANGEPARTITION("ALTERTABLE_EXCHANGEPARTITION", null, null),
ALTERVIEW_RENAME("ALTERVIEW_RENAME", new Privilege[] {Privilege.ALTER_METADATA}, null),
ALTERVIEW_AS("ALTERVIEW_AS", new Privilege[] {Privilege.ALTER_METADATA}, null),
ALTERTABLE_COMPACT("ALTERTABLE_COMPACT", new Privilege[]{Privilege.SELECT}, new Privilege[]{Privilege.ALTER_DATA}),
SHOW_COMPACTIONS("SHOW COMPACTIONS", null, null),
- SHOW_TRANSACTIONS("SHOW TRANSACTIONS", null, null);
+ SHOW_TRANSACTIONS("SHOW TRANSACTIONS", null, null),
+ START_TRANSACTION("START TRANSACTION", null, null, false, false),
+ COMMIT("COMMIT", null, null, true, true),
+ ROLLBACK("ROLLBACK", null, null, true, true),
+ SET_AUTOCOMMIT("SET AUTOCOMMIT", null, null, true, false);
;
private String operationName;
@@ -124,6 +129,12 @@ public enum HiveOperation {
private Privilege[] outputRequiredPrivileges;
+ /**
+ * Only a small set of operations is allowed inside an open transactions, e.g. DML
+ */
+ private final boolean allowedInTransaction;
+ private final boolean requiresOpenTransaction;
+
public Privilege[] getInputRequiredPrivileges() {
return inputRequiredPrivileges;
}
@@ -136,11 +147,26 @@ public enum HiveOperation {
return operationName;
}
+ public boolean isAllowedInTransaction() {
+ return allowedInTransaction;
+ }
+ public boolean isRequiresOpenTransaction() { return requiresOpenTransaction; }
+
private HiveOperation(String operationName,
- Privilege[] inputRequiredPrivileges, Privilege[] outputRequiredPrivileges) {
+ Privilege[] inputRequiredPrivileges, Privilege[] outputRequiredPrivileges) {
+ this(operationName, inputRequiredPrivileges, outputRequiredPrivileges, false, false);
+ }
+ private HiveOperation(String operationName,
+ Privilege[] inputRequiredPrivileges, Privilege[] outputRequiredPrivileges,
+ boolean allowedInTransaction, boolean requiresOpenTransaction) {
this.operationName = operationName;
this.inputRequiredPrivileges = inputRequiredPrivileges;
this.outputRequiredPrivileges = outputRequiredPrivileges;
+ this.requiresOpenTransaction = requiresOpenTransaction;
+ if(requiresOpenTransaction) {
+ allowedInTransaction = true;
+ }
+ this.allowedInTransaction = allowedInTransaction;
}
public static class PrivilegeAgreement {
http://git-wip-us.apache.org/repos/asf/hive/blob/e57c3602/ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessorResponse.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessorResponse.java b/ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessorResponse.java
index 4584517..21b7457 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessorResponse.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessorResponse.java
@@ -19,16 +19,19 @@
package org.apache.hadoop.hive.ql.processors;
import org.apache.hadoop.hive.metastore.api.Schema;
+import org.apache.hadoop.hive.ql.ErrorMsg;
/**
* Encapsulates the basic response info returned by classes the implement the
* <code>CommandProcessor</code> interface. Typically <code>errorMessage</code>
* and <code>SQLState</code> will only be set if the <code>responseCode</code>
- * is not 0.
+ * is not 0. Note that often {@code responseCode} ends up the exit value of
+ * command shell process so should keep it to < 127.
*/
public class CommandProcessorResponse {
private final int responseCode;
private final String errorMessage;
+ private final int hiveErrorCode;
private final String SQLState;
private final Schema resSchema;
@@ -49,6 +52,10 @@ public class CommandProcessorResponse {
public CommandProcessorResponse(int responseCode, String errorMessage, String SQLState, Schema schema) {
this(responseCode, errorMessage, SQLState, schema, null);
}
+ public CommandProcessorResponse(int responseCode, ErrorMsg canonicalErrMsg, Throwable t, String ... msgArgs) {
+ this(responseCode, canonicalErrMsg.format(msgArgs),
+ canonicalErrMsg.getSQLState(), null, t, canonicalErrMsg.getErrorCode());
+ }
/**
* Create CommandProcessorResponse object indicating an error.
@@ -63,12 +70,17 @@ public class CommandProcessorResponse {
}
public CommandProcessorResponse(int responseCode, String errorMessage, String SQLState,
- Schema schema, Throwable exception) {
+ Schema schema, Throwable exception) {
+ this(responseCode, errorMessage, SQLState, schema, exception, -1);
+ }
+ public CommandProcessorResponse(int responseCode, String errorMessage, String SQLState,
+ Schema schema, Throwable exception, int hiveErrorCode) {
this.responseCode = responseCode;
this.errorMessage = errorMessage;
this.SQLState = SQLState;
this.resSchema = schema;
this.exception = exception;
+ this.hiveErrorCode = hiveErrorCode;
}
public int getResponseCode() { return responseCode; }
@@ -76,8 +88,11 @@ public class CommandProcessorResponse {
public String getSQLState() { return SQLState; }
public Schema getSchema() { return resSchema; }
public Throwable getException() { return exception; }
+ public int getErrorCode() { return hiveErrorCode; }
public String toString() {
- return "(" + responseCode + "," + errorMessage + "," + SQLState +
+ return "(" + responseCode + "," + errorMessage + "," +
+ (hiveErrorCode > 0 ? hiveErrorCode + "," : "" ) +
+ SQLState +
(resSchema == null ? "" : ",") +
(exception == null ? "" : exception.getMessage()) + ")";
}
http://git-wip-us.apache.org/repos/asf/hive/blob/e57c3602/ql/src/java/org/apache/hadoop/hive/ql/processors/HiveCommand.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/processors/HiveCommand.java b/ql/src/java/org/apache/hadoop/hive/ql/processors/HiveCommand.java
index 319a79b..c8c9831 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/processors/HiveCommand.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/processors/HiveCommand.java
@@ -75,6 +75,9 @@ public enum HiveCommand {
} else if(command.length > 1 && "from".equalsIgnoreCase(command[1])) {
//special handling for SQL "delete from <table> where..."
return null;
+ }
+ else if(command.length > 1 && "set".equalsIgnoreCase(command[0]) && "autocommit".equalsIgnoreCase(command[1])) {
+ return null;//don't want set autocommit true|false to get mixed with set hive.foo.bar...
} else if (COMMANDS.contains(cmd)) {
HiveCommand hiveCommand = HiveCommand.valueOf(cmd);
http://git-wip-us.apache.org/repos/asf/hive/blob/e57c3602/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/plugin/HiveOperationType.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/plugin/HiveOperationType.java b/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/plugin/HiveOperationType.java
index b974b59..71be469 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/plugin/HiveOperationType.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/plugin/HiveOperationType.java
@@ -125,6 +125,11 @@ public enum HiveOperationType {
ADD,
DELETE,
COMPILE,
+ START_TRANSACTION,
+ COMMIT,
+ ROLLBACK,
+ SET_AUTOCOMMIT,
+ ALTERTABLE_EXCHANGEPARTITION,
// ==== Hive command operations ends here ==== //
// ==== HiveServer2 metadata api types start here ==== //
http://git-wip-us.apache.org/repos/asf/hive/blob/e57c3602/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/plugin/sqlstd/Operation2Privilege.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/plugin/sqlstd/Operation2Privilege.java b/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/plugin/sqlstd/Operation2Privilege.java
index a6226b6..8e61d57 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/plugin/sqlstd/Operation2Privilege.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/plugin/sqlstd/Operation2Privilege.java
@@ -400,6 +400,17 @@ public class Operation2Privilege {
op2Priv.put(HiveOperationType.GET_COLUMNS,
PrivRequirement.newIOPrivRequirement(SEL_NOGRANT_AR, null));
+ op2Priv.put(HiveOperationType.START_TRANSACTION, PrivRequirement.newIOPrivRequirement
+ (null, null));
+ op2Priv.put(HiveOperationType.COMMIT, PrivRequirement.newIOPrivRequirement
+ (null, null));
+ op2Priv.put(HiveOperationType.ROLLBACK, PrivRequirement.newIOPrivRequirement
+ (null, null));
+ op2Priv.put(HiveOperationType.SET_AUTOCOMMIT, PrivRequirement.newIOPrivRequirement
+ (null, null));
+ op2Priv.put(HiveOperationType.ALTERTABLE_EXCHANGEPARTITION,
+ PrivRequirement.newIOPrivRequirement(null, null));
+
}
/**
http://git-wip-us.apache.org/repos/asf/hive/blob/e57c3602/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
index 49d64db..510d8a7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
@@ -116,7 +116,7 @@ public class SessionState {
/**
* current configuration.
*/
- protected HiveConf conf;
+ private final HiveConf conf;
/**
* silent mode.
@@ -249,23 +249,6 @@ public class SessionState {
private HiveTxnManager txnMgr = null;
/**
- * When {@link #setCurrentTxn(long)} is set to this or {@link #getCurrentTxn()}} returns this it
- * indicates that there is not a current transaction in this session.
- */
- public static final long NO_CURRENT_TXN = -1L;
-
- /**
- * Transaction currently open
- */
- private long currentTxn = NO_CURRENT_TXN;
-
- /**
- * Whether we are in auto-commit state or not. Currently we are always in auto-commit,
- * so there are not setters for this yet.
- */
- private final boolean txnAutoCommit = true;
-
- /**
* store the jars loaded last time
*/
private final Set<String> preReloadableAuxJars = new HashSet<String>();
@@ -293,9 +276,6 @@ public class SessionState {
return conf;
}
- public void setConf(HiveConf conf) {
- this.conf = conf;
- }
public File getTmpOutputFile() {
return tmpOutputFile;
@@ -410,18 +390,6 @@ public class SessionState {
return txnMgr;
}
- public long getCurrentTxn() {
- return currentTxn;
- }
-
- public void setCurrentTxn(long currTxn) {
- currentTxn = currTxn;
- }
-
- public boolean isAutoCommit() {
- return txnAutoCommit;
- }
-
public HadoopShims.HdfsEncryptionShim getHdfsEncryptionShim() throws HiveException {
if (hdfsEncryptionShim == null) {
try {
http://git-wip-us.apache.org/repos/asf/hive/blob/e57c3602/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
new file mode 100644
index 0000000..c73621f
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
@@ -0,0 +1,473 @@
+package org.apache.hadoop.hive.ql;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.orc.FileDump;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+
+/**
+ * The LockManager is not ready, but for no-concurrency straight-line path we can
+ * test AC=true, and AC=false with commit/rollback/exception and test resulting data.
+ *
+ * Can also test, calling commit in AC=true mode, etc, toggling AC...
+ */
+public class TestTxnCommands {
+ private static final String TEST_DATA_DIR = new File(System.getProperty("java.io.tmpdir") +
+ File.separator + TestTxnCommands.class.getCanonicalName()
+ + "-" + System.currentTimeMillis()
+ ).getPath().replaceAll("\\\\", "/");
+ private static final String TEST_WAREHOUSE_DIR = TEST_DATA_DIR + "/warehouse";
+ //bucket count for test tables; set it to 1 for easier debugging
+ private static int BUCKET_COUNT = 2;
+ @Rule
+ public TestName testName = new TestName();
+ private HiveConf hiveConf;
+ private Driver d;
+ private static enum Table {
+ ACIDTBL("acidTbl"),
+ ACIDTBL2("acidTbl2"),
+ NONACIDORCTBL("nonAcidOrcTbl"),
+ NONACIDORCTBL2("nonAcidOrcTbl2");
+
+ private final String name;
+ @Override
+ public String toString() {
+ return name;
+ }
+ Table(String name) {
+ this.name = name;
+ }
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ tearDown();
+ hiveConf = new HiveConf(this.getClass());
+ hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
+ hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
+ hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
+ hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, TEST_WAREHOUSE_DIR);
+ TxnDbUtil.setConfValues(hiveConf);
+ hiveConf.setBoolVar(HiveConf.ConfVars.HIVEENFORCEBUCKETING, true);
+ TxnDbUtil.prepDb();
+ File f = new File(TEST_WAREHOUSE_DIR);
+ if (f.exists()) {
+ FileUtil.fullyDelete(f);
+ }
+ if (!(new File(TEST_WAREHOUSE_DIR).mkdirs())) {
+ throw new RuntimeException("Could not create " + TEST_WAREHOUSE_DIR);
+ }
+ SessionState.start(new SessionState(hiveConf));
+ d = new Driver(hiveConf);
+ dropTables();
+ runStatementOnDriver("create table " + Table.ACIDTBL + "(a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+ runStatementOnDriver("create table " + Table.NONACIDORCTBL + "(a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='false')");
+ runStatementOnDriver("create table " + Table.NONACIDORCTBL2 + "(a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='false')");
+ runStatementOnDriver("create temporary table " + Table.ACIDTBL2 + "(a int, b int, c int) clustered by (c) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+ }
+ private void dropTables() throws Exception {
+ for(Table t : Table.values()) {
+ runStatementOnDriver("drop table if exists " + t);
+ }
+ }
+ @After
+ public void tearDown() throws Exception {
+ try {
+ if (d != null) {
+ runStatementOnDriver("set autocommit true");
+ dropTables();
+ d.destroy();
+ d.close();
+ d = null;
+ }
+ } finally {
+ TxnDbUtil.cleanDb();
+ FileUtils.deleteDirectory(new File(TEST_DATA_DIR));
+ }
+ }
+ @Test
+ public void testInsertOverwrite() throws Exception {
+ runStatementOnDriver("insert overwrite table " + Table.NONACIDORCTBL + " select a,b from " + Table.NONACIDORCTBL2);
+ runStatementOnDriver("create table " + Table.NONACIDORCTBL2 + "3(a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='false')");
+
+ }
+ @Ignore("not needed but useful for testing")
+ @Test
+ public void testNonAcidInsert() throws Exception {
+ runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,2)");
+ List<String> rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
+ runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(2,3)");
+ List<String> rs1 = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
+ }
+
+ /**
+ * Useful for debugging. Dumps ORC file in JSON to CWD.
+ */
+ private void dumpBucketData(Table table, long txnId, int stmtId, int bucketNum) throws Exception {
+ if(true) {
+ return;
+ }
+ Path bucket = AcidUtils.createBucketFile(new Path(new Path(TEST_WAREHOUSE_DIR, table.toString().toLowerCase()), AcidUtils.deltaSubdir(txnId, txnId, stmtId)), bucketNum);
+ FileOutputStream delta = new FileOutputStream(testName.getMethodName() + "_" + bucket.getParent().getName() + "_" + bucket.getName());
+// try {
+// FileDump.printJsonData(hiveConf, bucket.toString(), delta);
+// }
+// catch(FileNotFoundException ex) {
+ ;//this happens if you change BUCKET_COUNT
+// }
+ delta.close();
+ }
+ /**
+ * Dump all data in the table by bucket in JSON format
+ */
+ private void dumpTableData(Table table, long txnId, int stmtId) throws Exception {
+ for(int bucketNum = 0; bucketNum < BUCKET_COUNT; bucketNum++) {
+ dumpBucketData(table, txnId, stmtId, bucketNum);
+ }
+ }
+ @Test
+ public void testSimpleAcidInsert() throws Exception {
+ int[][] rows1 = {{1,2},{3,4}};
+ runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(rows1));
+ //List<String> rs = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+ //Assert.assertEquals("Data didn't match in autocommit=true (rs)", stringifyValues(rows1), rs);
+ runStatementOnDriver("set autocommit false");
+ runStatementOnDriver("START TRANSACTION");
+ int[][] rows2 = {{5,6},{7,8}};
+ runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(rows2));
+ List<String> allData = stringifyValues(rows1);
+ allData.addAll(stringifyValues(rows2));
+ List<String> rs0 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+ Assert.assertEquals("Data didn't match inside tx (rs0)", allData, rs0);
+ runStatementOnDriver("COMMIT WORK");
+ dumpTableData(Table.ACIDTBL, 1, 0);
+ dumpTableData(Table.ACIDTBL, 2, 0);
+ runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+ runStatementOnDriver("COMMIT");//txn started implicitly by previous statement
+ runStatementOnDriver("set autocommit true");
+ List<String> rs1 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+ Assert.assertEquals("Data didn't match inside tx (rs0)", allData, rs1);
+ }
+
+ /**
+ * add tests for all transitions - AC=t, AC=t, AC=f, commit (for example)
+ * @throws Exception
+ */
+ @Test
+ public void testErrors() throws Exception {
+ runStatementOnDriver("set autocommit true");
+ CommandProcessorResponse cpr = runStatementOnDriverNegative("start transaction");
+ Assert.assertEquals("Error didn't match: " + cpr, ErrorMsg.OP_NOT_ALLOWED_IN_AUTOCOMMIT.getErrorCode(), cpr.getErrorCode());
+ runStatementOnDriver("set autocommit false");
+ runStatementOnDriver("start transaction");
+ CommandProcessorResponse cpr2 = runStatementOnDriverNegative("create table foo(x int, y int)");
+ Assert.assertEquals("Expected DDL to fail in an open txn", ErrorMsg.OP_NOT_ALLOWED_IN_TXN.getErrorCode(), cpr2.getErrorCode());
+ runStatementOnDriver("set autocommit true");
+ CommandProcessorResponse cpr3 = runStatementOnDriverNegative("update " + Table.ACIDTBL + " set a = 1 where b != 1");
+ Assert.assertEquals("Expected update of bucket column to fail",
+ "FAILED: SemanticException [Error 10302]: Updating values of bucketing columns is not supported. Column a.",
+ cpr3.getErrorMessage());
+ //line below should in principle work but Driver doesn't propagate errorCode properly
+ //Assert.assertEquals("Expected update of bucket column to fail", ErrorMsg.UPDATE_CANNOT_UPDATE_BUCKET_VALUE.getErrorCode(), cpr3.getErrorCode());
+ cpr3 = runStatementOnDriverNegative("commit work");//not allowed in AC=true
+ Assert.assertEquals("Error didn't match: " + cpr3, ErrorMsg.OP_NOT_ALLOWED_IN_AUTOCOMMIT.getErrorCode(), cpr.getErrorCode());
+ cpr3 = runStatementOnDriverNegative("rollback work");//not allowed in AC=true
+ Assert.assertEquals("Error didn't match: " + cpr3, ErrorMsg.OP_NOT_ALLOWED_IN_AUTOCOMMIT.getErrorCode(), cpr.getErrorCode());
+ runStatementOnDriver("set autocommit false");
+ cpr3 = runStatementOnDriverNegative("commit");//not allowed in w/o tx
+ Assert.assertEquals("Error didn't match: " + cpr3, ErrorMsg.OP_NOT_ALLOWED_IN_AUTOCOMMIT.getErrorCode(), cpr.getErrorCode());
+ cpr3 = runStatementOnDriverNegative("rollback");//not allowed in w/o tx
+ Assert.assertEquals("Error didn't match: " + cpr3, ErrorMsg.OP_NOT_ALLOWED_IN_AUTOCOMMIT.getErrorCode(), cpr.getErrorCode());
+ runStatementOnDriver("start transaction");
+ cpr3 = runStatementOnDriverNegative("start transaction");//not allowed in a tx
+ Assert.assertEquals("Expected start transaction to fail", ErrorMsg.OP_NOT_ALLOWED_IN_TXN.getErrorCode(), cpr3.getErrorCode());
+ runStatementOnDriver("start transaction");//ok since previously opened txn was killed
+ runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) values(1,2)");
+ List<String> rs0 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+ Assert.assertEquals("Can't see my own write", 1, rs0.size());
+ runStatementOnDriver("set autocommit true");//this should commit previous txn
+ rs0 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+ Assert.assertEquals("Can't see my own write", 1, rs0.size());
+ }
+ @Test
+ public void testReadMyOwnInsert() throws Exception {
+ runStatementOnDriver("set autocommit false");
+ runStatementOnDriver("START TRANSACTION");
+ List<String> rs = runStatementOnDriver("select * from " + Table.ACIDTBL);
+ Assert.assertEquals("Expected empty " + Table.ACIDTBL, 0, rs.size());
+ runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) values(1,2)");
+ List<String> rs0 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+ Assert.assertEquals("Can't see my own write", 1, rs0.size());
+ runStatementOnDriver("commit");
+ runStatementOnDriver("START TRANSACTION");
+ List<String> rs1 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+ runStatementOnDriver("rollback work");
+ Assert.assertEquals("Can't see write after commit", 1, rs1.size());
+ }
+ @Test
+ public void testImplicitRollback() throws Exception {
+ runStatementOnDriver("set autocommit false");
+ runStatementOnDriver("START TRANSACTION");
+ runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) values(1,2)");
+ List<String> rs0 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+ Assert.assertEquals("Can't see my own write", 1, rs0.size());
+ //next command should produce an error
+ CommandProcessorResponse cpr = runStatementOnDriverNegative("select * from no_such_table");
+ Assert.assertEquals("Txn didn't fail?",
+ "FAILED: SemanticException [Error 10001]: Line 1:14 Table not found 'no_such_table'",
+ cpr.getErrorMessage());
+ runStatementOnDriver("start transaction");
+ List<String> rs1 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+ runStatementOnDriver("commit");
+ Assert.assertEquals("Didn't rollback as expected", 0, rs1.size());
+ }
+ @Test
+ public void testExplicitRollback() throws Exception {
+ runStatementOnDriver("set autocommit false");
+ runStatementOnDriver("START TRANSACTION");
+ runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) values(1,2)");
+ runStatementOnDriver("ROLLBACK");
+ runStatementOnDriver("set autocommit true");
+ List<String> rs = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+ Assert.assertEquals("Rollback didn't rollback", 0, rs.size());
+ }
+
+ @Test
+ public void testMultipleInserts() throws Exception {
+ runStatementOnDriver("set autocommit false");
+ runStatementOnDriver("START TRANSACTION");
+ int[][] rows1 = {{1,2},{3,4}};
+ runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(rows1));
+ int[][] rows2 = {{5,6},{7,8}};
+ runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(rows2));
+ List<String> allData = stringifyValues(rows1);
+ allData.addAll(stringifyValues(rows2));
+ List<String> rs = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+ Assert.assertEquals("Content didn't match before commit rs", allData, rs);
+ runStatementOnDriver("commit");
+ dumpTableData(Table.ACIDTBL, 1, 0);
+ dumpTableData(Table.ACIDTBL, 1, 1);
+ runStatementOnDriver("set autocommit true");
+ List<String> rs1 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+ Assert.assertEquals("Content didn't match after commit rs1", allData, rs1);
+ }
+ @Test
+ public void testDelete() throws Exception {
+ int[][] rows1 = {{1,2},{3,4}};
+ runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(rows1));
+ List<String> rs0 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+ Assert.assertEquals("Content didn't match rs0", stringifyValues(rows1), rs0);
+ runStatementOnDriver("set autocommit false");
+ runStatementOnDriver("START TRANSACTION");
+ runStatementOnDriver("delete from " + Table.ACIDTBL + " where b = 4");
+ int[][] updatedData2 = {{1,2}};
+ List<String> rs3 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+ Assert.assertEquals("Wrong data after delete", stringifyValues(updatedData2), rs3);
+ runStatementOnDriver("commit");
+ runStatementOnDriver("set autocommit true");
+ List<String> rs4 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+ Assert.assertEquals("Wrong data after commit", stringifyValues(updatedData2), rs4);
+ }
+
+ @Test
+ public void testUpdateOfInserts() throws Exception {
+ int[][] rows1 = {{1,2},{3,4}};
+ runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(rows1));
+ List<String> rs0 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+ Assert.assertEquals("Content didn't match rs0", stringifyValues(rows1), rs0);
+ runStatementOnDriver("set autocommit false");
+ runStatementOnDriver("START TRANSACTION");
+ int[][] rows2 = {{5,6},{7,8}};
+ runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(rows2));
+ List<String> rs1 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+ List<String> allData = stringifyValues(rows1);
+ allData.addAll(stringifyValues(rows2));
+ Assert.assertEquals("Content didn't match rs1", allData, rs1);
+ runStatementOnDriver("update " + Table.ACIDTBL + " set b = 1 where b != 1");
+ int[][] updatedData = {{1,1},{3,1},{5,1},{7,1}};
+ List<String> rs2 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+ Assert.assertEquals("Wrong data after update", stringifyValues(updatedData), rs2);
+ runStatementOnDriver("commit");
+ runStatementOnDriver("set autocommit true");
+ List<String> rs4 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+ Assert.assertEquals("Wrong data after commit", stringifyValues(updatedData), rs4);
+ }
+ @Test
+ public void testUpdateDeleteOfInserts() throws Exception {
+ int[][] rows1 = {{1,2},{3,4}};
+ runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(rows1));
+ List<String> rs0 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+ Assert.assertEquals("Content didn't match rs0", stringifyValues(rows1), rs0);
+ runStatementOnDriver("set autocommit false");
+ runStatementOnDriver("START TRANSACTION");
+ int[][] rows2 = {{5,6},{7,8}};
+ runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(rows2));
+ List<String> rs1 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+ List<String> allData = stringifyValues(rows1);
+ allData.addAll(stringifyValues(rows2));
+ Assert.assertEquals("Content didn't match rs1", allData, rs1);
+ runStatementOnDriver("update " + Table.ACIDTBL + " set b = 1 where b != 1");
+ int[][] updatedData = {{1,1},{3,1},{5,1},{7,1}};
+ List<String> rs2 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+ Assert.assertEquals("Wrong data after update", stringifyValues(updatedData), rs2);
+ runStatementOnDriver("delete from " + Table.ACIDTBL + " where a = 7 and b = 1");
+ dumpTableData(Table.ACIDTBL, 1, 0);
+ dumpTableData(Table.ACIDTBL, 2, 0);
+ dumpTableData(Table.ACIDTBL, 2, 2);
+ dumpTableData(Table.ACIDTBL, 2, 4);
+ int[][] updatedData2 = {{1,1},{3,1},{5,1}};
+ List<String> rs3 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+ Assert.assertEquals("Wrong data after delete", stringifyValues(updatedData2), rs3);
+ runStatementOnDriver("commit");
+ runStatementOnDriver("set autocommit true");
+ List<String> rs4 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+ Assert.assertEquals("Wrong data after commit", stringifyValues(updatedData2), rs4);
+ }
+ @Test
+ public void testMultipleDelete() throws Exception {
+ int[][] rows1 = {{1,2},{3,4},{5,6},{7,8}};
+ runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(rows1));
+ List<String> rs0 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+ Assert.assertEquals("Content didn't match rs0", stringifyValues(rows1), rs0);
+ runStatementOnDriver("set autocommit false");
+ runStatementOnDriver("START TRANSACTION");
+ runStatementOnDriver("delete from " + Table.ACIDTBL + " where b = 8");
+ int[][] updatedData2 = {{1,2},{3,4},{5,6}};
+ List<String> rs2 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+ Assert.assertEquals("Wrong data after delete", stringifyValues(updatedData2), rs2);
+ runStatementOnDriver("delete from " + Table.ACIDTBL + " where b = 4");
+ int[][] updatedData3 = {{1, 2}, {5, 6}};
+ List<String> rs3 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+ Assert.assertEquals("Wrong data after delete2", stringifyValues(updatedData3), rs3);
+ runStatementOnDriver("update " + Table.ACIDTBL + " set b=3");
+ dumpTableData(Table.ACIDTBL, 1, 0);
+ //nothing actually hashes to bucket0, so update/delete deltas don't have it
+ dumpTableData(Table.ACIDTBL, 2, 0);
+ dumpTableData(Table.ACIDTBL, 2, 2);
+ dumpTableData(Table.ACIDTBL, 2, 4);
+ List<String> rs5 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+ int [][] updatedData4 = {{1,3},{5,3}};
+ Assert.assertEquals("Wrong data after delete", stringifyValues(updatedData4), rs5);
+ runStatementOnDriver("commit");
+ runStatementOnDriver("set autocommit true");
+ List<String> rs4 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+ Assert.assertEquals("Wrong data after commit", stringifyValues(updatedData4), rs4);
+ }
+ @Test
+ public void testDeleteIn() throws Exception {
+ runStatementOnDriver("delete from " + Table.ACIDTBL + " where a IN (SELECT A.a from " +
+ Table.ACIDTBL + " A)");
+ int[][] tableData = {{1,2},{3,2},{5,2},{1,3},{3,3},{5,3}};
+ runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(tableData));
+ runStatementOnDriver("insert into " + Table.ACIDTBL2 + "(a,b,c) values(1,7,17),(3,7,17)");
+// runStatementOnDriver("select b from " + Table.ACIDTBL + " where a in (select b from " + Table.NONACIDORCTBL + ")");
+ runStatementOnDriver("delete from " + Table.ACIDTBL + " where a in(select a from " + Table.ACIDTBL2 + ")");
+// runStatementOnDriver("delete from " + Table.ACIDTBL + " where a in(select a from " + Table.NONACIDORCTBL + ")");
+ runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) select a,b from " + Table.ACIDTBL2);
+ List<String> rs = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+ int[][] updatedData = {{1,7},{3,7},{5,2},{5,3}};
+ Assert.assertEquals("Bulk update failed", stringifyValues(updatedData), rs);
+ }
+
+ /**
+ * takes raw data and turns it into a string as if from Driver.getResults()
+ * sorts rows in dictionary order
+ */
+ private List<String> stringifyValues(int[][] rowsIn) {
+ assert rowsIn.length > 0;
+ int[][] rows = rowsIn.clone();
+ Arrays.sort(rows, new RowComp());
+ List<String> rs = new ArrayList<String>();
+ for(int[] row : rows) {
+ assert row.length > 0;
+ StringBuilder sb = new StringBuilder();
+ for(int value : row) {
+ sb.append(value).append("\t");
+ }
+ sb.setLength(sb.length() - 1);
+ rs.add(sb.toString());
+ }
+ return rs;
+ }
+ private static final class RowComp implements Comparator<int[]> {
+ public int compare(int[] row1, int[] row2) {
+ assert row1 != null && row2 != null && row1.length == row2.length;
+ for(int i = 0; i < row1.length; i++) {
+ int comp = Integer.compare(row1[i], row2[i]);
+ if(comp != 0) {
+ return comp;
+ }
+ }
+ return 0;
+ }
+ }
+ private String makeValuesClause(int[][] rows) {
+ assert rows.length > 0;
+ StringBuilder sb = new StringBuilder("values");
+ for(int[] row : rows) {
+ assert row.length > 0;
+ if(row.length > 1) {
+ sb.append("(");
+ }
+ for(int value : row) {
+ sb.append(value).append(",");
+ }
+ sb.setLength(sb.length() - 1);//remove trailing comma
+ if(row.length > 1) {
+ sb.append(")");
+ }
+ sb.append(",");
+ }
+ sb.setLength(sb.length() - 1);//remove trailing comma
+ return sb.toString();
+ }
+
+ private List<String> runStatementOnDriver(String stmt) throws Exception {
+ CommandProcessorResponse cpr = d.run(stmt);
+ if(cpr.getResponseCode() != 0) {
+ throw new RuntimeException(stmt + " failed: " + cpr);
+ }
+ List<String> rs = new ArrayList<String>();
+ d.getResults(rs);
+ return rs;
+ }
+ private CommandProcessorResponse runStatementOnDriverNegative(String stmt) throws Exception {
+ CommandProcessorResponse cpr = d.run(stmt);
+ if(cpr.getResponseCode() != 0) {
+ return cpr;
+ }
+ throw new RuntimeException("Didn't get expected failure!");
+ }
+
+// @Ignore
+ @Test
+ public void exchangePartition() throws Exception {
+ runStatementOnDriver("create database ex1");
+ runStatementOnDriver("create database ex2");
+
+ runStatementOnDriver("CREATE TABLE ex1.exchange_part_test1 (f1 string) PARTITIONED BY (ds STRING)");
+ runStatementOnDriver("CREATE TABLE ex2.exchange_part_test2 (f1 string) PARTITIONED BY (ds STRING)");
+ runStatementOnDriver("ALTER TABLE ex2.exchange_part_test2 ADD PARTITION (ds='2013-04-05')");
+ runStatementOnDriver("ALTER TABLE ex1.exchange_part_test1 EXCHANGE PARTITION (ds='2013-04-05') WITH TABLE ex2.exchange_part_test2");
+ }
+}