You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ek...@apache.org on 2016/11/12 20:20:21 UTC

[3/3] hive git commit: HIVE-14943 Base Implementation (of HIVE-10924) (Eugene Koifman, reviewed by Alan Gates)

HIVE-14943 Base Implementation (of HIVE-10924) (Eugene Koifman, reviewed by Alan Gates)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/e00f909d
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/e00f909d
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/e00f909d

Branch: refs/heads/master
Commit: e00f909dd35ee46de5d9de493bb76e37ba4b6f74
Parents: 52ba014
Author: Eugene Koifman <ek...@hortonworks.com>
Authored: Sat Nov 12 12:20:01 2016 -0800
Committer: Eugene Koifman <ek...@hortonworks.com>
Committed: Sat Nov 12 12:20:01 2016 -0800

----------------------------------------------------------------------
 .../hive/metastore/LockComponentBuilder.java    |   4 +
 .../metastore/txn/CompactionTxnHandler.java     |   5 +-
 .../hadoop/hive/metastore/txn/TxnHandler.java   |  51 +-
 .../java/org/apache/hadoop/hive/ql/Context.java |  63 +-
 .../java/org/apache/hadoop/hive/ql/Driver.java  |   4 +-
 .../org/apache/hadoop/hive/ql/ErrorMsg.java     |  11 +-
 .../apache/hadoop/hive/ql/exec/MoveTask.java    |   4 +
 .../apache/hadoop/hive/ql/hooks/ReadEntity.java |   3 +
 .../hadoop/hive/ql/hooks/WriteEntity.java       |  10 +
 .../hadoop/hive/ql/lockmgr/DbTxnManager.java    |   9 +-
 .../hadoop/hive/ql/lockmgr/DummyTxnManager.java |   2 +-
 .../hadoop/hive/ql/lockmgr/HiveTxnManager.java  |   7 +-
 .../BucketingSortingReduceSinkOptimizer.java    |   7 +-
 .../apache/hadoop/hive/ql/parse/ASTNode.java    |   8 +
 .../hadoop/hive/ql/parse/FromClauseParser.g     |   6 +-
 .../org/apache/hadoop/hive/ql/parse/HiveLexer.g |   2 +
 .../apache/hadoop/hive/ql/parse/HiveParser.g    |  60 +-
 .../hadoop/hive/ql/parse/IdentifiersParser.g    |   7 +-
 .../hadoop/hive/ql/parse/QBParseInfo.java       |   7 +
 .../hadoop/hive/ql/parse/SelectClauseParser.g   |   2 +-
 .../hadoop/hive/ql/parse/SemanticAnalyzer.java  | 155 +--
 .../hive/ql/parse/SemanticAnalyzerFactory.java  |   1 +
 .../ql/parse/UpdateDeleteSemanticAnalyzer.java  | 989 ++++++++++++++++---
 .../hadoop/hive/ql/plan/CreateTableDesc.java    |   4 +-
 .../metastore/txn/TestCompactionTxnHandler.java |   1 +
 .../apache/hadoop/hive/ql/TestTxnCommands.java  |  35 +
 .../apache/hadoop/hive/ql/TestTxnCommands2.java | 417 +++++++-
 .../ql/TestTxnCommands2WithSplitUpdate.java     |  13 +-
 .../hive/ql/lockmgr/TestDbTxnManager2.java      | 645 +++++++++++-
 .../apache/hadoop/hive/ql/parse/TestIUD.java    |   4 +-
 .../hive/ql/parse/TestMergeStatement.java       | 246 +++++
 31 files changed, 2464 insertions(+), 318 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/metastore/src/java/org/apache/hadoop/hive/metastore/LockComponentBuilder.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/LockComponentBuilder.java b/metastore/src/java/org/apache/hadoop/hive/metastore/LockComponentBuilder.java
index 3e8f193..e074152 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/LockComponentBuilder.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/LockComponentBuilder.java
@@ -102,6 +102,10 @@ public class LockComponentBuilder {
     partNameSet = true;
     return this;
   }
+  public LockComponentBuilder setIsDynamicPartitionWrite(boolean t) {
+    component.setIsDynamicPartitionWrite(t);
+    return this;
+  }
 
  /**
    * Get the constructed lock component.

http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
index 75a4d87..9145fcc 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
@@ -43,9 +43,6 @@ class CompactionTxnHandler extends TxnHandler {
   static final private String CLASS_NAME = CompactionTxnHandler.class.getName();
   static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
 
-  // Always access COMPACTION_QUEUE before COMPLETED_TXN_COMPONENTS
-  // See TxnHandler for notes on how to deal with deadlocks.  Follow those notes.
-
   public CompactionTxnHandler() {
   }
 
@@ -428,7 +425,7 @@ class CompactionTxnHandler extends TxnHandler {
   }
 
   /**
-   * Clean up aborted transactions from txns that have no components in txn_components.  The reson such
+   * Clean up aborted transactions from txns that have no components in txn_components.  The reason such
    * txns exist can be that now work was done in this txn (e.g. Streaming opened TransactionBatch and
    * abandoned it w/o doing any work) or due to {@link #markCleaned(CompactionInfo)} being called.
    */

http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 547ee98..a815f2c 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -625,7 +625,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         rs = stmt.executeQuery(sqlGenerator.addLimitClause(1, "tc_operation_type " + conflictSQLSuffix));
         if (rs.next()) {
           close(rs);
-          //here means currently committing txn performed update/delete and we should check WW conflict
+          //if here it means currently committing txn performed update/delete and we should check WW conflict
           /**
            * This S4U will mutex with other commitTxn() and openTxns(). 
            * -1 below makes txn intervals look like [3,3] [4,4] if all txns are serial
@@ -653,7 +653,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
            */
           rs = stmt.executeQuery
             (sqlGenerator.addLimitClause(1, "committed.ws_txnid, committed.ws_commit_id, committed.ws_database," +
-              "committed.ws_table, committed.ws_partition, cur.ws_commit_id cur_ws_commit_id " +
+              "committed.ws_table, committed.ws_partition, cur.ws_commit_id cur_ws_commit_id, " +
+              "cur.ws_operation_type cur_op, committed.ws_operation_type committed_op " +
               "from WRITE_SET committed INNER JOIN WRITE_SET cur " +
               "ON committed.ws_database=cur.ws_database and committed.ws_table=cur.ws_table " +
               //For partitioned table we always track writes at partition level (never at table)
@@ -677,7 +678,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
               resource.append('/').append(partitionName);
             }
             String msg = "Aborting [" + JavaUtils.txnIdToString(txnid) + "," + rs.getLong(6) + "]" + " due to a write conflict on " + resource +
-              " committed by " + committedTxn;
+              " committed by " + committedTxn + " " + rs.getString(7) + "/" + rs.getString(8);
             close(rs);
             //remove WRITE_SET info for current txn since it's about to abort
             dbConn.rollback(undoWriteSetForCurrentTxn);
@@ -712,6 +713,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         int modCount = 0;
         if ((modCount = stmt.executeUpdate(s)) < 1) {
           //this can be reasonable for an empty txn START/COMMIT or read-only txn
+          //also an IUD with DP that didn't match any rows.
           LOG.info("Expected to move at least one record from txn_components to " +
             "completed_txn_components when committing txn! " + JavaUtils.txnIdToString(txnid));
         }
@@ -884,14 +886,6 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
 
         if (txnid > 0) {
           List<String> rows = new ArrayList<>();
-          /**
-           * todo QueryPlan has BaseSemanticAnalyzer which has acidFileSinks list of FileSinkDesc
-           * FileSinkDesc.table is ql.metadata.Table
-           * Table.tableSpec which is TableSpec, which has specType which is SpecType
-           * So maybe this can work to know that this is part of dynamic partition insert in which case
-           * we'll get addDynamicPartitions() call and should not write TXN_COMPONENTS here.
-           * In any case, that's an optimization for now;  will be required when adding multi-stmt txns
-           */
           // For each component in this lock request,
           // add an entry to the txn_components table
           for (LockComponent lc : rqst.getComponent()) {
@@ -909,7 +903,18 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
                 case INSERT:
                 case UPDATE:
                 case DELETE:
-                  updateTxnComponents = true;
+                  if(!lc.isSetIsDynamicPartitionWrite()) {
+                    //must be old client talking, i.e. we don't know if it's DP so be conservative
+                    updateTxnComponents = true;
+                  }
+                  else {
+                    /**
+                     * we know this is part of DP operation and so we'll get
+                     * {@link #addDynamicPartitions(AddDynamicPartitions)} call with the list
+                     * of partitions actually changed.
+                     */
+                    updateTxnComponents = !lc.isIsDynamicPartitionWrite();
+                  }
                   break;
                 case SELECT:
                   updateTxnComponents = false;
@@ -1544,22 +1549,13 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         if(rqst.isSetOperationType()) {
           ot = OpertaionType.fromDataOperationType(rqst.getOperationType());
         }
-        
-        //what if a txn writes the same table > 1 time...(HIVE-9675) let's go with this for now, but really
-        //need to not write this in the first place, i.e. make this delete not needed
-        //see enqueueLockWithRetry() - that's where we write to TXN_COMPONENTS
-        String deleteSql = "delete from TXN_COMPONENTS where tc_txnid=" + rqst.getTxnid() + " and tc_database=" +
-          quoteString(rqst.getDbname()) + " and tc_table=" + quoteString(rqst.getTablename());
-        //we delete the entries made by enqueueLockWithRetry() since those are based on lock information which is
-        //much "wider" than necessary in a lot of cases.  Here on the other hand, we know exactly which
-        //partitions have been written to.  w/o this WRITE_SET would contain entries for partitions not actually
-        //written to
-        int modCount = stmt.executeUpdate(deleteSql);
         List<String> rows = new ArrayList<>();
         for (String partName : rqst.getPartitionnames()) {
           rows.add(rqst.getTxnid() + "," + quoteString(rqst.getDbname()) + "," + quoteString(rqst.getTablename()) +
             "," + quoteString(partName) + "," + quoteChar(ot.sqlConst));
         }
+        int modCount = 0;
+        //record partitions that were written to
         List<String> queries = sqlGenerator.createInsertValuesStmt(
           "TXN_COMPONENTS (tc_txnid, tc_database, tc_table, tc_partition, tc_operation_type)", rows);
         for(String query : queries) {
@@ -2080,7 +2076,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
     }
 
     public int compare(LockInfo info1, LockInfo info2) {
-      // We sort by state (acquired vs waiting) and then by LockType, they by id
+      // We sort by state (acquired vs waiting) and then by LockType, then by id
       if (info1.state == LockState.ACQUIRED &&
         info2.state != LockState .ACQUIRED) {
         return -1;
@@ -2285,6 +2281,12 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
     /**
      * todo: Longer term we should pass this from client somehow - this would be an optimization;  once
      * that is in place make sure to build and test "writeSet" below using OperationType not LockType
+     * With SP we assume that the query modifies exactly the partitions it locked.  (not entirely
+     * realistic since Update/Delete may have some predicate that filters out all records out of
+     * some partition(s), but plausible).  For DP, we acquire locks very wide (all known partitions),
+     * but for most queries only a fraction will actually be updated.  #addDynamicPartitions() tells
+     * us exactly which ones were written to.  Thus using this trick to kill a query early for
+     * DP queries may be too restrictive.
      */
     boolean isPartOfDynamicPartitionInsert = true;
     try {
@@ -2567,6 +2569,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
       (desiredLock.txnId != 0 && desiredLock.txnId == existingLock.txnId) ||
       //txnId=0 means it's a select or IUD which does not write to ACID table, e.g
       //insert overwrite table T partition(p=1) select a,b from T and autoCommit=true
+      // todo: fix comment as of HIVE-14988
       (desiredLock.txnId == 0 &&  desiredLock.extLockId == existingLock.extLockId);
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/ql/src/java/org/apache/hadoop/hive/ql/Context.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Context.java b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
index 838d73e..4355d21 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Context.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
@@ -25,6 +25,7 @@ import java.net.URI;
 import java.text.SimpleDateFormat;
 import java.util.Date;
 import java.util.HashMap;
+import java.util.IdentityHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
@@ -44,13 +45,13 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.TaskRunner;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
-import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.lockmgr.DbTxnManager.Heartbeater;
 import org.apache.hadoop.hive.ql.lockmgr.HiveLock;
 import org.apache.hadoop.hive.ql.lockmgr.HiveLockObj;
 import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
 import org.apache.hadoop.hive.ql.lockmgr.LockException;
 import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.ASTNode;
 import org.apache.hadoop.hive.ql.parse.ExplainConfiguration;
 import org.apache.hadoop.hive.ql.parse.ExplainConfiguration.AnalyzeState;
 import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
@@ -107,11 +108,6 @@ public class Context {
   // Transaction manager for this query
   protected HiveTxnManager hiveTxnManager;
 
-  // Used to track what type of acid operation (insert, update, or delete) we are doing.  Useful
-  // since we want to change where bucket columns are accessed in some operators and
-  // optimizations when doing updates and deletes.
-  private AcidUtils.Operation acidOperation = AcidUtils.Operation.NOT_ACID;
-
   private boolean needLockMgr;
 
   private AtomicInteger sequencer = new AtomicInteger();
@@ -129,6 +125,53 @@ public class Context {
   private Heartbeater heartbeater;
 
   private boolean skipTableMasking;
+  /**
+   * This determines the prefix of the
+   * {@link org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.Phase1Ctx#dest}
+   * name for a given subtree of the AST.  Most of the time there is only 1 destination in a
+   * given tree but multi-insert has several and multi-insert representing MERGE must use
+   * different prefixes to encode the purpose of different Insert branches
+   */
+  private Map<ASTNode, DestClausePrefix> tree2DestNamePrefix;
+  public enum DestClausePrefix {
+    INSERT("insclause-"), UPDATE("updclause-"), DELETE("delclause-");
+    private final String prefix;
+    DestClausePrefix(String prefix) {
+      this.prefix = prefix;
+    }
+    public String toString() {
+      return prefix;
+    }
+  }
+  /**
+   * The prefix is always relative to a given ASTNode
+   */
+  public DestClausePrefix getDestNamePrefix(ASTNode curNode) {
+    //if there is no mapping, we want to default to "old" naming
+    assert curNode != null : "must supply curNode";
+    if(tree2DestNamePrefix == null || tree2DestNamePrefix.isEmpty()) {
+      return DestClausePrefix.INSERT;
+    }
+    do {
+      DestClausePrefix prefix = tree2DestNamePrefix.get(curNode);
+      if(prefix != null) {
+        return prefix;
+      }
+      curNode = (ASTNode) curNode.parent;
+    } while(curNode != null);
+    return DestClausePrefix.INSERT;
+  }
+  /**
+   * Will make SemanticAnalyzer.Phase1Ctx#dest in subtree rooted at 'tree' use 'prefix'
+   * @param tree
+   * @return previous prefix for 'tree' or null
+   */
+  public DestClausePrefix addDestNamePrefix(ASTNode tree, DestClausePrefix prefix) {
+    if(tree2DestNamePrefix == null) {
+      tree2DestNamePrefix = new IdentityHashMap<>();
+    }
+    return tree2DestNamePrefix.put(tree, prefix);
+  }
 
   public Context(Configuration conf) throws IOException {
     this(conf, generateExecutionId());
@@ -760,14 +803,6 @@ public class Context {
     this.tryCount = tryCount;
   }
 
-  public void setAcidOperation(AcidUtils.Operation op) {
-    acidOperation = op;
-  }
-
-  public AcidUtils.Operation getAcidOperation() {
-    return acidOperation;
-  }
-
   public String getCboInfo() {
     return cboInfo;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 923ef08..b77948b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -1111,7 +1111,9 @@ public class Driver implements CommandProcessor {
       if (haveAcidWrite()) {
         for (FileSinkDesc desc : acidSinks) {
           desc.setTransactionId(txnMgr.getCurrentTxnId());
-          desc.setStatementId(txnMgr.getStatementId());
+          //it's possible to have > 1 FileSink writing to the same table/partition
+          //e.g. Merge stmt, multi-insert stmt when mixing DP and SP writes
+          desc.setStatementId(txnMgr.getWriteIdAndIncrement());
         }
       }
       /*Note, we have to record snapshot after lock acquisition to prevent lost update problem

http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
index 7ed3907..97fcd55 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
@@ -398,7 +398,7 @@ public enum ErrorMsg {
   DISTINCT_NOT_SUPPORTED(10285, "Distinct keyword is not support in current context"),
   NONACID_COMPACTION_NOT_SUPPORTED(10286, "Compaction is not allowed on non-ACID table {0}.{1}", true),
 
-  UPDATEDELETE_PARSE_ERROR(10290, "Encountered parse error while parsing rewritten update or " +
+  UPDATEDELETE_PARSE_ERROR(10290, "Encountered parse error while parsing rewritten merge/update or " +
       "delete query"),
   UPDATEDELETE_IO_ERROR(10291, "Encountered I/O error while parsing rewritten update or " +
       "delete query"),
@@ -456,6 +456,10 @@ public enum ErrorMsg {
   REPLACE_MATERIALIZED_WITH_VIEW(10401, "Attempt to replace materialized view {0} with view", true),
   UPDATE_DELETE_VIEW(10402, "You cannot update or delete records in a view"),
   MATERIALIZED_VIEW_DEF_EMPTY(10403, "Query for the materialized view rebuild could not be retrieved"),
+  MERGE_PREDIACTE_REQUIRED(10404, "MERGE statement with both UPDATE and DELETE clauses " +
+    "requires \"AND <boolean>\" on the 1st WHEN MATCHED clause of <{0}>", true),
+  MERGE_TOO_MANY_DELETE(10405, "MERGE statment can have at most 1 WHEN MATCHED ... DELETE clause: <{0}>", true),
+  MERGE_TOO_MANY_UPDATE(10406, "MERGE statment can have at most 1 WHEN MATCHED ... UPDATE clause: <{0}>", true),
   //========================== 20000 range starts here ========================//
   SCRIPT_INIT_ERROR(20000, "Unable to initialize custom script."),
   SCRIPT_IO_ERROR(20001, "An error occurred while reading or writing to your custom script. "
@@ -722,6 +726,11 @@ public enum ErrorMsg {
     sb.append(":");
     sb.append(getCharPositionInLine(tree));
   }
+  public static String renderPosition(ASTNode n) {
+    StringBuilder sb = new StringBuilder();
+    ErrorMsg.renderPosition(sb, n);
+    return sb.toString();
+  }
 
   public String getMsg(Tree tree) {
     return getMsg((ASTNode) tree);

http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
index 8265af4..349f115 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
@@ -206,6 +206,10 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
     }
 
     Context ctx = driverContext.getCtx();
+    if(ctx.getHiveTxnManager().supportsAcid()) {
+      //Acid LM doesn't maintain getOutputLockObjects(); this 'if' just makes it more explicit
+      return;
+    }
     HiveLockManager lockMgr = ctx.getHiveTxnManager().getLockManager();
     WriteEntity output = ctx.getLoadTableOutputMap().get(ltd);
     List<HiveLockObj> lockObjects = ctx.getOutputLockObjects().get(output);

http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/ql/src/java/org/apache/hadoop/hive/ql/hooks/ReadEntity.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/ReadEntity.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/ReadEntity.java
index 3d7de69..b805904 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/ReadEntity.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/ReadEntity.java
@@ -53,6 +53,9 @@ public class ReadEntity extends Entity implements Serializable {
   // important because in that case we shouldn't acquire a lock for it or authorize the read.
   // These will be handled by the output to the table instead.
   private boolean isUpdateOrDelete = false;
+  //https://issues.apache.org/jira/browse/HIVE-15048
+  public transient boolean isFromTopLevelQuery = true;
+
 
   // For views, the entities can be nested - by default, entities are at the top level
   // Must be deterministic order set for consistent q-test output across Java versions

http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/ql/src/java/org/apache/hadoop/hive/ql/hooks/WriteEntity.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/WriteEntity.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/WriteEntity.java
index 9e18638..da8c1e2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/WriteEntity.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/WriteEntity.java
@@ -38,6 +38,7 @@ public class WriteEntity extends Entity implements Serializable {
   private static final Logger LOG = LoggerFactory.getLogger(WriteEntity.class);
 
   private boolean isTempURI = false;
+  private transient boolean isDynamicPartitionWrite = false;
 
   public static enum WriteType {
     DDL_EXCLUSIVE, // for use in DDL statements that require an exclusive lock,
@@ -221,5 +222,14 @@ public class WriteEntity extends Entity implements Serializable {
         throw new RuntimeException("Unknown operation " + op.toString());
     }
   }
+  public boolean isDynamicPartitionWrite() {
+    return isDynamicPartitionWrite;
+  }
+  public void setDynamicPartitionWrite(boolean t) {
+    isDynamicPartitionWrite = t;
+  }
+  public String toDetailedString() {
+    return toString() + " Type=" + getTyp() + " WriteType=" + getWriteType() + " isDP=" + isDynamicPartitionWrite();
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
index da7505b..867e445 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
@@ -321,6 +321,7 @@ public class DbTxnManager extends HiveTxnManagerImpl {
       if(t != null && AcidUtils.isAcidTable(t)) {
         compBuilder.setIsAcid(true);
       }
+      compBuilder.setIsDynamicPartitionWrite(output.isDynamicPartitionWrite());
       LockComponent comp = compBuilder.build();
       LOG.debug("Adding lock component to lock request " + comp.toString());
       rqstBuilder.addLockComponent(comp);
@@ -335,9 +336,6 @@ public class DbTxnManager extends HiveTxnManagerImpl {
     }
 
     List<HiveLock> locks = new ArrayList<HiveLock>(1);
-    if(isTxnOpen()) {
-      statementId++;
-    }
     LockState lockState = lockMgr.lock(rqstBuilder.build(), queryId, isBlocking, locks);
     ctx.setHiveLocks(locks);
     return lockState;
@@ -650,8 +648,9 @@ public class DbTxnManager extends HiveTxnManagerImpl {
     return txnId;
   }
   @Override
-  public int getStatementId() {
-    return statementId;
+  public int getWriteIdAndIncrement() {
+    assert isTxnOpen();
+    return statementId++;
   }
 
   private static long getHeartbeatInterval(Configuration conf) throws LockException {

http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
index 1d071a8..f001f59 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
@@ -63,7 +63,7 @@ class DummyTxnManager extends HiveTxnManagerImpl {
   }
 
   @Override
-  public int getStatementId() {
+  public int getWriteIdAndIncrement() {
     return 0;
   }
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
index 9b4a97f..5b9ad60 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
@@ -210,8 +210,9 @@ public interface HiveTxnManager {
   long getCurrentTxnId();
 
   /**
-   * 0..N Id of current statement within currently opened transaction
+   * Should be thought of more as a unique write operation ID in a given txn (at QueryPlan level).
+   * Each statement writing data within a multi statement txn should have a unique WriteId.
+   * Even a single statement (e.g. Merge, multi-insert) may generate several writes.
    */
-  int getStatementId();
-
+  int getWriteIdAndIncrement();
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java
index da261bb..8f40998 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java
@@ -401,9 +401,12 @@ public class BucketingSortingReduceSinkOptimizer extends Transform {
         return null;
       }
 
+      assert fsOp.getConf().getWriteType() == rsOp.getConf().getWriteType() :
+        "WriteType mismatch. fsOp is " + fsOp.getConf().getWriteType() +
+          "; rsOp is " + rsOp.getConf().getWriteType();
       // Don't do this optimization with updates or deletes
-      if (pGraphContext.getContext().getAcidOperation() == AcidUtils.Operation.UPDATE ||
-          pGraphContext.getContext().getAcidOperation() == AcidUtils.Operation.DELETE){
+      if (fsOp.getConf().getWriteType() == AcidUtils.Operation.UPDATE ||
+        fsOp.getConf().getWriteType() == AcidUtils.Operation.DELETE) {
         return null;
       }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/ql/src/java/org/apache/hadoop/hive/ql/parse/ASTNode.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ASTNode.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ASTNode.java
index 62f9d14..0e6d903 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ASTNode.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ASTNode.java
@@ -42,6 +42,7 @@ public class ASTNode extends CommonTree implements Node,Serializable {
   private transient ASTNode rootNode;
   private transient boolean isValidASTStr;
   private transient boolean visited = false;
+  transient String matchedText;
 
   public ASTNode() {
   }
@@ -347,4 +348,11 @@ public class ASTNode extends CommonTree implements Node,Serializable {
     return rootNode.getMemoizedSubString(startIndx, endIndx);
   }
 
+  /**
+   * The string that generated this node.
+   * Only set for a node if parser grammar sets it for a particular rule
+   */
+  public String getMatchedText() {
+    return matchedText;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/ql/src/java/org/apache/hadoop/hive/ql/parse/FromClauseParser.g
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/FromClauseParser.g b/ql/src/java/org/apache/hadoop/hive/ql/parse/FromClauseParser.g
index 26aca96..f8adb38 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/FromClauseParser.g
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/FromClauseParser.g
@@ -19,7 +19,7 @@ parser grammar FromClauseParser;
 options
 {
 output=AST;
-ASTLabelType=CommonTree;
+ASTLabelType=ASTNode;
 backtrack=false;
 k=3;
 }
@@ -142,7 +142,7 @@ tableAlias
 
 fromSource
 @init { gParent.pushMsg("from source", state); }
-@after { gParent.popMsg(state); }
+@after { $fromSource.tree.matchedText = $fromSource.text; gParent.popMsg(state); }
     :
     (LPAREN KW_VALUES) => fromSource0
     | (LPAREN) => LPAREN joinSource RPAREN -> joinSource
@@ -278,7 +278,7 @@ searchCondition
 // INSERT INTO <table> (col1,col2,...) SELECT * FROM (VALUES(1,2,3),(4,5,6),...) as Foo(a,b,c)
 valueRowConstructor
 @init { gParent.pushMsg("value row constructor", state); }
-@after { gParent.popMsg(state); }
+@after { $valueRowConstructor.tree.matchedText = $valueRowConstructor.text; gParent.popMsg(state); }
     :
     LPAREN precedenceUnaryPrefixExpression (COMMA precedenceUnaryPrefixExpression)* RPAREN -> ^(TOK_VALUE_ROW precedenceUnaryPrefixExpression+)
     ;

http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g
index 63c32a8..b467c51 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g
@@ -336,6 +336,8 @@ KW_KEY: 'KEY';
 KW_ABORT: 'ABORT';
 KW_EXTRACT: 'EXTRACT';
 KW_FLOOR: 'FLOOR';
+KW_MERGE: 'MERGE';
+KW_MATCHED: 'MATCHED';
 
 // Operators
 // NOTE: if you add a new function/operator, add it to sysFuncNames so that describe function _FUNC_ will work.

http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
index 8aa39b0..bd53a36 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
@@ -20,7 +20,7 @@ options
 {
 tokenVocab=HiveLexer;
 output=AST;
-ASTLabelType=CommonTree;
+ASTLabelType=ASTNode;
 backtrack=false;
 k=3;
 }
@@ -384,6 +384,11 @@ TOK_ROLLBACK;
 TOK_SET_AUTOCOMMIT;
 TOK_CACHE_METADATA;
 TOK_ABORT_TRANSACTIONS;
+TOK_MERGE;
+TOK_MATCHED;
+TOK_NOT_MATCHED;
+TOK_UPDATE;
+TOK_DELETE;
 }
 
 
@@ -737,6 +742,7 @@ execStatement
     | deleteStatement
     | updateStatement
     | sqlTransactionStatement
+    | mergeStatement
     ;
 
 loadStatement
@@ -2670,3 +2676,55 @@ abortTransactionStatement
   :
   KW_ABORT KW_TRANSACTIONS ( Number )+ -> ^(TOK_ABORT_TRANSACTIONS ( Number )+)
   ;
+
+
+/*
+BEGIN SQL Merge statement
+*/
+mergeStatement
+@init { pushMsg("MERGE statement", state); }
+@after { popMsg(state); }
+   :
+   KW_MERGE KW_INTO tableName (KW_AS? identifier)? KW_USING fromSource KW_ON expression whenClauses ->
+    ^(TOK_MERGE ^(TOK_TABREF tableName identifier?) fromSource expression whenClauses)
+   ;
+/*
+Allow 0, 1 or 2 WHEN MATCHED clauses and 0 or 1 WHEN NOT MATCHED clause.
+Each WHEN clause may have AND <boolean predicate>.
+If 2 WHEN MATCHED clauses are present, one must be UPDATE and the other DELETE, and the 1st one
+must have AND <boolean predicate>.
+*/
+whenClauses
+   :
+   (whenMatchedAndClause|whenMatchedThenClause)* whenNotMatchedClause?
+   ;
+whenNotMatchedClause
+@init { pushMsg("WHEN NOT MATCHED clause", state); }
+@after { popMsg(state); }
+   :
+  KW_WHEN KW_NOT KW_MATCHED (KW_AND expression)? KW_THEN KW_INSERT KW_VALUES valueRowConstructor ->
+    ^(TOK_NOT_MATCHED ^(TOK_INSERT valueRowConstructor) expression?)
+  ;
+whenMatchedAndClause
+@init { pushMsg("WHEN MATCHED AND clause", state); }
+@after { popMsg(state); }
+  :
+  KW_WHEN KW_MATCHED KW_AND expression KW_THEN updateOrDelete ->
+    ^(TOK_MATCHED updateOrDelete expression)
+  ;
+whenMatchedThenClause
+@init { pushMsg("WHEN MATCHED THEN clause", state); }
+@after { popMsg(state); }
+  :
+  KW_WHEN KW_MATCHED KW_THEN updateOrDelete ->
+     ^(TOK_MATCHED updateOrDelete)
+  ;
+updateOrDelete
+   :
+   KW_UPDATE setColumnsClause -> ^(TOK_UPDATE setColumnsClause)
+   |
+   KW_DELETE -> TOK_DELETE
+   ;
+/*
+END SQL Merge statement
+*/

http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g b/ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g
index 2e40aa5..aa92739 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g
@@ -19,7 +19,7 @@ parser grammar IdentifiersParser;
 options
 {
 output=AST;
-ASTLabelType=CommonTree;
+ASTLabelType=ASTNode;
 backtrack=false;
 k=3;
 }
@@ -383,7 +383,7 @@ intervalQualifiers
 
 expression
 @init { gParent.pushMsg("expression specification", state); }
-@after { gParent.popMsg(state); }
+@after { $expression.tree.matchedText = $expression.text; gParent.popMsg(state); }
     :
     precedenceOrExpression
     ;
@@ -459,6 +459,7 @@ precedencePlusOperator
     ;
 
 precedencePlusExpression
+@after { $precedencePlusExpression.tree.matchedText = $precedencePlusExpression.text; }
     :
     precedenceStarExpression (precedencePlusOperator^ precedenceStarExpression)*
     ;
@@ -759,6 +760,8 @@ nonReserved
     | KW_VALIDATE
     | KW_NOVALIDATE
     | KW_KEY
+    | KW_MERGE
+    | KW_MATCHED
 ;
 
 //The following SQL2011 reserved keywords are used as function name only, but not as identifiers.

http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java
index 3a0402e..f549dff 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java
@@ -181,6 +181,9 @@ public class QBParseInfo {
     insertIntoTables.put(fullName.toLowerCase(), ast);
   }
 
+  /**
+   * See also {@link #getInsertOverwriteTables()}
+   */
   public boolean isInsertIntoTable(String dbName, String table) {
     String fullName = dbName + "." + table;
     return insertIntoTables.containsKey(fullName.toLowerCase());
@@ -188,6 +191,7 @@ public class QBParseInfo {
 
   /**
    * Check if a table is in the list to be inserted into
+   * See also {@link #getInsertOverwriteTables()}
    * @param fullTableName table name in dbname.tablename format
    * @return
    */
@@ -640,6 +644,9 @@ public class QBParseInfo {
     this.isPartialScanAnalyzeCommand = isPartialScanAnalyzeCommand;
   }
 
+  /**
+   * See also {@link #isInsertIntoTable(String)}
+   */
   public Map<String, ASTNode> getInsertOverwriteTables() {
     return insertOverwriteTables;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/ql/src/java/org/apache/hadoop/hive/ql/parse/SelectClauseParser.g
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SelectClauseParser.g b/ql/src/java/org/apache/hadoop/hive/ql/parse/SelectClauseParser.g
index 3c6fa39..2c2e856 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SelectClauseParser.g
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SelectClauseParser.g
@@ -19,7 +19,7 @@ parser grammar SelectClauseParser;
 options
 {
 output=AST;
-ASTLabelType=CommonTree;
+ASTLabelType=ASTNode;
 backtrack=false;
 k=3;
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 17dfd03..8f5542b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -65,6 +65,7 @@ import org.apache.hadoop.hive.conf.HiveConf.StrictChecks;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.AddDynamicPartitions;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.MetaException;
@@ -72,6 +73,7 @@ import org.apache.hadoop.hive.metastore.api.Order;
 import org.apache.hadoop.hive.metastore.api.SQLForeignKey;
 import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
 import org.apache.hadoop.hive.ql.CompilationOpContext;
+import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.QueryProperties;
 import org.apache.hadoop.hive.ql.QueryState;
@@ -679,17 +681,8 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
     this.ast = newAST;
   }
 
-  /**
-   * Goes though the tabref tree and finds the alias for the table. Once found,
-   * it records the table name-> alias association in aliasToTabs. It also makes
-   * an association from the alias to the table AST in parse info.
-   *
-   * @return the alias of the table
-   */
-  private String processTable(QB qb, ASTNode tabref) throws SemanticException {
-    // For each table reference get the table name
-    // and the alias (if alias is not present, the table name
-    // is used as an alias)
+  int[] findTabRefIdxs(ASTNode tabref) {
+    assert tabref.getType() == HiveParser.TOK_TABREF;
     int aliasIndex = 0;
     int propsIndex = -1;
     int tsampleIndex = -1;
@@ -706,11 +699,12 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
         aliasIndex = index;
       }
     }
-
+    return new int[] {aliasIndex, propsIndex, tsampleIndex, ssampleIndex};
+  }
+  String findSimpleTableName(ASTNode tabref, int aliasIndex) {
+    assert tabref.getType() == HiveParser.TOK_TABREF;
     ASTNode tableTree = (ASTNode) (tabref.getChild(0));
 
-    String tabIdName = getUnescapedName(tableTree).toLowerCase();
-
     String alias;
     if (aliasIndex != 0) {
       alias = unescapeIdentifier(tabref.getChild(aliasIndex).getText());
@@ -718,6 +712,30 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
     else {
       alias = getUnescapedUnqualifiedTableName(tableTree);
     }
+    return alias;
+  }
+  /**
+   * Goes through the tabref tree and finds the alias for the table. Once found,
+   * it records the table name-> alias association in aliasToTabs. It also makes
+   * an association from the alias to the table AST in parse info.
+   *
+   * @return the alias of the table
+   */
+  private String processTable(QB qb, ASTNode tabref) throws SemanticException {
+    // For each table reference get the table name
+    // and the alias (if alias is not present, the table name
+    // is used as an alias)
+    int[] indexes = findTabRefIdxs(tabref);
+    int aliasIndex = indexes[0];
+    int propsIndex = indexes[1];
+    int tsampleIndex = indexes[2];
+    int ssampleIndex = indexes[3];
+
+    ASTNode tableTree = (ASTNode) (tabref.getChild(0));
+
+    String tabIdName = getUnescapedName(tableTree).toLowerCase();
+
+    String alias = findSimpleTableName(tabref, aliasIndex);
 
     if (propsIndex >= 0) {
       Tree propsAST = tabref.getChild(propsIndex);
@@ -1423,7 +1441,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
         doPhase1GetColumnAliasesFromSelect(ast, qbp);
         qbp.setAggregationExprsForClause(ctx_1.dest, aggregations);
         qbp.setDistinctFuncExprsForClause(ctx_1.dest,
-        doPhase1GetDistinctFuncExprs(aggregations));
+          doPhase1GetDistinctFuncExprs(aggregations));
         break;
 
       case HiveParser.TOK_WHERE:
@@ -1438,7 +1456,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
         qbp.addInsertIntoTable(tab_name, ast);
 
       case HiveParser.TOK_DESTINATION:
-        ctx_1.dest = "insclause-" + ctx_1.nextNum;
+        ctx_1.dest = this.ctx.getDestNamePrefix(ast).toString() + ctx_1.nextNum;
         ctx_1.nextNum++;
         boolean isTmpFileDest = false;
         if (ast.getChildCount() > 0 && ast.getChild(0) instanceof ASTNode) {
@@ -1945,25 +1963,6 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
           throw new SemanticException(ErrorMsg.INVALID_TABLE.getMsg(alias));
         }
       }
-
-      // Disallow INSERT INTO on bucketized tables
-      boolean isAcid = AcidUtils.isAcidTable(tab);
-      boolean isTableWrittenTo = qb.getParseInfo().isInsertIntoTable(tab.getDbName(), tab.getTableName());
-      if (isTableWrittenTo &&
-          tab.getNumBuckets() > 0 && !isAcid) {
-        throw new SemanticException(ErrorMsg.INSERT_INTO_BUCKETIZED_TABLE.
-            getMsg("Table: " + tabName));
-      }
-      // Disallow update and delete on non-acid tables
-      if ((updating() || deleting()) && !isAcid && isTableWrittenTo) {
-        //isTableWrittenTo: delete from acidTbl where a in (select id from nonAcidTable)
-        //so only assert this if we are actually writing to this table
-        // Whether we are using an acid compliant transaction manager has already been caught in
-        // UpdateDeleteSemanticAnalyzer, so if we are updating or deleting and getting nonAcid
-        // here, it means the table itself doesn't support it.
-        throw new SemanticException(ErrorMsg.ACID_OP_ON_NONACID_TABLE, tabName);
-      }
-
      if (tab.isView()) {
         if (qb.getParseInfo().isAnalyzeCommand()) {
           throw new SemanticException(ErrorMsg.ANALYZE_VIEW.getMsg());
@@ -2086,6 +2085,21 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
                 .getMsg(ast, "The class is " + outputFormatClass.toString()));
           }
 
+          boolean isTableWrittenTo = qb.getParseInfo().isInsertIntoTable(ts.tableHandle.getDbName(),
+            ts.tableHandle.getTableName());
+          isTableWrittenTo |= (qb.getParseInfo().getInsertOverwriteTables().
+            get(getUnescapedName((ASTNode) ast.getChild(0), ts.tableHandle.getDbName())) != null);
+          assert isTableWrittenTo :
+            "Inconsistent data structure detected: we are writing to " + ts.tableHandle  + " in " +
+              name + " but it's not in isInsertIntoTable() or getInsertOverwriteTables()";
+          // Disallow update and delete on non-acid tables
+          boolean isAcid = AcidUtils.isAcidTable(ts.tableHandle);
+          if ((updating(name) || deleting(name)) && !isAcid) {
+            // Whether we are using an acid compliant transaction manager has already been caught in
+            // UpdateDeleteSemanticAnalyzer, so if we are updating or deleting and getting nonAcid
+            // here, it means the table itself doesn't support it.
+            throw new SemanticException(ErrorMsg.ACID_OP_ON_NONACID_TABLE, ts.tableName);
+          }
           // TableSpec ts is got from the query (user specified),
           // which means the user didn't specify partitions in their query,
           // but whether the table itself is partitioned is not know.
@@ -6421,7 +6435,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
 
     if (dest_tab.getNumBuckets() > 0) {
       enforceBucketing = true;
-      if (updating() || deleting()) {
+      if (updating(dest) || deleting(dest)) {
         partnCols = getPartitionColsFromBucketColsForUpdateDelete(input, true);
       } else {
         partnCols = getPartitionColsFromBucketCols(dest, qb, dest_tab, table_desc, input, true);
@@ -6472,7 +6486,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
         nullOrder.append(sortOrder == BaseSemanticAnalyzer.HIVE_COLUMN_ORDER_ASC ? 'a' : 'z');
       }
       input = genReduceSinkPlan(input, partnCols, sortCols, order.toString(), nullOrder.toString(),
-              maxReducers, (AcidUtils.isAcidTable(dest_tab) ? getAcidType() : AcidUtils.Operation.NOT_ACID));
+              maxReducers, (AcidUtils.isAcidTable(dest_tab) ? getAcidType(dest) : AcidUtils.Operation.NOT_ACID));
       reduceSinkOperatorsAddedByEnforceBucketingSorting.add((ReduceSinkOperator)input.getParentOperators().get(0));
       ctx.setMultiFileSpray(multiFileSpray);
       ctx.setNumFiles(numFiles);
@@ -6488,7 +6502,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
 
     if ((dest_tab.getNumBuckets() > 0)) {
       enforceBucketing = true;
-      if (updating() || deleting()) {
+      if (updating(dest) || deleting(dest)) {
         partnColsNoConvert = getPartitionColsFromBucketColsForUpdateDelete(input, false);
       } else {
         partnColsNoConvert = getPartitionColsFromBucketCols(dest, qb, dest_tab, table_desc, input,
@@ -6646,7 +6660,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
       if (!isNonNativeTable) {
         AcidUtils.Operation acidOp = AcidUtils.Operation.NOT_ACID;
         if (destTableIsAcid) {
-          acidOp = getAcidType(table_desc.getOutputFileFormatClass());
+          acidOp = getAcidType(table_desc.getOutputFileFormatClass(), dest);
           checkAcidConstraints(qb, table_desc, dest_tab);
         }
         ltd = new LoadTableDesc(queryTmpdir, table_desc, dpCtx, acidOp);
@@ -6666,7 +6680,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
       // in the case of DP, we will register WriteEntity in MoveTask when the
       // list of dynamically created partitions are known.
       if ((dpCtx == null || dpCtx.getNumDPCols() == 0)) {
-        output = new WriteEntity(dest_tab, determineWriteType(ltd, isNonNativeTable));
+        output = new WriteEntity(dest_tab,  determineWriteType(ltd, isNonNativeTable, dest));
         if (!outputs.add(output)) {
           throw new SemanticException(ErrorMsg.OUTPUT_SPECIFIED_MULTIPLE_TIMES
               .getMsg(dest_tab.getTableName()));
@@ -6675,8 +6689,9 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
       if ((dpCtx != null) && (dpCtx.getNumDPCols() >= 0)) {
         // No static partition specified
         if (dpCtx.getNumSPCols() == 0) {
-          output = new WriteEntity(dest_tab, determineWriteType(ltd, isNonNativeTable), false);
+          output = new WriteEntity(dest_tab, determineWriteType(ltd, isNonNativeTable, dest), false);
           outputs.add(output);
+          output.setDynamicPartitionWrite(true);
         }
         // part of the partition specified
         // Create a DummyPartition in this case. Since, the metastore does not store partial
@@ -6689,7 +6704,8 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
                 new DummyPartition(dest_tab, dest_tab.getDbName()
                     + "@" + dest_tab.getTableName() + "@" + ppath,
                     partSpec);
-            output = new WriteEntity(p, getWriteType(), false);
+            output = new WriteEntity(p, getWriteType(dest), false);
+            output.setDynamicPartitionWrite(true);
             outputs.add(output);
           } catch (HiveException e) {
             throw new SemanticException(e.getMessage(), e);
@@ -6753,7 +6769,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
           dest_part.isStoredAsSubDirectories(), conf);
       AcidUtils.Operation acidOp = AcidUtils.Operation.NOT_ACID;
       if (destTableIsAcid) {
-        acidOp = getAcidType(table_desc.getOutputFileFormatClass());
+        acidOp = getAcidType(table_desc.getOutputFileFormatClass(), dest);
         checkAcidConstraints(qb, table_desc, dest_tab);
       }
       ltd = new LoadTableDesc(queryTmpdir, table_desc, dest_part.getSpec(), acidOp);
@@ -6762,9 +6778,9 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
       ltd.setLbCtx(lbCtx);
 
       loadTableWork.add(ltd);
-
       if (!outputs.add(new WriteEntity(dest_part,
-        determineWriteType(ltd, dest_tab.isNonNative())))) {
+        determineWriteType(ltd, dest_tab.isNonNative(), dest)))) {
+
         throw new SemanticException(ErrorMsg.OUTPUT_SPECIFIED_MULTIPLE_TIMES
             .getMsg(dest_tab.getTableName() + "@" + dest_part.getName()));
       }
@@ -6925,7 +6941,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
 
     ArrayList<ColumnInfo> vecCol = new ArrayList<ColumnInfo>();
 
-    if (updating() || deleting()) {
+    if (updating(dest) || deleting(dest)) {
       vecCol.add(new ColumnInfo(VirtualColumn.ROWID.getName(), VirtualColumn.ROWID.getTypeInfo(),
           "", true));
     } else {
@@ -6978,8 +6994,8 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
     // If this is an insert, update, or delete on an ACID table then mark that so the
     // FileSinkOperator knows how to properly write to it.
     if (destTableIsAcid) {
-      AcidUtils.Operation wt = updating() ? AcidUtils.Operation.UPDATE :
-          (deleting() ? AcidUtils.Operation.DELETE : AcidUtils.Operation.INSERT);
+      AcidUtils.Operation wt = updating(dest) ? AcidUtils.Operation.UPDATE :
+          (deleting(dest) ? AcidUtils.Operation.DELETE : AcidUtils.Operation.INSERT);
       fileSinkDesc.setWriteType(wt);
       acidFileSinks.add(fileSinkDesc);
     }
@@ -7150,7 +7166,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
     }
 
     // The numbers of input columns and output columns should match for regular query
-    if (!updating() && !deleting() && inColumnCnt != outColumnCnt) {
+    if (!updating(dest) && !deleting(dest) && inColumnCnt != outColumnCnt) {
       String reason = "Table " + dest + " has " + outColumnCnt
           + " columns, but query has " + inColumnCnt + " columns.";
       throw new SemanticException(ErrorMsg.TARGET_TABLE_COLUMN_MISMATCH.getMsg(
@@ -7169,18 +7185,18 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
         MetadataTypedColumnsetSerDe.class);
     boolean isLazySimpleSerDe = table_desc.getDeserializerClass().equals(
         LazySimpleSerDe.class);
-    if (!isMetaDataSerDe && !deleting()) {
+    if (!isMetaDataSerDe && !deleting(dest)) {
 
       // If we're updating, add the ROW__ID expression, then make the following column accesses
       // offset by 1 so that we don't try to convert the ROW__ID
-      if (updating()) {
+      if (updating(dest)) {
         expressions.add(new ExprNodeColumnDesc(rowFields.get(0).getType(),
             rowFields.get(0).getInternalName(), "", true));
       }
 
       // here only deals with non-partition columns. We deal with partition columns next
       for (int i = 0; i < columnNumber; i++) {
-        int rowFieldsOffset = updating() ? i + 1 : i;
+        int rowFieldsOffset = updating(dest) ? i + 1 : i;
         ObjectInspector tableFieldOI = tableFields.get(i)
             .getFieldObjectInspector();
         TypeInfo tableFieldTypeInfo = TypeInfoUtils
@@ -7218,7 +7234,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
     // deal with dynamic partition columns: convert ExprNodeDesc type to String??
     if (dynPart && dpCtx != null && dpCtx.getNumDPCols() > 0) {
       // DP columns starts with tableFields.size()
-      for (int i = tableFields.size() + (updating() ? 1 : 0); i < rowFields.size(); ++i) {
+      for (int i = tableFields.size() + (updating(dest) ? 1 : 0); i < rowFields.size(); ++i) {
         TypeInfo rowFieldTypeInfo = rowFields.get(i).getType();
         ExprNodeDesc column = new ExprNodeColumnDesc(
             rowFieldTypeInfo, rowFields.get(i).getInternalName(), "", true);
@@ -10525,7 +10541,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
   }
 
   /**
-   * Planner specific stuff goen in here.
+   * Planner specific stuff goes in here.
    */
   static class PlannerContext {
     protected ASTNode   child;
@@ -13045,18 +13061,19 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
     }
   }
 
-  private WriteEntity.WriteType determineWriteType(LoadTableDesc ltd, boolean isNonNativeTable) {
+  private WriteEntity.WriteType determineWriteType(LoadTableDesc ltd, boolean isNonNativeTable, String dest) {
     // Don't know the characteristics of non-native tables,
     // and don't have a rational way to guess, so assume the most
     // conservative case.
     if (isNonNativeTable) return WriteEntity.WriteType.INSERT_OVERWRITE;
-    else return (ltd.getReplace() ? WriteEntity.WriteType.INSERT_OVERWRITE : getWriteType());
-  }
-  private WriteEntity.WriteType getWriteType() {
-    return updating() ? WriteEntity.WriteType.UPDATE :
-      (deleting() ? WriteEntity.WriteType.DELETE : WriteEntity.WriteType.INSERT);
+    else return (ltd.getReplace() ? WriteEntity.WriteType.INSERT_OVERWRITE :
+      getWriteType(dest));
   }
 
+  private WriteEntity.WriteType getWriteType(String dest) {
+    return updating(dest) ? WriteEntity.WriteType.UPDATE :
+      (deleting(dest) ? WriteEntity.WriteType.DELETE : WriteEntity.WriteType.INSERT);
+  }
   private boolean isAcidOutputFormat(Class<? extends OutputFormat> of) {
     Class<?>[] interfaces = of.getInterfaces();
     for (Class<?> iface : interfaces) {
@@ -13069,28 +13086,28 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
 
   // Note that this method assumes you have already decided this is an Acid table.  It cannot
   // figure out if a table is Acid or not.
-  private AcidUtils.Operation getAcidType() {
-    return deleting() ? AcidUtils.Operation.DELETE :
-        (updating() ? AcidUtils.Operation.UPDATE :
+  private AcidUtils.Operation getAcidType(String destination) {
+    return deleting(destination) ? AcidUtils.Operation.DELETE :
+        (updating(destination) ? AcidUtils.Operation.UPDATE :
             AcidUtils.Operation.INSERT);
   }
 
-  private AcidUtils.Operation getAcidType(Class<? extends OutputFormat> of) {
+  private AcidUtils.Operation getAcidType(Class<? extends OutputFormat> of, String dest) {
     if (SessionState.get() == null || !SessionState.get().getTxnMgr().supportsAcid()) {
       return AcidUtils.Operation.NOT_ACID;
     } else if (isAcidOutputFormat(of)) {
-      return getAcidType();
+      return getAcidType(dest);
     } else {
       return AcidUtils.Operation.NOT_ACID;
     }
   }
 
-  protected boolean updating() {
-    return false;
+  protected boolean updating(String destination) {
+    return destination.startsWith(Context.DestClausePrefix.UPDATE.toString());
   }
 
-  protected boolean deleting() {
-    return false;
+  protected boolean deleting(String destination) {
+    return destination.startsWith(Context.DestClausePrefix.DELETE.toString());
   }
 
   // Make sure the proper transaction manager that supports ACID is being used

http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java
index 4f0ead0..ed01a31 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java
@@ -296,6 +296,7 @@ public final class SemanticAnalyzerFactory {
 
       case HiveParser.TOK_UPDATE_TABLE:
       case HiveParser.TOK_DELETE_FROM:
+      case HiveParser.TOK_MERGE:
         return new UpdateDeleteSemanticAnalyzer(queryState);
 
       case HiveParser.TOK_START_TRANSACTION: