Posted to commits@hive.apache.org by se...@apache.org on 2016/11/15 20:21:01 UTC
[29/50] [abbrv] hive git commit: HIVE-14943 Base Implementation (of
HIVE-10924) (Eugene Koifman, reviewed by Alan Gates)
http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java
index 5b874e4..55a3735 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java
@@ -18,7 +18,9 @@
package org.apache.hadoop.hive.ql.parse;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
@@ -26,15 +28,16 @@ import java.util.Map;
import java.util.Set;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConfUtil;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.QueryState;
+import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.hooks.Entity;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
-import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.HiveUtils;
@@ -46,7 +49,7 @@ import org.apache.hadoop.hive.ql.session.SessionState;
/**
* A subclass of the {@link org.apache.hadoop.hive.ql.parse.SemanticAnalyzer} that just handles
- * update and delete statements. It works by rewriting the updates and deletes into insert
+ * update, delete and merge statements. It works by rewriting the updates and deletes into insert
* statements (since they are actually inserts) and then doing some patch up to make them work as
* updates and deletes instead.
*/
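To make the rewrite concrete (an illustrative sketch with invented table/column names, not verbatim analyzer output): for an ACID table t(a int, b int), a statement like

    UPDATE t SET b = 1 WHERE a = 2

is reparsed as roughly

    INSERT INTO TABLE t SELECT ROW__ID, a, b FROM t WHERE a = 2 SORT BY ROW__ID

after which the SET expression replaces the 'b' column in the select list of the new AST; a DELETE keeps only ROW__ID in the select list.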
@@ -70,46 +73,249 @@ public class UpdateDeleteSemanticAnalyzer extends SemanticAnalyzer {
switch (tree.getToken().getType()) {
case HiveParser.TOK_DELETE_FROM:
analyzeDelete(tree);
- return;
-
+ break;
case HiveParser.TOK_UPDATE_TABLE:
analyzeUpdate(tree);
- return;
-
+ break;
+ case HiveParser.TOK_MERGE:
+ analyzeMerge(tree);
+ break;
default:
throw new RuntimeException("Asked to parse token " + tree.getName() + " in " +
"UpdateDeleteSemanticAnalyzer");
}
+ cleanUpMetaColumnAccessControl();
+
}
}
-
- @Override
- protected boolean updating() {
- return ctx.getAcidOperation() == AcidUtils.Operation.UPDATE;
+ private boolean updating() {
+ return currentOperation == Operation.UPDATE;
}
-
- @Override
- protected boolean deleting() {
- return ctx.getAcidOperation() == AcidUtils.Operation.DELETE;
+ private boolean deleting() {
+ return currentOperation == Operation.DELETE;
}
private void analyzeUpdate(ASTNode tree) throws SemanticException {
- ctx.setAcidOperation(AcidUtils.Operation.UPDATE);
+ currentOperation = Operation.UPDATE;
reparseAndSuperAnalyze(tree);
}
private void analyzeDelete(ASTNode tree) throws SemanticException {
- ctx.setAcidOperation(AcidUtils.Operation.DELETE);
+ currentOperation = Operation.DELETE;
reparseAndSuperAnalyze(tree);
}
+ /**
+ * Append list of partition columns to Insert statement, i.e. the 1st set of partCol1,partCol2
+ * INSERT INTO T PARTITION(partCol1,partCol2...) SELECT col1, ... partCol1,partCol2...
+ */
+ private void addPartitionColsToInsert(List<FieldSchema> partCols, StringBuilder rewrittenQueryStr) {
+ // If the table is partitioned we have to put the partition() clause in
+ if (partCols != null && partCols.size() > 0) {
+ rewrittenQueryStr.append(" partition (");
+ boolean first = true;
+ for (FieldSchema fschema : partCols) {
+ if (first)
+ first = false;
+ else
+ rewrittenQueryStr.append(", ");
+ //would be nice if there was a way to determine if quotes are needed
+ rewrittenQueryStr.append(HiveUtils.unparseIdentifier(fschema.getName(), this.conf));
+ }
+ rewrittenQueryStr.append(")");
+ }
+ }
+ /**
+ * Append list of partition columns to Insert statement, i.e. the 2nd set of partCol1,partCol2
+ * INSERT INTO T PARTITION(partCol1,partCol2...) SELECT col1, ... partCol1,partCol2...
+ * @param targetName simple target table name (i.e. name or alias)
+ */
+ private void addPartitionColsToSelect(List<FieldSchema> partCols, StringBuilder rewrittenQueryStr, String targetName) {
+ // If the table is partitioned, we need to select the partition columns as well.
+ if (partCols != null) {
+ for (FieldSchema fschema : partCols) {
+ rewrittenQueryStr.append(", ");
+ //would be nice if there was a way to determine if quotes are needed
+ if(targetName != null) {
+ rewrittenQueryStr.append(HiveUtils.unparseIdentifier(targetName, this.conf)).append('.');
+ }
+ rewrittenQueryStr.append(HiveUtils.unparseIdentifier(fschema.getName(), this.conf));
+ }
+ }
+ }
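+  //Illustrative sketch (hypothetical names): for a target t partitioned by (ds, hr), the two
+  //helpers above produce the two partition-column lists in
+  //  INSERT INTO TABLE t PARTITION (ds, hr) SELECT ROW__ID, a, b, t.ds, t.hr FROM t ...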
+ /**
+ * Assert that we are not asked to update a bucketing column or partition column
+ * @param colName it's the A in "SET A = B"
+ */
+ private void checkValidSetClauseTarget(ASTNode colName, List<FieldSchema> partCols,
+ List<String> bucketingCols) throws SemanticException {
+ String columnName = normalizeColName(colName.getText());
+ // Make sure this isn't one of the partitioning columns, that's not supported.
+ if (partCols != null) {
+ for (FieldSchema fschema : partCols) {
+ if (fschema.getName().equalsIgnoreCase(columnName)) {
+ throw new SemanticException(ErrorMsg.UPDATE_CANNOT_UPDATE_PART_VALUE.getMsg());
+ }
+ }
+ }
+ //updating bucket column should move row from one file to another - not supported
+ if(bucketingCols != null && bucketingCols.contains(columnName)) {
+ throw new SemanticException(ErrorMsg.UPDATE_CANNOT_UPDATE_BUCKET_VALUE,columnName);
+ }
+ }
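+  //For example (a sketch; t(a,b) partitioned by (ds) and clustered by (a) is hypothetical):
+  //  UPDATE t SET ds = '2016' ... -> UPDATE_CANNOT_UPDATE_PART_VALUE
+  //  UPDATE t SET a = 7 ...       -> UPDATE_CANNOT_UPDATE_BUCKET_VALUE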
+ private ASTNode findLHSofAssignment(ASTNode assignment) {
+ assert assignment.getToken().getType() == HiveParser.EQUAL :
+ "Expected set assignments to use equals operator but found " + assignment.getName();
+ ASTNode tableOrColTok = (ASTNode)assignment.getChildren().get(0);
+ assert tableOrColTok.getToken().getType() == HiveParser.TOK_TABLE_OR_COL :
+ "Expected left side of assignment to be table or column";
+ ASTNode colName = (ASTNode)tableOrColTok.getChildren().get(0);
+ assert colName.getToken().getType() == HiveParser.Identifier :
+ "Expected column name";
+ return colName;
+ }
+ private Map<String, ASTNode> collectSetColumnsAndExpressions(
+ ASTNode setClause,List<FieldSchema> partCols, List<String> bucketingCols, Set<String> setRCols)
+ throws SemanticException {
+ // An update needs to select all of the columns, as we rewrite the entire row. Also,
+ // we need to figure out which columns we are going to replace.
+ assert setClause.getToken().getType() == HiveParser.TOK_SET_COLUMNS_CLAUSE :
+ "Expected second child of update token to be set token";
+
+ // Get the children of the set clause, each of which should be a column assignment
+ List<? extends Node> assignments = setClause.getChildren();
+ // Must be deterministic order map for consistent q-test output across Java versions
+ Map<String, ASTNode> setCols = new LinkedHashMap<String, ASTNode>(assignments.size());
+ for (Node a : assignments) {
+ ASTNode assignment = (ASTNode)a;
+ ASTNode colName = findLHSofAssignment(assignment);
+ if(setRCols != null) {
+ addSetRCols((ASTNode) assignment.getChildren().get(1), setRCols);
+ }
+ checkValidSetClauseTarget(colName, partCols, bucketingCols);
+
+ String columnName = normalizeColName(colName.getText());
+ // This means that in UPDATE T SET x = _something_
+ // _something_ can be whatever is supported in SELECT _something_
+ setCols.put(columnName, (ASTNode)assignment.getChildren().get(1));
+ }
+ return setCols;
+ }
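+  //Sketch (invented column names): for "SET b = 5, c = a + 1" this returns the ordered map
+  //  {b -> AST(5), c -> AST(a + 1)}
+  //which callers use to substitute each expression for the corresponding target column.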
+ /**
+ * @return the Metastore representation of the target table
+ */
+ private Table getTargetTable(ASTNode tabRef) throws SemanticException {
+ String[] tableName;
+ Table mTable;
+ switch (tabRef.getType()) {
+ case HiveParser.TOK_TABREF:
+ tableName = getQualifiedTableName((ASTNode) tabRef.getChild(0));
+ break;
+ case HiveParser.TOK_TABNAME:
+ tableName = getQualifiedTableName(tabRef);
+ break;
+ default:
+ throw raiseWrongType("TOK_TABREF|TOK_TABNAME", tabRef);
+ }
+ try {
+ mTable = db.getTable(tableName[0], tableName[1]);
+ } catch (InvalidTableException e) {
+ LOG.error("Failed to find table " + getDotName(tableName) + " got exception "
+ + e.getMessage());
+ throw new SemanticException(ErrorMsg.INVALID_TABLE.getMsg(getDotName(tableName)), e);
+ } catch (HiveException e) {
+ LOG.error("Failed to find table " + getDotName(tableName) + " got exception "
+ + e.getMessage());
+ throw new SemanticException(e.getMessage(), e);
+ }
+ return mTable;
+ }
+ // Walk through all our inputs and set them to note that this read is part of an update or a
+ // delete.
+ private void markReadEntityForUpdate() {
+ for (ReadEntity input : inputs) {
+ if(isWritten(input)) {
+ //todo: this is actually not adding anything since LockComponent uses a Trie to "promote" a lock
+ //except by accident - when we have a partitioned target table we have a ReadEntity and WriteEntity
+ //for the table, so we mark ReadEntity and then delete WriteEntity (replace with Partition entries)
+ //so DbTxnManager skips Read lock on the ReadEntity....
+ input.setUpdateOrDelete(true);//input.noLockNeeded()?
+ }
+ }
+ }
+ /**
+ * For updates, we need to set the column access info so that it contains information on
+ * the columns we are updating.
+ * (But not all the columns of the target table even though the rewritten query writes
+   * all columns of the target table, since that is an implementation detail)
+ */
+ private void setUpAccessControlInfoForUpdate(Table mTable, Map<String, ASTNode> setCols) {
+ ColumnAccessInfo cai = new ColumnAccessInfo();
+ for (String colName : setCols.keySet()) {
+ cai.add(Table.getCompleteName(mTable.getDbName(), mTable.getTableName()), colName);
+ }
+ setUpdateColumnAccessInfo(cai);
+ }
+ /**
+ * We need to weed ROW__ID out of the input column info, as it doesn't make any sense to
+ * require the user to have authorization on that column.
+ */
+ private void cleanUpMetaColumnAccessControl() {
+ //we do this for Update/Delete (incl Merge) because we introduce this column into the query
+ //as part of rewrite
+ if (columnAccessInfo != null) {
+ columnAccessInfo.stripVirtualColumn(VirtualColumn.ROWID);
+ }
+ }
+ /**
+   * Parse the newly generated SQL statement to get a new AST
+ */
+ private ReparseResult parseRewrittenQuery(StringBuilder rewrittenQueryStr, String originalQuery) throws SemanticException {
+ // Parse the rewritten query string
+ Context rewrittenCtx;
+ try {
+ // Set dynamic partitioning to nonstrict so that queries do not need any partition
+ // references.
+ // todo: this may be a perf issue as it prevents the optimizer.. or not
+ HiveConf.setVar(conf, HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict");
+ rewrittenCtx = new Context(conf);
+ rewrittenCtx.setExplainConfig(ctx.getExplainConfig());
+ } catch (IOException e) {
+ throw new SemanticException(ErrorMsg.UPDATEDELETE_IO_ERROR.getMsg());
+ }
+ rewrittenCtx.setCmd(rewrittenQueryStr.toString());
+
+ ParseDriver pd = new ParseDriver();
+ ASTNode rewrittenTree;
+ try {
+ LOG.info("Going to reparse <" + originalQuery + "> as \n<" + rewrittenQueryStr.toString() + ">");
+ rewrittenTree = pd.parse(rewrittenQueryStr.toString(), rewrittenCtx);
+ rewrittenTree = ParseUtils.findRootNonNullToken(rewrittenTree);
+
+ } catch (ParseException e) {
+ throw new SemanticException(ErrorMsg.UPDATEDELETE_PARSE_ERROR.getMsg(), e);
+ }
+ return new ReparseResult(rewrittenTree, rewrittenCtx);
+ }
+ /**
+ * Assert it supports Acid write
+ */
+ private void validateTargetTable(Table mTable) throws SemanticException {
+ if (mTable.getTableType() == TableType.VIRTUAL_VIEW ||
+ mTable.getTableType() == TableType.MATERIALIZED_VIEW) {
+ LOG.error("Table " + getDotName(new String[] {mTable.getDbName(), mTable.getTableName()}) + " is a view or materialized view");
+ throw new SemanticException(ErrorMsg.UPDATE_DELETE_VIEW.getMsg());
+ }
+ }
+ /**
+ * This supports update and delete statements
+ */
private void reparseAndSuperAnalyze(ASTNode tree) throws SemanticException {
List<? extends Node> children = tree.getChildren();
// The first child should be the table we are deleting from
ASTNode tabName = (ASTNode)children.get(0);
assert tabName.getToken().getType() == HiveParser.TOK_TABNAME :
"Expected tablename as first child of " + operation() + " but found " + tabName.getName();
- String[] tableName = getQualifiedTableName(tabName);
// Rewrite the delete or update into an insert. Crazy, but it works as deletes and update
// actually are inserts into the delta file in Hive. A delete
@@ -129,98 +335,31 @@ public class UpdateDeleteSemanticAnalyzer extends SemanticAnalyzer {
// merge on read.
StringBuilder rewrittenQueryStr = new StringBuilder();
- Table mTable;
- try {
- mTable = db.getTable(tableName[0], tableName[1]);
- } catch (InvalidTableException e) {
- LOG.error("Failed to find table " + getDotName(tableName) + " got exception "
- + e.getMessage());
- throw new SemanticException(ErrorMsg.INVALID_TABLE.getMsg(getDotName(tableName)), e);
- } catch (HiveException e) {
- LOG.error("Failed to find table " + getDotName(tableName) + " got exception "
- + e.getMessage());
- throw new SemanticException(e.getMessage(), e);
- }
-
- if (mTable.getTableType() == TableType.VIRTUAL_VIEW ||
- mTable.getTableType() == TableType.MATERIALIZED_VIEW) {
- LOG.error("Table " + getDotName(tableName) + " is a view or materialized view");
- throw new SemanticException(ErrorMsg.UPDATE_DELETE_VIEW.getMsg());
- }
+ Table mTable = getTargetTable(tabName);
+ validateTargetTable(mTable);
List<FieldSchema> partCols = mTable.getPartCols();
List<String> bucketingCols = mTable.getBucketCols();
rewrittenQueryStr.append("insert into table ");
- rewrittenQueryStr.append(getDotName(new String[] {
- HiveUtils.unparseIdentifier(tableName[0], this.conf),
- HiveUtils.unparseIdentifier(tableName[1], this.conf) }));
+ rewrittenQueryStr.append(getFullTableNameForSQL(tabName));
- // If the table is partitioned we have to put the partition() clause in
- if (partCols != null && partCols.size() > 0) {
- rewrittenQueryStr.append(" partition (");
- boolean first = true;
- for (FieldSchema fschema : partCols) {
- if (first)
- first = false;
- else
- rewrittenQueryStr.append(", ");
- rewrittenQueryStr.append(HiveUtils.unparseIdentifier(fschema.getName(), this.conf));
- }
- rewrittenQueryStr.append(")");
- }
+ addPartitionColsToInsert(partCols, rewrittenQueryStr);
rewrittenQueryStr.append(" select ROW__ID");
+
Map<Integer, ASTNode> setColExprs = null;
Map<String, ASTNode> setCols = null;
// Must be deterministic order set for consistent q-test output across Java versions
Set<String> setRCols = new LinkedHashSet<String>();
if (updating()) {
- // An update needs to select all of the columns, as we rewrite the entire row. Also,
- // we need to figure out which columns we are going to replace. We won't write the set
+ // We won't write the set
// expressions in the rewritten query. We'll patch that up later.
// The set list from update should be the second child (index 1)
assert children.size() >= 2 : "Expected update token to have at least two children";
ASTNode setClause = (ASTNode)children.get(1);
- assert setClause.getToken().getType() == HiveParser.TOK_SET_COLUMNS_CLAUSE :
- "Expected second child of update token to be set token";
-
- // Get the children of the set clause, each of which should be a column assignment
- List<? extends Node> assignments = setClause.getChildren();
- // Must be deterministic order map for consistent q-test output across Java versions
- setCols = new LinkedHashMap<String, ASTNode>(assignments.size());
- setColExprs = new HashMap<Integer, ASTNode>(assignments.size());
- for (Node a : assignments) {
- ASTNode assignment = (ASTNode)a;
- assert assignment.getToken().getType() == HiveParser.EQUAL :
- "Expected set assignments to use equals operator but found " + assignment.getName();
- ASTNode tableOrColTok = (ASTNode)assignment.getChildren().get(0);
- assert tableOrColTok.getToken().getType() == HiveParser.TOK_TABLE_OR_COL :
- "Expected left side of assignment to be table or column";
- ASTNode colName = (ASTNode)tableOrColTok.getChildren().get(0);
- assert colName.getToken().getType() == HiveParser.Identifier :
- "Expected column name";
-
- addSetRCols((ASTNode) assignment.getChildren().get(1), setRCols);
-
- String columnName = normalizeColName(colName.getText());
-
- // Make sure this isn't one of the partitioning columns, that's not supported.
- if (partCols != null) {
- for (FieldSchema fschema : partCols) {
- if (fschema.getName().equalsIgnoreCase(columnName)) {
- throw new SemanticException(ErrorMsg.UPDATE_CANNOT_UPDATE_PART_VALUE.getMsg());
- }
- }
- }
- //updating bucket column should move row from one file to another - not supported
- if(bucketingCols != null && bucketingCols.contains(columnName)) {
- throw new SemanticException(ErrorMsg.UPDATE_CANNOT_UPDATE_BUCKET_VALUE,columnName);
- }
- // This means that in UPDATE T SET x = _something_
- // _something_ can be whatever is supported in SELECT _something_
- setCols.put(columnName, (ASTNode)assignment.getChildren().get(1));
- }
+ setCols = collectSetColumnsAndExpressions(setClause, partCols, bucketingCols, setRCols);
+ setColExprs = new HashMap<Integer, ASTNode>(setClause.getChildCount());
List<FieldSchema> nonPartCols = mTable.getCols();
for (int i = 0; i < nonPartCols.size(); i++) {
@@ -237,17 +376,9 @@ public class UpdateDeleteSemanticAnalyzer extends SemanticAnalyzer {
}
}
- // If the table is partitioned, we need to select the partition columns as well.
- if (partCols != null) {
- for (FieldSchema fschema : partCols) {
- rewrittenQueryStr.append(", ");
- rewrittenQueryStr.append(HiveUtils.unparseIdentifier(fschema.getName(), this.conf));
- }
- }
+ addPartitionColsToSelect(partCols, rewrittenQueryStr, null);
rewrittenQueryStr.append(" from ");
- rewrittenQueryStr.append(getDotName(new String[] {
- HiveUtils.unparseIdentifier(tableName[0], this.conf),
- HiveUtils.unparseIdentifier(tableName[1], this.conf) }));
+ rewrittenQueryStr.append(getFullTableNameForSQL(tabName));
ASTNode where = null;
int whereIndex = deleting() ? 1 : 2;
@@ -260,35 +391,21 @@ public class UpdateDeleteSemanticAnalyzer extends SemanticAnalyzer {
// Add a sort by clause so that the row ids come out in the correct order
rewrittenQueryStr.append(" sort by ROW__ID ");
- // Parse the rewritten query string
- Context rewrittenCtx;
- try {
- // Set dynamic partitioning to nonstrict so that queries do not need any partition
- // references.
- HiveConf.setVar(conf, HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict");
- rewrittenCtx = new Context(conf);
- rewrittenCtx.setExplainConfig(ctx.getExplainConfig());
- } catch (IOException e) {
- throw new SemanticException(ErrorMsg.UPDATEDELETE_IO_ERROR.getMsg());
- }
- rewrittenCtx.setCmd(rewrittenQueryStr.toString());
- rewrittenCtx.setAcidOperation(ctx.getAcidOperation());
-
- ParseDriver pd = new ParseDriver();
- ASTNode rewrittenTree;
- try {
- LOG.info("Going to reparse " + operation() + " as <" + rewrittenQueryStr.toString() + ">");
- rewrittenTree = pd.parse(rewrittenQueryStr.toString(), rewrittenCtx);
- rewrittenTree = ParseUtils.findRootNonNullToken(rewrittenTree);
-
- } catch (ParseException e) {
- throw new SemanticException(ErrorMsg.UPDATEDELETE_PARSE_ERROR.getMsg(), e);
- }
+ ReparseResult rr = parseRewrittenQuery(rewrittenQueryStr, ctx.getCmd());
+ Context rewrittenCtx = rr.rewrittenCtx;
+ ASTNode rewrittenTree = rr.rewrittenTree;
ASTNode rewrittenInsert = (ASTNode)rewrittenTree.getChildren().get(1);
assert rewrittenInsert.getToken().getType() == HiveParser.TOK_INSERT :
"Expected TOK_INSERT as second child of TOK_QUERY but found " + rewrittenInsert.getName();
+ if(updating()) {
+ rewrittenCtx.addDestNamePrefix(rewrittenInsert, Context.DestClausePrefix.UPDATE);
+ }
+ else if(deleting()) {
+ rewrittenCtx.addDestNamePrefix(rewrittenInsert, Context.DestClausePrefix.DELETE);
+ }
+
if (where != null) {
// The structure of the AST for the rewritten insert statement is:
// TOK_QUERY -> TOK_FROM
@@ -334,42 +451,38 @@ public class UpdateDeleteSemanticAnalyzer extends SemanticAnalyzer {
useSuper = false;
}
- // Walk through all our inputs and set them to note that this read is part of an update or a
- // delete.
- for (ReadEntity input : inputs) {
- if(isWritten(input)) {
- input.setUpdateOrDelete(true);
- }
- }
+ markReadEntityForUpdate();
if (inputIsPartitioned(inputs)) {
+ //todo: there are bugs here: https://issues.apache.org/jira/browse/HIVE-15048
// In order to avoid locking the entire write table we need to replace the single WriteEntity
// with a WriteEntity for each partition
+ assert outputs.size() == 1 : "expected 1 WriteEntity. Got " + outputs;//this asserts comment above
+ WriteEntity original = null;
+ for(WriteEntity we : outputs) {
+ original = we;
+ }
outputs.clear();
for (ReadEntity input : inputs) {
+ /**
+       * The assumption here is that SemanticAnalyzer will generate ReadEntity for each
+ * partition that exists and is matched by the WHERE clause (which may be all of them).
+ * Since we don't allow updating the value of a partition column, we know that we always
+ * write the same (or fewer) partitions than we read. Still, the write is a Dynamic
+ * Partition write - see HIVE-15032.
+ */
if (input.getTyp() == Entity.Type.PARTITION) {
WriteEntity.WriteType writeType = deleting() ? WriteEntity.WriteType.DELETE :
WriteEntity.WriteType.UPDATE;
- outputs.add(new WriteEntity(input.getPartition(), writeType));
+ WriteEntity we = new WriteEntity(input.getPartition(), writeType);
+ we.setDynamicPartitionWrite(original.isDynamicPartitionWrite());
+ outputs.add(we);
}
}
- } else {
- // We still need to patch up the WriteEntities as they will have an insert type. Change
- // them to the appropriate type for our operation.
- for (WriteEntity output : outputs) {
- output.setWriteType(deleting() ? WriteEntity.WriteType.DELETE :
- WriteEntity.WriteType.UPDATE);
- }
}
- // For updates, we need to set the column access info so that it contains information on
- // the columns we are updating.
if (updating()) {
- ColumnAccessInfo cai = new ColumnAccessInfo();
- for (String colName : setCols.keySet()) {
- cai.add(Table.getCompleteName(mTable.getDbName(), mTable.getTableName()), colName);
- }
- setUpdateColumnAccessInfo(cai);
+ setUpAccessControlInfoForUpdate(mTable, setCols);
// Add the setRCols to the input list
for (String colName : setRCols) {
@@ -379,14 +492,7 @@ public class UpdateDeleteSemanticAnalyzer extends SemanticAnalyzer {
}
}
}
-
- // We need to weed ROW__ID out of the input column info, as it doesn't make any sense to
- // require the user to have authorization on that column.
- if (columnAccessInfo != null) {
- columnAccessInfo.stripVirtualColumn(VirtualColumn.ROWID);
- }
}
-
/**
* Check that {@code readEntity} is also being written
*/
@@ -400,10 +506,11 @@ public class UpdateDeleteSemanticAnalyzer extends SemanticAnalyzer {
return false;
}
private String operation() {
- if (updating()) return "update";
- else if (deleting()) return "delete";
- else throw new IllegalStateException("UpdateDeleteSemanticAnalyzer neither updating nor " +
- "deleting, operation not known.");
+ if (currentOperation == Operation.NOT_ACID) {
+ throw new IllegalStateException("UpdateDeleteSemanticAnalyzer neither updating nor " +
+ "deleting, operation not known.");
+ }
+ return currentOperation.toString();
}
private boolean inputIsPartitioned(Set<ReadEntity> inputs) {
@@ -417,7 +524,7 @@ public class UpdateDeleteSemanticAnalyzer extends SemanticAnalyzer {
return false;
}
- // This method find any columns on the right side of a set statement (thus rcols) and puts them
+ // This method finds any columns on the right side of a set statement (thus rcols) and puts them
// in a set so we can add them to the list of input cols to check.
private void addSetRCols(ASTNode node, Set<String> setRCols) {
@@ -443,4 +550,566 @@ public class UpdateDeleteSemanticAnalyzer extends SemanticAnalyzer {
private static String normalizeColName(String colName) {
return colName.toLowerCase();
}
+
+ //todo: see SubQueryDiagnostic for some ideas on turning ASTNode into SQL
+ //todo: should we add MERGE to AcidUtils.Operation instead? that will be a lot of code clean up
+ private enum Operation {UPDATE, DELETE, MERGE, NOT_ACID};
+ private Operation currentOperation = Operation.NOT_ACID;
+ private static final String Indent = " ";
+
+ /**
+ * Here we take a Merge statement AST and generate a semantically equivalent multi-insert
+   * statement to execute. Each Insert leg represents a single WHEN clause. As much as possible,
+   * the new SQL statement is made to look like the input SQL statement so that it's easier to
+   * map Query Compiler errors from the generated SQL back to the original one.
+ * The generated SQL is a complete representation of the original input for the same reason.
+ * In many places SemanticAnalyzer throws exceptions that contain (line, position) coordinates.
+ * If generated SQL doesn't have everything and is patched up later, these coordinates point to
+ * the wrong place.
+ *
+ * @throws SemanticException
+ */
+ private void analyzeMerge(ASTNode tree) throws SemanticException {
+ currentOperation = Operation.MERGE;
+ /*
+ * See org.apache.hadoop.hive.ql.parse.TestMergeStatement for some examples of the merge AST
+ For example, given:
+ merge into acidTbl using nonAcidPart2 source ON acidTbl.a = source.a2
+ WHEN MATCHED THEN UPDATE set b = source.b2
+ WHEN NOT MATCHED THEN INSERT VALUES(source.a2, source.b2)
+
+ We get AST like this:
+ "(tok_merge " +
+ "(tok_tabname acidtbl) (tok_tabref (tok_tabname nonacidpart2) source) " +
+ "(= (. (tok_table_or_col acidtbl) a) (. (tok_table_or_col source) a2)) " +
+ "(tok_matched " +
+ "(tok_update " +
+ "(tok_set_columns_clause (= (tok_table_or_col b) (. (tok_table_or_col source) b2))))) " +
+ "(tok_not_matched " +
+ "tok_insert " +
+ "(tok_value_row (. (tok_table_or_col source) a2) (. (tok_table_or_col source) b2))))");
+
+ And need to produce a multi-insert like this to execute:
+ FROM acidTbl right outer join nonAcidPart2 ON acidTbl.a = source.a2
+ Insert into table acidTbl select nonAcidPart2.a2, nonAcidPart2.b2 where acidTbl.a is null
+ INSERT INTO TABLE acidTbl select target.ROW__ID, nonAcidPart2.a2, nonAcidPart2.b2 where nonAcidPart2.a2=acidTbl.a sort by acidTbl.ROW__ID
+ */
+ /*todo: we need some sort of validation phase over original AST to make things user friendly; for example, if
+ original command refers to a column that doesn't exist, this will be caught when processing the rewritten query but
+ the errors will point at locations that the user can't map to anything
+ - VALUES clause must have the same number of values as target table (including partition cols). Part cols go last in Select clause of Insert as Select
+ todo: do we care to preserve comments in original SQL?
    todo: check if identifiers are properly escaped/quoted in the generated SQL - it's currently inconsistent
+ Look at UnparseTranslator.addIdentifierTranslation() - it does unescape + unparse...
    todo: consider "WHEN NOT MATCHED BY SOURCE THEN UPDATE SET TargetTable.Col1 = SourceTable.Col1 "; what happens when source is empty? This should be a runtime error - maybe not
+ the outer side of ROJ is empty => the join produces 0 rows. If supporting WHEN NOT MATCHED BY SOURCE, then this should be a runtime error
+ */
+ ASTNode target = (ASTNode)tree.getChild(0);
+ ASTNode source = (ASTNode)tree.getChild(1);
+ String targetName = getSimpleTableName(target);
+ String sourceName = getSimpleTableName(source);
+ ASTNode onClause = (ASTNode) tree.getChild(2);
+
+ Table targetTable = getTargetTable(target);
+ validateTargetTable(targetTable);
+ List<ASTNode> whenClauses = findWhenClauses(tree);
+
+ StringBuilder rewrittenQueryStr = new StringBuilder("FROM\n");
+ rewrittenQueryStr.append(Indent).append(getFullTableNameForSQL(target));
+ if(isAliased(target)) {
+ rewrittenQueryStr.append(" ").append(targetName);
+ }
+ rewrittenQueryStr.append('\n');
+ rewrittenQueryStr.append(Indent).append(chooseJoinType(whenClauses)).append("\n");
+ if(source.getType() == HiveParser.TOK_SUBQUERY) {
+ //this includes the mandatory alias
+ rewrittenQueryStr.append(Indent).append(source.getMatchedText());
+ }
+ else {
+ rewrittenQueryStr.append(Indent).append(getFullTableNameForSQL(source));
+ if(isAliased(source)) {
+ rewrittenQueryStr.append(" ").append(sourceName);
+ }
+ }
+ rewrittenQueryStr.append('\n');
+ rewrittenQueryStr.append(Indent).append("ON ").append(onClause.getMatchedText()).append('\n');
+
+ /**
+     * We allow at most 2 WHEN MATCHED clauses, in which case 1 must be Update and the other Delete.
+ * If we have both update and delete, the 1st one (in SQL code) must have "AND <extra predicate>"
+ * so that the 2nd can ensure not to process the same rows.
+ * Update and Delete may be in any order. (Insert is always last)
+ */
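+    //For example (a sketch with hypothetical tables), this is accepted because the 1st
+    //WHEN MATCHED clause carries an extra predicate:
+    //  MERGE INTO t USING s ON t.a = s.a
+    //  WHEN MATCHED AND s.b IS NULL THEN DELETE
+    //  WHEN MATCHED THEN UPDATE SET b = s.b
+    //while the same statement without "AND s.b IS NULL" raises MERGE_PREDIACTE_REQUIRED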
+ String extraPredicate = null;
+ int numWhenMatchedUpdateClauses = 0, numWhenMatchedDeleteClauses = 0;
+ for(ASTNode whenClause : whenClauses) {
+ switch (getWhenClauseOperation(whenClause).getType()) {
+ case HiveParser.TOK_INSERT:
+ handleInsert(whenClause, rewrittenQueryStr, target, onClause, targetTable, targetName);
+ break;
+ case HiveParser.TOK_UPDATE:
+ numWhenMatchedUpdateClauses++;
+ String s = handleUpdate(whenClause, rewrittenQueryStr, target, onClause.getMatchedText(), targetTable, extraPredicate);
+ if(numWhenMatchedUpdateClauses + numWhenMatchedDeleteClauses == 1) {
+ extraPredicate = s;//i.e. it's the 1st WHEN MATCHED
+ }
+ break;
+ case HiveParser.TOK_DELETE:
+ numWhenMatchedDeleteClauses++;
+ String s1 = handleDelete(whenClause, rewrittenQueryStr, target, onClause.getMatchedText(), targetTable, extraPredicate);
+ if(numWhenMatchedUpdateClauses + numWhenMatchedDeleteClauses == 1) {
+ extraPredicate = s1;//i.e. it's the 1st WHEN MATCHED
+ }
+ break;
+ default:
+ throw new IllegalStateException("Unexpected WHEN clause type: " + whenClause.getType() +
+ addParseInfo(whenClause));
+ }
+ if(numWhenMatchedDeleteClauses > 1) {
+ throw new SemanticException(ErrorMsg.MERGE_TOO_MANY_DELETE, ctx.getCmd());
+ }
+ if(numWhenMatchedUpdateClauses > 1) {
+ throw new SemanticException(ErrorMsg.MERGE_TOO_MANY_UPDATE, ctx.getCmd());
+ }
+ }
+ if(numWhenMatchedDeleteClauses + numWhenMatchedUpdateClauses == 2 && extraPredicate == null) {
+ throw new SemanticException(ErrorMsg.MERGE_PREDIACTE_REQUIRED, ctx.getCmd());
+ }
+
+ ReparseResult rr = parseRewrittenQuery(rewrittenQueryStr, ctx.getCmd());
+ Context rewrittenCtx = rr.rewrittenCtx;
+ ASTNode rewrittenTree = rr.rewrittenTree;
+
+ //set dest name mapping on new context
+ for(int insClauseIdx = 1, whenClauseIdx = 0; insClauseIdx < rewrittenTree.getChildCount(); insClauseIdx++, whenClauseIdx++) {
+      //we've added Insert clauses in order of WHEN items in whenClauses
+ ASTNode insertClause = (ASTNode) rewrittenTree.getChild(insClauseIdx);
+ switch (getWhenClauseOperation(whenClauses.get(whenClauseIdx)).getType()) {
+ case HiveParser.TOK_INSERT:
+ rewrittenCtx.addDestNamePrefix(insertClause, Context.DestClausePrefix.INSERT);
+ break;
+ case HiveParser.TOK_UPDATE:
+ rewrittenCtx.addDestNamePrefix(insertClause, Context.DestClausePrefix.UPDATE);
+ break;
+ case HiveParser.TOK_DELETE:
+ rewrittenCtx.addDestNamePrefix(insertClause, Context.DestClausePrefix.DELETE);
+ break;
+ default:
+ assert false;
+ }
+ }
+ try {
+ useSuper = true;
+ super.analyze(rewrittenTree, rewrittenCtx);
+ } finally {
+ useSuper = false;
+ }
+
+ markReadEntityForUpdate();
+
+ if(targetTable.isPartitioned()) {
+ List<ReadEntity> partitionsRead = getRestrictedPartitionSet(targetTable);
+ if(!partitionsRead.isEmpty()) {
+ //if there is WriteEntity with WriteType=UPDATE/DELETE for target table, replace it with
+ //WriteEntity for each partition
+ List<WriteEntity> toRemove = new ArrayList<>();
+ for(WriteEntity we : outputs) {
+ WriteEntity.WriteType wt = we.getWriteType();
+ if(isTargetTable(we, targetTable) &&
+ (wt == WriteEntity.WriteType.UPDATE || wt == WriteEntity.WriteType.DELETE)) {
+ toRemove.add(we);
+ }
+ }
+ outputs.removeAll(toRemove);
+ for(ReadEntity re : partitionsRead) {
+ for(WriteEntity original : toRemove) {
+ //since we may have both Update and Delete branches, Auth needs to know
+ WriteEntity we = new WriteEntity(re.getPartition(), original.getWriteType());
+ we.setDynamicPartitionWrite(original.isDynamicPartitionWrite());
+ outputs.add(we);
+ }
+ }
+ }
+ }
+ }
+ /**
+ * If the optimizer has determined that it only has to read some of the partitions of the
+ * target table to satisfy the query, then we know that the write side of update/delete
+ * (and update/delete parts of merge)
+ * can only write (at most) that set of partitions (since we currently don't allow updating
+ * partition (or bucket) columns). So we want to replace the table level
+ * WriteEntity in the outputs with WriteEntity for each of these partitions
+ * ToDo: see if this should be moved to SemanticAnalyzer itself since it applies to any
+ * insert which does a select against the same table. Then SemanticAnalyzer would also
+ * be able to not use DP for the Insert...
+ *
+ * Note that the Insert of Merge may be creating new partitions and writing to partitions
+ * which were not read (WHEN NOT MATCHED...)
+ */
+ private List<ReadEntity> getRestrictedPartitionSet(Table targetTable) {
+ List<ReadEntity> partitionsRead = new ArrayList<>();
+ for(ReadEntity re : inputs) {
+ if(re.isFromTopLevelQuery && re.getType() == Entity.Type.PARTITION && isTargetTable(re, targetTable)) {
+ partitionsRead.add(re);
+ }
+ }
+ return partitionsRead;
+ }
+ /**
+ * if there is no WHEN NOT MATCHED THEN INSERT, we don't outer join
+ */
+ private String chooseJoinType(List<ASTNode> whenClauses) {
+ for(ASTNode whenClause : whenClauses) {
+ if(getWhenClauseOperation(whenClause).getType() == HiveParser.TOK_INSERT) {
+ return "RIGHT OUTER JOIN";
+ }
+ }
+ return "INNER JOIN";
+ }
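+  //For example (hypothetical tables):
+  //  MERGE INTO t USING s ON t.a = s.a WHEN MATCHED THEN DELETE  -> INNER JOIN
+  //  adding WHEN NOT MATCHED THEN INSERT VALUES(s.a, s.b)        -> RIGHT OUTER JOIN
+  //since unmatched source rows must survive the join to feed the Insert leg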
+ /**
+ * does this Entity belong to target table (partition)
+ */
+ private boolean isTargetTable(Entity entity, Table targetTable) {
+ //todo: https://issues.apache.org/jira/browse/HIVE-15048
+ /**
+ * is this the right way to compare? Should it just compare paths?
+ * equals() impl looks heavy weight
+ */
+ return targetTable.equals(entity.getTable());
+ }
+ /**
+   * @param onClauseAsString - because there is no clone() and we need to use it in multiple places
+ * @param deleteExtraPredicate - see notes at caller
+ */
+ private String handleUpdate(ASTNode whenMatchedUpdateClause, StringBuilder rewrittenQueryStr,
+ ASTNode target, String onClauseAsString, Table targetTable,
+ String deleteExtraPredicate) throws SemanticException {
+ assert whenMatchedUpdateClause.getType() == HiveParser.TOK_MATCHED;
+ assert getWhenClauseOperation(whenMatchedUpdateClause).getType() == HiveParser.TOK_UPDATE;
+ List<FieldSchema> partCols = targetTable.getPartCols();
+ List<String> bucketingCols = targetTable.getBucketCols();
+ String targetName = getSimpleTableName(target);
+ rewrittenQueryStr.append("INSERT INTO ").append(getFullTableNameForSQL(target));
+ addPartitionColsToInsert(partCols, rewrittenQueryStr);
+ rewrittenQueryStr.append("\n select ").append(targetName).append(".ROW__ID");
+
+ ASTNode setClause = (ASTNode)getWhenClauseOperation(whenMatchedUpdateClause).getChild(0);
+ //columns being updated -> update expressions; "setRCols" (last param) is null because we use actual expressions
+ //before reparsing, i.e. they are known to SemanticAnalyzer logic
+ Map<String, ASTNode> setColsExprs = collectSetColumnsAndExpressions(setClause, partCols, bucketingCols, null);
+ //if target table has cols c1,c2,c3 and p1 partition col and we had "SET c2 = 5, c1 = current_date()" we want to end up with
+ //insert into target (p1) select current_date(), 5, c3, p1 where ....
+ //since we take the RHS of set exactly as it was in Input, we don't need to deal with quoting/escaping column/table names
+ List<FieldSchema> nonPartCols = targetTable.getCols();
+ for(FieldSchema fs : nonPartCols) {
+ rewrittenQueryStr.append(", ");
+ String name = fs.getName();
+ if (setColsExprs.containsKey(name)) {
+ rewrittenQueryStr.append(setColsExprs.get(name).getMatchedText());
+ }
+ else {
+        //todo: is this the right way to get <table>.<column> for target?
+ rewrittenQueryStr.append(getSimpleTableName(target)).append(".").append(HiveUtils.unparseIdentifier(name, this.conf));
+ }
+ }
+ addPartitionColsToSelect(partCols, rewrittenQueryStr, targetName);
+ rewrittenQueryStr.append("\n WHERE ").append(onClauseAsString);
+ String extraPredicate = getWhenClausePredicate(whenMatchedUpdateClause);
+ if(extraPredicate != null) {
+      //we have WHEN MATCHED AND <boolean expr> THEN UPDATE
+ rewrittenQueryStr.append(" AND ").append(extraPredicate);
+ }
+ if(deleteExtraPredicate != null) {
+ rewrittenQueryStr.append(" AND NOT(").append(deleteExtraPredicate).append(")");
+ }
+ rewrittenQueryStr.append("\n sort by ");
+ rewrittenQueryStr.append(targetName).append(".ROW__ID \n");
+
+ setUpAccessControlInfoForUpdate(targetTable, setColsExprs);
+ //we don't deal with columns on RHS of SET expression since the whole expr is part of the
+    //rewritten SQL statement and is thus handled by SemanticAnalyzer. Nor do we have to
+    //figure out which cols on RHS are from source and which from target
+
+ return extraPredicate;
+ }
+ /**
+   * @param onClauseAsString - because there is no clone() and we need to use it in multiple places
+ * @param updateExtraPredicate - see notes at caller
+ */
+ private String handleDelete(ASTNode whenMatchedDeleteClause, StringBuilder rewrittenQueryStr, ASTNode target,
+ String onClauseAsString, Table targetTable, String updateExtraPredicate) throws SemanticException {
+ assert whenMatchedDeleteClause.getType() == HiveParser.TOK_MATCHED;
+ assert getWhenClauseOperation(whenMatchedDeleteClause).getType() == HiveParser.TOK_DELETE;
+ List<FieldSchema> partCols = targetTable.getPartCols();
+ String targetName = getSimpleTableName(target);
+ rewrittenQueryStr.append("INSERT INTO ").append(getFullTableNameForSQL(target));
+ addPartitionColsToInsert(partCols, rewrittenQueryStr);
+
+ rewrittenQueryStr.append("\n select ").append(targetName).append(".ROW__ID ");
+ addPartitionColsToSelect(partCols, rewrittenQueryStr, targetName);
+ rewrittenQueryStr.append("\n WHERE ").append(onClauseAsString);
+ String extraPredicate = getWhenClausePredicate(whenMatchedDeleteClause);
+ if(extraPredicate != null) {
+ //we have WHEN MATCHED AND <boolean expr> THEN DELETE
+ rewrittenQueryStr.append(" AND ").append(extraPredicate);
+ }
+ if(updateExtraPredicate != null) {
+ rewrittenQueryStr.append(" AND NOT(").append(updateExtraPredicate).append(")");
+ }
+ rewrittenQueryStr.append("\n sort by ");
+ rewrittenQueryStr.append(targetName).append(".ROW__ID \n");
+ return extraPredicate;
+ }
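+  //Sketch of the generated Delete leg (invented names), given ON t.a = s.a and
+  //WHEN MATCHED AND s.b < 0 THEN DELETE:
+  //  INSERT INTO t select t.ROW__ID WHERE t.a = s.a AND s.b < 0 sort by t.ROW__ID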
+ private static String addParseInfo(ASTNode n) {
+ return " at " + ErrorMsg.renderPosition(n);
+ }
+
+ /**
+ * Returns the table name to use in the generated query preserving original quotes/escapes if any
+ * @see #getFullTableNameForSQL(ASTNode)
+ */
+ private String getSimpleTableName(ASTNode n) throws SemanticException {
+ return HiveUtils.unparseIdentifier(getSimpleTableNameBase(n), this.conf);
+ }
+ private String getSimpleTableNameBase(ASTNode n) throws SemanticException {
+ switch (n.getType()) {
+ case HiveParser.TOK_TABREF:
+ int aliasIndex = findTabRefIdxs(n)[0];
+ if (aliasIndex != 0) {
+ return n.getChild(aliasIndex).getText();//the alias
+ }
+ return getSimpleTableNameBase((ASTNode) n.getChild(0));
+ case HiveParser.TOK_TABNAME:
+ if(n.getChildCount() == 2) {
+ //db.table -> return table
+ return n.getChild(1).getText();
+ }
+ return n.getChild(0).getText();
+ case HiveParser.TOK_SUBQUERY:
+ return n.getChild(1).getText();//the alias
+ default:
+ throw raiseWrongType("TOK_TABREF|TOK_TABNAME|TOK_SUBQUERY", n);
+ }
+ }
+ /**
+ * @return table name in db.table form with proper quoting/escaping to be used in a SQL statement
+ */
+ private String getFullTableNameForSQL(ASTNode n) throws SemanticException {
+ switch (n.getType()) {
+ case HiveParser.TOK_TABNAME:
+ String[] tableName = getQualifiedTableName(n);
+ return getDotName(new String[] {
+ HiveUtils.unparseIdentifier(tableName[0], this.conf),
+ HiveUtils.unparseIdentifier(tableName[1], this.conf) });
+ case HiveParser.TOK_TABREF:
+ return getFullTableNameForSQL((ASTNode) n.getChild(0));
+ default:
+ throw raiseWrongType("TOK_TABNAME", n);
+ }
+  }
+  private static final class ReparseResult {
+ private final ASTNode rewrittenTree;
+ private final Context rewrittenCtx;
+ ReparseResult(ASTNode n, Context c) {
+ rewrittenTree = n;
+ rewrittenCtx = c;
+ }
+ }
+ private static IllegalArgumentException raiseWrongType(String expectedTokName, ASTNode n) {
+ return new IllegalArgumentException("Expected " + expectedTokName + "; got " + n.getType());
+ }
+ private boolean isAliased(ASTNode n) {
+ switch (n.getType()) {
+ case HiveParser.TOK_TABREF:
+ return findTabRefIdxs(n)[0] != 0;
+ case HiveParser.TOK_TABNAME:
+ return false;
+ case HiveParser.TOK_SUBQUERY:
+ assert n.getChildCount() > 1 : "Expected Derived Table to be aliased";
+ return true;
+ default:
+ throw raiseWrongType("TOK_TABREF|TOK_TABNAME", n);
+ }
+ }
+ /**
+ * Collect WHEN clauses from Merge statement AST
+ */
+ private List<ASTNode> findWhenClauses(ASTNode tree) throws SemanticException {
+ assert tree.getType() == HiveParser.TOK_MERGE;
+ List<ASTNode> whenClauses = new ArrayList<>();
+ for(int idx = 3; idx < tree.getChildCount(); idx++) {
+ ASTNode whenClause = (ASTNode)tree.getChild(idx);
+ assert whenClause.getType() == HiveParser.TOK_MATCHED ||
+ whenClause.getType() == HiveParser.TOK_NOT_MATCHED :
+ "Unexpected node type found: " + whenClause.getType() + addParseInfo(whenClause);
+ whenClauses.add(whenClause);
+ }
+ if(whenClauses.size() <= 0) {
+ //Futureproofing: the parser will actually not allow this
+ throw new SemanticException("Must have at least 1 WHEN clause in MERGE statement");
+ }
+ return whenClauses;
+ }
+ private ASTNode getWhenClauseOperation(ASTNode whenClause) {
+ if(!(whenClause.getType() == HiveParser.TOK_MATCHED || whenClause.getType() == HiveParser.TOK_NOT_MATCHED)) {
+ throw raiseWrongType("Expected TOK_MATCHED|TOK_NOT_MATCHED", whenClause);
+ }
+ return (ASTNode) whenClause.getChild(0);
+ }
+ /**
+ * returns the <boolean predicate> as in WHEN MATCHED AND <boolean predicate> THEN...
+ * @return may be null
+ */
+ private String getWhenClausePredicate(ASTNode whenClause) {
+ if(!(whenClause.getType() == HiveParser.TOK_MATCHED || whenClause.getType() == HiveParser.TOK_NOT_MATCHED)) {
+ throw raiseWrongType("Expected TOK_MATCHED|TOK_NOT_MATCHED", whenClause);
+ }
+ if(whenClause.getChildCount() == 2) {
+ return ((ASTNode)whenClause.getChild(1)).getMatchedText();
+ }
+ return null;
+ }
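+  //e.g. for "WHEN MATCHED AND s.b > 7 THEN DELETE" (hypothetical names) this returns the
+  //text "s.b > 7"; for a bare "WHEN MATCHED THEN DELETE" it returns null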
+ /**
+ * Generates the Insert leg of the multi-insert SQL to represent WHEN NOT MATCHED THEN INSERT clause
+ * @param targetTableNameInSourceQuery - simple name/alias
+ * @throws SemanticException
+ */
+ private void handleInsert(ASTNode whenNotMatchedClause, StringBuilder rewrittenQueryStr, ASTNode target,
+ ASTNode onClause, Table targetTable,
+ String targetTableNameInSourceQuery) throws SemanticException{
+ assert whenNotMatchedClause.getType() == HiveParser.TOK_NOT_MATCHED;
+ assert getWhenClauseOperation(whenNotMatchedClause).getType() == HiveParser.TOK_INSERT;
+ List<FieldSchema> partCols = targetTable.getPartCols();
+
+ String valuesClause = ((ASTNode)getWhenClauseOperation(whenNotMatchedClause).getChild(0))
+ .getMatchedText();
+ valuesClause = valuesClause.substring(1, valuesClause.length() - 1);
+ rewrittenQueryStr.append("INSERT INTO ").append(getFullTableNameForSQL(target));
+ addPartitionColsToInsert(partCols, rewrittenQueryStr);
+
+ OnClauseAnalyzer oca = new OnClauseAnalyzer(onClause, targetTable, targetTableNameInSourceQuery);
+ oca.analyze();
+ rewrittenQueryStr.append("\n select ")
+ .append(valuesClause).append("\n WHERE ").append(oca.getPredicate());
+ String extraPredicate = getWhenClausePredicate(whenNotMatchedClause);
+ if(extraPredicate != null) {
+ //we have WHEN NOT MATCHED AND <boolean expr> THEN INSERT
+ rewrittenQueryStr.append(" AND ")
+ .append(((ASTNode)whenNotMatchedClause.getChild(1)).getMatchedText()).append('\n');
+ }
+ }
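+  //Sketch of the generated Insert leg (invented names), given ON t.a = s.a2 and
+  //WHEN NOT MATCHED THEN INSERT VALUES(s.a2, s.b2):
+  //  INSERT INTO t select s.a2, s.b2 WHERE t.a IS NULL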
+ /**
+   * Suppose the input Merge statement has ON target.a = source.b and c = d. Assume that 'c' is
+   * from the target table and 'd' is from the source expression. In order to properly
+   * generate the Insert for WHEN NOT MATCHED THEN INSERT, we need to make sure that the Where
+   * clause of this Insert contains "target.a is null and target.c is null". This ensures that
+   * this Insert leg does not receive any rows that are processed by the Inserts corresponding
+   * to WHEN MATCHED THEN ... clauses. (Implicit in this is a mini resolver that figures out if
+   * an unqualified column is part of the target table. We can get away with this simple logic
+   * because we know that target is always a table, as opposed to some derived table.)
+   * The job of this class is to generate this predicate.
+   *
+   * Note that this predicate cannot simply be NOT(on-clause-expr). If on-clause-expr evaluates
+   * to Unknown, it will be treated as False in the WHEN MATCHED Inserts, but NOT(Unknown) =
+   * Unknown, and so it would also be treated as False for the WHEN NOT MATCHED Insert...
+ */
+ private static final class OnClauseAnalyzer {
+ private final ASTNode onClause;
+ private final Map<String, List<String>> table2column = new HashMap<>();
+ private final List<String> unresolvedColumns = new ArrayList<>();
+ private final List<FieldSchema> allTargetTableColumns = new ArrayList<>();
+ private final Set<String> tableNamesFound = new HashSet<>();
+ private final String targetTableNameInSourceQuery;
+ /**
+ * @param targetTableNameInSourceQuery alias or simple name
+ */
+ OnClauseAnalyzer(ASTNode onClause, Table targetTable, String targetTableNameInSourceQuery) {
+ this.onClause = onClause;
+ allTargetTableColumns.addAll(targetTable.getCols());
+ allTargetTableColumns.addAll(targetTable.getPartCols());
+ this.targetTableNameInSourceQuery = unescapeIdentifier(targetTableNameInSourceQuery);
+ }
+ /**
+ * finds all columns and groups by table ref (if there is one)
+ */
+ private void visit(ASTNode n) {
+ if(n.getType() == HiveParser.TOK_TABLE_OR_COL) {
+ ASTNode parent = (ASTNode) n.getParent();
+ if(parent != null && parent.getType() == HiveParser.DOT) {
+ //the ref must be a table, so look for column name as right child of DOT
+ if(parent.getParent() != null && parent.getParent().getType() == HiveParser.DOT) {
+ //I don't think this can happen... but just in case
+ throw new IllegalArgumentException("Found unexpected db.table.col reference in " + onClause.getMatchedText());
+ }
+ addColumn2Table(n.getChild(0).getText(), parent.getChild(1).getText());
+ }
+ else {
+ //must be just a column name
+ unresolvedColumns.add(n.getChild(0).getText());
+ }
+ }
+ if(n.getChildCount() == 0) {
+ return;
+ }
+ for(Node child : n.getChildren()) {
+ visit((ASTNode)child);
+ }
+ }
+ private void analyze() {
+ visit(onClause);
+      int numTableRefs = tableNamesFound.size();
+      if(numTableRefs > 2) {
+ throw new IllegalArgumentException("Found > 2 table refs in ON clause. Found " +
+ tableNamesFound + " in " + onClause.getMatchedText());
+ }
+ handleUnresolvedColumns();
+ if(tableNamesFound.size() > 2) {
+ throw new IllegalArgumentException("Found > 2 table refs in ON clause (incl unresolved). " +
+ "Found " + tableNamesFound + " in " + onClause.getMatchedText());
+ }
+ }
+ /**
+ * Find those that belong to target table
+ */
+ private void handleUnresolvedColumns() {
+ if(unresolvedColumns.isEmpty()) { return; }
+ for(String c : unresolvedColumns) {
+ for(FieldSchema fs : allTargetTableColumns) {
+ if(c.equalsIgnoreCase(fs.getName())) {
+            //c belongs to target table; strictly speaking there may be an ambiguous ref but
+ //this will be caught later when multi-insert is parsed
+ addColumn2Table(targetTableNameInSourceQuery.toLowerCase(), c);
+ break;
+ }
+ }
+ }
+ }
+ private void addColumn2Table(String tableName, String columnName) {
+ tableName = tableName.toLowerCase();//normalize name for mapping
+ tableNamesFound.add(tableName);
+ List<String> cols = table2column.get(tableName);
+ if(cols == null) {
+ cols = new ArrayList<>();
+ table2column.put(tableName, cols);
+ }
+      //we want to preserve 'columnName' as it was in the original input query so that the rewrite
+      //looks as much as possible like the original query
+ cols.add(columnName);
+ }
+ /**
+ * Now generate the predicate for Where clause
+ */
+ private String getPredicate() {
+      //normalize table name for mapping
+ List<String> targetCols = table2column.get(targetTableNameInSourceQuery.toLowerCase());
+ StringBuilder sb = new StringBuilder();
+ for(String col : targetCols) {
+ if(sb.length() > 0) {
+ sb.append(" AND ");
+ }
+ //but preserve table name in SQL
+ sb.append(targetTableNameInSourceQuery).append(".").append(col).append(" IS NULL");
+ }
+ return sb.toString();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableDesc.java
index eafba21..60858e6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableDesc.java
@@ -516,7 +516,7 @@ public class CreateTableDesc extends DDLDesc implements Serializable {
}
}
if (!found) {
- throw new SemanticException(ErrorMsg.INVALID_COLUMN.getMsg());
+ throw new SemanticException(ErrorMsg.INVALID_COLUMN.getMsg(" \'" + bucketCol + "\'"));
}
}
}
@@ -536,7 +536,7 @@ public class CreateTableDesc extends DDLDesc implements Serializable {
}
}
if (!found) {
- throw new SemanticException(ErrorMsg.INVALID_COLUMN.getMsg());
+ throw new SemanticException(ErrorMsg.INVALID_COLUMN.getMsg(" \'" + sortCol + "\'"));
}
}
}
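With this change the message names the offending column. For instance (an illustrative sketch, not captured output), a statement such as

    CREATE TABLE t (a int) CLUSTERED BY (b) INTO 2 BUCKETS STORED AS ORC

should now report the invalid column reference as 'b' rather than a generic invalid-column error.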
http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
index f513d0f..f8ae86b 100644
--- a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
+++ b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
@@ -417,6 +417,7 @@ public class TestCompactionTxnHandler {
long txnId = openTxns.getTxn_ids().get(0);
// lock a table, as in dynamic partitions
LockComponent lc = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE, dbName);
+ lc.setIsDynamicPartitionWrite(true);
lc.setTablename(tableName);
DataOperationType dop = DataOperationType.UPDATE;
lc.setOperationType(dop);
http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
index 64baa9f..68af15a 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.lockmgr.TestDbTxnManager2;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.txn.AcidHouseKeeperService;
@@ -580,4 +581,38 @@ public class TestTxnCommands {
runStatementOnDriver("ALTER TABLE ex2.exchange_part_test2 ADD PARTITION (ds='2013-04-05')");
runStatementOnDriver("ALTER TABLE ex1.exchange_part_test1 EXCHANGE PARTITION (ds='2013-04-05') WITH TABLE ex2.exchange_part_test2");
}
+ @Test
+ public void testMergeNegative() throws Exception {
+ CommandProcessorResponse cpr = runStatementOnDriverNegative("MERGE INTO " + Table.ACIDTBL +
+ " target USING " + Table.NONACIDORCTBL +
+ " source\nON target.a = source.a " +
+ "\nWHEN MATCHED THEN UPDATE set b = 1 " +
+ "\nWHEN MATCHED THEN DELETE " +
+ "\nWHEN NOT MATCHED AND a < 1 THEN INSERT VALUES(1,2)");
+ Assert.assertEquals(ErrorMsg.MERGE_PREDIACTE_REQUIRED, ((HiveException)cpr.getException()).getCanonicalErrorMsg());
+ }
+ @Test
+ public void testMergeNegative2() throws Exception {
+ CommandProcessorResponse cpr = runStatementOnDriverNegative("MERGE INTO "+ Table.ACIDTBL +
+ " target USING " + Table.NONACIDORCTBL + "\n source ON target.pk = source.pk " +
+ "\nWHEN MATCHED THEN UPDATE set t = 1 " +
+ "\nWHEN MATCHED THEN UPDATE set b=a");
+ Assert.assertEquals(ErrorMsg.MERGE_TOO_MANY_UPDATE, ((HiveException)cpr.getException()).getCanonicalErrorMsg());
+ }
+ @Ignore
+ @Test
+ public void testSpecialChar() throws Exception {
+ String target = "`aci/d_u/ami`";
+ String src = "`src/name`";
+ runStatementOnDriver("drop table if exists " + target);
+ runStatementOnDriver("drop table if exists " + src);
+ runStatementOnDriver("create table " + target + "(i int," +
+ "`d?*de e` decimal(5,2)," +
+ "vc varchar(128)) clustered by (i) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+ runStatementOnDriver("create table " + src + "(`g/h` int, j decimal(5,2), k varchar(128))");
+ runStatementOnDriver("merge into " + target + " as `d/8` using " + src + " as `a/b` on i=`g/h` " +
+ "\nwhen matched and i > 5 then delete " +
+ "\nwhen matched then update set vc=`\u2206\u220b` " +
+ "\nwhen not matched then insert values(`a/b`.`g/h`,`a/b`.j,`a/b`.k)");
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index 949e071..49ba667 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -64,12 +64,15 @@ import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* TODO: this should be merged with TestTxnCommands once that is checked in
* specifically the tests; the supporting code here is just a clone of TestTxnCommands
*/
public class TestTxnCommands2 {
+ static final private Logger LOG = LoggerFactory.getLogger(TestTxnCommands2.class);
protected static final String TEST_DATA_DIR = new File(System.getProperty("java.io.tmpdir") +
File.separator + TestTxnCommands2.class.getCanonicalName()
+ "-" + System.currentTimeMillis()
@@ -86,7 +89,9 @@ public class TestTxnCommands2 {
ACIDTBL("acidTbl"),
ACIDTBLPART("acidTblPart"),
NONACIDORCTBL("nonAcidOrcTbl"),
- NONACIDPART("nonAcidPart");
+ NONACIDPART("nonAcidPart"),
+ NONACIDPART2("nonAcidPart2"),
+ ACIDNESTEDPART("acidNestedPart");
private final String name;
@Override
@@ -126,11 +131,17 @@ public class TestTxnCommands2 {
}
SessionState.start(new SessionState(hiveConf));
d = new Driver(hiveConf);
+ d.setMaxRows(10000);
dropTables();
runStatementOnDriver("create table " + Table.ACIDTBL + "(a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES (" + tableProperties + ")");
runStatementOnDriver("create table " + Table.ACIDTBLPART + "(a int, b int) partitioned by (p string) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES (" + tableProperties + ")");
runStatementOnDriver("create table " + Table.NONACIDORCTBL + "(a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='false')");
runStatementOnDriver("create table " + Table.NONACIDPART + "(a int, b int) partitioned by (p string) stored as orc TBLPROPERTIES ('transactional'='false')");
+ runStatementOnDriver("create table " + Table.NONACIDPART2 +
+ "(a2 int, b2 int) partitioned by (p2 string) stored as orc TBLPROPERTIES ('transactional'='false')");
+ runStatementOnDriver("create table " + Table.ACIDNESTEDPART +
+ "(a int, b int) partitioned by (p int, q int) clustered by (a) into " + BUCKET_COUNT +
+ " buckets stored as orc TBLPROPERTIES (" + tableProperties + ")");
}
protected void dropTables() throws Exception {
@@ -1333,6 +1344,409 @@ public class TestTxnCommands2 {
String[] expectedResult = { "1\tfoo\tNULL", "2\tbar\tNULL" };
Assert.assertEquals(Arrays.asList(expectedResult), rs);
}
+ /**
+ * Test that ACID works with multi-insert statement
+ */
+ @Test
+ public void testMultiInsertStatement() throws Exception {
+ int[][] sourceValsOdd = {{5,5},{11,11}};
+ int[][] sourceValsEven = {{2,2}};
+ //populate source
+ runStatementOnDriver("insert into " + Table.NONACIDPART2 + " PARTITION(p2='odd') " + makeValuesClause(sourceValsOdd));
+ runStatementOnDriver("insert into " + Table.NONACIDPART2 + " PARTITION(p2='even') " + makeValuesClause(sourceValsEven));
+ int[][] targetValsOdd = {{5,6},{7,8}};
+ int[][] targetValsEven = {{2,1},{4,3}};
+ //populate target
+ runStatementOnDriver("insert into " + Table.ACIDTBLPART + " PARTITION(p='odd') " + makeValuesClause(targetValsOdd));
+ runStatementOnDriver("insert into " + Table.ACIDTBLPART + " PARTITION(p='even') " + makeValuesClause(targetValsEven));
+ List<String> r = runStatementOnDriver("select a,b from " + Table.ACIDTBLPART + " order by a,b");
+ int[][] targetVals = {{2,1},{4,3},{5,6},{7,8}};
+ Assert.assertEquals(stringifyValues(targetVals), r);
+ //currently multi-insert doesn't allow the same table/partition in > 1 output branch
+ String s = "from " + Table.ACIDTBLPART + " target right outer join " +
+ Table.NONACIDPART2 + " source on target.a = source.a2 " +
+ " INSERT INTO TABLE " + Table.ACIDTBLPART + " PARTITION(p='even') select source.a2, source.b2 where source.a2=target.a " +
+ " insert into table " + Table.ACIDTBLPART + " PARTITION(p='odd') select source.a2,source.b2 where target.a is null";
+ //r = runStatementOnDriver("explain formatted " + s);
+ //LOG.info("Explain formatted: " + r.toString());
+ runStatementOnDriver(s);
+ r = runStatementOnDriver("select a,b from " + Table.ACIDTBLPART + " where p='even' order by a,b");
+ int[][] rExpected = {{2,1},{2,2},{4,3},{5,5}};
+ Assert.assertEquals(stringifyValues(rExpected), r);
+ r = runStatementOnDriver("select a,b from " + Table.ACIDTBLPART + " where p='odd' order by a,b");
+ int[][] rExpected2 = {{5,6},{7,8},{11,11}};
+ Assert.assertEquals(stringifyValues(rExpected2), r);
+ }
+ /**
+ * Open questions / TODOs for MERGE (hedged sketches of the first two appear in
+ * testMergeUpdateDeleteSameRow() and testMergeAmbiguousMatch() below):
+ *
+ * Check that we can specify insert columns.
+ *
+ * Need to pin down semantics: if a row from the base expression ends up in both the Update and
+ * the Delete clause, we write the Update event to 1 delta and the Delete event to another.
+ * Given that we collapse events for the same current txn with different stmt ids to the latest
+ * one, the delete will win.
+ * In Acid 2.0 we'd end up with 2 Delete events for the same PK. Logically that should be OK,
+ * but it may break the Vectorized reader impl - needs checking.
+ *
+ * A 1:M match from target to source results in an ambiguous write to the target - the SQL
+ * Standard expects an error. (There is an argument on how to solve this with minor mods to the
+ * Join operator, written down somewhere.)
+ *
+ * Only 1 Stats task should be needed for MERGE (currently we get 1 per branch).
+ * The Move task should also be eliminated - that's a general ACID task.
+ */
+ private void logResults(List<String> r, String header, String prefix) {
+ LOG.info(prefix + " " + header);
+ StringBuilder sb = new StringBuilder();
+ int numLines = 0;
+ for(String line : r) {
+ numLines++;
+ sb.append(prefix).append(line).append("\n");
+ }
+ LOG.info(sb.toString());
+ LOG.info(prefix + " Printed " + numLines + " lines");
+ }
+
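+ /**
+ * A hedged, illustrative sketch (assumed behavior, not a spec) of the first open question in
+ * the notes above logResults(): a source row whose predicates satisfy both the UPDATE and the
+ * DELETE clause. Ignored until the semantics are pinned down; the assertion assumes the
+ * delete wins per the event-collapsing note.
+ */
+ @Ignore("semantics of a row matching both UPDATE and DELETE clauses not yet defined")
+ @Test
+ public void testMergeUpdateDeleteSameRow() throws Exception {
+ runStatementOnDriver("insert into " + Table.NONACIDORCTBL + " " + makeValuesClause(new int[][] {{2,2}}));
+ runStatementOnDriver("insert into " + Table.ACIDTBL + " " + makeValuesClause(new int[][] {{2,1}}));
+ //row a=2 satisfies both WHEN MATCHED predicates, so the rewritten multi-insert writes an
+ //Update event to one delta and a Delete event to another for the same row
+ runStatementOnDriver("merge into " + Table.ACIDTBL +
+ " as t using " + Table.NONACIDORCTBL + " s ON t.a = s.a " +
+ "WHEN MATCHED AND s.a < 5 THEN UPDATE set b = 0 " +
+ "WHEN MATCHED AND s.a < 3 THEN DELETE");
+ List<String> r = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+ //if events for the same txn collapse to the latest one and the delete wins, nothing remains
+ Assert.assertTrue("expected delete to win, got: " + r, r.isEmpty());
+ }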
+
+ /**
+ * This tests that we handle a non-trivial ON clause correctly
+ * @throws Exception
+ */
+ @Test
+ public void testMerge() throws Exception {
+ int[][] baseValsOdd = {{5,5},{11,11}};
+ runStatementOnDriver("insert into " + Table.NONACIDPART2 + " PARTITION(p2='odd') " + makeValuesClause(baseValsOdd));
+ int[][] vals = {{2,1},{4,3},{5,6},{7,8}};
+ runStatementOnDriver("insert into " + Table.ACIDTBL + " " + makeValuesClause(vals));
+ List<String> r = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+ Assert.assertEquals(stringifyValues(vals), r);
+ String query = "merge into " + Table.ACIDTBL +
+ " using " + Table.NONACIDPART2 + " source ON " + Table.ACIDTBL + ".a = a2 and b + 1 = source.b2 + 1 " +
+ "WHEN MATCHED THEN UPDATE set b = source.b2 " +
+ "WHEN NOT MATCHED THEN INSERT VALUES(source.a2, source.b2)";
+ runStatementOnDriver(query);
+
+ r = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+ int[][] rExpected = {{2,1},{4,3},{5,5},{5,6},{7,8},{11,11}};
+ Assert.assertEquals(stringifyValues(rExpected), r);
+ }
+ @Test
+ public void testMergeWithPredicate() throws Exception {
+ int[][] baseValsOdd = {{2,2},{5,5},{8,8},{11,11}};
+ runStatementOnDriver("insert into " + Table.NONACIDPART2 + " PARTITION(p2='odd') " + makeValuesClause(baseValsOdd));
+ int[][] vals = {{2,1},{4,3},{5,6},{7,8}};
+ runStatementOnDriver("insert into " + Table.ACIDTBL + " " + makeValuesClause(vals));
+ List<String> r = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+ Assert.assertEquals(stringifyValues(vals), r);
+ String query = "merge into " + Table.ACIDTBL +
+ " t using " + Table.NONACIDPART2 + " s ON t.a = s.a2 " +
+ "WHEN MATCHED AND t.b between 1 and 3 THEN UPDATE set b = s.b2 " +
+ "WHEN NOT MATCHED and s.b2 >= 11 THEN INSERT VALUES(s.a2, s.b2)";
+ runStatementOnDriver(query);
+
+ r = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+ int[][] rExpected = {{2,2},{4,3},{5,6},{7,8},{11,11}};
+ Assert.assertEquals(stringifyValues(rExpected), r);
+ }
+
+ /**
+ * Test that combines update + insert clauses
+ * @throws Exception
+ */
+ @Test
+ public void testMerge2() throws Exception {
+ int[][] baseValsOdd = {{5,5},{11,11}};
+ int[][] baseValsEven = {{2,2},{4,44}};
+ runStatementOnDriver("insert into " + Table.NONACIDPART2 + " PARTITION(p2='odd') " + makeValuesClause(baseValsOdd));
+ runStatementOnDriver("insert into " + Table.NONACIDPART2 + " PARTITION(p2='even') " + makeValuesClause(baseValsEven));
+ int[][] vals = {{2,1},{4,3},{5,6},{7,8}};
+ runStatementOnDriver("insert into " + Table.ACIDTBL + " " + makeValuesClause(vals));
+ List<String> r = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+ Assert.assertEquals(stringifyValues(vals), r);
+ String query = "merge into " + Table.ACIDTBL +
+ " using " + Table.NONACIDPART2 + " source ON " + Table.ACIDTBL + ".a = source.a2 " +
+ "WHEN MATCHED THEN UPDATE set b = source.b2 " +
+ "WHEN NOT MATCHED THEN INSERT VALUES(source.a2, source.b2) ";//AND b < 1
+ r = runStatementOnDriver(query);
+ //r = runStatementOnDriver("explain " + query);
+ //logResults(r, "Explain logical1", "");
+
+ r = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+ int[][] rExpected = {{2,2},{4,44},{5,5},{7,8},{11,11}};
+ Assert.assertEquals(stringifyValues(rExpected), r);
+ }
+
+ /**
+ * Test that combines delete + insert clauses
+ * @throws Exception
+ */
+ @Test
+ public void testMerge3() throws Exception {
+ int[][] baseValsOdd = {{5,5},{11,11}};
+ int[][] baseValsEven = {{2,2},{4,44}};
+ runStatementOnDriver("insert into " + Table.NONACIDPART2 + " PARTITION(p2='odd') " + makeValuesClause(baseValsOdd));
+ runStatementOnDriver("insert into " + Table.NONACIDPART2 + " PARTITION(p2='even') " + makeValuesClause(baseValsEven));
+ int[][] vals = {{2,1},{4,3},{5,6},{7,8}};
+ runStatementOnDriver("insert into " + Table.ACIDTBL + " " + makeValuesClause(vals));
+ List<String> r = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+ Assert.assertEquals(stringifyValues(vals), r);
+ String query = "merge into " + Table.ACIDTBL +
+ " using " + Table.NONACIDPART2 + " source ON " + Table.ACIDTBL + ".a = source.a2 " +
+ "WHEN MATCHED THEN DELETE " +
+ "WHEN NOT MATCHED THEN INSERT VALUES(source.a2, source.b2) ";
+ runStatementOnDriver(query);
+
+ r = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+ int[][] rExpected = {{7,8},{11,11}};
+ Assert.assertEquals(stringifyValues(rExpected), r);
+ }
+ /**
+ * https://hortonworks.jira.com/browse/BUG-66580
+ * @throws Exception
+ */
+ @Ignore
+ @Test
+ public void testMultiInsert() throws Exception {
+ runStatementOnDriver("create table if not exists srcpart (a int, b int, c int) " +
+ "partitioned by (z int) clustered by (a) into 2 buckets " +
+ "stored as orc tblproperties('transactional'='true')");
+ runStatementOnDriver("create temporary table if not exists data1 (x int)");
+// runStatementOnDriver("create temporary table if not exists data2 (x int)");
+
+ runStatementOnDriver("insert into data1 values (1),(2),(3)");
+// runStatementOnDriver("insert into data2 values (4),(5),(6)");
+ d.destroy();
+ hiveConf.setVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict");
+ d = new Driver(hiveConf);
+ List<String> r = runStatementOnDriver(" from data1 " +
+ "insert into srcpart partition(z) select 0,0,1,x " +
+ "insert into srcpart partition(z=1) select 0,0,1");
+ }
+ /**
+ * Investigating DP and WriteEntity, etc
+ * @throws Exception
+ */
+ @Test
+ @Ignore
+ public void testDynamicPartitions() throws Exception {
+ d.destroy();
+ hiveConf.setVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict");
+ //In DbTxnManager.acquireLocks() we have
+ // 1 ReadEntity: default@values__tmp__table__1
+ // 1 WriteEntity: default@acidtblpart Type=TABLE WriteType=INSERT isDP=false
+ runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition(p) values(1,1,'p1'),(2,2,'p1'),(3,3,'p1'),(4,4,'p2')");
+
+ List<String> r1 = runStatementOnDriver("select count(*) from " + Table.ACIDTBLPART);
+ Assert.assertEquals("4", r1.get(0));
+ //In DbTxnManager.acquireLocks() we have
+ // 2 ReadEntity: [default@acidtblpart@p=p1, default@acidtblpart]
+ // 1 WriteEntity: default@acidtblpart Type=TABLE WriteType=INSERT isDP=false
+ //todo: side note on the above: LockRequestBuilder combines both default@acidtblpart entries into 1
+ runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition(p) select * from " + Table.ACIDTBLPART + " where p='p1'");
+
+ //In DbTxnManager.acquireLocks() we have
+ // 2 ReadEntity: [default@acidtblpart@p=p1, default@acidtblpart]
+ // 1 WriteEntity: default@acidtblpart@p=p2 Type=PARTITION WriteType=INSERT isDP=false
+ runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition(p='p2') select a,b from " + Table.ACIDTBLPART + " where p='p1'");
+
+ //In UpdateDeleteSemanticAnalyzer, after super analyze
+ // 3 ReadEntity: [default@acidtblpart, default@acidtblpart@p=p1, default@acidtblpart@p=p2]
+ // 1 WriteEntity: [default@acidtblpart TABLE/INSERT]
+ //after UDSA
+ // Read [default@acidtblpart, default@acidtblpart@p=p1, default@acidtblpart@p=p2]
+ // Write [default@acidtblpart@p=p1, default@acidtblpart@p=p2] - PARTITION/UPDATE, PARTITION/UPDATE
+ //todo: why acquire per-partition locks? If you have many partitions that's hugely inefficient;
+ //could acquire 1 table-level Shared_write instead
+ runStatementOnDriver("update " + Table.ACIDTBLPART + " set b = 1");
+
+ //In UpdateDeleteSemanticAnalyzer, after super analyze
+ // Read [default@acidtblpart, default@acidtblpart@p=p1]
+ // Write default@acidtblpart TABLE/INSERT
+ //after UDSA
+ // Read [default@acidtblpart, default@acidtblpart@p=p1]
+ // Write [default@acidtblpart@p=p1] PARTITION/UPDATE
+ //todo: this causes a Read lock on the whole table - clearly overkill
+ //for Update/Delete we only ever write (at most) the partitions we read
+ runStatementOnDriver("update " + Table.ACIDTBLPART + " set b = 1 where p='p1'");
+ }
+ @Test
+ public void testDynamicPartitionsMerge() throws Exception {
+ d.destroy();
+ hiveConf.setVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict");
+ runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition(p) values(1,1,'p1'),(2,2,'p1'),(3,3,'p1'),(4,4,'p2')");
+
+ List<String> r1 = runStatementOnDriver("select count(*) from " + Table.ACIDTBLPART);
+ Assert.assertEquals("4", r1.get(0));
+ int[][] sourceVals = {{2,15},{4,44},{5,5},{11,11}};
+ runStatementOnDriver("insert into " + Table.NONACIDORCTBL + " " + makeValuesClause(sourceVals));
+ runStatementOnDriver("merge into " + Table.ACIDTBLPART + " using " + Table.NONACIDORCTBL +
+ " as s ON " + Table.ACIDTBLPART + ".a = s.a " +
+ "when matched then update set b = s.b " +
+ "when not matched then insert values(s.a, s.b, 'new part')");
+ r1 = runStatementOnDriver("select p,a,b from " + Table.ACIDTBLPART + " order by p, a, b");
+ String result = r1.toString();
+ Assert.assertEquals("[new part\t5\t5, new part\t11\t11, p1\t1\t1, p1\t2\t15, p1\t3\t3, p2\t4\t44]", result);
+ }
+ /**
+ * Using nested partitions and thus DummyPartition
+ * @throws Exception
+ */
+ @Test
+ public void testDynamicPartitionsMerge2() throws Exception {
+ d.destroy();
+ hiveConf.setVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict");
+ int[][] targetVals = {{1,1,1},{2,2,2},{3,3,1},{4,4,2}};
+ runStatementOnDriver("insert into " + Table.ACIDNESTEDPART + " partition(p=1,q) " + makeValuesClause(targetVals));
+
+ List<String> r1 = runStatementOnDriver("select count(*) from " + Table.ACIDNESTEDPART);
+ Assert.assertEquals("4", r1.get(0));
+ int[][] sourceVals = {{2,15},{4,44},{5,5},{11,11}};
+ runStatementOnDriver("insert into " + Table.NONACIDORCTBL + " " + makeValuesClause(sourceVals));
+ runStatementOnDriver("merge into " + Table.ACIDNESTEDPART + " using " + Table.NONACIDORCTBL +
+ " as s ON " + Table.ACIDNESTEDPART + ".a = s.a " +
+ "when matched then update set b = s.b " +
+ "when not matched then insert values(s.a, s.b, 3,4)");
+ r1 = runStatementOnDriver("select p,q,a,b from " + Table.ACIDNESTEDPART + " order by p,q, a, b");
+ Assert.assertEquals(stringifyValues(new int[][] {{1,1,1,1},{1,1,3,3},{1,2,2,15},{1,2,4,44},{3,4,5,5},{3,4,11,11}}), r1);
+ }
+ @Ignore("Covered elsewhere")
+ @Test
+ public void testMergeAliasedTarget() throws Exception {
+ int[][] baseValsOdd = {{2,2},{4,44},{5,5},{11,11}};
+ runStatementOnDriver("insert into " + Table.NONACIDORCTBL + " " + makeValuesClause(baseValsOdd));
+ int[][] vals = {{2,1},{4,3},{5,6},{7,8}};
+ runStatementOnDriver("insert into " + Table.ACIDTBL + " " + makeValuesClause(vals));
+ String query = "merge into " + Table.ACIDTBL +
+ " as target using " + Table.NONACIDORCTBL + " source ON target.a = source.a " +
+ "WHEN MATCHED THEN update set b = 0 " +
+ "WHEN NOT MATCHED THEN INSERT VALUES(source.a, source.b) ";
+ runStatementOnDriver(query);
+
+ List<String> r = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+ int[][] rExpected = {{2,0},{4,0},{5,0},{7,8},{11,11}};
+ Assert.assertEquals(stringifyValues(rExpected), r);
+ }
+ @Test
+ public void testMergeUpdateDelete() throws Exception {
+ int[][] baseValsOdd = {{2,2},{4,44},{5,5},{11,11}};
+ runStatementOnDriver("insert into " + Table.NONACIDORCTBL + " " + makeValuesClause(baseValsOdd));
+ int[][] vals = {{2,1},{4,3},{5,6},{7,8}};
+ runStatementOnDriver("insert into " + Table.ACIDTBL + " " + makeValuesClause(vals));
+ String query = "merge into " + Table.ACIDTBL +
+ " as t using " + Table.NONACIDORCTBL + " s ON t.a = s.a " +
+ "WHEN MATCHED AND s.a < 3 THEN update set b = 0 " +
+ "WHEN MATCHED and t.a > 3 and t.a < 5 THEN DELETE " +
+ "WHEN NOT MATCHED THEN INSERT VALUES(s.a, s.b) ";
+ runStatementOnDriver(query);
+
+ List<String> r = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+ int[][] rExpected = {{2,0},{5,6},{7,8},{11,11}};
+ Assert.assertEquals(stringifyValues(rExpected), r);
+ }
+ @Test
+ public void testMergeDeleteUpdate() throws Exception {
+ int[][] sourceVals = {{2,2},{4,44},{5,5},{11,11}};
+ runStatementOnDriver("insert into " + Table.NONACIDORCTBL + " " + makeValuesClause(sourceVals));
+ int[][] targetVals = {{2,1},{4,3},{5,6},{7,8}};
+ runStatementOnDriver("insert into " + Table.ACIDTBL + " " + makeValuesClause(targetVals));
+ String query = "merge into " + Table.ACIDTBL +
+ " as t using " + Table.NONACIDORCTBL + " s ON t.a = s.a " +
+ "WHEN MATCHED and s.a < 5 THEN DELETE " +
+ "WHEN MATCHED AND s.a < 3 THEN update set b = 0 " +
+ "WHEN NOT MATCHED THEN INSERT VALUES(s.a, s.b) ";
+ runStatementOnDriver(query);
+
+ List<String> r = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+ int[][] rExpected = {{5,6},{7,8},{11,11}};
+ Assert.assertEquals(stringifyValues(rExpected), r);
+ }
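+ /**
+ * A hedged sketch of the 1:M (target to source) ambiguity from the notes above logResults():
+ * two source rows join to the same target row, so "set b = s.b" has no well-defined answer.
+ * The SQL Standard requires an error here; ignored since no such check exists yet, and the
+ * assertion below only pins the row count, not the (ambiguous) value of b.
+ */
+ @Ignore("SQL Standard cardinality violation on MERGE not enforced yet")
+ @Test
+ public void testMergeAmbiguousMatch() throws Exception {
+ //duplicate join key 2 on the source side
+ runStatementOnDriver("insert into " + Table.NONACIDORCTBL + " " + makeValuesClause(new int[][] {{2,20},{2,30}}));
+ runStatementOnDriver("insert into " + Table.ACIDTBL + " " + makeValuesClause(new int[][] {{2,1}}));
+ runStatementOnDriver("merge into " + Table.ACIDTBL +
+ " as t using " + Table.NONACIDORCTBL + " s ON t.a = s.a " +
+ "WHEN MATCHED THEN UPDATE set b = s.b");
+ List<String> r = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+ Assert.assertEquals("expected a single (ambiguous) row: " + r, 1, r.size());
+ }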
+
+ /**
+ * https://www.linkedin.com/pulse/how-load-slowly-changing-dimension-type-2-using-oracle-padhy
+ */
+ @Test
+ public void testMergeType2SCD01() throws Exception {
+ runStatementOnDriver("drop table if exists target");
+ runStatementOnDriver("drop table if exists source");
+ runStatementOnDriver("drop table if exists splitTable");
+
+ runStatementOnDriver("create table splitTable(op int)");
+ runStatementOnDriver("insert into splitTable values (0),(1)");
+ runStatementOnDriver("create table source (key int, data int)");
+ runStatementOnDriver("create table target (key int, data int, cur int) clustered by (key) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+ int[][] targetVals = {{1, 5, 1}, {2, 6, 1}, {1, 18, 0}};
+ runStatementOnDriver("insert into target " + makeValuesClause(targetVals));
+ int[][] sourceVals = {{1, 7}, {3, 8}};
+ runStatementOnDriver("insert into source " + makeValuesClause(sourceVals));
+ //augment source with a col which has 1 if it will cause an update in target, 0 otherwise
+ String curMatch = "select s.*, case when t.cur is null then 0 else 1 end m from source s left outer join (select * from target where target.cur=1) t on s.key=t.key";
+ //split (duplicate) each row that will cause an update into 2 rows, augmented with an 'op' col: 0 means insert, 1 means update
+ String teeCurMatch = "select curMatch.*, case when splitTable.op is null or splitTable.op = 0 then 0 else 1 end op from (" + curMatch + ") curMatch left outer join splitTable on curMatch.m=1";
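+ //for the sample data above, this should yield (derived by hand, worth re-checking):
+ // curMatch: (1,7,m=1), (3,8,m=0)
+ // teeCurMatch: (1,7,1,op=0), (1,7,1,op=1), (3,8,0,op=0)
+ //i.e. the row that updates key=1 is duplicated: the op=1 copy drives the update (cur=0)
+ //and the op=0 copy drives the insert of the new version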
+ if(false) {
+ //this is just for debug
+ List<String> r1 = runStatementOnDriver(curMatch);
+ List<String> r2 = runStatementOnDriver(teeCurMatch);
+ }
+ String stmt = "merge into target t using (" + teeCurMatch + ") s on t.key=s.key and t.cur=1 and s.op=1 " +
+ "when matched then update set cur=0 " +
+ "when not matched then insert values(s.key,s.data,1)";
+
+ runStatementOnDriver(stmt);
+ int[][] resultVals = {{1,5,0},{1,7,1},{1,18,0},{2,6,1},{3,8,1}};
+ List<String> r = runStatementOnDriver("select * from target order by key,data,cur");
+ Assert.assertEquals(stringifyValues(resultVals), r);
+ }
+ /**
+ * https://www.linkedin.com/pulse/how-load-slowly-changing-dimension-type-2-using-oracle-padhy
+ * Same as testMergeType2SCD01 but with a more intuitive "source" expression
+ */
+ @Test
+ public void testMergeType2SCD02() throws Exception {
+ runStatementOnDriver("drop table if exists target");
+ runStatementOnDriver("drop table if exists source");
+ runStatementOnDriver("create table source (key int, data int)");
+ runStatementOnDriver("create table target (key int, data int, cur int) clustered by (key) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+ int[][] targetVals = {{1, 5, 1}, {2, 6, 1}, {1, 18, 0}};
+ runStatementOnDriver("insert into target " + makeValuesClause(targetVals));
+ int[][] sourceVals = {{1, 7}, {3, 8}};
+ runStatementOnDriver("insert into source " + makeValuesClause(sourceVals));
+
+ String baseSrc = "select source.*, 0 c from source " +
+ "union all " +
+ "select source.*, 1 c from source " +
+ "inner join target " +
+ "on source.key=target.key where target.cur=1";
+ if(false) {
+ //this is just for debug
+ List<String> r1 = runStatementOnDriver(baseSrc);
+ List<String> r2 = runStatementOnDriver(
+ "select t.*, s.* from target t right outer join (" + baseSrc + ") s " +
+ "\non t.key=s.key and t.cur=s.c and t.cur=1");
+ }
+ String stmt = "merge into target t using " +
+ "(" + baseSrc + ") s " +
+ "on t.key=s.key and t.cur=s.c and t.cur=1 " +
+ "when matched then update set cur=0 " +
+ "when not matched then insert values(s.key,s.data,1)";
+
+ runStatementOnDriver(stmt);
+ int[][] resultVals = {{1,5,0},{1,7,1},{1,18,0},{2,6,1},{3,8,1}};
+ List<String> r = runStatementOnDriver("select * from target order by key,data,cur");
+ Assert.assertEquals(stringifyValues(resultVals), r);
+ }
+
+ @Test
+ @Ignore("Values clause with table constructor not yet supported")
+ public void testValuesSource() throws Exception {
+ int[][] targetVals = {{2,1},{4,3},{5,6},{7,8}};
+ runStatementOnDriver("insert into " + Table.ACIDTBL + " " + makeValuesClause(targetVals));
+ String query = "merge into " + Table.ACIDTBL +
+ " as t using (select * from (values (2,2),(4,44),(5,5),(11,11)) as F(a,b)) s ON t.a = s.a " +
+ "WHEN MATCHED and s.a < 5 THEN DELETE " +
+ "WHEN MATCHED AND s.a < 3 THEN update set b = 0 " +
+ "WHEN NOT MATCHED THEN INSERT VALUES(s.a, s.b) ";
+ runStatementOnDriver(query);
+
+ List<String> r = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+ int[][] rExpected = {{5,6},{7,8},{11,11}};
+ Assert.assertEquals(stringifyValues(rExpected), r);
+ }
/**
* takes raw data and turns it into a string as if from Driver.getResults()
@@ -1389,6 +1803,7 @@ public class TestTxnCommands2 {
}
protected List<String> runStatementOnDriver(String stmt) throws Exception {
+ LOG.info("+runStatementOnDriver(" + stmt + ")");
CommandProcessorResponse cpr = d.run(stmt);
if(cpr.getResponseCode() != 0) {
throw new RuntimeException(stmt + " failed: " + cpr);
http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithSplitUpdate.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithSplitUpdate.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithSplitUpdate.java
index c2330cb..c4dead8 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithSplitUpdate.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithSplitUpdate.java
@@ -18,21 +18,16 @@
package org.apache.hadoop.hive.ql;
-import java.io.File;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.FileUtils;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
-import org.apache.hadoop.hive.ql.io.HiveInputFormat;
-import org.apache.hadoop.hive.ql.session.SessionState;
import org.junit.Assert;
import org.junit.Before;
+import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
@@ -545,4 +540,10 @@ public class TestTxnCommands2WithSplitUpdate extends TestTxnCommands2 {
resultCount = 2;
Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
}
+ @Test
+ @Ignore
+ public void testMergeType2SCD01() throws Exception {}
+ @Test
+ @Ignore
+ public void testMergeType2SCD02() throws Exception {}
}