Posted to commits@hive.apache.org by zs...@apache.org on 2010/01/21 11:38:15 UTC

svn commit: r901644 [6/37] - in /hadoop/hive/trunk: ./ ql/src/java/org/apache/hadoop/hive/ql/ ql/src/java/org/apache/hadoop/hive/ql/exec/ ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/ ql/src/java/org/apache/hadoop/hive/ql/history/ ql/src/java...

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java Thu Jan 21 10:37:58 2010
@@ -18,109 +18,115 @@
 
 package org.apache.hadoop.hive.ql.exec;
 
+import java.io.Serializable;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
-import java.util.ArrayList;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
-import java.io.Serializable;
-import java.lang.reflect.Field;
-import java.lang.IllegalAccessException;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.parse.OpParseContext;
 import org.apache.hadoop.hive.ql.plan.aggregationDesc;
 import org.apache.hadoop.hive.ql.plan.exprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.groupByDesc;
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
 import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.ql.parse.OpParseContext;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
-import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.io.Text;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 
 /**
  * GroupBy operator implementation.
  */
-public class GroupByOperator extends Operator <groupByDesc> implements Serializable {
+public class GroupByOperator extends Operator<groupByDesc> implements
+    Serializable {
 
-  static final private Log LOG = LogFactory.getLog(GroupByOperator.class.getName());
+  static final private Log LOG = LogFactory.getLog(GroupByOperator.class
+      .getName());
 
   private static final long serialVersionUID = 1L;
-  private static final int  NUMROWSESTIMATESIZE = 1000;
+  private static final int NUMROWSESTIMATESIZE = 1000;
 
   transient protected ExprNodeEvaluator[] keyFields;
   transient protected ObjectInspector[] keyObjectInspectors;
   transient protected Object[] keyObjects;
-  
+
   transient protected ExprNodeEvaluator[][] aggregationParameterFields;
   transient protected ObjectInspector[][] aggregationParameterObjectInspectors;
   transient protected ObjectInspector[][] aggregationParameterStandardObjectInspectors;
   transient protected Object[][] aggregationParameterObjects;
-  // In the future, we may allow both count(DISTINCT a) and sum(DISTINCT a) in the same SQL clause,
-  // so aggregationIsDistinct is a boolean array instead of a single number. 
+  // In the future, we may allow both count(DISTINCT a) and sum(DISTINCT a) in
+  // the same SQL clause,
+  // so aggregationIsDistinct is a boolean array instead of a single number.
   transient protected boolean[] aggregationIsDistinct;
 
   transient GenericUDAFEvaluator[] aggregationEvaluators;
-  
+
   transient protected ArrayList<ObjectInspector> objectInspectors;
   transient ArrayList<String> fieldNames;
 
-  // Used by sort-based GroupBy: Mode = COMPLETE, PARTIAL1, PARTIAL2, MERGEPARTIAL
+  // Used by sort-based GroupBy: Mode = COMPLETE, PARTIAL1, PARTIAL2,
+  // MERGEPARTIAL
   transient protected ArrayList<Object> currentKeys;
-  transient protected ArrayList<Object> newKeys;  
+  transient protected ArrayList<Object> newKeys;
   transient protected AggregationBuffer[] aggregations;
   transient protected Object[][] aggregationsParametersLastInvoke;
 
   // Used by hash-based GroupBy: Mode = HASH, PARTIALS
   transient protected HashMap<KeyWrapper, AggregationBuffer[]> hashAggregations;
-  
+
   // Used by hash distinct aggregations when hashGrpKeyNotRedKey is true
   transient protected HashSet<ArrayList<Object>> keysCurrentGroup;
-  
+
   transient boolean bucketGroup;
-  
+
   transient boolean firstRow;
-  transient long    totalMemory;
+  transient long totalMemory;
   transient boolean hashAggr;
-  // The reduction is happening on the reducer, and the grouping key and reduction keys are different.
+  // The reduction is happening on the reducer, and the grouping key and
+  // reduction keys are different.
   // For example: select a, count(distinct b) from T group by a
   // The data is sprayed by 'b' and the reducer is grouping it by 'a'
-  transient boolean groupKeyIsNotReduceKey;  
+  transient boolean groupKeyIsNotReduceKey;
   transient boolean firstRowInGroup;
-  transient long    numRowsInput;
-  transient long    numRowsHashTbl;
-  transient int     groupbyMapAggrInterval;
-  transient long    numRowsCompareHashAggr;
-  transient float   minReductionHashAggr;
+  transient long numRowsInput;
+  transient long numRowsHashTbl;
+  transient int groupbyMapAggrInterval;
+  transient long numRowsCompareHashAggr;
+  transient float minReductionHashAggr;
 
   // current Key ObjectInspectors are standard ObjectInspectors
   transient protected ObjectInspector[] currentKeyObjectInspectors;
   // new Key ObjectInspectors are objectInspectors from the parent
   transient StructObjectInspector newKeyObjectInspector;
   transient StructObjectInspector currentKeyObjectInspector;
-  
+
   /**
-   * This is used to store the position and field names for variable length fields.
+   * This is used to store the position and field names for variable length
+   * fields.
    **/
   class varLenFields {
-    int           aggrPos;
-    List<Field>   fields;
+    int aggrPos;
+    List<Field> fields;
+
     varLenFields(int aggrPos, List<Field> fields) {
       this.aggrPos = aggrPos;
-      this.fields  = fields;
+      this.fields = fields;
     }
 
     int getAggrPos() {
@@ -132,24 +138,27 @@
     }
   };
 
-  // for these positions, some variable primitive type (String) is used, so size cannot be estimated. sample it at runtime.
+  // for these positions, some variable primitive type (String) is used, so size
+  // cannot be estimated. sample it at runtime.
   transient List<Integer> keyPositionsSize;
 
-  // for these positions, some variable primitive type (String) is used for the aggregation classes
+  // for these positions, some variable primitive type (String) is used for the
+  // aggregation classes
   transient List<varLenFields> aggrPositions;
 
-  transient int           fixedRowSize;
-  transient long          maxHashTblMemory;
-  transient int           totalVariableSize;
-  transient int           numEntriesVarSize;
-  transient int           numEntriesHashTable;
-  
+  transient int fixedRowSize;
+  transient long maxHashTblMemory;
+  transient int totalVariableSize;
+  transient int numEntriesVarSize;
+  transient int numEntriesHashTable;
+
+  @Override
   protected void initializeOp(Configuration hconf) throws HiveException {
     totalMemory = Runtime.getRuntime().totalMemory();
     numRowsInput = 0;
     numRowsHashTbl = 0;
 
-    assert(inputObjInspectors.length == 1);
+    assert (inputObjInspectors.length == 1);
     ObjectInspector rowInspector = inputObjInspectors[0];
 
     // init keyFields
@@ -160,40 +169,51 @@
     for (int i = 0; i < keyFields.length; i++) {
       keyFields[i] = ExprNodeEvaluatorFactory.get(conf.getKeys().get(i));
       keyObjectInspectors[i] = keyFields[i].initialize(rowInspector);
-      currentKeyObjectInspectors[i] = ObjectInspectorUtils.getStandardObjectInspector(keyObjectInspectors[i], 
-          ObjectInspectorCopyOption.WRITABLE);
+      currentKeyObjectInspectors[i] = ObjectInspectorUtils
+          .getStandardObjectInspector(keyObjectInspectors[i],
+              ObjectInspectorCopyOption.WRITABLE);
       keyObjects[i] = null;
     }
     newKeys = new ArrayList<Object>(keyFields.length);
-    
+
     // init aggregationParameterFields
-    aggregationParameterFields = new ExprNodeEvaluator[conf.getAggregators().size()][];
-    aggregationParameterObjectInspectors = new ObjectInspector[conf.getAggregators().size()][];
-    aggregationParameterStandardObjectInspectors = new ObjectInspector[conf.getAggregators().size()][];
+    aggregationParameterFields = new ExprNodeEvaluator[conf.getAggregators()
+        .size()][];
+    aggregationParameterObjectInspectors = new ObjectInspector[conf
+        .getAggregators().size()][];
+    aggregationParameterStandardObjectInspectors = new ObjectInspector[conf
+        .getAggregators().size()][];
     aggregationParameterObjects = new Object[conf.getAggregators().size()][];
     for (int i = 0; i < aggregationParameterFields.length; i++) {
-      ArrayList<exprNodeDesc> parameters = conf.getAggregators().get(i).getParameters();
+      ArrayList<exprNodeDesc> parameters = conf.getAggregators().get(i)
+          .getParameters();
       aggregationParameterFields[i] = new ExprNodeEvaluator[parameters.size()];
-      aggregationParameterObjectInspectors[i] = new ObjectInspector[parameters.size()];
-      aggregationParameterStandardObjectInspectors[i] = new ObjectInspector[parameters.size()];
+      aggregationParameterObjectInspectors[i] = new ObjectInspector[parameters
+          .size()];
+      aggregationParameterStandardObjectInspectors[i] = new ObjectInspector[parameters
+          .size()];
       aggregationParameterObjects[i] = new Object[parameters.size()];
       for (int j = 0; j < parameters.size(); j++) {
-        aggregationParameterFields[i][j] = ExprNodeEvaluatorFactory.get(parameters.get(j));
-        aggregationParameterObjectInspectors[i][j] = aggregationParameterFields[i][j].initialize(rowInspector);
-        aggregationParameterStandardObjectInspectors[i][j] = 
-            ObjectInspectorUtils.getStandardObjectInspector(aggregationParameterObjectInspectors[i][j], 
+        aggregationParameterFields[i][j] = ExprNodeEvaluatorFactory
+            .get(parameters.get(j));
+        aggregationParameterObjectInspectors[i][j] = aggregationParameterFields[i][j]
+            .initialize(rowInspector);
+        aggregationParameterStandardObjectInspectors[i][j] = ObjectInspectorUtils
+            .getStandardObjectInspector(
+                aggregationParameterObjectInspectors[i][j],
                 ObjectInspectorCopyOption.WRITABLE);
         aggregationParameterObjects[i][j] = null;
       }
     }
     // init aggregationIsDistinct
     aggregationIsDistinct = new boolean[conf.getAggregators().size()];
-    for(int i=0; i<aggregationIsDistinct.length; i++) {
+    for (int i = 0; i < aggregationIsDistinct.length; i++) {
       aggregationIsDistinct[i] = conf.getAggregators().get(i).getDistinct();
     }
 
-    // init aggregationClasses  
-    aggregationEvaluators = new GenericUDAFEvaluator[conf.getAggregators().size()];
+    // init aggregationClasses
+    aggregationEvaluators = new GenericUDAFEvaluator[conf.getAggregators()
+        .size()];
     for (int i = 0; i < aggregationEvaluators.length; i++) {
       aggregationDesc agg = conf.getAggregators().get(i);
       aggregationEvaluators[i] = agg.getGenericUDAFEvaluator();
@@ -202,15 +222,15 @@
     // init objectInspectors
     int totalFields = keyFields.length + aggregationEvaluators.length;
     objectInspectors = new ArrayList<ObjectInspector>(totalFields);
-    for(int i=0; i<keyFields.length; i++) {
+    for (ExprNodeEvaluator keyField : keyFields) {
       objectInspectors.add(null);
     }
-    for(int i=0; i<aggregationEvaluators.length; i++) {
-      ObjectInspector roi = aggregationEvaluators[i].init(
-          conf.getAggregators().get(i).getMode(), aggregationParameterObjectInspectors[i]);
+    for (int i = 0; i < aggregationEvaluators.length; i++) {
+      ObjectInspector roi = aggregationEvaluators[i].init(conf.getAggregators()
+          .get(i).getMode(), aggregationParameterObjectInspectors[i]);
       objectInspectors.add(roi);
     }
-    
+
     bucketGroup = conf.getBucketGroup();
     aggregationsParametersLastInvoke = new Object[conf.getAggregators().size()][];
     if (conf.getMode() != groupByDesc.Mode.HASH || bucketGroup) {
@@ -222,117 +242,137 @@
       hashAggr = true;
       keyPositionsSize = new ArrayList<Integer>();
       aggrPositions = new ArrayList<varLenFields>();
-      groupbyMapAggrInterval = HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEGROUPBYMAPINTERVAL);
+      groupbyMapAggrInterval = HiveConf.getIntVar(hconf,
+          HiveConf.ConfVars.HIVEGROUPBYMAPINTERVAL);
 
       // compare every groupbyMapAggrInterval rows
       numRowsCompareHashAggr = groupbyMapAggrInterval;
-      minReductionHashAggr = HiveConf.getFloatVar(hconf, HiveConf.ConfVars.HIVEMAPAGGRHASHMINREDUCTION);
+      minReductionHashAggr = HiveConf.getFloatVar(hconf,
+          HiveConf.ConfVars.HIVEMAPAGGRHASHMINREDUCTION);
       groupKeyIsNotReduceKey = conf.getGroupKeyNotReductionKey();
-      if (groupKeyIsNotReduceKey)
+      if (groupKeyIsNotReduceKey) {
         keysCurrentGroup = new HashSet<ArrayList<Object>>();
+      }
     }
 
     fieldNames = conf.getOutputColumnNames();
 
     for (int i = 0; i < keyFields.length; i++) {
-      objectInspectors.set(i, currentKeyObjectInspectors[i]);      
+      objectInspectors.set(i, currentKeyObjectInspectors[i]);
     }
-    
+
     // Generate key names
     ArrayList<String> keyNames = new ArrayList<String>(keyFields.length);
     for (int i = 0; i < keyFields.length; i++) {
       keyNames.add(fieldNames.get(i));
     }
-    newKeyObjectInspector = 
-        ObjectInspectorFactory.getStandardStructObjectInspector(keyNames, Arrays.asList(keyObjectInspectors));
-    currentKeyObjectInspector = 
-        ObjectInspectorFactory.getStandardStructObjectInspector(keyNames, Arrays.asList(currentKeyObjectInspectors));
-    
-    outputObjInspector = 
-      ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, objectInspectors);
+    newKeyObjectInspector = ObjectInspectorFactory
+        .getStandardStructObjectInspector(keyNames, Arrays
+            .asList(keyObjectInspectors));
+    currentKeyObjectInspector = ObjectInspectorFactory
+        .getStandardStructObjectInspector(keyNames, Arrays
+            .asList(currentKeyObjectInspectors));
+
+    outputObjInspector = ObjectInspectorFactory
+        .getStandardStructObjectInspector(fieldNames, objectInspectors);
 
     firstRow = true;
-    // estimate the number of hash table entries based on the size of each entry. Since the size of a entry
+    // estimate the number of hash table entries based on the size of each
+    // entry. Since the size of an entry
     // is not known, estimate that based on the number of entries
-    if (hashAggr)
+    if (hashAggr) {
       computeMaxEntriesHashAggr(hconf);
+    }
     initializeChildren(hconf);
   }
 
   /**
-   * Estimate the number of entries in map-side hash table. 
-   * The user can specify the total amount of memory to be used by the map-side hash. By default, all available
-   * memory is used. The size of each row is estimated, rather crudely, and the number of entries are figure out
-   * based on that. 
-   * @return number of entries that can fit in hash table - useful for map-side aggregation only
+   * Estimate the number of entries in map-side hash table. The user can specify
+   * the total amount of memory to be used by the map-side hash. By default, all
+   * available memory is used. The size of each row is estimated, rather
+   * crudely, and the number of entries is figured out based on that.
+   * 
+   * @return number of entries that can fit in hash table - useful for map-side
+   *         aggregation only
    **/
-  private void computeMaxEntriesHashAggr(Configuration hconf) throws HiveException {
-    maxHashTblMemory = (long)(HiveConf.getFloatVar(hconf, HiveConf.ConfVars.HIVEMAPAGGRHASHMEMORY) * Runtime.getRuntime().maxMemory());
+  private void computeMaxEntriesHashAggr(Configuration hconf)
+      throws HiveException {
+    maxHashTblMemory = (long) (HiveConf.getFloatVar(hconf,
+        HiveConf.ConfVars.HIVEMAPAGGRHASHMEMORY) * Runtime.getRuntime()
+        .maxMemory());
     estimateRowSize();
   }
 
-  private static final int javaObjectOverHead    = 64;
+  private static final int javaObjectOverHead = 64;
   private static final int javaHashEntryOverHead = 64;
   private static final int javaSizePrimitiveType = 16;
-  private static final int javaSizeUnknownType   = 256;
+  private static final int javaSizeUnknownType = 256;
 
   /**
-   * The size of the element at position 'pos' is returned, if possible. 
-   * If the datatype is of variable length, STRING, a list of such key positions is maintained, and the size for such positions is
-   * then actually calculated at runtime.
-   * @param pos the position of the key
-   * @param c   the type of the key
+   * The size of the element at position 'pos' is returned, if possible. If the
+   * datatype is of variable length, STRING, a list of such key positions is
+   * maintained, and the size for such positions is then actually calculated at
+   * runtime.
+   * 
+   * @param pos
+   *          the position of the key
+   * @param c
+   *          the type of the key
    * @return the size of this datatype
    **/
   private int getSize(int pos, PrimitiveCategory category) {
-    switch(category) {
-      case VOID:
-      case BOOLEAN:
-      case BYTE:
-      case SHORT:
-      case INT:
-      case LONG:
-      case FLOAT:
-      case DOUBLE: {
-        return javaSizePrimitiveType;
-      }
-      case STRING: {
-        keyPositionsSize.add(new Integer(pos));
-        return javaObjectOverHead;
-      }
-      default: {
-        return javaSizeUnknownType;
-      }
+    switch (category) {
+    case VOID:
+    case BOOLEAN:
+    case BYTE:
+    case SHORT:
+    case INT:
+    case LONG:
+    case FLOAT:
+    case DOUBLE: {
+      return javaSizePrimitiveType;
+    }
+    case STRING: {
+      keyPositionsSize.add(new Integer(pos));
+      return javaObjectOverHead;
+    }
+    default: {
+      return javaSizeUnknownType;
+    }
     }
   }
 
   /**
-   * The size of the element at position 'pos' is returned, if possible. 
-   * If the field is of variable length, STRING, a list of such field names for the field position is maintained, and the size 
-   * for such positions is then actually calculated at runtime.
-   * @param pos the position of the key
-   * @param c   the type of the key
-   * @param f   the field to be added
+   * The size of the element at position 'pos' is returned, if possible. If the
+   * field is of variable length, STRING, a list of such field names for the
+   * field position is maintained, and the size for such positions is then
+   * actually calculated at runtime.
+   * 
+   * @param pos
+   *          the position of the key
+   * @param c
+   *          the type of the key
+   * @param f
+   *          the field to be added
    * @return the size of this datatype
    **/
   private int getSize(int pos, Class<?> c, Field f) {
-    if (c.isPrimitive() ||
-        c.isInstance(new Boolean(true)) ||
-        c.isInstance(new Byte((byte)0)) ||
-        c.isInstance(new Short((short)0)) ||
-        c.isInstance(new Integer(0)) ||
-        c.isInstance(new Long(0)) ||
-        c.isInstance(new Float(0)) ||
-        c.isInstance(new Double(0)))
+    if (c.isPrimitive() || c.isInstance(new Boolean(true))
+        || c.isInstance(new Byte((byte) 0))
+        || c.isInstance(new Short((short) 0)) || c.isInstance(new Integer(0))
+        || c.isInstance(new Long(0)) || c.isInstance(new Float(0))
+        || c.isInstance(new Double(0))) {
       return javaSizePrimitiveType;
+    }
 
     if (c.isInstance(new String())) {
       int idx = 0;
       varLenFields v = null;
       for (idx = 0; idx < aggrPositions.size(); idx++) {
         v = aggrPositions.get(idx);
-        if (v.getAggrPos() == pos)
+        if (v.getAggrPos() == pos) {
           break;
+        }
       }
 
       if (idx == aggrPositions.size()) {
@@ -343,18 +383,21 @@
       v.getFields().add(f);
       return javaObjectOverHead;
     }
-      
+
     return javaSizeUnknownType;
   }
 
   /**
-   * @param pos position of the key
-   * @param typeinfo type of the input
+   * @param pos
+   *          position of the key
+   * @param typeinfo
+   *          type of the input
    * @return the size of this datatype
    **/
   private int getSize(int pos, TypeInfo typeInfo) {
-    if (typeInfo instanceof PrimitiveTypeInfo) 
-      return getSize(pos, ((PrimitiveTypeInfo)typeInfo).getPrimitiveCategory());
+    if (typeInfo instanceof PrimitiveTypeInfo) {
+      return getSize(pos, ((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory());
+    }
     return javaSizeUnknownType;
   }
 
@@ -362,23 +405,28 @@
    * @return the size of each row
    **/
   private void estimateRowSize() throws HiveException {
-    // estimate the size of each entry - 
-    // a datatype with unknown size (String/Struct etc. - is assumed to be 256 bytes for now).
+    // estimate the size of each entry -
+    // a datatype with unknown size (String/Struct etc. - is assumed to be 256
+    // bytes for now).
     // 64 bytes is the overhead for a reference
     fixedRowSize = javaHashEntryOverHead;
 
     ArrayList<exprNodeDesc> keys = conf.getKeys();
 
-    // Go over all the keys and get the size of the fields of fixed length. Keep track of the variable length keys
-    for (int pos = 0; pos < keys.size(); pos++)
+    // Go over all the keys and get the size of the fields of fixed length. Keep
+    // track of the variable length keys
+    for (int pos = 0; pos < keys.size(); pos++) {
       fixedRowSize += getSize(pos, keys.get(pos).getTypeInfo());
+    }
 
-    // Go over all the aggregation classes and and get the size of the fields of fixed length. Keep track of the variable length
+    // Go over all the aggregation classes and get the size of the fields of
+    // fixed length. Keep track of the variable length
     // fields in these aggregation classes.
-    for(int i=0; i < aggregationEvaluators.length; i++) {
+    for (int i = 0; i < aggregationEvaluators.length; i++) {
 
       fixedRowSize += javaObjectOverHead;
-      Class<? extends AggregationBuffer> agg = aggregationEvaluators[i].getNewAggregationBuffer().getClass();
+      Class<? extends AggregationBuffer> agg = aggregationEvaluators[i]
+          .getNewAggregationBuffer().getClass();
       Field[] fArr = ObjectInspectorUtils.getDeclaredNonStaticFields(agg);
       for (Field f : fArr) {
         fixedRowSize += getSize(i, f.getType(), f);
@@ -386,43 +434,50 @@
     }
   }
 
-  protected AggregationBuffer[] newAggregations() throws HiveException {      
+  protected AggregationBuffer[] newAggregations() throws HiveException {
     AggregationBuffer[] aggs = new AggregationBuffer[aggregationEvaluators.length];
-    for(int i=0; i<aggregationEvaluators.length; i++) {
+    for (int i = 0; i < aggregationEvaluators.length; i++) {
       aggs[i] = aggregationEvaluators[i].getNewAggregationBuffer();
       // aggregationClasses[i].reset(aggs[i]);
     }
     return aggs;
   }
 
-  protected void resetAggregations(AggregationBuffer[] aggs) throws HiveException {      
-    for(int i=0; i<aggs.length; i++) {
+  protected void resetAggregations(AggregationBuffer[] aggs)
+      throws HiveException {
+    for (int i = 0; i < aggs.length; i++) {
       aggregationEvaluators[i].reset(aggs[i]);
     }
   }
 
-
   /*
-   * Update aggregations. 
-   * If the aggregation is for distinct, in case of hash aggregation, the client tells us whether it is a new entry.
-   * For sort-based aggregations, the last row is compared with the current one to figure out whether it has changed. 
-   * As a cleanup, the lastInvoke logic can be pushed in the caller, and this function can be independent of that. The
-   * client should always notify whether it is a different row or not.
+   * Update aggregations. If the aggregation is for distinct, in case of hash
+   * aggregation, the client tells us whether it is a new entry. For sort-based
+   * aggregations, the last row is compared with the current one to figure out
+   * whether it has changed. As a cleanup, the lastInvoke logic can be pushed in
+   * the caller, and this function can be independent of that. The client should
+   * always notify whether it is a different row or not.
    * 
    * @param aggs the aggregations to be evaluated
-   * @param row  the row being processed
+   * 
+   * @param row the row being processed
+   * 
    * @param rowInspector the inspector for the row
+   * 
    * @param hashAggr whether hash aggregation is being performed or not
-   * @param newEntryForHashAggr only valid if it is a hash aggregation, whether it is a new entry or not
+   * 
+   * @param newEntryForHashAggr only valid if it is a hash aggregation, whether
+   * it is a new entry or not
    */
-  protected void updateAggregations(AggregationBuffer[] aggs, Object row, ObjectInspector rowInspector, boolean hashAggr, boolean newEntryForHashAggr,
-                                    Object[][] lastInvoke) throws HiveException {
-    
-    for(int ai=0; ai<aggs.length; ai++) {
+  protected void updateAggregations(AggregationBuffer[] aggs, Object row,
+      ObjectInspector rowInspector, boolean hashAggr,
+      boolean newEntryForHashAggr, Object[][] lastInvoke) throws HiveException {
+
+    for (int ai = 0; ai < aggs.length; ai++) {
 
-      // Calculate the parameters 
+      // Calculate the parameters
       Object[] o = new Object[aggregationParameterFields[ai].length];
-      for(int pi=0; pi<aggregationParameterFields[ai].length; pi++) {
+      for (int pi = 0; pi < aggregationParameterFields[ai].length; pi++) {
         o[pi] = aggregationParameterFields[ai][pi].evaluate(row);
       }
 
@@ -432,36 +487,40 @@
           if (newEntryForHashAggr) {
             aggregationEvaluators[ai].aggregate(aggs[ai], o);
           }
-        }
-        else {
+        } else {
           if (lastInvoke[ai] == null) {
             lastInvoke[ai] = new Object[o.length];
           }
-          if (ObjectInspectorUtils.compare(o, aggregationParameterObjectInspectors[ai],
-                lastInvoke[ai], aggregationParameterStandardObjectInspectors[ai]) != 0) {
+          if (ObjectInspectorUtils.compare(o,
+              aggregationParameterObjectInspectors[ai], lastInvoke[ai],
+              aggregationParameterStandardObjectInspectors[ai]) != 0) {
             aggregationEvaluators[ai].aggregate(aggs[ai], o);
-            for (int pi=0; pi<o.length; pi++) {
-              lastInvoke[ai][pi] = ObjectInspectorUtils.copyToStandardObject(o[pi],
-                  aggregationParameterObjectInspectors[ai][pi], ObjectInspectorCopyOption.WRITABLE);
+            for (int pi = 0; pi < o.length; pi++) {
+              lastInvoke[ai][pi] = ObjectInspectorUtils.copyToStandardObject(
+                  o[pi], aggregationParameterObjectInspectors[ai][pi],
+                  ObjectInspectorCopyOption.WRITABLE);
             }
           }
         }
-      }
-      else {
+      } else {
         aggregationEvaluators[ai].aggregate(aggs[ai], o);
       }
     }
   }
 
+  @Override
   public void startGroup() throws HiveException {
     firstRowInGroup = true;
   }
 
+  @Override
   public void endGroup() throws HiveException {
-    if (groupKeyIsNotReduceKey)
+    if (groupKeyIsNotReduceKey) {
       keysCurrentGroup.clear();
+    }
   }
-  
+
+  @Override
   public void processOp(Object row, int tag) throws HiveException {
     firstRow = false;
     ObjectInspector rowInspector = inputObjInspectors[tag];
@@ -473,14 +532,17 @@
         numRowsCompareHashAggr += groupbyMapAggrInterval;
         // map-side aggregation should reduce the entries by at-least half
         if (numRowsHashTbl > numRowsInput * minReductionHashAggr) {
-          LOG.warn("Disable Hash Aggr: #hash table = " + numRowsHashTbl + " #total = " + numRowsInput 
-              + " reduction = " + 1.0*(numRowsHashTbl/numRowsInput) + " minReduction = " + minReductionHashAggr);
+          LOG.warn("Disable Hash Aggr: #hash table = " + numRowsHashTbl
+              + " #total = " + numRowsInput + " reduction = " + 1.0
+              * (numRowsHashTbl / numRowsInput) + " minReduction = "
+              + minReductionHashAggr);
           flush(true);
           hashAggr = false;
-        }
-        else {
-          LOG.trace("Hash Aggr Enabled: #hash table = " + numRowsHashTbl + " #total = " + numRowsInput 
-              + " reduction = " + 1.0*(numRowsHashTbl/numRowsInput) + " minReduction = " + minReductionHashAggr);
+        } else {
+          LOG.trace("Hash Aggr Enabled: #hash table = " + numRowsHashTbl
+              + " #total = " + numRowsInput + " reduction = " + 1.0
+              * (numRowsHashTbl / numRowsInput) + " minReduction = "
+              + minReductionHashAggr);
         }
       }
     }
@@ -496,10 +558,11 @@
         newKeys.add(keyObjects[i]);
       }
 
-      if (hashAggr)
+      if (hashAggr) {
         processHashAggr(row, rowInspector, newKeys);
-      else
+      } else {
         processAggr(row, rowInspector, newKeys);
+      }
 
       firstRowInGroup = false;
     } catch (HiveException e) {
@@ -509,167 +572,197 @@
     }
   }
 
-  private static ArrayList<Object> deepCopyElements(Object[] keys, ObjectInspector[] keyObjectInspectors,
+  private static ArrayList<Object> deepCopyElements(Object[] keys,
+      ObjectInspector[] keyObjectInspectors,
       ObjectInspectorCopyOption copyOption) {
     ArrayList<Object> result = new ArrayList<Object>(keys.length);
     deepCopyElements(keys, keyObjectInspectors, result, copyOption);
     return result;
   }
-  
-  private static void deepCopyElements(Object[] keys, ObjectInspector[] keyObjectInspectors, ArrayList<Object> result,
+
+  private static void deepCopyElements(Object[] keys,
+      ObjectInspector[] keyObjectInspectors, ArrayList<Object> result,
       ObjectInspectorCopyOption copyOption) {
     result.clear();
-    for (int i=0; i<keys.length; i++) {
-      result.add(ObjectInspectorUtils.copyToStandardObject(keys[i], keyObjectInspectors[i], copyOption));
+    for (int i = 0; i < keys.length; i++) {
+      result.add(ObjectInspectorUtils.copyToStandardObject(keys[i],
+          keyObjectInspectors[i], copyOption));
     }
   }
-  
-  class KeyWrapper{
+
+  class KeyWrapper {
     int hashcode;
     ArrayList<Object> keys;
     // decide whether this is already in hashmap (keys in hashmap are deepcopied
     // version, and we need to use 'currentKeyObjectInspector').
-    boolean copy = false; 
-    
-    KeyWrapper() {}
-    
+    boolean copy = false;
+
+    KeyWrapper() {
+    }
+
     public KeyWrapper(int hashcode, ArrayList<Object> copiedKeys) {
       this(hashcode, copiedKeys, false);
     }
-    
-    public KeyWrapper(int hashcode, ArrayList<Object> copiedKeys, boolean inHashMap) {
+
+    public KeyWrapper(int hashcode, ArrayList<Object> copiedKeys,
+        boolean inHashMap) {
       super();
       this.hashcode = hashcode;
-      this.keys = copiedKeys;
-      this.copy = inHashMap;
+      keys = copiedKeys;
+      copy = inHashMap;
     }
-    
-    public int hashCode(){
+
+    @Override
+    public int hashCode() {
       return hashcode;
     }
-    
+
+    @Override
     public boolean equals(Object obj) {
       ArrayList<Object> copied_in_hashmap = ((KeyWrapper) obj).keys;
-      if(!copy)
-       return ObjectInspectorUtils.compare(copied_in_hashmap, currentKeyObjectInspector, keys, newKeyObjectInspector) == 0;
-      else
-       return ObjectInspectorUtils.compare(copied_in_hashmap, currentKeyObjectInspector, keys, currentKeyObjectInspector) == 0;
+      if (!copy) {
+        return ObjectInspectorUtils.compare(copied_in_hashmap,
+            currentKeyObjectInspector, keys, newKeyObjectInspector) == 0;
+      } else {
+        return ObjectInspectorUtils.compare(copied_in_hashmap,
+            currentKeyObjectInspector, keys, currentKeyObjectInspector) == 0;
+      }
     }
   }
-  
+
   KeyWrapper keyProber = new KeyWrapper();
-  private void processHashAggr(Object row, ObjectInspector rowInspector, ArrayList<Object> newKeys) throws HiveException {
+
+  private void processHashAggr(Object row, ObjectInspector rowInspector,
+      ArrayList<Object> newKeys) throws HiveException {
     // Prepare aggs for updating
     AggregationBuffer[] aggs = null;
     boolean newEntryForHashAggr = false;
-    
+
     keyProber.hashcode = newKeys.hashCode();
-    //use this to probe the hashmap
+    // use this to probe the hashmap
     keyProber.keys = newKeys;
-    
+
     // hash-based aggregations
     aggs = hashAggregations.get(keyProber);
     ArrayList<Object> newDefaultKeys = null;
-    if(aggs == null) {
-      newDefaultKeys = deepCopyElements(keyObjects, keyObjectInspectors, ObjectInspectorCopyOption.WRITABLE);
-      KeyWrapper newKeyProber = new KeyWrapper(keyProber.hashcode, newDefaultKeys, true);
+    if (aggs == null) {
+      newDefaultKeys = deepCopyElements(keyObjects, keyObjectInspectors,
+          ObjectInspectorCopyOption.WRITABLE);
+      KeyWrapper newKeyProber = new KeyWrapper(keyProber.hashcode,
+          newDefaultKeys, true);
       aggs = newAggregations();
       hashAggregations.put(newKeyProber, aggs);
       newEntryForHashAggr = true;
-      numRowsHashTbl++;      // new entry in the hash table
+      numRowsHashTbl++; // new entry in the hash table
     }
-    
-    // If the grouping key and the reduction key are different, a set of grouping keys for the current reduction key are maintained in keysCurrentGroup
-    // Peek into the set to find out if a new grouping key is seen for the given reduction key
+
+    // If the grouping key and the reduction key are different, a set of
+    // grouping keys for the current reduction key are maintained in
+    // keysCurrentGroup
+    // Peek into the set to find out if a new grouping key is seen for the given
+    // reduction key
     if (groupKeyIsNotReduceKey) {
-      if(newDefaultKeys == null)
-        newDefaultKeys = deepCopyElements(keyObjects, keyObjectInspectors, ObjectInspectorCopyOption.WRITABLE);
+      if (newDefaultKeys == null) {
+        newDefaultKeys = deepCopyElements(keyObjects, keyObjectInspectors,
+            ObjectInspectorCopyOption.WRITABLE);
+      }
       newEntryForHashAggr = keysCurrentGroup.add(newDefaultKeys);
     }
 
     // Update the aggs
     updateAggregations(aggs, row, rowInspector, true, newEntryForHashAggr, null);
 
-    // We can only flush after the updateAggregations is done, or the potentially new entry "aggs"
+    // We can only flush after the updateAggregations is done, or the
+    // potentially new entry "aggs"
     // can be flushed out of the hash table.
-    
-    // Based on user-specified parameters, check if the hash table needs to be flushed.
-    // If the grouping key is not the same as reduction key, flushing can only happen at boundaries
-    if ((!groupKeyIsNotReduceKey || firstRowInGroup) && shouldBeFlushed(newKeys)) {
+
+    // Based on user-specified parameters, check if the hash table needs to be
+    // flushed.
+    // If the grouping key is not the same as reduction key, flushing can only
+    // happen at boundaries
+    if ((!groupKeyIsNotReduceKey || firstRowInGroup)
+        && shouldBeFlushed(newKeys)) {
       flush(false);
     }
   }
 
   // Non-hash aggregation
-  private void processAggr(Object row, ObjectInspector rowInspector, ArrayList<Object> newKeys) throws HiveException {
+  private void processAggr(Object row, ObjectInspector rowInspector,
+      ArrayList<Object> newKeys) throws HiveException {
     // Prepare aggs for updating
     AggregationBuffer[] aggs = null;
     Object[][] lastInvoke = null;
-    boolean keysAreEqual = ObjectInspectorUtils.compare(
-        newKeys, newKeyObjectInspector,
-        currentKeys, currentKeyObjectInspector) == 0;
-    
+    boolean keysAreEqual = ObjectInspectorUtils.compare(newKeys,
+        newKeyObjectInspector, currentKeys, currentKeyObjectInspector) == 0;
+
     // Forward the current keys if needed for sort-based aggregation
-    if (currentKeys != null && !keysAreEqual)
+    if (currentKeys != null && !keysAreEqual) {
       forward(currentKeys, aggregations);
-    
+    }
+
     // Need to update the keys?
     if (currentKeys == null || !keysAreEqual) {
       if (currentKeys == null) {
         currentKeys = new ArrayList<Object>(keyFields.length);
       }
-      deepCopyElements(keyObjects, keyObjectInspectors, currentKeys, ObjectInspectorCopyOption.WRITABLE);
-      
+      deepCopyElements(keyObjects, keyObjectInspectors, currentKeys,
+          ObjectInspectorCopyOption.WRITABLE);
+
       // Reset the aggregations
       resetAggregations(aggregations);
-      
+
       // clear parameters in last-invoke
-      for(int i=0; i<aggregationsParametersLastInvoke.length; i++)
+      for (int i = 0; i < aggregationsParametersLastInvoke.length; i++) {
         aggregationsParametersLastInvoke[i] = null;
+      }
     }
-    
+
     aggs = aggregations;
-    
+
     lastInvoke = aggregationsParametersLastInvoke;
     // Update the aggs
-    
+
     updateAggregations(aggs, row, rowInspector, false, false, lastInvoke);
   }
 
   /**
    * Based on user-parameters, should the hash table be flushed.
-   * @param newKeys keys for the row under consideration
+   * 
+   * @param newKeys
+   *          keys for the row under consideration
    **/
   private boolean shouldBeFlushed(ArrayList<Object> newKeys) {
     int numEntries = hashAggregations.size();
 
-    // The fixed size for the aggregation class is already known. Get the variable portion of the size every NUMROWSESTIMATESIZE rows.
+    // The fixed size for the aggregation class is already known. Get the
+    // variable portion of the size every NUMROWSESTIMATESIZE rows.
     if ((numEntriesHashTable == 0) || ((numEntries % NUMROWSESTIMATESIZE) == 0)) {
       for (Integer pos : keyPositionsSize) {
         Object key = newKeys.get(pos.intValue());
         // Ignore nulls
         if (key != null) {
           if (key instanceof String) {
-            totalVariableSize += ((String)key).length();
+            totalVariableSize += ((String) key).length();
           } else if (key instanceof Text) {
-            totalVariableSize += ((Text)key).getLength();
+            totalVariableSize += ((Text) key).getLength();
           }
         }
       }
 
       AggregationBuffer[] aggs = null;
-      if (aggrPositions.size() > 0)
+      if (aggrPositions.size() > 0) {
         aggs = hashAggregations.get(newKeys);
+      }
 
       for (varLenFields v : aggrPositions) {
-        int     aggrPos          = v.getAggrPos();
+        int aggrPos = v.getAggrPos();
         List<Field> fieldsVarLen = v.getFields();
-        AggregationBuffer    agg = aggs[aggrPos];
+        AggregationBuffer agg = aggs[aggrPos];
 
-        try 
-        {
-          for (Field f : fieldsVarLen)
-            totalVariableSize += ((String)f.get(agg)).length();
+        try {
+          for (Field f : fieldsVarLen) {
+            totalVariableSize += ((String) f.get(agg)).length();
+          }
         } catch (IllegalAccessException e) {
           assert false;
         }
@@ -678,24 +771,26 @@
       numEntriesVarSize++;
 
       // Update the number of entries that can fit in the hash table
-      numEntriesHashTable = (int)(maxHashTblMemory / (fixedRowSize + ((int)totalVariableSize/numEntriesVarSize)));
-      LOG.trace("Hash Aggr: #hash table = " + numEntries + " #max in hash table = " + numEntriesHashTable);
+      numEntriesHashTable = (int) (maxHashTblMemory / (fixedRowSize + (totalVariableSize / numEntriesVarSize)));
+      LOG.trace("Hash Aggr: #hash table = " + numEntries
+          + " #max in hash table = " + numEntriesHashTable);
     }
 
     // flush if necessary
-    if (numEntries >= numEntriesHashTable)
+    if (numEntries >= numEntriesHashTable) {
       return true;
+    }
     return false;
   }
 
   private void flush(boolean complete) throws HiveException {
-    
+
     // Currently, the algorithm flushes 10% of the entries - this can be
     // changed in the future
 
     if (complete) {
-      Iterator<Map.Entry<KeyWrapper, AggregationBuffer[]>> 
-          iter = hashAggregations.entrySet().iterator();
+      Iterator<Map.Entry<KeyWrapper, AggregationBuffer[]>> iter = hashAggregations
+          .entrySet().iterator();
       while (iter.hasNext()) {
         Map.Entry<KeyWrapper, AggregationBuffer[]> m = iter.next();
         forward(m.getKey().keys, m.getValue());
@@ -708,8 +803,8 @@
 
     int oldSize = hashAggregations.size();
     LOG.warn("Hash Tbl flush: #hash table = " + oldSize);
-    Iterator<Map.Entry<KeyWrapper, AggregationBuffer[]>>
-        iter = hashAggregations.entrySet().iterator();
+    Iterator<Map.Entry<KeyWrapper, AggregationBuffer[]>> iter = hashAggregations
+        .entrySet().iterator();
     int numDel = 0;
     while (iter.hasNext()) {
       Map.Entry<KeyWrapper, AggregationBuffer[]> m = iter.next();
@@ -732,24 +827,27 @@
    *          The keys in the record
    * @throws HiveException
    */
-  protected void forward(ArrayList<Object> keys, AggregationBuffer[] aggs) throws HiveException {
+  protected void forward(ArrayList<Object> keys, AggregationBuffer[] aggs)
+      throws HiveException {
     int totalFields = keys.size() + aggs.length;
     if (forwardCache == null) {
       forwardCache = new Object[totalFields];
     }
-    for(int i=0; i<keys.size(); i++) {
+    for (int i = 0; i < keys.size(); i++) {
       forwardCache[i] = keys.get(i);
     }
-    for(int i=0; i<aggs.length; i++) {
-      forwardCache[keys.size() + i] = aggregationEvaluators[i].evaluate(aggs[i]);
+    for (int i = 0; i < aggs.length; i++) {
+      forwardCache[keys.size() + i] = aggregationEvaluators[i]
+          .evaluate(aggs[i]);
     }
     forward(forwardCache, outputObjInspector);
   }
-  
+
   /**
    * We need to forward all the aggregations to children.
    * 
    */
+  @Override
   public void closeOp(boolean abort) throws HiveException {
     if (!abort) {
       try {
@@ -758,38 +856,40 @@
           firstRow = false;
 
           // There is no grouping key - simulate a null row
-          // This is based on the assumption that a null row is ignored by aggregation functions
-          for(int ai=0; ai<aggregations.length; ai++) {
-            // Calculate the parameters 
+          // This is based on the assumption that a null row is ignored by
+          // aggregation functions
+          for (int ai = 0; ai < aggregations.length; ai++) {
+            // Calculate the parameters
             Object[] o = new Object[aggregationParameterFields[ai].length];
-            for(int pi=0; pi<aggregationParameterFields[ai].length; pi++) { 
+            for (int pi = 0; pi < aggregationParameterFields[ai].length; pi++) {
               o[pi] = null;
             }
             aggregationEvaluators[ai].aggregate(aggregations[ai], o);
           }
-          
+
           // create dummy keys - size 0
           forward(new ArrayList<Object>(0), aggregations);
-        }
-        else {
+        } else {
           if (hashAggregations != null) {
-            LOG.warn("Begin Hash Table flush at close: size = " + hashAggregations.size());
+            LOG.warn("Begin Hash Table flush at close: size = "
+                + hashAggregations.size());
             Iterator iter = hashAggregations.entrySet().iterator();
             while (iter.hasNext()) {
-              Map.Entry<KeyWrapper, AggregationBuffer[]> m = (Map.Entry)iter.next();
+              Map.Entry<KeyWrapper, AggregationBuffer[]> m = (Map.Entry) iter
+                  .next();
               forward(m.getKey().keys, m.getValue());
               iter.remove();
             }
             hashAggregations.clear();
-          }
-          else if (aggregations != null) {
+          } else if (aggregations != null) {
             // sort-based aggregations
             if (currentKeys != null) {
               forward(currentKeys, aggregations);
             }
             currentKeys = null;
           } else {
-            // The GroupByOperator is not initialized, which means there is no data
+            // The GroupByOperator is not initialized, which means there is no
+            // data
             // (since we initialize the operators when we see the first record).
             // Just do nothing here.
           }
@@ -802,17 +902,20 @@
   }
 
   // Group by contains the columns needed - no need to aggregate from children
-  public List<String> genColLists(HashMap<Operator<? extends Serializable>, OpParseContext> opParseCtx) {
+  public List<String> genColLists(
+      HashMap<Operator<? extends Serializable>, OpParseContext> opParseCtx) {
     List<String> colLists = new ArrayList<String>();
     ArrayList<exprNodeDesc> keys = conf.getKeys();
-    for (exprNodeDesc key : keys)
+    for (exprNodeDesc key : keys) {
       colLists = Utilities.mergeUniqElems(colLists, key.getCols());
-    
+    }
+
     ArrayList<aggregationDesc> aggrs = conf.getAggregators();
-    for (aggregationDesc aggr : aggrs) { 
+    for (aggregationDesc aggr : aggrs) {
       ArrayList<exprNodeDesc> params = aggr.getParameters();
-      for (exprNodeDesc param : params) 
+      for (exprNodeDesc param : params) {
         colLists = Utilities.mergeUniqElems(colLists, param.getCols());
+      }
     }
 
     return colLists;
@@ -825,7 +928,8 @@
   public String getName() {
     return new String("GBY");
   }
-  
+
+  @Override
   public int getType() {
     return OperatorType.GROUPBY;
   }

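Note on the GroupByOperator change above: the map-side hash aggregation sizes its in-memory hash table from a memory budget (a HiveConf-controlled fraction of the JVM's max memory, HiveConf.ConfVars.HIVEMAPAGGRHASHMEMORY) and a per-row size estimate (fixedRowSize plus a sampled average of the variable-length fields). The standalone sketch below mirrors the arithmetic in computeMaxEntriesHashAggr() and shouldBeFlushed(); the concrete numbers are assumptions for illustration, not values taken from the commit.

    public class HashAggrSizingSketch {
      public static void main(String[] args) {
        // Assumed heap fraction given to the hash table; in Hive this comes
        // from HiveConf.ConfVars.HIVEMAPAGGRHASHMEMORY.
        float memoryFraction = 0.5f;
        long maxHashTblMemory =
            (long) (memoryFraction * Runtime.getRuntime().maxMemory());

        // Assumed per-row fixed cost: one hash entry overhead (64 bytes) plus
        // two fixed-size primitive keys (16 bytes each), as in estimateRowSize().
        int fixedRowSize = 64 + 2 * 16;

        // Assumed sampling state for variable-length (String) fields: total
        // bytes observed so far and the number of rows sampled.
        int totalVariableSize = 20000;
        int numEntriesVarSize = 1000;

        // Same formula as shouldBeFlushed(): the average variable size per
        // sampled row is added to the fixed estimate, and the memory budget is
        // divided by that per-entry cost.
        int numEntriesHashTable = (int) (maxHashTblMemory
            / (fixedRowSize + (totalVariableSize / numEntriesVarSize)));

        System.out.println("entries that fit in the hash table ~= "
            + numEntriesHashTable);
      }
    }

Once the hash map reaches this estimated capacity, shouldBeFlushed() returns true and flush() evicts roughly 10% of the entries (or forwards everything when hash aggregation is being turned off).
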
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JobTrackerURLResolver.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JobTrackerURLResolver.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JobTrackerURLResolver.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JobTrackerURLResolver.java Thu Jan 21 10:37:58 2010
@@ -34,10 +34,9 @@
     InetSocketAddress infoSocAddr = NetUtils.createSocketAddr(infoAddr);
     int infoPort = infoSocAddr.getPort();
 
-    String tracker = "http://" +
-                     JobTracker.getAddress(conf).getHostName() + ":" +
-                     infoPort;
-    
+    String tracker = "http://" + JobTracker.getAddress(conf).getHostName()
+        + ":" + infoPort;
+
     return tracker;
   }
 }

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java Thu Jan 21 10:37:58 2010
@@ -33,132 +33,148 @@
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 
-
 /**
  * Join operator implementation.
  */
-public class JoinOperator extends CommonJoinOperator<joinDesc> implements Serializable {
+public class JoinOperator extends CommonJoinOperator<joinDesc> implements
+    Serializable {
   private static final long serialVersionUID = 1L;
-  
+
   private transient SkewJoinHandler skewJoinKeyContext = null;
-  
+
   @Override
   protected void initializeOp(Configuration hconf) throws HiveException {
     super.initializeOp(hconf);
     initializeChildren(hconf);
-    if(this.handleSkewJoin) {
+    if (handleSkewJoin) {
       skewJoinKeyContext = new SkewJoinHandler(this);
       skewJoinKeyContext.initiliaze(hconf);
     }
   }
-  
-  public void processOp(Object row, int tag)
-      throws HiveException {
+
+  @Override
+  public void processOp(Object row, int tag) throws HiveException {
     try {
-      
+
       // get alias
-      alias = (byte)tag;
-    
-      if ((lastAlias == null) || (!lastAlias.equals(alias)))
+      alias = (byte) tag;
+
+      if ((lastAlias == null) || (!lastAlias.equals(alias))) {
         nextSz = joinEmitInterval;
-      
-      ArrayList<Object> nr = computeValues(row, joinValues.get(alias), joinValuesObjectInspectors.get(alias));
-      
-      if(this.handleSkewJoin)
+      }
+
+      ArrayList<Object> nr = computeValues(row, joinValues.get(alias),
+          joinValuesObjectInspectors.get(alias));
+
+      if (handleSkewJoin) {
         skewJoinKeyContext.handleSkew(tag);
-      
+      }
+
       // number of rows for the key in the given table
       int sz = storage.get(alias).size();
-    
+
       // Are we consuming too much memory
       if (alias == numAliases - 1) {
         if (sz == joinEmitInterval) {
-          // The input is sorted by alias, so if we are already in the last join operand,
+          // The input is sorted by alias, so if we are already in the last join
+          // operand,
           // we can emit some results now.
-          // Note this has to be done before adding the current row to the storage,
+          // Note this has to be done before adding the current row to the
+          // storage,
           // to preserve the correctness for outer joins.
           checkAndGenObject();
           storage.get(alias).clear();
         }
       } else {
         if (sz == nextSz) {
-          // Output a warning if we reached at least 1000 rows for a join operand
+          // Output a warning if we reached at least 1000 rows for a join
+          // operand
           // We won't output a warning for the last join operand since the size
           // will never grow to joinEmitInterval.
-          StructObjectInspector soi = (StructObjectInspector)inputObjInspectors[tag];
-          StructField sf = soi.getStructFieldRef(Utilities.ReduceField.KEY.toString());
+          StructObjectInspector soi = (StructObjectInspector) inputObjInspectors[tag];
+          StructField sf = soi.getStructFieldRef(Utilities.ReduceField.KEY
+              .toString());
           Object keyObject = soi.getStructFieldData(row, sf);
-          LOG.warn("table " + alias + " has " + sz + " rows for join key " + keyObject);
+          LOG.warn("table " + alias + " has " + sz + " rows for join key "
+              + keyObject);
           nextSz = getNextSize(nextSz);
         }
       }
-    
+
       // Add the value to the vector
       storage.get(alias).add(nr);
-    
+
     } catch (Exception e) {
       e.printStackTrace();
       throw new HiveException(e);
     }
   }
-  
+
+  @Override
   public int getType() {
     return OperatorType.JOIN;
   }
 
   /**
    * All done
-   *
+   * 
    */
+  @Override
   public void closeOp(boolean abort) throws HiveException {
-    if (this.handleSkewJoin) {
+    if (handleSkewJoin) {
       skewJoinKeyContext.close(abort);
     }
     super.closeOp(abort);
   }
-  
+
   @Override
-  public void jobClose(Configuration hconf, boolean success) throws HiveException {
-    if(this.handleSkewJoin) {
+  public void jobClose(Configuration hconf, boolean success)
+      throws HiveException {
+    if (handleSkewJoin) {
       try {
         for (int i = 0; i < numAliases; i++) {
-          String specPath = this.conf.getBigKeysDirMap().get((byte)i);
+          String specPath = conf.getBigKeysDirMap().get((byte) i);
           FileSinkOperator.mvFileToFinalPath(specPath, hconf, success, LOG);
           for (int j = 0; j < numAliases; j++) {
-            if(j == i) continue;
-            specPath = getConf().getSmallKeysDirMap().get((byte)i).get((byte)j);
+            if (j == i) {
+              continue;
+            }
+            specPath = getConf().getSmallKeysDirMap().get((byte) i).get(
+                (byte) j);
             FileSinkOperator.mvFileToFinalPath(specPath, hconf, success, LOG);
           }
         }
-        
-        if(success) {
-          //move up files
+
+        if (success) {
+          // move up files
           for (int i = 0; i < numAliases; i++) {
-            String specPath = this.conf.getBigKeysDirMap().get((byte)i);
+            String specPath = conf.getBigKeysDirMap().get((byte) i);
             moveUpFiles(specPath, hconf, LOG);
             for (int j = 0; j < numAliases; j++) {
-              if(j == i) continue;
-              specPath = getConf().getSmallKeysDirMap().get((byte)i).get((byte)j);
+              if (j == i) {
+                continue;
+              }
+              specPath = getConf().getSmallKeysDirMap().get((byte) i).get(
+                  (byte) j);
               moveUpFiles(specPath, hconf, LOG);
             }
           }
         }
       } catch (IOException e) {
-        throw new HiveException (e);
+        throw new HiveException(e);
       }
     }
     super.jobClose(hconf, success);
   }
-  
-  
-  
-  private void moveUpFiles(String specPath, Configuration hconf, Log log) throws IOException, HiveException {
+
+  private void moveUpFiles(String specPath, Configuration hconf, Log log)
+      throws IOException, HiveException {
     FileSystem fs = (new Path(specPath)).getFileSystem(hconf);
     Path finalPath = new Path(specPath);
-    
-    if(fs.exists(finalPath)) {
+
+    if (fs.exists(finalPath)) {
       FileStatus[] taskOutputDirs = fs.listStatus(finalPath);
-      if(taskOutputDirs != null ) {
+      if (taskOutputDirs != null) {
         for (FileStatus dir : taskOutputDirs) {
           Utilities.renameOrMoveFiles(fs, dir.getPath(), finalPath);
           fs.delete(dir.getPath(), true);
@@ -166,27 +182,26 @@
       }
     }
   }
-  
+
   /**
    * Forward a record of join results.
-   *
+   * 
    * @throws HiveException
    */
+  @Override
   public void endGroup() throws HiveException {
-    //if this is a skew key, we need to handle it in a separate map reduce job.
-    if(this.handleSkewJoin && skewJoinKeyContext.currBigKeyTag >=0) {
+    // if this is a skew key, we need to handle it in a separate map reduce job.
+    if (handleSkewJoin && skewJoinKeyContext.currBigKeyTag >= 0) {
       try {
         skewJoinKeyContext.endGroup();
       } catch (IOException e) {
-        LOG.error(e.getMessage(),e);
+        LOG.error(e.getMessage(), e);
         throw new HiveException(e);
       }
       return;
-    }
-    else {
+    } else {
       checkAndGenObject();
     }
   }
-  
-}
 
+}

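Note on the JoinOperator change above: processOp() buffers the rows seen for each join operand (tag), and because the input is sorted by alias, partial results can be emitted once the last operand has buffered joinEmitInterval rows, while earlier operands only trigger warnings at growing thresholds. The sketch below illustrates that policy with plain collections; the constants, the doubling of the warning threshold, and emitAndClear() are stand-ins for the operator's real state, getNextSize(), and checkAndGenObject().

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class JoinBufferSketch {
      // Assumed values; the real operator reads joinEmitInterval from its
      // configuration and numAliases from the join descriptor.
      static final int JOIN_EMIT_INTERVAL = 1000;
      static final int NUM_ALIASES = 2;

      private final Map<Byte, List<Object>> storage =
          new HashMap<Byte, List<Object>>();
      private int nextWarnSz = JOIN_EMIT_INTERVAL;

      void process(Object row, byte alias) {
        List<Object> rows = storage.get(alias);
        if (rows == null) {
          rows = new ArrayList<Object>();
          storage.put(alias, rows);
        }
        int sz = rows.size();

        if (alias == NUM_ALIASES - 1) {
          // Last join operand: the earlier operands are complete for this key,
          // so partial results can be emitted to bound memory use. As the
          // commit's comment notes, this happens before the current row is
          // added, which keeps outer joins correct.
          if (sz == JOIN_EMIT_INTERVAL) {
            emitAndClear(rows);
          }
        } else if (sz == nextWarnSz) {
          // Earlier operands are never emitted early; warn at growing
          // thresholds (doubling is an assumption standing in for getNextSize()).
          System.err.println("operand " + alias + " has buffered " + sz
              + " rows for the current join key");
          nextWarnSz *= 2;
        }

        rows.add(row);
      }

      private void emitAndClear(List<Object> rows) {
        // Stand-in for checkAndGenObject(); here the buffered rows are dropped.
        rows.clear();
      }

      public static void main(String[] args) {
        JoinBufferSketch sketch = new JoinBufferSketch();
        for (int i = 0; i < 2500; i++) {
          sketch.process("left-" + i, (byte) 0);
          sketch.process("right-" + i, (byte) 1);
        }
      }
    }
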
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/LateralViewJoinOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/LateralViewJoinOperator.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/LateralViewJoinOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/LateralViewJoinOperator.java Thu Jan 21 10:37:58 2010
@@ -18,13 +18,9 @@
 
 package org.apache.hadoop.hive.ql.exec;
 
-import java.util.HashMap;
-import java.util.List;
 import java.util.ArrayList;
-import java.util.Map;
+import java.util.List;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.lateralViewJoinDesc;
@@ -33,39 +29,29 @@
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 
-
 /**
  * The lateral view join operator is used to implement the lateral view
- * functionality. This operator was implemented with the following
- * operator DAG in mind. For a query such as
+ * functionality. This operator was implemented with the following operator DAG
+ * in mind. For a query such as
  * 
- * SELECT pageid, adid.* FROM example_table LATERAL VIEW explode(adid_list) AS adid
+ * SELECT pageid, adid.* FROM example_table LATERAL VIEW explode(adid_list) AS
+ * adid
  * 
  * The top of the operator tree will look similar to
  * 
- *           [Table Scan]
- *              /   \
- *   [Select](*)    [Select](adid_list)
- *            |      |
- *            |     [UDTF] (explode)
- *            \     /
- *      [Lateral View Join]
- *               |
- *               |
- *      [Select] (pageid, adid.*)
- *               |
- *              ....
- * 
- * Rows from the table scan operator are first sent to two select operators.
- * The select operator on the left picks all the columns while the select
- * operator on the right picks only the columns needed by the UDTF.
+ * [Table Scan] / \ [Select](*) [Select](adid_list) | | | [UDTF] (explode) \ /
+ * [Lateral View Join] | | [Select] (pageid, adid.*) | ....
  * 
- * The output of select in the left branch and output of the UDTF in the right 
+ * Rows from the table scan operator are first sent to two select operators. The
+ * select operator on the left picks all the columns while the select operator
+ * on the right picks only the columns needed by the UDTF.
+ * 
+ * The output of select in the left branch and output of the UDTF in the right
  * branch are then sent to the lateral view join (LVJ). In most cases, the UDTF
  * will generate > 1 row for every row received from the TS, while the left
- * select operator will generate only one. For each row output from the TS,
- * the LVJ outputs all possible rows that can be created by joining the row from
- * the left select and one of the rows output from the UDTF.
+ * select operator will generate only one. For each row output from the TS, the
+ * LVJ outputs all possible rows that can be created by joining the row from the
+ * left select and one of the rows output from the UDTF.
  * 
  * Additional lateral views can be supported by adding a similar DAG after the
  * previous LVJ operator.
@@ -78,41 +64,41 @@
   // The expected tags from the parent operators. See processOp() before
   // changing the tags.
   static final int SELECT_TAG = 0;
-  static final int UDTF_TAG   = 1;
-  
+  static final int UDTF_TAG = 1;
+
   @Override
   protected void initializeOp(Configuration hconf) throws HiveException {
-    
+
     ArrayList<ObjectInspector> ois = new ArrayList<ObjectInspector>();
     ArrayList<String> fieldNames = conf.getOutputInternalColNames();
 
     // The output of the lateral view join will be the columns from the select
     // parent, followed by the column from the UDTF parent
-    StructObjectInspector soi = 
-      (StructObjectInspector) inputObjInspectors[SELECT_TAG];
+    StructObjectInspector soi = (StructObjectInspector) inputObjInspectors[SELECT_TAG];
     List<? extends StructField> sfs = soi.getAllStructFieldRefs();
     for (StructField sf : sfs) {
       ois.add(sf.getFieldObjectInspector());
     }
-    
+
     soi = (StructObjectInspector) inputObjInspectors[UDTF_TAG];
     sfs = soi.getAllStructFieldRefs();
     for (StructField sf : sfs) {
       ois.add(sf.getFieldObjectInspector());
     }
 
-    outputObjInspector = 
-      ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, ois);
+    outputObjInspector = ObjectInspectorFactory
+        .getStandardStructObjectInspector(fieldNames, ois);
 
     // Initialize the rest of the operator DAG
     super.initializeOp(hconf);
   }
-  
+
   // acc is short for accumulator. It's used to build the row before forwarding
   ArrayList<Object> acc = new ArrayList<Object>();
   // selectObjs hold the row from the select op, until receiving a row from
   // the udtf op
   ArrayList<Object> selectObjs = new ArrayList<Object>();
+
   /**
    * An important assumption for processOp() is that for a given row from the
    * TS, the LVJ will first get the row from the left select operator, followed
@@ -120,7 +106,7 @@
    */
   @Override
   public void processOp(Object row, int tag) throws HiveException {
-    StructObjectInspector soi = (StructObjectInspector)inputObjInspectors[tag];
+    StructObjectInspector soi = (StructObjectInspector) inputObjInspectors[tag];
     if (tag == SELECT_TAG) {
       selectObjs.clear();
       selectObjs.addAll(soi.getStructFieldsDataAsList(row));
@@ -132,7 +118,7 @@
     } else {
       throw new HiveException("Invalid tag");
     }
-    
+
   }
 
 }
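
As the class comment above describes, the operator caches the row coming from
the SELECT parent and, for every row the UDTF parent then produces, forwards
the concatenation of the two. A minimal stand-alone sketch of that
row-combination contract follows; it is an assumption-laden illustration (plain
lists instead of ObjectInspectors, hypothetical class and method names), not
the operator itself.

    import java.util.ArrayList;
    import java.util.List;

    public class LateralViewJoinSketch {
      // Holds the most recent row from the left SELECT branch.
      private final List<Object> selectRow = new ArrayList<Object>();

      // Tag 0 (SELECT_TAG) in the real operator: remember the SELECT row.
      void onSelectRow(List<Object> row) {
        selectRow.clear();
        selectRow.addAll(row);
      }

      // Tag 1 (UDTF_TAG) in the real operator: one output row per UDTF row.
      List<Object> onUdtfRow(List<Object> udtfRow) {
        List<Object> out = new ArrayList<Object>(selectRow.size() + udtfRow.size());
        out.addAll(selectRow); // columns picked by the left SELECT
        out.addAll(udtfRow);   // columns generated by explode()/the UDTF
        return out;            // the real operator forward()s this row
      }
    }

For the example query in the comment, an input row whose adid_list holds three
ids would drive three onUdtfRow calls and therefore three joined output rows,
matching the "all possible rows" behaviour described above.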

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/LimitOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/LimitOperator.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/LimitOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/LimitOperator.java Thu Jan 21 10:37:58 2010
@@ -26,36 +26,39 @@
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
 
 /**
- * Limit operator implementation
- * Limits the number of rows to be passed on.
+ * Limit operator implementation Limits the number of rows to be passed on.
  **/
 public class LimitOperator extends Operator<limitDesc> implements Serializable {
   private static final long serialVersionUID = 1L;
-  
+
   transient protected int limit;
   transient protected int currCount;
 
+  @Override
   protected void initializeOp(Configuration hconf) throws HiveException {
     super.initializeOp(hconf);
     limit = conf.getLimit();
     currCount = 0;
   }
 
+  @Override
   public void processOp(Object row, int tag) throws HiveException {
     if (currCount < limit) {
       forward(row, inputObjInspectors[tag]);
       currCount++;
-    }
-    else
+    } else {
       setDone(true);
+    }
   }
-  
+
+  @Override
   public String getName() {
     return "LIM";
   }
-  
+
+  @Override
   public int getType() {
     return OperatorType.LIMIT;
   }
-  
+
 }
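
The LimitOperator change above is formatting only; the behaviour stays the
same: forward rows while currCount < limit, then call setDone(true) so no
further rows are pushed through this operator. A toy sketch of that counter
pattern (hypothetical names, not the Hive API):

    public class LimitSketch {
      private final int limit;
      private int currCount = 0;
      private boolean done = false;

      LimitSketch(int limit) {
        this.limit = limit;
      }

      // Returns true when the row should be forwarded downstream.
      boolean offer(Object row) {
        if (currCount < limit) {
          currCount++;
          return true;      // forward(row, ...) in the real operator
        }
        done = true;        // setDone(true): the pipeline can stop feeding rows
        return false;
      }

      boolean isDone() {
        return done;
      }
    }

Note that currCount is transient and reset in initializeOp, so each operator
instance enforces the limit locally.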

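The MapJoinOperator diff that follows is again dominated by formatter churn,
but the mechanism it touches is worth keeping in view while reading it: every
table other than posBigTable is cached in a (spillable) hash table keyed by the
join key, and each big-table row probes those tables directly inside the map
task. A deliberately simplified build/probe sketch with plain in-memory
collections is below; the real code uses HashMapWrapper, MapJoinObjectKey,
MapJoinObjectValue and RowContainer, and substitutes dummy or empty rows for
missing matches depending on noOuterJoin.

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class MapJoinSketch {
      // alias -> (join key -> rows of that small table sharing the key)
      private final Map<Byte, Map<List<Object>, List<List<Object>>>> smallTables =
          new HashMap<Byte, Map<List<Object>, List<List<Object>>>>();

      // Build side: called for every row of a small (non-big) table.
      void cacheSmallTableRow(byte alias, List<Object> key, List<Object> value) {
        Map<List<Object>, List<List<Object>>> table = smallTables.get(alias);
        if (table == null) {
          table = new HashMap<List<Object>, List<List<Object>>>();
          smallTables.put(Byte.valueOf(alias), table);
        }
        List<List<Object>> rows = table.get(key);
        if (rows == null) {
          rows = new ArrayList<List<Object>>();
          table.put(key, rows);
        }
        rows.add(value);
      }

      // Probe side: for a big-table row, return the cached matches of one
      // small table (empty when there is no match).
      List<List<Object>> probe(byte alias, List<Object> key) {
        Map<List<Object>, List<List<Object>>> table = smallTables.get(alias);
        List<List<Object>> rows = (table == null) ? null : table.get(key);
        return (rows == null) ? new ArrayList<List<Object>>() : rows;
      }
    }
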
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java Thu Jan 21 10:37:58 2010
@@ -29,6 +29,10 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.persistence.HashMapWrapper;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinObjectKey;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinObjectValue;
+import org.apache.hadoop.hive.ql.exec.persistence.RowContainer;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.mapJoinDesc;
 import org.apache.hadoop.hive.ql.plan.tableDesc;
@@ -42,17 +46,15 @@
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
 import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hadoop.hive.ql.exec.persistence.MapJoinObjectKey;
-import org.apache.hadoop.hive.ql.exec.persistence.MapJoinObjectValue;
-import org.apache.hadoop.hive.ql.exec.persistence.HashMapWrapper;
-import org.apache.hadoop.hive.ql.exec.persistence.RowContainer;
 
 /**
  * Map side Join operator implementation.
  */
-public class MapJoinOperator extends CommonJoinOperator<mapJoinDesc> implements Serializable {
+public class MapJoinOperator extends CommonJoinOperator<mapJoinDesc> implements
+    Serializable {
   private static final long serialVersionUID = 1L;
-  static final private Log LOG = LogFactory.getLog(MapJoinOperator.class.getName());
+  static final private Log LOG = LogFactory.getLog(MapJoinOperator.class
+      .getName());
 
   /**
    * The expressions for join inputs's join keys.
@@ -67,21 +69,23 @@
    */
   transient protected Map<Byte, List<ObjectInspector>> joinKeysStandardObjectInspectors;
 
-  transient private int posBigTable;       // one of the tables that is not in memory
-  transient int mapJoinRowsKey;            // rows for a given key
+  transient private int posBigTable; // one of the tables that is not in memory
+  transient int mapJoinRowsKey; // rows for a given key
 
   transient protected Map<Byte, HashMapWrapper<MapJoinObjectKey, MapJoinObjectValue>> mapJoinTables;
-  
+
   transient protected RowContainer<ArrayList<Object>> emptyList = null;
-  
-  transient static final private String[] fatalErrMsg = { 
-    null,  // counter value 0 means no error
-    "Mapside join size exceeds hive.mapjoin.maxsize. Please increase that or remove the mapjoin hint." // counter value 1
+
+  transient static final private String[] fatalErrMsg = {
+      null, // counter value 0 means no error
+      "Mapside join size exceeds hive.mapjoin.maxsize. Please increase that or remove the mapjoin hint." // counter
+                                                                                                         // value
+                                                                                                         // 1
   };
-  
+
   public static class MapJoinObjectCtx {
     ObjectInspector standardOI;
-    SerDe      serde;
+    SerDe serde;
     tableDesc tblDesc;
     Configuration conf;
 
@@ -89,7 +93,8 @@
      * @param standardOI
      * @param serde
      */
-    public MapJoinObjectCtx(ObjectInspector standardOI, SerDe serde, tableDesc tblDesc, Configuration conf) {
+    public MapJoinObjectCtx(ObjectInspector standardOI, SerDe serde,
+        tableDesc tblDesc, Configuration conf) {
       this.standardOI = standardOI;
       this.serde = serde;
       this.tblDesc = tblDesc;
@@ -129,12 +134,12 @@
 
   transient boolean firstRow;
 
-  transient int   metadataKeyTag;
+  transient int metadataKeyTag;
   transient int[] metadataValueTag;
   transient List<File> hTables;
-  transient int      numMapRowsRead;
-  transient int      heartbeatInterval;
-  transient int      maxMapJoinSize;
+  transient int numMapRowsRead;
+  transient int heartbeatInterval;
+  transient int maxMapJoinSize;
 
   @Override
   protected void initializeOp(Configuration hconf) throws HiveException {
@@ -142,44 +147,53 @@
     numMapRowsRead = 0;
 
     firstRow = true;
-    heartbeatInterval = HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVESENDHEARTBEAT);
-    maxMapJoinSize = HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEMAXMAPJOINSIZE);
-    
-    joinKeys  = new HashMap<Byte, List<ExprNodeEvaluator>>();
-    
+    heartbeatInterval = HiveConf.getIntVar(hconf,
+        HiveConf.ConfVars.HIVESENDHEARTBEAT);
+    maxMapJoinSize = HiveConf.getIntVar(hconf,
+        HiveConf.ConfVars.HIVEMAXMAPJOINSIZE);
+
+    joinKeys = new HashMap<Byte, List<ExprNodeEvaluator>>();
+
     populateJoinKeyValue(joinKeys, conf.getKeys());
-    joinKeysObjectInspectors = getObjectInspectorsFromEvaluators(joinKeys, inputObjInspectors);
+    joinKeysObjectInspectors = getObjectInspectorsFromEvaluators(joinKeys,
+        inputObjInspectors);
     joinKeysStandardObjectInspectors = getStandardObjectInspectors(joinKeysObjectInspectors);
-    
+
     // all other tables are small, and are cached in the hash table
     posBigTable = conf.getPosBigTable();
-    
+
     metadataValueTag = new int[numAliases];
-    for (int pos = 0; pos < numAliases; pos++)
+    for (int pos = 0; pos < numAliases; pos++) {
       metadataValueTag[pos] = -1;
-    
+    }
+
     mapJoinTables = new HashMap<Byte, HashMapWrapper<MapJoinObjectKey, MapJoinObjectValue>>();
     hTables = new ArrayList<File>();
-    
+
     // initialize the hash tables for other tables
     for (int pos = 0; pos < numAliases; pos++) {
-      if (pos == posBigTable)
+      if (pos == posBigTable) {
         continue;
-      
-      int cacheSize = HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEMAPJOINCACHEROWS);
-      HashMapWrapper<MapJoinObjectKey, MapJoinObjectValue> hashTable = 
-        new HashMapWrapper<MapJoinObjectKey, MapJoinObjectValue>(cacheSize);
-      
-      mapJoinTables.put(Byte.valueOf((byte)pos), hashTable);
+      }
+
+      int cacheSize = HiveConf.getIntVar(hconf,
+          HiveConf.ConfVars.HIVEMAPJOINCACHEROWS);
+      HashMapWrapper<MapJoinObjectKey, MapJoinObjectValue> hashTable = new HashMapWrapper<MapJoinObjectKey, MapJoinObjectValue>(
+          cacheSize);
+
+      mapJoinTables.put(Byte.valueOf((byte) pos), hashTable);
     }
-    
+
     emptyList = new RowContainer<ArrayList<Object>>(1, hconf);
-    RowContainer bigPosRC = getRowContainer(hconf, (byte)posBigTable, order[posBigTable], joinCacheSize);
-    storage.put((byte)posBigTable, bigPosRC);
-    
-    mapJoinRowsKey = HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEMAPJOINROWSIZE);
-    
-    List<? extends StructField> structFields = ((StructObjectInspector)outputObjInspector).getAllStructFieldRefs();
+    RowContainer bigPosRC = getRowContainer(hconf, (byte) posBigTable,
+        order[posBigTable], joinCacheSize);
+    storage.put((byte) posBigTable, bigPosRC);
+
+    mapJoinRowsKey = HiveConf.getIntVar(hconf,
+        HiveConf.ConfVars.HIVEMAPJOINROWSIZE);
+
+    List<? extends StructField> structFields = ((StructObjectInspector) outputObjInspector)
+        .getAllStructFieldRefs();
     if (conf.getOutputColumnNames().size() < structFields.size()) {
       List<ObjectInspector> structFieldObjectInspectors = new ArrayList<ObjectInspector>();
       for (Byte alias : order) {
@@ -188,34 +202,37 @@
         for (int i = 0; i < sz; i++) {
           int pos = retained.get(i);
           structFieldObjectInspectors.add(structFields.get(pos)
-                                          .getFieldObjectInspector());
+              .getFieldObjectInspector());
         }
       }
       outputObjInspector = ObjectInspectorFactory
-        .getStandardStructObjectInspector(conf.getOutputColumnNames(),
-                                          structFieldObjectInspectors);
+          .getStandardStructObjectInspector(conf.getOutputColumnNames(),
+              structFieldObjectInspectors);
     }
     initializeChildren(hconf);
   }
-  
+
   @Override
   protected void fatalErrorMessage(StringBuffer errMsg, long counterCode) {
-    errMsg.append("Operator " + getOperatorId() + " (id=" + id + "): " + 
-        fatalErrMsg[(int)counterCode]);
+    errMsg.append("Operator " + getOperatorId() + " (id=" + id + "): "
+        + fatalErrMsg[(int) counterCode]);
   }
-  
+
   @Override
   public void processOp(Object row, int tag) throws HiveException {
     try {
       // get alias
-      alias = (byte)tag;
-      
-      if ((lastAlias == null) || (!lastAlias.equals(alias)))
+      alias = (byte) tag;
+
+      if ((lastAlias == null) || (!lastAlias.equals(alias))) {
         nextSz = joinEmitInterval;
-      
+      }
+
       // compute keys and values as StandardObjects
-      ArrayList<Object> key   = computeValues(row, joinKeys.get(alias), joinKeysObjectInspectors.get(alias));
-      ArrayList<Object> value = computeValues(row, joinValues.get(alias), joinValuesObjectInspectors.get(alias));
+      ArrayList<Object> key = computeValues(row, joinKeys.get(alias),
+          joinKeysObjectInspectors.get(alias));
+      ArrayList<Object> value = computeValues(row, joinValues.get(alias),
+          joinValuesObjectInspectors.get(alias));
 
       // does this source need to be stored in the hash map
       if (tag != posBigTable) {
@@ -223,79 +240,90 @@
           metadataKeyTag = nextVal++;
 
           tableDesc keyTableDesc = conf.getKeyTblDesc();
-          SerDe keySerializer = (SerDe)ReflectionUtils.newInstance(keyTableDesc.getDeserializerClass(), null);
+          SerDe keySerializer = (SerDe) ReflectionUtils.newInstance(
+              keyTableDesc.getDeserializerClass(), null);
           keySerializer.initialize(null, keyTableDesc.getProperties());
 
           mapMetadata.put(Integer.valueOf(metadataKeyTag),
               new MapJoinObjectCtx(
-                  ObjectInspectorUtils.getStandardObjectInspector(keySerializer.getObjectInspector(),
-                      ObjectInspectorCopyOption.WRITABLE),
-                  keySerializer, keyTableDesc, hconf));
+                  ObjectInspectorUtils
+                      .getStandardObjectInspector(keySerializer
+                          .getObjectInspector(),
+                          ObjectInspectorCopyOption.WRITABLE), keySerializer,
+                  keyTableDesc, hconf));
 
           firstRow = false;
         }
 
         // Send some status periodically
         numMapRowsRead++;
-        if (((numMapRowsRead % heartbeatInterval) == 0) && (reporter != null))
+        if (((numMapRowsRead % heartbeatInterval) == 0) && (reporter != null)) {
           reporter.progress();
-        
-        if ( (numMapRowsRead > maxMapJoinSize) && (reporter != null) && (counterNameToEnum != null)) {
+        }
+
+        if ((numMapRowsRead > maxMapJoinSize) && (reporter != null)
+            && (counterNameToEnum != null)) {
           // update counter
-          LOG.warn("Too many rows in map join tables. Fatal error counter will be incremented!!");
+          LOG
+              .warn("Too many rows in map join tables. Fatal error counter will be incremented!!");
           incrCounter(fatalErrorCntr, 1);
           fatalError = true;
           return;
         }
-          
 
-        HashMapWrapper<MapJoinObjectKey, MapJoinObjectValue> hashTable =  mapJoinTables.get(alias);
+        HashMapWrapper<MapJoinObjectKey, MapJoinObjectValue> hashTable = mapJoinTables
+            .get(alias);
         MapJoinObjectKey keyMap = new MapJoinObjectKey(metadataKeyTag, key);
         MapJoinObjectValue o = hashTable.get(keyMap);
         RowContainer res = null;
 
         boolean needNewKey = true;
         if (o == null) {
-          res = getRowContainer(this.hconf, (byte)tag, order[tag], joinCacheSize);
-        	res.add(value);
+          res = getRowContainer(hconf, (byte) tag, order[tag], joinCacheSize);
+          res.add(value);
         } else {
           res = o.getObj();
           res.add(value);
-          // If key already exists, HashMapWrapper.get() guarantees it is already in main memory HashMap
-          // cache. So just replacing the object value should update the HashMapWrapper. This will save
-          // the cost of constructing the new key/object and deleting old one and inserting the new one.
-          if ( hashTable.cacheSize() > 0) {
+          // If key already exists, HashMapWrapper.get() guarantees it is
+          // already in main memory HashMap
+          // cache. So just replacing the object value should update the
+          // HashMapWrapper. This will save
+          // the cost of constructing the new key/object and deleting old one
+          // and inserting the new one.
+          if (hashTable.cacheSize() > 0) {
             o.setObj(res);
             needNewKey = false;
-          } 
+          }
         }
-        
-        
+
         if (metadataValueTag[tag] == -1) {
           metadataValueTag[tag] = nextVal++;
-          
+
           tableDesc valueTableDesc = conf.getValueTblDescs().get(tag);
-          SerDe valueSerDe = (SerDe)ReflectionUtils.newInstance(valueTableDesc.getDeserializerClass(), null);
+          SerDe valueSerDe = (SerDe) ReflectionUtils.newInstance(valueTableDesc
+              .getDeserializerClass(), null);
           valueSerDe.initialize(null, valueTableDesc.getProperties());
-          
+
           mapMetadata.put(Integer.valueOf(metadataValueTag[tag]),
-                          new MapJoinObjectCtx(
-                                ObjectInspectorUtils.getStandardObjectInspector(valueSerDe.getObjectInspector(),
-                                  ObjectInspectorCopyOption.WRITABLE),
-                                valueSerDe, valueTableDesc, hconf));
+              new MapJoinObjectCtx(ObjectInspectorUtils
+                  .getStandardObjectInspector(valueSerDe.getObjectInspector(),
+                      ObjectInspectorCopyOption.WRITABLE), valueSerDe,
+                  valueTableDesc, hconf));
         }
-        
+
         // Construct externalizable objects for key and value
-        if ( needNewKey ) {
+        if (needNewKey) {
           MapJoinObjectKey keyObj = new MapJoinObjectKey(metadataKeyTag, key);
-	        MapJoinObjectValue valueObj = new MapJoinObjectValue(metadataValueTag[tag], res);
-	        valueObj.setConf(this.hconf); 
+          MapJoinObjectValue valueObj = new MapJoinObjectValue(
+              metadataValueTag[tag], res);
+          valueObj.setConf(hconf);
           // This may potentially increase the size of the hashmap on the mapper
-  	      if (res.size() > mapJoinRowsKey) {
-            if ( res.size() % 100 == 0 ) {
-    	        LOG.warn("Number of values for a given key " + keyObj + " are " + res.size());
+          if (res.size() > mapJoinRowsKey) {
+            if (res.size() % 100 == 0) {
+              LOG.warn("Number of values for a given key " + keyObj + " are "
+                  + res.size());
               LOG.warn("used memory " + Runtime.getRuntime().totalMemory());
-      	    }
+            }
           }
           hashTable.put(keyObj, valueObj);
         }
@@ -308,15 +336,15 @@
       for (Byte pos : order) {
         if (pos.intValue() != tag) {
           MapJoinObjectKey keyMap = new MapJoinObjectKey(metadataKeyTag, key);
-          MapJoinObjectValue o = (MapJoinObjectValue)mapJoinTables.get(pos).get(keyMap);
+          MapJoinObjectValue o = mapJoinTables.get(pos).get(keyMap);
 
           if (o == null) {
-            if(noOuterJoin)
+            if (noOuterJoin) {
               storage.put(pos, emptyList);
-            else
+            } else {
               storage.put(pos, dummyObjVectors[pos.intValue()]);
-          }
-          else {
+            }
+          } else {
             storage.put(pos, o.getObj());
           }
         }
@@ -328,30 +356,37 @@
       // done with the row
       storage.get(alias).clear();
 
-      for (Byte pos : order)
-        if (pos.intValue() != tag)
+      for (Byte pos : order) {
+        if (pos.intValue() != tag) {
           storage.put(pos, null);
+        }
+      }
 
     } catch (SerDeException e) {
       e.printStackTrace();
       throw new HiveException(e);
     }
   }
-  
+
+  @Override
   public void closeOp(boolean abort) throws HiveException {
-    for (HashMapWrapper hashTable: mapJoinTables.values()) {
+    for (HashMapWrapper hashTable : mapJoinTables.values()) {
       hashTable.close();
     }
     super.closeOp(abort);
   }
+
   /**
    * Implements the getName function for the Node Interface.
+   * 
    * @return the name of the operator
    */
+  @Override
   public String getName() {
     return "MAPJOIN";
   }
 
+  @Override
   public int getType() {
     return OperatorType.MAPJOIN;
   }