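You are viewing a plain-text version of this content. The hyperlink to the canonical version was not preserved in this rendering; the canonical copy is the commits@hive.apache.org mailing-list archive entry.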
Posted to commits@hive.apache.org by zs...@apache.org on 2009/06/07 09:28:08 UTC

svn commit: r782333 - in /hadoop/hive/trunk: ./ common/src/java/org/apache/hadoop/hive/conf/ ql/src/java/org/apache/hadoop/hive/ql/exec/ ql/src/java/org/apache/hadoop/hive/ql/optimizer/

Author: zshao
Date: Sun Jun  7 07:28:07 2009
New Revision: 782333

URL: http://svn.apache.org/viewvc?rev=782333&view=rev
Log:
HIVE-528. Map Join followup: split MapJoinObject into MapJoinObjectKey and MapJoinObjectValue. (Namit Jain via zshao)

Modified:
    hadoop/hive/trunk/CHANGES.txt
    hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinObject.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java

Modified: hadoop/hive/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/CHANGES.txt?rev=782333&r1=782332&r2=782333&view=diff
==============================================================================
--- hadoop/hive/trunk/CHANGES.txt (original)
+++ hadoop/hive/trunk/CHANGES.txt Sun Jun  7 07:28:07 2009
@@ -64,6 +64,9 @@
     HIVE-490. Add missing configuration variables to hive-default.xml.
     (Yongqiang He via zshao)
 
+    HIVE-528. Map Join followup: split MapJoinObject into MapJoinObjectKey and
+    MapJoinObjectValue. (Namit Jain via zshao)
+
   OPTIMIZATIONS
 
     HIVE-279. Predicate Pushdown support (Prasad Chakka via athusoo).

Modified: hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=782333&r1=782332&r2=782333&view=diff
==============================================================================
--- hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Sun Jun  7 07:28:07 2009
@@ -113,7 +113,6 @@
     HIVEMAPSIDEAGGREGATE("hive.map.aggr", "true"),
     HIVEGROUPBYSKEW("hive.groupby.skewindata", "false"),
     HIVEJOINEMITINTERVAL("hive.join.emit.interval", 1000),
-    HIVEMAPJOINNUMROWSCACHE("hive.mapjoin.numrows", 1000),
     HIVEMAPJOINROWSIZE("hive.mapjoin.size.key", 10000),
     HIVEMAPJOINCACHEROWS("hive.mapjoin.cache.numrows", 10000),
     HIVEGROUPBYMAPINTERVAL("hive.groupby.mapaggr.checkinterval", 100000),

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinObject.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinObject.java?rev=782333&r1=782332&r2=782333&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinObject.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinObject.java Sun Jun  7 07:28:07 2009
@@ -1,193 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.exec;
-
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Vector;
-
-import org.apache.hadoop.hive.ql.exec.MapJoinOperator.MapJoinObjectCtx;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.hive.serde2.lazy.LazyObject;
-import org.apache.hadoop.hive.serde2.lazy.LazyStruct;
-import org.apache.hadoop.hive.serde2.objectinspector.StructField;
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-
-/**
- * Map Join Object used for both key and value
- */
-public class MapJoinObject implements Externalizable {
-
-  transient protected int     metadataTag;
-  transient protected int     objectTypeTag;
-  transient protected Object  obj;
-  
-  public MapJoinObject() {
-  }
-
-  /**
-   * @param metadataTag
-   * @param objectTypeTag
-   * @param obj
-   */
-  public MapJoinObject(int metadataTag, int objectTypeTag, Object obj) {
-    this.metadataTag = metadataTag;
-    this.objectTypeTag = objectTypeTag;
-    this.obj = obj;
-  }
-  
-  public boolean equals(Object o) {
-    if (o instanceof MapJoinObject) {
-      MapJoinObject mObj = (MapJoinObject)o;
-      if ((mObj.getMetadataTag() == metadataTag) && (mObj.getObjectTypeTag() == objectTypeTag)) {
-        if ((obj == null) && (mObj.getObj() == null))
-          return true;
-        if ((obj != null) && (mObj.getObj() != null) && (mObj.getObj().equals(obj)))
-          return true;
-      }
-    }
-
-    return false;
-  }
-  
-  public int hashCode() {
-    return (obj == null) ? 0 : obj.hashCode();
-  }
-  
-  @Override
-  public void readExternal(ObjectInput in) throws IOException,
-      ClassNotFoundException {
-    try {
-      metadataTag   = in.readInt();
-      objectTypeTag = in.readInt();
-
-      // get the tableDesc from the map stored in the mapjoin operator
-      MapJoinObjectCtx ctx = MapJoinOperator.getMapMetadata().get(new Integer(metadataTag));
-      Writable val = null;
-    
-      assert ((objectTypeTag == 1) || (objectTypeTag == 2));
-      if (objectTypeTag == 1) {
-        val = new BytesWritable();
-        val.readFields(in);      
-        obj = (ArrayList<Object>)ctx.getDeserializer().deserialize(val);
-      }
-      else if (objectTypeTag == 2) {
-        int sz = in.readInt();
-
-        Vector<ArrayList<Object>> res = new Vector<ArrayList<Object>>();
-        for (int pos = 0; pos < sz; pos++) {
-          ArrayList<Object> memObj = new ArrayList<Object>();
-          val = new Text();
-          val.readFields(in);
-          StructObjectInspector objIns = (StructObjectInspector)ctx.getDeserObjInspector();
-          LazyStruct lazyObj = (LazyStruct)(((LazyObject)ctx.getDeserializer().deserialize(val)).getObject());
-          List<? extends StructField> listFields = objIns.getAllStructFieldRefs();
-          int k = 0;
-          for (StructField fld : listFields) {
-            memObj.add(objIns.getStructFieldData(lazyObj, fld));
-          }
-          
-          res.add(memObj);
-        }
-        obj = res;
-      }
-    } catch (Exception e) {
-      throw new IOException(e.getMessage());
-    }
-  }
-  
-  @Override
-  public void writeExternal(ObjectOutput out) throws IOException {
-    try {
-      
-      out.writeInt(metadataTag);
-      out.writeInt(objectTypeTag);
-
-      // get the tableDesc from the map stored in the mapjoin operator
-      MapJoinObjectCtx ctx = MapJoinOperator.getMapMetadata().get(new Integer(metadataTag));
-
-      // Different processing for key and value
-      if (objectTypeTag == 1) {
-        Writable val = ctx.getSerializer().serialize(obj, ctx.getSerObjInspector());
-        val.write(out);
-      }
-      else if (objectTypeTag == 2) {
-        Vector<Object> v = (Vector<Object>)obj;
-        out.writeInt(v.size());
-
-        for (int pos = 0; pos < v.size(); pos++) {
-          Writable val = ctx.getSerializer().serialize(v.get(pos), ctx.getSerObjInspector());
-          val.write(out);
-        }
-      }
-    }
-    catch (Exception e) {
-      throw new IOException(e.getMessage());
-    }
-  }
-
-  /**
-   * @return the metadataTag
-   */
-  public int getMetadataTag() {
-    return metadataTag;
-  }
-
-  /**
-   * @param metadataTag the metadataTag to set
-   */
-  public void setMetadataTag(int metadataTag) {
-    this.metadataTag = metadataTag;
-  }
-
-  /**
-   * @return the objectTypeTag
-   */
-  public int getObjectTypeTag() {
-    return objectTypeTag;
-  }
-
-  /**
-   * @param objectTypeTag the objectTypeTag to set
-   */
-  public void setObjectTypeTag(int objectTypeTag) {
-    this.objectTypeTag = objectTypeTag;
-  }
-
-  /**
-   * @return the obj
-   */
-  public Object getObj() {
-    return obj;
-  }
-
-  /**
-   * @param obj the obj to set
-   */
-  public void setObj(Object obj) {
-    this.obj = obj;
-  }
-
-}

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java?rev=782333&r1=782332&r2=782333&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java Sun Jun  7 07:28:07 2009
@@ -256,22 +256,19 @@
         }
         
         HTree hashTable = mapJoinTables.get(alias);
-        MapJoinObject keyMap = new MapJoinObject(metadataKeyTag, 1, key);
-        MapJoinObject o = (MapJoinObject)hashTable.get(keyMap);
+        MapJoinObjectKey keyMap = new MapJoinObjectKey(metadataKeyTag, key);
+        MapJoinObjectValue o = (MapJoinObjectValue)hashTable.get(keyMap);
         Vector<ArrayList<Object>> res = null;
         
         if (o == null) {
           res = new Vector<ArrayList<Object>>();
         }
         else {
-          res = (Vector<ArrayList<Object>>)o.getObj();
+          res = o.getObj();
         }
         
         res.add(value);
-
-        // TODO: put some warning if the size of res exceeds a given threshold
   
-
         if (metadataValueTag[tag] == -1) {
           metadataValueTag[tag] = nextVal++;
                     
@@ -294,40 +291,34 @@
         }
         
         // Construct externalizable objects for key and value
-        MapJoinObject keyObj = new MapJoinObject();
-        
-        // currently, key is always byteswritable and value is text - TODO: generalize this
-        keyObj.setMetadataTag(metadataKeyTag);
-        keyObj.setObjectTypeTag(1);
-        keyObj.setObj(key);
-        
-        MapJoinObject valueObj = new MapJoinObject();
-        
-        valueObj.setMetadataTag(metadataValueTag[tag]);
-        valueObj.setObjectTypeTag(2);
-        valueObj.setObj(res);
+        MapJoinObjectKey keyObj = new MapJoinObjectKey(metadataKeyTag, key);
+        MapJoinObjectValue valueObj = new MapJoinObjectValue(metadataValueTag[tag], res);
 
         if (res.size() > 1)
           hashTable.remove(keyObj);
 
+        // This may potentially increase the size of the hashmap on the mapper
+        if (res.size() > mapJoinRowsKey) {
+          LOG.warn("Number of values for a given key " + keyObj + " are " + res.size());
+        }
+        
         hashTable.put(keyObj, valueObj);
         return;
       }
 
-
       // Add the value to the vector
       storage.get(alias).add(value);
 
       for (Byte pos : order) {
         if (pos.intValue() != tag) {
-          MapJoinObject keyMap = new MapJoinObject(metadataKeyTag, 1, key);
-          MapJoinObject o = (MapJoinObject)mapJoinTables.get(pos).get(keyMap);
+          MapJoinObjectKey keyMap = new MapJoinObjectKey(metadataKeyTag, key);
+          MapJoinObjectValue o = (MapJoinObjectValue)mapJoinTables.get(pos).get(keyMap);
 
           if (o == null) {
             storage.put(pos, new Vector<ArrayList<Object>>());
           }
           else {
-            storage.put(pos, (Vector<ArrayList<Object>>)o.getObj());
+            storage.put(pos, o.getObj());
           }
         }
       }
@@ -363,7 +354,7 @@
     super.close(abort);
   }
   
-  public static void deleteDir(File dir) {
+  private void deleteDir(File dir) {
     if (dir.isDirectory()) {
       String[] children = dir.list();
       for (int i = 0; i < children.length; i++) {

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java?rev=782333&r1=782332&r2=782333&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java Sun Jun  7 07:28:07 2009
@@ -415,7 +415,7 @@
   transient protected int[] childOperatorsTag; 
 
    /**
-   * Replace one child with another at the same position.
+   * Replace one child with another at the same position. The parent of the child is not changed
    * @param child     the old child
    * @param newChild  the new child
    */
@@ -423,7 +423,6 @@
     int childIndex = childOperators.indexOf(child);
     assert childIndex != -1;
     childOperators.set(childIndex, newChild);
-    // TODO: set parent for newChild
   }
 
   public void  removeChild(Operator<? extends Serializable> child) {
@@ -433,10 +432,17 @@
       childOperators = null;
     else
       childOperators.remove(childIndex);
+    
+    int parentIndex = child.getParentOperators().indexOf(this);
+    assert parentIndex != -1;
+    if (child.getParentOperators().size() == 1)
+      child.setParentOperators(null);
+    else
+      child.getParentOperators().remove(parentIndex);
   }
 
   /**
-   * Replace one parent with another at the same position.
+   * Replace one parent with another at the same position. Chilren of the new parent are not updated
    * @param parent     the old parent
    * @param newParent  the new parent
    */
@@ -444,7 +450,6 @@
     int parentIndex = parentOperators.indexOf(parent);
     assert parentIndex != -1;
     parentOperators.set(parentIndex, newParent);
-    // TODO: set the child in newParent correctly
   }
 
   protected void forward(Object row, ObjectInspector rowInspector) throws HiveException {

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java?rev=782333&r1=782332&r2=782333&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java Sun Jun  7 07:28:07 2009
@@ -242,7 +242,6 @@
     for (Operator<? extends Serializable> childOp : childOps) 
       childOp.replaceParent(op, mapJoinOp);
     
-    // TODO: do as part of replaceParent
     mapJoinOp.setChildOperators(childOps);
     mapJoinOp.setParentOperators(newParentOps);
     op.setChildOperators(null);