Posted to commits@hive.apache.org by br...@apache.org on 2014/10/30 17:22:48 UTC
svn commit: r1635536 [12/28] - in /hive/branches/spark: ./ accumulo-handler/
accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/
accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/columns/
accumulo-handler/src/test/org/apache/hado...
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java Thu Oct 30 16:22:33 2014
@@ -21,6 +21,7 @@ package org.apache.hadoop.hive.ql.optimi
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
@@ -28,6 +29,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Stack;
+import java.util.TreeMap;
import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
@@ -67,7 +69,6 @@ import org.apache.hadoop.hive.ql.plan.Ex
import org.apache.hadoop.hive.ql.plan.GroupByDesc;
import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
import org.apache.hadoop.hive.ql.plan.MapWork;
-import org.apache.hadoop.hive.ql.plan.MapredWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.hive.ql.plan.ReduceWork;
@@ -121,7 +122,11 @@ import org.apache.hadoop.hive.serde2.Des
import org.apache.hadoop.hive.serde2.SerDe;
import org.apache.hadoop.hive.serde2.SerDeUtils;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hadoop.util.ReflectionUtils;
public class Vectorizer implements PhysicalPlanResolver {
@@ -282,13 +287,13 @@ public class Vectorizer implements Physi
private PhysicalContext pctx;
- private int keyColCount;
- private int valueColCount;
+ private List<String> reduceColumnNames;
+ private List<TypeInfo> reduceTypeInfos;
public VectorizationDispatcher(PhysicalContext pctx) {
this.pctx = pctx;
- keyColCount = 0;
- valueColCount = 0;
+ reduceColumnNames = null;
+ reduceTypeInfos = null;
}
@Override
@@ -385,14 +390,13 @@ public class Vectorizer implements Physi
HashMap<Node, Object> nodeOutput = new HashMap<Node, Object>();
ogw.startWalking(topNodes, nodeOutput);
- Map<String, Map<Integer, String>> columnVectorTypes = vnp.getScratchColumnVectorTypes();
- mapWork.setScratchColumnVectorTypes(columnVectorTypes);
- Map<String, Map<String, Integer>> columnMap = vnp.getScratchColumnMap();
- mapWork.setScratchColumnMap(columnMap);
+ Map<String, Map<Integer, String>> allScratchColumnVectorTypeMaps = vnp.getAllScratchColumnVectorTypeMaps();
+ mapWork.setAllScratchColumnVectorTypeMaps(allScratchColumnVectorTypeMaps);
+ Map<String, Map<String, Integer>> allColumnVectorMaps = vnp.getAllColumnVectorMaps();
+ mapWork.setAllColumnVectorMaps(allColumnVectorMaps);
if (LOG.isDebugEnabled()) {
- LOG.debug(String.format("vectorTypes: %s", columnVectorTypes.toString()));
- LOG.debug(String.format("columnMap: %s", columnMap.toString()));
+ debugDisplayAllMaps(allColumnVectorMaps, allScratchColumnVectorTypeMaps);
}
return;
@@ -413,7 +417,7 @@ public class Vectorizer implements Physi
return false;
}
StructObjectInspector keyStructObjectInspector = (StructObjectInspector)keyObjectInspector;
- keyColCount = keyStructObjectInspector.getAllStructFieldRefs().size();
+ List<? extends StructField> keyFields = keyStructObjectInspector.getAllStructFieldRefs();
// Tez doesn't use tagging...
if (reduceWork.getNeedsTagging()) {
@@ -426,9 +430,20 @@ public class Vectorizer implements Physi
!(valueObjectInspector instanceof StructObjectInspector)) {
return false;
}
- StructObjectInspector valueStructObjectInspector =
- (StructObjectInspector)valueObjectInspector;
- valueColCount = valueStructObjectInspector.getAllStructFieldRefs().size();
+ StructObjectInspector valueStructObjectInspector = (StructObjectInspector)valueObjectInspector;
+ List<? extends StructField> valueFields = valueStructObjectInspector.getAllStructFieldRefs();
+
+ reduceColumnNames = new ArrayList<String>();
+ reduceTypeInfos = new ArrayList<TypeInfo>();
+
+ for (StructField field: keyFields) {
+ reduceColumnNames.add(Utilities.ReduceField.KEY.toString() + "." + field.getFieldName());
+ reduceTypeInfos.add(TypeInfoUtils.getTypeInfoFromTypeString(field.getFieldObjectInspector().getTypeName()));
+ }
+ for (StructField field: valueFields) {
+ reduceColumnNames.add(Utilities.ReduceField.VALUE.toString() + "." + field.getFieldName());
+ reduceTypeInfos.add(TypeInfoUtils.getTypeInfoFromTypeString(field.getFieldObjectInspector().getTypeName()));
+ }
} catch (Exception e) {
throw new SemanticException(e);
}
@@ -478,7 +493,7 @@ public class Vectorizer implements Physi
// VectorizationContext... Do we use PreOrderWalker instead of DefaultGraphWalker.
Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
ReduceWorkVectorizationNodeProcessor vnp =
- new ReduceWorkVectorizationNodeProcessor(reduceWork, keyColCount, valueColCount);
+ new ReduceWorkVectorizationNodeProcessor(reduceColumnNames);
addReduceWorkRules(opRules, vnp);
Dispatcher disp = new DefaultRuleDispatcher(vnp, opRules, null);
GraphWalker ogw = new PreOrderWalker(disp);
@@ -495,18 +510,17 @@ public class Vectorizer implements Physi
Operator<? extends OperatorDesc> reducer = reduceWork.getReducer();
if (reducer.getType().equals(OperatorType.EXTRACT)) {
- ((VectorExtractOperator)reducer).setKeyAndValueColCounts(keyColCount, valueColCount);
+ ((VectorExtractOperator)reducer).setReduceTypeInfos(reduceTypeInfos);
}
- Map<String, Map<Integer, String>> columnVectorTypes = vnp.getScratchColumnVectorTypes();
- reduceWork.setScratchColumnVectorTypes(columnVectorTypes);
- Map<String, Map<String, Integer>> columnMap = vnp.getScratchColumnMap();
- reduceWork.setScratchColumnMap(columnMap);
+ Map<String, Map<Integer, String>> allScratchColumnVectorTypeMaps = vnp.getAllScratchColumnVectorTypeMaps();
+ reduceWork.setAllScratchColumnVectorTypeMaps(allScratchColumnVectorTypeMaps);
+ Map<String, Map<String, Integer>> allColumnVectorMaps = vnp.getAllColumnVectorMaps();
+ reduceWork.setAllColumnVectorMaps(allColumnVectorMaps);
if (LOG.isDebugEnabled()) {
- LOG.debug(String.format("vectorTypes: %s", columnVectorTypes.toString()));
- LOG.debug(String.format("columnMap: %s", columnMap.toString()));
+ debugDisplayAllMaps(allColumnVectorMaps, allScratchColumnVectorTypeMaps);
}
}
}
@@ -571,26 +585,26 @@ public class Vectorizer implements Physi
protected final Set<Operator<? extends OperatorDesc>> opsDone =
new HashSet<Operator<? extends OperatorDesc>>();
- public Map<String, Map<Integer, String>> getScratchColumnVectorTypes() {
- Map<String, Map<Integer, String>> scratchColumnVectorTypes =
+ public Map<String, Map<Integer, String>> getAllScratchColumnVectorTypeMaps() {
+ Map<String, Map<Integer, String>> allScratchColumnVectorTypeMaps =
new HashMap<String, Map<Integer, String>>();
for (String onefile : scratchColumnContext.keySet()) {
VectorizationContext vc = scratchColumnContext.get(onefile);
- Map<Integer, String> cmap = vc.getOutputColumnTypeMap();
- scratchColumnVectorTypes.put(onefile, cmap);
+ Map<Integer, String> cmap = vc.getScratchColumnTypeMap();
+ allScratchColumnVectorTypeMaps.put(onefile, cmap);
}
- return scratchColumnVectorTypes;
+ return allScratchColumnVectorTypeMaps;
}
- public Map<String, Map<String, Integer>> getScratchColumnMap() {
- Map<String, Map<String, Integer>> scratchColumnMap =
+ public Map<String, Map<String, Integer>> getAllColumnVectorMaps() {
+ Map<String, Map<String, Integer>> allColumnVectorMaps =
new HashMap<String, Map<String, Integer>>();
for(String oneFile: scratchColumnContext.keySet()) {
VectorizationContext vc = scratchColumnContext.get(oneFile);
- Map<String, Integer> cmap = vc.getColumnMap();
- scratchColumnMap.put(oneFile, cmap);
+ Map<String, Integer> cmap = vc.getProjectionColumnMap();
+ allColumnVectorMaps.put(oneFile, cmap);
}
- return scratchColumnMap;
+ return allColumnVectorMaps;
}
public VectorizationContext walkStackToFindVectorizationContext(Stack<Node> stack,
@@ -676,10 +690,7 @@ public class Vectorizer implements Physi
vContext.setFileKey(onefile);
scratchColumnContext.put(onefile, vContext);
if (LOG.isDebugEnabled()) {
- LOG.debug("Vectorized MapWork operator " + op.getName() +
- " with vectorization context key=" + vContext.getFileKey() +
- ", vectorTypes: " + vContext.getOutputColumnTypeMap().toString() +
- ", columnMap: " + vContext.getColumnMap().toString());
+ LOG.debug("Vectorized MapWork operator " + op.getName() + " vectorization context " + vContext.toString());
}
break;
}
@@ -710,17 +721,11 @@ public class Vectorizer implements Physi
Operator<? extends OperatorDesc> vectorOp = doVectorize(op, vContext);
if (LOG.isDebugEnabled()) {
- LOG.debug("Vectorized MapWork operator " + vectorOp.getName() +
- " with vectorization context key=" + vContext.getFileKey() +
- ", vectorTypes: " + vContext.getOutputColumnTypeMap().toString() +
- ", columnMap: " + vContext.getColumnMap().toString());
+ LOG.debug("Vectorized MapWork operator " + vectorOp.getName() + " vectorization context " + vContext.toString());
if (vectorOp instanceof VectorizationContextRegion) {
VectorizationContextRegion vcRegion = (VectorizationContextRegion) vectorOp;
VectorizationContext vOutContext = vcRegion.getOuputVectorizationContext();
- LOG.debug("Vectorized MapWork operator " + vectorOp.getName() +
- " added new vectorization context key=" + vOutContext.getFileKey() +
- ", vectorTypes: " + vOutContext.getOutputColumnTypeMap().toString() +
- ", columnMap: " + vOutContext.getColumnMap().toString());
+ LOG.debug("Vectorized MapWork operator " + vectorOp.getName() + " added vectorization context " + vContext.toString());
}
}
@@ -730,10 +735,7 @@ public class Vectorizer implements Physi
class ReduceWorkVectorizationNodeProcessor extends VectorizationNodeProcessor {
- private final ReduceWork rWork;
- private int keyColCount;
- private int valueColCount;
- private Map<String, Integer> reduceColumnNameMap;
+ private List<String> reduceColumnNames;
private VectorizationContext reduceShuffleVectorizationContext;
@@ -743,12 +745,8 @@ public class Vectorizer implements Physi
return rootVectorOp;
}
- public ReduceWorkVectorizationNodeProcessor(ReduceWork rWork, int keyColCount,
- int valueColCount) {
- this.rWork = rWork;
- reduceColumnNameMap = rWork.getReduceColumnNameMap();
- this.keyColCount = keyColCount;
- this.valueColCount = valueColCount;
+ public ReduceWorkVectorizationNodeProcessor(List<String> reduceColumnNames) {
+ this.reduceColumnNames = reduceColumnNames;
rootVectorOp = null;
reduceShuffleVectorizationContext = null;
}
@@ -766,17 +764,16 @@ public class Vectorizer implements Physi
boolean saveRootVectorOp = false;
if (op.getParentOperators().size() == 0) {
- vContext = getReduceVectorizationContext(reduceColumnNameMap);
+ LOG.info("ReduceWorkVectorizationNodeProcessor process reduceColumnNames " + reduceColumnNames.toString());
+
+ vContext = new VectorizationContext(reduceColumnNames);
vContext.setFileKey("_REDUCE_SHUFFLE_");
scratchColumnContext.put("_REDUCE_SHUFFLE_", vContext);
reduceShuffleVectorizationContext = vContext;
saveRootVectorOp = true;
if (LOG.isDebugEnabled()) {
- LOG.debug("Vectorized ReduceWork reduce shuffle vectorization context key=" +
- vContext.getFileKey() +
- ", vectorTypes: " + vContext.getOutputColumnTypeMap().toString() +
- ", columnMap: " + vContext.getColumnMap().toString());
+ LOG.debug("Vectorized ReduceWork reduce shuffle vectorization context " + vContext.toString());
}
} else {
vContext = walkStackToFindVectorizationContext(stack, op);
@@ -802,17 +799,11 @@ public class Vectorizer implements Physi
Operator<? extends OperatorDesc> vectorOp = doVectorize(op, vContext);
if (LOG.isDebugEnabled()) {
- LOG.debug("Vectorized ReduceWork operator " + vectorOp.getName() +
- " with vectorization context key=" + vContext.getFileKey() +
- ", vectorTypes: " + vContext.getOutputColumnTypeMap().toString() +
- ", columnMap: " + vContext.getColumnMap().toString());
+ LOG.debug("Vectorized ReduceWork operator " + vectorOp.getName() + " vectorization context " + vContext.toString());
if (vectorOp instanceof VectorizationContextRegion) {
VectorizationContextRegion vcRegion = (VectorizationContextRegion) vectorOp;
VectorizationContext vOutContext = vcRegion.getOuputVectorizationContext();
- LOG.debug("Vectorized ReduceWork operator " + vectorOp.getName() +
- " added new vectorization context key=" + vOutContext.getFileKey() +
- ", vectorTypes: " + vOutContext.getOutputColumnTypeMap().toString() +
- ", columnMap: " + vOutContext.getColumnMap().toString());
+ LOG.debug("Vectorized ReduceWork operator " + vectorOp.getName() + " added vectorization context " + vContext.toString());
}
}
if (vectorOp instanceof VectorGroupByOperator) {
@@ -830,7 +821,7 @@ public class Vectorizer implements Physi
private static class ValidatorVectorizationContext extends VectorizationContext {
private ValidatorVectorizationContext() {
- super(null, -1);
+ super();
}
@Override
@@ -918,7 +909,12 @@ public class Vectorizer implements Physi
}
break;
case GROUPBY:
- ret = validateGroupByOperator((GroupByOperator) op, true, true);
+ if (HiveConf.getBoolVar(physicalContext.getConf(),
+ HiveConf.ConfVars.HIVE_VECTORIZATION_REDUCE_GROUPBY_ENABLED)) {
+ ret = validateGroupByOperator((GroupByOperator) op, true, true);
+ } else {
+ ret = false;
+ }
break;
case FILTER:
ret = validateFilterOperator((FilterOperator) op);
@@ -1026,12 +1022,10 @@ public class Vectorizer implements Physi
if (!ret) {
return false;
}
- boolean isVectorOutput = isTez && aggregatorsOutputIsPrimitive(desc.getAggregators(), isReduce);
- vectorDesc.setVectorOutput(isVectorOutput);
if (isReduce) {
if (desc.isDistinct()) {
LOG.info("Distinct not supported in reduce vector mode");
- return false;
+ return false;
}
// Sort-based GroupBy?
if (desc.getMode() != GroupByDesc.Mode.COMPLETE &&
@@ -1044,21 +1038,24 @@ public class Vectorizer implements Physi
LOG.info("Reduce GROUP BY mode is " + desc.getMode().name());
if (desc.getGroupKeyNotReductionKey()) {
LOG.info("Reduce vector mode not supported when group key is not reduction key");
- return false;
+ return false;
}
- if (!isVectorOutput) {
+ if (!aggregatorsOutputIsPrimitive(desc.getAggregators(), isReduce)) {
LOG.info("Reduce vector mode only supported when aggregate outputs are primitive types");
- return false;
+ return false;
}
if (desc.getKeys().size() > 0) {
+ if (op.getParentOperators().size() > 0) {
+ LOG.info("Reduce vector mode can only handle a key group GROUP BY operator when it is fed by reduce-shuffle");
+ return false;
+ }
LOG.info("Reduce-side GROUP BY will process key groups");
vectorDesc.setVectorGroupBatches(true);
} else {
LOG.info("Reduce-side GROUP BY will do global aggregation");
}
+ vectorDesc.setVectorOutput(true);
vectorDesc.setIsReduce(true);
- } else {
- LOG.info("Downstream operators of map-side GROUP BY will be vectorized: " + isVectorOutput);
}
return true;
}
@@ -1227,21 +1224,17 @@ public class Vectorizer implements Physi
PhysicalContext pctx) {
RowSchema rs = op.getSchema();
- Map<String, Integer> cmap = new HashMap<String, Integer>();
- int columnCount = 0;
+ // Add all non-virtual columns to make a vectorization context for
+ // the TableScan operator.
+ VectorizationContext vContext = new VectorizationContext();
for (ColumnInfo c : rs.getSignature()) {
// Earlier, validation code should have eliminated virtual columns usage (HIVE-5560).
if (!isVirtualColumn(c)) {
- cmap.put(c.getInternalName(), columnCount++);
+ vContext.addInitialColumn(c.getInternalName());
}
}
-
- return new VectorizationContext(cmap, columnCount);
- }
-
- private VectorizationContext getReduceVectorizationContext(
- Map<String, Integer> reduceColumnNameMap) {
- return new VectorizationContext(reduceColumnNameMap, reduceColumnNameMap.size());
+ vContext.finishedAddingInitialColumns();
+ return vContext;
}
private void fixupParentChildOperators(Operator<? extends OperatorDesc> op,
@@ -1297,4 +1290,41 @@ public class Vectorizer implements Physi
}
return false;
}
+
+ public void debugDisplayAllMaps(Map<String, Map<String, Integer>> allColumnVectorMaps,
+ Map<String, Map<Integer, String>> allScratchColumnVectorTypeMaps) {
+
+ // Context keys grow in length since they are a path...
+ Comparator<String> comparerShorterString = new Comparator<String>() {
+ @Override
+ public int compare(String o1, String o2) {
+ Integer length1 = o1.length();
+ Integer length2 = o2.length();
+ return length1.compareTo(length2);
+ }};
+
+ Comparator<Integer> comparerInteger = new Comparator<Integer>() {
+ @Override
+ public int compare(Integer o1, Integer o2) {
+ return o1.compareTo(o2);
+ }};
+
+ Map<String, Map<Integer, String>> sortedAllColumnVectorMaps = new TreeMap<String, Map<Integer, String>>(comparerShorterString);
+ for (Map.Entry<String, Map<String, Integer>> entry : allColumnVectorMaps.entrySet()) {
+ Map<Integer, String> sortedColumnMap = new TreeMap<Integer, String>(comparerInteger);
+ for (Map.Entry<String, Integer> innerEntry : entry.getValue().entrySet()) {
+ sortedColumnMap.put(innerEntry.getValue(), innerEntry.getKey());
+ }
+ sortedAllColumnVectorMaps.put(entry.getKey(), sortedColumnMap);
+ }
+ LOG.debug("sortedAllColumnVectorMaps " + sortedAllColumnVectorMaps);
+
+ Map<String, Map<Integer, String>> sortedAllScratchColumnVectorTypeMap = new TreeMap<String, Map<Integer, String>>(comparerShorterString);
+ for (Map.Entry<String, Map<Integer, String>> entry : allScratchColumnVectorTypeMaps.entrySet()) {
+ Map<Integer, String> sortedScratchColumnTypeMap = new TreeMap<Integer, String>(comparerInteger);
+ sortedScratchColumnTypeMap.putAll(entry.getValue());
+ sortedAllScratchColumnVectorTypeMap.put(entry.getKey(), sortedScratchColumnTypeMap);
+ }
+ LOG.debug("sortedAllScratchColumnVectorTypeMap " + sortedAllScratchColumnVectorTypeMap);
+ }
}
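
For reference, the reduce-side naming scheme the new code establishes (key fields become "KEY.<name>", value fields become "VALUE.<name>", and the combined list seeds the reduce-shuffle VectorizationContext) can be shown with a minimal standalone sketch. The field names below are hypothetical, and it assumes Utilities.ReduceField.KEY/VALUE render as "KEY"/"VALUE"; Hive derives the real names from the key and value StructObjectInspectors:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    // Minimal sketch of the "KEY."/"VALUE." prefixing done when building reduceColumnNames.
    public class ReduceColumnNamesSketch {
      public static void main(String[] args) {
        List<String> keyFields = Arrays.asList("reducesinkkey0");   // hypothetical key field
        List<String> valueFields = Arrays.asList("_col0", "_col1"); // hypothetical value fields

        List<String> reduceColumnNames = new ArrayList<String>();
        for (String f : keyFields) {
          reduceColumnNames.add("KEY." + f);
        }
        for (String f : valueFields) {
          reduceColumnNames.add("VALUE." + f);
        }
        // Prints: [KEY.reducesinkkey0, VALUE._col0, VALUE._col1]
        System.out.println(reduceColumnNames);
      }
    }
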
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionPruner.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionPruner.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionPruner.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionPruner.java Thu Oct 30 16:22:33 2014
@@ -56,7 +56,9 @@ import org.apache.hadoop.hive.ql.plan.Ex
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPAnd;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPOr;
+import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
/**
@@ -188,12 +190,18 @@ public class PartitionPruner implements
// Replace virtual columns with nulls. See javadoc for details.
prunerExpr = removeNonPartCols(prunerExpr, extractPartColNames(tab), partColsUsedInFilter);
// Remove all parts that are not partition columns. See javadoc for details.
- ExprNodeGenericFuncDesc compactExpr = (ExprNodeGenericFuncDesc)compactExpr(prunerExpr.clone());
+ ExprNodeDesc compactExpr = compactExpr(prunerExpr.clone());
String oldFilter = prunerExpr.getExprString();
- if (compactExpr == null) {
- // Non-strict mode, and all the predicates are on non-partition columns - get everything.
- LOG.debug("Filter " + oldFilter + " was null after compacting");
- return getAllPartsFromCacheOrServer(tab, key, true, prunedPartitionsMap);
+ if (isBooleanExpr(compactExpr)) {
+ // For null and true values, return every partition
+ if (!isFalseExpr(compactExpr)) {
+ // Non-strict mode, and all the predicates are on non-partition columns - get everything.
+ LOG.debug("Filter " + oldFilter + " was null after compacting");
+ return getAllPartsFromCacheOrServer(tab, key, true, prunedPartitionsMap);
+ } else {
+ return new PrunedPartitionList(tab, new LinkedHashSet<Partition>(new ArrayList<Partition>()),
+ new ArrayList<String>(), false);
+ }
}
LOG.debug("Filter w/ compacting: " + compactExpr.getExprString()
+ "; filter w/o compacting: " + oldFilter);
@@ -204,7 +212,7 @@ public class PartitionPruner implements
return ppList;
}
- ppList = getPartitionsFromServer(tab, compactExpr, conf, alias, partColsUsedInFilter, oldFilter.equals(compactExpr.getExprString()));
+ ppList = getPartitionsFromServer(tab, (ExprNodeGenericFuncDesc)compactExpr, conf, alias, partColsUsedInFilter, oldFilter.equals(compactExpr.getExprString()));
prunedPartitionsMap.put(key, ppList);
return ppList;
}
@@ -225,16 +233,22 @@ public class PartitionPruner implements
partsCache.put(key, ppList);
return ppList;
}
-
- private static ExprNodeDesc removeTruePredciates(ExprNodeDesc e) {
- if (e instanceof ExprNodeConstantDesc) {
- ExprNodeConstantDesc eC = (ExprNodeConstantDesc) e;
- if (e.getTypeInfo() == TypeInfoFactory.booleanTypeInfo
- && eC.getValue() == Boolean.TRUE) {
- return null;
- }
- }
- return e;
+
+ static private boolean isBooleanExpr(ExprNodeDesc expr) {
+ return expr != null && expr instanceof ExprNodeConstantDesc &&
+ ((ExprNodeConstantDesc)expr).getTypeInfo() instanceof PrimitiveTypeInfo &&
+ ((PrimitiveTypeInfo)(((ExprNodeConstantDesc)expr).getTypeInfo())).
+ getTypeName().equals(serdeConstants.BOOLEAN_TYPE_NAME);
+ }
+ static private boolean isTrueExpr(ExprNodeDesc expr) {
+ return isBooleanExpr(expr) &&
+ ((ExprNodeConstantDesc)expr).getValue() != null &&
+ ((ExprNodeConstantDesc)expr).getValue().equals(Boolean.TRUE);
+ }
+ static private boolean isFalseExpr(ExprNodeDesc expr) {
+ return isBooleanExpr(expr) &&
+ ((ExprNodeConstantDesc)expr).getValue() != null &&
+ ((ExprNodeConstantDesc)expr).getValue().equals(Boolean.FALSE);
}
/**
@@ -245,10 +259,13 @@ public class PartitionPruner implements
* @return partition pruning expression that only contains partition columns.
*/
static private ExprNodeDesc compactExpr(ExprNodeDesc expr) {
- if (expr instanceof ExprNodeConstantDesc) {
- expr = removeTruePredciates(expr);
- if (expr == null || ((ExprNodeConstantDesc)expr).getValue() == null) {
- return null;
+ // If this is a constant boolean expression, return the value.
+ if (expr == null) {
+ return null;
+ }
+ if (expr instanceof ExprNodeConstantDesc) {
+ if (isBooleanExpr(expr)) {
+ return expr;
} else {
throw new IllegalStateException("Unexpected non-null ExprNodeConstantDesc: "
+ expr.getExprString());
@@ -256,22 +273,29 @@ public class PartitionPruner implements
} else if (expr instanceof ExprNodeGenericFuncDesc) {
GenericUDF udf = ((ExprNodeGenericFuncDesc)expr).getGenericUDF();
boolean isAnd = udf instanceof GenericUDFOPAnd;
- if (isAnd || udf instanceof GenericUDFOPOr) {
+ boolean isOr = udf instanceof GenericUDFOPOr;
+
+ if (isAnd || isOr) {
List<ExprNodeDesc> children = expr.getChildren();
- ExprNodeDesc left = removeTruePredciates(children.get(0));
- children.set(0, left == null ? null : compactExpr(left));
- ExprNodeDesc right = removeTruePredciates(children.get(1));
- children.set(1, right == null ? null : compactExpr(right));
-
- // Note that one does not simply compact (not-null or null) to not-null.
- // Only if we have an "and" is it valid to send one side to metastore.
- if (children.get(0) == null && children.get(1) == null) {
- return null;
- } else if (children.get(0) == null) {
- return isAnd ? children.get(1) : null;
- } else if (children.get(1) == null) {
- return isAnd ? children.get(0) : null;
- }
+ ExprNodeDesc left = children.get(0);
+ children.set(0, compactExpr(left));
+ ExprNodeDesc right = children.get(1);
+ children.set(1, compactExpr(right));
+
+ if (isTrueExpr(children.get(0)) && isTrueExpr(children.get(1))) {
+ return new ExprNodeConstantDesc(Boolean.TRUE);
+ } else if (isTrueExpr(children.get(0))) {
+ return isAnd ? children.get(1) : new ExprNodeConstantDesc(Boolean.TRUE);
+ } else if (isTrueExpr(children.get(1))) {
+ return isAnd ? children.get(0) : new ExprNodeConstantDesc(Boolean.TRUE);
+ } else if (isFalseExpr(children.get(0)) && isFalseExpr(children.get(1))) {
+ return new ExprNodeConstantDesc(Boolean.FALSE);
+ } else if (isFalseExpr(children.get(0))) {
+ return isAnd ? new ExprNodeConstantDesc(Boolean.FALSE) : children.get(1);
+ } else if (isFalseExpr(children.get(1))) {
+ return isAnd ? new ExprNodeConstantDesc(Boolean.FALSE) : children.get(0);
+ }
+
}
return expr;
} else {
@@ -296,9 +320,9 @@ public class PartitionPruner implements
if (!partCols.contains(column)) {
// Column doesn't appear to be a partition column for the table.
return new ExprNodeConstantDesc(expr.getTypeInfo(), null);
- }
+ }
referred.add(column);
- }
+ }
if (expr instanceof ExprNodeGenericFuncDesc) {
List<ExprNodeDesc> children = expr.getChildren();
for (int i = 0; i < children.size(); ++i) {
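
The new isTrueExpr/isFalseExpr helpers let compactExpr fold constant booleans through AND and OR instead of dropping them. Below is a standalone sketch of the truth table it applies, using plain Booleans (null standing in for a non-constant child) rather than Hive's ExprNodeDesc classes:

    // Minimal sketch of the folding rules; null models a subexpression that is
    // neither isTrueExpr nor isFalseExpr and therefore must be kept.
    public class CompactExprFoldingSketch {
      static Boolean foldAnd(Boolean left, Boolean right) {
        if (Boolean.FALSE.equals(left) || Boolean.FALSE.equals(right)) return Boolean.FALSE;
        if (Boolean.TRUE.equals(left)) return right;   // TRUE AND x -> x
        if (Boolean.TRUE.equals(right)) return left;   // x AND TRUE -> x
        return null;                                   // keep the AND of two non-constants
      }

      static Boolean foldOr(Boolean left, Boolean right) {
        if (Boolean.TRUE.equals(left) || Boolean.TRUE.equals(right)) return Boolean.TRUE;
        if (Boolean.FALSE.equals(left)) return right;  // FALSE OR x -> x
        if (Boolean.FALSE.equals(right)) return left;  // x OR FALSE -> x
        return null;                                   // keep the OR of two non-constants
      }

      public static void main(String[] args) {
        System.out.println(foldAnd(Boolean.TRUE, null));  // null: TRUE AND x keeps x
        System.out.println(foldOr(Boolean.FALSE, null));  // null: FALSE OR x keeps x
        System.out.println(foldAnd(Boolean.FALSE, null)); // false
        System.out.println(foldOr(Boolean.TRUE, null));   // true
      }
    }

In prune(), a filter that folds to FALSE now yields an empty PrunedPartitionList, while TRUE (or a null expression after compacting) still falls back to fetching all partitions.
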
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java Thu Oct 30 16:22:33 2014
@@ -42,7 +42,6 @@ import org.apache.hadoop.hive.ql.exec.Re
import org.apache.hadoop.hive.ql.exec.RowSchema;
import org.apache.hadoop.hive.ql.exec.SelectOperator;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
-import org.apache.hadoop.hive.ql.exec.tez.DagUtils;
import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.lib.NodeProcessor;
import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
@@ -151,28 +150,28 @@ public class StatsRulesProcFactory {
Statistics parentStats = parent.getStatistics();
AnnotateStatsProcCtx aspCtx = (AnnotateStatsProcCtx) procCtx;
HiveConf conf = aspCtx.getConf();
+ Statistics stats = null;
- // SELECT (*) does not change the statistics. Just pass on the parent statistics
- if (sop.getConf().isSelectStar()) {
+ if (parentStats != null) {
try {
- if (parentStats != null) {
- sop.setStatistics(parentStats.clone());
- }
+ stats = parentStats.clone();
} catch (CloneNotSupportedException e) {
throw new SemanticException(ErrorMsg.STATISTICS_CLONING_FAILED.getMsg());
}
- return null;
}
try {
if (satisfyPrecondition(parentStats)) {
- Statistics stats = parentStats.clone();
- List<ColStatistics> colStats =
- StatsUtils.getColStatisticsFromExprMap(conf, parentStats, sop.getColumnExprMap(),
- sop.getSchema());
- long dataSize = StatsUtils.getDataSizeFromColumnStats(stats.getNumRows(), colStats);
+ // this will take care of mapping between input column names and output column names. The
+ // returned column stats will have the output column names.
+ List<ColStatistics> colStats = StatsUtils.getColStatisticsFromExprMap(conf, parentStats,
+ sop.getColumnExprMap(), sop.getSchema());
stats.setColumnStats(colStats);
- stats.setDataSize(setMaxIfInvalid(dataSize));
+ // in case of select(*) the data size does not change
+ if (!sop.getConf().isSelectStar() && !sop.getConf().isSelStarNoCompute()) {
+ long dataSize = StatsUtils.getDataSizeFromColumnStats(stats.getNumRows(), colStats);
+ stats.setDataSize(setMaxIfInvalid(dataSize));
+ }
sop.setStatistics(stats);
if (isDebugEnabled) {
@@ -889,16 +888,12 @@ public class StatsRulesProcFactory {
GroupByDesc.Mode mode = desc.getMode();
if (mode.equals(GroupByDesc.Mode.HASH)) {
- float hashAggMem = conf.getFloatVar(
- HiveConf.ConfVars.HIVEMAPAGGRHASHMEMORY);
- float hashAggMaxThreshold = conf.getFloatVar(
- HiveConf.ConfVars.HIVEMAPAGGRMEMORYTHRESHOLD);
-
- // get memory for container. May be use mapreduce.map.java.opts instead?
- long totalMemory =
- DagUtils.getContainerResource(conf).getMemory() * 1000L * 1000L;
- long maxMemHashAgg = Math
- .round(totalMemory * hashAggMem * hashAggMaxThreshold);
+ float hashAggMem = conf.getFloatVar(HiveConf.ConfVars.HIVEMAPAGGRHASHMEMORY);
+ float hashAggMaxThreshold = conf.getFloatVar(HiveConf.ConfVars.HIVEMAPAGGRMEMORYTHRESHOLD);
+
+ // get available map memory
+ long totalMemory = StatsUtils.getAvailableMemory(conf) * 1000L * 1000L;
+ long maxMemHashAgg = Math.round(totalMemory * hashAggMem * hashAggMaxThreshold);
// estimated number of rows will be product of NDVs
long numEstimatedRows = 1;
@@ -1026,11 +1021,17 @@ public class StatsRulesProcFactory {
*/
public static class JoinStatsRule extends DefaultStatsRule implements NodeProcessor {
+ private boolean pkfkInferred = false;
+ private long newNumRows = 0;
+ private List<Operator<? extends OperatorDesc>> parents;
+ private CommonJoinOperator<? extends JoinDesc> jop;
+ private int numAttr = 1;
+
@Override
public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
Object... nodeOutputs) throws SemanticException {
- CommonJoinOperator<? extends JoinDesc> jop = (CommonJoinOperator<? extends JoinDesc>) nd;
- List<Operator<? extends OperatorDesc>> parents = jop.getParentOperators();
+ jop = (CommonJoinOperator<? extends JoinDesc>) nd;
+ parents = jop.getParentOperators();
AnnotateStatsProcCtx aspCtx = (AnnotateStatsProcCtx) procCtx;
HiveConf conf = aspCtx.getConf();
boolean allStatsAvail = true;
@@ -1057,22 +1058,26 @@ public class StatsRulesProcFactory {
Statistics stats = new Statistics();
Map<String, Long> rowCountParents = new HashMap<String, Long>();
List<Long> distinctVals = Lists.newArrayList();
-
- // 2 relations, multiple attributes
- boolean multiAttr = false;
- int numAttr = 1;
int numParent = parents.size();
-
Map<String, ColStatistics> joinedColStats = Maps.newHashMap();
Map<Integer, List<String>> joinKeys = Maps.newHashMap();
List<Long> rowCounts = Lists.newArrayList();
+ // detect if there are multiple attributes in join key
+ ReduceSinkOperator rsOp = (ReduceSinkOperator) jop.getParentOperators().get(0);
+ List<ExprNodeDesc> keyExprs = rsOp.getConf().getKeyCols();
+ numAttr = keyExprs.size();
+
+ // infer PK-FK relationship in single attribute join case
+ pkfkInferred = false;
+ inferPKFKRelationship();
+
// get the join keys from parent ReduceSink operators
for (int pos = 0; pos < parents.size(); pos++) {
ReduceSinkOperator parent = (ReduceSinkOperator) jop.getParentOperators().get(pos);
Statistics parentStats = parent.getStatistics();
- List<ExprNodeDesc> keyExprs = parent.getConf().getKeyCols();
+ keyExprs = parent.getConf().getKeyCols();
// Parent RS may have column statistics from multiple parents.
// Populate table alias to row count map, this will be used later to
@@ -1087,12 +1092,6 @@ public class StatsRulesProcFactory {
}
rowCounts.add(parentStats.getNumRows());
- // multi-attribute join key
- if (keyExprs.size() > 1) {
- multiAttr = true;
- numAttr = keyExprs.size();
- }
-
// compute fully qualified join key column names. this name will be
// used to quickly look-up for column statistics of join key.
// TODO: expressions in join condition will be ignored. assign
@@ -1115,7 +1114,7 @@ public class StatsRulesProcFactory {
// attribute join, else max(V(R,y1), V(S,y1)) * max(V(R,y2), V(S,y2))
// in case of multi-attribute join
long denom = 1;
- if (multiAttr) {
+ if (numAttr > 1) {
List<Long> perAttrDVs = Lists.newArrayList();
for (int idx = 0; idx < numAttr; idx++) {
for (Integer i : joinKeys.keySet()) {
@@ -1154,9 +1153,7 @@ public class StatsRulesProcFactory {
}
// Update NDV of joined columns to be min(V(R,y), V(S,y))
- if (multiAttr) {
- updateJoinColumnsNDV(joinKeys, joinedColStats, numAttr);
- }
+ updateJoinColumnsNDV(joinKeys, joinedColStats, numAttr);
// column statistics from different sources are put together and rename
// fully qualified column names based on output schema of join operator
@@ -1186,10 +1183,8 @@ public class StatsRulesProcFactory {
// update join statistics
stats.setColumnStats(outColStats);
- long newRowCount = computeNewRowCount(rowCounts, denom);
-
- updateStatsForJoinType(stats, newRowCount, jop, rowCountParents,
- outInTabAlias);
+ long newRowCount = pkfkInferred ? newNumRows : computeNewRowCount(rowCounts, denom);
+ updateStatsForJoinType(stats, newRowCount, jop, rowCountParents,outInTabAlias);
jop.setStatistics(stats);
if (isDebugEnabled) {
@@ -1234,6 +1229,195 @@ public class StatsRulesProcFactory {
return null;
}
+ private void inferPKFKRelationship() {
+ if (numAttr == 1) {
+ List<Integer> parentsWithPK = getPrimaryKeyCandidates(parents);
+
+ // in case of fact to many dimensional tables join, the join key in fact table will be
+ // mostly foreign key which will have corresponding primary key in dimension table.
+ // The selectivity of fact table in that case will be product of all selectivities of
+ // dimension tables (assumes conjunctivity)
+ for (Integer id : parentsWithPK) {
+ ColStatistics csPK = null;
+ Operator<? extends OperatorDesc> parent = parents.get(id);
+ for (ColStatistics cs : parent.getStatistics().getColumnStats()) {
+ if (cs.isPrimaryKey()) {
+ csPK = cs;
+ break;
+ }
+ }
+
+ // infer foreign key candidates positions
+ List<Integer> parentsWithFK = getForeignKeyCandidates(parents, csPK);
+ if (parentsWithFK.size() == 1 &&
+ parentsWithFK.size() + parentsWithPK.size() == parents.size()) {
+ Operator<? extends OperatorDesc> parentWithFK = parents.get(parentsWithFK.get(0));
+ List<Float> parentsSel = getSelectivity(parents, parentsWithPK);
+ Float prodSelectivity = 1.0f;
+ for (Float selectivity : parentsSel) {
+ prodSelectivity *= selectivity;
+ }
+ newNumRows = (long) Math.ceil(
+ parentWithFK.getStatistics().getNumRows() * prodSelectivity);
+ pkfkInferred = true;
+
+ // some debug information
+ if (isDebugEnabled) {
+ List<String> parentIds = Lists.newArrayList();
+
+ // print primary key containing parents
+ for (Integer i : parentsWithPK) {
+ parentIds.add(parents.get(i).toString());
+ }
+ LOG.debug("STATS-" + jop.toString() + ": PK parent id(s) - " + parentIds);
+ parentIds.clear();
+
+ // print foreign key containing parents
+ for (Integer i : parentsWithFK) {
+ parentIds.add(parents.get(i).toString());
+ }
+ LOG.debug("STATS-" + jop.toString() + ": FK parent id(s) - " + parentIds);
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Get selectivity of reduce sink operators.
+ * @param ops - reduce sink operators
+ * @param opsWithPK - reduce sink operators with primary keys
+ * @return - list of selectivity for primary key containing operators
+ */
+ private List<Float> getSelectivity(List<Operator<? extends OperatorDesc>> ops,
+ List<Integer> opsWithPK) {
+ List<Float> result = Lists.newArrayList();
+ for (Integer idx : opsWithPK) {
+ Operator<? extends OperatorDesc> op = ops.get(idx);
+ float selectivity = getSelectivitySimpleTree(op);
+ result.add(selectivity);
+ }
+ return result;
+ }
+
+ private float getSelectivitySimpleTree(Operator<? extends OperatorDesc> op) {
+ TableScanOperator tsOp = OperatorUtils
+ .findSingleOperatorUpstream(op, TableScanOperator.class);
+ if (tsOp == null) {
+ // complex tree with multiple parents
+ return getSelectivityComplexTree(op);
+ } else {
+ // simple tree with single parent
+ long inputRow = tsOp.getStatistics().getNumRows();
+ long outputRow = op.getStatistics().getNumRows();
+ return (float) outputRow / (float) inputRow;
+ }
+ }
+
+ private float getSelectivityComplexTree(Operator<? extends OperatorDesc> op) {
+ Operator<? extends OperatorDesc> multiParentOp = null;
+ Operator<? extends OperatorDesc> currentOp = op;
+
+ // TS-1 TS-2
+ // | |
+ // RS-1 RS-2
+ // \ /
+ // JOIN
+ // |
+ // FIL
+ // |
+ // RS-3
+ //
+ // For the above complex operator tree,
+ // selectivity(JOIN) = selectivity(RS-1) * selectivity(RS-2) and
+ // selectivity(RS-3) = numRows(RS-3)/numRows(JOIN) * selectivity(JOIN)
+ while(multiParentOp == null) {
+ if (op.getParentOperators().size() > 1) {
+ multiParentOp = op;
+ } else {
+ op = op.getParentOperators().get(0);
+ }
+ }
+
+ float selMultiParent = 1.0f;
+ for(Operator<? extends OperatorDesc> parent : multiParentOp.getParentOperators()) {
+ // In the above example, TS-1 -> RS-1 and TS-2 -> RS-2 are simple trees
+ selMultiParent *= getSelectivitySimpleTree(parent);
+ }
+
+ float selCurrOp = ((float) currentOp.getStatistics().getNumRows() /
+ (float) multiParentOp.getStatistics().getNumRows()) * selMultiParent;
+
+ return selCurrOp;
+ }
+
+ /**
+ * Returns the index of parents whose join key column statistics ranges are within the specified
+ * primary key range (inferred as foreign keys).
+ * @param ops - operators
+ * @param csPK - column statistics of primary key
+ * @return - list of foreign key containing parent ids
+ */
+ private List<Integer> getForeignKeyCandidates(List<Operator<? extends OperatorDesc>> ops,
+ ColStatistics csPK) {
+ List<Integer> result = Lists.newArrayList();
+ if (csPK == null || ops == null) {
+ return result;
+ }
+
+ for (int i = 0; i < ops.size(); i++) {
+ Operator<? extends OperatorDesc> op = ops.get(i);
+ if (op != null && op instanceof ReduceSinkOperator) {
+ ReduceSinkOperator rsOp = (ReduceSinkOperator) op;
+ List<ExprNodeDesc> keys = rsOp.getConf().getKeyCols();
+ List<String> fqCols = StatsUtils.getFullQualifedColNameFromExprs(keys,
+ rsOp.getColumnExprMap());
+ if (fqCols.size() == 1) {
+ String joinCol = fqCols.get(0);
+ if (rsOp.getStatistics() != null) {
+ ColStatistics cs = rsOp.getStatistics().getColumnStatisticsFromFQColName(joinCol);
+ if (cs != null && !cs.isPrimaryKey()) {
+ if (StatsUtils.inferForeignKey(csPK, cs)) {
+ result.add(i);
+ }
+ }
+ }
+ }
+ }
+ }
+ return result;
+ }
+
+ /**
+ * Returns the index of parents whose join key columns are inferred as primary keys

+ * @param ops - operators
+ * @return - list of primary key containing parent ids
+ */
+ private List<Integer> getPrimaryKeyCandidates(List<Operator<? extends OperatorDesc>> ops) {
+ List<Integer> result = Lists.newArrayList();
+ if (ops != null && !ops.isEmpty()) {
+ for (int i = 0; i < ops.size(); i++) {
+ Operator<? extends OperatorDesc> op = ops.get(i);
+ if (op instanceof ReduceSinkOperator) {
+ ReduceSinkOperator rsOp = (ReduceSinkOperator) op;
+ List<ExprNodeDesc> keys = rsOp.getConf().getKeyCols();
+ List<String> fqCols = StatsUtils.getFullQualifedColNameFromExprs(keys,
+ rsOp.getColumnExprMap());
+ if (fqCols.size() == 1) {
+ String joinCol = fqCols.get(0);
+ if (rsOp.getStatistics() != null) {
+ ColStatistics cs = rsOp.getStatistics().getColumnStatisticsFromFQColName(joinCol);
+ if (cs != null && cs.isPrimaryKey()) {
+ result.add(i);
+ }
+ }
+ }
+ }
+ }
+ }
+ return result;
+ }
+
private Long getEasedOutDenominator(List<Long> distinctVals) {
// Exponential back-off for NDVs.
// 1) Descending order sort of NDVs
@@ -1253,7 +1437,7 @@ public class StatsRulesProcFactory {
Map<String, Long> rowCountParents,
Map<String, String> outInTabAlias) {
- if (newNumRows <= 0) {
+ if (newNumRows < 0) {
LOG.info("STATS-" + jop.toString() + ": Overflow in number of rows."
+ newNumRows + " rows will be set to Long.MAX_VALUE");
}
@@ -1528,6 +1712,8 @@ public class StatsRulesProcFactory {
Object... nodeOutputs) throws SemanticException {
Operator<? extends OperatorDesc> op = (Operator<? extends OperatorDesc>) nd;
OperatorDesc conf = op.getConf();
+ AnnotateStatsProcCtx aspCtx = (AnnotateStatsProcCtx) procCtx;
+ HiveConf hconf = aspCtx.getConf();
if (conf != null) {
Statistics stats = conf.getStatistics();
@@ -1544,7 +1730,9 @@ public class StatsRulesProcFactory {
stats.addToNumRows(parentStats.getNumRows());
stats.addToDataSize(parentStats.getDataSize());
stats.updateColumnStatsState(parentStats.getColumnStatsState());
- stats.addToColumnStats(parentStats.getColumnStats());
+ List<ColStatistics> colStats = StatsUtils.getColStatisticsFromExprMap(hconf,
+ parentStats, op.getColumnExprMap(), op.getSchema());
+ stats.addToColumnStats(colStats);
op.getConf().setStatistics(stats);
if (isDebugEnabled) {
@@ -1622,7 +1810,7 @@ public class StatsRulesProcFactory {
boolean useColStats, Operator<? extends OperatorDesc> op,
boolean updateNDV) {
- if (newNumRows <= 0) {
+ if (newNumRows < 0) {
LOG.info("STATS-" + op.toString() + ": Overflow in number of rows."
+ newNumRows + " rows will be set to Long.MAX_VALUE");
}
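
The PK-FK inference above replaces the denominator-based estimate when a single-attribute join looks like fact-to-dimension: the output row count is the foreign-key side's row count scaled by the product of the primary-key sides' selectivities (RS output rows divided by the corresponding TableScan rows). A back-of-the-envelope sketch with hypothetical numbers:

    // Hypothetical sketch of newNumRows = ceil(FK-side rows * product of PK-side selectivities).
    public class PkFkJoinEstimateSketch {
      static long estimate(long factSideRows, double[] dimSelectivities) {
        double prod = 1.0;
        for (double sel : dimSelectivities) {
          prod *= sel;
        }
        return (long) Math.ceil(factSideRows * prod);
      }

      public static void main(String[] args) {
        // The fact side feeds 1,000,000 rows into the join; two dimension branches were
        // filtered down to 10% and 50% of their scanned rows respectively.
        System.out.println(estimate(1000000L, new double[] {0.10, 0.50})); // 50000
      }
    }
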
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java Thu Oct 30 16:22:33 2014
@@ -79,7 +79,7 @@ import com.google.common.annotations.Vis
*
*/
public abstract class BaseSemanticAnalyzer {
- private static final Log STATIC_LOG = LogFactory.getLog(BaseSemanticAnalyzer.class.getName());
+ protected static final Log STATIC_LOG = LogFactory.getLog(BaseSemanticAnalyzer.class.getName());
protected final Hive db;
protected final HiveConf conf;
protected List<Task<? extends Serializable>> rootTasks;
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java Thu Oct 30 16:22:33 2014
@@ -53,6 +53,7 @@ import org.apache.hadoop.hive.metastore.
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Index;
import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.Order;
import org.apache.hadoop.hive.metastore.api.SkewedInfo;
import org.apache.hadoop.hive.ql.Driver;
@@ -214,24 +215,6 @@ public class DDLSemanticAnalyzer extends
return typeName;
}
- static class TablePartition {
- String tableName;
- HashMap<String, String> partSpec = null;
-
- public TablePartition() {
- }
-
- public TablePartition(ASTNode tblPart) throws SemanticException {
- tableName = getDotName((getQualifiedTableName((ASTNode) tblPart.getChild(0))));
- if (tblPart.getChildCount() > 1) {
- ASTNode part = (ASTNode) tblPart.getChild(1);
- if (part.getToken().getType() == HiveParser.TOK_PARTSPEC) {
- this.partSpec = DDLSemanticAnalyzer.getPartSpec(part);
- }
- }
- }
- }
-
public DDLSemanticAnalyzer(HiveConf conf) throws SemanticException {
this(conf, createHiveDB(conf));
}
@@ -1034,7 +1017,7 @@ public class DDLSemanticAnalyzer extends
rootTasks.add(truncateTask);
}
- private boolean isFullSpec(Table table, Map<String, String> partSpec) {
+ public static boolean isFullSpec(Table table, Map<String, String> partSpec) {
for (FieldSchema partCol : table.getPartCols()) {
if (partSpec.get(partCol.getName()) == null) {
return false;
@@ -1139,20 +1122,25 @@ public class DDLSemanticAnalyzer extends
// configured not to ignore this
boolean throwException =
!ifExists && !HiveConf.getBoolVar(conf, ConfVars.DROPIGNORESNONEXISTENT);
- if (throwException) {
- try {
- Index idx = db.getIndex(tableName, indexName);
- } catch (HiveException e) {
+ Table tbl = getTable(tableName, false);
+ if (throwException && tbl == null) {
+ throw new SemanticException(ErrorMsg.INVALID_TABLE.getMsg(tableName));
+ }
+ try {
+ Index idx = db.getIndex(tableName, indexName);
+ } catch (HiveException e) {
+ if (!(e.getCause() instanceof NoSuchObjectException)) {
+ throw new SemanticException(ErrorMsg.GENERIC_ERROR.getMsg("dropping index"), e);
+ }
+ if (throwException) {
throw new SemanticException(ErrorMsg.INVALID_INDEX.getMsg(indexName));
}
}
-
- Table tbl = getTable(tableName, false);
if (tbl != null) {
- inputs.add(new ReadEntity(getTable(tableName)));
+ inputs.add(new ReadEntity(tbl));
}
- DropIndexDesc dropIdxDesc = new DropIndexDesc(indexName, tableName);
+ DropIndexDesc dropIdxDesc = new DropIndexDesc(indexName, tableName, throwException);
rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(),
dropIdxDesc), conf));
}
@@ -1399,11 +1387,22 @@ public class DDLSemanticAnalyzer extends
// ReadEntity as no lock.
re.noLockNeeded();
inputs.add(re);
- if (desc == null || desc.getOp() != AlterTableDesc.AlterTableTypes.ALTERPROTECTMODE) {
+
+ if (isFullSpec(tab, partSpec)) {
+ // Fully specified partition spec
Partition part = getPartition(tab, partSpec, true);
- outputs.add(new WriteEntity(part, writeType));
- }
- else {
+ outputs.add(new WriteEntity(part, writeType));
+ } else {
+ // Partial partition spec supplied. Make sure this is allowed.
+ if (desc == null
+ || !AlterTableDesc.doesAlterTableTypeSupportPartialPartitionSpec(desc.getOp())) {
+ String alterTabletype = (desc != null) ? desc.getOp().name() : "";
+ throw new SemanticException(
+ ErrorMsg.ALTER_TABLE_TYPE_PARTIAL_PARTITION_SPEC_NO_SUPPORTED, alterTabletype);
+ } else if (!conf.getBoolVar(HiveConf.ConfVars.DYNAMICPARTITIONING)) {
+ throw new SemanticException(ErrorMsg.DYNAMIC_PARTITION_DISABLED);
+ }
+
for (Partition part : getPartitions(tab, partSpec, true)) {
outputs.add(new WriteEntity(part, writeType));
}
@@ -2240,6 +2239,10 @@ public class DDLSemanticAnalyzer extends
if (ast.getChildCount() == 1) {
String funcNames = stripQuotes(ast.getChild(0).getText());
showFuncsDesc = new ShowFunctionsDesc(ctx.getResFile(), funcNames);
+ } else if (ast.getChildCount() == 2) {
+ assert (ast.getChild(0).getType() == HiveParser.KW_LIKE);
+ String funcNames = stripQuotes(ast.getChild(1).getText());
+ showFuncsDesc = new ShowFunctionsDesc(ctx.getResFile(), funcNames, true);
} else {
showFuncsDesc = new ShowFunctionsDesc(ctx.getResFile());
}
@@ -2805,7 +2808,7 @@ public class DDLSemanticAnalyzer extends
* @param ast
* The parsed command tree.
* @throws SemanticException
- * Parsin failed
+ * Parsing failed
*/
private void analyzeAlterTableTouch(String[] qualified, CommonTree ast)
throws SemanticException {
@@ -2929,8 +2932,8 @@ public class DDLSemanticAnalyzer extends
*
* @param ast Tree to extract partitions from.
* @param tab Table.
- * @param result Map of partitions by prefix length. Most of the time prefix length will
- * be the same for all partition specs, so we can just OR the expressions.
+ * @return Map of partitions by prefix length. Most of the time prefix length will
+ * be the same for all partition specs, so we can just OR the expressions.
*/
private Map<Integer, List<ExprNodeGenericFuncDesc>> getFullPartitionSpecs(
CommonTree ast, Table tab, boolean canGroupExprs) throws SemanticException {
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java Thu Oct 30 16:22:33 2014
@@ -57,6 +57,8 @@ import org.apache.hadoop.hive.ql.plan.Un
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
+import static org.apache.hadoop.hive.ql.plan.ReduceSinkDesc.ReducerTraits.AUTOPARALLEL;
+
/**
* GenTezUtils is a collection of shared helper methods to produce
* TezWork
@@ -117,7 +119,7 @@ public class GenTezUtils {
reduceWork.setNumReduceTasks(reduceSink.getConf().getNumReducers());
- if (isAutoReduceParallelism && reduceSink.getConf().isAutoParallel()) {
+ if (isAutoReduceParallelism && reduceSink.getConf().getReducerTraits().contains(AUTOPARALLEL)) {
reduceWork.setAutoReduceParallelism(true);
// configured limit for reducers
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java Thu Oct 30 16:22:33 2014
@@ -123,6 +123,11 @@ public class GenTezWork implements NodeP
context.rootToWorkMap.put(root, work);
}
+ // this is where we set the sort columns that we will be using for KeyValueInputMerge
+ if (operator instanceof DummyStoreOperator) {
+ work.addSortCols(root.getOpTraits().getSortCols().get(0));
+ }
+
if (!context.childToWorkMap.containsKey(operator)) {
List<BaseWork> workItems = new LinkedList<BaseWork>();
workItems.add(work);
@@ -137,17 +142,18 @@ public class GenTezWork implements NodeP
// we are currently walking the big table side of the merge join. we need to create or hook up
// merge join work.
MergeJoinWork mergeJoinWork = null;
- if (context.opMergeJoinWorkMap.containsKey(operator)) {
+ if (context.opMergeJoinWorkMap.containsKey(context.currentMergeJoinOperator)) {
// we have found a merge work corresponding to this closing operator. Hook up this work.
- mergeJoinWork = context.opMergeJoinWorkMap.get(operator);
+ mergeJoinWork = context.opMergeJoinWorkMap.get(context.currentMergeJoinOperator);
} else {
// we need to create the merge join work
mergeJoinWork = new MergeJoinWork();
mergeJoinWork.setMergeJoinOperator(context.currentMergeJoinOperator);
tezWork.add(mergeJoinWork);
- context.opMergeJoinWorkMap.put(operator, mergeJoinWork);
+ context.opMergeJoinWorkMap.put(context.currentMergeJoinOperator, mergeJoinWork);
}
// connect the work correctly.
+ work.addSortCols(root.getOpTraits().getSortCols().get(0));
mergeJoinWork.addMergedWork(work, null);
Operator<? extends OperatorDesc> parentOp =
getParentFromStack(context.currentMergeJoinOperator, stack);
@@ -334,10 +340,16 @@ public class GenTezWork implements NodeP
UnionWork unionWork = (UnionWork) followingWork;
int index = getMergeIndex(tezWork, unionWork, rs);
// guaranteed to be instance of MergeJoinWork if index is valid
- MergeJoinWork mergeJoinWork = (MergeJoinWork) tezWork.getChildren(unionWork).get(index);
- // disconnect the connection to union work and connect to merge work
- followingWork = mergeJoinWork;
- rWork = (ReduceWork) mergeJoinWork.getMainWork();
+ BaseWork baseWork = tezWork.getChildren(unionWork).get(index);
+ if (baseWork instanceof MergeJoinWork) {
+ MergeJoinWork mergeJoinWork = (MergeJoinWork) baseWork;
+ // disconnect the connection to union work and connect to merge work
+ followingWork = mergeJoinWork;
+ rWork = (ReduceWork) mergeJoinWork.getMainWork();
+ } else {
+ throw new SemanticException("Unknown work type found: "
+ + baseWork.getClass().getCanonicalName());
+ }
} else {
rWork = (ReduceWork) followingWork;
}
@@ -391,6 +403,8 @@ public class GenTezWork implements NodeP
} else {
index++;
}
+ } else {
+ index++;
}
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g Thu Oct 30 16:22:33 2014
@@ -1334,7 +1334,7 @@ showStatement
| KW_SHOW KW_TABLES ((KW_FROM|KW_IN) db_name=identifier)? (KW_LIKE showStmtIdentifier|showStmtIdentifier)? -> ^(TOK_SHOWTABLES (TOK_FROM $db_name)? showStmtIdentifier?)
| KW_SHOW KW_COLUMNS (KW_FROM|KW_IN) tableName ((KW_FROM|KW_IN) db_name=identifier)?
-> ^(TOK_SHOWCOLUMNS tableName $db_name?)
- | KW_SHOW KW_FUNCTIONS showFunctionIdentifier? -> ^(TOK_SHOWFUNCTIONS showFunctionIdentifier?)
+ | KW_SHOW KW_FUNCTIONS (KW_LIKE showFunctionIdentifier|showFunctionIdentifier)? -> ^(TOK_SHOWFUNCTIONS KW_LIKE? showFunctionIdentifier?)
| KW_SHOW KW_PARTITIONS tabName=tableName partitionSpec? -> ^(TOK_SHOWPARTITIONS $tabName partitionSpec?)
| KW_SHOW KW_CREATE KW_TABLE tabName=tableName -> ^(TOK_SHOW_CREATETABLE $tabName)
| KW_SHOW KW_TABLE KW_EXTENDED ((KW_FROM|KW_IN) db_name=identifier)? KW_LIKE showStmtIdentifier partitionSpec?
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveSemanticAnalyzerHookContext.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveSemanticAnalyzerHookContext.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveSemanticAnalyzerHookContext.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveSemanticAnalyzerHookContext.java Thu Oct 30 16:22:33 2014
@@ -57,4 +57,12 @@ public interface HiveSemanticAnalyzerHoo
public String getUserName();
public void setUserName(String userName);
+
+ public String getIpAddress();
+
+ public void setIpAddress(String ipAddress);
+
+ public String getCommand();
+
+ public void setCommand(String command);
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveSemanticAnalyzerHookContextImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveSemanticAnalyzerHookContextImpl.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveSemanticAnalyzerHookContextImpl.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveSemanticAnalyzerHookContextImpl.java Thu Oct 30 16:22:33 2014
@@ -33,6 +33,8 @@ public class HiveSemanticAnalyzerHookCon
Set<ReadEntity> inputs = null;
Set<WriteEntity> outputs = null;
private String userName;
+ private String ipAddress;
+ private String command;
@Override
public Hive getHive() throws HiveException {
@@ -73,4 +75,24 @@ public class HiveSemanticAnalyzerHookCon
public void setUserName(String userName) {
this.userName = userName;
}
+
+ @Override
+ public String getIpAddress() {
+ return ipAddress;
+ }
+
+ @Override
+ public void setIpAddress(String ipAddress) {
+ this.ipAddress = ipAddress;
+ }
+
+ @Override
+ public String getCommand() {
+ return command;
+ }
+
+ @Override
+ public void setCommand(String command) {
+ this.command = command;
+ }
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/QB.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/QB.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/QB.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/QB.java Thu Oct 30 16:22:33 2014
@@ -130,6 +130,10 @@ public class QB {
return (outer_id == null ? alias : outer_id + ":" + alias);
}
+ public String getAlias() {
+ return qbp.getAlias();
+ }
+
public QBParseInfo getParseInfo() {
return qbp;
}
@@ -248,6 +252,12 @@ public class QB {
return isQuery;
}
+ // to decide whether to rewrite RR of subquery
+ public boolean isTopLevelSelectStarQuery() {
+ return !isCTAS() && qbp.isTopLevelSimpleSelectStarQuery();
+ }
+
+ // to find target for fetch task conversion optimizer (does not allow subqueries)
public boolean isSimpleSelectQuery() {
return qbp.isSimpleSelectQuery() && aliasToSubq.isEmpty() && !isCTAS() &&
!qbp.isAnalyzeCommand();
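As the added comments note, the new predicate separates two consumers: isTopLevelSelectStarQuery() guards the RowResolver rewrite for subqueries, while isSimpleSelectQuery() remains the check for fetch-task conversion and still rejects subqueries. A hedged sketch of the distinction; chooseStrategy() and the return values are illustrative only:

    // Hedged sketch (not part of the patch): how a caller might use the two predicates.
    static String chooseStrategy(QB qb) {
      if (qb.isTopLevelSelectStarQuery()) {
        return "reuse-subquery-row-resolver";      // e.g. top-level "SELECT * FROM src", not CTAS
      }
      if (qb.isSimpleSelectQuery()) {
        return "fetch-task-conversion-candidate";  // simple select, no subqueries or aggregations
      }
      return "full-compilation";
    }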
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java Thu Oct 30 16:22:33 2014
@@ -27,6 +27,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.antlr.runtime.tree.Tree;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.tableSpec;
@@ -449,39 +450,49 @@ public class QBParseInfo {
this.outerQueryLimit = outerQueryLimit;
}
+ public boolean isTopLevelSimpleSelectStarQuery() {
+ if (alias != null || destToSelExpr.size() != 1 || !isSimpleSelectQuery()) {
+ return false;
+ }
+ for (ASTNode selExprs : destToSelExpr.values()) {
+ if (selExprs.getChildCount() != 1) {
+ return false;
+ }
+ Tree sel = selExprs.getChild(0).getChild(0);
+ if (sel == null || sel.getType() != HiveParser.TOK_ALLCOLREF) {
+ return false;
+ }
+ }
+ return true;
+ }
+
public boolean isSimpleSelectQuery() {
- if (isSubQ || (joinExpr != null)
- || (!destToGroupby.isEmpty()) || (!destToClusterby.isEmpty())
- || (!aliasToLateralViews.isEmpty())) {
+ if (isSubQ || joinExpr != null || !destToOrderby.isEmpty() || !destToSortby.isEmpty()
+ || !destToGroupby.isEmpty() || !destToClusterby.isEmpty() || !destToDistributeby.isEmpty()
+ || !aliasToLateralViews.isEmpty() || !destToLateralView.isEmpty()) {
return false;
}
- Iterator<Map.Entry<String, LinkedHashMap<String, ASTNode>>> aggrIter = destToAggregationExprs
- .entrySet().iterator();
- while (aggrIter.hasNext()) {
- HashMap<String, ASTNode> h = aggrIter.next().getValue();
- if ((h != null) && (!h.isEmpty())) {
+ for (Map<String, ASTNode> entry : destToAggregationExprs.values()) {
+ if (entry != null && !entry.isEmpty()) {
return false;
}
}
- if (!destToDistinctFuncExprs.isEmpty()) {
- Iterator<Map.Entry<String, List<ASTNode>>> distn = destToDistinctFuncExprs
- .entrySet().iterator();
- while (distn.hasNext()) {
- List<ASTNode> ct = distn.next().getValue();
- if (!ct.isEmpty()) {
- return false;
- }
+ for (Map<String, ASTNode> entry : destToWindowingExprs.values()) {
+ if (entry != null && !entry.isEmpty()) {
+ return false;
+ }
+ }
+
+ for (List<ASTNode> ct : destToDistinctFuncExprs.values()) {
+ if (!ct.isEmpty()) {
+ return false;
}
}
- Iterator<Map.Entry<String, ASTNode>> iter = nameToDest.entrySet()
- .iterator();
- while (iter.hasNext()) {
- Map.Entry<String, ASTNode> entry = iter.next();
- ASTNode v = entry.getValue();
- if (!(((ASTNode)v.getChild(0)).getToken().getType() == HiveParser.TOK_TMP_FILE)) {
+ for (ASTNode v : nameToDest.values()) {
+ if (!(v.getChild(0).getType() == HiveParser.TOK_TMP_FILE)) {
return false;
}
}
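The rewritten checks now also reject ORDER BY, SORT BY, DISTRIBUTE BY, lateral views, and windowing expressions, and isTopLevelSimpleSelectStarQuery() additionally requires a single TOK_ALLCOLREF select item and no alias. A hedged illustration of which query shapes pass; the sample statements are assumptions, not test cases from the patch:

    // Hedged sketch (not part of the patch): query shapes vs. the rewritten checks.
    QBParseInfo qbp = qb.getParseInfo();
    // "SELECT * FROM src"                          -> isTopLevelSimpleSelectStarQuery(): true
    // "SELECT * FROM src ORDER BY key"             -> false (destToOrderby is non-empty)
    // "SELECT key, count(1) FROM src GROUP BY key" -> false (aggregation and group-by present)
    boolean selectStar = qbp.isTopLevelSimpleSelectStarQuery();
    boolean simple = qbp.isSimpleSelectQuery();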
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/RowResolver.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/RowResolver.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/RowResolver.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/RowResolver.java Thu Oct 30 16:22:33 2014
@@ -29,7 +29,6 @@ import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.exec.ColumnInfo;
import org.apache.hadoop.hive.ql.exec.RowSchema;
@@ -120,7 +119,11 @@ public class RowResolver implements Seri
f_map = new LinkedHashMap<String, ColumnInfo>();
rslvMap.put(tab_alias, f_map);
}
- f_map.put(col_alias, colInfo);
+ ColumnInfo oldColInfo = f_map.put(col_alias, colInfo);
+ if (oldColInfo != null) {
+ LOG.warn("Duplicate column info for " + tab_alias + "." + col_alias
+ + " was overwritten in RowResolver map: " + oldColInfo + " by " + colInfo);
+ }
String[] qualifiedAlias = new String[2];
qualifiedAlias[0] = tab_alias;
@@ -195,17 +198,6 @@ public class RowResolver implements Seri
return ret;
}
- /**
- * check if column name is already exist in RR
- */
- public void checkColumn(String tableAlias, String columnAlias) throws SemanticException {
- ColumnInfo prev = get(null, columnAlias);
- if (prev != null &&
- (tableAlias == null || !tableAlias.equalsIgnoreCase(prev.getTabAlias()))) {
- throw new SemanticException(ErrorMsg.AMBIGUOUS_COLUMN.getMsg(columnAlias));
- }
- }
-
public ArrayList<ColumnInfo> getColumnInfos() {
return rowSchema.getSignature();
}
@@ -351,40 +343,44 @@ public class RowResolver implements Seri
this.expressionMap = expressionMap;
}
+ private static class IntRef {
+ public int val = 0;
+ }
- // TODO: 1) How to handle collisions? 2) Should we be cloning ColumnInfo or
- // not?
- public static int add(RowResolver rrToAddTo, RowResolver rrToAddFrom,
- int outputColPos, int numColumns) throws SemanticException {
+ public static boolean add(RowResolver rrToAddTo, RowResolver rrToAddFrom, int numColumns)
+ throws SemanticException {
+ return add(rrToAddTo, rrToAddFrom, null, numColumns);
+ }
+
+ // TODO: 1) How to handle collisions? 2) Should we be cloning ColumnInfo or not?
+ private static boolean add(RowResolver rrToAddTo, RowResolver rrToAddFrom,
+ IntRef outputColPosRef, int numColumns) throws SemanticException {
+ boolean hasDuplicates = false;
String tabAlias;
String colAlias;
String[] qualifiedColName;
int i = 0;
+ int outputColPos = outputColPosRef == null ? 0 : outputColPosRef.val;
for (ColumnInfo cInfoFrmInput : rrToAddFrom.getRowSchema().getSignature()) {
if ( numColumns >= 0 && i == numColumns ) {
break;
}
ColumnInfo newCI = null;
- qualifiedColName = rrToAddFrom.getInvRslvMap().get(
- cInfoFrmInput.getInternalName());
+ String internalName = cInfoFrmInput.getInternalName();
+ qualifiedColName = rrToAddFrom.reverseLookup(internalName);
tabAlias = qualifiedColName[0];
colAlias = qualifiedColName[1];
newCI = new ColumnInfo(cInfoFrmInput);
- newCI.setInternalName(SemanticAnalyzer
- .getColumnInternalName(outputColPos));
+ newCI.setInternalName(SemanticAnalyzer.getColumnInternalName(outputColPos));
outputColPos++;
- if (rrToAddTo.get(tabAlias, colAlias) != null) {
- LOG.debug("Found duplicate column alias in RR: " + rrToAddTo.get(tabAlias, colAlias));
- } else {
- rrToAddTo.put(tabAlias, colAlias, newCI);
- }
+ boolean isUnique = rrToAddTo.putWithCheck(tabAlias, colAlias, internalName, newCI);
+ hasDuplicates |= (!isUnique);
- qualifiedColName = rrToAddFrom.getAlternateMappings(cInfoFrmInput
- .getInternalName());
+ qualifiedColName = rrToAddFrom.getAlternateMappings(internalName);
if (qualifiedColName != null) {
tabAlias = qualifiedColName[0];
colAlias = qualifiedColName[1];
@@ -393,31 +389,73 @@ public class RowResolver implements Seri
i++;
}
- return outputColPos;
- }
+ if (outputColPosRef != null) {
+ outputColPosRef.val = outputColPos;
+ }
+ return !hasDuplicates;
+ }
- public static int add(RowResolver rrToAddTo, RowResolver rrToAddFrom,
- int outputColPos) throws SemanticException {
- return add(rrToAddTo, rrToAddFrom, outputColPos, -1);
- }
-
- /**
- * Return a new row resolver that is combination of left RR and right RR.
- * The schema will be schema of left, schema of right
- *
- * @param leftRR
- * @param rightRR
- * @return
- * @throws SemanticException
- */
- public static RowResolver getCombinedRR(RowResolver leftRR,
- RowResolver rightRR) throws SemanticException {
- int outputColPos = 0;
-
- RowResolver combinedRR = new RowResolver();
- outputColPos = add(combinedRR, leftRR, outputColPos);
- outputColPos = add(combinedRR, rightRR, outputColPos);
+ /**
+ * Adds column to RR, checking for duplicate columns. Needed because CBO cannot handle the Hive
+ * behavior of blindly overwriting old mapping in RR and still somehow working after that.
+ * @return True if mapping was added without duplicates.
+ */
+ public boolean putWithCheck(String tabAlias, String colAlias,
+ String internalName, ColumnInfo newCI) throws SemanticException {
+ ColumnInfo existing = get(tabAlias, colAlias);
+ // Hive adds the same mapping twice... I wish we could fix stuff like that.
+ if (existing == null) {
+ put(tabAlias, colAlias, newCI);
+ return true;
+ } else if (existing.isSameColumnForRR(newCI)) {
+ return true;
+ }
+ LOG.warn("Found duplicate column alias in RR: "
+ + existing.toMappingString(tabAlias, colAlias) + " adding "
+ + newCI.toMappingString(tabAlias, colAlias));
+ if (internalName != null) {
+ existing = get(tabAlias, internalName);
+ if (existing == null) {
+ put(tabAlias, internalName, newCI);
+ return true;
+ } else if (existing.isSameColumnForRR(newCI)) {
+ return true;
+ }
+ LOG.warn("Failed to use internal name after finding a duplicate: "
+ + existing.toMappingString(tabAlias, internalName));
+ }
+ return false;
+ }
+
+ private static boolean add(RowResolver rrToAddTo, RowResolver rrToAddFrom,
+ IntRef outputColPosRef) throws SemanticException {
+ return add(rrToAddTo, rrToAddFrom, outputColPosRef, -1);
+ }
+
+ public static boolean add(RowResolver rrToAddTo, RowResolver rrToAddFrom)
+ throws SemanticException {
+ return add(rrToAddTo, rrToAddFrom, null, -1);
+ }
- return combinedRR;
- }
+ /**
+ * Return a new row resolver that is combination of left RR and right RR.
+ * The schema will be the schema of the left RR followed by the schema of the right RR.
+ *
+ * @param leftRR
+ * @param rightRR
+ * @return
+ * @throws SemanticException
+ */
+ public static RowResolver getCombinedRR(RowResolver leftRR,
+ RowResolver rightRR) throws SemanticException {
+ RowResolver combinedRR = new RowResolver();
+ IntRef outputColPos = new IntRef();
+ if (!add(combinedRR, leftRR, outputColPos)) {
+ LOG.warn("Duplicates detected when adding columns to RR: see previous message");
+ }
+ if (!add(combinedRR, rightRR, outputColPos)) {
+ LOG.warn("Duplicates detected when adding columns to RR: see previous message");
+ }
+ return combinedRR;
+ }
}
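Taken together, the RowResolver changes replace the position-returning add() with boolean-returning variants and route duplicate handling through putWithCheck(), so callers can detect collisions instead of relying on silent overwrites. A hedged sketch of a CBO-style caller; leftRR, rightRR, and the duplicate-handling policy are illustrative assumptions:

    // Hedged sketch (not part of the patch): using the reworked RowResolver API.
    RowResolver combined = RowResolver.getCombinedRR(leftRR, rightRR);

    // Adding a single mapping with collision detection instead of silent overwrite:
    if (!combined.putWithCheck(tabAlias, colAlias, internalName, colInfo)) {
      throw new SemanticException("Duplicate column " + tabAlias + "." + colAlias);
    }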