Posted to commits@hive.apache.org by br...@apache.org on 2014/10/06 00:26:58 UTC
svn commit: r1629544 [10/33] - in /hive/branches/spark-new: ./
accumulo-handler/ beeline/ beeline/src/java/org/apache/hive/beeline/ bin/
bin/ext/ common/ common/src/java/org/apache/hadoop/hive/conf/
common/src/test/org/apache/hadoop/hive/common/type/ c...
Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java?rev=1629544&r1=1629543&r2=1629544&view=diff
==============================================================================
--- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java (original)
+++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java Sun Oct 5 22:26:43 2014
@@ -71,7 +71,6 @@ import org.apache.hadoop.hive.ql.plan.Pl
import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
-import org.apache.hadoop.io.IntWritable;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -85,6 +84,7 @@ import com.google.common.collect.Maps;
*/
public class SortedDynPartitionOptimizer implements Transform {
+ private static final String BUCKET_NUMBER_COL_NAME = "_bucket_number";
@Override
public ParseContext transform(ParseContext pCtx) throws SemanticException {
@@ -216,6 +216,13 @@ public class SortedDynPartitionOptimizer
ReduceSinkDesc rsConf = getReduceSinkDesc(partitionPositions, sortPositions, sortOrder,
newValueCols, bucketColumns, numBuckets, fsParent, fsOp.getConf().getWriteType());
+ if (!bucketColumns.isEmpty()) {
+ String tableAlias = outRR.getColumnInfos().get(0).getTabAlias();
+ ColumnInfo ci = new ColumnInfo(BUCKET_NUMBER_COL_NAME, TypeInfoFactory.stringTypeInfo,
+ tableAlias, true, true);
+ outRR.put(tableAlias, BUCKET_NUMBER_COL_NAME, ci);
+ }
+
// Create ReduceSink operator
ReduceSinkOperator rsOp = (ReduceSinkOperator) putOpInsertMap(
OperatorFactory.getAndMakeChild(rsConf, new RowSchema(outRR.getColumnInfos()), fsParent),
@@ -380,8 +387,11 @@ public class SortedDynPartitionOptimizer
// corresponding with bucket number and hence their OIs
for (Integer idx : keyColsPosInVal) {
if (idx < 0) {
- newKeyCols.add(new ExprNodeConstantDesc(TypeInfoFactory
- .getPrimitiveTypeInfoFromPrimitiveWritable(IntWritable.class), -1));
+ // add bucket number column to both key and value
+ ExprNodeConstantDesc encd = new ExprNodeConstantDesc(TypeInfoFactory.stringTypeInfo,
+ BUCKET_NUMBER_COL_NAME);
+ newKeyCols.add(encd);
+ newValueCols.add(encd);
} else {
newKeyCols.add(newValueCols.get(idx).clone());
}
@@ -395,7 +405,8 @@ public class SortedDynPartitionOptimizer
// should honor the ordering of records provided by ORDER BY in SELECT statement
ReduceSinkOperator parentRSOp = OperatorUtils.findSingleOperatorUpstream(parent,
ReduceSinkOperator.class);
- if (parentRSOp != null) {
+ boolean isOrderBy = parseCtx.getQB().getParseInfo().getDestToOrderBy().size() > 0;
+ if (parentRSOp != null && isOrderBy) {
String parentRSOpOrder = parentRSOp.getConf().getOrder();
if (parentRSOpOrder != null && !parentRSOpOrder.isEmpty() && sortPositions.isEmpty()) {
newKeyCols.addAll(parentRSOp.getConf().getKeyCols());
@@ -417,6 +428,9 @@ public class SortedDynPartitionOptimizer
List<String> outCols = Utilities.getInternalColumnNamesFromSignature(parent.getSchema()
.getSignature());
ArrayList<String> outValColNames = Lists.newArrayList(outCols);
+ if (!bucketColumns.isEmpty()) {
+ outValColNames.add(BUCKET_NUMBER_COL_NAME);
+ }
List<FieldSchema> valFields = PlanUtils.getFieldSchemasFromColumnList(newValueCols,
outValColNames, 0, "");
TableDesc valueTable = PlanUtils.getReduceValueTableDesc(valFields);
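
For illustration only, not part of this commit: a minimal, self-contained sketch of the idea above -- when the destination table is bucketed, a synthetic "_bucket_number" string column is appended to the ReduceSink's value column names so the writer can route rows to bucket files. The column names below are assumptions, not the real plan layout.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class BucketNumberColumnSketch {
      private static final String BUCKET_NUMBER_COL_NAME = "_bucket_number";

      public static void main(String[] args) {
        // hypothetical value columns coming out of the parent operator
        List<String> outValColNames = new ArrayList<>(Arrays.asList("_col0", "_col1"));
        // hypothetical bucketing columns of the destination table
        List<String> bucketColumns = Arrays.asList("_col1");
        if (!bucketColumns.isEmpty()) {
          // mirrors the optimizer change: carry the bucket number as an extra value column
          outValColNames.add(BUCKET_NUMBER_COL_NAME);
        }
        System.out.println(outValColNames);   // [_col0, _col1, _bucket_number]
      }
    }
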
Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/OpTraitsRulesProcFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/OpTraitsRulesProcFactory.java?rev=1629544&r1=1629543&r2=1629544&view=diff
==============================================================================
--- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/OpTraitsRulesProcFactory.java (original)
+++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/OpTraitsRulesProcFactory.java Sun Oct 5 22:26:43 2014
@@ -23,6 +23,7 @@ import java.util.List;
import java.util.Map.Entry;
import java.util.Stack;
+import org.apache.hadoop.hive.metastore.api.Order;
import org.apache.hadoop.hive.ql.exec.GroupByOperator;
import org.apache.hadoop.hive.ql.exec.JoinOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
@@ -104,7 +105,12 @@ public class OpTraitsRulesProcFactory {
List<List<String>> listBucketCols = new ArrayList<List<String>>();
listBucketCols.add(bucketCols);
- OpTraits opTraits = new OpTraits(listBucketCols, -1);
+ int numBuckets = -1;
+ OpTraits parentOpTraits = rs.getParentOperators().get(0).getConf().getOpTraits();
+ if (parentOpTraits != null) {
+ numBuckets = parentOpTraits.getNumBuckets();
+ }
+ OpTraits opTraits = new OpTraits(listBucketCols, numBuckets, listBucketCols);
rs.setOpTraits(opTraits);
return null;
}
@@ -163,15 +169,21 @@ public class OpTraitsRulesProcFactory {
} catch (HiveException e) {
prunedPartList = null;
}
- boolean bucketMapJoinConvertible = checkBucketedTable(table,
+ boolean isBucketed = checkBucketedTable(table,
opTraitsCtx.getParseContext(), prunedPartList);
- List<List<String>>bucketCols = new ArrayList<List<String>>();
+ List<List<String>> bucketColsList = new ArrayList<List<String>>();
+ List<List<String>> sortedColsList = new ArrayList<List<String>>();
int numBuckets = -1;
- if (bucketMapJoinConvertible) {
- bucketCols.add(table.getBucketCols());
+ if (isBucketed) {
+ bucketColsList.add(table.getBucketCols());
numBuckets = table.getNumBuckets();
+ List<String> sortCols = new ArrayList<String>();
+ for (Order colSortOrder : table.getSortCols()) {
+ sortCols.add(colSortOrder.getCol());
+ }
+ sortedColsList.add(sortCols);
}
- OpTraits opTraits = new OpTraits(bucketCols, numBuckets);
+ OpTraits opTraits = new OpTraits(bucketColsList, numBuckets, sortedColsList);
ts.setOpTraits(opTraits);
return null;
}
@@ -197,7 +209,7 @@ public class OpTraitsRulesProcFactory {
List<List<String>> listBucketCols = new ArrayList<List<String>>();
listBucketCols.add(gbyKeys);
- OpTraits opTraits = new OpTraits(listBucketCols, -1);
+ OpTraits opTraits = new OpTraits(listBucketCols, -1, listBucketCols);
gbyOp.setOpTraits(opTraits);
return null;
}
@@ -205,22 +217,17 @@ public class OpTraitsRulesProcFactory {
public static class SelectRule implements NodeProcessor {
- @Override
- public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
- Object... nodeOutputs) throws SemanticException {
- SelectOperator selOp = (SelectOperator)nd;
- List<List<String>> parentBucketColNames =
- selOp.getParentOperators().get(0).getOpTraits().getBucketColNames();
-
+ public List<List<String>> getConvertedColNames(List<List<String>> parentColNames,
+ SelectOperator selOp) {
List<List<String>> listBucketCols = new ArrayList<List<String>>();
if (selOp.getColumnExprMap() != null) {
- if (parentBucketColNames != null) {
- for (List<String> colNames : parentBucketColNames) {
+ if (parentColNames != null) {
+ for (List<String> colNames : parentColNames) {
List<String> bucketColNames = new ArrayList<String>();
for (String colName : colNames) {
for (Entry<String, ExprNodeDesc> entry : selOp.getColumnExprMap().entrySet()) {
if (entry.getValue() instanceof ExprNodeColumnDesc) {
- if(((ExprNodeColumnDesc)(entry.getValue())).getColumn().equals(colName)) {
+ if (((ExprNodeColumnDesc) (entry.getValue())).getColumn().equals(colName)) {
bucketColNames.add(entry.getKey());
}
}
@@ -231,11 +238,34 @@ public class OpTraitsRulesProcFactory {
}
}
+ return listBucketCols;
+ }
+
+ @Override
+ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+ Object... nodeOutputs) throws SemanticException {
+ SelectOperator selOp = (SelectOperator)nd;
+ List<List<String>> parentBucketColNames =
+ selOp.getParentOperators().get(0).getOpTraits().getBucketColNames();
+
+ List<List<String>> listBucketCols = null;
+ List<List<String>> listSortCols = null;
+ if (selOp.getColumnExprMap() != null) {
+ if (parentBucketColNames != null) {
+ listBucketCols = getConvertedColNames(parentBucketColNames, selOp);
+ }
+ List<List<String>> parentSortColNames = selOp.getParentOperators().get(0).getOpTraits()
+ .getSortCols();
+ if (parentSortColNames != null) {
+ listSortCols = getConvertedColNames(parentSortColNames, selOp);
+ }
+ }
+
int numBuckets = -1;
if (selOp.getParentOperators().get(0).getOpTraits() != null) {
numBuckets = selOp.getParentOperators().get(0).getOpTraits().getNumBuckets();
}
- OpTraits opTraits = new OpTraits(listBucketCols, numBuckets);
+ OpTraits opTraits = new OpTraits(listBucketCols, numBuckets, listSortCols);
selOp.setOpTraits(opTraits);
return null;
}
@@ -248,6 +278,7 @@ public class OpTraitsRulesProcFactory {
Object... nodeOutputs) throws SemanticException {
JoinOperator joinOp = (JoinOperator)nd;
List<List<String>> bucketColsList = new ArrayList<List<String>>();
+ List<List<String>> sortColsList = new ArrayList<List<String>>();
byte pos = 0;
for (Operator<? extends OperatorDesc> parentOp : joinOp.getParentOperators()) {
if (!(parentOp instanceof ReduceSinkOperator)) {
@@ -259,26 +290,24 @@ public class OpTraitsRulesProcFactory {
ReduceSinkRule rsRule = new ReduceSinkRule();
rsRule.process(rsOp, stack, procCtx, nodeOutputs);
}
- bucketColsList.add(getOutputColNames(joinOp, rsOp, pos));
+ bucketColsList.add(getOutputColNames(joinOp, rsOp.getOpTraits().getBucketColNames(), pos));
+ sortColsList.add(getOutputColNames(joinOp, rsOp.getOpTraits().getSortCols(), pos));
pos++;
}
- joinOp.setOpTraits(new OpTraits(bucketColsList, -1));
+ joinOp.setOpTraits(new OpTraits(bucketColsList, -1, bucketColsList));
return null;
}
- private List<String> getOutputColNames(JoinOperator joinOp,
- ReduceSinkOperator rs, byte pos) {
- List<List<String>> parentBucketColNames =
- rs.getOpTraits().getBucketColNames();
-
- if (parentBucketColNames != null) {
+ private List<String> getOutputColNames(JoinOperator joinOp, List<List<String>> parentColNames,
+ byte pos) {
+ if (parentColNames != null) {
List<String> bucketColNames = new ArrayList<String>();
// guaranteed that there is only 1 list within this list because
// a reduce sink always brings down the bucketing cols to a single list.
// may not be true with correlation operators (mux-demux)
- List<String> colNames = parentBucketColNames.get(0);
+ List<String> colNames = parentColNames.get(0);
for (String colName : colNames) {
for (ExprNodeDesc exprNode : joinOp.getConf().getExprs().get(pos)) {
if (exprNode instanceof ExprNodeColumnDesc) {
@@ -317,7 +346,7 @@ public class OpTraitsRulesProcFactory {
@Override
public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
Object... nodeOutputs) throws SemanticException {
- OpTraits opTraits = new OpTraits(null, -1);
+ OpTraits opTraits = new OpTraits(null, -1, null);
@SuppressWarnings("unchecked")
Operator<? extends OperatorDesc> operator = (Operator<? extends OperatorDesc>)nd;
operator.setOpTraits(opTraits);
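
For illustration only, not part of this commit: a small self-contained sketch of the column-name translation that getConvertedColNames() above performs -- parent bucket/sort column names are mapped through the SELECT's column-expression map onto the SELECT's own internal names. The map contents are hypothetical.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    public class ColNameRemapSketch {
      public static void main(String[] args) {
        // SELECT internal name -> parent column it directly references (hypothetical)
        Map<String, String> columnExprMap = new LinkedHashMap<>();
        columnExprMap.put("_col0", "key");
        columnExprMap.put("_col1", "value");

        List<String> parentBucketCols = Arrays.asList("key");   // parent operator's bucketing columns
        List<String> converted = new ArrayList<>();
        for (String parentCol : parentBucketCols) {
          for (Map.Entry<String, String> e : columnExprMap.entrySet()) {
            if (e.getValue().equals(parentCol)) {
              converted.add(e.getKey());                        // keep the SELECT-side name
            }
          }
        }
        System.out.println(converted);                          // [_col0]
      }
    }
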
Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CrossProductCheck.java
URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CrossProductCheck.java?rev=1629544&r1=1629543&r2=1629544&view=diff
==============================================================================
--- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CrossProductCheck.java (original)
+++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CrossProductCheck.java Sun Oct 5 22:26:43 2014
@@ -32,6 +32,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.ql.exec.AbstractMapJoinOperator;
import org.apache.hadoop.hive.ql.exec.ConditionalTask;
+import org.apache.hadoop.hive.ql.exec.CommonMergeJoinOperator;
import org.apache.hadoop.hive.ql.exec.JoinOperator;
import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
@@ -56,6 +57,7 @@ import org.apache.hadoop.hive.ql.plan.Ex
import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.MergeJoinWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
import org.apache.hadoop.hive.ql.plan.ReduceWork;
@@ -152,6 +154,11 @@ public class CrossProductCheck implement
private void checkMapJoins(TezWork tzWrk) throws SemanticException {
for(BaseWork wrk : tzWrk.getAllWork() ) {
+
+ if ( wrk instanceof MergeJoinWork ) {
+ wrk = ((MergeJoinWork)wrk).getMainWork();
+ }
+
List<String> warnings = new MapJoinCheck(wrk.getName()).analyze(wrk);
if ( !warnings.isEmpty() ) {
for(String w : warnings) {
@@ -163,12 +170,17 @@ public class CrossProductCheck implement
private void checkTezReducer(TezWork tzWrk) throws SemanticException {
for(BaseWork wrk : tzWrk.getAllWork() ) {
- if ( !(wrk instanceof ReduceWork) ) {
+
+ if ( wrk instanceof MergeJoinWork ) {
+ wrk = ((MergeJoinWork)wrk).getMainWork();
+ }
+
+ if ( !(wrk instanceof ReduceWork ) ) {
continue;
}
ReduceWork rWork = (ReduceWork) wrk;
Operator<? extends OperatorDesc> reducer = ((ReduceWork)wrk).getReducer();
- if ( reducer instanceof JoinOperator ) {
+ if ( reducer instanceof JoinOperator || reducer instanceof CommonMergeJoinOperator ) {
Map<Integer, ExtractReduceSinkInfo.Info> rsInfo =
new HashMap<Integer, ExtractReduceSinkInfo.Info>();
for(Map.Entry<Integer, String> e : rWork.getTagToInput().entrySet()) {
@@ -185,7 +197,7 @@ public class CrossProductCheck implement
return;
}
Operator<? extends OperatorDesc> reducer = rWrk.getReducer();
- if ( reducer instanceof JoinOperator ) {
+ if ( reducer instanceof JoinOperator|| reducer instanceof CommonMergeJoinOperator ) {
BaseWork prntWork = mrWrk.getMapWork();
checkForCrossProduct(taskName, reducer,
new ExtractReduceSinkInfo(null).analyze(prntWork));
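
For illustration only, not part of this commit: the unwrap-then-check pattern applied above, where a merge-join vertex is analyzed through its main work. The classes below are stand-ins, not the real Hive plan classes.

    public class MergeJoinUnwrapSketch {
      static class BaseWork {
        final String name;
        BaseWork(String name) { this.name = name; }
      }
      static class MergeJoinWork extends BaseWork {
        final BaseWork mainWork;
        MergeJoinWork(String name, BaseWork mainWork) { super(name); this.mainWork = mainWork; }
        BaseWork getMainWork() { return mainWork; }
      }

      static void checkWork(BaseWork wrk) {
        if (wrk instanceof MergeJoinWork) {
          wrk = ((MergeJoinWork) wrk).getMainWork();   // analyze the wrapped main work instead
        }
        System.out.println("analyzing " + wrk.name);
      }

      public static void main(String[] args) {
        checkWork(new MergeJoinWork("Merge 1", new BaseWork("Reducer 2")));
      }
    }
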
Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/NullScanTaskDispatcher.java
URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/NullScanTaskDispatcher.java?rev=1629544&r1=1629543&r2=1629544&view=diff
==============================================================================
--- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/NullScanTaskDispatcher.java (original)
+++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/NullScanTaskDispatcher.java Sun Oct 5 22:26:43 2014
@@ -102,13 +102,19 @@ public class NullScanTaskDispatcher impl
}
private void processAlias(MapWork work, String alias) {
+ List<String> paths = getPathsForAlias(work, alias);
+ if (paths.isEmpty()) {
+ // partitioned table which don't select any partitions
+ // there are no paths to replace with fakePath
+ return;
+ }
work.setUseOneNullRowInputFormat(true);
// Change the alias partition desc
PartitionDesc aliasPartn = work.getAliasToPartnInfo().get(alias);
changePartitionToMetadataOnly(aliasPartn);
- List<String> paths = getPathsForAlias(work, alias);
+
for (String path : paths) {
PartitionDesc partDesc = work.getPathToPartitionInfo().get(path);
PartitionDesc newPartition = changePartitionToMetadataOnly(partDesc);
Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java?rev=1629544&r1=1629543&r2=1629544&view=diff
==============================================================================
--- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java (original)
+++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java Sun Oct 5 22:26:43 2014
@@ -422,10 +422,12 @@ public class Vectorizer implements Physi
// Check value ObjectInspector.
ObjectInspector valueObjectInspector = reduceWork.getValueObjectInspector();
- if (valueObjectInspector == null || !(valueObjectInspector instanceof StructObjectInspector)) {
+ if (valueObjectInspector == null ||
+ !(valueObjectInspector instanceof StructObjectInspector)) {
return false;
}
- StructObjectInspector valueStructObjectInspector = (StructObjectInspector)valueObjectInspector;
+ StructObjectInspector valueStructObjectInspector =
+ (StructObjectInspector)valueObjectInspector;
valueColCount = valueStructObjectInspector.getAllStructFieldRefs().size();
} catch (Exception e) {
throw new SemanticException(e);
@@ -471,18 +473,20 @@ public class Vectorizer implements Physi
LOG.info("Vectorizing ReduceWork...");
reduceWork.setVectorMode(true);
- // For some reason, the DefaultGraphWalker does not descend down from the reducer Operator as expected.
- // We need to descend down, otherwise it breaks our algorithm that determines VectorizationContext...
- // Do we use PreOrderWalker instead of DefaultGraphWalker.
+ // For some reason, the DefaultGraphWalker does not descend down from the reducer Operator as
+ // expected. We need to descend down, otherwise it breaks our algorithm that determines
+ // VectorizationContext... Do we use PreOrderWalker instead of DefaultGraphWalker.
Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
- ReduceWorkVectorizationNodeProcessor vnp = new ReduceWorkVectorizationNodeProcessor(reduceWork, keyColCount, valueColCount);
+ ReduceWorkVectorizationNodeProcessor vnp =
+ new ReduceWorkVectorizationNodeProcessor(reduceWork, keyColCount, valueColCount);
addReduceWorkRules(opRules, vnp);
Dispatcher disp = new DefaultRuleDispatcher(vnp, opRules, null);
GraphWalker ogw = new PreOrderWalker(disp);
// iterator the reduce operator tree
ArrayList<Node> topNodes = new ArrayList<Node>();
topNodes.add(reduceWork.getReducer());
- LOG.info("vectorizeReduceWork reducer Operator: " + reduceWork.getReducer().getName() + "...");
+ LOG.info("vectorizeReduceWork reducer Operator: " +
+ reduceWork.getReducer().getName() + "...");
HashMap<Node, Object> nodeOutput = new HashMap<Node, Object>();
ogw.startWalking(topNodes, nodeOutput);
@@ -561,7 +565,7 @@ public class Vectorizer implements Physi
protected final Map<String, VectorizationContext> scratchColumnContext =
new HashMap<String, VectorizationContext>();
- protected final Map<Operator<? extends OperatorDesc>, VectorizationContext> vContextsByTSOp =
+ protected final Map<Operator<? extends OperatorDesc>, VectorizationContext> vContextsByOp =
new HashMap<Operator<? extends OperatorDesc>, VectorizationContext>();
protected final Set<Operator<? extends OperatorDesc>> opsDone =
@@ -589,28 +593,30 @@ public class Vectorizer implements Physi
return scratchColumnMap;
}
- public VectorizationContext walkStackToFindVectorizationContext(Stack<Node> stack, Operator<? extends OperatorDesc> op)
- throws SemanticException {
+ public VectorizationContext walkStackToFindVectorizationContext(Stack<Node> stack,
+ Operator<? extends OperatorDesc> op) throws SemanticException {
VectorizationContext vContext = null;
if (stack.size() <= 1) {
- throw new SemanticException(String.format("Expected operator stack for operator %s to have at least 2 operators", op.getName()));
+ throw new SemanticException(
+ String.format("Expected operator stack for operator %s to have at least 2 operators",
+ op.getName()));
}
// Walk down the stack of operators until we found one willing to give us a context.
// At the bottom will be the root operator, guaranteed to have a context
int i= stack.size()-2;
while (vContext == null) {
if (i < 0) {
- throw new SemanticException(String.format("Did not find vectorization context for operator %s in operator stack", op.getName()));
+ return null;
}
Operator<? extends OperatorDesc> opParent = (Operator<? extends OperatorDesc>) stack.get(i);
- vContext = vContextsByTSOp.get(opParent);
+ vContext = vContextsByOp.get(opParent);
--i;
}
return vContext;
}
- public Operator<? extends OperatorDesc> doVectorize(Operator<? extends OperatorDesc> op, VectorizationContext vContext)
- throws SemanticException {
+ public Operator<? extends OperatorDesc> doVectorize(Operator<? extends OperatorDesc> op,
+ VectorizationContext vContext) throws SemanticException {
Operator<? extends OperatorDesc> vectorOp = op;
try {
if (!opsDone.contains(op)) {
@@ -622,7 +628,7 @@ public class Vectorizer implements Physi
if (vectorOp instanceof VectorizationContextRegion) {
VectorizationContextRegion vcRegion = (VectorizationContextRegion) vectorOp;
VectorizationContext vOutContext = vcRegion.getOuputVectorizationContext();
- vContextsByTSOp.put(op, vOutContext);
+ vContextsByOp.put(op, vOutContext);
scratchColumnContext.put(vOutContext.getFileKey(), vOutContext);
}
}
@@ -669,13 +675,24 @@ public class Vectorizer implements Physi
//
vContext.setFileKey(onefile);
scratchColumnContext.put(onefile, vContext);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Vectorized MapWork operator " + op.getName() +
+ " with vectorization context key=" + vContext.getFileKey() +
+ ", vectorTypes: " + vContext.getOutputColumnTypeMap().toString() +
+ ", columnMap: " + vContext.getColumnMap().toString());
+ }
break;
}
}
}
- vContextsByTSOp.put(op, vContext);
+ vContextsByOp.put(op, vContext);
} else {
vContext = walkStackToFindVectorizationContext(stack, op);
+ if (vContext == null) {
+ throw new SemanticException(
+ String.format("Did not find vectorization context for operator %s in operator stack",
+ op.getName()));
+ }
}
assert vContext != null;
@@ -690,7 +707,22 @@ public class Vectorizer implements Physi
return null;
}
- doVectorize(op, vContext);
+ Operator<? extends OperatorDesc> vectorOp = doVectorize(op, vContext);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Vectorized MapWork operator " + vectorOp.getName() +
+ " with vectorization context key=" + vContext.getFileKey() +
+ ", vectorTypes: " + vContext.getOutputColumnTypeMap().toString() +
+ ", columnMap: " + vContext.getColumnMap().toString());
+ if (vectorOp instanceof VectorizationContextRegion) {
+ VectorizationContextRegion vcRegion = (VectorizationContextRegion) vectorOp;
+ VectorizationContext vOutContext = vcRegion.getOuputVectorizationContext();
+ LOG.debug("Vectorized MapWork operator " + vectorOp.getName() +
+ " added new vectorization context key=" + vOutContext.getFileKey() +
+ ", vectorTypes: " + vOutContext.getOutputColumnTypeMap().toString() +
+ ", columnMap: " + vOutContext.getColumnMap().toString());
+ }
+ }
return null;
}
@@ -702,6 +734,8 @@ public class Vectorizer implements Physi
private int keyColCount;
private int valueColCount;
private Map<String, Integer> reduceColumnNameMap;
+
+ private VectorizationContext reduceShuffleVectorizationContext;
private Operator<? extends OperatorDesc> rootVectorOp;
@@ -709,12 +743,14 @@ public class Vectorizer implements Physi
return rootVectorOp;
}
- public ReduceWorkVectorizationNodeProcessor(ReduceWork rWork, int keyColCount, int valueColCount) {
+ public ReduceWorkVectorizationNodeProcessor(ReduceWork rWork, int keyColCount,
+ int valueColCount) {
this.rWork = rWork;
reduceColumnNameMap = rWork.getReduceColumnNameMap();
this.keyColCount = keyColCount;
this.valueColCount = valueColCount;
rootVectorOp = null;
+ reduceShuffleVectorizationContext = null;
}
@Override
@@ -722,7 +758,8 @@ public class Vectorizer implements Physi
Object... nodeOutputs) throws SemanticException {
Operator<? extends OperatorDesc> op = (Operator<? extends OperatorDesc>) nd;
- LOG.info("ReduceWorkVectorizationNodeProcessor processing Operator: " + op.getName() + "...");
+ LOG.info("ReduceWorkVectorizationNodeProcessor processing Operator: " +
+ op.getName() + "...");
VectorizationContext vContext = null;
@@ -730,10 +767,24 @@ public class Vectorizer implements Physi
if (op.getParentOperators().size() == 0) {
vContext = getReduceVectorizationContext(reduceColumnNameMap);
- vContextsByTSOp.put(op, vContext);
+ vContext.setFileKey("_REDUCE_SHUFFLE_");
+ scratchColumnContext.put("_REDUCE_SHUFFLE_", vContext);
+ reduceShuffleVectorizationContext = vContext;
saveRootVectorOp = true;
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Vectorized ReduceWork reduce shuffle vectorization context key=" +
+ vContext.getFileKey() +
+ ", vectorTypes: " + vContext.getOutputColumnTypeMap().toString() +
+ ", columnMap: " + vContext.getColumnMap().toString());
+ }
} else {
vContext = walkStackToFindVectorizationContext(stack, op);
+ if (vContext == null) {
+ // If we didn't find a context among the operators, assume the top -- reduce shuffle's
+ // vectorization context.
+ vContext = reduceShuffleVectorizationContext;
+ }
}
assert vContext != null;
@@ -749,6 +800,21 @@ public class Vectorizer implements Physi
}
Operator<? extends OperatorDesc> vectorOp = doVectorize(op, vContext);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Vectorized ReduceWork operator " + vectorOp.getName() +
+ " with vectorization context key=" + vContext.getFileKey() +
+ ", vectorTypes: " + vContext.getOutputColumnTypeMap().toString() +
+ ", columnMap: " + vContext.getColumnMap().toString());
+ if (vectorOp instanceof VectorizationContextRegion) {
+ VectorizationContextRegion vcRegion = (VectorizationContextRegion) vectorOp;
+ VectorizationContext vOutContext = vcRegion.getOuputVectorizationContext();
+ LOG.debug("Vectorized ReduceWork operator " + vectorOp.getName() +
+ " added new vectorization context key=" + vOutContext.getFileKey() +
+ ", vectorTypes: " + vOutContext.getOutputColumnTypeMap().toString() +
+ ", columnMap: " + vOutContext.getColumnMap().toString());
+ }
+ }
if (vectorOp instanceof VectorGroupByOperator) {
VectorGroupByOperator groupBy = (VectorGroupByOperator) vectorOp;
VectorGroupByDesc vectorDesc = groupBy.getConf().getVectorDesc();
@@ -827,6 +893,7 @@ public class Vectorizer implements Physi
break;
case FILESINK:
case LIMIT:
+ case EVENT:
ret = true;
break;
default:
@@ -866,6 +933,7 @@ public class Vectorizer implements Physi
ret = validateFileSinkOperator((FileSinkOperator) op);
break;
case LIMIT:
+ case EVENT:
ret = true;
break;
default:
@@ -1005,11 +1073,6 @@ public class Vectorizer implements Physi
}
private boolean validateFileSinkOperator(FileSinkOperator op) {
- // HIVE-7557: For now, turn off dynamic partitioning to give more time to
- // figure out how to make VectorFileSink work correctly with it...
- if (op.getConf().getDynPartCtx() != null) {
- return false;
- }
return true;
}
@@ -1017,7 +1080,8 @@ public class Vectorizer implements Physi
return validateExprNodeDesc(descs, VectorExpressionDescriptor.Mode.PROJECTION);
}
- private boolean validateExprNodeDesc(List<ExprNodeDesc> descs, VectorExpressionDescriptor.Mode mode) {
+ private boolean validateExprNodeDesc(List<ExprNodeDesc> descs,
+ VectorExpressionDescriptor.Mode mode) {
for (ExprNodeDesc d : descs) {
boolean ret = validateExprNodeDesc(d, mode);
if (!ret) {
@@ -1109,8 +1173,8 @@ public class Vectorizer implements Physi
if (!supportedAggregationUdfs.contains(aggDesc.getGenericUDAFName().toLowerCase())) {
return false;
}
- if (aggDesc.getParameters() != null) {
- return validateExprNodeDesc(aggDesc.getParameters());
+ if (aggDesc.getParameters() != null && !validateExprNodeDesc(aggDesc.getParameters())) {
+ return false;
}
// See if we can vectorize the aggregation.
try {
@@ -1175,11 +1239,13 @@ public class Vectorizer implements Physi
return new VectorizationContext(cmap, columnCount);
}
- private VectorizationContext getReduceVectorizationContext(Map<String, Integer> reduceColumnNameMap) {
+ private VectorizationContext getReduceVectorizationContext(
+ Map<String, Integer> reduceColumnNameMap) {
return new VectorizationContext(reduceColumnNameMap, reduceColumnNameMap.size());
}
- private void fixupParentChildOperators(Operator<? extends OperatorDesc> op, Operator<? extends OperatorDesc> vectorOp) {
+ private void fixupParentChildOperators(Operator<? extends OperatorDesc> op,
+ Operator<? extends OperatorDesc> vectorOp) {
if (op.getParentOperators() != null) {
vectorOp.setParentOperators(op.getParentOperators());
for (Operator<? extends OperatorDesc> p : op.getParentOperators()) {
@@ -1207,6 +1273,7 @@ public class Vectorizer implements Physi
case REDUCESINK:
case LIMIT:
case EXTRACT:
+ case EVENT:
vectorOp = OperatorFactory.getVectorOperator(op.getConf(), vContext);
break;
default:
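
For illustration only, not part of this commit: the walk-the-ancestors-with-fallback pattern the reduce-side vectorizer now follows -- look up each ancestor on the operator stack for a registered VectorizationContext and, if none is found, fall back to the saved reduce-shuffle context instead of failing. Names and types below are assumptions.

    import java.util.ArrayDeque;
    import java.util.Deque;
    import java.util.HashMap;
    import java.util.Map;

    public class ContextWalkSketch {
      // walk from the nearest ancestor toward the root; return the fallback if nothing is registered
      static <K, V> V findContextOrFallback(Deque<K> ancestors, Map<K, V> contexts, V fallback) {
        for (K ancestor : ancestors) {
          V ctx = contexts.get(ancestor);
          if (ctx != null) {
            return ctx;
          }
        }
        return fallback;
      }

      public static void main(String[] args) {
        Deque<String> ancestors = new ArrayDeque<>();
        ancestors.push("GBY_2");              // hypothetical operator names, root-most pushed first
        ancestors.push("SEL_3");
        Map<String, String> contexts = new HashMap<>();   // no ancestor registered a context
        System.out.println(findContextOrFallback(ancestors, contexts, "_REDUCE_SHUFFLE_ context"));
      }
    }
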
Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionPruner.java
URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionPruner.java?rev=1629544&r1=1629543&r2=1629544&view=diff
==============================================================================
--- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionPruner.java (original)
+++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionPruner.java Sun Oct 5 22:26:43 2014
@@ -57,6 +57,7 @@ import org.apache.hadoop.hive.ql.udf.gen
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPAnd;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPOr;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
/**
* The transformation step that does partition pruning.
@@ -155,27 +156,85 @@ public class PartitionPruner implements
* pruner condition.
* @throws HiveException
*/
- private static PrunedPartitionList prune(Table tab, ExprNodeDesc prunerExpr,
+ public static PrunedPartitionList prune(Table tab, ExprNodeDesc prunerExpr,
HiveConf conf, String alias, Map<String, PrunedPartitionList> prunedPartitionsMap)
throws SemanticException {
+
LOG.trace("Started pruning partiton");
LOG.trace("dbname = " + tab.getDbName());
LOG.trace("tabname = " + tab.getTableName());
- LOG.trace("prune Expression = " + prunerExpr);
+ LOG.trace("prune Expression = " + prunerExpr == null ? "" : prunerExpr);
String key = tab.getDbName() + "." + tab.getTableName() + ";";
- if (prunerExpr != null) {
- key = key + prunerExpr.getExprString();
+ if (!tab.isPartitioned()) {
+ // If the table is not partitioned, return empty list.
+ return getAllPartsFromCacheOrServer(tab, key, false, prunedPartitionsMap);
+ }
+
+ if ("strict".equalsIgnoreCase(HiveConf.getVar(conf, HiveConf.ConfVars.HIVEMAPREDMODE))
+ && !hasColumnExpr(prunerExpr)) {
+ // If the "strict" mode is on, we have to provide partition pruner for each table.
+ throw new SemanticException(ErrorMsg.NO_PARTITION_PREDICATE
+ .getMsg("for Alias \"" + alias + "\" Table \"" + tab.getTableName() + "\""));
+ }
+
+ if (prunerExpr == null) {
+ // In non-strict mode and there is no predicates at all - get everything.
+ return getAllPartsFromCacheOrServer(tab, key, false, prunedPartitionsMap);
+ }
+
+ Set<String> partColsUsedInFilter = new LinkedHashSet<String>();
+ // Replace virtual columns with nulls. See javadoc for details.
+ prunerExpr = removeNonPartCols(prunerExpr, extractPartColNames(tab), partColsUsedInFilter);
+ // Remove all parts that are not partition columns. See javadoc for details.
+ ExprNodeGenericFuncDesc compactExpr = (ExprNodeGenericFuncDesc)compactExpr(prunerExpr.clone());
+ String oldFilter = prunerExpr.getExprString();
+ if (compactExpr == null) {
+ // Non-strict mode, and all the predicates are on non-partition columns - get everything.
+ LOG.debug("Filter " + oldFilter + " was null after compacting");
+ return getAllPartsFromCacheOrServer(tab, key, true, prunedPartitionsMap);
+ }
+ LOG.debug("Filter w/ compacting: " + compactExpr.getExprString()
+ + "; filter w/o compacting: " + oldFilter);
+
+ key = key + compactExpr.getExprString();
+ PrunedPartitionList ppList = prunedPartitionsMap.get(key);
+ if (ppList != null) {
+ return ppList;
+ }
+
+ ppList = getPartitionsFromServer(tab, compactExpr, conf, alias, partColsUsedInFilter, oldFilter.equals(compactExpr.getExprString()));
+ prunedPartitionsMap.put(key, ppList);
+ return ppList;
+ }
+
+ private static PrunedPartitionList getAllPartsFromCacheOrServer(Table tab, String key, boolean unknownPartitions,
+ Map<String, PrunedPartitionList> partsCache) throws SemanticException {
+ PrunedPartitionList ppList = partsCache.get(key);
+ if (ppList != null) {
+ return ppList;
}
- PrunedPartitionList ret = prunedPartitionsMap.get(key);
- if (ret != null) {
- return ret;
+ Set<Partition> parts;
+ try {
+ parts = getAllPartitions(tab);
+ } catch (HiveException e) {
+ throw new SemanticException(e);
}
+ ppList = new PrunedPartitionList(tab, parts, null, unknownPartitions);
+ partsCache.put(key, ppList);
+ return ppList;
+ }
- ret = getPartitionsFromServer(tab, prunerExpr, conf, alias);
- prunedPartitionsMap.put(key, ret);
- return ret;
+ private static ExprNodeDesc removeTruePredciates(ExprNodeDesc e) {
+ if (e instanceof ExprNodeConstantDesc) {
+ ExprNodeConstantDesc eC = (ExprNodeConstantDesc) e;
+ if (e.getTypeInfo() == TypeInfoFactory.booleanTypeInfo
+ && eC.getValue() == Boolean.TRUE) {
+ return null;
+ }
+ }
+ return e;
}
/**
@@ -187,7 +246,8 @@ public class PartitionPruner implements
*/
static private ExprNodeDesc compactExpr(ExprNodeDesc expr) {
if (expr instanceof ExprNodeConstantDesc) {
- if (((ExprNodeConstantDesc)expr).getValue() == null) {
+ expr = removeTruePredciates(expr);
+ if (expr == null || ((ExprNodeConstantDesc)expr).getValue() == null) {
return null;
} else {
throw new IllegalStateException("Unexpected non-null ExprNodeConstantDesc: "
@@ -198,10 +258,11 @@ public class PartitionPruner implements
boolean isAnd = udf instanceof GenericUDFOPAnd;
if (isAnd || udf instanceof GenericUDFOPOr) {
List<ExprNodeDesc> children = expr.getChildren();
- ExprNodeDesc left = children.get(0);
- children.set(0, compactExpr(left));
- ExprNodeDesc right = children.get(1);
- children.set(1, compactExpr(right));
+ ExprNodeDesc left = removeTruePredciates(children.get(0));
+ children.set(0, left == null ? null : compactExpr(left));
+ ExprNodeDesc right = removeTruePredciates(children.get(1));
+ children.set(1, right == null ? null : compactExpr(right));
+
// Note that one does not simply compact (not-null or null) to not-null.
// Only if we have an "and" is it valid to send one side to metastore.
if (children.get(0) == null && children.get(1) == null) {
@@ -267,40 +328,8 @@ public class PartitionPruner implements
}
private static PrunedPartitionList getPartitionsFromServer(Table tab,
- ExprNodeDesc prunerExpr, HiveConf conf, String alias) throws SemanticException {
+ final ExprNodeGenericFuncDesc compactExpr, HiveConf conf, String alias, Set<String> partColsUsedInFilter, boolean isPruningByExactFilter) throws SemanticException {
try {
- if (!tab.isPartitioned()) {
- // If the table is not partitioned, return everything.
- return new PrunedPartitionList(tab, getAllPartitions(tab), null, false);
- }
- LOG.debug("tabname = " + tab.getTableName() + " is partitioned");
-
- if ("strict".equalsIgnoreCase(HiveConf.getVar(conf, HiveConf.ConfVars.HIVEMAPREDMODE))
- && !hasColumnExpr(prunerExpr)) {
- // If the "strict" mode is on, we have to provide partition pruner for each table.
- throw new SemanticException(ErrorMsg.NO_PARTITION_PREDICATE
- .getMsg("for Alias \"" + alias + "\" Table \"" + tab.getTableName() + "\""));
- }
-
- if (prunerExpr == null) {
- // Non-strict mode, and there is no predicates at all - get everything.
- return new PrunedPartitionList(tab, getAllPartitions(tab), null, false);
- }
-
- Set<String> referred = new LinkedHashSet<String>();
- // Replace virtual columns with nulls. See javadoc for details.
- prunerExpr = removeNonPartCols(prunerExpr, extractPartColNames(tab), referred);
- // Remove all parts that are not partition columns. See javadoc for details.
- ExprNodeGenericFuncDesc compactExpr = (ExprNodeGenericFuncDesc)compactExpr(prunerExpr.clone());
- String oldFilter = prunerExpr.getExprString();
- if (compactExpr == null) {
- // Non-strict mode, and all the predicates are on non-partition columns - get everything.
- LOG.debug("Filter " + oldFilter + " was null after compacting");
- return new PrunedPartitionList(tab, getAllPartitions(tab), null, true);
- }
-
- LOG.debug("Filter w/ compacting: " + compactExpr.getExprString()
- + "; filter w/o compacting: " + oldFilter);
// Finally, check the filter for non-built-in UDFs. If these are present, we cannot
// do filtering on the server, and have to fall back to client path.
@@ -330,9 +359,8 @@ public class PartitionPruner implements
// The partitions are "unknown" if the call says so due to the expression
// evaluator returning null for a partition, or if we sent a partial expression to
// metastore and so some partitions may have no data based on other filters.
- boolean isPruningByExactFilter = oldFilter.equals(compactExpr.getExprString());
return new PrunedPartitionList(tab, new LinkedHashSet<Partition>(partitions),
- new ArrayList<String>(referred),
+ new ArrayList<String>(partColsUsedInFilter),
hasUnknownPartitions || !isPruningByExactFilter);
} catch (SemanticException e) {
throw e;
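
For illustration only, not part of this commit: the decision sequence the reworked prune() follows, with hypothetical table and filter values. It shows how the cache key now includes the compacted, partition-only filter, and how a partially pushed-down predicate marks the result as having unknown partitions.

    public class PartitionPrunerFlowSketch {
      public static void main(String[] args) {
        boolean isPartitioned = true;       // tab.isPartitioned()
        boolean strictMode = false;         // hive.mapred.mode == "strict"
        // hypothetical original predicate and its partition-only compaction
        String prunerExpr = "((ds = '2014-10-05') and (value > 10))";
        String compactExpr = "(ds = '2014-10-05')";

        if (!isPartitioned || prunerExpr == null) {
          System.out.println("non-partitioned table or no predicate: return all partitions");
          return;
        }
        if (strictMode && !prunerExpr.contains("ds")) {   // stand-in for hasColumnExpr(prunerExpr)
          throw new IllegalStateException("strict mode requires a partition predicate");
        }
        String cacheKey = "default.sales;" + compactExpr; // cache keyed on the compacted filter
        boolean unknownPartitions = !prunerExpr.equals(compactExpr); // partial push-down
        System.out.println("key=" + cacheKey + ", unknownPartitions=" + unknownPartitions);
      }
    }
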
Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java?rev=1629544&r1=1629543&r2=1629544&view=diff
==============================================================================
--- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java (original)
+++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java Sun Oct 5 22:26:43 2014
@@ -18,8 +18,14 @@
package org.apache.hadoop.hive.ql.optimizer.stats.annotation;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
+import java.lang.reflect.Field;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Stack;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -31,10 +37,12 @@ import org.apache.hadoop.hive.ql.exec.Fi
import org.apache.hadoop.hive.ql.exec.GroupByOperator;
import org.apache.hadoop.hive.ql.exec.LimitOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorUtils;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
import org.apache.hadoop.hive.ql.exec.RowSchema;
import org.apache.hadoop.hive.ql.exec.SelectOperator;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.tez.DagUtils;
import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.lib.NodeProcessor;
import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
@@ -48,10 +56,12 @@ import org.apache.hadoop.hive.ql.plan.Ex
import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.plan.GroupByDesc;
import org.apache.hadoop.hive.ql.plan.JoinDesc;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.Statistics;
import org.apache.hadoop.hive.ql.stats.StatsUtils;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPAnd;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual;
@@ -66,17 +76,15 @@ import org.apache.hadoop.hive.ql.udf.gen
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPNull;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPOr;
import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.Stack;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
public class StatsRulesProcFactory {
private static final Log LOG = LogFactory.getLog(StatsRulesProcFactory.class.getName());
+ private static final boolean isDebugEnabled = LOG.isDebugEnabled();
/**
* Collect basic statistics like number of rows, data size and column level statistics from the
@@ -103,9 +111,9 @@ public class StatsRulesProcFactory {
Statistics stats = StatsUtils.collectStatistics(aspCtx.getConf(), partList, table, tsop);
tsop.setStatistics(stats.clone());
- if (LOG.isDebugEnabled()) {
- LOG.debug("[0] STATS-" + tsop.toString() + " (" + table.getTableName()
- + "): " + stats.extendedToString());
+ if (isDebugEnabled) {
+ LOG.debug("[0] STATS-" + tsop.toString() + " (" + table.getTableName() + "): " +
+ stats.extendedToString());
}
} catch (CloneNotSupportedException e) {
throw new SemanticException(ErrorMsg.STATISTICS_CLONING_FAILED.getMsg());
@@ -167,14 +175,14 @@ public class StatsRulesProcFactory {
stats.setDataSize(setMaxIfInvalid(dataSize));
sop.setStatistics(stats);
- if (LOG.isDebugEnabled()) {
+ if (isDebugEnabled) {
LOG.debug("[0] STATS-" + sop.toString() + ": " + stats.extendedToString());
}
} else {
if (parentStats != null) {
sop.setStatistics(parentStats.clone());
- if (LOG.isDebugEnabled()) {
+ if (isDebugEnabled) {
LOG.debug("[1] STATS-" + sop.toString() + ": " + parentStats.extendedToString());
}
}
@@ -264,7 +272,7 @@ public class StatsRulesProcFactory {
updateStats(st, newNumRows, true, fop);
}
- if (LOG.isDebugEnabled()) {
+ if (isDebugEnabled) {
LOG.debug("[0] STATS-" + fop.toString() + ": " + st.extendedToString());
}
} else {
@@ -274,7 +282,7 @@ public class StatsRulesProcFactory {
updateStats(st, newNumRows, false, fop);
}
- if (LOG.isDebugEnabled()) {
+ if (isDebugEnabled) {
LOG.debug("[1] STATS-" + fop.toString() + ": " + st.extendedToString());
}
}
@@ -576,52 +584,103 @@ public class StatsRulesProcFactory {
@Override
public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
Object... nodeOutputs) throws SemanticException {
+
GroupByOperator gop = (GroupByOperator) nd;
Operator<? extends OperatorDesc> parent = gop.getParentOperators().get(0);
Statistics parentStats = parent.getStatistics();
+
+ // parent stats are not populated yet
+ if (parentStats == null) {
+ return null;
+ }
+
AnnotateStatsProcCtx aspCtx = (AnnotateStatsProcCtx) procCtx;
HiveConf conf = aspCtx.getConf();
- int mapSideParallelism =
- HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_STATS_MAP_SIDE_PARALLELISM);
+ long maxSplitSize = HiveConf.getLongVar(conf, HiveConf.ConfVars.MAPREDMAXSPLITSIZE);
List<AggregationDesc> aggDesc = gop.getConf().getAggregators();
Map<String, ExprNodeDesc> colExprMap = gop.getColumnExprMap();
RowSchema rs = gop.getSchema();
Statistics stats = null;
+ List<ColStatistics> colStats = StatsUtils.getColStatisticsFromExprMap(conf, parentStats,
+ colExprMap, rs);
+ long cardinality;
+ long parallelism = 1L;
boolean mapSide = false;
- int multiplier = mapSideParallelism;
- long newNumRows;
- long newDataSize;
+ boolean mapSideHashAgg = false;
+ long inputSize = 1L;
+ boolean containsGroupingSet = gop.getConf().isGroupingSetsPresent();
+ long sizeOfGroupingSet =
+ containsGroupingSet ? gop.getConf().getListGroupingSets().size() : 1L;
+
+ // There are different cases for Group By depending on map/reduce side, hash aggregation,
+ // grouping sets and column stats. If we don't have column stats, we just assume hash
+ // aggregation is disabled. Following are the possible cases and rule for cardinality
+ // estimation
+
+ // MAP SIDE:
+ // Case 1: NO column stats, NO hash aggregation, NO grouping sets - numRows
+ // Case 2: NO column stats, NO hash aggregation, grouping sets - numRows * sizeOfGroupingSet
+ // Case 3: column stats, hash aggregation, NO grouping sets - Min(numRows / 2, ndvProduct * parallelism)
+ // Case 4: column stats, hash aggregation, grouping sets - Min((numRows * sizeOfGroupingSet) / 2, ndvProduct * parallelism * sizeOfGroupingSet)
+ // Case 5: column stats, NO hash aggregation, NO grouping sets - numRows
+ // Case 6: column stats, NO hash aggregation, grouping sets - numRows * sizeOfGroupingSet
+
+ // REDUCE SIDE:
+ // Case 7: NO column stats - numRows / 2
+ // Case 8: column stats, grouping sets - Min(numRows, ndvProduct * sizeOfGroupingSet)
+ // Case 9: column stats, NO grouping sets - Min(numRows, ndvProduct)
- // map side
if (gop.getChildOperators().get(0) instanceof ReduceSinkOperator ||
gop.getChildOperators().get(0) instanceof AppMasterEventOperator) {
- mapSide = true;
+ mapSide = true;
- // map-side grouping set present. if grouping set is present then
- // multiply the number of rows by number of elements in grouping set
- if (gop.getConf().isGroupingSetsPresent()) {
- multiplier *= gop.getConf().getListGroupingSets().size();
+ // consider approximate map side parallelism to be table data size
+ // divided by max split size
+ TableScanOperator top = OperatorUtils.findSingleOperatorUpstream(gop,
+ TableScanOperator.class);
+ // if top is null then there are multiple parents (RS as well), hence
+ // lets use parent statistics to get data size. Also maxSplitSize should
+ // be updated to bytes per reducer (1GB default)
+ if (top == null) {
+ inputSize = parentStats.getDataSize();
+ maxSplitSize = HiveConf.getLongVar(conf, HiveConf.ConfVars.BYTESPERREDUCER);
+ } else {
+ inputSize = top.getConf().getStatistics().getDataSize();
}
+ parallelism = (int) Math.ceil((double) inputSize / maxSplitSize);
+ }
+
+ if (isDebugEnabled) {
+ LOG.debug("STATS-" + gop.toString() + ": inputSize: " + inputSize + " maxSplitSize: " +
+ maxSplitSize + " parallelism: " + parallelism + " containsGroupingSet: " +
+ containsGroupingSet + " sizeOfGroupingSet: " + sizeOfGroupingSet);
}
try {
+ // satisfying precondition means column statistics is available
if (satisfyPrecondition(parentStats)) {
- stats = parentStats.clone();
- List<ColStatistics> colStats =
- StatsUtils.getColStatisticsFromExprMap(conf, parentStats, colExprMap, rs);
+ // check if map side aggregation is possible or not based on column stats
+ mapSideHashAgg = checkMapSideAggregation(gop, colStats, conf);
+
+ if (isDebugEnabled) {
+ LOG.debug("STATS-" + gop.toString() + " mapSideHashAgg: " + mapSideHashAgg);
+ }
+
+ stats = parentStats.clone();
stats.setColumnStats(colStats);
- long dvProd = 1;
+ long ndvProduct = 1;
+ final long parentNumRows = stats.getNumRows();
// compute product of distinct values of grouping columns
for (ColStatistics cs : colStats) {
if (cs != null) {
- long dv = cs.getCountDistint();
+ long ndv = cs.getCountDistint();
if (cs.getNumNulls() > 0) {
- dv += 1;
+ ndv += 1;
}
- dvProd *= dv;
+ ndvProduct *= ndv;
} else {
if (parentStats.getColumnStatsState().equals(Statistics.State.COMPLETE)) {
// the column must be an aggregate column inserted by GBY. We
@@ -632,65 +691,130 @@ public class StatsRulesProcFactory {
// partial column statistics on grouping attributes case.
// if column statistics on grouping attribute is missing, then
// assume worst case.
- // GBY rule will emit half the number of rows if dvProd is 0
- dvProd = 0;
+ // GBY rule will emit half the number of rows if ndvProduct is 0
+ ndvProduct = 0;
}
break;
}
}
- // map side
+ // if ndvProduct is 0 then column stats state must be partial and we are missing
+ // column stats for a group by column
+ if (ndvProduct == 0) {
+ ndvProduct = parentNumRows / 2;
+
+ if (isDebugEnabled) {
+ LOG.debug("STATS-" + gop.toString() + ": ndvProduct became 0 as some column does not" +
+ " have stats. ndvProduct changed to: " + ndvProduct);
+ }
+ }
+
if (mapSide) {
+ // MAP SIDE
- // since we do not know if hash-aggregation will be enabled or disabled
- // at runtime we will assume that map-side group by does not do any
- // reduction.hence no group by rule will be applied
-
- // map-side grouping set present. if grouping set is present then
- // multiply the number of rows by number of elements in grouping set
- if (gop.getConf().isGroupingSetsPresent()) {
- newNumRows = setMaxIfInvalid(multiplier * stats.getNumRows());
- newDataSize = setMaxIfInvalid(multiplier * stats.getDataSize());
- stats.setNumRows(newNumRows);
- stats.setDataSize(newDataSize);
- for (ColStatistics cs : colStats) {
- if (cs != null) {
- long oldNumNulls = cs.getNumNulls();
- long newNumNulls = multiplier * oldNumNulls;
- cs.setNumNulls(newNumNulls);
+ if (mapSideHashAgg) {
+ if (containsGroupingSet) {
+ // Case 4: column stats, hash aggregation, grouping sets
+ cardinality = Math.min((parentNumRows * sizeOfGroupingSet) / 2,
+ ndvProduct * parallelism * sizeOfGroupingSet);
+
+ if (isDebugEnabled) {
+ LOG.debug("[Case 4] STATS-" + gop.toString() + ": cardinality: " + cardinality);
+ }
+ } else {
+ // Case 3: column stats, hash aggregation, NO grouping sets
+ cardinality = Math.min(parentNumRows / 2, ndvProduct * parallelism);
+
+ if (isDebugEnabled) {
+ LOG.debug("[Case 3] STATS-" + gop.toString() + ": cardinality: " + cardinality);
}
}
} else {
+ if (containsGroupingSet) {
+ // Case 6: column stats, NO hash aggregation, grouping sets
+ cardinality = parentNumRows * sizeOfGroupingSet;
+
+ if (isDebugEnabled) {
+ LOG.debug("[Case 6] STATS-" + gop.toString() + ": cardinality: " + cardinality);
+ }
+ } else {
+ // Case 5: column stats, NO hash aggregation, NO grouping sets
+ cardinality = parentNumRows;
- // map side no grouping set
- newNumRows = stats.getNumRows() * multiplier;
- updateStats(stats, newNumRows, true, gop);
+ if (isDebugEnabled) {
+ LOG.debug("[Case 5] STATS-" + gop.toString() + ": cardinality: " + cardinality);
+ }
+ }
}
} else {
+ // REDUCE SIDE
+
+ // in reduce side GBY, we don't know if the grouping set was present or not. so get it
+ // from map side GBY
+ GroupByOperator mGop = OperatorUtils.findSingleOperatorUpstream(parent, GroupByOperator.class);
+ if (mGop != null) {
+ containsGroupingSet = mGop.getConf().isGroupingSetsPresent();
+ sizeOfGroupingSet = mGop.getConf().getListGroupingSets().size();
+ }
+
+ if (containsGroupingSet) {
+ // Case 8: column stats, grouping sets
+ cardinality = Math.min(parentNumRows, ndvProduct * sizeOfGroupingSet);
+
+ if (isDebugEnabled) {
+ LOG.debug("[Case 8] STATS-" + gop.toString() + ": cardinality: " + cardinality);
+ }
+ } else {
+ // Case 9: column stats, NO grouping sets
+ cardinality = Math.min(parentNumRows, ndvProduct);
- // reduce side
- newNumRows = applyGBYRule(stats.getNumRows(), dvProd);
- updateStats(stats, newNumRows, true, gop);
+ if (isDebugEnabled) {
+ LOG.debug("[Case 9] STATS-" + gop.toString() + ": cardinality: " + cardinality);
+ }
+ }
}
+
+ // update stats, but don't update NDV as it will not change
+ updateStats(stats, cardinality, true, gop, false);
} else {
+
+ // NO COLUMN STATS
if (parentStats != null) {
stats = parentStats.clone();
+ final long parentNumRows = stats.getNumRows();
- // worst case, in the absence of column statistics assume half the rows are emitted
+ // if we don't have column stats, we just assume hash aggregation is disabled
if (mapSide) {
+ // MAP SIDE
+
+ if (containsGroupingSet) {
+ // Case 2: NO column stats, NO hash aggregation, grouping sets
+ cardinality = parentNumRows * sizeOfGroupingSet;
+
+ if (isDebugEnabled) {
+ LOG.debug("[Case 2] STATS-" + gop.toString() + ": cardinality: " + cardinality);
+ }
+ } else {
+ // Case 1: NO column stats, NO hash aggregation, NO grouping sets
+ cardinality = parentNumRows;
- // map side
- newNumRows = multiplier * stats.getNumRows();
- newDataSize = multiplier * stats.getDataSize();
- stats.setNumRows(newNumRows);
- stats.setDataSize(newDataSize);
+ if (isDebugEnabled) {
+ LOG.debug("[Case 1] STATS-" + gop.toString() + ": cardinality: " + cardinality);
+ }
+ }
} else {
+ // REDUCE SIDE
+
+ // Case 7: NO column stats
+ cardinality = parentNumRows / 2;
- // reduce side
- newNumRows = parentStats.getNumRows() / 2;
- updateStats(stats, newNumRows, false, gop);
+ if (isDebugEnabled) {
+ LOG.debug("[Case 7] STATS-" + gop.toString() + ": cardinality: " + cardinality);
+ }
}
+
+ updateStats(stats, cardinality, false, gop);
}
}
@@ -738,7 +862,7 @@ public class StatsRulesProcFactory {
gop.setStatistics(stats);
- if (LOG.isDebugEnabled() && stats != null) {
+ if (isDebugEnabled && stats != null) {
LOG.debug("[0] STATS-" + gop.toString() + ": " + stats.extendedToString());
}
} catch (CloneNotSupportedException e) {
@@ -747,6 +871,107 @@ public class StatsRulesProcFactory {
return null;
}
+ /**
+ * This method does not take into account many configs used at runtime to
+ * disable hash aggregation like HIVEMAPAGGRHASHMINREDUCTION. This method
+ * roughly estimates the number of rows and size of each row to see if it
+ * can fit in hashtable for aggregation.
+ * @param gop - group by operator
+ * @param colStats - column stats for key columns
+ * @param conf - hive conf
+ * @return
+ */
+ private boolean checkMapSideAggregation(GroupByOperator gop,
+ List<ColStatistics> colStats, HiveConf conf) {
+
+ List<AggregationDesc> aggDesc = gop.getConf().getAggregators();
+ GroupByDesc desc = gop.getConf();
+ GroupByDesc.Mode mode = desc.getMode();
+
+ if (mode.equals(GroupByDesc.Mode.HASH)) {
+ float hashAggMem = conf.getFloatVar(
+ HiveConf.ConfVars.HIVEMAPAGGRHASHMEMORY);
+ float hashAggMaxThreshold = conf.getFloatVar(
+ HiveConf.ConfVars.HIVEMAPAGGRMEMORYTHRESHOLD);
+
+ // get memory for container. May be use mapreduce.map.java.opts instead?
+ long totalMemory =
+ DagUtils.getContainerResource(conf).getMemory() * 1000L * 1000L;
+ long maxMemHashAgg = Math
+ .round(totalMemory * hashAggMem * hashAggMaxThreshold);
+
+ // estimated number of rows will be product of NDVs
+ long numEstimatedRows = 1;
+
+ // estimate size of key from column statistics
+ long avgKeySize = 0;
+ for (ColStatistics cs : colStats) {
+ if (cs != null) {
+ numEstimatedRows *= cs.getCountDistint();
+ avgKeySize += Math.ceil(cs.getAvgColLen());
+ }
+ }
+
+ // average value size will be sum of all sizes of aggregation buffers
+ long avgValSize = 0;
+ // go over all aggregation buffers and see they implement estimable
+ // interface if so they aggregate the size of the aggregation buffer
+ GenericUDAFEvaluator[] aggregationEvaluators;
+ aggregationEvaluators = new GenericUDAFEvaluator[aggDesc.size()];
+
+ // get aggregation evaluators
+ for (int i = 0; i < aggregationEvaluators.length; i++) {
+ AggregationDesc agg = aggDesc.get(i);
+ aggregationEvaluators[i] = agg.getGenericUDAFEvaluator();
+ }
+
+ // estimate size of aggregation buffer
+ for (int i = 0; i < aggregationEvaluators.length; i++) {
+
+ // each evaluator has constant java object overhead
+ avgValSize += gop.javaObjectOverHead;
+ GenericUDAFEvaluator.AggregationBuffer agg = null;
+ try {
+ agg = aggregationEvaluators[i].getNewAggregationBuffer();
+ } catch (HiveException e) {
+ // in case of exception assume unknown type (256 bytes)
+ avgValSize += gop.javaSizeUnknownType;
+ }
+
+ // aggregate size from aggregation buffers
+ if (agg != null) {
+ if (GenericUDAFEvaluator.isEstimable(agg)) {
+ avgValSize += ((GenericUDAFEvaluator.AbstractAggregationBuffer) agg)
+ .estimate();
+ } else {
+ // if the aggregation buffer is not estimable then get all the
+ // declared fields and compute the sizes from field types
+ Field[] fArr = ObjectInspectorUtils
+ .getDeclaredNonStaticFields(agg.getClass());
+ for (Field f : fArr) {
+ long avgSize = StatsUtils
+ .getAvgColLenOfFixedLengthTypes(f.getType().getName());
+ avgValSize += avgSize == 0 ? gop.javaSizeUnknownType : avgSize;
+ }
+ }
+ }
+ }
+
+ // total size of each hash entry
+ long hashEntrySize = gop.javaHashEntryOverHead + avgKeySize + avgValSize;
+
+ // estimated hash table size
+ long estHashTableSize = numEstimatedRows * hashEntrySize;
+
+ if (estHashTableSize < maxMemHashAgg) {
+ return true;
+ }
+ }
+
+ // worst-case, hash aggregation disabled
+ return false;
+ }
+
private long applyGBYRule(long numRows, long dvProd) {
long newNumRows = numRows;
@@ -967,7 +1192,7 @@ public class StatsRulesProcFactory {
outInTabAlias);
jop.setStatistics(stats);
- if (LOG.isDebugEnabled()) {
+ if (isDebugEnabled) {
LOG.debug("[0] STATS-" + jop.toString() + ": " + stats.extendedToString());
}
} else {
@@ -1001,7 +1226,7 @@ public class StatsRulesProcFactory {
wcStats.setDataSize(setMaxIfInvalid(newDataSize));
jop.setStatistics(wcStats);
- if (LOG.isDebugEnabled()) {
+ if (isDebugEnabled) {
LOG.debug("[1] STATS-" + jop.toString() + ": " + wcStats.extendedToString());
}
}
@@ -1195,7 +1420,7 @@ public class StatsRulesProcFactory {
}
lop.setStatistics(stats);
- if (LOG.isDebugEnabled()) {
+ if (isDebugEnabled) {
LOG.debug("[0] STATS-" + lop.toString() + ": " + stats.extendedToString());
}
} else {
@@ -1213,7 +1438,7 @@ public class StatsRulesProcFactory {
}
lop.setStatistics(wcStats);
- if (LOG.isDebugEnabled()) {
+ if (isDebugEnabled) {
LOG.debug("[1] STATS-" + lop.toString() + ": " + wcStats.extendedToString());
}
}
@@ -1281,7 +1506,7 @@ public class StatsRulesProcFactory {
outStats.setColumnStats(colStats);
}
rop.setStatistics(outStats);
- if (LOG.isDebugEnabled()) {
+ if (isDebugEnabled) {
LOG.debug("[0] STATS-" + rop.toString() + ": " + outStats.extendedToString());
}
} catch (CloneNotSupportedException e) {
@@ -1322,7 +1547,7 @@ public class StatsRulesProcFactory {
stats.addToColumnStats(parentStats.getColumnStats());
op.getConf().setStatistics(stats);
- if (LOG.isDebugEnabled()) {
+ if (isDebugEnabled) {
LOG.debug("[0] STATS-" + op.toString() + ": " + stats.extendedToString());
}
}
@@ -1378,6 +1603,7 @@ public class StatsRulesProcFactory {
return new DefaultStatsRule();
}
+
/**
* Update the basic statistics of the statistics object based on the row number
* @param stats
@@ -1389,6 +1615,12 @@ public class StatsRulesProcFactory {
*/
static void updateStats(Statistics stats, long newNumRows,
boolean useColStats, Operator<? extends OperatorDesc> op) {
+ updateStats(stats, newNumRows, useColStats, op, true);
+ }
+
+ static void updateStats(Statistics stats, long newNumRows,
+ boolean useColStats, Operator<? extends OperatorDesc> op,
+ boolean updateNDV) {
if (newNumRows <= 0) {
LOG.info("STATS-" + op.toString() + ": Overflow in number of rows."
@@ -1406,17 +1638,19 @@ public class StatsRulesProcFactory {
long oldNumNulls = cs.getNumNulls();
long oldDV = cs.getCountDistint();
long newNumNulls = Math.round(ratio * oldNumNulls);
- long newDV = oldDV;
+ cs.setNumNulls(newNumNulls);
+ if (updateNDV) {
+ long newDV = oldDV;
- // if ratio is greater than 1, then number of rows increases. This can happen
- // when some operators like GROUPBY duplicates the input rows in which case
- // number of distincts should not change. Update the distinct count only when
- // the output number of rows is less than input number of rows.
- if (ratio <= 1.0) {
- newDV = (long) Math.ceil(ratio * oldDV);
+ // if ratio is greater than 1, then number of rows increases. This can happen
+ // when some operators like GROUPBY duplicates the input rows in which case
+ // number of distincts should not change. Update the distinct count only when
+ // the output number of rows is less than input number of rows.
+ if (ratio <= 1.0) {
+ newDV = (long) Math.ceil(ratio * oldDV);
+ }
+ cs.setCountDistint(newDV);
}
- cs.setNumNulls(newNumNulls);
- cs.setCountDistint(newDV);
}
stats.setColumnStats(colStats);
long newDataSize = StatsUtils.getDataSizeFromColumnStats(newNumRows, colStats);
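[Editorial note, not part of the patch] The rewritten block above encodes a simple rule: the null count always scales with the row-count ratio, while the distinct count only shrinks (ratio <= 1.0) and is left untouched when updateNDV is false. A tiny self-contained illustration of the ratio arithmetic, using invented numbers instead of the real ColStatistics objects:

// Hypothetical illustration of the null-count / NDV scaling rule above.
public class NdvScalingSketch {
  public static void main(String[] args) {
    long oldNumRows = 1000, newNumRows = 250;  // made-up row counts
    long oldNumNulls = 40, oldDV = 100;        // made-up column stats
    double ratio = (double) newNumRows / oldNumRows;

    long newNumNulls = Math.round(ratio * oldNumNulls);  // nulls always scale
    long newDV = oldDV;
    if (ratio <= 1.0) {
      // only scale the distinct count down when the operator reduces rows;
      // row-duplicating operators (e.g. some GROUP BY modes) keep NDV unchanged
      newDV = (long) Math.ceil(ratio * oldDV);
    }
    System.out.println("newNumNulls=" + newNumNulls + ", newDV=" + newDV);  // 10, 25
  }
}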
Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java?rev=1629544&r1=1629543&r2=1629544&view=diff
==============================================================================
--- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java (original)
+++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java Sun Oct 5 22:26:43 2014
@@ -207,7 +207,7 @@ public abstract class BaseSemanticAnalyz
}
public abstract void analyzeInternal(ASTNode ast) throws SemanticException;
- public void init() {
+ public void init(boolean clearPartsCache) {
//no-op
}
@@ -217,7 +217,7 @@ public abstract class BaseSemanticAnalyz
public void analyze(ASTNode ast, Context ctx) throws SemanticException {
initCtx(ctx);
- init();
+ init(true);
analyzeInternal(ast);
}
@@ -244,7 +244,7 @@ public abstract class BaseSemanticAnalyz
this.fetchTask = fetchTask;
}
- protected void reset() {
+ protected void reset(boolean clearPartsCache) {
rootTasks = new ArrayList<Task<? extends Serializable>>();
}
@@ -406,7 +406,6 @@ public abstract class BaseSemanticAnalyz
@SuppressWarnings("nls")
public static String unescapeSQLString(String b) {
-
Character enclosure = null;
// Some of the strings can be passed in as unicode. For example, the
@@ -487,7 +486,7 @@ public abstract class BaseSemanticAnalyz
case '\\':
sb.append("\\");
break;
- // The following 2 lines are exactly what MySQL does
+ // The following 2 lines are exactly what MySQL does TODO: why do we do this?
case '%':
sb.append("\\%");
break;
@@ -505,6 +504,58 @@ public abstract class BaseSemanticAnalyz
return sb.toString();
}
+ /**
+ * Escapes the string for AST; doesn't enclose it in quotes, however.
+ */
+ public static String escapeSQLString(String b) {
+ // There's usually nothing to escape so we will be optimistic.
+ String result = b;
+ for (int i = 0; i < result.length(); ++i) {
+ char currentChar = result.charAt(i);
+ if (currentChar == '\\' && ((i + 1) < result.length())) {
+ // TODO: do we need to handle the "this is what MySQL does" here?
+ char nextChar = result.charAt(i + 1);
+ if (nextChar == '%' || nextChar == '_') {
+ ++i;
+ continue;
+ }
+ }
+ switch (currentChar) {
+ case '\0': result = spliceString(result, i, "\\0"); ++i; break;
+ case '\'': result = spliceString(result, i, "\\'"); ++i; break;
+ case '\"': result = spliceString(result, i, "\\\""); ++i; break;
+ case '\b': result = spliceString(result, i, "\\b"); ++i; break;
+ case '\n': result = spliceString(result, i, "\\n"); ++i; break;
+ case '\r': result = spliceString(result, i, "\\r"); ++i; break;
+ case '\t': result = spliceString(result, i, "\\t"); ++i; break;
+ case '\\': result = spliceString(result, i, "\\\\"); ++i; break;
+ case '\u001A': result = spliceString(result, i, "\\Z"); ++i; break;
+ default: {
+ if (currentChar < ' ') {
+ String hex = Integer.toHexString(currentChar);
+ String unicode = "\\u";
+ for (int j = 4; j > hex.length(); --j) {
+ unicode += '0';
+ }
+ unicode += hex;
+ result = spliceString(result, i, unicode);
+ i += (unicode.length() - 1);
+ }
+ break; // if not a control character, do nothing
+ }
+ }
+ }
+ return result;
+ }
+
+ private static String spliceString(String str, int i, String replacement) {
+ return spliceString(str, i, 1, replacement);
+ }
+
+ private static String spliceString(String str, int i, int length, String replacement) {
+ return str.substring(0, i) + replacement + str.substring(i + length);
+ }
+
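[Editorial note, not part of the patch] As an illustration of the new helper: the driver class and input string below are invented for the example; only escapeSQLString itself comes from the hunk above, and it requires the Hive ql classes on the classpath.

import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer;

public class EscapeSqlStringDemo {
  public static void main(String[] args) {
    // hypothetical input containing a quote and a newline
    String raw = "O'Brien\nline2";
    String escaped = BaseSemanticAnalyzer.escapeSQLString(raw);
    // prints O\'Brien\nline2 : the quote and newline become two-character
    // backslash escapes, and the result is NOT wrapped in enclosing quotes
    System.out.println(escaped);
  }
}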
public HashSet<ReadEntity> getInputs() {
return inputs;
}
@@ -1234,7 +1285,7 @@ public abstract class BaseSemanticAnalyz
try {
database = db.getDatabase(dbName);
} catch (Exception e) {
- throw new SemanticException(ErrorMsg.DATABASE_NOT_EXISTS.getMsg(dbName), e);
+ throw new SemanticException(e.getMessage(), e);
}
if (database == null && throwException) {
throw new SemanticException(ErrorMsg.DATABASE_NOT_EXISTS.getMsg(dbName));
@@ -1264,9 +1315,13 @@ public abstract class BaseSemanticAnalyz
try {
tab = database == null ? db.getTable(tblName, false)
: db.getTable(database, tblName, false);
- } catch (Exception e) {
+ }
+ catch (InvalidTableException e) {
throw new SemanticException(ErrorMsg.INVALID_TABLE.getMsg(tblName), e);
}
+ catch (Exception e) {
+ throw new SemanticException(e.getMessage(), e);
+ }
if (tab == null && throwException) {
throw new SemanticException(ErrorMsg.INVALID_TABLE.getMsg(tblName));
}
Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnStatsSemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnStatsSemanticAnalyzer.java?rev=1629544&r1=1629543&r2=1629544&view=diff
==============================================================================
--- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnStatsSemanticAnalyzer.java (original)
+++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnStatsSemanticAnalyzer.java Sun Oct 5 22:26:43 2014
@@ -33,6 +33,7 @@ import org.apache.hadoop.hive.ql.Context
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.InvalidTableException;
import org.apache.hadoop.hive.ql.metadata.Table;
/**
@@ -58,7 +59,7 @@ public class ColumnStatsSemanticAnalyzer
private Table tbl;
public ColumnStatsSemanticAnalyzer(HiveConf conf) throws SemanticException {
- super(conf);
+ super(conf, false);
}
private boolean shouldRewrite(ASTNode tree) {
@@ -95,8 +96,10 @@ public class ColumnStatsSemanticAnalyzer
String tableName = getUnescapedName((ASTNode) tree.getChild(0).getChild(0));
try {
return db.getTable(tableName);
+ } catch (InvalidTableException e) {
+ throw new SemanticException(ErrorMsg.INVALID_TABLE.getMsg(tableName), e);
} catch (HiveException e) {
- throw new SemanticException(ErrorMsg.INVALID_TABLE.getMsg(tableName));
+ throw new SemanticException(e.getMessage(), e);
}
}
@@ -377,7 +380,7 @@ public class ColumnStatsSemanticAnalyzer
QBParseInfo qbp;
// initialize QB
- init();
+ init(true);
// check if it is no scan. grammar prevents noscan and columns from coexisting
super.processNoScanCommand(ast);
Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java?rev=1629544&r1=1629543&r2=1629544&view=diff
==============================================================================
--- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java (original)
+++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java Sun Oct 5 22:26:43 2014
@@ -79,6 +79,7 @@ import org.apache.hadoop.hive.ql.lockmgr
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.HiveUtils;
+import org.apache.hadoop.hive.ql.metadata.InvalidTableException;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.parse.authorization.AuthorizationParseUtils;
@@ -267,11 +268,11 @@ public class DDLSemanticAnalyzer extends
} else if (ast.getType() == HiveParser.TOK_ALTERTABLE_UNARCHIVE) {
analyzeAlterTableArchive(qualified, ast, true);
} else if (ast.getType() == HiveParser.TOK_ALTERTABLE_ADDCOLS) {
- analyzeAlterTableModifyCols(qualified, ast, AlterTableTypes.ADDCOLS);
+ analyzeAlterTableModifyCols(qualified, ast, partSpec, AlterTableTypes.ADDCOLS);
} else if (ast.getType() == HiveParser.TOK_ALTERTABLE_REPLACECOLS) {
- analyzeAlterTableModifyCols(qualified, ast, AlterTableTypes.REPLACECOLS);
+ analyzeAlterTableModifyCols(qualified, ast, partSpec, AlterTableTypes.REPLACECOLS);
} else if (ast.getType() == HiveParser.TOK_ALTERTABLE_RENAMECOL) {
- analyzeAlterTableRenameCol(qualified, ast);
+ analyzeAlterTableRenameCol(qualified, ast, partSpec);
} else if (ast.getType() == HiveParser.TOK_ALTERTABLE_ADDPARTS) {
analyzeAlterTableAddParts(qualified, ast, false);
} else if (ast.getType() == HiveParser.TOK_ALTERTABLE_DROPPARTS) {
@@ -847,7 +848,8 @@ public class DDLSemanticAnalyzer extends
outputs.add(new WriteEntity(tab, WriteEntity.WriteType.DDL_EXCLUSIVE));
}
- DropTableDesc dropTblDesc = new DropTableDesc(tableName, expectView, ifExists);
+ boolean ifPurge = (ast.getFirstChildWithType(HiveParser.KW_PURGE) != null);
+ DropTableDesc dropTblDesc = new DropTableDesc(tableName, expectView, ifExists, ifPurge);
rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(),
dropTblDesc), conf));
}
@@ -1717,7 +1719,8 @@ public class DDLSemanticAnalyzer extends
// assume the first component of DOT delimited name is tableName
// get the attemptTableName
- static public String getAttemptTableName(Hive db, String qualifiedName, boolean isColumn) {
+ static public String getAttemptTableName(Hive db, String qualifiedName, boolean isColumn)
+ throws SemanticException {
// check whether the name starts with table
// DESCRIBE table
// DESCRIBE table.column
@@ -1738,11 +1741,13 @@ public class DDLSemanticAnalyzer extends
return tableName;
}
}
- } catch (HiveException e) {
+ } catch (InvalidTableException e) {
// assume the first DOT delimited component is tableName
// OK if it is not
// do nothing when an exception occurs
return null;
+ } catch (HiveException e) {
+ throw new SemanticException(e.getMessage(), e);
}
return null;
}
@@ -1823,7 +1828,7 @@ public class DDLSemanticAnalyzer extends
ASTNode parentAst,
ASTNode ast,
String tableName,
- Map<String, String> partSpec) {
+ Map<String, String> partSpec) throws SemanticException {
// if parent has two children
// it could be DESCRIBE table key
@@ -1879,11 +1884,13 @@ public class DDLSemanticAnalyzer extends
Table tab = null;
try {
tab = db.getTable(tableName);
- } catch (HiveException e) {
- // if table not valid
- // throw semantic exception
+ }
+ catch (InvalidTableException e) {
throw new SemanticException(ErrorMsg.INVALID_TABLE.getMsg(tableName), e);
}
+ catch (HiveException e) {
+ throw new SemanticException(e.getMessage(), e);
+ }
if (partSpec != null) {
Partition part = null;
@@ -2480,7 +2487,8 @@ public class DDLSemanticAnalyzer extends
alterTblDesc), conf));
}
- private void analyzeAlterTableRenameCol(String[] qualified, ASTNode ast) throws SemanticException {
+ private void analyzeAlterTableRenameCol(String[] qualified, ASTNode ast,
+ HashMap<String, String> partSpec) throws SemanticException {
String newComment = null;
String newType = null;
newType = getTypeStringFromAST((ASTNode) ast.getChild(2));
@@ -2521,10 +2529,10 @@ public class DDLSemanticAnalyzer extends
}
String tblName = getDotName(qualified);
- AlterTableDesc alterTblDesc = new AlterTableDesc(tblName,
+ AlterTableDesc alterTblDesc = new AlterTableDesc(tblName, partSpec,
unescapeIdentifier(oldColName), unescapeIdentifier(newColName),
newType, newComment, first, flagCol);
- addInputsOutputsAlterTable(tblName, null, alterTblDesc);
+ addInputsOutputsAlterTable(tblName, partSpec, alterTblDesc);
rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(),
alterTblDesc), conf));
@@ -2568,14 +2576,14 @@ public class DDLSemanticAnalyzer extends
}
private void analyzeAlterTableModifyCols(String[] qualified, ASTNode ast,
- AlterTableTypes alterType) throws SemanticException {
+ HashMap<String, String> partSpec, AlterTableTypes alterType) throws SemanticException {
String tblName = getDotName(qualified);
List<FieldSchema> newCols = getColumns((ASTNode) ast.getChild(0));
- AlterTableDesc alterTblDesc = new AlterTableDesc(tblName, newCols,
+ AlterTableDesc alterTblDesc = new AlterTableDesc(tblName, partSpec, newCols,
alterType);
- addInputsOutputsAlterTable(tblName, null, alterTblDesc);
+ addInputsOutputsAlterTable(tblName, partSpec, alterTblDesc);
rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(),
alterTblDesc), conf));
}
Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/parse/FromClauseParser.g
URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/parse/FromClauseParser.g?rev=1629544&r1=1629543&r2=1629544&view=diff
==============================================================================
--- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/parse/FromClauseParser.g (original)
+++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/parse/FromClauseParser.g Sun Oct 5 22:26:43 2014
@@ -263,7 +263,7 @@ searchCondition
// INSERT INTO <table> (col1,col2,...) SELECT * FROM (VALUES(1,2,3),(4,5,6),...) as Foo(a,b,c)
valueRowConstructor
:
- LPAREN atomExpression (COMMA atomExpression)* RPAREN -> ^(TOK_VALUE_ROW atomExpression+)
+ LPAREN precedenceUnaryPrefixExpression (COMMA precedenceUnaryPrefixExpression)* RPAREN -> ^(TOK_VALUE_ROW precedenceUnaryPrefixExpression+)
;
valuesTableConstructor
Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/parse/FunctionSemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/parse/FunctionSemanticAnalyzer.java?rev=1629544&r1=1629543&r2=1629544&view=diff
==============================================================================
--- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/parse/FunctionSemanticAnalyzer.java (original)
+++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/parse/FunctionSemanticAnalyzer.java Sun Oct 5 22:26:43 2014
@@ -22,6 +22,8 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.api.Database;
@@ -81,7 +83,7 @@ public class FunctionSemanticAnalyzer ex
new CreateFunctionDesc(functionName, isTemporaryFunction, className, resources);
rootTasks.add(TaskFactory.get(new FunctionWork(desc), conf));
- addEntities(functionName, isTemporaryFunction);
+ addEntities(functionName, isTemporaryFunction, resources);
}
private void analyzeDropFunction(ASTNode ast) throws SemanticException {
@@ -106,7 +108,7 @@ public class FunctionSemanticAnalyzer ex
DropFunctionDesc desc = new DropFunctionDesc(functionName, isTemporaryFunction);
rootTasks.add(TaskFactory.get(new FunctionWork(desc), conf));
- addEntities(functionName, isTemporaryFunction);
+ addEntities(functionName, isTemporaryFunction, null);
}
private ResourceType getResourceType(ASTNode token) throws SemanticException {
@@ -152,8 +154,8 @@ public class FunctionSemanticAnalyzer ex
/**
* Add write entities to the semantic analyzer to restrict function creation to privileged users.
*/
- private void addEntities(String functionName, boolean isTemporaryFunction)
- throws SemanticException {
+ private void addEntities(String functionName, boolean isTemporaryFunction,
+ List<ResourceUri> resources) throws SemanticException {
// If the function is being added under a database 'namespace', then add an entity representing
// the database (only applicable to permanent/metastore functions).
// We also add a second entity representing the function name.
@@ -183,5 +185,13 @@ public class FunctionSemanticAnalyzer ex
// Add the function name as a WriteEntity
outputs.add(new WriteEntity(database, functionName, Type.FUNCTION,
WriteEntity.WriteType.DDL_NO_LOCK));
+
+ if (resources != null) {
+ for (ResourceUri resource : resources) {
+ String uriPath = resource.getUri();
+ outputs.add(new WriteEntity(new Path(uriPath),
+ FileUtils.isLocalFile(conf, uriPath)));
+ }
+ }
}
}
Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java
URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java?rev=1629544&r1=1629543&r2=1629544&view=diff
==============================================================================
--- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java (original)
+++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java Sun Oct 5 22:26:43 2014
@@ -29,6 +29,7 @@ import java.util.Set;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.AppMasterEventOperator;
+import org.apache.hadoop.hive.ql.exec.CommonMergeJoinOperator;
import org.apache.hadoop.hive.ql.exec.DependencyCollectionTask;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
@@ -45,6 +46,7 @@ import org.apache.hadoop.hive.ql.lib.Nod
import org.apache.hadoop.hive.ql.plan.BaseWork;
import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork;
import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
+import org.apache.hadoop.hive.ql.plan.MergeJoinWork;
import org.apache.hadoop.hive.ql.plan.MoveWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.TezEdgeProperty;
@@ -132,6 +134,8 @@ public class GenTezProcContext implement
// remember which reducesinks we've already connected
public final Set<ReduceSinkOperator> connectedReduceSinks;
+ public final Map<Operator<?>, MergeJoinWork> opMergeJoinWorkMap;
+ public CommonMergeJoinOperator currentMergeJoinOperator;
// remember the event operators we've seen
public final Set<AppMasterEventOperator> eventOperatorSet;
@@ -176,6 +180,8 @@ public class GenTezProcContext implement
this.eventOperatorSet = new LinkedHashSet<AppMasterEventOperator>();
this.abandonedEventOperatorSet = new LinkedHashSet<AppMasterEventOperator>();
this.tsToEventMap = new LinkedHashMap<TableScanOperator, List<AppMasterEventOperator>>();
+ this.opMergeJoinWorkMap = new LinkedHashMap<Operator<?>, MergeJoinWork>();
+ this.currentMergeJoinOperator = null;
rootTasks.add(currentTask);
}