You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ha...@apache.org on 2013/08/16 17:52:43 UTC
svn commit: r1514760 [1/2] - in /hive/trunk/ql/src:
java/org/apache/hadoop/hive/ql/exec/
java/org/apache/hadoop/hive/ql/exec/mapjoin/
java/org/apache/hadoop/hive/ql/exec/mr/
java/org/apache/hadoop/hive/ql/exec/persistence/
test/org/apache/hadoop/hive/q...
Author: hashutosh
Date: Fri Aug 16 15:52:42 2013
New Revision: 1514760
URL: http://svn.apache.org/r1514760
Log:
HIVE-4838 : Refactor MapJoin HashMap code to improve testability and readability (Brock Noland via Ashutosh Chauhan)
Added:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionException.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionHandler.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/AbstractMapJoinTableContainer.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinKey.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinObjectSerDeContext.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainer.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/mapjoin/
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/mapjoin/TestMapJoinMemoryExhaustionHandler.java
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/TestMapJoinEqualityTableContainer.java
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/TestMapJoinKey.java
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/TestMapJoinRowContainer.java
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/TestMapJoinTableContainer.java
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/Utilities.java
Removed:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinMetaData.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/AbstractMapJoinKey.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/DCLLItem.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MRU.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinDoubleKeys.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinObjectKey.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinObjectValue.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinSingleKey.java
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/TestHashMapWrapper.java
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/TestMapJoinKeys.java
Modified:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinUtil.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/AbstractRowContainer.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinRowContainer.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java?rev=1514760&r1=1514759&r2=1514760&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java Fri Aug 16 15:52:42 2013
@@ -23,8 +23,7 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.persistence.AbstractMapJoinKey;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKey;
import org.apache.hadoop.hive.ql.exec.persistence.RowContainer;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
@@ -54,7 +53,7 @@ public abstract class AbstractMapJoinOpe
protected transient byte posBigTable = -1; // one of the tables that is not in memory
- protected transient RowContainer<ArrayList<Object>> emptyList = null;
+ protected transient RowContainer<List<Object>> emptyList = null;
transient int numMapRowsRead;
@@ -95,9 +94,9 @@ public abstract class AbstractMapJoinOpe
// all other tables are small, and are cached in the hash table
posBigTable = (byte) conf.getPosBigTable();
- emptyList = new RowContainer<ArrayList<Object>>(1, hconf, reporter);
+ emptyList = new RowContainer<List<Object>>(1, hconf, reporter);
- RowContainer bigPosRC = JoinUtil.getRowContainer(hconf,
+ RowContainer<List<Object>> bigPosRC = JoinUtil.getRowContainer(hconf,
rowContainerStandardObjectInspectors[posBigTable],
posBigTable, joinCacheSize,spillTableDesc, conf,
!hasFilter(posBigTable), reporter);
@@ -160,7 +159,7 @@ public abstract class AbstractMapJoinOpe
}
// returns true if there are elements in key list and any of them is null
- protected boolean hasAnyNulls(AbstractMapJoinKey key) {
+ protected boolean hasAnyNulls(MapJoinKey key) {
return key.hasAnyNulls(nullsafes);
}
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java?rev=1514760&r1=1514759&r2=1514760&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java Fri Aug 16 15:52:42 2013
@@ -93,7 +93,7 @@ public abstract class CommonJoinOperator
protected transient ArrayList<Object>[] dummyObj;
// empty rows for each table
- protected transient RowContainer<ArrayList<Object>>[] dummyObjVectors;
+ protected transient RowContainer<List<Object>>[] dummyObjVectors;
protected transient int totalSz; // total size of the composite object
@@ -108,7 +108,7 @@ public abstract class CommonJoinOperator
// input is too large
// to fit in memory
- AbstractRowContainer<ArrayList<Object>>[] storage; // map b/w table alias
+ AbstractRowContainer<List<Object>>[] storage; // map b/w table alias
// to RowContainer
int joinEmitInterval = -1;
int joinCacheSize = 0;
@@ -274,7 +274,7 @@ public abstract class CommonJoinOperator
}
dummyObj[pos] = nr;
// there should be only 1 dummy object in the RowContainer
- RowContainer<ArrayList<Object>> values = JoinUtil.getRowContainer(hconf,
+ RowContainer<List<Object>> values = JoinUtil.getRowContainer(hconf,
rowContainerStandardObjectInspectors[pos],
alias, 1, spillTableDesc, conf, !hasFilter(pos), reporter);
@@ -283,7 +283,7 @@ public abstract class CommonJoinOperator
// if serde is null, the input doesn't need to be spilled out
// e.g., the output columns does not contains the input table
- RowContainer rc = JoinUtil.getRowContainer(hconf,
+ RowContainer<List<Object>> rc = JoinUtil.getRowContainer(hconf,
rowContainerStandardObjectInspectors[pos],
alias, joinCacheSize, spillTableDesc, conf, !hasFilter(pos), reporter);
storage[pos] = rc;
@@ -328,7 +328,7 @@ public abstract class CommonJoinOperator
public void startGroup() throws HiveException {
LOG.trace("Join: Starting new group");
newGroupStarted = true;
- for (AbstractRowContainer<ArrayList<Object>> alw : storage) {
+ for (AbstractRowContainer<List<Object>> alw : storage) {
alw.clear();
}
super.startGroup();
@@ -443,7 +443,7 @@ public abstract class CommonJoinOperator
private void genJoinObject() throws HiveException {
boolean rightFirst = true;
boolean hasFilter = hasFilter(order[0]);
- AbstractRowContainer<ArrayList<Object>> aliasRes = storage[order[0]];
+ AbstractRowContainer<List<Object>> aliasRes = storage[order[0]];
for (List<Object> rightObj = aliasRes.first(); rightObj != null; rightObj = aliasRes.next()) {
boolean rightNull = rightObj == dummyObj[0];
if (hasFilter) {
@@ -471,7 +471,7 @@ public abstract class CommonJoinOperator
int right = joinCond.getRight();
// search for match in the rhs table
- AbstractRowContainer<ArrayList<Object>> aliasRes = storage[order[aliasNum]];
+ AbstractRowContainer<List<Object>> aliasRes = storage[order[aliasNum]];
boolean done = false;
boolean loopAgain = false;
@@ -641,8 +641,8 @@ public abstract class CommonJoinOperator
private void genUniqueJoinObject(int aliasNum, int forwardCachePos)
throws HiveException {
- AbstractRowContainer<ArrayList<Object>> alias = storage[order[aliasNum]];
- for (ArrayList<Object> row = alias.first(); row != null; row = alias.next()) {
+ AbstractRowContainer<List<Object>> alias = storage[order[aliasNum]];
+ for (List<Object> row = alias.first(); row != null; row = alias.next()) {
int sz = joinValues[order[aliasNum]].size();
int p = forwardCachePos;
for (int j = 0; j < sz; j++) {
@@ -662,7 +662,7 @@ public abstract class CommonJoinOperator
int p = 0;
for (int i = 0; i < numAliases; i++) {
int sz = joinValues[order[i]].size();
- ArrayList<Object> obj = storage[order[i]].first();
+ List<Object> obj = storage[order[i]].first();
for (int j = 0; j < sz; j++) {
forwardCache[p++] = obj.get(j);
}
@@ -684,7 +684,7 @@ public abstract class CommonJoinOperator
boolean allOne = true;
for (int i = 0; i < numAliases; i++) {
Byte alias = order[i];
- AbstractRowContainer<ArrayList<Object>> alw = storage[alias];
+ AbstractRowContainer<List<Object>> alw = storage[alias];
if (alw.size() != 1) {
allOne = false;
@@ -717,7 +717,7 @@ public abstract class CommonJoinOperator
boolean hasEmpty = false;
for (int i = 0; i < numAliases; i++) {
Byte alias = order[i];
- AbstractRowContainer<ArrayList<Object>> alw = storage[alias];
+ AbstractRowContainer<List<Object>> alw = storage[alias];
if (noOuterJoin) {
if (alw.size() == 0) {
@@ -737,7 +737,7 @@ public abstract class CommonJoinOperator
} else {
mayHasMoreThanOne = true;
if (!hasEmpty) {
- for (ArrayList<Object> row = alw.first(); row != null; row = alw.next()) {
+ for (List<Object> row = alw.first(); row != null; row = alw.next()) {
reportProgress();
if (hasAnyFiltered(alias, row)) {
hasEmpty = true;
@@ -784,7 +784,7 @@ public abstract class CommonJoinOperator
@Override
public void closeOp(boolean abort) throws HiveException {
LOG.trace("Join Op close");
- for (AbstractRowContainer<ArrayList<Object>> alw : storage) {
+ for (AbstractRowContainer<List<Object>> alw : storage) {
if (alw != null) {
alw.clear(); // clean up the temp files
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java?rev=1514760&r1=1514759&r2=1514760&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java Fri Aug 16 15:52:42 2013
@@ -17,7 +17,8 @@
*/
package org.apache.hadoop.hive.ql.exec;
-import java.io.File;
+import java.io.BufferedOutputStream;
+import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
@@ -28,11 +29,13 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.persistence.AbstractMapJoinKey;
+import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionHandler;
import org.apache.hadoop.hive.ql.exec.persistence.HashMapWrapper;
-import org.apache.hadoop.hive.ql.exec.persistence.MapJoinObjectValue;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKey;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinObjectSerDeContext;
import org.apache.hadoop.hive.ql.exec.persistence.MapJoinRowContainer;
-import org.apache.hadoop.hive.ql.exec.persistence.RowContainer;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.HashTableSinkDesc;
import org.apache.hadoop.hive.ql.plan.TableDesc;
@@ -41,10 +44,8 @@ import org.apache.hadoop.hive.ql.session
import org.apache.hadoop.hive.serde2.SerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
-import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.util.ReflectionUtils;
@@ -54,128 +55,51 @@ public class HashTableSinkOperator exten
private static final long serialVersionUID = 1L;
private static final Log LOG = LogFactory.getLog(HashTableSinkOperator.class.getName());
- protected static MapJoinMetaData metadata = new MapJoinMetaData();
- // from abstract map join operator
/**
* The expressions for join inputs's join keys.
*/
- protected transient List<ExprNodeEvaluator>[] joinKeys;
+ private transient List<ExprNodeEvaluator>[] joinKeys;
/**
* The ObjectInspectors for the join inputs's join keys.
*/
- protected transient List<ObjectInspector>[] joinKeysObjectInspectors;
- /**
- * The standard ObjectInspectors for the join inputs's join keys.
- */
- protected transient List<ObjectInspector>[] joinKeysStandardObjectInspectors;
-
- protected transient int posBigTableAlias = -1; // one of the tables that is not in memory
+ private transient List<ObjectInspector>[] joinKeysObjectInspectors;
- protected transient RowContainer<ArrayList<Object>> emptyList = null;
+ private transient int posBigTableAlias = -1; // one of the tables that is not in memory
- transient int numMapRowsRead;
- protected transient int totalSz; // total size of the composite object
- transient boolean firstRow;
/**
* The filters for join
*/
- protected transient List<ExprNodeEvaluator>[] joinFilters;
+ private transient List<ExprNodeEvaluator>[] joinFilters;
- protected transient int[][] filterMaps;
+ private transient int[][] filterMaps;
- protected transient int numAliases; // number of aliases
/**
* The expressions for join outputs.
*/
- protected transient List<ExprNodeEvaluator>[] joinValues;
+ private transient List<ExprNodeEvaluator>[] joinValues;
/**
* The ObjectInspectors for the join inputs.
*/
- protected transient List<ObjectInspector>[] joinValuesObjectInspectors;
+ private transient List<ObjectInspector>[] joinValuesObjectInspectors;
/**
* The ObjectInspectors for join filters.
*/
- protected transient List<ObjectInspector>[] joinFilterObjectInspectors;
- /**
- * The standard ObjectInspectors for the join inputs.
- */
- protected transient List<ObjectInspector>[] joinValuesStandardObjectInspectors;
+ private transient List<ObjectInspector>[] joinFilterObjectInspectors;
- protected transient List<ObjectInspector>[] rowContainerStandardObjectInspectors;
-
- protected transient Byte[] order; // order in which the results should
- Configuration hconf;
- protected transient Byte alias;
- protected transient TableDesc[] spillTableDesc; // spill tables are
-
- protected transient HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue>[] mapJoinTables;
- protected transient boolean noOuterJoin;
+ private transient Byte[] order; // order in which the results should
+ private Configuration hconf;
+ private transient Byte alias;
+
+ private transient MapJoinTableContainer[] mapJoinTables;
+ private transient MapJoinTableContainerSerDe[] mapJoinTableSerdes;
+
+ private transient boolean noOuterJoin;
private long rowNumber = 0;
- protected transient LogHelper console;
+ private transient LogHelper console;
private long hashTableScale;
- private boolean isAbort = false;
-
- public static class HashTableSinkObjectCtx {
- ObjectInspector standardOI;
- SerDe serde;
- TableDesc tblDesc;
- Configuration conf;
- boolean hasFilter;
-
- /**
- * @param standardOI
- * @param serde
- */
- public HashTableSinkObjectCtx(ObjectInspector standardOI, SerDe serde, TableDesc tblDesc,
- boolean hasFilter, Configuration conf) {
- this.standardOI = standardOI;
- this.serde = serde;
- this.tblDesc = tblDesc;
- this.hasFilter = hasFilter;
- this.conf = conf;
- }
-
- /**
- * @return the standardOI
- */
- public ObjectInspector getStandardOI() {
- return standardOI;
- }
-
- /**
- * @return the serde
- */
- public SerDe getSerDe() {
- return serde;
- }
-
- public TableDesc getTblDesc() {
- return tblDesc;
- }
-
- public boolean hasFilterTag() {
- return hasFilter;
- }
-
- public Configuration getConf() {
- return conf;
- }
-
- }
-
- public static MapJoinMetaData getMetadata() {
- return metadata;
- }
-
- private static final transient String[] FATAL_ERR_MSG = {
- null, // counter value 0 means no error
- "Mapside join exceeds available memory. "
- + "Please try removing the mapjoin hint."};
- private final int metadataKeyTag = -1;
- transient int[] metadataValueTag;
-
-
+ private MapJoinMemoryExhaustionHandler memoryExhaustionHandler;
+
public HashTableSinkOperator() {
}
@@ -189,8 +113,7 @@ public class HashTableSinkOperator exten
protected void initializeOp(Configuration hconf) throws HiveException {
boolean isSilent = HiveConf.getBoolVar(hconf, HiveConf.ConfVars.HIVESESSIONSILENT);
console = new LogHelper(LOG, isSilent);
- numMapRowsRead = 0;
- firstRow = true;
+ memoryExhaustionHandler = new MapJoinMemoryExhaustionHandler(console, conf.getHashtableMemoryUsage());
// for small tables only; so get the big table position first
posBigTableAlias = conf.getPosBigTable();
@@ -198,9 +121,7 @@ public class HashTableSinkOperator exten
order = conf.getTagOrder();
// initialize some variables, which used to be initialized in CommonJoinOperator
- numAliases = conf.getExprs().size();
this.hconf = hconf;
- totalSz = 0;
noOuterJoin = conf.isNoOuterJoin();
filterMaps = conf.getFilterMap();
@@ -212,16 +133,12 @@ public class HashTableSinkOperator exten
JoinUtil.populateJoinKeyValue(joinKeys, conf.getKeys(), posBigTableAlias);
joinKeysObjectInspectors = JoinUtil.getObjectInspectorsFromEvaluators(joinKeys,
inputObjInspectors, posBigTableAlias, tagLen);
- joinKeysStandardObjectInspectors = JoinUtil.getStandardObjectInspectors(
- joinKeysObjectInspectors, posBigTableAlias, tagLen);
// process join values
joinValues = new List[tagLen];
JoinUtil.populateJoinKeyValue(joinValues, conf.getExprs(), posBigTableAlias);
joinValuesObjectInspectors = JoinUtil.getObjectInspectorsFromEvaluators(joinValues,
inputObjInspectors, posBigTableAlias, tagLen);
- joinValuesStandardObjectInspectors = JoinUtil.getStandardObjectInspectors(
- joinValuesObjectInspectors, posBigTableAlias, tagLen);
// process join filters
joinFilters = new List[tagLen];
@@ -229,9 +146,7 @@ public class HashTableSinkOperator exten
joinFilterObjectInspectors = JoinUtil.getObjectInspectorsFromEvaluators(joinFilters,
inputObjInspectors, posBigTableAlias, tagLen);
- if (noOuterJoin) {
- rowContainerStandardObjectInspectors = joinValuesStandardObjectInspectors;
- } else {
+ if (!noOuterJoin) {
List<ObjectInspector>[] rowContainerObjectInspectors = new List[tagLen];
for (Byte alias : order) {
if (alias == posBigTableAlias) {
@@ -245,43 +160,43 @@ public class HashTableSinkOperator exten
}
rowContainerObjectInspectors[alias] = rcOIs;
}
- rowContainerStandardObjectInspectors = getStandardObjectInspectors(
- rowContainerObjectInspectors, tagLen);
- }
-
- metadataValueTag = new int[numAliases];
- for (int pos = 0; pos < numAliases; pos++) {
- metadataValueTag[pos] = -1;
}
- mapJoinTables = new HashMapWrapper[tagLen];
-
+ mapJoinTables = new MapJoinTableContainer[tagLen];
+ mapJoinTableSerdes = new MapJoinTableContainerSerDe[tagLen];
int hashTableThreshold = HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHASHTABLETHRESHOLD);
float hashTableLoadFactor = HiveConf.getFloatVar(hconf,
HiveConf.ConfVars.HIVEHASHTABLELOADFACTOR);
- float hashTableMaxMemoryUsage = this.getConf().getHashtableMemoryUsage();
-
hashTableScale = HiveConf.getLongVar(hconf, HiveConf.ConfVars.HIVEHASHTABLESCALE);
if (hashTableScale <= 0) {
hashTableScale = 1;
}
-
- // initialize the hash tables for other tables
- for (Byte pos : order) {
- if (pos == posBigTableAlias) {
- continue;
+ try {
+ TableDesc keyTableDesc = conf.getKeyTblDesc();
+ SerDe keySerde = (SerDe) ReflectionUtils.newInstance(keyTableDesc.getDeserializerClass(),
+ null);
+ keySerde.initialize(null, keyTableDesc.getProperties());
+ MapJoinObjectSerDeContext keyContext = new MapJoinObjectSerDeContext(keySerde, false);
+ for (Byte pos : order) {
+ if (pos == posBigTableAlias) {
+ continue;
+ }
+ mapJoinTables[pos] = new HashMapWrapper(hashTableThreshold, hashTableLoadFactor);
+ TableDesc valueTableDesc = conf.getValueTblFilteredDescs().get(pos);
+ SerDe valueSerDe = (SerDe) ReflectionUtils.newInstance(valueTableDesc.getDeserializerClass(), null);
+ valueSerDe.initialize(null, valueTableDesc.getProperties());
+ mapJoinTableSerdes[pos] = new MapJoinTableContainerSerDe(keyContext, new MapJoinObjectSerDeContext(
+ valueSerDe, hasFilter(pos)));
}
-
- HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue> hashTable = new HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue>(
- hashTableThreshold, hashTableLoadFactor, hashTableMaxMemoryUsage);
-
- mapJoinTables[pos] = hashTable;
+ } catch (SerDeException e) {
+ throw new HiveException(e);
}
}
- protected static List<ObjectInspector>[] getStandardObjectInspectors(
+ private static List<ObjectInspector>[] getStandardObjectInspectors(
List<ObjectInspector>[] aliasToObjectInspectors, int maxTag) {
+ @SuppressWarnings("unchecked")
List<ObjectInspector>[] result = new List[maxTag];
for (byte alias = 0; alias < aliasToObjectInspectors.length; alias++) {
List<ObjectInspector> oiList = aliasToObjectInspectors[alias];
@@ -299,104 +214,34 @@ public class HashTableSinkOperator exten
}
- private void setKeyMetaData() throws SerDeException {
- TableDesc keyTableDesc = conf.getKeyTblDesc();
- SerDe keySerializer = (SerDe) ReflectionUtils.newInstance(keyTableDesc.getDeserializerClass(),
- null);
- keySerializer.initialize(null, keyTableDesc.getProperties());
-
- metadata.put(Integer.valueOf(metadataKeyTag), new HashTableSinkObjectCtx(
- ObjectInspectorUtils.getStandardObjectInspector(keySerializer.getObjectInspector(),
- ObjectInspectorCopyOption.WRITABLE), keySerializer, keyTableDesc, false, hconf));
- }
-
- private boolean hasFilter(int alias) {
- return filterMaps != null && filterMaps[alias] != null;
- }
/*
* This operator only process small tables Read the key/value pairs Load them into hashtable
*/
@Override
public void processOp(Object row, int tag) throws HiveException {
- // let the mapJoinOp process these small tables
- try {
- if (firstRow) {
- // generate the map metadata
- setKeyMetaData();
- firstRow = false;
+ alias = (byte)tag;
+ // compute keys and values as StandardObjects
+ MapJoinKey key = JoinUtil.computeMapJoinKeys(null, row, joinKeys[alias],
+ joinKeysObjectInspectors[alias]);
+ Object[] value = JoinUtil.computeMapJoinValues(row, joinValues[alias],
+ joinValuesObjectInspectors[alias], joinFilters[alias], joinFilterObjectInspectors[alias],
+ filterMaps == null ? null : filterMaps[alias]);
+ MapJoinTableContainer tableContainer = mapJoinTables[alias];
+ MapJoinRowContainer rowContainer = tableContainer.get(key);
+ if (rowContainer == null) {
+ rowContainer = new MapJoinRowContainer();
+ rowContainer.add(value);
+ rowNumber++;
+ if (rowNumber > hashTableScale && rowNumber % hashTableScale == 0) {
+ memoryExhaustionHandler.checkMemoryStatus(tableContainer.size(), rowNumber);
}
- alias = (byte)tag;
-
- // compute keys and values as StandardObjects
- AbstractMapJoinKey keyMap = JoinUtil.computeMapJoinKeys(row, joinKeys[alias],
- joinKeysObjectInspectors[alias]);
-
- Object[] value = JoinUtil.computeMapJoinValues(row, joinValues[alias],
- joinValuesObjectInspectors[alias], joinFilters[alias], joinFilterObjectInspectors[alias],
- filterMaps == null ? null : filterMaps[alias]);
-
- HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue> hashTable = mapJoinTables[alias];
-
- MapJoinObjectValue o = hashTable.get(keyMap);
- MapJoinRowContainer<Object[]> res = null;
-
- boolean needNewKey = true;
- if (o == null) {
- res = new MapJoinRowContainer<Object[]>();
- res.add(value);
-
- if (metadataValueTag[tag] == -1) {
- metadataValueTag[tag] = order[tag];
- setValueMetaData(tag);
- }
-
- // Construct externalizable objects for key and value
- if (needNewKey) {
- MapJoinObjectValue valueObj = new MapJoinObjectValue(
- metadataValueTag[tag], res);
-
- rowNumber++;
- if (rowNumber > hashTableScale && rowNumber % hashTableScale == 0) {
- isAbort = hashTable.isAbort(rowNumber, console);
- if (isAbort) {
- throw new HiveException("RunOutOfMeomoryUsage");
- }
- }
- hashTable.put(keyMap, valueObj);
- }
-
- } else {
- res = o.getObj();
- res.add(value);
- }
-
-
- } catch (SerDeException e) {
- throw new HiveException(e);
+ tableContainer.put(key, rowContainer);
+ } else {
+ rowContainer.add(value);
}
-
}
-
- private void setValueMetaData(int tag) throws SerDeException {
- TableDesc valueTableDesc = conf.getValueTblFilteredDescs().get(tag);
- SerDe valueSerDe = (SerDe) ReflectionUtils.newInstance(valueTableDesc.getDeserializerClass(),
- null);
-
- valueSerDe.initialize(null, valueTableDesc.getProperties());
-
- List<ObjectInspector> newFields = rowContainerStandardObjectInspectors[alias];
- int length = newFields.size();
- List<String> newNames = new ArrayList<String>(length);
- for (int i = 0; i < length; i++) {
- String tmp = new String("tmp_" + i);
- newNames.add(tmp);
- }
- StandardStructObjectInspector standardOI = ObjectInspectorFactory
- .getStandardStructObjectInspector(newNames, newFields);
-
- int alias = Integer.valueOf(metadataValueTag[tag]);
- metadata.put(Integer.valueOf(metadataValueTag[tag]), new HashTableSinkObjectCtx(
- standardOI, valueSerDe, valueTableDesc, hasFilter(alias), hconf));
+ private boolean hasFilter(int alias) {
+ return filterMaps != null && filterMaps[alias] != null;
}
@Override
@@ -405,42 +250,36 @@ public class HashTableSinkOperator exten
if (mapJoinTables != null) {
// get tmp file URI
String tmpURI = this.getExecContext().getLocalWork().getTmpFileURI();
- LOG.info("Get TMP URI: " + tmpURI);
- long fileLength;
+ LOG.info("Temp URI for side table: " + tmpURI);
for (byte tag = 0; tag < mapJoinTables.length; tag++) {
// get the key and value
- HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue> hashTable = mapJoinTables[tag];
- if (hashTable == null) {
+ MapJoinTableContainer tableContainer = mapJoinTables[tag];
+ if (tableContainer == null) {
continue;
}
-
// get current input file name
String bigBucketFileName = getExecContext().getCurrentBigBucketFile();
-
String fileName = getExecContext().getLocalWork().getBucketFileName(bigBucketFileName);
-
// get the tmp URI path; it will be a hdfs path if not local mode
String dumpFilePrefix = conf.getDumpFilePrefix();
String tmpURIPath = Utilities.generatePath(tmpURI, dumpFilePrefix, tag, fileName);
- hashTable.isAbort(rowNumber, console);
- console.printInfo(Utilities.now() + "\tDump the hashtable into file: " + tmpURIPath);
+ console.printInfo(Utilities.now() + "\tDump the side-table into file: " + tmpURIPath);
// get the hashtable file and path
Path path = new Path(tmpURIPath);
FileSystem fs = path.getFileSystem(hconf);
- File file = new File(path.toUri().getPath());
- fs.create(path);
- fileLength = hashTable.flushMemoryCacheToPersistent(file);
- console.printInfo(Utilities.now() + "\tUpload 1 File to: " + tmpURIPath + " File size: "
- + fileLength);
-
- hashTable.close();
+ ObjectOutputStream out = new ObjectOutputStream(new BufferedOutputStream(fs.create(path), 4096));
+ try {
+ mapJoinTableSerdes[tag].persist(out, tableContainer);
+ } finally {
+ out.close();
+ }
+ tableContainer.clear();
+ console.printInfo(Utilities.now() + "\tUpload 1 File to: " + tmpURIPath);
}
}
-
super.closeOp(abort);
} catch (Exception e) {
- LOG.error("Generate Hashtable error", e);
- e.printStackTrace();
+ LOG.error("Error generating side-table", e);
}
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinUtil.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinUtil.java?rev=1514760&r1=1514759&r2=1514760&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinUtil.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinUtil.java Fri Aug 16 15:52:42 2013
@@ -23,10 +23,7 @@ import java.util.Map;
import java.util.Map.Entry;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.ql.exec.persistence.AbstractMapJoinKey;
-import org.apache.hadoop.hive.ql.exec.persistence.MapJoinDoubleKeys;
-import org.apache.hadoop.hive.ql.exec.persistence.MapJoinObjectKey;
-import org.apache.hadoop.hive.ql.exec.persistence.MapJoinSingleKey;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKey;
import org.apache.hadoop.hive.ql.exec.persistence.RowContainer;
import org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat;
import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -147,42 +144,22 @@ public class JoinUtil {
/**
* Return the key as a standard object. StandardObject can be inspected by a
- * standard ObjectInspector.
+ * standard ObjectInspector. The first parameter, a MapJoinKey, can
+ * be null if the caller would like a new object to be instantiated.
*/
- public static AbstractMapJoinKey computeMapJoinKeys(Object row,
+ public static MapJoinKey computeMapJoinKeys(MapJoinKey key, Object row,
List<ExprNodeEvaluator> keyFields, List<ObjectInspector> keyFieldsOI)
throws HiveException {
-
int size = keyFields.size();
- if(size == 1){
- Object obj = (ObjectInspectorUtils.copyToStandardObject(keyFields.get(0)
- .evaluate(row), keyFieldsOI.get(0),
- ObjectInspectorCopyOption.WRITABLE));
- MapJoinSingleKey key = new MapJoinSingleKey(obj);
- return key;
- }else if(size == 2){
- Object obj1 = (ObjectInspectorUtils.copyToStandardObject(keyFields.get(0)
- .evaluate(row), keyFieldsOI.get(0),
- ObjectInspectorCopyOption.WRITABLE));
-
- Object obj2 = (ObjectInspectorUtils.copyToStandardObject(keyFields.get(1)
- .evaluate(row), keyFieldsOI.get(1),
- ObjectInspectorCopyOption.WRITABLE));
-
- MapJoinDoubleKeys key = new MapJoinDoubleKeys(obj1,obj2);
- return key;
- }else{
- // Compute the keys
- Object[] nr = new Object[keyFields.size()];
- for (int i = 0; i < keyFields.size(); i++) {
-
- nr[i] = (ObjectInspectorUtils.copyToStandardObject(keyFields.get(i)
- .evaluate(row), keyFieldsOI.get(i),
- ObjectInspectorCopyOption.WRITABLE));
- }
- MapJoinObjectKey key = new MapJoinObjectKey(nr);
- return key;
- }
+ if(key == null || key.getKey().length != size) {
+ key = new MapJoinKey(new Object[size]);
+ }
+ Object[] array = key.getKey();
+ for (int keyIndex = 0; keyIndex < size; keyIndex++) {
+ array[keyIndex] = (ObjectInspectorUtils.copyToStandardObject(keyFields.get(keyIndex)
+ .evaluate(row), keyFieldsOI.get(keyIndex), ObjectInspectorCopyOption.WRITABLE));
+ }
+ return key;
}
@@ -354,7 +331,7 @@ public class JoinUtil {
}
- public static RowContainer getRowContainer(Configuration hconf,
+ public static RowContainer<List<Object>> getRowContainer(Configuration hconf,
List<ObjectInspector> structFieldObjectInspectors,
Byte alias,int containerSize, TableDesc[] spillTableDesc,
JoinDesc conf,boolean noFilter, Reporter reporter) throws HiveException {
@@ -366,7 +343,7 @@ public class JoinUtil {
containerSize = -1;
}
- RowContainer rc = new RowContainer(containerSize, hconf, reporter);
+ RowContainer<List<Object>> rc = new RowContainer<List<Object>>(containerSize, hconf, reporter);
StructObjectInspector rcOI = null;
if (tblDesc != null) {
// arbitrary column names used internally for serializing to spill table
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java?rev=1514760&r1=1514759&r2=1514760&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java Fri Aug 16 15:52:42 2013
@@ -18,6 +18,9 @@
package org.apache.hadoop.hive.ql.exec;
+import java.io.BufferedInputStream;
+import java.io.FileInputStream;
+import java.io.ObjectInputStream;
import java.io.Serializable;
import java.util.ArrayList;
@@ -27,20 +30,17 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.exec.HashTableSinkOperator.HashTableSinkObjectCtx;
-import org.apache.hadoop.hive.ql.exec.persistence.AbstractMapJoinKey;
-import org.apache.hadoop.hive.ql.exec.persistence.HashMapWrapper;
-import org.apache.hadoop.hive.ql.exec.persistence.MapJoinObjectValue;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKey;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinObjectSerDeContext;
import org.apache.hadoop.hive.ql.exec.persistence.MapJoinRowContainer;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.plan.api.OperatorType;
import org.apache.hadoop.hive.serde2.SerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.util.ReflectionUtils;
@@ -51,23 +51,15 @@ public class MapJoinOperator extends Abs
private static final long serialVersionUID = 1L;
private static final Log LOG = LogFactory.getLog(MapJoinOperator.class.getName());
-
- protected transient HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue>[] mapJoinTables;
-
- protected static MapJoinMetaData metadata = new MapJoinMetaData();
- public static MapJoinMetaData getMetadata() {
- return metadata;
- }
-
private static final transient String[] FATAL_ERR_MSG = {
null, // counter value 0 means no error
"Mapside join exceeds available memory. "
+ "Please try removing the mapjoin hint."};
- protected transient MapJoinRowContainer<ArrayList<Object>>[] rowContainerMap;
- transient int metadataKeyTag;
- transient int[] metadataValueTag;
- transient boolean hashTblInitedOnce;
+ private transient MapJoinTableContainer[] mapJoinTables;
+ private transient MapJoinTableContainerSerDe[] mapJoinTableSerdes;
+ private transient boolean hashTblInitedOnce;
+ private transient MapJoinKey key;
public MapJoinOperator() {
}
@@ -77,35 +69,11 @@ public class MapJoinOperator extends Abs
}
@Override
- @SuppressWarnings("unchecked")
protected void initializeOp(Configuration hconf) throws HiveException {
-
super.initializeOp(hconf);
-
- metadataValueTag = new int[numAliases];
- for (int pos = 0; pos < numAliases; pos++) {
- metadataValueTag[pos] = -1;
- }
-
- metadataKeyTag = -1;
-
int tagLen = conf.getTagLength();
-
- mapJoinTables = new HashMapWrapper[tagLen];
- rowContainerMap = new MapJoinRowContainer[tagLen];
- // initialize the hash tables for other tables
- for (int pos = 0; pos < numAliases; pos++) {
- if (pos == posBigTable) {
- continue;
- }
-
- HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue> hashTable = new HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue>();
-
- mapJoinTables[pos] = hashTable;
- MapJoinRowContainer<ArrayList<Object>> rowContainer = new MapJoinRowContainer<ArrayList<Object>>();
- rowContainerMap[pos] = rowContainer;
- }
-
+ mapJoinTables = new MapJoinTableContainer[tagLen];
+ mapJoinTableSerdes = new MapJoinTableContainerSerDe[tagLen];
hashTblInitedOnce = false;
}
@@ -118,14 +86,12 @@ public class MapJoinOperator extends Abs
public void generateMapMetaData() throws HiveException, SerDeException {
// generate the meta data for key
// index for key is -1
+
TableDesc keyTableDesc = conf.getKeyTblDesc();
SerDe keySerializer = (SerDe) ReflectionUtils.newInstance(keyTableDesc.getDeserializerClass(),
null);
keySerializer.initialize(null, keyTableDesc.getProperties());
- metadata.put(Integer.valueOf(metadataKeyTag), new HashTableSinkObjectCtx(
- ObjectInspectorUtils.getStandardObjectInspector(keySerializer.getObjectInspector(),
- ObjectInspectorCopyOption.WRITABLE), keySerializer, keyTableDesc, false, hconf));
-
+ MapJoinObjectSerDeContext keyContext = new MapJoinObjectSerDeContext(keySerializer, false);
for (int pos = 0; pos < order.length; pos++) {
if (pos == posBigTable) {
continue;
@@ -139,16 +105,12 @@ public class MapJoinOperator extends Abs
SerDe valueSerDe = (SerDe) ReflectionUtils.newInstance(valueTableDesc.getDeserializerClass(),
null);
valueSerDe.initialize(null, valueTableDesc.getProperties());
-
- ObjectInspector inspector = valueSerDe.getObjectInspector();
- metadata.put(Integer.valueOf(pos), new HashTableSinkObjectCtx(ObjectInspectorUtils
- .getStandardObjectInspector(inspector, ObjectInspectorCopyOption.WRITABLE),
- valueSerDe, valueTableDesc, hasFilter(pos), hconf));
+ MapJoinObjectSerDeContext valueContext = new MapJoinObjectSerDeContext(valueSerDe, hasFilter(pos));
+ mapJoinTableSerdes[pos] = new MapJoinTableContainerSerDe(keyContext, valueContext);
}
}
private void loadHashTable() throws HiveException {
-
if (!this.getExecContext().getLocalWork().getInputFileChangeSensitive()) {
if (hashTblInitedOnce) {
return;
@@ -158,12 +120,9 @@ public class MapJoinOperator extends Abs
}
String baseDir = null;
-
String currentInputFile = getExecContext().getCurrentInputFile();
LOG.info("******* Load from HashTable File: input : " + currentInputFile);
-
String fileName = getExecContext().getLocalWork().getBucketFileName(currentInputFile);
-
try {
if (ShimLoader.getHadoopShims().isLocalMode(hconf)) {
baseDir = this.getExecContext().getLocalWork().getTmpFileURI();
@@ -183,18 +142,25 @@ public class MapJoinOperator extends Abs
baseDir = archiveLocalLink.toUri().getPath();
}
}
- for (byte pos = 0; pos < mapJoinTables.length; pos++) {
- HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue> hashtable = mapJoinTables[pos];
- if (hashtable == null) {
+ for (int pos = 0; pos < mapJoinTables.length; pos++) {
+ if (pos == posBigTable) {
continue;
}
- String filePath = Utilities.generatePath(baseDir, conf.getDumpFilePrefix(), pos, fileName);
+ if(baseDir == null) {
+ throw new IllegalStateException("baseDir cannot be null");
+ }
+ String filePath = Utilities.generatePath(baseDir, conf.getDumpFilePrefix(), (byte)pos, fileName);
Path path = new Path(filePath);
- LOG.info("\tLoad back 1 hashtable file from tmp file uri:" + path.toString());
- hashtable.initilizePersistentHash(path.toUri().getPath());
+ LOG.info("\tLoad back 1 hashtable file from tmp file uri:" + path);
+ ObjectInputStream in = new ObjectInputStream(new BufferedInputStream(
+ new FileInputStream(path.toUri().getPath()), 4096));
+ try{
+ mapJoinTables[pos] = mapJoinTableSerdes[pos].load(in);
+ } finally {
+ in.close();
+ }
}
} catch (Exception e) {
- LOG.error("Load Distributed Cache Error", e);
throw new HiveException(e);
}
}
@@ -208,39 +174,31 @@ public class MapJoinOperator extends Abs
generateMapMetaData();
firstRow = false;
}
-
loadHashTable();
} catch (SerDeException e) {
- e.printStackTrace();
throw new HiveException(e);
}
}
@Override
public void processOp(Object row, int tag) throws HiveException {
-
try {
if (firstRow) {
// generate the map metadata
generateMapMetaData();
firstRow = false;
}
-
alias = (byte)tag;
// compute keys and values as StandardObjects
- AbstractMapJoinKey key = JoinUtil.computeMapJoinKeys(row, joinKeys[alias],
+ key = JoinUtil.computeMapJoinKeys(key, row, joinKeys[alias],
joinKeysObjectInspectors[alias]);
-
boolean joinNeeded = false;
for (byte pos = 0; pos < order.length; pos++) {
if (pos != alias) {
-
- MapJoinObjectValue o = mapJoinTables[pos].get(key);
- MapJoinRowContainer<ArrayList<Object>> rowContainer = rowContainerMap[pos];
-
+ MapJoinRowContainer rowContainer = mapJoinTables[pos].get(key);
// there is no join-value or join-key has all null elements
- if (o == null || key.hasAnyNulls(nullsafes)) {
+ if (rowContainer == null || key.hasAnyNulls(nullsafes)) {
if (!noOuterJoin) {
joinNeeded = true;
storage[pos] = dummyObjVectors[pos];
@@ -249,45 +207,36 @@ public class MapJoinOperator extends Abs
}
} else {
joinNeeded = true;
- rowContainer.reset(o.getObj());
- storage[pos] = rowContainer;
- aliasFilterTags[pos] = o.getAliasFilter();
+ storage[pos] = rowContainer.copy();
+ aliasFilterTags[pos] = rowContainer.getAliasFilter();
}
}
}
-
if (joinNeeded) {
ArrayList<Object> value = getFilteredValue(alias, row);
-
// Add the value to the ArrayList
storage[alias].add(value);
-
// generate the output records
checkAndGenObject();
}
-
// done with the row
storage[tag].clear();
-
for (byte pos = 0; pos < order.length; pos++) {
if (pos != tag) {
storage[pos] = null;
}
}
-
} catch (SerDeException e) {
- e.printStackTrace();
throw new HiveException(e);
}
}
@Override
public void closeOp(boolean abort) throws HiveException {
-
if (mapJoinTables != null) {
- for (HashMapWrapper<?, ?> hashTable : mapJoinTables) {
- if (hashTable != null) {
- hashTable.close();
+ for (MapJoinTableContainer tableContainer : mapJoinTables) {
+ if (tableContainer != null) {
+ tableContainer.clear();
}
}
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java?rev=1514760&r1=1514759&r2=1514760&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java Fri Aug 16 15:52:42 2013
@@ -64,10 +64,10 @@ public class SMBMapJoinOperator extends
private MapredLocalWork localWork = null;
private Map<String, MergeQueue> aliasToMergeQueue = Collections.emptyMap();
- transient ArrayList<Object>[] keyWritables;
- transient ArrayList<Object>[] nextKeyWritables;
- RowContainer<ArrayList<Object>>[] nextGroupStorage;
- RowContainer<ArrayList<Object>>[] candidateStorage;
+ transient List<Object>[] keyWritables;
+ transient List<Object>[] nextKeyWritables;
+ RowContainer<List<Object>>[] nextGroupStorage;
+ RowContainer<List<Object>>[] candidateStorage;
transient String[] tagToAlias;
private transient boolean[] fetchDone;
@@ -136,12 +136,12 @@ public class SMBMapJoinOperator extends
}
for (byte pos = 0; pos < order.length; pos++) {
- RowContainer rc = JoinUtil.getRowContainer(hconf,
+ RowContainer<List<Object>> rc = JoinUtil.getRowContainer(hconf,
rowContainerStandardObjectInspectors[pos],
pos, bucketSize,spillTableDesc, conf, !hasFilter(pos),
reporter);
nextGroupStorage[pos] = rc;
- RowContainer candidateRC = JoinUtil.getRowContainer(hconf,
+ RowContainer<List<Object>> candidateRC = JoinUtil.getRowContainer(hconf,
rowContainerStandardObjectInspectors[pos],
pos, bucketSize,spillTableDesc, conf, !hasFilter(pos),
reporter);
@@ -435,7 +435,7 @@ public class SMBMapJoinOperator extends
private void promoteNextGroupToCandidate(Byte t) throws HiveException {
this.keyWritables[t] = this.nextKeyWritables[t];
this.nextKeyWritables[t] = null;
- RowContainer<ArrayList<Object>> oldRowContainer = this.candidateStorage[t];
+ RowContainer<List<Object>> oldRowContainer = this.candidateStorage[t];
oldRowContainer.clear();
this.candidateStorage[t] = this.nextGroupStorage[t];
this.nextGroupStorage[t] = oldRowContainer;
@@ -479,10 +479,10 @@ public class SMBMapJoinOperator extends
private int[] findSmallestKey() {
int[] result = new int[order.length];
- ArrayList<Object> smallestOne = null;
+ List<Object> smallestOne = null;
for (byte pos = 0; pos < order.length; pos++) {
- ArrayList<Object> key = keyWritables[pos];
+ List<Object> key = keyWritables[pos];
if (key == null) {
continue;
}
@@ -501,7 +501,7 @@ public class SMBMapJoinOperator extends
private boolean processKey(byte alias, ArrayList<Object> key)
throws HiveException {
- ArrayList<Object> keyWritable = keyWritables[alias];
+ List<Object> keyWritable = keyWritables[alias];
if (keyWritable == null) {
//the first group.
keyWritables[alias] = key;
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionException.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionException.java?rev=1514760&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionException.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionException.java Fri Aug 16 15:52:42 2013
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.mapjoin;
+
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+
+
+
+public class MapJoinMemoryExhaustionException extends HiveException {
+ private static final long serialVersionUID = 3678353959830506881L;
+ public MapJoinMemoryExhaustionException(String msg) {
+ super(msg);
+ }
+}
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionHandler.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionHandler.java?rev=1514760&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionHandler.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionHandler.java Fri Aug 16 15:52:42 2013
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.mapjoin;
+
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryMXBean;
+import java.text.NumberFormat;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+
+/**
+ * Handles the logic around deciding when to throw a MapJoinMemoryExhaustionException
+ * for HashTableSinkOperator.
+ */
+public class MapJoinMemoryExhaustionHandler {
+ private static final Log LOG = LogFactory.getLog(MapJoinMemoryExhaustionHandler.class);
+
+ public final MemoryMXBean memoryMXBean;
+
+ /**
+ * The fraction of overall heap (a ratio, e.g. 0.9 for 90%) that the JVM is allowed
+ * to allocate before failing a MapJoin local task.
+ */
+ private final double maxMemoryUsage;
+ /**
+ * The max heap of the JVM in bytes.
+ */
+ private final long maxHeapSize;
+ private final LogHelper console;
+ private final NumberFormat percentageNumberFormat;
+ /**
+ * Constructor expects a LogHelper object in addition to the max percent
+ * of heap memory which can be consumed before a MapJoinMemoryExhaustionException
+ * is thrown.
+ */
+ public MapJoinMemoryExhaustionHandler(LogHelper console, double maxMemoryUsage) {
+ this.console = console;
+ this.maxMemoryUsage = maxMemoryUsage;
+ this.memoryMXBean = ManagementFactory.getMemoryMXBean();
+ long maxHeapSize = memoryMXBean.getHeapMemoryUsage().getMax();
+ /*
+ * According to the javadoc, getMax() can return -1. In this case
+ * default to 200MB. This will probably never actually happen.
+ */
+ if(maxHeapSize == -1) {
+ this.maxHeapSize = 200L * 1024L * 1024L;
+ LOG.warn("MemoryMXBean.getHeapMemoryUsage().getMax() returned -1, " +
+ "defaulting maxHeapSize to 200MB");
+ } else {
+ this.maxHeapSize = maxHeapSize;
+ }
+ percentageNumberFormat = NumberFormat.getInstance();
+ percentageNumberFormat.setMinimumFractionDigits(2);
+ LOG.info("JVM Max Heap Size: " + this.maxHeapSize);
+ }
+ /**
+ * Throws MapJoinMemoryExhaustionException when the JVM has consumed the
+ * configured percentage of memory. The arguments are used simply for the error
+ * message.
+ *
+ * @param tableContainerSize current table container size
+ * @param numRows number of rows processed
+ * @throws MapJoinMemoryExhaustionException
+ */
+ public void checkMemoryStatus(long tableContainerSize, long numRows)
+ throws MapJoinMemoryExhaustionException {
+ long usedMemory = memoryMXBean.getHeapMemoryUsage().getUsed();
+ double percentage = (double) usedMemory / (double) maxHeapSize;
+ String msg = Utilities.now() + "\tProcessing rows:\t" + numRows + "\tHashtable size:\t"
+ + tableContainerSize + "\tMemory usage:\t" + usedMemory + "\tpercentage:\t" + percentageNumberFormat.format(percentage);
+ console.printInfo(msg);
+ if(percentage > maxMemoryUsage) {
+ throw new MapJoinMemoryExhaustionException(msg);
+ }
+ }
+}
\ No newline at end of file
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java?rev=1514760&r1=1514759&r2=1514760&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java Fri Aug 16 15:52:42 2013
@@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.exec.m
import java.io.File;
import java.io.IOException;
+import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.lang.management.ManagementFactory;
@@ -52,9 +53,8 @@ import org.apache.hadoop.hive.ql.exec.Ta
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.Utilities.StreamPrinter;
-import org.apache.hadoop.hive.ql.exec.persistence.AbstractMapJoinKey;
-import org.apache.hadoop.hive.ql.exec.persistence.HashMapWrapper;
-import org.apache.hadoop.hive.ql.exec.persistence.MapJoinObjectValue;
+import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionException;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.BucketMapJoinContext;
import org.apache.hadoop.hive.ql.plan.FetchWork;
@@ -319,14 +319,13 @@ public class MapredLocalTask extends Tas
long elapsed = currentTime - startTime;
console.printInfo(Utilities.now() + "\tEnd of local task; Time Taken: "
+ Utilities.showTime(elapsed) + " sec.");
- } catch (Throwable e) {
- if (e instanceof OutOfMemoryError
- || (e instanceof HiveException && e.getMessage().equals("RunOutOfMeomoryUsage"))) {
- // Don't create a new object if we are already out of memory
+ } catch (Throwable throwable) {
+ if (throwable instanceof OutOfMemoryError
+ || (throwable instanceof MapJoinMemoryExhaustionException)) {
+ l4j.error("Hive Runtime Error: Map local work exhausted memory", throwable);
return 3;
} else {
- l4j.error("Hive Runtime Error: Map local work failed");
- e.printStackTrace();
+ l4j.error("Hive Runtime Error: Map local work failed", throwable);
return 2;
}
}
@@ -336,7 +335,6 @@ public class MapredLocalTask extends Tas
private void startForward(boolean inputFileChangeSenstive, String bigTableBucket)
throws Exception {
for (Map.Entry<String, FetchOperator> entry : fetchOperators.entrySet()) {
- int fetchOpRows = 0;
String alias = entry.getKey();
FetchOperator fetchOp = entry.getValue();
@@ -364,7 +362,6 @@ public class MapredLocalTask extends Tas
forwardOp.close(false);
break;
}
- fetchOpRows++;
forwardOp.process(row.o, 0);
// check if any operator had a fatal error or early exit during
// execution
@@ -425,7 +422,8 @@ public class MapredLocalTask extends Tas
}
}
- private void generateDummyHashTable(String alias, String bigBucketFileName) throws HiveException,IOException {
+ private void generateDummyHashTable(String alias, String bigBucketFileName)
+ throws HiveException,IOException {
// find the (byte)tag for the map join(HashTableSinkOperator)
Operator<? extends OperatorDesc> parentOp = work.getAliasToWork().get(alias);
Operator<? extends OperatorDesc> childOp = parentOp.getChildOperators().get(0);
@@ -442,8 +440,6 @@ public class MapredLocalTask extends Tas
// generate empty hashtable for this (byte)tag
String tmpURI = this.getWork().getTmpFileURI();
- HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue> hashTable =
- new HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue>();
String fileName = work.getBucketFileName(bigBucketFileName);
@@ -453,12 +449,14 @@ public class MapredLocalTask extends Tas
console.printInfo(Utilities.now() + "\tDump the hashtable into file: " + tmpURIPath);
Path path = new Path(tmpURIPath);
FileSystem fs = path.getFileSystem(job);
- File file = new File(path.toUri().getPath());
- fs.create(path);
- long fileLength = hashTable.flushMemoryCacheToPersistent(file);
+ ObjectOutputStream out = new ObjectOutputStream(fs.create(path));
+ try {
+ MapJoinTableContainerSerDe.persistDummyTable(out);
+ } finally {
+ out.close();
+ }
console.printInfo(Utilities.now() + "\tUpload 1 File to: " + tmpURIPath + " File size: "
- + fileLength);
- hashTable.close();
+ + fs.getFileStatus(path).getLen());
}
private void setUpFetchOpContext(FetchOperator fetchOp, String alias, String currentInputFile)
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/AbstractMapJoinTableContainer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/AbstractMapJoinTableContainer.java?rev=1514760&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/AbstractMapJoinTableContainer.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/AbstractMapJoinTableContainer.java Fri Aug 16 15:52:42 2013
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.persistence;
+
+import java.util.Collections;
+import java.util.Map;
+
+public abstract class AbstractMapJoinTableContainer implements MapJoinTableContainer {
+ private final Map<String, String> metaData;
+
+ protected AbstractMapJoinTableContainer(Map<String, String> metaData) {
+ this.metaData = metaData;
+ }
+ @Override
+ public Map<String, String> getMetaData() {
+ return Collections.unmodifiableMap(metaData);
+ }
+
+ protected void putMetaData(String key, String value) {
+ metaData.put(key, value);
+ }
+}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/AbstractRowContainer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/AbstractRowContainer.java?rev=1514760&r1=1514759&r2=1514760&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/AbstractRowContainer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/AbstractRowContainer.java Fri Aug 16 15:52:42 2013
@@ -20,17 +20,17 @@ package org.apache.hadoop.hive.ql.exec.p
import org.apache.hadoop.hive.ql.metadata.HiveException;
-public abstract class AbstractRowContainer<Row> {
+public abstract class AbstractRowContainer<ROW> {
public AbstractRowContainer() {
}
- public abstract void add(Row t) throws HiveException;
+ public abstract void add(ROW t) throws HiveException;
- public abstract Row first() throws HiveException;
+ public abstract ROW first() throws HiveException;
- public abstract Row next() throws HiveException;
+ public abstract ROW next() throws HiveException;
/**
* Get the number of elements in the RowContainer.
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java?rev=1514760&r1=1514759&r2=1514760&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java Fri Aug 16 15:52:42 2013
@@ -18,26 +18,14 @@
package org.apache.hadoop.hive.ql.exec.persistence;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
import java.io.Serializable;
-import java.lang.management.ManagementFactory;
-import java.lang.management.MemoryMXBean;
-import java.text.NumberFormat;
import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
/**
@@ -47,26 +35,17 @@ import org.apache.hadoop.hive.ql.session
* hash table.
*/
-public class HashMapWrapper<K, V> implements Serializable {
+public class HashMapWrapper extends AbstractMapJoinTableContainer implements Serializable {
private static final long serialVersionUID = 1L;
- protected Log LOG = LogFactory.getLog(this.getClass().getName());
+ protected static final Log LOG = LogFactory.getLog(HashMapWrapper.class);
// default threshold for using main memory based HashMap
-
+ private static final String THESHOLD_NAME = "threshold";
+ private static final String LOAD_NAME = "load";
private static final int THRESHOLD = 1000000;
private static final float LOADFACTOR = 0.75f;
- private static final float MEMORYUSAGE = 1;
-
- private float maxMemoryUsage;
- private HashMap<K, V> mHash; // main memory HashMap
- protected transient LogHelper console;
-
- private File dumpFile;
- public static MemoryMXBean memoryMXBean;
- private long maxMemory;
- private long currentMemory;
- private NumberFormat num;
+ private HashMap<MapJoinKey, MapJoinRowContainer> mHash; // main memory HashMap
/**
* Constructor.
@@ -74,163 +53,53 @@ public class HashMapWrapper<K, V> implem
* @param threshold
* User specified threshold to store new values into persistent storage.
*/
- public HashMapWrapper(int threshold, float loadFactor, float memoryUsage) {
- maxMemoryUsage = memoryUsage;
- mHash = new HashMap<K, V>(threshold, loadFactor);
- memoryMXBean = ManagementFactory.getMemoryMXBean();
- maxMemory = memoryMXBean.getHeapMemoryUsage().getMax();
- LOG.info("maximum memory: " + maxMemory);
- num = NumberFormat.getInstance();
- num.setMinimumFractionDigits(2);
+ public HashMapWrapper(int threshold, float loadFactor) {
+ super(createConstructorMetaData(threshold, loadFactor));
+ mHash = new HashMap<MapJoinKey, MapJoinRowContainer>(threshold, loadFactor);
+
+ }
+
+ public HashMapWrapper(Map<String, String> metaData) {
+ super(metaData);
+ int threshold = Integer.parseInt(metaData.get(THESHOLD_NAME));
+ float loadFactor = Float.parseFloat(metaData.get(LOAD_NAME));
+ mHash = new HashMap<MapJoinKey, MapJoinRowContainer>(threshold, loadFactor);
}
public HashMapWrapper(int threshold) {
- this(threshold, LOADFACTOR, MEMORYUSAGE);
+ this(threshold, LOADFACTOR);
}
public HashMapWrapper() {
- this(THRESHOLD, LOADFACTOR, MEMORYUSAGE);
+ this(THRESHOLD, LOADFACTOR);
}
- public V get(K key) {
+ @Override
+ public MapJoinRowContainer get(MapJoinKey key) {
return mHash.get(key);
}
- public boolean put(K key, V value) throws HiveException {
- // isAbort();
+ @Override
+ public void put(MapJoinKey key, MapJoinRowContainer value) {
mHash.put(key, value);
- return false;
- }
-
-
- public void remove(K key) {
- mHash.remove(key);
- }
-
- /**
- * Flush the main memory hash table into the persistent cache file
- *
- * @return persistent cache file
- */
- public long flushMemoryCacheToPersistent(File file) throws IOException {
- ObjectOutputStream outputStream = null;
- outputStream = new ObjectOutputStream(new BufferedOutputStream(new FileOutputStream(file), 4096));
- outputStream.writeObject(mHash);
- outputStream.flush();
- outputStream.close();
-
- return file.length();
- }
-
- public void initilizePersistentHash(String fileName) throws IOException, ClassNotFoundException {
- ObjectInputStream inputStream = null;
- inputStream = new ObjectInputStream(new BufferedInputStream(new FileInputStream(fileName), 4096));
- HashMap<K, V> hashtable = (HashMap<K, V>) inputStream.readObject();
- this.setMHash(hashtable);
-
- inputStream.close();
}
+ @Override
public int size() {
return mHash.size();
}
-
- public Set<K> keySet() {
- return mHash.keySet();
- }
-
-
- /**
- * Close the persistent hash table and clean it up.
- *
- * @throws HiveException
- */
- public void close() throws HiveException {
- mHash.clear();
+ @Override
+ public Set<Entry<MapJoinKey, MapJoinRowContainer>> entrySet() {
+ return mHash.entrySet();
}
-
- public void clear() throws HiveException {
+ @Override
+ public void clear() {
mHash.clear();
}
-
- public int getKeySize() {
- return mHash.size();
- }
-
- public boolean isAbort(long numRows,LogHelper console) {
- int size = mHash.size();
- long usedMemory = memoryMXBean.getHeapMemoryUsage().getUsed();
- double rate = (double) usedMemory / (double) maxMemory;
- console.printInfo(Utilities.now() + "\tProcessing rows:\t" + numRows + "\tHashtable size:\t"
- + size + "\tMemory usage:\t" + usedMemory + "\trate:\t" + num.format(rate));
- if (rate > (double) maxMemoryUsage) {
- return true;
- }
- return false;
- }
-
- public void setLOG(Log log) {
- LOG = log;
- }
-
- public HashMap<K, V> getMHash() {
- return mHash;
+ private static Map<String, String> createConstructorMetaData(int threshold, float loadFactor) { // static, so it can be called inside the super(...) argument
+ Map<String, String> metaData = new HashMap<String, String>();
+ metaData.put(THESHOLD_NAME, String.valueOf(threshold)); // same keys the metadata-based constructor parses back
+ metaData.put(LOAD_NAME, String.valueOf(loadFactor));
+ return metaData;
}
-
- public void setMHash(HashMap<K, V> hash) {
- mHash = hash;
- }
-
- public LogHelper getConsole() {
- return console;
- }
-
- public void setConsole(LogHelper console) {
- this.console = console;
- }
-
- public File getDumpFile() {
- return dumpFile;
- }
-
- public void setDumpFile(File dumpFile) {
- this.dumpFile = dumpFile;
- }
-
- public static MemoryMXBean getMemoryMXBean() {
- return memoryMXBean;
- }
-
- public static void setMemoryMXBean(MemoryMXBean memoryMXBean) {
- HashMapWrapper.memoryMXBean = memoryMXBean;
- }
-
- public long getMaxMemory() {
- return maxMemory;
- }
-
- public void setMaxMemory(long maxMemory) {
- this.maxMemory = maxMemory;
- }
-
- public long getCurrentMemory() {
- return currentMemory;
- }
-
- public void setCurrentMemory(long currentMemory) {
- this.currentMemory = currentMemory;
- }
-
- public NumberFormat getNum() {
- return num;
- }
-
- public void setNum(NumberFormat num) {
- this.num = num;
- }
-
- public static int getTHRESHOLD() {
- return THRESHOLD;
- }
-
}
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinKey.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinKey.java?rev=1514760&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinKey.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinKey.java Fri Aug 16 15:52:42 2013
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.persistence;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
+import org.apache.hadoop.io.Writable;
+/**
+ * Key type of the map-join hash table: a wrapper around an Object[] of key values
+ * whose equality and hash code are computed element-wise over that array.
+ * read() and write() move the key through the SerDe supplied by a
+ * MapJoinObjectSerDeContext.
+ */
+@SuppressWarnings("deprecation")
+public class MapJoinKey {
+ private static final Object[] EMPTY_OBJECT_ARRAY = new Object[0];
+ /** Key values; reassigned by read(), hence not final. */
+ private Object[] key;
+ /** Wraps the given array as-is; the array is stored without copying. */
+ public MapJoinKey(Object[] key) {
+ this.key = key;
+ }
+ public MapJoinKey() { // no-arg form starts out with the shared zero-length array
+ this(EMPTY_OBJECT_ARRAY);
+ }
+ /** @return the backing array itself (not a copy) */
+ public Object[] getKey() {
+ return key;
+ }
+ public boolean hasAnyNulls(boolean[] nullsafes){ // true if some key value is null at a position not flagged in nullsafes
+ if (key != null && key.length > 0) {
+ for (int i = 0; i < key.length; i++) {
+ if (key[i] == null && (nullsafes == null || !nullsafes[i])) { // a null nullsafes array exempts no position
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Hash derived solely from the contents of the key array (Arrays.hashCode).
+ */
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + Arrays.hashCode(key);
+ return result;
+ }
+ @Override
+ public boolean equals(Object obj) { // equal only to an object of the same class whose key array is element-wise equal
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ MapJoinKey other = (MapJoinKey) obj;
+ if (!Arrays.equals(key, other.key))
+ return false;
+ return true;
+ }
+ @SuppressWarnings("unchecked")
+ public void read(MapJoinObjectSerDeContext context, ObjectInputStream in, Writable container)
+ throws IOException, SerDeException { // loads container from the stream, then deserializes it into this key
+ SerDe serde = context.getSerDe();
+ container.readFields(in);
+ List<Object> value = (List<Object>)ObjectInspectorUtils.copyToStandardObject(serde.deserialize(container),
+ serde.getObjectInspector(), ObjectInspectorCopyOption.WRITABLE);
+ if(value == null) { // nothing came back from the copy: fall back to the shared empty array
+ key = EMPTY_OBJECT_ARRAY;
+ } else {
+ key = value.toArray();
+ }
+ }
+ /**
+ * Serializes the key array with the context's SerDe and standard object inspector
+ * and writes the resulting Writable to the stream.
+ */
+ public void write(MapJoinObjectSerDeContext context, ObjectOutputStream out)
+ throws IOException, SerDeException {
+ SerDe serde = context.getSerDe();
+ ObjectInspector objectInspector = context.getStandardOI();
+ Writable container = serde.serialize(key, objectInspector);
+ container.write(out);
+ }
+}
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinObjectSerDeContext.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinObjectSerDeContext.java?rev=1514760&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinObjectSerDeContext.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinObjectSerDeContext.java Fri Aug 16 15:52:42 2013
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.persistence;
+
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
+/**
+ * Holder (all fields final) for what MapJoinKey and MapJoinRowContainer need to
+ * serialize and deserialize themselves: a SerDe, a standard object inspector derived
+ * from that SerDe, and a hasFilter flag.
+ */
+@SuppressWarnings("deprecation")
+public class MapJoinObjectSerDeContext {
+ private final ObjectInspector standardOI;
+ private final SerDe serde;
+ private final boolean hasFilter;
+ /**
+ * Stores the arguments and derives standardOI from the SerDe's own object inspector
+ * using the WRITABLE copy option.
+ */
+ public MapJoinObjectSerDeContext(SerDe serde, boolean hasFilter)
+ throws SerDeException {
+ this.serde = serde;
+ this.hasFilter = hasFilter;
+ this.standardOI = ObjectInspectorUtils.getStandardObjectInspector(serde.getObjectInspector(),
+ ObjectInspectorCopyOption.WRITABLE);
+ }
+
+ /**
+ * @return the standard object inspector computed in the constructor
+ */
+ public ObjectInspector getStandardOI() {
+ return standardOI;
+ }
+
+ /**
+ * @return the SerDe passed to the constructor
+ */
+ public SerDe getSerDe() {
+ return serde;
+ }
+ /** @return the hasFilter flag passed to the constructor */
+ public boolean hasFilterTag() {
+ return hasFilter;
+ }
+
+ @Override
+ public String toString() {
+ return "MapJoinObjectSerDeContext [standardOI=" + standardOI + ", serde=" + serde
+ + ", hasFilter=" + hasFilter + "]";
+ }
+
+}
\ No newline at end of file
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinRowContainer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinRowContainer.java?rev=1514760&r1=1514759&r2=1514760&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinRowContainer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinRowContainer.java Fri Aug 16 15:52:42 2013
@@ -18,30 +18,46 @@
package org.apache.hadoop.hive.ql.exec.persistence;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.AbstractList;
import java.util.ArrayList;
+import java.util.ConcurrentModificationException;
import java.util.List;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-
-public class MapJoinRowContainer<Row> extends AbstractRowContainer<Row> {
-
- private List<Row> list;
-
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.io.ShortWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
+import org.apache.hadoop.io.Writable;
+
+@SuppressWarnings("deprecation")
+public class MapJoinRowContainer extends AbstractRowContainer<List<Object>> {
+ private static final Object[] EMPTY_OBJECT_ARRAY = new Object[0];
+
+ private final List<List<Object>> list;
private int index;
+ private byte aliasFilter = (byte) 0xff;
public MapJoinRowContainer() {
index = 0;
- list = new ArrayList<Row>(1);
- }
+ list = new ArrayList<List<Object>>(1);
+ }
@Override
- public void add(Row t) throws HiveException {
+ public void add(List<Object> t) {
list.add(t);
}
+ public void add(Object[] t) { // array overload: wraps t in a list view via toList (no element copy) and appends it
+ add(toList(t));
+ }
@Override
- public Row first() throws HiveException {
+ public List<Object> first() {
index = 0;
if (index < list.size()) {
return list.get(index);
@@ -50,13 +66,12 @@ public class MapJoinRowContainer<Row> ex
}
@Override
- public Row next() throws HiveException {
+ public List<Object> next() {
index++;
if (index < list.size()) {
return list.get(index);
}
return null;
-
}
/**
@@ -73,28 +88,88 @@ public class MapJoinRowContainer<Row> ex
* Remove all elements in the RowContainer.
*/
@Override
- public void clear() throws HiveException {
+ public void clear() {
list.clear();
index = 0;
}
-
- public List<Row> getList() {
- return list;
+ /**
+ * @return the accumulated alias filter; it starts as 0xff and read() ANDs the last
+ * column of each row into it when the context reports a filter tag
+ */
+ public byte getAliasFilter() {
+ return aliasFilter;
+ }
+ /**
+ * Creates a new container holding the same row lists; the rows themselves are
+ * shared with this container, not cloned.
+ * NOTE(review): aliasFilter is not carried over, so the copy reports the default
+ * 0xff -- confirm that is intended.
+ */
+ public MapJoinRowContainer copy() {
+ MapJoinRowContainer result = new MapJoinRowContainer();
+ for(List<Object> item : list) {
+ result.add(item);
+ }
+ return result;
}
-
- public void setList(List<Row> list) {
- this.list = list;
+ /**
+ * Replaces the current rows with rows read from the stream: a long row count followed
+ * by that many serialized rows, each loaded into container and deserialized with the
+ * context's SerDe.
+ * NOTE(review): clear() resets the rows and the cursor but not aliasFilter, so filter
+ * bits cleared by an earlier read() stay cleared -- confirm that is intended.
+ */
+ @SuppressWarnings({"unchecked"})
+ public void read(MapJoinObjectSerDeContext context, ObjectInputStream in, Writable container)
+ throws IOException, SerDeException {
+ clear();
+ SerDe serde = context.getSerDe();
+ long numRows = in.readLong();
+ for (long rowIndex = 0L; rowIndex < numRows; rowIndex++) {
+ container.readFields(in);
+ List<Object> value = (List<Object>)ObjectInspectorUtils.copyToStandardObject(serde.deserialize(container),
+ serde.getObjectInspector(), ObjectInspectorCopyOption.WRITABLE);
+ if(value == null) {
+ add(toList(EMPTY_OBJECT_ARRAY)); // still add a (zero-length) row so the row count is preserved
+ } else {
+ Object[] valuesArray = value.toArray();
+ if (context.hasFilterTag()) {
+ aliasFilter &= ((ShortWritable)valuesArray[valuesArray.length - 1]).get(); // last column is read as the filter tag
+ }
+ add(toList(valuesArray));
+ }
+ }
}
+ /**
+ * Writes the row count as a long, then every row serialized with the context's SerDe
+ * and standard object inspector. Iteration uses first()/next(), so this call moves the
+ * container's cursor.
+ *
+ * @throws ConcurrentModificationException if size() changed while the rows were written
+ * @throws IllegalStateException if the number of rows written differs from the count
+ */
+ public void write(MapJoinObjectSerDeContext context, ObjectOutputStream out)
+ throws IOException, SerDeException {
+ SerDe serde = context.getSerDe();
+ ObjectInspector valueObjectInspector = context.getStandardOI();
+ long numRows = size();
+ long numRowsWritten = 0L;
+ out.writeLong(numRows);
+ for (List<Object> row = first(); row != null; row = next()) {
+ serde.serialize(row.toArray(), valueObjectInspector).write(out);
+ ++numRowsWritten;
+ }
+ if(numRows != size()) {
+ throw new ConcurrentModificationException("Values was modifified while persisting");
+ }
+ if(numRowsWritten != numRows) {
+ throw new IllegalStateException("Expected to write " + numRows + " but wrote " + numRowsWritten);
+ }
+ }
+ /** Wraps the array in a NoCopyingArrayList view; the elements are not copied. */
+ private List<Object> toList(Object[] array) {
+ return new NoCopyingArrayList(array);
+ }
+ /**
+ * List view over an Object[] that copies neither when it is constructed nor when
+ * toArray() is called. In this use case our objects will not be modified,
+ * so we don't care about copying in and out.
+ */
+ private static class NoCopyingArrayList extends AbstractList<Object> {
+ private Object[] array;
+ public NoCopyingArrayList(Object[] array) {
+ this.array = array;
+ }
+ @Override
+ public Object get(int index) {
+ return array[index];
+ }
- public void reset(MapJoinRowContainer<Object[]> other) throws HiveException {
- list.clear();
- Object[] obj;
- for (obj = other.first(); obj != null; obj = other.next()) {
- ArrayList<Object> ele = new ArrayList(obj.length);
- for (int i = 0; i < obj.length; i++) {
- ele.add(obj[i]);
- }
- list.add((Row) ele);
+ @Override
+ public int size() {
+ return array.length;
}
+
+ public Object[] toArray() { // hands out the backing array itself instead of a fresh copy
+ return array;
+ }
}
}
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainer.java?rev=1514760&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainer.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainer.java Fri Aug 16 15:52:42 2013
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.persistence;
+
+import java.util.Map;
+import java.util.Set;
+/**
+ * Map-like contract for a map-join table: MapJoinKey keys mapped to
+ * MapJoinRowContainer values, plus a string-to-string metadata map.
+ */
+public interface MapJoinTableContainer {
+ /** @return the number of key/row-container entries held */
+ public int size();
+ /** @return the row container stored under key; the result for an absent key is up to the implementation */
+ public MapJoinRowContainer get(MapJoinKey key);
+ /** Stores value under key. */
+ public void put(MapJoinKey key, MapJoinRowContainer value);
+ /** @return the entries as a set of key/row-container pairs */
+ public Set<Map.Entry<MapJoinKey, MapJoinRowContainer>> entrySet();
+ /** @return string metadata for this container (presumably its construction settings -- confirm against implementations) */
+ public Map<String, String> getMetaData();
+ /** Removes all entries. */
+ public void clear();
+
+}