Posted to commits@hive.apache.org by ha...@apache.org on 2013/08/16 17:52:43 UTC

svn commit: r1514760 [1/2] - in /hive/trunk/ql/src: java/org/apache/hadoop/hive/ql/exec/ java/org/apache/hadoop/hive/ql/exec/mapjoin/ java/org/apache/hadoop/hive/ql/exec/mr/ java/org/apache/hadoop/hive/ql/exec/persistence/ test/org/apache/hadoop/hive/q...

Author: hashutosh
Date: Fri Aug 16 15:52:42 2013
New Revision: 1514760

URL: http://svn.apache.org/r1514760
Log:
HIVE-4838 : Refactor MapJoin HashMap code to improve testability and readability (Brock Noland via Ashutosh Chauhan)

Added:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionException.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionHandler.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/AbstractMapJoinTableContainer.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinKey.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinObjectSerDeContext.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainer.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/mapjoin/
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/mapjoin/TestMapJoinMemoryExhaustionHandler.java
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/TestMapJoinEqualityTableContainer.java
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/TestMapJoinKey.java
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/TestMapJoinRowContainer.java
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/TestMapJoinTableContainer.java
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/Utilities.java
Removed:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinMetaData.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/AbstractMapJoinKey.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/DCLLItem.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MRU.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinDoubleKeys.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinObjectKey.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinObjectValue.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinSingleKey.java
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/TestHashMapWrapper.java
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/TestMapJoinKeys.java
Modified:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinUtil.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/AbstractRowContainer.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinRowContainer.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java?rev=1514760&r1=1514759&r2=1514760&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java Fri Aug 16 15:52:42 2013
@@ -23,8 +23,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.persistence.AbstractMapJoinKey;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKey;
 import org.apache.hadoop.hive.ql.exec.persistence.RowContainer;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
@@ -54,7 +53,7 @@ public abstract class AbstractMapJoinOpe
 
   protected transient byte posBigTable = -1; // one of the tables that is not in memory
 
-  protected transient RowContainer<ArrayList<Object>> emptyList = null;
+  protected transient RowContainer<List<Object>> emptyList = null;
 
   transient int numMapRowsRead;
 
@@ -95,9 +94,9 @@ public abstract class AbstractMapJoinOpe
     // all other tables are small, and are cached in the hash table
     posBigTable = (byte) conf.getPosBigTable();
 
-    emptyList = new RowContainer<ArrayList<Object>>(1, hconf, reporter);
+    emptyList = new RowContainer<List<Object>>(1, hconf, reporter);
 
-    RowContainer bigPosRC = JoinUtil.getRowContainer(hconf,
+    RowContainer<List<Object>> bigPosRC = JoinUtil.getRowContainer(hconf,
         rowContainerStandardObjectInspectors[posBigTable],
         posBigTable, joinCacheSize,spillTableDesc, conf,
         !hasFilter(posBigTable), reporter);
@@ -160,7 +159,7 @@ public abstract class AbstractMapJoinOpe
   }
 
   // returns true if there are elements in key list and any of them is null
-  protected boolean hasAnyNulls(AbstractMapJoinKey key) {
+  protected boolean hasAnyNulls(MapJoinKey key) {
     return key.hasAnyNulls(nullsafes);
   }
 }

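The MapJoinKey class that replaces AbstractMapJoinKey and its three concrete subclasses is added by this commit, but its source appears in part 2 of this message. From the call sites visible here (the MapJoinKey(Object[]) constructor and getKey() in the JoinUtil diff below, plus hasAnyNulls(boolean[]) above), a minimal sketch of its likely shape; the body is inferred from those call sites, not copied from the committed file:

// Illustrative sketch only; the committed MapJoinKey.java is in part 2 of this diff.
// Shape inferred from call sites in this part: MapJoinKey(Object[]), getKey(),
// and hasAnyNulls(boolean[]).
public class MapJoinKey {
  private final Object[] key;

  public MapJoinKey(Object[] key) {
    this.key = key;
  }

  public Object[] getKey() {
    return key;
  }

  // Returns true when any key component is null and that position is not
  // marked null-safe; nullSafes may be null when no null-safe joins exist.
  public boolean hasAnyNulls(boolean[] nullSafes) {
    for (int i = 0; i < key.length; i++) {
      if (key[i] == null && (nullSafes == null || !nullSafes[i])) {
        return true;
      }
    }
    return false;
  }
}

Because MapJoinKey is the probe key for HashMapWrapper, the committed class must also define equals() and hashCode() over the key array; the sketch omits them.
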
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java?rev=1514760&r1=1514759&r2=1514760&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java Fri Aug 16 15:52:42 2013
@@ -93,7 +93,7 @@ public abstract class CommonJoinOperator
   protected transient ArrayList<Object>[] dummyObj;
 
   // empty rows for each table
-  protected transient RowContainer<ArrayList<Object>>[] dummyObjVectors;
+  protected transient RowContainer<List<Object>>[] dummyObjVectors;
 
   protected transient int totalSz; // total size of the composite object
 
@@ -108,7 +108,7 @@ public abstract class CommonJoinOperator
   // input is too large
   // to fit in memory
 
-  AbstractRowContainer<ArrayList<Object>>[] storage; // map b/w table alias
+  AbstractRowContainer<List<Object>>[] storage; // map b/w table alias
   // to RowContainer
   int joinEmitInterval = -1;
   int joinCacheSize = 0;
@@ -274,7 +274,7 @@ public abstract class CommonJoinOperator
       }
       dummyObj[pos] = nr;
       // there should be only 1 dummy object in the RowContainer
-      RowContainer<ArrayList<Object>> values = JoinUtil.getRowContainer(hconf,
+      RowContainer<List<Object>> values = JoinUtil.getRowContainer(hconf,
           rowContainerStandardObjectInspectors[pos],
           alias, 1, spillTableDesc, conf, !hasFilter(pos), reporter);
 
@@ -283,7 +283,7 @@ public abstract class CommonJoinOperator
 
       // if serde is null, the input doesn't need to be spilled out
       // e.g., the output columns does not contains the input table
-      RowContainer rc = JoinUtil.getRowContainer(hconf,
+      RowContainer<List<Object>> rc = JoinUtil.getRowContainer(hconf,
           rowContainerStandardObjectInspectors[pos],
           alias, joinCacheSize, spillTableDesc, conf, !hasFilter(pos), reporter);
       storage[pos] = rc;
@@ -328,7 +328,7 @@ public abstract class CommonJoinOperator
   public void startGroup() throws HiveException {
     LOG.trace("Join: Starting new group");
     newGroupStarted = true;
-    for (AbstractRowContainer<ArrayList<Object>> alw : storage) {
+    for (AbstractRowContainer<List<Object>> alw : storage) {
       alw.clear();
     }
     super.startGroup();
@@ -443,7 +443,7 @@ public abstract class CommonJoinOperator
   private void genJoinObject() throws HiveException {
     boolean rightFirst = true;
     boolean hasFilter = hasFilter(order[0]);
-    AbstractRowContainer<ArrayList<Object>> aliasRes = storage[order[0]];
+    AbstractRowContainer<List<Object>> aliasRes = storage[order[0]];
     for (List<Object> rightObj = aliasRes.first(); rightObj != null; rightObj = aliasRes.next()) {
       boolean rightNull = rightObj == dummyObj[0];
       if (hasFilter) {
@@ -471,7 +471,7 @@ public abstract class CommonJoinOperator
       int right = joinCond.getRight();
 
       // search for match in the rhs table
-      AbstractRowContainer<ArrayList<Object>> aliasRes = storage[order[aliasNum]];
+      AbstractRowContainer<List<Object>> aliasRes = storage[order[aliasNum]];
 
       boolean done = false;
       boolean loopAgain = false;
@@ -641,8 +641,8 @@ public abstract class CommonJoinOperator
 
   private void genUniqueJoinObject(int aliasNum, int forwardCachePos)
       throws HiveException {
-    AbstractRowContainer<ArrayList<Object>> alias = storage[order[aliasNum]];
-    for (ArrayList<Object> row = alias.first(); row != null; row = alias.next()) {
+    AbstractRowContainer<List<Object>> alias = storage[order[aliasNum]];
+    for (List<Object> row = alias.first(); row != null; row = alias.next()) {
       int sz = joinValues[order[aliasNum]].size();
       int p = forwardCachePos;
       for (int j = 0; j < sz; j++) {
@@ -662,7 +662,7 @@ public abstract class CommonJoinOperator
     int p = 0;
     for (int i = 0; i < numAliases; i++) {
       int sz = joinValues[order[i]].size();
-      ArrayList<Object> obj = storage[order[i]].first();
+      List<Object> obj = storage[order[i]].first();
       for (int j = 0; j < sz; j++) {
         forwardCache[p++] = obj.get(j);
       }
@@ -684,7 +684,7 @@ public abstract class CommonJoinOperator
       boolean allOne = true;
       for (int i = 0; i < numAliases; i++) {
         Byte alias = order[i];
-        AbstractRowContainer<ArrayList<Object>> alw = storage[alias];
+        AbstractRowContainer<List<Object>> alw = storage[alias];
 
         if (alw.size() != 1) {
           allOne = false;
@@ -717,7 +717,7 @@ public abstract class CommonJoinOperator
       boolean hasEmpty = false;
       for (int i = 0; i < numAliases; i++) {
         Byte alias = order[i];
-        AbstractRowContainer<ArrayList<Object>> alw = storage[alias];
+        AbstractRowContainer<List<Object>> alw = storage[alias];
 
         if (noOuterJoin) {
           if (alw.size() == 0) {
@@ -737,7 +737,7 @@ public abstract class CommonJoinOperator
           } else {
             mayHasMoreThanOne = true;
             if (!hasEmpty) {
-              for (ArrayList<Object> row = alw.first(); row != null; row = alw.next()) {
+              for (List<Object> row = alw.first(); row != null; row = alw.next()) {
                 reportProgress();
                 if (hasAnyFiltered(alias, row)) {
                   hasEmpty = true;
@@ -784,7 +784,7 @@ public abstract class CommonJoinOperator
   @Override
   public void closeOp(boolean abort) throws HiveException {
     LOG.trace("Join Op close");
-    for (AbstractRowContainer<ArrayList<Object>> alw : storage) {
+    for (AbstractRowContainer<List<Object>> alw : storage) {
       if (alw != null) {
         alw.clear(); // clean up the temp files
       }

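Each loop above drains an AbstractRowContainer through the same cursor protocol: first() resets the traversal and returns the first row, next() advances, and null signals exhaustion. A self-contained toy stand-in (not Hive's RowContainer, which can also spill rows to disk) that demonstrates just this protocol:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Toy stand-in for the first()/next() cursor idiom used throughout
// CommonJoinOperator; next() returns null once the rows are exhausted,
// which is exactly what the for-loops above test for.
class CursorContainer<ROW> {
  private final List<ROW> rows = new ArrayList<ROW>();
  private Iterator<ROW> cursor;

  void add(ROW row) {
    rows.add(row);
  }

  ROW first() {            // reset the cursor; return the first row or null
    cursor = rows.iterator();
    return next();
  }

  ROW next() {             // advance; null signals exhaustion
    return (cursor != null && cursor.hasNext()) ? cursor.next() : null;
  }

  void clear() {           // Hive's version also removes temp spill files
    rows.clear();
    cursor = null;
  }
}

public class CursorDemo {
  public static void main(String[] args) {
    CursorContainer<List<Object>> storage = new CursorContainer<List<Object>>();
    storage.add(Arrays.<Object>asList("a", 1));
    storage.add(Arrays.<Object>asList("b", 2));
    for (List<Object> row = storage.first(); row != null; row = storage.next()) {
      System.out.println(row);
    }
    storage.clear();
  }
}
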
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java?rev=1514760&r1=1514759&r2=1514760&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java Fri Aug 16 15:52:42 2013
@@ -17,7 +17,8 @@
  */
 package org.apache.hadoop.hive.ql.exec;
 
-import java.io.File;
+import java.io.BufferedOutputStream;
+import java.io.ObjectOutputStream;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
@@ -28,11 +29,13 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.persistence.AbstractMapJoinKey;
+import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionHandler;
 import org.apache.hadoop.hive.ql.exec.persistence.HashMapWrapper;
-import org.apache.hadoop.hive.ql.exec.persistence.MapJoinObjectValue;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKey;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinObjectSerDeContext;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinRowContainer;
-import org.apache.hadoop.hive.ql.exec.persistence.RowContainer;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.HashTableSinkDesc;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
@@ -41,10 +44,8 @@ import org.apache.hadoop.hive.ql.session
 import org.apache.hadoop.hive.serde2.SerDe;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
-import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
 import org.apache.hadoop.util.ReflectionUtils;
 
@@ -54,128 +55,51 @@ public class HashTableSinkOperator exten
   private static final long serialVersionUID = 1L;
   private static final Log LOG = LogFactory.getLog(HashTableSinkOperator.class.getName());
 
-  protected static MapJoinMetaData metadata = new MapJoinMetaData();
-  // from abstract map join operator
   /**
    * The expressions for join inputs's join keys.
    */
-  protected transient List<ExprNodeEvaluator>[] joinKeys;
+  private transient List<ExprNodeEvaluator>[] joinKeys;
   /**
    * The ObjectInspectors for the join inputs's join keys.
    */
-  protected transient List<ObjectInspector>[] joinKeysObjectInspectors;
-  /**
-   * The standard ObjectInspectors for the join inputs's join keys.
-   */
-  protected transient List<ObjectInspector>[] joinKeysStandardObjectInspectors;
-
-  protected transient int posBigTableAlias = -1; // one of the tables that is not in memory
+  private transient List<ObjectInspector>[] joinKeysObjectInspectors;
 
-  protected transient RowContainer<ArrayList<Object>> emptyList = null;
+  private transient int posBigTableAlias = -1; // one of the tables that is not in memory
 
-  transient int numMapRowsRead;
-  protected transient int totalSz; // total size of the composite object
-  transient boolean firstRow;
   /**
    * The filters for join
    */
-  protected transient List<ExprNodeEvaluator>[] joinFilters;
+  private transient List<ExprNodeEvaluator>[] joinFilters;  
 
-  protected transient int[][] filterMaps;
+  private transient int[][] filterMaps;
 
-  protected transient int numAliases; // number of aliases
   /**
    * The expressions for join outputs.
    */
-  protected transient List<ExprNodeEvaluator>[] joinValues;
+  private transient List<ExprNodeEvaluator>[] joinValues;
   /**
    * The ObjectInspectors for the join inputs.
    */
-  protected transient List<ObjectInspector>[] joinValuesObjectInspectors;
+  private transient List<ObjectInspector>[] joinValuesObjectInspectors;
   /**
    * The ObjectInspectors for join filters.
    */
-  protected transient List<ObjectInspector>[] joinFilterObjectInspectors;
-  /**
-   * The standard ObjectInspectors for the join inputs.
-   */
-  protected transient List<ObjectInspector>[] joinValuesStandardObjectInspectors;
+  private transient List<ObjectInspector>[] joinFilterObjectInspectors;
 
-  protected transient List<ObjectInspector>[] rowContainerStandardObjectInspectors;
-
-  protected transient Byte[] order; // order in which the results should
-  Configuration hconf;
-  protected transient Byte alias;
-  protected transient TableDesc[] spillTableDesc; // spill tables are
-
-  protected transient HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue>[] mapJoinTables;
-  protected transient boolean noOuterJoin;
+  private transient Byte[] order; // order in which the results should
+  private Configuration hconf;
+  private transient Byte alias;
+
+  private transient MapJoinTableContainer[] mapJoinTables;
+  private transient MapJoinTableContainerSerDe[] mapJoinTableSerdes;  
+  
+  private transient boolean noOuterJoin;
 
   private long rowNumber = 0;
-  protected transient LogHelper console;
+  private transient LogHelper console;
   private long hashTableScale;
-  private boolean isAbort = false;
-
-  public static class HashTableSinkObjectCtx {
-    ObjectInspector standardOI;
-    SerDe serde;
-    TableDesc tblDesc;
-    Configuration conf;
-    boolean hasFilter;
-
-    /**
-     * @param standardOI
-     * @param serde
-     */
-    public HashTableSinkObjectCtx(ObjectInspector standardOI, SerDe serde, TableDesc tblDesc,
-        boolean hasFilter, Configuration conf) {
-      this.standardOI = standardOI;
-      this.serde = serde;
-      this.tblDesc = tblDesc;
-      this.hasFilter = hasFilter;
-      this.conf = conf;
-    }
-
-    /**
-     * @return the standardOI
-     */
-    public ObjectInspector getStandardOI() {
-      return standardOI;
-    }
-
-    /**
-     * @return the serde
-     */
-    public SerDe getSerDe() {
-      return serde;
-    }
-
-    public TableDesc getTblDesc() {
-      return tblDesc;
-    }
-
-    public boolean hasFilterTag() {
-      return hasFilter;
-    }
-
-    public Configuration getConf() {
-      return conf;
-    }
-
-  }
-
-  public static MapJoinMetaData getMetadata() {
-    return metadata;
-  }
-
-  private static final transient String[] FATAL_ERR_MSG = {
-      null, // counter value 0 means no error
-      "Mapside join exceeds available memory. "
-          + "Please try removing the mapjoin hint."};
-  private final int metadataKeyTag = -1;
-  transient int[] metadataValueTag;
-
-
+  private MapJoinMemoryExhaustionHandler memoryExhaustionHandler;
+  
   public HashTableSinkOperator() {
   }
 
@@ -189,8 +113,7 @@ public class HashTableSinkOperator exten
   protected void initializeOp(Configuration hconf) throws HiveException {
     boolean isSilent = HiveConf.getBoolVar(hconf, HiveConf.ConfVars.HIVESESSIONSILENT);
     console = new LogHelper(LOG, isSilent);
-    numMapRowsRead = 0;
-    firstRow = true;
+    memoryExhaustionHandler = new MapJoinMemoryExhaustionHandler(console, conf.getHashtableMemoryUsage());
 
     // for small tables only; so get the big table position first
     posBigTableAlias = conf.getPosBigTable();
@@ -198,9 +121,7 @@ public class HashTableSinkOperator exten
     order = conf.getTagOrder();
 
     // initialize some variables, which used to be initialized in CommonJoinOperator
-    numAliases = conf.getExprs().size();
     this.hconf = hconf;
-    totalSz = 0;
 
     noOuterJoin = conf.isNoOuterJoin();
     filterMaps = conf.getFilterMap();
@@ -212,16 +133,12 @@ public class HashTableSinkOperator exten
     JoinUtil.populateJoinKeyValue(joinKeys, conf.getKeys(), posBigTableAlias);
     joinKeysObjectInspectors = JoinUtil.getObjectInspectorsFromEvaluators(joinKeys,
         inputObjInspectors, posBigTableAlias, tagLen);
-    joinKeysStandardObjectInspectors = JoinUtil.getStandardObjectInspectors(
-        joinKeysObjectInspectors, posBigTableAlias, tagLen);
 
     // process join values
     joinValues = new List[tagLen];
     JoinUtil.populateJoinKeyValue(joinValues, conf.getExprs(), posBigTableAlias);
     joinValuesObjectInspectors = JoinUtil.getObjectInspectorsFromEvaluators(joinValues,
         inputObjInspectors, posBigTableAlias, tagLen);
-    joinValuesStandardObjectInspectors = JoinUtil.getStandardObjectInspectors(
-        joinValuesObjectInspectors, posBigTableAlias, tagLen);
 
     // process join filters
     joinFilters = new List[tagLen];
@@ -229,9 +146,7 @@ public class HashTableSinkOperator exten
     joinFilterObjectInspectors = JoinUtil.getObjectInspectorsFromEvaluators(joinFilters,
         inputObjInspectors, posBigTableAlias, tagLen);
 
-    if (noOuterJoin) {
-      rowContainerStandardObjectInspectors = joinValuesStandardObjectInspectors;
-    } else {
+    if (!noOuterJoin) {
       List<ObjectInspector>[] rowContainerObjectInspectors = new List[tagLen];
       for (Byte alias : order) {
         if (alias == posBigTableAlias) {
@@ -245,43 +160,43 @@ public class HashTableSinkOperator exten
         }
         rowContainerObjectInspectors[alias] = rcOIs;
       }
-      rowContainerStandardObjectInspectors = getStandardObjectInspectors(
-          rowContainerObjectInspectors, tagLen);
-    }
-
-    metadataValueTag = new int[numAliases];
-    for (int pos = 0; pos < numAliases; pos++) {
-      metadataValueTag[pos] = -1;
     }
-    mapJoinTables = new HashMapWrapper[tagLen];
-
+    mapJoinTables = new MapJoinTableContainer[tagLen];
+    mapJoinTableSerdes = new MapJoinTableContainerSerDe[tagLen];
     int hashTableThreshold = HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHASHTABLETHRESHOLD);
     float hashTableLoadFactor = HiveConf.getFloatVar(hconf,
         HiveConf.ConfVars.HIVEHASHTABLELOADFACTOR);
-    float hashTableMaxMemoryUsage = this.getConf().getHashtableMemoryUsage();
-
     hashTableScale = HiveConf.getLongVar(hconf, HiveConf.ConfVars.HIVEHASHTABLESCALE);
     if (hashTableScale <= 0) {
       hashTableScale = 1;
     }
-
-    // initialize the hash tables for other tables
-    for (Byte pos : order) {
-      if (pos == posBigTableAlias) {
-        continue;
+    try {
+      TableDesc keyTableDesc = conf.getKeyTblDesc();
+      SerDe keySerde = (SerDe) ReflectionUtils.newInstance(keyTableDesc.getDeserializerClass(),
+          null);
+      keySerde.initialize(null, keyTableDesc.getProperties());
+      MapJoinObjectSerDeContext keyContext = new MapJoinObjectSerDeContext(keySerde, false);
+      for (Byte pos : order) {
+        if (pos == posBigTableAlias) {
+          continue;
+        }
+        mapJoinTables[pos] = new HashMapWrapper(hashTableThreshold, hashTableLoadFactor);        
+        TableDesc valueTableDesc = conf.getValueTblFilteredDescs().get(pos);
+        SerDe valueSerDe = (SerDe) ReflectionUtils.newInstance(valueTableDesc.getDeserializerClass(), null);
+        valueSerDe.initialize(null, valueTableDesc.getProperties());
+        mapJoinTableSerdes[pos] = new MapJoinTableContainerSerDe(keyContext, new MapJoinObjectSerDeContext(
+            valueSerDe, hasFilter(pos)));
       }
-
-      HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue> hashTable = new HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue>(
-          hashTableThreshold, hashTableLoadFactor, hashTableMaxMemoryUsage);
-
-      mapJoinTables[pos] = hashTable;
+    } catch (SerDeException e) {
+      throw new HiveException(e);
     }
   }
 
 
 
-  protected static List<ObjectInspector>[] getStandardObjectInspectors(
+  private static List<ObjectInspector>[] getStandardObjectInspectors(
       List<ObjectInspector>[] aliasToObjectInspectors, int maxTag) {
+    @SuppressWarnings("unchecked")
     List<ObjectInspector>[] result = new List[maxTag];
     for (byte alias = 0; alias < aliasToObjectInspectors.length; alias++) {
       List<ObjectInspector> oiList = aliasToObjectInspectors[alias];
@@ -299,104 +214,34 @@ public class HashTableSinkOperator exten
 
   }
 
-  private void setKeyMetaData() throws SerDeException {
-    TableDesc keyTableDesc = conf.getKeyTblDesc();
-    SerDe keySerializer = (SerDe) ReflectionUtils.newInstance(keyTableDesc.getDeserializerClass(),
-        null);
-    keySerializer.initialize(null, keyTableDesc.getProperties());
-
-    metadata.put(Integer.valueOf(metadataKeyTag), new HashTableSinkObjectCtx(
-        ObjectInspectorUtils.getStandardObjectInspector(keySerializer.getObjectInspector(),
-            ObjectInspectorCopyOption.WRITABLE), keySerializer, keyTableDesc, false, hconf));
-  }
-
-  private boolean hasFilter(int alias) {
-    return filterMaps != null && filterMaps[alias] != null;
-  }
   /*
    * This operator only process small tables Read the key/value pairs Load them into hashtable
    */
   @Override
   public void processOp(Object row, int tag) throws HiveException {
-    // let the mapJoinOp process these small tables
-    try {
-      if (firstRow) {
-        // generate the map metadata
-        setKeyMetaData();
-        firstRow = false;
+    alias = (byte)tag;
+    // compute keys and values as StandardObjects
+    MapJoinKey key = JoinUtil.computeMapJoinKeys(null, row, joinKeys[alias],
+        joinKeysObjectInspectors[alias]);
+    Object[] value = JoinUtil.computeMapJoinValues(row, joinValues[alias],
+        joinValuesObjectInspectors[alias], joinFilters[alias], joinFilterObjectInspectors[alias],
+        filterMaps == null ? null : filterMaps[alias]);
+    MapJoinTableContainer tableContainer = mapJoinTables[alias];
+    MapJoinRowContainer rowContainer = tableContainer.get(key);
+    if (rowContainer == null) {
+      rowContainer = new MapJoinRowContainer();
+      rowContainer.add(value);
+      rowNumber++;
+      if (rowNumber > hashTableScale && rowNumber % hashTableScale == 0) {
+        memoryExhaustionHandler.checkMemoryStatus(tableContainer.size(), rowNumber);
       }
-      alias = (byte)tag;
-
-      // compute keys and values as StandardObjects
-      AbstractMapJoinKey keyMap = JoinUtil.computeMapJoinKeys(row, joinKeys[alias],
-          joinKeysObjectInspectors[alias]);
-
-      Object[] value = JoinUtil.computeMapJoinValues(row, joinValues[alias],
-          joinValuesObjectInspectors[alias], joinFilters[alias], joinFilterObjectInspectors[alias],
-          filterMaps == null ? null : filterMaps[alias]);
-
-      HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue> hashTable = mapJoinTables[alias];
-
-      MapJoinObjectValue o = hashTable.get(keyMap);
-      MapJoinRowContainer<Object[]> res = null;
-
-      boolean needNewKey = true;
-      if (o == null) {
-        res = new MapJoinRowContainer<Object[]>();
-        res.add(value);
-
-        if (metadataValueTag[tag] == -1) {
-          metadataValueTag[tag] = order[tag];
-          setValueMetaData(tag);
-        }
-
-        // Construct externalizable objects for key and value
-        if (needNewKey) {
-          MapJoinObjectValue valueObj = new MapJoinObjectValue(
-              metadataValueTag[tag], res);
-
-          rowNumber++;
-          if (rowNumber > hashTableScale && rowNumber % hashTableScale == 0) {
-            isAbort = hashTable.isAbort(rowNumber, console);
-            if (isAbort) {
-              throw new HiveException("RunOutOfMeomoryUsage");
-            }
-          }
-          hashTable.put(keyMap, valueObj);
-        }
-
-      } else {
-        res = o.getObj();
-        res.add(value);
-      }
-
-
-    } catch (SerDeException e) {
-      throw new HiveException(e);
+      tableContainer.put(key, rowContainer);
+    } else {
+      rowContainer.add(value);
     }
-
   }
-
-  private void setValueMetaData(int tag) throws SerDeException {
-    TableDesc valueTableDesc = conf.getValueTblFilteredDescs().get(tag);
-    SerDe valueSerDe = (SerDe) ReflectionUtils.newInstance(valueTableDesc.getDeserializerClass(),
-        null);
-
-    valueSerDe.initialize(null, valueTableDesc.getProperties());
-
-    List<ObjectInspector> newFields = rowContainerStandardObjectInspectors[alias];
-    int length = newFields.size();
-    List<String> newNames = new ArrayList<String>(length);
-    for (int i = 0; i < length; i++) {
-      String tmp = new String("tmp_" + i);
-      newNames.add(tmp);
-    }
-    StandardStructObjectInspector standardOI = ObjectInspectorFactory
-        .getStandardStructObjectInspector(newNames, newFields);
-
-    int alias = Integer.valueOf(metadataValueTag[tag]);
-    metadata.put(Integer.valueOf(metadataValueTag[tag]), new HashTableSinkObjectCtx(
-        standardOI, valueSerDe, valueTableDesc, hasFilter(alias), hconf));
+  private boolean hasFilter(int alias) {
+    return filterMaps != null && filterMaps[alias] != null;
   }
 
   @Override
@@ -405,42 +250,36 @@ public class HashTableSinkOperator exten
       if (mapJoinTables != null) {
         // get tmp file URI
         String tmpURI = this.getExecContext().getLocalWork().getTmpFileURI();
-        LOG.info("Get TMP URI: " + tmpURI);
-        long fileLength;
+        LOG.info("Temp URI for side table: " + tmpURI);
         for (byte tag = 0; tag < mapJoinTables.length; tag++) {
           // get the key and value
-          HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue> hashTable = mapJoinTables[tag];
-          if (hashTable == null) {
+          MapJoinTableContainer tableContainer = mapJoinTables[tag];
+          if (tableContainer == null) {
             continue;
           }
-
           // get current input file name
           String bigBucketFileName = getExecContext().getCurrentBigBucketFile();
-
           String fileName = getExecContext().getLocalWork().getBucketFileName(bigBucketFileName);
-
           // get the tmp URI path; it will be a hdfs path if not local mode
           String dumpFilePrefix = conf.getDumpFilePrefix();
           String tmpURIPath = Utilities.generatePath(tmpURI, dumpFilePrefix, tag, fileName);
-          hashTable.isAbort(rowNumber, console);
-          console.printInfo(Utilities.now() + "\tDump the hashtable into file: " + tmpURIPath);
+          console.printInfo(Utilities.now() + "\tDump the side-table into file: " + tmpURIPath);
           // get the hashtable file and path
           Path path = new Path(tmpURIPath);
           FileSystem fs = path.getFileSystem(hconf);
-          File file = new File(path.toUri().getPath());
-          fs.create(path);
-          fileLength = hashTable.flushMemoryCacheToPersistent(file);
-          console.printInfo(Utilities.now() + "\tUpload 1 File to: " + tmpURIPath + " File size: "
-              + fileLength);
-
-          hashTable.close();
+          ObjectOutputStream out = new ObjectOutputStream(new BufferedOutputStream(fs.create(path), 4096));
+          try {
+            mapJoinTableSerdes[tag].persist(out, tableContainer);
+          } finally {
+            out.close();
+          }
+          tableContainer.clear();
+          console.printInfo(Utilities.now() + "\tUpload 1 File to: " + tmpURIPath);
         }
       }
-
       super.closeOp(abort);
     } catch (Exception e) {
-      LOG.error("Generate Hashtable error", e);
-      e.printStackTrace();
+      LOG.error("Error generating side-table", e);
     }
   }
 

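The rewritten closeOp() drops HashMapWrapper.flushMemoryCacheToPersistent(File) in favor of a plain ObjectOutputStream over the Hadoop FileSystem, with MapJoinTableContainerSerDe owning the wire format. A condensed sketch of that dump path, using only calls that appear in the hunk above; the helper name and its throws clause are illustrative:

import java.io.BufferedOutputStream;
import java.io.ObjectOutputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer;
import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe;

public class SideTableDump {
  // Dump one small table to the path computed from tmpURI/dumpFilePrefix/tag,
  // as closeOp() does above, then release the in-memory copy.
  static void dump(Configuration hconf, String tmpURIPath,
      MapJoinTableContainerSerDe serde, MapJoinTableContainer tableContainer)
      throws Exception {
    Path path = new Path(tmpURIPath);
    FileSystem fs = path.getFileSystem(hconf);
    ObjectOutputStream out =
        new ObjectOutputStream(new BufferedOutputStream(fs.create(path), 4096));
    try {
      serde.persist(out, tableContainer); // serialize the container contents
    } finally {
      out.close();
    }
    tableContainer.clear(); // free memory once the table is on disk
  }
}
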
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinUtil.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinUtil.java?rev=1514760&r1=1514759&r2=1514760&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinUtil.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinUtil.java Fri Aug 16 15:52:42 2013
@@ -23,10 +23,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.ql.exec.persistence.AbstractMapJoinKey;
-import org.apache.hadoop.hive.ql.exec.persistence.MapJoinDoubleKeys;
-import org.apache.hadoop.hive.ql.exec.persistence.MapJoinObjectKey;
-import org.apache.hadoop.hive.ql.exec.persistence.MapJoinSingleKey;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKey;
 import org.apache.hadoop.hive.ql.exec.persistence.RowContainer;
 import org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -147,42 +144,22 @@ public class JoinUtil {
 
   /**
    * Return the key as a standard object. StandardObject can be inspected by a
-   * standard ObjectInspector.
+   * standard ObjectInspector. The first parameter, a MapJoinKey, can
+   * be null if the caller would like a new object to be instantiated.
    */
-  public static AbstractMapJoinKey computeMapJoinKeys(Object row,
+  public static MapJoinKey computeMapJoinKeys(MapJoinKey key, Object row,
       List<ExprNodeEvaluator> keyFields, List<ObjectInspector> keyFieldsOI)
       throws HiveException {
-
     int size = keyFields.size();
-    if(size == 1){
-      Object obj = (ObjectInspectorUtils.copyToStandardObject(keyFields.get(0)
-          .evaluate(row), keyFieldsOI.get(0),
-          ObjectInspectorCopyOption.WRITABLE));
-      MapJoinSingleKey key = new MapJoinSingleKey(obj);
-      return key;
-    }else if(size == 2){
-      Object obj1 = (ObjectInspectorUtils.copyToStandardObject(keyFields.get(0)
-          .evaluate(row), keyFieldsOI.get(0),
-          ObjectInspectorCopyOption.WRITABLE));
-
-      Object obj2 = (ObjectInspectorUtils.copyToStandardObject(keyFields.get(1)
-          .evaluate(row), keyFieldsOI.get(1),
-          ObjectInspectorCopyOption.WRITABLE));
-
-      MapJoinDoubleKeys key = new MapJoinDoubleKeys(obj1,obj2);
-      return key;
-    }else{
-      // Compute the keys
-      Object[] nr = new Object[keyFields.size()];
-      for (int i = 0; i < keyFields.size(); i++) {
-
-        nr[i] = (ObjectInspectorUtils.copyToStandardObject(keyFields.get(i)
-            .evaluate(row), keyFieldsOI.get(i),
-            ObjectInspectorCopyOption.WRITABLE));
-      }
-      MapJoinObjectKey key = new MapJoinObjectKey(nr);
-      return key;
-      }
+    if(key == null || key.getKey().length != size) {
+      key = new MapJoinKey(new Object[size]);
+    }
+    Object[] array = key.getKey();
+    for (int keyIndex = 0; keyIndex < size; keyIndex++) {
+      array[keyIndex] = (ObjectInspectorUtils.copyToStandardObject(keyFields.get(keyIndex)
+          .evaluate(row), keyFieldsOI.get(keyIndex), ObjectInspectorCopyOption.WRITABLE));
+    }
+    return key;
   }
 
 
@@ -354,7 +331,7 @@ public class JoinUtil {
   }
 
 
-  public static RowContainer getRowContainer(Configuration hconf,
+  public static RowContainer<List<Object>> getRowContainer(Configuration hconf,
       List<ObjectInspector> structFieldObjectInspectors,
       Byte alias,int containerSize, TableDesc[] spillTableDesc,
       JoinDesc conf,boolean noFilter, Reporter reporter) throws HiveException {
@@ -366,7 +343,7 @@ public class JoinUtil {
       containerSize = -1;
     }
 
-    RowContainer rc = new RowContainer(containerSize, hconf, reporter);
+    RowContainer<List<Object>> rc = new RowContainer<List<Object>>(containerSize, hconf, reporter);
     StructObjectInspector rcOI = null;
     if (tblDesc != null) {
       // arbitrary column names used internally for serializing to spill table

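The refactored computeMapJoinKeys() collapses the old MapJoinSingleKey/MapJoinDoubleKeys/MapJoinObjectKey split into a single array-backed key and lets callers recycle the holder: pass null to allocate, or pass the previous key to overwrite its array in place when the arity matches. A self-contained illustration of that reuse contract; KeyHolder and computeKeys are stand-in names, not Hive classes:

import java.util.Arrays;

public class KeyReuseDemo {
  static final class KeyHolder {
    final Object[] key;
    KeyHolder(Object[] key) { this.key = key; }
  }

  // Mirrors the committed logic: allocate only when the holder is missing
  // or its arity changed, otherwise overwrite the array in place.
  static KeyHolder computeKeys(KeyHolder holder, Object[] row) {
    if (holder == null || holder.key.length != row.length) {
      holder = new KeyHolder(new Object[row.length]);
    }
    for (int i = 0; i < row.length; i++) {
      holder.key[i] = row[i];
    }
    return holder;
  }

  public static void main(String[] args) {
    KeyHolder key = null;
    Object[][] rows = { {1, "a"}, {2, "b"}, {3, "c"} };
    for (Object[] row : rows) {
      key = computeKeys(key, row); // same holder instance after the first row
      System.out.println(Arrays.toString(key.key));
    }
  }
}

In the hunks below, MapJoinOperator keeps one transient MapJoinKey and reuses it for every probed row, while HashTableSinkOperator passes null per row because each key it builds is stored in the table.
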
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java?rev=1514760&r1=1514759&r2=1514760&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java Fri Aug 16 15:52:42 2013
@@ -18,6 +18,9 @@
 
 package org.apache.hadoop.hive.ql.exec;
 
+import java.io.BufferedInputStream;
+import java.io.FileInputStream;
+import java.io.ObjectInputStream;
 import java.io.Serializable;
 import java.util.ArrayList;
 
@@ -27,20 +30,17 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.exec.HashTableSinkOperator.HashTableSinkObjectCtx;
-import org.apache.hadoop.hive.ql.exec.persistence.AbstractMapJoinKey;
-import org.apache.hadoop.hive.ql.exec.persistence.HashMapWrapper;
-import org.apache.hadoop.hive.ql.exec.persistence.MapJoinObjectValue;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKey;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinObjectSerDeContext;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinRowContainer;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
 import org.apache.hadoop.hive.serde2.SerDe;
 import org.apache.hadoop.hive.serde2.SerDeException;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.util.ReflectionUtils;
 
@@ -51,23 +51,15 @@ public class MapJoinOperator extends Abs
   private static final long serialVersionUID = 1L;
   private static final Log LOG = LogFactory.getLog(MapJoinOperator.class.getName());
 
-
-  protected transient HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue>[] mapJoinTables;
-
-  protected static MapJoinMetaData metadata = new MapJoinMetaData();
-  public static MapJoinMetaData getMetadata() {
-    return metadata;
-  }
-
   private static final transient String[] FATAL_ERR_MSG = {
       null, // counter value 0 means no error
       "Mapside join exceeds available memory. "
           + "Please try removing the mapjoin hint."};
 
-  protected transient MapJoinRowContainer<ArrayList<Object>>[] rowContainerMap;
-  transient int metadataKeyTag;
-  transient int[] metadataValueTag;
-  transient boolean hashTblInitedOnce;
+  private transient MapJoinTableContainer[] mapJoinTables;
+  private transient MapJoinTableContainerSerDe[] mapJoinTableSerdes;
+  private transient boolean hashTblInitedOnce;
+  private transient MapJoinKey key;
 
   public MapJoinOperator() {
   }
@@ -77,35 +69,11 @@ public class MapJoinOperator extends Abs
   }
 
   @Override
-  @SuppressWarnings("unchecked")
   protected void initializeOp(Configuration hconf) throws HiveException {
-
     super.initializeOp(hconf);
-
-    metadataValueTag = new int[numAliases];
-    for (int pos = 0; pos < numAliases; pos++) {
-      metadataValueTag[pos] = -1;
-    }
-
-    metadataKeyTag = -1;
-
     int tagLen = conf.getTagLength();
-
-    mapJoinTables = new HashMapWrapper[tagLen];
-    rowContainerMap = new MapJoinRowContainer[tagLen];
-    // initialize the hash tables for other tables
-    for (int pos = 0; pos < numAliases; pos++) {
-      if (pos == posBigTable) {
-        continue;
-      }
-
-      HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue> hashTable = new HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue>();
-
-      mapJoinTables[pos] = hashTable;
-      MapJoinRowContainer<ArrayList<Object>> rowContainer = new MapJoinRowContainer<ArrayList<Object>>();
-      rowContainerMap[pos] = rowContainer;
-    }
-
+    mapJoinTables = new MapJoinTableContainer[tagLen];
+    mapJoinTableSerdes = new MapJoinTableContainerSerDe[tagLen];
     hashTblInitedOnce = false;
   }
 
@@ -118,14 +86,12 @@ public class MapJoinOperator extends Abs
   public void generateMapMetaData() throws HiveException, SerDeException {
     // generate the meta data for key
     // index for key is -1
+    
     TableDesc keyTableDesc = conf.getKeyTblDesc();
     SerDe keySerializer = (SerDe) ReflectionUtils.newInstance(keyTableDesc.getDeserializerClass(),
         null);
     keySerializer.initialize(null, keyTableDesc.getProperties());
-    metadata.put(Integer.valueOf(metadataKeyTag), new HashTableSinkObjectCtx(
-        ObjectInspectorUtils.getStandardObjectInspector(keySerializer.getObjectInspector(),
-            ObjectInspectorCopyOption.WRITABLE), keySerializer, keyTableDesc, false, hconf));
-
+    MapJoinObjectSerDeContext keyContext = new MapJoinObjectSerDeContext(keySerializer, false);
     for (int pos = 0; pos < order.length; pos++) {
       if (pos == posBigTable) {
         continue;
@@ -139,16 +105,12 @@ public class MapJoinOperator extends Abs
       SerDe valueSerDe = (SerDe) ReflectionUtils.newInstance(valueTableDesc.getDeserializerClass(),
           null);
       valueSerDe.initialize(null, valueTableDesc.getProperties());
-
-      ObjectInspector inspector = valueSerDe.getObjectInspector();
-      metadata.put(Integer.valueOf(pos), new HashTableSinkObjectCtx(ObjectInspectorUtils
-          .getStandardObjectInspector(inspector, ObjectInspectorCopyOption.WRITABLE),
-          valueSerDe, valueTableDesc, hasFilter(pos), hconf));
+      MapJoinObjectSerDeContext valueContext = new MapJoinObjectSerDeContext(valueSerDe, hasFilter(pos));
+      mapJoinTableSerdes[pos] = new MapJoinTableContainerSerDe(keyContext, valueContext);
     }
   }
 
   private void loadHashTable() throws HiveException {
-
     if (!this.getExecContext().getLocalWork().getInputFileChangeSensitive()) {
       if (hashTblInitedOnce) {
         return;
@@ -158,12 +120,9 @@ public class MapJoinOperator extends Abs
     }
 
     String baseDir = null;
-
     String currentInputFile = getExecContext().getCurrentInputFile();
     LOG.info("******* Load from HashTable File: input : " + currentInputFile);
-
     String fileName = getExecContext().getLocalWork().getBucketFileName(currentInputFile);
-
     try {
       if (ShimLoader.getHadoopShims().isLocalMode(hconf)) {
         baseDir = this.getExecContext().getLocalWork().getTmpFileURI();
@@ -183,18 +142,25 @@ public class MapJoinOperator extends Abs
           baseDir = archiveLocalLink.toUri().getPath();
         }
       }
-      for (byte pos = 0; pos < mapJoinTables.length; pos++) {
-        HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue> hashtable = mapJoinTables[pos];
-        if (hashtable == null) {
+      for (int pos = 0; pos < mapJoinTables.length; pos++) {
+        if (pos == posBigTable) {
           continue;
         }
-        String filePath = Utilities.generatePath(baseDir, conf.getDumpFilePrefix(), pos, fileName);
+        if(baseDir == null) {
+          throw new IllegalStateException("baseDir cannot be null");
+        }
+        String filePath = Utilities.generatePath(baseDir, conf.getDumpFilePrefix(), (byte)pos, fileName);
         Path path = new Path(filePath);
-        LOG.info("\tLoad back 1 hashtable file from tmp file uri:" + path.toString());
-        hashtable.initilizePersistentHash(path.toUri().getPath());
+        LOG.info("\tLoad back 1 hashtable file from tmp file uri:" + path);
+        ObjectInputStream in = new ObjectInputStream(new BufferedInputStream(
+            new FileInputStream(path.toUri().getPath()), 4096));
+        try{ 
+          mapJoinTables[pos] = mapJoinTableSerdes[pos].load(in);
+        } finally {
+          in.close();
+        }
       }
     } catch (Exception e) {
-      LOG.error("Load Distributed Cache Error", e);
       throw new HiveException(e);
     }
   }
@@ -208,39 +174,31 @@ public class MapJoinOperator extends Abs
         generateMapMetaData();
         firstRow = false;
       }
-
       loadHashTable();
     } catch (SerDeException e) {
-      e.printStackTrace();
       throw new HiveException(e);
     }
   }
 
   @Override
   public void processOp(Object row, int tag) throws HiveException {
-
     try {
       if (firstRow) {
         // generate the map metadata
         generateMapMetaData();
         firstRow = false;
       }
-
       alias = (byte)tag;
 
       // compute keys and values as StandardObjects
-      AbstractMapJoinKey key = JoinUtil.computeMapJoinKeys(row, joinKeys[alias],
+      key = JoinUtil.computeMapJoinKeys(key, row, joinKeys[alias],
           joinKeysObjectInspectors[alias]);
-
       boolean joinNeeded = false;
       for (byte pos = 0; pos < order.length; pos++) {
         if (pos != alias) {
-
-          MapJoinObjectValue o = mapJoinTables[pos].get(key);
-          MapJoinRowContainer<ArrayList<Object>> rowContainer = rowContainerMap[pos];
-
+          MapJoinRowContainer rowContainer = mapJoinTables[pos].get(key);
           // there is no join-value or join-key has all null elements
-          if (o == null || key.hasAnyNulls(nullsafes)) {
+          if (rowContainer == null || key.hasAnyNulls(nullsafes)) {
             if (!noOuterJoin) {
               joinNeeded = true;
               storage[pos] = dummyObjVectors[pos];
@@ -249,45 +207,36 @@ public class MapJoinOperator extends Abs
             }
           } else {
             joinNeeded = true;
-            rowContainer.reset(o.getObj());
-            storage[pos] = rowContainer;
-            aliasFilterTags[pos] = o.getAliasFilter();
+            storage[pos] = rowContainer.copy();
+            aliasFilterTags[pos] = rowContainer.getAliasFilter();
           }
         }
       }
-
       if (joinNeeded) {
         ArrayList<Object> value = getFilteredValue(alias, row);
-
         // Add the value to the ArrayList
         storage[alias].add(value);
-
         // generate the output records
         checkAndGenObject();
       }
-
       // done with the row
       storage[tag].clear();
-
       for (byte pos = 0; pos < order.length; pos++) {
         if (pos != tag) {
           storage[pos] = null;
         }
       }
-
     } catch (SerDeException e) {
-      e.printStackTrace();
       throw new HiveException(e);
     }
   }
 
   @Override
   public void closeOp(boolean abort) throws HiveException {
-
     if (mapJoinTables != null) {
-      for (HashMapWrapper<?, ?> hashTable : mapJoinTables) {
-        if (hashTable != null) {
-          hashTable.close();
+      for (MapJoinTableContainer tableContainer : mapJoinTables) {
+        if (tableContainer != null) {
+          tableContainer.clear();
         }
       }
     }

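loadHashTable() is the read side of the dump written by HashTableSinkOperator: the same MapJoinTableContainerSerDe rebuilds a MapJoinTableContainer from an ObjectInputStream over the local copy of each file. A condensed sketch using only calls from the hunk above; the helper name is illustrative:

import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.ObjectInputStream;

import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer;
import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe;

public class SideTableLoad {
  // Read one dumped side table back, as loadHashTable() does above for
  // every position other than the big table.
  static MapJoinTableContainer load(String localPath,
      MapJoinTableContainerSerDe serde) throws Exception {
    ObjectInputStream in = new ObjectInputStream(
        new BufferedInputStream(new FileInputStream(localPath), 4096));
    try {
      return serde.load(in);
    } finally {
      in.close();
    }
  }
}
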
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java?rev=1514760&r1=1514759&r2=1514760&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java Fri Aug 16 15:52:42 2013
@@ -64,10 +64,10 @@ public class SMBMapJoinOperator extends 
   private MapredLocalWork localWork = null;
   private Map<String, MergeQueue> aliasToMergeQueue = Collections.emptyMap();
 
-  transient ArrayList<Object>[] keyWritables;
-  transient ArrayList<Object>[] nextKeyWritables;
-  RowContainer<ArrayList<Object>>[] nextGroupStorage;
-  RowContainer<ArrayList<Object>>[] candidateStorage;
+  transient List<Object>[] keyWritables;
+  transient List<Object>[] nextKeyWritables;
+  RowContainer<List<Object>>[] nextGroupStorage;
+  RowContainer<List<Object>>[] candidateStorage;
 
   transient String[] tagToAlias;
   private transient boolean[] fetchDone;
@@ -136,12 +136,12 @@ public class SMBMapJoinOperator extends 
     }
 
     for (byte pos = 0; pos < order.length; pos++) {
-      RowContainer rc = JoinUtil.getRowContainer(hconf,
+      RowContainer<List<Object>> rc = JoinUtil.getRowContainer(hconf,
           rowContainerStandardObjectInspectors[pos],
           pos, bucketSize,spillTableDesc, conf, !hasFilter(pos),
           reporter);
       nextGroupStorage[pos] = rc;
-      RowContainer candidateRC = JoinUtil.getRowContainer(hconf,
+      RowContainer<List<Object>> candidateRC = JoinUtil.getRowContainer(hconf,
           rowContainerStandardObjectInspectors[pos],
           pos, bucketSize,spillTableDesc, conf, !hasFilter(pos),
           reporter);
@@ -435,7 +435,7 @@ public class SMBMapJoinOperator extends 
   private void promoteNextGroupToCandidate(Byte t) throws HiveException {
     this.keyWritables[t] = this.nextKeyWritables[t];
     this.nextKeyWritables[t] = null;
-    RowContainer<ArrayList<Object>> oldRowContainer = this.candidateStorage[t];
+    RowContainer<List<Object>> oldRowContainer = this.candidateStorage[t];
     oldRowContainer.clear();
     this.candidateStorage[t] = this.nextGroupStorage[t];
     this.nextGroupStorage[t] = oldRowContainer;
@@ -479,10 +479,10 @@ public class SMBMapJoinOperator extends 
 
   private int[] findSmallestKey() {
     int[] result = new int[order.length];
-    ArrayList<Object> smallestOne = null;
+    List<Object> smallestOne = null;
 
     for (byte pos = 0; pos < order.length; pos++) {
-      ArrayList<Object> key = keyWritables[pos];
+      List<Object> key = keyWritables[pos];
       if (key == null) {
         continue;
       }
@@ -501,7 +501,7 @@ public class SMBMapJoinOperator extends 
 
   private boolean processKey(byte alias, ArrayList<Object> key)
       throws HiveException {
-    ArrayList<Object> keyWritable = keyWritables[alias];
+    List<Object> keyWritable = keyWritables[alias];
     if (keyWritable == null) {
       //the first group.
       keyWritables[alias] = key;

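promoteNextGroupToCandidate() recycles containers instead of allocating one per key group: the drained candidate is cleared and demoted to staging duty while the staged container is promoted. A toy demonstration of that swap, with plain Lists standing in for RowContainer:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SwapDemo {
  public static void main(String[] args) {
    List<Object> candidate = new ArrayList<Object>(Arrays.<Object>asList("drained", "group"));
    List<Object> nextGroup = new ArrayList<Object>(Arrays.<Object>asList("staged", "group"));

    List<Object> recycled = candidate; // the drained buffer, about to be reused
    recycled.clear();
    candidate = nextGroup;             // promote the staged rows
    nextGroup = recycled;              // old buffer becomes the new staging area

    System.out.println("candidate=" + candidate + ", nextGroup=" + nextGroup);
  }
}
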
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionException.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionException.java?rev=1514760&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionException.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionException.java Fri Aug 16 15:52:42 2013
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.mapjoin;
+
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+
+
+
+public class MapJoinMemoryExhaustionException extends HiveException {
+  private static final long serialVersionUID = 3678353959830506881L;
+  public MapJoinMemoryExhaustionException(String msg) {
+    super(msg);
+  }
+}

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionHandler.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionHandler.java?rev=1514760&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionHandler.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionHandler.java Fri Aug 16 15:52:42 2013
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.mapjoin;
+
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryMXBean;
+import java.text.NumberFormat;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+
+/**
+ * Handles the logic around deciding when to throw a MapJoinMemoryExhaustionException
+ * for HashTableSinkOperator.
+ */
+public class MapJoinMemoryExhaustionHandler {
+  private static final Log LOG = LogFactory.getLog(MapJoinMemoryExhaustionHandler.class);
+
+  public final MemoryMXBean memoryMXBean;
+
+  /**
+   * The percentage of overall heap that the JVM is allowed
+   * to allocate before failing a MapJoin local task.
+   */
+  private final double maxMemoryUsage;
+  /**
+   * The max heap of the JVM in bytes.
+   */
+  private final long maxHeapSize;
+  private final LogHelper console;
+  private final NumberFormat percentageNumberFormat;
+  /**
+   * Constructor expects a LogHelper object in addition to the max percent
+   * of heap memory which can be consumed before a MapJoinMemoryExhaustionException
+   * is thrown.
+   */
+  public MapJoinMemoryExhaustionHandler(LogHelper console, double maxMemoryUsage) {
+    this.console = console;
+    this.maxMemoryUsage = maxMemoryUsage;
+    this.memoryMXBean = ManagementFactory.getMemoryMXBean();
+    long maxHeapSize = memoryMXBean.getHeapMemoryUsage().getMax();
+    /*
+     * According to the javadoc, getMax() can return -1. In this case
+     * default to 200MB. This will probably never actually happen.
+     */
+    if(maxHeapSize == -1) {
+      this.maxHeapSize = 200L * 1024L * 1024L;
+      LOG.warn("MemoryMXBean.getHeapMemoryUsage().getMax() returned -1, " +
+      		"defaulting maxHeapSize to 200MB");
+    } else {
+      this.maxHeapSize = maxHeapSize;
+    }
+    percentageNumberFormat = NumberFormat.getInstance();
+    percentageNumberFormat.setMinimumFractionDigits(2);
+    LOG.info("JVM Max Heap Size: " + this.maxHeapSize);
+  }
+  /**
+   * Throws MapJoinMemoryExhaustionException when the JVM has consumed the
+   * configured percentage of memory. The arguments are used simply for the error
+   * message.
+   *
+   * @param tableContainerSize current table container size
+   * @param numRows number of rows processed
+   * @throws MapJoinMemoryExhaustionException
+   */
+  public void checkMemoryStatus(long tableContainerSize, long numRows)
+  throws MapJoinMemoryExhaustionException {
+    long usedMemory = memoryMXBean.getHeapMemoryUsage().getUsed();
+    double percentage = (double) usedMemory / (double) maxHeapSize;
+    String msg = Utilities.now() + "\tProcessing rows:\t" + numRows + "\tHashtable size:\t"
+        + tableContainerSize + "\tMemory usage:\t" + usedMemory + "\tpercentage:\t" + percentageNumberFormat.format(percentage);
+    console.printInfo(msg);
+    if(percentage > maxMemoryUsage) {
+      throw new MapJoinMemoryExhaustionException(msg);
+    }
+   }
+}
\ No newline at end of file

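This handler replaces the old HashMapWrapper.isAbort() check: HashTableSinkOperator constructs one instance in initializeOp() and probes it periodically (every hashTableScale new keys) while loading the small tables. A usage sketch matching the constructor and method signatures added above; the 0.90 heap fraction and the loop itself are illustrative:

import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionException;
import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionHandler;
import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;

public class MemoryCheckSketch {
  // Probe heap usage while building the table, as the rewritten
  // HashTableSinkOperator.processOp() does above (there, rowNumber
  // counts newly inserted keys rather than all rows).
  static void load(LogHelper console, long hashTableScale, long totalRows)
      throws MapJoinMemoryExhaustionException {
    MapJoinMemoryExhaustionHandler handler =
        new MapJoinMemoryExhaustionHandler(console, 0.90); // illustrative fraction
    long tableSize = 0;
    for (long rowNumber = 1; rowNumber <= totalRows; rowNumber++) {
      tableSize++; // stand-in for tableContainer.size()
      if (rowNumber > hashTableScale && rowNumber % hashTableScale == 0) {
        handler.checkMemoryStatus(tableSize, rowNumber); // throws past the limit
      }
    }
  }
}
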
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java?rev=1514760&r1=1514759&r2=1514760&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java Fri Aug 16 15:52:42 2013
@@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.exec.m
 
 import java.io.File;
 import java.io.IOException;
+import java.io.ObjectOutputStream;
 import java.io.OutputStream;
 import java.io.Serializable;
 import java.lang.management.ManagementFactory;
@@ -52,9 +53,8 @@ import org.apache.hadoop.hive.ql.exec.Ta
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.Utilities.StreamPrinter;
-import org.apache.hadoop.hive.ql.exec.persistence.AbstractMapJoinKey;
-import org.apache.hadoop.hive.ql.exec.persistence.HashMapWrapper;
-import org.apache.hadoop.hive.ql.exec.persistence.MapJoinObjectValue;
+import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionException;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.BucketMapJoinContext;
 import org.apache.hadoop.hive.ql.plan.FetchWork;
@@ -319,14 +319,13 @@ public class MapredLocalTask extends Tas
       long elapsed = currentTime - startTime;
       console.printInfo(Utilities.now() + "\tEnd of local task; Time Taken: "
           + Utilities.showTime(elapsed) + " sec.");
-    } catch (Throwable e) {
-      if (e instanceof OutOfMemoryError
-          || (e instanceof HiveException && e.getMessage().equals("RunOutOfMeomoryUsage"))) {
-        // Don't create a new object if we are already out of memory
+    } catch (Throwable throwable) {
+      if (throwable instanceof OutOfMemoryError
+          || (throwable instanceof MapJoinMemoryExhaustionException)) {
+        l4j.error("Hive Runtime Error: Map local work exhausted memory", throwable);
         return 3;
       } else {
-        l4j.error("Hive Runtime Error: Map local work failed");
-        e.printStackTrace();
+        l4j.error("Hive Runtime Error: Map local work failed", throwable);
         return 2;
       }
     }
@@ -336,7 +335,6 @@ public class MapredLocalTask extends Tas
   private void startForward(boolean inputFileChangeSenstive, String bigTableBucket)
       throws Exception {
     for (Map.Entry<String, FetchOperator> entry : fetchOperators.entrySet()) {
-      int fetchOpRows = 0;
       String alias = entry.getKey();
       FetchOperator fetchOp = entry.getValue();
 
@@ -364,7 +362,6 @@ public class MapredLocalTask extends Tas
           forwardOp.close(false);
           break;
         }
-        fetchOpRows++;
         forwardOp.process(row.o, 0);
         // check if any operator had a fatal error or early exit during
         // execution
@@ -425,7 +422,8 @@ public class MapredLocalTask extends Tas
     }
   }
 
-  private void generateDummyHashTable(String alias, String bigBucketFileName) throws HiveException,IOException {
+  private void generateDummyHashTable(String alias, String bigBucketFileName)
+      throws HiveException, IOException {
     // find the (byte)tag for the map join(HashTableSinkOperator)
     Operator<? extends OperatorDesc> parentOp = work.getAliasToWork().get(alias);
     Operator<? extends OperatorDesc> childOp = parentOp.getChildOperators().get(0);
@@ -442,8 +440,6 @@ public class MapredLocalTask extends Tas
 
     // generate empty hashtable for this (byte)tag
     String tmpURI = this.getWork().getTmpFileURI();
-    HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue> hashTable =
-      new HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue>();
 
     String fileName = work.getBucketFileName(bigBucketFileName);
 
@@ -453,12 +449,14 @@ public class MapredLocalTask extends Tas
     console.printInfo(Utilities.now() + "\tDump the hashtable into file: " + tmpURIPath);
     Path path = new Path(tmpURIPath);
     FileSystem fs = path.getFileSystem(job);
-    File file = new File(path.toUri().getPath());
-    fs.create(path);
-    long fileLength = hashTable.flushMemoryCacheToPersistent(file);
+    ObjectOutputStream out = new ObjectOutputStream(fs.create(path));
+    try {
+      MapJoinTableContainerSerDe.persistDummyTable(out);
+    } finally {
+      out.close();
+    }
     console.printInfo(Utilities.now() + "\tUpload 1 File to: " + tmpURIPath + " File size: "
-        + fileLength);
-    hashTable.close();
+        + fs.getFileStatus(path).getLen());
   }
 
   private void setUpFetchOpContext(FetchOperator fetchOp, String alias, String currentInputFile)
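
The dummy-table write above follows the usual Hadoop pattern: wrap FileSystem.create() in an ObjectOutputStream, close it in a finally block, then report the final file length. A standalone sketch against the local filesystem (the path here is illustrative):

    import java.io.ObjectOutputStream;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe;

    public class DummyTableWriteSketch {
      public static void main(String[] args) throws Exception {
        Path path = new Path("file:///tmp/hive-dummy-hashtable");
        FileSystem fs = path.getFileSystem(new Configuration());
        ObjectOutputStream out = new ObjectOutputStream(fs.create(path));
        try {
          // Writes an empty hashtable so downstream map-join tasks see a valid file.
          MapJoinTableContainerSerDe.persistDummyTable(out);
        } finally {
          out.close(); // release the stream even if the write fails
        }
        System.out.println("Wrote " + fs.getFileStatus(path).getLen() + " bytes");
      }
    }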

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/AbstractMapJoinTableContainer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/AbstractMapJoinTableContainer.java?rev=1514760&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/AbstractMapJoinTableContainer.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/AbstractMapJoinTableContainer.java Fri Aug 16 15:52:42 2013
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.persistence;
+
+import java.util.Collections;
+import java.util.Map;
+
+public abstract class AbstractMapJoinTableContainer implements MapJoinTableContainer {
+  private final Map<String, String> metaData;
+
+  protected AbstractMapJoinTableContainer(Map<String, String> metaData) {
+    this.metaData = metaData;
+  }
+  @Override
+  public Map<String, String> getMetaData() {
+    return Collections.unmodifiableMap(metaData);
+  }
+  
+  protected void putMetaData(String key, String value) {
+    metaData.put(key, value);
+  }
+}
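
The base class only owns the metadata map; a concrete container records its construction parameters there (as HashMapWrapper does below) so a reader can rebuild an equivalent container later. A minimal, purely illustrative subclass:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Set;
    import org.apache.hadoop.hive.ql.exec.persistence.AbstractMapJoinTableContainer;
    import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKey;
    import org.apache.hadoop.hive.ql.exec.persistence.MapJoinRowContainer;

    // Illustrative only: a trivial container backed by a plain HashMap.
    public class TinyTableContainer extends AbstractMapJoinTableContainer {
      private final Map<MapJoinKey, MapJoinRowContainer> table =
          new HashMap<MapJoinKey, MapJoinRowContainer>();

      public TinyTableContainer() {
        super(new HashMap<String, String>());
        putMetaData("impl", "tiny"); // exposed to readers via getMetaData()
      }

      @Override
      public int size() { return table.size(); }
      @Override
      public MapJoinRowContainer get(MapJoinKey key) { return table.get(key); }
      @Override
      public void put(MapJoinKey key, MapJoinRowContainer value) { table.put(key, value); }
      @Override
      public Set<Map.Entry<MapJoinKey, MapJoinRowContainer>> entrySet() { return table.entrySet(); }
      @Override
      public void clear() { table.clear(); }
    }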

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/AbstractRowContainer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/AbstractRowContainer.java?rev=1514760&r1=1514759&r2=1514760&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/AbstractRowContainer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/AbstractRowContainer.java Fri Aug 16 15:52:42 2013
@@ -20,17 +20,17 @@ package org.apache.hadoop.hive.ql.exec.p
 
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 
-public abstract class AbstractRowContainer<Row> {
+public abstract class AbstractRowContainer<ROW> {
 
   public AbstractRowContainer() {
 
   }
 
-  public abstract void add(Row t) throws HiveException;
+  public abstract void add(ROW t) throws HiveException;
 
-  public abstract Row first() throws HiveException;
+  public abstract ROW first() throws HiveException;
 
-  public abstract Row next() throws HiveException;
+  public abstract ROW next() throws HiveException;
 
   /**
    * Get the number of elements in the RowContainer.

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java?rev=1514760&r1=1514759&r2=1514760&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java Fri Aug 16 15:52:42 2013
@@ -18,26 +18,14 @@
 
 package org.apache.hadoop.hive.ql.exec.persistence;
 
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
 import java.io.Serializable;
-import java.lang.management.ManagementFactory;
-import java.lang.management.MemoryMXBean;
-import java.text.NumberFormat;
 import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
 
 
 /**
@@ -47,26 +35,17 @@ import org.apache.hadoop.hive.ql.session
  * hash table.
  */
 
-public class HashMapWrapper<K, V> implements Serializable {
+public class HashMapWrapper extends AbstractMapJoinTableContainer implements Serializable {
 
   private static final long serialVersionUID = 1L;
-  protected Log LOG = LogFactory.getLog(this.getClass().getName());
+  protected static final Log LOG = LogFactory.getLog(HashMapWrapper.class);
 
   // default threshold for using main memory based HashMap
-
+  private static final String THRESHOLD_NAME = "threshold";
+  private static final String LOAD_NAME = "load";
   private static final int THRESHOLD = 1000000;
   private static final float LOADFACTOR = 0.75f;
-  private static final float MEMORYUSAGE = 1;
-
-  private float maxMemoryUsage;
-  private HashMap<K, V> mHash; // main memory HashMap
-  protected transient LogHelper console;
-
-  private File dumpFile;
-  public static MemoryMXBean memoryMXBean;
-  private long maxMemory;
-  private long currentMemory;
-  private NumberFormat num;
+  private HashMap<MapJoinKey, MapJoinRowContainer> mHash; // main memory HashMap
 
   /**
    * Constructor.
@@ -74,163 +53,53 @@ public class HashMapWrapper<K, V> implem
    * @param threshold
    *          User specified threshold to store new values into persistent storage.
    */
-  public HashMapWrapper(int threshold, float loadFactor, float memoryUsage) {
-    maxMemoryUsage = memoryUsage;
-    mHash = new HashMap<K, V>(threshold, loadFactor);
-    memoryMXBean = ManagementFactory.getMemoryMXBean();
-    maxMemory = memoryMXBean.getHeapMemoryUsage().getMax();
-    LOG.info("maximum memory: " + maxMemory);
-    num = NumberFormat.getInstance();
-    num.setMinimumFractionDigits(2);
+  public HashMapWrapper(int threshold, float loadFactor) {
+    super(createConstructorMetaData(threshold, loadFactor));
+    mHash = new HashMap<MapJoinKey, MapJoinRowContainer>(threshold, loadFactor);
+  }
+
+  
+  public HashMapWrapper(Map<String, String> metaData) {
+    super(metaData);
+    int threshold = Integer.parseInt(metaData.get(THRESHOLD_NAME));
+    float loadFactor = Float.parseFloat(metaData.get(LOAD_NAME));
+    mHash = new HashMap<MapJoinKey, MapJoinRowContainer>(threshold, loadFactor);
   }
 
   public HashMapWrapper(int threshold) {
-    this(threshold, LOADFACTOR, MEMORYUSAGE);
+    this(threshold, LOADFACTOR);
   }
 
   public HashMapWrapper() {
-    this(THRESHOLD, LOADFACTOR, MEMORYUSAGE);
+    this(THRESHOLD, LOADFACTOR);
   }
 
-  public V get(K key) {
+  @Override
+  public MapJoinRowContainer get(MapJoinKey key) {
     return mHash.get(key);
   }
 
-  public boolean put(K key, V value) throws HiveException {
-    // isAbort();
+  @Override
+  public void put(MapJoinKey key, MapJoinRowContainer value) {
     mHash.put(key, value);
-    return false;
-  }
-
-
-  public void remove(K key) {
-    mHash.remove(key);
-  }
-
-  /**
-   * Flush the main memory hash table into the persistent cache file
-   *
-   * @return persistent cache file
-   */
-  public long flushMemoryCacheToPersistent(File file) throws IOException {
-    ObjectOutputStream outputStream = null;
-    outputStream = new ObjectOutputStream(new BufferedOutputStream(new FileOutputStream(file), 4096));
-    outputStream.writeObject(mHash);
-    outputStream.flush();
-    outputStream.close();
-
-    return file.length();
-  }
-
-  public void initilizePersistentHash(String fileName) throws IOException, ClassNotFoundException {
-    ObjectInputStream inputStream = null;
-    inputStream = new ObjectInputStream(new BufferedInputStream(new FileInputStream(fileName), 4096));
-    HashMap<K, V> hashtable = (HashMap<K, V>) inputStream.readObject();
-    this.setMHash(hashtable);
-
-    inputStream.close();
   }
 
+  @Override
   public int size() {
     return mHash.size();
   }
-
-  public Set<K> keySet() {
-    return mHash.keySet();
-  }
-
-
-  /**
-   * Close the persistent hash table and clean it up.
-   *
-   * @throws HiveException
-   */
-  public void close() throws HiveException {
-    mHash.clear();
+  @Override
+  public Set<Entry<MapJoinKey, MapJoinRowContainer>> entrySet() {
+    return mHash.entrySet();
   }
-
-  public void clear() throws HiveException {
+  @Override
+  public void clear() {
     mHash.clear();
   }
-
-  public int getKeySize() {
-    return mHash.size();
-  }
-
-  public boolean isAbort(long numRows,LogHelper console) {
-    int size = mHash.size();
-    long usedMemory = memoryMXBean.getHeapMemoryUsage().getUsed();
-    double rate = (double) usedMemory / (double) maxMemory;
-    console.printInfo(Utilities.now() + "\tProcessing rows:\t" + numRows + "\tHashtable size:\t"
-        + size + "\tMemory usage:\t" + usedMemory + "\trate:\t" + num.format(rate));
-    if (rate > (double) maxMemoryUsage) {
-      return true;
-    }
-    return false;
-  }
-
-  public void setLOG(Log log) {
-    LOG = log;
-  }
-
-  public HashMap<K, V> getMHash() {
-    return mHash;
+  private static Map<String, String> createConstructorMetaData(int threshold, float loadFactor) {
+    Map<String, String> metaData = new HashMap<String, String>();
+    metaData.put(THRESHOLD_NAME, String.valueOf(threshold));
+    metaData.put(LOAD_NAME, String.valueOf(loadFactor));
+    return metaData;
   }
-
-  public void setMHash(HashMap<K, V> hash) {
-    mHash = hash;
-  }
-
-  public LogHelper getConsole() {
-    return console;
-  }
-
-  public void setConsole(LogHelper console) {
-    this.console = console;
-  }
-
-  public File getDumpFile() {
-    return dumpFile;
-  }
-
-  public void setDumpFile(File dumpFile) {
-    this.dumpFile = dumpFile;
-  }
-
-  public static MemoryMXBean getMemoryMXBean() {
-    return memoryMXBean;
-  }
-
-  public static void setMemoryMXBean(MemoryMXBean memoryMXBean) {
-    HashMapWrapper.memoryMXBean = memoryMXBean;
-  }
-
-  public long getMaxMemory() {
-    return maxMemory;
-  }
-
-  public void setMaxMemory(long maxMemory) {
-    this.maxMemory = maxMemory;
-  }
-
-  public long getCurrentMemory() {
-    return currentMemory;
-  }
-
-  public void setCurrentMemory(long currentMemory) {
-    this.currentMemory = currentMemory;
-  }
-
-  public NumberFormat getNum() {
-    return num;
-  }
-
-  public void setNum(NumberFormat num) {
-    this.num = num;
-  }
-
-  public static int getTHRESHOLD() {
-    return THRESHOLD;
-  }
-
 }
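
Putting the pieces together, a sketch of the wrapper's new usage: keys and values are the concrete MapJoinKey/MapJoinRowContainer types, and the constructor parameters travel with the container as string metadata (values here are illustrative):

    import org.apache.hadoop.hive.ql.exec.persistence.HashMapWrapper;
    import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKey;
    import org.apache.hadoop.hive.ql.exec.persistence.MapJoinRowContainer;

    public class HashMapWrapperSketch {
      public static void main(String[] args) {
        HashMapWrapper table = new HashMapWrapper(100, 0.75f);

        MapJoinRowContainer rows = new MapJoinRowContainer();
        rows.add(new Object[] {"v1", "v2"});
        table.put(new MapJoinKey(new Object[] {"k1"}), rows);

        // Lookup relies on MapJoinKey's Arrays-based equals/hashCode.
        System.out.println(table.get(new MapJoinKey(new Object[] {"k1"})).size()); // 1

        // The threshold and load factor are exposed as metadata, so a
        // deserializer can rebuild an equivalent (initially empty) container.
        HashMapWrapper rebuilt = new HashMapWrapper(table.getMetaData());
        System.out.println(rebuilt.size()); // 0
      }
    }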

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinKey.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinKey.java?rev=1514760&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinKey.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinKey.java Fri Aug 16 15:52:42 2013
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.persistence;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
+import org.apache.hadoop.io.Writable;
+
+@SuppressWarnings("deprecation")
+public class MapJoinKey {
+  private static final Object[] EMPTY_OBJECT_ARRAY = new Object[0];
+
+  private Object[] key;
+  
+  public MapJoinKey(Object[] key) {
+    this.key = key;
+  }
+  public MapJoinKey() {
+    this(EMPTY_OBJECT_ARRAY);
+  }
+
+  public Object[] getKey() {
+    return key;
+  }
+  public boolean hasAnyNulls(boolean[] nullsafes) {
+    if (key != null && key.length > 0) {
+      for (int i = 0; i < key.length; i++) {
+        if (key[i] == null && (nullsafes == null || !nullsafes[i])) {
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+  
+  
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + Arrays.hashCode(key);
+    return result;
+  }
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    MapJoinKey other = (MapJoinKey) obj;
+    if (!Arrays.equals(key, other.key))
+      return false;
+    return true;
+  }
+  @SuppressWarnings("unchecked")
+  public void read(MapJoinObjectSerDeContext context, ObjectInputStream in, Writable container) 
+  throws IOException, SerDeException {
+    SerDe serde = context.getSerDe();
+    container.readFields(in);
+    List<Object> value = (List<Object>)ObjectInspectorUtils.copyToStandardObject(serde.deserialize(container),
+        serde.getObjectInspector(), ObjectInspectorCopyOption.WRITABLE);
+    if(value == null) {
+      key = EMPTY_OBJECT_ARRAY;
+    } else {
+      key = value.toArray();
+    }
+  }
+  
+  public void write(MapJoinObjectSerDeContext context, ObjectOutputStream out) 
+  throws IOException, SerDeException {
+    SerDe serde = context.getSerDe();
+    ObjectInspector objectInspector = context.getStandardOI();
+    Writable container = serde.serialize(key, objectInspector);
+    container.write(out);
+  }
+}
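
The key is now a thin wrapper over an Object[]: equality is value equality over the array, and hasAnyNulls() implements null-safe join semantics. A small sketch (keys are built directly here rather than read through a SerDe):

    import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKey;

    public class MapJoinKeySketch {
      public static void main(String[] args) {
        MapJoinKey a = new MapJoinKey(new Object[] {1, "x"});
        MapJoinKey b = new MapJoinKey(new Object[] {1, "x"});
        System.out.println(a.equals(b));                  // true: Arrays.equals on key
        System.out.println(a.hashCode() == b.hashCode()); // true

        // A null component only disqualifies a key when the matching
        // null-safe flag is off (or no flags were supplied at all).
        MapJoinKey withNull = new MapJoinKey(new Object[] {null, "x"});
        System.out.println(withNull.hasAnyNulls(null));                        // true
        System.out.println(withNull.hasAnyNulls(new boolean[] {true, false})); // false
      }
    }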

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinObjectSerDeContext.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinObjectSerDeContext.java?rev=1514760&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinObjectSerDeContext.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinObjectSerDeContext.java Fri Aug 16 15:52:42 2013
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.persistence;
+
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
+
+@SuppressWarnings("deprecation")
+public class MapJoinObjectSerDeContext {
+  private final ObjectInspector standardOI;
+  private final SerDe serde;
+  private final boolean hasFilter;
+
+  public MapJoinObjectSerDeContext(SerDe serde, boolean hasFilter)
+      throws SerDeException {
+    this.serde = serde;
+    this.hasFilter = hasFilter;
+    this.standardOI = ObjectInspectorUtils.getStandardObjectInspector(serde.getObjectInspector(),
+        ObjectInspectorCopyOption.WRITABLE);
+  }
+
+  /**
+   * @return the standardOI
+   */
+  public ObjectInspector getStandardOI() {
+    return standardOI;
+  }
+
+  /**
+   * @return the serde
+   */
+  public SerDe getSerDe() {
+    return serde;
+  }
+
+  public boolean hasFilterTag() {
+    return hasFilter;
+  }
+
+  @Override
+  public String toString() {
+    return "MapJoinObjectSerDeContext [standardOI=" + standardOI + ", serde=" + serde
+        + ", hasFilter=" + hasFilter + "]";
+  }
+
+}
\ No newline at end of file
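
Constructing the context requires an initialized SerDe; a sketch using LazyBinarySerDe with a two-column schema (the schema and SerDe choice are illustrative):

    import java.util.Properties;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.ql.exec.persistence.MapJoinObjectSerDeContext;
    import org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe;

    public class SerDeContextSketch {
      public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.setProperty("columns", "c0,c1");
        props.setProperty("columns.types", "string,string");
        LazyBinarySerDe serde = new LazyBinarySerDe();
        serde.initialize(new Configuration(), props);

        // hasFilter=false: values carry no trailing filter tag.
        MapJoinObjectSerDeContext context = new MapJoinObjectSerDeContext(serde, false);
        // The standard (writable) object inspector is computed once up front,
        // so the per-row read/write paths never rebuild it.
        System.out.println(context.getStandardOI().getTypeName());
      }
    }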

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinRowContainer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinRowContainer.java?rev=1514760&r1=1514759&r2=1514760&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinRowContainer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinRowContainer.java Fri Aug 16 15:52:42 2013
@@ -18,30 +18,46 @@
 
 package org.apache.hadoop.hive.ql.exec.persistence;
 
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.AbstractList;
 import java.util.ArrayList;
+import java.util.ConcurrentModificationException;
 import java.util.List;
 
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-
-public class MapJoinRowContainer<Row> extends AbstractRowContainer<Row> {
-
-  private List<Row> list;
-
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.io.ShortWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
+import org.apache.hadoop.io.Writable;
+
+@SuppressWarnings("deprecation")
+public class MapJoinRowContainer extends AbstractRowContainer<List<Object>> {
+  private static final Object[] EMPTY_OBJECT_ARRAY = new Object[0];
+  
+  private final List<List<Object>> list;
   private int index;
+  private byte aliasFilter = (byte) 0xff;
 
   public MapJoinRowContainer() {
     index = 0;
-    list = new ArrayList<Row>(1);
-  }
+    list = new ArrayList<List<Object>>(1);
+  } 
 
   @Override
-  public void add(Row t) throws HiveException {
+  public void add(List<Object> t) {
     list.add(t);
   }
 
+  public void add(Object[] t) {
+    add(toList(t));
+  }
 
   @Override
-  public Row first() throws HiveException {
+  public List<Object> first() {
     index = 0;
     if (index < list.size()) {
       return list.get(index);
@@ -50,13 +66,12 @@ public class MapJoinRowContainer<Row> ex
   }
 
   @Override
-  public Row next() throws HiveException {
+  public List<Object> next() {
     index++;
     if (index < list.size()) {
       return list.get(index);
     }
     return null;
-
   }
 
   /**
@@ -73,28 +88,88 @@ public class MapJoinRowContainer<Row> ex
    * Remove all elements in the RowContainer.
    */
   @Override
-  public void clear() throws HiveException {
+  public void clear() {
     list.clear();
     index = 0;
   }
-
-  public List<Row> getList() {
-    return list;
+  
+  public byte getAliasFilter() {
+    return aliasFilter;
+  }
+  
+  public MapJoinRowContainer copy() {
+    MapJoinRowContainer result = new MapJoinRowContainer();
+    for(List<Object> item : list) {
+      result.add(item);
+    }
+    return result;
   }
-
-  public void setList(List<Row> list) {
-    this.list = list;
+  
+  @SuppressWarnings({"unchecked"})
+  public void read(MapJoinObjectSerDeContext context, ObjectInputStream in, Writable container) 
+  throws IOException, SerDeException {
+    clear();
+    SerDe serde = context.getSerDe();
+    long numRows = in.readLong();
+    for (long rowIndex = 0L; rowIndex < numRows; rowIndex++) {
+      container.readFields(in);      
+      List<Object> value = (List<Object>)ObjectInspectorUtils.copyToStandardObject(serde.deserialize(container),
+          serde.getObjectInspector(), ObjectInspectorCopyOption.WRITABLE);
+      if(value == null) {
+        add(toList(EMPTY_OBJECT_ARRAY));
+      } else {
+        Object[] valuesArray = value.toArray();
+        if (context.hasFilterTag()) {
+          aliasFilter &= ((ShortWritable)valuesArray[valuesArray.length - 1]).get();
+        }
+        add(toList(valuesArray));
+      }
+    }
   }
+  
+  public void write(MapJoinObjectSerDeContext context, ObjectOutputStream out) 
+  throws IOException, SerDeException {
+    SerDe serde = context.getSerDe();
+    ObjectInspector valueObjectInspector = context.getStandardOI();
+    long numRows = size();
+    long numRowsWritten = 0L;
+    out.writeLong(numRows);
+    for (List<Object> row = first(); row != null; row = next()) {
+      serde.serialize(row.toArray(), valueObjectInspector).write(out);
+      ++numRowsWritten;      
+    }
+    if(numRows != size()) {
+      throw new ConcurrentModificationException("Values were modified while persisting");
+    }
+    if(numRowsWritten != numRows) {
+      throw new IllegalStateException("Expected to write " + numRows + " but wrote " + numRowsWritten);
+    }
+  }
+  
+  private List<Object> toList(Object[] array) {
+    return new NoCopyingArrayList(array);
+  }
+  /**
+   * In this use case our objects will not be modified
+   * so we don't care about copying in and out.
+   */
+  private static class NoCopyingArrayList extends AbstractList<Object> {
+    private Object[] array;
+    public NoCopyingArrayList(Object[] array) {
+      this.array = array;
+    }
+    @Override
+    public Object get(int index) {
+      return array[index];
+    }
 
-  public void reset(MapJoinRowContainer<Object[]> other) throws HiveException {
-    list.clear();
-    Object[] obj;
-    for (obj = other.first(); obj != null; obj = other.next()) {
-      ArrayList<Object> ele = new ArrayList(obj.length);
-      for (int i = 0; i < obj.length; i++) {
-        ele.add(obj[i]);
-      }
-      list.add((Row) ele);
+    @Override
+    public int size() {
+      return array.length;
     }
+    
+    public Object[] toArray() {
+      return array;
+    }    
   }
 }
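
Iteration keeps the old first()/next() cursor style, and copy() yields an independent cursor over the same row lists. A short sketch (values are plain Java objects for illustration):

    import java.util.List;
    import org.apache.hadoop.hive.ql.exec.persistence.MapJoinRowContainer;

    public class RowContainerSketch {
      public static void main(String[] args) {
        MapJoinRowContainer rows = new MapJoinRowContainer();
        rows.add(new Object[] {"a", 1});
        rows.add(new Object[] {"b", 2});

        // first() resets the cursor; next() advances until it returns null.
        for (List<Object> row = rows.first(); row != null; row = rows.next()) {
          System.out.println(row); // [a, 1] then [b, 2]
        }

        // copy() shares the row lists but owns its backing list and cursor.
        MapJoinRowContainer snapshot = rows.copy();
        rows.clear();
        System.out.println(snapshot.size()); // still 2
      }
    }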

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainer.java?rev=1514760&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainer.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainer.java Fri Aug 16 15:52:42 2013
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.persistence;
+
+import java.util.Map;
+import java.util.Set;
+
+public interface MapJoinTableContainer {  
+  
+  public int size();
+  
+  public MapJoinRowContainer get(MapJoinKey key);
+  
+  public void put(MapJoinKey key, MapJoinRowContainer value);
+  
+  public Set<Map.Entry<MapJoinKey, MapJoinRowContainer>> entrySet();
+  
+  public Map<String, String> getMetaData();
+  
+  public void clear();
+
+}
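
Consumers can now be written against the interface alone; for instance, a sketch that dumps any container's metadata and contents:

    import java.util.Map;
    import org.apache.hadoop.hive.ql.exec.persistence.HashMapWrapper;
    import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKey;
    import org.apache.hadoop.hive.ql.exec.persistence.MapJoinRowContainer;
    import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer;

    public class ContainerDumpSketch {
      // Works for any implementation, not just HashMapWrapper.
      static void dump(MapJoinTableContainer table) {
        System.out.println("metadata = " + table.getMetaData());
        for (Map.Entry<MapJoinKey, MapJoinRowContainer> e : table.entrySet()) {
          System.out.println(e.getKey().getKey().length + " key columns, "
              + e.getValue().size() + " rows");
        }
      }

      public static void main(String[] args) {
        MapJoinTableContainer table = new HashMapWrapper();
        MapJoinRowContainer rows = new MapJoinRowContainer();
        rows.add(new Object[] {"v"});
        table.put(new MapJoinKey(new Object[] {"k"}), rows);
        dump(table);
      }
    }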