Posted to commits@hive.apache.org by na...@apache.org on 2009/06/19 02:41:30 UTC

svn commit: r786343 - in /hadoop/hive/trunk: ./ ql/src/java/org/apache/hadoop/hive/ql/exec/ serde/src/java/org/apache/hadoop/hive/serde2/lazy/ serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/

Author: namit
Date: Fri Jun 19 00:41:29 2009
New Revision: 786343

URL: http://svn.apache.org/viewvc?rev=786343&view=rev
Log:
HIVE-529. Some cleanup for join operator
(Zheng Shao via namit)


Modified:
    hadoop/hive/trunk/CHANGES.txt
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinObjectKey.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinObjectValue.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
    hadoop/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyUtils.java
    hadoop/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java

Modified: hadoop/hive/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/CHANGES.txt?rev=786343&r1=786342&r2=786343&view=diff
==============================================================================
--- hadoop/hive/trunk/CHANGES.txt (original)
+++ hadoop/hive/trunk/CHANGES.txt Fri Jun 19 00:41:29 2009
@@ -241,6 +241,9 @@
     HIVE-561. Make hash aggregation threshold configurable
     (Zheng Shao via namit)
 
+    HIVE-529. Some cleanup for join operator
+    (Zheng Shao via namit)
+
 Release 0.3.1 - Unreleased
 
   INCOMPATIBLE CHANGES

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java?rev=786343&r1=786342&r2=786343&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java Fri Jun 19 00:41:29 2009
@@ -27,6 +27,7 @@
 import java.util.Set;
 import java.util.Stack;
 import java.util.Vector;
+import java.util.Map.Entry;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -39,14 +40,14 @@
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
-import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
 import org.apache.hadoop.mapred.Reporter;
 
 /**
  * Join operator implementation.
  */
-public class CommonJoinOperator<T extends joinDesc> extends Operator<T> implements Serializable {
+public abstract class CommonJoinOperator<T extends joinDesc> extends Operator<T> implements Serializable {
   private static final long serialVersionUID = 1L;
   static final protected Log LOG = LogFactory.getLog(CommonJoinOperator.class.getName());
 
@@ -76,9 +77,19 @@
     }
   }
 
-  transient protected int numValues; // number of aliases
-  transient protected Map<Byte, List<ExprNodeEvaluator>> joinValues;
-  transient protected Map<Byte, List<ObjectInspector>>   joinValuesObjectInspectors;
+  transient protected int numAliases; // number of aliases
+  /**
+   * The expressions for join outputs.
+   */
+  transient protected Map<Byte, List<ExprNodeEvaluator>> joinValues; 
+  /**
+   * The ObjectInspectors for the join inputs.
+   */
+  transient protected Map<Byte, List<ObjectInspector>> joinValuesObjectInspectors;
+  /**
+   * The standard ObjectInspectors for the join inputs.
+   */
+  transient protected Map<Byte, List<ObjectInspector>> joinValuesStandardObjectInspectors; 
   
   transient static protected Byte[] order; // order in which the results should be output
   transient protected joinCond[] condn;
@@ -89,7 +100,7 @@
   transient private Vector<ArrayList<Object>>[] dummyObjVectors;
   transient private Stack<Iterator<ArrayList<Object>>> iterators;
   transient protected int totalSz; // total size of the composite object
-  transient ObjectInspector joinOutputObjectInspector;
+  transient ObjectInspector joinOutputObjectInspector;  // The OI for the output row 
   
   // keys are the column names. basically this maps the position of the column in 
   // the output of the CommonJoinOperator to the input columnInfo.
@@ -125,20 +136,65 @@
     return total;
   }
 
+  protected static HashMap<Byte, List<ObjectInspector>> getObjectInspectorsFromEvaluators(
+      Map<Byte, List<ExprNodeEvaluator>> exprEntries, ObjectInspector[] inputObjInspector)
+      throws HiveException {
+    HashMap<Byte, List<ObjectInspector>> result = new HashMap<Byte, List<ObjectInspector>>();
+    for(Entry<Byte, List<ExprNodeEvaluator>> exprEntry : exprEntries.entrySet()) {
+      Byte alias = exprEntry.getKey();
+      List<ExprNodeEvaluator> exprList = exprEntry.getValue();
+      ArrayList<ObjectInspector> fieldOIList = new ArrayList<ObjectInspector>();
+      for (int i=0; i<exprList.size(); i++) {
+        fieldOIList.add(exprList.get(i).initialize(inputObjInspector[alias]));
+      }
+      result.put(alias, fieldOIList);
+    }
+    return result;
+  }
+  
+  protected static HashMap<Byte, List<ObjectInspector>> getStandardObjectInspectors(
+      Map<Byte, List<ObjectInspector>> aliasToObjectInspectors) {
+    HashMap<Byte, List<ObjectInspector>> result = new HashMap<Byte, List<ObjectInspector>>();
+    for(Entry<Byte, List<ObjectInspector>> oiEntry: aliasToObjectInspectors.entrySet()) {
+      Byte alias = oiEntry.getKey();
+      List<ObjectInspector> oiList = oiEntry.getValue();
+      ArrayList<ObjectInspector> fieldOIList = new ArrayList<ObjectInspector>(oiList.size());
+      for (int i=0; i<oiList.size(); i++) {
+        fieldOIList.add(ObjectInspectorUtils.getStandardObjectInspector(oiList.get(i), 
+            ObjectInspectorCopyOption.WRITABLE));
+      }
+      result.put(alias, fieldOIList);
+    }
+    return result;
+    
+  }
+  
+  protected static <T extends joinDesc> ObjectInspector getJoinOutputObjectInspector(Byte[] order,
+      Map<Byte, List<ObjectInspector>> aliasToObjectInspectors, T conf) {
+    ArrayList<ObjectInspector> structFieldObjectInspectors = new ArrayList<ObjectInspector>();
+    for (Byte alias : order) {
+      List<ObjectInspector> oiList = aliasToObjectInspectors.get(alias);
+      structFieldObjectInspectors.addAll(oiList);
+    }
+    
+    StructObjectInspector joinOutputObjectInspector = ObjectInspectorFactory
+      .getStandardStructObjectInspector(conf.getOutputColumnNames(), structFieldObjectInspectors);
+    return joinOutputObjectInspector;
+  }
+  
   public void initializeOp(Configuration hconf, Reporter reporter, ObjectInspector[] inputObjInspector) throws HiveException {
     LOG.info("COMMONJOIN " + ((StructObjectInspector)inputObjInspector[0]).getTypeName());   
     totalSz = 0;
     // Map that contains the rows for each alias
     storage = new HashMap<Byte, Vector<ArrayList<Object>>>();
 
-    numValues = conf.getExprs().size();
+    numAliases = conf.getExprs().size();
     
     joinValues = new HashMap<Byte, List<ExprNodeEvaluator>>();
-    joinValuesObjectInspectors = new HashMap<Byte, List<ObjectInspector>>();
 
     if (order == null) {
-      order = new Byte[numValues];
-      for (int i = 0; i < numValues; i++)
+      order = new Byte[numAliases];
+      for (int i = 0; i < numAliases; i++)
         order[i] = (byte) i;
     }
     condn = conf.getConds();
@@ -146,8 +202,11 @@
 
     totalSz = populateJoinKeyValue(joinValues, conf.getExprs());
 
-    dummyObj = new Object[numValues];
-    dummyObjVectors = new Vector[numValues];
+    joinValuesObjectInspectors = getObjectInspectorsFromEvaluators(joinValues, inputObjInspector);
+    joinValuesStandardObjectInspectors = getStandardObjectInspectors(joinValuesObjectInspectors);
+      
+    dummyObj = new Object[numAliases];
+    dummyObjVectors = new Vector[numAliases];
 
     int pos = 0;
     for (Byte alias : order) {
@@ -168,6 +227,9 @@
     joinEmitInterval = HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEJOINEMITINTERVAL);
     
     forwardCache = new Object[totalSz];
+    
+    joinOutputObjectInspector = getJoinOutputObjectInspector(order, joinValuesStandardObjectInspectors, conf);
+    LOG.info("JOIN " + ((StructObjectInspector)joinOutputObjectInspector).getTypeName() + " totalsz = " + totalSz);
   }
 
   public void startGroup() throws HiveException {
@@ -177,7 +239,7 @@
       storage.put(alias, new Vector<ArrayList<Object>>());
   }
 
-  private int getNextSize(int sz) {
+  protected int getNextSize(int sz) {
     // A very simple counter to keep track of join entries for a key
     if (sz >= 100000)
       return sz + 100000;
@@ -187,83 +249,31 @@
 
   transient protected Byte alias;
   
-  protected ArrayList<Object> computeValues(Object row, ObjectInspector rowInspector,
-    List<ExprNodeEvaluator> valueFields, Map<Byte, List<ObjectInspector>> joinExprsObjectInspectors) throws HiveException {
-    
-    // Get the valueFields Object Inspectors
-    List<ObjectInspector> valueFieldOI = joinExprsObjectInspectors.get(alias);
-    if (valueFieldOI == null) {
-      // Initialize the ExprEvaluator if necessary
-      valueFieldOI = new ArrayList<ObjectInspector>();
-      for (int i=0; i<valueFields.size(); i++) {
-        valueFieldOI.add(valueFields.get(i).initialize(rowInspector));
-      }
-      joinExprsObjectInspectors.put(alias, valueFieldOI);
-    }
+  /**
+   * Return the value as a standard object.
+   * A standard object can be inspected by a standard ObjectInspector.
+   */
+  protected static ArrayList<Object> computeValues(Object row,
+    List<ExprNodeEvaluator> valueFields, List<ObjectInspector> valueFieldsOI) throws HiveException {
     
     // Compute the values
     ArrayList<Object> nr = new ArrayList<Object>(valueFields.size());
     for (int i=0; i<valueFields.size(); i++) {
       nr.add(ObjectInspectorUtils.copyToStandardObject(
           valueFields.get(i).evaluate(row),
-          valueFieldOI.get(i)));
+          valueFieldsOI.get(i),
+          ObjectInspectorCopyOption.WRITABLE));
     }
     
     return nr;
   }
   
-  public void process(Object row, ObjectInspector rowInspector, int tag)
-      throws HiveException {
-    try {
-      // get alias
-      alias = (byte)tag;
-
-      if ((lastAlias == null) || (!lastAlias.equals(alias)))
-        nextSz = joinEmitInterval;
-      
-      ArrayList<Object> nr = computeValues(row, rowInspector, joinValues.get(alias), joinValuesObjectInspectors);
-      
-      // number of rows for the key in the given table
-      int sz = storage.get(alias).size();
-
-      // Are we consuming too much memory
-      if (alias == numValues - 1) {
-        if (sz == joinEmitInterval) {
-          // The input is sorted by alias, so if we are already in the last join operand,
-          // we can emit some results now.
-          // Note this has to be done before adding the current row to the storage,
-          // to preserve the correctness for outer joins.
-          checkAndGenObject();
-          storage.get(alias).clear();
-        }
-      } else {
-        if (sz == nextSz) {
-          // Output a warning if we reached at least 1000 rows for a join operand
-          // We won't output a warning for the last join operand since the size
-          // will never goes to joinEmitInterval.
-          StructObjectInspector soi = (StructObjectInspector)rowInspector;
-          StructField sf = soi.getStructFieldRef(Utilities.ReduceField.KEY.toString());
-          Object keyObject = soi.getStructFieldData(row, sf);
-          LOG.warn("table " + alias + " has " + sz + " rows for join key " + keyObject);
-          nextSz = getNextSize(nextSz);
-        }
-      }
-
-      // Add the value to the vector
-      storage.get(alias).add(nr);
-
-    } catch (Exception e) {
-      e.printStackTrace();
-      throw new HiveException(e);
-    }
-  }
-
   transient Object[] forwardCache;
   
   private void createForwardJoinObject(IntermediateObject intObj,
       boolean[] nullsArr) throws HiveException {
     int p = 0;
-    for (int i = 0; i < numValues; i++) {
+    for (int i = 0; i < numAliases; i++) {
       Byte alias = order[i];
       int sz = joinValues.get(alias).size();
       if (nullsArr[i]) {
@@ -500,7 +510,7 @@
   private void genObject(Vector<boolean[]> inputNulls, int aliasNum,
                          IntermediateObject intObj, boolean firstRow) throws HiveException {
     boolean childFirstRow = firstRow;
-    if (aliasNum < numValues) {
+    if (aliasNum < numAliases) {
       Iterator<ArrayList<Object>> aliasRes = storage.get(order[aliasNum])
           .iterator();
       iterators.push(aliasRes);
@@ -531,13 +541,13 @@
    * @throws HiveException
    */
   public void endGroup() throws HiveException {
-    LOG.trace("Join Op: endGroup called: numValues=" + numValues);
+    LOG.trace("Join Op: endGroup called: numValues=" + numAliases);
     checkAndGenObject();
   }
 
   protected void checkAndGenObject() throws HiveException {
     // does any result need to be emitted
-    for (int i = 0; i < numValues; i++) {
+    for (int i = 0; i < numAliases; i++) {
       Byte alias = order[i];
       if (storage.get(alias).iterator().hasNext() == false) {
         if (noOuterJoin) {
@@ -550,7 +560,7 @@
     }
 
     LOG.trace("calling genObject");
-    genObject(null, 0, new IntermediateObject(new ArrayList[numValues], 0), true);
+    genObject(null, 0, new IntermediateObject(new ArrayList[numAliases], 0), true);
     LOG.trace("called genObject");
   }
 

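The helpers added above pull ObjectInspector setup out of the per-row path: computeValues no longer initializes evaluators lazily, and every value it returns is copied into its standard writable form up front. Below is a minimal sketch of that copy step, separate from the committed code; the inspector field from PrimitiveObjectInspectorFactory is assumed, and the String literal stands in for an ExprNodeEvaluator result:

    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
    import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
    import org.apache.hadoop.io.Text;

    public class StandardCopySketch {
      public static void main(String[] args) {
        // The field's current representation: a plain Java String.
        ObjectInspector javaStringOI =
            PrimitiveObjectInspectorFactory.javaStringObjectInspector;

        // Copy to the standard WRITABLE representation: a fresh Text,
        // independent of whatever buffer produced the source value.
        Object standard = ObjectInspectorUtils.copyToStandardObject(
            "abc", javaStringOI, ObjectInspectorCopyOption.WRITABLE);

        System.out.println(standard instanceof Text); // true
      }
    }

Because every stored row is in this standard form, the inspectors built once by getStandardObjectInspectors describe all of them, which is what lets getJoinOutputObjectInspector assemble the output struct inspector at initialization time.
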
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java?rev=786343&r1=786342&r2=786343&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java Fri Jun 19 00:41:29 2009
@@ -28,6 +28,7 @@
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
@@ -48,22 +49,55 @@
 
     ArrayList<ObjectInspector> structFieldObjectInspectors = new ArrayList<ObjectInspector>(totalSz);
 
-    for (Byte alias : order) {
-      int sz = conf.getExprs().get(alias).size();
-      StructObjectInspector fldObjIns = (StructObjectInspector)((StructObjectInspector)inputObjInspector[alias.intValue()]).getStructFieldRef("VALUE").getFieldObjectInspector();
-      for (int i = 0; i < sz; i++) {
-        structFieldObjectInspectors.add(
-            ObjectInspectorUtils.getStandardObjectInspector(
-                fldObjIns.getAllStructFieldRefs().get(i).getFieldObjectInspector(),
-                ObjectInspectorCopyOption.KEEP));
+    initializeChildren(hconf, reporter, new ObjectInspector[]{joinOutputObjectInspector});
+  }
+  
+  public void process(Object row, ObjectInspector rowInspector, int tag)
+      throws HiveException {
+    try {
+      // get alias
+      alias = (byte)tag;
+    
+      if ((lastAlias == null) || (!lastAlias.equals(alias)))
+        nextSz = joinEmitInterval;
+      
+      ArrayList<Object> nr = computeValues(row, joinValues.get(alias), joinValuesObjectInspectors.get(alias));
+      
+      // number of rows for the key in the given table
+      int sz = storage.get(alias).size();
+    
+      // Are we consuming too much memory
+      if (alias == numAliases - 1) {
+        if (sz == joinEmitInterval) {
+          // The input is sorted by alias, so if we are already in the last join operand,
+          // we can emit some results now.
+          // Note this has to be done before adding the current row to the storage,
+          // to preserve the correctness for outer joins.
+          checkAndGenObject();
+          storage.get(alias).clear();
+        }
+      } else {
+        if (sz == nextSz) {
+          // Output a warning if we reached at least 1000 rows for a join operand
+          // We won't output a warning for the last join operand since the size
+          // will never grow to joinEmitInterval.
+          StructObjectInspector soi = (StructObjectInspector)rowInspector;
+          StructField sf = soi.getStructFieldRef(Utilities.ReduceField.KEY.toString());
+          Object keyObject = soi.getStructFieldData(row, sf);
+          LOG.warn("table " + alias + " has " + sz + " rows for join key " + keyObject);
+          nextSz = getNextSize(nextSz);
+        }
       }
-    }
     
-    joinOutputObjectInspector = ObjectInspectorFactory
-    .getStandardStructObjectInspector(conf.getOutputColumnNames(), structFieldObjectInspectors);
-    LOG.info("JOIN " + ((StructObjectInspector)joinOutputObjectInspector).getTypeName() + " totalsz = " + totalSz);
-
-    initializeChildren(hconf, reporter, new ObjectInspector[]{joinOutputObjectInspector});
+      // Add the value to the vector
+      storage.get(alias).add(nr);
+    
+    } catch (Exception e) {
+      e.printStackTrace();
+      throw new HiveException(e);
+    }
   }
+
+  
 }
 

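The process() body that moved here from CommonJoinOperator bounds memory by emitting early: when rows of the last alias arrive, all earlier aliases are already fully buffered, so results can be flushed every joinEmitInterval rows instead of buffering the last table as well. Below is a toy model of that batching idea, with plain lists standing in for the storage vectors (nothing in it is Hive API):

    import java.util.Arrays;
    import java.util.List;

    public class EarlyEmitSketch {
      public static void main(String[] args) {
        List<String> buffered = Arrays.asList("a1", "a2"); // earlier alias, complete
        List<String> last = Arrays.asList("b1", "b2", "b3", "b4");
        int joinEmitInterval = 2;

        int held = 0;
        StringBuilder batch = new StringBuilder();
        for (String b : last) {
          // Flush before adding the next row, mirroring the ordering that
          // the comment in process() says outer-join correctness requires.
          if (held == joinEmitInterval) {
            System.out.println("emit: " + batch);
            batch.setLength(0);
            held = 0;
          }
          for (String a : buffered) {
            batch.append(a).append('+').append(b).append(' ');
          }
          held++;
        }
        System.out.println("endGroup emit: " + batch);
      }
    }
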
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinObjectKey.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinObjectKey.java?rev=786343&r1=786342&r2=786343&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinObjectKey.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinObjectKey.java Fri Jun 19 00:41:29 2009
@@ -25,6 +25,9 @@
 import java.util.ArrayList;
 
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator.MapJoinObjectCtx;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Writable;
 
@@ -79,9 +82,14 @@
       // get the tableDesc from the map stored in the mapjoin operator
       MapJoinObjectCtx ctx = MapJoinOperator.getMapMetadata().get(Integer.valueOf(metadataTag));
       val.readFields(in);      
-      obj = (ArrayList<Object>)ctx.getDeserializer().deserialize(val);
-    } catch (Exception e) {
-      throw new IOException(e.getMessage());
+      obj = 
+        (ArrayList<Object>)
+        ObjectInspectorUtils.copyToStandardObject(
+            ctx.getSerDe().deserialize(val),
+            ctx.getSerDe().getObjectInspector(),
+            ObjectInspectorCopyOption.WRITABLE);
+    } catch (SerDeException e) {
+      throw new IOException(e);
     }
   }
   
@@ -94,11 +102,11 @@
       MapJoinObjectCtx ctx = MapJoinOperator.getMapMetadata().get(Integer.valueOf(metadataTag));
 
       // Different processing for key and value
-      Writable outVal = ctx.getSerializer().serialize(obj, ctx.getSerObjInspector());
+      Writable outVal = ctx.getSerDe().serialize(obj, ctx.getStandardOI());
       outVal.write(out);
     }
-    catch (Exception e) {
-      throw new IOException(e.getMessage());
+    catch (SerDeException e) {
+      throw new IOException(e);
     }
   }
 

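Alongside the switch to a single SerDe, the catch blocks here narrow from Exception to SerDeException and chain the cause (new IOException(e)) instead of flattening it to its message, so the original stack trace survives. A standalone illustration in plain JDK code follows (the IOException(Throwable) constructor requires Java 6):

    import java.io.IOException;

    public class ChainedCauseSketch {
      public static void main(String[] args) {
        Exception cause = new Exception("bad record"); // stands in for SerDeException

        IOException flattened = new IOException(cause.getMessage());
        IOException chained = new IOException(cause);

        System.out.println(flattened.getCause()); // null -- the cause's trace is lost
        System.out.println(chained.getCause());   // java.lang.Exception: bad record
      }
    }
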
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinObjectValue.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinObjectValue.java?rev=786343&r1=786342&r2=786343&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinObjectValue.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinObjectValue.java Fri Jun 19 00:41:29 2009
@@ -29,10 +29,13 @@
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator.MapJoinObjectCtx;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.lazy.LazyObject;
 import org.apache.hadoop.hive.serde2.lazy.LazyStruct;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
 
 /**
  * Map Join Object used for both key and value
@@ -89,22 +92,18 @@
 
       Vector<ArrayList<Object>> res = new Vector<ArrayList<Object>>();
       for (int pos = 0; pos < sz; pos++) {
-        ArrayList<Object> memObj = new ArrayList<Object>();
-        val.readFields(in);
-        StructObjectInspector objIns = (StructObjectInspector) ctx
-            .getDeserObjInspector();
-        LazyStruct lazyObj = (LazyStruct) (((LazyObject) ctx.getDeserializer()
-            .deserialize(val)).getObject());
-        List<? extends StructField> listFields = objIns.getAllStructFieldRefs();
-        for (StructField fld : listFields) {
-          memObj.add(objIns.getStructFieldData(lazyObj, fld));
-        }
-
+        ArrayList<Object> memObj =
+          (ArrayList<Object>)
+          ObjectInspectorUtils.copyToStandardObject(
+              ctx.getSerDe().deserialize(val),
+              ctx.getSerDe().getObjectInspector(),
+              ObjectInspectorCopyOption.WRITABLE);
+        
         res.add(memObj);
       }
       obj = res;
-    } catch (Exception e) {
-      throw new IOException(e.getMessage());
+    } catch (SerDeException e) {
+      throw new IOException(e);
     }
   }
   
@@ -122,12 +121,12 @@
       out.writeInt(v.size());
 
       for (int pos = 0; pos < v.size(); pos++) {
-        Writable outVal = ctx.getSerializer().serialize(v.get(pos), ctx.getSerObjInspector());
+        Writable outVal = ctx.getSerDe().serialize(v.get(pos), ctx.getStandardOI());
         outVal.write(out);
       }
     }
-    catch (Exception e) {
-      throw new IOException(e.getMessage());
+    catch (SerDeException e) {
+      throw new IOException(e);
     }
   }
 

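MapJoinObjectValue now has the same shape on both sides of the wire: writeExternal emits a row count followed by one SerDe-serialized row per entry, and readExternal reverses that, copying each deserialized row to a standard object. Below is a stripped-down version of the Externalizable pattern, with writeUTF/readUTF standing in for the SerDe round trip (a hypothetical class, not the committed one):

    import java.io.Externalizable;
    import java.io.IOException;
    import java.io.ObjectInput;
    import java.io.ObjectOutput;
    import java.util.ArrayList;
    import java.util.List;

    public class RowVectorSketch implements Externalizable {
      private List<String> rows = new ArrayList<String>();

      public void writeExternal(ObjectOutput out) throws IOException {
        out.writeInt(rows.size()); // row count first
        for (String row : rows) {
          // Real code: ctx.getSerDe().serialize(row, ctx.getStandardOI()).write(out)
          out.writeUTF(row);
        }
      }

      public void readExternal(ObjectInput in) throws IOException {
        int sz = in.readInt();
        rows = new ArrayList<String>(sz);
        for (int pos = 0; pos < sz; pos++) {
          // Real code: copyToStandardObject(serde.deserialize(val), oi, WRITABLE)
          rows.add(in.readUTF());
        }
      }
    }
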
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java?rev=786343&r1=786342&r2=786343&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java Fri Jun 19 00:41:29 2009
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.ql.exec;
 
 import java.io.File;
+import java.io.IOException;
 import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
@@ -32,20 +33,14 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
-import org.apache.hadoop.hive.ql.plan.exprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.mapJoinDesc;
 import org.apache.hadoop.hive.ql.plan.tableDesc;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.serde2.Deserializer;
-import org.apache.hadoop.hive.serde2.Serializer;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
-import org.apache.hadoop.hive.serde2.objectinspector.StructField;
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
-import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.hive.ql.util.jdbm.htree.HTree;
@@ -60,8 +55,18 @@
   private static final long serialVersionUID = 1L;
   static final private Log LOG = LogFactory.getLog(MapJoinOperator.class.getName());
 
+  /**
+   * The expressions for the join inputs' join keys.
+   */
   transient protected Map<Byte, List<ExprNodeEvaluator>> joinKeys;
+  /**
+   * The ObjectInspectors for the join inputs' join keys.
+   */
   transient protected Map<Byte, List<ObjectInspector>> joinKeysObjectInspectors;
+  /**
+   * The standard ObjectInspectors for the join inputs' join keys.
+   */
+  transient protected Map<Byte, List<ObjectInspector>> joinKeysStandardObjectInspectors;
 
   transient private int posBigTable;       // one of the tables that is not in memory
   transient int mapJoinRowsKey;            // rows for a given key
@@ -69,51 +74,30 @@
   transient protected Map<Byte, HTree> mapJoinTables;
 
   public static class MapJoinObjectCtx {
-    ObjectInspector serObjInspector;
-    Serializer      serializer;
-    Deserializer    deserializer;
-    ObjectInspector deserObjInspector;
+    ObjectInspector standardOI;
+    SerDe      serde;
     
     /**
-     * @param serObjInspector
-     * @param serializer
-     * @param deserializer
-     * @param deserObjInspector
+     * @param standardOI
+     * @param serde
      */
-    public MapJoinObjectCtx(ObjectInspector serObjInspector,
-        Serializer serializer, ObjectInspector deserObjInspector, Deserializer deserializer) {
-      this.serObjInspector = serObjInspector;
-      this.serializer = serializer;
-      this.deserializer = deserializer;
-      this.deserObjInspector = deserObjInspector;
+    public MapJoinObjectCtx(ObjectInspector standardOI, SerDe serde) {
+      this.standardOI = standardOI;
+      this.serde = serde;
     }
     
     /**
-     * @return the objInspector
+     * @return the standardOI
      */
-    public ObjectInspector getSerObjInspector() {
-      return serObjInspector;
+    public ObjectInspector getStandardOI() {
+      return standardOI;
     }
 
     /**
-     * @return the objInspector
+     * @return the serde
      */
-    public ObjectInspector getDeserObjInspector() {
-      return deserObjInspector;
-    }
-    
-    /**
-     * @return the serializer
-     */
-    public Serializer getSerializer() {
-      return serializer;
-    }
-
-    /**
-     * @return the deserializer
-     */
-    public Deserializer getDeserializer() {
-      return deserializer;
+    public SerDe getSerDe() {
+      return serde;
     }
 
   }
@@ -137,22 +121,23 @@
     firstRow = true;
     try {
       joinKeys  = new HashMap<Byte, List<ExprNodeEvaluator>>();
-      joinKeysObjectInspectors = new HashMap<Byte, List<ObjectInspector>>();
       
       populateJoinKeyValue(joinKeys, conf.getKeys());
-      
+      joinKeysObjectInspectors = getObjectInspectorsFromEvaluators(joinKeys, inputObjInspector);
+      joinKeysStandardObjectInspectors = getStandardObjectInspectors(joinKeysObjectInspectors); 
+        
       // all other tables are small, and are cached in the hash table
       posBigTable = conf.getPosBigTable();
 
-      metadataValueTag = new int[numValues];
-      for (int pos = 0; pos < numValues; pos++)
+      metadataValueTag = new int[numAliases];
+      for (int pos = 0; pos < numAliases; pos++)
         metadataValueTag[pos] = -1;
       
       mapJoinTables = new HashMap<Byte, HTree>();
       hTables = new ArrayList<File>();
       
       // initialize the hash tables for other tables
-      for (int pos = 0; pos < numValues; pos++) {
+      for (int pos = 0; pos < numAliases; pos++) {
         if (pos == posBigTable)
           continue;
         
@@ -175,30 +160,15 @@
         RecordManager recman = RecordManagerFactory.createRecordManager(newDirName + "/" + pos, props );
         HTree hashTable = HTree.createInstance(recman);
         
-        mapJoinTables.put(new Byte((byte)pos), hashTable);
+        mapJoinTables.put(Byte.valueOf((byte)pos), hashTable);
       }
 
       storage.put((byte)posBigTable, new Vector<ArrayList<Object>>());
       
       mapJoinRowsKey = HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEMAPJOINROWSIZE);
       
-      // initialize the join output object inspectors
-      ArrayList<ObjectInspector> structFieldObjectInspectors = new ArrayList<ObjectInspector>(totalSz);
-
-      for (Byte alias : order) {
-        int sz = conf.getExprs().get(alias).size();
-        List<? extends StructField> listFlds = ((StructObjectInspector)inputObjInspector[alias.intValue()]).getAllStructFieldRefs();
-        assert listFlds.size() == sz;
-        for (StructField fld: listFlds) {
-          structFieldObjectInspectors.add(fld.getFieldObjectInspector());
-        }
-      }
-      
-      joinOutputObjectInspector = ObjectInspectorFactory
-      .getStandardStructObjectInspector(conf.getOutputColumnNames(), structFieldObjectInspectors);
-
       initializeChildren(hconf, reporter, new ObjectInspector[]{joinOutputObjectInspector});
-    } catch (Exception e) {
+    } catch (IOException e) {
       e.printStackTrace();
       throw new HiveException(e);
     }
@@ -214,21 +184,9 @@
       if ((lastAlias == null) || (!lastAlias.equals(alias)))
         nextSz = joinEmitInterval;
       
-      // compute keys and values     
-      ArrayList<Object> key   = computeValues(row, rowInspector, joinKeys.get(alias), joinKeysObjectInspectors);
-      ArrayList<Object> value = computeValues(row, rowInspector, joinValues.get(alias), joinValuesObjectInspectors);
-
-      // Until there is one representation for the keys, convert explicitly
-      int keyPos = 0;
-      // TODO: use keyPos instead
-      for (Object keyElem : key) {
-        PrimitiveObjectInspector poi = (PrimitiveObjectInspector)joinKeysObjectInspectors.get(alias).get(keyPos);
-        if (!poi.isWritable()) {
-          // convert o to writable
-          key.set(keyPos, ObjectInspectorUtils.copyToStandardObject(key.get(keyPos), poi, ObjectInspectorCopyOption.WRITABLE));
-        }
-        keyPos++;
-      }
+      // compute keys and values as StandardObjects     
+      ArrayList<Object> key   = computeValues(row, joinKeys.get(alias), joinKeysObjectInspectors.get(alias));
+      ArrayList<Object> value = computeValues(row, joinValues.get(alias), joinValuesObjectInspectors.get(alias));
 
       // does this source need to be stored in the hash map
       if (tag != posBigTable) {
@@ -236,25 +194,14 @@
           metadataKeyTag = nextVal++;
           
           tableDesc keyTableDesc = conf.getKeyTblDesc();
-          Serializer keySerializer = (Serializer)keyTableDesc.getDeserializerClass().newInstance();
+          SerDe keySerializer = (SerDe)ReflectionUtils.newInstance(keyTableDesc.getDeserializerClass(), null);
           keySerializer.initialize(null, keyTableDesc.getProperties());
 
-          ExprNodeEvaluator[] keyEval = new ExprNodeEvaluator[conf.getKeys().get(new Byte((byte)tag)).size()];
-          int i=0;
-          for (exprNodeDesc e: conf.getKeys().get(new Byte((byte)tag))) {
-            keyEval[i++] = ExprNodeEvaluatorFactory.get(e);
-          }
-          
-          List<String> keyOutputCols = new ArrayList<String>();
-          for (int k = 0; k < keyEval.length; k++) {
-            keyOutputCols.add(HiveConf.getColumnInternalName(k));
-          }
-          ObjectInspector keyObjectInspector = initEvaluatorsAndReturnStruct(keyEval, keyOutputCols, rowInspector);
-
-          Deserializer deserializer = (Deserializer)ReflectionUtils.newInstance(keyTableDesc.getDeserializerClass(), null);
-          deserializer.initialize(null, keyTableDesc.getProperties());
-          
-          mapMetadata.put(new Integer(metadataKeyTag), new MapJoinObjectCtx(keyObjectInspector, keySerializer, deserializer.getObjectInspector(), deserializer));
+          mapMetadata.put(Integer.valueOf(metadataKeyTag), 
+              new MapJoinObjectCtx(
+                  ObjectInspectorUtils.getStandardObjectInspector(keySerializer.getObjectInspector(),
+                      ObjectInspectorCopyOption.WRITABLE),
+                  keySerializer));
           
           firstRow = false;
         }
@@ -277,26 +224,14 @@
           metadataValueTag[tag] = nextVal++;
                     
           tableDesc valueTableDesc = conf.getValueTblDescs().get(tag);
-          Serializer valueSerializer = (Serializer)valueTableDesc.getDeserializerClass().newInstance();
-          valueSerializer.initialize(null, valueTableDesc.getProperties());
-
-          ExprNodeEvaluator[] valueEval = new ExprNodeEvaluator[conf.getExprs().get(new Byte((byte)tag)).size()];
-          int i=0;
-          for (exprNodeDesc e: conf.getExprs().get(new Byte((byte)tag))) {
-            valueEval[i++] = ExprNodeEvaluatorFactory.get(e);
-          }
-          List<String> tagOutputCols = new ArrayList<String>(); 
-          int start = 0;
-          for (int k = 0; k < tag; k++)
-            start+=conf.getExprs().get(new Byte((byte)k)).size();
-          for (int k=0;k<conf.getExprs().get(new Byte((byte)tag)).size();k++)
-            tagOutputCols.add(HiveConf.getColumnInternalName(k));
-          ObjectInspector valueObjectInspector = initEvaluatorsAndReturnStruct(valueEval, tagOutputCols, rowInspector);
+          SerDe valueSerDe = (SerDe)ReflectionUtils.newInstance(valueTableDesc.getDeserializerClass(), null);
+          valueSerDe.initialize(null, valueTableDesc.getProperties());
  
-          Deserializer deserializer = (Deserializer)ReflectionUtils.newInstance(valueTableDesc.getDeserializerClass(), null);
-          deserializer.initialize(null, valueTableDesc.getProperties());
-          
-          mapMetadata.put(new Integer((byte)metadataValueTag[tag]), new MapJoinObjectCtx(valueObjectInspector, valueSerializer, deserializer.getObjectInspector(), deserializer));
+          mapMetadata.put(Integer.valueOf(metadataValueTag[tag]),
+              new MapJoinObjectCtx(
+                  ObjectInspectorUtils.getStandardObjectInspector(valueSerDe.getObjectInspector(),
+                      ObjectInspectorCopyOption.WRITABLE),
+              valueSerDe));
         }
         
         // Construct externalizable objects for key and value
@@ -342,7 +277,10 @@
         if (pos.intValue() != tag)
           storage.put(pos, null);
     
-    } catch (Exception e) {
+    } catch (SerDeException e) {
+      e.printStackTrace();
+      throw new HiveException(e);
+    } catch (IOException e) {
       e.printStackTrace();
       throw new HiveException(e);
     }
@@ -353,7 +291,7 @@
    * @return the name of the operator
    */
   public String getName() {
-    return new String("MAPJOIN");
+    return "MAPJOIN";
   }
   
   public void close(boolean abort) throws HiveException {

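MapJoinObjectCtx now carries one SerDe plus the standard ObjectInspector derived from it, replacing the old serializer/deserializer/inspector quartet, and key and value metadata are built identically. Below is a sketch of that construction, factored into a helper method; the helper itself is hypothetical, but the individual calls mirror the ones in process():

    import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
    import org.apache.hadoop.hive.ql.plan.tableDesc;
    import org.apache.hadoop.hive.serde2.SerDe;
    import org.apache.hadoop.hive.serde2.SerDeException;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
    import org.apache.hadoop.util.ReflectionUtils;

    public class MapJoinCtxSketch {
      // Builds the (standardOI, serde) pair the same way process() does.
      static MapJoinOperator.MapJoinObjectCtx contextFor(tableDesc desc)
          throws SerDeException {
        SerDe serde =
            (SerDe) ReflectionUtils.newInstance(desc.getDeserializerClass(), null);
        serde.initialize(null, desc.getProperties());

        // The OI is standardized with WRITABLE so rows already copied by
        // computeValues can be fed straight back into serialize().
        ObjectInspector standardOI = ObjectInspectorUtils.getStandardObjectInspector(
            serde.getObjectInspector(), ObjectInspectorCopyOption.WRITABLE);

        return new MapJoinOperator.MapJoinObjectCtx(standardOI, serde);
      }
    }
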
Modified: hadoop/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyUtils.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyUtils.java?rev=786343&r1=786342&r2=786343&view=diff
==============================================================================
--- hadoop/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyUtils.java (original)
+++ hadoop/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyUtils.java Fri Jun 19 00:41:29 2009
@@ -23,7 +23,10 @@
 import java.nio.charset.CharacterCodingException;
 
 import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.BooleanObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.ByteObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.DoubleObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.FloatObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.ShortObjectInspector;
@@ -91,14 +94,27 @@
     }
   }
 
+
+  static byte[] trueBytes = {(byte)'t', 'r', 'u', 'e'};
+  static byte[] falseBytes = {(byte)'f', 'a', 'l', 's', 'e'};
+
   /**
-   * Write out the text representation of a Primitive Object to a UTF8 byte stream. 
+   * Write out the text representation of a Primitive Object to a UTF8 byte stream.
    * @param out  The UTF8 byte OutputStream
    * @param o    The primitive Object
    */
   public static void writePrimitiveUTF8(OutputStream out, Object o, PrimitiveObjectInspector oi) throws IOException {
     
     switch (oi.getPrimitiveCategory()) {
+      case BOOLEAN: {
+        boolean b = ((BooleanObjectInspector)oi).get(o);
+        if (b) {
+          out.write(trueBytes, 0, trueBytes.length);
+        } else {
+          out.write(falseBytes, 0, falseBytes.length);
+        }
+        break;
+      }
       case BYTE: {
         LazyInteger.writeUTF8(out, ((ByteObjectInspector)oi).get(o));
         break;
@@ -115,27 +131,25 @@
         LazyLong.writeUTF8(out, ((LongObjectInspector)oi).get(o));
         break;
       }
-      // TODO: We should enable this piece of code, once we pass ObjectInspector in the Operator.init() 
-      // instead of Operator.forward().  Until then, JoinOperator will assume the output columns are
-      // all strings but they may not be.
-      /*
+      case FLOAT: {
+        float f = ((FloatObjectInspector)oi).get(o);
+        ByteBuffer b = Text.encode(String.valueOf(f));
+        out.write(b.array(), 0, b.limit());
+        break;
+      }
+      case DOUBLE: {
+        double d = ((DoubleObjectInspector)oi).get(o);
+        ByteBuffer b = Text.encode(String.valueOf(d));
+        out.write(b.array(), 0, b.limit());
+        break;
+      }
       case STRING: {
         Text t = ((StringObjectInspector)oi).getPrimitiveWritableObject(o);
         out.write(t.getBytes(), 0, t.getLength());
         break;
       }
-      */
       default: {
-        if (o instanceof Text) {
-          // This piece of code improves the performance because we don't need to
-          // convert Text to String then back to Text.  We should rely on the code
-          // block above when JoinOperator is fixed.
-          Text t = (Text)o;
-          out.write(t.getBytes(), 0, t.getLength());
-        } else {
-          ByteBuffer b = Text.encode(o.toString());
-          out.write(b.array(), 0, b.limit());
-        }
+        throw new RuntimeException("Hive internal error.");
       }
     }
   }

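writePrimitiveUTF8 now handles BOOLEAN, FLOAT, DOUBLE, and STRING explicitly and turns any unhandled category into an error instead of falling back to toString. A small usage sketch for the new BOOLEAN branch follows; the writable inspector field on PrimitiveObjectInspectorFactory is assumed:

    import java.io.ByteArrayOutputStream;

    import org.apache.hadoop.hive.serde2.lazy.LazyUtils;
    import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
    import org.apache.hadoop.io.BooleanWritable;

    public class WritePrimitiveSketch {
      public static void main(String[] args) throws Exception {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        LazyUtils.writePrimitiveUTF8(out,
            new BooleanWritable(true),
            PrimitiveObjectInspectorFactory.writableBooleanObjectInspector);
        System.out.println(out.toString("UTF-8")); // prints: true
      }
    }
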
Modified: hadoop/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java?rev=786343&r1=786342&r2=786343&view=diff
==============================================================================
--- hadoop/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java (original)
+++ hadoop/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java Fri Jun 19 00:41:29 2009
@@ -183,7 +183,7 @@
             break;
           }
           case WRITABLE: {
-            result = loi.getPrimitiveWritableObject(o);
+            result = loi.getPrimitiveWritableObject(loi.copyObject(o));
             break;
           }
         }
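
This one-line change makes the WRITABLE copy option copy the primitive before fetching its writable, so callers no longer hold a reference to a writable object that the record reader will overwrite. The aliasing hazard it closes, shown with plain Hadoop types:

    import org.apache.hadoop.io.Text;

    public class WritableReuseSketch {
      public static void main(String[] args) {
        Text reused = new Text("row1");  // stands in for a reader-owned buffer

        Text aliased = reused;           // old behavior: the same instance escapes
        Text copied = new Text(reused);  // new behavior: an independent copy

        reused.set("row2");              // the reader moves on to the next record

        System.out.println(aliased);     // row2 -- the stored value silently changed
        System.out.println(copied);      // row1 -- still the value that was stored
      }
    }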