You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ek...@apache.org on 2019/02/04 20:29:29 UTC
[hive] branch master updated: HIVE-21159 Modify Merge statement
logic to perform Update split early (Eugene Koifman,
reviewed by Vaibhav Gumashta)
This is an automated email from the ASF dual-hosted git repository.
ekoifman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 4a4b9ca HIVE-21159 Modify Merge statement logic to perform Update split early (Eugene Koifman, reviewed by Vaibhav Gumashta)
4a4b9ca is described below
commit 4a4b9ca87ed41e6390ff376c9d6529e7db6d60ef
Author: Eugene Koifman <ek...@apache.org>
AuthorDate: Mon Feb 4 12:29:05 2019 -0800
HIVE-21159 Modify Merge statement logic to perform Update split early (Eugene Koifman, reviewed by Vaibhav Gumashta)
---
.../java/org/apache/hadoop/hive/conf/HiveConf.java | 5 ++
ql/src/java/org/apache/hadoop/hive/ql/Context.java | 10 ++-
.../hive/ql/parse/MergeSemanticAnalyzer.java | 91 ++++++++++++++++------
.../hive/ql/parse/RewriteSemanticAnalyzer.java | 2 +-
.../hadoop/hive/ql/parse/SemanticAnalyzer.java | 12 ++-
.../org/apache/hadoop/hive/ql/TestTxnCommands.java | 26 +++++--
.../apache/hadoop/hive/ql/TestTxnCommands2.java | 1 +
.../hadoop/hive/ql/TxnCommandsBaseForTests.java | 1 +
.../hadoop/hive/ql/lockmgr/TestDbTxnManager2.java | 71 +++++++++++------
.../hadoop/hive/metastore/txn/TxnHandler.java | 14 +++-
10 files changed, 170 insertions(+), 63 deletions(-)
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 93ecb8f..414070e 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2754,6 +2754,11 @@ public class HiveConf extends Configuration {
MERGE_CARDINALITY_VIOLATION_CHECK("hive.merge.cardinality.check", true,
"Set to true to ensure that each SQL Merge statement ensures that for each row in the target\n" +
"table there is at most 1 matching row in the source table per SQL Specification."),
+ MERGE_SPLIT_UPDATE("hive.merge.split.update", false,
+ "If true, SQL Merge statement will handle WHEN MATCHED UPDATE by splitting it into 2\n" +
+ "branches of a multi-insert, representing delete of existing row and an insert of\n" +
+ "the new version of the row. Updating bucketing and partitioning columns should\n" +
+ "only be permitted if this is true."),
OPTIMIZE_ACID_META_COLUMNS("hive.optimize.acid.meta.columns", true,
"If true, don't decode Acid metadata columns from storage unless" +
" they are needed."),
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Context.java b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
index 18089d5..b11d5a1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Context.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
@@ -200,7 +200,13 @@ public class Context {
return getTokenRewriteStream().toString(n.getTokenStartIndex(), n.getTokenStopIndex() + 1).trim();
}
/**
- * The suffix is always relative to a given ASTNode
+ * The suffix is always relative to a given ASTNode.
+ * We need this so that FileSinkOperatorS corresponding to different branches of a multi-insert
+ * statement which represents a SQL Merge statement get marked correctly with
+ * {@link org.apache.hadoop.hive.ql.io.AcidUtils.Operation}. See usages
+ * of {@link #getDestNamePrefix(ASTNode, QB)} and
+ * {@link org.apache.hadoop.hive.ql.parse.SemanticAnalyzer#updating(String)} and
+ * {@link org.apache.hadoop.hive.ql.parse.SemanticAnalyzer#deleting(String)}.
*/
public DestClausePrefix getDestNamePrefix(ASTNode curNode, QB queryBlock) {
assert curNode != null : "must supply curNode";
@@ -255,7 +261,7 @@ public class Context {
case DELETE:
return DestClausePrefix.DELETE;
case MERGE:
- /* This is the structrue expected here
+ /* This is the structure expected here
HiveParser.TOK_QUERY;
HiveParser.TOK_FROM
HiveParser.TOK_INSERT;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/MergeSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/MergeSemanticAnalyzer.java
index 44f7b43..db6d551 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/MergeSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/MergeSemanticAnalyzer.java
@@ -64,7 +64,7 @@ public class MergeSemanticAnalyzer extends RewriteSemanticAnalyzer {
private static final String INDENT = " ";
- private IdentifierQuoter quotedIdenfierHelper;
+ private IdentifierQuoter quotedIdentifierHelper;
/**
* This allows us to take an arbitrary ASTNode and turn it back into SQL that produced it.
@@ -112,7 +112,7 @@ public class MergeSemanticAnalyzer extends RewriteSemanticAnalyzer {
* needing to understand what it is (except for QuotedIdentifiers).
*/
private String getMatchedText(ASTNode n) {
- quotedIdenfierHelper.visit(n);
+ quotedIdentifierHelper.visit(n);
return ctx.getTokenRewriteStream().toString(n.getTokenStartIndex(),
n.getTokenStopIndex() + 1).trim();
}
@@ -130,7 +130,7 @@ public class MergeSemanticAnalyzer extends RewriteSemanticAnalyzer {
* @throws SemanticException
*/
private void analyzeMerge(ASTNode tree) throws SemanticException {
- quotedIdenfierHelper = new IdentifierQuoter(ctx.getTokenRewriteStream());
+ quotedIdentifierHelper = new IdentifierQuoter(ctx.getTokenRewriteStream());
/*
* See org.apache.hadoop.hive.ql.parse.TestMergeStatement for some examples of the merge AST
For example, given:
@@ -161,7 +161,7 @@ public class MergeSemanticAnalyzer extends RewriteSemanticAnalyzer {
- VALUES clause must have the same number of values as target table (including partition cols). Part cols go last
in Select clause of Insert as Select
todo: do we care to preserve comments in original SQL?
- todo: check if identifiers are propertly escaped/quoted in the generated SQL - it's currently inconsistent
+ todo: check if identifiers are properly escaped/quoted in the generated SQL - it's currently inconsistent
Look at UnparseTranslator.addIdentifierTranslation() - it does unescape + unparse...
todo: consider "WHEN NOT MATCHED BY SOURCE THEN UPDATE SET TargetTable.Col1 = SourceTable.Col1 "; what happens when
source is empty? This should be a runtime error - maybe not the outer side of ROJ is empty => the join produces 0
@@ -211,7 +211,8 @@ public class MergeSemanticAnalyzer extends RewriteSemanticAnalyzer {
if (hasHint) {
hintStr = " /*+ " + qHint.getText() + " */ ";
}
-
+ final boolean splitUpdateEarly =
+ HiveConf.getBoolVar(conf, HiveConf.ConfVars.MERGE_SPLIT_UPDATE);
/**
* We allow at most 2 WHEN MATCHED clause, in which case 1 must be Update the other Delete
* If we have both update and delete, the 1st one (in SQL code) must have "AND <extra predicate>"
@@ -233,7 +234,8 @@ public class MergeSemanticAnalyzer extends RewriteSemanticAnalyzer {
case HiveParser.TOK_UPDATE:
numWhenMatchedUpdateClauses++;
String s = handleUpdate(whenClause, rewrittenQueryStr, target,
- onClauseAsText, targetTable, extraPredicate, hintProcessed ? null : hintStr);
+ onClauseAsText, targetTable, extraPredicate, hintProcessed ? null : hintStr,
+ splitUpdateEarly);
hintProcessed = true;
if (numWhenMatchedUpdateClauses + numWhenMatchedDeleteClauses == 1) {
extraPredicate = s; //i.e. it's the 1st WHEN MATCHED
@@ -242,7 +244,7 @@ public class MergeSemanticAnalyzer extends RewriteSemanticAnalyzer {
case HiveParser.TOK_DELETE:
numWhenMatchedDeleteClauses++;
String s1 = handleDelete(whenClause, rewrittenQueryStr, target,
- onClauseAsText, targetTable, extraPredicate, hintProcessed ? null : hintStr);
+ onClauseAsText, targetTable, extraPredicate, hintProcessed ? null : hintStr, false);
hintProcessed = true;
if (numWhenMatchedUpdateClauses + numWhenMatchedDeleteClauses == 1) {
extraPredicate = s1; //i.e. it's the 1st WHEN MATCHED
@@ -271,7 +273,7 @@ public class MergeSemanticAnalyzer extends RewriteSemanticAnalyzer {
ASTNode rewrittenTree = rr.rewrittenTree;
rewrittenCtx.setOperation(Context.Operation.MERGE);
- //set dest name mapping on new context; 1st chid is TOK_FROM
+ //set dest name mapping on new context; 1st child is TOK_FROM
for (int insClauseIdx = 1, whenClauseIdx = 0;
insClauseIdx < rewrittenTree.getChildCount() - (validating ? 1 : 0/*skip cardinality violation clause*/);
insClauseIdx++, whenClauseIdx++) {
@@ -281,7 +283,21 @@ public class MergeSemanticAnalyzer extends RewriteSemanticAnalyzer {
rewrittenCtx.addDestNamePrefix(insClauseIdx, Context.DestClausePrefix.INSERT);
break;
case HiveParser.TOK_UPDATE:
- rewrittenCtx.addDestNamePrefix(insClauseIdx, Context.DestClausePrefix.UPDATE);
+ if(!splitUpdateEarly) {
+ rewrittenCtx.addDestNamePrefix(insClauseIdx, Context.DestClausePrefix.UPDATE);
+ } else {
+ /* With 2 branches for the update, the 1st branch is the INSERT part
+ and the next one is the DELETE. WriteSet tracking treats 2 concurrent DELETES
+ as in conflict so Lost Update is still prevented since the delete event lands in the
+ partition/bucket where the original version of the row was so any concurrent update/delete
+ of the same row will land in the same partition/bucket.
+
+ If the insert part lands in a different partition, it should not conflict with another
+ Update of that partition since that update by definition cannot be of the same row.
+ If we ever enforce unique constraints we may have to have I+I be in conflict*/
+ rewrittenCtx.addDestNamePrefix(insClauseIdx, Context.DestClausePrefix.INSERT);
+ rewrittenCtx.addDestNamePrefix(++insClauseIdx, Context.DestClausePrefix.DELETE);
+ }
break;
case HiveParser.TOK_DELETE:
rewrittenCtx.addDestNamePrefix(insClauseIdx, Context.DestClausePrefix.DELETE);
@@ -339,7 +355,7 @@ public class MergeSemanticAnalyzer extends RewriteSemanticAnalyzer {
//this is a tmp table and thus Session scoped and acid requires SQL statement to be serial in a
// given session, i.e. the name can be fixed across all invocations
String tableName = "merge_tmp_table";
- rewrittenQueryStr.append("\nINSERT INTO ").append(tableName)
+ rewrittenQueryStr.append("INSERT INTO ").append(tableName)
.append("\n SELECT cardinality_violation(")
.append(getSimpleTableName(target)).append(".ROW__ID");
addPartitionColsToSelect(targetTable.getPartCols(), rewrittenQueryStr, target);
@@ -381,22 +397,25 @@ public class MergeSemanticAnalyzer extends RewriteSemanticAnalyzer {
* @param deleteExtraPredicate - see notes at caller
*/
private String handleUpdate(ASTNode whenMatchedUpdateClause, StringBuilder rewrittenQueryStr, ASTNode target,
- String onClauseAsString, Table targetTable, String deleteExtraPredicate, String hintStr)
+ String onClauseAsString, Table targetTable, String deleteExtraPredicate, String hintStr, boolean splitUpdateEarly)
throws SemanticException {
assert whenMatchedUpdateClause.getType() == HiveParser.TOK_MATCHED;
assert getWhenClauseOperation(whenMatchedUpdateClause).getType() == HiveParser.TOK_UPDATE;
String targetName = getSimpleTableName(target);
rewrittenQueryStr.append("INSERT INTO ").append(getFullTableNameForSQL(target));
addPartitionColsToInsert(targetTable.getPartCols(), rewrittenQueryStr);
- rewrittenQueryStr.append(" -- update clause\n SELECT ");
+ rewrittenQueryStr.append(" -- update clause").append(splitUpdateEarly ? "(insert part)": "")
+ .append("\n SELECT ");
if (hintStr != null) {
rewrittenQueryStr.append(hintStr);
}
- rewrittenQueryStr.append(targetName).append(".ROW__ID");
+ if(!splitUpdateEarly) {
+ rewrittenQueryStr.append(targetName).append(".ROW__ID, ");
+ }
ASTNode setClause = (ASTNode)getWhenClauseOperation(whenMatchedUpdateClause).getChild(0);
//columns being updated -> update expressions; "setRCols" (last param) is null because we use actual expressions
- //before reparsing, i.e. they are known to SemanticAnalyzer logic
+ //before re-parsing, i.e. they are known to SemanticAnalyzer logic
Map<String, ASTNode> setColsExprs = collectSetColumnsAndExpressions(setClause, null, targetTable);
//if target table has cols c1,c2,c3 and p1 partition col and we had "SET c2 = 5, c1 = current_date()" we want to end
//up with
@@ -404,8 +423,11 @@ public class MergeSemanticAnalyzer extends RewriteSemanticAnalyzer {
//since we take the RHS of set exactly as it was in Input, we don't need to deal with quoting/escaping column/table
//names
List<FieldSchema> nonPartCols = targetTable.getCols();
- for (FieldSchema fs : nonPartCols) {
- rewrittenQueryStr.append(", ");
+ for(int i = 0; i < nonPartCols.size(); i++) {
+ FieldSchema fs = nonPartCols.get(i);
+ if(i > 0) {
+ rewrittenQueryStr.append(", ");
+ }
String name = fs.getName();
if (setColsExprs.containsKey(name)) {
String rhsExp = getMatchedText(setColsExprs.get(name));
@@ -435,14 +457,26 @@ public class MergeSemanticAnalyzer extends RewriteSemanticAnalyzer {
if (deleteExtraPredicate != null) {
rewrittenQueryStr.append(" AND NOT(").append(deleteExtraPredicate).append(")");
}
- rewrittenQueryStr.append("\n SORT BY ");
- rewrittenQueryStr.append(targetName).append(".ROW__ID \n");
+ if(!splitUpdateEarly) {
+ rewrittenQueryStr.append("\n SORT BY ");
+ rewrittenQueryStr.append(targetName).append(".ROW__ID ");
+ }
+ rewrittenQueryStr.append("\n");
setUpAccessControlInfoForUpdate(targetTable, setColsExprs);
//we don't deal with columns on RHS of SET expression since the whole expr is part of the
- //rewritten SQL statement and is thus handled by SemanticAnalzyer. Nor do we have to
+ //rewritten SQL statement and is thus handled by SemanticAnalyzer. Nor do we have to
//figure which cols on RHS are from source and which from target
+ if(splitUpdateEarly) {
+ /**
+ * this is part of the WHEN MATCHED UPDATE, so we ignore any 'extra predicate' generated
+ * by this call to handleDelete()
+ */
+ handleDelete(whenMatchedUpdateClause, rewrittenQueryStr, target, onClauseAsString,
+ targetTable, deleteExtraPredicate, hintStr, true);
+ }
+
return extraPredicate;
}
@@ -450,17 +484,23 @@ public class MergeSemanticAnalyzer extends RewriteSemanticAnalyzer {
* @param onClauseAsString - because there is no clone() and we need to use in multiple places
* @param updateExtraPredicate - see notes at caller
*/
- private String handleDelete(ASTNode whenMatchedDeleteClause, StringBuilder rewrittenQueryStr, ASTNode target,
- String onClauseAsString, Table targetTable, String updateExtraPredicate, String hintStr)
- throws SemanticException {
+ private String handleDelete(ASTNode whenMatchedDeleteClause, StringBuilder rewrittenQueryStr,
+ ASTNode target, String onClauseAsString, Table targetTable, String updateExtraPredicate,
+ String hintStr, boolean splitUpdateEarly) throws SemanticException {
assert whenMatchedDeleteClause.getType() == HiveParser.TOK_MATCHED;
- assert getWhenClauseOperation(whenMatchedDeleteClause).getType() == HiveParser.TOK_DELETE;
+ assert (splitUpdateEarly &&
+ getWhenClauseOperation(whenMatchedDeleteClause).getType() == HiveParser.TOK_UPDATE) ||
+ getWhenClauseOperation(whenMatchedDeleteClause).getType() == HiveParser.TOK_DELETE;
List<FieldSchema> partCols = targetTable.getPartCols();
String targetName = getSimpleTableName(target);
rewrittenQueryStr.append("INSERT INTO ").append(getFullTableNameForSQL(target));
addPartitionColsToInsert(partCols, rewrittenQueryStr);
- rewrittenQueryStr.append(" -- delete clause\n SELECT ");
+ if(splitUpdateEarly) {
+ rewrittenQueryStr.append(" -- update clause (delete part)\n SELECT ");
+ } else {
+ rewrittenQueryStr.append(" -- delete clause\n SELECT ");
+ }
if (hintStr != null) {
rewrittenQueryStr.append(hintStr);
}
@@ -590,8 +630,9 @@ public class MergeSemanticAnalyzer extends RewriteSemanticAnalyzer {
if (extraPredicate != null) {
//we have WHEN NOT MATCHED AND <boolean expr> THEN INSERT
rewrittenQueryStr.append(" AND ")
- .append(getMatchedText(((ASTNode)whenNotMatchedClause.getChild(1)))).append('\n');
+ .append(getMatchedText(((ASTNode)whenNotMatchedClause.getChild(1))));
}
+ rewrittenQueryStr.append('\n');
}
private String replaceDefaultKeywordForMerge(String valueClause, Table table, ASTNode columnListNode)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/RewriteSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/RewriteSemanticAnalyzer.java
index 6caac11..33247f0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/RewriteSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/RewriteSemanticAnalyzer.java
@@ -221,7 +221,7 @@ public abstract class RewriteSemanticAnalyzer extends SemanticAnalyzer {
* For updates, we need to set the column access info so that it contains information on
* the columns we are updating.
* (But not all the columns of the target table even though the rewritten query writes
- * all columns of target table since that is an implmentation detail).
+ * all columns of target table since that is an implementation detail).
*/
protected void setUpAccessControlInfoForUpdate(Table mTable, Map<String, ASTNode> setCols) {
ColumnAccessInfo cai = new ColumnAccessInfo();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 706fa55..0d0196e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -7971,8 +7971,16 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
if ((dpCtx == null || dpCtx.getNumDPCols() == 0)) {
output = new WriteEntity(dest_tab, determineWriteType(ltd, isNonNativeTable, dest));
if (!outputs.add(output)) {
- throw new SemanticException(ErrorMsg.OUTPUT_SPECIFIED_MULTIPLE_TIMES
- .getMsg(dest_tab.getTableName()));
+ if(!((this instanceof MergeSemanticAnalyzer) &&
+ conf.getBoolVar(ConfVars.MERGE_SPLIT_UPDATE))) {
+ /**
+ * Merge stmt with early split update may create several (2) writes to the same
+ * table with the same {@link WriteType}, e.g. if original Merge stmt has both update and
+ * delete clauses, and update is split into insert + delete, in which case it's not an
+ * error*/
+ throw new SemanticException(ErrorMsg.OUTPUT_SPECIFIED_MULTIPLE_TIMES
+ .getMsg(dest_tab.getTableName()));
+ }
}
}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
index c2b931a..33c25ed 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
@@ -970,13 +970,29 @@ public class TestTxnCommands extends TxnCommandsBaseForTests {
sb.append(s).append('\n');
}
LOG.info("Explain1: " + sb);
+ /*
+ Edges:
+ Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 8 (SIMPLE_EDGE)
+ Reducer 3 <- Reducer 2 (SIMPLE_EDGE)
+ Reducer 4 <- Reducer 2 (SIMPLE_EDGE)
+ Reducer 5 <- Reducer 2 (CUSTOM_SIMPLE_EDGE)
+ Reducer 6 <- Reducer 2 (SIMPLE_EDGE)
+ Reducer 7 <- Reducer 2 (CUSTOM_SIMPLE_EDGE)
+ */
for(int i = 0; i < explain.size(); i++) {
if(explain.get(i).contains("Edges:")) {
- Assert.assertTrue("At i+1=" + (i+1) + explain.get(i + 1), explain.get(i + 1).contains("Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 7 (SIMPLE_EDGE)"));
- Assert.assertTrue("At i+1=" + (i+2) + explain.get(i + 2), explain.get(i + 2).contains("Reducer 3 <- Reducer 2 (SIMPLE_EDGE)"));
- Assert.assertTrue("At i+1=" + (i+3) + explain.get(i + 3), explain.get(i + 3).contains("Reducer 4 <- Reducer 2 (SIMPLE_EDGE)"));
- Assert.assertTrue("At i+1=" + (i+4) + explain.get(i + 4), explain.get(i + 4).contains("Reducer 5 <- Reducer 2 (SIMPLE_EDGE)"));
- Assert.assertTrue("At i+1=" + (i+5) + explain.get(i + 5), explain.get(i + 5).contains("Reducer 6 <- Reducer 2 (CUSTOM_SIMPLE_EDGE)"));
+ Assert.assertTrue("At i+1=" + (i+1) + explain.get(i + 1),
+ explain.get(i + 1).contains("Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 8 (SIMPLE_EDGE)"));
+ Assert.assertTrue("At i+1=" + (i+2) + explain.get(i + 2),
+ explain.get(i + 2).contains("Reducer 3 <- Reducer 2 (SIMPLE_EDGE)"));
+ Assert.assertTrue("At i+1=" + (i+3) + explain.get(i + 3),
+ explain.get(i + 3).contains("Reducer 4 <- Reducer 2 (SIMPLE_EDGE)"));
+ Assert.assertTrue("At i+1=" + (i+4) + explain.get(i + 4),
+ explain.get(i + 4).contains("Reducer 5 <- Reducer 2 (CUSTOM_SIMPLE_EDGE)"));
+ Assert.assertTrue("At i+1=" + (i+5) + explain.get(i + 5),
+ explain.get(i + 5).contains("Reducer 6 <- Reducer 2 (SIMPLE_EDGE)"));
+ Assert.assertTrue("At i+1=" + (i+5) + explain.get(i + 5),
+ explain.get(i + 6).contains("Reducer 7 <- Reducer 2 (CUSTOM_SIMPLE_EDGE)"));
break;
}
}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index 6f44e9b..0734ed9 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -147,6 +147,7 @@ public class TestTxnCommands2 {
.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
"org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory");
hiveConf.setBoolVar(HiveConf.ConfVars.MERGE_CARDINALITY_VIOLATION_CHECK, true);
+ HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.MERGE_SPLIT_UPDATE, true);
hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSCOLAUTOGATHER, false);
//TestTxnCommands2WithSplitUpdateAndVectorization has the vectorized version
//of these tests.
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java b/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
index dc2963d..5f39fdc 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
@@ -100,6 +100,7 @@ public abstract class TxnCommandsBaseForTests {
.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
"org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory");
hiveConf.setBoolVar(HiveConf.ConfVars.MERGE_CARDINALITY_VIOLATION_CHECK, true);
+ HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.MERGE_SPLIT_UPDATE, true);
hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSCOLAUTOGATHER, false);
hiveConf.setBoolean("mapred.input.dir.recursive", true);
TxnDbUtil.setConfValues(hiveConf);
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
index 5e085f8..43a3047 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.hive.ql.QueryState;
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.junit.Before;
+import org.junit.ComparisonFailure;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
@@ -1529,7 +1530,7 @@ public class TestDbTxnManager2 {
3, TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPLETED_TXN_COMPONENTS where ctc_table='tab1' and ctc_partition is not null"));
}
/**
- * Concurrent delete/detele of same partition - should pass
+ * Concurrent delete/delete of same partition - should NOT pass
*/
@Test
public void testWriteSetTracking11() throws Exception {
@@ -1584,18 +1585,27 @@ public class TestDbTxnManager2 {
Collections.singletonList("p=two"));
adp.setOperationType(DataOperationType.DELETE);
txnHandler.addDynamicPartitions(adp);
- txnMgr.commitTxn();//"select * from tab1" txn
-
- Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString(conf, "select * from WRITE_SET"),
- 1, TxnDbUtil.countQueryAgent(conf, "select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='d' and ws_table='tab1' and ws_txnid=" + txnIdDelete));
- Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString(conf, "select * from WRITE_SET"),
- 1, TxnDbUtil.countQueryAgent(conf, "select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='d' and ws_table='tab1' and ws_txnid=" + txnIdSelect));
- Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString(conf, "select * from WRITE_SET"),
- 1, TxnDbUtil.countQueryAgent(conf, "select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='d' and ws_table='tab1' and ws_txnid=" + txnIdDelete));
- Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString(conf, "select * from WRITE_SET"),
- 1, TxnDbUtil.countQueryAgent(conf, "select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='d' and ws_table='tab1' and ws_txnid=" + txnIdSelect));
+ LockException expectedException = null;
+ try {
+ txnMgr.commitTxn();//"select * from tab1" txn
+ }
+ catch(LockException ex) {
+ expectedException = ex;
+ }
+ Assert.assertNotNull("Didn't get expected d/d conflict", expectedException);
+ Assert.assertEquals("Transaction manager has aborted the transaction txnid:5. " +
+ "Reason: Aborting [txnid:5,5] due to a write conflict on default/tab1/p=two " +
+ "committed by [txnid:4,5] d/d", expectedException.getMessage());
+ Assert.assertEquals("WRITE_SET mismatch: " +
+ TxnDbUtil.queryToString(conf, "select * from WRITE_SET"),
+ 1, TxnDbUtil.countQueryAgent(conf,
+ "select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='d' and ws_table='tab1' and ws_txnid=" + txnIdDelete));
+ Assert.assertEquals("WRITE_SET mismatch: " +
+ TxnDbUtil.queryToString(conf, "select * from WRITE_SET"),
+ 0, TxnDbUtil.countQueryAgent(conf,
+ "select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='d' and ws_table='tab1' and ws_txnid=" + txnIdSelect));
Assert.assertEquals("COMPLETED_TXN_COMPONENTS mismatch: " + TxnDbUtil.queryToString(conf, "select * from COMPLETED_TXN_COMPONENTS"),
- 4, TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPLETED_TXN_COMPONENTS where ctc_table='tab1' and ctc_partition is not null"));
+ 3, TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPLETED_TXN_COMPONENTS where ctc_table='tab1' and ctc_partition is not null"));
}
@Test
public void testCompletedTxnComponents() throws Exception {
@@ -1706,8 +1716,8 @@ public class TestDbTxnManager2 {
"(9,10,1,2), (3,4,1,2), (11,12,1,3), (5,13,1,3), (7,8,2,2), (14,15,1,1)"));
checkCmdOnDriver(driver.run("create table source2 (a int, b int, p int, q int)"));
checkCmdOnDriver(driver.run("insert into source2 values " +
- //cc ? -:U-(1/2) D-(1/2) cc ? U-(1/3):- D-(2/2) I-(1/1) - new part 2
- "(9,100,1,2), (3,4,1,2), (5,13,1,3), (7,8,2,2), (14,15,2,1)"));
+ //cc ? -:U-(1/2) D-(1/2) cc ? U-(1/3):- D-(2/2) I-(1/1) - new part 2
+ "(9,100,1,2), (3,4,1,2), (5,13,1,3), (7,8,2,2), (14,15,2,1)"));
checkCmdOnDriver(driver.compileAndRespond("merge into target t using source s on t.a=s.b " +
@@ -1729,7 +1739,7 @@ public class TestDbTxnManager2 {
swapTxnManager(txnMgr2);
checkCmdOnDriver(driver.compileAndRespond("merge into target t using source2 s on t.a=s.b " +
"when matched and t.a=" + (cc ? 5 : 9) + " then update set b=s.b " + //if conflict updates p=1/q=3 else update p=1/q=2
- "when matched and t.a in (3,7) then delete " + //deletes from p=1/q=2, p=2/q=2
+ "when matched and t.a in (" + (cc ? "3,7" : "11, 13") + ") then delete " + //if cc deletes from p=1/q=2, p=2/q=2, else delete nothing
"when not matched and t.a >= 8 then insert values(s.a, s.b, s.p, s.q)", true));//insert p=1/q=2, p=1/q=3 and new part 1/1
long txnId2 = txnMgr2.getCurrentTxnId();
txnMgr2.acquireLocks(driver.getPlan(), ctx, "T1", false);
@@ -1824,10 +1834,12 @@ public class TestDbTxnManager2 {
Collections.singletonList(cc ? "p=1/q=3" : "p=1/p=2"));//update clause
adp.setOperationType(DataOperationType.UPDATE);
txnHandler.addDynamicPartitions(adp);
- adp = new AddDynamicPartitions(txnId2, writeId, "default", "target",
- Arrays.asList("p=1/q=2","p=2/q=2"));//delete clause
- adp.setOperationType(DataOperationType.DELETE);
- txnHandler.addDynamicPartitions(adp);
+ if(cc) {
+ adp = new AddDynamicPartitions(txnId2, writeId, "default", "target",
+ Arrays.asList("p=1/q=2", "p=2/q=2"));//delete clause
+ adp.setOperationType(DataOperationType.DELETE);
+ txnHandler.addDynamicPartitions(adp);
+ }
adp = new AddDynamicPartitions(txnId2, writeId, "default", "target",
Arrays.asList("p=1/q=2","p=1/q=3","p=1/q=1"));//insert clause
adp.setOperationType(DataOperationType.INSERT);
@@ -1841,7 +1853,7 @@ public class TestDbTxnManager2 {
Assert.assertEquals(
"TXN_COMPONENTS mismatch(" + JavaUtils.txnIdToString(txnId2) + "): " +
TxnDbUtil.queryToString(conf, "select * from TXN_COMPONENTS"),
- 2,
+ (cc ? 2 : 0),
TxnDbUtil.countQueryAgent(conf, "select count(*) from TXN_COMPONENTS where tc_txnid=" + txnId2 +
" and tc_operation_type='d'"));
Assert.assertEquals(
@@ -1860,9 +1872,18 @@ public class TestDbTxnManager2 {
}
if(cc) {
Assert.assertNotNull("didn't get exception", expectedException);
- Assert.assertEquals("Transaction manager has aborted the transaction txnid:11. Reason: " +
- "Aborting [txnid:11,11] due to a write conflict on default/target/p=1/q=3 " +
- "committed by [txnid:10,11] u/u", expectedException.getMessage());
+ try {
+ Assert.assertEquals("Transaction manager has aborted the transaction txnid:11. Reason: " +
+ "Aborting [txnid:11,11] due to a write conflict on default/target/p=1/q=3 " +
+ "committed by [txnid:10,11] u/u", expectedException.getMessage());
+ }
+ catch(ComparisonFailure ex) {
+ //the 2 txns have 2 conflicts between them so check for either failure since which one is
+ //reported (among the 2) is not deterministic
+ Assert.assertEquals("Transaction manager has aborted the transaction txnid:11. Reason: " +
+ "Aborting [txnid:11,11] due to a write conflict on default/target/p=1/q=2 " +
+ "committed by [txnid:10,11] d/d", expectedException.getMessage());
+ }
Assert.assertEquals(
"COMPLETED_TXN_COMPONENTS mismatch(" + JavaUtils.txnIdToString(txnId2) + "): " +
TxnDbUtil.queryToString(conf, "select * from COMPLETED_TXN_COMPONENTS"),
@@ -1879,7 +1900,7 @@ public class TestDbTxnManager2 {
Assert.assertEquals(
"COMPLETED_TXN_COMPONENTS mismatch(" + JavaUtils.txnIdToString(txnId2) + "): " +
TxnDbUtil.queryToString(conf, "select * from COMPLETED_TXN_COMPONENTS"),
- 6,
+ 4,
TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPLETED_TXN_COMPONENTS where ctc_txnid=" + txnId2));
Assert.assertEquals(
"WRITE_SET mismatch(" + JavaUtils.txnIdToString(txnId2) + "): " +
@@ -1890,7 +1911,7 @@ public class TestDbTxnManager2 {
Assert.assertEquals(
"WRITE_SET mismatch(" + JavaUtils.txnIdToString(txnId2) + "): " +
TxnDbUtil.queryToString(conf, "select * from WRITE_SET"),
- 2,
+ 0,
TxnDbUtil.countQueryAgent(conf, "select count(*) from WRITE_SET where ws_txnid=" + txnId2 +
" and ws_operation_type='d'"));
}
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 7520922..6df7680 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -1174,9 +1174,17 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
" and cur.ws_txnid=" + txnid + //make sure RHS of join only has rows we just inserted as
// part of this commitTxn() op
" and committed.ws_txnid <> " + txnid + //and LHS only has committed txns
- //U+U and U+D is a conflict but D+D is not and we don't currently track I in WRITE_SET at all
- " and (committed.ws_operation_type=" + quoteChar(OperationType.UPDATE.sqlConst) +
- " OR cur.ws_operation_type=" + quoteChar(OperationType.UPDATE.sqlConst) + ")"));
+ //U+U, U+D and D+D are all conflicts; we don't currently track I in WRITE_SET at all.
+ //It may seem like D+D should not be in conflict but consider 2 multi-stmt txns
+ //where each does "delete X + insert X", where X is a row with the same PK. This is
+ //equivalent to an update of X but won't be in conflict unless D+D is in conflict.
+ //The same happens when Hive splits U=I+D early so it looks like 2 branches of a
+ //multi-insert stmt (an Insert and a Delete branch). It also 'feels'
+ // un-serializable to allow concurrent deletes
+ " and (committed.ws_operation_type IN(" + quoteChar(OperationType.UPDATE.sqlConst) +
+ ", " + quoteChar(OperationType.DELETE.sqlConst) +
+ ") AND cur.ws_operation_type IN(" + quoteChar(OperationType.UPDATE.sqlConst) + ", "
+ + quoteChar(OperationType.DELETE.sqlConst) + "))"));
if (rs.next()) {
//found a conflict
String committedTxn = "[" + JavaUtils.txnIdToString(rs.getLong(1)) + "," + rs.getLong(2) + "]";