Posted to commits@hive.apache.org by ga...@apache.org on 2014/09/14 00:09:33 UTC
svn commit: r1624788 [2/5] - in /hive/trunk:
common/src/java/org/apache/hadoop/hive/conf/ data/conf/tez/
itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/history/
itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/
itests/sr...
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java?rev=1624788&r1=1624787&r2=1624788&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java Sat Sep 13 22:09:31 2014
@@ -37,6 +37,8 @@ import java.util.UUID;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
+import org.antlr.runtime.ClassicToken;
+import org.antlr.runtime.Token;
import org.antlr.runtime.tree.Tree;
import org.antlr.runtime.tree.TreeWizard;
import org.antlr.runtime.tree.TreeWizard.ContextVisitor;
@@ -85,6 +87,8 @@ import org.apache.hadoop.hive.ql.exec.Un
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
+import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat;
import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
@@ -209,6 +213,8 @@ public class SemanticAnalyzer extends Ba
// Max characters when auto generating the column name with func name
private static final int AUTOGEN_COLALIAS_PRFX_MAXLENGTH = 20;
+ private static final String VALUES_TMP_TABLE_NAME_PREFIX = "Values__Tmp__Table__";
+
private HashMap<TableScanOperator, ExprNodeDesc> opToPartPruner;
private HashMap<TableScanOperator, PrunedPartitionList> opToPartList;
private HashMap<String, Operator<? extends OperatorDesc>> topOps;
@@ -680,6 +686,140 @@ public class SemanticAnalyzer extends Ba
return alias;
}
+ // Generate a temp table out of a value clause
+ private ASTNode genValuesTempTable(ASTNode originalFrom) throws SemanticException {
+ // Pick a name for the table
+ SessionState ss = SessionState.get();
+ String tableName = VALUES_TMP_TABLE_NAME_PREFIX + ss.getNextValuesTempTableSuffix();
+
+ // Step 1, parse the values clause we were handed
+ List<? extends Node> fromChildren = originalFrom.getChildren();
+ // First child should be the virtual table ref
+ ASTNode virtualTableRef = (ASTNode)fromChildren.get(0);
+ assert virtualTableRef.getToken().getType() == HiveParser.TOK_VIRTUAL_TABREF :
+ "Expected first child of TOK_VIRTUAL_TABLE to be TOK_VIRTUAL_TABREF but was " +
+ virtualTableRef.getName();
+
+ List<? extends Node> virtualTableRefChildren = virtualTableRef.getChildren();
+ // First child of this should be the table name. If it's anonymous,
+ // then we don't have a table name.
+ ASTNode tabName = (ASTNode)virtualTableRefChildren.get(0);
+ if (tabName.getToken().getType() != HiveParser.TOK_ANONYMOUS) {
+ // TODO, if you want to make select ... from (values(...)) as foo(...) work,
+ // you need to parse this list of column names and build it into the table
+ throw new SemanticException(ErrorMsg.VALUES_TABLE_CONSTRUCTOR_NOT_SUPPORTED.getMsg());
+ }
+
+ // The second child of the TOK_VIRTUAL_TABLE should be TOK_VALUES_TABLE
+ ASTNode valuesTable = (ASTNode)fromChildren.get(1);
+ assert valuesTable.getToken().getType() == HiveParser.TOK_VALUES_TABLE :
+ "Expected second child of TOK_VIRTUAL_TABLE to be TOK_VALUE_TABLE but was " +
+ valuesTable.getName();
+ // Each of the children of TOK_VALUES_TABLE will be a TOK_VALUE_ROW
+ List<? extends Node> valuesTableChildren = valuesTable.getChildren();
+
+ // Now that we're going to start reading through the rows, open a file to write the rows to
+ // If we leave this method before creating the temporary table we need to be sure to clean up
+ // this file.
+ Path tablePath = null;
+ FileSystem fs = null;
+ try {
+ tablePath = Warehouse.getDnsPath(new Path(ss.getTempTableSpace(), tableName), conf);
+ fs = tablePath.getFileSystem(conf);
+ fs.mkdirs(tablePath);
+ Path dataFile = new Path(tablePath, "data_file");
+ FSDataOutputStream out = fs.create(dataFile);
+ List<FieldSchema> fields = new ArrayList<FieldSchema>();
+
+ boolean firstRow = true;
+ for (Node n : valuesTableChildren) {
+ ASTNode valuesRow = (ASTNode) n;
+ assert valuesRow.getToken().getType() == HiveParser.TOK_VALUE_ROW :
+ "Expected child of TOK_VALUE_TABLE to be TOK_VALUE_ROW but was " + valuesRow.getName();
+ // Each of the children of this should be a literal
+ List<? extends Node> valuesRowChildren = valuesRow.getChildren();
+ boolean isFirst = true;
+ int nextColNum = 1;
+ for (Node n1 : valuesRowChildren) {
+ ASTNode value = (ASTNode) n1;
+ if (firstRow) {
+ fields.add(new FieldSchema("tmp_values_col" + nextColNum++, "string", ""));
+ }
+ if (isFirst) isFirst = false;
+ else out.writeBytes("\u0001");
+ out.writeBytes(unparseExprForValuesClause(value));
+ }
+ out.writeBytes("\n");
+ firstRow = false;
+ }
+ out.close();
+
+ // Step 2, create a temp table, using the created file as the data
+ StorageFormat format = new StorageFormat(conf);
+ format.processStorageFormat("TextFile");
+ Table table = db.newTable(tableName);
+ table.setSerializationLib(format.getSerde());
+ table.setFields(fields);
+ table.setDataLocation(tablePath);
+ table.getTTable().setTemporary(true);
+ table.setStoredAsSubDirectories(false);
+ table.setInputFormatClass(format.getInputFormat());
+ table.setOutputFormatClass(format.getOutputFormat());
+ db.createTable(table, false);
+ } catch (Exception e) {
+ String errMsg = ErrorMsg.INSERT_CANNOT_CREATE_TEMP_FILE.getMsg() + e.getMessage();
+ LOG.error(errMsg);
+ // Try to delete the file
+ if (fs != null && tablePath != null) {
+ try {
+ fs.delete(tablePath, false);
+ } catch (IOException swallowIt) {}
+ }
+ throw new SemanticException(errMsg, e);
+ }
+
+ // Step 3, return a new subtree with a from clause built around that temp table
+ // The form of the tree is TOK_TABREF->TOK_TABNAME->identifier(tablename)
+ Token t = new ClassicToken(HiveParser.TOK_TABREF);
+ ASTNode tabRef = new ASTNode(t);
+ t = new ClassicToken(HiveParser.TOK_TABNAME);
+ ASTNode tabNameNode = new ASTNode(t);
+ tabRef.addChild(tabNameNode);
+ t = new ClassicToken(HiveParser.Identifier, tableName);
+ ASTNode identifier = new ASTNode(t);
+ tabNameNode.addChild(identifier);
+ return tabRef;
+ }
+
+ // Take an expression in the values clause and turn it back into a string. This is far from
+ // comprehensive. At the moment it only supports:
+ // * literals (all types)
+ // * unary negatives
+ // * true/false
+ private String unparseExprForValuesClause(ASTNode expr) throws SemanticException {
+ switch (expr.getToken().getType()) {
+ case HiveParser.Number:
+ return expr.getText();
+
+ case HiveParser.StringLiteral:
+ return PlanUtils.stripQuotes(expr.getText());
+
+ case HiveParser.KW_FALSE:
+ return "FALSE";
+
+ case HiveParser.KW_TRUE:
+ return "TRUE";
+
+ case HiveParser.MINUS:
+ return "-" + unparseExprForValuesClause((ASTNode)expr.getChildren().get(0));
+
+ default:
+ throw new SemanticException("Expression of type " + expr.getText() +
+ " not supported in insert/values");
+ }
+
+ }
+
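The method above serializes each VALUES row as a Ctrl-A (\u0001) separated line of unparsed literals and points a temporary TextFile table at the resulting file; all columns are declared as string (tmp_values_col1, tmp_values_col2, ...), so nothing is converted until the insert's select reads them back. A minimal standalone sketch of just that serialization step, using plain java.io and illustrative row values (the class and file names here are hypothetical, not part of this commit):

    import java.io.BufferedWriter;
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;
    import java.util.Arrays;
    import java.util.List;

    public class ValuesDataFileSketch {
      // Fields are joined by \u0001 and rows terminated by \n, the default delimiters
      // the TextFile temp table created above will read.
      static void writeRows(Path dataFile, List<List<String>> rows) throws IOException {
        try (BufferedWriter out = Files.newBufferedWriter(dataFile, StandardCharsets.UTF_8)) {
          for (List<String> row : rows) {
            out.write(String.join("\u0001", row));
            out.write("\n");
          }
        }
      }

      public static void main(String[] args) throws IOException {
        // e.g. INSERT INTO t VALUES (1, 'foo'), (2, 'bar')
        writeRows(Paths.get("data_file"),
            Arrays.asList(Arrays.asList("1", "foo"), Arrays.asList("2", "bar")));
      }
    }
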
private void assertCombineInputFormat(Tree numerator, String message) throws SemanticException {
String inputFormat = conf.getVar(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez") ?
HiveConf.getVar(conf, HiveConf.ConfVars.HIVETEZINPUTFORMAT):
@@ -997,7 +1137,11 @@ public class SemanticAnalyzer extends Ba
if (frm.getToken().getType() == HiveParser.TOK_TABREF) {
processTable(qb, frm);
} else if (frm.getToken().getType() == HiveParser.TOK_VIRTUAL_TABLE) {
- throw new RuntimeException("VALUES() clause is not fully supported yet...");
+ // Create a temp table with the passed values in it, then rewrite this portion of the
+ // tree to be from that table.
+ ASTNode newFrom = genValuesTempTable(frm);
+ ast.setChild(0, newFrom);
+ processTable(qb, newFrom);
} else if (frm.getToken().getType() == HiveParser.TOK_SUBQUERY) {
processSubQuery(qb, frm);
} else if (frm.getToken().getType() == HiveParser.TOK_LATERAL_VIEW ||
@@ -1190,10 +1334,6 @@ public class SemanticAnalyzer extends Ba
case HiveParser.TOK_CTE:
processCTE(qb, ast);
break;
- case HiveParser.TOK_DELETE_FROM:
- throw new RuntimeException("DELETE is not (yet) implemented...");
- case HiveParser.TOK_UPDATE_TABLE:
- throw new RuntimeException("UPDATE is not (yet) implemented...");
default:
skipRecursion = false;
break;
@@ -1280,7 +1420,7 @@ public class SemanticAnalyzer extends Ba
// Disallow INSERT INTO on bucketized tables
if (qb.getParseInfo().isInsertIntoTable(tab.getDbName(), tab.getTableName()) &&
- tab.getNumBuckets() > 0) {
+ tab.getNumBuckets() > 0 && !isAcidTable(tab)) {
throw new SemanticException(ErrorMsg.INSERT_INTO_BUCKETIZED_TABLE.
getMsg("Table: " + tab_name));
}
@@ -4226,7 +4366,7 @@ public class SemanticAnalyzer extends Ba
groupingSetsPresent ? keyLength + 1 : keyLength,
reduceValues, distinctColIndices,
outputKeyColumnNames, outputValueColumnNames, true, -1, numPartitionFields,
- numReducers),
+ numReducers, AcidUtils.Operation.NOT_ACID),
new RowSchema(reduceSinkOutputRowResolver.getColumnInfos()), inputOperatorInfo),
reduceSinkOutputRowResolver);
rsOp.setColumnExprMap(colExprMap);
@@ -4429,7 +4569,7 @@ public class SemanticAnalyzer extends Ba
}
ReduceSinkDesc rsDesc = PlanUtils.getReduceSinkDesc(reduceKeys, keyLength, reduceValues,
distinctColIndices, outputKeyColumnNames, outputValueColumnNames,
- true, -1, keyLength, numReducers);
+ true, -1, keyLength, numReducers, AcidUtils.Operation.NOT_ACID);
ReduceSinkOperator rsOp = (ReduceSinkOperator) putOpInsertMap(
OperatorFactory.getAndMakeChild(rsDesc, new RowSchema(reduceSinkOutputRowResolver
@@ -4544,8 +4684,8 @@ public class SemanticAnalyzer extends Ba
ReduceSinkOperator rsOp = (ReduceSinkOperator) putOpInsertMap(
OperatorFactory.getAndMakeChild(PlanUtils.getReduceSinkDesc(reduceKeys,
reduceValues, outputColumnNames, true, -1, numPartitionFields,
- numReducers), new RowSchema(reduceSinkOutputRowResolver2
- .getColumnInfos()), groupByOperatorInfo),
+ numReducers, AcidUtils.Operation.NOT_ACID),
+ new RowSchema(reduceSinkOutputRowResolver2.getColumnInfos()), groupByOperatorInfo),
reduceSinkOutputRowResolver2);
rsOp.setColumnExprMap(colExprMap);
@@ -5529,9 +5669,14 @@ public class SemanticAnalyzer extends Ba
if ((dest_tab.getNumBuckets() > 0) &&
(conf.getBoolVar(HiveConf.ConfVars.HIVEENFORCEBUCKETING))) {
enforceBucketing = true;
- partnCols = getPartitionColsFromBucketCols(dest, qb, dest_tab, table_desc, input, true);
- partnColsNoConvert = getPartitionColsFromBucketCols(dest, qb, dest_tab, table_desc, input,
- false);
+ if (updating() || deleting()) {
+ partnCols = getPartitionColsFromBucketColsForUpdateDelete(input, true);
+ partnColsNoConvert = getPartitionColsFromBucketColsForUpdateDelete(input, false);
+ } else {
+ partnCols = getPartitionColsFromBucketCols(dest, qb, dest_tab, table_desc, input, true);
+ partnColsNoConvert = getPartitionColsFromBucketCols(dest, qb, dest_tab, table_desc, input,
+ false);
+ }
}
if ((dest_tab.getSortCols() != null) &&
@@ -5553,6 +5698,7 @@ public class SemanticAnalyzer extends Ba
}
int numBuckets = dest_tab.getNumBuckets();
if (numBuckets > maxReducers) {
+ LOG.debug("XXXXXX numBuckets is " + numBuckets + " and maxReducers is " + maxReducers);
multiFileSpray = true;
totalFiles = numBuckets;
if (totalFiles % maxReducers == 0) {
@@ -5718,7 +5864,11 @@ public class SemanticAnalyzer extends Ba
// Create the work for moving the table
// NOTE: specify Dynamic partitions in dest_tab for WriteEntity
if (!isNonNativeTable) {
- ltd = new LoadTableDesc(queryTmpdir,table_desc, dpCtx);
+ AcidUtils.Operation acidOp = getAcidType(table_desc.getOutputFileFormatClass());
+ if (acidOp != AcidUtils.Operation.NOT_ACID) {
+ checkIfAcidAndOverwriting(qb, table_desc);
+ }
+ ltd = new LoadTableDesc(queryTmpdir,table_desc, dpCtx, acidOp);
ltd.setReplace(!qb.getParseInfo().isInsertIntoTable(dest_tab.getDbName(),
dest_tab.getTableName()));
ltd.setLbCtx(lbCtx);
@@ -5821,7 +5971,11 @@ public class SemanticAnalyzer extends Ba
lbCtx = constructListBucketingCtx(dest_part.getSkewedColNames(),
dest_part.getSkewedColValues(), dest_part.getSkewedColValueLocationMaps(),
dest_part.isStoredAsSubDirectories(), conf);
- ltd = new LoadTableDesc(queryTmpdir, table_desc, dest_part.getSpec());
+ AcidUtils.Operation acidOp = getAcidType(table_desc.getOutputFileFormatClass());
+ if (acidOp != AcidUtils.Operation.NOT_ACID) {
+ checkIfAcidAndOverwriting(qb, table_desc);
+ }
+ ltd = new LoadTableDesc(queryTmpdir, table_desc, dest_part.getSpec(), acidOp);
ltd.setReplace(!qb.getParseInfo().isInsertIntoTable(dest_tab.getDbName(),
dest_tab.getTableName()));
ltd.setLbCtx(lbCtx);
@@ -5973,18 +6127,25 @@ public class SemanticAnalyzer extends Ba
ArrayList<ColumnInfo> vecCol = new ArrayList<ColumnInfo>();
- try {
- StructObjectInspector rowObjectInspector = (StructObjectInspector) table_desc
- .getDeserializer().getObjectInspector();
- List<? extends StructField> fields = rowObjectInspector
- .getAllStructFieldRefs();
- for (int i = 0; i < fields.size(); i++) {
- vecCol.add(new ColumnInfo(fields.get(i).getFieldName(), TypeInfoUtils
- .getTypeInfoFromObjectInspector(fields.get(i)
- .getFieldObjectInspector()), "", false));
+ if (updating() || deleting()) {
+ vecCol.add(new ColumnInfo(VirtualColumn.ROWID.getName(),
+ //TypeInfoUtils.getTypeInfoFromObjectInspector(VirtualColumn.ROWID.getObjectInspector()),
+ VirtualColumn.ROWID.getTypeInfo(),
+ "", true));
+ } else {
+ try {
+ StructObjectInspector rowObjectInspector = (StructObjectInspector) table_desc
+ .getDeserializer().getObjectInspector();
+ List<? extends StructField> fields = rowObjectInspector
+ .getAllStructFieldRefs();
+ for (int i = 0; i < fields.size(); i++) {
+ vecCol.add(new ColumnInfo(fields.get(i).getFieldName(), TypeInfoUtils
+ .getTypeInfoFromObjectInspector(fields.get(i)
+ .getFieldObjectInspector()), "", false));
+ }
+ } catch (Exception e) {
+ throw new SemanticException(e.getMessage(), e);
}
- } catch (Exception e) {
- throw new SemanticException(e.getMessage(), e);
}
RowSchema fsRS = new RowSchema(vecCol);
@@ -5997,6 +6158,10 @@ public class SemanticAnalyzer extends Ba
(dest_tab.getSortCols() != null && dest_tab.getSortCols().size() > 0 &&
conf.getBoolVar(HiveConf.ConfVars.HIVEENFORCESORTING))));
+ // If this table is working with ACID semantics, turn off merging
+ boolean acidTable = isAcidTable(dest_tab);
+ canBeMerged &= !acidTable;
+
FileSinkDesc fileSinkDesc = new FileSinkDesc(
queryTmpdir,
table_desc,
@@ -6009,6 +6174,15 @@ public class SemanticAnalyzer extends Ba
rsCtx.getPartnCols(),
dpCtx);
+ // If this is an insert, update, or delete on an ACID table then mark that so the
+ // FileSinkOperator knows how to properly write to it.
+ if (acidTable) {
+ AcidUtils.Operation wt = updating() ? AcidUtils.Operation.UPDATE :
+ (deleting() ? AcidUtils.Operation.DELETE : AcidUtils.Operation.INSERT);
+ fileSinkDesc.setWriteType(wt);
+ acidFileSinks.add(fileSinkDesc);
+ }
+
/* Set List Bucketing context. */
if (lbCtx != null) {
lbCtx.processRowSkewedIndex(fsRS);
@@ -6059,6 +6233,17 @@ public class SemanticAnalyzer extends Ba
return output;
}
+ // Check if we are overwriting any tables. If so, throw an exception as that is not allowed
+ // when using an Acid compliant txn manager and operating on an acid table.
+ private void checkIfAcidAndOverwriting(QB qb, TableDesc tableDesc) throws SemanticException {
+ String tableName = tableDesc.getTableName();
+ if (!qb.getParseInfo().isInsertIntoTable(tableName)) {
+ LOG.debug("Couldn't find table " + tableName + " in insertIntoTable");
+ throw new SemanticException(ErrorMsg.NO_INSERT_OVERWRITE_WITH_ACID.getMsg());
+ }
+
+ }
+
/**
* Generate the conversion SelectOperator that converts the columns into the
* types that are expected by the table_desc.
@@ -6086,16 +6271,34 @@ public class SemanticAnalyzer extends Ba
outColumnCnt += dpCtx.getNumDPCols();
}
- if (inColumnCnt != outColumnCnt) {
- String reason = "Table " + dest + " has " + outColumnCnt
- + " columns, but query has " + inColumnCnt + " columns.";
- throw new SemanticException(ErrorMsg.TARGET_TABLE_COLUMN_MISMATCH.getMsg(
- qb.getParseInfo().getDestForClause(dest), reason));
- } else if (dynPart && dpCtx != null) {
- // create the mapping from input ExprNode to dest table DP column
- dpCtx.mapInputToDP(rowFields.subList(tableFields.size(), rowFields.size()));
+ if (deleting()) {
+ // Figure out if we have partition columns in the list or not. If so,
+ // add them into the mapping. Partition columns will be located after the row id.
+ if (rowFields.size() > 1) {
+ // This means we have partition columns to deal with, so set up the mapping from the
+ // input to the partition columns.
+ dpCtx.mapInputToDP(rowFields.subList(1, rowFields.size()));
+ }
+ } else if (updating()) {
+ // In this case we expect the number of in fields to exceed the number of out fields by one
+ // (for the ROW__ID virtual column). If there are more columns than this,
+ // then the extras are for dynamic partitioning
+ if (dynPart && dpCtx != null) {
+ dpCtx.mapInputToDP(rowFields.subList(tableFields.size() + 1, rowFields.size()));
+ }
+ } else {
+ if (inColumnCnt != outColumnCnt) {
+ String reason = "Table " + dest + " has " + outColumnCnt
+ + " columns, but query has " + inColumnCnt + " columns.";
+ throw new SemanticException(ErrorMsg.TARGET_TABLE_COLUMN_MISMATCH.getMsg(
+ qb.getParseInfo().getDestForClause(dest), reason));
+ } else if (dynPart && dpCtx != null) {
+ // create the mapping from input ExprNode to dest table DP column
+ dpCtx.mapInputToDP(rowFields.subList(tableFields.size(), rowFields.size()));
+ }
}
+
// Check column types
boolean converted = false;
int columnNumber = tableFields.size();
@@ -6107,17 +6310,26 @@ public class SemanticAnalyzer extends Ba
MetadataTypedColumnsetSerDe.class);
boolean isLazySimpleSerDe = table_desc.getDeserializerClass().equals(
LazySimpleSerDe.class);
- if (!isMetaDataSerDe) {
+ if (!isMetaDataSerDe && !deleting()) {
+
+ // If we're updating, add the ROW__ID expression, then make the following column accesses
+ // offset by 1 so that we don't try to convert the ROW__ID
+ if (updating()) {
+ expressions.add(new ExprNodeColumnDesc(rowFields.get(0).getType(),
+ rowFields.get(0).getInternalName(), "", true));
+ }
// here only deals with non-partition columns. We deal with partition columns next
for (int i = 0; i < columnNumber; i++) {
+ int rowFieldsOffset = updating() ? i + 1 : i;
ObjectInspector tableFieldOI = tableFields.get(i)
.getFieldObjectInspector();
TypeInfo tableFieldTypeInfo = TypeInfoUtils
.getTypeInfoFromObjectInspector(tableFieldOI);
- TypeInfo rowFieldTypeInfo = rowFields.get(i).getType();
+ TypeInfo rowFieldTypeInfo = rowFields.get(rowFieldsOffset).getType();
ExprNodeDesc column = new ExprNodeColumnDesc(rowFieldTypeInfo,
- rowFields.get(i).getInternalName(), "", false, rowFields.get(i).isSkewedCol());
+ rowFields.get(rowFieldsOffset).getInternalName(), "", false,
+ rowFields.get(rowFieldsOffset).isSkewedCol());
// LazySimpleSerDe can convert any types to String type using
// JSON-format.
if (!tableFieldTypeInfo.equals(rowFieldTypeInfo)
@@ -6147,7 +6359,7 @@ public class SemanticAnalyzer extends Ba
// deal with dynamic partition columns: convert ExprNodeDesc type to String??
if (dynPart && dpCtx != null && dpCtx.getNumDPCols() > 0) {
// DP columns starts with tableFields.size()
- for (int i = tableFields.size(); i < rowFields.size(); ++i) {
+ for (int i = tableFields.size() + (updating() ? 1 : 0); i < rowFields.size(); ++i) {
TypeInfo rowFieldTypeInfo = rowFields.get(i).getType();
ExprNodeDesc column = new ExprNodeColumnDesc(
rowFieldTypeInfo, rowFields.get(i).getInternalName(), "", false);
@@ -6341,6 +6553,27 @@ public class SemanticAnalyzer extends Ba
return genConvertCol(dest, qb, tab, table_desc, input, posns, convert);
}
+ // We have to set up the bucketing columns differently for updates and deletes,
+ // as they always use the ROW__ID column.
+ private ArrayList<ExprNodeDesc> getPartitionColsFromBucketColsForUpdateDelete(
+ Operator input, boolean convert) throws SemanticException {
+ //return genConvertCol(dest, qb, tab, table_desc, input, Arrays.asList(0), convert);
+ // In the case of update and delete the bucketing column is always the first column,
+ // and it isn't in the table info. So rather than asking the table for it,
+ // we'll construct it ourselves and send it back. This is based on the work done in
+ // genConvertCol below.
+ ColumnInfo rowField = opParseCtx.get(input).getRowResolver().getColumnInfos().get(0);
+ TypeInfo rowFieldTypeInfo = rowField.getType();
+ ExprNodeDesc column = new ExprNodeColumnDesc(rowFieldTypeInfo, rowField.getInternalName(),
+ rowField.getTabAlias(), true);
+ if (convert) {
+ column = ParseUtils.createConversionCast(column, TypeInfoFactory.intTypeInfo);
+ }
+ ArrayList<ExprNodeDesc> rlist = new ArrayList<ExprNodeDesc>(1);
+ rlist.add(column);
+ return rlist;
+ }
+
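Because the update/delete plan carries ROW__ID as its first column, the cast-to-int conversion above effectively partitions rows by the bucket id embedded in ROW__ID, so each bucket's delta ends up written by a single reducer. A hedged, illustrative sketch of that routing idea (the modulo mapping below is a stand-in, not the actual ReduceSinkOperator logic):

    public class BucketRoutingSketch {
      // Illustrative only: rows sharing a bucket id should land on the same reducer so
      // that one task writes the delta file for that bucket.
      static int reducerFor(int bucketId, int numReducers) {
        return (bucketId & Integer.MAX_VALUE) % numReducers;
      }

      public static void main(String[] args) {
        int numReducers = 4;
        for (int bucketId = 0; bucketId < 8; bucketId++) {
          System.out.println("bucket " + bucketId + " -> reducer "
              + reducerFor(bucketId, numReducers));
        }
      }
    }
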
private ArrayList<ExprNodeDesc> genConvertCol(String dest, QB qb, Table tab,
TableDesc table_desc, Operator input, List<Integer> posns, boolean convert)
throws SemanticException {
@@ -6463,9 +6696,11 @@ public class SemanticAnalyzer extends Ba
order.append(sortOrder == BaseSemanticAnalyzer.HIVE_COLUMN_ORDER_ASC ? '+' : '-');
}
+ AcidUtils.Operation acidOp = (isAcidTable(tab) ? getAcidType() : AcidUtils.Operation.NOT_ACID);
+
Operator interim = putOpInsertMap(OperatorFactory.getAndMakeChild(PlanUtils
.getReduceSinkDesc(sortCols, valueCols, outputColumns, false, -1,
- partitionCols, order.toString(), numReducers),
+ partitionCols, order.toString(), numReducers, acidOp),
new RowSchema(inputRR.getColumnInfos()), input), inputRR);
interim.setColumnExprMap(colExprMap);
reduceSinkOperatorsAddedByEnforceBucketingSorting.add((ReduceSinkOperator) interim);
@@ -6621,8 +6856,9 @@ public class SemanticAnalyzer extends Ba
dummy.setParentOperators(null);
+ // TODO Not 100% sure NOT_ACID is always right here.
ReduceSinkDesc rsdesc = PlanUtils.getReduceSinkDesc(sortCols, valueCols, outputColumns,
- false, -1, partitionCols, order.toString(), numReducers);
+ false, -1, partitionCols, order.toString(), numReducers, AcidUtils.Operation.NOT_ACID);
Operator interim = putOpInsertMap(OperatorFactory.getAndMakeChild(rsdesc,
new RowSchema(rsRR.getColumnInfos()), input), rsRR);
@@ -6887,7 +7123,7 @@ public class SemanticAnalyzer extends Ba
ReduceSinkDesc rsDesc = PlanUtils.getReduceSinkDesc(reduceKeys,
reduceValues, outputColumns, false, tag,
- reduceKeys.size(), numReds);
+ reduceKeys.size(), numReds, AcidUtils.Operation.NOT_ACID);
ReduceSinkOperator rsOp = (ReduceSinkOperator) putOpInsertMap(
OperatorFactory.getAndMakeChild(rsDesc, new RowSchema(outputRR
@@ -8075,7 +8311,8 @@ public class SemanticAnalyzer extends Ba
ReduceSinkOperator rsOp = (ReduceSinkOperator) putOpInsertMap(
OperatorFactory.getAndMakeChild(PlanUtils.getReduceSinkDesc(reduceKeys,
- reduceValues, outputColumnNames, true, -1, reduceKeys.size(), -1),
+ reduceValues, outputColumnNames, true, -1, reduceKeys.size(), -1,
+ AcidUtils.Operation.NOT_ACID),
new RowSchema(reduceSinkOutputRowResolver.getColumnInfos()), input),
reduceSinkOutputRowResolver);
@@ -11375,7 +11612,7 @@ public class SemanticAnalyzer extends Ba
input = putOpInsertMap(OperatorFactory.getAndMakeChild(PlanUtils
.getReduceSinkDesc(orderCols,
valueCols, outputColumnNames, false,
- -1, partCols, orderString.toString(), -1),
+ -1, partCols, orderString.toString(), -1, AcidUtils.Operation.NOT_ACID),
new RowSchema(rsOpRR.getColumnInfos()), input), rsOpRR);
input.setColumnExprMap(colExprMap);
}
@@ -11500,7 +11737,7 @@ public class SemanticAnalyzer extends Ba
input = putOpInsertMap(OperatorFactory.getAndMakeChild(PlanUtils
.getReduceSinkDesc(orderCols,
valueCols, outputColumnNames, false,
- -1, partCols, orderString.toString(), -1),
+ -1, partCols, orderString.toString(), -1, AcidUtils.Operation.NOT_ACID),
new RowSchema(rsNewRR.getColumnInfos()), input), rsNewRR);
input.setColumnExprMap(colExprMap);
@@ -11657,4 +11894,49 @@ public class SemanticAnalyzer extends Ba
else return (ltd.getReplace() ? WriteEntity.WriteType.INSERT_OVERWRITE :
WriteEntity.WriteType.INSERT);
}
+
+ // Even if the table is of Acid type, if we aren't working with an Acid compliant TxnManager
+ // then return false.
+ private boolean isAcidTable(Table tab) {
+ if (tab == null || tab.getOutputFormatClass() == null) return false;
+ if (!SessionState.get().getTxnMgr().supportsAcid()) return false;
+ return isAcidOutputFormat(tab.getOutputFormatClass());
+ }
+
+ private boolean isAcidOutputFormat(Class<? extends HiveOutputFormat> of) {
+ Class<?>[] interfaces = of.getInterfaces();
+ for (Class<?> iface : interfaces) {
+ if (iface.equals(AcidOutputFormat.class)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ // Note that this method assumes you have already decided this is an Acid table. It cannot
+ // figure out if a table is Acid or not.
+ private AcidUtils.Operation getAcidType() {
+ return deleting() ? AcidUtils.Operation.DELETE :
+ (updating() ? AcidUtils.Operation.UPDATE :
+ AcidUtils.Operation.INSERT);
+ }
+
+ private AcidUtils.Operation getAcidType(Class<? extends HiveOutputFormat> of) {
+ if (SessionState.get() == null || !SessionState.get().getTxnMgr().supportsAcid()) {
+ return AcidUtils.Operation.NOT_ACID;
+ } else if (isAcidOutputFormat(of)) {
+ return getAcidType();
+ } else {
+ return AcidUtils.Operation.NOT_ACID;
+ }
+ }
+
+ protected boolean updating() {
+ return false;
+ }
+
+ protected boolean deleting() {
+ return false;
+ }
+
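isAcidOutputFormat above inspects Class.getInterfaces(), which reports only interfaces a class declares directly, not ones inherited from a superclass. A small standalone sketch of that distinction using toy types (AcidFormat and the classes below are illustrative, not the Hive classes):

    public class InterfaceCheckSketch {
      interface AcidFormat {}                                  // stand-in for AcidOutputFormat
      static class OrcLikeFormat implements AcidFormat {}
      static class SubclassFormat extends OrcLikeFormat {}     // inherits, does not redeclare

      // Same shape as the check in the patch: scan the directly declared interfaces.
      static boolean declaresDirectly(Class<?> c) {
        for (Class<?> iface : c.getInterfaces()) {
          if (iface.equals(AcidFormat.class)) {
            return true;
          }
        }
        return false;
      }

      public static void main(String[] args) {
        System.out.println(declaresDirectly(OrcLikeFormat.class));                   // true
        System.out.println(declaresDirectly(SubclassFormat.class));                  // false
        System.out.println(AcidFormat.class.isAssignableFrom(SubclassFormat.class)); // true
      }
    }

AcidOutputFormat.class.isAssignableFrom(of) would also cover formats that pick up the interface through inheritance, if that case ever arises.
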
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java?rev=1624788&r1=1624787&r2=1624788&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java Sat Sep 13 22:09:31 2014
@@ -268,6 +268,11 @@ public final class SemanticAnalyzerFacto
case HiveParser.TOK_CREATEMACRO:
case HiveParser.TOK_DROPMACRO:
return new MacroSemanticAnalyzer(conf);
+
+ case HiveParser.TOK_UPDATE_TABLE:
+ case HiveParser.TOK_DELETE_FROM:
+ return new UpdateDeleteSemanticAnalyzer(conf);
+
default:
return new SemanticAnalyzer(conf);
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/StorageFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/StorageFormat.java?rev=1624788&r1=1624787&r2=1624788&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/StorageFormat.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/StorageFormat.java Sat Sep 13 22:09:31 2014
@@ -80,7 +80,7 @@ public class StorageFormat {
return true;
}
- private void processStorageFormat(String name) throws SemanticException {
+ protected void processStorageFormat(String name) throws SemanticException {
if (name.isEmpty()) {
throw new SemanticException("File format in STORED AS clause cannot be empty");
}
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java?rev=1624788&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java Sat Sep 13 22:09:31 2014
@@ -0,0 +1,345 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.hooks.Entity;
+import org.apache.hadoop.hive.ql.hooks.ReadEntity;
+import org.apache.hadoop.hive.ql.hooks.WriteEntity;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.session.SessionState;
+
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+
+/**
+ * A subclass of the {@link org.apache.hadoop.hive.ql.parse.SemanticAnalyzer} that just handles
+ * update and delete statements. It works by rewriting the updates and deletes into insert
+ * statements (since they are actually inserts) and then doing some patch up to make them work as
+ * updates and deletes instead.
+ */
+public class UpdateDeleteSemanticAnalyzer extends SemanticAnalyzer {
+
+ boolean useSuper = false;
+
+ public UpdateDeleteSemanticAnalyzer(HiveConf conf) throws SemanticException {
+ super(conf);
+ }
+
+ @Override
+ public void analyzeInternal(ASTNode tree) throws SemanticException {
+ if (useSuper) {
+ super.analyzeInternal(tree);
+ } else {
+
+ if (!SessionState.get().getTxnMgr().supportsAcid()) {
+ throw new SemanticException(ErrorMsg.ACID_OP_ON_NONACID_TXNMGR.getMsg());
+ }
+ switch (tree.getToken().getType()) {
+ case HiveParser.TOK_DELETE_FROM:
+ analyzeDelete(tree);
+ return;
+
+ case HiveParser.TOK_UPDATE_TABLE:
+ analyzeUpdate(tree);
+ return;
+
+ default:
+ throw new RuntimeException("Asked to parse token " + tree.getName() + " in " +
+ "UpdateDeleteSemanticAnalyzer");
+ }
+ }
+ }
+
+ @Override
+ protected boolean updating() {
+ return ctx.getAcidOperation() == AcidUtils.Operation.UPDATE;
+ }
+
+ @Override
+ protected boolean deleting() {
+ return ctx.getAcidOperation() == AcidUtils.Operation.DELETE;
+ }
+
+ private void analyzeUpdate(ASTNode tree) throws SemanticException {
+ ctx.setAcidOperation(AcidUtils.Operation.UPDATE);
+ reparseAndSuperAnalyze(tree);
+ }
+
+ private void analyzeDelete(ASTNode tree) throws SemanticException {
+ ctx.setAcidOperation(AcidUtils.Operation.DELETE);
+ reparseAndSuperAnalyze(tree);
+ }
+
+ private void reparseAndSuperAnalyze(ASTNode tree) throws SemanticException {
+ List<? extends Node> children = tree.getChildren();
+ // The first child should be the table we are deleting from
+ ASTNode tabName = (ASTNode)children.get(0);
+ assert tabName.getToken().getType() == HiveParser.TOK_TABNAME :
+ "Expected tablename as first child of " + operation() + " but found " + tabName.getName();
+ String[] tableName = getQualifiedTableName(tabName);
+
+ // Rewrite the delete or update into an insert. Crazy, but it works as deletes and updates
+ // actually are inserts into the delta file in Hive. A delete
+ // DELETE FROM _tablename_ [WHERE ...]
+ // will be rewritten as
+ // INSERT INTO TABLE _tablename_ [PARTITION (_partcols_)] SELECT ROW__ID[,
+ // _partcols_] from _tablename_ SORT BY ROW__ID
+ // An update
+ // UPDATE _tablename_ SET x = _expr_ [WHERE...]
+ // will be rewritten as
+ // INSERT INTO TABLE _tablename_ [PARTITION (_partcols_)] SELECT _all_,
+ // _partcols_ from _tablename_ SORT BY ROW__ID
+ // where _all_ is all the non-partition columns. The expressions from the set clause will be
+ // re-attached later.
+ // The where clause will also be re-attached later.
+ // The sort by clause is put in there so that records come out in the right order to enable
+ // merge on read.
+
+ StringBuilder rewrittenQueryStr = new StringBuilder();
+ Table mTable;
+ try {
+ mTable = db.getTable(tableName[0], tableName[1]);
+ } catch (HiveException e) {
+ throw new SemanticException(ErrorMsg.UPDATEDELETE_PARSE_ERROR.getMsg(), e);
+ }
+ List<FieldSchema> partCols = mTable.getPartCols();
+
+ rewrittenQueryStr.append("insert into table ");
+ rewrittenQueryStr.append(getDotName(tableName));
+
+ // If the table is partitioned we have to put the partition() clause in
+ if (partCols != null && partCols.size() > 0) {
+ rewrittenQueryStr.append(" partition (");
+ boolean first = true;
+ for (FieldSchema fschema : partCols) {
+ if (first) first = false;
+ else rewrittenQueryStr.append(", ");
+ rewrittenQueryStr.append(fschema.getName());
+ }
+ rewrittenQueryStr.append(")");
+ }
+
+ rewrittenQueryStr.append(" select ROW__ID");
+ Map<Integer, ASTNode> setColExprs = null;
+ if (updating()) {
+ // An update needs to select all of the columns, as we rewrite the entire row. Also,
+ // we need to figure out which columns we are going to replace. We won't write the set
+ // expressions in the rewritten query. We'll patch that up later.
+ // The set list from update should be the second child (index 1)
+ assert children.size() >= 2 : "Expected update token to have at least two children";
+ ASTNode setClause = (ASTNode)children.get(1);
+ assert setClause.getToken().getType() == HiveParser.TOK_SET_COLUMNS_CLAUSE :
+ "Expected second child of update token to be set token";
+
+ // Get the children of the set clause, each of which should be a column assignment
+ List<? extends Node> assignments = setClause.getChildren();
+ Map<String, ASTNode> setCols = new HashMap<String, ASTNode>(assignments.size());
+ setColExprs = new HashMap<Integer, ASTNode>(assignments.size());
+ for (Node a : assignments) {
+ ASTNode assignment = (ASTNode)a;
+ assert assignment.getToken().getType() == HiveParser.EQUAL :
+ "Expected set assignments to use equals operator but found " + assignment.getName();
+ ASTNode tableOrColTok = (ASTNode)assignment.getChildren().get(0);
+ assert tableOrColTok.getToken().getType() == HiveParser.TOK_TABLE_OR_COL :
+ "Expected left side of assignment to be table or column";
+ ASTNode colName = (ASTNode)tableOrColTok.getChildren().get(0);
+ assert colName.getToken().getType() == HiveParser.Identifier :
+ "Expected column name";
+
+ String columnName = colName.getText();
+
+ // Make sure this isn't one of the partitioning columns; that's not supported.
+ if (partCols != null) {
+ for (FieldSchema fschema : partCols) {
+ if (fschema.getName().equalsIgnoreCase(columnName)) {
+ throw new SemanticException(ErrorMsg.UPDATE_CANNOT_UPDATE_PART_VALUE.getMsg());
+ }
+ }
+ }
+
+ // This means that in UPDATE T SET x = _something_
+ // _something_ can be whatever is supported in SELECT _something_
+ setCols.put(columnName, (ASTNode)assignment.getChildren().get(1));
+ }
+
+ List<FieldSchema> nonPartCols = mTable.getCols();
+ for (int i = 0; i < nonPartCols.size(); i++) {
+ rewrittenQueryStr.append(',');
+ String name = nonPartCols.get(i).getName();
+ ASTNode setCol = setCols.get(name);
+ rewrittenQueryStr.append(name);
+ if (setCol != null) {
+ // This is one of the columns we're setting; record its position so we can come back
+ // later and patch it up.
+ // Add one to the index because the select has the ROW__ID as the first column.
+ setColExprs.put(i + 1, setCol);
+ }
+ }
+ }
+
+ // If the table is partitioned, we need to select the partition columns as well.
+ if (partCols != null) {
+ for (FieldSchema fschema : partCols) {
+ rewrittenQueryStr.append(", ");
+ rewrittenQueryStr.append(fschema.getName());
+ }
+ }
+ rewrittenQueryStr.append(" from ");
+ rewrittenQueryStr.append(getDotName(tableName));
+
+ ASTNode where = null;
+ int whereIndex = deleting() ? 1 : 2;
+ if (children.size() > whereIndex) {
+ where = (ASTNode)children.get(whereIndex);
+ assert where.getToken().getType() == HiveParser.TOK_WHERE :
+ "Expected where clause, but found " + where.getName();
+ }
+
+ // Add a sort by clause so that the row ids come out in the correct order
+ rewrittenQueryStr.append(" sort by ROW__ID desc ");
+
+ // Parse the rewritten query string
+ Context rewrittenCtx;
+ try {
+ // Set dynamic partitioning to nonstrict so that queries do not need any partition
+ // references.
+ HiveConf.setVar(conf, HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict");
+ rewrittenCtx = new Context(conf);
+ } catch (IOException e) {
+ throw new SemanticException(ErrorMsg.UPDATEDELETE_IO_ERROR.getMsg());
+ }
+ rewrittenCtx.setCmd(rewrittenQueryStr.toString());
+ rewrittenCtx.setAcidOperation(ctx.getAcidOperation());
+
+ ParseDriver pd = new ParseDriver();
+ ASTNode rewrittenTree;
+ try {
+ LOG.info("Going to reparse " + operation() + " as <" + rewrittenQueryStr.toString() + ">");
+ rewrittenTree = pd.parse(rewrittenQueryStr.toString(), rewrittenCtx);
+ rewrittenTree = ParseUtils.findRootNonNullToken(rewrittenTree);
+
+ } catch (ParseException e) {
+ throw new SemanticException(ErrorMsg.UPDATEDELETE_PARSE_ERROR.getMsg(), e);
+ }
+
+ ASTNode rewrittenInsert = (ASTNode)rewrittenTree.getChildren().get(1);
+ assert rewrittenInsert.getToken().getType() == HiveParser.TOK_INSERT :
+ "Expected TOK_INSERT as second child of TOK_QUERY but found " + rewrittenInsert.getName();
+
+ if (where != null) {
+ // The structure of the AST for the rewritten insert statement is:
+ // TOK_QUERY -> TOK_FROM
+ // \-> TOK_INSERT -> TOK_INSERT_INTO
+ // \-> TOK_SELECT
+ // \-> TOK_SORTBY
+ // The following adds the TOK_WHERE and its subtree from the original query as a child of
+ // TOK_INSERT, which is where it would have landed if it had been there originally in the
+ // string. We do it this way because it's easier than turning the original AST back into a
+ // string and reparsing it. We have to move the SORT_BY over one,
+ // so grab it, append it back as the last child, and put the where clause in its old slot
+ ASTNode sortBy = (ASTNode)rewrittenInsert.getChildren().get(2);
+ assert sortBy.getToken().getType() == HiveParser.TOK_SORTBY :
+ "Expected TOK_SORTBY to be first child of TOK_SELECT, but found " + sortBy.getName();
+ rewrittenInsert.addChild(sortBy);
+ rewrittenInsert.setChild(2, where);
+ }
+
+ // Patch up the projection list for updates, putting back the original set expressions.
+ if (updating() && setColExprs != null) {
+ // Walk through the projection list and replace the column names with the
+ // expressions from the original update. Under the TOK_SELECT (see above) the structure
+ // looks like:
+ // TOK_SELECT -> TOK_SELEXPR -> expr
+ // \-> TOK_SELEXPR -> expr ...
+ ASTNode rewrittenSelect = (ASTNode)rewrittenInsert.getChildren().get(1);
+ assert rewrittenSelect.getToken().getType() == HiveParser.TOK_SELECT :
+ "Expected TOK_SELECT as second child of TOK_INSERT but found " +
+ rewrittenSelect.getName();
+ for (Map.Entry<Integer, ASTNode> entry : setColExprs.entrySet()) {
+ ASTNode selExpr = (ASTNode)rewrittenSelect.getChildren().get(entry.getKey());
+ assert selExpr.getToken().getType() == HiveParser.TOK_SELEXPR :
+ "Expected child of TOK_SELECT to be TOK_SELEXPR but was " + selExpr.getName();
+ // Now, change its child
+ selExpr.setChild(0, entry.getValue());
+ }
+ }
+
+ try {
+ useSuper = true;
+ super.analyze(rewrittenTree, rewrittenCtx);
+ } finally {
+ useSuper = false;
+ }
+
+ // Walk through all our inputs and set them to note that this read is part of an update or a
+ // delete.
+ for (ReadEntity input : inputs) {
+ input.setUpdateOrDelete(true);
+ }
+
+ if (inputIsPartitioned(inputs)) {
+ // In order to avoid locking the entire write table we need to replace the single WriteEntity
+ // with a WriteEntity for each partition
+ outputs.clear();
+ for (ReadEntity input : inputs) {
+ if (input.getTyp() == Entity.Type.PARTITION) {
+ WriteEntity.WriteType writeType = deleting() ? WriteEntity.WriteType.DELETE :
+ WriteEntity.WriteType.UPDATE;
+ outputs.add(new WriteEntity(input.getPartition(), writeType));
+ }
+ }
+ } else {
+ // We still need to patch up the WriteEntities as they will have an insert type. Change
+ // them to the appropriate type for our operation.
+ for (WriteEntity output : outputs) {
+ output.setWriteType(deleting() ? WriteEntity.WriteType.DELETE :
+ WriteEntity.WriteType.UPDATE);
+ }
+ }
+ }
+
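To make the rewrite in reparseAndSuperAnalyze concrete, here is a minimal sketch of just the string-building step for an unpartitioned table, with illustrative table and column names; the WHERE clause and SET expressions are deliberately absent from the string because, as described above, they are grafted back into the reparsed AST afterwards:

    import java.util.Arrays;
    import java.util.List;

    public class RewriteSketch {
      // DELETE FROM t [WHERE ...] -> insert into table t select ROW__ID from t sort by ROW__ID desc
      static String rewriteDelete(String table) {
        return "insert into table " + table + " select ROW__ID from " + table
            + " sort by ROW__ID desc ";
      }

      // UPDATE t SET b = expr [WHERE ...] -> insert into table t select ROW__ID,a,b from t ...
      static String rewriteUpdate(String table, List<String> nonPartCols) {
        StringBuilder sb = new StringBuilder("insert into table ").append(table)
            .append(" select ROW__ID");
        for (String col : nonPartCols) {
          sb.append(',').append(col);
        }
        return sb.append(" from ").append(table).append(" sort by ROW__ID desc ").toString();
      }

      public static void main(String[] args) {
        System.out.println(rewriteDelete("acid_tbl"));
        System.out.println(rewriteUpdate("acid_tbl", Arrays.asList("a", "b")));
      }
    }
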
+ private String operation() {
+ if (updating()) return "update";
+ else if (deleting()) return "delete";
+ else throw new IllegalStateException("UpdateDeleteSemanticAnalyzer neither updating nor " +
+ "deleting, operation not known.");
+ }
+
+ private boolean inputIsPartitioned(Set<ReadEntity> inputs) {
+ // We cannot simply look at the first entry, as in the case where the input is partitioned
+ // there will be a table entry as well. So look for at least one partition entry.
+ for (ReadEntity re : inputs) {
+ if (re.getTyp() == Entity.Type.PARTITION) {
+ return true;
+ }
+ }
+ return false;
+ }
+}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java?rev=1624788&r1=1624787&r2=1624788&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java Sat Sep 13 22:09:31 2014
@@ -23,6 +23,7 @@ import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
/**
* LoadTableDesc.
@@ -37,6 +38,9 @@ public class LoadTableDesc extends org.a
private boolean holdDDLTime;
private boolean inheritTableSpecs = true; //For partitions, flag controlling whether the current
//table specs are to be used
+ // Need to remember whether this is an acid compliant operation, and if so whether it is an
+ // insert, update, or delete.
+ private AcidUtils.Operation writeType;
// TODO: the below seems like they should just be combined into partitionDesc
private org.apache.hadoop.hive.ql.plan.TableDesc table;
@@ -48,36 +52,69 @@ public class LoadTableDesc extends org.a
public LoadTableDesc(final Path sourcePath,
final org.apache.hadoop.hive.ql.plan.TableDesc table,
- final Map<String, String> partitionSpec, final boolean replace) {
+ final Map<String, String> partitionSpec,
+ final boolean replace,
+ final AcidUtils.Operation writeType) {
super(sourcePath);
- init(table, partitionSpec, replace);
+ init(table, partitionSpec, replace, writeType);
+ }
+
+ /**
+ * For use with non-ACID compliant operations, such as LOAD
+ * @param sourcePath
+ * @param table
+ * @param partitionSpec
+ * @param replace
+ */
+ public LoadTableDesc(final Path sourcePath,
+ final TableDesc table,
+ final Map<String, String> partitionSpec,
+ final boolean replace) {
+ this(sourcePath, table, partitionSpec, replace, AcidUtils.Operation.NOT_ACID);
}
public LoadTableDesc(final Path sourcePath,
final org.apache.hadoop.hive.ql.plan.TableDesc table,
- final Map<String, String> partitionSpec) {
- this(sourcePath, table, partitionSpec, true);
+ final Map<String, String> partitionSpec,
+ final AcidUtils.Operation writeType) {
+ this(sourcePath, table, partitionSpec, true, writeType);
+ }
+
+ /**
+ * For DDL operations that are not ACID compliant.
+ * @param sourcePath
+ * @param table
+ * @param partitionSpec
+ */
+ public LoadTableDesc(final Path sourcePath,
+ final org.apache.hadoop.hive.ql.plan.TableDesc table,
+ final Map<String, String> partitionSpec) {
+ this(sourcePath, table, partitionSpec, true, AcidUtils.Operation.NOT_ACID);
}
public LoadTableDesc(final Path sourcePath,
final org.apache.hadoop.hive.ql.plan.TableDesc table,
- final DynamicPartitionCtx dpCtx) {
+ final DynamicPartitionCtx dpCtx,
+ final AcidUtils.Operation writeType) {
super(sourcePath);
this.dpCtx = dpCtx;
if (dpCtx != null && dpCtx.getPartSpec() != null && partitionSpec == null) {
- init(table, dpCtx.getPartSpec(), true);
+ init(table, dpCtx.getPartSpec(), true, writeType);
} else {
- init(table, new LinkedHashMap<String, String>(), true);
+ init(table, new LinkedHashMap<String, String>(), true, writeType);
}
}
private void init(
final org.apache.hadoop.hive.ql.plan.TableDesc table,
- final Map<String, String> partitionSpec, final boolean replace) {
+ final Map<String, String> partitionSpec,
+ final boolean replace,
+ AcidUtils.Operation writeType) {
this.table = table;
this.partitionSpec = partitionSpec;
this.replace = replace;
this.holdDDLTime = false;
+ this.writeType = writeType;
}
public void setHoldDDLTime(boolean ddlTime) {
@@ -144,4 +181,8 @@ public class LoadTableDesc extends org.a
public void setLbCtx(ListBucketingCtx lbCtx) {
this.lbCtx = lbCtx;
}
+
+ public AcidUtils.Operation getWriteType() {
+ return writeType;
+ }
}
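The constructor changes above follow a simple pattern: the new overloads take an AcidUtils.Operation, and the old signatures are kept as delegates that default to NOT_ACID so existing LOAD and DDL call sites compile unchanged. A toy sketch of that defaulting pattern (the types below are illustrative, not the real LoadTableDesc):

    public class WriteTypeDefaultSketch {
      enum Operation { NOT_ACID, INSERT, UPDATE, DELETE }   // stand-in for AcidUtils.Operation

      static class LoadDesc {
        private final Operation writeType;

        // New form: acid-aware callers say what kind of write this is.
        LoadDesc(Operation writeType) {
          this.writeType = writeType;
        }

        // Old form kept as a delegate so non-acid call sites stay source compatible.
        LoadDesc() {
          this(Operation.NOT_ACID);
        }

        Operation getWriteType() {
          return writeType;
        }
      }

      public static void main(String[] args) {
        System.out.println(new LoadDesc().getWriteType());                 // NOT_ACID
        System.out.println(new LoadDesc(Operation.UPDATE).getWriteType()); // UPDATE
      }
    }
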
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java?rev=1624788&r1=1624787&r2=1624788&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java Sat Sep 13 22:09:31 2014
@@ -41,6 +41,7 @@ import org.apache.hadoop.hive.ql.exec.Ro
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
import org.apache.hadoop.hive.ql.io.HivePassThroughOutputFormat;
@@ -597,19 +598,22 @@ public final class PlanUtils {
* @param numReducers
* The number of reducers, set to -1 for automatic inference based on
* input data size.
+ * @param writeType Whether this is an Acid write, and if so whether it is insert, update,
+ * or delete.
* @return The reduceSinkDesc object.
*/
public static ReduceSinkDesc getReduceSinkDesc(
ArrayList<ExprNodeDesc> keyCols, ArrayList<ExprNodeDesc> valueCols,
List<String> outputColumnNames, boolean includeKeyCols, int tag,
- ArrayList<ExprNodeDesc> partitionCols, String order, int numReducers) {
+ ArrayList<ExprNodeDesc> partitionCols, String order, int numReducers,
+ AcidUtils.Operation writeType) {
return getReduceSinkDesc(keyCols, keyCols.size(), valueCols,
new ArrayList<List<Integer>>(),
includeKeyCols ? outputColumnNames.subList(0, keyCols.size()) :
new ArrayList<String>(),
includeKeyCols ? outputColumnNames.subList(keyCols.size(),
outputColumnNames.size()) : outputColumnNames,
- includeKeyCols, tag, partitionCols, order, numReducers);
+ includeKeyCols, tag, partitionCols, order, numReducers, writeType);
}
/**
@@ -635,6 +639,8 @@ public final class PlanUtils {
* @param numReducers
* The number of reducers, set to -1 for automatic inference based on
* input data size.
+ * @param writeType Whether this is an Acid write, and if so whether it is insert, update,
+ * or delete.
* @return The reduceSinkDesc object.
*/
public static ReduceSinkDesc getReduceSinkDesc(
@@ -644,7 +650,8 @@ public final class PlanUtils {
List<String> outputKeyColumnNames,
List<String> outputValueColumnNames,
boolean includeKeyCols, int tag,
- ArrayList<ExprNodeDesc> partitionCols, String order, int numReducers) {
+ ArrayList<ExprNodeDesc> partitionCols, String order, int numReducers,
+ AcidUtils.Operation writeType) {
TableDesc keyTable = null;
TableDesc valueTable = null;
ArrayList<String> outputKeyCols = new ArrayList<String>();
@@ -670,7 +677,7 @@ public final class PlanUtils {
return new ReduceSinkDesc(keyCols, numKeys, valueCols, outputKeyCols,
distinctColIndices, outputValCols,
tag, partitionCols, numReducers, keyTable,
- valueTable);
+ valueTable, writeType);
}
/**
@@ -690,12 +697,15 @@ public final class PlanUtils {
* @param numReducers
* The number of reducers, set to -1 for automatic inference based on
* input data size.
+ * @param writeType Whether this is an Acid write, and if so whether it is insert, update,
+ * or delete.
* @return The reduceSinkDesc object.
*/
public static ReduceSinkDesc getReduceSinkDesc(
ArrayList<ExprNodeDesc> keyCols, ArrayList<ExprNodeDesc> valueCols,
List<String> outputColumnNames, boolean includeKey, int tag,
- int numPartitionFields, int numReducers) throws SemanticException {
+ int numPartitionFields, int numReducers, AcidUtils.Operation writeType)
+ throws SemanticException {
return getReduceSinkDesc(keyCols, keyCols.size(), valueCols,
new ArrayList<List<Integer>>(),
includeKey ? outputColumnNames.subList(0, keyCols.size()) :
@@ -703,7 +713,7 @@ public final class PlanUtils {
includeKey ?
outputColumnNames.subList(keyCols.size(), outputColumnNames.size())
: outputColumnNames,
- includeKey, tag, numPartitionFields, numReducers);
+ includeKey, tag, numPartitionFields, numReducers, writeType);
}
/**
@@ -729,6 +739,8 @@ public final class PlanUtils {
* @param numReducers
* The number of reducers, set to -1 for automatic inference based on
* input data size.
+ * @param writeType Whether this is an Acid write, and if so whether it is insert, update,
+ * or delete.
* @return The reduceSinkDesc object.
*/
public static ReduceSinkDesc getReduceSinkDesc(
@@ -737,7 +749,8 @@ public final class PlanUtils {
List<List<Integer>> distinctColIndices,
List<String> outputKeyColumnNames, List<String> outputValueColumnNames,
boolean includeKey, int tag,
- int numPartitionFields, int numReducers) throws SemanticException {
+ int numPartitionFields, int numReducers, AcidUtils.Operation writeType)
+ throws SemanticException {
ArrayList<ExprNodeDesc> partitionCols = new ArrayList<ExprNodeDesc>();
if (numPartitionFields >= keyCols.size()) {
@@ -755,7 +768,7 @@ public final class PlanUtils {
}
return getReduceSinkDesc(keyCols, numKeys, valueCols, distinctColIndices,
outputKeyColumnNames, outputValueColumnNames, includeKey, tag,
- partitionCols, order.toString(), numReducers);
+ partitionCols, order.toString(), numReducers, writeType);
}
/**
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java?rev=1624788&r1=1624787&r2=1624788&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java Sat Sep 13 22:09:31 2014
@@ -23,6 +23,7 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
/**
@@ -91,6 +92,9 @@ public class ReduceSinkDesc extends Abst
private boolean skipTag; // Skip writing tags when feeding into mapjoin hashtable
private Boolean autoParallel = null; // Is reducer auto-parallelism enabled, disabled or unset
+ // Write type, since this needs to calculate buckets differently for updates and deletes
+ private AcidUtils.Operation writeType;
+
private static transient Log LOG = LogFactory.getLog(ReduceSinkDesc.class);
public ReduceSinkDesc() {
}
@@ -102,7 +106,8 @@ public class ReduceSinkDesc extends Abst
List<List<Integer>> distinctColumnIndices,
ArrayList<String> outputValueColumnNames, int tag,
ArrayList<ExprNodeDesc> partitionCols, int numReducers,
- final TableDesc keySerializeInfo, final TableDesc valueSerializeInfo) {
+ final TableDesc keySerializeInfo, final TableDesc valueSerializeInfo,
+ AcidUtils.Operation writeType) {
this.keyCols = keyCols;
this.numDistributionKeys = numDistributionKeys;
this.valueCols = valueCols;
@@ -116,6 +121,7 @@ public class ReduceSinkDesc extends Abst
this.distinctColumnIndices = distinctColumnIndices;
this.setNumBuckets(-1);
this.setBucketCols(null);
+ this.writeType = writeType;
}
@Override
@@ -367,4 +373,8 @@ public class ReduceSinkDesc extends Abst
this.autoParallel = autoParallel;
}
}
+
+ public AcidUtils.Operation getWriteType() {
+ return writeType;
+ }
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java?rev=1624788&r1=1624787&r2=1624788&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java Sat Sep 13 22:09:31 2014
@@ -208,6 +208,11 @@ public class SessionState {
private String hdfsScratchDirURIString;
/**
+ * Next value to use in naming a temporary table created by an insert...values statement
+ */
+ private int nextValueTempTableSuffix = 1;
+
+ /**
* Transaction manager to use for this session. This is instantiated lazily by
* {@link #initTxnMgr(org.apache.hadoop.hive.conf.HiveConf)}
*/
@@ -1341,4 +1346,12 @@ public class SessionState {
this.userIpAddress = userIpAddress;
}
+ /**
+ * Get the next suffix to use in naming a temporary table created by insert...values
+ * @return suffix
+ */
+ public String getNextValuesTempTableSuffix() {
+ return Integer.toString(nextValueTempTableSuffix++);
+ }
+
}
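Combined with the VALUES_TMP_TABLE_NAME_PREFIX constant added to SemanticAnalyzer earlier in this commit, the per-session counter yields names like Values__Tmp__Table__1, Values__Tmp__Table__2, and so on. A minimal sketch of that naming, assuming one statement at a time per session (the class below is a stand-in, not SessionState):

    public class TempTableNamingSketch {
      static final String PREFIX = "Values__Tmp__Table__";

      // Per-session counter; each insert...values statement gets the next suffix.
      private int nextSuffix = 1;

      String nextTempTableName() {
        return PREFIX + nextSuffix++;
      }

      public static void main(String[] args) {
        TempTableNamingSketch session = new TempTableNamingSketch();
        System.out.println(session.nextTempTableName());   // Values__Tmp__Table__1
        System.out.println(session.nextTempTableName());   // Values__Tmp__Table__2
      }
    }
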
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/UDFToInteger.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/UDFToInteger.java?rev=1624788&r1=1624787&r2=1624788&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/UDFToInteger.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/UDFToInteger.java Sat Sep 13 22:09:31 2014
@@ -23,6 +23,7 @@ import org.apache.hadoop.hive.ql.exec.ve
import org.apache.hadoop.hive.ql.exec.vector.expressions.CastDecimalToLong;
import org.apache.hadoop.hive.ql.exec.vector.expressions.gen.CastDoubleToLong;
import org.apache.hadoop.hive.ql.exec.vector.expressions.gen.CastTimestampToLongViaLongToLong;
+import org.apache.hadoop.hive.ql.io.RecordIdentifier;
import org.apache.hadoop.hive.serde2.io.ByteWritable;
import org.apache.hadoop.hive.serde2.io.DoubleWritable;
import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
@@ -204,4 +205,19 @@ public class UDFToInteger extends UDF {
}
}
+ /**
+ * Convert a RecordIdentifier. This is done so that we can use the RecordIdentifier in place
+ * of the bucketing column.
+ * @param i RecordIdentifier to convert
+ * @return value of the bucket identifier
+ */
+ public IntWritable evaluate(RecordIdentifier i) {
+ if (i == null) {
+ return null;
+ } else {
+ intWritable.set(i.getBucketId());
+ return intWritable;
+ }
+ }
+
}
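The new evaluate overload means that casting ROW__ID to int yields the row's bucket id, which is what the update/delete bucketing path in SemanticAnalyzer relies on. A hedged sketch of the same mapping using a toy record id type (the real RecordIdentifier API is not assumed here):

    public class RowIdBucketSketch {
      // Toy stand-in for RecordIdentifier: transaction id, bucket id, and row id.
      static final class RowId {
        final long transactionId;
        final int bucketId;
        final long rowId;

        RowId(long transactionId, int bucketId, long rowId) {
          this.transactionId = transactionId;
          this.bucketId = bucketId;
          this.rowId = rowId;
        }
      }

      // Mirrors the UDF: the int value of a row id is its bucket id; null stays null.
      static Integer toInteger(RowId id) {
        return id == null ? null : id.bucketId;
      }

      public static void main(String[] args) {
        System.out.println(toInteger(new RowId(42L, 3, 1007L)));   // 3
        System.out.println(toInteger(null));                       // null
      }
    }
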
Modified: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java?rev=1624788&r1=1624787&r2=1624788&view=diff
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java (original)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java Sat Sep 13 22:09:31 2014
@@ -37,6 +37,7 @@ import org.apache.hadoop.hive.ql.DriverC
import org.apache.hadoop.hive.ql.WindowsPathUtil;
import org.apache.hadoop.hive.ql.exec.mr.ExecDriver;
import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.Table;
@@ -137,7 +138,7 @@ public class TestExecDriver extends Test
db.dropTable(MetaStoreUtils.DEFAULT_DATABASE_NAME, src, true, true);
db.createTable(src, cols, null, TextInputFormat.class,
IgnoreKeyTextOutputFormat.class);
- db.loadTable(hadoopDataFile[i], src, false, false, true, false);
+ db.loadTable(hadoopDataFile[i], src, false, false, true, false, false);
i++;
}
@@ -246,7 +247,7 @@ public class TestExecDriver extends Test
Operator<ReduceSinkDesc> op1 = OperatorFactory.get(PlanUtils
.getReduceSinkDesc(Utilities.makeList(getStringColumn("key")),
Utilities.makeList(getStringColumn("value")), outputColumns, true,
- -1, 1, -1));
+ -1, 1, -1, AcidUtils.Operation.NOT_ACID));
addMapWork(mr, src, "a", op1);
ReduceWork rWork = new ReduceWork();
@@ -276,7 +277,7 @@ public class TestExecDriver extends Test
.getReduceSinkDesc(Utilities.makeList(getStringColumn("key")),
Utilities
.makeList(getStringColumn("key"), getStringColumn("value")),
- outputColumns, false, -1, 1, -1));
+ outputColumns, false, -1, 1, -1, AcidUtils.Operation.NOT_ACID));
addMapWork(mr, src, "a", op1);
ReduceWork rWork = new ReduceWork();
@@ -310,14 +311,14 @@ public class TestExecDriver extends Test
Operator<ReduceSinkDesc> op1 = OperatorFactory.get(PlanUtils
.getReduceSinkDesc(Utilities.makeList(getStringColumn("key")),
Utilities.makeList(getStringColumn("value")), outputColumns, true,
- Byte.valueOf((byte) 0), 1, -1));
+ Byte.valueOf((byte) 0), 1, -1, AcidUtils.Operation.NOT_ACID));
addMapWork(mr, src, "a", op1);
Operator<ReduceSinkDesc> op2 = OperatorFactory.get(PlanUtils
.getReduceSinkDesc(Utilities.makeList(getStringColumn("key")),
Utilities.makeList(getStringColumn("key")), outputColumns, true,
- Byte.valueOf((byte) 1), Integer.MAX_VALUE, -1));
+ Byte.valueOf((byte) 1), Integer.MAX_VALUE, -1, AcidUtils.Operation.NOT_ACID));
addMapWork(mr, src2, "b", op2);
ReduceWork rWork = new ReduceWork();
@@ -353,7 +354,7 @@ public class TestExecDriver extends Test
Operator<ReduceSinkDesc> op1 = OperatorFactory.get(PlanUtils
.getReduceSinkDesc(Utilities.makeList(getStringColumn("tkey")),
Utilities.makeList(getStringColumn("tkey"),
- getStringColumn("tvalue")), outputColumns, false, -1, 1, -1));
+ getStringColumn("tvalue")), outputColumns, false, -1, 1, -1, AcidUtils.Operation.NOT_ACID));
Operator<ScriptDesc> op0 = OperatorFactory.get(new ScriptDesc("cat",
PlanUtils.getDefaultTableDesc("" + Utilities.tabCode, "key,value"),
@@ -398,7 +399,7 @@ public class TestExecDriver extends Test
Operator<ReduceSinkDesc> op0 = OperatorFactory.get(PlanUtils
.getReduceSinkDesc(Utilities.makeList(getStringColumn("0")), Utilities
.makeList(getStringColumn("0"), getStringColumn("1")),
- outputColumns, false, -1, 1, -1));
+ outputColumns, false, -1, 1, -1, AcidUtils.Operation.NOT_ACID));
Operator<SelectDesc> op4 = OperatorFactory.get(new SelectDesc(Utilities
.makeList(getStringColumn("key"), getStringColumn("value")),
@@ -432,7 +433,7 @@ public class TestExecDriver extends Test
Operator<ReduceSinkDesc> op1 = OperatorFactory.get(PlanUtils
.getReduceSinkDesc(Utilities.makeList(getStringColumn("tkey")),
Utilities.makeList(getStringColumn("tkey"),
- getStringColumn("tvalue")), outputColumns, false, -1, 1, -1));
+ getStringColumn("tvalue")), outputColumns, false, -1, 1, -1, AcidUtils.Operation.NOT_ACID));
Operator<ScriptDesc> op0 = OperatorFactory.get(new ScriptDesc(
"\'cat\'", PlanUtils.getDefaultTableDesc("" + Utilities.tabCode,
Added: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/parse/TestUpdateDeleteSemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/parse/TestUpdateDeleteSemanticAnalyzer.java?rev=1624788&view=auto
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/parse/TestUpdateDeleteSemanticAnalyzer.java (added)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/parse/TestUpdateDeleteSemanticAnalyzer.java Sat Sep 13 22:09:31 2014
@@ -0,0 +1,323 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse;
+
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import junit.framework.Assert;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.QueryPlan;
+import org.apache.hadoop.hive.ql.exec.ExplainTask;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.plan.ExplainWork;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+public class TestUpdateDeleteSemanticAnalyzer {
+
+ static final private Log LOG = LogFactory.getLog(TestUpdateDeleteSemanticAnalyzer.class.getName());
+
+ private HiveConf conf;
+ private Hive db;
+
+ // All of the insert, update, and delete tests assume two tables, T and U, each with columns a
+ // and b. U is partitioned by an additional column ds. These are created by parseAndAnalyze()
+ // and removed by cleanupTables().
+
+ @Test
+ public void testInsertSelect() throws Exception {
+ try {
+ ReturnInfo rc = parseAndAnalyze("insert into table T select a, b from U", "testInsertSelect");
+
+ LOG.info(explain((SemanticAnalyzer)rc.sem, rc.plan, rc.ast.dump()));
+
+ } finally {
+ cleanupTables();
+ }
+ }
+
+ @Test
+ public void testDeleteAllNonPartitioned() throws Exception {
+ try {
+ ReturnInfo rc = parseAndAnalyze("delete from T", "testDeleteAllNonPartitioned");
+ LOG.info(explain((SemanticAnalyzer)rc.sem, rc.plan, rc.ast.dump()));
+ } finally {
+ cleanupTables();
+ }
+ }
+
+ @Test
+ public void testDeleteWhereNoPartition() throws Exception {
+ try {
+ ReturnInfo rc = parseAndAnalyze("delete from T where a > 5", "testDeleteWhereNoPartition");
+ LOG.info(explain((SemanticAnalyzer)rc.sem, rc.plan, rc.ast.dump()));
+ } finally {
+ cleanupTables();
+ }
+ }
+
+ @Test
+ public void testDeleteAllPartitioned() throws Exception {
+ try {
+ ReturnInfo rc = parseAndAnalyze("delete from U", "testDeleteAllPartitioned");
+ LOG.info(explain((SemanticAnalyzer)rc.sem, rc.plan, rc.ast.dump()));
+ } finally {
+ cleanupTables();
+ }
+ }
+
+ @Test
+ public void testDeleteAllWherePartitioned() throws Exception {
+ try {
+ ReturnInfo rc = parseAndAnalyze("delete from U where a > 5", "testDeleteAllWherePartitioned");
+ LOG.info(explain((SemanticAnalyzer)rc.sem, rc.plan, rc.ast.dump()));
+ } finally {
+ cleanupTables();
+ }
+ }
+
+ @Test
+ public void testDeleteOnePartition() throws Exception {
+ try {
+ ReturnInfo rc = parseAndAnalyze("delete from U where ds = 'today'",
+ "testDeleteFromPartitionOnly");
+ LOG.info(explain((SemanticAnalyzer)rc.sem, rc.plan, rc.ast.dump()));
+ } finally {
+ cleanupTables();
+ }
+ }
+
+ @Test
+ public void testDeleteOnePartitionWhere() throws Exception {
+ try {
+ ReturnInfo rc = parseAndAnalyze("delete from U where ds = 'today' and a > 5",
+ "testDeletePartitionWhere");
+ LOG.info(explain((SemanticAnalyzer)rc.sem, rc.plan, rc.ast.dump()));
+ } finally {
+ cleanupTables();
+ }
+ }
+
+ @Test
+ public void testUpdateAllNonPartitioned() throws Exception {
+ try {
+ ReturnInfo rc = parseAndAnalyze("update T set a = 5", "testUpdateAllNonPartitioned");
+ LOG.info(explain((SemanticAnalyzer)rc.sem, rc.plan, rc.ast.dump()));
+ } finally {
+ cleanupTables();
+ }
+ }
+
+ @Test
+ public void testUpdateAllNonPartitionedWhere() throws Exception {
+ try {
+ ReturnInfo rc = parseAndAnalyze("update T set a = 5 where b > 5",
+ "testUpdateAllNonPartitionedWhere");
+ LOG.info(explain((SemanticAnalyzer)rc.sem, rc.plan, rc.ast.dump()));
+ } finally {
+ cleanupTables();
+ }
+ }
+
+ @Test
+ public void testUpdateAllPartitioned() throws Exception {
+ try {
+ ReturnInfo rc = parseAndAnalyze("update U set a = 5", "testUpdateAllPartitioned");
+ LOG.info(explain((SemanticAnalyzer)rc.sem, rc.plan, rc.ast.dump()));
+ } finally {
+ cleanupTables();
+ }
+ }
+
+ @Test
+ public void testUpdateAllPartitionedWhere() throws Exception {
+ try {
+ ReturnInfo rc = parseAndAnalyze("update U set a = 5 where b > 5",
+ "testUpdateAllPartitionedWhere");
+ LOG.info(explain((SemanticAnalyzer)rc.sem, rc.plan, rc.ast.dump()));
+ } finally {
+ cleanupTables();
+ }
+ }
+
+ @Test
+ public void testUpdateOnePartition() throws Exception {
+ try {
+ ReturnInfo rc = parseAndAnalyze("update U set a = 5 where ds = 'today'",
+ "testUpdateOnePartition");
+ LOG.info(explain((SemanticAnalyzer)rc.sem, rc.plan, rc.ast.dump()));
+ } finally {
+ cleanupTables();
+ }
+ }
+
+ @Test
+ public void testUpdateOnePartitionWhere() throws Exception {
+ try {
+ ReturnInfo rc = parseAndAnalyze("update U set a = 5 where ds = 'today' and b > 5",
+ "testUpdateOnePartitionWhere");
+ LOG.info(explain((SemanticAnalyzer)rc.sem, rc.plan, rc.ast.dump()));
+ } finally {
+ cleanupTables();
+ }
+ }
+
+ @Test
+ public void testInsertValues() throws Exception {
+ try {
+ ReturnInfo rc = parseAndAnalyze("insert into table T values ('abc', 3), ('ghi', 5)",
+ "testInsertValues");
+
+ LOG.info(explain((SemanticAnalyzer)rc.sem, rc.plan, rc.ast.dump()));
+
+ } finally {
+ cleanupTables();
+ }
+ }
+
+ @Test
+ public void testInsertValuesPartitioned() throws Exception {
+ try {
+ ReturnInfo rc = parseAndAnalyze("insert into table U partition (ds) values " +
+ "('abc', 3, 'today'), ('ghi', 5, 'tomorrow')",
+ "testInsertValuesPartitioned");
+
+ LOG.info(explain((SemanticAnalyzer) rc.sem, rc.plan, rc.ast.dump()));
+
+ } finally {
+ cleanupTables();
+ }
+ }
+
+ @Before
+ public void setup() {
+ conf = new HiveConf();
+ conf.setVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict");
+ conf.setVar(HiveConf.ConfVars.HIVE_TXN_MANAGER, "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager");
+ }
+
+ public void cleanupTables() throws HiveException {
+ if (db != null) {
+ db.dropTable("T");
+ db.dropTable("U");
+ }
+ }
+
+ private class ReturnInfo {
+ ASTNode ast;
+ BaseSemanticAnalyzer sem;
+ QueryPlan plan;
+
+ ReturnInfo(ASTNode a, BaseSemanticAnalyzer s, QueryPlan p) {
+ ast = a;
+ sem = s;
+ plan = p;
+ }
+ }
+
+ private ReturnInfo parseAndAnalyze(String query, String testName)
+ throws IOException, ParseException, HiveException {
+
+ SessionState.start(conf);
+ Context ctx = new Context(conf);
+ ctx.setCmd(query);
+ ctx.setHDFSCleanup(true);
+
+ ParseDriver pd = new ParseDriver();
+ ASTNode tree = pd.parse(query, ctx);
+ tree = ParseUtils.findRootNonNullToken(tree);
+
+ BaseSemanticAnalyzer sem = SemanticAnalyzerFactory.get(conf, tree);
+ SessionState.get().initTxnMgr(conf);
+ db = sem.getDb();
+
+ // I have to create the tables here (rather than in setup()) because I need the Hive
+ // connection, which is conveniently created by the semantic analyzer.
+ db.createTable("T", Arrays.asList("a", "b"), null, OrcInputFormat.class, OrcOutputFormat.class);
+ db.createTable("U", Arrays.asList("a", "b"), Arrays.asList("ds"), OrcInputFormat.class,
+ OrcOutputFormat.class);
+ Table u = db.getTable("U");
+ Map<String, String> partVals = new HashMap<String, String>(2);
+ partVals.put("ds", "yesterday");
+ db.createPartition(u, partVals);
+ partVals.clear();
+ partVals.put("ds", "today");
+ db.createPartition(u, partVals);
+ sem.analyze(tree, ctx);
+ // validate the plan
+ sem.validate();
+
+ QueryPlan plan = new QueryPlan(query, sem, 0L, testName);
+
+ return new ReturnInfo(tree, sem, plan);
+ }
+
+ private String explain(SemanticAnalyzer sem, QueryPlan plan, String astStringTree) throws
+ IOException {
+ FileSystem fs = FileSystem.get(conf);
+ File f = File.createTempFile("TestSemanticAnalyzer", "explain");
+ Path tmp = new Path(f.getPath());
+ fs.create(tmp);
+ fs.deleteOnExit(tmp);
+ ExplainWork work = new ExplainWork(tmp, sem.getParseContext(), sem.getRootTasks(),
+ sem.getFetchTask(), astStringTree, sem, true, false, false, false, false);
+ ExplainTask task = new ExplainTask();
+ task.setWork(work);
+ task.initialize(conf, plan, null);
+ task.execute(null);
+ FSDataInputStream in = fs.open(tmp);
+ StringBuilder builder = new StringBuilder();
+ final int bufSz = 4096;
+ byte[] buf = new byte[bufSz];
+ long pos = 0L;
+ while (true) {
+ int bytesRead = in.read(pos, buf, 0, bufSz);
+ if (bytesRead > 0) {
+ pos += bytesRead;
+ builder.append(new String(buf, 0, bytesRead));
+ } else {
+ // Reached end of file
+ in.close();
+ break;
+ }
+ }
+ return builder.toString()
+ .replaceAll("pfile:/.*\n", "pfile:MASKED-OUT\n")
+ .replaceAll("location file:/.*\n", "location file:MASKED-OUT\n")
+ .replaceAll("file:/.*\n", "file:MASKED-OUT\n")
+ .replaceAll("transient_lastDdlTime.*\n", "transient_lastDdlTime MASKED-OUT\n");
+ }
+}
Added: hive/trunk/ql/src/test/queries/clientnegative/acid_overwrite.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientnegative/acid_overwrite.q?rev=1624788&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientnegative/acid_overwrite.q (added)
+++ hive/trunk/ql/src/test/queries/clientnegative/acid_overwrite.q Sat Sep 13 22:09:31 2014
@@ -0,0 +1,9 @@
+set hive.support.concurrency=true;
+set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
+set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
+set hive.enforce.bucketing=true;
+
+create table acid_uanp(a int, b varchar(128)) clustered by (a) into 2 buckets stored as orc;
+
+insert into table acid_uanp select cint, cast(cstring1 as varchar(128)) from alltypesorc where cint < 0 order by cint limit 10;
+insert overwrite table acid_uanp select cint, cast(cstring1 as varchar(128)) from alltypesorc;
Added: hive/trunk/ql/src/test/queries/clientnegative/delete_not_acid.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientnegative/delete_not_acid.q?rev=1624788&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientnegative/delete_not_acid.q (added)
+++ hive/trunk/ql/src/test/queries/clientnegative/delete_not_acid.q Sat Sep 13 22:09:31 2014
@@ -0,0 +1,6 @@
+set hive.support.concurrency=true;
+set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager;
+
+create table foo(a int, b varchar(128)) clustered by (a) into 1 buckets stored as orc;
+
+delete from foo;
Added: hive/trunk/ql/src/test/queries/clientnegative/update_not_acid.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientnegative/update_not_acid.q?rev=1624788&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientnegative/update_not_acid.q (added)
+++ hive/trunk/ql/src/test/queries/clientnegative/update_not_acid.q Sat Sep 13 22:09:31 2014
@@ -0,0 +1,6 @@
+set hive.support.concurrency=true;
+set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager;
+
+create table foo(a int, b varchar(128)) clustered by (a) into 1 buckets stored as orc;
+
+update foo set b = 'fred';
Added: hive/trunk/ql/src/test/queries/clientnegative/update_partition_col.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientnegative/update_partition_col.q?rev=1624788&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientnegative/update_partition_col.q (added)
+++ hive/trunk/ql/src/test/queries/clientnegative/update_partition_col.q Sat Sep 13 22:09:31 2014
@@ -0,0 +1,8 @@
+set hive.support.concurrency=true;
+set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
+set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
+set hive.enforce.bucketing=true;
+
+create table foo(a int, b varchar(128)) partitioned by (ds string) clustered by (a) into 2 buckets stored as orc;
+
+update foo set ds = 'fred';
Added: hive/trunk/ql/src/test/queries/clientpositive/delete_all_non_partitioned.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/delete_all_non_partitioned.q?rev=1624788&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/delete_all_non_partitioned.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/delete_all_non_partitioned.q Sat Sep 13 22:09:31 2014
@@ -0,0 +1,17 @@
+set hive.support.concurrency=true;
+set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
+set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
+set hive.enforce.bucketing=true;
+set hive.exec.reducers.max = 1;
+
+create table acid_danp(a int, b varchar(128)) clustered by (a) into 2 buckets stored as orc;
+
+insert into table acid_danp select cint, cast(cstring1 as varchar(128)) from alltypesorc where cint < 0 order by cint limit 10;
+
+select a,b from acid_danp order by a;
+
+delete from acid_danp;
+
+select a,b from acid_danp;
+
+
Added: hive/trunk/ql/src/test/queries/clientpositive/delete_all_partitioned.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/delete_all_partitioned.q?rev=1624788&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/delete_all_partitioned.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/delete_all_partitioned.q Sat Sep 13 22:09:31 2014
@@ -0,0 +1,16 @@
+set hive.support.concurrency=true;
+set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
+set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
+set hive.enforce.bucketing=true;
+set hive.mapred.supports.subdirectories=true;
+
+create table acid_dap(a int, b varchar(128)) partitioned by (ds string) clustered by (a) into 2 buckets stored as orc;
+
+insert into table acid_dap partition (ds='today') select cint, cast(cstring1 as varchar(128)) from alltypesorc where cint is not null and cint < 0 order by cint limit 10;
+insert into table acid_dap partition (ds='tomorrow') select cint, cast(cstring1 as varchar(128)) from alltypesorc where cint is not null and cint > 1000 order by cint limit 10;
+
+select a,b,ds from acid_dap order by a,b;
+
+delete from acid_dap;
+
+select * from acid_dap;
Added: hive/trunk/ql/src/test/queries/clientpositive/delete_orig_table.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/delete_orig_table.q?rev=1624788&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/delete_orig_table.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/delete_orig_table.q Sat Sep 13 22:09:31 2014
@@ -0,0 +1,29 @@
+set hive.support.concurrency=true;
+set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
+set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
+set hive.enforce.bucketing=true;
+
+dfs ${system:test.dfs.mkdir} ${system:test.tmp.dir}/delete_orig_table;
+dfs -copyFromLocal ../../data/files/alltypesorc ${system:test.tmp.dir}/delete_orig_table/00000_0;
+
+create table acid_dot(
+ ctinyint TINYINT,
+ csmallint SMALLINT,
+ cint INT,
+ cbigint BIGINT,
+ cfloat FLOAT,
+ cdouble DOUBLE,
+ cstring1 STRING,
+ cstring2 STRING,
+ ctimestamp1 TIMESTAMP,
+ ctimestamp2 TIMESTAMP,
+ cboolean1 BOOLEAN,
+ cboolean2 BOOLEAN) clustered by (cint) into 1 buckets stored as orc location '${system:test.tmp.dir}/delete_orig_table';
+
+select count(*) from acid_dot;
+
+delete from acid_dot where cint < -1070551679;
+
+select count(*) from acid_dot;
+
+dfs -rmr ${system:test.tmp.dir}/delete_orig_table;
Added: hive/trunk/ql/src/test/queries/clientpositive/delete_tmp_table.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/delete_tmp_table.q?rev=1624788&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/delete_tmp_table.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/delete_tmp_table.q Sat Sep 13 22:09:31 2014
@@ -0,0 +1,16 @@
+set hive.support.concurrency=true;
+set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
+set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
+set hive.enforce.bucketing=true;
+
+create temporary table acid_dtt(a int, b varchar(128)) clustered by (a) into 2 buckets stored as orc;
+
+insert into table acid_dtt select cint, cast(cstring1 as varchar(128)) from alltypesorc where cint is not null order by cint limit 10;
+
+select * from acid_dtt order by a;
+
+delete from acid_dtt where b = '0ruyd6Y50JpdGRf6HqD' or b = '2uLyD28144vklju213J1mr';
+
+select a,b from acid_dtt order by b;
+
+
Added: hive/trunk/ql/src/test/queries/clientpositive/delete_where_no_match.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/delete_where_no_match.q?rev=1624788&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/delete_where_no_match.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/delete_where_no_match.q Sat Sep 13 22:09:31 2014
@@ -0,0 +1,16 @@
+set hive.support.concurrency=true;
+set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
+set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
+set hive.enforce.bucketing=true;
+
+create table acid_dwnm(a int, b varchar(128)) clustered by (a) into 2 buckets stored as orc;
+
+insert into table acid_dwnm select cint, cast(cstring1 as varchar(128)) from alltypesorc where cint is not null order by cint limit 10;
+
+select * from acid_dwnm order by a;
+
+delete from acid_dwnm where b = 'nosuchvalue';
+
+select a,b from acid_dwnm order by b;
+
+
Added: hive/trunk/ql/src/test/queries/clientpositive/delete_where_non_partitioned.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/delete_where_non_partitioned.q?rev=1624788&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/delete_where_non_partitioned.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/delete_where_non_partitioned.q Sat Sep 13 22:09:31 2014
@@ -0,0 +1,16 @@
+set hive.support.concurrency=true;
+set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
+set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
+set hive.enforce.bucketing=true;
+
+create table acid_dwnp(a int, b varchar(128)) clustered by (a) into 2 buckets stored as orc;
+
+insert into table acid_dwnp select cint, cast(cstring1 as varchar(128)) from alltypesorc where cint is not null order by cint limit 10;
+
+select * from acid_dwnp order by a;
+
+delete from acid_dwnp where b = '0ruyd6Y50JpdGRf6HqD';
+
+select a,b from acid_dwnp order by b;
+
+
Added: hive/trunk/ql/src/test/queries/clientpositive/delete_where_partitioned.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/delete_where_partitioned.q?rev=1624788&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/delete_where_partitioned.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/delete_where_partitioned.q Sat Sep 13 22:09:31 2014
@@ -0,0 +1,16 @@
+set hive.support.concurrency=true;
+set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
+set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
+set hive.enforce.bucketing=true;
+set hive.mapred.supports.subdirectories=true;
+
+create table acid_dwp(a int, b varchar(128)) partitioned by (ds string) clustered by (a) into 2 buckets stored as orc;
+
+insert into table acid_dwp partition (ds='today') select cint, cast(cstring1 as varchar(128)) from alltypesorc where cint is not null and cint < 0 order by cint limit 10;
+insert into table acid_dwp partition (ds='tomorrow') select cint, cast(cstring1 as varchar(128)) from alltypesorc where cint is not null and cint > -10000000 order by cint limit 10;
+
+select a,b,ds from acid_dwp order by a, ds;
+
+delete from acid_dwp where a = '-1071363017';
+
+select * from acid_dwp order by a, ds;