Posted to commits@hive.apache.org by zs...@apache.org on 2008/12/17 21:38:57 UTC
svn commit: r727506 - in /hadoop/hive/trunk: ./
common/src/java/org/apache/hadoop/hive/conf/
ql/src/java/org/apache/hadoop/hive/ql/exec/
ql/src/java/org/apache/hadoop/hive/ql/parse/
ql/src/test/results/clientpositive/
Author: zshao
Date: Wed Dec 17 12:38:56 2008
New Revision: 727506
URL: http://svn.apache.org/viewvc?rev=727506&view=rev
Log:
HIVE-170. Map-side aggregations to estimate memory size. (Namit via zshao)
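For context: map-side hash aggregation is switched on with hive.map.aggr, and this patch adds a knob for the fraction of JVM memory the map-side hash table may use. A minimal usage sketch (the 0.25 here is illustrative, not from this patch; the default added below is 0.5):

    set hive.map.aggr=true;
    set hive.map.aggr.hash.percentmemory=0.25;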
Modified:
hadoop/hive/trunk/CHANGES.txt
hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
hadoop/hive/trunk/ql/src/test/results/clientpositive/groupby4_map.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/groupby5_map.q.out
Modified: hadoop/hive/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/CHANGES.txt?rev=727506&r1=727505&r2=727506&view=diff
==============================================================================
--- hadoop/hive/trunk/CHANGES.txt (original)
+++ hadoop/hive/trunk/CHANGES.txt Wed Dec 17 12:38:56 2008
@@ -26,6 +26,8 @@
IMPROVEMENTS
+ HIVE-170. Map-side aggregations to estimate memory size. (Namit via zshao)
+
HIVE-180. Data Generator for thrift-serialized sequence files. (zshao)
HIVE-157. Update README.txt to remove refs to mirror.facebook.com. (zshao)
Modified: hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=727506&r1=727505&r2=727506&view=diff
==============================================================================
--- hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Wed Dec 17 12:38:56 2008
@@ -87,6 +87,7 @@
HIVEALIAS("hive.alias", ""),
HIVEMAPSIDEAGGREGATE("hive.map.aggr", "false"),
HIVEJOINEMITINTERVAL("hive.join.emit.interval", 1000),
+ HIVEMAPAGGRHASHMEMORY("hive.map.aggr.hash.percentmemory", (float)0.5),
// Default file format for CREATE TABLE statement
// Options: TextFile, SequenceFile
@@ -95,6 +96,7 @@
public final String varname;
public final String defaultVal;
public final int defaultIntVal;
+ public final float defaultFloatVal;
public final Class valClass;
public final boolean defaultBoolVal;
@@ -103,6 +105,7 @@
this.defaultVal = defaultVal;
this.valClass = String.class;
this.defaultIntVal = -1;
+ this.defaultFloatVal = -1;
this.defaultBoolVal = false;
}
@@ -110,14 +113,25 @@
this.varname = varname;
this.defaultVal = null;
this.defaultIntVal = defaultIntVal;
+ this.defaultFloatVal = -1;
this.valClass = Integer.class;
this.defaultBoolVal = false;
}
+ ConfVars(String varname, float defaultFloatVal) {
+ this.varname = varname;
+ this.defaultVal = null;
+ this.defaultIntVal = -1;
+ this.defaultFloatVal = defaultFloatVal;
+ this.valClass = Float.class;
+ this.defaultBoolVal = false;
+ }
+
ConfVars(String varname, boolean defaultBoolVal) {
this.varname = varname;
this.defaultVal = null;
this.defaultIntVal = -1;
+ this.defaultFloatVal = -1;
this.valClass = Boolean.class;
this.defaultBoolVal = defaultBoolVal;
}
@@ -136,6 +150,15 @@
return getIntVar(this, var);
}
+ public static float getFloatVar(Configuration conf, ConfVars var) {
+ assert(var.valClass == Float.class);
+ return conf.getFloat(var.varname, var.defaultFloatVal);
+ }
+
+ public float getFloatVar(ConfVars var) {
+ return getFloatVar(this, var);
+ }
+
public static boolean getBoolVar(Configuration conf, ConfVars var) {
assert(var.valClass == Boolean.class);
return conf.getBoolean(var.varname, var.defaultBoolVal);
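The new float accessor mirrors the existing int and boolean getters. A minimal sketch of a caller reading the new variable, matching how GroupByOperator below uses it:

    // Compute the hash table memory budget from the configured fraction.
    float percent = HiveConf.getFloatVar(hconf, HiveConf.ConfVars.HIVEMAPAGGRHASHMEMORY);
    long maxHashTblMemory = (long) (percent * Runtime.getRuntime().maxMemory());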
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java?rev=727506&r1=727505&r2=727506&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java Wed Dec 17 12:38:56 2008
@@ -26,6 +26,8 @@
import java.util.Map;
import java.io.Serializable;
import java.lang.reflect.Method;
+import java.lang.reflect.Field;
+import java.lang.IllegalAccessException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.aggregationDesc;
@@ -37,13 +39,22 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.parse.RowResolver;
import org.apache.hadoop.hive.ql.parse.OpParseContext;
+import org.apache.hadoop.hive.ql.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.ql.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
/**
* GroupBy operator implementation.
*/
public class GroupByOperator extends Operator <groupByDesc> implements Serializable {
+ static final private Log LOG = LogFactory.getLog(GroupByOperator.class.getName());
+
private static final long serialVersionUID = 1L;
+ private static final int NUMROWSESTIMATESIZE = 1000;
+
transient protected ExprNodeEvaluator[] keyFields;
transient protected ExprNodeEvaluator[][] aggregationParameterFields;
// In the future, we may allow both count(DISTINCT a) and sum(DISTINCT a) in the same SQL clause,
@@ -69,6 +80,38 @@
transient long totalMemory;
transient boolean hashAggr;
+ /**
+ * This is used to store the position and field names for variable length fields.
+ **/
+ class varLenFields {
+ int aggrPos;
+ List<Field> fields;
+ varLenFields(int aggrPos, List<Field> fields) {
+ this.aggrPos = aggrPos;
+ this.fields = fields;
+ }
+
+ int getAggrPos() {
+ return aggrPos;
+ }
+
+ List<Field> getFields() {
+ return fields;
+ }
+ };
+
+ // For these positions, a variable-length primitive type (String) is used, so the size cannot be estimated statically; sample it at runtime.
+ transient List<Integer> keyPositionsSize;
+
+ // For these positions, a variable-length primitive type (String) is used in the aggregation classes
+ transient List<varLenFields> aggrPositions;
+
+ transient int fixedRowSize;
+ transient long maxHashTblMemory;
+ transient int totalVariableSize;
+ transient int numEntriesVarSize;
+ transient int numEntriesHashTable;
+
public void initialize(Configuration hconf) throws HiveException {
super.initialize(hconf);
totalMemory = Runtime.getRuntime().totalMemory();
@@ -149,7 +192,9 @@
} else {
hashAggregations = new HashMap<ArrayList<Object>, UDAF[]>();
hashAggr = true;
+ keyPositionsSize = new ArrayList<Integer>();
}
+
// init objectInspectors
int totalFields = keyFields.length + aggregationClasses.length;
objectInspectors = new ArrayList<ObjectInspector>(totalFields);
@@ -162,6 +207,132 @@
}
firstRow = true;
+ // Estimate the number of hash table entries based on the size of each entry. Since the size of an entry
+ // is not known statically, it is estimated from rows sampled at runtime.
+ if (conf.getMode() == groupByDesc.Mode.HASH)
+ computeMaxEntriesHashAggr(hconf);
+ }
+
+ /**
+ * Estimate the number of entries in the map-side hash table.
+ * The user can specify the fraction of JVM memory to be used by the map-side hash; by default, half of the
+ * maximum memory is used. The size of each row is estimated, rather crudely, and the number of entries that
+ * can fit in the hash table is derived from that.
+ * The result is useful for map-side aggregation only.
+ **/
+ private void computeMaxEntriesHashAggr(Configuration hconf) {
+ maxHashTblMemory = (long)(HiveConf.getFloatVar(hconf, HiveConf.ConfVars.HIVEMAPAGGRHASHMEMORY) * Runtime.getRuntime().maxMemory());
+ estimateRowSize();
+ }
+
+ private static final int javaObjectOverHead = 16;
+ private static final int javaHashEntryOverHead = 64;
+ private static final int javaSizePrimitiveType = 16;
+ private static final int javaSizeUnknownType = 256;
+
+ /**
+ * The size of the element at position 'pos' is returned, if possible.
+ * If the datatype is of variable length (such as String), the key position is recorded in a list, and the size for
+ * such positions is calculated at runtime.
+ * @param pos the position of the key
+ * @param c the type of the key
+ * @return the size of this datatype
+ **/
+ private int getSize(int pos, Class<?> c) {
+ if (c.isPrimitive() ||
+ c.isInstance(new Boolean(true)) ||
+ c.isInstance(new Byte((byte)0)) ||
+ c.isInstance(new Short((short)0)) ||
+ c.isInstance(new Integer(0)) ||
+ c.isInstance(new Long(0)) ||
+ c.isInstance(new Float(0)) ||
+ c.isInstance(new Double(0)))
+ return javaSizePrimitiveType;
+
+ if (c.isInstance(new String())) {
+ keyPositionsSize.add(new Integer(pos));
+ return javaObjectOverHead;
+ }
+
+ return javaSizeUnknownType;
+ }
+
+ /**
+ * The size of the element at position 'pos' is returned, if possible.
+ * If the field is of variable length (such as String), the field is recorded against its aggregation position, and
+ * the size for such positions is calculated at runtime.
+ * @param pos the position of the aggregation
+ * @param c the aggregation class
+ * @param f the field whose size is to be estimated
+ * @return the size of this datatype
+ **/
+ private int getSize(int pos, Class<?> c, Field f) {
+ if (c.isPrimitive() ||
+ c.isInstance(new Boolean(true)) ||
+ c.isInstance(new Byte((byte)0)) ||
+ c.isInstance(new Short((short)0)) ||
+ c.isInstance(new Integer(0)) ||
+ c.isInstance(new Long(0)) ||
+ c.isInstance(new Float(0)) ||
+ c.isInstance(new Double(0)))
+ return javaSizePrimitiveType;
+
+ if (c.isInstance(new String())) {
+ int idx = 0;
+ varLenFields v = null;
+ for (idx = 0; idx < aggrPositions.size(); idx++) {
+ v = aggrPositions.get(idx);
+ if (v.getAggrPos() == pos)
+ break;
+ }
+
+ if (idx == aggrPositions.size()) {
+ v = new varLenFields(pos, new ArrayList<Field>());
+ aggrPositions.add(v);
+ }
+
+ v.getFields().add(f);
+ return javaObjectOverHead;
+ }
+
+ return javaSizeUnknownType;
+ }
+
+ /**
+ * @param pos position of the key
+ * @param typeInfo the type of the key
+ * @return the size of this datatype
+ **/
+ private int getSize(int pos, TypeInfo typeInfo) {
+ if (typeInfo instanceof PrimitiveTypeInfo)
+ return getSize(pos, typeInfo.getPrimitiveClass());
+ return javaSizeUnknownType;
+ }
+
+ /**
+ * Estimate the fixed part of the row size and record the positions of variable-length fields.
+ **/
+ private void estimateRowSize() {
+ // Estimate the fixed size of each entry:
+ // a datatype with unknown size (String/Struct etc.) is assumed to be 256 bytes for now;
+ // 64 bytes is assumed as the overhead of each hash table entry.
+ fixedRowSize = javaHashEntryOverHead;
+
+ ArrayList<exprNodeDesc> keys = conf.getKeys();
+
+ // Go over all the keys and get the size of the fields of fixed length. Keep track of the variable length keys
+ for (int pos = 0; pos < keys.size(); pos++)
+ fixedRowSize += getSize(pos, keys.get(pos).getTypeInfo());
+
+ // Go over all the aggregation classes and get the size of the fields of fixed length. Keep track of the variable length
+ // fields in these aggregation classes.
+ for(int i=0; i < aggregationClasses.length; i++) {
+ fixedRowSize += javaObjectOverHead;
+ Class<? extends UDAF> agg = aggregationClasses[i];
+ Field[] fArr = agg.getFields();
+ for (Field f : fArr)
+ fixedRowSize += getSize(i, agg, f);
+ }
}
protected UDAF[] newAggregations() throws HiveException {
@@ -182,6 +353,7 @@
protected void updateAggregations(UDAF[] aggs, Object row, ObjectInspector rowInspector, boolean hashAggr, boolean newEntry,
Object[][] lastInvoke) throws HiveException {
for(int ai=0; ai<aggs.length; ai++) {
+
// Calculate the parameters
Object[] o = new Object[aggregationParameterFields[ai].length];
for(int pi=0; pi<aggregationParameterFields[ai].length; pi++) {
@@ -266,14 +438,12 @@
hashAggregations.put(newKeys, aggs);
newEntry = true;
}
-
+
// Update the aggs
updateAggregations(aggs, row, rowInspector, true, newEntry, null);
- // currently, we use a simple approximation - if 90% of memory is being
- // used, flush
- long freeMemory = Runtime.getRuntime().freeMemory();
- if (shouldBeFlushed(totalMemory, freeMemory)) {
+ // based on user-specified parameters, check if the hash table needs to be flushed
+ if (shouldBeFlushed(newKeys)) {
flush();
}
}
@@ -308,13 +478,52 @@
updateAggregations(aggs, row, rowInspector, false, false, lastInvoke);
}
- private boolean shouldBeFlushed(long total, long free) {
- if (10 * free >= total)
+ /**
+ * Decide, based on user-specified parameters, whether the hash table should be flushed.
+ * @param newKeys keys for the row under consideration
+ **/
+ private boolean shouldBeFlushed(ArrayList<Object> newKeys) {
+ int numEntries = hashAggregations.size();
+
+ // The fixed size for the aggregation class is already known. Get the variable portion of the size every NUMROWSESTIMATESIZE rows.
+ if ((numEntries % NUMROWSESTIMATESIZE) == 0) {
+ for (Integer pos : keyPositionsSize) {
+ Object key = newKeys.get(pos.intValue());
+ totalVariableSize += ((String)key).length();
+ }
+
+ UDAF[] aggs = null;
+ if (aggrPositions.size() > 0)
+ aggs = hashAggregations.get(newKeys);
+
+ for (varLenFields v : aggrPositions) {
+ int aggrPos = v.getAggrPos();
+ List<Field> fieldsVarLen = v.getFields();
+ UDAF agg = aggs[aggrPos];
+
+ try
+ {
+ for (Field f : fieldsVarLen)
+ totalVariableSize += ((String)f.get(agg)).length();
+ } catch (IllegalAccessException e) {
+ assert false;
+ }
+ }
+
+ numEntriesVarSize++;
+ // Update the number of entries that can fit in the hash table
+ numEntriesHashTable = (int)(maxHashTblMemory / (fixedRowSize + ((int)totalVariableSize/numEntriesVarSize)));
+ LOG.trace("Hash Aggr: #hash table = " + numEntries + " #max in hash table = " + numEntriesHashTable);
+ }
+
+ // flush if necessary
+ if (numEntries >= numEntriesHashTable)
return true;
return false;
}
private void flush() throws HiveException {
+
// Currently, the algorithm flushes 10% of the entries - this can be
// changed in the future
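Putting the pieces above together: each hash table entry has a fixed size, computed once from the key types and the aggregation class fields, plus a variable part (String lengths) sampled every NUMROWSESTIMATESIZE rows; the table is flushed once its entry count reaches the number that fits in the memory budget. Below is a self-contained sketch of that heuristic with a hypothetical class name (field names mirror the patch; the real logic lives in GroupByOperator above):

    import java.util.List;

    class HashAggrMemoryEstimator {
      private final long maxHashTblMemory; // budget, e.g. 0.5 * Runtime.maxMemory()
      private final int fixedRowSize;      // fixed bytes per entry, computed once
      private long totalVariableSize = 0;  // sampled variable-length bytes so far
      private int numEntriesVarSize = 0;   // number of rows sampled
      private int numEntriesHashTable = 0; // current estimate of entries that fit

      HashAggrMemoryEstimator(long maxHashTblMemory, int fixedRowSize) {
        this.maxHashTblMemory = maxHashTblMemory;
        this.fixedRowSize = fixedRowSize;
      }

      // Feed in the variable-length (String) fields of one sampled row.
      void sample(List<String> variableFields) {
        for (String s : variableFields) {
          totalVariableSize += s.length();
        }
        numEntriesVarSize++;
        // average row size = fixed part + average of the sampled variable parts
        long avgRowSize = fixedRowSize + totalVariableSize / numEntriesVarSize;
        numEntriesHashTable = (int) (maxHashTblMemory / avgRowSize);
      }

      // Flush once the table holds at least as many entries as the estimate.
      boolean shouldFlush(int currentEntries) {
        return numEntriesVarSize > 0 && currentEntries >= numEntriesHashTable;
      }
    }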
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java?rev=727506&r1=727505&r2=727506&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java Wed Dec 17 12:38:56 2008
@@ -1512,6 +1512,14 @@
private Operator genGroupByPlanReduceSinkOperator(QBParseInfo parseInfo,
String dest, Operator inputOperatorInfo)
throws SemanticException {
+
+ return genGroupByPlanReduceSinkOperator(parseInfo, dest, inputOperatorInfo, -1, false);
+ }
+
+ @SuppressWarnings("nls")
+ private Operator genGroupByPlanReduceSinkOperator(QBParseInfo parseInfo,
+ String dest, Operator inputOperatorInfo, int numReducers, boolean inferNumReducers)
+ throws SemanticException {
RowResolver reduceSinkInputRowResolver = opParseCtx.get(inputOperatorInfo).getRR();
RowResolver reduceSinkOutputRowResolver = new RowResolver();
reduceSinkOutputRowResolver.setIsExprResolver(true);
@@ -1569,7 +1577,7 @@
return putOpInsertMap(
OperatorFactory.getAndMakeChild(
PlanUtils.getReduceSinkDesc(reduceKeys, reduceValues, -1,
- (parseInfo.getDistinctFuncExprForClause(dest) == null ? -1 : Integer.MAX_VALUE), -1, false),
+ (parseInfo.getDistinctFuncExprForClause(dest) == null ? -1 : Integer.MAX_VALUE), numReducers, inferNumReducers),
new RowSchema(reduceSinkOutputRowResolver.getColumnInfos()),
inputOperatorInfo),
reduceSinkOutputRowResolver);
@@ -1885,12 +1893,14 @@
Operator groupByOperatorInfo = genGroupByPlanMapGroupByOperator(qb,
dest, inputOperatorInfo, groupByDesc.Mode.HASH);
- // ////// Generate ReduceSink Operator
- Operator reduceSinkOperatorInfo =
- genGroupByPlanReduceSinkOperator(parseInfo, dest, groupByOperatorInfo);
-
// Optimize the scenario when there are no grouping keys and no distinct - 2 map-reduce jobs are not needed
+ // For example: select count(1) from T where t.ds = ....
if (!optimizeMapAggrGroupBy(dest, qb)) {
+
+ // ////// Generate ReduceSink Operator
+ Operator reduceSinkOperatorInfo =
+ genGroupByPlanReduceSinkOperator(parseInfo, dest, groupByOperatorInfo);
+
// ////// Generate GroupbyOperator for a partial aggregation
Operator groupByOperatorInfo2 = genGroupByPlanGroupByOperator1(parseInfo, dest, reduceSinkOperatorInfo,
groupByDesc.Mode.PARTIAL2);
@@ -1902,8 +1912,13 @@
// ////// Generate GroupbyOperator3
return genGroupByPlanGroupByOperator2MR(parseInfo, dest, reduceSinkOperatorInfo2, groupByDesc.Mode.FINAL);
}
- else
+ else {
+ // ////// Generate ReduceSink Operator
+ Operator reduceSinkOperatorInfo =
+ genGroupByPlanReduceSinkOperator(parseInfo, dest, groupByOperatorInfo, 1, false);
+
return genGroupByPlanGroupByOperator2MR(parseInfo, dest, reduceSinkOperatorInfo, groupByDesc.Mode.FINAL);
+ }
}
@SuppressWarnings("nls")
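The planner change above also explains the "# Reducers: 1" lines in the test output diffs below: when a map-side aggregation query has no grouping keys and no DISTINCT, the second map-reduce job is skipped and the final aggregation runs on a single, explicitly requested reducer. A query of the shape mentioned in the comment (table name and partition value illustrative):

    set hive.map.aggr=true;
    select count(1) from T where t.ds = '2008-12-17';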
Modified: hadoop/hive/trunk/ql/src/test/results/clientpositive/groupby4_map.q.out
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/results/clientpositive/groupby4_map.q.out?rev=727506&r1=727505&r2=727506&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/test/results/clientpositive/groupby4_map.q.out (original)
+++ hadoop/hive/trunk/ql/src/test/results/clientpositive/groupby4_map.q.out Wed Dec 17 12:38:56 2008
@@ -24,6 +24,7 @@
value expressions:
expr: 0
type: bigint
+ # Reducers: 1
Reduce Operator Tree:
Group By Operator
aggregations:
Modified: hadoop/hive/trunk/ql/src/test/results/clientpositive/groupby5_map.q.out
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/results/clientpositive/groupby5_map.q.out?rev=727506&r1=727505&r2=727506&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/test/results/clientpositive/groupby5_map.q.out (original)
+++ hadoop/hive/trunk/ql/src/test/results/clientpositive/groupby5_map.q.out Wed Dec 17 12:38:56 2008
@@ -27,6 +27,7 @@
value expressions:
expr: 0
type: double
+ # Reducers: 1
Reduce Operator Tree:
Group By Operator
aggregations: