Posted to commits@hive.apache.org by ek...@apache.org on 2017/05/01 16:43:36 UTC
[3/3] hive git commit: HIVE-12636 Ensure that all queries (with DbTxnManager) run in a transaction (Eugene Koifman, reviewed by Wei Zheng)
HIVE-12636 Ensure that all queries (with DbTxnManager) run in a transaction (Eugene Koifman, reviewed by Wei Zheng)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/21909601
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/21909601
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/21909601
Branch: refs/heads/master
Commit: 21909601f8f5f9d8325774178aaaa8fb3c26a764
Parents: 41c3832
Author: Eugene Koifman <ek...@hortonworks.com>
Authored: Mon May 1 09:43:27 2017 -0700
Committer: Eugene Koifman <ek...@hortonworks.com>
Committed: Mon May 1 09:43:27 2017 -0700
----------------------------------------------------------------------
.../hive/hcatalog/streaming/TestStreaming.java | 40 +--
.../hive/ql/txn/compactor/TestCompactor.java | 80 ++---
.../hadoop/hive/metastore/txn/TxnHandler.java | 13 +-
.../hadoop/hive/metastore/txn/TxnUtils.java | 4 +
.../java/org/apache/hadoop/hive/ql/Context.java | 7 +
.../java/org/apache/hadoop/hive/ql/Driver.java | 204 ++++++-----
.../org/apache/hadoop/hive/ql/ErrorMsg.java | 14 +-
.../org/apache/hadoop/hive/ql/QueryPlan.java | 28 +-
.../hadoop/hive/ql/lockmgr/DbTxnManager.java | 282 +++++++++++++---
.../hadoop/hive/ql/lockmgr/HiveTxnManager.java | 16 +-
.../hive/ql/lockmgr/HiveTxnManagerImpl.java | 20 +-
.../hive/ql/parse/ExplainSemanticAnalyzer.java | 6 +-
.../hive/ql/parse/SemanticAnalyzerFactory.java | 23 +-
.../hadoop/hive/ql/plan/HiveOperation.java | 37 +-
.../hadoop/hive/ql/session/SessionState.java | 15 +
.../hive/metastore/txn/TestTxnHandler.java | 2 +-
.../org/apache/hadoop/hive/ql/TestErrorMsg.java | 6 +-
.../apache/hadoop/hive/ql/TestTxnCommands.java | 48 +--
.../apache/hadoop/hive/ql/TestTxnCommands2.java | 21 +-
.../ql/TestTxnCommands2WithSplitUpdate.java | 20 +-
.../hive/ql/lockmgr/TestDbTxnManager.java | 65 ++--
.../hive/ql/lockmgr/TestDbTxnManager2.java | 336 +++++++++++--------
.../hive/ql/txn/compactor/TestInitiator.java | 11 +-
ql/src/test/queries/clientpositive/row__id.q | 4 +-
.../clientpositive/acid_table_stats.q.out | 14 +-
.../clientpositive/autoColumnStats_4.q.out | 4 +-
.../insert_values_orig_table_use_metadata.q.out | 18 +-
.../llap/acid_bucket_pruning.q.out | 6 +-
.../test/results/clientpositive/row__id.q.out | 34 +-
29 files changed, 840 insertions(+), 538 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/21909601/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
index 8ea58e6..097de9b 100644
--- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
@@ -741,7 +741,7 @@ public class TestStreaming {
txnBatch.write("1,Hello streaming".getBytes());
txnBatch.commit();
- checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
+ checkDataWritten(partLoc, 15, 24, 1, 1, "{1, Hello streaming}");
Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
, txnBatch.getCurrentTransactionState());
@@ -753,11 +753,11 @@ public class TestStreaming {
txnBatch.write("2,Welcome to streaming".getBytes());
// data should not be visible
- checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
+ checkDataWritten(partLoc, 15, 24, 1, 1, "{1, Hello streaming}");
txnBatch.commit();
- checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}",
+ checkDataWritten(partLoc, 15, 24, 1, 1, "{1, Hello streaming}",
"{2, Welcome to streaming}");
txnBatch.close();
@@ -809,7 +809,7 @@ public class TestStreaming {
txnBatch.write("1,Hello streaming".getBytes());
txnBatch.commit();
- checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
+ checkDataWritten(partLoc, 15, 24, 1, 1, "{1, Hello streaming}");
Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
, txnBatch.getCurrentTransactionState());
@@ -821,11 +821,11 @@ public class TestStreaming {
txnBatch.write("2,Welcome to streaming".getBytes());
// data should not be visible
- checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
+ checkDataWritten(partLoc, 15, 24, 1, 1, "{1, Hello streaming}");
txnBatch.commit();
- checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}",
+ checkDataWritten(partLoc, 15, 24, 1, 1, "{1, Hello streaming}",
"{2, Welcome to streaming}");
txnBatch.close();
@@ -871,7 +871,7 @@ public class TestStreaming {
txnBatch.write(rec1.getBytes());
txnBatch.commit();
- checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
+ checkDataWritten(partLoc, 15, 24, 1, 1, "{1, Hello streaming}");
Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
, txnBatch.getCurrentTransactionState());
@@ -998,7 +998,7 @@ public class TestStreaming {
txnBatch.write("2,Welcome to streaming".getBytes());
txnBatch.commit();
- checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}",
+ checkDataWritten(partLoc, 14, 23, 1, 1, "{1, Hello streaming}",
"{2, Welcome to streaming}");
txnBatch.close();
@@ -1017,13 +1017,13 @@ public class TestStreaming {
txnBatch.write("1,Hello streaming".getBytes());
txnBatch.commit();
- checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
+ checkDataWritten(partLoc, 15, 24, 1, 1, "{1, Hello streaming}");
txnBatch.beginNextTransaction();
txnBatch.write("2,Welcome to streaming".getBytes());
txnBatch.commit();
- checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}",
+ checkDataWritten(partLoc, 15, 24, 1, 1, "{1, Hello streaming}",
"{2, Welcome to streaming}");
txnBatch.close();
@@ -1034,14 +1034,14 @@ public class TestStreaming {
txnBatch.write("3,Hello streaming - once again".getBytes());
txnBatch.commit();
- checkDataWritten(partLoc, 1, 20, 1, 2, "{1, Hello streaming}",
+ checkDataWritten(partLoc, 15, 34, 1, 2, "{1, Hello streaming}",
"{2, Welcome to streaming}", "{3, Hello streaming - once again}");
txnBatch.beginNextTransaction();
txnBatch.write("4,Welcome to streaming - once again".getBytes());
txnBatch.commit();
- checkDataWritten(partLoc, 1, 20, 1, 2, "{1, Hello streaming}",
+ checkDataWritten(partLoc, 15, 34, 1, 2, "{1, Hello streaming}",
"{2, Welcome to streaming}", "{3, Hello streaming - once again}",
"{4, Welcome to streaming - once again}");
@@ -1078,11 +1078,11 @@ public class TestStreaming {
txnBatch2.commit();
- checkDataWritten(partLoc, 11, 20, 1, 1, "{3, Hello streaming - once again}");
+ checkDataWritten(partLoc, 24, 33, 1, 1, "{3, Hello streaming - once again}");
txnBatch1.commit();
- checkDataWritten(partLoc, 1, 20, 1, 2, "{1, Hello streaming}", "{3, Hello streaming - once again}");
+ checkDataWritten(partLoc, 14, 33, 1, 2, "{1, Hello streaming}", "{3, Hello streaming - once again}");
txnBatch1.beginNextTransaction();
txnBatch1.write("2,Welcome to streaming".getBytes());
@@ -1090,17 +1090,17 @@ public class TestStreaming {
txnBatch2.beginNextTransaction();
txnBatch2.write("4,Welcome to streaming - once again".getBytes());
- checkDataWritten(partLoc, 1, 20, 1, 2, "{1, Hello streaming}", "{3, Hello streaming - once again}");
+ checkDataWritten(partLoc, 14, 33, 1, 2, "{1, Hello streaming}", "{3, Hello streaming - once again}");
txnBatch1.commit();
- checkDataWritten(partLoc, 1, 20, 1, 2, "{1, Hello streaming}",
+ checkDataWritten(partLoc, 14, 33, 1, 2, "{1, Hello streaming}",
"{2, Welcome to streaming}",
"{3, Hello streaming - once again}");
txnBatch2.commit();
- checkDataWritten(partLoc, 1, 20, 1, 2, "{1, Hello streaming}",
+ checkDataWritten(partLoc, 14, 33, 1, 2, "{1, Hello streaming}",
"{2, Welcome to streaming}",
"{3, Hello streaming - once again}",
"{4, Welcome to streaming - once again}");
@@ -1769,7 +1769,7 @@ public class TestStreaming {
txnBatch.heartbeat();//this is no-op on closed batch
txnBatch.abort();//ditto
GetOpenTxnsInfoResponse r = msClient.showTxns();
- Assert.assertEquals("HWM didn't match", 2, r.getTxn_high_water_mark());
+ Assert.assertEquals("HWM didn't match", 17, r.getTxn_high_water_mark());
List<TxnInfo> ti = r.getOpen_txns();
Assert.assertEquals("wrong status ti(0)", TxnState.ABORTED, ti.get(0).getState());
Assert.assertEquals("wrong status ti(1)", TxnState.ABORTED, ti.get(1).getState());
@@ -1833,7 +1833,7 @@ public class TestStreaming {
expectedEx != null && expectedEx.getMessage().contains("has been closed()"));
r = msClient.showTxns();
- Assert.assertEquals("HWM didn't match", 4, r.getTxn_high_water_mark());
+ Assert.assertEquals("HWM didn't match", 19, r.getTxn_high_water_mark());
ti = r.getOpen_txns();
Assert.assertEquals("wrong status ti(0)", TxnState.ABORTED, ti.get(0).getState());
Assert.assertEquals("wrong status ti(1)", TxnState.ABORTED, ti.get(1).getState());
@@ -1856,7 +1856,7 @@ public class TestStreaming {
expectedEx != null && expectedEx.getMessage().contains("Simulated fault occurred"));
r = msClient.showTxns();
- Assert.assertEquals("HWM didn't match", 6, r.getTxn_high_water_mark());
+ Assert.assertEquals("HWM didn't match", 21, r.getTxn_high_water_mark());
ti = r.getOpen_txns();
Assert.assertEquals("wrong status ti(3)", TxnState.ABORTED, ti.get(3).getState());
Assert.assertEquals("wrong status ti(4)", TxnState.ABORTED, ti.get(4).getState());
http://git-wip-us.apache.org/repos/asf/hive/blob/21909601/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
index 66ed8ca..f92db7c 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
@@ -659,17 +659,17 @@ public class TestCompactor {
Path resultFile = null;
for (int i = 0; i < names.length; i++) {
names[i] = stat[i].getPath().getName();
- if (names[i].equals("delta_0000001_0000004")) {
+ if (names[i].equals("delta_0000003_0000006")) {
resultFile = stat[i].getPath();
}
}
Arrays.sort(names);
- String[] expected = new String[]{"delta_0000001_0000002",
- "delta_0000001_0000004", "delta_0000003_0000004", "delta_0000005_0000006"};
+ String[] expected = new String[]{"delta_0000003_0000004",
+ "delta_0000003_0000006", "delta_0000005_0000006", "delta_0000007_0000008"};
if (!Arrays.deepEquals(expected, names)) {
Assert.fail("Expected: " + Arrays.toString(expected) + ", found: " + Arrays.toString(names));
}
- checkExpectedTxnsPresent(null, new Path[]{resultFile},columnNamesProperty, columnTypesProperty, 0, 1L, 4L);
+ checkExpectedTxnsPresent(null, new Path[]{resultFile},columnNamesProperty, columnTypesProperty, 0, 3L, 6L);
} finally {
connection.close();
@@ -718,11 +718,11 @@ public class TestCompactor {
FileStatus[] stat =
fs.listStatus(new Path(table.getSd().getLocation()), AcidUtils.baseFileFilter);
if (1 != stat.length) {
- Assert.fail("Expecting 1 file \"base_0000004\" and found " + stat.length + " files " + Arrays.toString(stat));
+ Assert.fail("Expecting 1 file \"base_0000006\" and found " + stat.length + " files " + Arrays.toString(stat));
}
String name = stat[0].getPath().getName();
- Assert.assertEquals(name, "base_0000004");
- checkExpectedTxnsPresent(stat[0].getPath(), null, columnNamesProperty, columnTypesProperty, 0, 1L, 4L);
+ Assert.assertEquals(name, "base_0000006");
+ checkExpectedTxnsPresent(stat[0].getPath(), null, columnNamesProperty, columnTypesProperty, 0, 3L, 6L);
} finally {
connection.close();
}
@@ -778,17 +778,17 @@ public class TestCompactor {
Path resultDelta = null;
for (int i = 0; i < names.length; i++) {
names[i] = stat[i].getPath().getName();
- if (names[i].equals("delta_0000001_0000004")) {
+ if (names[i].equals("delta_0000003_0000006")) {
resultDelta = stat[i].getPath();
}
}
Arrays.sort(names);
- String[] expected = new String[]{"delta_0000001_0000002",
- "delta_0000001_0000004", "delta_0000003_0000004"};
+ String[] expected = new String[]{"delta_0000003_0000004",
+ "delta_0000003_0000006", "delta_0000005_0000006"};
if (!Arrays.deepEquals(expected, names)) {
Assert.fail("Expected: " + Arrays.toString(expected) + ", found: " + Arrays.toString(names));
}
- checkExpectedTxnsPresent(null, new Path[]{resultDelta}, columnNamesProperty, columnTypesProperty, 0, 1L, 4L);
+ checkExpectedTxnsPresent(null, new Path[]{resultDelta}, columnNamesProperty, columnTypesProperty, 0, 3L, 6L);
} finally {
connection.close();
}
@@ -844,13 +844,13 @@ public class TestCompactor {
Assert.fail("majorCompactAfterAbort FileStatus[] stat " + Arrays.toString(stat));
}
if (1 != stat.length) {
- Assert.fail("Expecting 1 file \"base_0000004\" and found " + stat.length + " files " + Arrays.toString(stat));
+ Assert.fail("Expecting 1 file \"base_0000006\" and found " + stat.length + " files " + Arrays.toString(stat));
}
String name = stat[0].getPath().getName();
- if (!name.equals("base_0000004")) {
- Assert.fail("majorCompactAfterAbort name " + name + " not equals to base_0000004");
+ if (!name.equals("base_0000006")) {
+ Assert.fail("majorCompactAfterAbort name " + name + " not equals to base_0000006");
}
- checkExpectedTxnsPresent(stat[0].getPath(), null, columnNamesProperty, columnTypesProperty, 0, 1L, 4L);
+ checkExpectedTxnsPresent(stat[0].getPath(), null, columnNamesProperty, columnTypesProperty, 0, 3L, 6L);
} finally {
connection.close();
}
@@ -899,11 +899,11 @@ public class TestCompactor {
FileStatus[] stat =
fs.listStatus(new Path(table.getSd().getLocation()), AcidUtils.baseFileFilter);
if (1 != stat.length) {
- Assert.fail("Expecting 1 file \"base_0000004\" and found " + stat.length + " files " + Arrays.toString(stat));
+ Assert.fail("Expecting 1 file \"base_0000006\" and found " + stat.length + " files " + Arrays.toString(stat));
}
String name = stat[0].getPath().getName();
- Assert.assertEquals(name, "base_0000004");
- checkExpectedTxnsPresent(stat[0].getPath(), null, columnNamesProperty, columnTypesProperty, 0, 1L, 4L);
+ Assert.assertEquals(name, "base_0000006");
+ checkExpectedTxnsPresent(stat[0].getPath(), null, columnNamesProperty, columnTypesProperty, 0, 3L, 6L);
} finally {
connection.close();
}
@@ -923,18 +923,18 @@ public class TestCompactor {
" STORED AS ORC TBLPROPERTIES ('transactional'='true',"
+ "'transactional_properties'='default')", driver);
- // Insert some data -> this will generate only insert deltas and no delete deltas: delta_1_1
+ // Insert some data -> this will generate only insert deltas and no delete deltas: delta_3_3
executeStatementOnDriver("INSERT INTO " + tblName +"(a,b) VALUES(1, 'foo')", driver);
- // Insert some data -> this will again generate only insert deltas and no delete deltas: delta_2_2
+ // Insert some data -> this will again generate only insert deltas and no delete deltas: delta_4_4
executeStatementOnDriver("INSERT INTO " + tblName +"(a,b) VALUES(2, 'bar')", driver);
- // Delete some data -> this will generate only delete deltas and no insert deltas: delete_delta_3_3
+ // Delete some data -> this will generate only delete deltas and no insert deltas: delete_delta_5_5
executeStatementOnDriver("DELETE FROM " + tblName +" WHERE a = 2", driver);
// Now, compact -> Compaction produces a single range for both delta and delete delta
- // That is, both delta and delete_deltas would be compacted into delta_1_3 and delete_delta_1_3
- // even though there are only two delta_1_1, delta_2_2 and one delete_delta_3_3.
+ // That is, both delta and delete_deltas would be compacted into delta_3_5 and delete_delta_3_5
+ // even though there are only two delta_3_3, delta_4_4 and one delete_delta_5_5.
TxnStore txnHandler = TxnUtils.getTxnStore(conf);
txnHandler.compact(new CompactionRequest(dbName, tblName, CompactionType.MINOR));
Worker t = new Worker();
@@ -957,16 +957,16 @@ public class TestCompactor {
Path minorCompactedDelta = null;
for (int i = 0; i < deltas.length; i++) {
deltas[i] = stat[i].getPath().getName();
- if (deltas[i].equals("delta_0000001_0000003")) {
+ if (deltas[i].equals("delta_0000003_0000005")) {
minorCompactedDelta = stat[i].getPath();
}
}
Arrays.sort(deltas);
- String[] expectedDeltas = new String[]{"delta_0000001_0000001_0000", "delta_0000001_0000003", "delta_0000002_0000002_0000"};
+ String[] expectedDeltas = new String[]{"delta_0000003_0000003_0000", "delta_0000003_0000005", "delta_0000004_0000004_0000"};
if (!Arrays.deepEquals(expectedDeltas, deltas)) {
Assert.fail("Expected: " + Arrays.toString(expectedDeltas) + ", found: " + Arrays.toString(deltas));
}
- checkExpectedTxnsPresent(null, new Path[]{minorCompactedDelta}, columnNamesProperty, columnTypesProperty, 0, 1L, 2L);
+ checkExpectedTxnsPresent(null, new Path[]{minorCompactedDelta}, columnNamesProperty, columnTypesProperty, 0, 3L, 4L);
// Verify that we have got correct set of delete_deltas.
FileStatus[] deleteDeltaStat =
@@ -975,16 +975,16 @@ public class TestCompactor {
Path minorCompactedDeleteDelta = null;
for (int i = 0; i < deleteDeltas.length; i++) {
deleteDeltas[i] = deleteDeltaStat[i].getPath().getName();
- if (deleteDeltas[i].equals("delete_delta_0000001_0000003")) {
+ if (deleteDeltas[i].equals("delete_delta_0000003_0000005")) {
minorCompactedDeleteDelta = deleteDeltaStat[i].getPath();
}
}
Arrays.sort(deleteDeltas);
- String[] expectedDeleteDeltas = new String[]{"delete_delta_0000001_0000003", "delete_delta_0000003_0000003_0000"};
+ String[] expectedDeleteDeltas = new String[]{"delete_delta_0000003_0000005", "delete_delta_0000005_0000005_0000"};
if (!Arrays.deepEquals(expectedDeleteDeltas, deleteDeltas)) {
Assert.fail("Expected: " + Arrays.toString(expectedDeleteDeltas) + ", found: " + Arrays.toString(deleteDeltas));
}
- checkExpectedTxnsPresent(null, new Path[]{minorCompactedDeleteDelta}, columnNamesProperty, columnTypesProperty, 0, 2L, 2L);
+ checkExpectedTxnsPresent(null, new Path[]{minorCompactedDeleteDelta}, columnNamesProperty, columnTypesProperty, 0, 4L, 4L);
}
@Test
@@ -1034,16 +1034,16 @@ public class TestCompactor {
Path minorCompactedDelta = null;
for (int i = 0; i < deltas.length; i++) {
deltas[i] = stat[i].getPath().getName();
- if (deltas[i].equals("delta_0000001_0000002")) {
+ if (deltas[i].equals("delta_0000003_0000004")) {
minorCompactedDelta = stat[i].getPath();
}
}
Arrays.sort(deltas);
- String[] expectedDeltas = new String[]{"delta_0000001_0000001_0000", "delta_0000001_0000002", "delta_0000002_0000002_0000"};
+ String[] expectedDeltas = new String[]{"delta_0000003_0000003_0000", "delta_0000003_0000004", "delta_0000004_0000004_0000"};
if (!Arrays.deepEquals(expectedDeltas, deltas)) {
Assert.fail("Expected: " + Arrays.toString(expectedDeltas) + ", found: " + Arrays.toString(deltas));
}
- checkExpectedTxnsPresent(null, new Path[]{minorCompactedDelta}, columnNamesProperty, columnTypesProperty, 0, 1L, 2L);
+ checkExpectedTxnsPresent(null, new Path[]{minorCompactedDelta}, columnNamesProperty, columnTypesProperty, 0, 3L, 4L);
// Verify that we have got correct set of delete_deltas.
FileStatus[] deleteDeltaStat =
@@ -1052,12 +1052,12 @@ public class TestCompactor {
Path minorCompactedDeleteDelta = null;
for (int i = 0; i < deleteDeltas.length; i++) {
deleteDeltas[i] = deleteDeltaStat[i].getPath().getName();
- if (deleteDeltas[i].equals("delete_delta_0000001_0000002")) {
+ if (deleteDeltas[i].equals("delete_delta_0000003_0000004")) {
minorCompactedDeleteDelta = deleteDeltaStat[i].getPath();
}
}
Arrays.sort(deleteDeltas);
- String[] expectedDeleteDeltas = new String[]{"delete_delta_0000001_0000002"};
+ String[] expectedDeleteDeltas = new String[]{"delete_delta_0000003_0000004"};
if (!Arrays.deepEquals(expectedDeleteDeltas, deleteDeltas)) {
Assert.fail("Expected: " + Arrays.toString(expectedDeleteDeltas) + ", found: " + Arrays.toString(deleteDeltas));
}
@@ -1111,17 +1111,17 @@ public class TestCompactor {
Path resultFile = null;
for (int i = 0; i < names.length; i++) {
names[i] = stat[i].getPath().getName();
- if (names[i].equals("delta_0000001_0000004")) {
+ if (names[i].equals("delta_0000003_0000006")) {
resultFile = stat[i].getPath();
}
}
Arrays.sort(names);
- String[] expected = new String[]{"delta_0000001_0000002",
- "delta_0000001_0000004", "delta_0000003_0000004", "delta_0000005_0000006"};
+ String[] expected = new String[]{"delta_0000003_0000004",
+ "delta_0000003_0000006", "delta_0000005_0000006", "delta_0000007_0000008"};
if (!Arrays.deepEquals(expected, names)) {
Assert.fail("Expected: " + Arrays.toString(expected) + ", found: " + Arrays.toString(names));
}
- checkExpectedTxnsPresent(null, new Path[]{resultFile},columnNamesProperty, columnTypesProperty, 0, 1L, 4L);
+ checkExpectedTxnsPresent(null, new Path[]{resultFile},columnNamesProperty, columnTypesProperty, 0, 3L, 6L);
// Verify that we have got correct set of delete_deltas also
FileStatus[] deleteDeltaStat =
@@ -1130,12 +1130,12 @@ public class TestCompactor {
Path minorCompactedDeleteDelta = null;
for (int i = 0; i < deleteDeltas.length; i++) {
deleteDeltas[i] = deleteDeltaStat[i].getPath().getName();
- if (deleteDeltas[i].equals("delete_delta_0000001_0000004")) {
+ if (deleteDeltas[i].equals("delete_delta_0000003_0000006")) {
minorCompactedDeleteDelta = deleteDeltaStat[i].getPath();
}
}
Arrays.sort(deleteDeltas);
- String[] expectedDeleteDeltas = new String[]{"delete_delta_0000001_0000004"};
+ String[] expectedDeleteDeltas = new String[]{"delete_delta_0000003_0000006"};
if (!Arrays.deepEquals(expectedDeleteDeltas, deleteDeltas)) {
Assert.fail("Expected: " + Arrays.toString(expectedDeleteDeltas) + ", found: " + Arrays.toString(deleteDeltas));
}
http://git-wip-us.apache.org/repos/asf/hive/blob/21909601/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index e138838..12d98c5 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -855,7 +855,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
/**
* As much as possible (i.e. in absence of retries) we want both operations to be done on the same
* connection (but separate transactions). This avoids some flakiness in BONECP where if you
- * perform an operation on 1 connection and immediately get another fron the pool, the 2nd one
+ * perform an operation on 1 connection and immediately get another from the pool, the 2nd one
* doesn't see results of the first.
*
* Retry-by-caller note: If the call to lock is from a transaction, then in the worst case
@@ -994,6 +994,13 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
case SELECT:
updateTxnComponents = false;
break;
+ case NO_TXN:
+ /*this constant is a bit of a misnomer since we now always have a txn context. It
+ just means the operation is such that we don't care what tables/partitions it
+ affected, as it triggers neither compaction nor conflict detection. A better name
+ would be NON_TRANSACTIONAL.*/
+ updateTxnComponents = false;
+ break;
default:
//since we have an open transaction, only 4 values above are expected
throw new IllegalStateException("Unexpected DataOperationType: " + lc.getOperationType()
@@ -2471,14 +2478,14 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
response.setLockid(extLockId);
LOG.debug("checkLock(): Setting savepoint. extLockId=" + JavaUtils.lockIdToString(extLockId));
- Savepoint save = dbConn.setSavepoint();//todo: get rid of this
+ Savepoint save = dbConn.setSavepoint();
StringBuilder query = new StringBuilder("select hl_lock_ext_id, " +
"hl_lock_int_id, hl_db, hl_table, hl_partition, hl_lock_state, " +
"hl_lock_type, hl_txnid from HIVE_LOCKS where hl_db in (");
Set<String> strings = new HashSet<String>(locksBeingChecked.size());
- //This the set of entities that the statement represnted by extLockId wants to update
+ //This is the set of entities that the statement represented
List<LockInfo> writeSet = new ArrayList<>();
for (LockInfo info : locksBeingChecked) {
http://git-wip-us.apache.org/repos/asf/hive/blob/21909601/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
index 517eec3..2df88fd 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
@@ -50,6 +50,10 @@ public class TxnUtils {
* @return a valid txn list.
*/
public static ValidTxnList createValidReadTxnList(GetOpenTxnsResponse txns, long currentTxn) {
+ /*todo: should highWater be min(currentTxn,txns.getTxn_high_water_mark()) assuming currentTxn>0
+ * otherwise if currentTxn=7 and 8 commits before 7, then 7 will see the result of 8, which
+ * doesn't make sense for Snapshot Isolation. Of course for Read Committed, the list should
+ * include the latest committed set.*/
long highWater = txns.getTxn_high_water_mark();
Set<Long> open = txns.getOpen_txns();
long[] exceptions = new long[open.size() - (currentTxn > 0 ? 1 : 0)];
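The todo above proposes capping the high water mark at the reader's own txn id. A minimal sketch of that capping, reusing the method's txns/currentTxn parameters; this is the comment's proposal, not what the committed code does:

    // Cap highWater at currentTxn so a txn never treats ids above itself as
    // committed: with currentTxn=7 and committed txn 8, highWater stays 7.
    long highWater = currentTxn > 0
        ? Math.min(currentTxn, txns.getTxn_high_water_mark())
        : txns.getTxn_high_water_mark();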
http://git-wip-us.apache.org/repos/asf/hive/blob/21909601/ql/src/java/org/apache/hadoop/hive/ql/Context.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Context.java b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
index 08bba3d..fdcf052 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Context.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
@@ -948,6 +948,13 @@ public class Context {
public ExplainConfiguration getExplainConfig() {
return explainConfig;
}
+ private boolean isExplainPlan = false;
+ public boolean isExplainPlan() {
+ return isExplainPlan;
+ }
+ public void setExplainPlan(boolean t) {
+ this.isExplainPlan = t;
+ }
public void setExplainConfig(ExplainConfiguration explainConfig) {
this.explainConfig = explainConfig;
http://git-wip-us.apache.org/repos/asf/hive/blob/21909601/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 16b8101..d32f313 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -37,9 +37,12 @@ import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.hive.common.JavaUtils;
+import org.apache.hadoop.hive.common.ValidReadTxnList;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.common.metrics.common.Metrics;
import org.apache.hadoop.hive.common.metrics.common.MetricsConstant;
@@ -154,11 +157,6 @@ public class Driver implements CommandProcessor {
private FetchTask fetchTask;
List<HiveLock> hiveLocks = new ArrayList<HiveLock>();
- // A list of FileSinkOperators writing in an ACID compliant manner
- private Set<FileSinkDesc> acidSinks;
- // whether any ACID table is involved in a query
- private boolean acidInQuery;
-
// A limit on the number of threads that can be launched
private int maxthreads;
private int tryCount = Integer.MAX_VALUE;
@@ -408,7 +406,7 @@ public class Driver implements CommandProcessor {
// deferClose indicates if the close/destroy should be deferred when the process has been
// interrupted, it should be set to true if the compile is called within another method like
// runInternal, which defers the close to the called in that method.
- public int compile(String command, boolean resetTaskIds, boolean deferClose) {
+ private int compile(String command, boolean resetTaskIds, boolean deferClose) {
PerfLogger perfLogger = SessionState.getPerfLogger(true);
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.DRIVER_RUN);
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.COMPILE);
@@ -525,6 +523,15 @@ public class Driver implements CommandProcessor {
// because at that point we need access to the objects.
Hive.get().getMSC().flushCache();
+ if(checkConcurrency() && startImplicitTxn(txnManager)) {
+ String userFromUGI = getUserFromUGI();
+ if (!txnManager.isTxnOpen()) {
+ if(userFromUGI == null) {
+ return 10;
+ }
+ long txnid = txnManager.openTxn(ctx, userFromUGI);
+ }
+ }
// Do semantic analysis and plan generation
if (saHooks != null && !saHooks.isEmpty()) {
HiveSemanticAnalyzerHookContext hookCtx = new HiveSemanticAnalyzerHookContextImpl();
@@ -543,15 +550,10 @@ public class Driver implements CommandProcessor {
} else {
sem.analyze(tree, ctx);
}
- // Record any ACID compliant FileSinkOperators we saw so we can add our transaction ID to
- // them later.
- acidSinks = sem.getAcidFileSinks();
-
LOG.info("Semantic Analysis Completed");
// validate the plan
sem.validate();
- acidInQuery = sem.hasAcidInQuery();
perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.ANALYZE);
if (isInterrupted()) {
@@ -669,7 +671,39 @@ public class Driver implements CommandProcessor {
}
}
-
+ private boolean startImplicitTxn(HiveTxnManager txnManager) throws LockException {
+ boolean shouldOpenImplicitTxn = !ctx.isExplainPlan();
+ //this is dumb. HiveOperation is not always set. see HIVE-16447/HIVE-16443
+ switch (queryState.getHiveOperation() == null ? HiveOperation.QUERY : queryState.getHiveOperation()) {
+ case COMMIT:
+ case ROLLBACK:
+ if(!txnManager.isTxnOpen()) {
+ throw new LockException(null, ErrorMsg.OP_NOT_ALLOWED_WITHOUT_TXN, queryState.getHiveOperation().getOperationName());
+ }
+ case SWITCHDATABASE:
+ case SET_AUTOCOMMIT:
+ /**
+ * autocommit is here for completeness. TM doesn't use it. If we want to support JDBC
+ * semantics (or any other definition of autocommit) it should be done at session level.
+ */
+ case SHOWDATABASES:
+ case SHOWTABLES:
+ case SHOWCOLUMNS:
+ case SHOWFUNCTIONS:
+ case SHOWINDEXES:
+ case SHOWPARTITIONS:
+ case SHOWLOCKS:
+ case SHOWVIEWS:
+ case SHOW_ROLES:
+ case SHOW_ROLE_PRINCIPALS:
+ case SHOW_COMPACTIONS:
+ case SHOW_TRANSACTIONS:
+ case ABORT_TRANSACTIONS:
+ shouldOpenImplicitTxn = false;
+ //this implies that no locks are needed for such a command
+ }
+ return shouldOpenImplicitTxn;
+ }
private int handleInterruption(String msg) {
return handleInterruptionWithHook(msg, null, null);
}
@@ -1083,8 +1117,17 @@ public class Driver implements CommandProcessor {
// Write the current set of valid transactions into the conf file so that it can be read by
// the input format.
private void recordValidTxns() throws LockException {
+ ValidTxnList oldList = null;
+ String s = conf.get(ValidTxnList.VALID_TXNS_KEY);
+ if(s != null && s.length() > 0) {
+ oldList = new ValidReadTxnList(s);
+ }
HiveTxnManager txnMgr = SessionState.get().getTxnMgr();
ValidTxnList txns = txnMgr.getValidTxns();
+ if(oldList != null) {
+ throw new IllegalStateException("calling recordValidTxn() more than once in the same " +
+ JavaUtils.txnIdToString(txnMgr.getCurrentTxnId()));
+ }
String txnStr = txns.toString();
conf.set(ValidTxnList.VALID_TXNS_KEY, txnStr);
if(plan.getFetchTask() != null) {
@@ -1098,79 +1141,61 @@ public class Driver implements CommandProcessor {
LOG.debug("Encoding valid txns info " + txnStr + " txnid:" + txnMgr.getCurrentTxnId());
}
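For reference, a sketch of the consumer side of the VALID_TXNS_KEY entry set above; input formats rebuild the snapshot from the same conf key (someTxnId is a hypothetical id being tested for visibility):

    String txnString = conf.get(ValidTxnList.VALID_TXNS_KEY);
    ValidTxnList validTxns = new ValidReadTxnList(txnString); // parse the serialized snapshot
    boolean visible = validTxns.isTxnValid(someTxnId);        // committed as of this snapshot?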
+ private String getUserFromUGI() {
+ // Don't use the userName member, as it may or may not have been set. Get the value from
+ // conf, which calls into getUGI to figure out who the process is running as.
+ try {
+ return conf.getUser();
+ } catch (IOException e) {
+ errorMessage = "FAILED: Error in determining user while acquiring locks: " + e.getMessage();
+ SQLState = ErrorMsg.findSQLState(e.getMessage());
+ downstreamError = e;
+ console.printError(errorMessage,
+ "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e));
+ }
+ return null;
+ }
/**
* Acquire read and write locks needed by the statement. The list of objects to be locked are
- * obtained from the inputs and outputs populated by the compiler. The lock acquisition scheme is
- * pretty simple. If all the locks cannot be obtained, error out. Deadlock is avoided by making
- * sure that the locks are lexicographically sorted.
+ * obtained from the inputs and outputs populated by the compiler. Locking strategy depends on
+ * HiveTxnManager and HiveLockManager configured
*
* This method also records the list of valid transactions. This must be done after any
- * transactions have been opened and locks acquired.
- * @param startTxnImplicitly in AC=false, the 1st DML starts a txn
+ * transactions have been opened.
**/
- private int acquireLocksAndOpenTxn(boolean startTxnImplicitly) {
+ private int acquireLocks() {
PerfLogger perfLogger = SessionState.getPerfLogger();
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.ACQUIRE_READ_WRITE_LOCKS);
SessionState ss = SessionState.get();
HiveTxnManager txnMgr = ss.getTxnMgr();
- if(startTxnImplicitly) {
- assert !txnMgr.getAutoCommit();
+ if(!txnMgr.isTxnOpen() && txnMgr.supportsAcid()) {
+ /*non-acid txn managers don't support txns but forward lock requests to lock managers;
+ the acid txn manager requires all locks to be associated with a txn, so if we
+ end up here w/o an open txn it's because we are processing something like "use <database>",
+ which by definition needs no locks*/
+ return 0;
}
-
try {
- // Don't use the userName member, as it may or may not have been set. Get the value from
- // conf, which calls into getUGI to figure out who the process is running as.
- String userFromUGI;
- try {
- userFromUGI = conf.getUser();
- } catch (IOException e) {
- errorMessage = "FAILED: Error in determining user while acquiring locks: " + e.getMessage();
- SQLState = ErrorMsg.findSQLState(e.getMessage());
- downstreamError = e;
- console.printError(errorMessage,
- "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e));
+ String userFromUGI = getUserFromUGI();
+ if(userFromUGI == null) {
return 10;
}
-
- boolean initiatingTransaction = false;
- boolean readOnlyQueryInAutoCommit = false;
- if((txnMgr.getAutoCommit() && haveAcidWrite()) || plan.getOperation() == HiveOperation.START_TRANSACTION ||
- (!txnMgr.getAutoCommit() && startTxnImplicitly)) {
- if(txnMgr.isTxnOpen()) {
- throw new RuntimeException("Already have an open transaction txnid:" + txnMgr.getCurrentTxnId());
- }
- // We are writing to tables in an ACID compliant way, so we need to open a transaction
- txnMgr.openTxn(ctx, userFromUGI);
- initiatingTransaction = true;
- }
- else {
- readOnlyQueryInAutoCommit = txnMgr.getAutoCommit() && plan.getOperation() == HiveOperation.QUERY && !haveAcidWrite();
- }
// Set the transaction id in all of the acid file sinks
if (haveAcidWrite()) {
- for (FileSinkDesc desc : acidSinks) {
+ for (FileSinkDesc desc : plan.getAcidSinks()) {
desc.setTransactionId(txnMgr.getCurrentTxnId());
//it's possible to have > 1 FileSink writing to the same table/partition
//e.g. Merge stmt, multi-insert stmt when mixing DP and SP writes
desc.setStatementId(txnMgr.getWriteIdAndIncrement());
}
}
- /*Note, we have to record snapshot after lock acquisition to prevent lost update problem
- consider 2 concurrent "update table T set x = x + 1". 1st will get the locks and the
- 2nd will block until 1st one commits and only then lock in the snapshot, i.e. it will
- see the changes made by 1st one. This takes care of autoCommit=true case.
- For multi-stmt txns this is not sufficient and will be managed via WriteSet tracking
- in the lock manager.*/
+ /*It's imperative that {@code acquireLocks()} is called for all commands so that
+ HiveTxnManager can transition its state machine correctly*/
txnMgr.acquireLocks(plan, ctx, userFromUGI, lDrvState);
- if(initiatingTransaction || (readOnlyQueryInAutoCommit && acidInQuery)) {
- //For multi-stmt txns we should record the snapshot when txn starts but
- // don't update it after that until txn completes. Thus the check for {@code initiatingTransaction}
- //For autoCommit=true, Read-only statements, txn is implicit, i.e. lock in the snapshot
- //for each statement.
+ if(txnMgr.recordSnapshot(plan)) {
recordValidTxns();
}
-
return 0;
} catch (Exception e) {
errorMessage = "FAILED: Error in acquiring locks: " + e.getMessage();
@@ -1185,7 +1210,7 @@ public class Driver implements CommandProcessor {
}
private boolean haveAcidWrite() {
- return acidSinks != null && !acidSinks.isEmpty();
+ return !plan.getAcidSinks().isEmpty();
}
/**
* @param commit if there is an open transaction and if true, commit,
@@ -1193,11 +1218,11 @@ public class Driver implements CommandProcessor {
* @param txnManager an optional existing transaction manager retrieved earlier from the session
*
**/
- private void releaseLocksAndCommitOrRollback(boolean commit, HiveTxnManager txnManager)
+ @VisibleForTesting
+ public void releaseLocksAndCommitOrRollback(boolean commit, HiveTxnManager txnManager)
throws LockException {
PerfLogger perfLogger = SessionState.getPerfLogger();
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.RELEASE_LOCKS);
-
HiveTxnManager txnMgr;
if (txnManager == null) {
SessionState ss = SessionState.get();
@@ -1207,6 +1232,7 @@ public class Driver implements CommandProcessor {
}
// If we've opened a transaction we need to commit or rollback rather than explicitly
// releasing the locks.
+ conf.unset(ValidTxnList.VALID_TXNS_KEY);
if (txnMgr.isTxnOpen()) {
if (commit) {
if(conf.getBoolVar(ConfVars.HIVE_IN_TEST) && conf.getBoolVar(ConfVars.HIVETESTMODEROLLBACKTXN)) {
@@ -1492,52 +1518,12 @@ public class Driver implements CommandProcessor {
HiveTxnManager txnManager = SessionState.get().getTxnMgr();
ctx.setHiveTxnManager(txnManager);
- boolean startTxnImplicitly = false;
- {
- //this block ensures op makes sense in given context, e.g. COMMIT is valid only if txn is open
- //DDL is not allowed in a txn, etc.
- //an error in an open txn does a rollback of the txn
- if (txnManager.isTxnOpen() && !plan.getOperation().isAllowedInTransaction()) {
- assert !txnManager.getAutoCommit() : "didn't expect AC=true";
- return rollback(new CommandProcessorResponse(12, ErrorMsg.OP_NOT_ALLOWED_IN_TXN, null,
- plan.getOperationName(), Long.toString(txnManager.getCurrentTxnId())));
- }
- if(!txnManager.isTxnOpen() && plan.getOperation().isRequiresOpenTransaction()) {
- return rollback(new CommandProcessorResponse(12, ErrorMsg.OP_NOT_ALLOWED_WITHOUT_TXN, null, plan.getOperationName()));
- }
- if(!txnManager.isTxnOpen() && plan.getOperation() == HiveOperation.QUERY && !txnManager.getAutoCommit()) {
- //this effectively makes START TRANSACTION optional and supports JDBC setAutoCommit(false) semantics
- //also, indirectly allows DDL to be executed outside a txn context
- startTxnImplicitly = true;
- }
- if(txnManager.getAutoCommit() && plan.getOperation() == HiveOperation.START_TRANSACTION) {
- return rollback(new CommandProcessorResponse(12, ErrorMsg.OP_NOT_ALLOWED_IN_AUTOCOMMIT, null, plan.getOperationName()));
- }
- }
- if(plan.getOperation() == HiveOperation.SET_AUTOCOMMIT) {
- try {
- if(plan.getAutoCommitValue() && !txnManager.getAutoCommit()) {
- /*here, if there is an open txn, we want to commit it; this behavior matches
- * https://docs.oracle.com/javase/6/docs/api/java/sql/Connection.html#setAutoCommit(boolean)*/
- releaseLocksAndCommitOrRollback(true, null);
- txnManager.setAutoCommit(true);
- }
- else if(!plan.getAutoCommitValue() && txnManager.getAutoCommit()) {
- txnManager.setAutoCommit(false);
- }
- else {/*didn't change autoCommit value - no-op*/}
- }
- catch(LockException e) {
- return handleHiveException(e, 12);
- }
- }
-
if (requiresLock()) {
// a checkpoint to see if the thread is interrupted or not before an expensive operation
if (isInterrupted()) {
ret = handleInterruption("at acquiring the lock.");
} else {
- ret = acquireLocksAndOpenTxn(startTxnImplicitly);
+ ret = acquireLocks();
}
if (ret != 0) {
return rollback(createProcessorResponse(ret));
@@ -1551,7 +1537,8 @@ public class Driver implements CommandProcessor {
//if needRequireLock is false, the release here will do nothing because there is no lock
try {
- if(txnManager.getAutoCommit() || plan.getOperation() == HiveOperation.COMMIT) {
+ //since set autocommit starts an implicit txn, close it
+ if(txnManager.isImplicitTransactionOpen() || plan.getOperation() == HiveOperation.COMMIT) {
releaseLocksAndCommitOrRollback(true, null);
}
else if(plan.getOperation() == HiveOperation.ROLLBACK) {
@@ -1678,6 +1665,13 @@ public class Driver implements CommandProcessor {
private CommandProcessorResponse createProcessorResponse(int ret) {
SessionState.getPerfLogger().cleanupPerfLogMetrics();
queryDisplay.setErrorMessage(errorMessage);
+ if(downstreamError != null && downstreamError instanceof HiveException) {
+ ErrorMsg em = ((HiveException)downstreamError).getCanonicalErrorMsg();
+ if(em != null) {
+ return new CommandProcessorResponse(ret, errorMessage, SQLState,
+ schema, downstreamError, em.getErrorCode(), null);
+ }
+ }
return new CommandProcessorResponse(ret, errorMessage, SQLState, downstreamError);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/21909601/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
index 9667d71..d01a203 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
@@ -482,9 +482,17 @@ public enum ErrorMsg {
"is controlled by hive.exec.max.dynamic.partitions and hive.exec.max.dynamic.partitions.pernode. "),
PARTITION_SCAN_LIMIT_EXCEEDED(20005, "Number of partitions scanned (={0}) on table {1} exceeds limit" +
" (={2}). This is controlled by hive.limit.query.max.table.partition.", true),
- OP_NOT_ALLOWED_IN_AUTOCOMMIT(20006, "Operation {0} is not allowed when autoCommit=true.", true),//todo: better SQLState?
- OP_NOT_ALLOWED_IN_TXN(20007, "Operation {0} is not allowed in a transaction. TransactionID={1}.", true),
- OP_NOT_ALLOWED_WITHOUT_TXN(20008, "Operation {0} is not allowed since autoCommit=false and there is no active transaction", true),
+ /**
+ * {1} is the transaction id;
+ * use {@link org.apache.hadoop.hive.common.JavaUtils#txnIdToString(long)} to format
+ */
+ OP_NOT_ALLOWED_IN_IMPLICIT_TXN(20006, "Operation {0} is not allowed in an implicit transaction ({1}).", true),
+ /**
+ * {1} is the transaction id;
+ * use {@link org.apache.hadoop.hive.common.JavaUtils#txnIdToString(long)} to format
+ */
+ OP_NOT_ALLOWED_IN_TXN(20007, "Operation {0} is not allowed in a transaction ({1},queryId={2}).", true),
+ OP_NOT_ALLOWED_WITHOUT_TXN(20008, "Operation {0} is not allowed without an active transaction", true),
//========================== 30000 range starts here ========================//
STATSPUBLISHER_NOT_OBTAINED(30000, "StatsPublisher cannot be obtained. " +
"There was a error to retrieve the StatsPublisher, and retrying " +
http://git-wip-us.apache.org/repos/asf/hive/blob/21909601/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java b/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java
index e8c8ae6..2ddabd9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java
@@ -35,6 +35,7 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hive.metastore.api.Schema;
import org.apache.hadoop.hive.ql.exec.ConditionalTask;
import org.apache.hadoop.hive.ql.exec.ExplainTask;
@@ -48,6 +49,7 @@ import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer;
import org.apache.hadoop.hive.ql.parse.ColumnAccessInfo;
import org.apache.hadoop.hive.ql.parse.TableAccessInfo;
+import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
import org.apache.hadoop.hive.ql.plan.HiveOperation;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.ReducerTimeStatsPerJob;
@@ -105,11 +107,19 @@ public class QueryPlan implements Serializable {
private transient Long queryStartTime;
private final HiveOperation operation;
+ private final boolean acidResourcesInQuery;
+ private final Set<FileSinkDesc> acidSinks;
private Boolean autoCommitValue;
public QueryPlan() {
- this.reducerTimeStatsPerJobList = new ArrayList<ReducerTimeStatsPerJob>();
- operation = null;
+ this(null);
+ }
+ @VisibleForTesting
+ protected QueryPlan(HiveOperation command) {
+ this.reducerTimeStatsPerJobList = new ArrayList<>();
+ this.operation = command;
+ this.acidResourcesInQuery = false;
+ this.acidSinks = Collections.emptySet();
}
public QueryPlan(String queryString, BaseSemanticAnalyzer sem, Long startTime, String queryId,
@@ -136,8 +146,22 @@ public class QueryPlan implements Serializable {
this.operation = operation;
this.autoCommitValue = sem.getAutoCommitValue();
this.resultSchema = resultSchema;
+ this.acidResourcesInQuery = sem.hasAcidInQuery();
+ this.acidSinks = sem.getAcidFileSinks();
}
+ /**
+ * @return true if any acid resources are read/written
+ */
+ public boolean hasAcidResourcesInQuery() {
+ return acidResourcesInQuery;
+ }
+ /**
+ * @return Collection of FileSinkDesc representing writes to Acid resources
+ */
+ Set<FileSinkDesc> getAcidSinks() {
+ return acidSinks;
+ }
public String getQueryStr() {
return queryString;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/21909601/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
index 62f7c5a..cdf2c40 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
@@ -21,6 +21,11 @@ import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.plan.HiveOperation;
+import org.apache.hadoop.hive.ql.plan.LockDatabaseDesc;
+import org.apache.hadoop.hive.ql.plan.LockTableDesc;
+import org.apache.hadoop.hive.ql.plan.UnlockDatabaseDesc;
+import org.apache.hadoop.hive.ql.plan.UnlockTableDesc;
import org.apache.hive.common.util.ShutdownHookManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -57,6 +62,13 @@ import java.util.concurrent.atomic.AtomicInteger;
* with a single thread accessing it at a time, with the exception of {@link #heartbeat()} method.
* The latter may (usually will) be called from a timer thread.
* See {@link #getMS()} for more important concurrency/metastore access notes.
+ *
+ * Each statement that the TM (transaction manager) should be aware of must belong to a transaction.
+ * Effectively, that means any statement that has side effects. Exceptions are statements like
+ * Show Compactions, Show Tables, Use Database foo, etc. The transaction is started either
+ * explicitly (via the Start Transaction SQL statement from the end user - not fully supported) or
+ * implicitly by the {@link org.apache.hadoop.hive.ql.Driver} (which looks exactly like autoCommit=true
+ * from the end user's point of view). See more at {@link #isExplicitTransaction}.
*/
public final class DbTxnManager extends HiveTxnManagerImpl {
@@ -76,7 +88,47 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
* to keep apart multiple writes of the same data within the same transaction
* Also see {@link org.apache.hadoop.hive.ql.io.AcidOutputFormat.Options}
*/
- private int statementId = -1;
+ private int writeId = -1;
+ /**
+ * counts number of statements in the current transaction
+ */
+ private int numStatements = 0;
+ /**
+ * if {@code true} it means the current transaction was started via START TRANSACTION, which means it cannot
+ * include any operations which cannot be rolled back (drop partition; write to non-acid table).
+ * If false, it's a single-statement transaction which can include any statement. This is not a
+ * contradiction from the user's point of view: the user knows nothing about the implicit txn
+ * and cannot call rollback (the statement can of course fail, in which case there is nothing to
+ * roll back, assuming the statement is well implemented).
+ *
+ * This is done so that all commands run in a transaction which simplifies implementation and
+ * allows a simple implementation of multi-statement txns which don't require a lock manager
+ * capable of deadlock detection. (todo: not fully implemented; elaborate on how this LM works)
+ *
+ * Also, critically important, ensuring that everything runs in a transaction assigns an order
+ * to all operations in the system - needed for replication/DR.
+ *
+ * We don't want to allow non-transactional statements in a user-demarcated txn because the effect
+ * of such a statement is "visible" immediately on statement completion; the user may then
+ * issue a rollback, but the action of the statement can't be undone (and has possibly already been
+ * seen by another txn). For example,
+ * start transaction
+ * insert into transactional_table values(1);
+ * insert into non_transactional_table select * from transactional_table;
+ * rollback
+ *
+ * The user would be in for a surprise especially if they are not aware of transactional
+ * properties of the tables involved.
+ *
+ * As a side note: what should the lock manager do with locks for non-transactional resources?
+ * Should it release them at the end of the stmt or the txn?
+ * Some interesting thoughts: http://mysqlmusings.blogspot.com/2009/02/mixing-engines-in-transactions.html
+ */
+ private boolean isExplicitTransaction = false;
+ /**
+ * To ensure transactions don't nest.
+ */
+ private int startTransactionCount = 0;
// QueryId for the query in current transaction
private String queryId;
@@ -141,15 +193,22 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
@VisibleForTesting
long openTxn(Context ctx, String user, long delay) throws LockException {
- //todo: why don't we lock the snapshot here??? Instead of having client make an explicit call
- //whenever it chooses
+ /*Q: why don't we lock the snapshot here??? Instead of having client make an explicit call
+ whenever it chooses
+ A: If we want to rely on locks for transaction scheduling we must get the snapshot after lock
+ acquisition. Relying on locks is a pessimistic strategy which works better under high
+ contention.*/
init();
+ getLockManager();
if(isTxnOpen()) {
throw new LockException("Transaction already opened. " + JavaUtils.txnIdToString(txnId));
}
try {
txnId = getMS().openTxn(user);
- statementId = 0;
+ writeId = 0;
+ numStatements = 0;
+ isExplicitTransaction = false;
+ startTransactionCount = 0;
LOG.debug("Opened " + JavaUtils.txnIdToString(txnId));
ctx.setHeartbeater(startHeartbeat(delay));
return txnId;
@@ -159,8 +218,8 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
}
/**
- * we don't expect multiple thread to call this method concurrently but {@link #lockMgr} will
- * be read by a different threads that one writing it, thus it's {@code volatile}
+ * we don't expect multiple threads to call this method concurrently but {@link #lockMgr} will
+ * be read by different threads than the one writing it, thus it's {@code volatile}
*/
@Override
public HiveLockManager getLockManager() throws LockException {
@@ -179,24 +238,95 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
catch(LockException e) {
if(e.getCause() instanceof TxnAbortedException) {
txnId = 0;
- statementId = -1;
+ writeId = -1;
}
throw e;
}
}
/**
- * This is for testing only. Normally client should call {@link #acquireLocks(org.apache.hadoop.hive.ql.QueryPlan, org.apache.hadoop.hive.ql.Context, String)}
+ * Watermark to include in error msgs and logs
+ * @param queryPlan
+ * @return
+ */
+ private static String getQueryIdWaterMark(QueryPlan queryPlan) {
+ return "queryId=" + queryPlan.getQueryId();
+ }
+
+ private void markExplicitTransaction(QueryPlan queryPlan) throws LockException {
+ isExplicitTransaction = true;
+ if(++startTransactionCount > 1) {
+ throw new LockException(null, ErrorMsg.OP_NOT_ALLOWED_IN_TXN, queryPlan.getOperationName(),
+ JavaUtils.txnIdToString(getCurrentTxnId()), queryPlan.getQueryId());
+ }
+
+ }
+ /**
+ * Ensures that the current SQL statement is appropriate for the current state of the
+ * Transaction Manager (e.g. you cannot call commit unless you called start transaction)
+ *
+ * Note that support for multi-statement txns is a work-in-progress so it's only supported in
+ * HiveConf#HIVE_IN_TEST/HiveConf#TEZ_HIVE_IN_TEST.
+ * @param queryPlan
+ * @throws LockException
+ */
+ private void verifyState(QueryPlan queryPlan) throws LockException {
+ if(!isTxnOpen()) {
+ throw new LockException("No transaction context for operation: " + queryPlan.getOperationName() +
+ " for " + getQueryIdWaterMark(queryPlan));
+ }
+ if(queryPlan.getOperation() == null) {
+ throw new IllegalStateException("Unkown HiverOperation for " + getQueryIdWaterMark(queryPlan));
+ }
+ numStatements++;
+ switch (queryPlan.getOperation()) {
+ case START_TRANSACTION:
+ markExplicitTransaction(queryPlan);
+ break;
+ case COMMIT:
+ case ROLLBACK:
+ if(!isTxnOpen()) {
+ throw new LockException(null, ErrorMsg.OP_NOT_ALLOWED_WITHOUT_TXN, queryPlan.getOperationName());
+ }
+ if(!isExplicitTransaction) {
+ throw new LockException(null, ErrorMsg.OP_NOT_ALLOWED_IN_IMPLICIT_TXN, queryPlan.getOperationName());
+ }
+ break;
+ default:
+ if(!queryPlan.getOperation().isAllowedInTransaction() && isExplicitTransaction) {
+ //for example, drop table in an explicit txn is not allowed
+ //in some cases this requires looking at more than just the operation
+ //for example HiveOperation.LOAD - OK if target is MM table but not OK if non-acid table
+ throw new LockException(null, ErrorMsg.OP_NOT_ALLOWED_IN_TXN, queryPlan.getOperationName(),
+ JavaUtils.txnIdToString(getCurrentTxnId()), queryPlan.getQueryId());
+ }
+ }
+ /*
+ Should we allow writing to non-transactional tables in an explicit transaction? The user may
+ issue ROLLBACK but these tables won't roll back.
+ Can do this by checking ReadEntity/WriteEntity to determine whether it's reading/writing
+ any non-acid resource and raising an appropriate error.
+ * Driver.acidSinks and Driver.acidInQuery can be used if any acid is in the query*/
+ }
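A sketch of the state machine verifyState() enforces, seen from the client side; each call below illustrates a separate scenario (not one continuous session), and multi-statement txns assume HIVE_IN_TEST as noted above:

    driver.run("commit");            // no open txn -> OP_NOT_ALLOWED_WITHOUT_TXN
    driver.run("commit");            // inside an implicit txn -> OP_NOT_ALLOWED_IN_IMPLICIT_TXN
    driver.run("start transaction"); // inside an explicit txn -> OP_NOT_ALLOWED_IN_TXN (no nesting)
    driver.run("drop table t");      // inside an explicit txn -> OP_NOT_ALLOWED_IN_TXN (can't be rolled back)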
+ /**
+ * Normally client should call {@link #acquireLocks(org.apache.hadoop.hive.ql.QueryPlan, org.apache.hadoop.hive.ql.Context, String)}
* @param isBlocking if false, the method will return immediately; thus the locks may be in LockState.WAITING
* @return null if no locks were needed
*/
+ @VisibleForTesting
LockState acquireLocks(QueryPlan plan, Context ctx, String username, boolean isBlocking) throws LockException {
init();
- // Make sure we've built the lock manager
+ // Make sure we've built the lock manager
getLockManager();
-
+ verifyState(plan);
boolean atLeastOneLock = false;
queryId = plan.getQueryId();
+ switch (plan.getOperation()) {
+ case SET_AUTOCOMMIT:
+ /**This is here for documentation purposes. This TM doesn't support SET AUTOCOMMIT - it only has the one
+ * mode of operation documented at {@link DbTxnManager#isExplicitTransaction}*/
+ return null;
+ }
LockRequestBuilder rqstBuilder = new LockRequestBuilder(queryId);
//link queryId to txnId
@@ -240,8 +370,8 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
// This is a file or something we don't hold locks for.
continue;
}
- if(t != null && AcidUtils.isAcidTable(t)) {
- compBuilder.setIsAcid(true);
+ if(t != null) {
+ compBuilder.setIsAcid(AcidUtils.isAcidTable(t));
}
LockComponent comp = compBuilder.build();
LOG.debug("Adding lock component to lock request " + comp.toString());
@@ -262,7 +392,33 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
}
LockComponentBuilder compBuilder = new LockComponentBuilder();
Table t = null;
+ switch (output.getType()) {
+ case DATABASE:
+ compBuilder.setDbName(output.getDatabase().getName());
+ break;
+
+ case TABLE:
+ case DUMMYPARTITION: // in case of dynamic partitioning lock the table
+ t = output.getTable();
+ compBuilder.setDbName(t.getDbName());
+ compBuilder.setTableName(t.getTableName());
+ break;
+
+ case PARTITION:
+ compBuilder.setPartitionName(output.getPartition().getName());
+ t = output.getPartition().getTable();
+ compBuilder.setDbName(t.getDbName());
+ compBuilder.setTableName(t.getTableName());
+ break;
+
+ default:
+ // This is a file or something we don't hold locks for.
+ continue;
+ }
switch (output.getWriteType()) {
+ /* base this on HiveOperation instead? this and DDL_NO_LOCK are peppered all over the code...
+ Seems much cleaner if each stmt is identified as a particular HiveOperation (which I'd think
+ makes sense everywhere). This, however, would be problematic for MERGE...*/
case DDL_EXCLUSIVE:
case INSERT_OVERWRITE:
compBuilder.setExclusive();
@@ -270,10 +426,9 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
break;
case INSERT:
- t = getTable(output);
+ assert t != null;
if(AcidUtils.isAcidTable(t)) {
compBuilder.setShared();
- compBuilder.setIsAcid(true);
}
else {
if (conf.getBoolVar(HiveConf.ConfVars.HIVE_TXN_STRICT_LOCKING_MODE)) {
@@ -281,7 +436,6 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
} else { // this is backward compatible for non-ACID resources, w/o ACID semantics
compBuilder.setShared();
}
- compBuilder.setIsAcid(false);
}
compBuilder.setOperationType(DataOperationType.INSERT);
break;
@@ -293,12 +447,10 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
case UPDATE:
compBuilder.setSemiShared();
compBuilder.setOperationType(DataOperationType.UPDATE);
- t = getTable(output);
break;
case DELETE:
compBuilder.setSemiShared();
compBuilder.setOperationType(DataOperationType.DELETE);
- t = getTable(output);
break;
case DDL_NO_LOCK:
@@ -307,34 +459,11 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
default:
throw new RuntimeException("Unknown write type " +
output.getWriteType().toString());
-
}
- switch (output.getType()) {
- case DATABASE:
- compBuilder.setDbName(output.getDatabase().getName());
- break;
-
- case TABLE:
- case DUMMYPARTITION: // in case of dynamic partitioning lock the table
- t = output.getTable();
- compBuilder.setDbName(t.getDbName());
- compBuilder.setTableName(t.getTableName());
- break;
-
- case PARTITION:
- compBuilder.setPartitionName(output.getPartition().getName());
- t = output.getPartition().getTable();
- compBuilder.setDbName(t.getDbName());
- compBuilder.setTableName(t.getTableName());
- break;
-
- default:
- // This is a file or something we don't hold locks for.
- continue;
- }
- if(t != null && AcidUtils.isAcidTable(t)) {
- compBuilder.setIsAcid(true);
+ if(t != null) {
+ compBuilder.setIsAcid(AcidUtils.isAcidTable(t));
}
+
compBuilder.setIsDynamicPartitionWrite(output.isDynamicPartitionWrite());
LockComponent comp = compBuilder.build();
LOG.debug("Adding lock component to lock request " + comp.toString());
@@ -405,7 +534,8 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
e);
} finally {
txnId = 0;
- statementId = -1;
+ writeId = -1;
+ numStatements = 0;
}
}
@@ -429,7 +559,8 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
e);
} finally {
txnId = 0;
- statementId = -1;
+ writeId = -1;
+ numStatements = 0;
}
}
@@ -556,6 +687,26 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
public boolean supportsExplicitLock() {
return false;
}
+ @Override
+ public int lockTable(Hive db, LockTableDesc lockTbl) throws HiveException {
+ super.lockTable(db, lockTbl);
+ throw new UnsupportedOperationException();
+ }
+ @Override
+ public int unlockTable(Hive hiveDB, UnlockTableDesc unlockTbl) throws HiveException {
+ super.unlockTable(hiveDB, unlockTbl);
+ throw new UnsupportedOperationException();
+ }
+ @Override
+ public int lockDatabase(Hive hiveDB, LockDatabaseDesc lockDb) throws HiveException {
+ super.lockDatabase(hiveDB, lockDb);
+ throw new UnsupportedOperationException();
+ }
+ @Override
+ public int unlockDatabase(Hive hiveDB, UnlockDatabaseDesc unlockDb) throws HiveException {
+ super.unlockDatabase(hiveDB, unlockDb);
+ throw new UnsupportedOperationException();
+ }
@Override
public boolean useNewShowLocksFormat() {
@@ -566,7 +717,44 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
public boolean supportsAcid() {
return true;
}
-
+ /**
+ * In an explicit txn, START TRANSACTION is the 1st statement and we record the snapshot at the
+ * start of the txn for Snapshot Isolation. For Read Committed (not supported yet) we'd record
+ * it before executing each statement (but after lock acquisition if using lock-based concurrency
+ * control).
+ * For an implicit txn, the stmt that triggered/started the txn is the first statement.
+ */
+ @Override
+ public boolean recordSnapshot(QueryPlan queryPlan) {
+ assert isTxnOpen();
+ assert numStatements > 0 : "was acquireLocks() called already?";
+ if(queryPlan.getOperation() == HiveOperation.START_TRANSACTION) {
+ //here if start of explicit txn
+ assert isExplicitTransaction;
+ assert numStatements == 1;
+ return true;
+ }
+ else if(!isExplicitTransaction) {
+ assert numStatements == 1 : "numStatements=" + numStatements + " in implicit txn";
+ if (queryPlan.hasAcidResourcesInQuery()) {
+ //1st and only stmt in implicit txn and uses acid resource
+ return true;
+ }
+ }
+ return false;
+ }
+ @Override
+ public boolean isImplicitTransactionOpen() {
+ if(!isTxnOpen()) {
+ //some commands like "show databases" don't start implicit transactions
+ return false;
+ }
+ if(!isExplicitTransaction) {
+ assert numStatements == 1 : "numStatements=" + numStatements;
+ return true;
+ }
+ return false;
+ }
@Override
protected void destruct() {
try {
@@ -626,7 +814,7 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
@Override
public int getWriteIdAndIncrement() {
assert isTxnOpen();
- return statementId++;
+ return writeId++;
}
private static long getHeartbeatInterval(Configuration conf) throws LockException {
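The verifyState()/markExplicitTransaction() hunks above implement a small per-statement state
machine. Here is a minimal standalone sketch of that logic, assuming a simplified state enum;
TxnState and the error strings below are illustrative stand-ins, not Hive's actual types or
messages:

    // Sketch of the checks verifyState()/markExplicitTransaction() enforce:
    // COMMIT/ROLLBACK need an explicit txn, a second START TRANSACTION is rejected.
    enum TxnState { NONE, IMPLICIT, EXPLICIT }

    final class TxnStateSketch {
      static TxnState next(TxnState state, String op) {
        switch (op) {
          case "START TRANSACTION":
            if (state == TxnState.EXPLICIT) {     // startTransactionCount > 1 in the real code
              throw new IllegalStateException("OP_NOT_ALLOWED_IN_TXN: " + op);
            }
            return TxnState.EXPLICIT;
          case "COMMIT":
          case "ROLLBACK":
            if (state == TxnState.NONE) {
              throw new IllegalStateException("OP_NOT_ALLOWED_WITHOUT_TXN: " + op);
            }
            if (state == TxnState.IMPLICIT) {
              throw new IllegalStateException("OP_NOT_ALLOWED_IN_IMPLICIT_TXN: " + op);
            }
            return TxnState.NONE;
          default:                                // e.g. DML opens an implicit txn if none is open
            return state == TxnState.NONE ? TxnState.IMPLICIT : state;
        }
      }

      public static void main(String[] args) {
        TxnState s = next(TxnState.NONE, "START TRANSACTION"); // -> EXPLICIT
        s = next(s, "INSERT");                                 // stays EXPLICIT
        s = next(s, "COMMIT");                                 // -> NONE
        System.out.println(s);
      }
    }

In the real manager the NONE state never persists across a statement: with this patch every
query runs in either an implicit (auto-committed) or an explicit transaction.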
http://git-wip-us.apache.org/repos/asf/hive/blob/21909601/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
index 187a658..b24351c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
@@ -72,7 +72,7 @@ public interface HiveTxnManager {
/**
* Acquire all of the locks needed by a query. If used with a query that
- * requires transactions, this should be called after {@link #openTxn(String)}.
+ * requires transactions, this should be called after {@link #openTxn(Context, String)}.
* A list of acquired locks will be stored in the
* {@link org.apache.hadoop.hive.ql.Context} object and can be retrieved
* via {@link org.apache.hadoop.hive.ql.Context#getHiveLocks}.
@@ -208,17 +208,13 @@ public interface HiveTxnManager {
boolean supportsAcid();
/**
- * This behaves exactly as
- * https://docs.oracle.com/javase/6/docs/api/java/sql/Connection.html#setAutoCommit(boolean)
- */
- void setAutoCommit(boolean autoCommit) throws LockException;
-
- /**
- * This behaves exactly as
- * https://docs.oracle.com/javase/6/docs/api/java/sql/Connection.html#getAutoCommit()
+ * For resources that support MVCC, the state of the DB must be recorded for the duration of the
+ * operation/transaction. Returns {@code true} if the current statement needs to do this.
*/
- boolean getAutoCommit();
+ boolean recordSnapshot(QueryPlan queryPlan);
+ boolean isImplicitTransactionOpen();
+
boolean isTxnOpen();
/**
* if {@code isTxnOpen()}, returns the currently active transaction ID
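The recordSnapshot() contract above ties snapshot capture to statement position in the txn.
A hedged sketch of the decision DbTxnManager makes; the boolean parameters stand in for the
manager's state (isExplicitTransaction, numStatements) and the QueryPlan contents:

    // Sketch of the snapshot-recording decision described above.
    final class RecordSnapshotSketch {
      static boolean recordSnapshot(boolean isStartTransaction, boolean isExplicitTxn,
                                    int numStatements, boolean hasAcidResources) {
        if (isStartTransaction) {
          return true;                   // snapshot at the start of an explicit txn
        }
        if (!isExplicitTxn && numStatements == 1) {
          return hasAcidResources;       // 1st (and only) stmt of an implicit txn touches acid data
        }
        return false;                    // later stmts of an explicit txn reuse its snapshot
      }

      public static void main(String[] args) {
        System.out.println(recordSnapshot(true,  true,  1, false)); // true
        System.out.println(recordSnapshot(false, false, 1, true));  // true
        System.out.println(recordSnapshot(false, true,  2, true));  // false
      }
    }

The design point is Snapshot Isolation: the snapshot is taken once, at the start of the explicit
txn (or at the single statement of an implicit one), rather than per statement as Read Committed
would require.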
http://git-wip-us.apache.org/repos/asf/hive/blob/21909601/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManagerImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManagerImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManagerImpl.java
index 9fa416c..8dbbf87 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManagerImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManagerImpl.java
@@ -45,7 +45,6 @@ import org.apache.hadoop.hive.ql.plan.UnlockTableDesc;
abstract class HiveTxnManagerImpl implements HiveTxnManager {
protected HiveConf conf;
- private boolean isAutoCommit = true;//true by default; matches JDBC spec
void setHiveConf(HiveConf c) {
conf = c;
@@ -68,16 +67,6 @@ abstract class HiveTxnManagerImpl implements HiveTxnManager {
destruct();
}
@Override
- public void setAutoCommit(boolean autoCommit) throws LockException {
- isAutoCommit = autoCommit;
- }
-
- @Override
- public boolean getAutoCommit() {
- return isAutoCommit;
- }
-
- @Override
public int lockTable(Hive db, LockTableDesc lockTbl) throws HiveException {
HiveLockManager lockMgr = getAndCheckLockManager();
@@ -203,4 +192,13 @@ abstract class HiveTxnManagerImpl implements HiveTxnManager {
return lockMgr;
}
+ @Override
+ public boolean recordSnapshot(QueryPlan queryPlan) {
+ return false;
+ }
+ @Override
+ public boolean isImplicitTransactionOpen() {
+ return true;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hive/blob/21909601/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSemanticAnalyzer.java
index f62cf9a..668783a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSemanticAnalyzer.java
@@ -125,6 +125,7 @@ public class ExplainSemanticAnalyzer extends BaseSemanticAnalyzer {
}
ctx.setExplainConfig(config);
+ ctx.setExplainPlan(true);
ASTNode input = (ASTNode) ast.getChild(0);
// explain analyze is composed of two steps
@@ -137,7 +138,7 @@ public class ExplainSemanticAnalyzer extends BaseSemanticAnalyzer {
Context runCtx = null;
try {
runCtx = new Context(conf);
- // runCtx and ctx share the configuration
+ // runCtx and ctx share the configuration, but not isExplainPlan()
runCtx.setExplainConfig(config);
Driver driver = new Driver(conf, runCtx);
CommandProcessorResponse ret = driver.run(query);
@@ -161,6 +162,9 @@ public class ExplainSemanticAnalyzer extends BaseSemanticAnalyzer {
LOG.info("Explain analyze (analyzing phase) for query " + query);
config.setAnalyze(AnalyzeState.ANALYZING);
}
+ //Creating a new QueryState unfortunately causes all .q.out files to change - do this in a separate ticket
+ //Sharing QueryState between generating the plan and executing the query seems bad
+ //BaseSemanticAnalyzer sem = SemanticAnalyzerFactory.get(new QueryState(queryState.getConf()), input);
BaseSemanticAnalyzer sem = SemanticAnalyzerFactory.get(queryState, input);
sem.analyze(input, ctx);
sem.validate();
http://git-wip-us.apache.org/repos/asf/hive/blob/21909601/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java
index 520d3de..3c60e03 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java
@@ -21,9 +21,9 @@ package org.apache.hadoop.hive.ql.parse;
import org.antlr.runtime.tree.Tree;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.QueryState;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.HiveOperation;
-import org.apache.hadoop.hive.ql.session.SessionState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.HashMap;
@@ -32,6 +32,7 @@ import java.util.HashMap;
*
*/
public final class SemanticAnalyzerFactory {
+ static final private Logger LOG = LoggerFactory.getLogger(SemanticAnalyzerFactory.class);
static HashMap<Integer, HiveOperation> commandType = new HashMap<Integer, HiveOperation>();
static HashMap<Integer, HiveOperation[]> tablePartitionCommandType = new HashMap<Integer, HiveOperation[]>();
@@ -131,7 +132,6 @@ public final class SemanticAnalyzerFactory {
commandType.put(HiveParser.TOK_REPL_DUMP, HiveOperation.EXPORT); // piggyback on EXPORT security handling for now
commandType.put(HiveParser.TOK_REPL_LOAD, HiveOperation.IMPORT); // piggyback on IMPORT security handling for now
commandType.put(HiveParser.TOK_REPL_STATUS, HiveOperation.SHOW_TBLPROPERTIES); // TODO : also actually DESCDATABASE
-
}
static {
@@ -171,7 +171,22 @@ public final class SemanticAnalyzerFactory {
HiveOperation.ALTERTABLE_UPDATEPARTSTATS});
}
- public static BaseSemanticAnalyzer get(QueryState queryState, ASTNode tree)
+
+ public static BaseSemanticAnalyzer get(QueryState queryState, ASTNode tree) throws SemanticException {
+ BaseSemanticAnalyzer sem = getInternal(queryState, tree);
+ if(queryState.getHiveOperation() == null) {
+ String query = queryState.getQueryString();
+ if(query != null && query.length() > 30) {
+ query = query.substring(0, 30);
+ }
+ String msg = "Unknown HiveOperation for query='" + query + "' queryId=" + queryState.getQueryId();
+ //throw new IllegalStateException(msg);
+ LOG.debug(msg);
+ }
+ return sem;
+ }
+
+ private static BaseSemanticAnalyzer getInternal(QueryState queryState, ASTNode tree)
throws SemanticException {
if (tree.getToken() == null) {
throw new RuntimeException("Empty Syntax Tree");
http://git-wip-us.apache.org/repos/asf/hive/blob/21909601/ql/src/java/org/apache/hadoop/hive/ql/plan/HiveOperation.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/HiveOperation.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/HiveOperation.java
index d333f91..ecac31f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/HiveOperation.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/HiveOperation.java
@@ -27,7 +27,7 @@ public enum HiveOperation {
IMPORT("IMPORT", null, new Privilege[]{Privilege.ALTER_METADATA, Privilege.ALTER_DATA}),
CREATEDATABASE("CREATEDATABASE", null, new Privilege[]{Privilege.CREATE}),
DROPDATABASE("DROPDATABASE", null, new Privilege[]{Privilege.DROP}),
- SWITCHDATABASE("SWITCHDATABASE", null, null),
+ SWITCHDATABASE("SWITCHDATABASE", null, null, true, false),
LOCKDB("LOCKDATABASE", new Privilege[]{Privilege.LOCK}, null),
UNLOCKDB("UNLOCKDATABASE", new Privilege[]{Privilege.LOCK}, null),
DROPTABLE ("DROPTABLE", null, new Privilege[]{Privilege.DROP}),
@@ -60,19 +60,19 @@ public enum HiveOperation {
new Privilege[]{Privilege.ALTER_METADATA}, null),
ALTERPARTITION_BUCKETNUM("ALTERPARTITION_BUCKETNUM",
new Privilege[]{Privilege.ALTER_METADATA}, null),
- SHOWDATABASES("SHOWDATABASES", new Privilege[]{Privilege.SHOW_DATABASE}, null),
- SHOWTABLES("SHOWTABLES", null, null),
- SHOWCOLUMNS("SHOWCOLUMNS", null, null),
- SHOW_TABLESTATUS("SHOW_TABLESTATUS", null, null),
- SHOW_TBLPROPERTIES("SHOW_TBLPROPERTIES", null, null),
+ SHOWDATABASES("SHOWDATABASES", new Privilege[]{Privilege.SHOW_DATABASE}, null, true, false),
+ SHOWTABLES("SHOWTABLES", null, null, true, false),
+ SHOWCOLUMNS("SHOWCOLUMNS", null, null, true, false),
+ SHOW_TABLESTATUS("SHOW_TABLESTATUS", null, null, true, false),
+ SHOW_TBLPROPERTIES("SHOW_TBLPROPERTIES", null, null, true, false),
SHOW_CREATEDATABASE("SHOW_CREATEDATABASE", new Privilege[]{Privilege.SELECT}, null),
SHOW_CREATETABLE("SHOW_CREATETABLE", new Privilege[]{Privilege.SELECT}, null),
- SHOWFUNCTIONS("SHOWFUNCTIONS", null, null),
- SHOWINDEXES("SHOWINDEXES", null, null),
+ SHOWFUNCTIONS("SHOWFUNCTIONS", null, null, true, false),
+ SHOWINDEXES("SHOWINDEXES", null, null, true, false),
SHOWPARTITIONS("SHOWPARTITIONS", null, null),
- SHOWLOCKS("SHOWLOCKS", null, null),
+ SHOWLOCKS("SHOWLOCKS", null, null, true, false),
SHOWCONF("SHOWCONF", null, null),
- SHOWVIEWS("SHOWVIEWS", null, null),
+ SHOWVIEWS("SHOWVIEWS", null, null, true, false),
CREATEFUNCTION("CREATEFUNCTION", null, null),
DROPFUNCTION("DROPFUNCTION", null, null),
RELOADFUNCTION("RELOADFUNCTION", null, null),
@@ -94,12 +94,12 @@ public enum HiveOperation {
DROPROLE("DROPROLE", null, null),
GRANT_PRIVILEGE("GRANT_PRIVILEGE", null, null),
REVOKE_PRIVILEGE("REVOKE_PRIVILEGE", null, null),
- SHOW_GRANT("SHOW_GRANT", null, null),
+ SHOW_GRANT("SHOW_GRANT", null, null, true, false),
GRANT_ROLE("GRANT_ROLE", null, null),
REVOKE_ROLE("REVOKE_ROLE", null, null),
- SHOW_ROLES("SHOW_ROLES", null, null),
- SHOW_ROLE_PRINCIPALS("SHOW_ROLE_PRINCIPALS", null, null),
- SHOW_ROLE_GRANT("SHOW_ROLE_GRANT", null, null),
+ SHOW_ROLES("SHOW_ROLES", null, null, true, false),
+ SHOW_ROLE_PRINCIPALS("SHOW_ROLE_PRINCIPALS", null, null, true, false),
+ SHOW_ROLE_GRANT("SHOW_ROLE_GRANT", null, null, true, false),
ALTERTABLE_FILEFORMAT("ALTERTABLE_FILEFORMAT", new Privilege[]{Privilege.ALTER_METADATA}, null),
ALTERPARTITION_FILEFORMAT("ALTERPARTITION_FILEFORMAT", new Privilege[]{Privilege.ALTER_METADATA}, null),
ALTERTABLE_LOCATION("ALTERTABLE_LOCATION", new Privilege[]{Privilege.ALTER_DATA}, null),
@@ -128,8 +128,8 @@ public enum HiveOperation {
ALTERVIEW_RENAME("ALTERVIEW_RENAME", new Privilege[] {Privilege.ALTER_METADATA}, null),
ALTERVIEW_AS("ALTERVIEW_AS", new Privilege[] {Privilege.ALTER_METADATA}, null),
ALTERTABLE_COMPACT("ALTERTABLE_COMPACT", new Privilege[]{Privilege.SELECT}, new Privilege[]{Privilege.ALTER_DATA}),
- SHOW_COMPACTIONS("SHOW COMPACTIONS", null, null),
- SHOW_TRANSACTIONS("SHOW TRANSACTIONS", null, null),
+ SHOW_COMPACTIONS("SHOW COMPACTIONS", null, null, true, false),
+ SHOW_TRANSACTIONS("SHOW TRANSACTIONS", null, null, true, false),
START_TRANSACTION("START TRANSACTION", null, null, false, false),
COMMIT("COMMIT", null, null, true, true),
ROLLBACK("ROLLBACK", null, null, true, true),
@@ -143,7 +143,10 @@ public enum HiveOperation {
private Privilege[] outputRequiredPrivileges;
/**
- * Only a small set of operations is allowed inside an open transactions, e.g. DML
+ * Only a small set of operations is allowed inside an explicit transaction, e.g. DML on
+ * Acid tables or ops w/o persistent side effects like USE DATABASE, SHOW TABLES, etc., so
+ * that rollback is meaningful
+ * todo: mark all operations appropriately
*/
private final boolean allowedInTransaction;
private final boolean requiresOpenTransaction;
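The two trailing booleans added to the enum constants above correspond to allowedInTransaction
and requiresOpenTransaction, in that order. A self-contained sketch of the pattern, with the
Privilege arrays omitted and the default flag values inferred from the javadoc (illustrative
only, not the actual enum):

    // Sketch of the flag pattern used by HiveOperation above.
    enum HiveOperationSketch {
      SHOWTABLES(true, false),          // read-only, fine inside an open txn
      START_TRANSACTION(false, false),  // rejected when a txn is already open
      COMMIT(true, true),               // allowed in a txn and in fact requires one
      DROPTABLE(false, false);          // persistent side effect; rollback would be meaningless

      private final boolean allowedInTransaction;
      private final boolean requiresOpenTransaction;

      HiveOperationSketch(boolean allowedInTransaction, boolean requiresOpenTransaction) {
        this.allowedInTransaction = allowedInTransaction;
        this.requiresOpenTransaction = requiresOpenTransaction;
      }

      boolean isAllowedInTransaction() { return allowedInTransaction; }
      boolean requiresOpenTransaction() { return requiresOpenTransaction; }
    }

This is what verifyState() consults: e.g. COMMIT(true, true) may appear inside an explicit
transaction, while a DROPTABLE-style operation there raises OP_NOT_ALLOWED_IN_TXN.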
http://git-wip-us.apache.org/repos/asf/hive/blob/21909601/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
index ffce1d1..7692512 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
@@ -459,6 +459,21 @@ public class SessionState {
return txnMgr;
}
+ /**
+ * This is only for testing. It allows switching the manager before the (test) operation so that
+ * it's not coupled to the executing thread. Since tests run against Derby, which often wedges
+ * under concurrent access, tests must use a single thread and simulate concurrent access.
+ * For example, {@code TestDbTxnManager2}
+ */
+ @VisibleForTesting
+ public HiveTxnManager setTxnMgr(HiveTxnManager mgr) {
+ if(!(sessionConf.getBoolVar(ConfVars.HIVE_IN_TEST) || sessionConf.getBoolVar(ConfVars.HIVE_IN_TEZ_TEST))) {
+ throw new IllegalStateException("Only for testing!");
+ }
+ HiveTxnManager tmp = txnMgr;
+ txnMgr = mgr;
+ return tmp;
+ }
public HadoopShims.HdfsEncryptionShim getHdfsEncryptionShim() throws HiveException {
try {
return getHdfsEncryptionShim(FileSystem.get(sessionConf));
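The javadoc above describes the intended test pattern. A hedged sketch of how a
TestDbTxnManager2-style test might use the hook; runQuery() is a hypothetical helper, and the
TxnManagerFactory calls are assumed from existing Hive APIs:

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
    import org.apache.hadoop.hive.ql.lockmgr.TxnManagerFactory;
    import org.apache.hadoop.hive.ql.session.SessionState;

    class TwoSessionsOneThreadSketch {
      // Swap transaction managers on a single thread to emulate two concurrent
      // sessions, since Derby wedges under real concurrent access.
      void simulateConcurrency(HiveConf conf) throws Exception {
        HiveTxnManager txnMgr1 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
        HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);

        SessionState.get().setTxnMgr(txnMgr1);
        runQuery("update T set b = 7 where a = 1");  // "session 1" acquires its locks

        SessionState.get().setTxnMgr(txnMgr2);       // switch to "session 2" on the same thread
        runQuery("delete from T where a = 1");       // contends with session 1's locks
      }

      void runQuery(String sql) { /* hypothetical: compile and run sql via a Driver */ }
    }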
http://git-wip-us.apache.org/repos/asf/hive/blob/21909601/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
index adfe98a..23efce0 100644
--- a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
+++ b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
@@ -713,7 +713,7 @@ public class TestTxnHandler {
res = txnHandler.lock(req);
assertTrue(res.getState() == LockState.ACQUIRED);
}
-
+ @Ignore("now that every op has a txn ctx, we don't produce the error expected here....")
@Test
public void testWrongLockForOperation() throws Exception {
LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb");
http://git-wip-us.apache.org/repos/asf/hive/blob/21909601/ql/src/test/org/apache/hadoop/hive/ql/TestErrorMsg.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestErrorMsg.java b/ql/src/test/org/apache/hadoop/hive/ql/TestErrorMsg.java
index c5b658f..7fe902e 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestErrorMsg.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestErrorMsg.java
@@ -23,6 +23,7 @@ import java.util.Set;
import junit.framework.Assert;
import junit.framework.TestCase;
+import org.apache.hadoop.hive.common.JavaUtils;
import org.junit.Test;
public class TestErrorMsg {
@@ -37,8 +38,9 @@ public class TestErrorMsg {
}
@Test
public void testReverseMatch() {
- testReverseMatch(ErrorMsg.OP_NOT_ALLOWED_IN_AUTOCOMMIT, "COMMIT");
- testReverseMatch(ErrorMsg.OP_NOT_ALLOWED_IN_TXN, "ALTER TABLE", "1");
+ testReverseMatch(ErrorMsg.OP_NOT_ALLOWED_IN_IMPLICIT_TXN, "COMMIT");
+ testReverseMatch(ErrorMsg.OP_NOT_ALLOWED_IN_TXN, "ALTER TABLE",
+ JavaUtils.txnIdToString(1), "123");
testReverseMatch(ErrorMsg.OP_NOT_ALLOWED_WITHOUT_TXN, "ROLLBACK");
}
private void testReverseMatch(ErrorMsg errorMsg, String... args) {