Posted to commits@hive.apache.org by ga...@apache.org on 2014/09/14 00:09:33 UTC

svn commit: r1624788 [1/5] - in /hive/trunk: common/src/java/org/apache/hadoop/hive/conf/ data/conf/tez/ itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/history/ itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/ itests/sr...

Author: gates
Date: Sat Sep 13 22:09:31 2014
New Revision: 1624788

URL: http://svn.apache.org/r1624788
Log:
HIVE-7788 Generate plans for insert, update, and delete (Alan Gates, reviewed by Thejas Nair)

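The patch adds an UpdateDeleteSemanticAnalyzer that rewrites UPDATE and DELETE statements into insert-style plans over ACID tables, along with INSERT ... VALUES support (see the new clientpositive/clientnegative tests below). As a rough, illustrative sketch only, not code from this commit: the table name, data, and exact configuration needed for ACID writes are hypothetical simplifications, but the statements are of the kind the new analyzer and tests exercise, driven through the existing Driver API.

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.Driver;
    import org.apache.hadoop.hive.ql.session.SessionState;

    public class AcidDmlSketch {
      public static void main(String[] args) throws Exception {
        HiveConf conf = new HiveConf();
        // ACID DML needs a transaction manager that supports it plus concurrency support.
        conf.setVar(HiveConf.ConfVars.HIVE_TXN_MANAGER,
            "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager");
        conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, true);
        SessionState.start(conf);
        Driver driver = new Driver(conf);

        // Hypothetical bucketed ORC table; ACID writes are only generated for tables whose
        // output format implements AcidOutputFormat.
        driver.run("CREATE TABLE acid_demo (id INT, name STRING) " +
            "CLUSTERED BY (id) INTO 2 BUCKETS STORED AS ORC");
        driver.run("INSERT INTO TABLE acid_demo VALUES (1, 'alpha'), (2, 'beta')");
        driver.run("UPDATE acid_demo SET name = 'gamma' WHERE id = 1");
        driver.run("DELETE FROM acid_demo WHERE id = 2");
      }
    }

The rewritten update/delete plans read the ROW__ID virtual column; the Driver, MoveTask, and ReduceSinkOperator changes in this commit supply the transactional plumbing for those plans.
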
Added:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/parse/TestUpdateDeleteSemanticAnalyzer.java
    hive/trunk/ql/src/test/queries/clientnegative/acid_overwrite.q
    hive/trunk/ql/src/test/queries/clientnegative/delete_not_acid.q
    hive/trunk/ql/src/test/queries/clientnegative/update_not_acid.q
    hive/trunk/ql/src/test/queries/clientnegative/update_partition_col.q
    hive/trunk/ql/src/test/queries/clientpositive/delete_all_non_partitioned.q
    hive/trunk/ql/src/test/queries/clientpositive/delete_all_partitioned.q
    hive/trunk/ql/src/test/queries/clientpositive/delete_orig_table.q
    hive/trunk/ql/src/test/queries/clientpositive/delete_tmp_table.q
    hive/trunk/ql/src/test/queries/clientpositive/delete_where_no_match.q
    hive/trunk/ql/src/test/queries/clientpositive/delete_where_non_partitioned.q
    hive/trunk/ql/src/test/queries/clientpositive/delete_where_partitioned.q
    hive/trunk/ql/src/test/queries/clientpositive/delete_whole_partition.q
    hive/trunk/ql/src/test/queries/clientpositive/insert_orig_table.q
    hive/trunk/ql/src/test/queries/clientpositive/insert_update_delete.q
    hive/trunk/ql/src/test/queries/clientpositive/insert_values_dynamic_partitioned.q
    hive/trunk/ql/src/test/queries/clientpositive/insert_values_non_partitioned.q
    hive/trunk/ql/src/test/queries/clientpositive/insert_values_orig_table.q
    hive/trunk/ql/src/test/queries/clientpositive/insert_values_partitioned.q
    hive/trunk/ql/src/test/queries/clientpositive/insert_values_tmp_table.q
    hive/trunk/ql/src/test/queries/clientpositive/update_after_multiple_inserts.q
    hive/trunk/ql/src/test/queries/clientpositive/update_all_non_partitioned.q
    hive/trunk/ql/src/test/queries/clientpositive/update_all_partitioned.q
    hive/trunk/ql/src/test/queries/clientpositive/update_all_types.q
    hive/trunk/ql/src/test/queries/clientpositive/update_orig_table.q
    hive/trunk/ql/src/test/queries/clientpositive/update_tmp_table.q
    hive/trunk/ql/src/test/queries/clientpositive/update_two_cols.q
    hive/trunk/ql/src/test/queries/clientpositive/update_where_no_match.q
    hive/trunk/ql/src/test/queries/clientpositive/update_where_non_partitioned.q
    hive/trunk/ql/src/test/queries/clientpositive/update_where_partitioned.q
    hive/trunk/ql/src/test/results/clientnegative/acid_overwrite.q.out
    hive/trunk/ql/src/test/results/clientnegative/delete_not_acid.q.out
    hive/trunk/ql/src/test/results/clientnegative/update_not_acid.q.out
    hive/trunk/ql/src/test/results/clientnegative/update_partition_col.q.out
    hive/trunk/ql/src/test/results/clientpositive/delete_all_non_partitioned.q.out
    hive/trunk/ql/src/test/results/clientpositive/delete_all_partitioned.q.out
    hive/trunk/ql/src/test/results/clientpositive/delete_orig_table.q.out
    hive/trunk/ql/src/test/results/clientpositive/delete_tmp_table.q.out
    hive/trunk/ql/src/test/results/clientpositive/delete_where_no_match.q.out
    hive/trunk/ql/src/test/results/clientpositive/delete_where_non_partitioned.q.out
    hive/trunk/ql/src/test/results/clientpositive/delete_where_partitioned.q.out
    hive/trunk/ql/src/test/results/clientpositive/delete_whole_partition.q.out
    hive/trunk/ql/src/test/results/clientpositive/insert_orig_table.q.out
    hive/trunk/ql/src/test/results/clientpositive/insert_update_delete.q.out
    hive/trunk/ql/src/test/results/clientpositive/insert_values_dynamic_partitioned.q.out
    hive/trunk/ql/src/test/results/clientpositive/insert_values_non_partitioned.q.out
    hive/trunk/ql/src/test/results/clientpositive/insert_values_orig_table.q.out
    hive/trunk/ql/src/test/results/clientpositive/insert_values_partitioned.q.out
    hive/trunk/ql/src/test/results/clientpositive/insert_values_tmp_table.q.out
    hive/trunk/ql/src/test/results/clientpositive/tez/delete_all_non_partitioned.q.out
    hive/trunk/ql/src/test/results/clientpositive/tez/delete_all_partitioned.q.out
    hive/trunk/ql/src/test/results/clientpositive/tez/delete_orig_table.q.out
    hive/trunk/ql/src/test/results/clientpositive/tez/delete_tmp_table.q.out
    hive/trunk/ql/src/test/results/clientpositive/tez/delete_where_no_match.q.out
    hive/trunk/ql/src/test/results/clientpositive/tez/delete_where_non_partitioned.q.out
    hive/trunk/ql/src/test/results/clientpositive/tez/delete_where_partitioned.q.out
    hive/trunk/ql/src/test/results/clientpositive/tez/delete_whole_partition.q.out
    hive/trunk/ql/src/test/results/clientpositive/tez/insert_orig_table.q.out
    hive/trunk/ql/src/test/results/clientpositive/tez/insert_update_delete.q.out
    hive/trunk/ql/src/test/results/clientpositive/tez/insert_values_dynamic_partitioned.q.out
    hive/trunk/ql/src/test/results/clientpositive/tez/insert_values_non_partitioned.q.out
    hive/trunk/ql/src/test/results/clientpositive/tez/insert_values_orig_table.q.out
    hive/trunk/ql/src/test/results/clientpositive/tez/insert_values_partitioned.q.out
    hive/trunk/ql/src/test/results/clientpositive/tez/insert_values_tmp_table.q.out
    hive/trunk/ql/src/test/results/clientpositive/tez/update_after_multiple_inserts.q.out
    hive/trunk/ql/src/test/results/clientpositive/tez/update_all_non_partitioned.q.out
    hive/trunk/ql/src/test/results/clientpositive/tez/update_all_partitioned.q.out
    hive/trunk/ql/src/test/results/clientpositive/tez/update_all_types.q.out
    hive/trunk/ql/src/test/results/clientpositive/tez/update_orig_table.q.out
    hive/trunk/ql/src/test/results/clientpositive/tez/update_tmp_table.q.out
    hive/trunk/ql/src/test/results/clientpositive/tez/update_two_cols.q.out
    hive/trunk/ql/src/test/results/clientpositive/tez/update_where_no_match.q.out
    hive/trunk/ql/src/test/results/clientpositive/tez/update_where_non_partitioned.q.out
    hive/trunk/ql/src/test/results/clientpositive/tez/update_where_partitioned.q.out
    hive/trunk/ql/src/test/results/clientpositive/update_after_multiple_inserts.q.out
    hive/trunk/ql/src/test/results/clientpositive/update_all_non_partitioned.q.out
    hive/trunk/ql/src/test/results/clientpositive/update_all_partitioned.q.out
    hive/trunk/ql/src/test/results/clientpositive/update_all_types.q.out
    hive/trunk/ql/src/test/results/clientpositive/update_orig_table.q.out
    hive/trunk/ql/src/test/results/clientpositive/update_tmp_table.q.out
    hive/trunk/ql/src/test/results/clientpositive/update_two_cols.q.out
    hive/trunk/ql/src/test/results/clientpositive/update_where_no_match.q.out
    hive/trunk/ql/src/test/results/clientpositive/update_where_non_partitioned.q.out
    hive/trunk/ql/src/test/results/clientpositive/update_where_partitioned.q.out
Modified:
    hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    hive/trunk/data/conf/tez/hive-site.xml
    hive/trunk/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/history/TestHiveHistory.java
    hive/trunk/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
    hive/trunk/itests/src/test/resources/testconfiguration.properties
    hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Context.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/hooks/ReadEntity.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/hooks/WriteEntity.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/StorageFormat.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/UDFToInteger.java
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java
    hive/trunk/ql/src/test/results/clientnegative/invalid_cast_from_binary_1.q.out

Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1624788&r1=1624787&r2=1624788&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Sat Sep 13 22:09:31 2014
@@ -301,6 +301,9 @@ public class HiveConf extends Configurat
 
     HIVE_IN_TEST("hive.in.test", false, "internal usage only, true in test mode", true),
 
+    HIVE_IN_TEZ_TEST("hive.in.tez.test", false, "internal use only, true when in testing tez",
+        true),
+
     LOCALMODEAUTO("hive.exec.mode.local.auto", false,
         "Let Hive determine whether to run in local mode automatically"),
     LOCALMODEMAXBYTES("hive.exec.mode.local.auto.inputbytes.max", 134217728L,

Modified: hive/trunk/data/conf/tez/hive-site.xml
URL: http://svn.apache.org/viewvc/hive/trunk/data/conf/tez/hive-site.xml?rev=1624788&r1=1624787&r2=1624788&view=diff
==============================================================================
Binary files - no diff available.

Modified: hive/trunk/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/history/TestHiveHistory.java
URL: http://svn.apache.org/viewvc/hive/trunk/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/history/TestHiveHistory.java?rev=1624788&r1=1624787&r2=1624788&view=diff
==============================================================================
--- hive/trunk/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/history/TestHiveHistory.java (original)
+++ hive/trunk/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/history/TestHiveHistory.java Sat Sep 13 22:09:31 2014
@@ -103,7 +103,7 @@ public class TestHiveHistory extends Tes
         db.dropTable(MetaStoreUtils.DEFAULT_DATABASE_NAME, src, true, true);
         db.createTable(src, cols, null, TextInputFormat.class,
             IgnoreKeyTextOutputFormat.class);
-        db.loadTable(hadoopDataFile[i], src, false, false, false, false);
+        db.loadTable(hadoopDataFile[i], src, false, false, false, false, false);
         i++;
       }
 

Modified: hive/trunk/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
URL: http://svn.apache.org/viewvc/hive/trunk/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java?rev=1624788&r1=1624787&r2=1624788&view=diff
==============================================================================
--- hive/trunk/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java (original)
+++ hive/trunk/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java Sat Sep 13 22:09:31 2014
@@ -147,10 +147,10 @@ public class TestCompactor {
     execSelectAndDumpData("select * from " + tblNameStg, driver, "Dumping data for " +
       tblNameStg + " after load:");
     executeStatementOnDriver("FROM " + tblNameStg +
-      " INSERT OVERWRITE TABLE " + tblName + " PARTITION(bkt=0) " +
+      " INSERT INTO TABLE " + tblName + " PARTITION(bkt=0) " +
       "SELECT a, b where a < 2", driver);
     executeStatementOnDriver("FROM " + tblNameStg +
-      " INSERT OVERWRITE TABLE " + tblName + " PARTITION(bkt=1) " +
+      " INSERT INTO TABLE " + tblName + " PARTITION(bkt=1) " +
       "SELECT a, b where a >= 2", driver);
     execSelectAndDumpData("select * from " + tblName, driver, "Dumping data for " +
       tblName + " after load:");

Modified: hive/trunk/itests/src/test/resources/testconfiguration.properties
URL: http://svn.apache.org/viewvc/hive/trunk/itests/src/test/resources/testconfiguration.properties?rev=1624788&r1=1624787&r2=1624788&view=diff
==============================================================================
--- hive/trunk/itests/src/test/resources/testconfiguration.properties (original)
+++ hive/trunk/itests/src/test/resources/testconfiguration.properties Sat Sep 13 22:09:31 2014
@@ -63,6 +63,14 @@ minitez.query.files.shared=alter_merge_2
   cross_product_check_2.q,\
   ctas.q,\
   custom_input_output_format.q,\
+  delete_all_non_partitioned.q,\
+  delete_all_partitioned.q,\
+  delete_orig_table.q,\
+  delete_tmp_table.q,\
+  delete_where_no_match.q,\
+  delete_where_non_partitioned.q,\
+  delete_where_partitioned.q,\
+  delete_whole_partition.q,\
   disable_merge_for_bucketing.q,\
   dynpart_sort_opt_vectorization.q,\
   dynpart_sort_optimization.q,\
@@ -76,6 +84,13 @@ minitez.query.files.shared=alter_merge_2
   insert1.q,\
   insert_into1.q,\
   insert_into2.q,\
+  insert_orig_table.q,\
+  insert_values_dynamic_partitioned.q,\
+  insert_values_non_partitioned.q,\
+  insert_values_orig_table.q,\
+  insert_values_partitioned.q,\
+  insert_values_tmp_table.q,\
+  insert_update_delete.q,\
   join0.q,\
   join1.q,\
   leftsemijoin.q,\
@@ -127,6 +142,16 @@ minitez.query.files.shared=alter_merge_2
   union7.q,\
   union8.q,\
   union9.q,\
+  update_after_multiple_inserts.q,\
+  update_all_non_partitioned.q,\
+  update_all_partitioned.q,\
+  update_all_types.q,\
+  update_orig_table.q,\
+  update_tmp_table.q,\
+  update_where_no_match.q,\
+  update_where_non_partitioned.q,\
+  update_where_partitioned.q,\
+  update_two_cols.q,\
   vector_cast_constant.q,\
   vector_data_types.q,\
   vector_decimal_aggregate.q,\

Modified: hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
URL: http://svn.apache.org/viewvc/hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java?rev=1624788&r1=1624787&r2=1624788&view=diff
==============================================================================
--- hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java (original)
+++ hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java Sat Sep 13 22:09:31 2014
@@ -1081,7 +1081,8 @@ public class TxnHandler {
   private static Map<LockType, Map<LockType, Map<LockState, LockAction>>> jumpTable;
 
   private void checkQFileTestHack() {
-    boolean hackOn = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_IN_TEST);
+    boolean hackOn = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_IN_TEST) ||
+        HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_IN_TEZ_TEST);
     if (hackOn) {
       LOG.info("Hacking in canned values for transaction manager");
       // Set up the transaction/locking db in the derby metastore

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Context.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Context.java?rev=1624788&r1=1624787&r2=1624788&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Context.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Context.java Sat Sep 13 22:09:31 2014
@@ -43,6 +43,7 @@ import org.apache.hadoop.hive.common.Fil
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.TaskRunner;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.lockmgr.HiveLock;
 import org.apache.hadoop.hive.ql.lockmgr.HiveLockManager;
 import org.apache.hadoop.hive.ql.lockmgr.HiveLockObj;
@@ -98,6 +99,11 @@ public class Context {
   // Transaction manager for this query
   protected HiveTxnManager hiveTxnManager;
 
+  // Used to track what type of acid operation (insert, update, or delete) we are doing.  Useful
+  // since we want to change where bucket columns are accessed in some operators and
+  // optimizations when doing updates and deletes.
+  private AcidUtils.Operation acidOperation = AcidUtils.Operation.NOT_ACID;
+
   private boolean needLockMgr;
 
   // Keep track of the mapping from load table desc to the output and the lock
@@ -612,4 +618,12 @@ public class Context {
   public void setTryCount(int tryCount) {
     this.tryCount = tryCount;
   }
+
+  public void setAcidOperation(AcidUtils.Operation op) {
+    acidOperation = op;
+  }
+
+  public AcidUtils.Operation getAcidOperation() {
+    return acidOperation;
+  }
 }

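The same operation type recorded here surfaces as the write type checked in the ReduceSinkOperator and MoveTask changes below. A minimal, hypothetical usage sketch of just the Context side (the setup is simplified; the real caller is the semantic analysis code elsewhere in this patch):

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.Context;
    import org.apache.hadoop.hive.ql.io.AcidUtils;

    public class AcidContextSketch {
      public static void main(String[] args) throws Exception {
        Context ctx = new Context(new HiveConf());
        // Defaults to NOT_ACID; the analyzer flips it before planning an update or delete.
        System.out.println(ctx.getAcidOperation());           // NOT_ACID
        ctx.setAcidOperation(AcidUtils.Operation.UPDATE);
        // Optimizers and operators can now branch on the ACID write type.
        boolean acidWrite = ctx.getAcidOperation() != AcidUtils.Operation.NOT_ACID;
        System.out.println(acidWrite);                        // true
      }
    }
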
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java?rev=1624788&r1=1624787&r2=1624788&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java Sat Sep 13 22:09:31 2014
@@ -96,6 +96,7 @@ import org.apache.hadoop.hive.ql.parse.S
 import org.apache.hadoop.hive.ql.parse.SemanticAnalyzerFactory;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.parse.VariableSubstitution;
+import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
 import org.apache.hadoop.hive.ql.plan.HiveOperation;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
@@ -137,6 +138,9 @@ public class Driver implements CommandPr
   private String SQLState;
   private Throwable downstreamError;
 
+  // A list of FileSinkOperators writing in an ACID compliant manner
+  private Set<FileSinkDesc> acidSinks;
+
   // A limit on the number of threads that can be launched
   private int maxthreads;
   private int tryCount = Integer.MAX_VALUE;
@@ -408,6 +412,9 @@ public class Driver implements CommandPr
       } else {
         sem.analyze(tree, ctx);
       }
+      // Record any ACID compliant FileSinkOperators we saw so we can add our transaction ID to
+      // them later.
+      acidSinks = sem.getAcidFileSinks();
 
       LOG.info("Semantic Analysis Completed");
 
@@ -723,6 +730,11 @@ public class Driver implements CommandPr
         //do not authorize temporary uris
         continue;
       }
+      if (privObject instanceof ReadEntity && ((ReadEntity)privObject).isUpdateOrDelete()) {
+        // Skip this one, as we don't want to check select privileges for the table we're reading
+        // for an update or delete.
+        continue;
+      }
 
       //support for authorization on partitions needs to be added
       String dbname = null;
@@ -859,7 +871,9 @@ public class Driver implements CommandPr
   private int recordValidTxns() {
     try {
       ValidTxnList txns = SessionState.get().getTxnMgr().getValidTxns();
-      conf.set(ValidTxnList.VALID_TXNS_KEY, txns.toString());
+      String txnStr = txns.toString();
+      conf.set(ValidTxnList.VALID_TXNS_KEY, txnStr);
+      LOG.debug("Encoding valid txns info " + txnStr);
       return 0;
     } catch (LockException e) {
       errorMessage = "FAILED: Error in determing valid transactions: " + e.getMessage();
@@ -877,13 +891,44 @@ public class Driver implements CommandPr
    * pretty simple. If all the locks cannot be obtained, error out. Deadlock is avoided by making
    * sure that the locks are lexicographically sorted.
    **/
-  private int acquireReadWriteLocks() {
+  private int acquireLocksAndOpenTxn() {
     PerfLogger perfLogger = PerfLogger.getPerfLogger();
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.ACQUIRE_READ_WRITE_LOCKS);
 
+    SessionState ss = SessionState.get();
+    HiveTxnManager txnMgr = ss.getTxnMgr();
 
     try {
-      SessionState.get().getTxnMgr().acquireLocks(plan, ctx, userName);
+      // Don't use the userName member, as it may or may not have been set.  Get the value from
+      // conf, which calls into getUGI to figure out who the process is running as.
+      String userFromUGI;
+      try {
+        userFromUGI = conf.getUser();
+      } catch (IOException e) {
+        errorMessage = "FAILED: Error in determining user while acquiring locks: " + e.getMessage();
+        SQLState = ErrorMsg.findSQLState(e.getMessage());
+        downstreamError = e;
+        console.printError(errorMessage,
+            "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e));
+        return 10;
+      }
+      if (acidSinks != null && acidSinks.size() > 0) {
+        // We are writing to tables in an ACID compliant way, so we need to open a transaction
+        long txnId = ss.getCurrentTxn();
+        if (txnId == SessionState.NO_CURRENT_TXN) {
+          txnId = txnMgr.openTxn(userFromUGI);
+          ss.setCurrentTxn(txnId);
+        }
+        // Set the transaction id in all of the acid file sinks
+        if (acidSinks != null) {
+          for (FileSinkDesc desc : acidSinks) {
+            desc.setTransactionId(txnId);
+          }
+        }
+      }
+
+      txnMgr.acquireLocks(plan, ctx, userFromUGI);
+
       return 0;
     } catch (LockException e) {
       errorMessage = "FAILED: Error in acquiring locks: " + e.getMessage();
@@ -901,13 +946,33 @@ public class Driver implements CommandPr
    * @param hiveLocks
    *          list of hive locks to be released Release all the locks specified. If some of the
    *          locks have already been released, ignore them
+   * @param commit if there is an open transaction and if true, commit,
+   *               if false rollback.  If there is no open transaction this parameter is ignored.
+   *
    **/
-  private void releaseLocks(List<HiveLock> hiveLocks) throws LockException {
+  private void releaseLocksAndCommitOrRollback(List<HiveLock> hiveLocks, boolean commit)
+      throws LockException {
     PerfLogger perfLogger = PerfLogger.getPerfLogger();
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.RELEASE_LOCKS);
 
-    if (hiveLocks != null) {
-      SessionState.get().getTxnMgr().getLockManager().releaseLocks(hiveLocks);
+    SessionState ss = SessionState.get();
+    HiveTxnManager txnMgr = ss.getTxnMgr();
+    // If we've opened a transaction we need to commit or rollback rather than explicitly
+    // releasing the locks.
+    if (ss.getCurrentTxn() != SessionState.NO_CURRENT_TXN && ss.isAutoCommit()) {
+      try {
+        if (commit) {
+          txnMgr.commitTxn();
+        } else {
+          txnMgr.rollbackTxn();
+        }
+      } finally {
+        ss.setCurrentTxn(SessionState.NO_CURRENT_TXN);
+      }
+    } else {
+      if (hiveLocks != null) {
+        txnMgr.getLockManager().releaseLocks(hiveLocks);
+      }
     }
     ctx.setHiveLocks(null);
 
@@ -994,7 +1059,7 @@ public class Driver implements CommandPr
     }
     if (ret != 0) {
       try {
-        releaseLocks(ctx.getHiveLocks());
+        releaseLocksAndCommitOrRollback(ctx.getHiveLocks(), false);
       } catch (LockException e) {
         LOG.warn("Exception in releasing locks. "
             + org.apache.hadoop.util.StringUtils.stringifyException(e));
@@ -1097,10 +1162,10 @@ public class Driver implements CommandPr
     }
 
     if (requireLock) {
-      ret = acquireReadWriteLocks();
+      ret = acquireLocksAndOpenTxn();
       if (ret != 0) {
         try {
-          releaseLocks(ctx.getHiveLocks());
+          releaseLocksAndCommitOrRollback(ctx.getHiveLocks(), false);
         } catch (LockException e) {
           // Not much to do here
         }
@@ -1112,7 +1177,7 @@ public class Driver implements CommandPr
     if (ret != 0) {
       //if needRequireLock is false, the release here will do nothing because there is no lock
       try {
-        releaseLocks(ctx.getHiveLocks());
+        releaseLocksAndCommitOrRollback(ctx.getHiveLocks(), false);
       } catch (LockException e) {
         // Nothing to do here
       }
@@ -1121,7 +1186,7 @@ public class Driver implements CommandPr
 
     //if needRequireLock is false, the release here will do nothing because there is no lock
     try {
-      releaseLocks(ctx.getHiveLocks());
+      releaseLocksAndCommitOrRollback(ctx.getHiveLocks(), true);
     } catch (LockException e) {
       errorMessage = "FAILED: Hive Internal Error: " + Utilities.getNameMessage(e);
       SQLState = ErrorMsg.findSQLState(e.getMessage());
@@ -1666,7 +1731,7 @@ public class Driver implements CommandPr
     destroyed = true;
     if (ctx != null) {
       try {
-        releaseLocks(ctx.getHiveLocks());
+        releaseLocksAndCommitOrRollback(ctx.getHiveLocks(), false);
       } catch (LockException e) {
         LOG.warn("Exception when releasing locking in destroy: " +
             e.getMessage());

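Condensed, the transaction lifecycle the Driver now wraps around a query with ACID file sinks looks roughly like the sketch below. This is an illustrative paraphrase of acquireLocksAndOpenTxn() and releaseLocksAndCommitOrRollback() above, not additional code from the commit; the plan, context, and user values are assumed to come from the surrounding Driver state, and the real methods' error handling is omitted.

    import java.util.Set;
    import org.apache.hadoop.hive.ql.Context;
    import org.apache.hadoop.hive.ql.QueryPlan;
    import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
    import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
    import org.apache.hadoop.hive.ql.session.SessionState;

    public class DriverTxnFlowSketch {
      void runWithTxn(HiveTxnManager txnMgr, SessionState ss, String user,
                      Set<FileSinkDesc> acidSinks, QueryPlan plan, Context ctx) throws Exception {
        // Open a transaction only when the plan writes through ACID file sinks.
        if (!acidSinks.isEmpty() && ss.getCurrentTxn() == SessionState.NO_CURRENT_TXN) {
          ss.setCurrentTxn(txnMgr.openTxn(user));   // openTxn() now returns the new id
        }
        // Stamp every ACID sink with the transaction id so the writers can record it.
        for (FileSinkDesc sink : acidSinks) {
          sink.setTransactionId(ss.getCurrentTxn());
        }
        txnMgr.acquireLocks(plan, ctx, user);       // write locks cover the update/delete targets
        boolean success = false;
        try {
          // ... execute the plan ...
          success = true;
        } finally {
          // With an open transaction and autocommit on, commit or roll back instead of
          // explicitly releasing the locks.
          if (ss.getCurrentTxn() != SessionState.NO_CURRENT_TXN && ss.isAutoCommit()) {
            try {
              if (success) txnMgr.commitTxn(); else txnMgr.rollbackTxn();
            } finally {
              ss.setCurrentTxn(SessionState.NO_CURRENT_TXN);
            }
          }
        }
      }
    }
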
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java?rev=1624788&r1=1624787&r2=1624788&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java Sat Sep 13 22:09:31 2014
@@ -404,6 +404,19 @@ public enum ErrorMsg {
       "time."),
   DISTINCT_NOT_SUPPORTED(10285, "Distinct keyword is not support in current context"),
 
+  UPDATEDELETE_PARSE_ERROR(10290, "Encountered parse error while parsing rewritten update or " +
+      "delete query"),
+  UPDATEDELETE_IO_ERROR(10291, "Encountered I/O error while parsing rewritten update or " +
+      "delete query"),
+  UPDATE_CANNOT_UPDATE_PART_VALUE(10292, "Updating values of partition columns is not supported"),
+  INSERT_CANNOT_CREATE_TEMP_FILE(10293, "Unable to create temp file for insert values "),
+  ACID_OP_ON_NONACID_TXNMGR(10294, "Attempt to do update or delete using transaction manager that" +
+      " does not support these operations."),
+  NO_INSERT_OVERWRITE_WITH_ACID(10295, "INSERT OVERWRITE not allowed on table with OutputFormat " +
+      "that implements AcidOutputFormat while transaction manager that supports ACID is in use"),
+  VALUES_TABLE_CONSTRUCTOR_NOT_SUPPORTED(10296,
+      "Values clause with table constructor not yet supported"),
+
   //========================== 20000 range starts here ========================//
   SCRIPT_INIT_ERROR(20000, "Unable to initialize custom script."),
   SCRIPT_IO_ERROR(20001, "An error occurred while reading or writing to your custom script. "
@@ -460,7 +473,10 @@ public enum ErrorMsg {
       "to fail because of this, set hive.stats.atomic=false", true),
   STATS_SKIPPING_BY_ERROR(30017, "Skipping stats aggregation by error {0}", true),
   ORC_CORRUPTED_READ(30018, "Corruption in ORC data encountered. To skip reading corrupted "
-      + "data, set " + HiveConf.ConfVars.HIVE_ORC_SKIP_CORRUPT_DATA + " to true");
+      + "data, set " + HiveConf.ConfVars.HIVE_ORC_SKIP_CORRUPT_DATA + " to true"),
+
+
+
   ;
 
   private int errorCode;

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java?rev=1624788&r1=1624787&r2=1624788&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java Sat Sep 13 22:09:31 2014
@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.HiveStatsUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
 import org.apache.hadoop.hive.metastore.api.Order;
 import org.apache.hadoop.hive.ql.Context;
@@ -35,6 +36,7 @@ import org.apache.hadoop.hive.ql.exec.mr
 import org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask;
 import org.apache.hadoop.hive.ql.hooks.LineageInfo.DataContainer;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
 import org.apache.hadoop.hive.ql.io.merge.MergeFileTask;
 import org.apache.hadoop.hive.ql.lockmgr.HiveLock;
@@ -285,7 +287,8 @@ public class MoveTask extends Task<MoveW
           dc = new DataContainer(table.getTTable());
           db.loadTable(tbd.getSourcePath(), tbd.getTable()
               .getTableName(), tbd.getReplace(), tbd.getHoldDDLTime(), work.isSrcLocal(),
-              isSkewedStoredAsDirs(tbd));
+              isSkewedStoredAsDirs(tbd),
+              work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID);
           if (work.getOutputs() != null) {
             work.getOutputs().add(new WriteEntity(table,
                 (tbd.getReplace() ? WriteEntity.WriteType.INSERT_OVERWRITE :
@@ -365,7 +368,8 @@ public class MoveTask extends Task<MoveW
                 tbd.getReplace(),
                 dpCtx.getNumDPCols(),
                 tbd.getHoldDDLTime(),
-                isSkewedStoredAsDirs(tbd));
+                isSkewedStoredAsDirs(tbd),
+                work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID);
 
             if (dp.size() == 0 && conf.getBoolVar(HiveConf.ConfVars.HIVE_ERROR_ON_EMPTY_PARTITION)) {
               throw new HiveException("This query creates no partitions." +
@@ -400,7 +404,10 @@ public class MoveTask extends Task<MoveW
               // update columnar lineage for each partition
               dc = new DataContainer(table.getTTable(), partn.getTPartition());
 
-              if (SessionState.get() != null) {
+              // Don't set lineage on delete as we don't have all the columns
+              if (SessionState.get() != null &&
+                  work.getLoadTableWork().getWriteType() != AcidUtils.Operation.DELETE &&
+                  work.getLoadTableWork().getWriteType() != AcidUtils.Operation.UPDATE) {
                 SessionState.get().getLineageState().setLineage(tbd.getSourcePath(), dc,
                     table.getCols());
               }
@@ -414,7 +421,8 @@ public class MoveTask extends Task<MoveW
             db.validatePartitionNameCharacters(partVals);
             db.loadPartition(tbd.getSourcePath(), tbd.getTable().getTableName(),
                 tbd.getPartitionSpec(), tbd.getReplace(), tbd.getHoldDDLTime(),
-                tbd.getInheritTableSpecs(), isSkewedStoredAsDirs(tbd), work.isSrcLocal());
+                tbd.getInheritTableSpecs(), isSkewedStoredAsDirs(tbd), work.isSrcLocal(),
+                work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID);
             Partition partn = db.getPartition(table, tbd.getPartitionSpec(),
                 false);
 
@@ -433,8 +441,24 @@ public class MoveTask extends Task<MoveW
          }
         }
         if (SessionState.get() != null && dc != null) {
-          SessionState.get().getLineageState().setLineage(tbd.getSourcePath(), dc,
-              table.getCols());
+          // If we are doing an update or a delete the number of columns in the table will not
+          // match the number of columns in the file sink.  For update there will be one too many
+          // (because of the ROW__ID), and in the case of the delete there will be just the
+          // ROW__ID, which we don't need to worry about from a lineage perspective.
+          List<FieldSchema> tableCols = null;
+          switch (work.getLoadTableWork().getWriteType()) {
+            case DELETE:
+            case UPDATE:
+              // Pass an empty list as no columns will be written to the file.
+              // TODO I should be able to make this work for update
+              tableCols = new ArrayList<FieldSchema>();
+              break;
+
+            default:
+              tableCols = table.getCols();
+              break;
+          }
+          SessionState.get().getLineageState().setLineage(tbd.getSourcePath(), dc, tableCols);
         }
         releaseLocks(tbd);
       }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java?rev=1624788&r1=1624787&r2=1624788&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java Sat Sep 13 22:09:31 2014
@@ -29,6 +29,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
@@ -43,8 +44,10 @@ import org.apache.hadoop.hive.serde2.obj
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.StandardUnionObjectInspector.StandardUnion;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.UnionObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
 import org.apache.hadoop.io.BinaryComparable;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.IntWritable;
@@ -131,10 +134,18 @@ public class ReduceSinkOperator extends 
   // TODO: we only ever use one row of these at a time. Why do we need to cache multiple?
   protected transient Object[][] cachedKeys;
 
+  private StructField recIdField; // field to look for record identifier in
+  private StructField bucketField; // field to look for bucket in record identifier
+  private StructObjectInspector acidRowInspector; // row inspector used by acid options
+  private StructObjectInspector recIdInspector; // OI for the record identifier
+  private IntObjectInspector bucketInspector; // OI for the bucket field in the record id
+
   @Override
   protected void initializeOp(Configuration hconf) throws HiveException {
     try {
       List<ExprNodeDesc> keys = conf.getKeyCols();
+      LOG.debug("keys size is " + keys.size());
+      for (ExprNodeDesc k : keys) LOG.debug("Key exprNodeDesc " + k.getExprString());
       keyEval = new ExprNodeEvaluator[keys.size()];
       int i = 0;
       for (ExprNodeDesc e : keys) {
@@ -259,6 +270,20 @@ public class ReduceSinkOperator extends 
         // TODO: this is fishy - we init object inspectors based on first tag. We
         //       should either init for each tag, or if rowInspector doesn't really
         //       matter, then we can create this in ctor and get rid of firstRow.
+        if (conf.getWriteType() == AcidUtils.Operation.UPDATE ||
+            conf.getWriteType() == AcidUtils.Operation.DELETE) {
+          assert rowInspector instanceof StructObjectInspector :
+              "Exptected rowInspector to be instance of StructObjectInspector but it is a " +
+                  rowInspector.getClass().getName();
+          acidRowInspector = (StructObjectInspector)rowInspector;
+          // The record identifier is always in the first column
+          recIdField = acidRowInspector.getAllStructFieldRefs().get(0);
+          recIdInspector = (StructObjectInspector)recIdField.getFieldObjectInspector();
+          // The bucket field is in the second position
+          bucketField = recIdInspector.getAllStructFieldRefs().get(1);
+          bucketInspector = (IntObjectInspector)bucketField.getFieldObjectInspector();
+        }
+
         LOG.info("keys are " + conf.getOutputKeyColumnNames() + " num distributions: " + conf.getNumDistributionKeys());
         keyObjectInspector = initEvaluatorsAndReturnStruct(keyEval,
             distinctColIndices,
@@ -283,6 +308,11 @@ public class ReduceSinkOperator extends 
       if (bucketEval != null) {
         buckNum = computeBucketNumber(row, conf.getNumBuckets());
         cachedKeys[0][buckColIdxInKey] = new IntWritable(buckNum);
+      } else if (conf.getWriteType() == AcidUtils.Operation.UPDATE ||
+          conf.getWriteType() == AcidUtils.Operation.DELETE) {
+        // In the non-partitioned case we still want to compute the bucket number for updates and
+        // deletes.
+        buckNum = computeBucketNumber(row, conf.getNumBuckets());
       }
 
       HiveKey firstKey = toHiveKey(cachedKeys[0], tag, null);
@@ -339,9 +369,20 @@ public class ReduceSinkOperator extends 
 
   private int computeBucketNumber(Object row, int numBuckets) throws HiveException {
     int buckNum = 0;
-    for (int i = 0; i < bucketEval.length; i++) {
-      Object o = bucketEval[i].evaluate(row);
-      buckNum = buckNum * 31 + ObjectInspectorUtils.hashCode(o, bucketObjectInspectors[i]);
+
+    if (conf.getWriteType() == AcidUtils.Operation.UPDATE ||
+        conf.getWriteType() == AcidUtils.Operation.DELETE) {
+      // We don't need to evaluate the hash code.  Instead read the bucket number directly from
+      // the row.  I don't need to evaluate any expressions as I know I am reading the ROW__ID
+      // column directly.
+      Object recIdValue = acidRowInspector.getStructFieldData(row, recIdField);
+      buckNum = bucketInspector.get(recIdInspector.getStructFieldData(recIdValue, bucketField));
+      LOG.debug("Acid choosing bucket number " + buckNum);
+    } else {
+      for (int i = 0; i < bucketEval.length; i++) {
+        Object o = bucketEval[i].evaluate(row);
+        buckNum = buckNum * 31 + ObjectInspectorUtils.hashCode(o, bucketObjectInspectors[i]);
+      }
     }
 
     if (buckNum < 0) {
@@ -385,14 +426,19 @@ public class ReduceSinkOperator extends 
     // Evaluate the HashCode
     int keyHashCode = 0;
     if (partitionEval.length == 0) {
-      // If no partition cols, just distribute the data uniformly to provide better
-      // load balance. If the requirement is to have a single reducer, we should set
-      // the number of reducers to 1.
-      // Use a constant seed to make the code deterministic.
-      if (random == null) {
-        random = new Random(12345);
+      // If no partition cols and not doing an update or delete, just distribute the data uniformly
+      // to provide better load balance. If the requirement is to have a single reducer, we should
+      // set the number of reducers to 1. Use a constant seed to make the code deterministic.
+      // For acid operations make sure to send all records with the same key to the same
+      // FileSinkOperator, as the RecordUpdater interface can't manage multiple writers for a file.
+      if (conf.getWriteType() == AcidUtils.Operation.NOT_ACID) {
+        if (random == null) {
+          random = new Random(12345);
+        }
+        keyHashCode = random.nextInt();
+      } else {
+        keyHashCode = 1;
       }
-      keyHashCode = random.nextInt();
     } else {
       for (int i = 0; i < partitionEval.length; i++) {
         Object o = partitionEval[i].evaluate(row);
@@ -400,6 +446,7 @@ public class ReduceSinkOperator extends 
             + ObjectInspectorUtils.hashCode(o, partitionObjectInspectors[i]);
       }
     }
+    LOG.debug("Going to return hash code " + (keyHashCode * 31 + buckNum));
     return buckNum < 0  ? keyHashCode : keyHashCode * 31 + buckNum;
   }
 

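For update and delete plans the first column of each row is the ROW__ID record identifier, a struct whose second field is the bucket id (alongside the transaction id and row id), and the operator reads the bucket straight out of it through ObjectInspectors rather than re-hashing bucket columns. A self-contained illustration of that lookup using standard ObjectInspectors (the field names and values here are made up; the real inspector comes from the row schema):

    import java.util.Arrays;
    import java.util.List;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
    import org.apache.hadoop.hive.serde2.objectinspector.StructField;
    import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

    public class RowIdBucketSketch {
      public static void main(String[] args) {
        // Build an inspector for a (transactionid, bucketid, rowid) struct.
        StructObjectInspector recIdInspector =
            ObjectInspectorFactory.getStandardStructObjectInspector(
                Arrays.asList("transactionid", "bucketid", "rowid"),
                Arrays.<ObjectInspector>asList(
                    PrimitiveObjectInspectorFactory.javaLongObjectInspector,
                    PrimitiveObjectInspectorFactory.javaIntObjectInspector,
                    PrimitiveObjectInspectorFactory.javaLongObjectInspector));
        // A fake ROW__ID value: txn 5, bucket 1, row 42.
        List<Object> recId = Arrays.<Object>asList(5L, 1, 42L);
        // Same lookup computeBucketNumber() does: the bucket is the second field.
        StructField bucketField = recIdInspector.getAllStructFieldRefs().get(1);
        IntObjectInspector bucketInspector =
            (IntObjectInspector) bucketField.getFieldObjectInspector();
        int buckNum = bucketInspector.get(recIdInspector.getStructFieldData(recId, bucketField));
        System.out.println("bucket = " + buckNum);   // prints: bucket = 1
      }
    }
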
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/hooks/ReadEntity.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/hooks/ReadEntity.java?rev=1624788&r1=1624787&r2=1624788&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/hooks/ReadEntity.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/hooks/ReadEntity.java Sat Sep 13 22:09:31 2014
@@ -48,6 +48,11 @@ public class ReadEntity extends Entity i
   // is marked as being read.  Defaults to true as that is the most common case.
   private boolean needsLock = true;
 
+  // When true indicates that this object is being read as part of an update or delete.  This is
+  // important because in that case we shouldn't acquire a lock for it or authorize the read.
+  // These will be handled by the output to the table instead.
+  private boolean isUpdateOrDelete = false;
+
   // For views, the entities can be nested - by default, entities are at the top level
   private final Set<ReadEntity> parents = new HashSet<ReadEntity>();
 
@@ -166,4 +171,12 @@ public class ReadEntity extends Entity i
   public List<String> getAccessedColumns() {
     return accessedColumns;
   }
+
+  public void setUpdateOrDelete(boolean isUpdateOrDelete) {
+    this.isUpdateOrDelete = isUpdateOrDelete;
+  }
+
+  public boolean isUpdateOrDelete() {
+    return isUpdateOrDelete;
+  }
 }

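In the rewritten update/delete plans the target table shows up both as an input and as an output; flagging the input with this new bit lets DbTxnManager.acquireLocks() and the Driver's authorization loop skip it, while the WriteEntity for the same table carries the write lock and the privilege check. A small sketch of how the flag is meant to be set (the helper and its caller are hypothetical; the real call site is presumably in the new UpdateDeleteSemanticAnalyzer):

    import org.apache.hadoop.hive.ql.hooks.ReadEntity;
    import org.apache.hadoop.hive.ql.metadata.Table;

    public class UpdateReadEntitySketch {
      // Marks the table being read for an update/delete so lock acquisition and read
      // authorization skip it; the matching WriteEntity handles locking and privileges.
      static ReadEntity readForUpdateOrDelete(Table targetTable) {
        ReadEntity input = new ReadEntity(targetTable);
        input.setUpdateOrDelete(true);
        return input;
      }
    }
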
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/hooks/WriteEntity.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/hooks/WriteEntity.java?rev=1624788&r1=1624787&r2=1624788&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/hooks/WriteEntity.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/hooks/WriteEntity.java Sat Sep 13 22:09:31 2014
@@ -148,6 +148,16 @@ public class WriteEntity extends Entity 
   }
 
   /**
+   * Only use this if you are very sure of what you are doing.  This is used by the
+   * {@link org.apache.hadoop.hive.ql.parse.UpdateDeleteSemanticAnalyzer} to reset the types to
+   * update or delete after rewriting and reparsing the queries.
+   * @param type new operation type
+   */
+  public void setWriteType(WriteType type) {
+    writeType = type;
+  }
+
+  /**
    * Equals function.
    */
   @Override

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java?rev=1624788&r1=1624787&r2=1624788&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java Sat Sep 13 22:09:31 2014
@@ -26,14 +26,12 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hive.common.ValidTxnList;
-import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.shims.HadoopShims;
 import org.apache.hadoop.hive.shims.ShimLoader;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Iterator;
 import java.util.List;
 import java.util.regex.Pattern;
 
@@ -42,24 +40,40 @@ import java.util.regex.Pattern;
  * are used by the compactor and cleaner and thus must be format agnostic.
  */
 public class AcidUtils {
-  private AcidUtils() {
-    // NOT USED
-  }
-  private static final Log LOG = LogFactory.getLog(AcidUtils.class.getName());
-
   public static final String BASE_PREFIX = "base_";
   public static final String DELTA_PREFIX = "delta_";
+  public static final PathFilter deltaFileFilter = new PathFilter() {
+    @Override
+    public boolean accept(Path path) {
+      return path.getName().startsWith(DELTA_PREFIX);
+    }
+  };
   public static final String BUCKET_PREFIX = "bucket_";
-
+  public static final PathFilter bucketFileFilter = new PathFilter() {
+    @Override
+    public boolean accept(Path path) {
+      return path.getName().startsWith(BUCKET_PREFIX);
+    }
+  };
   public static final String BUCKET_DIGITS = "%05d";
   public static final String DELTA_DIGITS = "%07d";
+  public static final Pattern BUCKET_DIGIT_PATTERN = Pattern.compile("[0-9]{5}$");
+  public static final Pattern LEGACY_BUCKET_DIGIT_PATTERN = Pattern.compile("^[0-9]{5}");
+  public static final PathFilter originalBucketFilter = new PathFilter() {
+    @Override
+    public boolean accept(Path path) {
+      return ORIGINAL_PATTERN.matcher(path.getName()).matches();
+    }
+  };
+
+  private AcidUtils() {
+    // NOT USED
+  }
+  private static final Log LOG = LogFactory.getLog(AcidUtils.class.getName());
 
   private static final Pattern ORIGINAL_PATTERN =
       Pattern.compile("[0-9]+_[0-9]+");
 
-  public static final Pattern BUCKET_DIGIT_PATTERN = Pattern.compile("[0-9]{5}$");
-  public static final Pattern LEGACY_BUCKET_DIGIT_PATTERN = Pattern.compile("^[0-9]{5}");
-
   public static final PathFilter hiddenFileFilter = new PathFilter(){
     public boolean accept(Path p){
       String name = p.getName();
@@ -67,13 +81,6 @@ public class AcidUtils {
     }
   };
 
-  public static final PathFilter bucketFileFilter = new PathFilter() {
-    @Override
-    public boolean accept(Path path) {
-      return path.getName().startsWith(BUCKET_PREFIX);
-    }
-  };
-
   private static final HadoopShims SHIMS = ShimLoader.getHadoopShims();
 
   /**
@@ -149,7 +156,7 @@ public class AcidUtils {
           .minimumTransactionId(0)
           .maximumTransactionId(0)
           .bucket(bucket);
-    } else if (filename.startsWith(AcidUtils.BUCKET_PREFIX)) {
+    } else if (filename.startsWith(BUCKET_PREFIX)) {
       int bucket =
           Integer.parseInt(filename.substring(filename.indexOf('_') + 1));
       result
@@ -394,7 +401,8 @@ public class AcidUtils {
     }
 
     final Path base = bestBase == null ? null : bestBase.getPath();
-    LOG.debug("base = " + base + " deltas = " + deltas.size());
+    LOG.debug("in directory " + directory.toUri().toString() + " base = " + base + " deltas = " +
+        deltas.size());
 
     return new Directory(){
 

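The filters promoted to public here (deltaFileFilter, bucketFileFilter, originalBucketFilter) are what the ACID move logic in Hive.java below uses to find delta directories and the bucket files inside them. A minimal standalone listing sketch, with a hypothetical warehouse path and whatever FileSystem the configuration resolves to:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.io.AcidUtils;

    public class AcidListingSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Hypothetical partition directory containing base_.../delta_.../bucket_... layout.
        Path partDir = new Path("/tmp/warehouse/acid_demo/ds=2014-09-13");
        FileSystem fs = partDir.getFileSystem(conf);
        // Find the delta directories written by ACID inserts, updates, and deletes...
        for (FileStatus delta : fs.listStatus(partDir, AcidUtils.deltaFileFilter)) {
          // ...and the bucket files inside each delta.
          for (FileStatus bucket : fs.listStatus(delta.getPath(), AcidUtils.bucketFileFilter)) {
            System.out.println(bucket.getPath());
          }
        }
      }
    }
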
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java?rev=1624788&r1=1624787&r2=1624788&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java Sat Sep 13 22:09:31 2014
@@ -53,11 +53,12 @@ public class DbTxnManager extends HiveTx
   }
 
   @Override
-  public void openTxn(String user) throws LockException {
+  public long openTxn(String user) throws LockException {
     init();
     try {
       txnId = client.openTxn(user);
       LOG.debug("Opened txn " + txnId);
+      return txnId;
     } catch (TException e) {
       throw new LockException(ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg(),
           e);
@@ -88,7 +89,11 @@ public class DbTxnManager extends HiveTx
 
     // For each source to read, get a shared lock
     for (ReadEntity input : plan.getInputs()) {
-      if (!input.needsLock()) continue;
+      if (!input.needsLock() || input.isUpdateOrDelete()) {
+        // We don't want to acquire readlocks during update or delete as we'll be acquiring write
+        // locks instead.
+        continue;
+      }
       LockComponentBuilder compBuilder = new LockComponentBuilder();
       compBuilder.setShared();
 
@@ -297,6 +302,11 @@ public class DbTxnManager extends HiveTx
   }
 
   @Override
+  public boolean supportsAcid() {
+    return true;
+  }
+
+  @Override
   protected void destruct() {
     try {
       if (txnId > 0) rollbackTxn();

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java?rev=1624788&r1=1624787&r2=1624788&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java Sat Sep 13 22:09:31 2014
@@ -48,8 +48,9 @@ class DummyTxnManager extends HiveTxnMan
   private HiveLockManager lockMgr;
 
   @Override
-  public void openTxn(String user) throws LockException {
+  public long openTxn(String user) throws LockException {
     // No-op
+    return 0L;
   }
 
   @Override
@@ -208,6 +209,11 @@ class DummyTxnManager extends HiveTxnMan
     return false;
   }
 
+  @Override
+  public boolean supportsAcid() {
+    return false;
+  }
+
 
   protected void destruct() {
     if (lockMgr != null) {

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java?rev=1624788&r1=1624787&r2=1624788&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java Sat Sep 13 22:09:31 2014
@@ -32,9 +32,10 @@ public interface HiveTxnManager {
   /**
    * Open a new transaction.
    * @param user Hive user who is opening this transaction.
+   * @return The new transaction id
    * @throws LockException if a transaction is already open.
    */
-  void openTxn(String user) throws LockException;
+  long openTxn(String user) throws LockException;
 
   /**
    * Get the lock manager.  This must be used rather than instantiating an
@@ -120,4 +121,10 @@ public interface HiveTxnManager {
    * @return true if the new format should be used.
    */
   boolean useNewShowLocksFormat();
+
+  /**
+   * Indicate whether this transaction manager supports ACID operations
+   * @return true if this transaction manager does ACID
+   */
+  boolean supportsAcid();
 }

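A short sketch of how the new supportsAcid() capability check is meant to be consulted before planning an update or delete. The guard shown is a simplified stand-in, not code from this commit; the actual check presumably lives in UpdateDeleteSemanticAnalyzer, paired with the new ACID_OP_ON_NONACID_TXNMGR error above.

    import org.apache.hadoop.hive.ql.ErrorMsg;
    import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
    import org.apache.hadoop.hive.ql.parse.SemanticException;
    import org.apache.hadoop.hive.ql.session.SessionState;

    public class AcidCapabilityCheckSketch {
      // Fails fast if the configured transaction manager (e.g. DummyTxnManager) cannot do
      // ACID writes, since update/delete plans require an open transaction.
      static void checkAcidTxnManager() throws SemanticException {
        HiveTxnManager txnMgr = SessionState.get().getTxnMgr();
        if (!txnMgr.supportsAcid()) {
          throw new SemanticException(ErrorMsg.ACID_OP_ON_NONACID_TXNMGR.getMsg());
        }
      }
    }
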
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java?rev=1624788&r1=1624787&r2=1624788&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java Sat Sep 13 22:09:31 2014
@@ -96,6 +96,7 @@ import org.apache.hadoop.hive.metastore.
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.index.HiveIndexHandler;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.optimizer.listbucketingpruner.ListBucketingPrunerUtils;
 import org.apache.hadoop.hive.ql.plan.AddPartitionDesc;
 import org.apache.hadoop.hive.ql.plan.DropTableDesc;
@@ -1227,7 +1228,7 @@ public class Hive {
   public void loadPartition(Path loadPath, String tableName,
       Map<String, String> partSpec, boolean replace, boolean holdDDLTime,
       boolean inheritTableSpecs, boolean isSkewedStoreAsSubdir,
-      boolean isSrcLocal) throws HiveException {
+      boolean isSrcLocal, boolean isAcid) throws HiveException {
     Table tbl = getTable(tableName);
     Path tblDataLocationPath =  tbl.getDataLocation();
     try {
@@ -1275,7 +1276,7 @@ public class Hive {
             isSrcLocal);
       } else {
         FileSystem fs = tbl.getDataLocation().getFileSystem(conf);
-        Hive.copyFiles(conf, loadPath, newPartPath, fs, isSrcLocal);
+        Hive.copyFiles(conf, loadPath, newPartPath, fs, isSrcLocal, isAcid);
       }
 
       // recreate the partition if it existed before
@@ -1407,7 +1408,7 @@ private void constructOneLBLocationMap(F
    */
   public ArrayList<LinkedHashMap<String, String>> loadDynamicPartitions(Path loadPath,
       String tableName, Map<String, String> partSpec, boolean replace,
-      int numDP, boolean holdDDLTime, boolean listBucketingEnabled)
+      int numDP, boolean holdDDLTime, boolean listBucketingEnabled, boolean isAcid)
       throws HiveException {
 
     Set<Path> validPartitions = new HashSet<Path>();
@@ -1463,7 +1464,7 @@ private void constructOneLBLocationMap(F
 
         // finally load the partition -- move the file to the final table address
         loadPartition(partPath, tableName, fullPartSpec, replace, holdDDLTime, true,
-            listBucketingEnabled, false);
+            listBucketingEnabled, false, isAcid);
         LOG.info("New loading path = " + partPath + " with partSpec " + fullPartSpec);
       }
       return fullPartSpecs;
@@ -1489,14 +1490,16 @@ private void constructOneLBLocationMap(F
    *          If the source directory is LOCAL
    * @param isSkewedStoreAsSubdir
    *          if list bucketing enabled
+   * @param isAcid true if this is an ACID based write
    */
   public void loadTable(Path loadPath, String tableName, boolean replace,
-      boolean holdDDLTime, boolean isSrcLocal, boolean isSkewedStoreAsSubdir) throws HiveException {
+      boolean holdDDLTime, boolean isSrcLocal, boolean isSkewedStoreAsSubdir, boolean isAcid)
+      throws HiveException {
     Table tbl = getTable(tableName);
     if (replace) {
       tbl.replaceFiles(loadPath, isSrcLocal);
     } else {
-      tbl.copyFiles(loadPath, isSrcLocal);
+      tbl.copyFiles(loadPath, isSrcLocal, isAcid);
     }
 
     try {
@@ -2313,8 +2316,19 @@ private void constructOneLBLocationMap(F
     return success;
   }
 
+  /**
+   * Copy files.  This handles building the mapping for buckets and such between the source and
+   * destination
+   * @param conf Configuration object
+   * @param srcf source directory, if bucketed should contain bucket files
+   * @param destf directory to move files into
+   * @param fs Filesystem
+   * @param isSrcLocal true if source is on local file system
+   * @param isAcid true if this is an ACID based write
+   * @throws HiveException
+   */
   static protected void copyFiles(HiveConf conf, Path srcf, Path destf,
-      FileSystem fs, boolean isSrcLocal) throws HiveException {
+      FileSystem fs, boolean isSrcLocal, boolean isAcid) throws HiveException {
     boolean inheritPerms = HiveConf.getBoolVar(conf,
         HiveConf.ConfVars.HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS);
     try {
@@ -2342,23 +2356,105 @@ private void constructOneLBLocationMap(F
       return;
       // srcs = new FileStatus[0]; Why is this needed?
     }
+
+    // If we're moving files around for an ACID write then the rules and paths are all different.
+    // You can blame this on Owen.
+    if (isAcid) {
+      moveAcidFiles(srcFs, srcs, destf);
+    } else {
     // check that source and target paths exist
-    List<List<Path[]>> result = checkPaths(conf, fs, srcs, srcFs, destf, false);
-    // move it, move it
-    try {
-      for (List<Path[]> sdpairs : result) {
-        for (Path[] sdpair : sdpairs) {
-          if (!renameFile(conf, sdpair[0], sdpair[1], fs, false, isSrcLocal)) {
-            throw new IOException("Cannot move " + sdpair[0] + " to "
-                + sdpair[1]);
+      List<List<Path[]>> result = checkPaths(conf, fs, srcs, srcFs, destf, false);
+      // move it, move it
+      try {
+        for (List<Path[]> sdpairs : result) {
+          for (Path[] sdpair : sdpairs) {
+            if (!renameFile(conf, sdpair[0], sdpair[1], fs, false, isSrcLocal)) {
+              throw new IOException("Cannot move " + sdpair[0] + " to "
+                  + sdpair[1]);
+            }
+          }
+        }
+      } catch (IOException e) {
+        throw new HiveException("copyFiles: error while moving files!!!", e);
+      }
+    }
+  }
+
+  private static void moveAcidFiles(FileSystem fs, FileStatus[] stats, Path dst)
+      throws HiveException {
+    // The layout for ACID files is table|partname/base|delta/bucket.
+    // We will only ever be writing delta files here.  The output produced by FileSinkOperator,
+    // however, looks like bucket/delta/bucket, so we need to move it into the structure above.
+    // For the first mover there is no destination delta directory yet, so the whole directory
+    // can be moved.  Everyone else only needs to move the bucket files under the existing
+    // delta directory.
+
+    Set<Path> createdDeltaDirs = new HashSet<Path>();
+    // Open the original path we've been given and find the list of original buckets
+    for (FileStatus stat : stats) {
+      Path srcPath = stat.getPath();
+
+      LOG.debug("Acid move Looking for original buckets in " + srcPath);
+
+      FileStatus[] origBucketStats = null;
+      try {
+        origBucketStats = fs.listStatus(srcPath, AcidUtils.originalBucketFilter);
+      } catch (IOException e) {
+        String msg = "Unable to look for bucket files in src path " + srcPath.toUri().toString();
+        LOG.error(msg);
+        throw new HiveException(msg, e);
+      }
+      LOG.debug("Acid move found " + origBucketStats.length + " original buckets");
+
+      for (FileStatus origBucketStat : origBucketStats) {
+        Path origBucketPath = origBucketStat.getPath();
+        LOG.debug("Acid move looking for delta files in bucket " + origBucketPath);
+
+        FileStatus[] deltaStats = null;
+        try {
+          deltaStats = fs.listStatus(origBucketPath, AcidUtils.deltaFileFilter);
+        } catch (IOException e) {
+          throw new HiveException("Unable to look for delta files in original bucket " +
+              origBucketPath.toUri().toString(), e);
+        }
+        LOG.debug("Acid move found " + deltaStats.length + " delta files");
+
+        for (FileStatus deltaStat : deltaStats) {
+          Path deltaPath = deltaStat.getPath();
+          // Create the delta directory.  Don't worry if it already exists,
+          // as that likely means another task got to it first.  Then move each of the buckets.
+          // It would be more efficient to move the delta together with its buckets, but that
+          // is harder to make race-condition proof.
+          Path deltaDest = new Path(dst, deltaPath.getName());
+          try {
+            if (!createdDeltaDirs.contains(deltaDest)) {
+              try {
+                fs.mkdirs(deltaDest);
+                createdDeltaDirs.add(deltaDest);
+              } catch (IOException swallowIt) {
+                // Don't worry about this, as it likely just means it's already been created.
+                LOG.info("Unable to create delta directory " + deltaDest +
+                    ", assuming it already exists: " + swallowIt.getMessage());
+              }
+            }
+            FileStatus[] bucketStats = fs.listStatus(deltaPath, AcidUtils.bucketFileFilter);
+            LOG.debug("Acid move found " + bucketStats.length + " bucket files");
+            for (FileStatus bucketStat : bucketStats) {
+              Path bucketSrc = bucketStat.getPath();
+              Path bucketDest = new Path(deltaDest, bucketSrc.getName());
+              LOG.info("Moving bucket " + bucketSrc.toUri().toString() + " to " +
+                  bucketDest.toUri().toString());
+              fs.rename(bucketSrc, bucketDest);
+            }
+          } catch (IOException e) {
+            throw new HiveException("Error moving acid files", e);
           }
         }
       }
-    } catch (IOException e) {
-      throw new HiveException("copyFiles: error while moving files!!!", e);
     }
   }
 
+
   /**
    * Replaces files in the partition with new data set specified by srcf. Works
    * by renaming directory of srcf to the destination file.

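To make the delta relocation in moveAcidFiles() above more concrete, here is a minimal
standalone sketch of the path mapping it performs.  It is illustration only, not code from this
commit; the staging and warehouse paths and the delta_0000016_0000016 / bucket_00000 names are
hypothetical examples that assume the usual ACID directory naming.

    // Sketch of the source -> destination mapping computed by moveAcidFiles().
    import org.apache.hadoop.fs.Path;

    public class AcidMoveSketch {
      public static void main(String[] args) {
        // What FileSinkOperator leaves behind: <staging>/<orig bucket dir>/<delta dir>/<bucket file>
        Path bucketSrc = new Path(
            "/tmp/hive-staging/000000_0/delta_0000016_0000016/bucket_00000");
        // Where the ACID reader expects it: <table or partition dir>/<delta dir>/<bucket file>
        Path destDir = new Path("/warehouse/acidtbl");

        Path deltaDir = bucketSrc.getParent();                       // .../delta_0000016_0000016
        Path deltaDest = new Path(destDir, deltaDir.getName());      // destDir/delta_0000016_0000016
        Path bucketDest = new Path(deltaDest, bucketSrc.getName());  // .../bucket_00000

        System.out.println(bucketSrc + " -> " + bucketDest);
        // The real code then calls fs.mkdirs(deltaDest), ignoring "already exists" failures,
        // and fs.rename(bucketSrc, bucketDest) for each bucket file it finds.
      }
    }
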
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java?rev=1624788&r1=1624787&r2=1624788&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java Sat Sep 13 22:09:31 2014
@@ -659,12 +659,14 @@ public class Table implements Serializab
    *          Files to be moved. Leaf directories or globbed file paths
    * @param isSrcLocal
    *          If the source directory is LOCAL
+   * @param isAcid
+   *          True if this is an ACID based insert, update, or delete
    */
-  protected void copyFiles(Path srcf, boolean isSrcLocal) throws HiveException {
+  protected void copyFiles(Path srcf, boolean isSrcLocal, boolean isAcid) throws HiveException {
     FileSystem fs;
     try {
       fs = getDataLocation().getFileSystem(Hive.get().getConf());
-      Hive.copyFiles(Hive.get().getConf(), srcf, getPath(), fs, isSrcLocal);
+      Hive.copyFiles(Hive.get().getConf(), srcf, getPath(), fs, isSrcLocal, isAcid);
     } catch (IOException e) {
       throw new HiveException("addFiles: filesystem error in check phase", e);
     }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java?rev=1624788&r1=1624787&r2=1624788&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java Sat Sep 13 22:09:31 2014
@@ -37,6 +37,7 @@ import org.apache.hadoop.hive.ql.exec.Re
 import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.SelectOperator;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
 import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
 import org.apache.hadoop.hive.ql.lib.Dispatcher;
@@ -371,6 +372,12 @@ public class BucketingSortingReduceSinkO
         return null;
       }
 
+      // Don't do this optimization with updates or deletes
+      if (pGraphContext.getContext().getAcidOperation() == AcidUtils.Operation.UPDATE ||
+          pGraphContext.getContext().getAcidOperation() == AcidUtils.Operation.DELETE){
+        return null;
+      }
+
       // Support for dynamic partitions can be added later
       if (fsOp.getConf().getDynPartCtx() != null) {
         return null;

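The same ACID check appears again in SortedDynPartitionOptimizer below, there via the
FileSinkDesc write type rather than the context.  If one wanted to factor the test out, a small
helper along these lines would do; isUpdateOrDelete() is a hypothetical name and not part of
this commit, while AcidUtils.Operation is the real enum used above.

    import org.apache.hadoop.hive.ql.io.AcidUtils;

    final class AcidGuards {
      private AcidGuards() {}

      // True for the two ACID write types that are excluded from these optimizations.
      static boolean isUpdateOrDelete(AcidUtils.Operation op) {
        return op == AcidUtils.Operation.UPDATE || op == AcidUtils.Operation.DELETE;
      }
    }

With such a helper the guard above becomes a single call:

    if (AcidGuards.isUpdateOrDelete(pGraphContext.getContext().getAcidOperation())) {
      return null;
    }
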
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java?rev=1624788&r1=1624787&r2=1624788&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java Sat Sep 13 22:09:31 2014
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.optimi
 
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -42,6 +43,7 @@ import org.apache.hadoop.hive.ql.exec.Op
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.RowSchema;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
 import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
 import org.apache.hadoop.hive.ql.lib.Dispatcher;
@@ -173,8 +175,22 @@ public class SortedDynPartitionOptimizer
           destTable.getCols());
       ObjectPair<List<Integer>, List<Integer>> sortOrderPositions = getSortPositionsOrder(
           destTable.getSortCols(), destTable.getCols());
-      List<Integer> sortPositions = sortOrderPositions.getFirst();
-      List<Integer> sortOrder = sortOrderPositions.getSecond();
+      List<Integer> sortPositions = null;
+      List<Integer> sortOrder = null;
+      if (fsOp.getConf().getWriteType() == AcidUtils.Operation.UPDATE ||
+          fsOp.getConf().getWriteType() == AcidUtils.Operation.DELETE) {
+        // When doing updates and deletes we always want to sort on the rowid because the ACID
+        // reader will expect this sort order when doing reads.  So
+        // ignore whatever comes from the table and enforce this sort order instead.
+        sortPositions = Arrays.asList(0);
+        sortOrder = Arrays.asList(1); // 1 means ascending; an enum in the thrift definition would be clearer here
+      } else {
+        sortPositions = sortOrderPositions.getFirst();
+        sortOrder = sortOrderPositions.getSecond();
+      }
+      LOG.debug("Got sort order");
+      for (int i : sortPositions) LOG.debug("sort position " + i);
+      for (int i : sortOrder) LOG.debug("sort order " + i);
       List<Integer> partitionPositions = getPartitionPositions(dpCtx, fsParent.getSchema());
       List<ColumnInfo> colInfos = parseCtx.getOpParseCtx().get(fsParent).getRowResolver()
           .getColumnInfos();
@@ -198,7 +214,7 @@ public class SortedDynPartitionOptimizer
         colExprMap.put(ci.getInternalName(), newValueCols.get(newValueCols.size() - 1));
       }
       ReduceSinkDesc rsConf = getReduceSinkDesc(partitionPositions, sortPositions, sortOrder,
-          newValueCols, bucketColumns, numBuckets, fsParent);
+          newValueCols, bucketColumns, numBuckets, fsParent, fsOp.getConf().getWriteType());
 
       // Create ReduceSink operator
       ReduceSinkOperator rsOp = (ReduceSinkOperator) putOpInsertMap(
@@ -319,7 +335,7 @@ public class SortedDynPartitionOptimizer
     public ReduceSinkDesc getReduceSinkDesc(List<Integer> partitionPositions,
         List<Integer> sortPositions, List<Integer> sortOrder, ArrayList<ExprNodeDesc> newValueCols,
         ArrayList<ExprNodeDesc> bucketColumns, int numBuckets,
-        Operator<? extends OperatorDesc> parent) {
+        Operator<? extends OperatorDesc> parent, AcidUtils.Operation writeType) {
 
       // Order of KEY columns
       // 1) Partition columns
@@ -409,7 +425,7 @@ public class SortedDynPartitionOptimizer
       // Number of reducers is set to default (-1)
       ReduceSinkDesc rsConf = new ReduceSinkDesc(newKeyCols, newKeyCols.size(), newValueCols,
           outputKeyCols, distinctColumnIndices, outValColNames, -1, newPartCols, -1, keyTable,
-          valueTable);
+          valueTable, writeType);
       rsConf.setBucketCols(bucketColumns);
       rsConf.setNumBuckets(numBuckets);
 

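The branch added above forces update and delete writes to sort on key position 0 (the row id)
in ascending order, ignoring the table's own SORTED BY specification.  Pulled out of the
optimizer, the decision looks roughly like this sketch; SortKeyChoice and chooseSortKeys() are
hypothetical names for illustration, while the position and order values (0 and 1) are the ones
used in the code above.

    import java.util.Arrays;
    import java.util.List;
    import org.apache.hadoop.hive.ql.io.AcidUtils;

    final class SortKeyChoice {
      final List<Integer> sortPositions;
      final List<Integer> sortOrder;

      private SortKeyChoice(List<Integer> sortPositions, List<Integer> sortOrder) {
        this.sortPositions = sortPositions;
        this.sortOrder = sortOrder;
      }

      static SortKeyChoice chooseSortKeys(AcidUtils.Operation writeType,
                                          List<Integer> tableSortPositions,
                                          List<Integer> tableSortOrder) {
        if (writeType == AcidUtils.Operation.UPDATE || writeType == AcidUtils.Operation.DELETE) {
          // ACID readers expect rows ordered by row id, so the table's sort columns are ignored.
          return new SortKeyChoice(Arrays.asList(0), Arrays.asList(1));  // position 0, ascending
        }
        return new SortKeyChoice(tableSortPositions, tableSortOrder);
      }
    }
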
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java?rev=1624788&r1=1624787&r2=1624788&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java Sat Sep 13 22:09:31 2014
@@ -31,6 +31,7 @@ import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
 
 import org.antlr.runtime.tree.CommonTree;
 import org.antlr.runtime.tree.Tree;
@@ -60,6 +61,7 @@ import org.apache.hadoop.hive.ql.metadat
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.optimizer.listbucketingpruner.ListBucketingPrunerUtils;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
 import org.apache.hadoop.hive.ql.plan.ListBucketingCtx;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
@@ -89,6 +91,13 @@ public abstract class BaseSemanticAnalyz
   protected HashMap<String, String> idToTableNameMap;
   protected QueryProperties queryProperties;
 
+  /**
+   * A set of FileSinkOperators being written to in an ACID-compliant way.  We need to remember
+   * them here because when we build them we don't yet know the transaction id.  We need to go
+   * back and set it once we actually start running the query.
+   */
+  protected Set<FileSinkDesc> acidFileSinks = new HashSet<FileSinkDesc>();
+
   public static int HIVE_COLUMN_ORDER_ASC = 1;
   public static int HIVE_COLUMN_ORDER_DESC = 0;
 
@@ -943,6 +952,10 @@ public abstract class BaseSemanticAnalyz
     return queryProperties;
   }
 
+  public Set<FileSinkDesc> getAcidFileSinks() {
+    return acidFileSinks;
+  }
+
   /**
    * Construct list bucketing context.
    *

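The acidFileSinks javadoc above explains why these FileSinkDescs are remembered: the
transaction id is unknown while the plan is built, so a later stage of the query has to stamp
it onto each recorded sink.  A sketch of that second step follows; AcidSinkStamper and
stampTransaction() are hypothetical, and setTransactionId() is an assumed setter used purely
for illustration -- the actual FileSinkDesc API may spell it differently.  The set itself would
come from the getAcidFileSinks() accessor added above.

    import java.util.Set;
    import org.apache.hadoop.hive.ql.plan.FileSinkDesc;

    final class AcidSinkStamper {
      // Called once a transaction has actually been opened for the query.
      static void stampTransaction(Set<FileSinkDesc> acidSinks, long txnId) {
        for (FileSinkDesc sink : acidSinks) {
          sink.setTransactionId(txnId);  // assumed setter, see note above
        }
      }
    }
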
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java?rev=1624788&r1=1624787&r2=1624788&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java Sat Sep 13 22:09:31 2014
@@ -172,6 +172,15 @@ public class QBParseInfo {
     return insertIntoTables.contains(fullName.toLowerCase());
   }
 
+  /**
+   * Check whether a table is in the list of tables to be inserted into.
+   * @param fullTableName table name in dbname.tablename format
+   * @return true if the table is being inserted into
+   */
+  public boolean isInsertIntoTable(String fullTableName) {
+    return insertIntoTables.contains(fullTableName.toLowerCase());
+  }
+
   public HashMap<String, ASTNode> getAggregationExprsForClause(String clause) {
     return destToAggregationExprs.get(clause);
   }