Posted to commits@hive.apache.org by zs...@apache.org on 2010/01/21 11:38:15 UTC
svn commit: r901644 [6/37] - in /hadoop/hive/trunk: ./
ql/src/java/org/apache/hadoop/hive/ql/
ql/src/java/org/apache/hadoop/hive/ql/exec/
ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/
ql/src/java/org/apache/hadoop/hive/ql/history/ ql/src/java...
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java Thu Jan 21 10:37:58 2010
@@ -18,109 +18,115 @@
package org.apache.hadoop.hive.ql.exec;
+import java.io.Serializable;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
-import java.util.ArrayList;
import java.util.HashSet;
-import java.util.List;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
-import java.io.Serializable;
-import java.lang.reflect.Field;
-import java.lang.IllegalAccessException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.parse.OpParseContext;
import org.apache.hadoop.hive.ql.plan.aggregationDesc;
import org.apache.hadoop.hive.ql.plan.exprNodeDesc;
import org.apache.hadoop.hive.ql.plan.groupByDesc;
import org.apache.hadoop.hive.ql.plan.api.OperatorType;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.ql.parse.OpParseContext;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
-import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.io.Text;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
/**
* GroupBy operator implementation.
*/
-public class GroupByOperator extends Operator <groupByDesc> implements Serializable {
+public class GroupByOperator extends Operator<groupByDesc> implements
+ Serializable {
- static final private Log LOG = LogFactory.getLog(GroupByOperator.class.getName());
+ static final private Log LOG = LogFactory.getLog(GroupByOperator.class
+ .getName());
private static final long serialVersionUID = 1L;
- private static final int NUMROWSESTIMATESIZE = 1000;
+ private static final int NUMROWSESTIMATESIZE = 1000;
transient protected ExprNodeEvaluator[] keyFields;
transient protected ObjectInspector[] keyObjectInspectors;
transient protected Object[] keyObjects;
-
+
transient protected ExprNodeEvaluator[][] aggregationParameterFields;
transient protected ObjectInspector[][] aggregationParameterObjectInspectors;
transient protected ObjectInspector[][] aggregationParameterStandardObjectInspectors;
transient protected Object[][] aggregationParameterObjects;
- // In the future, we may allow both count(DISTINCT a) and sum(DISTINCT a) in the same SQL clause,
- // so aggregationIsDistinct is a boolean array instead of a single number.
+ // In the future, we may allow both count(DISTINCT a) and sum(DISTINCT a) in
+ // the same SQL clause,
+ // so aggregationIsDistinct is a boolean array instead of a single number.
transient protected boolean[] aggregationIsDistinct;
transient GenericUDAFEvaluator[] aggregationEvaluators;
-
+
transient protected ArrayList<ObjectInspector> objectInspectors;
transient ArrayList<String> fieldNames;
- // Used by sort-based GroupBy: Mode = COMPLETE, PARTIAL1, PARTIAL2, MERGEPARTIAL
+ // Used by sort-based GroupBy: Mode = COMPLETE, PARTIAL1, PARTIAL2,
+ // MERGEPARTIAL
transient protected ArrayList<Object> currentKeys;
- transient protected ArrayList<Object> newKeys;
+ transient protected ArrayList<Object> newKeys;
transient protected AggregationBuffer[] aggregations;
transient protected Object[][] aggregationsParametersLastInvoke;
// Used by hash-based GroupBy: Mode = HASH, PARTIALS
transient protected HashMap<KeyWrapper, AggregationBuffer[]> hashAggregations;
-
+
// Used by hash distinct aggregations when hashGrpKeyNotRedKey is true
transient protected HashSet<ArrayList<Object>> keysCurrentGroup;
-
+
transient boolean bucketGroup;
-
+
transient boolean firstRow;
- transient long totalMemory;
+ transient long totalMemory;
transient boolean hashAggr;
- // The reduction is happening on the reducer, and the grouping key and reduction keys are different.
+ // The reduction is happening on the reducer, and the grouping key and
+ // reduction keys are different.
// For example: select a, count(distinct b) from T group by a
// The data is sprayed by 'b' and the reducer is grouping it by 'a'
- transient boolean groupKeyIsNotReduceKey;
+ transient boolean groupKeyIsNotReduceKey;
transient boolean firstRowInGroup;
- transient long numRowsInput;
- transient long numRowsHashTbl;
- transient int groupbyMapAggrInterval;
- transient long numRowsCompareHashAggr;
- transient float minReductionHashAggr;
+ transient long numRowsInput;
+ transient long numRowsHashTbl;
+ transient int groupbyMapAggrInterval;
+ transient long numRowsCompareHashAggr;
+ transient float minReductionHashAggr;
// current Key ObjectInspectors are standard ObjectInspectors
transient protected ObjectInspector[] currentKeyObjectInspectors;
// new Key ObjectInspectors are objectInspectors from the parent
transient StructObjectInspector newKeyObjectInspector;
transient StructObjectInspector currentKeyObjectInspector;
-
+
/**
- * This is used to store the position and field names for variable length fields.
+ * This is used to store the position and field names for variable length
+ * fields.
**/
class varLenFields {
- int aggrPos;
- List<Field> fields;
+ int aggrPos;
+ List<Field> fields;
+
varLenFields(int aggrPos, List<Field> fields) {
this.aggrPos = aggrPos;
- this.fields = fields;
+ this.fields = fields;
}
int getAggrPos() {
@@ -132,24 +138,27 @@
}
};
- // for these positions, some variable primitive type (String) is used, so size cannot be estimated. sample it at runtime.
+ // for these positions, some variable primitive type (String) is used, so size
+ // cannot be estimated. sample it at runtime.
transient List<Integer> keyPositionsSize;
- // for these positions, some variable primitive type (String) is used for the aggregation classes
+ // for these positions, some variable primitive type (String) is used for the
+ // aggregation classes
transient List<varLenFields> aggrPositions;
- transient int fixedRowSize;
- transient long maxHashTblMemory;
- transient int totalVariableSize;
- transient int numEntriesVarSize;
- transient int numEntriesHashTable;
-
+ transient int fixedRowSize;
+ transient long maxHashTblMemory;
+ transient int totalVariableSize;
+ transient int numEntriesVarSize;
+ transient int numEntriesHashTable;
+
+ @Override
protected void initializeOp(Configuration hconf) throws HiveException {
totalMemory = Runtime.getRuntime().totalMemory();
numRowsInput = 0;
numRowsHashTbl = 0;
- assert(inputObjInspectors.length == 1);
+ assert (inputObjInspectors.length == 1);
ObjectInspector rowInspector = inputObjInspectors[0];
// init keyFields
@@ -160,40 +169,51 @@
for (int i = 0; i < keyFields.length; i++) {
keyFields[i] = ExprNodeEvaluatorFactory.get(conf.getKeys().get(i));
keyObjectInspectors[i] = keyFields[i].initialize(rowInspector);
- currentKeyObjectInspectors[i] = ObjectInspectorUtils.getStandardObjectInspector(keyObjectInspectors[i],
- ObjectInspectorCopyOption.WRITABLE);
+ currentKeyObjectInspectors[i] = ObjectInspectorUtils
+ .getStandardObjectInspector(keyObjectInspectors[i],
+ ObjectInspectorCopyOption.WRITABLE);
keyObjects[i] = null;
}
newKeys = new ArrayList<Object>(keyFields.length);
-
+
// init aggregationParameterFields
- aggregationParameterFields = new ExprNodeEvaluator[conf.getAggregators().size()][];
- aggregationParameterObjectInspectors = new ObjectInspector[conf.getAggregators().size()][];
- aggregationParameterStandardObjectInspectors = new ObjectInspector[conf.getAggregators().size()][];
+ aggregationParameterFields = new ExprNodeEvaluator[conf.getAggregators()
+ .size()][];
+ aggregationParameterObjectInspectors = new ObjectInspector[conf
+ .getAggregators().size()][];
+ aggregationParameterStandardObjectInspectors = new ObjectInspector[conf
+ .getAggregators().size()][];
aggregationParameterObjects = new Object[conf.getAggregators().size()][];
for (int i = 0; i < aggregationParameterFields.length; i++) {
- ArrayList<exprNodeDesc> parameters = conf.getAggregators().get(i).getParameters();
+ ArrayList<exprNodeDesc> parameters = conf.getAggregators().get(i)
+ .getParameters();
aggregationParameterFields[i] = new ExprNodeEvaluator[parameters.size()];
- aggregationParameterObjectInspectors[i] = new ObjectInspector[parameters.size()];
- aggregationParameterStandardObjectInspectors[i] = new ObjectInspector[parameters.size()];
+ aggregationParameterObjectInspectors[i] = new ObjectInspector[parameters
+ .size()];
+ aggregationParameterStandardObjectInspectors[i] = new ObjectInspector[parameters
+ .size()];
aggregationParameterObjects[i] = new Object[parameters.size()];
for (int j = 0; j < parameters.size(); j++) {
- aggregationParameterFields[i][j] = ExprNodeEvaluatorFactory.get(parameters.get(j));
- aggregationParameterObjectInspectors[i][j] = aggregationParameterFields[i][j].initialize(rowInspector);
- aggregationParameterStandardObjectInspectors[i][j] =
- ObjectInspectorUtils.getStandardObjectInspector(aggregationParameterObjectInspectors[i][j],
+ aggregationParameterFields[i][j] = ExprNodeEvaluatorFactory
+ .get(parameters.get(j));
+ aggregationParameterObjectInspectors[i][j] = aggregationParameterFields[i][j]
+ .initialize(rowInspector);
+ aggregationParameterStandardObjectInspectors[i][j] = ObjectInspectorUtils
+ .getStandardObjectInspector(
+ aggregationParameterObjectInspectors[i][j],
ObjectInspectorCopyOption.WRITABLE);
aggregationParameterObjects[i][j] = null;
}
}
// init aggregationIsDistinct
aggregationIsDistinct = new boolean[conf.getAggregators().size()];
- for(int i=0; i<aggregationIsDistinct.length; i++) {
+ for (int i = 0; i < aggregationIsDistinct.length; i++) {
aggregationIsDistinct[i] = conf.getAggregators().get(i).getDistinct();
}
- // init aggregationClasses
- aggregationEvaluators = new GenericUDAFEvaluator[conf.getAggregators().size()];
+ // init aggregationClasses
+ aggregationEvaluators = new GenericUDAFEvaluator[conf.getAggregators()
+ .size()];
for (int i = 0; i < aggregationEvaluators.length; i++) {
aggregationDesc agg = conf.getAggregators().get(i);
aggregationEvaluators[i] = agg.getGenericUDAFEvaluator();
@@ -202,15 +222,15 @@
// init objectInspectors
int totalFields = keyFields.length + aggregationEvaluators.length;
objectInspectors = new ArrayList<ObjectInspector>(totalFields);
- for(int i=0; i<keyFields.length; i++) {
+ for (ExprNodeEvaluator keyField : keyFields) {
objectInspectors.add(null);
}
- for(int i=0; i<aggregationEvaluators.length; i++) {
- ObjectInspector roi = aggregationEvaluators[i].init(
- conf.getAggregators().get(i).getMode(), aggregationParameterObjectInspectors[i]);
+ for (int i = 0; i < aggregationEvaluators.length; i++) {
+ ObjectInspector roi = aggregationEvaluators[i].init(conf.getAggregators()
+ .get(i).getMode(), aggregationParameterObjectInspectors[i]);
objectInspectors.add(roi);
}
-
+
bucketGroup = conf.getBucketGroup();
aggregationsParametersLastInvoke = new Object[conf.getAggregators().size()][];
if (conf.getMode() != groupByDesc.Mode.HASH || bucketGroup) {
@@ -222,117 +242,137 @@
hashAggr = true;
keyPositionsSize = new ArrayList<Integer>();
aggrPositions = new ArrayList<varLenFields>();
- groupbyMapAggrInterval = HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEGROUPBYMAPINTERVAL);
+ groupbyMapAggrInterval = HiveConf.getIntVar(hconf,
+ HiveConf.ConfVars.HIVEGROUPBYMAPINTERVAL);
// compare every groupbyMapAggrInterval rows
numRowsCompareHashAggr = groupbyMapAggrInterval;
- minReductionHashAggr = HiveConf.getFloatVar(hconf, HiveConf.ConfVars.HIVEMAPAGGRHASHMINREDUCTION);
+ minReductionHashAggr = HiveConf.getFloatVar(hconf,
+ HiveConf.ConfVars.HIVEMAPAGGRHASHMINREDUCTION);
groupKeyIsNotReduceKey = conf.getGroupKeyNotReductionKey();
- if (groupKeyIsNotReduceKey)
+ if (groupKeyIsNotReduceKey) {
keysCurrentGroup = new HashSet<ArrayList<Object>>();
+ }
}
fieldNames = conf.getOutputColumnNames();
for (int i = 0; i < keyFields.length; i++) {
- objectInspectors.set(i, currentKeyObjectInspectors[i]);
+ objectInspectors.set(i, currentKeyObjectInspectors[i]);
}
-
+
// Generate key names
ArrayList<String> keyNames = new ArrayList<String>(keyFields.length);
for (int i = 0; i < keyFields.length; i++) {
keyNames.add(fieldNames.get(i));
}
- newKeyObjectInspector =
- ObjectInspectorFactory.getStandardStructObjectInspector(keyNames, Arrays.asList(keyObjectInspectors));
- currentKeyObjectInspector =
- ObjectInspectorFactory.getStandardStructObjectInspector(keyNames, Arrays.asList(currentKeyObjectInspectors));
-
- outputObjInspector =
- ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, objectInspectors);
+ newKeyObjectInspector = ObjectInspectorFactory
+ .getStandardStructObjectInspector(keyNames, Arrays
+ .asList(keyObjectInspectors));
+ currentKeyObjectInspector = ObjectInspectorFactory
+ .getStandardStructObjectInspector(keyNames, Arrays
+ .asList(currentKeyObjectInspectors));
+
+ outputObjInspector = ObjectInspectorFactory
+ .getStandardStructObjectInspector(fieldNames, objectInspectors);
firstRow = true;
- // estimate the number of hash table entries based on the size of each entry. Since the size of a entry
+ // estimate the number of hash table entries based on the size of each
+ // entry. Since the size of a entry
// is not known, estimate that based on the number of entries
- if (hashAggr)
+ if (hashAggr) {
computeMaxEntriesHashAggr(hconf);
+ }
initializeChildren(hconf);
}
/**
- * Estimate the number of entries in map-side hash table.
- * The user can specify the total amount of memory to be used by the map-side hash. By default, all available
- * memory is used. The size of each row is estimated, rather crudely, and the number of entries are figure out
- * based on that.
- * @return number of entries that can fit in hash table - useful for map-side aggregation only
+ * Estimate the number of entries in map-side hash table. The user can specify
+ * the total amount of memory to be used by the map-side hash. By default, all
+ * available memory is used. The size of each row is estimated, rather
+ * crudely, and the number of entries are figure out based on that.
+ *
+ * @return number of entries that can fit in hash table - useful for map-side
+ * aggregation only
**/
- private void computeMaxEntriesHashAggr(Configuration hconf) throws HiveException {
- maxHashTblMemory = (long)(HiveConf.getFloatVar(hconf, HiveConf.ConfVars.HIVEMAPAGGRHASHMEMORY) * Runtime.getRuntime().maxMemory());
+ private void computeMaxEntriesHashAggr(Configuration hconf)
+ throws HiveException {
+ maxHashTblMemory = (long) (HiveConf.getFloatVar(hconf,
+ HiveConf.ConfVars.HIVEMAPAGGRHASHMEMORY) * Runtime.getRuntime()
+ .maxMemory());
estimateRowSize();
}
- private static final int javaObjectOverHead = 64;
+ private static final int javaObjectOverHead = 64;
private static final int javaHashEntryOverHead = 64;
private static final int javaSizePrimitiveType = 16;
- private static final int javaSizeUnknownType = 256;
+ private static final int javaSizeUnknownType = 256;
/**
- * The size of the element at position 'pos' is returned, if possible.
- * If the datatype is of variable length, STRING, a list of such key positions is maintained, and the size for such positions is
- * then actually calculated at runtime.
- * @param pos the position of the key
- * @param c the type of the key
+ * The size of the element at position 'pos' is returned, if possible. If the
+ * datatype is of variable length, STRING, a list of such key positions is
+ * maintained, and the size for such positions is then actually calculated at
+ * runtime.
+ *
+ * @param pos
+ * the position of the key
+ * @param c
+ * the type of the key
* @return the size of this datatype
**/
private int getSize(int pos, PrimitiveCategory category) {
- switch(category) {
- case VOID:
- case BOOLEAN:
- case BYTE:
- case SHORT:
- case INT:
- case LONG:
- case FLOAT:
- case DOUBLE: {
- return javaSizePrimitiveType;
- }
- case STRING: {
- keyPositionsSize.add(new Integer(pos));
- return javaObjectOverHead;
- }
- default: {
- return javaSizeUnknownType;
- }
+ switch (category) {
+ case VOID:
+ case BOOLEAN:
+ case BYTE:
+ case SHORT:
+ case INT:
+ case LONG:
+ case FLOAT:
+ case DOUBLE: {
+ return javaSizePrimitiveType;
+ }
+ case STRING: {
+ keyPositionsSize.add(new Integer(pos));
+ return javaObjectOverHead;
+ }
+ default: {
+ return javaSizeUnknownType;
+ }
}
}
/**
- * The size of the element at position 'pos' is returned, if possible.
- * If the field is of variable length, STRING, a list of such field names for the field position is maintained, and the size
- * for such positions is then actually calculated at runtime.
- * @param pos the position of the key
- * @param c the type of the key
- * @param f the field to be added
+ * The size of the element at position 'pos' is returned, if possible. If the
+ * field is of variable length, STRING, a list of such field names for the
+ * field position is maintained, and the size for such positions is then
+ * actually calculated at runtime.
+ *
+ * @param pos
+ * the position of the key
+ * @param c
+ * the type of the key
+ * @param f
+ * the field to be added
* @return the size of this datatype
**/
private int getSize(int pos, Class<?> c, Field f) {
- if (c.isPrimitive() ||
- c.isInstance(new Boolean(true)) ||
- c.isInstance(new Byte((byte)0)) ||
- c.isInstance(new Short((short)0)) ||
- c.isInstance(new Integer(0)) ||
- c.isInstance(new Long(0)) ||
- c.isInstance(new Float(0)) ||
- c.isInstance(new Double(0)))
+ if (c.isPrimitive() || c.isInstance(new Boolean(true))
+ || c.isInstance(new Byte((byte) 0))
+ || c.isInstance(new Short((short) 0)) || c.isInstance(new Integer(0))
+ || c.isInstance(new Long(0)) || c.isInstance(new Float(0))
+ || c.isInstance(new Double(0))) {
return javaSizePrimitiveType;
+ }
if (c.isInstance(new String())) {
int idx = 0;
varLenFields v = null;
for (idx = 0; idx < aggrPositions.size(); idx++) {
v = aggrPositions.get(idx);
- if (v.getAggrPos() == pos)
+ if (v.getAggrPos() == pos) {
break;
+ }
}
if (idx == aggrPositions.size()) {
@@ -343,18 +383,21 @@
v.getFields().add(f);
return javaObjectOverHead;
}
-
+
return javaSizeUnknownType;
}
/**
- * @param pos position of the key
- * @param typeinfo type of the input
+ * @param pos
+ * position of the key
+ * @param typeinfo
+ * type of the input
* @return the size of this datatype
**/
private int getSize(int pos, TypeInfo typeInfo) {
- if (typeInfo instanceof PrimitiveTypeInfo)
- return getSize(pos, ((PrimitiveTypeInfo)typeInfo).getPrimitiveCategory());
+ if (typeInfo instanceof PrimitiveTypeInfo) {
+ return getSize(pos, ((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory());
+ }
return javaSizeUnknownType;
}
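
For illustration, the getSize() overloads above assign a fixed per-type byte estimate to primitives and record the positions of variable-length (String) values so their real sizes can be sampled at runtime; those estimates later feed the per-row size used to bound the map-side hash table. A minimal standalone sketch of that policy follows (the class name and constants are hypothetical, not Hive code):

import java.util.ArrayList;
import java.util.List;

// Standalone sketch: fixed-size primitives get a constant cost, variable-length
// values (strings) are charged only object overhead up front, and their
// positions are remembered so actual lengths can be sampled at runtime.
public class RowSizeEstimateSketch {
  static final int PRIMITIVE_SIZE = 16;   // assumed cost of a boxed primitive
  static final int OBJECT_OVERHEAD = 64;  // assumed per-object overhead
  static final int UNKNOWN_SIZE = 256;    // fallback for complex types

  final List<Integer> variablePositions = new ArrayList<Integer>();

  int sizeOf(int pos, Class<?> type) {
    if (type == Boolean.class || type == Byte.class || type == Short.class
        || type == Integer.class || type == Long.class || type == Float.class
        || type == Double.class) {
      return PRIMITIVE_SIZE;
    }
    if (type == String.class) {
      variablePositions.add(pos);         // sample the real length later
      return OBJECT_OVERHEAD;
    }
    return UNKNOWN_SIZE;
  }

  public static void main(String[] args) {
    RowSizeEstimateSketch est = new RowSizeEstimateSketch();
    Class<?>[] keyTypes = { Long.class, String.class, Double.class };
    int fixed = 0;
    for (int pos = 0; pos < keyTypes.length; pos++) {
      fixed += est.sizeOf(pos, keyTypes[pos]);
    }
    System.out.println("fixed bytes = " + fixed
        + ", variable positions = " + est.variablePositions);
  }
}
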
@@ -362,23 +405,28 @@
* @return the size of each row
**/
private void estimateRowSize() throws HiveException {
- // estimate the size of each entry -
- // a datatype with unknown size (String/Struct etc. - is assumed to be 256 bytes for now).
+ // estimate the size of each entry -
+ // a datatype with unknown size (String/Struct etc. - is assumed to be 256
+ // bytes for now).
// 64 bytes is the overhead for a reference
fixedRowSize = javaHashEntryOverHead;
ArrayList<exprNodeDesc> keys = conf.getKeys();
- // Go over all the keys and get the size of the fields of fixed length. Keep track of the variable length keys
- for (int pos = 0; pos < keys.size(); pos++)
+ // Go over all the keys and get the size of the fields of fixed length. Keep
+ // track of the variable length keys
+ for (int pos = 0; pos < keys.size(); pos++) {
fixedRowSize += getSize(pos, keys.get(pos).getTypeInfo());
+ }
- // Go over all the aggregation classes and and get the size of the fields of fixed length. Keep track of the variable length
+ // Go over all the aggregation classes and and get the size of the fields of
+ // fixed length. Keep track of the variable length
// fields in these aggregation classes.
- for(int i=0; i < aggregationEvaluators.length; i++) {
+ for (int i = 0; i < aggregationEvaluators.length; i++) {
fixedRowSize += javaObjectOverHead;
- Class<? extends AggregationBuffer> agg = aggregationEvaluators[i].getNewAggregationBuffer().getClass();
+ Class<? extends AggregationBuffer> agg = aggregationEvaluators[i]
+ .getNewAggregationBuffer().getClass();
Field[] fArr = ObjectInspectorUtils.getDeclaredNonStaticFields(agg);
for (Field f : fArr) {
fixedRowSize += getSize(i, f.getType(), f);
@@ -386,43 +434,50 @@
}
}
- protected AggregationBuffer[] newAggregations() throws HiveException {
+ protected AggregationBuffer[] newAggregations() throws HiveException {
AggregationBuffer[] aggs = new AggregationBuffer[aggregationEvaluators.length];
- for(int i=0; i<aggregationEvaluators.length; i++) {
+ for (int i = 0; i < aggregationEvaluators.length; i++) {
aggs[i] = aggregationEvaluators[i].getNewAggregationBuffer();
// aggregationClasses[i].reset(aggs[i]);
}
return aggs;
}
- protected void resetAggregations(AggregationBuffer[] aggs) throws HiveException {
- for(int i=0; i<aggs.length; i++) {
+ protected void resetAggregations(AggregationBuffer[] aggs)
+ throws HiveException {
+ for (int i = 0; i < aggs.length; i++) {
aggregationEvaluators[i].reset(aggs[i]);
}
}
-
/*
- * Update aggregations.
- * If the aggregation is for distinct, in case of hash aggregation, the client tells us whether it is a new entry.
- * For sort-based aggregations, the last row is compared with the current one to figure out whether it has changed.
- * As a cleanup, the lastInvoke logic can be pushed in the caller, and this function can be independent of that. The
- * client should always notify whether it is a different row or not.
+ * Update aggregations. If the aggregation is for distinct, in case of hash
+ * aggregation, the client tells us whether it is a new entry. For sort-based
+ * aggregations, the last row is compared with the current one to figure out
+ * whether it has changed. As a cleanup, the lastInvoke logic can be pushed in
+ * the caller, and this function can be independent of that. The client should
+ * always notify whether it is a different row or not.
*
* @param aggs the aggregations to be evaluated
- * @param row the row being processed
+ *
+ * @param row the row being processed
+ *
* @param rowInspector the inspector for the row
+ *
* @param hashAggr whether hash aggregation is being performed or not
- * @param newEntryForHashAggr only valid if it is a hash aggregation, whether it is a new entry or not
+ *
+ * @param newEntryForHashAggr only valid if it is a hash aggregation, whether
+ * it is a new entry or not
*/
- protected void updateAggregations(AggregationBuffer[] aggs, Object row, ObjectInspector rowInspector, boolean hashAggr, boolean newEntryForHashAggr,
- Object[][] lastInvoke) throws HiveException {
-
- for(int ai=0; ai<aggs.length; ai++) {
+ protected void updateAggregations(AggregationBuffer[] aggs, Object row,
+ ObjectInspector rowInspector, boolean hashAggr,
+ boolean newEntryForHashAggr, Object[][] lastInvoke) throws HiveException {
+
+ for (int ai = 0; ai < aggs.length; ai++) {
- // Calculate the parameters
+ // Calculate the parameters
Object[] o = new Object[aggregationParameterFields[ai].length];
- for(int pi=0; pi<aggregationParameterFields[ai].length; pi++) {
+ for (int pi = 0; pi < aggregationParameterFields[ai].length; pi++) {
o[pi] = aggregationParameterFields[ai][pi].evaluate(row);
}
@@ -432,36 +487,40 @@
if (newEntryForHashAggr) {
aggregationEvaluators[ai].aggregate(aggs[ai], o);
}
- }
- else {
+ } else {
if (lastInvoke[ai] == null) {
lastInvoke[ai] = new Object[o.length];
}
- if (ObjectInspectorUtils.compare(o, aggregationParameterObjectInspectors[ai],
- lastInvoke[ai], aggregationParameterStandardObjectInspectors[ai]) != 0) {
+ if (ObjectInspectorUtils.compare(o,
+ aggregationParameterObjectInspectors[ai], lastInvoke[ai],
+ aggregationParameterStandardObjectInspectors[ai]) != 0) {
aggregationEvaluators[ai].aggregate(aggs[ai], o);
- for (int pi=0; pi<o.length; pi++) {
- lastInvoke[ai][pi] = ObjectInspectorUtils.copyToStandardObject(o[pi],
- aggregationParameterObjectInspectors[ai][pi], ObjectInspectorCopyOption.WRITABLE);
+ for (int pi = 0; pi < o.length; pi++) {
+ lastInvoke[ai][pi] = ObjectInspectorUtils.copyToStandardObject(
+ o[pi], aggregationParameterObjectInspectors[ai][pi],
+ ObjectInspectorCopyOption.WRITABLE);
}
}
}
- }
- else {
+ } else {
aggregationEvaluators[ai].aggregate(aggs[ai], o);
}
}
}
+ @Override
public void startGroup() throws HiveException {
firstRowInGroup = true;
}
+ @Override
public void endGroup() throws HiveException {
- if (groupKeyIsNotReduceKey)
+ if (groupKeyIsNotReduceKey) {
keysCurrentGroup.clear();
+ }
}
-
+
+ @Override
public void processOp(Object row, int tag) throws HiveException {
firstRow = false;
ObjectInspector rowInspector = inputObjInspectors[tag];
@@ -473,14 +532,17 @@
numRowsCompareHashAggr += groupbyMapAggrInterval;
// map-side aggregation should reduce the entries by at-least half
if (numRowsHashTbl > numRowsInput * minReductionHashAggr) {
- LOG.warn("Disable Hash Aggr: #hash table = " + numRowsHashTbl + " #total = " + numRowsInput
- + " reduction = " + 1.0*(numRowsHashTbl/numRowsInput) + " minReduction = " + minReductionHashAggr);
+ LOG.warn("Disable Hash Aggr: #hash table = " + numRowsHashTbl
+ + " #total = " + numRowsInput + " reduction = " + 1.0
+ * (numRowsHashTbl / numRowsInput) + " minReduction = "
+ + minReductionHashAggr);
flush(true);
hashAggr = false;
- }
- else {
- LOG.trace("Hash Aggr Enabled: #hash table = " + numRowsHashTbl + " #total = " + numRowsInput
- + " reduction = " + 1.0*(numRowsHashTbl/numRowsInput) + " minReduction = " + minReductionHashAggr);
+ } else {
+ LOG.trace("Hash Aggr Enabled: #hash table = " + numRowsHashTbl
+ + " #total = " + numRowsInput + " reduction = " + 1.0
+ * (numRowsHashTbl / numRowsInput) + " minReduction = "
+ + minReductionHashAggr);
}
}
}
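
For illustration, the check above runs every groupbyMapAggrInterval rows, compares the hash-table row count against numRowsInput * minReductionHashAggr, and if map-side hashing is not collapsing enough duplicates it flushes the table and switches to the non-hash path (note that the logged ratio numRowsHashTbl / numRowsInput is a long division, so the printed reduction is truncated). A minimal standalone sketch of that decision, with hypothetical names:

// Standalone sketch (not Hive code): decide whether to keep map-side hash
// aggregation based on how much it is reducing the input row count.
public class HashAggrCheckSketch {
  static boolean keepHashAggregation(long numRowsHashTbl, long numRowsInput,
      float minReductionHashAggr) {
    // keep hashing only if the table holds at most minReduction * input rows,
    // i.e. the aggregation is collapsing enough duplicate keys
    return numRowsHashTbl <= numRowsInput * minReductionHashAggr;
  }

  public static void main(String[] args) {
    long input = 10000, hashEntries = 6000;
    float minReduction = 0.5f;                       // "reduce by at least half"
    double reduction = (double) hashEntries / input; // floating-point ratio
    System.out.println("reduction = " + reduction + ", keep hash aggr = "
        + keepHashAggregation(hashEntries, input, minReduction));
  }
}
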
@@ -496,10 +558,11 @@
newKeys.add(keyObjects[i]);
}
- if (hashAggr)
+ if (hashAggr) {
processHashAggr(row, rowInspector, newKeys);
- else
+ } else {
processAggr(row, rowInspector, newKeys);
+ }
firstRowInGroup = false;
} catch (HiveException e) {
@@ -509,167 +572,197 @@
}
}
- private static ArrayList<Object> deepCopyElements(Object[] keys, ObjectInspector[] keyObjectInspectors,
+ private static ArrayList<Object> deepCopyElements(Object[] keys,
+ ObjectInspector[] keyObjectInspectors,
ObjectInspectorCopyOption copyOption) {
ArrayList<Object> result = new ArrayList<Object>(keys.length);
deepCopyElements(keys, keyObjectInspectors, result, copyOption);
return result;
}
-
- private static void deepCopyElements(Object[] keys, ObjectInspector[] keyObjectInspectors, ArrayList<Object> result,
+
+ private static void deepCopyElements(Object[] keys,
+ ObjectInspector[] keyObjectInspectors, ArrayList<Object> result,
ObjectInspectorCopyOption copyOption) {
result.clear();
- for (int i=0; i<keys.length; i++) {
- result.add(ObjectInspectorUtils.copyToStandardObject(keys[i], keyObjectInspectors[i], copyOption));
+ for (int i = 0; i < keys.length; i++) {
+ result.add(ObjectInspectorUtils.copyToStandardObject(keys[i],
+ keyObjectInspectors[i], copyOption));
}
}
-
- class KeyWrapper{
+
+ class KeyWrapper {
int hashcode;
ArrayList<Object> keys;
// decide whether this is already in hashmap (keys in hashmap are deepcopied
// version, and we need to use 'currentKeyObjectInspector').
- boolean copy = false;
-
- KeyWrapper() {}
-
+ boolean copy = false;
+
+ KeyWrapper() {
+ }
+
public KeyWrapper(int hashcode, ArrayList<Object> copiedKeys) {
this(hashcode, copiedKeys, false);
}
-
- public KeyWrapper(int hashcode, ArrayList<Object> copiedKeys, boolean inHashMap) {
+
+ public KeyWrapper(int hashcode, ArrayList<Object> copiedKeys,
+ boolean inHashMap) {
super();
this.hashcode = hashcode;
- this.keys = copiedKeys;
- this.copy = inHashMap;
+ keys = copiedKeys;
+ copy = inHashMap;
}
-
- public int hashCode(){
+
+ @Override
+ public int hashCode() {
return hashcode;
}
-
+
+ @Override
public boolean equals(Object obj) {
ArrayList<Object> copied_in_hashmap = ((KeyWrapper) obj).keys;
- if(!copy)
- return ObjectInspectorUtils.compare(copied_in_hashmap, currentKeyObjectInspector, keys, newKeyObjectInspector) == 0;
- else
- return ObjectInspectorUtils.compare(copied_in_hashmap, currentKeyObjectInspector, keys, currentKeyObjectInspector) == 0;
+ if (!copy) {
+ return ObjectInspectorUtils.compare(copied_in_hashmap,
+ currentKeyObjectInspector, keys, newKeyObjectInspector) == 0;
+ } else {
+ return ObjectInspectorUtils.compare(copied_in_hashmap,
+ currentKeyObjectInspector, keys, currentKeyObjectInspector) == 0;
+ }
}
}
-
+
KeyWrapper keyProber = new KeyWrapper();
- private void processHashAggr(Object row, ObjectInspector rowInspector, ArrayList<Object> newKeys) throws HiveException {
+
+ private void processHashAggr(Object row, ObjectInspector rowInspector,
+ ArrayList<Object> newKeys) throws HiveException {
// Prepare aggs for updating
AggregationBuffer[] aggs = null;
boolean newEntryForHashAggr = false;
-
+
keyProber.hashcode = newKeys.hashCode();
- //use this to probe the hashmap
+ // use this to probe the hashmap
keyProber.keys = newKeys;
-
+
// hash-based aggregations
aggs = hashAggregations.get(keyProber);
ArrayList<Object> newDefaultKeys = null;
- if(aggs == null) {
- newDefaultKeys = deepCopyElements(keyObjects, keyObjectInspectors, ObjectInspectorCopyOption.WRITABLE);
- KeyWrapper newKeyProber = new KeyWrapper(keyProber.hashcode, newDefaultKeys, true);
+ if (aggs == null) {
+ newDefaultKeys = deepCopyElements(keyObjects, keyObjectInspectors,
+ ObjectInspectorCopyOption.WRITABLE);
+ KeyWrapper newKeyProber = new KeyWrapper(keyProber.hashcode,
+ newDefaultKeys, true);
aggs = newAggregations();
hashAggregations.put(newKeyProber, aggs);
newEntryForHashAggr = true;
- numRowsHashTbl++; // new entry in the hash table
+ numRowsHashTbl++; // new entry in the hash table
}
-
- // If the grouping key and the reduction key are different, a set of grouping keys for the current reduction key are maintained in keysCurrentGroup
- // Peek into the set to find out if a new grouping key is seen for the given reduction key
+
+ // If the grouping key and the reduction key are different, a set of
+ // grouping keys for the current reduction key are maintained in
+ // keysCurrentGroup
+ // Peek into the set to find out if a new grouping key is seen for the given
+ // reduction key
if (groupKeyIsNotReduceKey) {
- if(newDefaultKeys == null)
- newDefaultKeys = deepCopyElements(keyObjects, keyObjectInspectors, ObjectInspectorCopyOption.WRITABLE);
+ if (newDefaultKeys == null) {
+ newDefaultKeys = deepCopyElements(keyObjects, keyObjectInspectors,
+ ObjectInspectorCopyOption.WRITABLE);
+ }
newEntryForHashAggr = keysCurrentGroup.add(newDefaultKeys);
}
// Update the aggs
updateAggregations(aggs, row, rowInspector, true, newEntryForHashAggr, null);
- // We can only flush after the updateAggregations is done, or the potentially new entry "aggs"
+ // We can only flush after the updateAggregations is done, or the
+ // potentially new entry "aggs"
// can be flushed out of the hash table.
-
- // Based on user-specified parameters, check if the hash table needs to be flushed.
- // If the grouping key is not the same as reduction key, flushing can only happen at boundaries
- if ((!groupKeyIsNotReduceKey || firstRowInGroup) && shouldBeFlushed(newKeys)) {
+
+ // Based on user-specified parameters, check if the hash table needs to be
+ // flushed.
+ // If the grouping key is not the same as reduction key, flushing can only
+ // happen at boundaries
+ if ((!groupKeyIsNotReduceKey || firstRowInGroup)
+ && shouldBeFlushed(newKeys)) {
flush(false);
}
}
// Non-hash aggregation
- private void processAggr(Object row, ObjectInspector rowInspector, ArrayList<Object> newKeys) throws HiveException {
+ private void processAggr(Object row, ObjectInspector rowInspector,
+ ArrayList<Object> newKeys) throws HiveException {
// Prepare aggs for updating
AggregationBuffer[] aggs = null;
Object[][] lastInvoke = null;
- boolean keysAreEqual = ObjectInspectorUtils.compare(
- newKeys, newKeyObjectInspector,
- currentKeys, currentKeyObjectInspector) == 0;
-
+ boolean keysAreEqual = ObjectInspectorUtils.compare(newKeys,
+ newKeyObjectInspector, currentKeys, currentKeyObjectInspector) == 0;
+
// Forward the current keys if needed for sort-based aggregation
- if (currentKeys != null && !keysAreEqual)
+ if (currentKeys != null && !keysAreEqual) {
forward(currentKeys, aggregations);
-
+ }
+
// Need to update the keys?
if (currentKeys == null || !keysAreEqual) {
if (currentKeys == null) {
currentKeys = new ArrayList<Object>(keyFields.length);
}
- deepCopyElements(keyObjects, keyObjectInspectors, currentKeys, ObjectInspectorCopyOption.WRITABLE);
-
+ deepCopyElements(keyObjects, keyObjectInspectors, currentKeys,
+ ObjectInspectorCopyOption.WRITABLE);
+
// Reset the aggregations
resetAggregations(aggregations);
-
+
// clear parameters in last-invoke
- for(int i=0; i<aggregationsParametersLastInvoke.length; i++)
+ for (int i = 0; i < aggregationsParametersLastInvoke.length; i++) {
aggregationsParametersLastInvoke[i] = null;
+ }
}
-
+
aggs = aggregations;
-
+
lastInvoke = aggregationsParametersLastInvoke;
// Update the aggs
-
+
updateAggregations(aggs, row, rowInspector, false, false, lastInvoke);
}
/**
* Based on user-parameters, should the hash table be flushed.
- * @param newKeys keys for the row under consideration
+ *
+ * @param newKeys
+ * keys for the row under consideration
**/
private boolean shouldBeFlushed(ArrayList<Object> newKeys) {
int numEntries = hashAggregations.size();
- // The fixed size for the aggregation class is already known. Get the variable portion of the size every NUMROWSESTIMATESIZE rows.
+ // The fixed size for the aggregation class is already known. Get the
+ // variable portion of the size every NUMROWSESTIMATESIZE rows.
if ((numEntriesHashTable == 0) || ((numEntries % NUMROWSESTIMATESIZE) == 0)) {
for (Integer pos : keyPositionsSize) {
Object key = newKeys.get(pos.intValue());
// Ignore nulls
if (key != null) {
if (key instanceof String) {
- totalVariableSize += ((String)key).length();
+ totalVariableSize += ((String) key).length();
} else if (key instanceof Text) {
- totalVariableSize += ((Text)key).getLength();
+ totalVariableSize += ((Text) key).getLength();
}
}
}
AggregationBuffer[] aggs = null;
- if (aggrPositions.size() > 0)
+ if (aggrPositions.size() > 0) {
aggs = hashAggregations.get(newKeys);
+ }
for (varLenFields v : aggrPositions) {
- int aggrPos = v.getAggrPos();
+ int aggrPos = v.getAggrPos();
List<Field> fieldsVarLen = v.getFields();
- AggregationBuffer agg = aggs[aggrPos];
+ AggregationBuffer agg = aggs[aggrPos];
- try
- {
- for (Field f : fieldsVarLen)
- totalVariableSize += ((String)f.get(agg)).length();
+ try {
+ for (Field f : fieldsVarLen) {
+ totalVariableSize += ((String) f.get(agg)).length();
+ }
} catch (IllegalAccessException e) {
assert false;
}
@@ -678,24 +771,26 @@
numEntriesVarSize++;
// Update the number of entries that can fit in the hash table
- numEntriesHashTable = (int)(maxHashTblMemory / (fixedRowSize + ((int)totalVariableSize/numEntriesVarSize)));
- LOG.trace("Hash Aggr: #hash table = " + numEntries + " #max in hash table = " + numEntriesHashTable);
+ numEntriesHashTable = (int) (maxHashTblMemory / (fixedRowSize + (totalVariableSize / numEntriesVarSize)));
+ LOG.trace("Hash Aggr: #hash table = " + numEntries
+ + " #max in hash table = " + numEntriesHashTable);
}
// flush if necessary
- if (numEntries >= numEntriesHashTable)
+ if (numEntries >= numEntriesHashTable) {
return true;
+ }
return false;
}
private void flush(boolean complete) throws HiveException {
-
+
// Currently, the algorithm flushes 10% of the entries - this can be
// changed in the future
if (complete) {
- Iterator<Map.Entry<KeyWrapper, AggregationBuffer[]>>
- iter = hashAggregations.entrySet().iterator();
+ Iterator<Map.Entry<KeyWrapper, AggregationBuffer[]>> iter = hashAggregations
+ .entrySet().iterator();
while (iter.hasNext()) {
Map.Entry<KeyWrapper, AggregationBuffer[]> m = iter.next();
forward(m.getKey().keys, m.getValue());
@@ -708,8 +803,8 @@
int oldSize = hashAggregations.size();
LOG.warn("Hash Tbl flush: #hash table = " + oldSize);
- Iterator<Map.Entry<KeyWrapper, AggregationBuffer[]>>
- iter = hashAggregations.entrySet().iterator();
+ Iterator<Map.Entry<KeyWrapper, AggregationBuffer[]>> iter = hashAggregations
+ .entrySet().iterator();
int numDel = 0;
while (iter.hasNext()) {
Map.Entry<KeyWrapper, AggregationBuffer[]> m = iter.next();
@@ -732,24 +827,27 @@
* The keys in the record
* @throws HiveException
*/
- protected void forward(ArrayList<Object> keys, AggregationBuffer[] aggs) throws HiveException {
+ protected void forward(ArrayList<Object> keys, AggregationBuffer[] aggs)
+ throws HiveException {
int totalFields = keys.size() + aggs.length;
if (forwardCache == null) {
forwardCache = new Object[totalFields];
}
- for(int i=0; i<keys.size(); i++) {
+ for (int i = 0; i < keys.size(); i++) {
forwardCache[i] = keys.get(i);
}
- for(int i=0; i<aggs.length; i++) {
- forwardCache[keys.size() + i] = aggregationEvaluators[i].evaluate(aggs[i]);
+ for (int i = 0; i < aggs.length; i++) {
+ forwardCache[keys.size() + i] = aggregationEvaluators[i]
+ .evaluate(aggs[i]);
}
forward(forwardCache, outputObjInspector);
}
-
+
/**
* We need to forward all the aggregations to children.
*
*/
+ @Override
public void closeOp(boolean abort) throws HiveException {
if (!abort) {
try {
@@ -758,38 +856,40 @@
firstRow = false;
// There is no grouping key - simulate a null row
- // This is based on the assumption that a null row is ignored by aggregation functions
- for(int ai=0; ai<aggregations.length; ai++) {
- // Calculate the parameters
+ // This is based on the assumption that a null row is ignored by
+ // aggregation functions
+ for (int ai = 0; ai < aggregations.length; ai++) {
+ // Calculate the parameters
Object[] o = new Object[aggregationParameterFields[ai].length];
- for(int pi=0; pi<aggregationParameterFields[ai].length; pi++) {
+ for (int pi = 0; pi < aggregationParameterFields[ai].length; pi++) {
o[pi] = null;
}
aggregationEvaluators[ai].aggregate(aggregations[ai], o);
}
-
+
// create dummy keys - size 0
forward(new ArrayList<Object>(0), aggregations);
- }
- else {
+ } else {
if (hashAggregations != null) {
- LOG.warn("Begin Hash Table flush at close: size = " + hashAggregations.size());
+ LOG.warn("Begin Hash Table flush at close: size = "
+ + hashAggregations.size());
Iterator iter = hashAggregations.entrySet().iterator();
while (iter.hasNext()) {
- Map.Entry<KeyWrapper, AggregationBuffer[]> m = (Map.Entry)iter.next();
+ Map.Entry<KeyWrapper, AggregationBuffer[]> m = (Map.Entry) iter
+ .next();
forward(m.getKey().keys, m.getValue());
iter.remove();
}
hashAggregations.clear();
- }
- else if (aggregations != null) {
+ } else if (aggregations != null) {
// sort-based aggregations
if (currentKeys != null) {
forward(currentKeys, aggregations);
}
currentKeys = null;
} else {
- // The GroupByOperator is not initialized, which means there is no data
+ // The GroupByOperator is not initialized, which means there is no
+ // data
// (since we initialize the operators when we see the first record).
// Just do nothing here.
}
@@ -802,17 +902,20 @@
}
// Group by contains the columns needed - no need to aggregate from children
- public List<String> genColLists(HashMap<Operator<? extends Serializable>, OpParseContext> opParseCtx) {
+ public List<String> genColLists(
+ HashMap<Operator<? extends Serializable>, OpParseContext> opParseCtx) {
List<String> colLists = new ArrayList<String>();
ArrayList<exprNodeDesc> keys = conf.getKeys();
- for (exprNodeDesc key : keys)
+ for (exprNodeDesc key : keys) {
colLists = Utilities.mergeUniqElems(colLists, key.getCols());
-
+ }
+
ArrayList<aggregationDesc> aggrs = conf.getAggregators();
- for (aggregationDesc aggr : aggrs) {
+ for (aggregationDesc aggr : aggrs) {
ArrayList<exprNodeDesc> params = aggr.getParameters();
- for (exprNodeDesc param : params)
+ for (exprNodeDesc param : params) {
colLists = Utilities.mergeUniqElems(colLists, param.getCols());
+ }
}
return colLists;
@@ -825,7 +928,8 @@
public String getName() {
return new String("GBY");
}
-
+
+ @Override
public int getType() {
return OperatorType.GROUPBY;
}
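
For illustration, the hash-mode path above probes the aggregation HashMap with a reusable KeyWrapper that caches the key list's hash code, and only deep-copies the keys into a fresh wrapper when a new group has to be inserted. A minimal standalone sketch of that probe-then-copy pattern (simplified: the ObjectInspector-based comparison is replaced by plain List.equals, and the class is hypothetical, not Hive code):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;

// Standalone sketch of the KeyWrapper idea: one probe wrapper is reused for
// every row, and keys are copied only when a new group is created so the map
// owns stable copies of its keys.
public class KeyProbeSketch {
  static class KeyWrapper {
    int hashcode;
    List<Object> keys;
    KeyWrapper() {}
    KeyWrapper(int hashcode, List<Object> copiedKeys) {
      this.hashcode = hashcode;
      this.keys = copiedKeys;
    }
    @Override public int hashCode() { return hashcode; }
    @Override public boolean equals(Object o) {
      return keys.equals(((KeyWrapper) o).keys);
    }
  }

  final HashMap<KeyWrapper, long[]> counts = new HashMap<KeyWrapper, long[]>();
  final KeyWrapper prober = new KeyWrapper();        // reused for every row

  void add(List<Object> rowKeys) {
    prober.hashcode = rowKeys.hashCode();
    prober.keys = rowKeys;
    long[] agg = counts.get(prober);                 // probe without copying
    if (agg == null) {
      agg = new long[] { 0 };
      // copy only when a new group is inserted
      counts.put(new KeyWrapper(prober.hashcode, new ArrayList<Object>(rowKeys)), agg);
    }
    agg[0]++;
  }

  public static void main(String[] args) {
    KeyProbeSketch s = new KeyProbeSketch();
    s.add(Arrays.<Object>asList("a"));
    s.add(Arrays.<Object>asList("a"));
    s.add(Arrays.<Object>asList("b"));
    System.out.println(s.counts.size() + " groups"); // prints "2 groups"
  }
}
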
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JobTrackerURLResolver.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JobTrackerURLResolver.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JobTrackerURLResolver.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JobTrackerURLResolver.java Thu Jan 21 10:37:58 2010
@@ -34,10 +34,9 @@
InetSocketAddress infoSocAddr = NetUtils.createSocketAddr(infoAddr);
int infoPort = infoSocAddr.getPort();
- String tracker = "http://" +
- JobTracker.getAddress(conf).getHostName() + ":" +
- infoPort;
-
+ String tracker = "http://" + JobTracker.getAddress(conf).getHostName()
+ + ":" + infoPort;
+
return tracker;
}
}
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java Thu Jan 21 10:37:58 2010
@@ -33,132 +33,148 @@
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-
/**
* Join operator implementation.
*/
-public class JoinOperator extends CommonJoinOperator<joinDesc> implements Serializable {
+public class JoinOperator extends CommonJoinOperator<joinDesc> implements
+ Serializable {
private static final long serialVersionUID = 1L;
-
+
private transient SkewJoinHandler skewJoinKeyContext = null;
-
+
@Override
protected void initializeOp(Configuration hconf) throws HiveException {
super.initializeOp(hconf);
initializeChildren(hconf);
- if(this.handleSkewJoin) {
+ if (handleSkewJoin) {
skewJoinKeyContext = new SkewJoinHandler(this);
skewJoinKeyContext.initiliaze(hconf);
}
}
-
- public void processOp(Object row, int tag)
- throws HiveException {
+
+ @Override
+ public void processOp(Object row, int tag) throws HiveException {
try {
-
+
// get alias
- alias = (byte)tag;
-
- if ((lastAlias == null) || (!lastAlias.equals(alias)))
+ alias = (byte) tag;
+
+ if ((lastAlias == null) || (!lastAlias.equals(alias))) {
nextSz = joinEmitInterval;
-
- ArrayList<Object> nr = computeValues(row, joinValues.get(alias), joinValuesObjectInspectors.get(alias));
-
- if(this.handleSkewJoin)
+ }
+
+ ArrayList<Object> nr = computeValues(row, joinValues.get(alias),
+ joinValuesObjectInspectors.get(alias));
+
+ if (handleSkewJoin) {
skewJoinKeyContext.handleSkew(tag);
-
+ }
+
// number of rows for the key in the given table
int sz = storage.get(alias).size();
-
+
// Are we consuming too much memory
if (alias == numAliases - 1) {
if (sz == joinEmitInterval) {
- // The input is sorted by alias, so if we are already in the last join operand,
+ // The input is sorted by alias, so if we are already in the last join
+ // operand,
// we can emit some results now.
- // Note this has to be done before adding the current row to the storage,
+ // Note this has to be done before adding the current row to the
+ // storage,
// to preserve the correctness for outer joins.
checkAndGenObject();
storage.get(alias).clear();
}
} else {
if (sz == nextSz) {
- // Output a warning if we reached at least 1000 rows for a join operand
+ // Output a warning if we reached at least 1000 rows for a join
+ // operand
// We won't output a warning for the last join operand since the size
// will never goes to joinEmitInterval.
- StructObjectInspector soi = (StructObjectInspector)inputObjInspectors[tag];
- StructField sf = soi.getStructFieldRef(Utilities.ReduceField.KEY.toString());
+ StructObjectInspector soi = (StructObjectInspector) inputObjInspectors[tag];
+ StructField sf = soi.getStructFieldRef(Utilities.ReduceField.KEY
+ .toString());
Object keyObject = soi.getStructFieldData(row, sf);
- LOG.warn("table " + alias + " has " + sz + " rows for join key " + keyObject);
+ LOG.warn("table " + alias + " has " + sz + " rows for join key "
+ + keyObject);
nextSz = getNextSize(nextSz);
}
}
-
+
// Add the value to the vector
storage.get(alias).add(nr);
-
+
} catch (Exception e) {
e.printStackTrace();
throw new HiveException(e);
}
}
-
+
+ @Override
public int getType() {
return OperatorType.JOIN;
}
/**
* All done
- *
+ *
*/
+ @Override
public void closeOp(boolean abort) throws HiveException {
- if (this.handleSkewJoin) {
+ if (handleSkewJoin) {
skewJoinKeyContext.close(abort);
}
super.closeOp(abort);
}
-
+
@Override
- public void jobClose(Configuration hconf, boolean success) throws HiveException {
- if(this.handleSkewJoin) {
+ public void jobClose(Configuration hconf, boolean success)
+ throws HiveException {
+ if (handleSkewJoin) {
try {
for (int i = 0; i < numAliases; i++) {
- String specPath = this.conf.getBigKeysDirMap().get((byte)i);
+ String specPath = conf.getBigKeysDirMap().get((byte) i);
FileSinkOperator.mvFileToFinalPath(specPath, hconf, success, LOG);
for (int j = 0; j < numAliases; j++) {
- if(j == i) continue;
- specPath = getConf().getSmallKeysDirMap().get((byte)i).get((byte)j);
+ if (j == i) {
+ continue;
+ }
+ specPath = getConf().getSmallKeysDirMap().get((byte) i).get(
+ (byte) j);
FileSinkOperator.mvFileToFinalPath(specPath, hconf, success, LOG);
}
}
-
- if(success) {
- //move up files
+
+ if (success) {
+ // move up files
for (int i = 0; i < numAliases; i++) {
- String specPath = this.conf.getBigKeysDirMap().get((byte)i);
+ String specPath = conf.getBigKeysDirMap().get((byte) i);
moveUpFiles(specPath, hconf, LOG);
for (int j = 0; j < numAliases; j++) {
- if(j == i) continue;
- specPath = getConf().getSmallKeysDirMap().get((byte)i).get((byte)j);
+ if (j == i) {
+ continue;
+ }
+ specPath = getConf().getSmallKeysDirMap().get((byte) i).get(
+ (byte) j);
moveUpFiles(specPath, hconf, LOG);
}
}
}
} catch (IOException e) {
- throw new HiveException (e);
+ throw new HiveException(e);
}
}
super.jobClose(hconf, success);
}
-
-
-
- private void moveUpFiles(String specPath, Configuration hconf, Log log) throws IOException, HiveException {
+
+ private void moveUpFiles(String specPath, Configuration hconf, Log log)
+ throws IOException, HiveException {
FileSystem fs = (new Path(specPath)).getFileSystem(hconf);
Path finalPath = new Path(specPath);
-
- if(fs.exists(finalPath)) {
+
+ if (fs.exists(finalPath)) {
FileStatus[] taskOutputDirs = fs.listStatus(finalPath);
- if(taskOutputDirs != null ) {
+ if (taskOutputDirs != null) {
for (FileStatus dir : taskOutputDirs) {
Utilities.renameOrMoveFiles(fs, dir.getPath(), finalPath);
fs.delete(dir.getPath(), true);
@@ -166,27 +182,26 @@
}
}
}
-
+
/**
* Forward a record of join results.
- *
+ *
* @throws HiveException
*/
+ @Override
public void endGroup() throws HiveException {
- //if this is a skew key, we need to handle it in a separate map reduce job.
- if(this.handleSkewJoin && skewJoinKeyContext.currBigKeyTag >=0) {
+ // if this is a skew key, we need to handle it in a separate map reduce job.
+ if (handleSkewJoin && skewJoinKeyContext.currBigKeyTag >= 0) {
try {
skewJoinKeyContext.endGroup();
} catch (IOException e) {
- LOG.error(e.getMessage(),e);
+ LOG.error(e.getMessage(), e);
throw new HiveException(e);
}
return;
- }
- else {
+ } else {
checkAndGenObject();
}
}
-
-}
+}
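
For illustration, JoinOperator.processOp above checks the buffered row count for the last join alias before adding the incoming row, and once joinEmitInterval rows have accumulated it emits partial results and clears that buffer so sorted input can be joined without holding everything in memory. A minimal standalone sketch of that guard (hypothetical class, deliberately small threshold):

import java.util.ArrayList;
import java.util.List;

// Standalone sketch (not Hive code): rows arrive ordered by alias within each
// join key; when the buffer of the last alias reaches the emit interval,
// partial results are emitted and that buffer is cleared *before* the new row
// is added, bounding memory use.
public class JoinBufferSketch {
  static final int JOIN_EMIT_INTERVAL = 3;           // small value for the demo

  final List<List<String>> storage = new ArrayList<List<String>>();
  final int numAliases;

  JoinBufferSketch(int numAliases) {
    this.numAliases = numAliases;
    for (int i = 0; i < numAliases; i++) {
      storage.add(new ArrayList<String>());
    }
  }

  void process(int alias, String row) {
    int sz = storage.get(alias).size();
    if (alias == numAliases - 1 && sz == JOIN_EMIT_INTERVAL) {
      emitPartialResults();                          // stand-in for checkAndGenObject()
      storage.get(alias).clear();
    }
    storage.get(alias).add(row);                     // buffer the new row afterwards
  }

  void emitPartialResults() {
    System.out.println("emit partial results, last-alias buffer = "
        + storage.get(numAliases - 1).size() + " rows");
  }

  public static void main(String[] args) {
    JoinBufferSketch join = new JoinBufferSketch(2);
    join.process(0, "left-row");
    for (int i = 0; i < 7; i++) {
      join.process(1, "right-row-" + i);             // emits when the buffer hits 3
    }
  }
}
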
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/LateralViewJoinOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/LateralViewJoinOperator.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/LateralViewJoinOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/LateralViewJoinOperator.java Thu Jan 21 10:37:58 2010
@@ -18,13 +18,9 @@
package org.apache.hadoop.hive.ql.exec;
-import java.util.HashMap;
-import java.util.List;
import java.util.ArrayList;
-import java.util.Map;
+import java.util.List;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.lateralViewJoinDesc;
@@ -33,39 +29,29 @@
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-
/**
* The lateral view join operator is used to implement the lateral view
- * functionality. This operator was implemented with the following
- * operator DAG in mind. For a query such as
+ * functionality. This operator was implemented with the following operator DAG
+ * in mind. For a query such as
*
- * SELECT pageid, adid.* FROM example_table LATERAL VIEW explode(adid_list) AS adid
+ * SELECT pageid, adid.* FROM example_table LATERAL VIEW explode(adid_list) AS
+ * adid
*
* The top of the operator tree will look similar to
*
- * [Table Scan]
- * / \
- * [Select](*) [Select](adid_list)
- * | |
- * | [UDTF] (explode)
- * \ /
- * [Lateral View Join]
- * |
- * |
- * [Select] (pageid, adid.*)
- * |
- * ....
- *
- * Rows from the table scan operator are first sent to two select operators.
- * The select operator on the left picks all the columns while the select
- * operator on the right picks only the columns needed by the UDTF.
+ * [Table Scan] / \ [Select](*) [Select](adid_list) | | | [UDTF] (explode) \ /
+ * [Lateral View Join] | | [Select] (pageid, adid.*) | ....
*
- * The output of select in the left branch and output of the UDTF in the right
+ * Rows from the table scan operator are first sent to two select operators. The
+ * select operator on the left picks all the columns while the select operator
+ * on the right picks only the columns needed by the UDTF.
+ *
+ * The output of select in the left branch and output of the UDTF in the right
* branch are then sent to the lateral view join (LVJ). In most cases, the UDTF
* will generate > 1 row for every row received from the TS, while the left
- * select operator will generate only one. For each row output from the TS,
- * the LVJ outputs all possible rows that can be created by joining the row from
- * the left select and one of the rows output from the UDTF.
+ * select operator will generate only one. For each row output from the TS, the
+ * LVJ outputs all possible rows that can be created by joining the row from the
+ * left select and one of the rows output from the UDTF.
*
* Additional lateral views can be supported by adding a similar DAG after the
* previous LVJ operator.
@@ -78,41 +64,41 @@
// The expected tags from the parent operators. See processOp() before
// changing the tags.
static final int SELECT_TAG = 0;
- static final int UDTF_TAG = 1;
-
+ static final int UDTF_TAG = 1;
+
@Override
protected void initializeOp(Configuration hconf) throws HiveException {
-
+
ArrayList<ObjectInspector> ois = new ArrayList<ObjectInspector>();
ArrayList<String> fieldNames = conf.getOutputInternalColNames();
// The output of the lateral view join will be the columns from the select
// parent, followed by the column from the UDTF parent
- StructObjectInspector soi =
- (StructObjectInspector) inputObjInspectors[SELECT_TAG];
+ StructObjectInspector soi = (StructObjectInspector) inputObjInspectors[SELECT_TAG];
List<? extends StructField> sfs = soi.getAllStructFieldRefs();
for (StructField sf : sfs) {
ois.add(sf.getFieldObjectInspector());
}
-
+
soi = (StructObjectInspector) inputObjInspectors[UDTF_TAG];
sfs = soi.getAllStructFieldRefs();
for (StructField sf : sfs) {
ois.add(sf.getFieldObjectInspector());
}
- outputObjInspector =
- ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, ois);
+ outputObjInspector = ObjectInspectorFactory
+ .getStandardStructObjectInspector(fieldNames, ois);
// Initialize the rest of the operator DAG
super.initializeOp(hconf);
}
-
+
// acc is short for accumulator. It's used to build the row before forwarding
ArrayList<Object> acc = new ArrayList<Object>();
// selectObjs hold the row from the select op, until receiving a row from
// the udtf op
ArrayList<Object> selectObjs = new ArrayList<Object>();
+
/**
* An important assumption for processOp() is that for a given row from the
* TS, the LVJ will first get the row from the left select operator, followed
@@ -120,7 +106,7 @@
*/
@Override
public void processOp(Object row, int tag) throws HiveException {
- StructObjectInspector soi = (StructObjectInspector)inputObjInspectors[tag];
+ StructObjectInspector soi = (StructObjectInspector) inputObjInspectors[tag];
if (tag == SELECT_TAG) {
selectObjs.clear();
selectObjs.addAll(soi.getStructFieldsDataAsList(row));
@@ -132,7 +118,7 @@
} else {
throw new HiveException("Invalid tag");
}
-
+
}
}
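
For illustration, processOp above relies on the select-branch row arriving before the UDTF rows for the same source row: the select columns are cached in selectObjs, and each UDTF row is appended to that cache and forwarded, yielding one output row per UDTF row. A minimal standalone sketch of that contract, reusing the pageid/adid example from the class comment (hypothetical class, not Hive code):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Standalone sketch of the lateral view join contract: cache the select-branch
// columns, then join every UDTF output row onto them.
public class LateralViewJoinSketch {
  static final int SELECT_TAG = 0;
  static final int UDTF_TAG = 1;

  final List<Object> selectObjs = new ArrayList<Object>();     // cached select row
  final List<List<Object>> output = new ArrayList<List<Object>>();

  void processOp(List<Object> row, int tag) {
    if (tag == SELECT_TAG) {
      selectObjs.clear();
      selectObjs.addAll(row);                        // remember the select columns
    } else if (tag == UDTF_TAG) {
      List<Object> acc = new ArrayList<Object>(selectObjs);
      acc.addAll(row);                               // select columns + UDTF row
      output.add(acc);                               // stand-in for forward()
    } else {
      throw new IllegalArgumentException("Invalid tag " + tag);
    }
  }

  public static void main(String[] args) {
    LateralViewJoinSketch lvj = new LateralViewJoinSketch();
    // one source row: pageid = "front_page", adid_list exploded into two rows
    lvj.processOp(Arrays.<Object>asList("front_page"), SELECT_TAG);
    lvj.processOp(Arrays.<Object>asList(1), UDTF_TAG);
    lvj.processOp(Arrays.<Object>asList(2), UDTF_TAG);
    System.out.println(lvj.output);                  // [[front_page, 1], [front_page, 2]]
  }
}
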
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/LimitOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/LimitOperator.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/LimitOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/LimitOperator.java Thu Jan 21 10:37:58 2010
@@ -26,36 +26,39 @@
import org.apache.hadoop.hive.ql.plan.api.OperatorType;
/**
- * Limit operator implementation
- * Limits the number of rows to be passed on.
+ * Limit operator implementation Limits the number of rows to be passed on.
**/
public class LimitOperator extends Operator<limitDesc> implements Serializable {
private static final long serialVersionUID = 1L;
-
+
transient protected int limit;
transient protected int currCount;
+ @Override
protected void initializeOp(Configuration hconf) throws HiveException {
super.initializeOp(hconf);
limit = conf.getLimit();
currCount = 0;
}
+ @Override
public void processOp(Object row, int tag) throws HiveException {
if (currCount < limit) {
forward(row, inputObjInspectors[tag]);
currCount++;
- }
- else
+ } else {
setDone(true);
+ }
}
-
+
+ @Override
public String getName() {
return "LIM";
}
-
+
+ @Override
public int getType() {
return OperatorType.LIMIT;
}
-
+
}
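
For illustration, the operator above forwards rows while currCount < limit and then calls setDone(true) so no further rows are processed. A minimal standalone sketch of that behaviour (hypothetical class, not Hive code):

// Standalone sketch: forward rows until the limit is reached, then mark the
// operator done so upstream processing can stop feeding it.
public class LimitSketch {
  final int limit;
  int currCount = 0;
  boolean done = false;

  LimitSketch(int limit) { this.limit = limit; }

  boolean process(String row) {
    if (currCount < limit) {
      System.out.println("forward " + row);          // stand-in for forward(row, oi)
      currCount++;
      return true;
    }
    done = true;                                     // stand-in for setDone(true)
    return false;
  }

  public static void main(String[] args) {
    LimitSketch lim = new LimitSketch(2);
    for (String r : new String[] { "r1", "r2", "r3" }) {
      if (!lim.process(r)) break;                    // stops after two rows
    }
  }
}
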
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java Thu Jan 21 10:37:58 2010
@@ -29,6 +29,10 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.persistence.HashMapWrapper;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinObjectKey;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinObjectValue;
+import org.apache.hadoop.hive.ql.exec.persistence.RowContainer;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.mapJoinDesc;
import org.apache.hadoop.hive.ql.plan.tableDesc;
@@ -42,17 +46,15 @@
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hadoop.hive.ql.exec.persistence.MapJoinObjectKey;
-import org.apache.hadoop.hive.ql.exec.persistence.MapJoinObjectValue;
-import org.apache.hadoop.hive.ql.exec.persistence.HashMapWrapper;
-import org.apache.hadoop.hive.ql.exec.persistence.RowContainer;
/**
* Map side Join operator implementation.
*/
-public class MapJoinOperator extends CommonJoinOperator<mapJoinDesc> implements Serializable {
+public class MapJoinOperator extends CommonJoinOperator<mapJoinDesc> implements
+ Serializable {
private static final long serialVersionUID = 1L;
- static final private Log LOG = LogFactory.getLog(MapJoinOperator.class.getName());
+ static final private Log LOG = LogFactory.getLog(MapJoinOperator.class
+ .getName());
/**
* The expressions for join inputs's join keys.
@@ -67,21 +69,23 @@
*/
transient protected Map<Byte, List<ObjectInspector>> joinKeysStandardObjectInspectors;
- transient private int posBigTable; // one of the tables that is not in memory
- transient int mapJoinRowsKey; // rows for a given key
+ transient private int posBigTable; // one of the tables that is not in memory
+ transient int mapJoinRowsKey; // rows for a given key
transient protected Map<Byte, HashMapWrapper<MapJoinObjectKey, MapJoinObjectValue>> mapJoinTables;
-
+
transient protected RowContainer<ArrayList<Object>> emptyList = null;
-
- transient static final private String[] fatalErrMsg = {
- null, // counter value 0 means no error
- "Mapside join size exceeds hive.mapjoin.maxsize. Please increase that or remove the mapjoin hint." // counter value 1
+
+ transient static final private String[] fatalErrMsg = {
+ null, // counter value 0 means no error
+ "Mapside join size exceeds hive.mapjoin.maxsize. Please increase that or remove the mapjoin hint." // counter
+ // value
+ // 1
};
-
+
public static class MapJoinObjectCtx {
ObjectInspector standardOI;
- SerDe serde;
+ SerDe serde;
tableDesc tblDesc;
Configuration conf;
@@ -89,7 +93,8 @@
* @param standardOI
* @param serde
*/
- public MapJoinObjectCtx(ObjectInspector standardOI, SerDe serde, tableDesc tblDesc, Configuration conf) {
+ public MapJoinObjectCtx(ObjectInspector standardOI, SerDe serde,
+ tableDesc tblDesc, Configuration conf) {
this.standardOI = standardOI;
this.serde = serde;
this.tblDesc = tblDesc;
@@ -129,12 +134,12 @@
transient boolean firstRow;
- transient int metadataKeyTag;
+ transient int metadataKeyTag;
transient int[] metadataValueTag;
transient List<File> hTables;
- transient int numMapRowsRead;
- transient int heartbeatInterval;
- transient int maxMapJoinSize;
+ transient int numMapRowsRead;
+ transient int heartbeatInterval;
+ transient int maxMapJoinSize;
@Override
protected void initializeOp(Configuration hconf) throws HiveException {
@@ -142,44 +147,53 @@
numMapRowsRead = 0;
firstRow = true;
- heartbeatInterval = HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVESENDHEARTBEAT);
- maxMapJoinSize = HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEMAXMAPJOINSIZE);
-
- joinKeys = new HashMap<Byte, List<ExprNodeEvaluator>>();
-
+ heartbeatInterval = HiveConf.getIntVar(hconf,
+ HiveConf.ConfVars.HIVESENDHEARTBEAT);
+ maxMapJoinSize = HiveConf.getIntVar(hconf,
+ HiveConf.ConfVars.HIVEMAXMAPJOINSIZE);
+
+ joinKeys = new HashMap<Byte, List<ExprNodeEvaluator>>();
+
populateJoinKeyValue(joinKeys, conf.getKeys());
- joinKeysObjectInspectors = getObjectInspectorsFromEvaluators(joinKeys, inputObjInspectors);
+ joinKeysObjectInspectors = getObjectInspectorsFromEvaluators(joinKeys,
+ inputObjInspectors);
joinKeysStandardObjectInspectors = getStandardObjectInspectors(joinKeysObjectInspectors);
-
+
// all other tables are small, and are cached in the hash table
posBigTable = conf.getPosBigTable();
-
+
metadataValueTag = new int[numAliases];
- for (int pos = 0; pos < numAliases; pos++)
+ for (int pos = 0; pos < numAliases; pos++) {
metadataValueTag[pos] = -1;
-
+ }
+
mapJoinTables = new HashMap<Byte, HashMapWrapper<MapJoinObjectKey, MapJoinObjectValue>>();
hTables = new ArrayList<File>();
-
+
// initialize the hash tables for other tables
for (int pos = 0; pos < numAliases; pos++) {
- if (pos == posBigTable)
+ if (pos == posBigTable) {
continue;
-
- int cacheSize = HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEMAPJOINCACHEROWS);
- HashMapWrapper<MapJoinObjectKey, MapJoinObjectValue> hashTable =
- new HashMapWrapper<MapJoinObjectKey, MapJoinObjectValue>(cacheSize);
-
- mapJoinTables.put(Byte.valueOf((byte)pos), hashTable);
+ }
+
+ int cacheSize = HiveConf.getIntVar(hconf,
+ HiveConf.ConfVars.HIVEMAPJOINCACHEROWS);
+ HashMapWrapper<MapJoinObjectKey, MapJoinObjectValue> hashTable = new HashMapWrapper<MapJoinObjectKey, MapJoinObjectValue>(
+ cacheSize);
+
+ mapJoinTables.put(Byte.valueOf((byte) pos), hashTable);
}
-
+
emptyList = new RowContainer<ArrayList<Object>>(1, hconf);
- RowContainer bigPosRC = getRowContainer(hconf, (byte)posBigTable, order[posBigTable], joinCacheSize);
- storage.put((byte)posBigTable, bigPosRC);
-
- mapJoinRowsKey = HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEMAPJOINROWSIZE);
-
- List<? extends StructField> structFields = ((StructObjectInspector)outputObjInspector).getAllStructFieldRefs();
+ RowContainer bigPosRC = getRowContainer(hconf, (byte) posBigTable,
+ order[posBigTable], joinCacheSize);
+ storage.put((byte) posBigTable, bigPosRC);
+
+ mapJoinRowsKey = HiveConf.getIntVar(hconf,
+ HiveConf.ConfVars.HIVEMAPJOINROWSIZE);
+
+ List<? extends StructField> structFields = ((StructObjectInspector) outputObjInspector)
+ .getAllStructFieldRefs();
if (conf.getOutputColumnNames().size() < structFields.size()) {
List<ObjectInspector> structFieldObjectInspectors = new ArrayList<ObjectInspector>();
for (Byte alias : order) {
@@ -188,34 +202,37 @@
for (int i = 0; i < sz; i++) {
int pos = retained.get(i);
structFieldObjectInspectors.add(structFields.get(pos)
- .getFieldObjectInspector());
+ .getFieldObjectInspector());
}
}
outputObjInspector = ObjectInspectorFactory
- .getStandardStructObjectInspector(conf.getOutputColumnNames(),
- structFieldObjectInspectors);
+ .getStandardStructObjectInspector(conf.getOutputColumnNames(),
+ structFieldObjectInspectors);
}
initializeChildren(hconf);
}
-
+
@Override
protected void fatalErrorMessage(StringBuffer errMsg, long counterCode) {
- errMsg.append("Operator " + getOperatorId() + " (id=" + id + "): " +
- fatalErrMsg[(int)counterCode]);
+ errMsg.append("Operator " + getOperatorId() + " (id=" + id + "): "
+ + fatalErrMsg[(int) counterCode]);
}
-
+
@Override
public void processOp(Object row, int tag) throws HiveException {
try {
// get alias
- alias = (byte)tag;
-
- if ((lastAlias == null) || (!lastAlias.equals(alias)))
+ alias = (byte) tag;
+
+ if ((lastAlias == null) || (!lastAlias.equals(alias))) {
nextSz = joinEmitInterval;
-
+ }
+
// compute keys and values as StandardObjects
- ArrayList<Object> key = computeValues(row, joinKeys.get(alias), joinKeysObjectInspectors.get(alias));
- ArrayList<Object> value = computeValues(row, joinValues.get(alias), joinValuesObjectInspectors.get(alias));
+ ArrayList<Object> key = computeValues(row, joinKeys.get(alias),
+ joinKeysObjectInspectors.get(alias));
+ ArrayList<Object> value = computeValues(row, joinValues.get(alias),
+ joinValuesObjectInspectors.get(alias));
// does this source need to be stored in the hash map
if (tag != posBigTable) {
@@ -223,79 +240,90 @@
metadataKeyTag = nextVal++;
tableDesc keyTableDesc = conf.getKeyTblDesc();
- SerDe keySerializer = (SerDe)ReflectionUtils.newInstance(keyTableDesc.getDeserializerClass(), null);
+ SerDe keySerializer = (SerDe) ReflectionUtils.newInstance(
+ keyTableDesc.getDeserializerClass(), null);
keySerializer.initialize(null, keyTableDesc.getProperties());
mapMetadata.put(Integer.valueOf(metadataKeyTag),
new MapJoinObjectCtx(
- ObjectInspectorUtils.getStandardObjectInspector(keySerializer.getObjectInspector(),
- ObjectInspectorCopyOption.WRITABLE),
- keySerializer, keyTableDesc, hconf));
+ ObjectInspectorUtils
+ .getStandardObjectInspector(keySerializer
+ .getObjectInspector(),
+ ObjectInspectorCopyOption.WRITABLE), keySerializer,
+ keyTableDesc, hconf));
firstRow = false;
}
// Send some status periodically
numMapRowsRead++;
- if (((numMapRowsRead % heartbeatInterval) == 0) && (reporter != null))
+ if (((numMapRowsRead % heartbeatInterval) == 0) && (reporter != null)) {
reporter.progress();
-
- if ( (numMapRowsRead > maxMapJoinSize) && (reporter != null) && (counterNameToEnum != null)) {
+ }
+
+ if ((numMapRowsRead > maxMapJoinSize) && (reporter != null)
+ && (counterNameToEnum != null)) {
// update counter
- LOG.warn("Too many rows in map join tables. Fatal error counter will be incremented!!");
+ LOG
+ .warn("Too many rows in map join tables. Fatal error counter will be incremented!!");
incrCounter(fatalErrorCntr, 1);
fatalError = true;
return;
}
-
- HashMapWrapper<MapJoinObjectKey, MapJoinObjectValue> hashTable = mapJoinTables.get(alias);
+ HashMapWrapper<MapJoinObjectKey, MapJoinObjectValue> hashTable = mapJoinTables
+ .get(alias);
MapJoinObjectKey keyMap = new MapJoinObjectKey(metadataKeyTag, key);
MapJoinObjectValue o = hashTable.get(keyMap);
RowContainer res = null;
boolean needNewKey = true;
if (o == null) {
- res = getRowContainer(this.hconf, (byte)tag, order[tag], joinCacheSize);
- res.add(value);
+ res = getRowContainer(hconf, (byte) tag, order[tag], joinCacheSize);
+ res.add(value);
} else {
res = o.getObj();
res.add(value);
- // If key already exists, HashMapWrapper.get() guarantees it is already in main memory HashMap
- // cache. So just replacing the object value should update the HashMapWrapper. This will save
- // the cost of constructing the new key/object and deleting old one and inserting the new one.
- if ( hashTable.cacheSize() > 0) {
+ // If key already exists, HashMapWrapper.get() guarantees it is
+ // already in main memory HashMap
+ // cache. So just replacing the object value should update the
+ // HashMapWrapper. This will save
+ // the cost of constructing the new key/object and deleting old one
+ // and inserting the new one.
+ if (hashTable.cacheSize() > 0) {
o.setObj(res);
needNewKey = false;
- }
+ }
}
-
-
+
if (metadataValueTag[tag] == -1) {
metadataValueTag[tag] = nextVal++;
-
+
tableDesc valueTableDesc = conf.getValueTblDescs().get(tag);
- SerDe valueSerDe = (SerDe)ReflectionUtils.newInstance(valueTableDesc.getDeserializerClass(), null);
+ SerDe valueSerDe = (SerDe) ReflectionUtils.newInstance(valueTableDesc
+ .getDeserializerClass(), null);
valueSerDe.initialize(null, valueTableDesc.getProperties());
-
+
mapMetadata.put(Integer.valueOf(metadataValueTag[tag]),
- new MapJoinObjectCtx(
- ObjectInspectorUtils.getStandardObjectInspector(valueSerDe.getObjectInspector(),
- ObjectInspectorCopyOption.WRITABLE),
- valueSerDe, valueTableDesc, hconf));
+ new MapJoinObjectCtx(ObjectInspectorUtils
+ .getStandardObjectInspector(valueSerDe.getObjectInspector(),
+ ObjectInspectorCopyOption.WRITABLE), valueSerDe,
+ valueTableDesc, hconf));
}
-
+
// Construct externalizable objects for key and value
- if ( needNewKey ) {
+ if (needNewKey) {
MapJoinObjectKey keyObj = new MapJoinObjectKey(metadataKeyTag, key);
- MapJoinObjectValue valueObj = new MapJoinObjectValue(metadataValueTag[tag], res);
- valueObj.setConf(this.hconf);
+ MapJoinObjectValue valueObj = new MapJoinObjectValue(
+ metadataValueTag[tag], res);
+ valueObj.setConf(hconf);
// This may potentially increase the size of the hashmap on the mapper
- if (res.size() > mapJoinRowsKey) {
- if ( res.size() % 100 == 0 ) {
- LOG.warn("Number of values for a given key " + keyObj + " are " + res.size());
+ if (res.size() > mapJoinRowsKey) {
+ if (res.size() % 100 == 0) {
+ LOG.warn("Number of values for a given key " + keyObj + " are "
+ + res.size());
LOG.warn("used memory " + Runtime.getRuntime().totalMemory());
- }
+ }
}
hashTable.put(keyObj, valueObj);
}
@@ -308,15 +336,15 @@
for (Byte pos : order) {
if (pos.intValue() != tag) {
MapJoinObjectKey keyMap = new MapJoinObjectKey(metadataKeyTag, key);
- MapJoinObjectValue o = (MapJoinObjectValue)mapJoinTables.get(pos).get(keyMap);
+ MapJoinObjectValue o = mapJoinTables.get(pos).get(keyMap);
if (o == null) {
- if(noOuterJoin)
+ if (noOuterJoin) {
storage.put(pos, emptyList);
- else
+ } else {
storage.put(pos, dummyObjVectors[pos.intValue()]);
- }
- else {
+ }
+ } else {
storage.put(pos, o.getObj());
}
}
@@ -328,30 +356,37 @@
// done with the row
storage.get(alias).clear();
- for (Byte pos : order)
- if (pos.intValue() != tag)
+ for (Byte pos : order) {
+ if (pos.intValue() != tag) {
storage.put(pos, null);
+ }
+ }
} catch (SerDeException e) {
e.printStackTrace();
throw new HiveException(e);
}
}
-
+
+ @Override
public void closeOp(boolean abort) throws HiveException {
- for (HashMapWrapper hashTable: mapJoinTables.values()) {
+ for (HashMapWrapper hashTable : mapJoinTables.values()) {
hashTable.close();
}
super.closeOp(abort);
}
+
/**
* Implements the getName function for the Node Interface.
+ *
* @return the name of the operator
*/
+ @Override
public String getName() {
return "MAPJOIN";
}
+ @Override
public int getType() {
return OperatorType.MAPJOIN;
}
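
The processOp() hunks above build per-alias hash tables for the small join inputs: rows are grouped by join key, and, as the long comment notes, when a key is already present and HashMapWrapper's in-memory cache holds it, the existing row container is extended in place rather than deleted and re-inserted. A sketch of that build/probe pattern, with a plain HashMap standing in for Hive's HashMapWrapper (which additionally spills to disk once its cache fills):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Sketch of the map-join build step: small-table rows grouped by join key,
    // with in-place appends when the key already exists.
    class MapJoinBuildSketch {
      private final Map<List<Object>, List<List<Object>>> hashTable =
          new HashMap<List<Object>, List<List<Object>>>();

      void addSmallTableRow(List<Object> key, List<Object> value) {
        List<List<Object>> rows = hashTable.get(key);
        if (rows == null) {
          // First row for this key: create the container and insert it once.
          rows = new ArrayList<List<Object>>();
          rows.add(value);
          hashTable.put(key, rows);
        } else {
          // Key already present: append in place, no delete/re-insert needed.
          rows.add(value);
        }
      }

      // Probe side: big-table rows look up their matches by join key.
      List<List<Object>> probe(List<Object> key) {
        List<List<Object>> rows = hashTable.get(key);
        return rows == null ? new ArrayList<List<Object>>() : rows;
      }
    }
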