Posted to commits@hive.apache.org by ek...@apache.org on 2019/01/09 23:15:24 UTC
[1/2] hive git commit: HIVE-20919 Break up UpdateDeleteSemanticAnalyzer (Miklos Gergely via Eugene Koifman)
Repository: hive
Updated Branches:
refs/heads/master 4d03e31d3 -> dcc895016
http://git-wip-us.apache.org/repos/asf/hive/blob/dcc89501/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java
index 8651afd..179021e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java
@@ -17,603 +17,89 @@
*/
package org.apache.hadoop.hive.ql.parse;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
import java.util.HashMap;
-import java.util.HashSet;
-import java.util.IdentityHashMap;
-import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.UUID;
-import org.antlr.runtime.TokenRewriteStream;
-import org.antlr.runtime.tree.Tree;
-import org.apache.commons.collections.MapUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.metastore.TableType;
-import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.Context;
-import org.apache.hadoop.hive.ql.DriverContext;
-import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.QueryState;
-import org.apache.hadoop.hive.ql.exec.DDLTask;
-import org.apache.hadoop.hive.ql.exec.StatsTask;
-import org.apache.hadoop.hive.ql.exec.Task;
-import org.apache.hadoop.hive.ql.exec.TaskFactory;
-import org.apache.hadoop.hive.ql.hooks.Entity;
-import org.apache.hadoop.hive.ql.hooks.ReadEntity;
-import org.apache.hadoop.hive.ql.hooks.WriteEntity;
-import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.lib.Node;
-import org.apache.hadoop.hive.ql.metadata.Hive;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.HiveUtils;
-import org.apache.hadoop.hive.ql.metadata.InvalidTableException;
import org.apache.hadoop.hive.ql.metadata.Table;
-import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
-import org.apache.hadoop.hive.ql.plan.AlterTableDesc;
-import org.apache.hadoop.hive.ql.plan.CreateTableLikeDesc;
-import org.apache.hadoop.hive.ql.plan.DDLWork;
-import org.apache.hadoop.hive.ql.plan.DropTableDesc;
-import org.apache.hadoop.hive.ql.plan.ExportWork;
-import org.apache.hadoop.hive.ql.session.SessionState;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
/**
* A subclass of the {@link org.apache.hadoop.hive.ql.parse.SemanticAnalyzer} that just handles
- * update, delete and merge statements. It works by rewriting the updates and deletes into insert
+ * update and delete statements. It works by rewriting the updates and deletes into insert
* statements (since they are actually inserts) and then doing some patch up to make them work as
* updates and deletes instead.
*/
-public class UpdateDeleteSemanticAnalyzer extends SemanticAnalyzer {
- private static final Logger LOG = LoggerFactory.getLogger(UpdateDeleteSemanticAnalyzer.class);
+public class UpdateDeleteSemanticAnalyzer extends RewriteSemanticAnalyzer {
- private boolean useSuper = false;
+ private Context.Operation operation = Context.Operation.OTHER;
UpdateDeleteSemanticAnalyzer(QueryState queryState) throws SemanticException {
super(queryState);
}
- @Override
- public void analyzeInternal(ASTNode tree) throws SemanticException {
- if (useSuper) {
- super.analyzeInternal(tree);
- } else {
- if (!getTxnMgr().supportsAcid()) {
- throw new SemanticException(ErrorMsg.ACID_OP_ON_NONACID_TXNMGR.getMsg());
- }
- switch (tree.getToken().getType()) {
- case HiveParser.TOK_DELETE_FROM:
- analyzeDelete(tree);
- break;
- case HiveParser.TOK_UPDATE_TABLE:
- analyzeUpdate(tree);
- break;
- case HiveParser.TOK_MERGE:
- analyzeMerge(tree);
- break;
- case HiveParser.TOK_EXPORT:
- analyzeAcidExport(tree);
- break;
- default:
- throw new RuntimeException("Asked to parse token " + tree.getName() + " in " +
- "UpdateDeleteSemanticAnalyzer");
- }
- cleanUpMetaColumnAccessControl();
-
- }
- }
- private boolean updating() {
- return currentOperation == Context.Operation.UPDATE;
- }
- private boolean deleting() {
- return currentOperation == Context.Operation.DELETE;
- }
-
- /**
- * Exporting an Acid table is more complicated than a flat table. It may contain delete events,
- * which can only be interpreted properly within the context of the table/metastore where they
- * were generated. It may also contain insert events that belong to transactions that aborted,
- * where the same constraints apply.
- * In order to make the export artifact free of these constraints, the export does an
- * insert into tmpTable select * from <export table> to filter/apply the events in the current
- * context and then export the tmpTable. This export artifact can now be imported into any
- * table on any cluster (subject to schema checks etc).
- * See {@link #analyzeAcidExport(ASTNode)}
- * @param tree Export statement
- * @return true if exporting an Acid table.
- */
- public static boolean isAcidExport(ASTNode tree) throws SemanticException {
- assert tree != null && tree.getToken() != null &&
- tree.getToken().getType() == HiveParser.TOK_EXPORT;
- Tree tokTab = tree.getChild(0);
- assert tokTab != null && tokTab.getType() == HiveParser.TOK_TAB;
- Table tableHandle = null;
- try {
- tableHandle = getTable((ASTNode) tokTab.getChild(0), Hive.get(), false);
- } catch(HiveException ex) {
- throw new SemanticException(ex);
- }
-
- //tableHandle can be null if table doesn't exist
- return tableHandle != null && AcidUtils.isFullAcidTable(tableHandle);
- }
- private static String getTmptTableNameForExport(Table exportTable) {
- String tmpTableDb = exportTable.getDbName();
- String tmpTableName = exportTable.getTableName() + "_" +
- UUID.randomUUID().toString().replace('-', '_');
- return Warehouse.getQualifiedName(tmpTableDb, tmpTableName);
- }
-
- /**
- * See {@link #isAcidExport(ASTNode)}
- * 1. create the temp table T
- * 2. compile 'insert into T select * from acidTable'
- * 3. compile 'export acidTable' (acidTable will be replaced with T during execution)
- * 4. create task to drop T
- *
- * Using a true temp (session level) table means it should not affect replication, and the table
- * is not visible outside the Session that created it, for security.
- */
- private void analyzeAcidExport(ASTNode ast) throws SemanticException {
- assert ast != null && ast.getToken() != null &&
- ast.getToken().getType() == HiveParser.TOK_EXPORT;
- ASTNode tableTree = (ASTNode)ast.getChild(0);
- assert tableTree != null && tableTree.getType() == HiveParser.TOK_TAB;
- ASTNode tokRefOrNameExportTable = (ASTNode) tableTree.getChild(0);
- Table exportTable = getTargetTable(tokRefOrNameExportTable);
- assert AcidUtils.isFullAcidTable(exportTable);
-
- //need to create the table "manually" rather than creating a task since it has to exist to
- // compile the insert into T...
- String newTableName = getTmptTableNameForExport(exportTable); //this is db.table
- Map<String, String> tblProps = new HashMap<>();
- tblProps.put(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, Boolean.FALSE.toString());
- String location;
-
- // for temporary tables we set the location to something in the session's scratch dir
- // it has the same life cycle as the tmp table
- try {
- // Generate a unique ID for temp table path.
- // This path will be fixed for the life of the temp table.
- Path path = new Path(SessionState.getTempTableSpace(conf), UUID.randomUUID().toString());
- path = Warehouse.getDnsPath(path, conf);
- location = path.toString();
- } catch (MetaException err) {
- throw new SemanticException("Error while generating temp table path:", err);
- }
-
- CreateTableLikeDesc ctlt = new CreateTableLikeDesc(newTableName,
- false, true, null,
- null, location, null, null,
- tblProps,
- true, //important so we get an exception on name collision
- Warehouse.getQualifiedName(exportTable.getTTable()), false);
- Table newTable;
- try {
- ReadEntity dbForTmpTable = new ReadEntity(db.getDatabase(exportTable.getDbName()));
- inputs.add(dbForTmpTable); //so the plan knows we are 'reading' this db - locks, security...
- DDLTask createTableTask = (DDLTask) TaskFactory.get(
- new DDLWork(new HashSet<>(), new HashSet<>(), ctlt), conf);
- createTableTask.setConf(conf); //above get() doesn't set it
- createTableTask.execute(new DriverContext(new Context(conf)));
- newTable = db.getTable(newTableName);
- } catch(IOException|HiveException ex) {
- throw new SemanticException(ex);
- }
-
- //now generate insert statement
- //insert into newTableName select * from ts <where partition spec>
- StringBuilder rewrittenQueryStr = generateExportQuery(newTable.getPartCols(),
- tokRefOrNameExportTable, tableTree, newTableName);
- ReparseResult rr = parseRewrittenQuery(rewrittenQueryStr, ctx.getCmd());
- Context rewrittenCtx = rr.rewrittenCtx;
- rewrittenCtx.setIsUpdateDeleteMerge(false); //it's set in parseRewrittenQuery()
- ASTNode rewrittenTree = rr.rewrittenTree;
- try {
- useSuper = true;
- //newTable has to exist at this point to compile
- super.analyze(rewrittenTree, rewrittenCtx);
- } finally {
- useSuper = false;
- }
- //now we have the rootTasks set up for Insert ... Select
- removeStatsTasks(rootTasks);
- //now make an ExportTask from temp table
- /*analyzeExport() creates TableSpec which in turn tries to build
- "public List<Partition> partitions" by looking in the metastore to find Partitions matching
- the partition spec in the Export command. These of course don't exist yet since we've not
- run the insert stmt yet!
- */
- Task<ExportWork> exportTask = ExportSemanticAnalyzer.analyzeExport(ast, newTableName,
- db, conf, inputs, outputs);
-
- AlterTableDesc alterTblDesc = null;
- {
- /**
- * add an alter table task to set transactional props
- * do it after populating the temp table so that it's written as a non-transactional table, but
- * update the props before export so that the export archive metadata has these props. This way,
- * when IMPORT is done for this archive and the target table doesn't exist, it will be created as Acid.
- */
- alterTblDesc = new AlterTableDesc(AlterTableDesc.AlterTableTypes.ADDPROPS);
- HashMap<String, String> mapProps = new HashMap<>();
- mapProps.put(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, Boolean.TRUE.toString());
- alterTblDesc.setProps(mapProps);
- alterTblDesc.setOldName(newTableName);
- }
- addExportTask(rootTasks, exportTask, TaskFactory.get(
- new DDLWork(getInputs(), getOutputs(), alterTblDesc)));
-
- {
- /**
- * Now make a task to drop temp table
- * {@link DDLSemanticAnalyzer#analyzeDropTable(ASTNode ast, TableType expectedType)}
- */
- ReplicationSpec replicationSpec = new ReplicationSpec();
- DropTableDesc dropTblDesc = new DropTableDesc(newTableName, TableType.MANAGED_TABLE,
- false, true, replicationSpec);
- Task<DDLWork> dropTask =
- TaskFactory.get(new DDLWork(new HashSet<>(), new HashSet<>(), dropTblDesc), conf);
- exportTask.addDependentTask(dropTask);
- }
- markReadEntityForUpdate();
- if(ctx.isExplainPlan()) {
- try {
- //so that "explain" doesn't "leak" tmp tables
- // TODO: catalog
- db.dropTable(newTable.getDbName(), newTable.getTableName(), true, true, true);
- } catch(HiveException ex) {
- LOG.warn("Unable to drop " + newTableName + " due to: " + ex.getMessage(), ex);
- }
- }
- }
- /**
- * generate
- * insert into newTableName select * from ts <where partition spec>
- * for EXPORT command
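- * e.g. (an illustrative sketch, assuming the export table ts is partitioned by p and the
- * EXPORT specified the partition spec (p = '1')), the generated query is roughly
- *   insert into newTableName partition(p) select * from ts WHERE `p` = '1'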
- */
- private StringBuilder generateExportQuery(List<FieldSchema> partCols,
- ASTNode tokRefOrNameExportTable, ASTNode tableTree, String newTableName)
- throws SemanticException {
- StringBuilder rewrittenQueryStr = new StringBuilder("insert into ").append(newTableName);
- addPartitionColsToInsert(partCols, rewrittenQueryStr);
- rewrittenQueryStr.append(" select * from ").append(getFullTableNameForSQL(tokRefOrNameExportTable));
- //builds partition spec so we can build suitable WHERE clause
- TableSpec exportTableSpec = new TableSpec(db, conf, tableTree, false, true);
- if(exportTableSpec.getPartSpec() != null) {
- StringBuilder whereClause = null;
- int partColsIdx = -1; //keep track of corresponding col in partCols
- for(Map.Entry<String, String> ent : exportTableSpec.getPartSpec().entrySet()) {
- partColsIdx++;
- if(ent.getValue() == null) {
- continue; //partial spec
- }
- if(whereClause == null) {
- whereClause = new StringBuilder(" WHERE ");
- }
- if(whereClause.length() > " WHERE ".length()) {
- whereClause.append(" AND ");
- }
- whereClause.append(HiveUtils.unparseIdentifier(ent.getKey(), conf))
- .append(" = ").append(genPartValueString(partCols.get(partColsIdx).getType(), ent.getValue()));
- }
- if(whereClause != null) {
- rewrittenQueryStr.append(whereClause);
- }
- }
- return rewrittenQueryStr;
- }
- /**
- * Makes the exportTask run after all other tasks of the "insert into T ..." are done.
- */
- private void addExportTask(List<Task<?>> rootTasks,
- Task<ExportWork> exportTask, Task<DDLWork> alterTable) {
- for(Task<? extends Serializable> t : rootTasks) {
- if(t.getNumChild() <= 0) {
- //todo: ConditionalTask#addDependentTask(Task) doesn't do the right thing: HIVE-18978
- t.addDependentTask(alterTable);
- //this is a leaf so add exportTask to follow it
- alterTable.addDependentTask(exportTask);
- } else {
- addExportTask(t.getDependentTasks(), exportTask, alterTable);
- }
- }
- }
-
- private List<Task<?>> findStatsTasks(
- List<Task<?>> rootTasks, List<Task<?>> statsTasks) {
- for(Task<? extends Serializable> t : rootTasks) {
- if (t instanceof StatsTask) {
- if(statsTasks == null) {
- statsTasks = new ArrayList<>();
- }
- statsTasks.add(t);
- }
- if(t.getDependentTasks() != null) {
- statsTasks = findStatsTasks(t.getDependentTasks(), statsTasks);
- }
+ protected void analyze(ASTNode tree) throws SemanticException {
+ switch (tree.getToken().getType()) {
+ case HiveParser.TOK_DELETE_FROM:
+ analyzeDelete(tree);
+ break;
+ case HiveParser.TOK_UPDATE_TABLE:
+ analyzeUpdate(tree);
+ break;
+ default:
+ throw new RuntimeException("Asked to parse token " + tree.getName() + " in " +
+ "UpdateDeleteSemanticAnalyzer");
}
- return statsTasks;
}
- private void removeStatsTasks(List<Task<?>> rootTasks) {
- List<Task<?>> statsTasks = findStatsTasks(rootTasks, null);
- if(statsTasks == null) {
- return;
- }
- for (Task<?> statsTask : statsTasks) {
- if(statsTask.getParentTasks() == null) {
- continue; //should never happen
- }
- for (Task<?> t : new ArrayList<>(statsTask.getParentTasks())) {
- t.removeDependentTask(statsTask);
- }
- }
- }
private void analyzeUpdate(ASTNode tree) throws SemanticException {
- currentOperation = Context.Operation.UPDATE;
+ operation = Context.Operation.UPDATE;
reparseAndSuperAnalyze(tree);
}
private void analyzeDelete(ASTNode tree) throws SemanticException {
- currentOperation = Context.Operation.DELETE;
+ operation = Context.Operation.DELETE;
reparseAndSuperAnalyze(tree);
}
/**
- * Append list of partition columns to Insert statement, i.e. the 2nd set of partCol1,partCol2
- * INSERT INTO T PARTITION(partCol1,partCol2...) SELECT col1, ... partCol1,partCol2...
- * @param target target table
- */
- private void addPartitionColsToSelect(List<FieldSchema> partCols, StringBuilder rewrittenQueryStr,
- ASTNode target) throws SemanticException {
- String targetName = target != null ? getSimpleTableName(target) : null;
-
- // If the table is partitioned, we need to select the partition columns as well.
- if (partCols != null) {
- for (FieldSchema fschema : partCols) {
- rewrittenQueryStr.append(", ");
- //would be nice if there was a way to determine if quotes are needed
- if(targetName != null) {
- rewrittenQueryStr.append(targetName).append('.');
- }
- rewrittenQueryStr.append(HiveUtils.unparseIdentifier(fschema.getName(), this.conf));
- }
- }
- }
- /**
- * Assert that we are not asked to update a bucketing column or partition column
- * @param colName it's the A in "SET A = B"
- */
- private void checkValidSetClauseTarget(ASTNode colName, Table targetTable) throws SemanticException {
- String columnName = normalizeColName(colName.getText());
-
- // Make sure this isn't one of the partitioning columns, that's not supported.
- for (FieldSchema fschema : targetTable.getPartCols()) {
- if (fschema.getName().equalsIgnoreCase(columnName)) {
- throw new SemanticException(ErrorMsg.UPDATE_CANNOT_UPDATE_PART_VALUE.getMsg());
- }
- }
- //updating bucket column should move row from one file to another - not supported
- if(targetTable.getBucketCols() != null && targetTable.getBucketCols().contains(columnName)) {
- throw new SemanticException(ErrorMsg.UPDATE_CANNOT_UPDATE_BUCKET_VALUE,columnName);
- }
- boolean foundColumnInTargetTable = false;
- for(FieldSchema col : targetTable.getCols()) {
- if(columnName.equalsIgnoreCase(col.getName())) {
- foundColumnInTargetTable = true;
- break;
- }
- }
- if(!foundColumnInTargetTable) {
- throw new SemanticException(ErrorMsg.INVALID_TARGET_COLUMN_IN_SET_CLAUSE, colName.getText(),
- targetTable.getFullyQualifiedName());
- }
- }
- private ASTNode findLHSofAssignment(ASTNode assignment) {
- assert assignment.getToken().getType() == HiveParser.EQUAL :
- "Expected set assignments to use equals operator but found " + assignment.getName();
- ASTNode tableOrColTok = (ASTNode)assignment.getChildren().get(0);
- assert tableOrColTok.getToken().getType() == HiveParser.TOK_TABLE_OR_COL :
- "Expected left side of assignment to be table or column";
- ASTNode colName = (ASTNode)tableOrColTok.getChildren().get(0);
- assert colName.getToken().getType() == HiveParser.Identifier :
- "Expected column name";
- return colName;
- }
- private Map<String, ASTNode> collectSetColumnsAndExpressions(ASTNode setClause,
- Set<String> setRCols, Table targetTable) throws SemanticException {
- // An update needs to select all of the columns, as we rewrite the entire row. Also,
- // we need to figure out which columns we are going to replace.
- assert setClause.getToken().getType() == HiveParser.TOK_SET_COLUMNS_CLAUSE :
- "Expected second child of update token to be set token";
-
- // Get the children of the set clause, each of which should be a column assignment
- List<? extends Node> assignments = setClause.getChildren();
- // Must be deterministic order map for consistent q-test output across Java versions
- Map<String, ASTNode> setCols = new LinkedHashMap<String, ASTNode>(assignments.size());
- for (Node a : assignments) {
- ASTNode assignment = (ASTNode)a;
- ASTNode colName = findLHSofAssignment(assignment);
- if(setRCols != null) {
- addSetRCols((ASTNode) assignment.getChildren().get(1), setRCols);
- }
- checkValidSetClauseTarget(colName, targetTable);
-
- String columnName = normalizeColName(colName.getText());
- // This means that in UPDATE T SET x = _something_
- // _something_ can be whatever is supported in SELECT _something_
- setCols.put(columnName, (ASTNode)assignment.getChildren().get(1));
- }
- return setCols;
- }
- /**
- * @return the Metastore representation of the target table
- */
- private Table getTargetTable(ASTNode tabRef) throws SemanticException {
- return getTable(tabRef, db, true);
- }
- /**
- * @param throwException if false, return null if table doesn't exist, else throw
- */
- private static Table getTable(ASTNode tabRef, Hive db, boolean throwException)
- throws SemanticException {
- String[] tableName;
- Table mTable;
- switch (tabRef.getType()) {
- case HiveParser.TOK_TABREF:
- tableName = getQualifiedTableName((ASTNode) tabRef.getChild(0));
- break;
- case HiveParser.TOK_TABNAME:
- tableName = getQualifiedTableName(tabRef);
- break;
- default:
- throw raiseWrongType("TOK_TABREF|TOK_TABNAME", tabRef);
- }
- try {
- mTable = db.getTable(tableName[0], tableName[1], throwException);
- } catch (InvalidTableException e) {
- LOG.error("Failed to find table " + getDotName(tableName) + " got exception "
- + e.getMessage());
- throw new SemanticException(ErrorMsg.INVALID_TABLE.getMsg(getDotName(tableName)), e);
- } catch (HiveException e) {
- LOG.error("Failed to find table " + getDotName(tableName) + " got exception "
- + e.getMessage());
- throw new SemanticException(e.getMessage(), e);
- }
- return mTable;
- }
- // Walk through all our inputs and set them to note that this read is part of an update or a
- // delete.
- private void markReadEntityForUpdate() {
- for (ReadEntity input : inputs) {
- if(isWritten(input)) {
- //todo: this is actually not adding anything since LockComponent uses a Trie to "promote" a lock
- //except by accident - when we have a partitioned target table we have a ReadEntity and WriteEntity
- //for the table, so we mark ReadEntity and then delete WriteEntity (replace with Partition entries)
- //so DbTxnManager skips Read lock on the ReadEntity....
- input.setUpdateOrDelete(true);//input.noLockNeeded()?
- }
- }
- }
- /**
- * For updates, we need to set the column access info so that it contains information on
- * the columns we are updating.
- * (But not all the columns of the target table even though the rewritten query writes
- * all columns of the target table, since that is an implementation detail.)
- */
- private void setUpAccessControlInfoForUpdate(Table mTable, Map<String, ASTNode> setCols) {
- ColumnAccessInfo cai = new ColumnAccessInfo();
- for (String colName : setCols.keySet()) {
- cai.add(Table.getCompleteName(mTable.getDbName(), mTable.getTableName()), colName);
- }
- setUpdateColumnAccessInfo(cai);
- }
- /**
- * We need to weed ROW__ID out of the input column info, as it doesn't make any sense to
- * require the user to have authorization on that column.
- */
- private void cleanUpMetaColumnAccessControl() {
- //we do this for Update/Delete (incl Merge) because we introduce this column into the query
- //as part of rewrite
- if (columnAccessInfo != null) {
- columnAccessInfo.stripVirtualColumn(VirtualColumn.ROWID);
- }
- }
- /**
- * Parse the newly generated SQL statement to get a new AST
- */
- private ReparseResult parseRewrittenQuery(StringBuilder rewrittenQueryStr, String originalQuery) throws SemanticException {
- // Set dynamic partitioning to nonstrict so that queries do not need any partition
- // references.
- // todo: this may be a perf issue as it prevents the optimizer.. or not
- HiveConf.setVar(conf, HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict");
- // Disable LLAP IO wrapper; doesn't propagate extra ACID columns correctly.
- HiveConf.setBoolVar(conf, ConfVars.LLAP_IO_ROW_WRAPPER_ENABLED, false);
- // Parse the rewritten query string
- Context rewrittenCtx;
- try {
- rewrittenCtx = new Context(conf);
- rewrittenCtx.setHDFSCleanup(true);
- // We keep track of all the contexts that are created by this query
- // so we can clear them when we finish execution
- ctx.addRewrittenStatementContext(rewrittenCtx);
- } catch (IOException e) {
- throw new SemanticException(ErrorMsg.UPDATEDELETE_IO_ERROR.getMsg());
- }
- rewrittenCtx.setExplainConfig(ctx.getExplainConfig());
- rewrittenCtx.setExplainPlan(ctx.isExplainPlan());
- rewrittenCtx.setStatsSource(ctx.getStatsSource());
- rewrittenCtx.setPlanMapper(ctx.getPlanMapper());
- rewrittenCtx.setIsUpdateDeleteMerge(true);
- rewrittenCtx.setCmd(rewrittenQueryStr.toString());
-
- ASTNode rewrittenTree;
- try {
- LOG.info("Going to reparse <" + originalQuery + "> as \n<" + rewrittenQueryStr.toString() + ">");
- rewrittenTree = ParseUtils.parse(rewrittenQueryStr.toString(), rewrittenCtx);
- } catch (ParseException e) {
- throw new SemanticException(ErrorMsg.UPDATEDELETE_PARSE_ERROR.getMsg(), e);
- }
- return new ReparseResult(rewrittenTree, rewrittenCtx);
- }
- /**
- * Assert it supports Acid write
- */
- private void validateTargetTable(Table mTable) throws SemanticException {
- if (mTable.getTableType() == TableType.VIRTUAL_VIEW ||
- mTable.getTableType() == TableType.MATERIALIZED_VIEW) {
- LOG.error("Table " + mTable.getFullyQualifiedName() + " is a view or materialized view");
- throw new SemanticException(ErrorMsg.UPDATE_DELETE_VIEW.getMsg());
- }
- }
- /**
* This supports update and delete statements
+ * Rewrite the delete or update into an insert. Crazy, but it works as deletes and updates
+ * actually are inserts into the delta file in Hive. A delete
+ * DELETE FROM _tablename_ [WHERE ...]
+ * will be rewritten as
+ * INSERT INTO TABLE _tablename_ [PARTITION (_partcols_)] SELECT ROW__ID[,
+ * _partcols_] from _tablename_ SORT BY ROW__ID
+ * An update
+ * UPDATE _tablename_ SET x = _expr_ [WHERE...]
+ * will be rewritten as
+ * INSERT INTO TABLE _tablename_ [PARTITION (_partcols_)] SELECT _all_,
+ * _partcols_ from _tablename_ SORT BY ROW__ID
+ * where _all_ is all the non-partition columns. The expressions from the set clause will be
+ * re-attached later.
+ * The where clause will also be re-attached later.
+ * The sort by clause is put in there so that records come out in the right order to enable
+ * merge on read.
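+ * For example (an illustrative sketch, assuming an unpartitioned table t):
+ *   DELETE FROM t WHERE a = 1
+ * is rewritten roughly as
+ *   INSERT INTO TABLE t SELECT ROW__ID FROM t SORT BY ROW__ID
+ * with the WHERE clause re-attached later.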
*/
private void reparseAndSuperAnalyze(ASTNode tree) throws SemanticException {
List<? extends Node> children = tree.getChildren();
- // The first child should be the table we are deleting from
+
+ // The first child should be the table we are updating / deleting from
ASTNode tabName = (ASTNode)children.get(0);
assert tabName.getToken().getType() == HiveParser.TOK_TABNAME :
- "Expected tablename as first child of " + operation() + " but found " + tabName.getName();
-
- // Rewrite the delete or update into an insert. Crazy, but it works as deletes and update
- // actually are inserts into the delta file in Hive. A delete
- // DELETE FROM _tablename_ [WHERE ...]
- // will be rewritten as
- // INSERT INTO TABLE _tablename_ [PARTITION (_partcols_)] SELECT ROW__ID[,
- // _partcols_] from _tablename_ SORT BY ROW__ID
- // An update
- // UPDATE _tablename_ SET x = _expr_ [WHERE...]
- // will be rewritten as
- // INSERT INTO TABLE _tablename_ [PARTITION (_partcols_)] SELECT _all_,
- // _partcols_ from _tablename_ SORT BY ROW__ID
- // where _all_ is all the non-partition columns. The expressions from the set clause will be
- // re-attached later.
- // The where clause will also be re-attached later.
- // The sort by clause is put in there so that records come out in the right order to enable
- // merge on read.
-
- StringBuilder rewrittenQueryStr = new StringBuilder();
+ "Expected tablename as first child of " + operation + " but found " + tabName.getName();
Table mTable = getTargetTable(tabName);
validateTargetTable(mTable);
+ StringBuilder rewrittenQueryStr = new StringBuilder();
rewrittenQueryStr.append("insert into table ");
rewrittenQueryStr.append(getFullTableNameForSQL(tabName));
-
addPartitionColsToInsert(mTable.getPartCols(), rewrittenQueryStr);
rewrittenQueryStr.append(" select ROW__ID");
@@ -669,11 +155,10 @@ public class UpdateDeleteSemanticAnalyzer extends SemanticAnalyzer {
assert rewrittenInsert.getToken().getType() == HiveParser.TOK_INSERT :
"Expected TOK_INSERT as second child of TOK_QUERY but found " + rewrittenInsert.getName();
- if(updating()) {
+ if (updating()) {
rewrittenCtx.setOperation(Context.Operation.UPDATE);
rewrittenCtx.addDestNamePrefix(1, Context.DestClausePrefix.UPDATE);
- }
- else if(deleting()) {
+ } else if (deleting()) {
rewrittenCtx.setOperation(Context.Operation.DELETE);
rewrittenCtx.addDestNamePrefix(1, Context.DestClausePrefix.DELETE);
}
@@ -731,842 +216,18 @@ public class UpdateDeleteSemanticAnalyzer extends SemanticAnalyzer {
// Add the setRCols to the input list
for (String colName : setRCols) {
- if(columnAccessInfo != null) {//assuming this means we are not doing Auth
+ if (columnAccessInfo != null) { //assuming this means we are not doing Auth
columnAccessInfo.add(Table.getCompleteName(mTable.getDbName(), mTable.getTableName()),
- colName);
- }
- }
- }
- }
- /**
- * Check that {@code readEntity} is also being written
- */
- private boolean isWritten(Entity readEntity) {
- for(Entity writeEntity : outputs) {
- //make sure to compare them as Entity, i.e. that it's the same table or partition, etc
- if(writeEntity.toString().equalsIgnoreCase(readEntity.toString())) {
- return true;
- }
- }
- return false;
- }
- private String operation() {
- if (currentOperation == Context.Operation.OTHER) {
- throw new IllegalStateException("UpdateDeleteSemanticAnalyzer neither updating nor " +
- "deleting, operation not known.");
- }
- return currentOperation.toString();
- }
-
- // This method finds any columns on the right side of a set statement (thus rcols) and puts them
- // in a set so we can add them to the list of input cols to check.
- private void addSetRCols(ASTNode node, Set<String> setRCols) {
-
- // See if this node is a TOK_TABLE_OR_COL. If so, find the value and put it in the list. If
- // not, recurse on any children
- if (node.getToken().getType() == HiveParser.TOK_TABLE_OR_COL) {
- ASTNode colName = (ASTNode)node.getChildren().get(0);
- assert colName.getToken().getType() == HiveParser.Identifier :
- "Expected column name";
- setRCols.add(normalizeColName(colName.getText()));
- } else if (node.getChildren() != null) {
- for (Node n : node.getChildren()) {
- addSetRCols((ASTNode)n, setRCols);
- }
- }
- }
-
- /**
- * Column names are stored in metastore in lower case, regardless of the CREATE TABLE statement.
- * Unfortunately there is no single place that normalizes the input query.
- * @param colName not null
- */
- private static String normalizeColName(String colName) {
- return colName.toLowerCase();
- }
-
- private Context.Operation currentOperation = Context.Operation.OTHER;
- private static final String Indent = " ";
-
- private IdentifierQuoter quotedIdenfierHelper;
-
- /**
- * This allows us to take an arbitrary ASTNode and turn it back into SQL that produced it.
- * Since HiveLexer.g is written such that it strips away any ` (back ticks) around
- * quoted identifiers we need to add those back to generated SQL.
- * Additionally, the parser only produces tokens of type Identifier and never
- * QuotedIdentifier (HIVE-6013). So here we just quote all identifiers.
- * (') around String literals are retained w/o issues
- */
- private static class IdentifierQuoter {
- private final TokenRewriteStream trs;
- private final IdentityHashMap<ASTNode, ASTNode> visitedNodes = new IdentityHashMap<>();
- IdentifierQuoter(TokenRewriteStream trs) {
- this.trs = trs;
- if(trs == null) {
- throw new IllegalArgumentException("Must have a TokenRewriteStream");
- }
- }
- private void visit(ASTNode n) {
- if(n.getType() == HiveParser.Identifier) {
- if(visitedNodes.containsKey(n)) {
- /**
- * Since we are modifying the stream, it's not idempotent. Ideally, the caller would take
- * care to only quote Identifiers in each subtree once, but this makes it safe
- */
- return;
+ colName);
}
- visitedNodes.put(n, n);
- trs.insertBefore(n.getToken(), "`");
- trs.insertAfter(n.getToken(), "`");
- }
- if(n.getChildCount() <= 0) {return;}
- for(Node c : n.getChildren()) {
- visit((ASTNode)c);
}
}
}
- /**
- * This allows us to take an arbitrary ASTNode and turn it back into SQL that produced it without
- * needing to understand what it is (except for QuotedIdentifiers)
- *
- */
- private String getMatchedText(ASTNode n) {
- quotedIdenfierHelper.visit(n);
- return ctx.getTokenRewriteStream().toString(n.getTokenStartIndex(),
- n.getTokenStopIndex() + 1).trim();
- }
- /**
- * Here we take a Merge statement AST and generate a semantically equivalent multi-insert
- * statement to execute. Each Insert leg represents a single WHEN clause. As much as possible,
- * the new SQL statement is made to look like the input SQL statement so that it's easier to map
- * Query Compiler errors from the generated SQL back to the original one.
- * The generated SQL is a complete representation of the original input for the same reason.
- * In many places SemanticAnalyzer throws exceptions that contain (line, position) coordinates.
- * If generated SQL doesn't have everything and is patched up later, these coordinates point to
- * the wrong place.
- *
- * @throws SemanticException
- */
- private void analyzeMerge(ASTNode tree) throws SemanticException {
- currentOperation = Context.Operation.MERGE;
- quotedIdenfierHelper = new IdentifierQuoter(ctx.getTokenRewriteStream());
- /*
- * See org.apache.hadoop.hive.ql.parse.TestMergeStatement for some examples of the merge AST
- For example, given:
- merge into acidTbl using nonAcidPart2 source ON acidTbl.a = source.a2
- WHEN MATCHED THEN UPDATE set b = source.b2
- WHEN NOT MATCHED THEN INSERT VALUES(source.a2, source.b2)
-
- We get AST like this:
- "(tok_merge " +
- "(tok_tabname acidtbl) (tok_tabref (tok_tabname nonacidpart2) source) " +
- "(= (. (tok_table_or_col acidtbl) a) (. (tok_table_or_col source) a2)) " +
- "(tok_matched " +
- "(tok_update " +
- "(tok_set_columns_clause (= (tok_table_or_col b) (. (tok_table_or_col source) b2))))) " +
- "(tok_not_matched " +
- "tok_insert " +
- "(tok_value_row (. (tok_table_or_col source) a2) (. (tok_table_or_col source) b2))))");
-
- And need to produce a multi-insert like this to execute:
- FROM acidTbl right outer join nonAcidPart2 ON acidTbl.a = source.a2
- Insert into table acidTbl select nonAcidPart2.a2, nonAcidPart2.b2 where acidTbl.a is null
- INSERT INTO TABLE acidTbl select target.ROW__ID, nonAcidPart2.a2, nonAcidPart2.b2 where nonAcidPart2.a2=acidTbl.a sort by acidTbl.ROW__ID
- */
- /*todo: we need some sort of validation phase over original AST to make things user friendly; for example, if
- original command refers to a column that doesn't exist, this will be caught when processing the rewritten query but
- the errors will point at locations that the user can't map to anything
- - VALUES clause must have the same number of values as target table (including partition cols). Part cols go last in Select clause of Insert as Select
- todo: do we care to preserve comments in original SQL?
- todo: check if identifiers are properly escaped/quoted in the generated SQL - it's currently inconsistent
- Look at UnparseTranslator.addIdentifierTranslation() - it does unescape + unparse...
- todo: consider "WHEN NOT MATCHED BY SOURCE THEN UPDATE SET TargetTable.Col1 = SourceTable.Col1 "; what happens when source is empty? This should be a runtime error - maybe not
- the outer side of ROJ is empty => the join produces 0 rows. If supporting WHEN NOT MATCHED BY SOURCE, then this should be a runtime error
- */
- ASTNode target = (ASTNode)tree.getChild(0);
- ASTNode source = (ASTNode)tree.getChild(1);
- String targetName = getSimpleTableName(target);
- String sourceName = getSimpleTableName(source);
- ASTNode onClause = (ASTNode) tree.getChild(2);
- String onClauseAsText = getMatchedText(onClause);
-
- int whenClauseBegins = 3;
- boolean hasHint = false;
- // query hint
- ASTNode qHint = (ASTNode) tree.getChild(3);
- if (qHint.getType() == HiveParser.QUERY_HINT) {
- hasHint = true;
- whenClauseBegins++;
- }
- Table targetTable = getTargetTable(target);
- validateTargetTable(targetTable);
- List<ASTNode> whenClauses = findWhenClauses(tree, whenClauseBegins);
-
- StringBuilder rewrittenQueryStr = new StringBuilder("FROM\n");
-
- rewrittenQueryStr.append(Indent).append(getFullTableNameForSQL(target));
- if(isAliased(target)) {
- rewrittenQueryStr.append(" ").append(targetName);
- }
- rewrittenQueryStr.append('\n');
- rewrittenQueryStr.append(Indent).append(chooseJoinType(whenClauses)).append("\n");
- if(source.getType() == HiveParser.TOK_SUBQUERY) {
- //this includes the mandatory alias
- rewrittenQueryStr.append(Indent).append(getMatchedText(source));
- }
- else {
- rewrittenQueryStr.append(Indent).append(getFullTableNameForSQL(source));
- if(isAliased(source)) {
- rewrittenQueryStr.append(" ").append(sourceName);
- }
- }
- rewrittenQueryStr.append('\n');
- rewrittenQueryStr.append(Indent).append("ON ").append(onClauseAsText).append('\n');
-
- // Add the hint if any
- String hintStr = null;
- if (hasHint) {
- hintStr = " /*+ " + qHint.getText() + " */ ";
- }
-
- /**
- * We allow at most 2 WHEN MATCHED clauses, in which case 1 must be Update and the other Delete.
- * If we have both update and delete, the 1st one (in SQL code) must have "AND <extra predicate>"
- * so that the 2nd can ensure not to process the same rows.
- * Update and Delete may be in any order. (Insert is always last)
- */
- String extraPredicate = null;
- int numWhenMatchedUpdateClauses = 0, numWhenMatchedDeleteClauses = 0;
- int numInsertClauses = 0;
- boolean hintProcessed = false;
- for(ASTNode whenClause : whenClauses) {
- switch (getWhenClauseOperation(whenClause).getType()) {
- case HiveParser.TOK_INSERT:
- numInsertClauses++;
- handleInsert(whenClause, rewrittenQueryStr, target, onClause,
- targetTable, targetName, onClauseAsText, hintProcessed ? null : hintStr);
- hintProcessed = true;
- break;
- case HiveParser.TOK_UPDATE:
- numWhenMatchedUpdateClauses++;
- String s = handleUpdate(whenClause, rewrittenQueryStr, target,
- onClauseAsText, targetTable, extraPredicate, hintProcessed ? null : hintStr);
- hintProcessed = true;
- if(numWhenMatchedUpdateClauses + numWhenMatchedDeleteClauses == 1) {
- extraPredicate = s;//i.e. it's the 1st WHEN MATCHED
- }
- break;
- case HiveParser.TOK_DELETE:
- numWhenMatchedDeleteClauses++;
- String s1 = handleDelete(whenClause, rewrittenQueryStr, target,
- onClauseAsText, targetTable, extraPredicate, hintProcessed ? null : hintStr);
- hintProcessed = true;
- if(numWhenMatchedUpdateClauses + numWhenMatchedDeleteClauses == 1) {
- extraPredicate = s1;//i.e. it's the 1st WHEN MATCHED
- }
- break;
- default:
- throw new IllegalStateException("Unexpected WHEN clause type: " + whenClause.getType() +
- addParseInfo(whenClause));
- }
- if(numWhenMatchedDeleteClauses > 1) {
- throw new SemanticException(ErrorMsg.MERGE_TOO_MANY_DELETE, ctx.getCmd());
- }
- if(numWhenMatchedUpdateClauses > 1) {
- throw new SemanticException(ErrorMsg.MERGE_TOO_MANY_UPDATE, ctx.getCmd());
- }
- assert numInsertClauses < 2: "too many Insert clauses";
- }
- if(numWhenMatchedDeleteClauses + numWhenMatchedUpdateClauses == 2 && extraPredicate == null) {
- throw new SemanticException(ErrorMsg.MERGE_PREDIACTE_REQUIRED, ctx.getCmd());
- }
-
- boolean validating = handleCardinalityViolation(rewrittenQueryStr, target, onClauseAsText,
- targetTable, numWhenMatchedDeleteClauses == 0 && numWhenMatchedUpdateClauses == 0);
- ReparseResult rr = parseRewrittenQuery(rewrittenQueryStr, ctx.getCmd());
- Context rewrittenCtx = rr.rewrittenCtx;
- ASTNode rewrittenTree = rr.rewrittenTree;
- rewrittenCtx.setOperation(Context.Operation.MERGE);
-
- //set dest name mapping on new context; 1st child is TOK_FROM
- for(int insClauseIdx = 1, whenClauseIdx = 0;
- insClauseIdx < rewrittenTree.getChildCount() - (validating ? 1 : 0/*skip cardinality violation clause*/);
- insClauseIdx++, whenClauseIdx++) {
- //we've added Insert clauses in order of WHEN items in whenClauses
- switch (getWhenClauseOperation(whenClauses.get(whenClauseIdx)).getType()) {
- case HiveParser.TOK_INSERT:
- rewrittenCtx.addDestNamePrefix(insClauseIdx, Context.DestClausePrefix.INSERT);
- break;
- case HiveParser.TOK_UPDATE:
- rewrittenCtx.addDestNamePrefix(insClauseIdx, Context.DestClausePrefix.UPDATE);
- break;
- case HiveParser.TOK_DELETE:
- rewrittenCtx.addDestNamePrefix(insClauseIdx, Context.DestClausePrefix.DELETE);
- break;
- default:
- assert false;
- }
- }
- if(validating) {
- //here means the last branch of the multi-insert is Cardinality Validation
- rewrittenCtx.addDestNamePrefix(rewrittenTree.getChildCount() - 1, Context.DestClausePrefix.INSERT);
- }
-
- try {
- useSuper = true;
- super.analyze(rewrittenTree, rewrittenCtx);
- } finally {
- useSuper = false;
- }
- updateOutputs(targetTable);
- }
-
- /**
- * SemanticAnalyzer will generate a WriteEntity for the target table since it doesn't know/check
- * if the read and write are of the same table in "insert ... select ....". Since DbTxnManager
- * uses Read/WriteEntity objects to decide which locks to acquire, we get more concurrency if we
- * change the table WriteEntity to a set of partition WriteEntity objects based on
- * ReadEntity objects computed for this table.
- */
- private void updateOutputs(Table targetTable) {
- markReadEntityForUpdate();
-
- if(targetTable.isPartitioned()) {
- List<ReadEntity> partitionsRead = getRestrictedPartitionSet(targetTable);
- if(!partitionsRead.isEmpty()) {
- //if there is WriteEntity with WriteType=UPDATE/DELETE for target table, replace it with
- //WriteEntity for each partition
- List<WriteEntity> toRemove = new ArrayList<>();
- for(WriteEntity we : outputs) {
- WriteEntity.WriteType wt = we.getWriteType();
- if(isTargetTable(we, targetTable) &&
- (wt == WriteEntity.WriteType.UPDATE || wt == WriteEntity.WriteType.DELETE)) {
- /**
- * The assumption here is that SemanticAnalyzer will generate a ReadEntity for each
- * partition that exists and is matched by the WHERE clause (which may be all of them).
- * Since we don't allow updating the value of a partition column, we know that we always
- * write the same (or fewer) partitions than we read. Still, the write is a Dynamic
- * Partition write - see HIVE-15032.
- */
- toRemove.add(we);
- }
- }
- outputs.removeAll(toRemove);
- // TODO: why is this like that?
- for(ReadEntity re : partitionsRead) {
- for(WriteEntity original : toRemove) {
- //since we may have both Update and Delete branches, Auth needs to know
- WriteEntity we = new WriteEntity(re.getPartition(), original.getWriteType());
- we.setDynamicPartitionWrite(original.isDynamicPartitionWrite());
- outputs.add(we);
- }
- }
- }
- }
- }
- /**
- * If the optimizer has determined that it only has to read some of the partitions of the
- * target table to satisfy the query, then we know that the write side of update/delete
- * (and update/delete parts of merge)
- * can only write (at most) that set of partitions (since we currently don't allow updating
- * partition (or bucket) columns). So we want to replace the table level
- * WriteEntity in the outputs with WriteEntity for each of these partitions
- * ToDo: see if this should be moved to SemanticAnalyzer itself since it applies to any
- * insert which does a select against the same table. Then SemanticAnalyzer would also
- * be able to not use DP for the Insert...
- *
- * Note that the Insert of Merge may be creating new partitions and writing to partitions
- * which were not read (WHEN NOT MATCHED...). WriteEntity for that should be created
- * in MoveTask (or some other task after the query is complete)
- */
- private List<ReadEntity> getRestrictedPartitionSet(Table targetTable) {
- List<ReadEntity> partitionsRead = new ArrayList<>();
- for(ReadEntity re : inputs) {
- if(re.isFromTopLevelQuery && re.getType() == Entity.Type.PARTITION && isTargetTable(re, targetTable)) {
- partitionsRead.add(re);
- }
- }
- return partitionsRead;
- }
- /**
- * if there is no WHEN NOT MATCHED THEN INSERT, we don't outer join
- */
- private String chooseJoinType(List<ASTNode> whenClauses) {
- for(ASTNode whenClause : whenClauses) {
- if(getWhenClauseOperation(whenClause).getType() == HiveParser.TOK_INSERT) {
- return "RIGHT OUTER JOIN";
- }
- }
- return "INNER JOIN";
- }
- /**
- * does this Entity belong to target table (partition)
- */
- private boolean isTargetTable(Entity entity, Table targetTable) {
- //todo: https://issues.apache.org/jira/browse/HIVE-15048
- /**
- * is this the right way to compare? Should it just compare paths?
- * equals() impl looks heavy weight
- */
- return targetTable.equals(entity.getTable());
- }
-
- /**
- * Per SQL Spec ISO/IEC 9075-2:2011(E) Section 14.2 under "General Rules" Item 6/Subitem a/Subitem 2/Subitem B,
- * an error should be raised if > 1 row of "source" matches the same row in "target".
- * This should not affect the runtime of the query as it's running in parallel with other
- * branches of the multi-insert. It won't actually write any data to merge_tmp_table since the
- cardinality_violation() UDF throws an error whenever it's called, killing the query
- * @return true if another Insert clause was added
- */
- private boolean handleCardinalityViolation(StringBuilder rewrittenQueryStr, ASTNode target,
- String onClauseAsString, Table targetTable,
- boolean onlyHaveWhenNotMatchedClause)
- throws SemanticException {
- if(!conf.getBoolVar(HiveConf.ConfVars.MERGE_CARDINALITY_VIOLATION_CHECK)) {
- LOG.info("Merge statement cardinality violation check is disabled: " +
- HiveConf.ConfVars.MERGE_CARDINALITY_VIOLATION_CHECK.varname);
- return false;
- }
- if(onlyHaveWhenNotMatchedClause) {
- //if no update or delete in Merge, there is no need to do a cardinality check
- return false;
- }
- //this is a tmp table and thus Session scoped; acid requires SQL statements to be serial in a
- // given session, i.e. the name can be fixed across all invocations
- String tableName = "merge_tmp_table";
- rewrittenQueryStr.append("\nINSERT INTO ").append(tableName)
- .append("\n SELECT cardinality_violation(")
- .append(getSimpleTableName(target)).append(".ROW__ID");
- addPartitionColsToSelect(targetTable.getPartCols(), rewrittenQueryStr, target);
-
- rewrittenQueryStr.append(")\n WHERE ").append(onClauseAsString)
- .append(" GROUP BY ").append(getSimpleTableName(target)).append(".ROW__ID");
-
- addPartitionColsToSelect(targetTable.getPartCols(), rewrittenQueryStr, target);
-
- rewrittenQueryStr.append(" HAVING count(*) > 1");
- //say table T has partition p, we are generating
- //select cardinality_violation(ROW__ID, p) WHERE ... GROUP BY ROW__ID, p
- //the Group By args are passed to cardinality_violation to add the violating value to the error msg
- try {
- if (null == db.getTable(tableName, false)) {
- StorageFormat format = new StorageFormat(conf);
- format.processStorageFormat("TextFile");
- Table table = db.newTable(tableName);
- table.setSerializationLib(format.getSerde());
- List<FieldSchema> fields = new ArrayList<FieldSchema>();
- fields.add(new FieldSchema("val", "int", null));
- table.setFields(fields);
- table.setDataLocation(Warehouse.getDnsPath(new Path(SessionState.get().getTempTableSpace(),
- tableName), conf));
- table.getTTable().setTemporary(true);
- table.setStoredAsSubDirectories(false);
- table.setInputFormatClass(format.getInputFormat());
- table.setOutputFormatClass(format.getOutputFormat());
- db.createTable(table, true);
- }
- }
- catch(HiveException|MetaException e) {
- throw new SemanticException(e.getMessage(), e);
- }
- return true;
- }
- /**
- * @param onClauseAsString - because there is no clone() and we need to use it in multiple places
- * @param deleteExtraPredicate - see notes at caller
- */
- private String handleUpdate(ASTNode whenMatchedUpdateClause, StringBuilder rewrittenQueryStr,
- ASTNode target, String onClauseAsString, Table targetTable,
- String deleteExtraPredicate, String hintStr) throws SemanticException {
- assert whenMatchedUpdateClause.getType() == HiveParser.TOK_MATCHED;
- assert getWhenClauseOperation(whenMatchedUpdateClause).getType() == HiveParser.TOK_UPDATE;
- String targetName = getSimpleTableName(target);
- rewrittenQueryStr.append("INSERT INTO ").append(getFullTableNameForSQL(target));
- addPartitionColsToInsert(targetTable.getPartCols(), rewrittenQueryStr);
- rewrittenQueryStr.append(" -- update clause\n SELECT ");
- if (hintStr != null) {
- rewrittenQueryStr.append(hintStr);
- }
- rewrittenQueryStr.append(targetName).append(".ROW__ID");
-
- ASTNode setClause = (ASTNode)getWhenClauseOperation(whenMatchedUpdateClause).getChild(0);
- //columns being updated -> update expressions; "setRCols" (last param) is null because we use actual expressions
- //before reparsing, i.e. they are known to SemanticAnalyzer logic
- Map<String, ASTNode> setColsExprs = collectSetColumnsAndExpressions(setClause, null, targetTable);
- //if target table has cols c1,c2,c3 and p1 partition col and we had "SET c2 = 5, c1 = current_date()" we want to end up with
- //insert into target (p1) select current_date(), 5, c3, p1 where ....
- //since we take the RHS of set exactly as it was in Input, we don't need to deal with quoting/escaping column/table names
- List<FieldSchema> nonPartCols = targetTable.getCols();
- for(FieldSchema fs : nonPartCols) {
- rewrittenQueryStr.append(", ");
- String name = fs.getName();
- if (setColsExprs.containsKey(name)) {
- String rhsExp = getMatchedText(setColsExprs.get(name));
- //"set a=5, b=8" - rhsExp picks up the next char (e.g. ',') from the token stream
- switch (rhsExp.charAt(rhsExp.length() - 1)) {
- case ',':
- case '\n':
- rhsExp = rhsExp.substring(0, rhsExp.length() - 1);
- }
- rewrittenQueryStr.append(rhsExp);
- }
- else {
- rewrittenQueryStr.append(getSimpleTableName(target)).append(".").append(HiveUtils.unparseIdentifier(name, this.conf));
- }
- }
- addPartitionColsToSelect(targetTable.getPartCols(), rewrittenQueryStr, target);
- rewrittenQueryStr.append("\n WHERE ").append(onClauseAsString);
- String extraPredicate = getWhenClausePredicate(whenMatchedUpdateClause);
- if(extraPredicate != null) {
- //we have WHEN MATCHED AND <boolean expr> THEN UPDATE
- rewrittenQueryStr.append(" AND ").append(extraPredicate);
- }
- if(deleteExtraPredicate != null) {
- rewrittenQueryStr.append(" AND NOT(").append(deleteExtraPredicate).append(")");
- }
- rewrittenQueryStr.append("\n SORT BY ");
- rewrittenQueryStr.append(targetName).append(".ROW__ID \n");
-
- setUpAccessControlInfoForUpdate(targetTable, setColsExprs);
- //we don't deal with columns on RHS of SET expression since the whole expr is part of the
- //rewritten SQL statement and is thus handled by SemanticAnalyzer. Nor do we have to
- //figure which cols on RHS are from source and which from target
-
- return extraPredicate;
- }
- /**
- * @param onClauseAsString - because there is no clone() and we need to use it in multiple places
- * @param updateExtraPredicate - see notes at caller
- */
- private String handleDelete(ASTNode whenMatchedDeleteClause, StringBuilder rewrittenQueryStr, ASTNode target,
- String onClauseAsString, Table targetTable, String updateExtraPredicate, String hintStr) throws SemanticException {
- assert whenMatchedDeleteClause.getType() == HiveParser.TOK_MATCHED;
- assert getWhenClauseOperation(whenMatchedDeleteClause).getType() == HiveParser.TOK_DELETE;
- List<FieldSchema> partCols = targetTable.getPartCols();
- String targetName = getSimpleTableName(target);
- rewrittenQueryStr.append("INSERT INTO ").append(getFullTableNameForSQL(target));
- addPartitionColsToInsert(partCols, rewrittenQueryStr);
-
- rewrittenQueryStr.append(" -- delete clause\n SELECT ");
- if (hintStr != null) {
- rewrittenQueryStr.append(hintStr);
- }
- rewrittenQueryStr.append(targetName).append(".ROW__ID ");
- addPartitionColsToSelect(partCols, rewrittenQueryStr, target);
- rewrittenQueryStr.append("\n WHERE ").append(onClauseAsString);
- String extraPredicate = getWhenClausePredicate(whenMatchedDeleteClause);
- if(extraPredicate != null) {
- //we have WHEN MATCHED AND <boolean expr> THEN DELETE
- rewrittenQueryStr.append(" AND ").append(extraPredicate);
- }
- if(updateExtraPredicate != null) {
- rewrittenQueryStr.append(" AND NOT(").append(updateExtraPredicate).append(")");
- }
- rewrittenQueryStr.append("\n SORT BY ");
- rewrittenQueryStr.append(targetName).append(".ROW__ID \n");
- return extraPredicate;
- }
- private static String addParseInfo(ASTNode n) {
- return " at " + ErrorMsg.renderPosition(n);
- }
-
- /**
- * Returns the table name to use in the generated query preserving original quotes/escapes if any
- * @see #getFullTableNameForSQL(ASTNode)
- */
- private String getSimpleTableName(ASTNode n) throws SemanticException {
- return HiveUtils.unparseIdentifier(getSimpleTableNameBase(n), this.conf);
- }
- private String getSimpleTableNameBase(ASTNode n) throws SemanticException {
- switch (n.getType()) {
- case HiveParser.TOK_TABREF:
- int aliasIndex = findTabRefIdxs(n)[0];
- if (aliasIndex != 0) {
- return n.getChild(aliasIndex).getText();//the alias
- }
- return getSimpleTableNameBase((ASTNode) n.getChild(0));
- case HiveParser.TOK_TABNAME:
- if(n.getChildCount() == 2) {
- //db.table -> return table
- return n.getChild(1).getText();
- }
- return n.getChild(0).getText();
- case HiveParser.TOK_SUBQUERY:
- return n.getChild(1).getText();//the alias
- default:
- throw raiseWrongType("TOK_TABREF|TOK_TABNAME|TOK_SUBQUERY", n);
- }
- }
-
- private static final class ReparseResult {
- private final ASTNode rewrittenTree;
- private final Context rewrittenCtx;
- ReparseResult(ASTNode n, Context c) {
- rewrittenTree = n;
- rewrittenCtx = c;
- }
- }
-
- private boolean isAliased(ASTNode n) {
- switch (n.getType()) {
- case HiveParser.TOK_TABREF:
- return findTabRefIdxs(n)[0] != 0;
- case HiveParser.TOK_TABNAME:
- return false;
- case HiveParser.TOK_SUBQUERY:
- assert n.getChildCount() > 1 : "Expected Derived Table to be aliased";
- return true;
- default:
- throw raiseWrongType("TOK_TABREF|TOK_TABNAME", n);
- }
- }
- /**
- * Collect WHEN clauses from Merge statement AST
- */
- private List<ASTNode> findWhenClauses(ASTNode tree, int start) throws SemanticException {
- assert tree.getType() == HiveParser.TOK_MERGE;
- List<ASTNode> whenClauses = new ArrayList<>();
- for(int idx = start; idx < tree.getChildCount(); idx++) {
- ASTNode whenClause = (ASTNode)tree.getChild(idx);
- assert whenClause.getType() == HiveParser.TOK_MATCHED ||
- whenClause.getType() == HiveParser.TOK_NOT_MATCHED :
- "Unexpected node type found: " + whenClause.getType() + addParseInfo(whenClause);
- whenClauses.add(whenClause);
- }
- if(whenClauses.size() <= 0) {
- //Futureproofing: the parser will actually not allow this
- throw new SemanticException("Must have at least 1 WHEN clause in MERGE statement");
- }
- return whenClauses;
- }
- private ASTNode getWhenClauseOperation(ASTNode whenClause) {
- if(!(whenClause.getType() == HiveParser.TOK_MATCHED || whenClause.getType() == HiveParser.TOK_NOT_MATCHED)) {
- throw raiseWrongType("Expected TOK_MATCHED|TOK_NOT_MATCHED", whenClause);
- }
- return (ASTNode) whenClause.getChild(0);
- }
- /**
- * returns the <boolean predicate> as in WHEN MATCHED AND <boolean predicate> THEN...
- * @return may be null
- */
- private String getWhenClausePredicate(ASTNode whenClause) {
- if(!(whenClause.getType() == HiveParser.TOK_MATCHED || whenClause.getType() == HiveParser.TOK_NOT_MATCHED)) {
- throw raiseWrongType("Expected TOK_MATCHED|TOK_NOT_MATCHED", whenClause);
- }
- if(whenClause.getChildCount() == 2) {
- return getMatchedText((ASTNode)whenClause.getChild(1));
- }
- return null;
- }
- /**
- * Generates the Insert leg of the multi-insert SQL to represent WHEN NOT MATCHED THEN INSERT clause
- * @param targetTableNameInSourceQuery - simple name/alias
- * @throws SemanticException
- */
- private void handleInsert(ASTNode whenNotMatchedClause, StringBuilder rewrittenQueryStr, ASTNode target,
- ASTNode onClause, Table targetTable, String targetTableNameInSourceQuery,
- String onClauseAsString, String hintStr) throws SemanticException {
- ASTNode whenClauseOperation = getWhenClauseOperation(whenNotMatchedClause);
- assert whenNotMatchedClause.getType() == HiveParser.TOK_NOT_MATCHED;
- assert whenClauseOperation.getType() == HiveParser.TOK_INSERT;
-
- // identify the node that contains the values to insert and the optional column list node
- ArrayList<Node> children = whenClauseOperation.getChildren();
- ASTNode valuesNode =
- (ASTNode)children.stream().filter(n -> ((ASTNode)n).getType() == HiveParser.TOK_FUNCTION).findFirst().get();
- ASTNode columnListNode =
- (ASTNode)children.stream().filter(n -> ((ASTNode)n).getType() == HiveParser.TOK_TABCOLNAME).findFirst()
- .orElse(null);
-
- // if column list is specified, then it has to have the same number of elements as the values
- // valuesNode has a child for struct, the rest are the columns
- if (columnListNode != null && columnListNode.getChildCount() != (valuesNode.getChildCount() - 1)) {
- throw new SemanticException(String.format("Column schema must have the same length as values (%d vs %d)",
- columnListNode.getChildCount(), valuesNode.getChildCount() - 1));
- }
-
- rewrittenQueryStr.append("INSERT INTO ").append(getFullTableNameForSQL(target));
- if (columnListNode != null) {
- rewrittenQueryStr.append(' ').append(getMatchedText(columnListNode));
- }
- addPartitionColsToInsert(targetTable.getPartCols(), rewrittenQueryStr);
-
- rewrittenQueryStr.append(" -- insert clause\n SELECT ");
- if (hintStr != null) {
- rewrittenQueryStr.append(hintStr);
- }
-
- OnClauseAnalyzer oca = new OnClauseAnalyzer(onClause, targetTable, targetTableNameInSourceQuery,
- conf, onClauseAsString);
- oca.analyze();
-
- String valuesClause = getMatchedText(valuesNode);
- valuesClause = valuesClause.substring(1, valuesClause.length() - 1);//strip '(' and ')'
- valuesClause = replaceDefaultKeywordForMerge(valuesClause, targetTable, columnListNode);
- rewrittenQueryStr.append(valuesClause).append("\n WHERE ").append(oca.getPredicate());
-
- String extraPredicate = getWhenClausePredicate(whenNotMatchedClause);
- if (extraPredicate != null) {
- //we have WHEN NOT MATCHED AND <boolean expr> THEN INSERT
- rewrittenQueryStr.append(" AND ")
- .append(getMatchedText(((ASTNode)whenNotMatchedClause.getChild(1)))).append('\n');
- }
- }
-
- private String replaceDefaultKeywordForMerge(String valueClause, Table table, ASTNode columnListNode)
- throws SemanticException {
- if (!valueClause.toLowerCase().contains("`default`")) {
- return valueClause;
- }
-
- Map<String, String> colNameToDefaultConstraint = getColNameToDefaultValueMap(table);
- String[] values = valueClause.trim().split(",");
- String[] replacedValues = new String[values.length];
-
- // the list of the column names may be set in the query
- String[] columnNames = columnListNode == null ?
- table.getAllCols().stream().map(f -> f.getName()).toArray(size -> new String[size]) :
- columnListNode.getChildren().stream().map(n -> ((ASTNode)n).toString()).toArray(size -> new String[size]);
-
- for (int i = 0; i < values.length; i++) {
- if (values[i].trim().toLowerCase().equals("`default`")) {
- replacedValues[i] = MapUtils.getString(colNameToDefaultConstraint, columnNames[i], "null");
- } else {
- replacedValues[i] = values[i];
- }
- }
- return StringUtils.join(replacedValues, ',');
+ private boolean updating() {
+ return operation == Context.Operation.UPDATE;
}
-
- /**
- * Suppose the input Merge statement has ON target.a = source.b and c = d. Assume, that 'c' is from
- * target table and 'd' is from source expression. In order to properly
- * generate the Insert for WHEN NOT MATCHED THEN INSERT, we need to make sure that the Where
- * clause of this Insert contains "target.a is null and target.c is null" This ensures that this
- * Insert leg does not receive any rows that are processed by Insert corresponding to
- * WHEN MATCHED THEN ... clauses. (Implicit in this is a mini resolver that figures out if an
- * unqualified column is part of the target table. We can get away with this simple logic because
- * we know that target is always a table (as opposed to some derived table).
- * The job of this class is to generate this predicate.
- *
- * Note that is this predicate cannot simply be NOT(on-clause-expr). IF on-clause-expr evaluates
- * to Unknown, it will be treated as False in the WHEN MATCHED Inserts but NOT(Unknown) = Unknown,
- * and so it will be False for WHEN NOT MATCHED Insert...
- */
- private static final class OnClauseAnalyzer {
- private final ASTNode onClause;
- private final Map<String, List<String>> table2column = new HashMap<>();
- private final List<String> unresolvedColumns = new ArrayList<>();
- private final List<FieldSchema> allTargetTableColumns = new ArrayList<>();
- private final Set<String> tableNamesFound = new HashSet<>();
- private final String targetTableNameInSourceQuery;
- private final HiveConf conf;
- private final String onClauseAsString;
- /**
- * @param targetTableNameInSourceQuery alias or simple name
- */
- OnClauseAnalyzer(ASTNode onClause, Table targetTable, String targetTableNameInSourceQuery,
- HiveConf conf, String onClauseAsString) {
- this.onClause = onClause;
- allTargetTableColumns.addAll(targetTable.getCols());
- allTargetTableColumns.addAll(targetTable.getPartCols());
- this.targetTableNameInSourceQuery = unescapeIdentifier(targetTableNameInSourceQuery);
- this.conf = conf;
- this.onClauseAsString = onClauseAsString;
- }
- /**
- * finds all columns and groups by table ref (if there is one)
- */
- private void visit(ASTNode n) {
- if(n.getType() == HiveParser.TOK_TABLE_OR_COL) {
- ASTNode parent = (ASTNode) n.getParent();
- if(parent != null && parent.getType() == HiveParser.DOT) {
- //the ref must be a table, so look for column name as right child of DOT
- if(parent.getParent() != null && parent.getParent().getType() == HiveParser.DOT) {
- //I don't think this can happen... but just in case
- throw new IllegalArgumentException("Found unexpected db.table.col reference in " + onClauseAsString);
- }
- addColumn2Table(n.getChild(0).getText(), parent.getChild(1).getText());
- }
- else {
- //must be just a column name
- unresolvedColumns.add(n.getChild(0).getText());
- }
- }
- if(n.getChildCount() == 0) {
- return;
- }
- for(Node child : n.getChildren()) {
- visit((ASTNode)child);
- }
- }
- private void analyze() {
- visit(onClause);
- if(tableNamesFound.size() > 2) {
- throw new IllegalArgumentException("Found > 2 table refs in ON clause. Found " +
- tableNamesFound + " in " + onClauseAsString);
- }
- handleUnresolvedColumns();
- if(tableNamesFound.size() > 2) {
- throw new IllegalArgumentException("Found > 2 table refs in ON clause (incl unresolved). " +
- "Found " + tableNamesFound + " in " + onClauseAsString);
- }
- }
- /**
- * Find those that belong to target table
- */
- private void handleUnresolvedColumns() {
- if(unresolvedColumns.isEmpty()) { return; }
- for(String c : unresolvedColumns) {
- for(FieldSchema fs : allTargetTableColumns) {
- if(c.equalsIgnoreCase(fs.getName())) {
- //c belongs to target table; strictly speaking there maybe an ambiguous ref but
- //this will be caught later when multi-insert is parsed
- addColumn2Table(targetTableNameInSourceQuery.toLowerCase(), c);
- break;
- }
- }
- }
- }
- private void addColumn2Table(String tableName, String columnName) {
- tableName = tableName.toLowerCase();//normalize name for mapping
- tableNamesFound.add(tableName);
- List<String> cols = table2column.get(tableName);
- if(cols == null) {
- cols = new ArrayList<>();
- table2column.put(tableName, cols);
- }
- //we want to preserve 'columnName' as it was in original input query so that rewrite
- //looks as much as possible like original query
- cols.add(columnName);
- }
- /**
- * Now generate the predicate for Where clause
- */
- private String getPredicate() {
- //normilize table name for mapping
- List<String> targetCols = table2column.get(targetTableNameInSourceQuery.toLowerCase());
- if(targetCols == null) {
- /*e.g. ON source.t=1
- * this is not strictly speaking invalid but it does ensure that all columns from target
- * table are all NULL for every row. This would make any WHEN MATCHED clause invalid since
- * we don't have a ROW__ID. The WHEN NOT MATCHED could be meaningful but it's just data from
- * source satisfying source.t=1... not worth the effort to support this*/
- throw new IllegalArgumentException(ErrorMsg.INVALID_TABLE_IN_ON_CLAUSE_OF_MERGE
- .format(targetTableNameInSourceQuery, onClauseAsString));
- }
- StringBuilder sb = new StringBuilder();
- for(String col : targetCols) {
- if(sb.length() > 0) {
- sb.append(" AND ");
- }
- //but preserve table name in SQL
- sb.append(HiveUtils.unparseIdentifier(targetTableNameInSourceQuery, conf)).append(".").append(HiveUtils.unparseIdentifier(col, conf)).append(" IS NULL");
- }
- return sb.toString();
- }
+ private boolean deleting() {
+ return operation == Context.Operation.DELETE;
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/dcc89501/ql/src/java/org/apache/hadoop/hive/ql/plan/AlterTableDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/AlterTableDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/AlterTableDesc.java
index 423ca2a..93641af 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/AlterTableDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/AlterTableDesc.java
@@ -104,7 +104,7 @@ public class AlterTableDesc extends DDLDesc implements Serializable, DDLDesc.DDL
String newName;
ArrayList<FieldSchema> newCols;
String serdeName;
- HashMap<String, String> props;
+ Map<String, String> props;
String inputFormat;
String outputFormat;
String storageHandler;
@@ -484,7 +484,7 @@ public class AlterTableDesc extends DDLDesc implements Serializable, DDLDesc.DDL
* @return the props
*/
@Explain(displayName = "properties")
- public HashMap<String, String> getProps() {
+ public Map<String, String> getProps() {
return props;
}
@@ -492,7 +492,7 @@ public class AlterTableDesc extends DDLDesc implements Serializable, DDLDesc.DDL
* @param props
* the props to set
*/
- public void setProps(HashMap<String, String> props) {
+ public void setProps(Map<String, String> props) {
this.props = props;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/dcc89501/ql/src/java/org/apache/hadoop/hive/ql/plan/ExportWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ExportWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ExportWork.java
index d91569e..f9d545f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ExportWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ExportWork.java
@@ -102,7 +102,7 @@ public class ExportWork implements Serializable {
* For exporting Acid table, change the "pointer" to the temp table.
* This has to be done after the temp table is populated and all necessary Partition objects
* exist in the metastore.
- * See {@link org.apache.hadoop.hive.ql.parse.UpdateDeleteSemanticAnalyzer#isAcidExport(ASTNode)}
+ * See {@link org.apache.hadoop.hive.ql.parse.AcidExportSemanticAnalyzer#isAcidExport(ASTNode)}
* for more info.
*/
public void acidPostProcess(Hive db) throws HiveException {
[2/2] hive git commit: HIVE-20919 Break up UpdateDeleteSemanticAnalyzer (Miklos Gergely via Eugene Koifman)
Posted by ek...@apache.org.
HIVE-20919 Break up UpdateDeleteSemanticAnalyzer (Miklos Gergely via Eugene Koifman)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/dcc89501
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/dcc89501
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/dcc89501
Branch: refs/heads/master
Commit: dcc8950164db00adac982d6764bbd8fe31e6897d
Parents: 4d03e31
Author: Miklos Gergely <mg...@hortonworks.com>
Authored: Wed Jan 9 15:15:16 2019 -0800
Committer: Eugene Koifman <ek...@apache.org>
Committed: Wed Jan 9 15:15:16 2019 -0800
----------------------------------------------------------------------
.../ql/parse/AcidExportSemanticAnalyzer.java | 299 ++++
.../hive/ql/parse/MergeSemanticAnalyzer.java | 760 ++++++++++
.../hive/ql/parse/RewriteSemanticAnalyzer.java | 451 ++++++
.../hive/ql/parse/SemanticAnalyzerFactory.java | 8 +-
.../ql/parse/UpdateDeleteSemanticAnalyzer.java | 1427 +-----------------
.../hadoop/hive/ql/plan/AlterTableDesc.java | 6 +-
.../apache/hadoop/hive/ql/plan/ExportWork.java | 2 +-
7 files changed, 1563 insertions(+), 1390 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/dcc89501/ql/src/java/org/apache/hadoop/hive/ql/parse/AcidExportSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/AcidExportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/AcidExportSemanticAnalyzer.java
new file mode 100644
index 0000000..41e3754
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/AcidExportSemanticAnalyzer.java
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import org.antlr.runtime.tree.Tree;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.QueryState;
+import org.apache.hadoop.hive.ql.exec.DDLTask;
+import org.apache.hadoop.hive.ql.exec.StatsTask;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.hooks.ReadEntity;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.HiveUtils;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.plan.AlterTableDesc;
+import org.apache.hadoop.hive.ql.plan.CreateTableLikeDesc;
+import org.apache.hadoop.hive.ql.plan.DDLWork;
+import org.apache.hadoop.hive.ql.plan.DropTableDesc;
+import org.apache.hadoop.hive.ql.plan.ExportWork;
+import org.apache.hadoop.hive.ql.session.SessionState;
+
+/**
+ * A subclass of the {@link org.apache.hadoop.hive.ql.parse.SemanticAnalyzer} that just handles
+ * acid export statements. It works by rewriting the acid export into an insert into a temporary table,
+ * and then exporting from there.
+ */
+public class AcidExportSemanticAnalyzer extends RewriteSemanticAnalyzer {
+ AcidExportSemanticAnalyzer(QueryState queryState) throws SemanticException {
+ super(queryState);
+ }
+
+ protected void analyze(ASTNode tree) throws SemanticException {
+ if (tree.getToken().getType() != HiveParser.TOK_EXPORT) {
+ throw new RuntimeException("Asked to parse token " + tree.getName() + " in " +
+ "AcidExportSemanticAnalyzer");
+ }
+ analyzeAcidExport(tree);
+ }
+
+ /**
+ * Exporting an Acid table is more complicated than exporting a flat table. It may contain delete events,
+ * which can only be interpreted properly within the context of the table/metastore where they
+ * were generated. It may also contain insert events that belong to transactions that aborted,
+ * where the same constraints apply.
+ * In order to make the export artifact free of these constraints, the export does an
+ * insert into tmpTable select * from <export table> to filter/apply the events in the current
+ * context and then export the tmpTable. This export artifact can now be imported into any
+ * table on any cluster (subject to schema checks etc).
+ * See {@link #analyzeAcidExport(ASTNode)}
+ * @param tree Export statement
+ * @return true if exporting an Acid table.
+ */
+ public static boolean isAcidExport(ASTNode tree) throws SemanticException {
+ assert tree != null && tree.getToken() != null && tree.getToken().getType() == HiveParser.TOK_EXPORT;
+ Tree tokTab = tree.getChild(0);
+ assert tokTab != null && tokTab.getType() == HiveParser.TOK_TAB;
+ Table tableHandle = null;
+ try {
+ tableHandle = getTable((ASTNode) tokTab.getChild(0), Hive.get(), false);
+ } catch(HiveException ex) {
+ throw new SemanticException(ex);
+ }
+
+ //tableHandle can be null if table doesn't exist
+ return tableHandle != null && AcidUtils.isFullAcidTable(tableHandle);
+ }
+ private static String getTmpTableNameForExport(Table exportTable) {
+ String tmpTableDb = exportTable.getDbName();
+ String tmpTableName = exportTable.getTableName() + "_" + UUID.randomUUID().toString().replace('-', '_');
+ return Warehouse.getQualifiedName(tmpTableDb, tmpTableName);
+ }
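(For a source table default.events, the name generated above comes out as something like default.events_6f1c2b3a_..., unique per export; the replace('-', '_') presumably keeps the UUID suffix a valid unquoted identifier.)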
+
+ /**
+ * See {@link #isAcidExport(ASTNode)}
+ * 1. create the temp table T
+ * 2. compile 'insert into T select * from acidTable'
+ * 3. compile 'export acidTable' (acidTable will be replaced with T during execution)
+ * 4. create task to drop T
+ *
+ * Using a true temp (session level) table means it should not affect replication, and the table
+ * is not visible outside the Session that created it, which is good for security.
+ */
+ private void analyzeAcidExport(ASTNode ast) throws SemanticException {
+ assert ast != null && ast.getToken() != null && ast.getToken().getType() == HiveParser.TOK_EXPORT;
+ ASTNode tableTree = (ASTNode)ast.getChild(0);
+ assert tableTree != null && tableTree.getType() == HiveParser.TOK_TAB;
+ ASTNode tokRefOrNameExportTable = (ASTNode) tableTree.getChild(0);
+ Table exportTable = getTargetTable(tokRefOrNameExportTable);
+ assert AcidUtils.isFullAcidTable(exportTable);
+
+ //need to create the table "manually" rather than creating a task since it has to exist to
+ // compile the insert into T...
+ String newTableName = getTmpTableNameForExport(exportTable); //this is db.table
+ Map<String, String> tblProps = new HashMap<>();
+ tblProps.put(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, Boolean.FALSE.toString());
+ String location;
+
+ // for temporary tables we set the location to something in the session's scratch dir
+ // it has the same life cycle as the tmp table
+ try {
+ // Generate a unique ID for temp table path.
+ // This path will be fixed for the life of the temp table.
+ Path path = new Path(SessionState.getTempTableSpace(conf), UUID.randomUUID().toString());
+ path = Warehouse.getDnsPath(path, conf);
+ location = path.toString();
+ } catch (MetaException err) {
+ throw new SemanticException("Error while generating temp table path:", err);
+ }
+
+ CreateTableLikeDesc ctlt = new CreateTableLikeDesc(newTableName,
+ false, true, null,
+ null, location, null, null,
+ tblProps,
+ true, //important so we get an exception on name collision
+ Warehouse.getQualifiedName(exportTable.getTTable()), false);
+ Table newTable;
+ try {
+ ReadEntity dbForTmpTable = new ReadEntity(db.getDatabase(exportTable.getDbName()));
+ inputs.add(dbForTmpTable); //so the plan knows we are 'reading' this db - locks, security...
+ DDLTask createTableTask = (DDLTask) TaskFactory.get(new DDLWork(new HashSet<>(), new HashSet<>(), ctlt), conf);
+ createTableTask.setConf(conf); //above get() doesn't set it
+ createTableTask.execute(new DriverContext(new Context(conf)));
+ newTable = db.getTable(newTableName);
+ } catch(IOException|HiveException ex) {
+ throw new SemanticException(ex);
+ }
+
+ //now generate insert statement
+ //insert into newTableName select * from ts <where partition spec>
+ StringBuilder rewrittenQueryStr = generateExportQuery(newTable.getPartCols(), tokRefOrNameExportTable, tableTree,
+ newTableName);
+ ReparseResult rr = parseRewrittenQuery(rewrittenQueryStr, ctx.getCmd());
+ Context rewrittenCtx = rr.rewrittenCtx;
+ rewrittenCtx.setIsUpdateDeleteMerge(false); //it's set in parseRewrittenQuery()
+ ASTNode rewrittenTree = rr.rewrittenTree;
+ try {
+ useSuper = true;
+ //newTable has to exist at this point to compile
+ super.analyze(rewrittenTree, rewrittenCtx);
+ } finally {
+ useSuper = false;
+ }
+ //now we have the rootTasks set up for Insert ... Select
+ removeStatsTasks(rootTasks);
+ //now make an ExportTask from temp table
+ /*analyzeExport() creates TableSpec which in turn tries to build
+ "public List<Partition> partitions" by looking in the metastore to find Partitions matching
+ the partition spec in the Export command. These of course don't exist yet since we have not
+ run the insert statement yet.
+ */
+ Task<ExportWork> exportTask = ExportSemanticAnalyzer.analyzeExport(ast, newTableName, db, conf, inputs, outputs);
+
+ // Add an alter table task to set transactional props
+ // do it after populating temp table so that it's written as non-transactional table but
+ // update props before export so that export archive metadata has these props. This way when
+ // IMPORT is done for this archive and target table doesn't exist, it will be created as Acid.
+ AlterTableDesc alterTblDesc = new AlterTableDesc(AlterTableDesc.AlterTableTypes.ADDPROPS);
+ Map<String, String> mapProps = new HashMap<>();
+ mapProps.put(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, Boolean.TRUE.toString());
+ alterTblDesc.setProps(mapProps);
+ alterTblDesc.setOldName(newTableName);
+ addExportTask(rootTasks, exportTask, TaskFactory.get(new DDLWork(getInputs(), getOutputs(), alterTblDesc)));
+
+ // Now make a task to drop temp table
+ // {@link DDLSemanticAnalyzer#analyzeDropTable(ASTNode ast, TableType expectedType)
+ ReplicationSpec replicationSpec = new ReplicationSpec();
+ DropTableDesc dropTblDesc = new DropTableDesc(newTableName, TableType.MANAGED_TABLE, false, true, replicationSpec);
+ Task<DDLWork> dropTask = TaskFactory.get(new DDLWork(new HashSet<>(), new HashSet<>(), dropTblDesc), conf);
+ exportTask.addDependentTask(dropTask);
+ markReadEntityForUpdate();
+ if (ctx.isExplainPlan()) {
+ try {
+ //so that "explain" doesn't "leak" tmp tables
+ // TODO: catalog
+ db.dropTable(newTable.getDbName(), newTable.getTableName(), true, true, true);
+ } catch(HiveException ex) {
+ LOG.warn("Unable to drop " + newTableName + " due to: " + ex.getMessage(), ex);
+ }
+ }
+ }
+
+ /**
+ * Generate
+ * insert into newTableName select * from ts <where partition spec>
+ * for EXPORT command.
+ */
+ private StringBuilder generateExportQuery(List<FieldSchema> partCols, ASTNode tokRefOrNameExportTable,
+ ASTNode tableTree, String newTableName) throws SemanticException {
+ StringBuilder rewrittenQueryStr = new StringBuilder("insert into ").append(newTableName);
+ addPartitionColsToInsert(partCols, rewrittenQueryStr);
+ rewrittenQueryStr.append(" select * from ").append(getFullTableNameForSQL(tokRefOrNameExportTable));
+ //builds partition spec so we can build suitable WHERE clause
+ TableSpec exportTableSpec = new TableSpec(db, conf, tableTree, false, true);
+ if (exportTableSpec.getPartSpec() != null) {
+ StringBuilder whereClause = null;
+ int partColsIdx = -1; //keep track of corresponding col in partCols
+ for (Map.Entry<String, String> ent : exportTableSpec.getPartSpec().entrySet()) {
+ partColsIdx++;
+ if (ent.getValue() == null) {
+ continue; //partial spec
+ }
+ if (whereClause == null) {
+ whereClause = new StringBuilder(" WHERE ");
+ }
+ if (whereClause.length() > " WHERE ".length()) {
+ whereClause.append(" AND ");
+ }
+ whereClause.append(HiveUtils.unparseIdentifier(ent.getKey(), conf))
+ .append(" = ").append(genPartValueString(partCols.get(partColsIdx).getType(), ent.getValue()));
+ }
+ if (whereClause != null) {
+ rewrittenQueryStr.append(whereClause);
+ }
+ }
+ return rewrittenQueryStr;
+ }
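A minimal standalone sketch of the string this method assembles, for a hypothetical table sales partitioned by ds with partition spec ds='2019-01-09' (table names, identifier quoting, and the genPartValueString formatting are simplified assumptions; this is illustrative, not Hive code):

import java.util.LinkedHashMap;
import java.util.Map;

public class ExportQuerySketch {
  public static void main(String[] args) {
    String newTableName = "default.sales_tmp";           // hypothetical temp table name
    StringBuilder sql = new StringBuilder("insert into ").append(newTableName);
    sql.append(" partition (`ds`)");                     // roughly what addPartitionColsToInsert appends
    sql.append(" select * from `default`.`sales`");
    Map<String, String> partSpec = new LinkedHashMap<>();
    partSpec.put("ds", "2019-01-09");                    // a null value would be a partial spec and is skipped
    StringBuilder where = null;
    for (Map.Entry<String, String> e : partSpec.entrySet()) {
      if (e.getValue() == null) {
        continue;
      }
      if (where == null) {
        where = new StringBuilder(" WHERE ");
      } else {
        where.append(" AND ");
      }
      where.append("`").append(e.getKey()).append("` = '").append(e.getValue()).append("'");
    }
    if (where != null) {
      sql.append(where);
    }
    // prints: insert into default.sales_tmp partition (`ds`) select * from `default`.`sales` WHERE `ds` = '2019-01-09'
    System.out.println(sql);
  }
}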
+
+ /**
+ * Makes the exportTask run after all other tasks of the "insert into T ..." are done.
+ */
+ private void addExportTask(List<Task<?>> rootTasks,
+ Task<ExportWork> exportTask, Task<DDLWork> alterTable) {
+ for (Task<? extends Serializable> t : rootTasks) {
+ if (t.getNumChild() <= 0) {
+ //todo: ConditionalTask#addDependentTask(Task) doesn't do the right thing: HIVE-18978
+ t.addDependentTask(alterTable);
+ //this is a leaf so add exportTask to follow it
+ alterTable.addDependentTask(exportTask);
+ } else {
+ addExportTask(t.getDependentTasks(), exportTask, alterTable);
+ }
+ }
+ }
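The recursion above chains alterTable (and, behind it, exportTask) after every leaf of the task DAG, so the export only runs once all work of the insert has finished. A generic, self-contained sketch of that traversal (Node is a hypothetical stand-in for Hive's Task):

import java.util.ArrayList;
import java.util.List;

public class LeafAppendSketch {
  static class Node {
    final String name;
    final List<Node> children = new ArrayList<>();
    Node(String name) { this.name = name; }
  }

  // Append 'tail' behind every leaf so it runs after all existing work completes.
  static void addAfterLeaves(List<Node> roots, Node tail) {
    for (Node n : roots) {
      if (n.children.isEmpty()) {
        n.children.add(tail);
      } else {
        addAfterLeaves(n.children, tail);
      }
    }
  }

  public static void main(String[] args) {
    Node move = new Node("move"), stats = new Node("stats");
    move.children.add(stats);
    List<Node> roots = new ArrayList<>();
    roots.add(move);
    addAfterLeaves(roots, new Node("export"));
    System.out.println(stats.children.get(0).name);      // prints: export
  }
}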
+
+ private void removeStatsTasks(List<Task<?>> rootTasks) {
+ List<Task<?>> statsTasks = findStatsTasks(rootTasks, null);
+ if (statsTasks == null) {
+ return;
+ }
+ for (Task<?> statsTask : statsTasks) {
+ if (statsTask.getParentTasks() == null) {
+ continue; //should never happen
+ }
+ for (Task<?> t : new ArrayList<>(statsTask.getParentTasks())) {
+ t.removeDependentTask(statsTask);
+ }
+ }
+ }
+
+ private List<Task<?>> findStatsTasks(
+ List<Task<?>> rootTasks, List<Task<?>> statsTasks) {
+ for (Task<? extends Serializable> t : rootTasks) {
+ if (t instanceof StatsTask) {
+ if (statsTasks == null) {
+ statsTasks = new ArrayList<>();
+ }
+ statsTasks.add(t);
+ }
+ if (t.getDependentTasks() != null) {
+ statsTasks = findStatsTasks(t.getDependentTasks(), statsTasks);
+ }
+ }
+ return statsTasks;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/dcc89501/ql/src/java/org/apache/hadoop/hive/ql/parse/MergeSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/MergeSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/MergeSemanticAnalyzer.java
new file mode 100644
index 0000000..44f7b43
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/MergeSemanticAnalyzer.java
@@ -0,0 +1,760 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.antlr.runtime.TokenRewriteStream;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.QueryState;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.HiveUtils;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.session.SessionState;
+
+
+/**
+ * A subclass of the {@link org.apache.hadoop.hive.ql.parse.SemanticAnalyzer} that just handles
+ * merge statements. It works by rewriting the merge into a multi-insert statement (since the
+ * update/delete branches are actually inserts) and then doing some patch up to make it work as a merge instead.
+ */
+public class MergeSemanticAnalyzer extends RewriteSemanticAnalyzer {
+ MergeSemanticAnalyzer(QueryState queryState) throws SemanticException {
+ super(queryState);
+ }
+
+ @Override
+ public void analyze(ASTNode tree) throws SemanticException {
+ if (tree.getToken().getType() != HiveParser.TOK_MERGE) {
+ throw new RuntimeException("Asked to parse token " + tree.getName() + " in " +
+ "MergeSemanticAnalyzer");
+ }
+ analyzeMerge(tree);
+ }
+
+ private static final String INDENT = " ";
+
+ private IdentifierQuoter quotedIdentifierHelper;
+
+ /**
+ * This allows us to take an arbitrary ASTNode and turn it back into SQL that produced it.
+ * Since HiveLexer.g is written such that it strips away any ` (back ticks) around
+ * quoted identifiers, we need to add those back to the generated SQL.
+ * Additionally, the parser only produces tokens of type Identifier and never
+ * QuotedIdentifier (HIVE-6013). So here we just quote all identifiers.
+ * (') around String literals are retained w/o issues
+ */
+ private static class IdentifierQuoter {
+ private final TokenRewriteStream trs;
+ private final IdentityHashMap<ASTNode, ASTNode> visitedNodes = new IdentityHashMap<>();
+
+ IdentifierQuoter(TokenRewriteStream trs) {
+ this.trs = trs;
+ if (trs == null) {
+ throw new IllegalArgumentException("Must have a TokenRewriteStream");
+ }
+ }
+
+ private void visit(ASTNode n) {
+ if (n.getType() == HiveParser.Identifier) {
+ if (visitedNodes.containsKey(n)) {
+ /**
+ * Since we are modifying the stream, it's not idempotent. Ideally, the caller would take
+ * care to only quote Identifiers in each subtree once, but this makes it safe
+ */
+ return;
+ }
+ visitedNodes.put(n, n);
+ trs.insertBefore(n.getToken(), "`");
+ trs.insertAfter(n.getToken(), "`");
+ }
+ if (n.getChildCount() <= 0) {
+ return;
+ }
+ for (Node c : n.getChildren()) {
+ visit((ASTNode)c);
+ }
+ }
+ }
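For example, if the original query quoted a reserved word, say a column written as `date`, the lexer has already stripped the back ticks from the token stream, so regenerating the text naively would yield a bare date that no longer parses. The quoter re-wraps every Identifier token in back ticks, which is harmless for ordinary names and restores the quoting where it matters.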
+
+ /**
+ * This allows us to take an arbitrary ASTNode and turn it back into SQL that produced it without
+ * needing to understand what it is (except for QuotedIdentifiers).
+ */
+ private String getMatchedText(ASTNode n) {
+ quotedIdentifierHelper.visit(n);
+ return ctx.getTokenRewriteStream().toString(n.getTokenStartIndex(),
+ n.getTokenStopIndex() + 1).trim();
+ }
+
+ /**
+ * Here we take a Merge statement AST and generate a semantically equivalent multi-insert
+ * statement to execute. Each Insert leg represents a single WHEN clause. As much as possible,
+ * the new SQL statement is made to look like the input SQL statement, so that it's easier to map
+ * Query Compiler errors from the generated SQL back to the original one.
+ * The generated SQL is a complete representation of the original input for the same reason.
+ * In many places SemanticAnalyzer throws exceptions that contain (line, position) coordinates.
+ * If generated SQL doesn't have everything and is patched up later, these coordinates point to
+ * the wrong place.
+ *
+ * @throws SemanticException
+ */
+ private void analyzeMerge(ASTNode tree) throws SemanticException {
+ quotedIdentifierHelper = new IdentifierQuoter(ctx.getTokenRewriteStream());
+ /*
+ * See org.apache.hadoop.hive.ql.parse.TestMergeStatement for some examples of the merge AST
+ For example, given:
+ MERGE INTO acidTbl USING nonAcidPart2 source ON acidTbl.a = source.a2
+ WHEN MATCHED THEN UPDATE SET b = source.b2
+ WHEN NOT MATCHED THEN INSERT VALUES (source.a2, source.b2)
+
+ We get AST like this:
+ "(tok_merge " +
+ "(tok_tabname acidtbl) (tok_tabref (tok_tabname nonacidpart2) source) " +
+ "(= (. (tok_table_or_col acidtbl) a) (. (tok_table_or_col source) a2)) " +
+ "(tok_matched " +
+ "(tok_update " +
+ "(tok_set_columns_clause (= (tok_table_or_col b) (. (tok_table_or_col source) b2))))) " +
+ "(tok_not_matched " +
+ "tok_insert " +
+ "(tok_value_row (. (tok_table_or_col source) a2) (. (tok_table_or_col source) b2))))");
+
+ And need to produce a multi-insert like this to execute:
+ FROM acidTbl RIGHT OUTER JOIN nonAcidPart2 ON acidTbl.a = source.a2
+ INSERT INTO TABLE acidTbl SELECT nonAcidPart2.a2, nonAcidPart2.b2 WHERE acidTbl.a IS null
+ INSERT INTO TABLE acidTbl SELECT target.ROW__ID, nonAcidPart2.a2, nonAcidPart2.b2
+ WHERE nonAcidPart2.a2=acidTbl.a SORT BY acidTbl.ROW__ID
+ */
+ /*todo: we need some sort of validation phase over original AST to make things user friendly; for example, if
+ original command refers to a column that doesn't exist, this will be caught when processing the rewritten query but
+ the errors will point at locations that the user can't map to anything
+ - VALUES clause must have the same number of values as target table (including partition cols). Part cols go last
+ in Select clause of Insert as Select
+ todo: do we care to preserve comments in original SQL?
+ todo: check if identifiers are properly escaped/quoted in the generated SQL - it's currently inconsistent
+ Look at UnparseTranslator.addIdentifierTranslation() - it does unescape + unparse...
+ todo: consider "WHEN NOT MATCHED BY SOURCE THEN UPDATE SET TargetTable.Col1 = SourceTable.Col1 "; what happens when
+ source is empty? This should be a runtime error - or maybe not: the outer side of the ROJ is empty => the join produces 0
+ rows. If WHEN NOT MATCHED BY SOURCE is ever supported, this should be a runtime error
+ */
+ ASTNode target = (ASTNode)tree.getChild(0);
+ ASTNode source = (ASTNode)tree.getChild(1);
+ String targetName = getSimpleTableName(target);
+ String sourceName = getSimpleTableName(source);
+ ASTNode onClause = (ASTNode) tree.getChild(2);
+ String onClauseAsText = getMatchedText(onClause);
+
+ int whenClauseBegins = 3;
+ boolean hasHint = false;
+ // query hint
+ ASTNode qHint = (ASTNode) tree.getChild(3);
+ if (qHint.getType() == HiveParser.QUERY_HINT) {
+ hasHint = true;
+ whenClauseBegins++;
+ }
+ Table targetTable = getTargetTable(target);
+ validateTargetTable(targetTable);
+ List<ASTNode> whenClauses = findWhenClauses(tree, whenClauseBegins);
+
+ StringBuilder rewrittenQueryStr = new StringBuilder("FROM\n");
+
+ rewrittenQueryStr.append(INDENT).append(getFullTableNameForSQL(target));
+ if (isAliased(target)) {
+ rewrittenQueryStr.append(" ").append(targetName);
+ }
+ rewrittenQueryStr.append('\n');
+ rewrittenQueryStr.append(INDENT).append(chooseJoinType(whenClauses)).append("\n");
+ if (source.getType() == HiveParser.TOK_SUBQUERY) {
+ //this includes the mandatory alias
+ rewrittenQueryStr.append(INDENT).append(getMatchedText(source));
+ } else {
+ rewrittenQueryStr.append(INDENT).append(getFullTableNameForSQL(source));
+ if (isAliased(source)) {
+ rewrittenQueryStr.append(" ").append(sourceName);
+ }
+ }
+ rewrittenQueryStr.append('\n');
+ rewrittenQueryStr.append(INDENT).append("ON ").append(onClauseAsText).append('\n');
+
+ // Add the hint if any
+ String hintStr = null;
+ if (hasHint) {
+ hintStr = " /*+ " + qHint.getText() + " */ ";
+ }
+
+ /**
+ * We allow at most 2 WHEN MATCHED clauses; if there are 2, one must be an Update and the other a Delete.
+ * If we have both update and delete, the 1st one (in SQL code) must have "AND <extra predicate>"
+ * so that the 2nd one can be sure not to process the same rows.
+ * Update and Delete may be in any order. (Insert is always last)
+ */
+ String extraPredicate = null;
+ int numWhenMatchedUpdateClauses = 0, numWhenMatchedDeleteClauses = 0;
+ int numInsertClauses = 0;
+ boolean hintProcessed = false;
+ for (ASTNode whenClause : whenClauses) {
+ switch (getWhenClauseOperation(whenClause).getType()) {
+ case HiveParser.TOK_INSERT:
+ numInsertClauses++;
+ handleInsert(whenClause, rewrittenQueryStr, target, onClause,
+ targetTable, targetName, onClauseAsText, hintProcessed ? null : hintStr);
+ hintProcessed = true;
+ break;
+ case HiveParser.TOK_UPDATE:
+ numWhenMatchedUpdateClauses++;
+ String s = handleUpdate(whenClause, rewrittenQueryStr, target,
+ onClauseAsText, targetTable, extraPredicate, hintProcessed ? null : hintStr);
+ hintProcessed = true;
+ if (numWhenMatchedUpdateClauses + numWhenMatchedDeleteClauses == 1) {
+ extraPredicate = s; //i.e. it's the 1st WHEN MATCHED
+ }
+ break;
+ case HiveParser.TOK_DELETE:
+ numWhenMatchedDeleteClauses++;
+ String s1 = handleDelete(whenClause, rewrittenQueryStr, target,
+ onClauseAsText, targetTable, extraPredicate, hintProcessed ? null : hintStr);
+ hintProcessed = true;
+ if (numWhenMatchedUpdateClauses + numWhenMatchedDeleteClauses == 1) {
+ extraPredicate = s1; //i.e. it's the 1st WHEN MATCHED
+ }
+ break;
+ default:
+ throw new IllegalStateException("Unexpected WHEN clause type: " + whenClause.getType() +
+ addParseInfo(whenClause));
+ }
+ if (numWhenMatchedDeleteClauses > 1) {
+ throw new SemanticException(ErrorMsg.MERGE_TOO_MANY_DELETE, ctx.getCmd());
+ }
+ if (numWhenMatchedUpdateClauses > 1) {
+ throw new SemanticException(ErrorMsg.MERGE_TOO_MANY_UPDATE, ctx.getCmd());
+ }
+ assert numInsertClauses < 2: "too many Insert clauses";
+ }
+ if (numWhenMatchedDeleteClauses + numWhenMatchedUpdateClauses == 2 && extraPredicate == null) {
+ throw new SemanticException(ErrorMsg.MERGE_PREDIACTE_REQUIRED, ctx.getCmd());
+ }
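For example, ... WHEN MATCHED AND source.flag = 1 THEN UPDATE SET ... WHEN MATCHED THEN DELETE passes these checks: the first matched branch carries the extra predicate that keeps the second one off the same rows. If both an update and a delete are present but the first matched branch has no AND predicate, the statement is rejected with MERGE_PREDIACTE_REQUIRED.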
+
+ boolean validating = handleCardinalityViolation(rewrittenQueryStr, target, onClauseAsText, targetTable,
+ numWhenMatchedDeleteClauses == 0 && numWhenMatchedUpdateClauses == 0);
+ ReparseResult rr = parseRewrittenQuery(rewrittenQueryStr, ctx.getCmd());
+ Context rewrittenCtx = rr.rewrittenCtx;
+ ASTNode rewrittenTree = rr.rewrittenTree;
+ rewrittenCtx.setOperation(Context.Operation.MERGE);
+
+ //set dest name mapping on new context; 1st child is TOK_FROM
+ for (int insClauseIdx = 1, whenClauseIdx = 0;
+ insClauseIdx < rewrittenTree.getChildCount() - (validating ? 1 : 0/*skip cardinality violation clause*/);
+ insClauseIdx++, whenClauseIdx++) {
+ //we've added Insert clauses in the order of the WHEN items in whenClauses
+ switch (getWhenClauseOperation(whenClauses.get(whenClauseIdx)).getType()) {
+ case HiveParser.TOK_INSERT:
+ rewrittenCtx.addDestNamePrefix(insClauseIdx, Context.DestClausePrefix.INSERT);
+ break;
+ case HiveParser.TOK_UPDATE:
+ rewrittenCtx.addDestNamePrefix(insClauseIdx, Context.DestClausePrefix.UPDATE);
+ break;
+ case HiveParser.TOK_DELETE:
+ rewrittenCtx.addDestNamePrefix(insClauseIdx, Context.DestClausePrefix.DELETE);
+ break;
+ default:
+ assert false;
+ }
+ }
+ if (validating) {
+ //reaching here means the last branch of the multi-insert is the Cardinality Validation
+ rewrittenCtx.addDestNamePrefix(rewrittenTree.getChildCount() - 1, Context.DestClausePrefix.INSERT);
+ }
+
+ try {
+ useSuper = true;
+ super.analyze(rewrittenTree, rewrittenCtx);
+ } finally {
+ useSuper = false;
+ }
+ updateOutputs(targetTable);
+ }
+
+ /**
+ * If there is no WHEN NOT MATCHED THEN INSERT, we don't outer join.
+ */
+ private String chooseJoinType(List<ASTNode> whenClauses) {
+ for (ASTNode whenClause : whenClauses) {
+ if (getWhenClauseOperation(whenClause).getType() == HiveParser.TOK_INSERT) {
+ return "RIGHT OUTER JOIN";
+ }
+ }
+ return "INNER JOIN";
+ }
+
+ /**
+ * Per SQL Spec ISO/IEC 9075-2:2011(E) Section 14.2 under "General Rules" Item 6/Subitem a/Subitem 2/Subitem B,
+ * an error should be raised if > 1 row of "source" matches the same row in "target".
+ * This should not affect the runtime of the query as it's running in parallel with other
+ * branches of the multi-insert. It won't actually write any data to merge_tmp_table since the
+ * cardinality_violation() UDF throws an error whenever it's called, killing the query.
+ * @return true if another Insert clause was added
+ */
+ private boolean handleCardinalityViolation(StringBuilder rewrittenQueryStr, ASTNode target,
+ String onClauseAsString, Table targetTable, boolean onlyHaveWhenNotMatchedClause)
+ throws SemanticException {
+ if (!conf.getBoolVar(HiveConf.ConfVars.MERGE_CARDINALITY_VIOLATION_CHECK)) {
+ LOG.info("Merge statement cardinality violation check is disabled: " +
+ HiveConf.ConfVars.MERGE_CARDINALITY_VIOLATION_CHECK.varname);
+ return false;
+ }
+ if (onlyHaveWhenNotMatchedClause) {
+ //if there is no update or delete in the Merge, there is no need to do the cardinality check
+ return false;
+ }
+ //this is a tmp table and thus Session scoped; acid requires SQL statements to be serial within a
+ // given session, i.e. the name can be fixed across all invocations
+ String tableName = "merge_tmp_table";
+ rewrittenQueryStr.append("\nINSERT INTO ").append(tableName)
+ .append("\n SELECT cardinality_violation(")
+ .append(getSimpleTableName(target)).append(".ROW__ID");
+ addPartitionColsToSelect(targetTable.getPartCols(), rewrittenQueryStr, target);
+
+ rewrittenQueryStr.append(")\n WHERE ").append(onClauseAsString)
+ .append(" GROUP BY ").append(getSimpleTableName(target)).append(".ROW__ID");
+
+ addPartitionColsToSelect(targetTable.getPartCols(), rewrittenQueryStr, target);
+
+ rewrittenQueryStr.append(" HAVING count(*) > 1");
+ //say table T has partition p, we are generating
+ //select cardinality_violation(ROW__ID, p) WHERE ... GROUP BY ROW__ID, p
+ //the Group By args are passed to cardinality_violation to add the violating value to the error msg
+ try {
+ if (null == db.getTable(tableName, false)) {
+ StorageFormat format = new StorageFormat(conf);
+ format.processStorageFormat("TextFile");
+ Table table = db.newTable(tableName);
+ table.setSerializationLib(format.getSerde());
+ List<FieldSchema> fields = new ArrayList<FieldSchema>();
+ fields.add(new FieldSchema("val", "int", null));
+ table.setFields(fields);
+ table.setDataLocation(Warehouse.getDnsPath(new Path(SessionState.get().getTempTableSpace(),
+ tableName), conf));
+ table.getTTable().setTemporary(true);
+ table.setStoredAsSubDirectories(false);
+ table.setInputFormatClass(format.getInputFormat());
+ table.setOutputFormatClass(format.getOutputFormat());
+ db.createTable(table, true);
+ }
+ } catch(HiveException|MetaException e) {
+ throw new SemanticException(e.getMessage(), e);
+ }
+ return true;
+ }
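Assembled, for a hypothetical target aliased t, partitioned by p, and joined ON t.a = s.a, the branch appended here reads roughly (partition columns shown as addPartitionColsToSelect presumably emits them):

INSERT INTO merge_tmp_table
 SELECT cardinality_violation(t.ROW__ID, t.p)
 WHERE t.a = s.a GROUP BY t.ROW__ID, t.p HAVING count(*) > 1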
+
+ /**
+ * @param onClauseAsString - because there is no clone() and we need to use it in multiple places
+ * @param deleteExtraPredicate - see notes at caller
+ */
+ private String handleUpdate(ASTNode whenMatchedUpdateClause, StringBuilder rewrittenQueryStr, ASTNode target,
+ String onClauseAsString, Table targetTable, String deleteExtraPredicate, String hintStr)
+ throws SemanticException {
+ assert whenMatchedUpdateClause.getType() == HiveParser.TOK_MATCHED;
+ assert getWhenClauseOperation(whenMatchedUpdateClause).getType() == HiveParser.TOK_UPDATE;
+ String targetName = getSimpleTableName(target);
+ rewrittenQueryStr.append("INSERT INTO ").append(getFullTableNameForSQL(target));
+ addPartitionColsToInsert(targetTable.getPartCols(), rewrittenQueryStr);
+ rewrittenQueryStr.append(" -- update clause\n SELECT ");
+ if (hintStr != null) {
+ rewrittenQueryStr.append(hintStr);
+ }
+ rewrittenQueryStr.append(targetName).append(".ROW__ID");
+
+ ASTNode setClause = (ASTNode)getWhenClauseOperation(whenMatchedUpdateClause).getChild(0);
+ //columns being updated -> update expressions; "setRCols" (last param) is null because we use actual expressions
+ //before reparsing, i.e. they are known to SemanticAnalyzer logic
+ Map<String, ASTNode> setColsExprs = collectSetColumnsAndExpressions(setClause, null, targetTable);
+ //if target table has cols c1,c2,c3 and p1 partition col and we had "SET c2 = 5, c1 = current_date()" we want to end
+ //up with
+ //insert into target (p1) select current_date(), 5, c3, p1 where ....
+ //since we take the RHS of set exactly as it was in Input, we don't need to deal with quoting/escaping column/table
+ //names
+ List<FieldSchema> nonPartCols = targetTable.getCols();
+ for (FieldSchema fs : nonPartCols) {
+ rewrittenQueryStr.append(", ");
+ String name = fs.getName();
+ if (setColsExprs.containsKey(name)) {
+ String rhsExp = getMatchedText(setColsExprs.get(name));
+ //"set a=5, b=8" - rhsExp picks up the next char (e.g. ',') from the token stream
+ switch (rhsExp.charAt(rhsExp.length() - 1)) {
+ case ',':
+ case '\n':
+ rhsExp = rhsExp.substring(0, rhsExp.length() - 1);
+ break;
+ default:
+ //do nothing
+ }
+ rewrittenQueryStr.append(rhsExp);
+ } else {
+ rewrittenQueryStr.append(getSimpleTableName(target))
+ .append(".")
+ .append(HiveUtils.unparseIdentifier(name, this.conf));
+ }
+ }
+ addPartitionColsToSelect(targetTable.getPartCols(), rewrittenQueryStr, target);
+ rewrittenQueryStr.append("\n WHERE ").append(onClauseAsString);
+ String extraPredicate = getWhenClausePredicate(whenMatchedUpdateClause);
+ if (extraPredicate != null) {
+ //we have WHEN MATCHED AND <boolean expr> THEN UPDATE
+ rewrittenQueryStr.append(" AND ").append(extraPredicate);
+ }
+ if (deleteExtraPredicate != null) {
+ rewrittenQueryStr.append(" AND NOT(").append(deleteExtraPredicate).append(")");
+ }
+ rewrittenQueryStr.append("\n SORT BY ");
+ rewrittenQueryStr.append(targetName).append(".ROW__ID \n");
+
+ setUpAccessControlInfoForUpdate(targetTable, setColsExprs);
+ //we don't deal with columns on RHS of SET expression since the whole expr is part of the
+ //rewritten SQL statement and is thus handled by SemanticAnalyzer. Nor do we have to
+ //figure out which cols on the RHS are from source and which from target
+
+ return extraPredicate;
+ }
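Tying the comments above together: for a target t(c1,c2,c3) partitioned by (p1) and SET c2 = 5, c1 = current_date(), the leg appended here reads roughly (the bracketed predicates appear only when present):

INSERT INTO db.t partition (p1) -- update clause
 SELECT t.ROW__ID, current_date(), 5, t.c3, t.p1
 WHERE <on clause> [AND <when-matched predicate>] [AND NOT(<delete branch predicate>)]
 SORT BY t.ROW__ID

(the SORT BY on ROW__ID presumably keeps rows in the order the Acid writer expects).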
+
+ /**
+ * @param onClauseAsString - because there is no clone() and we need to use it in multiple places
+ * @param updateExtraPredicate - see notes at caller
+ */
+ private String handleDelete(ASTNode whenMatchedDeleteClause, StringBuilder rewrittenQueryStr, ASTNode target,
+ String onClauseAsString, Table targetTable, String updateExtraPredicate, String hintStr)
+ throws SemanticException {
+ assert whenMatchedDeleteClause.getType() == HiveParser.TOK_MATCHED;
+ assert getWhenClauseOperation(whenMatchedDeleteClause).getType() == HiveParser.TOK_DELETE;
+ List<FieldSchema> partCols = targetTable.getPartCols();
+ String targetName = getSimpleTableName(target);
+ rewrittenQueryStr.append("INSERT INTO ").append(getFullTableNameForSQL(target));
+ addPartitionColsToInsert(partCols, rewrittenQueryStr);
+
+ rewrittenQueryStr.append(" -- delete clause\n SELECT ");
+ if (hintStr != null) {
+ rewrittenQueryStr.append(hintStr);
+ }
+ rewrittenQueryStr.append(targetName).append(".ROW__ID ");
+ addPartitionColsToSelect(partCols, rewrittenQueryStr, target);
+ rewrittenQueryStr.append("\n WHERE ").append(onClauseAsString);
+ String extraPredicate = getWhenClausePredicate(whenMatchedDeleteClause);
+ if (extraPredicate != null) {
+ //we have WHEN MATCHED AND <boolean expr> THEN DELETE
+ rewrittenQueryStr.append(" AND ").append(extraPredicate);
+ }
+ if (updateExtraPredicate != null) {
+ rewrittenQueryStr.append(" AND NOT(").append(updateExtraPredicate).append(")");
+ }
+ rewrittenQueryStr.append("\n SORT BY ");
+ rewrittenQueryStr.append(targetName).append(".ROW__ID \n");
+ return extraPredicate;
+ }
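The delete leg has the same shape minus the data columns: SELECT <target>.ROW__ID plus the partition columns, followed by the same WHERE / AND NOT(...) / SORT BY <target>.ROW__ID tail as the update leg.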
+
+ private static String addParseInfo(ASTNode n) {
+ return " at " + ErrorMsg.renderPosition(n);
+ }
+
+ private boolean isAliased(ASTNode n) {
+ switch (n.getType()) {
+ case HiveParser.TOK_TABREF:
+ return findTabRefIdxs(n)[0] != 0;
+ case HiveParser.TOK_TABNAME:
+ return false;
+ case HiveParser.TOK_SUBQUERY:
+ assert n.getChildCount() > 1 : "Expected Derived Table to be aliased";
+ return true;
+ default:
+ throw raiseWrongType("TOK_TABREF|TOK_TABNAME", n);
+ }
+ }
+
+ /**
+ * Collect WHEN clauses from Merge statement AST.
+ */
+ private List<ASTNode> findWhenClauses(ASTNode tree, int start) throws SemanticException {
+ assert tree.getType() == HiveParser.TOK_MERGE;
+ List<ASTNode> whenClauses = new ArrayList<>();
+ for (int idx = start; idx < tree.getChildCount(); idx++) {
+ ASTNode whenClause = (ASTNode)tree.getChild(idx);
+ assert whenClause.getType() == HiveParser.TOK_MATCHED ||
+ whenClause.getType() == HiveParser.TOK_NOT_MATCHED :
+ "Unexpected node type found: " + whenClause.getType() + addParseInfo(whenClause);
+ whenClauses.add(whenClause);
+ }
+ if (whenClauses.size() <= 0) {
+ //Futureproofing: the parser will actually not allow this
+ throw new SemanticException("Must have at least 1 WHEN clause in MERGE statement");
+ }
+ return whenClauses;
+ }
+
+ private ASTNode getWhenClauseOperation(ASTNode whenClause) {
+ if (!(whenClause.getType() == HiveParser.TOK_MATCHED || whenClause.getType() == HiveParser.TOK_NOT_MATCHED)) {
+ throw raiseWrongType("Expected TOK_MATCHED|TOK_NOT_MATCHED", whenClause);
+ }
+ return (ASTNode) whenClause.getChild(0);
+ }
+
+ /**
+ * Returns the <boolean predicate> as in WHEN MATCHED AND <boolean predicate> THEN...
+ * @return may be null
+ */
+ private String getWhenClausePredicate(ASTNode whenClause) {
+ if (!(whenClause.getType() == HiveParser.TOK_MATCHED || whenClause.getType() == HiveParser.TOK_NOT_MATCHED)) {
+ throw raiseWrongType("Expected TOK_MATCHED|TOK_NOT_MATCHED", whenClause);
+ }
+ if (whenClause.getChildCount() == 2) {
+ return getMatchedText((ASTNode)whenClause.getChild(1));
+ }
+ return null;
+ }
+
+ /**
+ * Generates the Insert leg of the multi-insert SQL to represent WHEN NOT MATCHED THEN INSERT clause.
+ * @param targetTableNameInSourceQuery - simple name/alias
+ * @throws SemanticException
+ */
+ private void handleInsert(ASTNode whenNotMatchedClause, StringBuilder rewrittenQueryStr, ASTNode target,
+ ASTNode onClause, Table targetTable, String targetTableNameInSourceQuery, String onClauseAsString,
+ String hintStr) throws SemanticException {
+ ASTNode whenClauseOperation = getWhenClauseOperation(whenNotMatchedClause);
+ assert whenNotMatchedClause.getType() == HiveParser.TOK_NOT_MATCHED;
+ assert whenClauseOperation.getType() == HiveParser.TOK_INSERT;
+
+ // identify the node that contains the values to insert and the optional column list node
+ ArrayList<Node> children = whenClauseOperation.getChildren();
+ ASTNode valuesNode =
+ (ASTNode)children.stream().filter(n -> ((ASTNode)n).getType() == HiveParser.TOK_FUNCTION).findFirst().get();
+ ASTNode columnListNode =
+ (ASTNode)children.stream().filter(n -> ((ASTNode)n).getType() == HiveParser.TOK_TABCOLNAME).findFirst()
+ .orElse(null);
+
+ // if column list is specified, then it has to have the same number of elements as the values
+ // valuesNode has a child for struct, the rest are the columns
+ if (columnListNode != null && columnListNode.getChildCount() != (valuesNode.getChildCount() - 1)) {
+ throw new SemanticException(String.format("Column schema must have the same length as values (%d vs %d)",
+ columnListNode.getChildCount(), valuesNode.getChildCount() - 1));
+ }
+
+ rewrittenQueryStr.append("INSERT INTO ").append(getFullTableNameForSQL(target));
+ if (columnListNode != null) {
+ rewrittenQueryStr.append(' ').append(getMatchedText(columnListNode));
+ }
+ addPartitionColsToInsert(targetTable.getPartCols(), rewrittenQueryStr);
+
+ rewrittenQueryStr.append(" -- insert clause\n SELECT ");
+ if (hintStr != null) {
+ rewrittenQueryStr.append(hintStr);
+ }
+
+ OnClauseAnalyzer oca = new OnClauseAnalyzer(onClause, targetTable, targetTableNameInSourceQuery,
+ conf, onClauseAsString);
+ oca.analyze();
+
+ String valuesClause = getMatchedText(valuesNode);
+ valuesClause = valuesClause.substring(1, valuesClause.length() - 1); //strip '(' and ')'
+ valuesClause = replaceDefaultKeywordForMerge(valuesClause, targetTable, columnListNode);
+ rewrittenQueryStr.append(valuesClause).append("\n WHERE ").append(oca.getPredicate());
+
+ String extraPredicate = getWhenClausePredicate(whenNotMatchedClause);
+ if (extraPredicate != null) {
+ //we have WHEN NOT MATCHED AND <boolean expr> THEN INSERT
+ rewrittenQueryStr.append(" AND ")
+ .append(getMatchedText(((ASTNode)whenNotMatchedClause.getChild(1)))).append('\n');
+ }
+ }
+
+ private String replaceDefaultKeywordForMerge(String valueClause, Table table, ASTNode columnListNode)
+ throws SemanticException {
+ if (!valueClause.toLowerCase().contains("`default`")) {
+ return valueClause;
+ }
+
+ Map<String, String> colNameToDefaultConstraint = getColNameToDefaultValueMap(table);
+ String[] values = valueClause.trim().split(",");
+ String[] replacedValues = new String[values.length];
+
+ // the list of the column names may be set in the query
+ String[] columnNames = columnListNode == null ?
+ table.getAllCols().stream().map(f -> f.getName()).toArray(size -> new String[size]) :
+ columnListNode.getChildren().stream().map(n -> ((ASTNode)n).toString()).toArray(size -> new String[size]);
+
+ for (int i = 0; i < values.length; i++) {
+ if (values[i].trim().toLowerCase().equals("`default`")) {
+ replacedValues[i] = MapUtils.getString(colNameToDefaultConstraint, columnNames[i], "null");
+ } else {
+ replacedValues[i] = values[i];
+ }
+ }
+ return StringUtils.join(replacedValues, ',');
+ }
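For example, with column a declared DEFAULT 0 and the VALUES fragment `default`, 5, the rewrite yields 0, 5; a column without a default constraint falls back to the literal null, via the third argument to MapUtils.getString.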
+
+ /**
+ * Suppose the input Merge statement has ON target.a = source.b and c = d. Assume that 'c' is from
+ * the target table and 'd' is from the source expression. In order to properly
+ * generate the Insert for WHEN NOT MATCHED THEN INSERT, we need to make sure that the Where
+ * clause of this Insert contains "target.a is null and target.c is null". This ensures that this
+ * Insert leg does not receive any rows that are processed by the Inserts corresponding to
+ * WHEN MATCHED THEN ... clauses. (Implicit in this is a mini resolver that figures out if an
+ * unqualified column is part of the target table. We can get away with this simple logic because
+ * we know that the target is always a table, as opposed to some derived table.)
+ * The job of this class is to generate this predicate.
+ *
+ * Note that this predicate cannot simply be NOT(on-clause-expr). If on-clause-expr evaluates
+ * to Unknown, it will be treated as False in the WHEN MATCHED Inserts, but NOT(Unknown) = Unknown,
+ * and so it will be False for the WHEN NOT MATCHED Insert...
+ */
+ private static final class OnClauseAnalyzer {
+ private final ASTNode onClause;
+ private final Map<String, List<String>> table2column = new HashMap<>();
+ private final List<String> unresolvedColumns = new ArrayList<>();
+ private final List<FieldSchema> allTargetTableColumns = new ArrayList<>();
+ private final Set<String> tableNamesFound = new HashSet<>();
+ private final String targetTableNameInSourceQuery;
+ private final HiveConf conf;
+ private final String onClauseAsString;
+
+ /**
+ * @param targetTableNameInSourceQuery alias or simple name
+ */
+ OnClauseAnalyzer(ASTNode onClause, Table targetTable, String targetTableNameInSourceQuery,
+ HiveConf conf, String onClauseAsString) {
+ this.onClause = onClause;
+ allTargetTableColumns.addAll(targetTable.getCols());
+ allTargetTableColumns.addAll(targetTable.getPartCols());
+ this.targetTableNameInSourceQuery = unescapeIdentifier(targetTableNameInSourceQuery);
+ this.conf = conf;
+ this.onClauseAsString = onClauseAsString;
+ }
+
+ /**
+ * Finds all columns and groups by table ref (if there is one).
+ */
+ private void visit(ASTNode n) {
+ if (n.getType() == HiveParser.TOK_TABLE_OR_COL) {
+ ASTNode parent = (ASTNode) n.getParent();
+ if (parent != null && parent.getType() == HiveParser.DOT) {
+ //the ref must be a table, so look for column name as right child of DOT
+ if (parent.getParent() != null && parent.getParent().getType() == HiveParser.DOT) {
+ //I don't think this can happen... but just in case
+ throw new IllegalArgumentException("Found unexpected db.table.col reference in " + onClauseAsString);
+ }
+ addColumn2Table(n.getChild(0).getText(), parent.getChild(1).getText());
+ } else {
+ //must be just a column name
+ unresolvedColumns.add(n.getChild(0).getText());
+ }
+ }
+ if (n.getChildCount() == 0) {
+ return;
+ }
+ for (Node child : n.getChildren()) {
+ visit((ASTNode)child);
+ }
+ }
+
+ private void analyze() {
+ visit(onClause);
+ if (tableNamesFound.size() > 2) {
+ throw new IllegalArgumentException("Found > 2 table refs in ON clause. Found " +
+ tableNamesFound + " in " + onClauseAsString);
+ }
+ handleUnresolvedColumns();
+ if (tableNamesFound.size() > 2) {
+ throw new IllegalArgumentException("Found > 2 table refs in ON clause (incl unresolved). " +
+ "Found " + tableNamesFound + " in " + onClauseAsString);
+ }
+ }
+
+ /**
+ * Find those that belong to target table.
+ */
+ private void handleUnresolvedColumns() {
+ if (unresolvedColumns.isEmpty()) {
+ return;
+ }
+ for (String c : unresolvedColumns) {
+ for (FieldSchema fs : allTargetTableColumns) {
+ if (c.equalsIgnoreCase(fs.getName())) {
+ //c belongs to target table; strictly speaking there may be an ambiguous ref but
+ //this will be caught later when multi-insert is parsed
+ addColumn2Table(targetTableNameInSourceQuery.toLowerCase(), c);
+ break;
+ }
+ }
+ }
+ }
+
+ private void addColumn2Table(String tableName, String columnName) {
+ tableName = tableName.toLowerCase(); //normalize name for mapping
+ tableNamesFound.add(tableName);
+ List<String> cols = table2column.get(tableName);
+ if (cols == null) {
+ cols = new ArrayList<>();
+ table2column.put(tableName, cols);
+ }
+ //we want to preserve 'columnName' as it was in original input query so that rewrite
+ //looks as much as possible like original query
+ cols.add(columnName);
+ }
+
+ /**
+ * Now generate the predicate for Where clause.
+ */
+ private String getPredicate() {
+ //normalize table name for mapping
+ List<String> targetCols = table2column.get(targetTableNameInSourceQuery.toLowerCase());
+ if (targetCols == null) {
+ /*e.g. ON source.t=1
+ * this is not strictly speaking invalid, but it means that all columns from the target
+ * table are NULL for every row. This would make any WHEN MATCHED clause invalid since
+ * we don't have a ROW__ID. The WHEN NOT MATCHED could be meaningful but it's just data from
+ * source satisfying source.t=1... not worth the effort to support this*/
+ throw new IllegalArgumentException(ErrorMsg.INVALID_TABLE_IN_ON_CLAUSE_OF_MERGE
+ .format(targetTableNameInSourceQuery, onClauseAsString));
+ }
+ StringBuilder sb = new StringBuilder();
+ for (String col : targetCols) {
+ if (sb.length() > 0) {
+ sb.append(" AND ");
+ }
+ //but preserve table name in SQL
+ sb.append(HiveUtils.unparseIdentifier(targetTableNameInSourceQuery, conf))
+ .append(".")
+ .append(HiveUtils.unparseIdentifier(col, conf))
+ .append(" IS NULL");
+ }
+ return sb.toString();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/dcc89501/ql/src/java/org/apache/hadoop/hive/ql/parse/RewriteSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/RewriteSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/RewriteSemanticAnalyzer.java
new file mode 100644
index 0000000..6caac11
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/RewriteSemanticAnalyzer.java
@@ -0,0 +1,451 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.QueryState;
+import org.apache.hadoop.hive.ql.hooks.Entity;
+import org.apache.hadoop.hive.ql.hooks.ReadEntity;
+import org.apache.hadoop.hive.ql.hooks.WriteEntity;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.HiveUtils;
+import org.apache.hadoop.hive.ql.metadata.InvalidTableException;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A subclass of the {@link org.apache.hadoop.hive.ql.parse.SemanticAnalyzer} that just handles
+ * update, delete and merge statements. It works by rewriting the updates and deletes into insert
+ * statements (since they are actually inserts) and then doing some patch up to make them work as
+ * updates and deletes instead.
+ */
+public abstract class RewriteSemanticAnalyzer extends SemanticAnalyzer {
+ protected static final Logger LOG = LoggerFactory.getLogger(RewriteSemanticAnalyzer.class);
+
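+ // Flipped on while the subclass re-analyzes the rewritten statement so that
+ // analyzeInternal() below delegates straight to the plain SemanticAnalyzer.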
+ protected boolean useSuper = false;
+
+ RewriteSemanticAnalyzer(QueryState queryState) throws SemanticException {
+ super(queryState);
+ }
+
+ @Override
+ public void analyzeInternal(ASTNode tree) throws SemanticException {
+ if (useSuper) {
+ super.analyzeInternal(tree);
+ } else {
+ if (!getTxnMgr().supportsAcid()) {
+ throw new SemanticException(ErrorMsg.ACID_OP_ON_NONACID_TXNMGR.getMsg());
+ }
+ analyze(tree);
+ cleanUpMetaColumnAccessControl();
+ }
+ }
+
+ protected abstract void analyze(ASTNode tree) throws SemanticException;
+
+ /**
+ * Append the list of partition columns to the Insert statement, i.e. the 2nd set of
+ * partCol1,partCol2 in: INSERT INTO T PARTITION(partCol1,partCol2...) SELECT col1, ... partCol1,partCol2...
+ * @param target target table
+ */
+ protected void addPartitionColsToSelect(List<FieldSchema> partCols, StringBuilder rewrittenQueryStr,
+ ASTNode target) throws SemanticException {
+ String targetName = target != null ? getSimpleTableName(target) : null;
+
+ // If the table is partitioned, we need to select the partition columns as well.
+ if (partCols != null) {
+ for (FieldSchema fschema : partCols) {
+ rewrittenQueryStr.append(", ");
+ //would be nice if there were a way to determine if quotes are needed
+ if (targetName != null) {
+ rewrittenQueryStr.append(targetName).append('.');
+ }
+ rewrittenQueryStr.append(HiveUtils.unparseIdentifier(fschema.getName(), this.conf));
+ }
+ }
+ }
+
+ /**
+ * Assert that we are not asked to update a bucketing column or partition column.
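+ * For example (illustrative): on a table partitioned by p, "UPDATE T SET p = 1" fails
+ * with UPDATE_CANNOT_UPDATE_PART_VALUE.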
+ * @param colName the A in "SET A = B"
+ */
+ protected void checkValidSetClauseTarget(ASTNode colName, Table targetTable) throws SemanticException {
+ String columnName = normalizeColName(colName.getText());
+
+ // Make sure this isn't one of the partitioning columns, that's not supported.
+ for (FieldSchema fschema : targetTable.getPartCols()) {
+ if (fschema.getName().equalsIgnoreCase(columnName)) {
+ throw new SemanticException(ErrorMsg.UPDATE_CANNOT_UPDATE_PART_VALUE.getMsg());
+ }
+ }
+ //updating bucket column should move row from one file to another - not supported
+ if (targetTable.getBucketCols() != null && targetTable.getBucketCols().contains(columnName)) {
+ throw new SemanticException(ErrorMsg.UPDATE_CANNOT_UPDATE_BUCKET_VALUE, columnName);
+ }
+ boolean foundColumnInTargetTable = false;
+ for (FieldSchema col : targetTable.getCols()) {
+ if (columnName.equalsIgnoreCase(col.getName())) {
+ foundColumnInTargetTable = true;
+ break;
+ }
+ }
+ if (!foundColumnInTargetTable) {
+ throw new SemanticException(ErrorMsg.INVALID_TARGET_COLUMN_IN_SET_CLAUSE, colName.getText(),
+ targetTable.getFullyQualifiedName());
+ }
+ }
+
+ protected ASTNode findLHSofAssignment(ASTNode assignment) {
+ assert assignment.getToken().getType() == HiveParser.EQUAL :
+ "Expected set assignments to use equals operator but found " + assignment.getName();
+ ASTNode tableOrColTok = (ASTNode)assignment.getChildren().get(0);
+ assert tableOrColTok.getToken().getType() == HiveParser.TOK_TABLE_OR_COL :
+ "Expected left side of assignment to be table or column";
+ ASTNode colName = (ASTNode)tableOrColTok.getChildren().get(0);
+ assert colName.getToken().getType() == HiveParser.Identifier :
+ "Expected column name";
+ return colName;
+ }
+
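+ /**
+ * Collects the SET clause assignments into a deterministic-order map of column name to
+ * value expression; e.g. (illustrative) "SET a = 5, b = c + 1" yields
+ * {a -> expr(5), b -> expr(c + 1)}.
+ */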
+ protected Map<String, ASTNode> collectSetColumnsAndExpressions(ASTNode setClause,
+ Set<String> setRCols, Table targetTable) throws SemanticException {
+ // An update needs to select all of the columns, as we rewrite the entire row. Also,
+ // we need to figure out which columns we are going to replace.
+ assert setClause.getToken().getType() == HiveParser.TOK_SET_COLUMNS_CLAUSE :
+ "Expected second child of update token to be set token";
+
+ // Get the children of the set clause, each of which should be a column assignment
+ List<? extends Node> assignments = setClause.getChildren();
+ // Must be deterministic order map for consistent q-test output across Java versions
+ Map<String, ASTNode> setCols = new LinkedHashMap<String, ASTNode>(assignments.size());
+ for (Node a : assignments) {
+ ASTNode assignment = (ASTNode)a;
+ ASTNode colName = findLHSofAssignment(assignment);
+ if (setRCols != null) {
+ addSetRCols((ASTNode) assignment.getChildren().get(1), setRCols);
+ }
+ checkValidSetClauseTarget(colName, targetTable);
+
+ String columnName = normalizeColName(colName.getText());
+ // This means that in UPDATE T SET x = _something_
+ // _something_ can be whatever is supported in SELECT _something_
+ setCols.put(columnName, (ASTNode)assignment.getChildren().get(1));
+ }
+ return setCols;
+ }
+
+ /**
+ * @return the Metastore representation of the target table
+ */
+ protected Table getTargetTable(ASTNode tabRef) throws SemanticException {
+ return getTable(tabRef, db, true);
+ }
+
+ /**
+ * @param throwException if false, return null if table doesn't exist, else throw
+ */
+ protected static Table getTable(ASTNode tabRef, Hive db, boolean throwException) throws SemanticException {
+ String[] tableName;
+ switch (tabRef.getType()) {
+ case HiveParser.TOK_TABREF:
+ tableName = getQualifiedTableName((ASTNode) tabRef.getChild(0));
+ break;
+ case HiveParser.TOK_TABNAME:
+ tableName = getQualifiedTableName(tabRef);
+ break;
+ default:
+ throw raiseWrongType("TOK_TABREF|TOK_TABNAME", tabRef);
+ }
+
+ Table mTable;
+ try {
+ mTable = db.getTable(tableName[0], tableName[1], throwException);
+ } catch (InvalidTableException e) {
+ LOG.error("Failed to find table " + getDotName(tableName) + " got exception " + e.getMessage());
+ throw new SemanticException(ErrorMsg.INVALID_TABLE.getMsg(getDotName(tableName)), e);
+ } catch (HiveException e) {
+ LOG.error("Failed to find table " + getDotName(tableName) + " got exception " + e.getMessage());
+ throw new SemanticException(e.getMessage(), e);
+ }
+ return mTable;
+ }
+
+ /**
+ * Walk through all our inputs and set them to note that this read is part of an update or a delete.
+ */
+ protected void markReadEntityForUpdate() {
+ for (ReadEntity input : inputs) {
+ if (isWritten(input)) {
+ //TODO: this is actually not adding anything since LockComponent uses a Trie to "promote" a lock
+ //except by accident - when we have a partitioned target table we have a ReadEntity and WriteEntity
+ //for the table, so we mark ReadEntity and then delete WriteEntity (replace with Partition entries)
+ //so DbTxnManager skips Read lock on the ReadEntity....
+ input.setUpdateOrDelete(true); //input.noLockNeeded()?
+ }
+ }
+ }
+
+ /**
+ * For updates, we need to set the column access info so that it contains information on
+ * the columns we are updating.
+ * (But not all the columns of the target table, even though the rewritten query writes
+ * all columns of the target table, since that is an implementation detail.)
+ */
+ protected void setUpAccessControlInfoForUpdate(Table mTable, Map<String, ASTNode> setCols) {
+ ColumnAccessInfo cai = new ColumnAccessInfo();
+ for (String colName : setCols.keySet()) {
+ cai.add(Table.getCompleteName(mTable.getDbName(), mTable.getTableName()), colName);
+ }
+ setUpdateColumnAccessInfo(cai);
+ }
+
+ /**
+ * We need to weed ROW__ID out of the input column info, as it doesn't make any sense to
+ * require the user to have authorization on that column.
+ */
+ private void cleanUpMetaColumnAccessControl() {
+ //we do this for Update/Delete (incl Merge) because we introduce this column into the query
+ //as part of rewrite
+ if (columnAccessInfo != null) {
+ columnAccessInfo.stripVirtualColumn(VirtualColumn.ROWID);
+ }
+ }
+
+ /**
+ * Parse the newly generated SQL statement to get a new AST.
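+ * For example (illustrative), "delete from T where a = 1" is reparsed here in its
+ * rewritten form, roughly "insert into table T select ROW__ID from T where a = 1
+ * sort by ROW__ID".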
+ */
+ protected ReparseResult parseRewrittenQuery(StringBuilder rewrittenQueryStr, String originalQuery)
+ throws SemanticException {
+ // Set dynamic partitioning to nonstrict so that queries do not need any partition
+ // references.
+ // TODO: this may be a perf issue as it prevents the optimizer.. or not
+ HiveConf.setVar(conf, HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict");
+ // Disable LLAP IO wrapper; doesn't propagate extra ACID columns correctly.
+ HiveConf.setBoolVar(conf, ConfVars.LLAP_IO_ROW_WRAPPER_ENABLED, false);
+ // Parse the rewritten query string
+ Context rewrittenCtx;
+ try {
+ rewrittenCtx = new Context(conf);
+ rewrittenCtx.setHDFSCleanup(true);
+ // We keep track of all the contexts that are created by this query
+ // so we can clear them when we finish execution
+ ctx.addRewrittenStatementContext(rewrittenCtx);
+ } catch (IOException e) {
+ throw new SemanticException(ErrorMsg.UPDATEDELETE_IO_ERROR.getMsg(), e);
+ }
+ rewrittenCtx.setExplainConfig(ctx.getExplainConfig());
+ rewrittenCtx.setExplainPlan(ctx.isExplainPlan());
+ rewrittenCtx.setStatsSource(ctx.getStatsSource());
+ rewrittenCtx.setPlanMapper(ctx.getPlanMapper());
+ rewrittenCtx.setIsUpdateDeleteMerge(true);
+ rewrittenCtx.setCmd(rewrittenQueryStr.toString());
+
+ ASTNode rewrittenTree;
+ try {
+ LOG.info("Going to reparse <" + originalQuery + "> as \n<" + rewrittenQueryStr.toString() + ">");
+ rewrittenTree = ParseUtils.parse(rewrittenQueryStr.toString(), rewrittenCtx);
+ } catch (ParseException e) {
+ throw new SemanticException(ErrorMsg.UPDATEDELETE_PARSE_ERROR.getMsg(), e);
+ }
+ return new ReparseResult(rewrittenTree, rewrittenCtx);
+ }
+
+ /**
+ * Assert that the target table supports Acid writes, i.e. that it is not a (materialized) view.
+ */
+ protected void validateTargetTable(Table mTable) throws SemanticException {
+ if (mTable.getTableType() == TableType.VIRTUAL_VIEW || mTable.getTableType() == TableType.MATERIALIZED_VIEW) {
+ LOG.error("Table " + mTable.getFullyQualifiedName() + " is a view or materialized view");
+ throw new SemanticException(ErrorMsg.UPDATE_DELETE_VIEW.getMsg());
+ }
+ }
+
+ /**
+ * Check that {@code readEntity} is also being written.
+ */
+ private boolean isWritten(Entity readEntity) {
+ for (Entity writeEntity : outputs) {
+ //make sure to compare them as Entity, i.e. that it's the same table or partition, etc
+ if (writeEntity.toString().equalsIgnoreCase(readEntity.toString())) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ // This method finds any columns on the right side of a set statement (thus rcols) and puts them
+ // in a set so we can add them to the list of input cols to check.
+ private void addSetRCols(ASTNode node, Set<String> setRCols) {
+
+ // See if this node is a TOK_TABLE_OR_COL. If so, find the value and put it in the list. If
+ // not, recurse on any children
+ if (node.getToken().getType() == HiveParser.TOK_TABLE_OR_COL) {
+ ASTNode colName = (ASTNode)node.getChildren().get(0);
+ assert colName.getToken().getType() == HiveParser.Identifier :
+ "Expected column name";
+ setRCols.add(normalizeColName(colName.getText()));
+ } else if (node.getChildren() != null) {
+ for (Node n : node.getChildren()) {
+ addSetRCols((ASTNode)n, setRCols);
+ }
+ }
+ }
+
+ /**
+ * Column names are stored in the metastore in lower case, regardless of the CREATE TABLE statement.
+ * Unfortunately there is no single place that normalizes the input query.
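+ * For example, "UPDATE T SET FooBar = 1" has to match the metastore column name "foobar".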
+ * @param colName not null
+ */
+ private static String normalizeColName(String colName) {
+ return colName.toLowerCase();
+ }
+
+ /**
+ * SemanticAnalyzer will generate a WriteEntity for the target table since it doesn't know/check
+ * if the read and write are of the same table in "insert ... select ....". Since DbTxnManager
+ * uses Read/WriteEntity objects to decide which locks to acquire, we get more concurrency if we
+ * change the table WriteEntity to a set of partition WriteEntity objects based on
+ * ReadEntity objects computed for this table.
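+ * For example (illustrative): for "update T set a = 1 where p = 'x'" on a table
+ * partitioned by p, the table-level WriteEntity(UPDATE) is replaced by a
+ * WriteEntity(T@p=x, UPDATE).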
+ */
+ protected void updateOutputs(Table targetTable) {
+ markReadEntityForUpdate();
+
+ if (targetTable.isPartitioned()) {
+ List<ReadEntity> partitionsRead = getRestrictedPartitionSet(targetTable);
+ if (!partitionsRead.isEmpty()) {
+ // if there is WriteEntity with WriteType=UPDATE/DELETE for target table, replace it with
+ // WriteEntity for each partition
+ List<WriteEntity> toRemove = new ArrayList<>();
+ for (WriteEntity we : outputs) {
+ WriteEntity.WriteType wt = we.getWriteType();
+ if (isTargetTable(we, targetTable) &&
+ (wt == WriteEntity.WriteType.UPDATE || wt == WriteEntity.WriteType.DELETE)) {
+ // The assumption here is that SemanticAnalyzer will generate ReadEntity for each
+ // partition that exists and is matched by the WHERE clause (which may be all of them).
+ // Since we don't allow updating the value of a partition column, we know that we always
+ // write the same (or fewer) partitions than we read. Still, the write is a Dynamic
+ // Partition write - see HIVE-15032.
+ toRemove.add(we);
+ }
+ }
+ outputs.removeAll(toRemove);
+ // TODO: why is this like that?
+ for (ReadEntity re : partitionsRead) {
+ for (WriteEntity original : toRemove) {
+ //since we may have both Update and Delete branches, Auth needs to know
+ WriteEntity we = new WriteEntity(re.getPartition(), original.getWriteType());
+ we.setDynamicPartitionWrite(original.isDynamicPartitionWrite());
+ outputs.add(we);
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * If the optimizer has determined that it only has to read some of the partitions of the
+ * target table to satisfy the query, then we know that the write side of update/delete
+ * (and update/delete parts of merge)
+ * can only write (at most) that set of partitions (since we currently don't allow updating
+ * partition (or bucket) columns). So we want to replace the table level
+ * WriteEntity in the outputs with WriteEntity for each of these partitions.
+ * ToDo: see if this should be moved to SemanticAnalyzer itself since it applies to any
+ * insert which does a select against the same table. Then SemanticAnalyzer would also
+ * be able to not use DP for the Insert...
+ *
+ * Note that the Insert of Merge may be creating new partitions and writing to partitions
+ * which were not read (WHEN NOT MATCHED...). WriteEntity for that should be created
+ * in MoveTask (or some other task after the query is complete).
+ */
+ private List<ReadEntity> getRestrictedPartitionSet(Table targetTable) {
+ List<ReadEntity> partitionsRead = new ArrayList<>();
+ for (ReadEntity re : inputs) {
+ if (re.isFromTopLevelQuery && re.getType() == Entity.Type.PARTITION && isTargetTable(re, targetTable)) {
+ partitionsRead.add(re);
+ }
+ }
+ return partitionsRead;
+ }
+
+ /**
+ * Does this Entity belong to the target table (or one of its partitions)?
+ */
+ private boolean isTargetTable(Entity entity, Table targetTable) {
+ //todo: https://issues.apache.org/jira/browse/HIVE-15048
+ /**
+ * is this the right way to compare? Should it just compare paths?
+ * equals() impl looks heavyweight
+ */
+ return targetTable.equals(entity.getTable());
+ }
+
+ /**
+ * Returns the table name to use in the generated query, preserving original quotes/escapes, if any.
+ * @see #getFullTableNameForSQL(ASTNode)
+ */
+ protected String getSimpleTableName(ASTNode n) throws SemanticException {
+ return HiveUtils.unparseIdentifier(getSimpleTableNameBase(n), this.conf);
+ }
+
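+ /**
+ * Returns the unquoted alias or simple table name for a TOK_TABREF/TOK_TABNAME/TOK_SUBQUERY
+ * node, e.g. (illustrative) "t" for "db.tbl t" (the alias wins) and "tbl" for "db.tbl".
+ */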
+ protected String getSimpleTableNameBase(ASTNode n) throws SemanticException {
+ switch (n.getType()) {
+ case HiveParser.TOK_TABREF:
+ int aliasIndex = findTabRefIdxs(n)[0];
+ if (aliasIndex != 0) {
+ return n.getChild(aliasIndex).getText(); //the alias
+ }
+ return getSimpleTableNameBase((ASTNode) n.getChild(0));
+ case HiveParser.TOK_TABNAME:
+ if (n.getChildCount() == 2) {
+ //db.table -> return table
+ return n.getChild(1).getText();
+ }
+ return n.getChild(0).getText();
+ case HiveParser.TOK_SUBQUERY:
+ return n.getChild(1).getText(); //the alias
+ default:
+ throw raiseWrongType("TOK_TABREF|TOK_TABNAME|TOK_SUBQUERY", n);
+ }
+ }
+
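+ /**
+ * Bundles the re-parsed AST of the rewritten statement with the Context it was parsed in.
+ */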
+ protected static final class ReparseResult {
+ final ASTNode rewrittenTree;
+ final Context rewrittenCtx;
+ ReparseResult(ASTNode n, Context c) {
+ rewrittenTree = n;
+ rewrittenCtx = c;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/dcc89501/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java
index 088b5cf..51a6b2a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java
@@ -216,8 +216,8 @@ public final class SemanticAnalyzerFactory {
case HiveParser.TOK_LOAD:
return new LoadSemanticAnalyzer(queryState);
case HiveParser.TOK_EXPORT:
- if (UpdateDeleteSemanticAnalyzer.isAcidExport(tree)) {
- return new UpdateDeleteSemanticAnalyzer(queryState);
+ if (AcidExportSemanticAnalyzer.isAcidExport(tree)) {
+ return new AcidExportSemanticAnalyzer(queryState);
}
return new ExportSemanticAnalyzer(queryState);
case HiveParser.TOK_IMPORT:
@@ -368,9 +368,11 @@ public final class SemanticAnalyzerFactory {
case HiveParser.TOK_UPDATE_TABLE:
case HiveParser.TOK_DELETE_FROM:
- case HiveParser.TOK_MERGE:
return new UpdateDeleteSemanticAnalyzer(queryState);
+ case HiveParser.TOK_MERGE:
+ return new MergeSemanticAnalyzer(queryState);
+
case HiveParser.TOK_START_TRANSACTION:
case HiveParser.TOK_COMMIT:
case HiveParser.TOK_ROLLBACK: