You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ga...@apache.org on 2014/09/14 00:09:33 UTC

svn commit: r1624788 [2/5] - in /hive/trunk: common/src/java/org/apache/hadoop/hive/conf/ data/conf/tez/ itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/history/ itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/ itests/sr...

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java?rev=1624788&r1=1624787&r2=1624788&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java Sat Sep 13 22:09:31 2014
@@ -37,6 +37,8 @@ import java.util.UUID;
 import java.util.regex.Pattern;
 import java.util.regex.PatternSyntaxException;
 
+import org.antlr.runtime.ClassicToken;
+import org.antlr.runtime.Token;
 import org.antlr.runtime.tree.Tree;
 import org.antlr.runtime.tree.TreeWizard;
 import org.antlr.runtime.tree.TreeWizard.ContextVisitor;
@@ -85,6 +87,8 @@ import org.apache.hadoop.hive.ql.exec.Un
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
+import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
 import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
@@ -209,6 +213,8 @@ public class SemanticAnalyzer extends Ba
   // Max characters when auto generating the column name with func name
   private static final int AUTOGEN_COLALIAS_PRFX_MAXLENGTH = 20;
 
+  private static final String VALUES_TMP_TABLE_NAME_PREFIX = "Values__Tmp__Table__";
+
   private HashMap<TableScanOperator, ExprNodeDesc> opToPartPruner;
   private HashMap<TableScanOperator, PrunedPartitionList> opToPartList;
   private HashMap<String, Operator<? extends OperatorDesc>> topOps;
@@ -680,6 +686,140 @@ public class SemanticAnalyzer extends Ba
     return alias;
   }
 
+  // Generate a temp table out of a value clause
+  private ASTNode genValuesTempTable(ASTNode originalFrom) throws SemanticException {
+    // Pick a name for the table
+    SessionState ss = SessionState.get();
+    String tableName = VALUES_TMP_TABLE_NAME_PREFIX + ss.getNextValuesTempTableSuffix();
+
+    // Step 1, parse the values clause we were handed
+    List<? extends Node> fromChildren = originalFrom.getChildren();
+    // First child should be the virtual table ref
+    ASTNode virtualTableRef = (ASTNode)fromChildren.get(0);
+    assert virtualTableRef.getToken().getType() == HiveParser.TOK_VIRTUAL_TABREF :
+        "Expected first child of TOK_VIRTUAL_TABLE to be TOK_VIRTUAL_TABREF but was " +
+            virtualTableRef.getName();
+
+    List<? extends Node> virtualTableRefChildren = virtualTableRef.getChildren();
+    // First child of this should be the table name.  If it's anonymous,
+    // then we don't have a table name.
+    ASTNode tabName = (ASTNode)virtualTableRefChildren.get(0);
+    if (tabName.getToken().getType() != HiveParser.TOK_ANONYMOUS) {
+      // TODO, if you want to make select ... from (values(...) as foo(...) work,
+      // you need to parse this list of columns names and build it into the table
+      throw new SemanticException(ErrorMsg.VALUES_TABLE_CONSTRUCTOR_NOT_SUPPORTED.getMsg());
+    }
+
+    // The second child of the TOK_VIRTUAL_TABLE should be TOK_VALUES_TABLE
+    ASTNode valuesTable = (ASTNode)fromChildren.get(1);
+    assert valuesTable.getToken().getType() == HiveParser.TOK_VALUES_TABLE :
+        "Expected second child of TOK_VIRTUAL_TABLE to be TOK_VALUE_TABLE but was " +
+            valuesTable.getName();
+    // Each of the children of TOK_VALUES_TABLE will be a TOK_VALUE_ROW
+    List<? extends Node> valuesTableChildren = valuesTable.getChildren();
+
+    // Now that we're going to start reading through the rows, open a file to write the rows too
+    // If we leave this method before creating the temporary table we need to be sure to clean up
+    // this file.
+    Path tablePath = null;
+    FileSystem fs = null;
+    try {
+      tablePath = Warehouse.getDnsPath(new Path(ss.getTempTableSpace(), tableName), conf);
+      fs = tablePath.getFileSystem(conf);
+      fs.mkdirs(tablePath);
+      Path dataFile = new Path(tablePath, "data_file");
+      FSDataOutputStream out = fs.create(dataFile);
+      List<FieldSchema> fields = new ArrayList<FieldSchema>();
+
+      boolean firstRow = true;
+      for (Node n : valuesTableChildren) {
+        ASTNode valuesRow = (ASTNode) n;
+        assert valuesRow.getToken().getType() == HiveParser.TOK_VALUE_ROW :
+            "Expected child of TOK_VALUE_TABLE to be TOK_VALUE_ROW but was " + valuesRow.getName();
+        // Each of the children of this should be a literal
+        List<? extends Node> valuesRowChildren = valuesRow.getChildren();
+        boolean isFirst = true;
+        int nextColNum = 1;
+        for (Node n1 : valuesRowChildren) {
+          ASTNode value = (ASTNode) n1;
+          if (firstRow) {
+            fields.add(new FieldSchema("tmp_values_col" + nextColNum++, "string", ""));
+          }
+          if (isFirst) isFirst = false;
+          else out.writeBytes("\u0001");
+          out.writeBytes(unparseExprForValuesClause(value));
+        }
+        out.writeBytes("\n");
+        firstRow = false;
+      }
+      out.close();
+
+      // Step 2, create a temp table, using the created file as the data
+      StorageFormat format = new StorageFormat(conf);
+      format.processStorageFormat("TextFile");
+      Table table = db.newTable(tableName);
+      table.setSerializationLib(format.getSerde());
+      table.setFields(fields);
+      table.setDataLocation(tablePath);
+      table.getTTable().setTemporary(true);
+      table.setStoredAsSubDirectories(false);
+      table.setInputFormatClass(format.getInputFormat());
+      table.setOutputFormatClass(format.getOutputFormat());
+      db.createTable(table, false);
+    } catch (Exception e) {
+      String errMsg = ErrorMsg.INSERT_CANNOT_CREATE_TEMP_FILE.getMsg() + e.getMessage();
+      LOG.error(errMsg);
+      // Try to delete the file
+      if (fs != null && tablePath != null) {
+        try {
+          fs.delete(tablePath, false);
+        } catch (IOException swallowIt) {}
+      }
+      throw new SemanticException(errMsg, e);
+    }
+
+    // Step 3, return a new subtree with a from clause built around that temp table
+    // The form of the tree is TOK_TABREF->TOK_TABNAME->identifier(tablename)
+    Token t = new ClassicToken(HiveParser.TOK_TABREF);
+    ASTNode tabRef = new ASTNode(t);
+    t = new ClassicToken(HiveParser.TOK_TABNAME);
+    ASTNode tabNameNode = new ASTNode(t);
+    tabRef.addChild(tabNameNode);
+    t = new ClassicToken(HiveParser.Identifier, tableName);
+    ASTNode identifier = new ASTNode(t);
+    tabNameNode.addChild(identifier);
+    return tabRef;
+  }
+
+  // Take an expression in the values clause and turn it back into a string.  This is far from
+  // comprehensive.  At the moment it only supports:
+  // * literals (all types)
+  // * unary negatives
+  // * true/false
+  private String unparseExprForValuesClause(ASTNode expr) throws SemanticException {
+    switch (expr.getToken().getType()) {
+      case HiveParser.Number:
+        return expr.getText();
+
+      case HiveParser.StringLiteral:
+        return PlanUtils.stripQuotes(expr.getText());
+
+      case HiveParser.KW_FALSE:
+        return "FALSE";
+
+      case HiveParser.KW_TRUE:
+        return "TRUE";
+
+      case HiveParser.MINUS:
+        return "-" + unparseExprForValuesClause((ASTNode)expr.getChildren().get(0));
+
+      default:
+        throw new SemanticException("Expression of type " + expr.getText() +
+            " not supported in insert/values");
+    }
+
+  }
+
   private void assertCombineInputFormat(Tree numerator, String message) throws SemanticException {
     String inputFormat = conf.getVar(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez") ?
       HiveConf.getVar(conf, HiveConf.ConfVars.HIVETEZINPUTFORMAT):
@@ -997,7 +1137,11 @@ public class SemanticAnalyzer extends Ba
         if (frm.getToken().getType() == HiveParser.TOK_TABREF) {
           processTable(qb, frm);
         } else if (frm.getToken().getType() == HiveParser.TOK_VIRTUAL_TABLE) {
-          throw new RuntimeException("VALUES() clause is not fully supported yet...");
+          // Create a temp table with the passed values in it then rewrite this portion of the
+          // tree to be from that table.
+          ASTNode newFrom = genValuesTempTable(frm);
+          ast.setChild(0, newFrom);
+          processTable(qb, newFrom);
         } else if (frm.getToken().getType() == HiveParser.TOK_SUBQUERY) {
           processSubQuery(qb, frm);
         } else if (frm.getToken().getType() == HiveParser.TOK_LATERAL_VIEW ||
@@ -1190,10 +1334,6 @@ public class SemanticAnalyzer extends Ba
       case HiveParser.TOK_CTE:
         processCTE(qb, ast);
         break;
-      case HiveParser.TOK_DELETE_FROM:
-        throw new RuntimeException("DELETE is not (yet) implemented...");
-      case HiveParser.TOK_UPDATE_TABLE:
-        throw new RuntimeException("UPDATE is not (yet) implemented...");
       default:
         skipRecursion = false;
         break;
@@ -1280,7 +1420,7 @@ public class SemanticAnalyzer extends Ba
 
         // Disallow INSERT INTO on bucketized tables
         if (qb.getParseInfo().isInsertIntoTable(tab.getDbName(), tab.getTableName()) &&
-            tab.getNumBuckets() > 0) {
+            tab.getNumBuckets() > 0 && !isAcidTable(tab)) {
           throw new SemanticException(ErrorMsg.INSERT_INTO_BUCKETIZED_TABLE.
               getMsg("Table: " + tab_name));
         }
@@ -4226,7 +4366,7 @@ public class SemanticAnalyzer extends Ba
                 groupingSetsPresent ? keyLength + 1 : keyLength,
                 reduceValues, distinctColIndices,
                 outputKeyColumnNames, outputValueColumnNames, true, -1, numPartitionFields,
-                numReducers),
+                numReducers, AcidUtils.Operation.NOT_ACID),
             new RowSchema(reduceSinkOutputRowResolver.getColumnInfos()), inputOperatorInfo),
         reduceSinkOutputRowResolver);
     rsOp.setColumnExprMap(colExprMap);
@@ -4429,7 +4569,7 @@ public class SemanticAnalyzer extends Ba
     }
     ReduceSinkDesc rsDesc = PlanUtils.getReduceSinkDesc(reduceKeys, keyLength, reduceValues,
         distinctColIndices, outputKeyColumnNames, outputValueColumnNames,
-        true, -1, keyLength, numReducers);
+        true, -1, keyLength, numReducers, AcidUtils.Operation.NOT_ACID);
 
     ReduceSinkOperator rsOp = (ReduceSinkOperator) putOpInsertMap(
         OperatorFactory.getAndMakeChild(rsDesc, new RowSchema(reduceSinkOutputRowResolver
@@ -4544,8 +4684,8 @@ public class SemanticAnalyzer extends Ba
     ReduceSinkOperator rsOp = (ReduceSinkOperator) putOpInsertMap(
         OperatorFactory.getAndMakeChild(PlanUtils.getReduceSinkDesc(reduceKeys,
             reduceValues, outputColumnNames, true, -1, numPartitionFields,
-            numReducers), new RowSchema(reduceSinkOutputRowResolver2
-            .getColumnInfos()), groupByOperatorInfo),
+            numReducers, AcidUtils.Operation.NOT_ACID),
+            new RowSchema(reduceSinkOutputRowResolver2.getColumnInfos()), groupByOperatorInfo),
         reduceSinkOutputRowResolver2);
 
     rsOp.setColumnExprMap(colExprMap);
@@ -5529,9 +5669,14 @@ public class SemanticAnalyzer extends Ba
     if ((dest_tab.getNumBuckets() > 0) &&
         (conf.getBoolVar(HiveConf.ConfVars.HIVEENFORCEBUCKETING))) {
       enforceBucketing = true;
-      partnCols = getPartitionColsFromBucketCols(dest, qb, dest_tab, table_desc, input, true);
-      partnColsNoConvert = getPartitionColsFromBucketCols(dest, qb, dest_tab, table_desc, input,
-          false);
+      if (updating() || deleting()) {
+        partnCols = getPartitionColsFromBucketColsForUpdateDelete(input, true);
+        partnColsNoConvert = getPartitionColsFromBucketColsForUpdateDelete(input, false);
+      } else {
+        partnCols = getPartitionColsFromBucketCols(dest, qb, dest_tab, table_desc, input, true);
+        partnColsNoConvert = getPartitionColsFromBucketCols(dest, qb, dest_tab, table_desc, input,
+            false);
+      }
     }
 
     if ((dest_tab.getSortCols() != null) &&
@@ -5553,6 +5698,7 @@ public class SemanticAnalyzer extends Ba
       }
       int numBuckets = dest_tab.getNumBuckets();
       if (numBuckets > maxReducers) {
+        LOG.debug("XXXXXX numBuckets is " + numBuckets + " and maxReducers is " + maxReducers);
         multiFileSpray = true;
         totalFiles = numBuckets;
         if (totalFiles % maxReducers == 0) {
@@ -5718,7 +5864,11 @@ public class SemanticAnalyzer extends Ba
       // Create the work for moving the table
       // NOTE: specify Dynamic partitions in dest_tab for WriteEntity
       if (!isNonNativeTable) {
-        ltd = new LoadTableDesc(queryTmpdir,table_desc, dpCtx);
+        AcidUtils.Operation acidOp = getAcidType(table_desc.getOutputFileFormatClass());
+        if (acidOp != AcidUtils.Operation.NOT_ACID) {
+          checkIfAcidAndOverwriting(qb, table_desc);
+        }
+        ltd = new LoadTableDesc(queryTmpdir,table_desc, dpCtx, acidOp);
         ltd.setReplace(!qb.getParseInfo().isInsertIntoTable(dest_tab.getDbName(),
             dest_tab.getTableName()));
         ltd.setLbCtx(lbCtx);
@@ -5821,7 +5971,11 @@ public class SemanticAnalyzer extends Ba
       lbCtx = constructListBucketingCtx(dest_part.getSkewedColNames(),
           dest_part.getSkewedColValues(), dest_part.getSkewedColValueLocationMaps(),
           dest_part.isStoredAsSubDirectories(), conf);
-      ltd = new LoadTableDesc(queryTmpdir, table_desc, dest_part.getSpec());
+      AcidUtils.Operation acidOp = getAcidType(table_desc.getOutputFileFormatClass());
+      if (acidOp != AcidUtils.Operation.NOT_ACID) {
+        checkIfAcidAndOverwriting(qb, table_desc);
+      }
+      ltd = new LoadTableDesc(queryTmpdir, table_desc, dest_part.getSpec(), acidOp);
       ltd.setReplace(!qb.getParseInfo().isInsertIntoTable(dest_tab.getDbName(),
           dest_tab.getTableName()));
       ltd.setLbCtx(lbCtx);
@@ -5973,18 +6127,25 @@ public class SemanticAnalyzer extends Ba
 
     ArrayList<ColumnInfo> vecCol = new ArrayList<ColumnInfo>();
 
-    try {
-      StructObjectInspector rowObjectInspector = (StructObjectInspector) table_desc
-          .getDeserializer().getObjectInspector();
-      List<? extends StructField> fields = rowObjectInspector
-          .getAllStructFieldRefs();
-      for (int i = 0; i < fields.size(); i++) {
-        vecCol.add(new ColumnInfo(fields.get(i).getFieldName(), TypeInfoUtils
-            .getTypeInfoFromObjectInspector(fields.get(i)
-            .getFieldObjectInspector()), "", false));
+    if (updating() || deleting()) {
+      vecCol.add(new ColumnInfo(VirtualColumn.ROWID.getName(),
+          //TypeInfoUtils.getTypeInfoFromObjectInspector(VirtualColumn.ROWID.getObjectInspector()),
+          VirtualColumn.ROWID.getTypeInfo(),
+          "", true));
+    } else {
+      try {
+        StructObjectInspector rowObjectInspector = (StructObjectInspector) table_desc
+            .getDeserializer().getObjectInspector();
+        List<? extends StructField> fields = rowObjectInspector
+            .getAllStructFieldRefs();
+        for (int i = 0; i < fields.size(); i++) {
+          vecCol.add(new ColumnInfo(fields.get(i).getFieldName(), TypeInfoUtils
+              .getTypeInfoFromObjectInspector(fields.get(i)
+                  .getFieldObjectInspector()), "", false));
+        }
+      } catch (Exception e) {
+        throw new SemanticException(e.getMessage(), e);
       }
-    } catch (Exception e) {
-      throw new SemanticException(e.getMessage(), e);
     }
 
     RowSchema fsRS = new RowSchema(vecCol);
@@ -5997,6 +6158,10 @@ public class SemanticAnalyzer extends Ba
         (dest_tab.getSortCols() != null && dest_tab.getSortCols().size() > 0 &&
         conf.getBoolVar(HiveConf.ConfVars.HIVEENFORCESORTING))));
 
+    // If this table is working with ACID semantics, turn off merging
+    boolean acidTable = isAcidTable(dest_tab);
+    canBeMerged &= !acidTable;
+
     FileSinkDesc fileSinkDesc = new FileSinkDesc(
       queryTmpdir,
       table_desc,
@@ -6009,6 +6174,15 @@ public class SemanticAnalyzer extends Ba
       rsCtx.getPartnCols(),
       dpCtx);
 
+    // If this is an insert, update, or delete on an ACID table then mark that so the
+    // FileSinkOperator knows how to properly write to it.
+    if (acidTable) {
+      AcidUtils.Operation wt = updating() ? AcidUtils.Operation.UPDATE :
+          (deleting() ? AcidUtils.Operation.DELETE : AcidUtils.Operation.INSERT);
+      fileSinkDesc.setWriteType(wt);
+      acidFileSinks.add(fileSinkDesc);
+    }
+
     /* Set List Bucketing context. */
     if (lbCtx != null) {
       lbCtx.processRowSkewedIndex(fsRS);
@@ -6059,6 +6233,17 @@ public class SemanticAnalyzer extends Ba
     return output;
   }
 
+  // Check if we are overwriting any tables.  If so, throw an exception as that is not allowed
+  // when using an Acid compliant txn manager and operating on an acid table.
+  private void checkIfAcidAndOverwriting(QB qb, TableDesc tableDesc) throws SemanticException {
+    String tableName = tableDesc.getTableName();
+    if (!qb.getParseInfo().isInsertIntoTable(tableName)) {
+      LOG.debug("Couldn't find table " + tableName + " in insertIntoTable");
+      throw new SemanticException(ErrorMsg.NO_INSERT_OVERWRITE_WITH_ACID.getMsg());
+    }
+
+  }
+
   /**
    * Generate the conversion SelectOperator that converts the columns into the
    * types that are expected by the table_desc.
@@ -6086,16 +6271,34 @@ public class SemanticAnalyzer extends Ba
       outColumnCnt += dpCtx.getNumDPCols();
     }
 
-    if (inColumnCnt != outColumnCnt) {
-      String reason = "Table " + dest + " has " + outColumnCnt
-          + " columns, but query has " + inColumnCnt + " columns.";
-      throw new SemanticException(ErrorMsg.TARGET_TABLE_COLUMN_MISMATCH.getMsg(
-          qb.getParseInfo().getDestForClause(dest), reason));
-    } else if (dynPart && dpCtx != null) {
-      // create the mapping from input ExprNode to dest table DP column
-      dpCtx.mapInputToDP(rowFields.subList(tableFields.size(), rowFields.size()));
+    if (deleting()) {
+      // Figure out if we have partition columns in the list or not.  If so,
+      // add them into the mapping.  Partition columns will be located after the row id.
+      if (rowFields.size() > 1) {
+        // This means we have partition columns to deal with, so set up the mapping from the
+        // input to the partition columns.
+        dpCtx.mapInputToDP(rowFields.subList(1, rowFields.size()));
+      }
+    } else if (updating()) {
+      // In this case we expect the number of in fields to exceed the number of out fields by one
+      // (for the ROW__ID virtual column).  If there are more columns than this,
+      // then the extras are for dynamic partitioning
+      if (dynPart && dpCtx != null) {
+        dpCtx.mapInputToDP(rowFields.subList(tableFields.size() + 1, rowFields.size()));
+      }
+    } else {
+      if (inColumnCnt != outColumnCnt) {
+        String reason = "Table " + dest + " has " + outColumnCnt
+            + " columns, but query has " + inColumnCnt + " columns.";
+        throw new SemanticException(ErrorMsg.TARGET_TABLE_COLUMN_MISMATCH.getMsg(
+            qb.getParseInfo().getDestForClause(dest), reason));
+      } else if (dynPart && dpCtx != null) {
+        // create the mapping from input ExprNode to dest table DP column
+        dpCtx.mapInputToDP(rowFields.subList(tableFields.size(), rowFields.size()));
+      }
     }
 
+
     // Check column types
     boolean converted = false;
     int columnNumber = tableFields.size();
@@ -6107,17 +6310,26 @@ public class SemanticAnalyzer extends Ba
         MetadataTypedColumnsetSerDe.class);
     boolean isLazySimpleSerDe = table_desc.getDeserializerClass().equals(
         LazySimpleSerDe.class);
-    if (!isMetaDataSerDe) {
+    if (!isMetaDataSerDe && !deleting()) {
+
+      // If we're updating, add the ROW__ID expression, then make the following column accesses
+      // offset by 1 so that we don't try to convert the ROW__ID
+      if (updating()) {
+        expressions.add(new ExprNodeColumnDesc(rowFields.get(0).getType(),
+            rowFields.get(0).getInternalName(), "", true));
+      }
 
       // here only deals with non-partition columns. We deal with partition columns next
       for (int i = 0; i < columnNumber; i++) {
+        int rowFieldsOffset = updating() ? i + 1 : i;
         ObjectInspector tableFieldOI = tableFields.get(i)
             .getFieldObjectInspector();
         TypeInfo tableFieldTypeInfo = TypeInfoUtils
             .getTypeInfoFromObjectInspector(tableFieldOI);
-        TypeInfo rowFieldTypeInfo = rowFields.get(i).getType();
+        TypeInfo rowFieldTypeInfo = rowFields.get(rowFieldsOffset).getType();
         ExprNodeDesc column = new ExprNodeColumnDesc(rowFieldTypeInfo,
-            rowFields.get(i).getInternalName(), "", false, rowFields.get(i).isSkewedCol());
+            rowFields.get(rowFieldsOffset).getInternalName(), "", false,
+            rowFields.get(rowFieldsOffset).isSkewedCol());
         // LazySimpleSerDe can convert any types to String type using
         // JSON-format.
         if (!tableFieldTypeInfo.equals(rowFieldTypeInfo)
@@ -6147,7 +6359,7 @@ public class SemanticAnalyzer extends Ba
     // deal with dynamic partition columns: convert ExprNodeDesc type to String??
     if (dynPart && dpCtx != null && dpCtx.getNumDPCols() > 0) {
       // DP columns starts with tableFields.size()
-      for (int i = tableFields.size(); i < rowFields.size(); ++i) {
+      for (int i = tableFields.size() + (updating() ? 1 : 0); i < rowFields.size(); ++i) {
         TypeInfo rowFieldTypeInfo = rowFields.get(i).getType();
         ExprNodeDesc column = new ExprNodeColumnDesc(
             rowFieldTypeInfo, rowFields.get(i).getInternalName(), "", false);
@@ -6341,6 +6553,27 @@ public class SemanticAnalyzer extends Ba
     return genConvertCol(dest, qb, tab, table_desc, input, posns, convert);
   }
 
+  // Build the partitioning expression list used to enforce bucketing for update and delete.
+  // For these statements the bucketing column is always the first column of the input row
+  // (the ROW__ID column), and it isn't in the table info.  So rather than asking the table for
+  // it, the expression is constructed here directly from the input's row resolver, optionally
+  // wrapped in a conversion cast to int.
+  private ArrayList<ExprNodeDesc> getPartitionColsFromBucketColsForUpdateDelete(
+      Operator input, boolean convert) throws SemanticException {
+    // Column 0 of the input row is the one to bucket on.
+    ColumnInfo firstCol = opParseCtx.get(input).getRowResolver().getColumnInfos().get(0);
+    ExprNodeDesc bucketExpr = new ExprNodeColumnDesc(firstCol.getType(),
+        firstCol.getInternalName(), firstCol.getTabAlias(), true);
+    // Callers request both forms: with the cast (convert) and without it.
+    if (convert) {
+      bucketExpr = ParseUtils.createConversionCast(bucketExpr, TypeInfoFactory.intTypeInfo);
+    }
+
+    ArrayList<ExprNodeDesc> result = new ArrayList<ExprNodeDesc>(1);
+    result.add(bucketExpr);
+    return result;
+  }
+
   private ArrayList<ExprNodeDesc> genConvertCol(String dest, QB qb, Table tab,
       TableDesc table_desc, Operator input, List<Integer> posns, boolean convert)
       throws SemanticException {
@@ -6463,9 +6696,11 @@ public class SemanticAnalyzer extends Ba
       order.append(sortOrder == BaseSemanticAnalyzer.HIVE_COLUMN_ORDER_ASC ? '+' : '-');
     }
 
+    AcidUtils.Operation acidOp = (isAcidTable(tab) ? getAcidType() : AcidUtils.Operation.NOT_ACID);
+
     Operator interim = putOpInsertMap(OperatorFactory.getAndMakeChild(PlanUtils
         .getReduceSinkDesc(sortCols, valueCols, outputColumns, false, -1,
-            partitionCols, order.toString(), numReducers),
+            partitionCols, order.toString(), numReducers, acidOp),
         new RowSchema(inputRR.getColumnInfos()), input), inputRR);
     interim.setColumnExprMap(colExprMap);
     reduceSinkOperatorsAddedByEnforceBucketingSorting.add((ReduceSinkOperator) interim);
@@ -6621,8 +6856,9 @@ public class SemanticAnalyzer extends Ba
 
     dummy.setParentOperators(null);
 
+    // TODO Not 100% sure NOT_ACID is always right here.
     ReduceSinkDesc rsdesc = PlanUtils.getReduceSinkDesc(sortCols, valueCols, outputColumns,
-        false, -1, partitionCols, order.toString(), numReducers);
+        false, -1, partitionCols, order.toString(), numReducers, AcidUtils.Operation.NOT_ACID);
     Operator interim = putOpInsertMap(OperatorFactory.getAndMakeChild(rsdesc,
         new RowSchema(rsRR.getColumnInfos()), input), rsRR);
 
@@ -6887,7 +7123,7 @@ public class SemanticAnalyzer extends Ba
 
     ReduceSinkDesc rsDesc = PlanUtils.getReduceSinkDesc(reduceKeys,
         reduceValues, outputColumns, false, tag,
-        reduceKeys.size(), numReds);
+        reduceKeys.size(), numReds, AcidUtils.Operation.NOT_ACID);
 
     ReduceSinkOperator rsOp = (ReduceSinkOperator) putOpInsertMap(
         OperatorFactory.getAndMakeChild(rsDesc, new RowSchema(outputRR
@@ -8075,7 +8311,8 @@ public class SemanticAnalyzer extends Ba
 
     ReduceSinkOperator rsOp = (ReduceSinkOperator) putOpInsertMap(
         OperatorFactory.getAndMakeChild(PlanUtils.getReduceSinkDesc(reduceKeys,
-            reduceValues, outputColumnNames, true, -1, reduceKeys.size(), -1),
+            reduceValues, outputColumnNames, true, -1, reduceKeys.size(), -1,
+                AcidUtils.Operation.NOT_ACID),
             new RowSchema(reduceSinkOutputRowResolver.getColumnInfos()), input),
         reduceSinkOutputRowResolver);
 
@@ -11375,7 +11612,7 @@ public class SemanticAnalyzer extends Ba
       input = putOpInsertMap(OperatorFactory.getAndMakeChild(PlanUtils
           .getReduceSinkDesc(orderCols,
               valueCols, outputColumnNames, false,
-              -1, partCols, orderString.toString(), -1),
+              -1, partCols, orderString.toString(), -1, AcidUtils.Operation.NOT_ACID),
           new RowSchema(rsOpRR.getColumnInfos()), input), rsOpRR);
       input.setColumnExprMap(colExprMap);
     }
@@ -11500,7 +11737,7 @@ public class SemanticAnalyzer extends Ba
     input = putOpInsertMap(OperatorFactory.getAndMakeChild(PlanUtils
         .getReduceSinkDesc(orderCols,
             valueCols, outputColumnNames, false,
-            -1, partCols, orderString.toString(), -1),
+            -1, partCols, orderString.toString(), -1, AcidUtils.Operation.NOT_ACID),
         new RowSchema(rsNewRR.getColumnInfos()), input), rsNewRR);
     input.setColumnExprMap(colExprMap);
 
@@ -11657,4 +11894,49 @@ public class SemanticAnalyzer extends Ba
     else return (ltd.getReplace() ? WriteEntity.WriteType.INSERT_OVERWRITE :
         WriteEntity.WriteType.INSERT);
   }
+
+  // Even if the table is of Acid type, return false when there is no session or the session's
+  // TxnManager is not Acid compliant (the same guard getAcidType(Class) applies).
+  private boolean isAcidTable(Table tab) {
+    SessionState ss = SessionState.get();
+    if (tab == null || tab.getOutputFormatClass() == null || ss == null) return false;
+    return ss.getTxnMgr().supportsAcid() && isAcidOutputFormat(tab.getOutputFormatClass());
+  }
+
+  // Returns true when the given output format class is an AcidOutputFormat.
+  // Class.isAssignableFrom is used instead of scanning of.getInterfaces(): getInterfaces()
+  // only lists interfaces declared directly on the class, so it would miss a format that
+  // inherits AcidOutputFormat from a superclass or through a sub-interface.  A null class is
+  // treated as not Acid.
+  private boolean isAcidOutputFormat(Class<? extends HiveOutputFormat> of) {
+    if (of == null) return false;
+    return AcidOutputFormat.class.isAssignableFrom(of);
+  }
+
+  // Picks DELETE, UPDATE or INSERT from deleting()/updating().  NOT_ACID is never returned, so
+  // callers must already have decided that this is an Acid table.
+  private AcidUtils.Operation getAcidType() {
+    if (deleting()) return AcidUtils.Operation.DELETE;
+    if (updating()) return AcidUtils.Operation.UPDATE;
+    return AcidUtils.Operation.INSERT;
+  }
+
+  // Acid operation for a write to the given output format.  NOT_ACID when there is no session,
+  // the TxnManager lacks Acid support, or the format is not an Acid output format.
+  private AcidUtils.Operation getAcidType(Class<? extends HiveOutputFormat> of) {
+    SessionState ss = SessionState.get();
+    if (ss == null || !ss.getTxnMgr().supportsAcid()) {
+      return AcidUtils.Operation.NOT_ACID;
+    }
+    return isAcidOutputFormat(of) ? getAcidType() : AcidUtils.Operation.NOT_ACID;
+  }
+
+  protected boolean updating() {
+    return false; // this analyzer never reports an UPDATE; protected so a subclass can override
+  }
+
+  protected boolean deleting() {
+    return false; // this analyzer never reports a DELETE; protected so a subclass can override
+  }
+
 }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java?rev=1624788&r1=1624787&r2=1624788&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java Sat Sep 13 22:09:31 2014
@@ -268,6 +268,11 @@ public final class SemanticAnalyzerFacto
       case HiveParser.TOK_CREATEMACRO:
       case HiveParser.TOK_DROPMACRO:
         return new MacroSemanticAnalyzer(conf);
+
+      case HiveParser.TOK_UPDATE_TABLE:
+      case HiveParser.TOK_DELETE_FROM:
+        return new UpdateDeleteSemanticAnalyzer(conf);
+
       default:
         return new SemanticAnalyzer(conf);
       }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/StorageFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/StorageFormat.java?rev=1624788&r1=1624787&r2=1624788&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/StorageFormat.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/StorageFormat.java Sat Sep 13 22:09:31 2014
@@ -80,7 +80,7 @@ public class StorageFormat {
     return true;
   }
 
-  private void processStorageFormat(String name) throws SemanticException {
+  protected void processStorageFormat(String name) throws SemanticException {
     if (name.isEmpty()) {
       throw new SemanticException("File format in STORED AS clause cannot be empty");
     }

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java?rev=1624788&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java Sat Sep 13 22:09:31 2014
@@ -0,0 +1,382 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.hooks.Entity;
+import org.apache.hadoop.hive.ql.hooks.ReadEntity;
+import org.apache.hadoop.hive.ql.hooks.WriteEntity;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.session.SessionState;
+
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+
+
+/**
+ * A subclass of the {@link org.apache.hadoop.hive.ql.parse.SemanticAnalyzer} that just handles
+ * update and delete statements.  It works by rewriting the updates and deletes into insert
+ * statements (since they are actually inserts) and then doing some patch up to make them work as
+ * updates and deletes instead.
+ */
+public class UpdateDeleteSemanticAnalyzer extends SemanticAnalyzer {
+
+  // Set only while the rewritten insert statement is being analyzed, so that analyzeInternal()
+  // hands the rewritten tree to the parent class rather than treating it as an update/delete.
+  boolean useSuper = false;
+
+  public UpdateDeleteSemanticAnalyzer(HiveConf conf) throws SemanticException {
+    super(conf);
+  }
+
+  /**
+   * Analyze an update or delete statement.  While {@link #useSuper} is set the tree is passed
+   * straight to the parent class; otherwise the statement is rewritten into an insert.
+   * @param tree root of the statement, expected to be TOK_UPDATE_TABLE or TOK_DELETE_FROM
+   * @throws SemanticException if the session's transaction manager does not support ACID, or if
+   * the rewrite or the analysis of the rewritten statement fails
+   */
+  @Override
+  public void analyzeInternal(ASTNode tree) throws SemanticException {
+    if (useSuper) {
+      super.analyzeInternal(tree);
+    } else {
+
+      if (!SessionState.get().getTxnMgr().supportsAcid()) {
+        throw new SemanticException(ErrorMsg.ACID_OP_ON_NONACID_TXNMGR.getMsg());
+      }
+      switch (tree.getToken().getType()) {
+        case HiveParser.TOK_DELETE_FROM:
+          analyzeDelete(tree);
+          return;
+
+        case HiveParser.TOK_UPDATE_TABLE:
+          analyzeUpdate(tree);
+          return;
+
+        default:
+          throw new RuntimeException("Asked to parse token " + tree.getName() + " in " +
+              "UpdateDeleteSemanticAnalyzer");
+      }
+    }
+  }
+
+  @Override
+  protected boolean updating() {
+    return ctx.getAcidOperation() == AcidUtils.Operation.UPDATE;
+  }
+
+  @Override
+  protected boolean deleting() {
+    return ctx.getAcidOperation() == AcidUtils.Operation.DELETE;
+  }
+
+  private void analyzeUpdate(ASTNode tree) throws SemanticException {
+    ctx.setAcidOperation(AcidUtils.Operation.UPDATE);
+    reparseAndSuperAnalyze(tree);
+  }
+
+  private void analyzeDelete(ASTNode tree) throws SemanticException {
+    ctx.setAcidOperation(AcidUtils.Operation.DELETE);
+    reparseAndSuperAnalyze(tree);
+  }
+
+  /**
+   * Rewrite an update or delete as an insert statement, reparse it, graft the original where
+   * clause and set expressions onto the new tree, analyze it with the parent class, and finally
+   * mark the read and write entities as belonging to an update or delete.
+   * @param tree root of the original update or delete statement
+   * @throws SemanticException if the table cannot be fetched, a set clause names a partition
+   * column or a column the table does not have, or the rewritten statement cannot be parsed
+   */
+  private void reparseAndSuperAnalyze(ASTNode tree) throws SemanticException {
+    List<? extends Node> children = tree.getChildren();
+    // The first child should be the table we are updating or deleting from
+    ASTNode tabName = (ASTNode)children.get(0);
+    assert tabName.getToken().getType() == HiveParser.TOK_TABNAME :
+        "Expected tablename as first child of " + operation() + " but found " + tabName.getName();
+    String[] tableName = getQualifiedTableName(tabName);
+
+    // Rewrite the delete or update into an insert.  Crazy, but it works as deletes and update
+    // actually are inserts into the delta file in Hive.  A delete
+    // DELETE FROM _tablename_ [WHERE ...]
+    // will be rewritten as
+    // INSERT INTO TABLE _tablename_ [PARTITION (_partcols_)] SELECT ROW__ID[,
+    // _partcols_] from _tablename_ SORT BY ROW__ID DESC
+    // An update
+    // UPDATE _tablename_ SET x = _expr_ [WHERE...]
+    // will be rewritten as
+    // INSERT INTO TABLE _tablename_ [PARTITION (_partcols_)] SELECT ROW__ID, _all_[,
+    // _partcols_] from _tablename_ SORT BY ROW__ID DESC
+    // where _all_ is all the non-partition columns.  The expressions from the set clause will be
+    // re-attached later.
+    // The where clause will also be re-attached later.
+    // The sort by clause is put in there so that records come out in the right order to enable
+    // merge on read.
+
+    StringBuilder rewrittenQueryStr = new StringBuilder();
+    Table mTable;
+    try {
+      mTable = db.getTable(tableName[0], tableName[1]);
+    } catch (HiveException e) {
+      throw new SemanticException(ErrorMsg.UPDATEDELETE_PARSE_ERROR.getMsg(), e);
+    }
+    List<FieldSchema> partCols = mTable.getPartCols();
+
+    rewrittenQueryStr.append("insert into table ");
+    rewrittenQueryStr.append(getDotName(tableName));
+
+    // If the table is partitioned we have to put the partition() clause in
+    if (partCols != null && partCols.size() > 0) {
+      rewrittenQueryStr.append(" partition (");
+      boolean first = true;
+      for (FieldSchema fschema : partCols) {
+        if (first) first = false;
+        else rewrittenQueryStr.append(", ");
+        rewrittenQueryStr.append(fschema.getName());
+      }
+      rewrittenQueryStr.append(")");
+    }
+
+    rewrittenQueryStr.append(" select ROW__ID");
+    Map<Integer, ASTNode> setColExprs = null;
+    if (updating()) {
+      // An update needs to select all of the columns, as we rewrite the entire row.  Also,
+      // we need to figure out which columns we are going to replace.  We won't write the set
+      // expressions in the rewritten query.  We'll patch that up later.
+      // The set list from update should be the second child (index 1)
+      assert children.size() >= 2 : "Expected update token to have at least two children";
+      ASTNode setClause = (ASTNode)children.get(1);
+      assert setClause.getToken().getType() == HiveParser.TOK_SET_COLUMNS_CLAUSE :
+          "Expected second child of update token to be set token";
+
+      // Get the children of the set clause, each of which should be a column assignment
+      List<? extends Node> assignments = setClause.getChildren();
+      Map<String, ASTNode> setCols = new HashMap<String, ASTNode>(assignments.size());
+      setColExprs = new HashMap<Integer, ASTNode>(assignments.size());
+      for (Node a : assignments) {
+        ASTNode assignment = (ASTNode)a;
+        assert assignment.getToken().getType() == HiveParser.EQUAL :
+            "Expected set assignments to use equals operator but found " + assignment.getName();
+        ASTNode tableOrColTok = (ASTNode)assignment.getChildren().get(0);
+        assert tableOrColTok.getToken().getType() == HiveParser.TOK_TABLE_OR_COL :
+            "Expected left side of assignment to be table or column";
+        ASTNode colName = (ASTNode)tableOrColTok.getChildren().get(0);
+        assert colName.getToken().getType() == HiveParser.Identifier :
+            "Expected column name";
+
+        // Table column names are matched without regard to case, so normalize before keying.
+        String columnName = normalizeColName(colName.getText());
+
+        // Make sure this isn't one of the partitioning columns, that's not supported.
+        if (partCols != null) {
+          for (FieldSchema fschema : partCols) {
+            if (fschema.getName().equalsIgnoreCase(columnName)) {
+              throw new SemanticException(ErrorMsg.UPDATE_CANNOT_UPDATE_PART_VALUE.getMsg());
+            }
+          }
+        }
+
+        // This means that in UPDATE T SET x = _something_
+        // _something_ can be whatever is supported in SELECT _something_
+        setCols.put(columnName, (ASTNode)assignment.getChildren().get(1));
+      }
+
+      List<FieldSchema> nonPartCols = mTable.getCols();
+      for (int i = 0; i < nonPartCols.size(); i++) {
+        rewrittenQueryStr.append(',');
+        String name = nonPartCols.get(i).getName();
+        rewrittenQueryStr.append(name);
+        // Take the entry out of setCols so that whatever remains afterwards matched no column.
+        ASTNode setCol = setCols.remove(normalizeColName(name));
+        if (setCol != null) {
+          // This is one of the columns we're setting, record its position so we can come back
+          // later and patch it up.
+          // Add one to the index because the select has the ROW__ID as the first column.
+          setColExprs.put(i + 1, setCol);
+        }
+      }
+
+      // Anything left in setCols names no table column and would otherwise be silently ignored.
+      if (!setCols.isEmpty()) {
+        throw new SemanticException("Cannot update column(s) " + setCols.keySet() +
+            " because they are not columns of table " + getDotName(tableName));
+      }
+    }
+
+    // If the table is partitioned, we need to select the partition columns as well.
+    if (partCols != null) {
+      for (FieldSchema fschema : partCols) {
+        rewrittenQueryStr.append(", ");
+        rewrittenQueryStr.append(fschema.getName());
+      }
+    }
+    rewrittenQueryStr.append(" from ");
+    rewrittenQueryStr.append(getDotName(tableName));
+
+    ASTNode where = null;
+    int whereIndex = deleting() ? 1 : 2;
+    if (children.size() > whereIndex) {
+      where = (ASTNode)children.get(whereIndex);
+      assert where.getToken().getType() == HiveParser.TOK_WHERE :
+          "Expected where clause, but found " + where.getName();
+    }
+
+    // Add a sort by clause so that the row ids come out in the correct order
+    rewrittenQueryStr.append(" sort by ROW__ID desc ");
+
+    // Parse the rewritten query string
+    Context rewrittenCtx;
+    try {
+      // Set dynamic partitioning to nonstrict so that queries do not need any partition
+      // references.
+      // NOTE(review): this modifies conf itself and nothing here restores the old value, so the
+      // setting presumably remains in effect after this statement - confirm that is intended.
+      HiveConf.setVar(conf, HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict");
+      rewrittenCtx = new Context(conf);
+    } catch (IOException e) {
+      // Keep the IOException as the cause so the underlying failure is not lost.
+      throw new SemanticException(ErrorMsg.UPDATEDELETE_IO_ERROR.getMsg(), e);
+    }
+    rewrittenCtx.setCmd(rewrittenQueryStr.toString());
+    rewrittenCtx.setAcidOperation(ctx.getAcidOperation());
+
+    ParseDriver pd = new ParseDriver();
+    ASTNode rewrittenTree;
+    try {
+      LOG.info("Going to reparse " + operation() + " as <" + rewrittenQueryStr.toString() + ">");
+      rewrittenTree = pd.parse(rewrittenQueryStr.toString(), rewrittenCtx);
+      rewrittenTree = ParseUtils.findRootNonNullToken(rewrittenTree);
+
+    } catch (ParseException e) {
+      throw new SemanticException(ErrorMsg.UPDATEDELETE_PARSE_ERROR.getMsg(), e);
+    }
+
+    ASTNode rewrittenInsert = (ASTNode)rewrittenTree.getChildren().get(1);
+    assert rewrittenInsert.getToken().getType() == HiveParser.TOK_INSERT :
+        "Expected TOK_INSERT as second child of TOK_QUERY but found " + rewrittenInsert.getName();
+
+    if (where != null) {
+      // The structure of the AST for the rewritten insert statement is:
+      // TOK_QUERY -> TOK_FROM
+      //          \-> TOK_INSERT -> TOK_INSERT_INTO
+      //                        \-> TOK_SELECT
+      //                        \-> TOK_SORTBY
+      // The following adds the TOK_WHERE and its subtree from the original query as a child of
+      // TOK_INSERT, which is where it would have landed if it had been there originally in the
+      // string.  We do it this way because it's easier than turning the original AST back into a
+      // string and reparsing it.  We have to move the SORT_BY over one, so grab it, append it
+      // as the last child, and put the where in the slot the SORT_BY used to occupy.
+      ASTNode sortBy = (ASTNode)rewrittenInsert.getChildren().get(2);
+      assert sortBy.getToken().getType() == HiveParser.TOK_SORTBY :
+          "Expected TOK_SORTBY to be third child of TOK_INSERT, but found " + sortBy.getName();
+      rewrittenInsert.addChild(sortBy);
+      rewrittenInsert.setChild(2, where);
+    }
+
+    // Patch up the projection list for updates, putting back the original set expressions.
+    if (updating() && setColExprs != null) {
+      // Walk through the projection list and replace the column names with the
+      // expressions from the original update.  Under the TOK_SELECT (see above) the structure
+      // looks like:
+      // TOK_SELECT -> TOK_SELEXPR -> expr
+      //           \-> TOK_SELEXPR -> expr ...
+      ASTNode rewrittenSelect = (ASTNode)rewrittenInsert.getChildren().get(1);
+      assert rewrittenSelect.getToken().getType() == HiveParser.TOK_SELECT :
+          "Expected TOK_SELECT as second child of TOK_INSERT but found " +
+              rewrittenSelect.getName();
+      for (Map.Entry<Integer, ASTNode> entry : setColExprs.entrySet()) {
+        ASTNode selExpr = (ASTNode)rewrittenSelect.getChildren().get(entry.getKey());
+        assert selExpr.getToken().getType() == HiveParser.TOK_SELEXPR :
+            "Expected child of TOK_SELECT to be TOK_SELEXPR but was " + selExpr.getName();
+        // Now, change its child
+        selExpr.setChild(0, entry.getValue());
+      }
+    }
+
+    try {
+      useSuper = true;
+      super.analyze(rewrittenTree, rewrittenCtx);
+    } finally {
+      useSuper = false;
+    }
+
+    // Walk through all our inputs and set them to note that this read is part of an update or a
+    // delete.
+    for (ReadEntity input : inputs) {
+      input.setUpdateOrDelete(true);
+    }
+
+    if (inputIsPartitioned(inputs)) {
+      // In order to avoid locking the entire write table we need to replace the single WriteEntity
+      // with a WriteEntity for each partition
+      outputs.clear();
+      for (ReadEntity input : inputs) {
+        if (input.getTyp() == Entity.Type.PARTITION) {
+          WriteEntity.WriteType writeType = deleting() ? WriteEntity.WriteType.DELETE :
+              WriteEntity.WriteType.UPDATE;
+          outputs.add(new WriteEntity(input.getPartition(), writeType));
+        }
+      }
+    } else {
+      // We still need to patch up the WriteEntities as they will have an insert type.  Change
+      // them to the appropriate type for our operation.
+      for (WriteEntity output : outputs) {
+        output.setWriteType(deleting() ? WriteEntity.WriteType.DELETE :
+            WriteEntity.WriteType.UPDATE);
+      }
+    }
+  }
+
+  /**
+   * Put a column name into the form used to match set clause targets against table columns.
+   * The match ignores case, in line with the check made against the partition columns.
+   */
+  private static String normalizeColName(String colName) {
+    return colName.toLowerCase(Locale.ROOT);
+  }
+
+  private String operation() {
+    if (updating()) return "update";
+    else if (deleting()) return "delete";
+    else throw new IllegalStateException("UpdateDeleteSemanticAnalyzer neither updating nor " +
+          "deleting, operation not known.");
+  }
+
+  private boolean inputIsPartitioned(Set<ReadEntity> inputs) {
+    // We cannot simply look at the first entry, as in the case where the input is partitioned
+    // there will be a table entry as well.  So look for at least one partition entry.
+    for (ReadEntity re : inputs) {
+      if (re.getTyp() == Entity.Type.PARTITION) {
+        return true;
+      }
+    }
+    return false;
+  }
+}

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java?rev=1624788&r1=1624787&r2=1624788&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java Sat Sep 13 22:09:31 2014
@@ -23,6 +23,7 @@ import java.util.LinkedHashMap;
 import java.util.Map;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
 
 /**
  * LoadTableDesc.
@@ -37,6 +38,9 @@ public class LoadTableDesc extends org.a
   private boolean holdDDLTime;
   private boolean inheritTableSpecs = true; //For partitions, flag controlling whether the current
                                             //table specs are to be used
+  // Need to remember whether this is an acid compliant operation, and if so whether it is an
+  // insert, update, or delete.
+  private AcidUtils.Operation writeType;
 
   // TODO: the below seems like they should just be combined into partitionDesc
   private org.apache.hadoop.hive.ql.plan.TableDesc table;
@@ -48,36 +52,69 @@ public class LoadTableDesc extends org.a
 
   public LoadTableDesc(final Path sourcePath,
       final org.apache.hadoop.hive.ql.plan.TableDesc table,
-      final Map<String, String> partitionSpec, final boolean replace) {
+      final Map<String, String> partitionSpec,
+      final boolean replace,
+      final AcidUtils.Operation writeType) {
     super(sourcePath);
-    init(table, partitionSpec, replace);
+    init(table, partitionSpec, replace, writeType);
+  }
+
+  /**
+   * For use with non-ACID compliant operations, such as LOAD
+   * @param sourcePath
+   * @param table
+   * @param partitionSpec
+   * @param replace
+   */
+  public LoadTableDesc(final Path sourcePath,
+                       final TableDesc table,
+                       final Map<String, String> partitionSpec,
+                       final boolean replace) {
+    this(sourcePath, table, partitionSpec, replace, AcidUtils.Operation.NOT_ACID);
   }
 
   public LoadTableDesc(final Path sourcePath,
       final org.apache.hadoop.hive.ql.plan.TableDesc table,
-      final Map<String, String> partitionSpec) {
-    this(sourcePath, table, partitionSpec, true);
+      final Map<String, String> partitionSpec,
+      final AcidUtils.Operation writeType) {
+    this(sourcePath, table, partitionSpec, true, writeType);
+  }
+
+  /**
+   * For DDL operations that are not ACID compliant.
+   * @param sourcePath
+   * @param table
+   * @param partitionSpec
+   */
+  public LoadTableDesc(final Path sourcePath,
+                       final org.apache.hadoop.hive.ql.plan.TableDesc table,
+                       final Map<String, String> partitionSpec) {
+    this(sourcePath, table, partitionSpec, true, AcidUtils.Operation.NOT_ACID);
   }
 
   public LoadTableDesc(final Path sourcePath,
       final org.apache.hadoop.hive.ql.plan.TableDesc table,
-      final DynamicPartitionCtx dpCtx) {
+      final DynamicPartitionCtx dpCtx,
+      final AcidUtils.Operation writeType) {
     super(sourcePath);
     this.dpCtx = dpCtx;
     if (dpCtx != null && dpCtx.getPartSpec() != null && partitionSpec == null) {
-      init(table, dpCtx.getPartSpec(), true);
+      init(table, dpCtx.getPartSpec(), true, writeType);
     } else {
-      init(table, new LinkedHashMap<String, String>(), true);
+      init(table, new LinkedHashMap<String, String>(), true, writeType);
     }
   }
 
   private void init(
       final org.apache.hadoop.hive.ql.plan.TableDesc table,
-      final Map<String, String> partitionSpec, final boolean replace) {
+      final Map<String, String> partitionSpec,
+      final boolean replace,
+      AcidUtils.Operation writeType) {
     this.table = table;
     this.partitionSpec = partitionSpec;
     this.replace = replace;
     this.holdDDLTime = false;
+    this.writeType = writeType;
   }
 
   public void setHoldDDLTime(boolean ddlTime) {
@@ -144,4 +181,8 @@ public class LoadTableDesc extends org.a
   public void setLbCtx(ListBucketingCtx lbCtx) {
     this.lbCtx = lbCtx;
   }
+
+  public AcidUtils.Operation getWriteType() {
+    return writeType;
+  }
 }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java?rev=1624788&r1=1624787&r2=1624788&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java Sat Sep 13 22:09:31 2014
@@ -41,6 +41,7 @@ import org.apache.hadoop.hive.ql.exec.Ro
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
 import org.apache.hadoop.hive.ql.io.HivePassThroughOutputFormat;
@@ -597,19 +598,22 @@ public final class PlanUtils {
    * @param numReducers
    *          The number of reducers, set to -1 for automatic inference based on
    *          input data size.
+   * @param writeType Whether this is an Acid write, and if so whether it is insert, update,
+   *                  or delete.
    * @return The reduceSinkDesc object.
    */
   public static ReduceSinkDesc getReduceSinkDesc(
       ArrayList<ExprNodeDesc> keyCols, ArrayList<ExprNodeDesc> valueCols,
       List<String> outputColumnNames, boolean includeKeyCols, int tag,
-      ArrayList<ExprNodeDesc> partitionCols, String order, int numReducers) {
+      ArrayList<ExprNodeDesc> partitionCols, String order, int numReducers,
+      AcidUtils.Operation writeType) {
     return getReduceSinkDesc(keyCols, keyCols.size(), valueCols,
         new ArrayList<List<Integer>>(),
         includeKeyCols ? outputColumnNames.subList(0, keyCols.size()) :
           new ArrayList<String>(),
         includeKeyCols ? outputColumnNames.subList(keyCols.size(),
             outputColumnNames.size()) : outputColumnNames,
-        includeKeyCols, tag, partitionCols, order, numReducers);
+        includeKeyCols, tag, partitionCols, order, numReducers, writeType);
   }
 
   /**
@@ -635,6 +639,8 @@ public final class PlanUtils {
    * @param numReducers
    *          The number of reducers, set to -1 for automatic inference based on
    *          input data size.
+   * @param writeType Whether this is an Acid write, and if so whether it is insert, update,
+   *                  or delete.
    * @return The reduceSinkDesc object.
    */
   public static ReduceSinkDesc getReduceSinkDesc(
@@ -644,7 +650,8 @@ public final class PlanUtils {
       List<String> outputKeyColumnNames,
       List<String> outputValueColumnNames,
       boolean includeKeyCols, int tag,
-      ArrayList<ExprNodeDesc> partitionCols, String order, int numReducers) {
+      ArrayList<ExprNodeDesc> partitionCols, String order, int numReducers,
+      AcidUtils.Operation writeType) {
     TableDesc keyTable = null;
     TableDesc valueTable = null;
     ArrayList<String> outputKeyCols = new ArrayList<String>();
@@ -670,7 +677,7 @@ public final class PlanUtils {
     return new ReduceSinkDesc(keyCols, numKeys, valueCols, outputKeyCols,
         distinctColIndices, outputValCols,
         tag, partitionCols, numReducers, keyTable,
-        valueTable);
+        valueTable, writeType);
   }
 
   /**
@@ -690,12 +697,15 @@ public final class PlanUtils {
    * @param numReducers
    *          The number of reducers, set to -1 for automatic inference based on
    *          input data size.
+   * @param writeType Whether this is an Acid write, and if so whether it is insert, update,
+   *                  or delete.
    * @return The reduceSinkDesc object.
    */
   public static ReduceSinkDesc getReduceSinkDesc(
       ArrayList<ExprNodeDesc> keyCols, ArrayList<ExprNodeDesc> valueCols,
       List<String> outputColumnNames, boolean includeKey, int tag,
-      int numPartitionFields, int numReducers) throws SemanticException {
+      int numPartitionFields, int numReducers, AcidUtils.Operation writeType)
+      throws SemanticException {
     return getReduceSinkDesc(keyCols, keyCols.size(), valueCols,
         new ArrayList<List<Integer>>(),
         includeKey ? outputColumnNames.subList(0, keyCols.size()) :
@@ -703,7 +713,7 @@ public final class PlanUtils {
         includeKey ?
             outputColumnNames.subList(keyCols.size(), outputColumnNames.size())
             : outputColumnNames,
-        includeKey, tag, numPartitionFields, numReducers);
+        includeKey, tag, numPartitionFields, numReducers, writeType);
   }
 
   /**
@@ -729,6 +739,8 @@ public final class PlanUtils {
    * @param numReducers
    *          The number of reducers, set to -1 for automatic inference based on
    *          input data size.
+   * @param writeType Whether this is an Acid write, and if so whether it is insert, update,
+   *                  or delete.
    * @return The reduceSinkDesc object.
    */
   public static ReduceSinkDesc getReduceSinkDesc(
@@ -737,7 +749,8 @@ public final class PlanUtils {
       List<List<Integer>> distinctColIndices,
       List<String> outputKeyColumnNames, List<String> outputValueColumnNames,
       boolean includeKey, int tag,
-      int numPartitionFields, int numReducers) throws SemanticException {
+      int numPartitionFields, int numReducers, AcidUtils.Operation writeType)
+      throws SemanticException {
 
     ArrayList<ExprNodeDesc> partitionCols = new ArrayList<ExprNodeDesc>();
     if (numPartitionFields >= keyCols.size()) {
@@ -755,7 +768,7 @@ public final class PlanUtils {
     }
     return getReduceSinkDesc(keyCols, numKeys, valueCols, distinctColIndices,
         outputKeyColumnNames, outputValueColumnNames, includeKey, tag,
-        partitionCols, order.toString(), numReducers);
+        partitionCols, order.toString(), numReducers, writeType);
   }
 
   /**

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java?rev=1624788&r1=1624787&r2=1624788&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java Sat Sep 13 22:09:31 2014
@@ -23,6 +23,7 @@ import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
 
 
 /**
@@ -91,6 +92,9 @@ public class ReduceSinkDesc extends Abst
   private boolean skipTag; // Skip writing tags when feeding into mapjoin hashtable
   private Boolean autoParallel = null; // Is reducer auto-parallelism enabled, disabled or unset
 
+  // Write type, since this needs to calculate buckets differently for updates and deletes
+  private AcidUtils.Operation writeType;
+
   private static transient Log LOG = LogFactory.getLog(ReduceSinkDesc.class);
   public ReduceSinkDesc() {
   }
@@ -102,7 +106,8 @@ public class ReduceSinkDesc extends Abst
       List<List<Integer>> distinctColumnIndices,
       ArrayList<String> outputValueColumnNames, int tag,
       ArrayList<ExprNodeDesc> partitionCols, int numReducers,
-      final TableDesc keySerializeInfo, final TableDesc valueSerializeInfo) {
+      final TableDesc keySerializeInfo, final TableDesc valueSerializeInfo,
+      AcidUtils.Operation writeType) {
     this.keyCols = keyCols;
     this.numDistributionKeys = numDistributionKeys;
     this.valueCols = valueCols;
@@ -116,6 +121,7 @@ public class ReduceSinkDesc extends Abst
     this.distinctColumnIndices = distinctColumnIndices;
     this.setNumBuckets(-1);
     this.setBucketCols(null);
+    this.writeType = writeType;
   }
 
   @Override
@@ -367,4 +373,8 @@ public class ReduceSinkDesc extends Abst
       this.autoParallel = autoParallel;
     }
   }
+
+  public AcidUtils.Operation getWriteType() {
+    return writeType;
+  }
 }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java?rev=1624788&r1=1624787&r2=1624788&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java Sat Sep 13 22:09:31 2014
@@ -208,6 +208,11 @@ public class SessionState {
   private String hdfsScratchDirURIString;
 
   /**
+   * Next value to use in naming a temporary table created by an insert...values statement
+   */
+  private int nextValueTempTableSuffix = 1;
+
+  /**
    * Transaction manager to use for this session.  This is instantiated lazily by
    * {@link #initTxnMgr(org.apache.hadoop.hive.conf.HiveConf)}
    */
@@ -1341,4 +1346,12 @@ public class SessionState {
     this.userIpAddress = userIpAddress;
   }
 
+  /**
+   * Get the next suffix for naming an insert...values temporary table; increments the counter.
+   * @return the counter value from before the increment, formatted as a decimal string
+   */
+  public String getNextValuesTempTableSuffix() {
+    return Integer.toString(nextValueTempTableSuffix++);
+  }
+
 }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/UDFToInteger.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/UDFToInteger.java?rev=1624788&r1=1624787&r2=1624788&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/UDFToInteger.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/UDFToInteger.java Sat Sep 13 22:09:31 2014
@@ -23,6 +23,7 @@ import org.apache.hadoop.hive.ql.exec.ve
 import org.apache.hadoop.hive.ql.exec.vector.expressions.CastDecimalToLong;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.gen.CastDoubleToLong;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.gen.CastTimestampToLongViaLongToLong;
+import org.apache.hadoop.hive.ql.io.RecordIdentifier;
 import org.apache.hadoop.hive.serde2.io.ByteWritable;
 import org.apache.hadoop.hive.serde2.io.DoubleWritable;
 import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
@@ -204,4 +205,19 @@ public class UDFToInteger extends UDF {
     }
   }
 
+  /**
+   * Convert a RecordIdentifier to its bucket id, so that the RecordIdentifier can be used in
+   * place of the bucketing column.  The result is stored in and returned via intWritable.
+   * @param i RecordIdentifier to convert
+   * @return value of the bucket identifier, or null if i is null
+   */
+  public IntWritable evaluate(RecordIdentifier i) {
+    if (i == null) {
+      return null;
+    } else {
+      intWritable.set(i.getBucketId());
+      return intWritable;
+    }
+  }
+
 }

Modified: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java?rev=1624788&r1=1624787&r2=1624788&view=diff
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java (original)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java Sat Sep 13 22:09:31 2014
@@ -37,6 +37,7 @@ import org.apache.hadoop.hive.ql.DriverC
 import org.apache.hadoop.hive.ql.WindowsPathUtil;
 import org.apache.hadoop.hive.ql.exec.mr.ExecDriver;
 import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.Table;
@@ -137,7 +138,7 @@ public class TestExecDriver extends Test
         db.dropTable(MetaStoreUtils.DEFAULT_DATABASE_NAME, src, true, true);
         db.createTable(src, cols, null, TextInputFormat.class,
             IgnoreKeyTextOutputFormat.class);
-        db.loadTable(hadoopDataFile[i], src, false, false, true, false);
+        db.loadTable(hadoopDataFile[i], src, false, false, true, false, false);
         i++;
       }
 
@@ -246,7 +247,7 @@ public class TestExecDriver extends Test
     Operator<ReduceSinkDesc> op1 = OperatorFactory.get(PlanUtils
         .getReduceSinkDesc(Utilities.makeList(getStringColumn("key")),
         Utilities.makeList(getStringColumn("value")), outputColumns, true,
-        -1, 1, -1));
+        -1, 1, -1, AcidUtils.Operation.NOT_ACID));
 
     addMapWork(mr, src, "a", op1);
     ReduceWork rWork = new ReduceWork();
@@ -276,7 +277,7 @@ public class TestExecDriver extends Test
         .getReduceSinkDesc(Utilities.makeList(getStringColumn("key")),
         Utilities
         .makeList(getStringColumn("key"), getStringColumn("value")),
-        outputColumns, false, -1, 1, -1));
+        outputColumns, false, -1, 1, -1, AcidUtils.Operation.NOT_ACID));
 
     addMapWork(mr, src, "a", op1);
     ReduceWork rWork = new ReduceWork();
@@ -310,14 +311,14 @@ public class TestExecDriver extends Test
     Operator<ReduceSinkDesc> op1 = OperatorFactory.get(PlanUtils
         .getReduceSinkDesc(Utilities.makeList(getStringColumn("key")),
         Utilities.makeList(getStringColumn("value")), outputColumns, true,
-        Byte.valueOf((byte) 0), 1, -1));
+        Byte.valueOf((byte) 0), 1, -1, AcidUtils.Operation.NOT_ACID));
 
     addMapWork(mr, src, "a", op1);
 
     Operator<ReduceSinkDesc> op2 = OperatorFactory.get(PlanUtils
         .getReduceSinkDesc(Utilities.makeList(getStringColumn("key")),
         Utilities.makeList(getStringColumn("key")), outputColumns, true,
-        Byte.valueOf((byte) 1), Integer.MAX_VALUE, -1));
+        Byte.valueOf((byte) 1), Integer.MAX_VALUE, -1, AcidUtils.Operation.NOT_ACID));
 
     addMapWork(mr, src2, "b", op2);
     ReduceWork rWork = new ReduceWork();
@@ -353,7 +354,7 @@ public class TestExecDriver extends Test
     Operator<ReduceSinkDesc> op1 = OperatorFactory.get(PlanUtils
         .getReduceSinkDesc(Utilities.makeList(getStringColumn("tkey")),
         Utilities.makeList(getStringColumn("tkey"),
-        getStringColumn("tvalue")), outputColumns, false, -1, 1, -1));
+        getStringColumn("tvalue")), outputColumns, false, -1, 1, -1, AcidUtils.Operation.NOT_ACID));
 
     Operator<ScriptDesc> op0 = OperatorFactory.get(new ScriptDesc("cat",
         PlanUtils.getDefaultTableDesc("" + Utilities.tabCode, "key,value"),
@@ -398,7 +399,7 @@ public class TestExecDriver extends Test
     Operator<ReduceSinkDesc> op0 = OperatorFactory.get(PlanUtils
         .getReduceSinkDesc(Utilities.makeList(getStringColumn("0")), Utilities
         .makeList(getStringColumn("0"), getStringColumn("1")),
-        outputColumns, false, -1, 1, -1));
+        outputColumns, false, -1, 1, -1, AcidUtils.Operation.NOT_ACID));
 
     Operator<SelectDesc> op4 = OperatorFactory.get(new SelectDesc(Utilities
         .makeList(getStringColumn("key"), getStringColumn("value")),
@@ -432,7 +433,7 @@ public class TestExecDriver extends Test
     Operator<ReduceSinkDesc> op1 = OperatorFactory.get(PlanUtils
         .getReduceSinkDesc(Utilities.makeList(getStringColumn("tkey")),
         Utilities.makeList(getStringColumn("tkey"),
-        getStringColumn("tvalue")), outputColumns, false, -1, 1, -1));
+        getStringColumn("tvalue")), outputColumns, false, -1, 1, -1, AcidUtils.Operation.NOT_ACID));
 
     Operator<ScriptDesc> op0 = OperatorFactory.get(new ScriptDesc(
         "\'cat\'", PlanUtils.getDefaultTableDesc("" + Utilities.tabCode,

Added: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/parse/TestUpdateDeleteSemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/parse/TestUpdateDeleteSemanticAnalyzer.java?rev=1624788&view=auto
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/parse/TestUpdateDeleteSemanticAnalyzer.java (added)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/parse/TestUpdateDeleteSemanticAnalyzer.java Sat Sep 13 22:09:31 2014
@@ -0,0 +1,323 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse;
+
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import junit.framework.Assert;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.QueryPlan;
+import org.apache.hadoop.hive.ql.exec.ExplainTask;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.plan.ExplainWork;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+public class TestUpdateDeleteSemanticAnalyzer {
+
+  private static final Log LOG = LogFactory.getLog(TestUpdateDeleteSemanticAnalyzer.class);
+
+  private HiveConf conf;
+  private Hive db;
+
+  // All of the insert, update, and delete tests assume two tables, T and U, each with columns a
+  // and b.  U is partitioned by an additional column ds.  These are created by parseAndAnalyze
+  // and removed by cleanupTables().
+
+  @Test
+  public void testInsertSelect() throws Exception {
+    try {
+      ReturnInfo rc = parseAndAnalyze("insert into table T select a, b from U", "testInsertSelect");
+
+      LOG.info(explain((SemanticAnalyzer)rc.sem, rc.plan, rc.ast.dump()));
+
+    } finally {
+      cleanupTables();
+    }
+  }
+
+  @Test
+  public void testDeleteAllNonPartitioned() throws Exception {
+    try {
+      ReturnInfo rc = parseAndAnalyze("delete from T", "testDeleteAllNonPartitioned");
+      LOG.info(explain((SemanticAnalyzer)rc.sem, rc.plan, rc.ast.dump()));
+    } finally {
+      cleanupTables();
+    }
+  }
+
+  @Test
+  public void testDeleteWhereNoPartition() throws Exception {
+    try {
+      ReturnInfo rc = parseAndAnalyze("delete from T where a > 5", "testDeleteWhereNoPartition");
+      LOG.info(explain((SemanticAnalyzer)rc.sem, rc.plan, rc.ast.dump()));
+    } finally {
+      cleanupTables();
+    }
+  }
+
+  @Test
+  public void testDeleteAllPartitioned() throws Exception {
+    try {
+      ReturnInfo rc = parseAndAnalyze("delete from U", "testDeleteAllPartitioned");
+      LOG.info(explain((SemanticAnalyzer)rc.sem, rc.plan, rc.ast.dump()));
+    } finally {
+      cleanupTables();
+    }
+  }
+
+  @Test
+  public void testDeleteAllWherePartitioned() throws Exception {
+    try {
+      ReturnInfo rc = parseAndAnalyze("delete from U where a > 5", "testDeleteAllWherePartitioned");
+      LOG.info(explain((SemanticAnalyzer)rc.sem, rc.plan, rc.ast.dump()));
+    } finally {
+      cleanupTables();
+    }
+  }
+
+  @Test
+  public void testDeleteOnePartition() throws Exception {
+    try {
+      ReturnInfo rc = parseAndAnalyze("delete from U where ds = 'today'",
+          "testDeleteOnePartition");
+      LOG.info(explain((SemanticAnalyzer)rc.sem, rc.plan, rc.ast.dump()));
+    } finally {
+      cleanupTables();
+    }
+  }
+
+  @Test
+  public void testDeleteOnePartitionWhere() throws Exception {
+    try {
+      ReturnInfo rc = parseAndAnalyze("delete from U where ds = 'today' and a > 5",
+          "testDeleteOnePartitionWhere");
+      LOG.info(explain((SemanticAnalyzer)rc.sem, rc.plan, rc.ast.dump()));
+    } finally {
+      cleanupTables();
+    }
+  }
+
+  @Test
+  public void testUpdateAllNonPartitioned() throws Exception {
+    try {
+      ReturnInfo rc = parseAndAnalyze("update T set a = 5", "testUpdateAllNonPartitioned");
+      LOG.info(explain((SemanticAnalyzer)rc.sem, rc.plan, rc.ast.dump()));
+    } finally {
+      cleanupTables();
+    }
+  }
+
+  @Test
+  public void testUpdateAllNonPartitionedWhere() throws Exception {
+    try {
+      ReturnInfo rc = parseAndAnalyze("update T set a = 5 where b > 5",
+          "testUpdateAllNonPartitionedWhere");
+      LOG.info(explain((SemanticAnalyzer)rc.sem, rc.plan, rc.ast.dump()));
+    } finally {
+      cleanupTables();
+    }
+  }
+
+  @Test
+  public void testUpdateAllPartitioned() throws Exception {
+    try {
+      ReturnInfo rc = parseAndAnalyze("update U set a = 5", "testUpdateAllPartitioned");
+      LOG.info(explain((SemanticAnalyzer)rc.sem, rc.plan, rc.ast.dump()));
+    } finally {
+      cleanupTables();
+    }
+  }
+
+  @Test
+  public void testUpdateAllPartitionedWhere() throws Exception {
+    try {
+      ReturnInfo rc = parseAndAnalyze("update U set a = 5 where b > 5",
+          "testUpdateAllPartitionedWhere");
+      LOG.info(explain((SemanticAnalyzer)rc.sem, rc.plan, rc.ast.dump()));
+    } finally {
+      cleanupTables();
+    }
+  }
+
+  @Test
+  public void testUpdateOnePartition() throws Exception {
+    try {
+      ReturnInfo rc = parseAndAnalyze("update U set a = 5 where ds = 'today'",
+          "testUpdateOnePartition");
+      LOG.info(explain((SemanticAnalyzer)rc.sem, rc.plan, rc.ast.dump()));
+    } finally {
+      cleanupTables();
+    }
+  }
+
+  @Test
+  public void testUpdateOnePartitionWhere() throws Exception {
+    try {
+      ReturnInfo rc = parseAndAnalyze("update U set a = 5 where ds = 'today' and b > 5",
+          "testUpdateOnePartitionWhere");
+      LOG.info(explain((SemanticAnalyzer)rc.sem, rc.plan, rc.ast.dump()));
+    } finally {
+      cleanupTables();
+    }
+  }
+
+  @Test
+  public void testInsertValues() throws Exception {
+    try {
+      ReturnInfo rc = parseAndAnalyze("insert into table T values ('abc', 3), ('ghi', 5)",
+          "testInsertValues");
+
+      LOG.info(explain((SemanticAnalyzer)rc.sem, rc.plan, rc.ast.dump()));
+
+    } finally {
+      cleanupTables();
+    }
+  }
+
+  @Test
+  public void testInsertValuesPartitioned() throws Exception {
+    try {
+      ReturnInfo rc = parseAndAnalyze("insert into table U partition (ds) values " +
+              "('abc', 3, 'today'), ('ghi', 5, 'tomorrow')",
+          "testInsertValuesPartitioned");
+
+      LOG.info(explain((SemanticAnalyzer) rc.sem, rc.plan, rc.ast.dump()));
+
+    } finally {
+      cleanupTables();
+    }
+  }
+
+  @Before
+  public void setup() {
+    conf = new HiveConf();
+    conf.setVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict");
+    conf.setVar(HiveConf.ConfVars.HIVE_TXN_MANAGER, "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager");
+  }
+
+  public void cleanupTables() throws HiveException {
+    if (db != null) {
+      db.dropTable("T");
+      db.dropTable("U");
+    }
+  }
+
+  /** Immutable holder for the parsed AST, the analyzer that processed it, and the query plan. */
+  private static class ReturnInfo {
+    final ASTNode ast;
+    final BaseSemanticAnalyzer sem;
+    final QueryPlan plan;
+    ReturnInfo(ASTNode a, BaseSemanticAnalyzer s, QueryPlan p) {
+      ast = a;
+      sem = s;
+      plan = p;
+    }
+  }
+
+  private ReturnInfo parseAndAnalyze(String query, String testName)
+      throws IOException, ParseException, HiveException {
+
+    SessionState.start(conf);
+    Context ctx = new Context(conf);
+    ctx.setCmd(query);
+    ctx.setHDFSCleanup(true);
+
+    ParseDriver pd = new ParseDriver();
+    ASTNode tree = pd.parse(query, ctx);
+    tree = ParseUtils.findRootNonNullToken(tree);
+
+    BaseSemanticAnalyzer sem = SemanticAnalyzerFactory.get(conf, tree);
+    SessionState.get().initTxnMgr(conf);
+    db = sem.getDb();
+
+    // I have to create the tables here (rather than in setup()) because I need the Hive
+    // connection, which is conveniently created by the semantic analyzer.
+    db.createTable("T", Arrays.asList("a", "b"), null, OrcInputFormat.class, OrcOutputFormat.class);
+    db.createTable("U", Arrays.asList("a", "b"), Arrays.asList("ds"), OrcInputFormat.class,
+        OrcOutputFormat.class);
+    Table u = db.getTable("U");
+    Map<String, String> partVals = new HashMap<String, String>(2);
+    partVals.put("ds", "yesterday");
+    db.createPartition(u, partVals);
+    partVals.clear();
+    partVals.put("ds", "today");
+    db.createPartition(u, partVals);
+    sem.analyze(tree, ctx);
+    // validate the plan
+    sem.validate();
+
+    QueryPlan plan = new QueryPlan(query, sem, 0L, testName);
+
+    return new ReturnInfo(tree, sem, plan);
+  }
+
+  /** Runs the explain task for an analyzed query and returns its output with paths masked. */
+  private String explain(SemanticAnalyzer sem, QueryPlan plan, String astStringTree) throws
+      IOException {
+    FileSystem fs = FileSystem.get(conf);
+    File f = File.createTempFile("TestSemanticAnalyzer", "explain");
+    Path tmp = new Path(f.getPath());
+    // create() hands back an open output stream; close it so the handle is not leaked.
+    fs.create(tmp).close();
+    fs.deleteOnExit(tmp);
+    ExplainWork work = new ExplainWork(tmp, sem.getParseContext(), sem.getRootTasks(),
+        sem.getFetchTask(), astStringTree, sem, true, false, false, false, false);
+    ExplainTask task = new ExplainTask();
+    task.setWork(work);
+    task.initialize(conf, plan, null);
+    task.execute(null);
+    // Collect raw bytes and decode once, so a multi-byte character that straddles a buffer
+    // boundary is not corrupted by per-chunk decoding.
+    java.io.ByteArrayOutputStream bytes = new java.io.ByteArrayOutputStream();
+    FSDataInputStream in = fs.open(tmp);
+    try {
+      byte[] buf = new byte[4096];
+      int bytesRead;
+      while ((bytesRead = in.read(buf, 0, buf.length)) > 0) {
+        bytes.write(buf, 0, bytesRead);
+      }
+    } finally {
+      in.close();
+    }
+    // Mask file locations and the DDL timestamp so the text does not vary between runs.
+    return bytes.toString()
+        .replaceAll("pfile:/.*\n", "pfile:MASKED-OUT\n")
+        .replaceAll("location file:/.*\n", "location file:MASKED-OUT\n")
+        .replaceAll("file:/.*\n", "file:MASKED-OUT\n")
+        .replaceAll("transient_lastDdlTime.*\n", "transient_lastDdlTime MASKED-OUT\n");
+  }
+}

Added: hive/trunk/ql/src/test/queries/clientnegative/acid_overwrite.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientnegative/acid_overwrite.q?rev=1624788&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientnegative/acid_overwrite.q (added)
+++ hive/trunk/ql/src/test/queries/clientnegative/acid_overwrite.q Sat Sep 13 22:09:31 2014
@@ -0,0 +1,9 @@
+set hive.support.concurrency=true;
+set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
+set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
+set hive.enforce.bucketing=true;
+
+create table acid_uanp(a int, b varchar(128)) clustered by (a) into 2 buckets stored as orc;
+
+insert into table acid_uanp select cint, cast(cstring1 as varchar(128)) from alltypesorc where cint < 0 order by cint limit 10;
+insert overwrite table acid_uanp select cint, cast(cstring1 as varchar(128)) from alltypesorc;

Added: hive/trunk/ql/src/test/queries/clientnegative/delete_not_acid.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientnegative/delete_not_acid.q?rev=1624788&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientnegative/delete_not_acid.q (added)
+++ hive/trunk/ql/src/test/queries/clientnegative/delete_not_acid.q Sat Sep 13 22:09:31 2014
@@ -0,0 +1,6 @@
+set hive.support.concurrency=true;
+set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager;
+
+create table foo(a int, b varchar(128)) clustered by (a) into 1 buckets stored as orc;
+
+delete from foo;

Added: hive/trunk/ql/src/test/queries/clientnegative/update_not_acid.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientnegative/update_not_acid.q?rev=1624788&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientnegative/update_not_acid.q (added)
+++ hive/trunk/ql/src/test/queries/clientnegative/update_not_acid.q Sat Sep 13 22:09:31 2014
@@ -0,0 +1,6 @@
+set hive.support.concurrency=true;
+set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager;
+
+create table foo(a int, b varchar(128)) clustered by (a) into 1 buckets stored as orc;
+
+update foo set b = 'fred';

Added: hive/trunk/ql/src/test/queries/clientnegative/update_partition_col.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientnegative/update_partition_col.q?rev=1624788&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientnegative/update_partition_col.q (added)
+++ hive/trunk/ql/src/test/queries/clientnegative/update_partition_col.q Sat Sep 13 22:09:31 2014
@@ -0,0 +1,8 @@
+set hive.support.concurrency=true;
+set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
+set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
+set hive.enforce.bucketing=true;
+
+create table foo(a int, b varchar(128)) partitioned by (ds string) clustered by (a) into 2 buckets stored as orc;
+
+update foo set ds = 'fred';

Added: hive/trunk/ql/src/test/queries/clientpositive/delete_all_non_partitioned.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/delete_all_non_partitioned.q?rev=1624788&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/delete_all_non_partitioned.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/delete_all_non_partitioned.q Sat Sep 13 22:09:31 2014
@@ -0,0 +1,17 @@
+set hive.support.concurrency=true;
+set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
+set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
+set hive.enforce.bucketing=true;
+set hive.exec.reducers.max = 1;
+
+create table acid_danp(a int, b varchar(128)) clustered by (a) into 2 buckets stored as orc;
+
+insert into table acid_danp select cint, cast(cstring1 as varchar(128)) from alltypesorc where cint < 0 order by cint limit 10;
+
+select a,b from acid_danp order by a;
+
+delete from acid_danp;
+
+select a,b from acid_danp;
+
+

Added: hive/trunk/ql/src/test/queries/clientpositive/delete_all_partitioned.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/delete_all_partitioned.q?rev=1624788&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/delete_all_partitioned.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/delete_all_partitioned.q Sat Sep 13 22:09:31 2014
@@ -0,0 +1,16 @@
+set hive.support.concurrency=true;
+set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
+set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
+set hive.enforce.bucketing=true;
+set hive.mapred.supports.subdirectories=true;
+
+create table acid_dap(a int, b varchar(128)) partitioned by (ds string) clustered by (a) into 2 buckets stored as orc;
+
+insert into table acid_dap partition (ds='today') select cint, cast(cstring1 as varchar(128)) from alltypesorc where cint is not null and cint < 0 order by cint limit 10;
+insert into table acid_dap partition (ds='tomorrow') select cint, cast(cstring1 as varchar(128)) from alltypesorc where cint is not null and cint > 1000 order by cint limit 10;
+
+select a,b,ds from acid_dap order by a,b;
+
+delete from acid_dap;
+
+select * from acid_dap;

Added: hive/trunk/ql/src/test/queries/clientpositive/delete_orig_table.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/delete_orig_table.q?rev=1624788&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/delete_orig_table.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/delete_orig_table.q Sat Sep 13 22:09:31 2014
@@ -0,0 +1,29 @@
+set hive.support.concurrency=true;
+set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
+set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
+set hive.enforce.bucketing=true;
+
+dfs ${system:test.dfs.mkdir} ${system:test.tmp.dir}/delete_orig_table;
+dfs -copyFromLocal ../../data/files/alltypesorc ${system:test.tmp.dir}/delete_orig_table/00000_0; 
+
+create table acid_dot(
+    ctinyint TINYINT,
+    csmallint SMALLINT,
+    cint INT,
+    cbigint BIGINT,
+    cfloat FLOAT,
+    cdouble DOUBLE,
+    cstring1 STRING,
+    cstring2 STRING,
+    ctimestamp1 TIMESTAMP,
+    ctimestamp2 TIMESTAMP,
+    cboolean1 BOOLEAN,
+    cboolean2 BOOLEAN) clustered by (cint) into 1 buckets stored as orc location '${system:test.tmp.dir}/delete_orig_table';
+
+select count(*) from acid_dot;
+
+delete from acid_dot where cint < -1070551679;
+
+select count(*) from acid_dot;
+
+dfs -rmr ${system:test.tmp.dir}/delete_orig_table;

Added: hive/trunk/ql/src/test/queries/clientpositive/delete_tmp_table.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/delete_tmp_table.q?rev=1624788&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/delete_tmp_table.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/delete_tmp_table.q Sat Sep 13 22:09:31 2014
@@ -0,0 +1,16 @@
+set hive.support.concurrency=true;
+set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
+set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
+set hive.enforce.bucketing=true;
+
+create temporary table acid_dtt(a int, b varchar(128)) clustered by (a) into 2 buckets stored as orc;
+
+insert into table acid_dtt select cint, cast(cstring1 as varchar(128)) from alltypesorc where cint is not null order by cint limit 10;
+
+select * from acid_dtt order by a;
+
+delete from acid_dtt where b = '0ruyd6Y50JpdGRf6HqD' or b = '2uLyD28144vklju213J1mr';
+
+select a,b from acid_dtt order by b;
+
+

Added: hive/trunk/ql/src/test/queries/clientpositive/delete_where_no_match.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/delete_where_no_match.q?rev=1624788&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/delete_where_no_match.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/delete_where_no_match.q Sat Sep 13 22:09:31 2014
@@ -0,0 +1,16 @@
+set hive.support.concurrency=true;
+set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
+set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
+set hive.enforce.bucketing=true;
+
+create table acid_dwnm(a int, b varchar(128)) clustered by (a) into 2 buckets stored as orc;
+
+insert into table acid_dwnm select cint, cast(cstring1 as varchar(128)) from alltypesorc where cint is not null order by cint limit 10;
+
+select * from acid_dwnm order by a;
+
+delete from acid_dwnm where b = 'nosuchvalue';
+
+select a,b from acid_dwnm order by b;
+
+

Added: hive/trunk/ql/src/test/queries/clientpositive/delete_where_non_partitioned.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/delete_where_non_partitioned.q?rev=1624788&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/delete_where_non_partitioned.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/delete_where_non_partitioned.q Sat Sep 13 22:09:31 2014
@@ -0,0 +1,16 @@
+set hive.support.concurrency=true;
+set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
+set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
+set hive.enforce.bucketing=true;
+
+create table acid_dwnp(a int, b varchar(128)) clustered by (a) into 2 buckets stored as orc;
+
+insert into table acid_dwnp select cint, cast(cstring1 as varchar(128)) from alltypesorc where cint is not null order by cint limit 10;
+
+select * from acid_dwnp order by a;
+
+delete from acid_dwnp where b = '0ruyd6Y50JpdGRf6HqD';
+
+select a,b from acid_dwnp order by b;
+
+

Added: hive/trunk/ql/src/test/queries/clientpositive/delete_where_partitioned.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/delete_where_partitioned.q?rev=1624788&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/delete_where_partitioned.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/delete_where_partitioned.q Sat Sep 13 22:09:31 2014
@@ -0,0 +1,16 @@
+set hive.support.concurrency=true;
+set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
+set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
+set hive.enforce.bucketing=true;
+set hive.mapred.supports.subdirectories=true;
+
+create table acid_dwp(a int, b varchar(128)) partitioned by (ds string) clustered by (a) into 2 buckets stored as orc;
+
+insert into table acid_dwp partition (ds='today') select cint, cast(cstring1 as varchar(128)) from alltypesorc where cint is not null and cint < 0 order by cint limit 10;
+insert into table acid_dwp partition (ds='tomorrow') select cint, cast(cstring1 as varchar(128)) from alltypesorc where cint is not null and cint > -10000000 order by cint limit 10;
+
+select a,b,ds from acid_dwp order by a, ds;
+
+delete from acid_dwp where a = '-1071363017';
+
+select * from acid_dwp order by a, ds;