You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by pr...@apache.org on 2014/09/07 09:53:06 UTC
svn commit: r1622983 - /hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java
Author: prasanthj
Date: Sun Sep 7 07:53:06 2014
New Revision: 1622983
URL: http://svn.apache.org/r1622983
Log:
HIVE-7992: StatsRulesProcFactory should gracefully handle overflows (Prasanth J reviewed by Harish Butani)
Modified:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java?rev=1622983&r1=1622982&r2=1622983&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java Sun Sep 7 07:53:06 2014
@@ -18,13 +18,8 @@
package org.apache.hadoop.hive.ql.optimizer.stats.annotation;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.Stack;
-
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -72,8 +67,12 @@ import org.apache.hadoop.hive.ql.udf.gen
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPOr;
import org.apache.hadoop.hive.serde.serdeConstants;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Stack;
public class StatsRulesProcFactory {
@@ -165,7 +164,7 @@ public class StatsRulesProcFactory {
sop.getSchema());
long dataSize = StatsUtils.getDataSizeFromColumnStats(stats.getNumRows(), colStats);
stats.setColumnStats(colStats);
- stats.setDataSize(dataSize);
+ stats.setDataSize(setMaxIfInvalid(dataSize));
sop.setStatistics(stats);
if (LOG.isDebugEnabled()) {
@@ -251,7 +250,8 @@ public class StatsRulesProcFactory {
ExprNodeDesc pred = fop.getConf().getPredicate();
// evaluate filter expression and update statistics
- long newNumRows = evaluateExpression(parentStats, pred, aspCtx, neededCols);
+ long newNumRows = evaluateExpression(parentStats, pred, aspCtx,
+ neededCols, fop);
Statistics st = parentStats.clone();
if (satisfyPrecondition(parentStats)) {
@@ -261,7 +261,7 @@ public class StatsRulesProcFactory {
// result in number of rows getting more than the input rows in
// which case stats need not be updated
if (newNumRows <= parentStats.getNumRows()) {
- updateStats(st, newNumRows, true);
+ updateStats(st, newNumRows, true, fop);
}
if (LOG.isDebugEnabled()) {
@@ -271,7 +271,7 @@ public class StatsRulesProcFactory {
// update only the basic statistics in the absence of column statistics
if (newNumRows <= parentStats.getNumRows()) {
- updateStats(st, newNumRows, false);
+ updateStats(st, newNumRows, false, fop);
}
if (LOG.isDebugEnabled()) {
@@ -288,7 +288,8 @@ public class StatsRulesProcFactory {
}
private long evaluateExpression(Statistics stats, ExprNodeDesc pred,
- AnnotateStatsProcCtx aspCtx, List<String> neededCols) throws CloneNotSupportedException {
+ AnnotateStatsProcCtx aspCtx, List<String> neededCols,
+ FilterOperator fop) throws CloneNotSupportedException {
long newNumRows = 0;
Statistics andStats = null;
@@ -303,24 +304,26 @@ public class StatsRulesProcFactory {
// evaluate children
for (ExprNodeDesc child : genFunc.getChildren()) {
- newNumRows = evaluateChildExpr(aspCtx.getAndExprStats(), child, aspCtx, neededCols);
+ newNumRows = evaluateChildExpr(aspCtx.getAndExprStats(), child,
+ aspCtx, neededCols, fop);
if (satisfyPrecondition(aspCtx.getAndExprStats())) {
- updateStats(aspCtx.getAndExprStats(), newNumRows, true);
+ updateStats(aspCtx.getAndExprStats(), newNumRows, true, fop);
} else {
- updateStats(aspCtx.getAndExprStats(), newNumRows, false);
+ updateStats(aspCtx.getAndExprStats(), newNumRows, false, fop);
}
}
} else if (udf instanceof GenericUDFOPOr) {
// for OR condition independently compute and update stats
for (ExprNodeDesc child : genFunc.getChildren()) {
- newNumRows += evaluateChildExpr(stats, child, aspCtx, neededCols);
+ newNumRows += evaluateChildExpr(stats, child, aspCtx, neededCols,
+ fop);
}
} else if (udf instanceof GenericUDFOPNot) {
- newNumRows = evaluateNotExpr(stats, pred, aspCtx, neededCols);
+ newNumRows = evaluateNotExpr(stats, pred, aspCtx, neededCols, fop);
} else {
// single predicate condition
- newNumRows = evaluateChildExpr(stats, pred, aspCtx, neededCols);
+ newNumRows = evaluateChildExpr(stats, pred, aspCtx, neededCols, fop);
}
} else if (pred instanceof ExprNodeColumnDesc) {
@@ -352,8 +355,9 @@ public class StatsRulesProcFactory {
return newNumRows;
}
- private long evaluateNotExpr(Statistics stats, ExprNodeDesc pred, AnnotateStatsProcCtx aspCtx,
- List<String> neededCols) throws CloneNotSupportedException {
+ private long evaluateNotExpr(Statistics stats, ExprNodeDesc pred,
+ AnnotateStatsProcCtx aspCtx, List<String> neededCols, FilterOperator fop)
+ throws CloneNotSupportedException {
long numRows = stats.getNumRows();
@@ -365,8 +369,9 @@ public class StatsRulesProcFactory {
// GenericUDF
long newNumRows = 0;
- for (ExprNodeDesc child : ((ExprNodeGenericFuncDesc) pred).getChildren()) {
- newNumRows = evaluateChildExpr(stats, child, aspCtx, neededCols);
+ for (ExprNodeDesc child : genFunc.getChildren()) {
+ newNumRows = evaluateChildExpr(stats, child, aspCtx, neededCols,
+ fop);
}
return numRows - newNumRows;
} else if (leaf instanceof ExprNodeConstantDesc) {
@@ -399,8 +404,7 @@ public class StatsRulesProcFactory {
return numRows / 2;
}
- private long evaluateColEqualsNullExpr(Statistics stats, ExprNodeDesc pred,
- AnnotateStatsProcCtx aspCtx) {
+ private long evaluateColEqualsNullExpr(Statistics stats, ExprNodeDesc pred) {
long numRows = stats.getNumRows();
@@ -426,7 +430,8 @@ public class StatsRulesProcFactory {
}
private long evaluateChildExpr(Statistics stats, ExprNodeDesc child,
- AnnotateStatsProcCtx aspCtx, List<String> neededCols) throws CloneNotSupportedException {
+ AnnotateStatsProcCtx aspCtx, List<String> neededCols,
+ FilterOperator fop) throws CloneNotSupportedException {
long numRows = stats.getNumRows();
@@ -435,7 +440,8 @@ public class StatsRulesProcFactory {
ExprNodeGenericFuncDesc genFunc = (ExprNodeGenericFuncDesc) child;
GenericUDF udf = genFunc.getGenericUDF();
- if (udf instanceof GenericUDFOPEqual || udf instanceof GenericUDFOPEqualNS) {
+ if (udf instanceof GenericUDFOPEqual ||
+ udf instanceof GenericUDFOPEqualNS) {
String colName = null;
String tabAlias = null;
boolean isConst = false;
@@ -507,13 +513,13 @@ public class StatsRulesProcFactory {
|| udf instanceof GenericUDFOPLessThan) {
return numRows / 3;
} else if (udf instanceof GenericUDFOPNotNull) {
- long newNumRows = evaluateColEqualsNullExpr(stats, genFunc, aspCtx);
+ long newNumRows = evaluateColEqualsNullExpr(stats, genFunc);
return stats.getNumRows() - newNumRows;
} else if (udf instanceof GenericUDFOPNull) {
- return evaluateColEqualsNullExpr(stats, genFunc, aspCtx);
+ return evaluateColEqualsNullExpr(stats, genFunc);
} else if (udf instanceof GenericUDFOPAnd || udf instanceof GenericUDFOPOr
|| udf instanceof GenericUDFOPNot) {
- return evaluateExpression(stats, genFunc, aspCtx, neededCols);
+ return evaluateExpression(stats, genFunc, aspCtx, neededCols, fop);
}
}
@@ -618,7 +624,8 @@ public class StatsRulesProcFactory {
}
// map side
- if (gop.getChildOperators().get(0) instanceof ReduceSinkOperator) {
+ if (gop.getChildOperators().get(0) instanceof ReduceSinkOperator ||
+ gop.getChildOperators().get(0) instanceof AppMasterEventOperator) {
// since we do not know if hash-aggregation will be enabled or disabled
// at runtime we will assume that map-side group by does not do any
@@ -631,8 +638,8 @@ public class StatsRulesProcFactory {
// take into account the map-side parallelism as well, default is 1
multiplier *= mapSideParallelism;
- newNumRows = multiplier * stats.getNumRows();
- long dataSize = multiplier * stats.getDataSize();
+ newNumRows = setMaxIfInvalid(multiplier * stats.getNumRows());
+ long dataSize = setMaxIfInvalid(multiplier * stats.getDataSize());
stats.setNumRows(newNumRows);
stats.setDataSize(dataSize);
for (ColStatistics cs : colStats) {
@@ -646,13 +653,13 @@ public class StatsRulesProcFactory {
// map side no grouping set
newNumRows = stats.getNumRows() * mapSideParallelism;
- updateStats(stats, newNumRows, true);
+ updateStats(stats, newNumRows, true, gop);
}
} else {
// reduce side
newNumRows = applyGBYRule(stats.getNumRows(), dvProd);
- updateStats(stats, newNumRows, true);
+ updateStats(stats, newNumRows, true, gop);
}
} else {
if (parentStats != null) {
@@ -668,7 +675,7 @@ public class StatsRulesProcFactory {
// reduce side
stats = parentStats.clone();
long newNumRows = parentStats.getNumRows() / 2;
- updateStats(stats, newNumRows, false);
+ updateStats(stats, newNumRows, false, gop);
}
}
}
@@ -702,7 +709,7 @@ public class StatsRulesProcFactory {
// only if the column stats is available, update the data size from
// the column stats
if (!stats.getColumnStatsState().equals(Statistics.State.NONE)) {
- updateStats(stats, stats.getNumRows(), true);
+ updateStats(stats, stats.getNumRows(), true, gop);
}
}
@@ -711,7 +718,7 @@ public class StatsRulesProcFactory {
// rows will be 1
if (colExprMap.isEmpty()) {
stats.setNumRows(1);
- updateStats(stats, 1, true);
+ updateStats(stats, 1, true, gop);
}
}
@@ -941,19 +948,12 @@ public class StatsRulesProcFactory {
long newRowCount = computeNewRowCount(
Lists.newArrayList(rowCountParents.values()), denom);
- if (newRowCount <= 0 && LOG.isDebugEnabled()) {
- newRowCount = 0;
- LOG.debug("[0] STATS-" + jop.toString() + ": Product of #rows might be greater than"
- + " denominator or overflow might have occurred. Resetting row count to 0."
- + " #Rows of parents: " + rowCountParents.toString() + ". Denominator: " + denom);
- }
-
- updateStatsForJoinType(stats, newRowCount, jop.getConf(),
- rowCountParents, outInTabAlias);
+ updateStatsForJoinType(stats, newRowCount, jop, rowCountParents,
+ outInTabAlias);
jop.setStatistics(stats);
if (LOG.isDebugEnabled()) {
- LOG.debug("[1] STATS-" + jop.toString() + ": " + stats.extendedToString());
+ LOG.debug("[0] STATS-" + jop.toString() + ": " + stats.extendedToString());
}
} else {
@@ -981,14 +981,13 @@ public class StatsRulesProcFactory {
long maxDataSize = parentSizes.get(maxRowIdx);
long newNumRows = (long) (joinFactor * maxRowCount * (numParents - 1));
long newDataSize = (long) (joinFactor * maxDataSize * (numParents - 1));
-
Statistics wcStats = new Statistics();
- wcStats.setNumRows(newNumRows);
- wcStats.setDataSize(newDataSize);
+ wcStats.setNumRows(setMaxIfInvalid(newNumRows));
+ wcStats.setDataSize(setMaxIfInvalid(newDataSize));
jop.setStatistics(wcStats);
if (LOG.isDebugEnabled()) {
- LOG.debug("[2] STATS-" + jop.toString() + ": " + wcStats.extendedToString());
+ LOG.debug("[1] STATS-" + jop.toString() + ": " + wcStats.extendedToString());
}
}
}
@@ -1010,8 +1009,15 @@ public class StatsRulesProcFactory {
}
private void updateStatsForJoinType(Statistics stats, long newNumRows,
- JoinDesc conf, Map<String, Long> rowCountParents,
+ CommonJoinOperator<? extends JoinDesc> jop,
+ Map<String, Long> rowCountParents,
Map<String, String> outInTabAlias) {
+
+ if (newNumRows <= 0) {
+ LOG.info("STATS-" + jop.toString() + ": Overflow in number of rows."
+ + newNumRows + " rows will be set to Long.MAX_VALUE");
+ }
+ newNumRows = setMaxIfInvalid(newNumRows);
stats.setNumRows(newNumRows);
// scale down/up the column statistics based on the changes in number of
@@ -1042,7 +1048,7 @@ public class StatsRulesProcFactory {
stats.setColumnStats(colStats);
long newDataSize = StatsUtils
.getDataSizeFromColumnStats(newNumRows, colStats);
- stats.setDataSize(newDataSize);
+ stats.setDataSize(setMaxIfInvalid(newDataSize));
}
private long computeNewRowCount(List<Long> rowCountParents, long denom) {
@@ -1170,7 +1176,7 @@ public class StatsRulesProcFactory {
// if limit is greater than available rows then do not update
// statistics
if (limit <= parentStats.getNumRows()) {
- updateStats(stats, limit, true);
+ updateStats(stats, limit, true, lop);
}
lop.setStatistics(stats);
@@ -1187,8 +1193,8 @@ public class StatsRulesProcFactory {
long numRows = limit;
long avgRowSize = parentStats.getAvgRowSize();
long dataSize = avgRowSize * limit;
- wcStats.setNumRows(numRows);
- wcStats.setDataSize(dataSize);
+ wcStats.setNumRows(setMaxIfInvalid(numRows));
+ wcStats.setDataSize(setMaxIfInvalid(dataSize));
}
lop.setStatistics(wcStats);
@@ -1366,7 +1372,15 @@ public class StatsRulesProcFactory {
* @param useColStats
* - use column statistics to compute data size
*/
- static void updateStats(Statistics stats, long newNumRows, boolean useColStats) {
+ static void updateStats(Statistics stats, long newNumRows,
+ boolean useColStats, Operator<? extends OperatorDesc> op) {
+
+ if (newNumRows <= 0) {
+ LOG.info("STATS-" + op.toString() + ": Overflow in number of rows."
+ + newNumRows + " rows will be set to Long.MAX_VALUE");
+ }
+
+ newNumRows = setMaxIfInvalid(newNumRows);
long oldRowCount = stats.getNumRows();
double ratio = (double) newNumRows / (double) oldRowCount;
stats.setNumRows(newNumRows);
@@ -1391,10 +1405,10 @@ public class StatsRulesProcFactory {
}
stats.setColumnStats(colStats);
long newDataSize = StatsUtils.getDataSizeFromColumnStats(newNumRows, colStats);
- stats.setDataSize(newDataSize);
+ stats.setDataSize(setMaxIfInvalid(newDataSize));
} else {
long newDataSize = (long) (ratio * stats.getDataSize());
- stats.setDataSize(newDataSize);
+ stats.setDataSize(setMaxIfInvalid(newDataSize));
}
}
@@ -1403,4 +1417,13 @@ public class StatsRulesProcFactory {
&& !stats.getColumnStatsState().equals(Statistics.State.NONE);
}
+ /**
+ * Clamps an invalid (negative) row count or data size to Long.MAX_VALUE.
+ * A negative value is treated as the result of long overflow in the
+ * computation that produced it. It is saturated to the largest representable
+ * value instead of being stored as a negative statistic.
+ *
+ * NOTE(review): only negative values are replaced; 0 is returned unchanged.
+ * The overflow log messages in updateStats and updateStatsForJoinType are
+ * guarded by {@code newNumRows <= 0}, so a 0 is logged as "will be set to
+ * Long.MAX_VALUE" even though this method leaves it as 0. Also, a product
+ * that overflows and wraps around to a positive value passes this check
+ * undetected.
+ *
+ * @param val - input value (a row count or data size)
+ * @return Long.MAX_VALUE if val is negative, otherwise val unchanged
+ */
+ static long setMaxIfInvalid(long val) {
+ return val < 0 ? Long.MAX_VALUE : val;
+ }
}