You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by na...@apache.org on 2009/06/19 02:41:30 UTC
svn commit: r786343 - in /hadoop/hive/trunk: ./
ql/src/java/org/apache/hadoop/hive/ql/exec/
serde/src/java/org/apache/hadoop/hive/serde2/lazy/
serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/
Author: namit
Date: Fri Jun 19 00:41:29 2009
New Revision: 786343
URL: http://svn.apache.org/viewvc?rev=786343&view=rev
Log:
HIVE-529. Some cleanup for join operator
(Zheng Shao via namit)
Modified:
hadoop/hive/trunk/CHANGES.txt
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinObjectKey.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinObjectValue.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
hadoop/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyUtils.java
hadoop/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java
Modified: hadoop/hive/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/CHANGES.txt?rev=786343&r1=786342&r2=786343&view=diff
==============================================================================
--- hadoop/hive/trunk/CHANGES.txt (original)
+++ hadoop/hive/trunk/CHANGES.txt Fri Jun 19 00:41:29 2009
@@ -241,6 +241,9 @@
HIVE-561. Make hash aggregation threshold configurable
(Zheng Shao via namit)
+ HIVE-529. Some cleanup for join operator
+ (Zheng Shao via namit)
+
Release 0.3.1 - Unreleased
INCOMPATIBLE CHANGES
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java?rev=786343&r1=786342&r2=786343&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java Fri Jun 19 00:41:29 2009
@@ -27,6 +27,7 @@
import java.util.Set;
import java.util.Stack;
import java.util.Vector;
+import java.util.Map.Entry;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -39,14 +40,14 @@
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
-import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
import org.apache.hadoop.mapred.Reporter;
/**
* Join operator implementation.
*/
-public class CommonJoinOperator<T extends joinDesc> extends Operator<T> implements Serializable {
+public abstract class CommonJoinOperator<T extends joinDesc> extends Operator<T> implements Serializable {
private static final long serialVersionUID = 1L;
static final protected Log LOG = LogFactory.getLog(CommonJoinOperator.class.getName());
@@ -76,9 +77,19 @@
}
}
- transient protected int numValues; // number of aliases
- transient protected Map<Byte, List<ExprNodeEvaluator>> joinValues;
- transient protected Map<Byte, List<ObjectInspector>> joinValuesObjectInspectors;
+ transient protected int numAliases; // number of aliases
+ /**
+ * The expressions for join outputs.
+ */
+ transient protected Map<Byte, List<ExprNodeEvaluator>> joinValues;
+ /**
+ * The ObjectInspectors for the join inputs.
+ */
+ transient protected Map<Byte, List<ObjectInspector>> joinValuesObjectInspectors;
+ /**
+ * The standard ObjectInspectors for the join inputs.
+ */
+ transient protected Map<Byte, List<ObjectInspector>> joinValuesStandardObjectInspectors;
transient static protected Byte[] order; // order in which the results should be output
transient protected joinCond[] condn;
@@ -89,7 +100,7 @@
transient private Vector<ArrayList<Object>>[] dummyObjVectors;
transient private Stack<Iterator<ArrayList<Object>>> iterators;
transient protected int totalSz; // total size of the composite object
- transient ObjectInspector joinOutputObjectInspector;
+ transient ObjectInspector joinOutputObjectInspector; // The OI for the output row
// keys are the column names. basically this maps the position of the column in
// the output of the CommonJoinOperator to the input columnInfo.
@@ -125,20 +136,65 @@
return total;
}
+ protected static HashMap<Byte, List<ObjectInspector>> getObjectInspectorsFromEvaluators(
+ Map<Byte, List<ExprNodeEvaluator>> exprEntries, ObjectInspector[] inputObjInspector)
+ throws HiveException {
+ HashMap<Byte, List<ObjectInspector>> result = new HashMap<Byte, List<ObjectInspector>>();
+ for(Entry<Byte, List<ExprNodeEvaluator>> exprEntry : exprEntries.entrySet()) {
+ Byte alias = exprEntry.getKey();
+ List<ExprNodeEvaluator> exprList = exprEntry.getValue();
+ ArrayList<ObjectInspector> fieldOIList = new ArrayList<ObjectInspector>();
+ for (int i=0; i<exprList.size(); i++) {
+ fieldOIList.add(exprList.get(i).initialize(inputObjInspector[alias]));
+ }
+ result.put(alias, fieldOIList);
+ }
+ return result;
+ }
+
+ protected static HashMap<Byte, List<ObjectInspector>> getStandardObjectInspectors(
+ Map<Byte, List<ObjectInspector>> aliasToObjectInspectors) {
+ HashMap<Byte, List<ObjectInspector>> result = new HashMap<Byte, List<ObjectInspector>>();
+ for(Entry<Byte, List<ObjectInspector>> oiEntry: aliasToObjectInspectors.entrySet()) {
+ Byte alias = oiEntry.getKey();
+ List<ObjectInspector> oiList = oiEntry.getValue();
+ ArrayList<ObjectInspector> fieldOIList = new ArrayList<ObjectInspector>(oiList.size());
+ for (int i=0; i<oiList.size(); i++) {
+ fieldOIList.add(ObjectInspectorUtils.getStandardObjectInspector(oiList.get(i),
+ ObjectInspectorCopyOption.WRITABLE));
+ }
+ result.put(alias, fieldOIList);
+ }
+ return result;
+
+ }
+
+ protected static <T extends joinDesc> ObjectInspector getJoinOutputObjectInspector(Byte[] order,
+ Map<Byte, List<ObjectInspector>> aliasToObjectInspectors, T conf) {
+ ArrayList<ObjectInspector> structFieldObjectInspectors = new ArrayList<ObjectInspector>();
+ for (Byte alias : order) {
+ List<ObjectInspector> oiList = aliasToObjectInspectors.get(alias);
+ structFieldObjectInspectors.addAll(oiList);
+ }
+
+ StructObjectInspector joinOutputObjectInspector = ObjectInspectorFactory
+ .getStandardStructObjectInspector(conf.getOutputColumnNames(), structFieldObjectInspectors);
+ return joinOutputObjectInspector;
+ }
+
public void initializeOp(Configuration hconf, Reporter reporter, ObjectInspector[] inputObjInspector) throws HiveException {
LOG.info("COMMONJOIN " + ((StructObjectInspector)inputObjInspector[0]).getTypeName());
totalSz = 0;
// Map that contains the rows for each alias
storage = new HashMap<Byte, Vector<ArrayList<Object>>>();
- numValues = conf.getExprs().size();
+ numAliases = conf.getExprs().size();
joinValues = new HashMap<Byte, List<ExprNodeEvaluator>>();
- joinValuesObjectInspectors = new HashMap<Byte, List<ObjectInspector>>();
if (order == null) {
- order = new Byte[numValues];
- for (int i = 0; i < numValues; i++)
+ order = new Byte[numAliases];
+ for (int i = 0; i < numAliases; i++)
order[i] = (byte) i;
}
condn = conf.getConds();
@@ -146,8 +202,11 @@
totalSz = populateJoinKeyValue(joinValues, conf.getExprs());
- dummyObj = new Object[numValues];
- dummyObjVectors = new Vector[numValues];
+ joinValuesObjectInspectors = getObjectInspectorsFromEvaluators(joinValues, inputObjInspector);
+ joinValuesStandardObjectInspectors = getStandardObjectInspectors(joinValuesObjectInspectors);
+
+ dummyObj = new Object[numAliases];
+ dummyObjVectors = new Vector[numAliases];
int pos = 0;
for (Byte alias : order) {
@@ -168,6 +227,9 @@
joinEmitInterval = HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEJOINEMITINTERVAL);
forwardCache = new Object[totalSz];
+
+ joinOutputObjectInspector = getJoinOutputObjectInspector(order, joinValuesStandardObjectInspectors, conf);
+ LOG.info("JOIN " + ((StructObjectInspector)joinOutputObjectInspector).getTypeName() + " totalsz = " + totalSz);
}
public void startGroup() throws HiveException {
@@ -177,7 +239,7 @@
storage.put(alias, new Vector<ArrayList<Object>>());
}
- private int getNextSize(int sz) {
+ protected int getNextSize(int sz) {
// A very simple counter to keep track of join entries for a key
if (sz >= 100000)
return sz + 100000;
@@ -187,83 +249,31 @@
transient protected Byte alias;
- protected ArrayList<Object> computeValues(Object row, ObjectInspector rowInspector,
- List<ExprNodeEvaluator> valueFields, Map<Byte, List<ObjectInspector>> joinExprsObjectInspectors) throws HiveException {
-
- // Get the valueFields Object Inspectors
- List<ObjectInspector> valueFieldOI = joinExprsObjectInspectors.get(alias);
- if (valueFieldOI == null) {
- // Initialize the ExprEvaluator if necessary
- valueFieldOI = new ArrayList<ObjectInspector>();
- for (int i=0; i<valueFields.size(); i++) {
- valueFieldOI.add(valueFields.get(i).initialize(rowInspector));
- }
- joinExprsObjectInspectors.put(alias, valueFieldOI);
- }
+ /**
+ * Return the value as a standard object.
+ * StandardObject can be inspected by a standard ObjectInspector.
+ */
+ protected static ArrayList<Object> computeValues(Object row,
+ List<ExprNodeEvaluator> valueFields, List<ObjectInspector> valueFieldsOI) throws HiveException {
// Compute the values
ArrayList<Object> nr = new ArrayList<Object>(valueFields.size());
for (int i=0; i<valueFields.size(); i++) {
nr.add(ObjectInspectorUtils.copyToStandardObject(
valueFields.get(i).evaluate(row),
- valueFieldOI.get(i)));
+ valueFieldsOI.get(i),
+ ObjectInspectorCopyOption.WRITABLE));
}
return nr;
}
- public void process(Object row, ObjectInspector rowInspector, int tag)
- throws HiveException {
- try {
- // get alias
- alias = (byte)tag;
-
- if ((lastAlias == null) || (!lastAlias.equals(alias)))
- nextSz = joinEmitInterval;
-
- ArrayList<Object> nr = computeValues(row, rowInspector, joinValues.get(alias), joinValuesObjectInspectors);
-
- // number of rows for the key in the given table
- int sz = storage.get(alias).size();
-
- // Are we consuming too much memory
- if (alias == numValues - 1) {
- if (sz == joinEmitInterval) {
- // The input is sorted by alias, so if we are already in the last join operand,
- // we can emit some results now.
- // Note this has to be done before adding the current row to the storage,
- // to preserve the correctness for outer joins.
- checkAndGenObject();
- storage.get(alias).clear();
- }
- } else {
- if (sz == nextSz) {
- // Output a warning if we reached at least 1000 rows for a join operand
- // We won't output a warning for the last join operand since the size
- // will never goes to joinEmitInterval.
- StructObjectInspector soi = (StructObjectInspector)rowInspector;
- StructField sf = soi.getStructFieldRef(Utilities.ReduceField.KEY.toString());
- Object keyObject = soi.getStructFieldData(row, sf);
- LOG.warn("table " + alias + " has " + sz + " rows for join key " + keyObject);
- nextSz = getNextSize(nextSz);
- }
- }
-
- // Add the value to the vector
- storage.get(alias).add(nr);
-
- } catch (Exception e) {
- e.printStackTrace();
- throw new HiveException(e);
- }
- }
-
transient Object[] forwardCache;
private void createForwardJoinObject(IntermediateObject intObj,
boolean[] nullsArr) throws HiveException {
int p = 0;
- for (int i = 0; i < numValues; i++) {
+ for (int i = 0; i < numAliases; i++) {
Byte alias = order[i];
int sz = joinValues.get(alias).size();
if (nullsArr[i]) {
@@ -500,7 +510,7 @@
private void genObject(Vector<boolean[]> inputNulls, int aliasNum,
IntermediateObject intObj, boolean firstRow) throws HiveException {
boolean childFirstRow = firstRow;
- if (aliasNum < numValues) {
+ if (aliasNum < numAliases) {
Iterator<ArrayList<Object>> aliasRes = storage.get(order[aliasNum])
.iterator();
iterators.push(aliasRes);
@@ -531,13 +541,13 @@
* @throws HiveException
*/
public void endGroup() throws HiveException {
- LOG.trace("Join Op: endGroup called: numValues=" + numValues);
+ LOG.trace("Join Op: endGroup called: numValues=" + numAliases);
checkAndGenObject();
}
protected void checkAndGenObject() throws HiveException {
// does any result need to be emitted
- for (int i = 0; i < numValues; i++) {
+ for (int i = 0; i < numAliases; i++) {
Byte alias = order[i];
if (storage.get(alias).iterator().hasNext() == false) {
if (noOuterJoin) {
@@ -550,7 +560,7 @@
}
LOG.trace("calling genObject");
- genObject(null, 0, new IntermediateObject(new ArrayList[numValues], 0), true);
+ genObject(null, 0, new IntermediateObject(new ArrayList[numAliases], 0), true);
LOG.trace("called genObject");
}
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java?rev=786343&r1=786342&r2=786343&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java Fri Jun 19 00:41:29 2009
@@ -28,6 +28,7 @@
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
@@ -48,22 +49,55 @@
ArrayList<ObjectInspector> structFieldObjectInspectors = new ArrayList<ObjectInspector>(totalSz);
- for (Byte alias : order) {
- int sz = conf.getExprs().get(alias).size();
- StructObjectInspector fldObjIns = (StructObjectInspector)((StructObjectInspector)inputObjInspector[alias.intValue()]).getStructFieldRef("VALUE").getFieldObjectInspector();
- for (int i = 0; i < sz; i++) {
- structFieldObjectInspectors.add(
- ObjectInspectorUtils.getStandardObjectInspector(
- fldObjIns.getAllStructFieldRefs().get(i).getFieldObjectInspector(),
- ObjectInspectorCopyOption.KEEP));
+ initializeChildren(hconf, reporter, new ObjectInspector[]{joinOutputObjectInspector});
+ }
+
+ public void process(Object row, ObjectInspector rowInspector, int tag)
+ throws HiveException {
+ try {
+ // get alias
+ alias = (byte)tag;
+
+ if ((lastAlias == null) || (!lastAlias.equals(alias)))
+ nextSz = joinEmitInterval;
+
+ ArrayList<Object> nr = computeValues(row, joinValues.get(alias), joinValuesObjectInspectors.get(alias));
+
+ // number of rows for the key in the given table
+ int sz = storage.get(alias).size();
+
+ // Are we consuming too much memory
+ if (alias == numAliases - 1) {
+ if (sz == joinEmitInterval) {
+ // The input is sorted by alias, so if we are already in the last join operand,
+ // we can emit some results now.
+ // Note this has to be done before adding the current row to the storage,
+ // to preserve the correctness for outer joins.
+ checkAndGenObject();
+ storage.get(alias).clear();
+ }
+ } else {
+ if (sz == nextSz) {
+ // Output a warning if we reached at least 1000 rows for a join operand
+ // We won't output a warning for the last join operand since the size
+ // will never go to joinEmitInterval.
+ StructObjectInspector soi = (StructObjectInspector)rowInspector;
+ StructField sf = soi.getStructFieldRef(Utilities.ReduceField.KEY.toString());
+ Object keyObject = soi.getStructFieldData(row, sf);
+ LOG.warn("table " + alias + " has " + sz + " rows for join key " + keyObject);
+ nextSz = getNextSize(nextSz);
+ }
}
- }
- joinOutputObjectInspector = ObjectInspectorFactory
- .getStandardStructObjectInspector(conf.getOutputColumnNames(), structFieldObjectInspectors);
- LOG.info("JOIN " + ((StructObjectInspector)joinOutputObjectInspector).getTypeName() + " totalsz = " + totalSz);
-
- initializeChildren(hconf, reporter, new ObjectInspector[]{joinOutputObjectInspector});
+ // Add the value to the vector
+ storage.get(alias).add(nr);
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new HiveException(e);
+ }
}
+
+
}
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinObjectKey.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinObjectKey.java?rev=786343&r1=786342&r2=786343&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinObjectKey.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinObjectKey.java Fri Jun 19 00:41:29 2009
@@ -25,6 +25,9 @@
import java.util.ArrayList;
import org.apache.hadoop.hive.ql.exec.MapJoinOperator.MapJoinObjectCtx;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Writable;
@@ -79,9 +82,14 @@
// get the tableDesc from the map stored in the mapjoin operator
MapJoinObjectCtx ctx = MapJoinOperator.getMapMetadata().get(Integer.valueOf(metadataTag));
val.readFields(in);
- obj = (ArrayList<Object>)ctx.getDeserializer().deserialize(val);
- } catch (Exception e) {
- throw new IOException(e.getMessage());
+ obj =
+ (ArrayList<Object>)
+ ObjectInspectorUtils.copyToStandardObject(
+ ctx.getSerDe().deserialize(val),
+ ctx.getSerDe().getObjectInspector(),
+ ObjectInspectorCopyOption.WRITABLE);
+ } catch (SerDeException e) {
+ throw new IOException(e);
}
}
@@ -94,11 +102,11 @@
MapJoinObjectCtx ctx = MapJoinOperator.getMapMetadata().get(Integer.valueOf(metadataTag));
// Different processing for key and value
- Writable outVal = ctx.getSerializer().serialize(obj, ctx.getSerObjInspector());
+ Writable outVal = ctx.getSerDe().serialize(obj, ctx.getStandardOI());
outVal.write(out);
}
- catch (Exception e) {
- throw new IOException(e.getMessage());
+ catch (SerDeException e) {
+ throw new IOException(e);
}
}
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinObjectValue.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinObjectValue.java?rev=786343&r1=786342&r2=786343&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinObjectValue.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinObjectValue.java Fri Jun 19 00:41:29 2009
@@ -29,10 +29,13 @@
import org.apache.hadoop.hive.ql.exec.MapJoinOperator.MapJoinObjectCtx;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.lazy.LazyObject;
import org.apache.hadoop.hive.serde2.lazy.LazyStruct;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
/**
* Map Join Object used for both key and value
@@ -89,22 +92,18 @@
Vector<ArrayList<Object>> res = new Vector<ArrayList<Object>>();
for (int pos = 0; pos < sz; pos++) {
- ArrayList<Object> memObj = new ArrayList<Object>();
- val.readFields(in);
- StructObjectInspector objIns = (StructObjectInspector) ctx
- .getDeserObjInspector();
- LazyStruct lazyObj = (LazyStruct) (((LazyObject) ctx.getDeserializer()
- .deserialize(val)).getObject());
- List<? extends StructField> listFields = objIns.getAllStructFieldRefs();
- for (StructField fld : listFields) {
- memObj.add(objIns.getStructFieldData(lazyObj, fld));
- }
-
+ ArrayList<Object> memObj =
+ (ArrayList<Object>)
+ ObjectInspectorUtils.copyToStandardObject(
+ ctx.getSerDe().deserialize(val),
+ ctx.getSerDe().getObjectInspector(),
+ ObjectInspectorCopyOption.WRITABLE);
+
res.add(memObj);
}
obj = res;
- } catch (Exception e) {
- throw new IOException(e.getMessage());
+ } catch (SerDeException e) {
+ throw new IOException(e);
}
}
@@ -122,12 +121,12 @@
out.writeInt(v.size());
for (int pos = 0; pos < v.size(); pos++) {
- Writable outVal = ctx.getSerializer().serialize(v.get(pos), ctx.getSerObjInspector());
+ Writable outVal = ctx.getSerDe().serialize(v.get(pos), ctx.getStandardOI());
outVal.write(out);
}
}
- catch (Exception e) {
- throw new IOException(e.getMessage());
+ catch (SerDeException e) {
+ throw new IOException(e);
}
}
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java?rev=786343&r1=786342&r2=786343&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java Fri Jun 19 00:41:29 2009
@@ -19,6 +19,7 @@
package org.apache.hadoop.hive.ql.exec;
import java.io.File;
+import java.io.IOException;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
@@ -32,20 +33,14 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
-import org.apache.hadoop.hive.ql.plan.exprNodeDesc;
import org.apache.hadoop.hive.ql.plan.mapJoinDesc;
import org.apache.hadoop.hive.ql.plan.tableDesc;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.serde2.Deserializer;
-import org.apache.hadoop.hive.serde2.Serializer;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
-import org.apache.hadoop.hive.serde2.objectinspector.StructField;
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
-import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.hive.ql.util.jdbm.htree.HTree;
@@ -60,8 +55,18 @@
private static final long serialVersionUID = 1L;
static final private Log LOG = LogFactory.getLog(MapJoinOperator.class.getName());
+ /**
+ * The expressions for join inputs' join keys.
+ */
transient protected Map<Byte, List<ExprNodeEvaluator>> joinKeys;
+ /**
+ * The ObjectInspectors for the join inputs' join keys.
+ */
transient protected Map<Byte, List<ObjectInspector>> joinKeysObjectInspectors;
+ /**
+ * The standard ObjectInspectors for the join inputs' join keys.
+ */
+ transient protected Map<Byte, List<ObjectInspector>> joinKeysStandardObjectInspectors;
transient private int posBigTable; // one of the tables that is not in memory
transient int mapJoinRowsKey; // rows for a given key
@@ -69,51 +74,30 @@
transient protected Map<Byte, HTree> mapJoinTables;
public static class MapJoinObjectCtx {
- ObjectInspector serObjInspector;
- Serializer serializer;
- Deserializer deserializer;
- ObjectInspector deserObjInspector;
+ ObjectInspector standardOI;
+ SerDe serde;
/**
- * @param serObjInspector
- * @param serializer
- * @param deserializer
- * @param deserObjInspector
+ * @param standardOI
+ * @param serde
*/
- public MapJoinObjectCtx(ObjectInspector serObjInspector,
- Serializer serializer, ObjectInspector deserObjInspector, Deserializer deserializer) {
- this.serObjInspector = serObjInspector;
- this.serializer = serializer;
- this.deserializer = deserializer;
- this.deserObjInspector = deserObjInspector;
+ public MapJoinObjectCtx(ObjectInspector standardOI, SerDe serde) {
+ this.standardOI = standardOI;
+ this.serde = serde;
}
/**
- * @return the objInspector
+ * @return the standardOI
*/
- public ObjectInspector getSerObjInspector() {
- return serObjInspector;
+ public ObjectInspector getStandardOI() {
+ return standardOI;
}
/**
- * @return the objInspector
+ * @return the serde
*/
- public ObjectInspector getDeserObjInspector() {
- return deserObjInspector;
- }
-
- /**
- * @return the serializer
- */
- public Serializer getSerializer() {
- return serializer;
- }
-
- /**
- * @return the deserializer
- */
- public Deserializer getDeserializer() {
- return deserializer;
+ public SerDe getSerDe() {
+ return serde;
}
}
@@ -137,22 +121,23 @@
firstRow = true;
try {
joinKeys = new HashMap<Byte, List<ExprNodeEvaluator>>();
- joinKeysObjectInspectors = new HashMap<Byte, List<ObjectInspector>>();
populateJoinKeyValue(joinKeys, conf.getKeys());
-
+ joinKeysObjectInspectors = getObjectInspectorsFromEvaluators(joinKeys, inputObjInspector);
+ joinKeysStandardObjectInspectors = getStandardObjectInspectors(joinKeysObjectInspectors);
+
// all other tables are small, and are cached in the hash table
posBigTable = conf.getPosBigTable();
- metadataValueTag = new int[numValues];
- for (int pos = 0; pos < numValues; pos++)
+ metadataValueTag = new int[numAliases];
+ for (int pos = 0; pos < numAliases; pos++)
metadataValueTag[pos] = -1;
mapJoinTables = new HashMap<Byte, HTree>();
hTables = new ArrayList<File>();
// initialize the hash tables for other tables
- for (int pos = 0; pos < numValues; pos++) {
+ for (int pos = 0; pos < numAliases; pos++) {
if (pos == posBigTable)
continue;
@@ -175,30 +160,15 @@
RecordManager recman = RecordManagerFactory.createRecordManager(newDirName + "/" + pos, props );
HTree hashTable = HTree.createInstance(recman);
- mapJoinTables.put(new Byte((byte)pos), hashTable);
+ mapJoinTables.put(Byte.valueOf((byte)pos), hashTable);
}
storage.put((byte)posBigTable, new Vector<ArrayList<Object>>());
mapJoinRowsKey = HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEMAPJOINROWSIZE);
- // initialize the join output object inspectors
- ArrayList<ObjectInspector> structFieldObjectInspectors = new ArrayList<ObjectInspector>(totalSz);
-
- for (Byte alias : order) {
- int sz = conf.getExprs().get(alias).size();
- List<? extends StructField> listFlds = ((StructObjectInspector)inputObjInspector[alias.intValue()]).getAllStructFieldRefs();
- assert listFlds.size() == sz;
- for (StructField fld: listFlds) {
- structFieldObjectInspectors.add(fld.getFieldObjectInspector());
- }
- }
-
- joinOutputObjectInspector = ObjectInspectorFactory
- .getStandardStructObjectInspector(conf.getOutputColumnNames(), structFieldObjectInspectors);
-
initializeChildren(hconf, reporter, new ObjectInspector[]{joinOutputObjectInspector});
- } catch (Exception e) {
+ } catch (IOException e) {
e.printStackTrace();
throw new HiveException(e);
}
@@ -214,21 +184,9 @@
if ((lastAlias == null) || (!lastAlias.equals(alias)))
nextSz = joinEmitInterval;
- // compute keys and values
- ArrayList<Object> key = computeValues(row, rowInspector, joinKeys.get(alias), joinKeysObjectInspectors);
- ArrayList<Object> value = computeValues(row, rowInspector, joinValues.get(alias), joinValuesObjectInspectors);
-
- // Until there is one representation for the keys, convert explicitly
- int keyPos = 0;
- // TODO: use keyPos instead
- for (Object keyElem : key) {
- PrimitiveObjectInspector poi = (PrimitiveObjectInspector)joinKeysObjectInspectors.get(alias).get(keyPos);
- if (!poi.isWritable()) {
- // convert o to writable
- key.set(keyPos, ObjectInspectorUtils.copyToStandardObject(key.get(keyPos), poi, ObjectInspectorCopyOption.WRITABLE));
- }
- keyPos++;
- }
+ // compute keys and values as StandardObjects
+ ArrayList<Object> key = computeValues(row, joinKeys.get(alias), joinKeysObjectInspectors.get(alias));
+ ArrayList<Object> value = computeValues(row, joinValues.get(alias), joinValuesObjectInspectors.get(alias));
// does this source need to be stored in the hash map
if (tag != posBigTable) {
@@ -236,25 +194,14 @@
metadataKeyTag = nextVal++;
tableDesc keyTableDesc = conf.getKeyTblDesc();
- Serializer keySerializer = (Serializer)keyTableDesc.getDeserializerClass().newInstance();
+ SerDe keySerializer = (SerDe)ReflectionUtils.newInstance(keyTableDesc.getDeserializerClass(), null);
keySerializer.initialize(null, keyTableDesc.getProperties());
- ExprNodeEvaluator[] keyEval = new ExprNodeEvaluator[conf.getKeys().get(new Byte((byte)tag)).size()];
- int i=0;
- for (exprNodeDesc e: conf.getKeys().get(new Byte((byte)tag))) {
- keyEval[i++] = ExprNodeEvaluatorFactory.get(e);
- }
-
- List<String> keyOutputCols = new ArrayList<String>();
- for (int k = 0; k < keyEval.length; k++) {
- keyOutputCols.add(HiveConf.getColumnInternalName(k));
- }
- ObjectInspector keyObjectInspector = initEvaluatorsAndReturnStruct(keyEval, keyOutputCols, rowInspector);
-
- Deserializer deserializer = (Deserializer)ReflectionUtils.newInstance(keyTableDesc.getDeserializerClass(), null);
- deserializer.initialize(null, keyTableDesc.getProperties());
-
- mapMetadata.put(new Integer(metadataKeyTag), new MapJoinObjectCtx(keyObjectInspector, keySerializer, deserializer.getObjectInspector(), deserializer));
+ mapMetadata.put(Integer.valueOf(metadataKeyTag),
+ new MapJoinObjectCtx(
+ ObjectInspectorUtils.getStandardObjectInspector(keySerializer.getObjectInspector(),
+ ObjectInspectorCopyOption.WRITABLE),
+ keySerializer));
firstRow = false;
}
@@ -277,26 +224,14 @@
metadataValueTag[tag] = nextVal++;
tableDesc valueTableDesc = conf.getValueTblDescs().get(tag);
- Serializer valueSerializer = (Serializer)valueTableDesc.getDeserializerClass().newInstance();
- valueSerializer.initialize(null, valueTableDesc.getProperties());
-
- ExprNodeEvaluator[] valueEval = new ExprNodeEvaluator[conf.getExprs().get(new Byte((byte)tag)).size()];
- int i=0;
- for (exprNodeDesc e: conf.getExprs().get(new Byte((byte)tag))) {
- valueEval[i++] = ExprNodeEvaluatorFactory.get(e);
- }
- List<String> tagOutputCols = new ArrayList<String>();
- int start = 0;
- for (int k = 0; k < tag; k++)
- start+=conf.getExprs().get(new Byte((byte)k)).size();
- for (int k=0;k<conf.getExprs().get(new Byte((byte)tag)).size();k++)
- tagOutputCols.add(HiveConf.getColumnInternalName(k));
- ObjectInspector valueObjectInspector = initEvaluatorsAndReturnStruct(valueEval, tagOutputCols, rowInspector);
+ SerDe valueSerDe = (SerDe)ReflectionUtils.newInstance(valueTableDesc.getDeserializerClass(), null);
+ valueSerDe.initialize(null, valueTableDesc.getProperties());
- Deserializer deserializer = (Deserializer)ReflectionUtils.newInstance(valueTableDesc.getDeserializerClass(), null);
- deserializer.initialize(null, valueTableDesc.getProperties());
-
- mapMetadata.put(new Integer((byte)metadataValueTag[tag]), new MapJoinObjectCtx(valueObjectInspector, valueSerializer, deserializer.getObjectInspector(), deserializer));
+ mapMetadata.put(Integer.valueOf(metadataValueTag[tag]),
+ new MapJoinObjectCtx(
+ ObjectInspectorUtils.getStandardObjectInspector(valueSerDe.getObjectInspector(),
+ ObjectInspectorCopyOption.WRITABLE),
+ valueSerDe));
}
// Construct externalizable objects for key and value
@@ -342,7 +277,10 @@
if (pos.intValue() != tag)
storage.put(pos, null);
- } catch (Exception e) {
+ } catch (SerDeException e) {
+ e.printStackTrace();
+ throw new HiveException(e);
+ } catch (IOException e) {
e.printStackTrace();
throw new HiveException(e);
}
@@ -353,7 +291,7 @@
* @return the name of the operator
*/
public String getName() {
- return new String("MAPJOIN");
+ return "MAPJOIN";
}
public void close(boolean abort) throws HiveException {
Modified: hadoop/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyUtils.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyUtils.java?rev=786343&r1=786342&r2=786343&view=diff
==============================================================================
--- hadoop/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyUtils.java (original)
+++ hadoop/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyUtils.java Fri Jun 19 00:41:29 2009
@@ -23,7 +23,10 @@
import java.nio.charset.CharacterCodingException;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.BooleanObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.ByteObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.DoubleObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.FloatObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.ShortObjectInspector;
@@ -91,14 +94,27 @@
}
}
+
+ static byte[] trueBytes = {(byte)'t', 'r', 'u', 'e'};
+ static byte[] falseBytes = {(byte)'f', 'a', 'l', 's', 'e'};
+
/**
- * Write out the text representation of a Primitive Object to a UTF8 byte stream.
+ * Write out the text representation of a Primitive Object to a UTF8 byte stream.
* @param out The UTF8 byte OutputStream
* @param o The primitive Object
*/
public static void writePrimitiveUTF8(OutputStream out, Object o, PrimitiveObjectInspector oi) throws IOException {
switch (oi.getPrimitiveCategory()) {
+ case BOOLEAN: {
+ boolean b = ((BooleanObjectInspector)oi).get(o);
+ if (b) {
+ out.write(trueBytes, 0, trueBytes.length);
+ } else {
+ out.write(falseBytes, 0, falseBytes.length);
+ }
+ break;
+ }
case BYTE: {
LazyInteger.writeUTF8(out, ((ByteObjectInspector)oi).get(o));
break;
@@ -115,27 +131,25 @@
LazyLong.writeUTF8(out, ((LongObjectInspector)oi).get(o));
break;
}
- // TODO: We should enable this piece of code, once we pass ObjectInspector in the Operator.init()
- // instead of Operator.forward(). Until then, JoinOperator will assume the output columns are
- // all strings but they may not be.
- /*
+ case FLOAT: {
+ float f = ((FloatObjectInspector)oi).get(o);
+ ByteBuffer b = Text.encode(String.valueOf(f));
+ out.write(b.array(), 0, b.limit());
+ break;
+ }
+ case DOUBLE: {
+ double d = ((DoubleObjectInspector)oi).get(o);
+ ByteBuffer b = Text.encode(String.valueOf(d));
+ out.write(b.array(), 0, b.limit());
+ break;
+ }
case STRING: {
Text t = ((StringObjectInspector)oi).getPrimitiveWritableObject(o);
out.write(t.getBytes(), 0, t.getLength());
break;
}
- */
default: {
- if (o instanceof Text) {
- // This piece of code improves the performance because we don't need to
- // convert Text to String then back to Text. We should rely on the code
- // block above when JoinOperator is fixed.
- Text t = (Text)o;
- out.write(t.getBytes(), 0, t.getLength());
- } else {
- ByteBuffer b = Text.encode(o.toString());
- out.write(b.array(), 0, b.limit());
- }
+ throw new RuntimeException("Hive internal error.");
}
}
}
Modified: hadoop/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java?rev=786343&r1=786342&r2=786343&view=diff
==============================================================================
--- hadoop/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java (original)
+++ hadoop/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java Fri Jun 19 00:41:29 2009
@@ -183,7 +183,7 @@
break;
}
case WRITABLE: {
- result = loi.getPrimitiveWritableObject(o);
+ result = loi.getPrimitiveWritableObject(loi.copyObject(o));
break;
}
}