You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by zs...@apache.org on 2009/06/07 09:28:08 UTC
svn commit: r782333 - in /hadoop/hive/trunk: ./
common/src/java/org/apache/hadoop/hive/conf/
ql/src/java/org/apache/hadoop/hive/ql/exec/
ql/src/java/org/apache/hadoop/hive/ql/optimizer/
Author: zshao
Date: Sun Jun 7 07:28:07 2009
New Revision: 782333
URL: http://svn.apache.org/viewvc?rev=782333&view=rev
Log:
HIVE-528. Map Join followup: split MapJoinObject into MapJoinObjectKey and MapJoinObjectValue. (Namit Jain via zshao)
Modified:
hadoop/hive/trunk/CHANGES.txt
hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinObject.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
Modified: hadoop/hive/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/CHANGES.txt?rev=782333&r1=782332&r2=782333&view=diff
==============================================================================
--- hadoop/hive/trunk/CHANGES.txt (original)
+++ hadoop/hive/trunk/CHANGES.txt Sun Jun 7 07:28:07 2009
@@ -64,6 +64,9 @@
HIVE-490. Add missing configuration variables to hive-default.xml.
(Yongqiang He via zshao)
+ HIVE-528. Map Join followup: split MapJoinObject into MapJoinObjectKey and
+ MapJoinObjectValue. (Namit Jain via zshao)
+
OPTIMIZATIONS
HIVE-279. Predicate Pushdown support (Prasad Chakka via athusoo).
Modified: hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=782333&r1=782332&r2=782333&view=diff
==============================================================================
--- hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Sun Jun 7 07:28:07 2009
@@ -113,7 +113,6 @@
HIVEMAPSIDEAGGREGATE("hive.map.aggr", "true"),
HIVEGROUPBYSKEW("hive.groupby.skewindata", "false"),
HIVEJOINEMITINTERVAL("hive.join.emit.interval", 1000),
- HIVEMAPJOINNUMROWSCACHE("hive.mapjoin.numrows", 1000),
HIVEMAPJOINROWSIZE("hive.mapjoin.size.key", 10000),
HIVEMAPJOINCACHEROWS("hive.mapjoin.cache.numrows", 10000),
HIVEGROUPBYMAPINTERVAL("hive.groupby.mapaggr.checkinterval", 100000),
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinObject.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinObject.java?rev=782333&r1=782332&r2=782333&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinObject.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinObject.java Sun Jun 7 07:28:07 2009
@@ -1,193 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.exec;
-
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Vector;
-
-import org.apache.hadoop.hive.ql.exec.MapJoinOperator.MapJoinObjectCtx;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.hive.serde2.lazy.LazyObject;
-import org.apache.hadoop.hive.serde2.lazy.LazyStruct;
-import org.apache.hadoop.hive.serde2.objectinspector.StructField;
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-
-/**
- * Map Join Object used for both key and value
- */
-public class MapJoinObject implements Externalizable {
-
- transient protected int metadataTag;
- transient protected int objectTypeTag;
- transient protected Object obj;
-
- public MapJoinObject() {
- }
-
- /**
- * @param metadataTag
- * @param objectTypeTag
- * @param obj
- */
- public MapJoinObject(int metadataTag, int objectTypeTag, Object obj) {
- this.metadataTag = metadataTag;
- this.objectTypeTag = objectTypeTag;
- this.obj = obj;
- }
-
- public boolean equals(Object o) {
- if (o instanceof MapJoinObject) {
- MapJoinObject mObj = (MapJoinObject)o;
- if ((mObj.getMetadataTag() == metadataTag) && (mObj.getObjectTypeTag() == objectTypeTag)) {
- if ((obj == null) && (mObj.getObj() == null))
- return true;
- if ((obj != null) && (mObj.getObj() != null) && (mObj.getObj().equals(obj)))
- return true;
- }
- }
-
- return false;
- }
-
- public int hashCode() {
- return (obj == null) ? 0 : obj.hashCode();
- }
-
- @Override
- public void readExternal(ObjectInput in) throws IOException,
- ClassNotFoundException {
- try {
- metadataTag = in.readInt();
- objectTypeTag = in.readInt();
-
- // get the tableDesc from the map stored in the mapjoin operator
- MapJoinObjectCtx ctx = MapJoinOperator.getMapMetadata().get(new Integer(metadataTag));
- Writable val = null;
-
- assert ((objectTypeTag == 1) || (objectTypeTag == 2));
- if (objectTypeTag == 1) {
- val = new BytesWritable();
- val.readFields(in);
- obj = (ArrayList<Object>)ctx.getDeserializer().deserialize(val);
- }
- else if (objectTypeTag == 2) {
- int sz = in.readInt();
-
- Vector<ArrayList<Object>> res = new Vector<ArrayList<Object>>();
- for (int pos = 0; pos < sz; pos++) {
- ArrayList<Object> memObj = new ArrayList<Object>();
- val = new Text();
- val.readFields(in);
- StructObjectInspector objIns = (StructObjectInspector)ctx.getDeserObjInspector();
- LazyStruct lazyObj = (LazyStruct)(((LazyObject)ctx.getDeserializer().deserialize(val)).getObject());
- List<? extends StructField> listFields = objIns.getAllStructFieldRefs();
- int k = 0;
- for (StructField fld : listFields) {
- memObj.add(objIns.getStructFieldData(lazyObj, fld));
- }
-
- res.add(memObj);
- }
- obj = res;
- }
- } catch (Exception e) {
- throw new IOException(e.getMessage());
- }
- }
-
- @Override
- public void writeExternal(ObjectOutput out) throws IOException {
- try {
-
- out.writeInt(metadataTag);
- out.writeInt(objectTypeTag);
-
- // get the tableDesc from the map stored in the mapjoin operator
- MapJoinObjectCtx ctx = MapJoinOperator.getMapMetadata().get(new Integer(metadataTag));
-
- // Different processing for key and value
- if (objectTypeTag == 1) {
- Writable val = ctx.getSerializer().serialize(obj, ctx.getSerObjInspector());
- val.write(out);
- }
- else if (objectTypeTag == 2) {
- Vector<Object> v = (Vector<Object>)obj;
- out.writeInt(v.size());
-
- for (int pos = 0; pos < v.size(); pos++) {
- Writable val = ctx.getSerializer().serialize(v.get(pos), ctx.getSerObjInspector());
- val.write(out);
- }
- }
- }
- catch (Exception e) {
- throw new IOException(e.getMessage());
- }
- }
-
- /**
- * @return the metadataTag
- */
- public int getMetadataTag() {
- return metadataTag;
- }
-
- /**
- * @param metadataTag the metadataTag to set
- */
- public void setMetadataTag(int metadataTag) {
- this.metadataTag = metadataTag;
- }
-
- /**
- * @return the objectTypeTag
- */
- public int getObjectTypeTag() {
- return objectTypeTag;
- }
-
- /**
- * @param objectTypeTag the objectTypeTag to set
- */
- public void setObjectTypeTag(int objectTypeTag) {
- this.objectTypeTag = objectTypeTag;
- }
-
- /**
- * @return the obj
- */
- public Object getObj() {
- return obj;
- }
-
- /**
- * @param obj the obj to set
- */
- public void setObj(Object obj) {
- this.obj = obj;
- }
-
-}
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java?rev=782333&r1=782332&r2=782333&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java Sun Jun 7 07:28:07 2009
@@ -256,22 +256,19 @@
}
HTree hashTable = mapJoinTables.get(alias);
- MapJoinObject keyMap = new MapJoinObject(metadataKeyTag, 1, key);
- MapJoinObject o = (MapJoinObject)hashTable.get(keyMap);
+ MapJoinObjectKey keyMap = new MapJoinObjectKey(metadataKeyTag, key);
+ MapJoinObjectValue o = (MapJoinObjectValue)hashTable.get(keyMap);
Vector<ArrayList<Object>> res = null;
if (o == null) {
res = new Vector<ArrayList<Object>>();
}
else {
- res = (Vector<ArrayList<Object>>)o.getObj();
+ res = o.getObj();
}
res.add(value);
-
- // TODO: put some warning if the size of res exceeds a given threshold
-
if (metadataValueTag[tag] == -1) {
metadataValueTag[tag] = nextVal++;
@@ -294,40 +291,34 @@
}
// Construct externalizable objects for key and value
- MapJoinObject keyObj = new MapJoinObject();
-
- // currently, key is always byteswritable and value is text - TODO: generalize this
- keyObj.setMetadataTag(metadataKeyTag);
- keyObj.setObjectTypeTag(1);
- keyObj.setObj(key);
-
- MapJoinObject valueObj = new MapJoinObject();
-
- valueObj.setMetadataTag(metadataValueTag[tag]);
- valueObj.setObjectTypeTag(2);
- valueObj.setObj(res);
+ MapJoinObjectKey keyObj = new MapJoinObjectKey(metadataKeyTag, key);
+ MapJoinObjectValue valueObj = new MapJoinObjectValue(metadataValueTag[tag], res);
if (res.size() > 1)
hashTable.remove(keyObj);
+ // This may potentially increase the size of the hashmap on the mapper
+ if (res.size() > mapJoinRowsKey) {
+ LOG.warn("Number of values for a given key " + keyObj + " are " + res.size());
+ }
+
hashTable.put(keyObj, valueObj);
return;
}
-
// Add the value to the vector
storage.get(alias).add(value);
for (Byte pos : order) {
if (pos.intValue() != tag) {
- MapJoinObject keyMap = new MapJoinObject(metadataKeyTag, 1, key);
- MapJoinObject o = (MapJoinObject)mapJoinTables.get(pos).get(keyMap);
+ MapJoinObjectKey keyMap = new MapJoinObjectKey(metadataKeyTag, key);
+ MapJoinObjectValue o = (MapJoinObjectValue)mapJoinTables.get(pos).get(keyMap);
if (o == null) {
storage.put(pos, new Vector<ArrayList<Object>>());
}
else {
- storage.put(pos, (Vector<ArrayList<Object>>)o.getObj());
+ storage.put(pos, o.getObj());
}
}
}
@@ -363,7 +354,7 @@
super.close(abort);
}
- public static void deleteDir(File dir) {
+ private void deleteDir(File dir) {
if (dir.isDirectory()) {
String[] children = dir.list();
for (int i = 0; i < children.length; i++) {
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java?rev=782333&r1=782332&r2=782333&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java Sun Jun 7 07:28:07 2009
@@ -415,7 +415,7 @@
transient protected int[] childOperatorsTag;
/**
- * Replace one child with another at the same position.
+ * Replace one child with another at the same position. The parent lists of the old and new child are not updated.
* @param child the old child
* @param newChild the new child
*/
@@ -423,7 +423,6 @@
int childIndex = childOperators.indexOf(child);
assert childIndex != -1;
childOperators.set(childIndex, newChild);
- // TODO: set parent for newChild
}
public void removeChild(Operator<? extends Serializable> child) {
@@ -433,10 +432,17 @@
childOperators = null;
else
childOperators.remove(childIndex);
+
+ int parentIndex = child.getParentOperators().indexOf(this);
+ assert parentIndex != -1;
+ if (child.getParentOperators().size() == 1)
+ child.setParentOperators(null);
+ else
+ child.getParentOperators().remove(parentIndex);
}
/**
- * Replace one parent with another at the same position.
+ * Replace one parent with another at the same position. The children of the new parent are not updated.
* @param parent the old parent
* @param newParent the new parent
*/
@@ -444,7 +450,6 @@
int parentIndex = parentOperators.indexOf(parent);
assert parentIndex != -1;
parentOperators.set(parentIndex, newParent);
- // TODO: set the child in newParent correctly
}
protected void forward(Object row, ObjectInspector rowInspector) throws HiveException {
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java?rev=782333&r1=782332&r2=782333&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java Sun Jun 7 07:28:07 2009
@@ -242,7 +242,6 @@
for (Operator<? extends Serializable> childOp : childOps)
childOp.replaceParent(op, mapJoinOp);
- // TODO: do as part of replaceParent
mapJoinOp.setChildOperators(childOps);
mapJoinOp.setParentOperators(newParentOps);
op.setChildOperators(null);