You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by na...@apache.org on 2010/02/17 17:35:42 UTC

svn commit: r911060 - in /hadoop/hive/branches/branch-0.5: ./ common/src/java/org/apache/hadoop/hive/conf/ conf/ ql/src/java/org/apache/hadoop/hive/ql/exec/ ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/

Author: namit
Date: Wed Feb 17 16:35:41 2010
New Revision: 911060

URL: http://svn.apache.org/viewvc?rev=911060&view=rev
Log:
HIVE-1158. New parameter for specifying map join size
(Ning Zhang via namit)


Modified:
    hadoop/hive/branches/branch-0.5/CHANGES.txt
    hadoop/hive/branches/branch-0.5/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    hadoop/hive/branches/branch-0.5/conf/hive-default.xml
    hadoop/hive/branches/branch-0.5/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java
    hadoop/hive/branches/branch-0.5/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
    hadoop/hive/branches/branch-0.5/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
    hadoop/hive/branches/branch-0.5/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinObjectValue.java
    hadoop/hive/branches/branch-0.5/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java

Modified: hadoop/hive/branches/branch-0.5/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hive/branches/branch-0.5/CHANGES.txt?rev=911060&r1=911059&r2=911060&view=diff
==============================================================================
--- hadoop/hive/branches/branch-0.5/CHANGES.txt (original)
+++ hadoop/hive/branches/branch-0.5/CHANGES.txt Wed Feb 17 16:35:41 2010
@@ -107,6 +107,9 @@
     HIVE-1106. Support ALTER TABLE t ADD IF NOT EXIST PARTITION.
     (Paul Yang via zshao)
 
+    HIVE-1158. New parameter for specifying map join size
+    (Ning Zhang via namit)
+
   IMPROVEMENTS
 
     HIVE-760. Add version info to META-INF/MANIFEST.MF.

Modified: hadoop/hive/branches/branch-0.5/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hadoop/hive/branches/branch-0.5/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=911060&r1=911059&r2=911060&view=diff
==============================================================================
--- hadoop/hive/branches/branch-0.5/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hadoop/hive/branches/branch-0.5/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Wed Feb 17 16:35:41 2010
@@ -128,6 +128,7 @@
     HIVEGROUPBYSKEW("hive.groupby.skewindata", "false"),
     HIVEJOINEMITINTERVAL("hive.join.emit.interval", 1000),
     HIVEJOINCACHESIZE("hive.join.cache.size", 25000),
+    HIVEMAPJOINBUCKETCACHESIZE("hive.mapjoin.bucket.cache.size", 100),
     HIVEMAPJOINROWSIZE("hive.mapjoin.size.key", 10000),
     HIVEMAPJOINCACHEROWS("hive.mapjoin.cache.numrows", 25000),
     HIVEGROUPBYMAPINTERVAL("hive.groupby.mapaggr.checkinterval", 100000),

Modified: hadoop/hive/branches/branch-0.5/conf/hive-default.xml
URL: http://svn.apache.org/viewvc/hadoop/hive/branches/branch-0.5/conf/hive-default.xml?rev=911060&r1=911059&r2=911060&view=diff
==============================================================================
--- hadoop/hive/branches/branch-0.5/conf/hive-default.xml (original)
+++ hadoop/hive/branches/branch-0.5/conf/hive-default.xml Wed Feb 17 16:35:41 2010
@@ -268,6 +268,18 @@
 </property>
 
 <property>
+  <name>hive.join.cache.size</name>
+  <value>25000</value>
+  <description>How many rows in the joining tables (except the streaming table) should be cached in memory. </description>
+</property>
+
+<property>
+  <name>hive.mapjoin.bucket.cache.size</name>
+  <value>100</value>
+  <description>How many values for each key in the map-joined table should be cached in memory. </description>
+</property>
+
+<property>
   <name>hive.mapjoin.maxsize</name>
   <value>100000</value>
   <description>Maximum # of rows of the small table that can be handled by map-side join. If the size is reached and hive.task.progress is set, a fatal error counter is set and the job will be killed.</description>

Modified: hadoop/hive/branches/branch-0.5/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/branches/branch-0.5/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java?rev=911060&r1=911059&r2=911060&view=diff
==============================================================================
--- hadoop/hive/branches/branch-0.5/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java (original)
+++ hadoop/hive/branches/branch-0.5/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java Wed Feb 17 16:35:41 2010
@@ -193,9 +193,12 @@
     return joinOutputObjectInspector;
   }
 
+  Configuration hconf;
+
   protected void initializeOp(Configuration hconf) throws HiveException {
     LOG.info("COMMONJOIN " + ((StructObjectInspector)inputObjInspectors[0]).getTypeName());
     totalSz = 0;
+    this.hconf = hconf;
     // Map that contains the rows for each alias
     storage = new HashMap<Byte, RowContainer<ArrayList<Object>>>();
 
@@ -232,29 +235,13 @@
         nr.add(null);
       dummyObj[pos] = nr;
       // there should be only 1 dummy object in the RowContainer
-      RowContainer<ArrayList<Object>> values = new RowContainer<ArrayList<Object>>(1);
+      RowContainer<ArrayList<Object>> values = getRowContainer(hconf, pos, alias, 1);
       values.add((ArrayList<Object>) dummyObj[pos]);
       dummyObjVectors[pos] = values;
 
       // if serde is null, the input doesn't need to be spilled out 
       // e.g., the output columns does not contains the input table
-      SerDe serde = getSpillSerDe(pos);
-      RowContainer rc = new RowContainer(joinCacheSize);
-      if ( serde != null ) {
-        
-        // arbitrary column names used internally for serializing to spill table
-        List<String> colList = new ArrayList<String>();
-        for ( int i = 0; i < sz; ++i )
-          colList.add(alias + "_VAL_" + i);
-        
-        // object inspector for serializing input tuples
-        StructObjectInspector rcOI = 
-          ObjectInspectorFactory.getStandardStructObjectInspector(
-                            colList,
-                            joinValuesStandardObjectInspectors.get(pos));
-        
-        rc.setSerDe(serde, rcOI);
-      }
+      RowContainer rc = getRowContainer(hconf, pos, alias, joinCacheSize);
       storage.put(pos, rc);
       pos++;
     }
@@ -788,5 +775,29 @@
   public void setPosToAliasMap(Map<Integer, Set<String>> posToAliasMap) {
     this.posToAliasMap = posToAliasMap;
   }
+  
+  RowContainer getRowContainer(Configuration hconf, byte pos, Byte alias,
+      int containerSize) throws HiveException {
+    tableDesc tblDesc = getSpillTableDesc(alias);
+    SerDe serde = getSpillSerDe(alias);
+
+    if (serde == null) {
+      containerSize = 1;
+    }
+
+    RowContainer rc = new RowContainer(containerSize);
+    StructObjectInspector rcOI = null;
+    if (tblDesc != null) {
+      // arbitrary column names used internally for serializing to spill table
+      List<String> colNames = Utilities.getColumnNames(tblDesc.getProperties());
+      // object inspector for serializing input tuples
+      rcOI = ObjectInspectorFactory.getStandardStructObjectInspector(colNames,
+          joinValuesStandardObjectInspectors.get(pos));
+    }
+
+    rc.setSerDe(serde, rcOI);
+    rc.setTableDesc(tblDesc);
+    return rc;
+  }
 
 }

Modified: hadoop/hive/branches/branch-0.5/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/branches/branch-0.5/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java?rev=911060&r1=911059&r2=911060&view=diff
==============================================================================
--- hadoop/hive/branches/branch-0.5/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java (original)
+++ hadoop/hive/branches/branch-0.5/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java Wed Feb 17 16:35:41 2010
@@ -80,14 +80,23 @@
   public static class MapJoinObjectCtx {
     ObjectInspector standardOI;
     SerDe      serde;
-
+    tableDesc tblDesc;
+    Configuration conf;
+    
     /**
      * @param standardOI
      * @param serde
      */
     public MapJoinObjectCtx(ObjectInspector standardOI, SerDe serde) {
+      this(standardOI, serde, null, null);
+    }
+    
+    public MapJoinObjectCtx(ObjectInspector standardOI, SerDe serde,
+        tableDesc tblDesc, Configuration conf) {
       this.standardOI = standardOI;
       this.serde = serde;
+      this.tblDesc = tblDesc;
+      this.conf = conf;
     }
 
     /**
@@ -104,6 +113,14 @@
       return serde;
     }
 
+    public tableDesc getTblDesc() {
+      return tblDesc;
+    }
+
+    public Configuration getConf() {
+      return conf;
+    }
+    
   }
 
   transient static Map<Integer, MapJoinObjectCtx> mapMetadata = new HashMap<Integer, MapJoinObjectCtx>();
@@ -214,7 +231,7 @@
               new MapJoinObjectCtx(
                   ObjectInspectorUtils.getStandardObjectInspector(keySerializer.getObjectInspector(),
                       ObjectInspectorCopyOption.WRITABLE),
-                  keySerializer));
+                  keySerializer, keyTableDesc, hconf));
 
           firstRow = false;
         }
@@ -240,8 +257,9 @@
 
         boolean needNewKey = true;
         if (o == null) {
-          res = new RowContainer();
-        	res.add(value);
+          int bucketSize = HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEMAPJOINBUCKETCACHESIZE);
+          res = new RowContainer(bucketSize);
+          res.add(value);
         } else {
           res = o.getObj();
           res.add(value);
@@ -266,13 +284,14 @@
                           new MapJoinObjectCtx(
                                 ObjectInspectorUtils.getStandardObjectInspector(valueSerDe.getObjectInspector(),
                                   ObjectInspectorCopyOption.WRITABLE),
-                                valueSerDe));
+                                valueSerDe, valueTableDesc, hconf));
         }
         
         // Construct externalizable objects for key and value
         if ( needNewKey ) {
           MapJoinObjectKey keyObj = new MapJoinObjectKey(metadataKeyTag, key);
-	        MapJoinObjectValue valueObj = new MapJoinObjectValue(metadataValueTag[tag], res);
+          MapJoinObjectValue valueObj = new MapJoinObjectValue(metadataValueTag[tag], res);
+          valueObj.setConf(hconf);
           
           // This may potentially increase the size of the hashmap on the mapper
   	      if (res.size() > mapJoinRowsKey) {

Modified: hadoop/hive/branches/branch-0.5/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hadoop/hive/branches/branch-0.5/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=911060&r1=911059&r2=911060&view=diff
==============================================================================
--- hadoop/hive/branches/branch-0.5/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hadoop/hive/branches/branch-0.5/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Wed Feb 17 16:35:41 2010
@@ -58,6 +58,7 @@
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.serde.Constants;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
@@ -836,6 +837,20 @@
     return names;
   }
 
+  public static List<String> getColumnNames(Properties props) {
+    List<String> names = new ArrayList<String>();
+    String colNames = props.getProperty(Constants.LIST_COLUMNS);
+    String[] cols = colNames.trim().split(",");
+    if (cols != null) {
+      for (String col : cols) {
+        if (col != null && !col.trim().equals("")) {
+          names.add(col);
+        }
+      }
+    }
+    return names;
+  }
+
   public static void validateColumnNames(List<String> colNames,
       List<String> checkCols) throws SemanticException {
     Iterator<String> checkColsIter = checkCols.iterator();

Modified: hadoop/hive/branches/branch-0.5/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinObjectValue.java
URL: http://svn.apache.org/viewvc/hadoop/hive/branches/branch-0.5/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinObjectValue.java?rev=911060&r1=911059&r2=911060&view=diff
==============================================================================
--- hadoop/hive/branches/branch-0.5/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinObjectValue.java (original)
+++ hadoop/hive/branches/branch-0.5/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinObjectValue.java Wed Feb 17 16:35:41 2010
@@ -24,13 +24,16 @@
 import java.io.ObjectOutput;
 import java.util.ArrayList;
 
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator.MapJoinObjectCtx;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
+import org.apache.hadoop.conf.Configuration;
 
 /**
  * Map Join Object used for both key and value
@@ -39,8 +42,11 @@
 
   transient protected int     metadataTag;
   transient protected RowContainer obj;
+  transient protected Configuration conf;
+  transient protected int bucketSize;
 
   public MapJoinObjectValue() {
+    this.bucketSize = 100;
   }
 
   /**
@@ -50,6 +56,7 @@
   public MapJoinObjectValue(int metadataTag, RowContainer obj) {
     this.metadataTag = metadataTag;
     this.obj = obj;
+    this.bucketSize = 100;
   }
   
   public boolean equals(Object o) {
@@ -80,7 +87,9 @@
       MapJoinObjectCtx ctx = MapJoinOperator.getMapMetadata().get(Integer.valueOf(metadataTag));
       int sz = in.readInt();
 
-      RowContainer res = new RowContainer();
+      RowContainer res = new RowContainer(bucketSize);
+      res.setSerDe(ctx.getSerDe(), ctx.getStandardOI());
+      res.setTableDesc(ctx.getTblDesc());
       for (int pos = 0; pos < sz; pos++) {
         Writable val = ctx.getSerDe().getSerializedClass().newInstance();
         val.readFields(in);
@@ -152,4 +161,8 @@
     this.obj = obj;
   }
 
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+    bucketSize = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVEMAPJOINBUCKETCACHESIZE);
+  }
 }

Modified: hadoop/hive/branches/branch-0.5/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java
URL: http://svn.apache.org/viewvc/hadoop/hive/branches/branch-0.5/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java?rev=911060&r1=911059&r2=911060&view=diff
==============================================================================
--- hadoop/hive/branches/branch-0.5/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java (original)
+++ hadoop/hive/branches/branch-0.5/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java Wed Feb 17 16:35:41 2010
@@ -30,6 +30,7 @@
 import java.util.List;
 
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.tableDesc;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.serde2.SerDe;
@@ -89,6 +90,7 @@
   private SerDe serde;       // serialization/deserialization for the row
   private ObjectInspector standardOI;  // object inspector for the row
   private ArrayList dummyRow; // representing empty row (no columns since value art is null)
+  private tableDesc tblDesc;
   
   public RowContainer() {
     this(BLOCKSIZE);
@@ -330,4 +332,8 @@
     }
     return currBlock;
   }
+  
+  public void setTableDesc(tableDesc tblDesc) {
+    this.tblDesc = tblDesc;
+  }
 }