Posted to commits@hive.apache.org by ek...@apache.org on 2016/11/12 20:20:21 UTC
[3/3] hive git commit: HIVE-14943 Base Implementation (of HIVE-10924) (Eugene Koifman, reviewed by Alan Gates)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/e00f909d
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/e00f909d
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/e00f909d
Branch: refs/heads/master
Commit: e00f909dd35ee46de5d9de493bb76e37ba4b6f74
Parents: 52ba014
Author: Eugene Koifman <ek...@hortonworks.com>
Authored: Sat Nov 12 12:20:01 2016 -0800
Committer: Eugene Koifman <ek...@hortonworks.com>
Committed: Sat Nov 12 12:20:01 2016 -0800
----------------------------------------------------------------------
.../hive/metastore/LockComponentBuilder.java | 4 +
.../metastore/txn/CompactionTxnHandler.java | 5 +-
.../hadoop/hive/metastore/txn/TxnHandler.java | 51 +-
.../java/org/apache/hadoop/hive/ql/Context.java | 63 +-
.../java/org/apache/hadoop/hive/ql/Driver.java | 4 +-
.../org/apache/hadoop/hive/ql/ErrorMsg.java | 11 +-
.../apache/hadoop/hive/ql/exec/MoveTask.java | 4 +
.../apache/hadoop/hive/ql/hooks/ReadEntity.java | 3 +
.../hadoop/hive/ql/hooks/WriteEntity.java | 10 +
.../hadoop/hive/ql/lockmgr/DbTxnManager.java | 9 +-
.../hadoop/hive/ql/lockmgr/DummyTxnManager.java | 2 +-
.../hadoop/hive/ql/lockmgr/HiveTxnManager.java | 7 +-
.../BucketingSortingReduceSinkOptimizer.java | 7 +-
.../apache/hadoop/hive/ql/parse/ASTNode.java | 8 +
.../hadoop/hive/ql/parse/FromClauseParser.g | 6 +-
.../org/apache/hadoop/hive/ql/parse/HiveLexer.g | 2 +
.../apache/hadoop/hive/ql/parse/HiveParser.g | 60 +-
.../hadoop/hive/ql/parse/IdentifiersParser.g | 7 +-
.../hadoop/hive/ql/parse/QBParseInfo.java | 7 +
.../hadoop/hive/ql/parse/SelectClauseParser.g | 2 +-
.../hadoop/hive/ql/parse/SemanticAnalyzer.java | 155 +--
.../hive/ql/parse/SemanticAnalyzerFactory.java | 1 +
.../ql/parse/UpdateDeleteSemanticAnalyzer.java | 989 ++++++++++++++++---
.../hadoop/hive/ql/plan/CreateTableDesc.java | 4 +-
.../metastore/txn/TestCompactionTxnHandler.java | 1 +
.../apache/hadoop/hive/ql/TestTxnCommands.java | 35 +
.../apache/hadoop/hive/ql/TestTxnCommands2.java | 417 +++++++-
.../ql/TestTxnCommands2WithSplitUpdate.java | 13 +-
.../hive/ql/lockmgr/TestDbTxnManager2.java | 645 +++++++++++-
.../apache/hadoop/hive/ql/parse/TestIUD.java | 4 +-
.../hive/ql/parse/TestMergeStatement.java | 246 +++++
31 files changed, 2464 insertions(+), 318 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/metastore/src/java/org/apache/hadoop/hive/metastore/LockComponentBuilder.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/LockComponentBuilder.java b/metastore/src/java/org/apache/hadoop/hive/metastore/LockComponentBuilder.java
index 3e8f193..e074152 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/LockComponentBuilder.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/LockComponentBuilder.java
@@ -102,6 +102,10 @@ public class LockComponentBuilder {
partNameSet = true;
return this;
}
+ public LockComponentBuilder setIsDynamicPartitionWrite(boolean t) {
+ component.setIsDynamicPartitionWrite(t);
+ return this;
+ }
/**
* Get the constructed lock component.
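
The new setter keeps the builder's fluent style. A minimal sketch of a caller marking a
dynamic-partition UPDATE (values are illustrative; setDbName/setTableName/setSemiShared/
setOperationType are assumed from the builder's existing API, while setIsAcid() and build()
appear in the DbTxnManager hunk below):

    import org.apache.hadoop.hive.metastore.LockComponentBuilder;
    import org.apache.hadoop.hive.metastore.api.DataOperationType;
    import org.apache.hadoop.hive.metastore.api.LockComponent;

    // Sketch only: a lock component for an UPDATE that writes partitions dynamically.
    LockComponent comp = new LockComponentBuilder()
        .setDbName("default")                         // illustrative values
        .setTableName("target_tbl")
        .setSemiShared()                              // assumed update/delete lock mode
        .setOperationType(DataOperationType.UPDATE)
        .setIsDynamicPartitionWrite(true)             // the flag added by this patch
        .build();
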
http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
index 75a4d87..9145fcc 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
@@ -43,9 +43,6 @@ class CompactionTxnHandler extends TxnHandler {
static final private String CLASS_NAME = CompactionTxnHandler.class.getName();
static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
- // Always access COMPACTION_QUEUE before COMPLETED_TXN_COMPONENTS
- // See TxnHandler for notes on how to deal with deadlocks. Follow those notes.
-
public CompactionTxnHandler() {
}
@@ -428,7 +425,7 @@ class CompactionTxnHandler extends TxnHandler {
}
/**
- * Clean up aborted transactions from txns that have no components in txn_components. The reson such
+ * Clean up aborted transactions from txns that have no components in txn_components. The reason such
txns exist can be that no work was done in this txn (e.g. Streaming opened TransactionBatch and
* abandoned it w/o doing any work) or due to {@link #markCleaned(CompactionInfo)} being called.
*/
http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 547ee98..a815f2c 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -625,7 +625,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
rs = stmt.executeQuery(sqlGenerator.addLimitClause(1, "tc_operation_type " + conflictSQLSuffix));
if (rs.next()) {
close(rs);
- //here means currently committing txn performed update/delete and we should check WW conflict
+ //if here it means currently committing txn performed update/delete and we should check WW conflict
/**
* This S4U will mutex with other commitTxn() and openTxns().
* -1 below makes txn intervals look like [3,3] [4,4] if all txns are serial
@@ -653,7 +653,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
*/
rs = stmt.executeQuery
(sqlGenerator.addLimitClause(1, "committed.ws_txnid, committed.ws_commit_id, committed.ws_database," +
- "committed.ws_table, committed.ws_partition, cur.ws_commit_id cur_ws_commit_id " +
+ "committed.ws_table, committed.ws_partition, cur.ws_commit_id cur_ws_commit_id, " +
+ "cur.ws_operation_type cur_op, committed.ws_operation_type committed_op " +
"from WRITE_SET committed INNER JOIN WRITE_SET cur " +
"ON committed.ws_database=cur.ws_database and committed.ws_table=cur.ws_table " +
//For partitioned table we always track writes at partition level (never at table)
@@ -677,7 +678,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
resource.append('/').append(partitionName);
}
String msg = "Aborting [" + JavaUtils.txnIdToString(txnid) + "," + rs.getLong(6) + "]" + " due to a write conflict on " + resource +
- " committed by " + committedTxn;
+ " committed by " + committedTxn + " " + rs.getString(7) + "/" + rs.getString(8);
close(rs);
//remove WRITE_SET info for current txn since it's about to abort
dbConn.rollback(undoWriteSetForCurrentTxn);
@@ -712,6 +713,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
int modCount = 0;
if ((modCount = stmt.executeUpdate(s)) < 1) {
//this can be reasonable for an empty txn START/COMMIT or read-only txn
+ //also an IUD with DP that didn't match any rows.
LOG.info("Expected to move at least one record from txn_components to " +
"completed_txn_components when committing txn! " + JavaUtils.txnIdToString(txnid));
}
@@ -884,14 +886,6 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
if (txnid > 0) {
List<String> rows = new ArrayList<>();
- /**
- * todo QueryPlan has BaseSemanticAnalyzer which has acidFileSinks list of FileSinkDesc
- * FileSinkDesc.table is ql.metadata.Table
- * Table.tableSpec which is TableSpec, which has specType which is SpecType
- * So maybe this can work to know that this is part of dynamic partition insert in which case
- * we'll get addDynamicPartitions() call and should not write TXN_COMPONENTS here.
- * In any case, that's an optimization for now; will be required when adding multi-stmt txns
- */
// For each component in this lock request,
// add an entry to the txn_components table
for (LockComponent lc : rqst.getComponent()) {
@@ -909,7 +903,18 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
case INSERT:
case UPDATE:
case DELETE:
- updateTxnComponents = true;
+ if(!lc.isSetIsDynamicPartitionWrite()) {
+ //must be old client talking, i.e. we don't know if it's DP so be conservative
+ updateTxnComponents = true;
+ }
+ else {
+ /**
+ * we know this is part of a DP operation and so we'll get a
+ * {@link #addDynamicPartitions(AddDynamicPartitions)} call with the list
+ * of partitions actually changed.
+ */
+ updateTxnComponents = !lc.isIsDynamicPartitionWrite();
+ }
break;
case SELECT:
updateTxnComponents = false;
@@ -1544,22 +1549,13 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
if(rqst.isSetOperationType()) {
ot = OpertaionType.fromDataOperationType(rqst.getOperationType());
}
-
- //what if a txn writes the same table > 1 time...(HIVE-9675) let's go with this for now, but really
- //need to not write this in the first place, i.e. make this delete not needed
- //see enqueueLockWithRetry() - that's where we write to TXN_COMPONENTS
- String deleteSql = "delete from TXN_COMPONENTS where tc_txnid=" + rqst.getTxnid() + " and tc_database=" +
- quoteString(rqst.getDbname()) + " and tc_table=" + quoteString(rqst.getTablename());
- //we delete the entries made by enqueueLockWithRetry() since those are based on lock information which is
- //much "wider" than necessary in a lot of cases. Here on the other hand, we know exactly which
- //partitions have been written to. w/o this WRITE_SET would contain entries for partitions not actually
- //written to
- int modCount = stmt.executeUpdate(deleteSql);
List<String> rows = new ArrayList<>();
for (String partName : rqst.getPartitionnames()) {
rows.add(rqst.getTxnid() + "," + quoteString(rqst.getDbname()) + "," + quoteString(rqst.getTablename()) +
"," + quoteString(partName) + "," + quoteChar(ot.sqlConst));
}
+ int modCount = 0;
+ //record partitions that were written to
List<String> queries = sqlGenerator.createInsertValuesStmt(
"TXN_COMPONENTS (tc_txnid, tc_database, tc_table, tc_partition, tc_operation_type)", rows);
for(String query : queries) {
@@ -2080,7 +2076,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
}
public int compare(LockInfo info1, LockInfo info2) {
- // We sort by state (acquired vs waiting) and then by LockType, they by id
+ // We sort by state (acquired vs waiting) and then by LockType, then by id
if (info1.state == LockState.ACQUIRED &&
info2.state != LockState .ACQUIRED) {
return -1;
@@ -2285,6 +2281,12 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
/**
* todo: Longer term we should pass this from client somehow - this would be an optimization; once
* that is in place make sure to build and test "writeSet" below using OperationType not LockType
+ * With SP we assume that the query modifies exactly the partitions it locked (not entirely
+ * realistic since Update/Delete may have a predicate that filters out all records from
+ * some partition(s), but plausible). For DP, we acquire locks very wide (all known partitions),
+ * but for most queries only a fraction will actually be updated. #addDynamicPartitions() tells
+ * us exactly which ones were written to. Thus using this trick to kill a query early for
+ * DP queries may be too restrictive.
*/
boolean isPartOfDynamicPartitionInsert = true;
try {
@@ -2567,6 +2569,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
(desiredLock.txnId != 0 && desiredLock.txnId == existingLock.txnId) ||
//txnId=0 means it's a select or IUD which does not write to ACID table, e.g
//insert overwrite table T partition(p=1) select a,b from T and autoCommit=true
+ // todo: fix comment as of HIVE-14988
(desiredLock.txnId == 0 && desiredLock.extLockId == existingLock.extLockId);
}
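
In prose, the WRITE_SET self-join above aborts a committing update/delete when some other
transaction committed a write to the same db/table/partition after the committing transaction
opened. An in-memory sketch of that rule (the WsEntry shape is made up for illustration, and
the operation-type filter approximates rather than transcribes the SQL):

    // Sketch only: the overlap rule the WRITE_SET self-join encodes.
    static final class WsEntry {
      long txnId;                    // ws_txnid
      long commitId;                 // ws_commit_id
      String db, table, partition;   // partition is null for unpartitioned tables
      char op;                       // 'u' = update, 'd' = delete, 'i' = insert
    }

    static boolean wwConflict(WsEntry committed, WsEntry cur) {
      boolean sameEntity = committed.db.equals(cur.db)
          && committed.table.equals(cur.table)
          && java.util.Objects.equals(committed.partition, cur.partition);
      // committed got its commit id after 'cur' opened, so 'cur' never saw that write
      boolean overlapped = committed.commitId > cur.txnId;
      return sameEntity && overlapped && (committed.op == 'u' || committed.op == 'd');
    }
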
http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/ql/src/java/org/apache/hadoop/hive/ql/Context.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Context.java b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
index 838d73e..4355d21 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Context.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
@@ -25,6 +25,7 @@ import java.net.URI;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
+import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
@@ -44,13 +45,13 @@ import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.TaskRunner;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
-import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.lockmgr.DbTxnManager.Heartbeater;
import org.apache.hadoop.hive.ql.lockmgr.HiveLock;
import org.apache.hadoop.hive.ql.lockmgr.HiveLockObj;
import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
import org.apache.hadoop.hive.ql.lockmgr.LockException;
import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.ASTNode;
import org.apache.hadoop.hive.ql.parse.ExplainConfiguration;
import org.apache.hadoop.hive.ql.parse.ExplainConfiguration.AnalyzeState;
import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
@@ -107,11 +108,6 @@ public class Context {
// Transaction manager for this query
protected HiveTxnManager hiveTxnManager;
- // Used to track what type of acid operation (insert, update, or delete) we are doing. Useful
- // since we want to change where bucket columns are accessed in some operators and
- // optimizations when doing updates and deletes.
- private AcidUtils.Operation acidOperation = AcidUtils.Operation.NOT_ACID;
-
private boolean needLockMgr;
private AtomicInteger sequencer = new AtomicInteger();
@@ -129,6 +125,53 @@ public class Context {
private Heartbeater heartbeater;
private boolean skipTableMasking;
+ /**
+ * This determines the prefix of the
+ * {@link org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.Phase1Ctx#dest}
+ * name for a given subtree of the AST. Most of the time there is only one destination in a
+ * given tree, but a multi-insert has several, and a multi-insert representing MERGE must use
+ * different prefixes to encode the purpose of the different Insert branches.
+ */
+ private Map<ASTNode, DestClausePrefix> tree2DestNamePrefix;
+ public enum DestClausePrefix {
+ INSERT("insclause-"), UPDATE("updclause-"), DELETE("delclause-");
+ private final String prefix;
+ DestClausePrefix(String prefix) {
+ this.prefix = prefix;
+ }
+ public String toString() {
+ return prefix;
+ }
+ }
+ /**
+ * The prefix is always resolved relative to a given ASTNode
+ */
+ public DestClausePrefix getDestNamePrefix(ASTNode curNode) {
+ //if there is no mapping, we want to default to "old" naming
+ assert curNode != null : "must supply curNode";
+ if(tree2DestNamePrefix == null || tree2DestNamePrefix.isEmpty()) {
+ return DestClausePrefix.INSERT;
+ }
+ do {
+ DestClausePrefix prefix = tree2DestNamePrefix.get(curNode);
+ if(prefix != null) {
+ return prefix;
+ }
+ curNode = (ASTNode) curNode.parent;
+ } while(curNode != null);
+ return DestClausePrefix.INSERT;
+ }
+ /**
+ * Will make SemanticAnalyzer.Phase1Ctx#dest in subtree rooted at 'tree' use 'prefix'
+ * @param tree
+ * @return previous prefix for 'tree' or null
+ */
+ public DestClausePrefix addDestNamePrefix(ASTNode tree, DestClausePrefix prefix) {
+ if(tree2DestNamePrefix == null) {
+ tree2DestNamePrefix = new IdentityHashMap<>();
+ }
+ return tree2DestNamePrefix.put(tree, prefix);
+ }
public Context(Configuration conf) throws IOException {
this(conf, generateExecutionId());
@@ -760,14 +803,6 @@ public class Context {
this.tryCount = tryCount;
}
- public void setAcidOperation(AcidUtils.Operation op) {
- acidOperation = op;
- }
-
- public AcidUtils.Operation getAcidOperation() {
- return acidOperation;
- }
-
public String getCboInfo() {
return cboInfo;
}
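
A sketch of how a rewriting analyzer might use the new mapping for a three-branch MERGE (the
branch nodes are hypothetical parameters; the generated names follow the prefix-plus-counter
scheme visible in the SemanticAnalyzer hunk below):

    // Sketch only: tag each rewritten MERGE branch with its purpose.
    void tagMergeBranches(Context ctx, ASTNode updateBranch, ASTNode deleteBranch) {
      ctx.addDestNamePrefix(updateBranch, Context.DestClausePrefix.UPDATE);
      ctx.addDestNamePrefix(deleteBranch, Context.DestClausePrefix.DELETE);
      // nodes with no mapping (and no mapped ancestor) fall back to INSERT, so
      // Phase 1 produces e.g. "updclause-0", "delclause-1", "insclause-2"
    }
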
http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 923ef08..b77948b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -1111,7 +1111,9 @@ public class Driver implements CommandProcessor {
if (haveAcidWrite()) {
for (FileSinkDesc desc : acidSinks) {
desc.setTransactionId(txnMgr.getCurrentTxnId());
- desc.setStatementId(txnMgr.getStatementId());
+ //it's possible to have > 1 FileSink writing to the same table/partition
+ //e.g. Merge stmt, multi-insert stmt when mixing DP and SP writes
+ desc.setStatementId(txnMgr.getWriteIdAndIncrement());
}
}
/*Note, we have to record snapshot after lock acquisition to prevent lost update problem
http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
index 7ed3907..97fcd55 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
@@ -398,7 +398,7 @@ public enum ErrorMsg {
DISTINCT_NOT_SUPPORTED(10285, "Distinct keyword is not supported in current context"),
NONACID_COMPACTION_NOT_SUPPORTED(10286, "Compaction is not allowed on non-ACID table {0}.{1}", true),
- UPDATEDELETE_PARSE_ERROR(10290, "Encountered parse error while parsing rewritten update or " +
+ UPDATEDELETE_PARSE_ERROR(10290, "Encountered parse error while parsing rewritten merge/update or " +
"delete query"),
UPDATEDELETE_IO_ERROR(10291, "Encountered I/O error while parsing rewritten update or " +
"delete query"),
@@ -456,6 +456,10 @@ public enum ErrorMsg {
REPLACE_MATERIALIZED_WITH_VIEW(10401, "Attempt to replace materialized view {0} with view", true),
UPDATE_DELETE_VIEW(10402, "You cannot update or delete records in a view"),
MATERIALIZED_VIEW_DEF_EMPTY(10403, "Query for the materialized view rebuild could not be retrieved"),
+ MERGE_PREDIACTE_REQUIRED(10404, "MERGE statement with both UPDATE and DELETE clauses " +
+ "requires \"AND <boolean>\" on the 1st WHEN MATCHED clause of <{0}>", true),
+ MERGE_TOO_MANY_DELETE(10405, "MERGE statement can have at most 1 WHEN MATCHED ... DELETE clause: <{0}>", true),
+ MERGE_TOO_MANY_UPDATE(10406, "MERGE statement can have at most 1 WHEN MATCHED ... UPDATE clause: <{0}>", true),
//========================== 20000 range starts here ========================//
SCRIPT_INIT_ERROR(20000, "Unable to initialize custom script."),
SCRIPT_IO_ERROR(20001, "An error occurred while reading or writing to your custom script. "
@@ -722,6 +726,11 @@ public enum ErrorMsg {
sb.append(":");
sb.append(getCharPositionInLine(tree));
}
+ public static String renderPosition(ASTNode n) {
+ StringBuilder sb = new StringBuilder();
+ ErrorMsg.renderPosition(sb, n);
+ return sb.toString();
+ }
public String getMsg(Tree tree) {
return getMsg((ASTNode) tree);
http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
index 8265af4..349f115 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
@@ -206,6 +206,10 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
}
Context ctx = driverContext.getCtx();
+ if(ctx.getHiveTxnManager().supportsAcid()) {
+ //Acid LM doesn't maintain getOutputLockObjects(); this 'if' just makes it more explicit
+ return;
+ }
HiveLockManager lockMgr = ctx.getHiveTxnManager().getLockManager();
WriteEntity output = ctx.getLoadTableOutputMap().get(ltd);
List<HiveLockObj> lockObjects = ctx.getOutputLockObjects().get(output);
http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/ql/src/java/org/apache/hadoop/hive/ql/hooks/ReadEntity.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/ReadEntity.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/ReadEntity.java
index 3d7de69..b805904 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/ReadEntity.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/ReadEntity.java
@@ -53,6 +53,9 @@ public class ReadEntity extends Entity implements Serializable {
// important because in that case we shouldn't acquire a lock for it or authorize the read.
// These will be handled by the output to the table instead.
private boolean isUpdateOrDelete = false;
+ //https://issues.apache.org/jira/browse/HIVE-15048
+ public transient boolean isFromTopLevelQuery = true;
+
// For views, the entities can be nested - by default, entities are at the top level
// Must be deterministic order set for consistent q-test output across Java versions
http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/ql/src/java/org/apache/hadoop/hive/ql/hooks/WriteEntity.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/WriteEntity.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/WriteEntity.java
index 9e18638..da8c1e2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/WriteEntity.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/WriteEntity.java
@@ -38,6 +38,7 @@ public class WriteEntity extends Entity implements Serializable {
private static final Logger LOG = LoggerFactory.getLogger(WriteEntity.class);
private boolean isTempURI = false;
+ private transient boolean isDynamicPartitionWrite = false;
public static enum WriteType {
DDL_EXCLUSIVE, // for use in DDL statements that require an exclusive lock,
@@ -221,5 +222,14 @@ public class WriteEntity extends Entity implements Serializable {
throw new RuntimeException("Unknown operation " + op.toString());
}
}
+ public boolean isDynamicPartitionWrite() {
+ return isDynamicPartitionWrite;
+ }
+ public void setDynamicPartitionWrite(boolean t) {
+ isDynamicPartitionWrite = t;
+ }
+ public String toDetailedString() {
+ return toString() + " Type=" + getTyp() + " WriteType=" + getWriteType() + " isDP=" + isDynamicPartitionWrite();
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
index da7505b..867e445 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
@@ -321,6 +321,7 @@ public class DbTxnManager extends HiveTxnManagerImpl {
if(t != null && AcidUtils.isAcidTable(t)) {
compBuilder.setIsAcid(true);
}
+ compBuilder.setIsDynamicPartitionWrite(output.isDynamicPartitionWrite());
LockComponent comp = compBuilder.build();
LOG.debug("Adding lock component to lock request " + comp.toString());
rqstBuilder.addLockComponent(comp);
@@ -335,9 +336,6 @@ public class DbTxnManager extends HiveTxnManagerImpl {
}
List<HiveLock> locks = new ArrayList<HiveLock>(1);
- if(isTxnOpen()) {
- statementId++;
- }
LockState lockState = lockMgr.lock(rqstBuilder.build(), queryId, isBlocking, locks);
ctx.setHiveLocks(locks);
return lockState;
@@ -650,8 +648,9 @@ public class DbTxnManager extends HiveTxnManagerImpl {
return txnId;
}
@Override
- public int getStatementId() {
- return statementId;
+ public int getWriteIdAndIncrement() {
+ assert isTxnOpen();
+ return statementId++;
}
private static long getHeartbeatInterval(Configuration conf) throws LockException {
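
Condensed, the dynamic-partition flag makes three hops in this patch, from query plan to
metastore (output and compBuilder as in the hunks above; step 3 names the method this mail's
TxnHandler hunk modifies):

    // Sketch only: the flag's end-to-end path.
    output.setDynamicPartitionWrite(true);       // 1. SemanticAnalyzer marks DP destinations
    compBuilder.setIsDynamicPartitionWrite(      // 2. DbTxnManager copies it into the lock request
        output.isDynamicPartitionWrite());
    // 3. TxnHandler skips the TXN_COMPONENTS insert for such components and waits for
    //    addDynamicPartitions() to record the partitions actually written.
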
http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
index 1d071a8..f001f59 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
@@ -63,7 +63,7 @@ class DummyTxnManager extends HiveTxnManagerImpl {
}
@Override
- public int getStatementId() {
+ public int getWriteIdAndIncrement() {
return 0;
}
@Override
http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
index 9b4a97f..5b9ad60 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
@@ -210,8 +210,9 @@ public interface HiveTxnManager {
long getCurrentTxnId();
/**
- * 0..N Id of current statement within currently opened transaction
+ * Should be thought of as a unique write operation ID within a given txn (at QueryPlan level).
+ * Each statement writing data within a multi-statement txn should have a unique WriteId.
+ * Even a single statement (e.g. Merge, multi-insert) may generate several writes.
*/
- int getStatementId();
-
+ int getWriteIdAndIncrement();
}
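
The rename makes the contract explicit: every call hands out the next write id inside the open
transaction. A hedged illustration of why distinct ids matter (txnMgr is an open
HiveTxnManager; the delta-directory naming is an assumption about the acid layout, not
something this patch defines):

    // Sketch only: three acid FileSinks in one MERGE, same txn, same partition.
    int updId = txnMgr.getWriteIdAndIncrement();  // 0 - update branch
    int delId = txnMgr.getWriteIdAndIncrement();  // 1 - delete branch
    int insId = txnMgr.getWriteIdAndIncrement();  // 2 - insert branch
    // If delta dirs are named delta_<txnid>_<txnid>_<stmtId>, the branches write
    // delta_..._0000, _0001 and _0002 instead of colliding in one directory.
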
http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java
index da261bb..8f40998 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java
@@ -401,9 +401,12 @@ public class BucketingSortingReduceSinkOptimizer extends Transform {
return null;
}
+ assert fsOp.getConf().getWriteType() == rsOp.getConf().getWriteType() :
+ "WriteType mismatch. fsOp is " + fsOp.getConf().getWriteType() +
+ "; rsOp is " + rsOp.getConf().getWriteType();
// Don't do this optimization with updates or deletes
- if (pGraphContext.getContext().getAcidOperation() == AcidUtils.Operation.UPDATE ||
- pGraphContext.getContext().getAcidOperation() == AcidUtils.Operation.DELETE){
+ if (fsOp.getConf().getWriteType() == AcidUtils.Operation.UPDATE ||
+ fsOp.getConf().getWriteType() == AcidUtils.Operation.DELETE) {
return null;
}
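
Reading the write type from the FileSinkDesc rather than the removed per-query
Context.getAcidOperation() is what makes this correct for MERGE: a single plan can now carry
INSERT, UPDATE and DELETE sinks at once, so the check must be per branch:

    // Sketch only: the decision is per FileSink operator, not per query.
    AcidUtils.Operation op = fsOp.getConf().getWriteType();
    if (op == AcidUtils.Operation.UPDATE || op == AcidUtils.Operation.DELETE) {
      return null;   // skip the optimization for this branch only
    }
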
http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/ql/src/java/org/apache/hadoop/hive/ql/parse/ASTNode.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ASTNode.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ASTNode.java
index 62f9d14..0e6d903 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ASTNode.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ASTNode.java
@@ -42,6 +42,7 @@ public class ASTNode extends CommonTree implements Node,Serializable {
private transient ASTNode rootNode;
private transient boolean isValidASTStr;
private transient boolean visited = false;
+ transient String matchedText;
public ASTNode() {
}
@@ -347,4 +348,11 @@ public class ASTNode extends CommonTree implements Node,Serializable {
return rootNode.getMemoizedSubString(startIndx, endIndx);
}
+ /**
+ * The string that generated this node.
+ * Only set for a node if parser grammar sets it for a particular rule
+ */
+ public String getMatchedText() {
+ return matchedText;
+ }
}
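
matchedText gives a rewriter access to the original source of a subtree, which matters when
regenerating SQL text. A small sketch (the method and surrounding query fragment are made up;
the @after actions that populate the field appear in the grammar hunks below):

    // Sketch only: splice the user's ON condition into rewritten SQL.
    String rewriteJoin(ASTNode onExpr) {       // e.g. the expression child of TOK_MERGE
      String cond = onExpr.getMatchedText();   // set by the grammar's @after actions
      return "... JOIN source s ON " + cond + " ...";  // illustrative fragment
    }
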
http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/ql/src/java/org/apache/hadoop/hive/ql/parse/FromClauseParser.g
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/FromClauseParser.g b/ql/src/java/org/apache/hadoop/hive/ql/parse/FromClauseParser.g
index 26aca96..f8adb38 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/FromClauseParser.g
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/FromClauseParser.g
@@ -19,7 +19,7 @@ parser grammar FromClauseParser;
options
{
output=AST;
-ASTLabelType=CommonTree;
+ASTLabelType=ASTNode;
backtrack=false;
k=3;
}
@@ -142,7 +142,7 @@ tableAlias
fromSource
@init { gParent.pushMsg("from source", state); }
-@after { gParent.popMsg(state); }
+@after { $fromSource.tree.matchedText = $fromSource.text; gParent.popMsg(state); }
:
(LPAREN KW_VALUES) => fromSource0
| (LPAREN) => LPAREN joinSource RPAREN -> joinSource
@@ -278,7 +278,7 @@ searchCondition
// INSERT INTO <table> (col1,col2,...) SELECT * FROM (VALUES(1,2,3),(4,5,6),...) as Foo(a,b,c)
valueRowConstructor
@init { gParent.pushMsg("value row constructor", state); }
-@after { gParent.popMsg(state); }
+@after { $valueRowConstructor.tree.matchedText = $valueRowConstructor.text; gParent.popMsg(state); }
:
LPAREN precedenceUnaryPrefixExpression (COMMA precedenceUnaryPrefixExpression)* RPAREN -> ^(TOK_VALUE_ROW precedenceUnaryPrefixExpression+)
;
http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g
index 63c32a8..b467c51 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g
@@ -336,6 +336,8 @@ KW_KEY: 'KEY';
KW_ABORT: 'ABORT';
KW_EXTRACT: 'EXTRACT';
KW_FLOOR: 'FLOOR';
+KW_MERGE: 'MERGE';
+KW_MATCHED: 'MATCHED';
// Operators
// NOTE: if you add a new function/operator, add it to sysFuncNames so that describe function _FUNC_ will work.
http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
index 8aa39b0..bd53a36 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
@@ -20,7 +20,7 @@ options
{
tokenVocab=HiveLexer;
output=AST;
-ASTLabelType=CommonTree;
+ASTLabelType=ASTNode;
backtrack=false;
k=3;
}
@@ -384,6 +384,11 @@ TOK_ROLLBACK;
TOK_SET_AUTOCOMMIT;
TOK_CACHE_METADATA;
TOK_ABORT_TRANSACTIONS;
+TOK_MERGE;
+TOK_MATCHED;
+TOK_NOT_MATCHED;
+TOK_UPDATE;
+TOK_DELETE;
}
@@ -737,6 +742,7 @@ execStatement
| deleteStatement
| updateStatement
| sqlTransactionStatement
+ | mergeStatement
;
loadStatement
@@ -2670,3 +2676,55 @@ abortTransactionStatement
:
KW_ABORT KW_TRANSACTIONS ( Number )+ -> ^(TOK_ABORT_TRANSACTIONS ( Number )+)
;
+
+
+/*
+BEGIN SQL Merge statement
+*/
+mergeStatement
+@init { pushMsg("MERGE statement", state); }
+@after { popMsg(state); }
+ :
+ KW_MERGE KW_INTO tableName (KW_AS? identifier)? KW_USING fromSource KW_ON expression whenClauses ->
+ ^(TOK_MERGE ^(TOK_TABREF tableName identifier?) fromSource expression whenClauses)
+ ;
+/*
+Allow 0,1 or 2 WHEN MATCHED clauses and 0 or 1 WHEN NOT MATCHED
+Each WHEN clause may have AND <boolean predicate>.
+If 2 WHEN MATCHED clauses are present, 1 must be UPDATE, the other DELETE, and the 1st one
+must have AND <boolean predicate>
+*/
+whenClauses
+ :
+ (whenMatchedAndClause|whenMatchedThenClause)* whenNotMatchedClause?
+ ;
+whenNotMatchedClause
+@init { pushMsg("WHEN NOT MATCHED clause", state); }
+@after { popMsg(state); }
+ :
+ KW_WHEN KW_NOT KW_MATCHED (KW_AND expression)? KW_THEN KW_INSERT KW_VALUES valueRowConstructor ->
+ ^(TOK_NOT_MATCHED ^(TOK_INSERT valueRowConstructor) expression?)
+ ;
+whenMatchedAndClause
+@init { pushMsg("WHEN MATCHED AND clause", state); }
+@after { popMsg(state); }
+ :
+ KW_WHEN KW_MATCHED KW_AND expression KW_THEN updateOrDelete ->
+ ^(TOK_MATCHED updateOrDelete expression)
+ ;
+whenMatchedThenClause
+@init { pushMsg("WHEN MATCHED THEN clause", state); }
+@after { popMsg(state); }
+ :
+ KW_WHEN KW_MATCHED KW_THEN updateOrDelete ->
+ ^(TOK_MATCHED updateOrDelete)
+ ;
+updateOrDelete
+ :
+ KW_UPDATE setColumnsClause -> ^(TOK_UPDATE setColumnsClause)
+ |
+ KW_DELETE -> TOK_DELETE
+ ;
+/*
+END SQL Merge statement
+*/
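
For reference, a statement shape these rules accept, written as a parser-test style string
(all identifiers are made up); note the first WHEN MATCHED clause carries the AND predicate
required when both UPDATE and DELETE are present:

    // Sketch only: a MERGE the grammar above parses (cf. TestMergeStatement).
    String mergeSql =
        "MERGE INTO target t USING source s ON t.id = s.id " +
        "WHEN MATCHED AND s.flag = 'del' THEN DELETE " +
        "WHEN MATCHED THEN UPDATE SET val = s.val " +
        "WHEN NOT MATCHED THEN INSERT VALUES (s.id, s.val)";
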
http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g b/ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g
index 2e40aa5..aa92739 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g
@@ -19,7 +19,7 @@ parser grammar IdentifiersParser;
options
{
output=AST;
-ASTLabelType=CommonTree;
+ASTLabelType=ASTNode;
backtrack=false;
k=3;
}
@@ -383,7 +383,7 @@ intervalQualifiers
expression
@init { gParent.pushMsg("expression specification", state); }
-@after { gParent.popMsg(state); }
+@after { $expression.tree.matchedText = $expression.text; gParent.popMsg(state); }
:
precedenceOrExpression
;
@@ -459,6 +459,7 @@ precedencePlusOperator
;
precedencePlusExpression
+@after { $precedencePlusExpression.tree.matchedText = $precedencePlusExpression.text; }
:
precedenceStarExpression (precedencePlusOperator^ precedenceStarExpression)*
;
@@ -759,6 +760,8 @@ nonReserved
| KW_VALIDATE
| KW_NOVALIDATE
| KW_KEY
+ | KW_MERGE
+ | KW_MATCHED
;
//The following SQL2011 reserved keywords are used as function name only, but not as identifiers.
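
Because the new keywords are added to nonReserved, existing schemas that use these words as
identifiers keep parsing; an illustrative example:

    // Sketch only: MERGE/MATCHED remain legal identifiers since they are nonReserved.
    String stillParses = "create table merge (matched int)";
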
http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java
index 3a0402e..f549dff 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java
@@ -181,6 +181,9 @@ public class QBParseInfo {
insertIntoTables.put(fullName.toLowerCase(), ast);
}
+ /**
+ * See also {@link #getInsertOverwriteTables()}
+ */
public boolean isInsertIntoTable(String dbName, String table) {
String fullName = dbName + "." + table;
return insertIntoTables.containsKey(fullName.toLowerCase());
@@ -188,6 +191,7 @@ public class QBParseInfo {
/**
* Check if a table is in the list to be inserted into
+ * See also {@link #getInsertOverwriteTables()}
* @param fullTableName table name in dbname.tablename format
* @return
*/
@@ -640,6 +644,9 @@ public class QBParseInfo {
this.isPartialScanAnalyzeCommand = isPartialScanAnalyzeCommand;
}
+ /**
+ * See also {@link #isInsertIntoTable(String)}
+ */
public Map<String, ASTNode> getInsertOverwriteTables() {
return insertOverwriteTables;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/ql/src/java/org/apache/hadoop/hive/ql/parse/SelectClauseParser.g
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SelectClauseParser.g b/ql/src/java/org/apache/hadoop/hive/ql/parse/SelectClauseParser.g
index 3c6fa39..2c2e856 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SelectClauseParser.g
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SelectClauseParser.g
@@ -19,7 +19,7 @@ parser grammar SelectClauseParser;
options
{
output=AST;
-ASTLabelType=CommonTree;
+ASTLabelType=ASTNode;
backtrack=false;
k=3;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 17dfd03..8f5542b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -65,6 +65,7 @@ import org.apache.hadoop.hive.conf.HiveConf.StrictChecks;
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.AddDynamicPartitions;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.MetaException;
@@ -72,6 +73,7 @@ import org.apache.hadoop.hive.metastore.api.Order;
import org.apache.hadoop.hive.metastore.api.SQLForeignKey;
import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
import org.apache.hadoop.hive.ql.CompilationOpContext;
+import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.QueryProperties;
import org.apache.hadoop.hive.ql.QueryState;
@@ -679,17 +681,8 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
this.ast = newAST;
}
- /**
- * Goes though the tabref tree and finds the alias for the table. Once found,
- * it records the table name-> alias association in aliasToTabs. It also makes
- * an association from the alias to the table AST in parse info.
- *
- * @return the alias of the table
- */
- private String processTable(QB qb, ASTNode tabref) throws SemanticException {
- // For each table reference get the table name
- // and the alias (if alias is not present, the table name
- // is used as an alias)
+ int[] findTabRefIdxs(ASTNode tabref) {
+ assert tabref.getType() == HiveParser.TOK_TABREF;
int aliasIndex = 0;
int propsIndex = -1;
int tsampleIndex = -1;
@@ -706,11 +699,12 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
aliasIndex = index;
}
}
-
+ return new int[] {aliasIndex, propsIndex, tsampleIndex, ssampleIndex};
+ }
+ String findSimpleTableName(ASTNode tabref, int aliasIndex) {
+ assert tabref.getType() == HiveParser.TOK_TABREF;
ASTNode tableTree = (ASTNode) (tabref.getChild(0));
- String tabIdName = getUnescapedName(tableTree).toLowerCase();
-
String alias;
if (aliasIndex != 0) {
alias = unescapeIdentifier(tabref.getChild(aliasIndex).getText());
@@ -718,6 +712,30 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
else {
alias = getUnescapedUnqualifiedTableName(tableTree);
}
+ return alias;
+ }
+ /**
+ * Goes through the tabref tree and finds the alias for the table. Once found,
+ * it records the table name-> alias association in aliasToTabs. It also makes
+ * an association from the alias to the table AST in parse info.
+ *
+ * @return the alias of the table
+ */
+ private String processTable(QB qb, ASTNode tabref) throws SemanticException {
+ // For each table reference get the table name
+ // and the alias (if alias is not present, the table name
+ // is used as an alias)
+ int[] indexes = findTabRefIdxs(tabref);
+ int aliasIndex = indexes[0];
+ int propsIndex = indexes[1];
+ int tsampleIndex = indexes[2];
+ int ssampleIndex = indexes[3];
+
+ ASTNode tableTree = (ASTNode) (tabref.getChild(0));
+
+ String tabIdName = getUnescapedName(tableTree).toLowerCase();
+
+ String alias = findSimpleTableName(tabref, aliasIndex);
if (propsIndex >= 0) {
Tree propsAST = tabref.getChild(propsIndex);
@@ -1423,7 +1441,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
doPhase1GetColumnAliasesFromSelect(ast, qbp);
qbp.setAggregationExprsForClause(ctx_1.dest, aggregations);
qbp.setDistinctFuncExprsForClause(ctx_1.dest,
- doPhase1GetDistinctFuncExprs(aggregations));
+ doPhase1GetDistinctFuncExprs(aggregations));
break;
case HiveParser.TOK_WHERE:
@@ -1438,7 +1456,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
qbp.addInsertIntoTable(tab_name, ast);
case HiveParser.TOK_DESTINATION:
- ctx_1.dest = "insclause-" + ctx_1.nextNum;
+ ctx_1.dest = this.ctx.getDestNamePrefix(ast).toString() + ctx_1.nextNum;
ctx_1.nextNum++;
boolean isTmpFileDest = false;
if (ast.getChildCount() > 0 && ast.getChild(0) instanceof ASTNode) {
@@ -1945,25 +1963,6 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
throw new SemanticException(ErrorMsg.INVALID_TABLE.getMsg(alias));
}
}
-
- // Disallow INSERT INTO on bucketized tables
- boolean isAcid = AcidUtils.isAcidTable(tab);
- boolean isTableWrittenTo = qb.getParseInfo().isInsertIntoTable(tab.getDbName(), tab.getTableName());
- if (isTableWrittenTo &&
- tab.getNumBuckets() > 0 && !isAcid) {
- throw new SemanticException(ErrorMsg.INSERT_INTO_BUCKETIZED_TABLE.
- getMsg("Table: " + tabName));
- }
- // Disallow update and delete on non-acid tables
- if ((updating() || deleting()) && !isAcid && isTableWrittenTo) {
- //isTableWrittenTo: delete from acidTbl where a in (select id from nonAcidTable)
- //so only assert this if we are actually writing to this table
- // Whether we are using an acid compliant transaction manager has already been caught in
- // UpdateDeleteSemanticAnalyzer, so if we are updating or deleting and getting nonAcid
- // here, it means the table itself doesn't support it.
- throw new SemanticException(ErrorMsg.ACID_OP_ON_NONACID_TABLE, tabName);
- }
-
if (tab.isView()) {
if (qb.getParseInfo().isAnalyzeCommand()) {
throw new SemanticException(ErrorMsg.ANALYZE_VIEW.getMsg());
@@ -2086,6 +2085,21 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
.getMsg(ast, "The class is " + outputFormatClass.toString()));
}
+ boolean isTableWrittenTo = qb.getParseInfo().isInsertIntoTable(ts.tableHandle.getDbName(),
+ ts.tableHandle.getTableName());
+ isTableWrittenTo |= (qb.getParseInfo().getInsertOverwriteTables().
+ get(getUnescapedName((ASTNode) ast.getChild(0), ts.tableHandle.getDbName())) != null);
+ assert isTableWrittenTo :
+ "Inconsistent data structure detected: we are writing to " + ts.tableHandle + " in " +
+ name + " but it's not in isInsertIntoTable() or getInsertOverwriteTables()";
+ // Disallow update and delete on non-acid tables
+ boolean isAcid = AcidUtils.isAcidTable(ts.tableHandle);
+ if ((updating(name) || deleting(name)) && !isAcid) {
+ // Whether we are using an acid compliant transaction manager has already been caught in
+ // UpdateDeleteSemanticAnalyzer, so if we are updating or deleting and getting nonAcid
+ // here, it means the table itself doesn't support it.
+ throw new SemanticException(ErrorMsg.ACID_OP_ON_NONACID_TABLE, ts.tableName);
+ }
// TableSpec ts is derived from the query (user specified),
// which means the user didn't specify partitions in their query,
// but whether the table itself is partitioned is not known.
@@ -6421,7 +6435,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
if (dest_tab.getNumBuckets() > 0) {
enforceBucketing = true;
- if (updating() || deleting()) {
+ if (updating(dest) || deleting(dest)) {
partnCols = getPartitionColsFromBucketColsForUpdateDelete(input, true);
} else {
partnCols = getPartitionColsFromBucketCols(dest, qb, dest_tab, table_desc, input, true);
@@ -6472,7 +6486,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
nullOrder.append(sortOrder == BaseSemanticAnalyzer.HIVE_COLUMN_ORDER_ASC ? 'a' : 'z');
}
input = genReduceSinkPlan(input, partnCols, sortCols, order.toString(), nullOrder.toString(),
- maxReducers, (AcidUtils.isAcidTable(dest_tab) ? getAcidType() : AcidUtils.Operation.NOT_ACID));
+ maxReducers, (AcidUtils.isAcidTable(dest_tab) ? getAcidType(dest) : AcidUtils.Operation.NOT_ACID));
reduceSinkOperatorsAddedByEnforceBucketingSorting.add((ReduceSinkOperator)input.getParentOperators().get(0));
ctx.setMultiFileSpray(multiFileSpray);
ctx.setNumFiles(numFiles);
@@ -6488,7 +6502,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
if ((dest_tab.getNumBuckets() > 0)) {
enforceBucketing = true;
- if (updating() || deleting()) {
+ if (updating(dest) || deleting(dest)) {
partnColsNoConvert = getPartitionColsFromBucketColsForUpdateDelete(input, false);
} else {
partnColsNoConvert = getPartitionColsFromBucketCols(dest, qb, dest_tab, table_desc, input,
@@ -6646,7 +6660,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
if (!isNonNativeTable) {
AcidUtils.Operation acidOp = AcidUtils.Operation.NOT_ACID;
if (destTableIsAcid) {
- acidOp = getAcidType(table_desc.getOutputFileFormatClass());
+ acidOp = getAcidType(table_desc.getOutputFileFormatClass(), dest);
checkAcidConstraints(qb, table_desc, dest_tab);
}
ltd = new LoadTableDesc(queryTmpdir, table_desc, dpCtx, acidOp);
@@ -6666,7 +6680,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
// in the case of DP, we will register WriteEntity in MoveTask when the
// list of dynamically created partitions are known.
if ((dpCtx == null || dpCtx.getNumDPCols() == 0)) {
- output = new WriteEntity(dest_tab, determineWriteType(ltd, isNonNativeTable));
+ output = new WriteEntity(dest_tab, determineWriteType(ltd, isNonNativeTable, dest));
if (!outputs.add(output)) {
throw new SemanticException(ErrorMsg.OUTPUT_SPECIFIED_MULTIPLE_TIMES
.getMsg(dest_tab.getTableName()));
@@ -6675,8 +6689,9 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
if ((dpCtx != null) && (dpCtx.getNumDPCols() >= 0)) {
// No static partition specified
if (dpCtx.getNumSPCols() == 0) {
- output = new WriteEntity(dest_tab, determineWriteType(ltd, isNonNativeTable), false);
+ output = new WriteEntity(dest_tab, determineWriteType(ltd, isNonNativeTable, dest), false);
outputs.add(output);
+ output.setDynamicPartitionWrite(true);
}
// part of the partition specified
// Create a DummyPartition in this case. Since, the metastore does not store partial
@@ -6689,7 +6704,8 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
new DummyPartition(dest_tab, dest_tab.getDbName()
+ "@" + dest_tab.getTableName() + "@" + ppath,
partSpec);
- output = new WriteEntity(p, getWriteType(), false);
+ output = new WriteEntity(p, getWriteType(dest), false);
+ output.setDynamicPartitionWrite(true);
outputs.add(output);
} catch (HiveException e) {
throw new SemanticException(e.getMessage(), e);
@@ -6753,7 +6769,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
dest_part.isStoredAsSubDirectories(), conf);
AcidUtils.Operation acidOp = AcidUtils.Operation.NOT_ACID;
if (destTableIsAcid) {
- acidOp = getAcidType(table_desc.getOutputFileFormatClass());
+ acidOp = getAcidType(table_desc.getOutputFileFormatClass(), dest);
checkAcidConstraints(qb, table_desc, dest_tab);
}
ltd = new LoadTableDesc(queryTmpdir, table_desc, dest_part.getSpec(), acidOp);
@@ -6762,9 +6778,9 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
ltd.setLbCtx(lbCtx);
loadTableWork.add(ltd);
-
if (!outputs.add(new WriteEntity(dest_part,
- determineWriteType(ltd, dest_tab.isNonNative())))) {
+ determineWriteType(ltd, dest_tab.isNonNative(), dest)))) {
+
throw new SemanticException(ErrorMsg.OUTPUT_SPECIFIED_MULTIPLE_TIMES
.getMsg(dest_tab.getTableName() + "@" + dest_part.getName()));
}
@@ -6925,7 +6941,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
ArrayList<ColumnInfo> vecCol = new ArrayList<ColumnInfo>();
- if (updating() || deleting()) {
+ if (updating(dest) || deleting(dest)) {
vecCol.add(new ColumnInfo(VirtualColumn.ROWID.getName(), VirtualColumn.ROWID.getTypeInfo(),
"", true));
} else {
@@ -6978,8 +6994,8 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
// If this is an insert, update, or delete on an ACID table then mark that so the
// FileSinkOperator knows how to properly write to it.
if (destTableIsAcid) {
- AcidUtils.Operation wt = updating() ? AcidUtils.Operation.UPDATE :
- (deleting() ? AcidUtils.Operation.DELETE : AcidUtils.Operation.INSERT);
+ AcidUtils.Operation wt = updating(dest) ? AcidUtils.Operation.UPDATE :
+ (deleting(dest) ? AcidUtils.Operation.DELETE : AcidUtils.Operation.INSERT);
fileSinkDesc.setWriteType(wt);
acidFileSinks.add(fileSinkDesc);
}
@@ -7150,7 +7166,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
}
// The numbers of input columns and output columns should match for regular query
- if (!updating() && !deleting() && inColumnCnt != outColumnCnt) {
+ if (!updating(dest) && !deleting(dest) && inColumnCnt != outColumnCnt) {
String reason = "Table " + dest + " has " + outColumnCnt
+ " columns, but query has " + inColumnCnt + " columns.";
throw new SemanticException(ErrorMsg.TARGET_TABLE_COLUMN_MISMATCH.getMsg(
@@ -7169,18 +7185,18 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
MetadataTypedColumnsetSerDe.class);
boolean isLazySimpleSerDe = table_desc.getDeserializerClass().equals(
LazySimpleSerDe.class);
- if (!isMetaDataSerDe && !deleting()) {
+ if (!isMetaDataSerDe && !deleting(dest)) {
// If we're updating, add the ROW__ID expression, then make the following column accesses
// offset by 1 so that we don't try to convert the ROW__ID
- if (updating()) {
+ if (updating(dest)) {
expressions.add(new ExprNodeColumnDesc(rowFields.get(0).getType(),
rowFields.get(0).getInternalName(), "", true));
}
// here only deals with non-partition columns. We deal with partition columns next
for (int i = 0; i < columnNumber; i++) {
- int rowFieldsOffset = updating() ? i + 1 : i;
+ int rowFieldsOffset = updating(dest) ? i + 1 : i;
ObjectInspector tableFieldOI = tableFields.get(i)
.getFieldObjectInspector();
TypeInfo tableFieldTypeInfo = TypeInfoUtils
@@ -7218,7 +7234,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
// deal with dynamic partition columns: convert ExprNodeDesc type to String??
if (dynPart && dpCtx != null && dpCtx.getNumDPCols() > 0) {
// DP columns starts with tableFields.size()
- for (int i = tableFields.size() + (updating() ? 1 : 0); i < rowFields.size(); ++i) {
+ for (int i = tableFields.size() + (updating(dest) ? 1 : 0); i < rowFields.size(); ++i) {
TypeInfo rowFieldTypeInfo = rowFields.get(i).getType();
ExprNodeDesc column = new ExprNodeColumnDesc(
rowFieldTypeInfo, rowFields.get(i).getInternalName(), "", true);
@@ -10525,7 +10541,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
}
/**
- * Planner specific stuff goen in here.
+ * Planner specific stuff goes in here.
*/
static class PlannerContext {
protected ASTNode child;
@@ -13045,18 +13061,19 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
}
}
- private WriteEntity.WriteType determineWriteType(LoadTableDesc ltd, boolean isNonNativeTable) {
+ private WriteEntity.WriteType determineWriteType(LoadTableDesc ltd, boolean isNonNativeTable, String dest) {
// Don't know the characteristics of non-native tables,
// and don't have a rational way to guess, so assume the most
// conservative case.
if (isNonNativeTable) return WriteEntity.WriteType.INSERT_OVERWRITE;
- else return (ltd.getReplace() ? WriteEntity.WriteType.INSERT_OVERWRITE : getWriteType());
- }
- private WriteEntity.WriteType getWriteType() {
- return updating() ? WriteEntity.WriteType.UPDATE :
- (deleting() ? WriteEntity.WriteType.DELETE : WriteEntity.WriteType.INSERT);
+ else return (ltd.getReplace() ? WriteEntity.WriteType.INSERT_OVERWRITE :
+ getWriteType(dest));
}
+ private WriteEntity.WriteType getWriteType(String dest) {
+ return updating(dest) ? WriteEntity.WriteType.UPDATE :
+ (deleting(dest) ? WriteEntity.WriteType.DELETE : WriteEntity.WriteType.INSERT);
+ }
private boolean isAcidOutputFormat(Class<? extends OutputFormat> of) {
Class<?>[] interfaces = of.getInterfaces();
for (Class<?> iface : interfaces) {
@@ -13069,28 +13086,28 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
// Note that this method assumes you have already decided this is an Acid table. It cannot
// figure out if a table is Acid or not.
- private AcidUtils.Operation getAcidType() {
- return deleting() ? AcidUtils.Operation.DELETE :
- (updating() ? AcidUtils.Operation.UPDATE :
+ private AcidUtils.Operation getAcidType(String destination) {
+ return deleting(destination) ? AcidUtils.Operation.DELETE :
+ (updating(destination) ? AcidUtils.Operation.UPDATE :
AcidUtils.Operation.INSERT);
}
- private AcidUtils.Operation getAcidType(Class<? extends OutputFormat> of) {
+ private AcidUtils.Operation getAcidType(Class<? extends OutputFormat> of, String dest) {
if (SessionState.get() == null || !SessionState.get().getTxnMgr().supportsAcid()) {
return AcidUtils.Operation.NOT_ACID;
} else if (isAcidOutputFormat(of)) {
- return getAcidType();
+ return getAcidType(dest);
} else {
return AcidUtils.Operation.NOT_ACID;
}
}
- protected boolean updating() {
- return false;
+ protected boolean updating(String destination) {
+ return destination.startsWith(Context.DestClausePrefix.UPDATE.toString());
}
- protected boolean deleting() {
- return false;
+ protected boolean deleting(String destination) {
+ return destination.startsWith(Context.DestClausePrefix.DELETE.toString());
}
// Make sure the proper transaction manager that supports ACID is being used
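
updating()/deleting() previously returned a constant in this base class (overridden by
UpdateDeleteSemanticAnalyzer), which cannot work once a single rewritten MERGE plan mixes
operations. With the dest-name encoding the check is per destination; illustrative values:

    // Sketch only: the operation is recovered from the Phase 1 dest name.
    assert updating("updclause-0");              // DestClausePrefix.UPDATE branch
    assert deleting("delclause-1");              // DestClausePrefix.DELETE branch
    assert !updating("insclause-2") && !deleting("insclause-2");  // plain insert branch
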
http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java
index 4f0ead0..ed01a31 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java
@@ -296,6 +296,7 @@ public final class SemanticAnalyzerFactory {
case HiveParser.TOK_UPDATE_TABLE:
case HiveParser.TOK_DELETE_FROM:
+ case HiveParser.TOK_MERGE:
return new UpdateDeleteSemanticAnalyzer(queryState);
case HiveParser.TOK_START_TRANSACTION: