You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ek...@apache.org on 2019/01/09 23:15:25 UTC
[2/2] hive git commit: HIVE-20919 Break up
UpdateDeleteSemanticAnalyzer (Miklos Gergely via Eugene Koifman)
HIVE-20919 Break up UpdateDeleteSemanticAnalyzer (Miklos Gergely via Eugene Koifman)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/dcc89501
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/dcc89501
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/dcc89501
Branch: refs/heads/master
Commit: dcc8950164db00adac982d6764bbd8fe31e6897d
Parents: 4d03e31
Author: Miklos Gergely <mg...@hortonworks.com>
Authored: Wed Jan 9 15:15:16 2019 -0800
Committer: Eugene Koifman <ek...@apache.org>
Committed: Wed Jan 9 15:15:16 2019 -0800
----------------------------------------------------------------------
.../ql/parse/AcidExportSemanticAnalyzer.java | 299 ++++
.../hive/ql/parse/MergeSemanticAnalyzer.java | 760 ++++++++++
.../hive/ql/parse/RewriteSemanticAnalyzer.java | 451 ++++++
.../hive/ql/parse/SemanticAnalyzerFactory.java | 8 +-
.../ql/parse/UpdateDeleteSemanticAnalyzer.java | 1427 +-----------------
.../hadoop/hive/ql/plan/AlterTableDesc.java | 6 +-
.../apache/hadoop/hive/ql/plan/ExportWork.java | 2 +-
7 files changed, 1563 insertions(+), 1390 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/dcc89501/ql/src/java/org/apache/hadoop/hive/ql/parse/AcidExportSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/AcidExportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/AcidExportSemanticAnalyzer.java
new file mode 100644
index 0000000..41e3754
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/AcidExportSemanticAnalyzer.java
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import org.antlr.runtime.tree.Tree;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.QueryState;
+import org.apache.hadoop.hive.ql.exec.DDLTask;
+import org.apache.hadoop.hive.ql.exec.StatsTask;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.hooks.ReadEntity;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.HiveUtils;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.plan.AlterTableDesc;
+import org.apache.hadoop.hive.ql.plan.CreateTableLikeDesc;
+import org.apache.hadoop.hive.ql.plan.DDLWork;
+import org.apache.hadoop.hive.ql.plan.DropTableDesc;
+import org.apache.hadoop.hive.ql.plan.ExportWork;
+import org.apache.hadoop.hive.ql.session.SessionState;
+
+/**
+ * A subclass of the {@link org.apache.hadoop.hive.ql.parse.SemanticAnalyzer} that just handles
+ * acid export statements. It works by rewriting the acid export into insert statements into a temporary table,
+ * and then exporting it from there.
+ */
+public class AcidExportSemanticAnalyzer extends RewriteSemanticAnalyzer {
+ AcidExportSemanticAnalyzer(QueryState queryState) throws SemanticException {
+ super(queryState);
+ }
+
+ protected void analyze(ASTNode tree) throws SemanticException {
+ if (tree.getToken().getType() != HiveParser.TOK_EXPORT) {
+ throw new RuntimeException("Asked to parse token " + tree.getName() + " in " +
+ "AcidExportSemanticAnalyzer");
+ }
+ analyzeAcidExport(tree);
+ }
+
+ /**
+ * Exporting an Acid table is more complicated than a flat table. It may contain delete events,
+ * which can only be interpreted properly within the context of the table/metastore where they
+ * were generated. It may also contain insert events that belong to transactions that aborted
+ * where the same constraints apply.
+ * In order to make the export artifact free of these constraints, the export does an
+ * insert into tmpTable select * from <export table> to filter/apply the events in current
+ * context and then export the tmpTable. This export artifact can now be imported into any
+ * table on any cluster (subject to schema checks etc).
+ * See {@link #analyzeAcidExport(ASTNode)}
+ * @param tree Export statement
+ * @return true if exporting an Acid table.
+ */
+ public static boolean isAcidExport(ASTNode tree) throws SemanticException {
+ assert tree != null && tree.getToken() != null && tree.getToken().getType() == HiveParser.TOK_EXPORT;
+ Tree tokTab = tree.getChild(0);
+ assert tokTab != null && tokTab.getType() == HiveParser.TOK_TAB;
+ Table tableHandle = null;
+ try {
+ tableHandle = getTable((ASTNode) tokTab.getChild(0), Hive.get(), false);
+ } catch(HiveException ex) {
+ throw new SemanticException(ex);
+ }
+
+ //tableHandle can be null if table doesn't exist
+ return tableHandle != null && AcidUtils.isFullAcidTable(tableHandle);
+ }
+ private static String getTmptTableNameForExport(Table exportTable) {
+ String tmpTableDb = exportTable.getDbName();
+ String tmpTableName = exportTable.getTableName() + "_" + UUID.randomUUID().toString().replace('-', '_');
+ return Warehouse.getQualifiedName(tmpTableDb, tmpTableName);
+ }
+
+ /**
+ * See {@link #isAcidExport(ASTNode)}
+ * 1. create the temp table T
+ * 2. compile 'insert into T select * from acidTable'
+ * 3. compile 'export acidTable' (acidTable will be replaced with T during execution)
+ * 4. create task to drop T
+ *
+ * Using a true temp (session level) table means it should not affect replication and the table
+ * is not visible outside the Session that created it, for security
+ */
+ private void analyzeAcidExport(ASTNode ast) throws SemanticException {
+ assert ast != null && ast.getToken() != null && ast.getToken().getType() == HiveParser.TOK_EXPORT;
+ ASTNode tableTree = (ASTNode)ast.getChild(0);
+ assert tableTree != null && tableTree.getType() == HiveParser.TOK_TAB;
+ ASTNode tokRefOrNameExportTable = (ASTNode) tableTree.getChild(0);
+ Table exportTable = getTargetTable(tokRefOrNameExportTable);
+ assert AcidUtils.isFullAcidTable(exportTable);
+
+ //need to create the table "manually" rather than creating a task since it has to exist to
+ // compile the insert into T...
+ String newTableName = getTmptTableNameForExport(exportTable); //this is db.table
+ Map<String, String> tblProps = new HashMap<>();
+ tblProps.put(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, Boolean.FALSE.toString());
+ String location;
+
+ // for temporary tables we set the location to something in the session's scratch dir
+ // it has the same life cycle as the tmp table
+ try {
+ // Generate a unique ID for temp table path.
+ // This path will be fixed for the life of the temp table.
+ Path path = new Path(SessionState.getTempTableSpace(conf), UUID.randomUUID().toString());
+ path = Warehouse.getDnsPath(path, conf);
+ location = path.toString();
+ } catch (MetaException err) {
+ throw new SemanticException("Error while generating temp table path:", err);
+ }
+
+ CreateTableLikeDesc ctlt = new CreateTableLikeDesc(newTableName,
+ false, true, null,
+ null, location, null, null,
+ tblProps,
+ true, //important so we get an exception on name collision
+ Warehouse.getQualifiedName(exportTable.getTTable()), false);
+ Table newTable;
+ try {
+ ReadEntity dbForTmpTable = new ReadEntity(db.getDatabase(exportTable.getDbName()));
+ inputs.add(dbForTmpTable); //so the plan knows we are 'reading' this db - locks, security...
+ DDLTask createTableTask = (DDLTask) TaskFactory.get(new DDLWork(new HashSet<>(), new HashSet<>(), ctlt), conf);
+ createTableTask.setConf(conf); //above get() doesn't set it
+ createTableTask.execute(new DriverContext(new Context(conf)));
+ newTable = db.getTable(newTableName);
+ } catch(IOException|HiveException ex) {
+ throw new SemanticException(ex);
+ }
+
+ //now generate insert statement
+ //insert into newTableName select * from ts <where partition spec>
+ StringBuilder rewrittenQueryStr = generateExportQuery(newTable.getPartCols(), tokRefOrNameExportTable, tableTree,
+ newTableName);
+ ReparseResult rr = parseRewrittenQuery(rewrittenQueryStr, ctx.getCmd());
+ Context rewrittenCtx = rr.rewrittenCtx;
+ rewrittenCtx.setIsUpdateDeleteMerge(false); //it's set in parseRewrittenQuery()
+ ASTNode rewrittenTree = rr.rewrittenTree;
+ try {
+ useSuper = true;
+ //newTable has to exist at this point to compile
+ super.analyze(rewrittenTree, rewrittenCtx);
+ } finally {
+ useSuper = false;
+ }
+ //now we have the rootTasks set up for Insert ... Select
+ removeStatsTasks(rootTasks);
+ //now make an ExportTask from temp table
+ /*analyzeExport() creates TableSpec which in turn tries to build
+ "public List<Partition> partitions" by looking in the metastore to find Partitions matching
+ the partition spec in the Export command. These of course don't exist yet since we've not
+ run the insert stmt yet!
+ */
+ Task<ExportWork> exportTask = ExportSemanticAnalyzer.analyzeExport(ast, newTableName, db, conf, inputs, outputs);
+
+ // Add an alter table task to set transactional props
+ // do it after populating temp table so that it's written as non-transactional table but
+ // update props before export so that export archive metadata has these props. This way when
+ // IMPORT is done for this archive and target table doesn't exist, it will be created as Acid.
+ AlterTableDesc alterTblDesc = new AlterTableDesc(AlterTableDesc.AlterTableTypes.ADDPROPS);
+ Map<String, String> mapProps = new HashMap<>();
+ mapProps.put(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, Boolean.TRUE.toString());
+ alterTblDesc.setProps(mapProps);
+ alterTblDesc.setOldName(newTableName);
+ addExportTask(rootTasks, exportTask, TaskFactory.get(new DDLWork(getInputs(), getOutputs(), alterTblDesc)));
+
+ // Now make a task to drop temp table
+ // {@link DDLSemanticAnalyzer#analyzeDropTable(ASTNode ast, TableType expectedType)
+ ReplicationSpec replicationSpec = new ReplicationSpec();
+ DropTableDesc dropTblDesc = new DropTableDesc(newTableName, TableType.MANAGED_TABLE, false, true, replicationSpec);
+ Task<DDLWork> dropTask = TaskFactory.get(new DDLWork(new HashSet<>(), new HashSet<>(), dropTblDesc), conf);
+ exportTask.addDependentTask(dropTask);
+ markReadEntityForUpdate();
+ if (ctx.isExplainPlan()) {
+ try {
+ //so that "explain" doesn't "leak" tmp tables
+ // TODO: catalog
+ db.dropTable(newTable.getDbName(), newTable.getTableName(), true, true, true);
+ } catch(HiveException ex) {
+ LOG.warn("Unable to drop " + newTableName + " due to: " + ex.getMessage(), ex);
+ }
+ }
+ }
+
+ /**
+ * Generate
+ * insert into newTableName select * from ts <where partition spec>
+ * for EXPORT command.
+ */
+ private StringBuilder generateExportQuery(List<FieldSchema> partCols, ASTNode tokRefOrNameExportTable,
+ ASTNode tableTree, String newTableName) throws SemanticException {
+ StringBuilder rewrittenQueryStr = new StringBuilder("insert into ").append(newTableName);
+ addPartitionColsToInsert(partCols, rewrittenQueryStr);
+ rewrittenQueryStr.append(" select * from ").append(getFullTableNameForSQL(tokRefOrNameExportTable));
+ //builds partition spec so we can build suitable WHERE clause
+ TableSpec exportTableSpec = new TableSpec(db, conf, tableTree, false, true);
+ if (exportTableSpec.getPartSpec() != null) {
+ StringBuilder whereClause = null;
+ int partColsIdx = -1; //keep track of corresponding col in partCols
+ for (Map.Entry<String, String> ent : exportTableSpec.getPartSpec().entrySet()) {
+ partColsIdx++;
+ if (ent.getValue() == null) {
+ continue; //partial spec
+ }
+ if (whereClause == null) {
+ whereClause = new StringBuilder(" WHERE ");
+ }
+ if (whereClause.length() > " WHERE ".length()) {
+ whereClause.append(" AND ");
+ }
+ whereClause.append(HiveUtils.unparseIdentifier(ent.getKey(), conf))
+ .append(" = ").append(genPartValueString(partCols.get(partColsIdx).getType(), ent.getValue()));
+ }
+ if (whereClause != null) {
+ rewrittenQueryStr.append(whereClause);
+ }
+ }
+ return rewrittenQueryStr;
+ }
+
+ /**
+ * Makes the exportTask run after all other tasks of the "insert into T ..." are done.
+ */
+ private void addExportTask(List<Task<?>> rootTasks,
+ Task<ExportWork> exportTask, Task<DDLWork> alterTable) {
+ for (Task<? extends Serializable> t : rootTasks) {
+ if (t.getNumChild() <= 0) {
+ //todo: ConditionalTask#addDependentTask(Task) doesn't do the right thing: HIVE-18978
+ t.addDependentTask(alterTable);
+ //this is a leaf so add exportTask to follow it
+ alterTable.addDependentTask(exportTask);
+ } else {
+ addExportTask(t.getDependentTasks(), exportTask, alterTable);
+ }
+ }
+ }
+
+ private void removeStatsTasks(List<Task<?>> rootTasks) {
+ List<Task<?>> statsTasks = findStatsTasks(rootTasks, null);
+ if (statsTasks == null) {
+ return;
+ }
+ for (Task<?> statsTask : statsTasks) {
+ if (statsTask.getParentTasks() == null) {
+ continue; //should never happen
+ }
+ for (Task<?> t : new ArrayList<>(statsTask.getParentTasks())) {
+ t.removeDependentTask(statsTask);
+ }
+ }
+ }
+
+ private List<Task<?>> findStatsTasks(
+ List<Task<?>> rootTasks, List<Task<?>> statsTasks) {
+ for (Task<? extends Serializable> t : rootTasks) {
+ if (t instanceof StatsTask) {
+ if (statsTasks == null) {
+ statsTasks = new ArrayList<>();
+ }
+ statsTasks.add(t);
+ }
+ if (t.getDependentTasks() != null) {
+ statsTasks = findStatsTasks(t.getDependentTasks(), statsTasks);
+ }
+ }
+ return statsTasks;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/dcc89501/ql/src/java/org/apache/hadoop/hive/ql/parse/MergeSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/MergeSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/MergeSemanticAnalyzer.java
new file mode 100644
index 0000000..44f7b43
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/MergeSemanticAnalyzer.java
@@ -0,0 +1,760 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.antlr.runtime.TokenRewriteStream;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.QueryState;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.HiveUtils;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.session.SessionState;
+
+
+/**
+ * A subclass of the {@link org.apache.hadoop.hive.ql.parse.SemanticAnalyzer} that just handles
+ * merge statements. It works by rewriting the updates and deletes into insert statements (since
+ * they are actually inserts) and then doing some patch up to make them work as merges instead.
+ */
+public class MergeSemanticAnalyzer extends RewriteSemanticAnalyzer {
+ MergeSemanticAnalyzer(QueryState queryState) throws SemanticException {
+ super(queryState);
+ }
+
+ @Override
+ public void analyze(ASTNode tree) throws SemanticException {
+ if (tree.getToken().getType() != HiveParser.TOK_MERGE) {
+ throw new RuntimeException("Asked to parse token " + tree.getName() + " in " +
+ "MergeSemanticAnalyzer");
+ }
+ analyzeMerge(tree);
+ }
+
+ private static final String INDENT = " ";
+
+ private IdentifierQuoter quotedIdenfierHelper;
+
+ /**
+ * This allows us to take an arbitrary ASTNode and turn it back into SQL that produced it.
+ * Since HiveLexer.g is written such that it strips away any ` (back ticks) around
+ * quoted identifiers we need to add those back to generated SQL.
+ * Additionally, the parser only produces tokens of type Identifier and never
+ * QuotedIdentifier (HIVE-6013). So here we just quote all identifiers.
+ * (') around String literals are retained w/o issues
+ */
+ private static class IdentifierQuoter {
+ private final TokenRewriteStream trs;
+ private final IdentityHashMap<ASTNode, ASTNode> visitedNodes = new IdentityHashMap<>();
+
+ IdentifierQuoter(TokenRewriteStream trs) {
+ this.trs = trs;
+ if (trs == null) {
+ throw new IllegalArgumentException("Must have a TokenRewriteStream");
+ }
+ }
+
+ private void visit(ASTNode n) {
+ if (n.getType() == HiveParser.Identifier) {
+ if (visitedNodes.containsKey(n)) {
+ /**
+ * Since we are modifying the stream, it's not idempotent. Ideally, the caller would take
+ * care to only quote Identifiers in each subtree once, but this makes it safe
+ */
+ return;
+ }
+ visitedNodes.put(n, n);
+ trs.insertBefore(n.getToken(), "`");
+ trs.insertAfter(n.getToken(), "`");
+ }
+ if (n.getChildCount() <= 0) {
+ return;
+ }
+ for (Node c : n.getChildren()) {
+ visit((ASTNode)c);
+ }
+ }
+ }
+
+ /**
+ * This allows us to take an arbitrary ASTNode and turn it back into SQL that produced it without
+ * needing to understand what it is (except for QuotedIdentifiers).
+ */
+ private String getMatchedText(ASTNode n) {
+ quotedIdenfierHelper.visit(n);
+ return ctx.getTokenRewriteStream().toString(n.getTokenStartIndex(),
+ n.getTokenStopIndex() + 1).trim();
+ }
+
+ /**
+ * Here we take a Merge statement AST and generate a semantically equivalent multi-insert
+ * statement to execute. Each Insert leg represents a single WHEN clause. As much as possible,
+ * the new SQL statement is made to look like the input SQL statement so that it's easier to map
+ * Query Compiler errors from generated SQL to original one this way.
+ * The generated SQL is a complete representation of the original input for the same reason.
+ * In many places SemanticAnalyzer throws exceptions that contain (line, position) coordinates.
+ * If generated SQL doesn't have everything and is patched up later, these coordinates point to
+ * the wrong place.
+ *
+ * @throws SemanticException
+ */
+ private void analyzeMerge(ASTNode tree) throws SemanticException {
+ quotedIdenfierHelper = new IdentifierQuoter(ctx.getTokenRewriteStream());
+ /*
+ * See org.apache.hadoop.hive.ql.parse.TestMergeStatement for some examples of the merge AST
+ For example, given:
+ MERGE INTO acidTbl USING nonAcidPart2 source ON acidTbl.a = source.a2
+ WHEN MATCHED THEN UPDATE SET b = source.b2
+ WHEN NOT MATCHED THEN INSERT VALUES (source.a2, source.b2)
+
+ We get AST like this:
+ "(tok_merge " +
+ "(tok_tabname acidtbl) (tok_tabref (tok_tabname nonacidpart2) source) " +
+ "(= (. (tok_table_or_col acidtbl) a) (. (tok_table_or_col source) a2)) " +
+ "(tok_matched " +
+ "(tok_update " +
+ "(tok_set_columns_clause (= (tok_table_or_col b) (. (tok_table_or_col source) b2))))) " +
+ "(tok_not_matched " +
+ "tok_insert " +
+ "(tok_value_row (. (tok_table_or_col source) a2) (. (tok_table_or_col source) b2))))");
+
+ And need to produce a multi-insert like this to execute:
+ FROM acidTbl RIGHT OUTER JOIN nonAcidPart2 ON acidTbl.a = source.a2
+ INSERT INTO TABLE acidTbl SELECT nonAcidPart2.a2, nonAcidPart2.b2 WHERE acidTbl.a IS null
+ INSERT INTO TABLE acidTbl SELECT target.ROW__ID, nonAcidPart2.a2, nonAcidPart2.b2
+ WHERE nonAcidPart2.a2=acidTbl.a SORT BY acidTbl.ROW__ID
+ */
+ /*todo: we need some sort of validation phase over original AST to make things user friendly; for example, if
+ original command refers to a column that doesn't exist, this will be caught when processing the rewritten query but
+ the errors will point at locations that the user can't map to anything
+ - VALUES clause must have the same number of values as target table (including partition cols). Part cols go last
+ in Select clause of Insert as Select
+ todo: do we care to preserve comments in original SQL?
+ todo: check if identifiers are properly escaped/quoted in the generated SQL - it's currently inconsistent
+ Look at UnparseTranslator.addIdentifierTranslation() - it does unescape + unparse...
+ todo: consider "WHEN NOT MATCHED BY SOURCE THEN UPDATE SET TargetTable.Col1 = SourceTable.Col1 "; what happens when
+ source is empty? This should be a runtime error - maybe not the outer side of ROJ is empty => the join produces 0
+ rows. If supporting WHEN NOT MATCHED BY SOURCE, then this should be a runtime error
+ */
+ ASTNode target = (ASTNode)tree.getChild(0);
+ ASTNode source = (ASTNode)tree.getChild(1);
+ String targetName = getSimpleTableName(target);
+ String sourceName = getSimpleTableName(source);
+ ASTNode onClause = (ASTNode) tree.getChild(2);
+ String onClauseAsText = getMatchedText(onClause);
+
+ int whenClauseBegins = 3;
+ boolean hasHint = false;
+ // query hint
+ ASTNode qHint = (ASTNode) tree.getChild(3);
+ if (qHint.getType() == HiveParser.QUERY_HINT) {
+ hasHint = true;
+ whenClauseBegins++;
+ }
+ Table targetTable = getTargetTable(target);
+ validateTargetTable(targetTable);
+ List<ASTNode> whenClauses = findWhenClauses(tree, whenClauseBegins);
+
+ StringBuilder rewrittenQueryStr = new StringBuilder("FROM\n");
+
+ rewrittenQueryStr.append(INDENT).append(getFullTableNameForSQL(target));
+ if (isAliased(target)) {
+ rewrittenQueryStr.append(" ").append(targetName);
+ }
+ rewrittenQueryStr.append('\n');
+ rewrittenQueryStr.append(INDENT).append(chooseJoinType(whenClauses)).append("\n");
+ if (source.getType() == HiveParser.TOK_SUBQUERY) {
+ //this includes the mandatory alias
+ rewrittenQueryStr.append(INDENT).append(getMatchedText(source));
+ } else {
+ rewrittenQueryStr.append(INDENT).append(getFullTableNameForSQL(source));
+ if (isAliased(source)) {
+ rewrittenQueryStr.append(" ").append(sourceName);
+ }
+ }
+ rewrittenQueryStr.append('\n');
+ rewrittenQueryStr.append(INDENT).append("ON ").append(onClauseAsText).append('\n');
+
+ // Add the hint if any
+ String hintStr = null;
+ if (hasHint) {
+ hintStr = " /*+ " + qHint.getText() + " */ ";
+ }
+
+ /**
+ * We allow at most 2 WHEN MATCHED clauses, in which case 1 must be Update and the other Delete
+ * If we have both update and delete, the 1st one (in SQL code) must have "AND <extra predicate>"
+ * so that the 2nd can ensure not to process the same rows.
+ * Update and Delete may be in any order. (Insert is always last)
+ */
+ String extraPredicate = null;
+ int numWhenMatchedUpdateClauses = 0, numWhenMatchedDeleteClauses = 0;
+ int numInsertClauses = 0;
+ boolean hintProcessed = false;
+ for (ASTNode whenClause : whenClauses) {
+ switch (getWhenClauseOperation(whenClause).getType()) {
+ case HiveParser.TOK_INSERT:
+ numInsertClauses++;
+ handleInsert(whenClause, rewrittenQueryStr, target, onClause,
+ targetTable, targetName, onClauseAsText, hintProcessed ? null : hintStr);
+ hintProcessed = true;
+ break;
+ case HiveParser.TOK_UPDATE:
+ numWhenMatchedUpdateClauses++;
+ String s = handleUpdate(whenClause, rewrittenQueryStr, target,
+ onClauseAsText, targetTable, extraPredicate, hintProcessed ? null : hintStr);
+ hintProcessed = true;
+ if (numWhenMatchedUpdateClauses + numWhenMatchedDeleteClauses == 1) {
+ extraPredicate = s; //i.e. it's the 1st WHEN MATCHED
+ }
+ break;
+ case HiveParser.TOK_DELETE:
+ numWhenMatchedDeleteClauses++;
+ String s1 = handleDelete(whenClause, rewrittenQueryStr, target,
+ onClauseAsText, targetTable, extraPredicate, hintProcessed ? null : hintStr);
+ hintProcessed = true;
+ if (numWhenMatchedUpdateClauses + numWhenMatchedDeleteClauses == 1) {
+ extraPredicate = s1; //i.e. it's the 1st WHEN MATCHED
+ }
+ break;
+ default:
+ throw new IllegalStateException("Unexpected WHEN clause type: " + whenClause.getType() +
+ addParseInfo(whenClause));
+ }
+ if (numWhenMatchedDeleteClauses > 1) {
+ throw new SemanticException(ErrorMsg.MERGE_TOO_MANY_DELETE, ctx.getCmd());
+ }
+ if (numWhenMatchedUpdateClauses > 1) {
+ throw new SemanticException(ErrorMsg.MERGE_TOO_MANY_UPDATE, ctx.getCmd());
+ }
+ assert numInsertClauses < 2: "too many Insert clauses";
+ }
+ if (numWhenMatchedDeleteClauses + numWhenMatchedUpdateClauses == 2 && extraPredicate == null) {
+ throw new SemanticException(ErrorMsg.MERGE_PREDIACTE_REQUIRED, ctx.getCmd());
+ }
+
+ boolean validating = handleCardinalityViolation(rewrittenQueryStr, target, onClauseAsText, targetTable,
+ numWhenMatchedDeleteClauses == 0 && numWhenMatchedUpdateClauses == 0);
+ ReparseResult rr = parseRewrittenQuery(rewrittenQueryStr, ctx.getCmd());
+ Context rewrittenCtx = rr.rewrittenCtx;
+ ASTNode rewrittenTree = rr.rewrittenTree;
+ rewrittenCtx.setOperation(Context.Operation.MERGE);
+
+ //set dest name mapping on new context; 1st child is TOK_FROM
+ for (int insClauseIdx = 1, whenClauseIdx = 0;
+ insClauseIdx < rewrittenTree.getChildCount() - (validating ? 1 : 0/*skip cardinality violation clause*/);
+ insClauseIdx++, whenClauseIdx++) {
+ //we've added Insert clauses in order of WHEN items in whenClauses
+ switch (getWhenClauseOperation(whenClauses.get(whenClauseIdx)).getType()) {
+ case HiveParser.TOK_INSERT:
+ rewrittenCtx.addDestNamePrefix(insClauseIdx, Context.DestClausePrefix.INSERT);
+ break;
+ case HiveParser.TOK_UPDATE:
+ rewrittenCtx.addDestNamePrefix(insClauseIdx, Context.DestClausePrefix.UPDATE);
+ break;
+ case HiveParser.TOK_DELETE:
+ rewrittenCtx.addDestNamePrefix(insClauseIdx, Context.DestClausePrefix.DELETE);
+ break;
+ default:
+ assert false;
+ }
+ }
+ if (validating) {
+ //here means the last branch of the multi-insert is Cardinality Validation
+ rewrittenCtx.addDestNamePrefix(rewrittenTree.getChildCount() - 1, Context.DestClausePrefix.INSERT);
+ }
+
+ try {
+ useSuper = true;
+ super.analyze(rewrittenTree, rewrittenCtx);
+ } finally {
+ useSuper = false;
+ }
+ updateOutputs(targetTable);
+ }
+
+ /**
+ * If there is no WHEN NOT MATCHED THEN INSERT, we don't outer join.
+ */
+ private String chooseJoinType(List<ASTNode> whenClauses) {
+ for (ASTNode whenClause : whenClauses) {
+ if (getWhenClauseOperation(whenClause).getType() == HiveParser.TOK_INSERT) {
+ return "RIGHT OUTER JOIN";
+ }
+ }
+ return "INNER JOIN";
+ }
+
+ /**
+ * Per SQL Spec ISO/IEC 9075-2:2011(E) Section 14.2 under "General Rules" Item 6/Subitem a/Subitem 2/Subitem B,
+ * an error should be raised if > 1 row of "source" matches the same row in "target".
+ * This should not affect the runtime of the query as it's running in parallel with other
+ * branches of the multi-insert. It won't actually write any data to merge_tmp_table since the
+ * cardinality_violation() UDF throws an error whenever it's called killing the query
+ * @return true if another Insert clause was added
+ */
+ private boolean handleCardinalityViolation(StringBuilder rewrittenQueryStr, ASTNode target,
+ String onClauseAsString, Table targetTable, boolean onlyHaveWhenNotMatchedClause)
+ throws SemanticException {
+ if (!conf.getBoolVar(HiveConf.ConfVars.MERGE_CARDINALITY_VIOLATION_CHECK)) {
+ LOG.info("Merge statement cardinality violation check is disabled: " +
+ HiveConf.ConfVars.MERGE_CARDINALITY_VIOLATION_CHECK.varname);
+ return false;
+ }
+ if (onlyHaveWhenNotMatchedClause) {
+ //if no update or delete in Merge, there is no need to do cardinality check
+ return false;
+ }
+ //this is a tmp table and thus Session scoped and acid requires SQL statement to be serial in a
+ // given session, i.e. the name can be fixed across all invocations
+ String tableName = "merge_tmp_table";
+ rewrittenQueryStr.append("\nINSERT INTO ").append(tableName)
+ .append("\n SELECT cardinality_violation(")
+ .append(getSimpleTableName(target)).append(".ROW__ID");
+ addPartitionColsToSelect(targetTable.getPartCols(), rewrittenQueryStr, target);
+
+ rewrittenQueryStr.append(")\n WHERE ").append(onClauseAsString)
+ .append(" GROUP BY ").append(getSimpleTableName(target)).append(".ROW__ID");
+
+ addPartitionColsToSelect(targetTable.getPartCols(), rewrittenQueryStr, target);
+
+ rewrittenQueryStr.append(" HAVING count(*) > 1");
+ //say table T has partition p, we are generating
+ //select cardinality_violation(ROW_ID, p) WHERE ... GROUP BY ROW__ID, p
+ //the Group By args are passed to cardinality_violation to add the violating value to the error msg
+ try {
+ if (null == db.getTable(tableName, false)) {
+ StorageFormat format = new StorageFormat(conf);
+ format.processStorageFormat("TextFile");
+ Table table = db.newTable(tableName);
+ table.setSerializationLib(format.getSerde());
+ List<FieldSchema> fields = new ArrayList<FieldSchema>();
+ fields.add(new FieldSchema("val", "int", null));
+ table.setFields(fields);
+ table.setDataLocation(Warehouse.getDnsPath(new Path(SessionState.get().getTempTableSpace(),
+ tableName), conf));
+ table.getTTable().setTemporary(true);
+ table.setStoredAsSubDirectories(false);
+ table.setInputFormatClass(format.getInputFormat());
+ table.setOutputFormatClass(format.getOutputFormat());
+ db.createTable(table, true);
+ }
+ } catch(HiveException|MetaException e) {
+ throw new SemanticException(e.getMessage(), e);
+ }
+ return true;
+ }
+
/**
 * Generates the Insert leg of the multi-insert SQL that represents a WHEN MATCHED THEN UPDATE clause.
 * @param onClauseAsString - because there is no clone() and we need to use in multiple places
 * @param deleteExtraPredicate - see notes at caller
 * @return the extra predicate of this WHEN MATCHED clause (may be null); the caller passes it on
 *         so the DELETE leg can exclude rows already claimed by this UPDATE leg
 */
private String handleUpdate(ASTNode whenMatchedUpdateClause, StringBuilder rewrittenQueryStr, ASTNode target,
    String onClauseAsString, Table targetTable, String deleteExtraPredicate, String hintStr)
    throws SemanticException {
  assert whenMatchedUpdateClause.getType() == HiveParser.TOK_MATCHED;
  assert getWhenClauseOperation(whenMatchedUpdateClause).getType() == HiveParser.TOK_UPDATE;
  String targetName = getSimpleTableName(target);
  rewrittenQueryStr.append("INSERT INTO ").append(getFullTableNameForSQL(target));
  addPartitionColsToInsert(targetTable.getPartCols(), rewrittenQueryStr);
  rewrittenQueryStr.append(" -- update clause\n SELECT ");
  if (hintStr != null) {
    rewrittenQueryStr.append(hintStr);
  }
  rewrittenQueryStr.append(targetName).append(".ROW__ID");

  ASTNode setClause = (ASTNode)getWhenClauseOperation(whenMatchedUpdateClause).getChild(0);
  //columns being updated -> update expressions; "setRCols" (last param) is null because we use actual expressions
  //before reparsing, i.e. they are known to SemanticAnalyzer logic
  Map<String, ASTNode> setColsExprs = collectSetColumnsAndExpressions(setClause, null, targetTable);
  //if target table has cols c1,c2,c3 and p1 partition col and we had "SET c2 = 5, c1 = current_date()" we want to end
  //up with
  //insert into target (p1) select current_date(), 5, c3, p1 where ....
  //since we take the RHS of set exactly as it was in Input, we don't need to deal with quoting/escaping column/table
  //names
  List<FieldSchema> nonPartCols = targetTable.getCols();
  for (FieldSchema fs : nonPartCols) {
    rewrittenQueryStr.append(", ");
    String name = fs.getName();
    if (setColsExprs.containsKey(name)) {
      String rhsExp = getMatchedText(setColsExprs.get(name));
      //"set a=5, b=8" - rhsExp picks up the next char (e.g. ',') from the token stream
      switch (rhsExp.charAt(rhsExp.length() - 1)) {
        case ',':
        case '\n':
          rhsExp = rhsExp.substring(0, rhsExp.length() - 1);
          break;
        default:
          //do nothing
      }
      rewrittenQueryStr.append(rhsExp);
    } else {
      //column not mentioned in SET - carry the existing value through unchanged
      rewrittenQueryStr.append(getSimpleTableName(target))
          .append(".")
          .append(HiveUtils.unparseIdentifier(name, this.conf));
    }
  }
  addPartitionColsToSelect(targetTable.getPartCols(), rewrittenQueryStr, target);
  rewrittenQueryStr.append("\n WHERE ").append(onClauseAsString);
  String extraPredicate = getWhenClausePredicate(whenMatchedUpdateClause);
  if (extraPredicate != null) {
    //we have WHEN MATCHED AND <boolean expr> THEN UPDATE
    rewrittenQueryStr.append(" AND ").append(extraPredicate);
  }
  if (deleteExtraPredicate != null) {
    //exclude rows already claimed by the DELETE branch of this MERGE
    rewrittenQueryStr.append(" AND NOT(").append(deleteExtraPredicate).append(")");
  }
  rewrittenQueryStr.append("\n SORT BY ");
  rewrittenQueryStr.append(targetName).append(".ROW__ID \n");

  setUpAccessControlInfoForUpdate(targetTable, setColsExprs);
  //we don't deal with columns on RHS of SET expression since the whole expr is part of the
  //rewritten SQL statement and is thus handled by SemanticAnalzyer. Nor do we have to
  //figure which cols on RHS are from source and which from target

  return extraPredicate;
}
+
/**
 * Generates the Insert leg of the multi-insert SQL that represents a WHEN MATCHED THEN DELETE clause.
 * @param onClauseAsString - because there is no clone() and we need to use in multiple places
 * @param updateExtraPredicate - see notes at caller
 * @return the extra predicate of this WHEN MATCHED clause (may be null); the caller passes it on
 *         so the UPDATE leg can exclude rows already claimed by this DELETE leg
 */
private String handleDelete(ASTNode whenMatchedDeleteClause, StringBuilder rewrittenQueryStr, ASTNode target,
    String onClauseAsString, Table targetTable, String updateExtraPredicate, String hintStr)
    throws SemanticException {
  assert whenMatchedDeleteClause.getType() == HiveParser.TOK_MATCHED;
  assert getWhenClauseOperation(whenMatchedDeleteClause).getType() == HiveParser.TOK_DELETE;
  List<FieldSchema> partCols = targetTable.getPartCols();
  String targetName = getSimpleTableName(target);
  rewrittenQueryStr.append("INSERT INTO ").append(getFullTableNameForSQL(target));
  addPartitionColsToInsert(partCols, rewrittenQueryStr);

  rewrittenQueryStr.append(" -- delete clause\n SELECT ");
  if (hintStr != null) {
    rewrittenQueryStr.append(hintStr);
  }
  //delete only needs ROW__ID (plus partition cols for the dynamic-partition insert)
  rewrittenQueryStr.append(targetName).append(".ROW__ID ");
  addPartitionColsToSelect(partCols, rewrittenQueryStr, target);
  rewrittenQueryStr.append("\n WHERE ").append(onClauseAsString);
  String extraPredicate = getWhenClausePredicate(whenMatchedDeleteClause);
  if (extraPredicate != null) {
    //we have WHEN MATCHED AND <boolean expr> THEN DELETE
    rewrittenQueryStr.append(" AND ").append(extraPredicate);
  }
  if (updateExtraPredicate != null) {
    //exclude rows already claimed by the UPDATE branch of this MERGE
    rewrittenQueryStr.append(" AND NOT(").append(updateExtraPredicate).append(")");
  }
  rewrittenQueryStr.append("\n SORT BY ");
  rewrittenQueryStr.append(targetName).append(".ROW__ID \n");
  return extraPredicate;
}
+
+ private static String addParseInfo(ASTNode n) {
+ return " at " + ErrorMsg.renderPosition(n);
+ }
+
+ private boolean isAliased(ASTNode n) {
+ switch (n.getType()) {
+ case HiveParser.TOK_TABREF:
+ return findTabRefIdxs(n)[0] != 0;
+ case HiveParser.TOK_TABNAME:
+ return false;
+ case HiveParser.TOK_SUBQUERY:
+ assert n.getChildCount() > 1 : "Expected Derived Table to be aliased";
+ return true;
+ default:
+ throw raiseWrongType("TOK_TABREF|TOK_TABNAME", n);
+ }
+ }
+
+ /**
+ * Collect WHEN clauses from Merge statement AST.
+ */
+ private List<ASTNode> findWhenClauses(ASTNode tree, int start) throws SemanticException {
+ assert tree.getType() == HiveParser.TOK_MERGE;
+ List<ASTNode> whenClauses = new ArrayList<>();
+ for (int idx = start; idx < tree.getChildCount(); idx++) {
+ ASTNode whenClause = (ASTNode)tree.getChild(idx);
+ assert whenClause.getType() == HiveParser.TOK_MATCHED ||
+ whenClause.getType() == HiveParser.TOK_NOT_MATCHED :
+ "Unexpected node type found: " + whenClause.getType() + addParseInfo(whenClause);
+ whenClauses.add(whenClause);
+ }
+ if (whenClauses.size() <= 0) {
+ //Futureproofing: the parser will actually not allow this
+ throw new SemanticException("Must have at least 1 WHEN clause in MERGE statement");
+ }
+ return whenClauses;
+ }
+
+ private ASTNode getWhenClauseOperation(ASTNode whenClause) {
+ if (!(whenClause.getType() == HiveParser.TOK_MATCHED || whenClause.getType() == HiveParser.TOK_NOT_MATCHED)) {
+ throw raiseWrongType("Expected TOK_MATCHED|TOK_NOT_MATCHED", whenClause);
+ }
+ return (ASTNode) whenClause.getChild(0);
+ }
+
+ /**
+ * Returns the <boolean predicate> as in WHEN MATCHED AND <boolean predicate> THEN...
+ * @return may be null
+ */
+ private String getWhenClausePredicate(ASTNode whenClause) {
+ if (!(whenClause.getType() == HiveParser.TOK_MATCHED || whenClause.getType() == HiveParser.TOK_NOT_MATCHED)) {
+ throw raiseWrongType("Expected TOK_MATCHED|TOK_NOT_MATCHED", whenClause);
+ }
+ if (whenClause.getChildCount() == 2) {
+ return getMatchedText((ASTNode)whenClause.getChild(1));
+ }
+ return null;
+ }
+
+ /**
+ * Generates the Insert leg of the multi-insert SQL to represent WHEN NOT MATCHED THEN INSERT clause.
+ * @param targetTableNameInSourceQuery - simple name/alias
+ * @throws SemanticException
+ */
+ private void handleInsert(ASTNode whenNotMatchedClause, StringBuilder rewrittenQueryStr, ASTNode target,
+ ASTNode onClause, Table targetTable, String targetTableNameInSourceQuery, String onClauseAsString,
+ String hintStr) throws SemanticException {
+ ASTNode whenClauseOperation = getWhenClauseOperation(whenNotMatchedClause);
+ assert whenNotMatchedClause.getType() == HiveParser.TOK_NOT_MATCHED;
+ assert whenClauseOperation.getType() == HiveParser.TOK_INSERT;
+
+ // identify the node that contains the values to insert and the optional column list node
+ ArrayList<Node> children = whenClauseOperation.getChildren();
+ ASTNode valuesNode =
+ (ASTNode)children.stream().filter(n -> ((ASTNode)n).getType() == HiveParser.TOK_FUNCTION).findFirst().get();
+ ASTNode columnListNode =
+ (ASTNode)children.stream().filter(n -> ((ASTNode)n).getType() == HiveParser.TOK_TABCOLNAME).findFirst()
+ .orElse(null);
+
+ // if column list is specified, then it has to have the same number of elements as the values
+ // valuesNode has a child for struct, the rest are the columns
+ if (columnListNode != null && columnListNode.getChildCount() != (valuesNode.getChildCount() - 1)) {
+ throw new SemanticException(String.format("Column schema must have the same length as values (%d vs %d)",
+ columnListNode.getChildCount(), valuesNode.getChildCount() - 1));
+ }
+
+ rewrittenQueryStr.append("INSERT INTO ").append(getFullTableNameForSQL(target));
+ if (columnListNode != null) {
+ rewrittenQueryStr.append(' ').append(getMatchedText(columnListNode));
+ }
+ addPartitionColsToInsert(targetTable.getPartCols(), rewrittenQueryStr);
+
+ rewrittenQueryStr.append(" -- insert clause\n SELECT ");
+ if (hintStr != null) {
+ rewrittenQueryStr.append(hintStr);
+ }
+
+ OnClauseAnalyzer oca = new OnClauseAnalyzer(onClause, targetTable, targetTableNameInSourceQuery,
+ conf, onClauseAsString);
+ oca.analyze();
+
+ String valuesClause = getMatchedText(valuesNode);
+ valuesClause = valuesClause.substring(1, valuesClause.length() - 1); //strip '(' and ')'
+ valuesClause = replaceDefaultKeywordForMerge(valuesClause, targetTable, columnListNode);
+ rewrittenQueryStr.append(valuesClause).append("\n WHERE ").append(oca.getPredicate());
+
+ String extraPredicate = getWhenClausePredicate(whenNotMatchedClause);
+ if (extraPredicate != null) {
+ //we have WHEN NOT MATCHED AND <boolean expr> THEN INSERT
+ rewrittenQueryStr.append(" AND ")
+ .append(getMatchedText(((ASTNode)whenNotMatchedClause.getChild(1)))).append('\n');
+ }
+ }
+
+ private String replaceDefaultKeywordForMerge(String valueClause, Table table, ASTNode columnListNode)
+ throws SemanticException {
+ if (!valueClause.toLowerCase().contains("`default`")) {
+ return valueClause;
+ }
+
+ Map<String, String> colNameToDefaultConstraint = getColNameToDefaultValueMap(table);
+ String[] values = valueClause.trim().split(",");
+ String[] replacedValues = new String[values.length];
+
+ // the list of the column names may be set in the query
+ String[] columnNames = columnListNode == null ?
+ table.getAllCols().stream().map(f -> f.getName()).toArray(size -> new String[size]) :
+ columnListNode.getChildren().stream().map(n -> ((ASTNode)n).toString()).toArray(size -> new String[size]);
+
+ for (int i = 0; i < values.length; i++) {
+ if (values[i].trim().toLowerCase().equals("`default`")) {
+ replacedValues[i] = MapUtils.getString(colNameToDefaultConstraint, columnNames[i], "null");
+ } else {
+ replacedValues[i] = values[i];
+ }
+ }
+ return StringUtils.join(replacedValues, ',');
+ }
+
/**
 * Suppose the input Merge statement has ON target.a = source.b and c = d. Assume, that 'c' is from
 * target table and 'd' is from source expression. In order to properly
 * generate the Insert for WHEN NOT MATCHED THEN INSERT, we need to make sure that the Where
 * clause of this Insert contains "target.a is null and target.c is null" This ensures that this
 * Insert leg does not receive any rows that are processed by Insert corresponding to
 * WHEN MATCHED THEN ... clauses. (Implicit in this is a mini resolver that figures out if an
 * unqualified column is part of the target table. We can get away with this simple logic because
 * we know that target is always a table (as opposed to some derived table).
 * The job of this class is to generate this predicate.
 *
 * Note that this predicate cannot simply be NOT(on-clause-expr). If on-clause-expr evaluates
 * to Unknown, it will be treated as False in the WHEN MATCHED Inserts but NOT(Unknown) = Unknown,
 * and so it will be False for WHEN NOT MATCHED Insert...
 */
private static final class OnClauseAnalyzer {
  private final ASTNode onClause;
  //table name (lower-cased) -> columns referenced with that qualifier in the ON clause
  private final Map<String, List<String>> table2column = new HashMap<>();
  //columns referenced without a table qualifier; resolved in handleUnresolvedColumns()
  private final List<String> unresolvedColumns = new ArrayList<>();
  //regular + partition columns of the target table
  private final List<FieldSchema> allTargetTableColumns = new ArrayList<>();
  private final Set<String> tableNamesFound = new HashSet<>();
  private final String targetTableNameInSourceQuery;
  private final HiveConf conf;
  //kept only for error messages
  private final String onClauseAsString;

  /**
   * @param targetTableNameInSourceQuery alias or simple name
   */
  OnClauseAnalyzer(ASTNode onClause, Table targetTable, String targetTableNameInSourceQuery,
      HiveConf conf, String onClauseAsString) {
    this.onClause = onClause;
    allTargetTableColumns.addAll(targetTable.getCols());
    allTargetTableColumns.addAll(targetTable.getPartCols());
    this.targetTableNameInSourceQuery = unescapeIdentifier(targetTableNameInSourceQuery);
    this.conf = conf;
    this.onClauseAsString = onClauseAsString;
  }

  /**
   * Finds all columns and groups by table ref (if there is one).
   */
  private void visit(ASTNode n) {
    if (n.getType() == HiveParser.TOK_TABLE_OR_COL) {
      ASTNode parent = (ASTNode) n.getParent();
      if (parent != null && parent.getType() == HiveParser.DOT) {
        //the ref must be a table, so look for column name as right child of DOT
        if (parent.getParent() != null && parent.getParent().getType() == HiveParser.DOT) {
          //I don't think this can happen... but just in case
          throw new IllegalArgumentException("Found unexpected db.table.col reference in " + onClauseAsString);
        }
        addColumn2Table(n.getChild(0).getText(), parent.getChild(1).getText());
      } else {
        //must be just a column name
        unresolvedColumns.add(n.getChild(0).getText());
      }
    }
    if (n.getChildCount() == 0) {
      return;
    }
    for (Node child : n.getChildren()) {
      visit((ASTNode)child);
    }
  }

  //walks the ON clause, resolves unqualified columns, and validates that at most two
  //distinct table refs appear
  private void analyze() {
    visit(onClause);
    if (tableNamesFound.size() > 2) {
      throw new IllegalArgumentException("Found > 2 table refs in ON clause. Found " +
          tableNamesFound + " in " + onClauseAsString);
    }
    handleUnresolvedColumns();
    //re-check: resolving unqualified columns may have added the target table as a new ref
    if (tableNamesFound.size() > 2) {
      throw new IllegalArgumentException("Found > 2 table refs in ON clause (incl unresolved). " +
          "Found " + tableNamesFound + " in " + onClauseAsString);
    }
  }

  /**
   * Find those that belong to target table.
   */
  private void handleUnresolvedColumns() {
    if (unresolvedColumns.isEmpty()) {
      return;
    }
    for (String c : unresolvedColumns) {
      for (FieldSchema fs : allTargetTableColumns) {
        if (c.equalsIgnoreCase(fs.getName())) {
          //c belongs to target table; strictly speaking there maybe an ambiguous ref but
          //this will be caught later when multi-insert is parsed
          addColumn2Table(targetTableNameInSourceQuery.toLowerCase(), c);
          break;
        }
      }
    }
  }

  private void addColumn2Table(String tableName, String columnName) {
    tableName = tableName.toLowerCase(); //normalize name for mapping
    tableNamesFound.add(tableName);
    List<String> cols = table2column.get(tableName);
    if (cols == null) {
      cols = new ArrayList<>();
      table2column.put(tableName, cols);
    }
    //we want to preserve 'columnName' as it was in original input query so that rewrite
    //looks as much as possible like original query
    cols.add(columnName);
  }

  /**
   * Now generate the predicate for Where clause.
   */
  private String getPredicate() {
    //normalize table name for mapping
    List<String> targetCols = table2column.get(targetTableNameInSourceQuery.toLowerCase());
    if (targetCols == null) {
      /*e.g. ON source.t=1
       * this is not strictly speaking invalid but it does ensure that all columns from target
       * table are all NULL for every row. This would make any WHEN MATCHED clause invalid since
       * we don't have a ROW__ID. The WHEN NOT MATCHED could be meaningful but it's just data from
       * source satisfying source.t=1... not worth the effort to support this*/
      throw new IllegalArgumentException(ErrorMsg.INVALID_TABLE_IN_ON_CLAUSE_OF_MERGE
          .format(targetTableNameInSourceQuery, onClauseAsString));
    }
    StringBuilder sb = new StringBuilder();
    for (String col : targetCols) {
      if (sb.length() > 0) {
        sb.append(" AND ");
      }
      //but preserve table name in SQL
      sb.append(HiveUtils.unparseIdentifier(targetTableNameInSourceQuery, conf))
          .append(".")
          .append(HiveUtils.unparseIdentifier(col, conf))
          .append(" IS NULL");
    }
    return sb.toString();
  }
}
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/dcc89501/ql/src/java/org/apache/hadoop/hive/ql/parse/RewriteSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/RewriteSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/RewriteSemanticAnalyzer.java
new file mode 100644
index 0000000..6caac11
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/RewriteSemanticAnalyzer.java
@@ -0,0 +1,451 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.QueryState;
+import org.apache.hadoop.hive.ql.hooks.Entity;
+import org.apache.hadoop.hive.ql.hooks.ReadEntity;
+import org.apache.hadoop.hive.ql.hooks.WriteEntity;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.HiveUtils;
+import org.apache.hadoop.hive.ql.metadata.InvalidTableException;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A subclass of the {@link org.apache.hadoop.hive.ql.parse.SemanticAnalyzer} that just handles
+ * update, delete and merge statements. It works by rewriting the updates and deletes into insert
+ * statements (since they are actually inserts) and then doing some patch up to make them work as
+ * updates and deletes instead.
+ */
+public abstract class RewriteSemanticAnalyzer extends SemanticAnalyzer {
+ protected static final Logger LOG = LoggerFactory.getLogger(RewriteSemanticAnalyzer.class);
+
+ protected boolean useSuper = false;
+
+ RewriteSemanticAnalyzer(QueryState queryState) throws SemanticException {
+ super(queryState);
+ }
+
+ @Override
+ public void analyzeInternal(ASTNode tree) throws SemanticException {
+ if (useSuper) {
+ super.analyzeInternal(tree);
+ } else {
+ if (!getTxnMgr().supportsAcid()) {
+ throw new SemanticException(ErrorMsg.ACID_OP_ON_NONACID_TXNMGR.getMsg());
+ }
+ analyze(tree);
+ cleanUpMetaColumnAccessControl();
+ }
+ }
+
+ protected abstract void analyze(ASTNode tree) throws SemanticException;
+
+ /**
+ * Append list of partition columns to Insert statement, i.e. the 2nd set of partCol1,partCol2
+ * INSERT INTO T PARTITION(partCol1,partCol2...) SELECT col1, ... partCol1,partCol2...
+ * @param target target table
+ */
+ protected void addPartitionColsToSelect(List<FieldSchema> partCols, StringBuilder rewrittenQueryStr,
+ ASTNode target) throws SemanticException {
+ String targetName = target != null ? getSimpleTableName(target) : null;
+
+ // If the table is partitioned, we need to select the partition columns as well.
+ if (partCols != null) {
+ for (FieldSchema fschema : partCols) {
+ rewrittenQueryStr.append(", ");
+ //would be nice if there was a way to determine if quotes are needed
+ if (targetName != null) {
+ rewrittenQueryStr.append(targetName).append('.');
+ }
+ rewrittenQueryStr.append(HiveUtils.unparseIdentifier(fschema.getName(), this.conf));
+ }
+ }
+ }
+
+ /**
+ * Assert that we are not asked to update a bucketing column or partition column.
+ * @param colName it's the A in "SET A = B"
+ */
+ protected void checkValidSetClauseTarget(ASTNode colName, Table targetTable) throws SemanticException {
+ String columnName = normalizeColName(colName.getText());
+
+ // Make sure this isn't one of the partitioning columns, that's not supported.
+ for (FieldSchema fschema : targetTable.getPartCols()) {
+ if (fschema.getName().equalsIgnoreCase(columnName)) {
+ throw new SemanticException(ErrorMsg.UPDATE_CANNOT_UPDATE_PART_VALUE.getMsg());
+ }
+ }
+ //updating bucket column should move row from one file to another - not supported
+ if (targetTable.getBucketCols() != null && targetTable.getBucketCols().contains(columnName)) {
+ throw new SemanticException(ErrorMsg.UPDATE_CANNOT_UPDATE_BUCKET_VALUE, columnName);
+ }
+ boolean foundColumnInTargetTable = false;
+ for (FieldSchema col : targetTable.getCols()) {
+ if (columnName.equalsIgnoreCase(col.getName())) {
+ foundColumnInTargetTable = true;
+ break;
+ }
+ }
+ if (!foundColumnInTargetTable) {
+ throw new SemanticException(ErrorMsg.INVALID_TARGET_COLUMN_IN_SET_CLAUSE, colName.getText(),
+ targetTable.getFullyQualifiedName());
+ }
+ }
+
+ protected ASTNode findLHSofAssignment(ASTNode assignment) {
+ assert assignment.getToken().getType() == HiveParser.EQUAL :
+ "Expected set assignments to use equals operator but found " + assignment.getName();
+ ASTNode tableOrColTok = (ASTNode)assignment.getChildren().get(0);
+ assert tableOrColTok.getToken().getType() == HiveParser.TOK_TABLE_OR_COL :
+ "Expected left side of assignment to be table or column";
+ ASTNode colName = (ASTNode)tableOrColTok.getChildren().get(0);
+ assert colName.getToken().getType() == HiveParser.Identifier :
+ "Expected column name";
+ return colName;
+ }
+
+ protected Map<String, ASTNode> collectSetColumnsAndExpressions(ASTNode setClause,
+ Set<String> setRCols, Table targetTable) throws SemanticException {
+ // An update needs to select all of the columns, as we rewrite the entire row. Also,
+ // we need to figure out which columns we are going to replace.
+ assert setClause.getToken().getType() == HiveParser.TOK_SET_COLUMNS_CLAUSE :
+ "Expected second child of update token to be set token";
+
+ // Get the children of the set clause, each of which should be a column assignment
+ List<? extends Node> assignments = setClause.getChildren();
+ // Must be deterministic order map for consistent q-test output across Java versions
+ Map<String, ASTNode> setCols = new LinkedHashMap<String, ASTNode>(assignments.size());
+ for (Node a : assignments) {
+ ASTNode assignment = (ASTNode)a;
+ ASTNode colName = findLHSofAssignment(assignment);
+ if (setRCols != null) {
+ addSetRCols((ASTNode) assignment.getChildren().get(1), setRCols);
+ }
+ checkValidSetClauseTarget(colName, targetTable);
+
+ String columnName = normalizeColName(colName.getText());
+ // This means that in UPDATE T SET x = _something_
+ // _something_ can be whatever is supported in SELECT _something_
+ setCols.put(columnName, (ASTNode)assignment.getChildren().get(1));
+ }
+ return setCols;
+ }
+
+ /**
+ * @return the Metastore representation of the target table
+ */
+ protected Table getTargetTable(ASTNode tabRef) throws SemanticException {
+ return getTable(tabRef, db, true);
+ }
+
+ /**
+ * @param throwException if false, return null if table doesn't exist, else throw
+ */
+ protected static Table getTable(ASTNode tabRef, Hive db, boolean throwException) throws SemanticException {
+ String[] tableName;
+ switch (tabRef.getType()) {
+ case HiveParser.TOK_TABREF:
+ tableName = getQualifiedTableName((ASTNode) tabRef.getChild(0));
+ break;
+ case HiveParser.TOK_TABNAME:
+ tableName = getQualifiedTableName(tabRef);
+ break;
+ default:
+ throw raiseWrongType("TOK_TABREF|TOK_TABNAME", tabRef);
+ }
+
+ Table mTable;
+ try {
+ mTable = db.getTable(tableName[0], tableName[1], throwException);
+ } catch (InvalidTableException e) {
+ LOG.error("Failed to find table " + getDotName(tableName) + " got exception " + e.getMessage());
+ throw new SemanticException(ErrorMsg.INVALID_TABLE.getMsg(getDotName(tableName)), e);
+ } catch (HiveException e) {
+ LOG.error("Failed to find table " + getDotName(tableName) + " got exception " + e.getMessage());
+ throw new SemanticException(e.getMessage(), e);
+ }
+ return mTable;
+ }
+
+ /**
+ * Walk through all our inputs and set them to note that this read is part of an update or a delete.
+ */
+ protected void markReadEntityForUpdate() {
+ for (ReadEntity input : inputs) {
+ if (isWritten(input)) {
+ //TODO: this is actually not adding anything since LockComponent uses a Trie to "promote" a lock
+ //except by accident - when we have a partitioned target table we have a ReadEntity and WriteEntity
+ //for the table, so we mark ReadEntity and then delete WriteEntity (replace with Partition entries)
+ //so DbTxnManager skips Read lock on the ReadEntity....
+ input.setUpdateOrDelete(true); //input.noLockNeeded()?
+ }
+ }
+ }
+
+ /**
+ * For updates, we need to set the column access info so that it contains information on
+ * the columns we are updating.
+ * (But not all the columns of the target table even though the rewritten query writes
+ * all columns of target table since that is an implmentation detail).
+ */
+ protected void setUpAccessControlInfoForUpdate(Table mTable, Map<String, ASTNode> setCols) {
+ ColumnAccessInfo cai = new ColumnAccessInfo();
+ for (String colName : setCols.keySet()) {
+ cai.add(Table.getCompleteName(mTable.getDbName(), mTable.getTableName()), colName);
+ }
+ setUpdateColumnAccessInfo(cai);
+ }
+
+ /**
+ * We need to weed ROW__ID out of the input column info, as it doesn't make any sense to
+ * require the user to have authorization on that column.
+ */
+ private void cleanUpMetaColumnAccessControl() {
+ //we do this for Update/Delete (incl Merge) because we introduce this column into the query
+ //as part of rewrite
+ if (columnAccessInfo != null) {
+ columnAccessInfo.stripVirtualColumn(VirtualColumn.ROWID);
+ }
+ }
+
+ /**
+ * Parse the newly generated SQL statement to get a new AST.
+ */
+ protected ReparseResult parseRewrittenQuery(StringBuilder rewrittenQueryStr, String originalQuery)
+ throws SemanticException {
+ // Set dynamic partitioning to nonstrict so that queries do not need any partition
+ // references.
+ // TODO: this may be a perf issue as it prevents the optimizer.. or not
+ HiveConf.setVar(conf, HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict");
+ // Disable LLAP IO wrapper; doesn't propagate extra ACID columns correctly.
+ HiveConf.setBoolVar(conf, ConfVars.LLAP_IO_ROW_WRAPPER_ENABLED, false);
+ // Parse the rewritten query string
+ Context rewrittenCtx;
+ try {
+ rewrittenCtx = new Context(conf);
+ rewrittenCtx.setHDFSCleanup(true);
+ // We keep track of all the contexts that are created by this query
+ // so we can clear them when we finish execution
+ ctx.addRewrittenStatementContext(rewrittenCtx);
+ } catch (IOException e) {
+ throw new SemanticException(ErrorMsg.UPDATEDELETE_IO_ERROR.getMsg());
+ }
+ rewrittenCtx.setExplainConfig(ctx.getExplainConfig());
+ rewrittenCtx.setExplainPlan(ctx.isExplainPlan());
+ rewrittenCtx.setStatsSource(ctx.getStatsSource());
+ rewrittenCtx.setPlanMapper(ctx.getPlanMapper());
+ rewrittenCtx.setIsUpdateDeleteMerge(true);
+ rewrittenCtx.setCmd(rewrittenQueryStr.toString());
+
+ ASTNode rewrittenTree;
+ try {
+ LOG.info("Going to reparse <" + originalQuery + "> as \n<" + rewrittenQueryStr.toString() + ">");
+ rewrittenTree = ParseUtils.parse(rewrittenQueryStr.toString(), rewrittenCtx);
+ } catch (ParseException e) {
+ throw new SemanticException(ErrorMsg.UPDATEDELETE_PARSE_ERROR.getMsg(), e);
+ }
+ return new ReparseResult(rewrittenTree, rewrittenCtx);
+ }
+
+ /**
+ * Assert it supports Acid write.
+ */
+ protected void validateTargetTable(Table mTable) throws SemanticException {
+ if (mTable.getTableType() == TableType.VIRTUAL_VIEW || mTable.getTableType() == TableType.MATERIALIZED_VIEW) {
+ LOG.error("Table " + mTable.getFullyQualifiedName() + " is a view or materialized view");
+ throw new SemanticException(ErrorMsg.UPDATE_DELETE_VIEW.getMsg());
+ }
+ }
+
+ /**
+ * Check that {@code readEntity} is also being written.
+ */
+ private boolean isWritten(Entity readEntity) {
+ for (Entity writeEntity : outputs) {
+ //make sure to compare them as Entity, i.e. that it's the same table or partition, etc
+ if (writeEntity.toString().equalsIgnoreCase(readEntity.toString())) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ // This method finds any columns on the right side of a set statement (thus rcols) and puts them
+ // in a set so we can add them to the list of input cols to check.
+ private void addSetRCols(ASTNode node, Set<String> setRCols) {
+
+ // See if this node is a TOK_TABLE_OR_COL. If so, find the value and put it in the list. If
+ // not, recurse on any children
+ if (node.getToken().getType() == HiveParser.TOK_TABLE_OR_COL) {
+ ASTNode colName = (ASTNode)node.getChildren().get(0);
+ assert colName.getToken().getType() == HiveParser.Identifier :
+ "Expected column name";
+ setRCols.add(normalizeColName(colName.getText()));
+ } else if (node.getChildren() != null) {
+ for (Node n : node.getChildren()) {
+ addSetRCols((ASTNode)n, setRCols);
+ }
+ }
+ }
+
+ /**
+ * Column names are stored in metastore in lower case, regardless of the CREATE TABLE statement.
+ * Unfortunately there is no single place that normalizes the input query.
+ * @param colName not null
+ */
+ private static String normalizeColName(String colName) {
+ return colName.toLowerCase();
+ }
+
+ /**
+ * SemanticAnalyzer will generate a WriteEntity for the target table since it doesn't know/check
+ * if the read and write are of the same table in "insert ... select ....". Since DbTxnManager
+ * uses Read/WriteEntity objects to decide which locks to acquire, we get more concurrency if we
+ * have change the table WriteEntity to a set of partition WriteEntity objects based on
+ * ReadEntity objects computed for this table.
+ */
+ protected void updateOutputs(Table targetTable) {
+ markReadEntityForUpdate();
+
+ if (targetTable.isPartitioned()) {
+ List<ReadEntity> partitionsRead = getRestrictedPartitionSet(targetTable);
+ if (!partitionsRead.isEmpty()) {
+ // if there is WriteEntity with WriteType=UPDATE/DELETE for target table, replace it with
+ // WriteEntity for each partition
+ List<WriteEntity> toRemove = new ArrayList<>();
+ for (WriteEntity we : outputs) {
+ WriteEntity.WriteType wt = we.getWriteType();
+ if (isTargetTable(we, targetTable) &&
+ (wt == WriteEntity.WriteType.UPDATE || wt == WriteEntity.WriteType.DELETE)) {
+ // The assumption here is that SemanticAnalyzer will will generate ReadEntity for each
+ // partition that exists and is matched by the WHERE clause (which may be all of them).
+ // Since we don't allow updating the value of a partition column, we know that we always
+ // write the same (or fewer) partitions than we read. Still, the write is a Dynamic
+ // Partition write - see HIVE-15032.
+ toRemove.add(we);
+ }
+ }
+ outputs.removeAll(toRemove);
+ // TODO: why is this like that?
+ for (ReadEntity re : partitionsRead) {
+ for (WriteEntity original : toRemove) {
+ //since we may have both Update and Delete branches, Auth needs to know
+ WriteEntity we = new WriteEntity(re.getPartition(), original.getWriteType());
+ we.setDynamicPartitionWrite(original.isDynamicPartitionWrite());
+ outputs.add(we);
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * If the optimizer has determined that it only has to read some of the partitions of the
+ * target table to satisfy the query, then we know that the write side of update/delete
+ * (and update/delete parts of merge)
+ * can only write (at most) that set of partitions (since we currently don't allow updating
+ * partition (or bucket) columns). So we want to replace the table level
+ * WriteEntity in the outputs with WriteEntity for each of these partitions
+ * ToDo: see if this should be moved to SemanticAnalyzer itself since it applies to any
+ * insert which does a select against the same table. Then SemanticAnalyzer would also
+ * be able to not use DP for the Insert...
+ *
+ * Note that the Insert of Merge may be creating new partitions and writing to partitions
+ * which were not read (WHEN NOT MATCHED...). WriteEntity for that should be created
+ * in MoveTask (or some other task after the query is complete).
+ */
+ private List<ReadEntity> getRestrictedPartitionSet(Table targetTable) {
+ List<ReadEntity> partitionsRead = new ArrayList<>();
+ for (ReadEntity re : inputs) {
+ if (re.isFromTopLevelQuery && re.getType() == Entity.Type.PARTITION && isTargetTable(re, targetTable)) {
+ partitionsRead.add(re);
+ }
+ }
+ return partitionsRead;
+ }
+
+ /**
+ * Does this Entity belong to target table (partition).
+ */
+ private boolean isTargetTable(Entity entity, Table targetTable) {
+ //todo: https://issues.apache.org/jira/browse/HIVE-15048
+ /**
+ * is this the right way to compare? Should it just compare paths?
+ * equals() impl looks heavy weight
+ */
+ return targetTable.equals(entity.getTable());
+ }
+
+ /**
+ * Returns the table name to use in the generated query preserving original quotes/escapes if any.
+ * @see #getFullTableNameForSQL(ASTNode)
+ */
+ protected String getSimpleTableName(ASTNode n) throws SemanticException {
+ return HiveUtils.unparseIdentifier(getSimpleTableNameBase(n), this.conf);
+ }
+
+ protected String getSimpleTableNameBase(ASTNode n) throws SemanticException {
+ switch (n.getType()) {
+ case HiveParser.TOK_TABREF:
+ int aliasIndex = findTabRefIdxs(n)[0];
+ if (aliasIndex != 0) {
+ return n.getChild(aliasIndex).getText(); //the alias
+ }
+ return getSimpleTableNameBase((ASTNode) n.getChild(0));
+ case HiveParser.TOK_TABNAME:
+ if (n.getChildCount() == 2) {
+ //db.table -> return table
+ return n.getChild(1).getText();
+ }
+ return n.getChild(0).getText();
+ case HiveParser.TOK_SUBQUERY:
+ return n.getChild(1).getText(); //the alias
+ default:
+ throw raiseWrongType("TOK_TABREF|TOK_TABNAME|TOK_SUBQUERY", n);
+ }
+ }
+
+ protected static final class ReparseResult {
+ final ASTNode rewrittenTree;
+ final Context rewrittenCtx;
+ ReparseResult(ASTNode n, Context c) {
+ rewrittenTree = n;
+ rewrittenCtx = c;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/dcc89501/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java
index 088b5cf..51a6b2a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java
@@ -216,8 +216,8 @@ public final class SemanticAnalyzerFactory {
case HiveParser.TOK_LOAD:
return new LoadSemanticAnalyzer(queryState);
case HiveParser.TOK_EXPORT:
- if (UpdateDeleteSemanticAnalyzer.isAcidExport(tree)) {
- return new UpdateDeleteSemanticAnalyzer(queryState);
+ if (AcidExportSemanticAnalyzer.isAcidExport(tree)) {
+ return new AcidExportSemanticAnalyzer(queryState);
}
return new ExportSemanticAnalyzer(queryState);
case HiveParser.TOK_IMPORT:
@@ -368,9 +368,11 @@ public final class SemanticAnalyzerFactory {
case HiveParser.TOK_UPDATE_TABLE:
case HiveParser.TOK_DELETE_FROM:
- case HiveParser.TOK_MERGE:
return new UpdateDeleteSemanticAnalyzer(queryState);
+ case HiveParser.TOK_MERGE:
+ return new MergeSemanticAnalyzer(queryState);
+
case HiveParser.TOK_START_TRANSACTION:
case HiveParser.TOK_COMMIT:
case HiveParser.TOK_ROLLBACK: