You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by na...@apache.org on 2010/02/17 17:35:42 UTC
svn commit: r911060 - in /hadoop/hive/branches/branch-0.5: ./
common/src/java/org/apache/hadoop/hive/conf/ conf/
ql/src/java/org/apache/hadoop/hive/ql/exec/
ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/
Author: namit
Date: Wed Feb 17 16:35:41 2010
New Revision: 911060
URL: http://svn.apache.org/viewvc?rev=911060&view=rev
Log:
HIVE-1158. New parameter for specifying map join size
(Ning Zhang via namit)
Modified:
hadoop/hive/branches/branch-0.5/CHANGES.txt
hadoop/hive/branches/branch-0.5/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
hadoop/hive/branches/branch-0.5/conf/hive-default.xml
hadoop/hive/branches/branch-0.5/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java
hadoop/hive/branches/branch-0.5/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
hadoop/hive/branches/branch-0.5/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
hadoop/hive/branches/branch-0.5/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinObjectValue.java
hadoop/hive/branches/branch-0.5/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java
Modified: hadoop/hive/branches/branch-0.5/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hive/branches/branch-0.5/CHANGES.txt?rev=911060&r1=911059&r2=911060&view=diff
==============================================================================
--- hadoop/hive/branches/branch-0.5/CHANGES.txt (original)
+++ hadoop/hive/branches/branch-0.5/CHANGES.txt Wed Feb 17 16:35:41 2010
@@ -107,6 +107,9 @@
HIVE-1106. Support ALTER TABLE t ADD IF NOT EXIST PARTITION.
(Paul Yang via zshao)
+ HIVE-1158. New parameter for specifying map join size
+ (Ning Zhang via namit)
+
IMPROVEMENTS
HIVE-760. Add version info to META-INF/MANIFEST.MF.
Modified: hadoop/hive/branches/branch-0.5/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hadoop/hive/branches/branch-0.5/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=911060&r1=911059&r2=911060&view=diff
==============================================================================
--- hadoop/hive/branches/branch-0.5/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hadoop/hive/branches/branch-0.5/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Wed Feb 17 16:35:41 2010
@@ -128,6 +128,7 @@
HIVEGROUPBYSKEW("hive.groupby.skewindata", "false"),
HIVEJOINEMITINTERVAL("hive.join.emit.interval", 1000),
HIVEJOINCACHESIZE("hive.join.cache.size", 25000),
+ HIVEMAPJOINBUCKETCACHESIZE("hive.mapjoin.bucket.cache.size", 100),
HIVEMAPJOINROWSIZE("hive.mapjoin.size.key", 10000),
HIVEMAPJOINCACHEROWS("hive.mapjoin.cache.numrows", 25000),
HIVEGROUPBYMAPINTERVAL("hive.groupby.mapaggr.checkinterval", 100000),
Modified: hadoop/hive/branches/branch-0.5/conf/hive-default.xml
URL: http://svn.apache.org/viewvc/hadoop/hive/branches/branch-0.5/conf/hive-default.xml?rev=911060&r1=911059&r2=911060&view=diff
==============================================================================
--- hadoop/hive/branches/branch-0.5/conf/hive-default.xml (original)
+++ hadoop/hive/branches/branch-0.5/conf/hive-default.xml Wed Feb 17 16:35:41 2010
@@ -268,6 +268,18 @@
</property>
<property>
+ <name>hive.join.cache.size</name>
+ <value>25000</value>
+ <description>How many rows in the joining tables (except the streaming table) should be cached in memory. </description>
+</property>
+
+<property>
+ <name>hive.mapjoin.bucket.cache.size</name>
+ <value>100</value>
+ <description>How many values in each key in the map-joined table should be cached in memory. </description>
+</property>
+
+<property>
<name>hive.mapjoin.maxsize</name>
<value>100000</value>
<description>Maximum # of rows of the small table that can be handled by map-side join. If the size is reached and hive.task.progress is set, a fatal error counter is set and the job will be killed.</description>
Modified: hadoop/hive/branches/branch-0.5/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/branches/branch-0.5/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java?rev=911060&r1=911059&r2=911060&view=diff
==============================================================================
--- hadoop/hive/branches/branch-0.5/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java (original)
+++ hadoop/hive/branches/branch-0.5/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java Wed Feb 17 16:35:41 2010
@@ -193,9 +193,12 @@
return joinOutputObjectInspector;
}
+ Configuration hconf;
+
protected void initializeOp(Configuration hconf) throws HiveException {
LOG.info("COMMONJOIN " + ((StructObjectInspector)inputObjInspectors[0]).getTypeName());
totalSz = 0;
+ this.hconf = hconf;
// Map that contains the rows for each alias
storage = new HashMap<Byte, RowContainer<ArrayList<Object>>>();
@@ -232,29 +235,13 @@
nr.add(null);
dummyObj[pos] = nr;
// there should be only 1 dummy object in the RowContainer
- RowContainer<ArrayList<Object>> values = new RowContainer<ArrayList<Object>>(1);
+ RowContainer<ArrayList<Object>> values = getRowContainer(hconf, pos, alias, 1);
values.add((ArrayList<Object>) dummyObj[pos]);
dummyObjVectors[pos] = values;
// if serde is null, the input doesn't need to be spilled out
// e.g., the output columns does not contains the input table
- SerDe serde = getSpillSerDe(pos);
- RowContainer rc = new RowContainer(joinCacheSize);
- if ( serde != null ) {
-
- // arbitrary column names used internally for serializing to spill table
- List<String> colList = new ArrayList<String>();
- for ( int i = 0; i < sz; ++i )
- colList.add(alias + "_VAL_" + i);
-
- // object inspector for serializing input tuples
- StructObjectInspector rcOI =
- ObjectInspectorFactory.getStandardStructObjectInspector(
- colList,
- joinValuesStandardObjectInspectors.get(pos));
-
- rc.setSerDe(serde, rcOI);
- }
+ RowContainer rc = getRowContainer(hconf, pos, alias, joinCacheSize);
storage.put(pos, rc);
pos++;
}
@@ -788,5 +775,29 @@
public void setPosToAliasMap(Map<Integer, Set<String>> posToAliasMap) {
this.posToAliasMap = posToAliasMap;
}
+
+ RowContainer getRowContainer(Configuration hconf, byte pos, Byte alias,
+ int containerSize) throws HiveException {
+ tableDesc tblDesc = getSpillTableDesc(alias);
+ SerDe serde = getSpillSerDe(alias);
+
+ if (serde == null) {
+ containerSize = 1;
+ }
+
+ RowContainer rc = new RowContainer(containerSize);
+ StructObjectInspector rcOI = null;
+ if (tblDesc != null) {
+ // arbitrary column names used internally for serializing to spill table
+ List<String> colNames = Utilities.getColumnNames(tblDesc.getProperties());
+ // object inspector for serializing input tuples
+ rcOI = ObjectInspectorFactory.getStandardStructObjectInspector(colNames,
+ joinValuesStandardObjectInspectors.get(pos));
+ }
+
+ rc.setSerDe(serde, rcOI);
+ rc.setTableDesc(tblDesc);
+ return rc;
+ }
}
Modified: hadoop/hive/branches/branch-0.5/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/branches/branch-0.5/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java?rev=911060&r1=911059&r2=911060&view=diff
==============================================================================
--- hadoop/hive/branches/branch-0.5/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java (original)
+++ hadoop/hive/branches/branch-0.5/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java Wed Feb 17 16:35:41 2010
@@ -80,14 +80,23 @@
public static class MapJoinObjectCtx {
ObjectInspector standardOI;
SerDe serde;
-
+ tableDesc tblDesc;
+ Configuration conf;
+
/**
* @param standardOI
* @param serde
*/
public MapJoinObjectCtx(ObjectInspector standardOI, SerDe serde) {
+ this(standardOI, serde, null, null);
+ }
+
+ public MapJoinObjectCtx(ObjectInspector standardOI, SerDe serde,
+ tableDesc tblDesc, Configuration conf) {
this.standardOI = standardOI;
this.serde = serde;
+ this.tblDesc = tblDesc;
+ this.conf = conf;
}
/**
@@ -104,6 +113,14 @@
return serde;
}
+ public tableDesc getTblDesc() {
+ return tblDesc;
+ }
+
+ public Configuration getConf() {
+ return conf;
+ }
+
}
transient static Map<Integer, MapJoinObjectCtx> mapMetadata = new HashMap<Integer, MapJoinObjectCtx>();
@@ -214,7 +231,7 @@
new MapJoinObjectCtx(
ObjectInspectorUtils.getStandardObjectInspector(keySerializer.getObjectInspector(),
ObjectInspectorCopyOption.WRITABLE),
- keySerializer));
+ keySerializer, keyTableDesc, hconf));
firstRow = false;
}
@@ -240,8 +257,9 @@
boolean needNewKey = true;
if (o == null) {
- res = new RowContainer();
- res.add(value);
+ int bucketSize = HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEMAPJOINBUCKETCACHESIZE);
+ res = new RowContainer(bucketSize);
+ res.add(value);
} else {
res = o.getObj();
res.add(value);
@@ -266,13 +284,14 @@
new MapJoinObjectCtx(
ObjectInspectorUtils.getStandardObjectInspector(valueSerDe.getObjectInspector(),
ObjectInspectorCopyOption.WRITABLE),
- valueSerDe));
+ valueSerDe, valueTableDesc, hconf));
}
// Construct externalizable objects for key and value
if ( needNewKey ) {
MapJoinObjectKey keyObj = new MapJoinObjectKey(metadataKeyTag, key);
- MapJoinObjectValue valueObj = new MapJoinObjectValue(metadataValueTag[tag], res);
+ MapJoinObjectValue valueObj = new MapJoinObjectValue(metadataValueTag[tag], res);
+ valueObj.setConf(hconf);
// This may potentially increase the size of the hashmap on the mapper
if (res.size() > mapJoinRowsKey) {
Modified: hadoop/hive/branches/branch-0.5/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hadoop/hive/branches/branch-0.5/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=911060&r1=911059&r2=911060&view=diff
==============================================================================
--- hadoop/hive/branches/branch-0.5/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hadoop/hive/branches/branch-0.5/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Wed Feb 17 16:35:41 2010
@@ -58,6 +58,7 @@
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.serde.Constants;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
@@ -836,6 +837,20 @@
return names;
}
+ public static List<String> getColumnNames(Properties props) {
+ List<String> names = new ArrayList<String>();
+ String colNames = props.getProperty(Constants.LIST_COLUMNS);
+ String[] cols = colNames.trim().split(",");
+ if (cols != null) {
+ for (String col : cols) {
+ if (col != null && !col.trim().equals("")) {
+ names.add(col);
+ }
+ }
+ }
+ return names;
+ }
+
public static void validateColumnNames(List<String> colNames,
List<String> checkCols) throws SemanticException {
Iterator<String> checkColsIter = checkCols.iterator();
Modified: hadoop/hive/branches/branch-0.5/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinObjectValue.java
URL: http://svn.apache.org/viewvc/hadoop/hive/branches/branch-0.5/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinObjectValue.java?rev=911060&r1=911059&r2=911060&view=diff
==============================================================================
--- hadoop/hive/branches/branch-0.5/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinObjectValue.java (original)
+++ hadoop/hive/branches/branch-0.5/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinObjectValue.java Wed Feb 17 16:35:41 2010
@@ -24,13 +24,16 @@
import java.io.ObjectOutput;
import java.util.ArrayList;
+import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
import org.apache.hadoop.hive.ql.exec.MapJoinOperator.MapJoinObjectCtx;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
+import org.apache.hadoop.conf.Configuration;
/**
* Map Join Object used for both key and value
@@ -39,8 +42,11 @@
transient protected int metadataTag;
transient protected RowContainer obj;
+ transient protected Configuration conf;
+ transient protected int bucketSize;
public MapJoinObjectValue() {
+ this.bucketSize = 100;
}
/**
@@ -50,6 +56,7 @@
public MapJoinObjectValue(int metadataTag, RowContainer obj) {
this.metadataTag = metadataTag;
this.obj = obj;
+ this.bucketSize = 100;
}
public boolean equals(Object o) {
@@ -80,7 +87,9 @@
MapJoinObjectCtx ctx = MapJoinOperator.getMapMetadata().get(Integer.valueOf(metadataTag));
int sz = in.readInt();
- RowContainer res = new RowContainer();
+ RowContainer res = new RowContainer(bucketSize);
+ res.setSerDe(ctx.getSerDe(), ctx.getStandardOI());
+ res.setTableDesc(ctx.getTblDesc());
for (int pos = 0; pos < sz; pos++) {
Writable val = ctx.getSerDe().getSerializedClass().newInstance();
val.readFields(in);
@@ -152,4 +161,8 @@
this.obj = obj;
}
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ bucketSize = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVEMAPJOINBUCKETCACHESIZE);
+ }
}
Modified: hadoop/hive/branches/branch-0.5/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java
URL: http://svn.apache.org/viewvc/hadoop/hive/branches/branch-0.5/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java?rev=911060&r1=911059&r2=911060&view=diff
==============================================================================
--- hadoop/hive/branches/branch-0.5/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java (original)
+++ hadoop/hive/branches/branch-0.5/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java Wed Feb 17 16:35:41 2010
@@ -30,6 +30,7 @@
import java.util.List;
import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.tableDesc;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.serde2.SerDe;
@@ -89,6 +90,7 @@
private SerDe serde; // serialization/deserialization for the row
private ObjectInspector standardOI; // object inspector for the row
private ArrayList dummyRow; // representing empty row (no columns since value art is null)
+ private tableDesc tblDesc;
public RowContainer() {
this(BLOCKSIZE);
@@ -330,4 +332,8 @@
}
return currBlock;
}
+
+ public void setTableDesc(tableDesc tblDesc) {
+ this.tblDesc = tblDesc;
+ }
}