Posted to commits@hive.apache.org by zs...@apache.org on 2008/12/17 21:38:57 UTC

svn commit: r727506 - in /hadoop/hive/trunk: ./ common/src/java/org/apache/hadoop/hive/conf/ ql/src/java/org/apache/hadoop/hive/ql/exec/ ql/src/java/org/apache/hadoop/hive/ql/parse/ ql/src/test/results/clientpositive/

Author: zshao
Date: Wed Dec 17 12:38:56 2008
New Revision: 727506

URL: http://svn.apache.org/viewvc?rev=727506&view=rev
Log:
HIVE-170. Map-side aggregations to estimate memory size. (Namit via zshao)

Modified:
    hadoop/hive/trunk/CHANGES.txt
    hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
    hadoop/hive/trunk/ql/src/test/results/clientpositive/groupby4_map.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/groupby5_map.q.out

Modified: hadoop/hive/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/CHANGES.txt?rev=727506&r1=727505&r2=727506&view=diff
==============================================================================
--- hadoop/hive/trunk/CHANGES.txt (original)
+++ hadoop/hive/trunk/CHANGES.txt Wed Dec 17 12:38:56 2008
@@ -26,6 +26,8 @@
 
   IMPROVEMENTS
 
+    HIVE-170. Map-side aggregations to estimate memory size. (Namit via zshao)
+
     HIVE-180. Data Generator for thrift-serialized sequence files. (zshao)
 
     HIVE-157. Update README.txt to remove refs to mirror.facebook.com. (zshao)

Modified: hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=727506&r1=727505&r2=727506&view=diff
==============================================================================
--- hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Wed Dec 17 12:38:56 2008
@@ -87,6 +87,7 @@
     HIVEALIAS("hive.alias", ""),
     HIVEMAPSIDEAGGREGATE("hive.map.aggr", "false"),
     HIVEJOINEMITINTERVAL("hive.join.emit.interval", 1000),
+    HIVEMAPAGGRHASHMEMORY("hive.map.aggr.hash.percentmemory", (float)0.5),
     
     // Default file format for CREATE TABLE statement
     // Options: TextFile, SequenceFile
@@ -95,6 +96,7 @@
     public final String varname;
     public final String defaultVal;
     public final int defaultIntVal;
+    public final float defaultFloatVal;
     public final Class valClass;
     public final boolean defaultBoolVal;
 
@@ -103,6 +105,7 @@
       this.defaultVal = defaultVal;
       this.valClass = String.class;
       this.defaultIntVal = -1;
+      this.defaultFloatVal = -1;
       this.defaultBoolVal = false;
     }
 
@@ -110,14 +113,25 @@
       this.varname = varname;
       this.defaultVal = null;
       this.defaultIntVal = defaultIntVal;
+      this.defaultFloatVal = -1;
       this.valClass = Integer.class;
       this.defaultBoolVal = false;
     }
 
+    ConfVars(String varname, float defaultFloatVal) {
+      this.varname = varname;
+      this.defaultVal = null;
+      this.defaultIntVal = -1;
+      this.defaultFloatVal = defaultFloatVal;
+      this.valClass = Float.class;
+      this.defaultBoolVal = false;
+    }
+
     ConfVars(String varname, boolean defaultBoolVal) {
       this.varname = varname;
       this.defaultVal = null;
       this.defaultIntVal = -1;
+      this.defaultFloatVal = -1;
       this.valClass = Boolean.class;
       this.defaultBoolVal = defaultBoolVal;
     }
@@ -136,6 +150,15 @@
     return getIntVar(this, var);
   }
 
+  public static float getFloatVar(Configuration conf, ConfVars var) {
+    assert(var.valClass == Float.class);
+    return conf.getFloat(var.varname, var.defaultFloatVal);
+  }
+
+  public float getFloatVar(ConfVars var) {
+    return getFloatVar(this, var);
+  }
+
   public static boolean getBoolVar(Configuration conf, ConfVars var) {
     assert(var.valClass == Boolean.class);
     return conf.getBoolean(var.varname, var.defaultBoolVal);
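
A minimal usage sketch (not part of this commit) of the new float-typed setting, mirroring the lookup that GroupByOperator.computeMaxEntriesHashAggr() performs below; it assumes only the HiveConf.getFloatVar() accessor added here plus the stock Hadoop Configuration API, and the 0.25 override is an arbitrary illustration value:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;

// Illustrative class, not in the Hive tree.
public class MapAggrMemoryBudget {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Override the 0.5 default: let the map-side hash table use 25% of the max heap.
    conf.set(HiveConf.ConfVars.HIVEMAPAGGRHASHMEMORY.varname, "0.25");

    float percent = HiveConf.getFloatVar(conf, HiveConf.ConfVars.HIVEMAPAGGRHASHMEMORY);
    long budget = (long) (percent * Runtime.getRuntime().maxMemory());
    System.out.println("map-side hash aggregation budget: " + budget + " bytes");
  }
}

With the 0.5 default left in place, the budget works out to half of Runtime.getRuntime().maxMemory(), which is exactly what the GroupByOperator change below computes.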

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java?rev=727506&r1=727505&r2=727506&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java Wed Dec 17 12:38:56 2008
@@ -26,6 +26,8 @@
 import java.util.Map;
 import java.io.Serializable;
 import java.lang.reflect.Method;
+import java.lang.reflect.Field;
+import java.lang.IllegalAccessException;
 
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.aggregationDesc;
@@ -37,13 +39,22 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.ql.parse.RowResolver;
 import org.apache.hadoop.hive.ql.parse.OpParseContext;
+import org.apache.hadoop.hive.ql.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.ql.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 /**
  * GroupBy operator implementation.
  */
 public class GroupByOperator extends Operator <groupByDesc> implements Serializable {
 
+  static final private Log LOG = LogFactory.getLog(GroupByOperator.class.getName());
+
   private static final long serialVersionUID = 1L;
+  private static final int  NUMROWSESTIMATESIZE = 1000;
+
   transient protected ExprNodeEvaluator[] keyFields;
   transient protected ExprNodeEvaluator[][] aggregationParameterFields;
   // In the future, we may allow both count(DISTINCT a) and sum(DISTINCT a) in the same SQL clause,
@@ -69,6 +80,38 @@
   transient long    totalMemory;
   transient boolean hashAggr;
 
+  /**
+   * This is used to store the position and field names for variable length fields.
+   **/
+  class varLenFields {
+    int           aggrPos;
+    List<Field>   fields;
+    varLenFields(int aggrPos, List<Field> fields) {
+      this.aggrPos = aggrPos;
+      this.fields  = fields;
+    }
+
+    int getAggrPos() {
+      return aggrPos;
+    }
+
+    List<Field> getFields() {
+      return fields;
+    }
+  };
+
+  // for these key positions, a variable-length primitive type (String) is used, so the size cannot be estimated statically; it is sampled at runtime.
+  transient List<Integer> keyPositionsSize;
+
+  // for these aggregation positions, a variable-length primitive type (String) is used in the aggregation classes
+  transient List<varLenFields> aggrPositions;
+
+  transient int           fixedRowSize;
+  transient long          maxHashTblMemory;
+  transient int           totalVariableSize;
+  transient int           numEntriesVarSize;
+  transient int           numEntriesHashTable;
+
   public void initialize(Configuration hconf) throws HiveException {
     super.initialize(hconf);
     totalMemory = Runtime.getRuntime().totalMemory();
@@ -149,7 +192,9 @@
     } else {
       hashAggregations = new HashMap<ArrayList<Object>, UDAF[]>();
       hashAggr = true;
+      keyPositionsSize = new ArrayList<Integer>();
     }
+
     // init objectInspectors
     int totalFields = keyFields.length + aggregationClasses.length;
     objectInspectors = new ArrayList<ObjectInspector>(totalFields);
@@ -162,6 +207,132 @@
     }
     
     firstRow = true;
+    // estimate the number of hash table entries that fit in memory, based on the size of each entry. Since the size of an
+    // entry is not known up front, it is re-estimated periodically as entries are added
+    if (conf.getMode() == groupByDesc.Mode.HASH)
+      computeMaxEntriesHashAggr(hconf);
+  }
+
+  /**
+   * Estimate the number of entries in the map-side hash table.
+   * The user can specify the fraction of memory to be used by the map-side hash; by default, half of the maximum
+   * JVM memory is used. The size of each row is estimated, rather crudely, and the number of entries is figured
+   * out based on that.
+   * Useful for map-side aggregation only.
+   **/
+  private void computeMaxEntriesHashAggr(Configuration hconf) {
+    maxHashTblMemory = (long)(HiveConf.getFloatVar(hconf, HiveConf.ConfVars.HIVEMAPAGGRHASHMEMORY) * Runtime.getRuntime().maxMemory());
+    estimateRowSize();
+  }
+
+  private static final int javaObjectOverHead    = 16;
+  private static final int javaHashEntryOverHead = 64;
+  private static final int javaSizePrimitiveType = 16;
+  private static final int javaSizeUnknownType   = 256;
+
+  /**
+   * The size of the element at position 'pos' is returned, if possible. 
+   * If the datatype is of variable length (e.g. STRING), the key position is added to a list of such positions, and the
+   * size for those positions is calculated at runtime.
+   * @param pos the position of the key
+   * @param c   the type of the key
+   * @return the size of this datatype
+   **/
+  private int getSize(int pos, Class<?> c) {
+    if (c.isPrimitive() ||
+        c.isInstance(new Boolean(true)) ||
+        c.isInstance(new Byte((byte)0)) ||
+        c.isInstance(new Short((short)0)) ||
+        c.isInstance(new Integer(0)) ||
+        c.isInstance(new Long(0)) ||
+        c.isInstance(new Float(0)) ||
+        c.isInstance(new Double(0)))
+      return javaSizePrimitiveType;
+
+    if (c.isInstance(new String())) {
+      keyPositionsSize.add(new Integer(pos));
+      return javaObjectOverHead;
+    }
+      
+    return javaSizeUnknownType;
+  }
+
+  /**
+   * The size of the element at position 'pos' is returned, if possible. 
+   * If the field is of variable length (e.g. STRING), the field is recorded against its aggregation position, and the
+   * size for those fields is calculated at runtime.
+   * @param pos the position of the key
+   * @param c   the type of the key
+   * @param f   the field to be added
+   * @return the size of this datatype
+   **/
+  private int getSize(int pos, Class<?> c, Field f) {
+    if (c.isPrimitive() ||
+        c.isInstance(new Boolean(true)) ||
+        c.isInstance(new Byte((byte)0)) ||
+        c.isInstance(new Short((short)0)) ||
+        c.isInstance(new Integer(0)) ||
+        c.isInstance(new Long(0)) ||
+        c.isInstance(new Float(0)) ||
+        c.isInstance(new Double(0)))
+      return javaSizePrimitiveType;
+
+    if (c.isInstance(new String())) {
+      int idx = 0;
+      varLenFields v = null;
+      for (idx = 0; idx < aggrPositions.size(); idx++) {
+        v = aggrPositions.get(idx);
+        if (v.getAggrPos() == pos)
+          break;
+      }
+
+      if (idx == aggrPositions.size()) {
+        v = new varLenFields(pos, new ArrayList<Field>());
+        aggrPositions.add(v);
+      }
+
+      v.getFields().add(f);
+      return javaObjectOverHead;
+    }
+      
+    return javaSizeUnknownType;
+  }
+
+  /**
+   * @param pos position of the key
+   * @param typeInfo type of the input
+   * @return the size of this datatype
+   **/
+  private int getSize(int pos, TypeInfo typeInfo) {
+    if (typeInfo instanceof PrimitiveTypeInfo) 
+      return getSize(pos, typeInfo.getPrimitiveClass());
+    return javaSizeUnknownType;
+  }
+
+  /**
+   * Estimate the fixed portion of the size of each row.
+   **/
+  private void estimateRowSize() {
+    // estimate the size of each entry - 
+    // a datatype with unknown size (String/Struct etc.) is assumed to be 256 bytes for now.
+    // 64 bytes is the overhead for a hash table entry
+    fixedRowSize = javaHashEntryOverHead;
+
+    ArrayList<exprNodeDesc> keys = conf.getKeys();
+
+    // Go over all the keys and get the size of the fields of fixed length. Keep track of the variable length keys
+    for (int pos = 0; pos < keys.size(); pos++)
+      fixedRowSize += getSize(pos, keys.get(pos).getTypeInfo());
+
+    // Go over all the aggregation classes and get the size of the fields of fixed length. Keep track of the variable length
+    // fields in these aggregation classes.
+    for(int i=0; i < aggregationClasses.length; i++) {
+      fixedRowSize += javaObjectOverHead;
+      Class<? extends UDAF> agg = aggregationClasses[i];
+      Field[] fArr = agg.getFields();
+      for (Field f : fArr) 
+        fixedRowSize += getSize(i, f.getType(), f);
+    }
   }
 
   protected UDAF[] newAggregations() throws HiveException {      
@@ -182,6 +353,7 @@
   protected void updateAggregations(UDAF[] aggs, Object row, ObjectInspector rowInspector, boolean hashAggr, boolean newEntry,
                                     Object[][] lastInvoke) throws HiveException {
     for(int ai=0; ai<aggs.length; ai++) {
+
       // Calculate the parameters 
       Object[] o = new Object[aggregationParameterFields[ai].length];
       for(int pi=0; pi<aggregationParameterFields[ai].length; pi++) {
@@ -266,14 +438,12 @@
       hashAggregations.put(newKeys, aggs);
       newEntry = true;
     }
-    
+
     // Update the aggs
     updateAggregations(aggs, row, rowInspector, true, newEntry, null);
     
-    // currently, we use a simple approximation - if 90% of memory is being
-    // used, flush 
-    long freeMemory = Runtime.getRuntime().freeMemory();
-    if (shouldBeFlushed(totalMemory, freeMemory)) {
+    // based on user-specified parameters, check if the hash table needs to be flushed
+    if (shouldBeFlushed(newKeys)) {
       flush();
     }
   }
@@ -308,13 +478,52 @@
     updateAggregations(aggs, row, rowInspector, false, false, lastInvoke);
   }
 
-  private boolean shouldBeFlushed(long total, long free) {
-    if (10 * free >= total)
+  /**
+   * Based on user parameters, decide whether the hash table should be flushed.
+   * @param newKeys keys for the row under consideration
+   **/
+  private boolean shouldBeFlushed(ArrayList<Object> newKeys) {
+    int numEntries = hashAggregations.size();
+
+    // The fixed size for the aggregation class is already known. Get the variable portion of the size every NUMROWSESTIMATESIZE rows.
+    if ((numEntries % NUMROWSESTIMATESIZE) == 0) {
+      for (Integer pos : keyPositionsSize) {
+        Object key = newKeys.get(pos.intValue());
+        totalVariableSize += ((String)key).length();
+      }
+
+      UDAF[] aggs = null;
+      if (aggrPositions.size() > 0)
+        aggs = hashAggregations.get(newKeys);
+
+      for (varLenFields v : aggrPositions) {
+        int     aggrPos          = v.getAggrPos();
+        List<Field> fieldsVarLen = v.getFields();
+        UDAF    agg              = aggs[aggrPos];
+
+        try 
+        {
+          for (Field f : fieldsVarLen)
+            totalVariableSize += ((String)f.get(agg)).length();
+        } catch (IllegalAccessException e) {
+          assert false;
+        }
+      }
+
+      numEntriesVarSize++;
+      // Update the number of entries that can fit in the hash table
+      numEntriesHashTable = (int)(maxHashTblMemory / (fixedRowSize + ((int)totalVariableSize/numEntriesVarSize)));
+      LOG.trace("Hash Aggr: #hash table = " + numEntries + " #max in hash table = " + numEntriesHashTable);
+    }
+
+    // flush if necessary
+    if (numEntries >= numEntriesHashTable)
       return true;
     return false;
   }
 
   private void flush() throws HiveException {
+    
     // Currently, the algorithm flushes 10% of the entries - this can be
     // changed in the future
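
To make the sizing arithmetic concrete, here is a back-of-the-envelope sketch (not part of the patch) that plugs in the constants defined above; the 1 GB heap, the single STRING key, the single-primitive-field aggregation class, and the 20-byte average key length are assumptions chosen purely for illustration:

// Illustrative class, not in the Hive tree.
public class HashAggrSizingExample {
  public static void main(String[] args) {
    long maxHeap = 1L << 30;                        // assume a 1 GB JVM max heap
    long maxHashTblMemory = (long) (0.5 * maxHeap); // hive.map.aggr.hash.percentmemory default

    // One STRING group-by key and one aggregation class with a single primitive field.
    int fixedRowSize = 64   // javaHashEntryOverHead
                     + 16   // javaObjectOverHead for the STRING key
                     + 16   // javaObjectOverHead for the aggregation object
                     + 16;  // javaSizePrimitiveType for its field
    int avgVariableSize = 20; // sampled average key length (assumed)

    long numEntriesHashTable = maxHashTblMemory / (fixedRowSize + avgVariableSize);
    System.out.println("entries before a flush is forced: " + numEntriesHashTable); // ~4.1 million
  }
}

Because the variable portion is re-sampled roughly every NUMROWSESTIMATESIZE (1000) new entries, the flush threshold adapts as the observed key lengths change.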
 

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java?rev=727506&r1=727505&r2=727506&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java Wed Dec 17 12:38:56 2008
@@ -1512,6 +1512,14 @@
   private Operator genGroupByPlanReduceSinkOperator(QBParseInfo parseInfo,
       String dest, Operator inputOperatorInfo)
       throws SemanticException {
+
+    return genGroupByPlanReduceSinkOperator(parseInfo, dest, inputOperatorInfo, -1, false);
+  }
+
+  @SuppressWarnings("nls")
+  private Operator genGroupByPlanReduceSinkOperator(QBParseInfo parseInfo,
+    String dest, Operator inputOperatorInfo, int numReducers, boolean inferNumReducers)
+    throws SemanticException {
     RowResolver reduceSinkInputRowResolver = opParseCtx.get(inputOperatorInfo).getRR();
     RowResolver reduceSinkOutputRowResolver = new RowResolver();
     reduceSinkOutputRowResolver.setIsExprResolver(true);
@@ -1569,7 +1577,7 @@
     return putOpInsertMap(
       OperatorFactory.getAndMakeChild(
         PlanUtils.getReduceSinkDesc(reduceKeys, reduceValues, -1,
-                                    (parseInfo.getDistinctFuncExprForClause(dest) == null ? -1 : Integer.MAX_VALUE), -1, false),
+                                    (parseInfo.getDistinctFuncExprForClause(dest) == null ? -1 : Integer.MAX_VALUE), numReducers, inferNumReducers),
         new RowSchema(reduceSinkOutputRowResolver.getColumnInfos()),
         inputOperatorInfo),
       reduceSinkOutputRowResolver);
@@ -1885,12 +1893,14 @@
     Operator groupByOperatorInfo = genGroupByPlanMapGroupByOperator(qb,
       dest, inputOperatorInfo, groupByDesc.Mode.HASH);
 
-    // ////// Generate ReduceSink Operator
-    Operator reduceSinkOperatorInfo = 
-      genGroupByPlanReduceSinkOperator(parseInfo, dest, groupByOperatorInfo);
-
     // Optimize the scenario when there are no grouping keys and no distinct - 2 map-reduce jobs are not needed
+    // For example: select count(1) from T where t.ds = ....
     if (!optimizeMapAggrGroupBy(dest, qb)) {
+
+      // ////// Generate ReduceSink Operator
+      Operator reduceSinkOperatorInfo = 
+        genGroupByPlanReduceSinkOperator(parseInfo, dest, groupByOperatorInfo);
+      
       // ////// Generate GroupbyOperator for a partial aggregation
       Operator groupByOperatorInfo2 = genGroupByPlanGroupByOperator1(parseInfo, dest, reduceSinkOperatorInfo, 
                                                                          groupByDesc.Mode.PARTIAL2);
@@ -1902,8 +1912,13 @@
       // ////// Generate GroupbyOperator3
       return genGroupByPlanGroupByOperator2MR(parseInfo, dest, reduceSinkOperatorInfo2, groupByDesc.Mode.FINAL);
     }
-    else
+    else {
+      // ////// Generate ReduceSink Operator
+      Operator reduceSinkOperatorInfo = 
+        genGroupByPlanReduceSinkOperator(parseInfo, dest, groupByOperatorInfo, 1, false);
+      
       return genGroupByPlanGroupByOperator2MR(parseInfo, dest, reduceSinkOperatorInfo, groupByDesc.Mode.FINAL);
+    }
   }
 
   @SuppressWarnings("nls")

Modified: hadoop/hive/trunk/ql/src/test/results/clientpositive/groupby4_map.q.out
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/results/clientpositive/groupby4_map.q.out?rev=727506&r1=727505&r2=727506&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/test/results/clientpositive/groupby4_map.q.out (original)
+++ hadoop/hive/trunk/ql/src/test/results/clientpositive/groupby4_map.q.out Wed Dec 17 12:38:56 2008
@@ -24,6 +24,7 @@
                   value expressions:
                         expr: 0
                         type: bigint
+      # Reducers: 1
       Reduce Operator Tree:
         Group By Operator
           aggregations:

Modified: hadoop/hive/trunk/ql/src/test/results/clientpositive/groupby5_map.q.out
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/results/clientpositive/groupby5_map.q.out?rev=727506&r1=727505&r2=727506&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/test/results/clientpositive/groupby5_map.q.out (original)
+++ hadoop/hive/trunk/ql/src/test/results/clientpositive/groupby5_map.q.out Wed Dec 17 12:38:56 2008
@@ -27,6 +27,7 @@
                   value expressions:
                         expr: 0
                         type: double
+      # Reducers: 1
       Reduce Operator Tree:
         Group By Operator
           aggregations: