You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ek...@apache.org on 2017/05/01 16:43:36 UTC

[3/3] hive git commit: HIVE-12636 Ensure that all queries (with DbTxnManager) run in a transaction (Eugene Koifman, reviewed by Wei Zheng)

HIVE-12636 Ensure that all queries (with DbTxnManager) run in a transaction (Eugene Koifman, reviewed by Wei Zheng)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/21909601
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/21909601
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/21909601

Branch: refs/heads/master
Commit: 21909601f8f5f9d8325774178aaaa8fb3c26a764
Parents: 41c3832
Author: Eugene Koifman <ek...@hortonworks.com>
Authored: Mon May 1 09:43:27 2017 -0700
Committer: Eugene Koifman <ek...@hortonworks.com>
Committed: Mon May 1 09:43:27 2017 -0700

----------------------------------------------------------------------
 .../hive/hcatalog/streaming/TestStreaming.java  |  40 +--
 .../hive/ql/txn/compactor/TestCompactor.java    |  80 ++---
 .../hadoop/hive/metastore/txn/TxnHandler.java   |  13 +-
 .../hadoop/hive/metastore/txn/TxnUtils.java     |   4 +
 .../java/org/apache/hadoop/hive/ql/Context.java |   7 +
 .../java/org/apache/hadoop/hive/ql/Driver.java  | 204 ++++++-----
 .../org/apache/hadoop/hive/ql/ErrorMsg.java     |  14 +-
 .../org/apache/hadoop/hive/ql/QueryPlan.java    |  28 +-
 .../hadoop/hive/ql/lockmgr/DbTxnManager.java    | 282 +++++++++++++---
 .../hadoop/hive/ql/lockmgr/HiveTxnManager.java  |  16 +-
 .../hive/ql/lockmgr/HiveTxnManagerImpl.java     |  20 +-
 .../hive/ql/parse/ExplainSemanticAnalyzer.java  |   6 +-
 .../hive/ql/parse/SemanticAnalyzerFactory.java  |  23 +-
 .../hadoop/hive/ql/plan/HiveOperation.java      |  37 +-
 .../hadoop/hive/ql/session/SessionState.java    |  15 +
 .../hive/metastore/txn/TestTxnHandler.java      |   2 +-
 .../org/apache/hadoop/hive/ql/TestErrorMsg.java |   6 +-
 .../apache/hadoop/hive/ql/TestTxnCommands.java  |  48 +--
 .../apache/hadoop/hive/ql/TestTxnCommands2.java |  21 +-
 .../ql/TestTxnCommands2WithSplitUpdate.java     |  20 +-
 .../hive/ql/lockmgr/TestDbTxnManager.java       |  65 ++--
 .../hive/ql/lockmgr/TestDbTxnManager2.java      | 336 +++++++++++--------
 .../hive/ql/txn/compactor/TestInitiator.java    |  11 +-
 ql/src/test/queries/clientpositive/row__id.q    |   4 +-
 .../clientpositive/acid_table_stats.q.out       |  14 +-
 .../clientpositive/autoColumnStats_4.q.out      |   4 +-
 .../insert_values_orig_table_use_metadata.q.out |  18 +-
 .../llap/acid_bucket_pruning.q.out              |   6 +-
 .../test/results/clientpositive/row__id.q.out   |  34 +-
 29 files changed, 840 insertions(+), 538 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/21909601/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
index 8ea58e6..097de9b 100644
--- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
@@ -741,7 +741,7 @@ public class TestStreaming {
     txnBatch.write("1,Hello streaming".getBytes());
     txnBatch.commit();
 
-    checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
+    checkDataWritten(partLoc, 15, 24, 1, 1, "{1, Hello streaming}");
 
     Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
       , txnBatch.getCurrentTransactionState());
@@ -753,11 +753,11 @@ public class TestStreaming {
     txnBatch.write("2,Welcome to streaming".getBytes());
 
     // data should not be visible
-    checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
+    checkDataWritten(partLoc, 15, 24, 1, 1, "{1, Hello streaming}");
 
     txnBatch.commit();
 
-    checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}",
+    checkDataWritten(partLoc, 15, 24, 1, 1, "{1, Hello streaming}",
       "{2, Welcome to streaming}");
 
     txnBatch.close();
@@ -809,7 +809,7 @@ public class TestStreaming {
     txnBatch.write("1,Hello streaming".getBytes());
     txnBatch.commit();
 
-    checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
+    checkDataWritten(partLoc, 15, 24, 1, 1, "{1, Hello streaming}");
 
     Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
       , txnBatch.getCurrentTransactionState());
@@ -821,11 +821,11 @@ public class TestStreaming {
     txnBatch.write("2,Welcome to streaming".getBytes());
 
     // data should not be visible
-    checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
+    checkDataWritten(partLoc, 15, 24, 1, 1, "{1, Hello streaming}");
 
     txnBatch.commit();
 
-    checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}",
+    checkDataWritten(partLoc, 15, 24, 1, 1, "{1, Hello streaming}",
       "{2, Welcome to streaming}");
 
     txnBatch.close();
@@ -871,7 +871,7 @@ public class TestStreaming {
     txnBatch.write(rec1.getBytes());
     txnBatch.commit();
 
-    checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
+    checkDataWritten(partLoc, 15, 24, 1, 1, "{1, Hello streaming}");
 
     Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
             , txnBatch.getCurrentTransactionState());
@@ -998,7 +998,7 @@ public class TestStreaming {
     txnBatch.write("2,Welcome to streaming".getBytes());
     txnBatch.commit();
 
-    checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}",
+    checkDataWritten(partLoc, 14, 23, 1, 1, "{1, Hello streaming}",
             "{2, Welcome to streaming}");
 
     txnBatch.close();
@@ -1017,13 +1017,13 @@ public class TestStreaming {
     txnBatch.write("1,Hello streaming".getBytes());
     txnBatch.commit();
 
-    checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
+    checkDataWritten(partLoc, 15, 24, 1, 1, "{1, Hello streaming}");
 
     txnBatch.beginNextTransaction();
     txnBatch.write("2,Welcome to streaming".getBytes());
     txnBatch.commit();
 
-    checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}",
+    checkDataWritten(partLoc, 15, 24, 1, 1, "{1, Hello streaming}",
             "{2, Welcome to streaming}");
 
     txnBatch.close();
@@ -1034,14 +1034,14 @@ public class TestStreaming {
     txnBatch.write("3,Hello streaming - once again".getBytes());
     txnBatch.commit();
 
-    checkDataWritten(partLoc, 1, 20, 1, 2, "{1, Hello streaming}",
+    checkDataWritten(partLoc, 15, 34, 1, 2, "{1, Hello streaming}",
             "{2, Welcome to streaming}", "{3, Hello streaming - once again}");
 
     txnBatch.beginNextTransaction();
     txnBatch.write("4,Welcome to streaming - once again".getBytes());
     txnBatch.commit();
 
-    checkDataWritten(partLoc, 1, 20, 1, 2, "{1, Hello streaming}",
+    checkDataWritten(partLoc, 15, 34, 1, 2, "{1, Hello streaming}",
             "{2, Welcome to streaming}", "{3, Hello streaming - once again}",
             "{4, Welcome to streaming - once again}");
 
@@ -1078,11 +1078,11 @@ public class TestStreaming {
 
     txnBatch2.commit();
 
-    checkDataWritten(partLoc, 11, 20, 1, 1, "{3, Hello streaming - once again}");
+    checkDataWritten(partLoc, 24, 33, 1, 1, "{3, Hello streaming - once again}");
 
     txnBatch1.commit();
 
-    checkDataWritten(partLoc, 1, 20, 1, 2, "{1, Hello streaming}", "{3, Hello streaming - once again}");
+    checkDataWritten(partLoc, 14, 33, 1, 2, "{1, Hello streaming}", "{3, Hello streaming - once again}");
 
     txnBatch1.beginNextTransaction();
     txnBatch1.write("2,Welcome to streaming".getBytes());
@@ -1090,17 +1090,17 @@ public class TestStreaming {
     txnBatch2.beginNextTransaction();
     txnBatch2.write("4,Welcome to streaming - once again".getBytes());
 
-    checkDataWritten(partLoc, 1, 20, 1, 2, "{1, Hello streaming}", "{3, Hello streaming - once again}");
+    checkDataWritten(partLoc, 14, 33, 1, 2, "{1, Hello streaming}", "{3, Hello streaming - once again}");
 
     txnBatch1.commit();
 
-    checkDataWritten(partLoc, 1, 20, 1, 2, "{1, Hello streaming}",
+    checkDataWritten(partLoc, 14, 33, 1, 2, "{1, Hello streaming}",
         "{2, Welcome to streaming}",
         "{3, Hello streaming - once again}");
 
     txnBatch2.commit();
 
-    checkDataWritten(partLoc, 1, 20, 1, 2, "{1, Hello streaming}",
+    checkDataWritten(partLoc, 14, 33, 1, 2, "{1, Hello streaming}",
         "{2, Welcome to streaming}",
         "{3, Hello streaming - once again}",
         "{4, Welcome to streaming - once again}");
@@ -1769,7 +1769,7 @@ public class TestStreaming {
     txnBatch.heartbeat();//this is no-op on closed batch
     txnBatch.abort();//ditto
     GetOpenTxnsInfoResponse r = msClient.showTxns();
-    Assert.assertEquals("HWM didn't match", 2, r.getTxn_high_water_mark());
+    Assert.assertEquals("HWM didn't match", 17, r.getTxn_high_water_mark());
     List<TxnInfo> ti = r.getOpen_txns();
     Assert.assertEquals("wrong status ti(0)", TxnState.ABORTED, ti.get(0).getState());
     Assert.assertEquals("wrong status ti(1)", TxnState.ABORTED, ti.get(1).getState());
@@ -1833,7 +1833,7 @@ public class TestStreaming {
       expectedEx != null && expectedEx.getMessage().contains("has been closed()"));
 
     r = msClient.showTxns();
-    Assert.assertEquals("HWM didn't match", 4, r.getTxn_high_water_mark());
+    Assert.assertEquals("HWM didn't match", 19, r.getTxn_high_water_mark());
     ti = r.getOpen_txns();
     Assert.assertEquals("wrong status ti(0)", TxnState.ABORTED, ti.get(0).getState());
     Assert.assertEquals("wrong status ti(1)", TxnState.ABORTED, ti.get(1).getState());
@@ -1856,7 +1856,7 @@ public class TestStreaming {
       expectedEx != null && expectedEx.getMessage().contains("Simulated fault occurred"));
     
     r = msClient.showTxns();
-    Assert.assertEquals("HWM didn't match", 6, r.getTxn_high_water_mark());
+    Assert.assertEquals("HWM didn't match", 21, r.getTxn_high_water_mark());
     ti = r.getOpen_txns();
     Assert.assertEquals("wrong status ti(3)", TxnState.ABORTED, ti.get(3).getState());
     Assert.assertEquals("wrong status ti(4)", TxnState.ABORTED, ti.get(4).getState());

http://git-wip-us.apache.org/repos/asf/hive/blob/21909601/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
index 66ed8ca..f92db7c 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
@@ -659,17 +659,17 @@ public class TestCompactor {
       Path resultFile = null;
       for (int i = 0; i < names.length; i++) {
         names[i] = stat[i].getPath().getName();
-        if (names[i].equals("delta_0000001_0000004")) {
+        if (names[i].equals("delta_0000003_0000006")) {
           resultFile = stat[i].getPath();
         }
       }
       Arrays.sort(names);
-      String[] expected = new String[]{"delta_0000001_0000002",
-          "delta_0000001_0000004", "delta_0000003_0000004", "delta_0000005_0000006"};
+      String[] expected = new String[]{"delta_0000003_0000004",
+          "delta_0000003_0000006", "delta_0000005_0000006", "delta_0000007_0000008"};
       if (!Arrays.deepEquals(expected, names)) {
         Assert.fail("Expected: " + Arrays.toString(expected) + ", found: " + Arrays.toString(names));
       }
-      checkExpectedTxnsPresent(null, new Path[]{resultFile},columnNamesProperty, columnTypesProperty,  0, 1L, 4L);
+      checkExpectedTxnsPresent(null, new Path[]{resultFile},columnNamesProperty, columnTypesProperty,  0, 3L, 6L);
 
     } finally {
       connection.close();
@@ -718,11 +718,11 @@ public class TestCompactor {
       FileStatus[] stat =
           fs.listStatus(new Path(table.getSd().getLocation()), AcidUtils.baseFileFilter);
       if (1 != stat.length) {
-        Assert.fail("Expecting 1 file \"base_0000004\" and found " + stat.length + " files " + Arrays.toString(stat));
+        Assert.fail("Expecting 1 file \"base_0000006\" and found " + stat.length + " files " + Arrays.toString(stat));
       }
       String name = stat[0].getPath().getName();
-      Assert.assertEquals(name, "base_0000004");
-      checkExpectedTxnsPresent(stat[0].getPath(), null, columnNamesProperty, columnTypesProperty, 0, 1L, 4L);
+      Assert.assertEquals(name, "base_0000006");
+      checkExpectedTxnsPresent(stat[0].getPath(), null, columnNamesProperty, columnTypesProperty, 0, 3L, 6L);
     } finally {
       connection.close();
     }
@@ -778,17 +778,17 @@ public class TestCompactor {
       Path resultDelta = null;
       for (int i = 0; i < names.length; i++) {
         names[i] = stat[i].getPath().getName();
-        if (names[i].equals("delta_0000001_0000004")) {
+        if (names[i].equals("delta_0000003_0000006")) {
           resultDelta = stat[i].getPath();
         }
       }
       Arrays.sort(names);
-      String[] expected = new String[]{"delta_0000001_0000002",
-          "delta_0000001_0000004", "delta_0000003_0000004"};
+      String[] expected = new String[]{"delta_0000003_0000004",
+          "delta_0000003_0000006", "delta_0000005_0000006"};
       if (!Arrays.deepEquals(expected, names)) {
         Assert.fail("Expected: " + Arrays.toString(expected) + ", found: " + Arrays.toString(names));
       }
-      checkExpectedTxnsPresent(null, new Path[]{resultDelta}, columnNamesProperty, columnTypesProperty, 0, 1L, 4L);
+      checkExpectedTxnsPresent(null, new Path[]{resultDelta}, columnNamesProperty, columnTypesProperty, 0, 3L, 6L);
     } finally {
       connection.close();
     }
@@ -844,13 +844,13 @@ public class TestCompactor {
         Assert.fail("majorCompactAfterAbort FileStatus[] stat " + Arrays.toString(stat));
       }
       if (1 != stat.length) {
-        Assert.fail("Expecting 1 file \"base_0000004\" and found " + stat.length + " files " + Arrays.toString(stat));
+        Assert.fail("Expecting 1 file \"base_0000006\" and found " + stat.length + " files " + Arrays.toString(stat));
       }
       String name = stat[0].getPath().getName();
-      if (!name.equals("base_0000004")) {
-        Assert.fail("majorCompactAfterAbort name " + name + " not equals to base_0000004");
+      if (!name.equals("base_0000006")) {
+        Assert.fail("majorCompactAfterAbort name " + name + " not equals to base_0000006");
       }
-      checkExpectedTxnsPresent(stat[0].getPath(), null, columnNamesProperty, columnTypesProperty, 0, 1L, 4L);
+      checkExpectedTxnsPresent(stat[0].getPath(), null, columnNamesProperty, columnTypesProperty, 0, 3L, 6L);
     } finally {
       connection.close();
     }
@@ -899,11 +899,11 @@ public class TestCompactor {
       FileStatus[] stat =
           fs.listStatus(new Path(table.getSd().getLocation()), AcidUtils.baseFileFilter);
       if (1 != stat.length) {
-        Assert.fail("Expecting 1 file \"base_0000004\" and found " + stat.length + " files " + Arrays.toString(stat));
+        Assert.fail("Expecting 1 file \"base_0000006\" and found " + stat.length + " files " + Arrays.toString(stat));
       }
       String name = stat[0].getPath().getName();
-      Assert.assertEquals(name, "base_0000004");
-      checkExpectedTxnsPresent(stat[0].getPath(), null, columnNamesProperty, columnTypesProperty, 0, 1L, 4L);
+      Assert.assertEquals(name, "base_0000006");
+      checkExpectedTxnsPresent(stat[0].getPath(), null, columnNamesProperty, columnTypesProperty, 0, 3L, 6L);
     } finally {
       connection.close();
     }
@@ -923,18 +923,18 @@ public class TestCompactor {
         " STORED AS ORC  TBLPROPERTIES ('transactional'='true',"
         + "'transactional_properties'='default')", driver);
 
-    // Insert some data -> this will generate only insert deltas and no delete deltas: delta_1_1
+    // Insert some data -> this will generate only insert deltas and no delete deltas: delta_3_3
     executeStatementOnDriver("INSERT INTO " + tblName +"(a,b) VALUES(1, 'foo')", driver);
 
-    // Insert some data -> this will again generate only insert deltas and no delete deltas: delta_2_2
+    // Insert some data -> this will again generate only insert deltas and no delete deltas: delta_4_4
     executeStatementOnDriver("INSERT INTO " + tblName +"(a,b) VALUES(2, 'bar')", driver);
 
-    // Delete some data -> this will generate only delete deltas and no insert deltas: delete_delta_3_3
+    // Delete some data -> this will generate only delete deltas and no insert deltas: delete_delta_5_5
     executeStatementOnDriver("DELETE FROM " + tblName +" WHERE a = 2", driver);
 
     // Now, compact -> Compaction produces a single range for both delta and delete delta
-    // That is, both delta and delete_deltas would be compacted into delta_1_3 and delete_delta_1_3
-    // even though there are only two delta_1_1, delta_2_2 and one delete_delta_3_3.
+    // That is, both delta and delete_deltas would be compacted into delta_3_5 and delete_delta_3_5
+    // even though there are only two delta_3_3, delta_4_4 and one delete_delta_5_5.
     TxnStore txnHandler = TxnUtils.getTxnStore(conf);
     txnHandler.compact(new CompactionRequest(dbName, tblName, CompactionType.MINOR));
     Worker t = new Worker();
@@ -957,16 +957,16 @@ public class TestCompactor {
     Path minorCompactedDelta = null;
     for (int i = 0; i < deltas.length; i++) {
       deltas[i] = stat[i].getPath().getName();
-      if (deltas[i].equals("delta_0000001_0000003")) {
+      if (deltas[i].equals("delta_0000003_0000005")) {
         minorCompactedDelta = stat[i].getPath();
       }
     }
     Arrays.sort(deltas);
-    String[] expectedDeltas = new String[]{"delta_0000001_0000001_0000", "delta_0000001_0000003", "delta_0000002_0000002_0000"};
+    String[] expectedDeltas = new String[]{"delta_0000003_0000003_0000", "delta_0000003_0000005", "delta_0000004_0000004_0000"};
     if (!Arrays.deepEquals(expectedDeltas, deltas)) {
       Assert.fail("Expected: " + Arrays.toString(expectedDeltas) + ", found: " + Arrays.toString(deltas));
     }
-    checkExpectedTxnsPresent(null, new Path[]{minorCompactedDelta}, columnNamesProperty, columnTypesProperty, 0, 1L, 2L);
+    checkExpectedTxnsPresent(null, new Path[]{minorCompactedDelta}, columnNamesProperty, columnTypesProperty, 0, 3L, 4L);
 
     // Verify that we have got correct set of delete_deltas.
     FileStatus[] deleteDeltaStat =
@@ -975,16 +975,16 @@ public class TestCompactor {
     Path minorCompactedDeleteDelta = null;
     for (int i = 0; i < deleteDeltas.length; i++) {
       deleteDeltas[i] = deleteDeltaStat[i].getPath().getName();
-      if (deleteDeltas[i].equals("delete_delta_0000001_0000003")) {
+      if (deleteDeltas[i].equals("delete_delta_0000003_0000005")) {
         minorCompactedDeleteDelta = deleteDeltaStat[i].getPath();
       }
     }
     Arrays.sort(deleteDeltas);
-    String[] expectedDeleteDeltas = new String[]{"delete_delta_0000001_0000003", "delete_delta_0000003_0000003_0000"};
+    String[] expectedDeleteDeltas = new String[]{"delete_delta_0000003_0000005", "delete_delta_0000005_0000005_0000"};
     if (!Arrays.deepEquals(expectedDeleteDeltas, deleteDeltas)) {
       Assert.fail("Expected: " + Arrays.toString(expectedDeleteDeltas) + ", found: " + Arrays.toString(deleteDeltas));
     }
-    checkExpectedTxnsPresent(null, new Path[]{minorCompactedDeleteDelta}, columnNamesProperty, columnTypesProperty, 0, 2L, 2L);
+    checkExpectedTxnsPresent(null, new Path[]{minorCompactedDeleteDelta}, columnNamesProperty, columnTypesProperty, 0, 4L, 4L);
   }
 
   @Test
@@ -1034,16 +1034,16 @@ public class TestCompactor {
     Path minorCompactedDelta = null;
     for (int i = 0; i < deltas.length; i++) {
       deltas[i] = stat[i].getPath().getName();
-      if (deltas[i].equals("delta_0000001_0000002")) {
+      if (deltas[i].equals("delta_0000003_0000004")) {
         minorCompactedDelta = stat[i].getPath();
       }
     }
     Arrays.sort(deltas);
-    String[] expectedDeltas = new String[]{"delta_0000001_0000001_0000", "delta_0000001_0000002", "delta_0000002_0000002_0000"};
+    String[] expectedDeltas = new String[]{"delta_0000003_0000003_0000", "delta_0000003_0000004", "delta_0000004_0000004_0000"};
     if (!Arrays.deepEquals(expectedDeltas, deltas)) {
       Assert.fail("Expected: " + Arrays.toString(expectedDeltas) + ", found: " + Arrays.toString(deltas));
     }
-    checkExpectedTxnsPresent(null, new Path[]{minorCompactedDelta}, columnNamesProperty, columnTypesProperty, 0, 1L, 2L);
+    checkExpectedTxnsPresent(null, new Path[]{minorCompactedDelta}, columnNamesProperty, columnTypesProperty, 0, 3L, 4L);
 
     // Verify that we have got correct set of delete_deltas.
     FileStatus[] deleteDeltaStat =
@@ -1052,12 +1052,12 @@ public class TestCompactor {
     Path minorCompactedDeleteDelta = null;
     for (int i = 0; i < deleteDeltas.length; i++) {
       deleteDeltas[i] = deleteDeltaStat[i].getPath().getName();
-      if (deleteDeltas[i].equals("delete_delta_0000001_0000002")) {
+      if (deleteDeltas[i].equals("delete_delta_0000003_0000004")) {
         minorCompactedDeleteDelta = deleteDeltaStat[i].getPath();
       }
     }
     Arrays.sort(deleteDeltas);
-    String[] expectedDeleteDeltas = new String[]{"delete_delta_0000001_0000002"};
+    String[] expectedDeleteDeltas = new String[]{"delete_delta_0000003_0000004"};
     if (!Arrays.deepEquals(expectedDeleteDeltas, deleteDeltas)) {
       Assert.fail("Expected: " + Arrays.toString(expectedDeleteDeltas) + ", found: " + Arrays.toString(deleteDeltas));
     }
@@ -1111,17 +1111,17 @@ public class TestCompactor {
       Path resultFile = null;
       for (int i = 0; i < names.length; i++) {
         names[i] = stat[i].getPath().getName();
-        if (names[i].equals("delta_0000001_0000004")) {
+        if (names[i].equals("delta_0000003_0000006")) {
           resultFile = stat[i].getPath();
         }
       }
       Arrays.sort(names);
-      String[] expected = new String[]{"delta_0000001_0000002",
-          "delta_0000001_0000004", "delta_0000003_0000004", "delta_0000005_0000006"};
+      String[] expected = new String[]{"delta_0000003_0000004",
+          "delta_0000003_0000006", "delta_0000005_0000006", "delta_0000007_0000008"};
       if (!Arrays.deepEquals(expected, names)) {
         Assert.fail("Expected: " + Arrays.toString(expected) + ", found: " + Arrays.toString(names));
       }
-      checkExpectedTxnsPresent(null, new Path[]{resultFile},columnNamesProperty, columnTypesProperty,  0, 1L, 4L);
+      checkExpectedTxnsPresent(null, new Path[]{resultFile},columnNamesProperty, columnTypesProperty,  0, 3L, 6L);
 
       // Verify that we have got correct set of delete_deltas also
       FileStatus[] deleteDeltaStat =
@@ -1130,12 +1130,12 @@ public class TestCompactor {
       Path minorCompactedDeleteDelta = null;
       for (int i = 0; i < deleteDeltas.length; i++) {
         deleteDeltas[i] = deleteDeltaStat[i].getPath().getName();
-        if (deleteDeltas[i].equals("delete_delta_0000001_0000004")) {
+        if (deleteDeltas[i].equals("delete_delta_0000003_0000006")) {
           minorCompactedDeleteDelta = deleteDeltaStat[i].getPath();
         }
       }
       Arrays.sort(deleteDeltas);
-      String[] expectedDeleteDeltas = new String[]{"delete_delta_0000001_0000004"};
+      String[] expectedDeleteDeltas = new String[]{"delete_delta_0000003_0000006"};
       if (!Arrays.deepEquals(expectedDeleteDeltas, deleteDeltas)) {
         Assert.fail("Expected: " + Arrays.toString(expectedDeleteDeltas) + ", found: " + Arrays.toString(deleteDeltas));
       }

http://git-wip-us.apache.org/repos/asf/hive/blob/21909601/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index e138838..12d98c5 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -855,7 +855,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
   /**
    * As much as possible (i.e. in absence of retries) we want both operations to be done on the same
    * connection (but separate transactions).  This avoid some flakiness in BONECP where if you
-   * perform an operation on 1 connection and immediately get another fron the pool, the 2nd one
+   * perform an operation on 1 connection and immediately get another from the pool, the 2nd one
    * doesn't see results of the first.
    * 
    * Retry-by-caller note: If the call to lock is from a transaction, then in the worst case
@@ -994,6 +994,13 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
                 case SELECT:
                   updateTxnComponents = false;
                   break;
+                case NO_TXN:
+                  /*this constant is a bit of a misnomer since we now always have a txn context.  It
+                   just means the operation is such that we don't care what tables/partitions it
+                   affected as it doesn't trigger a compaction or conflict detection.  A better name
+                   would be NON_TRANSACTIONAL.*/
+                  updateTxnComponents = false;
+                  break;
                 default:
                   //since we have an open transaction, only 4 values above are expected 
                   throw new IllegalStateException("Unexpected DataOperationType: " + lc.getOperationType()
@@ -2471,14 +2478,14 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
       response.setLockid(extLockId);
 
       LOG.debug("checkLock(): Setting savepoint. extLockId=" + JavaUtils.lockIdToString(extLockId));
-      Savepoint save = dbConn.setSavepoint();//todo: get rid of this
+      Savepoint save = dbConn.setSavepoint();
       StringBuilder query = new StringBuilder("select hl_lock_ext_id, " +
         "hl_lock_int_id, hl_db, hl_table, hl_partition, hl_lock_state, " +
         "hl_lock_type, hl_txnid from HIVE_LOCKS where hl_db in (");
 
       Set<String> strings = new HashSet<String>(locksBeingChecked.size());
 
-      //This the set of entities that the statement represnted by extLockId wants to update
+      //This is the set of entities that the statement represented by extLockId wants to update
       List<LockInfo> writeSet = new ArrayList<>();
 
       for (LockInfo info : locksBeingChecked) {

http://git-wip-us.apache.org/repos/asf/hive/blob/21909601/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
index 517eec3..2df88fd 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
@@ -50,6 +50,10 @@ public class TxnUtils {
    * @return a valid txn list.
    */
   public static ValidTxnList createValidReadTxnList(GetOpenTxnsResponse txns, long currentTxn) {
+    /*todo: should highWater be min(currentTxn,txns.getTxn_high_water_mark()) assuming currentTxn>0?
+     * Otherwise if currentTxn=7 and 8 commits before 7, then 7 will see result of 8 which
+     * doesn't make sense for Snapshot Isolation.  Of course for Read Committed, the list should
+     * include the latest committed set.*/
     long highWater = txns.getTxn_high_water_mark();
     Set<Long> open = txns.getOpen_txns();
     long[] exceptions = new long[open.size() - (currentTxn > 0 ? 1 : 0)];

http://git-wip-us.apache.org/repos/asf/hive/blob/21909601/ql/src/java/org/apache/hadoop/hive/ql/Context.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Context.java b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
index 08bba3d..fdcf052 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Context.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
@@ -948,6 +948,13 @@ public class Context {
   public ExplainConfiguration getExplainConfig() {
     return explainConfig;
   }
+  private boolean isExplainPlan = false;
+  public boolean isExplainPlan() {
+    return isExplainPlan;
+  }
+  public void setExplainPlan(boolean t) {
+    this.isExplainPlan = t;
+  }
 
   public void setExplainConfig(ExplainConfiguration explainConfig) {
     this.explainConfig = explainConfig;

http://git-wip-us.apache.org/repos/asf/hive/blob/21909601/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 16b8101..d32f313 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -37,9 +37,12 @@ import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantLock;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.lang.StringUtils;
 
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.hive.common.JavaUtils;
+import org.apache.hadoop.hive.common.ValidReadTxnList;
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.common.metrics.common.Metrics;
 import org.apache.hadoop.hive.common.metrics.common.MetricsConstant;
@@ -154,11 +157,6 @@ public class Driver implements CommandProcessor {
   private FetchTask fetchTask;
   List<HiveLock> hiveLocks = new ArrayList<HiveLock>();
 
-  // A list of FileSinkOperators writing in an ACID compliant manner
-  private Set<FileSinkDesc> acidSinks;
-  // whether any ACID table is involved in a query
-  private boolean acidInQuery;
-
   // A limit on the number of threads that can be launched
   private int maxthreads;
   private int tryCount = Integer.MAX_VALUE;
@@ -408,7 +406,7 @@ public class Driver implements CommandProcessor {
   // deferClose indicates if the close/destroy should be deferred when the process has been
   // interrupted, it should be set to true if the compile is called within another method like
   // runInternal, which defers the close to the called in that method.
-  public int compile(String command, boolean resetTaskIds, boolean deferClose) {
+  private int compile(String command, boolean resetTaskIds, boolean deferClose) {
     PerfLogger perfLogger = SessionState.getPerfLogger(true);
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.DRIVER_RUN);
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.COMPILE);
@@ -525,6 +523,15 @@ public class Driver implements CommandProcessor {
       // because at that point we need access to the objects.
       Hive.get().getMSC().flushCache();
 
+      if(checkConcurrency() && startImplicitTxn(txnManager)) {
+        String userFromUGI = getUserFromUGI();
+        if (!txnManager.isTxnOpen()) {
+          if(userFromUGI == null) {
+            return 10;
+          }
+          long txnid = txnManager.openTxn(ctx, userFromUGI);
+        }
+      }
       // Do semantic analysis and plan generation
       if (saHooks != null && !saHooks.isEmpty()) {
         HiveSemanticAnalyzerHookContext hookCtx = new HiveSemanticAnalyzerHookContextImpl();
@@ -543,15 +550,10 @@ public class Driver implements CommandProcessor {
       } else {
         sem.analyze(tree, ctx);
       }
-      // Record any ACID compliant FileSinkOperators we saw so we can add our transaction ID to
-      // them later.
-      acidSinks = sem.getAcidFileSinks();
-
       LOG.info("Semantic Analysis Completed");
 
       // validate the plan
       sem.validate();
-      acidInQuery = sem.hasAcidInQuery();
       perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.ANALYZE);
 
       if (isInterrupted()) {
@@ -669,7 +671,39 @@ public class Driver implements CommandProcessor {
     }
   }
 
-
+  private boolean startImplicitTxn(HiveTxnManager txnManager) throws LockException {
+    boolean shouldOpenImplicitTxn = !ctx.isExplainPlan();
+    //this is dumb. HiveOperation is not always set. see HIVE-16447/HIVE-16443
+    switch (queryState.getHiveOperation() == null ? HiveOperation.QUERY : queryState.getHiveOperation()) {
+      case COMMIT:
+      case ROLLBACK:
+        if(!txnManager.isTxnOpen()) {
+          throw new LockException(null, ErrorMsg.OP_NOT_ALLOWED_WITHOUT_TXN, queryState.getHiveOperation().getOperationName());
+        }
+      case SWITCHDATABASE:
+      case SET_AUTOCOMMIT:
+        /**
+         * autocommit is here for completeness.  TM doesn't use it.  If we want to support JDBC
+         * semantics (or any other definition of autocommit) it should be done at session level.
+         */
+      case SHOWDATABASES:
+      case SHOWTABLES:
+      case SHOWCOLUMNS:
+      case SHOWFUNCTIONS:
+      case SHOWINDEXES:
+      case SHOWPARTITIONS:
+      case SHOWLOCKS:
+      case SHOWVIEWS:
+      case SHOW_ROLES:
+      case SHOW_ROLE_PRINCIPALS:
+      case SHOW_COMPACTIONS:
+      case SHOW_TRANSACTIONS:
+      case ABORT_TRANSACTIONS:
+        shouldOpenImplicitTxn = false;
+        //this implies that no locks are needed for such a command
+    }
+    return shouldOpenImplicitTxn;
+  }
   private int handleInterruption(String msg) {
     return handleInterruptionWithHook(msg, null, null);
   }
@@ -1083,8 +1117,17 @@ public class Driver implements CommandProcessor {
   // Write the current set of valid transactions into the conf file so that it can be read by
   // the input format.
   private void recordValidTxns() throws LockException {
+    ValidTxnList oldList = null;
+    String s = conf.get(ValidTxnList.VALID_TXNS_KEY);
+    if(s != null && s.length() > 0) {
+      oldList = new ValidReadTxnList(s);
+    }
     HiveTxnManager txnMgr = SessionState.get().getTxnMgr();
     ValidTxnList txns = txnMgr.getValidTxns();
+    if(oldList != null) {
+      throw new IllegalStateException("calling recordValidTxn() more than once in the same " +
+        JavaUtils.txnIdToString(txnMgr.getCurrentTxnId()));
+    }
     String txnStr = txns.toString();
     conf.set(ValidTxnList.VALID_TXNS_KEY, txnStr);
     if(plan.getFetchTask() != null) {
@@ -1098,79 +1141,61 @@ public class Driver implements CommandProcessor {
     LOG.debug("Encoding valid txns info " + txnStr + " txnid:" + txnMgr.getCurrentTxnId());
   }
 
+  private String getUserFromUGI() {
+    // Don't use the userName member, as it may or may not have been set.  Get the value from
+    // conf, which calls into getUGI to figure out who the process is running as.
+    try {
+      return conf.getUser();
+    } catch (IOException e) {
+      errorMessage = "FAILED: Error in determining user while acquiring locks: " + e.getMessage();
+      SQLState = ErrorMsg.findSQLState(e.getMessage());
+      downstreamError = e;
+      console.printError(errorMessage,
+        "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e));
+    }
+    return null;
+  }
   /**
    * Acquire read and write locks needed by the statement. The list of objects to be locked are
-   * obtained from the inputs and outputs populated by the compiler. The lock acquisition scheme is
-   * pretty simple. If all the locks cannot be obtained, error out. Deadlock is avoided by making
-   * sure that the locks are lexicographically sorted.
+   * obtained from the inputs and outputs populated by the compiler.  Locking strategy depends on
+   * HiveTxnManager and HiveLockManager configured
    *
    * This method also records the list of valid transactions.  This must be done after any
-   * transactions have been opened and locks acquired.
-   * @param startTxnImplicitly in AC=false, the 1st DML starts a txn
+   * transactions have been opened.
    **/
-  private int acquireLocksAndOpenTxn(boolean startTxnImplicitly) {
+  private int acquireLocks() {
     PerfLogger perfLogger = SessionState.getPerfLogger();
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.ACQUIRE_READ_WRITE_LOCKS);
 
     SessionState ss = SessionState.get();
     HiveTxnManager txnMgr = ss.getTxnMgr();
-    if(startTxnImplicitly) {
-      assert !txnMgr.getAutoCommit();
+    if(!txnMgr.isTxnOpen() && txnMgr.supportsAcid()) {
+      /*non acid txn managers don't support txns but fwd lock requests to lock managers
+        acid txn manager requires all locks to be associated with a txn so if we
+        end up here w/o an open txn it's because we are processing something like "use <database>"
+        which by definition needs no locks*/
+      return 0;
     }
-
     try {
-      // Don't use the userName member, as it may or may not have been set.  Get the value from
-      // conf, which calls into getUGI to figure out who the process is running as.
-      String userFromUGI;
-      try {
-        userFromUGI = conf.getUser();
-      } catch (IOException e) {
-        errorMessage = "FAILED: Error in determining user while acquiring locks: " + e.getMessage();
-        SQLState = ErrorMsg.findSQLState(e.getMessage());
-        downstreamError = e;
-        console.printError(errorMessage,
-            "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e));
+      String userFromUGI = getUserFromUGI();
+      if(userFromUGI == null) {
         return 10;
       }
-
-      boolean initiatingTransaction = false;
-      boolean readOnlyQueryInAutoCommit = false;
-      if((txnMgr.getAutoCommit() && haveAcidWrite()) || plan.getOperation() == HiveOperation.START_TRANSACTION ||
-        (!txnMgr.getAutoCommit() && startTxnImplicitly)) {
-        if(txnMgr.isTxnOpen()) {
-          throw new RuntimeException("Already have an open transaction txnid:" + txnMgr.getCurrentTxnId());
-        }
-        // We are writing to tables in an ACID compliant way, so we need to open a transaction
-        txnMgr.openTxn(ctx, userFromUGI);
-        initiatingTransaction = true;
-      }
-      else {
-        readOnlyQueryInAutoCommit = txnMgr.getAutoCommit() && plan.getOperation() == HiveOperation.QUERY && !haveAcidWrite();
-      }
       // Set the transaction id in all of the acid file sinks
       if (haveAcidWrite()) {
-        for (FileSinkDesc desc : acidSinks) {
+        for (FileSinkDesc desc : plan.getAcidSinks()) {
           desc.setTransactionId(txnMgr.getCurrentTxnId());
           //it's possible to have > 1 FileSink writing to the same table/partition
           //e.g. Merge stmt, multi-insert stmt when mixing DP and SP writes
           desc.setStatementId(txnMgr.getWriteIdAndIncrement());
         }
       }
-      /*Note, we have to record snapshot after lock acquisition to prevent lost update problem
-      consider 2 concurrent "update table T set x = x + 1".  1st will get the locks and the
-      2nd will block until 1st one commits and only then lock in the snapshot, i.e. it will
-      see the changes made by 1st one.  This takes care of autoCommit=true case.
-      For multi-stmt txns this is not sufficient and will be managed via WriteSet tracking
-      in the lock manager.*/
+      /*It's imperative that {@code acquireLocks()} is called for all commands so that 
+      HiveTxnManager can transition its state machine correctly*/
       txnMgr.acquireLocks(plan, ctx, userFromUGI, lDrvState);
-      if(initiatingTransaction || (readOnlyQueryInAutoCommit && acidInQuery)) {
-        //For multi-stmt txns we should record the snapshot when txn starts but
-        // don't update it after that until txn completes.  Thus the check for {@code initiatingTransaction}
-        //For autoCommit=true, Read-only statements, txn is implicit, i.e. lock in the snapshot
-        //for each statement.
+      if(txnMgr.recordSnapshot(plan)) {
         recordValidTxns();
       }
-
       return 0;
     } catch (Exception e) {
       errorMessage = "FAILED: Error in acquiring locks: " + e.getMessage();
@@ -1185,7 +1210,7 @@ public class Driver implements CommandProcessor {
   }
 
   private boolean haveAcidWrite() {
-    return acidSinks != null && !acidSinks.isEmpty();
+    return !plan.getAcidSinks().isEmpty();
   }
   /**
    * @param commit if there is an open transaction and if true, commit,
@@ -1193,11 +1218,11 @@ public class Driver implements CommandProcessor {
    * @param txnManager an optional existing transaction manager retrieved earlier from the session
    *
    **/
-  private void releaseLocksAndCommitOrRollback(boolean commit, HiveTxnManager txnManager)
+  @VisibleForTesting
+  public void releaseLocksAndCommitOrRollback(boolean commit, HiveTxnManager txnManager)
       throws LockException {
     PerfLogger perfLogger = SessionState.getPerfLogger();
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.RELEASE_LOCKS);
-
     HiveTxnManager txnMgr;
     if (txnManager == null) {
       SessionState ss = SessionState.get();
@@ -1207,6 +1232,7 @@ public class Driver implements CommandProcessor {
     }
     // If we've opened a transaction we need to commit or rollback rather than explicitly
     // releasing the locks.
+    conf.unset(ValidTxnList.VALID_TXNS_KEY);
     if (txnMgr.isTxnOpen()) {
       if (commit) {
         if(conf.getBoolVar(ConfVars.HIVE_IN_TEST) && conf.getBoolVar(ConfVars.HIVETESTMODEROLLBACKTXN)) {
@@ -1492,52 +1518,12 @@ public class Driver implements CommandProcessor {
       HiveTxnManager txnManager = SessionState.get().getTxnMgr();
       ctx.setHiveTxnManager(txnManager);
 
-      boolean startTxnImplicitly = false;
-      {
-        //this block ensures op makes sense in given context, e.g. COMMIT is valid only if txn is open
-        //DDL is not allowed in a txn, etc.
-        //an error in an open txn does a rollback of the txn
-        if (txnManager.isTxnOpen() && !plan.getOperation().isAllowedInTransaction()) {
-          assert !txnManager.getAutoCommit() : "didn't expect AC=true";
-          return rollback(new CommandProcessorResponse(12, ErrorMsg.OP_NOT_ALLOWED_IN_TXN, null,
-            plan.getOperationName(), Long.toString(txnManager.getCurrentTxnId())));
-        }
-        if(!txnManager.isTxnOpen() && plan.getOperation().isRequiresOpenTransaction()) {
-          return rollback(new CommandProcessorResponse(12, ErrorMsg.OP_NOT_ALLOWED_WITHOUT_TXN, null, plan.getOperationName()));
-        }
-        if(!txnManager.isTxnOpen() && plan.getOperation() == HiveOperation.QUERY && !txnManager.getAutoCommit()) {
-          //this effectively makes START TRANSACTION optional and supports JDBC setAutoCommit(false) semantics
-          //also, indirectly allows DDL to be executed outside a txn context
-          startTxnImplicitly = true;
-        }
-        if(txnManager.getAutoCommit() && plan.getOperation() == HiveOperation.START_TRANSACTION) {
-          return rollback(new CommandProcessorResponse(12, ErrorMsg.OP_NOT_ALLOWED_IN_AUTOCOMMIT, null, plan.getOperationName()));
-        }
-      }
-      if(plan.getOperation() == HiveOperation.SET_AUTOCOMMIT) {
-        try {
-          if(plan.getAutoCommitValue() && !txnManager.getAutoCommit()) {
-            /*here, if there is an open txn, we want to commit it; this behavior matches
-            * https://docs.oracle.com/javase/6/docs/api/java/sql/Connection.html#setAutoCommit(boolean)*/
-            releaseLocksAndCommitOrRollback(true, null);
-            txnManager.setAutoCommit(true);
-          }
-          else if(!plan.getAutoCommitValue() && txnManager.getAutoCommit()) {
-            txnManager.setAutoCommit(false);
-          }
-          else {/*didn't change autoCommit value - no-op*/}
-        }
-        catch(LockException e) {
-          return handleHiveException(e, 12);
-        }
-      }
-
       if (requiresLock()) {
         // a checkpoint to see if the thread is interrupted or not before an expensive operation
         if (isInterrupted()) {
           ret = handleInterruption("at acquiring the lock.");
         } else {
-          ret = acquireLocksAndOpenTxn(startTxnImplicitly);
+          ret = acquireLocks();
         }
         if (ret != 0) {
           return rollback(createProcessorResponse(ret));
@@ -1551,7 +1537,8 @@ public class Driver implements CommandProcessor {
 
       //if needRequireLock is false, the release here will do nothing because there is no lock
       try {
-        if(txnManager.getAutoCommit() || plan.getOperation() == HiveOperation.COMMIT) {
+        //since set autocommit starts an implicit txn, close it
+        if(txnManager.isImplicitTransactionOpen() || plan.getOperation() == HiveOperation.COMMIT) {
           releaseLocksAndCommitOrRollback(true, null);
         }
         else if(plan.getOperation() == HiveOperation.ROLLBACK) {
@@ -1678,6 +1665,13 @@ public class Driver implements CommandProcessor {
   private CommandProcessorResponse createProcessorResponse(int ret) {
     SessionState.getPerfLogger().cleanupPerfLogMetrics();
     queryDisplay.setErrorMessage(errorMessage);
+    if(downstreamError != null && downstreamError instanceof HiveException) {
+      ErrorMsg em = ((HiveException)downstreamError).getCanonicalErrorMsg();
+      if(em != null) {
+        return new CommandProcessorResponse(ret, errorMessage, SQLState,
+          schema, downstreamError, em.getErrorCode(), null);
+      }
+    }
     return new CommandProcessorResponse(ret, errorMessage, SQLState, downstreamError);
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/21909601/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
index 9667d71..d01a203 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
@@ -482,9 +482,17 @@ public enum ErrorMsg {
       "is controlled by hive.exec.max.dynamic.partitions and hive.exec.max.dynamic.partitions.pernode. "),
   PARTITION_SCAN_LIMIT_EXCEEDED(20005, "Number of partitions scanned (={0}) on table {1} exceeds limit" +
       " (={2}). This is controlled by hive.limit.query.max.table.partition.", true),
-  OP_NOT_ALLOWED_IN_AUTOCOMMIT(20006, "Operation {0} is not allowed when autoCommit=true.", true),//todo: better SQLState?
-  OP_NOT_ALLOWED_IN_TXN(20007, "Operation {0} is not allowed in a transaction.  TransactionID={1}.", true),
-  OP_NOT_ALLOWED_WITHOUT_TXN(20008, "Operation {0} is not allowed since autoCommit=false and there is no active transaction", true),
+  /**
+   * {1} is the transaction id;
+   * use {@link org.apache.hadoop.hive.common.JavaUtils#txnIdToString(long)} to format
+   */
+  OP_NOT_ALLOWED_IN_IMPLICIT_TXN(20006, "Operation {0} is not allowed in an implicit transaction ({1}).", true),
+  /**
+   * {1} is the transaction id;
+   * use {@link org.apache.hadoop.hive.common.JavaUtils#txnIdToString(long)} to format
+   */
+  OP_NOT_ALLOWED_IN_TXN(20007, "Operation {0} is not allowed in a transaction ({1},queryId={2}).", true),
+  OP_NOT_ALLOWED_WITHOUT_TXN(20008, "Operation {0} is not allowed without an active transaction", true),
   //========================== 30000 range starts here ========================//
   STATSPUBLISHER_NOT_OBTAINED(30000, "StatsPublisher cannot be obtained. " +
     "There was a error to retrieve the StatsPublisher, and retrying " +

http://git-wip-us.apache.org/repos/asf/hive/blob/21909601/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java b/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java
index e8c8ae6..2ddabd9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java
@@ -35,6 +35,7 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.hive.metastore.api.Schema;
 import org.apache.hadoop.hive.ql.exec.ConditionalTask;
 import org.apache.hadoop.hive.ql.exec.ExplainTask;
@@ -48,6 +49,7 @@ import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer;
 import org.apache.hadoop.hive.ql.parse.ColumnAccessInfo;
 import org.apache.hadoop.hive.ql.parse.TableAccessInfo;
+import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
 import org.apache.hadoop.hive.ql.plan.HiveOperation;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.ReducerTimeStatsPerJob;
@@ -105,11 +107,19 @@ public class QueryPlan implements Serializable {
 
   private transient Long queryStartTime;
   private final HiveOperation operation;
+  private final boolean acidResourcesInQuery;
+  private final Set<FileSinkDesc> acidSinks;
   private Boolean autoCommitValue;
 
   public QueryPlan() {
-    this.reducerTimeStatsPerJobList = new ArrayList<ReducerTimeStatsPerJob>();
-    operation = null;
+    this(null);
+  }
+  @VisibleForTesting
+  protected QueryPlan(HiveOperation command) {
+    this.reducerTimeStatsPerJobList = new ArrayList<>();
+    this.operation = command;
+    this.acidResourcesInQuery = false;
+    this.acidSinks = Collections.emptySet();
   }
 
   public QueryPlan(String queryString, BaseSemanticAnalyzer sem, Long startTime, String queryId,
@@ -136,8 +146,22 @@ public class QueryPlan implements Serializable {
     this.operation = operation;
     this.autoCommitValue = sem.getAutoCommitValue();
     this.resultSchema = resultSchema;
+    this.acidResourcesInQuery = sem.hasAcidInQuery();
+    this.acidSinks = sem.getAcidFileSinks();
   }
 
+  /**
+   * @return true if any acid resources are read/written
+   */
+  public boolean hasAcidResourcesInQuery() {
+    return acidResourcesInQuery;
+  }
+  /**
+   * @return Collection of FileSinkDesc representing writes to Acid resources
+   */
+  Set<FileSinkDesc> getAcidSinks() {
+    return acidSinks;
+  }
   public String getQueryStr() {
     return queryString;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/21909601/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
index 62f7c5a..cdf2c40 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
@@ -21,6 +21,11 @@ import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.plan.HiveOperation;
+import org.apache.hadoop.hive.ql.plan.LockDatabaseDesc;
+import org.apache.hadoop.hive.ql.plan.LockTableDesc;
+import org.apache.hadoop.hive.ql.plan.UnlockDatabaseDesc;
+import org.apache.hadoop.hive.ql.plan.UnlockTableDesc;
 import org.apache.hive.common.util.ShutdownHookManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -57,6 +62,13 @@ import java.util.concurrent.atomic.AtomicInteger;
  * with a single thread accessing it at a time, with the exception of {@link #heartbeat()} method.
  * The later may (usually will) be called from a timer thread.
  * See {@link #getMS()} for more important concurrency/metastore access notes.
+ * 
+ * Each statement that the TM (transaction manager) should be aware of should belong to a transaction.
+ * Effectively, that means any statement that has side effects.  Exceptions are statements like
+ * Show Compactions, Show Tables, Use Database foo, etc.  The transaction is started either
+ * explicitly ( via Start Transaction SQL statement from end user - not fully supported) or
+ * implicitly by the {@link org.apache.hadoop.hive.ql.Driver} (which looks exactly as autoCommit=true
+ * from end user point of view). See more at {@link #isExplicitTransaction}.
  */
 public final class DbTxnManager extends HiveTxnManagerImpl {
 
@@ -76,7 +88,47 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
    * to keep apart multiple writes of the same data within the same transaction
    * Also see {@link org.apache.hadoop.hive.ql.io.AcidOutputFormat.Options}
    */
-  private int statementId = -1;
+  private int writeId = -1;
+  /**
+   * counts number of statements in the current transaction
+   */
+  private int numStatements = 0;
+  /**
+   * if {@code true} it means current transaction is started via START TRANSACTION which means it cannot
+   * include any Operations which cannot be rolled back (drop partition; write to  non-acid table).
+   * If false, it's a single statement transaction which can include any statement.  This is not a 
+   * contradiction from the user point of view who doesn't know anything about the implicit txn
+   * and cannot call rollback (the statement of course can fail in which case there is nothing to 
+   * rollback (assuming the statement is well implemented)).
+   *
+   * This is done so that all commands run in a transaction which simplifies implementation and
+   * allows a simple implementation of multi-statement txns which don't require a lock manager
+   * capable of deadlock detection.  (todo: not fully implemented; elaborate on how this LM works)
+   *
+   * Also, critically important, ensuring that everything runs in a transaction assigns an order
+   * to all operations in the system - needed for replication/DR.
+   *
+   * We don't want to allow non-transactional statements in a user demarcated txn because the effect
+   * of such statement is "visible" immediately on statement completion, but the user may
+   * issue a rollback but the action of the statement can't be undone (and has possibly already been
+   * seen by another txn).  For example,
+   * start transaction
+   * insert into transactional_table values(1);
+   * insert into non_transactional_table select * from transactional_table;
+   * rollback
+   *
+   * The user would be in for a surprise especially if they are not aware of transactional
+   * properties of the tables involved.
+   *
+   * As a side note: what should the lock manager do with locks for non-transactional resources?
+   * Should it release them at the end of the stmt or txn?
+   * Some interesting thoughts: http://mysqlmusings.blogspot.com/2009/02/mixing-engines-in-transactions.html
+   */
+  private boolean isExplicitTransaction = false;
+  /**
+   * To ensure transactions don't nest.
+   */
+  private int startTransactionCount = 0;
 
   // QueryId for the query in current transaction
   private String queryId;
@@ -141,15 +193,22 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
 
   @VisibleForTesting
   long openTxn(Context ctx, String user, long delay) throws LockException {
-    //todo: why don't we lock the snapshot here???  Instead of having client make an explicit call
-    //whenever it chooses
+    /*Q: why don't we lock the snapshot here???  Instead of having client make an explicit call
+    whenever it chooses
+    A: If we want to rely on locks for transaction scheduling we must get the snapshot after lock
+    acquisition.  Relying on locks is a pessimistic strategy which works better under high
+    contention.*/
     init();
+    getLockManager();
     if(isTxnOpen()) {
       throw new LockException("Transaction already opened. " + JavaUtils.txnIdToString(txnId));
     }
     try {
       txnId = getMS().openTxn(user);
-      statementId = 0;
+      writeId = 0;
+      numStatements = 0;
+      isExplicitTransaction = false;
+      startTransactionCount = 0;
       LOG.debug("Opened " + JavaUtils.txnIdToString(txnId));
       ctx.setHeartbeater(startHeartbeat(delay));
       return txnId;
@@ -159,8 +218,8 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
   }
 
   /**
-   * we don't expect multiple thread to call this method concurrently but {@link #lockMgr} will
-   * be read by a different threads that one writing it, thus it's {@code volatile}
+   * we don't expect multiple threads to call this method concurrently but {@link #lockMgr} will
+   * be read by a different thread than the one writing it, thus it's {@code volatile}
    */
   @Override
   public HiveLockManager getLockManager() throws LockException {
@@ -179,24 +238,95 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
     catch(LockException e) {
       if(e.getCause() instanceof TxnAbortedException) {
         txnId = 0;
-        statementId = -1;
+        writeId = -1;
       }
       throw e;
     }
   }
 
   /**
-   * This is for testing only.  Normally client should call {@link #acquireLocks(org.apache.hadoop.hive.ql.QueryPlan, org.apache.hadoop.hive.ql.Context, String)}
+   * Watermark to include in error msgs and logs
+   * @param queryPlan
+   * @return
+   */
+  private static String getQueryIdWaterMark(QueryPlan queryPlan) {
+    return "queryId=" + queryPlan.getQueryId();
+  }
+
+  private void markExplicitTransaction(QueryPlan queryPlan) throws LockException {
+    isExplicitTransaction = true;
+    if(++startTransactionCount > 1) {
+      throw new LockException(null, ErrorMsg.OP_NOT_ALLOWED_IN_TXN, queryPlan.getOperationName(),
+        JavaUtils.txnIdToString(getCurrentTxnId()), queryPlan.getQueryId());
+    }
+
+  }
+  /**
+   * Ensures that the current SQL statement is appropriate for the current state of the
+   * Transaction Manager (e.g. cannot call commit unless you called start transaction)
+   * 
+   * Note that support for multi-statement txns is a work-in-progress so it's only supported in
+   * HiveConf#HIVE_IN_TEST/HiveConf#TEZ_HIVE_IN_TEST.
+   * @param queryPlan
+   * @throws LockException
+   */
+  private void verifyState(QueryPlan queryPlan) throws LockException {
+    if(!isTxnOpen()) {
+      throw new LockException("No transaction context for operation: " + queryPlan.getOperationName() + 
+        " for " + getQueryIdWaterMark(queryPlan));
+    }
+    if(queryPlan.getOperation() == null) {
+      throw new IllegalStateException("Unkown HiverOperation for " + getQueryIdWaterMark(queryPlan));
+    }
+    numStatements++;
+    switch (queryPlan.getOperation()) {
+      case START_TRANSACTION:
+        markExplicitTransaction(queryPlan);
+        break;
+      case COMMIT:
+      case ROLLBACK:
+        if(!isTxnOpen()) {
+          throw new LockException(null, ErrorMsg.OP_NOT_ALLOWED_WITHOUT_TXN, queryPlan.getOperationName());
+        }
+        if(!isExplicitTransaction) {
+          throw new LockException(null, ErrorMsg.OP_NOT_ALLOWED_IN_IMPLICIT_TXN, queryPlan.getOperationName());
+        }
+        break;
+      default:
+        if(!queryPlan.getOperation().isAllowedInTransaction() && isExplicitTransaction) {
+          //for example, drop table in an explicit txn is not allowed
+          //in some cases this requires looking at more than just the operation
+          //for example HiveOperation.LOAD - OK if target is MM table but not OK if non-acid table
+          throw new LockException(null, ErrorMsg.OP_NOT_ALLOWED_IN_TXN, queryPlan.getOperationName(),
+            JavaUtils.txnIdToString(getCurrentTxnId()), queryPlan.getQueryId());
+        }
+    }
+    /*
+    Should we allow writing to non-transactional tables in an explicit transaction?  The user may
+    issue ROLLBACK but these tables won't rollback.
+    Can do this by checking ReadEntity/WriteEntity to determine whether it's reading/writing
+    any non acid and raise an appropriate error
+    * Driver.acidSinks and Driver.acidInQuery can be used if any acid is in the query*/
+  }
+  /**
+   * Normally client should call {@link #acquireLocks(org.apache.hadoop.hive.ql.QueryPlan, org.apache.hadoop.hive.ql.Context, String)}
    * @param isBlocking if false, the method will return immediately; thus the locks may be in LockState.WAITING
    * @return null if no locks were needed
    */
+  @VisibleForTesting
   LockState acquireLocks(QueryPlan plan, Context ctx, String username, boolean isBlocking) throws LockException {
     init();
-        // Make sure we've built the lock manager
+    // Make sure we've built the lock manager
     getLockManager();
-
+    verifyState(plan);
     boolean atLeastOneLock = false;
     queryId = plan.getQueryId();
+    switch (plan.getOperation()) {
+      case SET_AUTOCOMMIT:
+        /**This is here for documentation purposes.  This TM doesn't support this - only has one
+        * mode of operation documented at {@link DbTxnManager#isExplicitTransaction}*/
+        return  null;
+    }
 
     LockRequestBuilder rqstBuilder = new LockRequestBuilder(queryId);
     //link queryId to txnId
@@ -240,8 +370,8 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
           // This is a file or something we don't hold locks for.
           continue;
       }
-      if(t != null && AcidUtils.isAcidTable(t)) {
-        compBuilder.setIsAcid(true);
+      if(t != null) {
+        compBuilder.setIsAcid(AcidUtils.isAcidTable(t));
       }
       LockComponent comp = compBuilder.build();
       LOG.debug("Adding lock component to lock request " + comp.toString());
@@ -262,7 +392,33 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
       }
       LockComponentBuilder compBuilder = new LockComponentBuilder();
       Table t = null;
+      switch (output.getType()) {
+        case DATABASE:
+          compBuilder.setDbName(output.getDatabase().getName());
+          break;
+
+        case TABLE:
+        case DUMMYPARTITION:   // in case of dynamic partitioning lock the table
+          t = output.getTable();
+          compBuilder.setDbName(t.getDbName());
+          compBuilder.setTableName(t.getTableName());
+          break;
+
+        case PARTITION:
+          compBuilder.setPartitionName(output.getPartition().getName());
+          t = output.getPartition().getTable();
+          compBuilder.setDbName(t.getDbName());
+          compBuilder.setTableName(t.getTableName());
+          break;
+
+        default:
+          // This is a file or something we don't hold locks for.
+          continue;
+      }
       switch (output.getWriteType()) {
+        /* base this on HiveOperation instead?  this and DDL_NO_LOCK is peppered all over the code...
+         Seems much cleaner if each stmt is identified as a particular HiveOperation (which I'd think
+         makes sense everywhere).  This however would be problematic for merge...*/
         case DDL_EXCLUSIVE:
         case INSERT_OVERWRITE:
           compBuilder.setExclusive();
@@ -270,10 +426,9 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
           break;
 
         case INSERT:
-          t = getTable(output);
+          assert t != null;
           if(AcidUtils.isAcidTable(t)) {
             compBuilder.setShared();
-            compBuilder.setIsAcid(true);
           }
           else {
             if (conf.getBoolVar(HiveConf.ConfVars.HIVE_TXN_STRICT_LOCKING_MODE)) {
@@ -281,7 +436,6 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
             } else {  // this is backward compatible for non-ACID resources, w/o ACID semantics
               compBuilder.setShared();
             }
-            compBuilder.setIsAcid(false);
           }
           compBuilder.setOperationType(DataOperationType.INSERT);
           break;
@@ -293,12 +447,10 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
         case UPDATE:
           compBuilder.setSemiShared();
           compBuilder.setOperationType(DataOperationType.UPDATE);
-          t = getTable(output);
           break;
         case DELETE:
           compBuilder.setSemiShared();
           compBuilder.setOperationType(DataOperationType.DELETE);
-          t = getTable(output);
           break;
 
         case DDL_NO_LOCK:
@@ -307,34 +459,11 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
         default:
           throw new RuntimeException("Unknown write type " +
               output.getWriteType().toString());
-
       }
-      switch (output.getType()) {
-        case DATABASE:
-          compBuilder.setDbName(output.getDatabase().getName());
-          break;
-
-        case TABLE:
-        case DUMMYPARTITION:   // in case of dynamic partitioning lock the table
-          t = output.getTable();
-          compBuilder.setDbName(t.getDbName());
-          compBuilder.setTableName(t.getTableName());
-          break;
-
-        case PARTITION:
-          compBuilder.setPartitionName(output.getPartition().getName());
-          t = output.getPartition().getTable();
-          compBuilder.setDbName(t.getDbName());
-          compBuilder.setTableName(t.getTableName());
-          break;
-
-        default:
-          // This is a file or something we don't hold locks for.
-          continue;
-      }
-      if(t != null && AcidUtils.isAcidTable(t)) {
-        compBuilder.setIsAcid(true);
+      if(t != null) {
+        compBuilder.setIsAcid(AcidUtils.isAcidTable(t));
       }
+
       compBuilder.setIsDynamicPartitionWrite(output.isDynamicPartitionWrite());
       LockComponent comp = compBuilder.build();
       LOG.debug("Adding lock component to lock request " + comp.toString());
@@ -405,7 +534,8 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
           e);
     } finally {
       txnId = 0;
-      statementId = -1;
+      writeId = -1;
+      numStatements = 0;
     }
   }
 
@@ -429,7 +559,8 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
           e);
     } finally {
       txnId = 0;
-      statementId = -1;
+      writeId = -1;
+      numStatements = 0;
     }
   }
 
@@ -556,6 +687,26 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
   public boolean supportsExplicitLock() {
     return false;
   }
+  @Override
+  public int lockTable(Hive db, LockTableDesc lockTbl) throws HiveException {
+    super.lockTable(db, lockTbl);
+    throw new UnsupportedOperationException();
+  }
+  @Override
+  public int unlockTable(Hive hiveDB, UnlockTableDesc unlockTbl) throws HiveException {
+    super.unlockTable(hiveDB, unlockTbl);
+    throw new UnsupportedOperationException();
+  }
+  @Override
+  public int lockDatabase(Hive hiveDB, LockDatabaseDesc lockDb) throws HiveException {
+    super.lockDatabase(hiveDB, lockDb);
+    throw new UnsupportedOperationException();
+  }
+  @Override
+  public int unlockDatabase(Hive hiveDB, UnlockDatabaseDesc unlockDb) throws HiveException {
+    super.unlockDatabase(hiveDB, unlockDb);
+    throw new UnsupportedOperationException();
+  }
 
   @Override
   public boolean useNewShowLocksFormat() {
@@ -566,7 +717,44 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
   public boolean supportsAcid() {
     return true;
   }
-
+  /**
+   * In an explicit txn start_transaction is the 1st statement and we record the snapshot at the
+   * start of the txn for Snapshot Isolation.  For Read Committed (not supported yet) we'd record
+   * it before executing each statement (but after lock acquisition if using lock based concurrency
+   * control).
+   * For implicit txn, the stmt that triggered/started the txn is the first statement
+   */
+  @Override
+  public boolean recordSnapshot(QueryPlan queryPlan) {
+    assert isTxnOpen();
+    assert numStatements > 0 : "was acquireLocks() called already?";
+    if(queryPlan.getOperation() == HiveOperation.START_TRANSACTION) {
+      //here if start of explicit txn
+      assert isExplicitTransaction;
+      assert numStatements == 1;
+      return true;
+    }
+    else if(!isExplicitTransaction) {
+      assert numStatements == 1 : "numStatements=" + numStatements + " in implicit txn";
+      if (queryPlan.hasAcidResourcesInQuery()) {
+        //1st and only stmt in implicit txn and uses acid resource
+        return true;
+      }
+    }
+    return false;
+  }
+  @Override
+  public boolean isImplicitTransactionOpen() {
+    if(!isTxnOpen()) {
+      //some commands like "show databases" don't start implicit transactions
+      return false;
+    }
+    if(!isExplicitTransaction) {
+      assert numStatements == 1 : "numStatements=" + numStatements;
+      return true;
+    }
+    return false;
+  }
   @Override
   protected void destruct() {
     try {
@@ -626,7 +814,7 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
   @Override
   public int getWriteIdAndIncrement() {
     assert isTxnOpen();
-    return statementId++;
+    return writeId++;
   }
 
   private static long getHeartbeatInterval(Configuration conf) throws LockException {

http://git-wip-us.apache.org/repos/asf/hive/blob/21909601/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
index 187a658..b24351c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
@@ -72,7 +72,7 @@ public interface HiveTxnManager {
 
   /**
    * Acquire all of the locks needed by a query.  If used with a query that
-   * requires transactions, this should be called after {@link #openTxn(String)}.
+   * requires transactions, this should be called after {@link #openTxn(Context, String)}.
    * A list of acquired locks will be stored in the
    * {@link org.apache.hadoop.hive.ql.Context} object and can be retrieved
    * via {@link org.apache.hadoop.hive.ql.Context#getHiveLocks}.
@@ -208,17 +208,13 @@ public interface HiveTxnManager {
   boolean supportsAcid();
 
   /**
-   * This behaves exactly as
-   * https://docs.oracle.com/javase/6/docs/api/java/sql/Connection.html#setAutoCommit(boolean)
-   */
-  void setAutoCommit(boolean autoCommit) throws LockException;
-
-  /**
-   * This behaves exactly as
-   * https://docs.oracle.com/javase/6/docs/api/java/sql/Connection.html#getAutoCommit()
+   * For resources that support MVCC, the state of the DB must be recorded for the duration of the
+   * operation/transaction.  Returns {@code true} if the current statement needs to do this.
    */
-  boolean getAutoCommit();
+  boolean recordSnapshot(QueryPlan queryPlan);
 
+  boolean isImplicitTransactionOpen();
+  
   boolean isTxnOpen();
   /**
    * if {@code isTxnOpen()}, returns the currently active transaction ID

http://git-wip-us.apache.org/repos/asf/hive/blob/21909601/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManagerImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManagerImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManagerImpl.java
index 9fa416c..8dbbf87 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManagerImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManagerImpl.java
@@ -45,7 +45,6 @@ import org.apache.hadoop.hive.ql.plan.UnlockTableDesc;
 abstract class HiveTxnManagerImpl implements HiveTxnManager {
 
   protected HiveConf conf;
-  private boolean isAutoCommit = true;//true by default; matches JDBC spec
 
   void setHiveConf(HiveConf c) {
     conf = c;
@@ -68,16 +67,6 @@ abstract class HiveTxnManagerImpl implements HiveTxnManager {
     destruct();
   }
   @Override
-  public void setAutoCommit(boolean autoCommit) throws LockException {
-    isAutoCommit = autoCommit;
-  }
-
-  @Override
-  public boolean getAutoCommit() {
-    return isAutoCommit;
-  }
-
-  @Override
   public int lockTable(Hive db, LockTableDesc lockTbl) throws HiveException {
     HiveLockManager lockMgr = getAndCheckLockManager();
 
@@ -203,4 +192,13 @@ abstract class HiveTxnManagerImpl implements HiveTxnManager {
 
     return lockMgr;
   }
+  @Override
+  public boolean recordSnapshot(QueryPlan queryPlan) {
+    return false;
+  }
+  @Override
+  public boolean isImplicitTransactionOpen() {
+    return true;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/21909601/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSemanticAnalyzer.java
index f62cf9a..668783a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSemanticAnalyzer.java
@@ -125,6 +125,7 @@ public class ExplainSemanticAnalyzer extends BaseSemanticAnalyzer {
     }
 
     ctx.setExplainConfig(config);
+    ctx.setExplainPlan(true);
 
     ASTNode input = (ASTNode) ast.getChild(0);
     // explain analyze is composed of two steps
@@ -137,7 +138,7 @@ public class ExplainSemanticAnalyzer extends BaseSemanticAnalyzer {
       Context runCtx = null;
       try {
         runCtx = new Context(conf);
-        // runCtx and ctx share the configuration
+        // runCtx and ctx share the configuration, but not isExplainPlan()
         runCtx.setExplainConfig(config);
         Driver driver = new Driver(conf, runCtx);
         CommandProcessorResponse ret = driver.run(query);
@@ -161,6 +162,9 @@ public class ExplainSemanticAnalyzer extends BaseSemanticAnalyzer {
       LOG.info("Explain analyze (analyzing phase) for query " + query);
       config.setAnalyze(AnalyzeState.ANALYZING);
     }
+    //Creating new QueryState unfortunately causes all .q.out to change - do this in a separate ticket
+    //Sharing QueryState between generating the plan and executing the query seems bad
+    //BaseSemanticAnalyzer sem = SemanticAnalyzerFactory.get(new QueryState(queryState.getConf()), input);
     BaseSemanticAnalyzer sem = SemanticAnalyzerFactory.get(queryState, input);
     sem.analyze(input, ctx);
     sem.validate();

http://git-wip-us.apache.org/repos/asf/hive/blob/21909601/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java
index 520d3de..3c60e03 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java
@@ -21,9 +21,9 @@ package org.apache.hadoop.hive.ql.parse;
 import org.antlr.runtime.tree.Tree;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.QueryState;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.HiveOperation;
-import org.apache.hadoop.hive.ql.session.SessionState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
 
@@ -32,6 +32,7 @@ import java.util.HashMap;
  *
  */
 public final class SemanticAnalyzerFactory {
+  static final private Logger LOG = LoggerFactory.getLogger(SemanticAnalyzerFactory.class);
 
   static HashMap<Integer, HiveOperation> commandType = new HashMap<Integer, HiveOperation>();
   static HashMap<Integer, HiveOperation[]> tablePartitionCommandType = new HashMap<Integer, HiveOperation[]>();
@@ -131,7 +132,6 @@ public final class SemanticAnalyzerFactory {
     commandType.put(HiveParser.TOK_REPL_DUMP, HiveOperation.EXPORT); // piggyback on EXPORT security handling for now
     commandType.put(HiveParser.TOK_REPL_LOAD, HiveOperation.IMPORT); // piggyback on IMPORT security handling for now
     commandType.put(HiveParser.TOK_REPL_STATUS, HiveOperation.SHOW_TBLPROPERTIES); // TODO : also actually DESCDATABASE
-
   }
 
   static {
@@ -171,7 +171,22 @@ public final class SemanticAnalyzerFactory {
         HiveOperation.ALTERTABLE_UPDATEPARTSTATS});
   }
 
-  public static BaseSemanticAnalyzer get(QueryState queryState, ASTNode tree)
+  
+  public static BaseSemanticAnalyzer get(QueryState queryState, ASTNode tree) throws SemanticException {
+    BaseSemanticAnalyzer sem = getInternal(queryState, tree);
+    if(queryState.getHiveOperation() == null) {
+      String query = queryState.getQueryString();
+      if(query != null && query.length() > 30) {
+        query = query.substring(0, 30);
+      }
+      String msg = "Unknown HiveOperation for query='" + query + "' queryId=" + queryState.getQueryId();
+      //throw new IllegalStateException(msg);
+      LOG.debug(msg);
+    }
+    return sem;
+  }
+  
+  private static BaseSemanticAnalyzer getInternal(QueryState queryState, ASTNode tree)
       throws SemanticException {
     if (tree.getToken() == null) {
       throw new RuntimeException("Empty Syntax Tree");

http://git-wip-us.apache.org/repos/asf/hive/blob/21909601/ql/src/java/org/apache/hadoop/hive/ql/plan/HiveOperation.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/HiveOperation.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/HiveOperation.java
index d333f91..ecac31f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/HiveOperation.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/HiveOperation.java
@@ -27,7 +27,7 @@ public enum HiveOperation {
   IMPORT("IMPORT", null, new Privilege[]{Privilege.ALTER_METADATA, Privilege.ALTER_DATA}),
   CREATEDATABASE("CREATEDATABASE", null, new Privilege[]{Privilege.CREATE}),
   DROPDATABASE("DROPDATABASE", null, new Privilege[]{Privilege.DROP}),
-  SWITCHDATABASE("SWITCHDATABASE", null, null),
+  SWITCHDATABASE("SWITCHDATABASE", null, null, true, false),
   LOCKDB("LOCKDATABASE",  new Privilege[]{Privilege.LOCK}, null),
   UNLOCKDB("UNLOCKDATABASE",  new Privilege[]{Privilege.LOCK}, null),
   DROPTABLE ("DROPTABLE", null, new Privilege[]{Privilege.DROP}),
@@ -60,19 +60,19 @@ public enum HiveOperation {
       new Privilege[]{Privilege.ALTER_METADATA}, null),
   ALTERPARTITION_BUCKETNUM("ALTERPARTITION_BUCKETNUM",
       new Privilege[]{Privilege.ALTER_METADATA}, null),
-  SHOWDATABASES("SHOWDATABASES", new Privilege[]{Privilege.SHOW_DATABASE}, null),
-  SHOWTABLES("SHOWTABLES", null, null),
-  SHOWCOLUMNS("SHOWCOLUMNS", null, null),
-  SHOW_TABLESTATUS("SHOW_TABLESTATUS", null, null),
-  SHOW_TBLPROPERTIES("SHOW_TBLPROPERTIES", null, null),
+  SHOWDATABASES("SHOWDATABASES", new Privilege[]{Privilege.SHOW_DATABASE}, null, true, false),
+  SHOWTABLES("SHOWTABLES", null, null, true, false),
+  SHOWCOLUMNS("SHOWCOLUMNS", null, null, true, false),
+  SHOW_TABLESTATUS("SHOW_TABLESTATUS", null, null, true, false),
+  SHOW_TBLPROPERTIES("SHOW_TBLPROPERTIES", null, null, true, false),
   SHOW_CREATEDATABASE("SHOW_CREATEDATABASE", new Privilege[]{Privilege.SELECT}, null),
   SHOW_CREATETABLE("SHOW_CREATETABLE", new Privilege[]{Privilege.SELECT}, null),
-  SHOWFUNCTIONS("SHOWFUNCTIONS", null, null),
-  SHOWINDEXES("SHOWINDEXES", null, null),
+  SHOWFUNCTIONS("SHOWFUNCTIONS", null, null, true, false),
+  SHOWINDEXES("SHOWINDEXES", null, null, true, false),
   SHOWPARTITIONS("SHOWPARTITIONS", null, null),
-  SHOWLOCKS("SHOWLOCKS", null, null),
+  SHOWLOCKS("SHOWLOCKS", null, null, true, false),
   SHOWCONF("SHOWCONF", null, null),
-  SHOWVIEWS("SHOWVIEWS", null, null),
+  SHOWVIEWS("SHOWVIEWS", null, null, true, false),
   CREATEFUNCTION("CREATEFUNCTION", null, null),
   DROPFUNCTION("DROPFUNCTION", null, null),
   RELOADFUNCTION("RELOADFUNCTION", null, null),
@@ -94,12 +94,12 @@ public enum HiveOperation {
   DROPROLE("DROPROLE", null, null),
   GRANT_PRIVILEGE("GRANT_PRIVILEGE", null, null),
   REVOKE_PRIVILEGE("REVOKE_PRIVILEGE", null, null),
-  SHOW_GRANT("SHOW_GRANT", null, null),
+  SHOW_GRANT("SHOW_GRANT", null, null, true, false),
   GRANT_ROLE("GRANT_ROLE", null, null),
   REVOKE_ROLE("REVOKE_ROLE", null, null),
-  SHOW_ROLES("SHOW_ROLES", null, null),
-  SHOW_ROLE_PRINCIPALS("SHOW_ROLE_PRINCIPALS", null, null),
-  SHOW_ROLE_GRANT("SHOW_ROLE_GRANT", null, null),
+  SHOW_ROLES("SHOW_ROLES", null, null, true, false),
+  SHOW_ROLE_PRINCIPALS("SHOW_ROLE_PRINCIPALS", null, null, true, false),
+  SHOW_ROLE_GRANT("SHOW_ROLE_GRANT", null, null, true, false),
   ALTERTABLE_FILEFORMAT("ALTERTABLE_FILEFORMAT", new Privilege[]{Privilege.ALTER_METADATA}, null),
   ALTERPARTITION_FILEFORMAT("ALTERPARTITION_FILEFORMAT", new Privilege[]{Privilege.ALTER_METADATA}, null),
   ALTERTABLE_LOCATION("ALTERTABLE_LOCATION", new Privilege[]{Privilege.ALTER_DATA}, null),
@@ -128,8 +128,8 @@ public enum HiveOperation {
   ALTERVIEW_RENAME("ALTERVIEW_RENAME", new Privilege[] {Privilege.ALTER_METADATA}, null),
   ALTERVIEW_AS("ALTERVIEW_AS", new Privilege[] {Privilege.ALTER_METADATA}, null),
   ALTERTABLE_COMPACT("ALTERTABLE_COMPACT", new Privilege[]{Privilege.SELECT}, new Privilege[]{Privilege.ALTER_DATA}),
-  SHOW_COMPACTIONS("SHOW COMPACTIONS", null, null),
-  SHOW_TRANSACTIONS("SHOW TRANSACTIONS", null, null),
+  SHOW_COMPACTIONS("SHOW COMPACTIONS", null, null, true, false),
+  SHOW_TRANSACTIONS("SHOW TRANSACTIONS", null, null, true, false),
   START_TRANSACTION("START TRANSACTION", null, null, false, false),
   COMMIT("COMMIT", null, null, true, true),
   ROLLBACK("ROLLBACK", null, null, true, true),
@@ -143,7 +143,10 @@ public enum HiveOperation {
   private Privilege[] outputRequiredPrivileges;
 
   /**
-   * Only a small set of operations is allowed inside an open transactions, e.g. DML
+   * Only a small set of operations is allowed inside an explicit transaction, e.g. DML on
+   * Acid tables or ops w/o persistent side effects like USE DATABASE, SHOW TABLES, etc so
+   * that rollback is meaningful
+   * todo: mark all operations appropriately
    */
   private final boolean allowedInTransaction;
   private final boolean requiresOpenTransaction;

http://git-wip-us.apache.org/repos/asf/hive/blob/21909601/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
index ffce1d1..7692512 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
@@ -459,6 +459,21 @@ public class SessionState {
     return txnMgr;
   }
 
+  /**
+   * This is only for testing.  It allows switching the manager before the (test) operation so that
+   * it's not coupled to the executing thread.  Since tests run against Derby, which often wedges
+   * under concurrent access, tests must use a single thread and simulate concurrent access.
+   * For example, {@code TestDbTxnManager2}
+   */
+  @VisibleForTesting
+  public HiveTxnManager setTxnMgr(HiveTxnManager mgr) {
+    if(!(sessionConf.getBoolVar(ConfVars.HIVE_IN_TEST) || sessionConf.getBoolVar(ConfVars.HIVE_IN_TEZ_TEST))) {
+      throw new IllegalStateException("Only for testing!");
+    }
+    HiveTxnManager tmp = txnMgr;
+    txnMgr = mgr;
+    return tmp;
+  }
   public HadoopShims.HdfsEncryptionShim getHdfsEncryptionShim() throws HiveException {
     try {
       return getHdfsEncryptionShim(FileSystem.get(sessionConf));

http://git-wip-us.apache.org/repos/asf/hive/blob/21909601/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
index adfe98a..23efce0 100644
--- a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
+++ b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
@@ -713,7 +713,7 @@ public class TestTxnHandler {
     res = txnHandler.lock(req);
     assertTrue(res.getState() == LockState.ACQUIRED);
   }
-
+  @Ignore("now that every op has a txn ctx, we don't produce the error expected here....")
   @Test
   public void testWrongLockForOperation() throws Exception {
     LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb");

http://git-wip-us.apache.org/repos/asf/hive/blob/21909601/ql/src/test/org/apache/hadoop/hive/ql/TestErrorMsg.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestErrorMsg.java b/ql/src/test/org/apache/hadoop/hive/ql/TestErrorMsg.java
index c5b658f..7fe902e 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestErrorMsg.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestErrorMsg.java
@@ -23,6 +23,7 @@ import java.util.Set;
 
 import junit.framework.Assert;
 import junit.framework.TestCase;
+import org.apache.hadoop.hive.common.JavaUtils;
 import org.junit.Test;
 
 public class TestErrorMsg {
@@ -37,8 +38,9 @@ public class TestErrorMsg {
   }
   @Test
   public void testReverseMatch() {
-    testReverseMatch(ErrorMsg.OP_NOT_ALLOWED_IN_AUTOCOMMIT, "COMMIT");
-    testReverseMatch(ErrorMsg.OP_NOT_ALLOWED_IN_TXN, "ALTER TABLE", "1");
+    testReverseMatch(ErrorMsg.OP_NOT_ALLOWED_IN_IMPLICIT_TXN, "COMMIT");
+    testReverseMatch(ErrorMsg.OP_NOT_ALLOWED_IN_TXN, "ALTER TABLE",
+      JavaUtils.txnIdToString(1), "123");
     testReverseMatch(ErrorMsg.OP_NOT_ALLOWED_WITHOUT_TXN, "ROLLBACK");
   }
   private void testReverseMatch(ErrorMsg errorMsg, String... args) {