Posted to commits@hive.apache.org by ek...@apache.org on 2019/02/04 20:29:29 UTC

[hive] branch master updated: HIVE-21159 Modify Merge statement logic to perform Update split early (Eugene Koifman, reviewed by Vaibhav Gumashta)

This is an automated email from the ASF dual-hosted git repository.

ekoifman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 4a4b9ca  HIVE-21159 Modify Merge statement logic to perform Update split early (Eugene Koifman, reviewed by Vaibhav Gumashta)
4a4b9ca is described below

commit 4a4b9ca87ed41e6390ff376c9d6529e7db6d60ef
Author: Eugene Koifman <ek...@apache.org>
AuthorDate: Mon Feb 4 12:29:05 2019 -0800

    HIVE-21159 Modify Merge statement logic to perform Update split early (Eugene Koifman, reviewed by Vaibhav Gumashta)
---
 .../java/org/apache/hadoop/hive/conf/HiveConf.java |  5 ++
 ql/src/java/org/apache/hadoop/hive/ql/Context.java | 10 ++-
 .../hive/ql/parse/MergeSemanticAnalyzer.java       | 91 ++++++++++++++++------
 .../hive/ql/parse/RewriteSemanticAnalyzer.java     |  2 +-
 .../hadoop/hive/ql/parse/SemanticAnalyzer.java     | 12 ++-
 .../org/apache/hadoop/hive/ql/TestTxnCommands.java | 26 +++++--
 .../apache/hadoop/hive/ql/TestTxnCommands2.java    |  1 +
 .../hadoop/hive/ql/TxnCommandsBaseForTests.java    |  1 +
 .../hadoop/hive/ql/lockmgr/TestDbTxnManager2.java  | 71 +++++++++++------
 .../hadoop/hive/metastore/txn/TxnHandler.java      | 14 +++-
 10 files changed, 170 insertions(+), 63 deletions(-)

diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 93ecb8f..414070e 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2754,6 +2754,11 @@ public class HiveConf extends Configuration {
     MERGE_CARDINALITY_VIOLATION_CHECK("hive.merge.cardinality.check", true,
       "Set to true to ensure that each SQL Merge statement ensures that for each row in the target\n" +
         "table there is at most 1 matching row in the source table per SQL Specification."),
+    MERGE_SPLIT_UPDATE("hive.merge.split.update", false,
+        "If true, SQL Merge statement will handle WHEN MATCHED UPDATE by splitting it into 2\n" +
+            "branches of a multi-insert, representing delete of existing row and an insert of\n" +
+            "the new version of the row.  Updating bucketing and partitioning columns should\n" +
+            "only be permitted if this is true."),
     OPTIMIZE_ACID_META_COLUMNS("hive.optimize.acid.meta.columns", true,
         "If true, don't decode Acid metadata columns from storage unless" +
         " they are needed."),
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Context.java b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
index 18089d5..b11d5a1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Context.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
@@ -200,7 +200,13 @@ public class Context {
     return getTokenRewriteStream().toString(n.getTokenStartIndex(), n.getTokenStopIndex() + 1).trim();
   }
   /**
-   * The suffix is always relative to a given ASTNode
+   * The suffix is always relative to a given ASTNode.
+   * We need this so that FileSinkOperators corresponding to different branches of a multi-insert
+   * statement which represents a SQL Merge statement get marked correctly with
+   * {@link org.apache.hadoop.hive.ql.io.AcidUtils.Operation}.  See usages
+   * of {@link #getDestNamePrefix(ASTNode, QB)} and
+   * {@link org.apache.hadoop.hive.ql.parse.SemanticAnalyzer#updating(String)} and
+   * {@link org.apache.hadoop.hive.ql.parse.SemanticAnalyzer#deleting(String)}.
    */
   public DestClausePrefix getDestNamePrefix(ASTNode curNode, QB queryBlock) {
     assert curNode != null : "must supply curNode";
@@ -255,7 +261,7 @@ public class Context {
       case DELETE:
         return DestClausePrefix.DELETE;
       case MERGE:
-      /* This is the structrue expected here
+      /* This is the structure expected here
         HiveParser.TOK_QUERY;
           HiveParser.TOK_FROM
           HiveParser.TOK_INSERT;
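
The prefix matters because each branch of the rewritten multi-insert gets a dest name whose prefix
tells SemanticAnalyzer#updating/#deleting how to mark that branch's FileSinkOperator. A toy model of
the mechanism only; the prefix strings and method bodies here are made up, not the real constants:

    public class DestPrefixSketch {
      // hypothetical prefixes standing in for Context.DestClausePrefix values
      static boolean updating(String destName) {
        return destName.startsWith("update");
      }
      static boolean deleting(String destName) {
        return destName.startsWith("delete");
      }
      public static void main(String[] args) {
        // with early update split, a former "update" branch becomes an "insert" branch
        // followed by a "delete" branch, so only the latter is marked as an Acid delete
        System.out.println(updating("update_clause-1")); // true
        System.out.println(deleting("delete_clause-2")); // true
        System.out.println(deleting("insert_clause-1")); // false
      }
    }
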
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/MergeSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/MergeSemanticAnalyzer.java
index 44f7b43..db6d551 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/MergeSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/MergeSemanticAnalyzer.java
@@ -64,7 +64,7 @@ public class MergeSemanticAnalyzer extends RewriteSemanticAnalyzer {
 
   private static final String INDENT = "  ";
 
-  private IdentifierQuoter quotedIdenfierHelper;
+  private IdentifierQuoter quotedIdentifierHelper;
 
   /**
    * This allows us to take an arbitrary ASTNode and turn it back into SQL that produced it.
@@ -112,7 +112,7 @@ public class MergeSemanticAnalyzer extends RewriteSemanticAnalyzer {
    * needing to understand what it is (except for QuotedIdentifiers).
    */
   private String getMatchedText(ASTNode n) {
-    quotedIdenfierHelper.visit(n);
+    quotedIdentifierHelper.visit(n);
     return ctx.getTokenRewriteStream().toString(n.getTokenStartIndex(),
       n.getTokenStopIndex() + 1).trim();
   }
@@ -130,7 +130,7 @@ public class MergeSemanticAnalyzer extends RewriteSemanticAnalyzer {
    * @throws SemanticException
    */
   private void analyzeMerge(ASTNode tree) throws SemanticException {
-    quotedIdenfierHelper = new IdentifierQuoter(ctx.getTokenRewriteStream());
+    quotedIdentifierHelper = new IdentifierQuoter(ctx.getTokenRewriteStream());
     /*
      * See org.apache.hadoop.hive.ql.parse.TestMergeStatement for some examples of the merge AST
       For example, given:
@@ -161,7 +161,7 @@ public class MergeSemanticAnalyzer extends RewriteSemanticAnalyzer {
      - VALUES clause must have the same number of values as target table (including partition cols).  Part cols go last
      in Select clause of Insert as Select
      todo: do we care to preserve comments in original SQL?
-     todo: check if identifiers are propertly escaped/quoted in the generated SQL - it's currently inconsistent
+     todo: check if identifiers are properly escaped/quoted in the generated SQL - it's currently inconsistent
       Look at UnparseTranslator.addIdentifierTranslation() - it does unescape + unparse...
      todo: consider "WHEN NOT MATCHED BY SOURCE THEN UPDATE SET TargetTable.Col1 = SourceTable.Col1 "; what happens when
      source is empty?  This should be a runtime error - maybe not the outer side of ROJ is empty => the join produces 0
@@ -211,7 +211,8 @@ public class MergeSemanticAnalyzer extends RewriteSemanticAnalyzer {
     if (hasHint) {
       hintStr = " /*+ " + qHint.getText() + " */ ";
     }
-
+    final boolean splitUpdateEarly =
+        HiveConf.getBoolVar(conf, HiveConf.ConfVars.MERGE_SPLIT_UPDATE);
     /**
      * We allow at most 2 WHEN MATCHED clause, in which case 1 must be Update the other Delete
      * If we have both update and delete, the 1st one (in SQL code) must have "AND <extra predicate>"
@@ -233,7 +234,8 @@ public class MergeSemanticAnalyzer extends RewriteSemanticAnalyzer {
       case HiveParser.TOK_UPDATE:
         numWhenMatchedUpdateClauses++;
         String s = handleUpdate(whenClause, rewrittenQueryStr, target,
-            onClauseAsText, targetTable, extraPredicate, hintProcessed ? null : hintStr);
+            onClauseAsText, targetTable, extraPredicate, hintProcessed ? null : hintStr,
+            splitUpdateEarly);
         hintProcessed = true;
         if (numWhenMatchedUpdateClauses + numWhenMatchedDeleteClauses == 1) {
           extraPredicate = s; //i.e. it's the 1st WHEN MATCHED
@@ -242,7 +244,7 @@ public class MergeSemanticAnalyzer extends RewriteSemanticAnalyzer {
       case HiveParser.TOK_DELETE:
         numWhenMatchedDeleteClauses++;
         String s1 = handleDelete(whenClause, rewrittenQueryStr, target,
-            onClauseAsText, targetTable, extraPredicate, hintProcessed ? null : hintStr);
+            onClauseAsText, targetTable, extraPredicate, hintProcessed ? null : hintStr, false);
         hintProcessed = true;
         if (numWhenMatchedUpdateClauses + numWhenMatchedDeleteClauses == 1) {
           extraPredicate = s1; //i.e. it's the 1st WHEN MATCHED
@@ -271,7 +273,7 @@ public class MergeSemanticAnalyzer extends RewriteSemanticAnalyzer {
     ASTNode rewrittenTree = rr.rewrittenTree;
     rewrittenCtx.setOperation(Context.Operation.MERGE);
 
-    //set dest name mapping on new context; 1st chid is TOK_FROM
+    //set dest name mapping on new context; 1st child is TOK_FROM
     for (int insClauseIdx = 1, whenClauseIdx = 0;
         insClauseIdx < rewrittenTree.getChildCount() - (validating ? 1 : 0/*skip cardinality violation clause*/);
         insClauseIdx++, whenClauseIdx++) {
@@ -281,7 +283,21 @@ public class MergeSemanticAnalyzer extends RewriteSemanticAnalyzer {
         rewrittenCtx.addDestNamePrefix(insClauseIdx, Context.DestClausePrefix.INSERT);
         break;
       case HiveParser.TOK_UPDATE:
-        rewrittenCtx.addDestNamePrefix(insClauseIdx, Context.DestClausePrefix.UPDATE);
+        if (!splitUpdateEarly) {
+          rewrittenCtx.addDestNamePrefix(insClauseIdx, Context.DestClausePrefix.UPDATE);
+        } else {
+          /* With 2 branches for the update, the 1st branch is the INSERT part
+           and the next one is the DELETE.  WriteSet tracking treats 2 concurrent DELETEs
+           as a conflict, so Lost Update is still prevented: the delete event lands in the
+           partition/bucket where the original version of the row was, so any concurrent
+           update/delete of the same row will land in the same partition/bucket.
+
+           If the insert part lands in a different partition, it should not conflict with another
+           Update of that partition since that update by definition cannot be of the same row.
+           If we ever enforce unique constraints, we may have to make I+I a conflict. */
+          rewrittenCtx.addDestNamePrefix(insClauseIdx, Context.DestClausePrefix.INSERT);
+          rewrittenCtx.addDestNamePrefix(++insClauseIdx, Context.DestClausePrefix.DELETE);
+        }
         break;
       case HiveParser.TOK_DELETE:
         rewrittenCtx.addDestNamePrefix(insClauseIdx, Context.DestClausePrefix.DELETE);
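
Concretely, a single WHEN MATCHED ... UPDATE clause now occupies two adjacent branches of the
rewritten multi-insert. A hedged sketch of the generated text for a toy target t(a, b) and source s;
the exact text comes from handleUpdate()/handleDelete() below and the join shape shown here is
illustrative only:

    public class SplitUpdateRewriteSketch {
      // MERGE INTO t USING s ON t.a = s.a WHEN MATCHED THEN UPDATE SET b = s.b
      // with hive.merge.split.update=true is rewritten roughly as:
      static final String REWRITTEN =
          "FROM t RIGHT OUTER JOIN s ON t.a = s.a\n"
        + "INSERT INTO t    -- update clause (insert part)\n"
        + " SELECT t.a, s.b WHERE t.a = s.a\n"
        + "INSERT INTO t    -- update clause (delete part)\n"
        + " SELECT t.ROW__ID WHERE t.a = s.a\n"
        + " SORT BY t.ROW__ID";
      // the first branch above is tagged DestClausePrefix.INSERT, the second DestClausePrefix.DELETE
    }
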
@@ -339,7 +355,7 @@ public class MergeSemanticAnalyzer extends RewriteSemanticAnalyzer {
     //this is a tmp table and thus Session scoped and acid requires SQL statement to be serial in a
     // given session, i.e. the name can be fixed across all invocations
     String tableName = "merge_tmp_table";
-    rewrittenQueryStr.append("\nINSERT INTO ").append(tableName)
+    rewrittenQueryStr.append("INSERT INTO ").append(tableName)
       .append("\n  SELECT cardinality_violation(")
       .append(getSimpleTableName(target)).append(".ROW__ID");
     addPartitionColsToSelect(targetTable.getPartCols(), rewrittenQueryStr, target);
@@ -381,22 +397,25 @@ public class MergeSemanticAnalyzer extends RewriteSemanticAnalyzer {
    * @param deleteExtraPredicate - see notes at caller
    */
   private String handleUpdate(ASTNode whenMatchedUpdateClause, StringBuilder rewrittenQueryStr, ASTNode target,
-      String onClauseAsString, Table targetTable, String deleteExtraPredicate, String hintStr)
+      String onClauseAsString, Table targetTable, String deleteExtraPredicate, String hintStr, boolean splitUpdateEarly)
       throws SemanticException {
     assert whenMatchedUpdateClause.getType() == HiveParser.TOK_MATCHED;
     assert getWhenClauseOperation(whenMatchedUpdateClause).getType() == HiveParser.TOK_UPDATE;
     String targetName = getSimpleTableName(target);
     rewrittenQueryStr.append("INSERT INTO ").append(getFullTableNameForSQL(target));
     addPartitionColsToInsert(targetTable.getPartCols(), rewrittenQueryStr);
-    rewrittenQueryStr.append("    -- update clause\n SELECT ");
+    rewrittenQueryStr.append("    -- update clause").append(splitUpdateEarly ? "(insert part)": "")
+        .append("\n SELECT ");
     if (hintStr != null) {
       rewrittenQueryStr.append(hintStr);
     }
-    rewrittenQueryStr.append(targetName).append(".ROW__ID");
+    if (!splitUpdateEarly) {
+      rewrittenQueryStr.append(targetName).append(".ROW__ID, ");
+    }
 
     ASTNode setClause = (ASTNode)getWhenClauseOperation(whenMatchedUpdateClause).getChild(0);
     //columns being updated -> update expressions; "setRCols" (last param) is null because we use actual expressions
-    //before reparsing, i.e. they are known to SemanticAnalyzer logic
+    //before re-parsing, i.e. they are known to SemanticAnalyzer logic
     Map<String, ASTNode> setColsExprs = collectSetColumnsAndExpressions(setClause, null, targetTable);
     //if target table has cols c1,c2,c3 and p1 partition col and we had "SET c2 = 5, c1 = current_date()" we want to end
     //up with
@@ -404,8 +423,11 @@ public class MergeSemanticAnalyzer extends RewriteSemanticAnalyzer {
     //since we take the RHS of set exactly as it was in Input, we don't need to deal with quoting/escaping column/table
     //names
     List<FieldSchema> nonPartCols = targetTable.getCols();
-    for (FieldSchema fs : nonPartCols) {
-      rewrittenQueryStr.append(", ");
+    for (int i = 0; i < nonPartCols.size(); i++) {
+      FieldSchema fs = nonPartCols.get(i);
+      if (i > 0) {
+        rewrittenQueryStr.append(", ");
+      }
       String name = fs.getName();
       if (setColsExprs.containsKey(name)) {
         String rhsExp = getMatchedText(setColsExprs.get(name));
@@ -435,14 +457,26 @@ public class MergeSemanticAnalyzer extends RewriteSemanticAnalyzer {
     if (deleteExtraPredicate != null) {
       rewrittenQueryStr.append(" AND NOT(").append(deleteExtraPredicate).append(")");
     }
-    rewrittenQueryStr.append("\n SORT BY ");
-    rewrittenQueryStr.append(targetName).append(".ROW__ID \n");
+    if (!splitUpdateEarly) {
+      rewrittenQueryStr.append("\n SORT BY ");
+      rewrittenQueryStr.append(targetName).append(".ROW__ID ");
+    }
+    rewrittenQueryStr.append("\n");
 
     setUpAccessControlInfoForUpdate(targetTable, setColsExprs);
     //we don't deal with columns on RHS of SET expression since the whole expr is part of the
-    //rewritten SQL statement and is thus handled by SemanticAnalzyer.  Nor do we have to
+    //rewritten SQL statement and is thus handled by SemanticAnalyzer.  Nor do we have to
     //figure which cols on RHS are from source and which from target
 
+    if (splitUpdateEarly) {
+      /* This is still part of the WHEN MATCHED UPDATE, so we ignore any 'extra predicate'
+       * generated by this call to handleDelete(). */
+      handleDelete(whenMatchedUpdateClause, rewrittenQueryStr, target, onClauseAsString,
+          targetTable, deleteExtraPredicate, hintStr, true);
+    }
+
     return extraPredicate;
   }
 
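The insert part's select list substitutes the SET expression for each updated column and falls back
to the target's own value otherwise. A small self-contained model of that loop, with made-up column
names (the real code works on FieldSchema and ASTNode expressions):

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class SelectListSketch {
      public static void main(String[] args) {
        List<String> cols = Arrays.asList("c1", "c2", "c3"); // non-partition columns of the target
        Map<String, String> setExprs = new HashMap<>();      // from "SET c2 = 5, c1 = current_date()"
        setExprs.put("c2", "5");
        setExprs.put("c1", "current_date()");
        StringBuilder sel = new StringBuilder();
        for (int i = 0; i < cols.size(); i++) {
          if (i > 0) {
            sel.append(", ");
          }
          String c = cols.get(i);
          sel.append(setExprs.getOrDefault(c, "t." + c));
        }
        System.out.println(sel); // current_date(), 5, t.c3
      }
    }
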
@@ -450,17 +484,23 @@ public class MergeSemanticAnalyzer extends RewriteSemanticAnalyzer {
    * @param onClauseAsString - because there is no clone() and we need to use in multiple places
    * @param updateExtraPredicate - see notes at caller
    */
-  private String handleDelete(ASTNode whenMatchedDeleteClause, StringBuilder rewrittenQueryStr, ASTNode target,
-      String onClauseAsString, Table targetTable, String updateExtraPredicate, String hintStr)
-      throws SemanticException {
+  private String handleDelete(ASTNode whenMatchedDeleteClause, StringBuilder rewrittenQueryStr,
+      ASTNode target, String onClauseAsString, Table targetTable, String updateExtraPredicate,
+      String hintStr, boolean splitUpdateEarly) throws SemanticException {
     assert whenMatchedDeleteClause.getType() == HiveParser.TOK_MATCHED;
-    assert getWhenClauseOperation(whenMatchedDeleteClause).getType() == HiveParser.TOK_DELETE;
+    assert (splitUpdateEarly &&
+        getWhenClauseOperation(whenMatchedDeleteClause).getType() == HiveParser.TOK_UPDATE) ||
+        getWhenClauseOperation(whenMatchedDeleteClause).getType() == HiveParser.TOK_DELETE;
     List<FieldSchema> partCols = targetTable.getPartCols();
     String targetName = getSimpleTableName(target);
     rewrittenQueryStr.append("INSERT INTO ").append(getFullTableNameForSQL(target));
     addPartitionColsToInsert(partCols, rewrittenQueryStr);
 
-    rewrittenQueryStr.append("    -- delete clause\n SELECT ");
+    if (splitUpdateEarly) {
+      rewrittenQueryStr.append("    -- update clause (delete part)\n SELECT ");
+    } else {
+      rewrittenQueryStr.append("    -- delete clause\n SELECT ");
+    }
     if (hintStr != null) {
       rewrittenQueryStr.append(hintStr);
     }
@@ -590,8 +630,9 @@ public class MergeSemanticAnalyzer extends RewriteSemanticAnalyzer {
     if (extraPredicate != null) {
       //we have WHEN NOT MATCHED AND <boolean expr> THEN INSERT
       rewrittenQueryStr.append(" AND ")
-        .append(getMatchedText(((ASTNode)whenNotMatchedClause.getChild(1)))).append('\n');
+        .append(getMatchedText(((ASTNode)whenNotMatchedClause.getChild(1))));
     }
+    rewrittenQueryStr.append('\n');
   }
 
   private String replaceDefaultKeywordForMerge(String valueClause, Table table, ASTNode columnListNode)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/RewriteSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/RewriteSemanticAnalyzer.java
index 6caac11..33247f0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/RewriteSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/RewriteSemanticAnalyzer.java
@@ -221,7 +221,7 @@ public abstract class RewriteSemanticAnalyzer extends SemanticAnalyzer {
    *  For updates, we need to set the column access info so that it contains information on
    *  the columns we are updating.
    *  (But not all the columns of the target table even though the rewritten query writes
-   *  all columns of target table since that is an implmentation detail).
+   *  all columns of target table since that is an implementation detail).
    */
   protected void setUpAccessControlInfoForUpdate(Table mTable, Map<String, ASTNode> setCols) {
     ColumnAccessInfo cai = new ColumnAccessInfo();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 706fa55..0d0196e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -7971,8 +7971,16 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
     if ((dpCtx == null || dpCtx.getNumDPCols() == 0)) {
       output = new WriteEntity(dest_tab, determineWriteType(ltd, isNonNativeTable, dest));
       if (!outputs.add(output)) {
-        throw new SemanticException(ErrorMsg.OUTPUT_SPECIFIED_MULTIPLE_TIMES
-            .getMsg(dest_tab.getTableName()));
+        if (!((this instanceof MergeSemanticAnalyzer) &&
+            conf.getBoolVar(ConfVars.MERGE_SPLIT_UPDATE))) {
+          /* A Merge stmt with early update split may legitimately create several (2) writes to
+           * the same table with the same {@link WriteType}, e.g. when the original Merge has both
+           * update and delete clauses and the update is split into insert + delete; in that case
+           * it is not an error. */
+          throw new SemanticException(ErrorMsg.OUTPUT_SPECIFIED_MULTIPLE_TIMES
+              .getMsg(dest_tab.getTableName()));
+        }
       }
     }
 
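outputs is a Set of WriteEntity, so a second write to the same table with the same WriteType fails
the add() call; with split update, a Merge that has both an update clause (split into insert +
delete) and a delete clause legitimately does exactly that, hence the exception is now skipped for
MergeSemanticAnalyzer. A toy model of the duplicate detection, with strings standing in for
WriteEntity:

    import java.util.HashSet;
    import java.util.Set;

    public class DuplicateWriteSketch {
      public static void main(String[] args) {
        Set<String> outputs = new HashSet<>();
        System.out.println(outputs.add("default.target#DELETE")); // true: delete part of the split update
        System.out.println(outputs.add("default.target#DELETE")); // false: WHEN MATCHED ... DELETE clause
        // before this change the 'false' case always raised OUTPUT_SPECIFIED_MULTIPLE_TIMES
      }
    }
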
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
index c2b931a..33c25ed 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
@@ -970,13 +970,29 @@ public class TestTxnCommands extends TxnCommandsBaseForTests {
       sb.append(s).append('\n');
     }
     LOG.info("Explain1: " + sb);
+    /*
+     Edges:
+     Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 8 (SIMPLE_EDGE)
+     Reducer 3 <- Reducer 2 (SIMPLE_EDGE)
+     Reducer 4 <- Reducer 2 (SIMPLE_EDGE)
+     Reducer 5 <- Reducer 2 (CUSTOM_SIMPLE_EDGE)
+     Reducer 6 <- Reducer 2 (SIMPLE_EDGE)
+     Reducer 7 <- Reducer 2 (CUSTOM_SIMPLE_EDGE)
+     */
     for(int i = 0; i < explain.size(); i++) {
       if(explain.get(i).contains("Edges:")) {
-        Assert.assertTrue("At i+1=" + (i+1) + explain.get(i + 1), explain.get(i + 1).contains("Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 7 (SIMPLE_EDGE)"));
-        Assert.assertTrue("At i+1=" + (i+2) + explain.get(i + 2), explain.get(i + 2).contains("Reducer 3 <- Reducer 2 (SIMPLE_EDGE)"));
-        Assert.assertTrue("At i+1=" + (i+3) + explain.get(i + 3), explain.get(i + 3).contains("Reducer 4 <- Reducer 2 (SIMPLE_EDGE)"));
-        Assert.assertTrue("At i+1=" + (i+4) + explain.get(i + 4), explain.get(i + 4).contains("Reducer 5 <- Reducer 2 (SIMPLE_EDGE)"));
-        Assert.assertTrue("At i+1=" + (i+5) + explain.get(i + 5), explain.get(i + 5).contains("Reducer 6 <- Reducer 2 (CUSTOM_SIMPLE_EDGE)"));
+        Assert.assertTrue("At i+1=" + (i+1) + explain.get(i + 1),
+            explain.get(i + 1).contains("Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 8 (SIMPLE_EDGE)"));
+        Assert.assertTrue("At i+1=" + (i+2) + explain.get(i + 2),
+            explain.get(i + 2).contains("Reducer 3 <- Reducer 2 (SIMPLE_EDGE)"));
+        Assert.assertTrue("At i+1=" + (i+3) + explain.get(i + 3),
+            explain.get(i + 3).contains("Reducer 4 <- Reducer 2 (SIMPLE_EDGE)"));
+        Assert.assertTrue("At i+1=" + (i+4) + explain.get(i + 4),
+            explain.get(i + 4).contains("Reducer 5 <- Reducer 2 (CUSTOM_SIMPLE_EDGE)"));
+        Assert.assertTrue("At i+1=" + (i+5) + explain.get(i + 5),
+            explain.get(i + 5).contains("Reducer 6 <- Reducer 2 (SIMPLE_EDGE)"));
+        Assert.assertTrue("At i+1=" + (i+5) + explain.get(i + 5),
+            explain.get(i + 6).contains("Reducer 7 <- Reducer 2 (CUSTOM_SIMPLE_EDGE)"));
         break;
       }
     }
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index 6f44e9b..0734ed9 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -147,6 +147,7 @@ public class TestTxnCommands2 {
         .setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
             "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory");
     hiveConf.setBoolVar(HiveConf.ConfVars.MERGE_CARDINALITY_VIOLATION_CHECK, true);
+    HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.MERGE_SPLIT_UPDATE, true);
     hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSCOLAUTOGATHER, false);
     //TestTxnCommands2WithSplitUpdateAndVectorization has the vectorized version
     //of these tests.
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java b/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
index dc2963d..5f39fdc 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
@@ -100,6 +100,7 @@ public abstract class TxnCommandsBaseForTests {
       .setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
         "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory");
     hiveConf.setBoolVar(HiveConf.ConfVars.MERGE_CARDINALITY_VIOLATION_CHECK, true);
+    HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.MERGE_SPLIT_UPDATE, true);
     hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSCOLAUTOGATHER, false);
     hiveConf.setBoolean("mapred.input.dir.recursive", true);
     TxnDbUtil.setConfValues(hiveConf);
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
index 5e085f8..43a3047 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.hive.ql.QueryState;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.junit.Before;
+import org.junit.ComparisonFailure;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -1529,7 +1530,7 @@ public class TestDbTxnManager2 {
       3, TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPLETED_TXN_COMPONENTS where ctc_table='tab1' and ctc_partition is not null"));
   }
   /**
-   * Concurrent delete/detele of same partition - should pass
+   * Concurrent delete/delete of same partition - should NOT pass
    */
   @Test
   public void testWriteSetTracking11() throws Exception {
@@ -1584,18 +1585,27 @@ public class TestDbTxnManager2 {
       Collections.singletonList("p=two"));
     adp.setOperationType(DataOperationType.DELETE);
     txnHandler.addDynamicPartitions(adp);
-    txnMgr.commitTxn();//"select * from tab1" txn
-
-    Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString(conf, "select * from WRITE_SET"),
-      1, TxnDbUtil.countQueryAgent(conf, "select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='d' and ws_table='tab1' and ws_txnid=" + txnIdDelete));
-    Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString(conf, "select * from WRITE_SET"),
-      1, TxnDbUtil.countQueryAgent(conf, "select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='d' and ws_table='tab1' and ws_txnid=" + txnIdSelect));
-    Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString(conf, "select * from WRITE_SET"),
-      1, TxnDbUtil.countQueryAgent(conf, "select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='d' and ws_table='tab1' and ws_txnid=" + txnIdDelete));
-    Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString(conf, "select * from WRITE_SET"),
-      1, TxnDbUtil.countQueryAgent(conf, "select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='d' and ws_table='tab1' and ws_txnid=" + txnIdSelect));
+    LockException expectedException = null;
+    try {
+      txnMgr.commitTxn();//"select * from tab1" txn
+    }
+    catch(LockException ex) {
+      expectedException = ex;
+    }
+    Assert.assertNotNull("Didn't get expected d/d conflict", expectedException);
+    Assert.assertEquals("Transaction manager has aborted the transaction txnid:5.  " +
+        "Reason: Aborting [txnid:5,5] due to a write conflict on default/tab1/p=two " +
+        "committed by [txnid:4,5] d/d", expectedException.getMessage());
+    Assert.assertEquals("WRITE_SET mismatch: " +
+            TxnDbUtil.queryToString(conf, "select * from WRITE_SET"),
+      1, TxnDbUtil.countQueryAgent(conf,
+            "select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='d' and ws_table='tab1' and ws_txnid=" + txnIdDelete));
+    Assert.assertEquals("WRITE_SET mismatch: " +
+            TxnDbUtil.queryToString(conf, "select * from WRITE_SET"),
+      0, TxnDbUtil.countQueryAgent(conf,
+            "select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='d' and ws_table='tab1' and ws_txnid=" + txnIdSelect));
     Assert.assertEquals("COMPLETED_TXN_COMPONENTS mismatch: " + TxnDbUtil.queryToString(conf, "select * from COMPLETED_TXN_COMPONENTS"),
-      4, TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPLETED_TXN_COMPONENTS where ctc_table='tab1' and ctc_partition is not null"));
+      3, TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPLETED_TXN_COMPONENTS where ctc_table='tab1' and ctc_partition is not null"));
   }
   @Test
   public void testCompletedTxnComponents() throws Exception {
@@ -1706,8 +1716,8 @@ public class TestDbTxnManager2 {
       "(9,10,1,2),        (3,4,1,2), (11,12,1,3), (5,13,1,3), (7,8,2,2), (14,15,1,1)"));
     checkCmdOnDriver(driver.run("create table source2 (a int, b int, p int, q int)"));
     checkCmdOnDriver(driver.run("insert into source2 values " +
-  //cc ? -:U-(1/2)     D-(1/2)         cc ? U-(1/3):-             D-(2/2)       I-(1/1) - new part 2
-      "(9,100,1,2),      (3,4,1,2),               (5,13,1,3),       (7,8,2,2), (14,15,2,1)"));
+    //cc ? -:U-(1/2)     D-(1/2)         cc ? U-(1/3):-       D-(2/2)       I-(1/1) - new part 2
+      "(9,100,1,2),      (3,4,1,2),         (5,13,1,3),       (7,8,2,2), (14,15,2,1)"));
 
 
     checkCmdOnDriver(driver.compileAndRespond("merge into target t using source s on t.a=s.b " +
@@ -1729,7 +1739,7 @@ public class TestDbTxnManager2 {
     swapTxnManager(txnMgr2);
     checkCmdOnDriver(driver.compileAndRespond("merge into target t using source2 s on t.a=s.b " +
       "when matched and t.a=" + (cc ? 5 : 9) + " then update set b=s.b " + //if conflict updates p=1/q=3 else update p=1/q=2
-      "when matched and t.a in (3,7) then delete " + //deletes from p=1/q=2, p=2/q=2
+      "when matched and t.a in (" + (cc ? "3,7" : "11, 13")  + ") then delete " + //if cc deletes from p=1/q=2, p=2/q=2, else delete nothing
       "when not matched and t.a >= 8 then insert values(s.a, s.b, s.p, s.q)", true));//insert p=1/q=2, p=1/q=3 and new part 1/1
     long txnId2 = txnMgr2.getCurrentTxnId();
     txnMgr2.acquireLocks(driver.getPlan(), ctx, "T1", false);
@@ -1824,10 +1834,12 @@ public class TestDbTxnManager2 {
       Collections.singletonList(cc ? "p=1/q=3" : "p=1/p=2"));//update clause
     adp.setOperationType(DataOperationType.UPDATE);
     txnHandler.addDynamicPartitions(adp);
-    adp = new AddDynamicPartitions(txnId2, writeId, "default", "target",
-      Arrays.asList("p=1/q=2","p=2/q=2"));//delete clause
-    adp.setOperationType(DataOperationType.DELETE);
-    txnHandler.addDynamicPartitions(adp);
+    if (cc) {
+      adp = new AddDynamicPartitions(txnId2, writeId, "default", "target",
+          Arrays.asList("p=1/q=2", "p=2/q=2"));//delete clause
+      adp.setOperationType(DataOperationType.DELETE);
+      txnHandler.addDynamicPartitions(adp);
+    }
     adp = new AddDynamicPartitions(txnId2, writeId, "default", "target",
       Arrays.asList("p=1/q=2","p=1/q=3","p=1/q=1"));//insert clause
     adp.setOperationType(DataOperationType.INSERT);
@@ -1841,7 +1853,7 @@ public class TestDbTxnManager2 {
     Assert.assertEquals(
       "TXN_COMPONENTS mismatch(" + JavaUtils.txnIdToString(txnId2) + "): " +
         TxnDbUtil.queryToString(conf, "select * from TXN_COMPONENTS"),
-      2,
+        (cc ? 2 : 0),
       TxnDbUtil.countQueryAgent(conf, "select count(*) from TXN_COMPONENTS where tc_txnid=" + txnId2 +
         " and tc_operation_type='d'"));
     Assert.assertEquals(
@@ -1860,9 +1872,18 @@ public class TestDbTxnManager2 {
     }
     if(cc) {
       Assert.assertNotNull("didn't get exception", expectedException);
-      Assert.assertEquals("Transaction manager has aborted the transaction txnid:11.  Reason: " +
-        "Aborting [txnid:11,11] due to a write conflict on default/target/p=1/q=3 " +
-        "committed by [txnid:10,11] u/u", expectedException.getMessage());
+      try {
+        Assert.assertEquals("Transaction manager has aborted the transaction txnid:11.  Reason: " +
+            "Aborting [txnid:11,11] due to a write conflict on default/target/p=1/q=3 " +
+            "committed by [txnid:10,11] u/u", expectedException.getMessage());
+      } catch (ComparisonFailure ex) {
+        //the 2 txns have 2 conflicts between them, so accept either failure message since which
+        //one is reported (of the 2) is not deterministic
+        Assert.assertEquals("Transaction manager has aborted the transaction txnid:11.  Reason: " +
+            "Aborting [txnid:11,11] due to a write conflict on default/target/p=1/q=2 " +
+            "committed by [txnid:10,11] d/d", expectedException.getMessage());
+      }
       Assert.assertEquals(
         "COMPLETED_TXN_COMPONENTS mismatch(" + JavaUtils.txnIdToString(txnId2) + "): " +
           TxnDbUtil.queryToString(conf, "select * from COMPLETED_TXN_COMPONENTS"),
@@ -1879,7 +1900,7 @@ public class TestDbTxnManager2 {
       Assert.assertEquals(
         "COMPLETED_TXN_COMPONENTS mismatch(" + JavaUtils.txnIdToString(txnId2) + "): " +
           TxnDbUtil.queryToString(conf, "select * from COMPLETED_TXN_COMPONENTS"),
-        6,
+        4,
         TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPLETED_TXN_COMPONENTS where ctc_txnid=" + txnId2));
       Assert.assertEquals(
         "WRITE_SET mismatch(" + JavaUtils.txnIdToString(txnId2) + "): " +
@@ -1890,7 +1911,7 @@ public class TestDbTxnManager2 {
       Assert.assertEquals(
         "WRITE_SET mismatch(" + JavaUtils.txnIdToString(txnId2) + "): " +
           TxnDbUtil.queryToString(conf, "select * from WRITE_SET"),
-        2,
+        0,
         TxnDbUtil.countQueryAgent(conf, "select count(*) from WRITE_SET where ws_txnid=" + txnId2 +
           " and ws_operation_type='d'"));
     }
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 7520922..6df7680 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -1174,9 +1174,17 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
               " and cur.ws_txnid=" + txnid + //make sure RHS of join only has rows we just inserted as
               // part of this commitTxn() op
               " and committed.ws_txnid <> " + txnid + //and LHS only has committed txns
-              //U+U and U+D is a conflict but D+D is not and we don't currently track I in WRITE_SET at all
-              " and (committed.ws_operation_type=" + quoteChar(OperationType.UPDATE.sqlConst) +
-              " OR cur.ws_operation_type=" + quoteChar(OperationType.UPDATE.sqlConst) + ")"));
+              //U+U, U+D and D+D are all conflicts and we don't currently track I in WRITE_SET at all.
+              //It may seem like D+D should not be a conflict, but consider 2 multi-stmt txns
+              //where each does "delete X + insert X", where X is a row with the same PK.  This is
+              //equivalent to an update of X but won't be in conflict unless D+D is a conflict.
+              //The same happens when Hive splits U=I+D early so it looks like 2 branches of a
+              //multi-insert stmt (an Insert and a Delete branch).  It also 'feels'
+              //un-serializable to allow concurrent deletes.
+              " and (committed.ws_operation_type IN(" + quoteChar(OperationType.UPDATE.sqlConst) +
+                ", " + quoteChar(OperationType.DELETE.sqlConst) +
+              ") AND cur.ws_operation_type IN(" + quoteChar(OperationType.UPDATE.sqlConst) + ", "
+                + quoteChar(OperationType.DELETE.sqlConst) + "))"));
           if (rs.next()) {
             //found a conflict
             String committedTxn = "[" + JavaUtils.txnIdToString(rs.getLong(1)) + "," + rs.getLong(2) + "]";