Posted to commits@drill.apache.org by bo...@apache.org on 2018/05/17 21:57:57 UTC

[drill] 02/03: DRILL-6027: - Added memory calculator - Added unit tests and docs. - Fixed IOB caused by output vector allocation. - Don't double count records that were spilled in HashJoin

This is an automated email from the ASF dual-hosted git repository.

boaz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 349ac5ad431a6814f0a065bf16dee635cc6d3274
Author: Timothy Farkas <ti...@apache.org>
AuthorDate: Mon Feb 12 16:49:29 2018 -0800

    DRILL-6027:
     - Added memory calculator
     - Added unit tests and docs.
     - Fixed IOB caused by output vector allocation.
     - Don't double count records that were spilled in HashJoin
---
 .../java/org/apache/drill/exec/ExecConstants.java  |  16 +-
 .../drill/exec/physical/config/HashJoinPOP.java    |  14 +-
 .../physical/impl/aggregate/HashAggTemplate.java   |   1 -
 .../physical/impl/common/ChainedHashTable.java     |   8 +-
 .../exec/physical/impl/common/HashPartition.java   | 111 ++-
 .../drill/exec/physical/impl/common/HashTable.java |  80 +-
 .../impl/common/HashTableAllocationTracker.java    |  73 ++
 .../exec/physical/impl/common/HashTableConfig.java |  33 +-
 .../physical/impl/common/HashTableTemplate.java    | 116 +--
 .../exec/physical/impl/join/HashJoinBatch.java     | 468 ++++++++----
 .../exec/physical/impl/join/HashJoinHelper.java    |  12 +
 .../impl/join/HashJoinHelperSizeCalculator.java}   |  19 +-
 .../join/HashJoinHelperSizeCalculatorImpl.java     |  51 ++
 .../impl/join/HashJoinMemoryCalculator.java        | 320 ++++++++
 .../impl/join/HashJoinMemoryCalculatorImpl.java    | 833 +++++++++++++++++++++
 .../exec/physical/impl/join/HashJoinState.java     |  42 ++
 .../impl/join/HashJoinStateCalculator.java}        |  25 +-
 .../impl/join/HashTableSizeCalculator.java}        |  26 +-
 .../HashTableSizeCalculatorConservativeImpl.java   |  96 +++
 .../impl/join/HashTableSizeCalculatorLeanImpl.java | 109 +++
 .../join/MechanicalHashJoinMemoryCalculator.java   | 163 ++++
 .../drill/exec/planner/logical/DrillJoinRel.java   |   3 +-
 .../drill/exec/record/AbstractRecordBatch.java     |   8 +
 .../apache/drill/exec/record/RecordBatchSizer.java |  42 ++
 .../apache/drill/exec/record/VectorContainer.java  |  24 +
 .../exec/server/options/SystemOptionManager.java   |   9 +-
 .../java-exec/src/main/resources/drill-module.conf |   8 +-
 .../drill/exec/cache/TestBatchSerialization.java   |   5 +-
 .../exec/compile/TestLargeFileCompilation.java     |   1 +
 .../physical/impl/common/HashPartitionTest.java    | 306 ++++++++
 .../common/HashTableAllocationTrackerTest.java     | 101 +++
 .../exec/physical/impl/join/PartitionStatImpl.java |  79 ++
 .../impl/join/TestBuildSidePartitioningImpl.java   | 289 +++++++
 .../join/TestHashJoinHelperSizeCalculatorImpl.java |  44 ++
 .../join/TestHashJoinMemoryCalculatorImpl.java     |  82 ++
 .../exec/physical/impl/join/TestHashJoinSpill.java |   2 -
 ...estHashTableSizeCalculatorConservativeImpl.java |  67 ++
 .../join/TestHashTableSizeCalculatorLeanImpl.java  |  66 ++
 .../exec/physical/impl/join/TestPartitionStat.java |  57 ++
 .../impl/join/TestPostBuildCalculationsImpl.java   | 434 +++++++++++
 .../physical/impl/validate/TestBatchValidator.java |   7 +-
 .../physical/impl/xsort/managed/TestSortImpl.java  |  23 +-
 .../physical/impl/xsort/managed/TestSorter.java    |   8 +-
 .../exec/physical/unit/PhysicalOpUnitTestBase.java |   7 +-
 .../drill/exec/record/TestVectorContainer.java     |  41 +-
 .../drill/exec/util/TestQueryMemoryAlloc.java      |  13 +-
 .../org/apache/drill/test/BaseDirTestWatcher.java  |  18 +
 .../java/org/apache/drill/test/ConfigBuilder.java  |   7 +-
 .../org/apache/drill/test/OperatorFixture.java     |  28 +-
 .../org/apache/drill/test/SubOperatorTest.java     |   6 +-
 .../drill/test/rowSet/test/PerformanceTool.java    |   2 +-
 .../main/codegen/templates/FixedValueVectors.java  |   5 +
 .../codegen/templates/NullableValueVectors.java    |   7 +
 .../org/apache/drill/exec/vector/BitVector.java    |   5 +
 .../apache/drill/exec/vector/FixedWidthVector.java |   6 +
 .../drill/exec/vector/UntypedNullVector.java       |   5 +
 56 files changed, 4121 insertions(+), 310 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 57ecdf2..c48f414 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -104,16 +104,22 @@ public final class ExecConstants {
   public static final BooleanValidator EXTERNAL_SORT_DISABLE_MANAGED_OPTION = new BooleanValidator("exec.sort.disable_managed");
 
   // Hash Join Options
+  public static String HASHJOIN_HASHTABLE_CALC_TYPE_KEY = "exec.hashjoin.hash_table_calc_type";
+  public static StringValidator HASHJOIN_HASHTABLE_CALC_TYPE = new StringValidator(HASHJOIN_HASHTABLE_CALC_TYPE_KEY);
+  public static String HASHJOIN_SAFETY_FACTOR_KEY = "exec.hashjoin.safety_factor";
+  public static DoubleValidator HASHJOIN_SAFETY_FACTOR = new RangeDoubleValidator(HASHJOIN_SAFETY_FACTOR_KEY, 1.0, Double.MAX_VALUE);
+  public static String HASHJOIN_HASH_DOUBLE_FACTOR_KEY = "exec.hashjoin.hash_double_factor";
+  public static DoubleValidator HASHJOIN_HASH_DOUBLE_FACTOR = new RangeDoubleValidator(HASHJOIN_HASH_DOUBLE_FACTOR_KEY, 1.0, Double.MAX_VALUE);
+  public static String HASHJOIN_FRAGMENTATION_FACTOR_KEY = "exec.hashjoin.fragmentation_factor";
+  public static DoubleValidator HASHJOIN_FRAGMENTATION_FACTOR = new RangeDoubleValidator(HASHJOIN_FRAGMENTATION_FACTOR_KEY, 1.0, Double.MAX_VALUE);
   public static final String HASHJOIN_NUM_ROWS_IN_BATCH_KEY = "exec.hashjoin.num_rows_in_batch";
   public static final LongValidator HASHJOIN_NUM_ROWS_IN_BATCH_VALIDATOR = new RangeLongValidator(HASHJOIN_NUM_ROWS_IN_BATCH_KEY, 1, 65536);
   public static final String HASHJOIN_MAX_BATCHES_IN_MEMORY_KEY = "exec.hashjoin.max_batches_in_memory";
-  public static final LongValidator HASHJOIN_MAX_BATCHES_IN_MEMORY_VALIDATOR = new RangeLongValidator(HASHJOIN_MAX_BATCHES_IN_MEMORY_KEY, 1, 65536);
-  public static final String HASHJOIN_MAX_BATCHES_PER_PARTITION_KEY = "exec.hashjoin.max_batches_per_partition";
-  public static final LongValidator HASHJOIN_MAX_BATCHES_PER_PARTITION_VALIDATOR = new RangeLongValidator(HASHJOIN_MAX_BATCHES_PER_PARTITION_KEY, 1, 65536);
+  public static final LongValidator HASHJOIN_MAX_BATCHES_IN_MEMORY_VALIDATOR = new RangeLongValidator(HASHJOIN_MAX_BATCHES_IN_MEMORY_KEY, 0, 65536);
   public static final String HASHJOIN_NUM_PARTITIONS_KEY = "exec.hashjoin.num_partitions";
   public static final LongValidator HASHJOIN_NUM_PARTITIONS_VALIDATOR = new RangeLongValidator(HASHJOIN_NUM_PARTITIONS_KEY, 1, 128); // 1 means - no spilling
-  public static final String HASHJOIN_MAX_MEMORY_KEY = "exec.hashjoin.mem_limit";
-  public static final LongValidator HASHJOIN_MAX_MEMORY_VALIDATOR = new RangeLongValidator(HASHJOIN_MAX_MEMORY_KEY, 0, Integer.MAX_VALUE);
+  public static final String HASHJOIN_MAX_MEMORY_KEY = "drill.exec.hashjoin.mem_limit";
+  public static final LongValidator HASHJOIN_MAX_MEMORY_VALIDATOR = new RangeLongValidator(HASHJOIN_MAX_MEMORY_KEY, 0L, Long.MAX_VALUE);
   public static final String HASHJOIN_SPILL_DIRS = "drill.exec.hashjoin.spill.directories";
   public static final String HASHJOIN_SPILL_FILESYSTEM = "drill.exec.hashjoin.spill.fs";
 
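Note on the new options above: exec.hashjoin.safety_factor, exec.hashjoin.fragmentation_factor and
exec.hashjoin.hash_double_factor are all range-validated to [1.0, MAX], i.e. they act as multiplicative
padding on the size estimates made by the new memory calculator. A minimal sketch of that idea
(hypothetical names, not code from this patch; the actual formulas live in HashJoinMemoryCalculatorImpl):

    // Illustrative only: padding a raw batch-size estimate with factors >= 1.0,
    // so the padded estimate can never be smaller than the raw estimate.
    public final class FactorSketch {
      public static long padEstimate(long rawBatchBytes, double safetyFactor, double fragmentationFactor) {
        return (long) Math.ceil(rawBatchBytes * safetyFactor * fragmentationFactor);
      }

      public static void main(String[] args) {
        // e.g. a 4 MiB batch estimate padded by 10% safety and 33% fragmentation
        System.out.println(padEstimate(4L << 20, 1.1, 1.33));
      }
    }
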
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashJoinPOP.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashJoinPOP.java
index 67ef7f5..b56950a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashJoinPOP.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashJoinPOP.java
@@ -45,7 +45,9 @@ public class HashJoinPOP extends AbstractJoinPop {
     @Override
     public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
         Preconditions.checkArgument(children.size() == 2);
-        return new HashJoinPOP(children.get(0), children.get(1), conditions, joinType);
+        HashJoinPOP newHashJoin = new HashJoinPOP(children.get(0), children.get(1), conditions, joinType);
+        newHashJoin.setMaxAllocation(getMaxAllocation());
+        return newHashJoin;
     }
 
     public HashJoinPOP flipIfRight() {
@@ -64,4 +66,14 @@ public class HashJoinPOP extends AbstractJoinPop {
     public int getOperatorType() {
         return CoreOperatorType.HASH_JOIN_VALUE;
     }
+
+    @Override
+    public void setMaxAllocation(long maxAllocation) {
+        this.maxAllocation = maxAllocation;
+    }
+
+    @Override
+    public boolean isBufferedOperator() {
+        return true;
+    }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
index 368fd2c..4f6a117 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
@@ -459,7 +459,6 @@ public abstract class HashAggTemplate implements HashAggregator {
     for (int i = 0; i < numPartitions; i++ ) {
       try {
         this.htables[i] = baseHashTable.createAndSetupHashTable(groupByOutFieldIds);
-        this.htables[i].setMaxVarcharSize(maxColumnWidth);
       } catch (ClassTransformationException e) {
         throw UserException.unsupportedError(e)
             .message("Code generation error - likely an error in the code.")
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
index 89e32a8..a14bf8c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
@@ -53,7 +53,9 @@ import org.apache.drill.exec.vector.ValueVector;
 import com.sun.codemodel.JConditional;
 import com.sun.codemodel.JExpr;
 
-
+/**
+ * This is a master class used to generate code for {@link HashTable}s.
+ */
 public class ChainedHashTable {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ChainedHashTable.class);
 
@@ -282,9 +284,7 @@ public class ChainedHashTable {
 
     int i = 0;
     for (LogicalExpression expr : keyExprs) {
-      boolean useSetSafe = !Types.isFixedWidthType(expr.getMajorType()) || Types.isRepeated(expr.getMajorType());
-      ValueVectorWriteExpression vvwExpr = new ValueVectorWriteExpression(htKeyFieldIds[i++], expr, useSetSafe);
-
+      ValueVectorWriteExpression vvwExpr = new ValueVectorWriteExpression(htKeyFieldIds[i++], expr, true);
       cg.addExpr(vvwExpr, ClassGenerator.BlkCreateMode.TRUE_IF_BOUND);
     }
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java
index 5b4adf1..5d0197a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.physical.impl.common;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import org.apache.drill.common.exceptions.RetryAfterSpillException;
 import org.apache.drill.common.exceptions.UserException;
@@ -27,12 +28,13 @@ import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.impl.join.HashJoinBatch;
 import org.apache.drill.exec.physical.impl.join.HashJoinHelper;
+import org.apache.drill.exec.physical.impl.join.HashJoinMemoryCalculator;
 import org.apache.drill.exec.physical.impl.spill.SpillSet;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.RecordBatchSizer;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.record.WritableBatch;
@@ -51,8 +53,8 @@ import java.util.List;
 import java.util.concurrent.TimeUnit;
 
 /**
- *  The class HashPartition
- *
+ * <h2>Overview</h2>
+ * <p>
  *    Created to represent an active partition for the Hash-Join operator
  *  (active means: currently receiving data, or its data is being probed; as opposed to fully
  *   spilled partitions).
@@ -61,16 +63,19 @@ import java.util.concurrent.TimeUnit;
  *    If all this partition's build/inner data was spilled, then it begins to work as an outer
  *  partition (see the flag "processingOuter") -- reusing some of the fields (e.g., currentBatch,
  *  currHVVector, writer, spillFile, partitionBatchesCount) for the outer.
+ *  </p>
  */
-public class HashPartition {
+public class HashPartition implements HashJoinMemoryCalculator.PartitionStat {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashPartition.class);
 
+  public static final String HASH_VALUE_COLUMN_NAME = "Hash_Values";
+
   private int partitionNum = -1; // the current number of this partition, as used by the operator
 
   private static final int VARIABLE_MIN_WIDTH_VALUE_SIZE = 8;
   private int maxColumnWidth = VARIABLE_MIN_WIDTH_VALUE_SIZE; // to control memory allocation for varchars
 
-  private final MajorType HVtype = MajorType.newBuilder()
+  public static final MajorType HVtype = MajorType.newBuilder()
     .setMinorType(MinorType.INT /* dataType */ )
     .setMode(DataMode.REQUIRED /* mode */ )
     .build();
@@ -102,7 +107,6 @@ public class HashPartition {
   private String spillFile;
 
   private BufferAllocator allocator;
-  private FragmentContext context;
   private int RECORDS_PER_BATCH;
   ChainedHashTable baseHashTable;
   private SpillSet spillSet;
@@ -111,14 +115,14 @@ public class HashPartition {
   private boolean outerBatchNotNeeded; // when the inner is whole in memory
   private RecordBatch buildBatch;
   private RecordBatch probeBatch;
-  private HashJoinBatch.inMemBatchCounter inMemBatches; // shared among all partitions
   private int cycleNum;
+  private List<HashJoinMemoryCalculator.BatchStat> inMemoryBatchStats = Lists.newArrayList();
+  private long partitionInMemorySize;
+  private long numInMemoryRecords;
 
   public HashPartition(FragmentContext context, BufferAllocator allocator, ChainedHashTable baseHashTable,
                        RecordBatch buildBatch, RecordBatch probeBatch,
-                       int recordsPerBatch, SpillSet spillSet, int partNum,
-                       HashJoinBatch.inMemBatchCounter inMemBatches, int cycleNum) {
-    this.context = context;
+                       int recordsPerBatch, SpillSet spillSet, int partNum, int cycleNum) {
     this.allocator = allocator;
     this.baseHashTable = baseHashTable;
     this.buildBatch = buildBatch;
@@ -126,12 +130,10 @@ public class HashPartition {
     this.RECORDS_PER_BATCH = recordsPerBatch;
     this.spillSet = spillSet;
     this.partitionNum = partNum;
-    this.inMemBatches = inMemBatches;
     this.cycleNum = cycleNum;
 
     try {
       this.hashTable = baseHashTable.createAndSetupHashTable(null);
-      this.hashTable.setMaxVarcharSize(maxColumnWidth);
     } catch (ClassTransformationException e) {
       throw UserException.unsupportedError(e)
         .message("Code generation error - likely an error in the code.")
@@ -173,7 +175,6 @@ public class HashPartition {
         if (newVV instanceof FixedWidthVector) {
           ((FixedWidthVector) newVV).allocateNew(RECORDS_PER_BATCH);
         } else if (newVV instanceof VariableWidthVector) {
-          // Need to check - (is this case ever used?) if a varchar falls under ObjectVector which is allocated on the heap !
           ((VariableWidthVector) newVV).allocateNew(maxColumnWidth * RECORDS_PER_BATCH, RECORDS_PER_BATCH);
         } else if (newVV instanceof ObjectVector) {
           ((ObjectVector) newVV).allocateNew(RECORDS_PER_BATCH);
@@ -183,7 +184,6 @@ public class HashPartition {
       }
 
       newVC.setRecordCount(0);
-      inMemBatches.inc(); ; // one more batch in memory
       success = true;
     } finally {
       if ( !success ) {
@@ -199,18 +199,19 @@ public class HashPartition {
   public void allocateNewCurrentBatchAndHV() {
     if ( outerBatchNotNeeded ) { return; } // skip when the inner is whole in memory
     currentBatch = allocateNewVectorContainer(processingOuter ? probeBatch : buildBatch);
-    currHVVector = new IntVector(MaterializedField.create("Hash_Values", HVtype), allocator);
+    currHVVector = new IntVector(MaterializedField.create(HASH_VALUE_COLUMN_NAME, HVtype), allocator);
     currHVVector.allocateNew(RECORDS_PER_BATCH);
   }
 
   /**
    *  Spills if needed
    */
-  public void appendInnerRow(VectorContainer buildContainer, int ind, int hashCode, boolean needsSpill) {
+  public void appendInnerRow(VectorContainer buildContainer, int ind, int hashCode, HashJoinMemoryCalculator.BuildSidePartitioning calc) {
 
     int pos = currentBatch.appendRow(buildContainer,ind);
     currHVVector.getMutator().set(pos, hashCode);   // store the hash value in the new column
     if ( pos + 1 == RECORDS_PER_BATCH ) {
+      boolean needsSpill = isSpilled || calc.shouldSpill();
       completeAnInnerBatch(true, needsSpill);
     }
   }
@@ -244,11 +245,17 @@ public class HashPartition {
       currentBatch.buildSchema(BatchSchema.SelectionVectorMode.NONE);
       tmpBatchesList.add(currentBatch);
       partitionBatchesCount++;
+
+      long batchSize = new RecordBatchSizer(currentBatch).actualSize();
+      inMemoryBatchStats.add(new HashJoinMemoryCalculator.BatchStat(currentBatch.getRecordCount(), batchSize));
+
+      partitionInMemorySize += batchSize;
+      numInMemoryRecords += currentBatch.getRecordCount();
     } else {
       freeCurrentBatchAndHVVector();
     }
     if ( needsSpill ) { // spill this batch/partition and free its memory
-      spillThisPartition(tmpBatchesList, processingOuter ? "outer" : "inner");
+      spillThisPartition();
     }
     if ( toInitialize ) { // allocate a new batch and HV vector
       allocateNewCurrentBatchAndHV();
@@ -258,20 +265,14 @@ public class HashPartition {
     }
   }
 
-  private void spillThisPartition(List<VectorContainer> vcList, String side) {
-    if ( vcList.size() == 0 ) { return; } // in case empty - nothing to spill
-    logger.debug("HashJoin: Spilling partition {}, current cycle {}, part size {} batches", partitionNum, cycleNum, vcList.size());
+  public void spillThisPartition() {
+    if ( tmpBatchesList.size() == 0 ) { return; } // in case empty - nothing to spill
+    logger.debug("HashJoin: Spilling partition {}, current cycle {}, part size {} batches", partitionNum, cycleNum, tmpBatchesList.size());
 
     // If this is the first spill for this partition, create an output stream
     if ( writer == null ) {
-      // A special case - when (outer is) empty
-      if ( vcList.get(0).getRecordCount() == 0 ) {
-        VectorContainer vc = vcList.remove(0);
-        inMemBatches.dec();
-        vc.zeroVectors();
-        return;
-      }
-      String suffix = cycleNum > 0 ? side + "_" + Integer.toString(cycleNum) : side;
+      final String side = processingOuter ? "outer" : "inner";
+      final String suffix = cycleNum > 0 ? side + "_" + Integer.toString(cycleNum) : side;
       spillFile = spillSet.getNextSpillFile(suffix);
 
       try {
@@ -285,15 +286,14 @@ public class HashPartition {
       isSpilled = true;
     }
 
-    while ( vcList.size() > 0 ) {
-      VectorContainer vc = vcList.remove(0);
-      inMemBatches.dec();
+    partitionInMemorySize = 0L;
+    numInMemoryRecords = 0L;
+    inMemoryBatchStats.clear();
+
+    while ( tmpBatchesList.size() > 0 ) {
+      VectorContainer vc = tmpBatchesList.remove(0);
 
       int numRecords = vc.getRecordCount();
-      if (numRecords == 0) { // Spilling should to skip an empty batch
-        vc.zeroVectors();
-        continue;
-      }
 
       // set the value count for outgoing batch value vectors
       for (VectorWrapper<?> v : vc) {
@@ -312,7 +312,6 @@ public class HashPartition {
       }
       vc.zeroVectors();
       logger.trace("HASH JOIN: Took {} us to spill {} records", writer.time(TimeUnit.MICROSECONDS), numRecords);
-
     }
   }
 
@@ -362,9 +361,32 @@ public class HashPartition {
   public void updateBatches() throws SchemaChangeException {
     hashTable.updateBatches();
   }
+
+  @Override
+  public List<HashJoinMemoryCalculator.BatchStat> getInMemoryBatches() {
+    return inMemoryBatchStats;
+  }
+
+  @Override
+  public int getNumInMemoryBatches() {
+    return inMemoryBatchStats.size();
+  }
+
+  @Override
   public boolean isSpilled() {
     return isSpilled;
   }
+
+  @Override
+  public long getNumInMemoryRecords() {
+    return numInMemoryRecords;
+  }
+
+  @Override
+  public long getInMemorySize() {
+    return partitionInMemorySize;
+  }
+
   public String getSpillFile() {
     return spillFile;
   }
@@ -378,7 +400,6 @@ public class HashPartition {
 
   private void freeCurrentBatchAndHVVector() {
     if ( currentBatch != null ) {
-      inMemBatches.dec();
       currentBatch.clear();
       currentBatch = null;
     }
@@ -421,11 +442,13 @@ public class HashPartition {
   }
 
   /**
-   *
+   * Creates the hash table and join helper for this partition. This method should only be called after all the build side records
+   * have been consumed.
    */
   public void buildContainersHashTableAndHelper() throws SchemaChangeException {
     if ( isSpilled ) { return; } // no building for spilled partitions
     containers = new ArrayList<>();
+    hashTable.updateInitialCapacity((int) getNumInMemoryRecords());
     for (int curr = 0; curr < partitionBatchesCount; curr++) {
       VectorContainer nextBatch = tmpBatchesList.get(curr);
       final int currentRecordCount = nextBatch.getRecordCount();
@@ -467,6 +490,9 @@ public class HashPartition {
     hashTable.getStats(newStats);
   }
 
+  /**
+   * Frees memory allocated to the {@link HashTable} and {@link HashJoinHelper}.
+   */
   public void clearHashTableAndHelper() {
     if (hashTable != null) {
       hashTable.clear();
@@ -487,7 +513,6 @@ public class HashPartition {
     }
     while ( tmpBatchesList.size() > 0 ) {
       VectorContainer vc = tmpBatchesList.remove(0);
-      inMemBatches.dec();
       vc.clear();
     }
     closeWriter();
@@ -497,4 +522,12 @@ public class HashPartition {
     if ( containers != null ) { containers.clear(); }
   }
 
+  /**
+   * Creates a debugging string containing information about memory usage.
+   * @return A debugging string.
+   */
+  public String makeDebugString() {
+    return String.format("[hashTable = %s]",
+      hashTable == null ? "None": hashTable.makeDebugString());
+  }
 } // class HashPartition
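
The key behavioral change in HashPartition above is that appendInnerRow() no longer receives a
precomputed "needsSpill" flag; once a batch fills, the partition asks the memory calculator whether
to spill, and it now tracks its own in-memory statistics (batch sizes and record counts) for that
calculator. A condensed, illustrative sketch of that flow (hypothetical names, not code from this patch):

    // Illustrative only: a batch completes every RECORDS_PER_BATCH rows, and the
    // memory calculator (rather than a fixed batch count) decides whether to spill.
    interface SpillCalculator { boolean shouldSpill(); }

    final class AppendSketch {
      private final int recordsPerBatch = 128;
      private int rowsInCurrentBatch;
      private boolean isSpilled;

      void appendInnerRow(SpillCalculator calc) {
        rowsInCurrentBatch++;
        if (rowsInCurrentBatch == recordsPerBatch) {
          // already-spilled partitions keep spilling; otherwise ask the calculator
          boolean needsSpill = isSpilled || calc.shouldSpill();
          rowsInCurrentBatch = 0;
          if (needsSpill) {
            isSpilled = true; // spill the completed batches and free their memory
          }
        }
      }
    }
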
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
index ed8f388..194c865 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
@@ -46,17 +46,51 @@ public interface HashTable {
   int BATCH_SIZE = Character.MAX_VALUE + 1;
   int BATCH_MASK = 0x0000FFFF;
 
+  /**
+   * {@link HashTable#setup(HashTableConfig, BufferAllocator, VectorContainer, RecordBatch, RecordBatch, VectorContainer)} must be called before anything can be done to the
+   * {@link HashTable}.
+   * @param htConfig
+   * @param allocator
+   * @param incomingBuild
+   * @param incomingProbe
+   * @param outgoing
+   * @param htContainerOrig
+   */
   void setup(HashTableConfig htConfig, BufferAllocator allocator, VectorContainer incomingBuild, RecordBatch incomingProbe, RecordBatch outgoing,
              VectorContainer htContainerOrig);
 
+  /**
+   * Updates the incoming (build and probe side) value vectors references in the {@link HashTableTemplate.BatchHolder}s.
+   * This is useful on OK_NEW_SCHEMA (need to verify).
+   * @throws SchemaChangeException
+   */
   void updateBatches() throws SchemaChangeException;
 
+  /**
+   * Computes the hash code for the record at the given index in the build side batch.
+   * @param incomingRowIdx The index of the build side record of interest.
+   * @return The hash code for the record at the given index in the build side batch.
+   * @throws SchemaChangeException
+   */
   int getBuildHashCode(int incomingRowIdx) throws SchemaChangeException;
 
+  /**
+   * Computes the hash code for the record at the given index in the probe side batch.
+   * @param incomingRowIdx The index of the probe side record of interest.
+   * @return The hash code for the record at the given index in the probe side batch.
+   * @throws SchemaChangeException
+   */
   int getProbeHashCode(int incomingRowIdx) throws SchemaChangeException;
 
   PutStatus put(int incomingRowIdx, IndexPointer htIdxHolder, int hashCode) throws SchemaChangeException, RetryAfterSpillException;
 
+  /**
+   * @param incomingRowIdx The index of the key in the probe batch.
+   * @param hashCode The hashCode of the key.
+   * @return Returns -1 if the data in the probe batch at the given incomingRowIdx is not in the hash table. Otherwise returns
+   * the composite index of the key in the hash table (index of BatchHolder and record in Batch Holder).
+   * @throws SchemaChangeException
+   */
   int probeForKey(int incomingRowIdx, int hashCode) throws SchemaChangeException;
 
   void getStats(HashTableStats stats);
@@ -65,15 +99,55 @@ public interface HashTable {
 
   boolean isEmpty();
 
+  /**
+   * Frees all the direct memory consumed by the {@link HashTable}.
+   */
   void clear();
 
-  public void updateIncoming(VectorContainer newIncoming, RecordBatch newIncomingProbe);
+  /**
+   * Update the initial capacity for the hash table. This method will be removed after the key vectors are removed from the hash table. It is used
+   * to allocate {@link HashTableTemplate.BatchHolder}s of appropriate size when the final size of the HashTable is known.
+   *
+   * <b>Warning!</b> Only call this method before you have inserted elements into the HashTable.
+   *
+   * @param initialCapacity The new initial capacity to use.
+   */
+  void updateInitialCapacity(int initialCapacity);
+
+  /**
+   * Changes the incoming probe and build side batches, and then updates all the value vector references in the {@link HashTableTemplate.BatchHolder}s.
+   * @param newIncoming The new build side batch.
+   * @param newIncomingProbe The new probe side batch.
+   */
+  void updateIncoming(VectorContainer newIncoming, RecordBatch newIncomingProbe);
 
+  /**
+   * Clears all the memory used by the {@link HashTable} and re-initializes it.
+   */
   void reset();
 
-  void setMaxVarcharSize(int size);
-
+  /**
+   * Retrieves the key columns and transfers them to the output container. Note this operation removes the key columns from the {@link HashTable}.
+   * @param batchIdx The index of a {@link HashTableTemplate.BatchHolder} in the HashTable.
+   * @param outContainer The destination container for the key columns.
+   * @param outStartIndex The start index of the key records to transfer.
+   * @param numRecords The number of key records to transfer.
+   * @param numExpectedRecords
+   * @return
+   */
   boolean outputKeys(int batchIdx, VectorContainer outContainer, int outStartIndex, int numRecords, int numExpectedRecords);
+
+  /**
+   * Returns a message containing memory usage statistics. Intended to be used for printing debugging or error messages.
+   * @return A debug string.
+   */
+  String makeDebugString();
+
+  /**
+   * The amount of direct memory consumed by the hash table.
+   * @return
+   */
+  long getActualSize();
 }
 
 
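Taken together, the Javadoc added above describes the HashTable life cycle: setup(), then
getBuildHashCode()/put() during the build phase, then getProbeHashCode()/probeForKey() during the
probe phase. A condensed, illustrative sketch of that flow (not code from this patch; it assumes
setup() has already been called, that IndexPointer has its usual no-arg constructor, and it omits
OK_NEW_SCHEMA handling, retries and error handling):

    // Illustrative sketch against the interface above; assumes setup() has been called.
    void buildThenProbe(HashTable hashTable, int buildRecordCount, int probeRecordCount)
        throws SchemaChangeException, RetryAfterSpillException {
      IndexPointer htIdxHolder = new IndexPointer();

      // Build phase: insert every build-side row keyed by its hash code.
      for (int i = 0; i < buildRecordCount; i++) {
        hashTable.put(i, htIdxHolder, hashTable.getBuildHashCode(i));
      }

      // Probe phase: look up each probe-side row; -1 means no matching key.
      for (int i = 0; i < probeRecordCount; i++) {
        int compositeIdx = hashTable.probeForKey(i, hashTable.getProbeHashCode(i));
        if (compositeIdx != -1) {
          // compositeIdx encodes the BatchHolder index and the row within that holder
        }
      }
    }
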
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableAllocationTracker.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableAllocationTracker.java
new file mode 100644
index 0000000..d72278d
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableAllocationTracker.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.common;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * A helper class used by {@link HashTableTemplate} in order to pre-allocate {@link HashTableTemplate.BatchHolder}s of the appropriate size.
+ *
+ * <b>Note:</b> This class will be obsolete once the key vectors are removed from the hash table.
+ */
+class HashTableAllocationTracker
+{
+  private enum State {
+    NO_ALLOCATION_IN_PROGRESS,
+    ALLOCATION_IN_PROGRESS
+  }
+
+  private final HashTableConfig config;
+  private final int maxBatchHolderSize;
+
+  private State state = State.NO_ALLOCATION_IN_PROGRESS;
+  private int remainingCapacity;
+
+  protected HashTableAllocationTracker(final HashTableConfig config,
+                                       final int maxBatchHolderSize)
+  {
+    this.config = Preconditions.checkNotNull(config);
+    this.maxBatchHolderSize = maxBatchHolderSize;
+
+    remainingCapacity = config.getInitialCapacity();
+  }
+
+  public int getNextBatchHolderSize() {
+    state = State.ALLOCATION_IN_PROGRESS;
+
+    if (!config.getInitialSizeIsFinal()) {
+      // We don't know the final size of the hash table, so return the default max batch holder size
+      return maxBatchHolderSize;
+    } else {
+      // We know the final size of the hash table so we need to compute the next batch holder size.
+
+      Preconditions.checkState(remainingCapacity > 0);
+      return computeNextBatchHolderSize();
+    }
+  }
+
+  private int computeNextBatchHolderSize() {
+    return Math.min(remainingCapacity, maxBatchHolderSize);
+  }
+
+  public void commit() {
+    Preconditions.checkState(state.equals(State.ALLOCATION_IN_PROGRESS));
+
+    remainingCapacity -= computeNextBatchHolderSize();
+    state = State.NO_ALLOCATION_IN_PROGRESS;
+  }
+}
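
For reference, the calling pattern for this tracker (every getNextBatchHolderSize() is paired with a
commit() once the BatchHolder has been created) appears later in this commit, in
HashTableTemplate.addBatchIfNeeded():

    BatchHolder bh = newBatchHolder(batchHolders.size(), allocationTracker.getNextBatchHolderSize());
    batchHolders.add(bh);
    bh.setup();
    allocationTracker.commit(); // must follow every getNextBatchHolderSize() call
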
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableConfig.java
index aa2497c..b689390 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableConfig.java
@@ -29,27 +29,58 @@ import java.util.List;
 public class HashTableConfig  {
 
   private final int initialCapacity;
+  private final boolean initialSizeIsFinal;
   private final float loadFactor;
   private final List<NamedExpression> keyExprsBuild;
   private final List<NamedExpression> keyExprsProbe;
   private final List<Comparator> comparators;
 
   @JsonCreator
-  public HashTableConfig(@JsonProperty("initialCapacity") int initialCapacity, @JsonProperty("loadFactor") float loadFactor,
+  public HashTableConfig(@JsonProperty("initialCapacity") int initialCapacity,
+                         @JsonProperty("loadFactor") float loadFactor,
                          @JsonProperty("keyExprsBuild") List<NamedExpression> keyExprsBuild,
                          @JsonProperty("keyExprsProbe") List<NamedExpression> keyExprsProbe,
                          @JsonProperty("comparators") List<Comparator> comparators) {
     this.initialCapacity = initialCapacity;
+    this.initialSizeIsFinal = false;
     this.loadFactor = loadFactor;
     this.keyExprsBuild = keyExprsBuild;
     this.keyExprsProbe = keyExprsProbe;
     this.comparators = comparators;
   }
 
+  @JsonCreator
+  public HashTableConfig(@JsonProperty("initialCapacity") int initialCapacity,
+                         @JsonProperty("initialSizeIsFinal") boolean initialSizeIsFinal,
+                         @JsonProperty("loadFactor") float loadFactor,
+                         @JsonProperty("keyExprsBuild") List<NamedExpression> keyExprsBuild,
+                         @JsonProperty("keyExprsProbe") List<NamedExpression> keyExprsProbe,
+                         @JsonProperty("comparators") List<Comparator> comparators) {
+    this.initialCapacity = initialCapacity;
+    this.initialSizeIsFinal = initialSizeIsFinal;
+    this.loadFactor = loadFactor;
+    this.keyExprsBuild = keyExprsBuild;
+    this.keyExprsProbe = keyExprsProbe;
+    this.comparators = comparators;
+  }
+
+  public HashTableConfig withInitialCapacity(int initialCapacity) {
+    return new HashTableConfig(initialCapacity,
+      initialSizeIsFinal,
+      loadFactor,
+      keyExprsBuild,
+      keyExprsProbe,
+      comparators);
+  }
+
   public int getInitialCapacity() {
     return initialCapacity;
   }
 
+  public boolean getInitialSizeIsFinal() {
+    return initialSizeIsFinal;
+  }
+
   public float getLoadFactor() {
     return loadFactor;
   }
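
The two additions above (the initialSizeIsFinal flag and withInitialCapacity()) are used together later
in this commit: HashJoinBatch constructs its config with a final initial size, and
HashTableTemplate.updateInitialCapacity() swaps in the actual build-side record count once it is known:

    // From the HashJoinBatch hunk below:
    final HashTableConfig htConfig = new HashTableConfig((int) context.getOptions().getOption(ExecConstants.MIN_HASH_TABLE_SIZE),
      true, HashTable.DEFAULT_LOAD_FACTOR, rightExpr, leftExpr, comparators);

    // From the HashTableTemplate hunk below:
    htConfig = htConfig.withInitialCapacity(initialCapacity);
    allocationTracker = new HashTableAllocationTracker(htConfig, BATCH_SIZE);
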
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
index b7a7f7b..06e3bcd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
@@ -19,18 +19,23 @@ package org.apache.drill.exec.physical.impl.common;
 
 import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.Set;
 
 import javax.inject.Named;
 
+import com.google.common.collect.Sets;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.compile.sig.RuntimeOverridden;
 import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.memory.AllocationManager;
 import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.physical.impl.join.HashJoinMemoryCalculator;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.RecordBatchSizer;
 import org.apache.drill.exec.record.TransferPair;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
@@ -38,11 +43,13 @@ import org.apache.drill.exec.vector.BigIntVector;
 import org.apache.drill.exec.vector.FixedWidthVector;
 import org.apache.drill.exec.vector.IntVector;
 import org.apache.drill.exec.vector.ValueVector;
-import org.apache.drill.exec.vector.VariableWidthVector;
 import org.apache.drill.common.exceptions.RetryAfterSpillException;
+import org.apache.drill.exec.vector.VariableWidthVector;
 
 public abstract class HashTableTemplate implements HashTable {
 
+  public static final int MAX_VARCHAR_SIZE = 8; // This is a bad heuristic which will be eliminated when the keys are removed from the HashTable.
+
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashTable.class);
   private static final boolean EXTRA_DEBUG = false;
 
@@ -90,6 +97,9 @@ public abstract class HashTableTemplate implements HashTable {
   // Hash table configuration parameters
   private HashTableConfig htConfig;
 
+  // Allocation tracker
+  private HashTableAllocationTracker allocationTracker;
+
   // The original container from which others may be cloned
   private VectorContainer htContainerOrig;
 
@@ -99,8 +109,6 @@ public abstract class HashTableTemplate implements HashTable {
 
   private int resizingTime = 0;
 
-  private int maxVarcharSize = 8; // for varchar allocation
-
   // This class encapsulates the links, keys and values for up to BATCH_SIZE
   // *unique* records. Thus, suppose there are N incoming record batches, each
   // of size BATCH_SIZE..but they have M unique keys altogether, the number of
@@ -121,7 +129,7 @@ public abstract class HashTableTemplate implements HashTable {
 
     private int batchIndex = 0;
 
-    public BatchHolder(int idx) {
+    public BatchHolder(int idx, int newBatchHolderSize) {
 
       this.batchIndex = idx;
 
@@ -132,24 +140,24 @@ public abstract class HashTableTemplate implements HashTable {
           ValueVector vv = TypeHelper.getNewVector(w.getField(), allocator);
           htContainer.add(vv); // add to container before actual allocation (to allow clearing in case of an OOM)
 
-          // Capacity for "hashValues" and "links" vectors is BATCH_SIZE records. It is better to allocate space for
-          // "key" vectors to store as close to as BATCH_SIZE records. A new BatchHolder is created when either BATCH_SIZE
+          // Capacity for "hashValues" and "links" vectors is newBatchHolderSize records. It is better to allocate space for
+          // "key" vectors to store as close as possible to newBatchHolderSize records. A new BatchHolder is created when either newBatchHolderSize
           // records are inserted or "key" vectors ran out of space. Allocating too less space for "key" vectors will
           // result in unused space in "hashValues" and "links" vectors in the BatchHolder. Also for each new
-          // BatchHolder we create a SV4 vector of BATCH_SIZE in HashJoinHelper.
+          // BatchHolder we create a SV4 vector of newBatchHolderSize in HashJoinHelper.
           if (vv instanceof FixedWidthVector) {
-            ((FixedWidthVector) vv).allocateNew(BATCH_SIZE);
+            ((FixedWidthVector) vv).allocateNew(newBatchHolderSize);
           } else if (vv instanceof VariableWidthVector) {
             long beforeMem = allocator.getAllocatedMemory();
-            ((VariableWidthVector) vv).allocateNew(maxVarcharSize * BATCH_SIZE, BATCH_SIZE);
-            logger.trace("HT allocated {} for varchar of max width {}",allocator.getAllocatedMemory() - beforeMem, maxVarcharSize);
+            ((VariableWidthVector) vv).allocateNew(MAX_VARCHAR_SIZE * newBatchHolderSize, newBatchHolderSize);
+            logger.trace("HT allocated {} for varchar of max width {}",allocator.getAllocatedMemory() - beforeMem, MAX_VARCHAR_SIZE);
           } else {
             vv.allocateNew();
           }
         }
 
-        links = allocMetadataVector(HashTable.BATCH_SIZE, EMPTY_SLOT);
-        hashValues = allocMetadataVector(HashTable.BATCH_SIZE, 0);
+        links = allocMetadataVector(newBatchHolderSize, EMPTY_SLOT);
+        hashValues = allocMetadataVector(newBatchHolderSize, 0);
         success = true;
       } finally {
         if (!success) {
@@ -182,7 +190,7 @@ public abstract class HashTableTemplate implements HashTable {
         boolean isProbe) throws SchemaChangeException {
 
       int currentIdxWithinBatch = currentIdxHolder.value & BATCH_MASK;
-      boolean match = false;
+      boolean match;
 
       if (currentIdxWithinBatch >= HashTable.BATCH_SIZE) {
         logger.debug("Batch size = {}, incomingRowIdx = {}, currentIdxWithinBatch = {}.", HashTable.BATCH_SIZE,
@@ -323,12 +331,6 @@ public abstract class HashTableTemplate implements HashTable {
     }
 
     private boolean outputKeys(VectorContainer outContainer, int outStartIndex, int numRecords, int numExpectedRecords) {
-
-      /** for debugging
-      BigIntVector vv0 = getValueVector(0);
-      BigIntHolder holder = new BigIntHolder();
-      */
-
       // set the value count for htContainer's value vectors before the transfer ..
       setValueCount();
 
@@ -352,26 +354,6 @@ public abstract class HashTableTemplate implements HashTable {
         }
       }
 
-/*
-      logger.debug("Attempting to output keys for batch index: {} from index {} to maxOccupiedIndex {}.",
-      this.batchIndex, 0, maxOccupiedIdx);
-      for (int i = batchOutputCount; i <= maxOccupiedIdx; i++) {
-        if (outputRecordKeys(i, batchOutputCount) ) {
-          if (EXTRA_DEBUG) logger.debug("Outputting keys to output index: {}", batchOutputCount) ;
-
-          // debugging
-          // holder.value = vv0.getAccessor().get(i);
-          // if (holder.value == 100018 || holder.value == 100021) {
-          //  logger.debug("Outputting key = {} at index - {} to outgoing index = {}.", holder.value, i,
-          //      batchOutputCount);
-          // }
-
-          batchOutputCount++;
-        } else {
-          return false;
-        }
-      }
- */
       return true;
     }
 
@@ -443,8 +425,21 @@ public abstract class HashTableTemplate implements HashTable {
     protected void outputRecordKeys(@Named("htRowIdx") int htRowIdx, @Named("outRowIdx") int outRowIdx) throws SchemaChangeException {
     }
 
-  } // class BatchHolder
+    public long getActualSize() {
+      Set<AllocationManager.BufferLedger> ledgers = Sets.newHashSet();
+      links.collectLedgers(ledgers);
+      hashValues.collectLedgers(ledgers);
+
+      long size = 0L;
 
+      for (AllocationManager.BufferLedger ledger: ledgers) {
+        size += ledger.getAccountedSize();
+      }
+
+      size += new RecordBatchSizer(htContainer).actualSize();
+      return size;
+    }
+  }
 
   @Override
   public void setup(HashTableConfig htConfig, BufferAllocator allocator, VectorContainer incomingBuild, RecordBatch incomingProbe, RecordBatch outgoing, VectorContainer htContainerOrig) {
@@ -471,6 +466,7 @@ public abstract class HashTableTemplate implements HashTable {
     this.incomingProbe = incomingProbe;
     this.outgoing = outgoing;
     this.htContainerOrig = htContainerOrig;
+    this.allocationTracker = new HashTableAllocationTracker(htConfig, BATCH_SIZE);
 
     // round up the initial capacity to nearest highest power of 2
     tableSize = roundUpToPowerOf2(initialCap);
@@ -499,6 +495,12 @@ public abstract class HashTableTemplate implements HashTable {
   }
 
   @Override
+  public void updateInitialCapacity(int initialCapacity) {
+    htConfig = htConfig.withInitialCapacity(initialCapacity);
+    allocationTracker = new HashTableAllocationTracker(htConfig, BATCH_SIZE);
+  }
+
+  @Override
   public void updateBatches() throws SchemaChangeException {
     doSetup(incomingBuild, incomingProbe);
     for (BatchHolder batchHolder : batchHolders) {
@@ -711,19 +713,21 @@ public abstract class HashTableTemplate implements HashTable {
     int totalBatchSize = batchHolders.size() * BATCH_SIZE;
 
     if (currentIdx >= totalBatchSize) {
-      BatchHolder bh = newBatchHolder(batchHolders.size());
+      BatchHolder bh = newBatchHolder(batchHolders.size(), allocationTracker.getNextBatchHolderSize());
       batchHolders.add(bh);
       bh.setup();
       if (EXTRA_DEBUG) {
         logger.debug("HashTable: Added new batch. Num batches = {}.", batchHolders.size());
       }
+
+      allocationTracker.commit();
       return true;
     }
     return false;
   }
 
-  protected BatchHolder newBatchHolder(int index) { // special method to allow debugging of gen code
-    return new BatchHolder(index);
+  protected BatchHolder newBatchHolder(int index, int newBatchHolderSize) { // special method to allow debugging of gen code
+    return new BatchHolder(index, newBatchHolderSize);
   }
 
   // Resize the hash table if needed by creating a new one with double the number of buckets.
@@ -831,9 +835,6 @@ public abstract class HashTableTemplate implements HashTable {
     return vector;
   }
 
-  @Override
-  public void setMaxVarcharSize(int size) { maxVarcharSize = size; }
-
   // These methods will be code-generated in the context of the outer class
   protected abstract void doSetup(@Named("incomingBuild") VectorContainer incomingBuild, @Named("incomingProbe") RecordBatch incomingProbe) throws SchemaChangeException;
 
@@ -841,4 +842,27 @@ public abstract class HashTableTemplate implements HashTable {
 
   protected abstract int getHashProbe(@Named("incomingRowIdx") int incomingRowIdx, @Named("seedValue") int seedValue) throws SchemaChangeException;
 
+  @Override
+  public long getActualSize() {
+    Set<AllocationManager.BufferLedger> ledgers = Sets.newHashSet();
+    startIndices.collectLedgers(ledgers);
+
+    long size = 0L;
+
+    for (AllocationManager.BufferLedger ledger: ledgers) {
+      size += ledger.getAccountedSize();
+    }
+
+    for (BatchHolder batchHolder: batchHolders) {
+      size += batchHolder.getActualSize();
+    }
+
+    return size;
+  }
+
+  @Override
+  public String makeDebugString() {
+    return String.format("[numBuckets = %d, numEntries = %d, numBatchHolders = %d, actualSize = %s]",
+      numBuckets(), numEntries, batchHolders.size(), HashJoinMemoryCalculator.PartitionStatSet.prettyPrintBytes(getActualSize()));
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 998e0b1..0c46e36 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -20,13 +20,16 @@ package org.apache.drill.exec.physical.impl.join;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.TimeUnit;
+import java.util.Set;
 
 import com.google.common.collect.Lists;
-import org.apache.drill.common.exceptions.RetryAfterSpillException;
+import com.google.common.collect.Sets;
 
+import org.apache.commons.io.FileUtils;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.expression.PathSegment;
+import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.data.JoinCondition;
 import org.apache.drill.common.logical.data.NamedExpression;
 import org.apache.drill.common.types.TypeProtos;
@@ -34,13 +37,13 @@ import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.exception.ClassTransformationException;
 import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.memory.BaseAllocator;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.MetricDef;
+import org.apache.drill.exec.physical.base.AbstractBase;
 import org.apache.drill.exec.physical.config.HashJoinPOP;
 import org.apache.drill.exec.physical.impl.aggregate.SpilledRecordbatch;
 import org.apache.drill.exec.physical.impl.common.ChainedHashTable;
@@ -55,15 +58,27 @@ import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.vector.FixedWidthVector;
 import org.apache.drill.exec.vector.IntVector;
 import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.VariableWidthVector;
 import org.apache.drill.exec.vector.complex.AbstractContainerVector;
 import org.apache.calcite.rel.core.JoinRelType;
 
+/**
+ * Implements the Hash-Join operator: builds partitioned hash tables from the right (build) side, spilling partitions to disk when memory runs low, then probes them with the left side.
+ */
 public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
   protected static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashJoinBatch.class);
 
-  private int RECORDS_PER_BATCH = 128; // 1024; // internal batches
+  /**
+   * The maximum number of records within each internal batch.
+   */
+  private int RECORDS_PER_BATCH; // internal batches
+
+  /**
+   * The maximum number of records in each outgoing batch.
+   */
   private static final int TARGET_RECORDS_PER_BATCH = 4000;
 
   // Join type, INNER, LEFT, RIGHT or OUTER
@@ -71,32 +86,40 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
 
   // Join conditions
   private final List<JoinCondition> conditions;
+  private final List<NamedExpression> rightExpr;
 
-  private final List<Comparator> comparators;
+  /**
+   * Names of the build-side join columns. These names are used to help estimate the size of the {@link HashTable}s.
+   */
+  private final Set<String> buildJoinColumns;
 
   // Fields used for partitioning
-  private int numPartitions = 1; // must be 2 to the power of bitsInMask (set in setup())
+
+  /**
+   * The number of {@link HashPartition}s. This is configured via a system option and set in {@link #partitionNumTuning(int, HashJoinMemoryCalculator.BuildSidePartitioning)}.
+   */
+  private int numPartitions = 1; // must be 2 to the power of bitsInMask
   private int partitionMask = 0; // numPartitions - 1
   private int bitsInMask = 0; // number of bits in the MASK
+
+  /**
+   * The master class used to generate {@link HashTable}s.
+   */
   private ChainedHashTable baseHashTable;
   private boolean buildSideIsEmpty = true;
   private boolean canSpill = true;
   private boolean wasKilled; // a kill was received, may need to clean spilled partns
 
+  /**
+   * This array holds the currently active {@link HashPartition}s.
+   */
   HashPartition partitions[];
 
   // Number of records in the output container
   private int outputRecords;
 
   // Schema of the build side
-  private BatchSchema rightSchema = null;
-
-  private final HashTableStats htStats = new HashTableStats();
-
-  private final MajorType HVtype = MajorType.newBuilder()
-    .setMinorType(org.apache.drill.common.types.TypeProtos.MinorType.INT /* dataType */ )
-    .setMode(DataMode.REQUIRED /* mode */ )
-    .build();
+  private BatchSchema rightSchema;
 
   private int rightHVColPosition;
   private BufferAllocator allocator;
@@ -111,17 +134,11 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
   private int cycleNum = 0; // primary, secondary, tertiary, etc.
   private int originalPartition = -1; // the partition a secondary reads from
   IntVector read_HV_vector; // HV vector that was read from the spilled batch
-  private int MAX_BATCHES_IN_MEMORY;
-  private int MAX_BATCHES_PER_PARTITION;
-
-  public class inMemBatchCounter {
-    private int inMemBatches;
-    public void inc() { inMemBatches++; }
-    public void dec() { inMemBatches--; }
-    public int value() { return inMemBatches; }
-  }
-  public inMemBatchCounter inMemBatches = new inMemBatchCounter();
+  private int maxBatchesInMemory;
 
+  /**
+   * This holds information about the spilled partitions for the build and probe side.
+   */
   private static class HJSpilledPartition {
     public int innerSpilledBatches;
     public String innerSpillFile;
@@ -131,10 +148,12 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
     int origPartn;
     int prevOrigPartn; }
 
+  /**
+   * Queue of spilled partitions to process.
+   */
   private ArrayList<HJSpilledPartition> spilledPartitionsList;
   private HJSpilledPartition spilledInners[]; // for the outer to find the partition
 
-  private int operatorId; // for the spill file name
   public enum Metric implements MetricDef {
 
     NUM_BUCKETS,
@@ -179,6 +198,78 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
   }
 
   @Override
+  protected boolean prefetchFirstBatchFromBothSides() {
+    leftUpstream = sniffNonEmptyBatch(0, left);
+    rightUpstream = sniffNonEmptyBatch(1, right);
+
+    if (leftUpstream == IterOutcome.STOP || rightUpstream == IterOutcome.STOP) {
+      state = BatchState.STOP;
+      return false;
+    }
+
+    if (leftUpstream == IterOutcome.OUT_OF_MEMORY || rightUpstream == IterOutcome.OUT_OF_MEMORY) {
+      state = BatchState.OUT_OF_MEMORY;
+      return false;
+    }
+
+    if (checkForEarlyFinish()) {
+      state = BatchState.DONE;
+      return false;
+    }
+
+    state = BatchState.FIRST;  // Got our first batches on both sides
+    return true;
+  }
+
+  /**
+   * Currently in order to accurately predict memory usage for spilling, the first non-empty build side and probe side batches are needed. This method
+   * fetches the first non-empty batch from the left or right side.
+   * @param inputIndex Index specifying whether to work with the left or right input.
+   * @param recordBatch The left or right record batch.
+   * @return The {@link org.apache.drill.exec.record.RecordBatch.IterOutcome} for the left or right record batch.
+   */
+  private IterOutcome sniffNonEmptyBatch(int inputIndex, RecordBatch recordBatch) {
+    while (true) {
+      IterOutcome outcome = next(inputIndex, recordBatch);
+
+      switch (outcome) {
+        case OK_NEW_SCHEMA:
+          // We need to have the schema of the build side even when the build side is empty
+          rightSchema = buildBatch.getSchema();
+          // position of the new "column" for keeping the hash values (after the real columns)
+          rightHVColPosition = buildBatch.getContainer().getNumberOfColumns();
+          // new schema can also contain records
+        case OK:
+          if (recordBatch.getRecordCount() == 0) {
+            continue;
+          }
+          // We got a non empty batch
+        default:
+          // Other cases termination conditions
+          // Other outcomes are termination conditions
+      }
+    }
+  }
+
+  /**
+   * Determines the memory calculator to use. If maxBatchesInMemory is configured, simple batch counting is used to decide when to spill.
+   * Otherwise memory calculations are used to determine when to spill.
+   * @return The memory calculator to use.
+   */
+  public HashJoinMemoryCalculator getCalculatorImpl() {
+    if (maxBatchesInMemory == 0) {
+      final double safetyFactor = context.getOptions().getDouble(ExecConstants.HASHJOIN_SAFETY_FACTOR_KEY);
+      final double fragmentationFactor = context.getOptions().getDouble(ExecConstants.HASHJOIN_FRAGMENTATION_FACTOR_KEY);
+      final double hashTableDoublingFactor = context.getOptions().getDouble(ExecConstants.HASHJOIN_HASH_DOUBLE_FACTOR_KEY);
+      final String hashTableCalculatorType = context.getOptions().getString(ExecConstants.HASHJOIN_HASHTABLE_CALC_TYPE_KEY);
+
+      return new HashJoinMemoryCalculatorImpl(safetyFactor, fragmentationFactor, hashTableDoublingFactor, hashTableCalculatorType);
+    } else {
+      return new MechanicalHashJoinMemoryCalculator(maxBatchesInMemory);
+    }
+  }
+
+  @Override
   public IterOutcome innerNext() {
     try {
       /* If we are here for the first time, execute the build phase of the
@@ -265,13 +356,14 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
               this.cleanup();
               throw UserException
                 .unsupportedError()
-                .message("Hash-Join can not partition inner data any further (too many join-key duplicates? - try merge-join)")
+                .message("Hash-Join can not partition inner data any further (too many join-key duplicates? - try merge-join)\n"
+                + "On cycle num %d mem available %d num partitions %d", cycleNum, allocator.getLimit(), numPartitions)
                 .build(logger);
             }
           }
           logger.debug("Start reading spilled partition {} (prev {}) from cycle {} (with {}-{} batches). More {} spilled partitions left.", currSp.origPartn, currSp.prevOrigPartn, currSp.cycleNum, currSp.outerSpilledBatches, currSp.innerSpilledBatches, spilledPartitionsList.size());
 
-          state = BatchState.FIRST;  // build again, initialize probe, etc
+          state = BatchState.FIRST;  // TODO need to determine if this is still necessary since prefetchFirstBatchFromBothSides sets this
 
           return innerNext(); // start processing the next spilled partition "recursively"
         }
@@ -307,14 +399,19 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
   }
 
   private void setupHashTable() throws SchemaChangeException {
+    final List<Comparator> comparators = Lists.newArrayListWithExpectedSize(conditions.size());
+    // When DRILL supports Java 8, use the following instead of the for() loop
+    // conditions.forEach(cond->comparators.add(JoinUtils.checkAndReturnSupportedJoinComparator(cond)));
+    for (int i=0; i<conditions.size(); i++) {
+      JoinCondition cond = conditions.get(i);
+      comparators.add(JoinUtils.checkAndReturnSupportedJoinComparator(cond));
+    }
+
     // Setup the hash table configuration object
-    int conditionsSize = conditions.size();
-    final List<NamedExpression> rightExpr = new ArrayList<>(conditionsSize);
-    List<NamedExpression> leftExpr = new ArrayList<>(conditionsSize);
+    List<NamedExpression> leftExpr = new ArrayList<>(conditions.size());
 
     // Create named expressions from the conditions
-    for (int i = 0; i < conditionsSize; i++) {
-      rightExpr.add(new NamedExpression(conditions.get(i).getRight(), new FieldReference("build_side_" + i)));
+    for (int i = 0; i < conditions.size(); i++) {
       leftExpr.add(new NamedExpression(conditions.get(i).getLeft(), new FieldReference("probe_side_" + i)));
     }
 
@@ -328,8 +425,9 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
         throw new SchemaChangeException(errorMsg);
       }
     }
-    final HashTableConfig htConfig = new HashTableConfig((int) context.getOptions().getOption(ExecConstants.MIN_HASH_TABLE_SIZE), HashTable.DEFAULT_LOAD_FACTOR, rightExpr, leftExpr, comparators);
 
+    final HashTableConfig htConfig = new HashTableConfig((int) context.getOptions().getOption(ExecConstants.MIN_HASH_TABLE_SIZE),
+      true, HashTable.DEFAULT_LOAD_FACTOR, rightExpr, leftExpr, comparators);
 
     // Create the chained hash table
     baseHashTable =
@@ -352,23 +450,10 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
     //  ================================
 
     // Set the number of partitions from the configuration (raise to a power of two, if needed)
-    numPartitions = (int)context.getOptions().getOption(ExecConstants.HASHJOIN_NUM_PARTITIONS_VALIDATOR);
-    if ( numPartitions == 1 ) { //
-      canSpill = false;
-      logger.warn("Spilling is disabled due to configuration setting of num_partitions to 1");
-    }
-    numPartitions = BaseAllocator.nextPowerOfTwo(numPartitions); // in case not a power of 2
     // Based on the number of partitions: Set the mask and bit count
     partitionMask = numPartitions - 1; // e.g. 32 --> 0x1F
     bitsInMask = Integer.bitCount(partitionMask); // e.g. 0x1F -> 5
 
-    RECORDS_PER_BATCH = (int)context.getOptions().getOption(ExecConstants.HASHJOIN_NUM_ROWS_IN_BATCH_VALIDATOR);
-
-    MAX_BATCHES_IN_MEMORY = (int)context.getOptions().getOption(ExecConstants.HASHJOIN_MAX_BATCHES_IN_MEMORY_VALIDATOR);
-    MAX_BATCHES_PER_PARTITION = (int)context.getOptions().getOption(ExecConstants.HASHJOIN_MAX_BATCHES_PER_PARTITION_VALIDATOR);
-
-    //  =================================
-
     // Create the FIFO list of spilled partitions (pairs - inner/outer)
     spilledPartitionsList = new ArrayList<>();
 
@@ -382,17 +467,73 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
    * Initialize fields (that may be reused when reading spilled partitions)
    */
   private void initializeBuild() {
-    assert inMemBatches.value() == 0; // check that no in-memory batches left
     baseHashTable.updateIncoming(buildBatch, probeBatch); // in case we process the spilled files
     // Recreate the partitions every time build is initialized
     for (int part = 0; part < numPartitions; part++ ) {
       partitions[part] = new HashPartition(context, allocator, baseHashTable, buildBatch, probeBatch,
-        RECORDS_PER_BATCH, spillSet, part, inMemBatches, cycleNum);
+        RECORDS_PER_BATCH, spillSet, part, cycleNum);
     }
 
     spilledInners = new HJSpilledPartition[numPartitions];
 
   }
+
+  /**
+   * Tunes the number of partitions used by {@link HashJoinBatch}. If spilling is not possible with the available memory,
+   * it gives up and reverts to unbounded in-memory operation.
+   * @param maxBatchSize The maximum number of records in an incoming batch.
+   * @param buildCalc The build side memory calculator that was initialized with the configured number of partitions.
+   * @return The build side memory calculator to use; a no-op calculator is substituted when spilling had to be disabled.
+   */
+  private HashJoinMemoryCalculator.BuildSidePartitioning partitionNumTuning(
+    int maxBatchSize,
+    HashJoinMemoryCalculator.BuildSidePartitioning buildCalc) {
+    // Get auto tuning result
+    numPartitions = buildCalc.getNumPartitions();
+
+    if (logger.isDebugEnabled()) {
+      logger.debug(buildCalc.makeDebugString());
+    }
+
+    if (buildCalc.getMaxReservedMemory() > allocator.getLimit()) {
+      // We don't have enough memory to do any spilling. Give up on spilling and run without memory limits.
+
+      // TODO dirty hack to prevent regressions. Remove this once batch sizing is implemented.
+      // We don't have enough memory to do partitioning, we have to do everything in memory
+      final String message = String.format("When using the minimum number of partitions %d we require %s memory but only have %s available. " +
+          "Forcing legacy behavoir of using unbounded memory in order to prevent regressions.",
+        numPartitions,
+        FileUtils.byteCountToDisplaySize(buildCalc.getMaxReservedMemory()),
+        FileUtils.byteCountToDisplaySize(allocator.getLimit()));
+      logger.warn(message);
+
+      // create a Noop memory calculator
+      final HashJoinMemoryCalculator calc = getCalculatorImpl();
+      calc.initialize(false);
+      buildCalc = calc.next();
+
+      buildCalc.initialize(true,
+        true, // TODO Fix after growing hash values bug fixed
+        buildBatch,
+        probeBatch,
+        buildJoinColumns,
+        allocator.getLimit(),
+        numPartitions,
+        RECORDS_PER_BATCH,
+        RECORDS_PER_BATCH,
+        maxBatchSize,
+        maxBatchSize,
+        TARGET_RECORDS_PER_BATCH,
+        HashTable.DEFAULT_LOAD_FACTOR);
+
+      numPartitions = 1; // We are only using one partition
+      canSpill = false; // We cannot spill
+      allocator.setLimit(AbstractBase.MAX_ALLOCATION); // Violate framework and force unbounded memory
+    }
+
+    return buildCalc;
+  }
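To make the fallback concrete with illustrative numbers: suppose the calculator reports a getMaxReservedMemory() of 48 MB even at the smallest partition count the auto-tuning loop can reach (BuildSidePartitioningImpl halves the partition count but never goes below 2), while allocator.getLimit() is 32 MB. The check above then logs the warning, swaps in the no-op calculator, forces numPartitions to 1, clears canSpill, and raises the allocator limit to AbstractBase.MAX_ALLOCATION, so the join runs entirely in memory exactly as it did before this change. The 48 MB and 32 MB figures are made up for the example.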
+
   /**
    *  Execute the BUILD phase; first read incoming and split rows into partitions;
    *  may decide to spill some of the partitions
@@ -400,24 +541,56 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
    * @throws SchemaChangeException
    */
   public void executeBuildPhase() throws SchemaChangeException {
-    final HashJoinMemoryCalculator.BuildSidePartitioning buildCalc = new HashJoinMemoryCalculatorImpl().next();
-    boolean hasProbeData = leftUpstream != IterOutcome.NONE;
-
-    if ( rightUpstream == IterOutcome.NONE ) { return; } // empty right
+    if (rightUpstream == IterOutcome.NONE) {
+      // empty right
+      return;
+    }
 
-    // skip first batch if count is zero, as it may be an empty schema batch
-    if (false && buildBatch.getRecordCount() == 0) {
-      for (final VectorWrapper<?> w : buildBatch) {
-        w.clear();
+    HashJoinMemoryCalculator.BuildSidePartitioning buildCalc;
+    boolean firstCycle = cycleNum == 0;
+
+    {
+      // Initializing build calculator
+      // Limit scope of these variables to this block
+      int maxBatchSize = firstCycle ? RecordBatch.MAX_BATCH_SIZE : RECORDS_PER_BATCH;
+      boolean hasProbeData = leftUpstream != IterOutcome.NONE;
+      boolean doMemoryCalculation = canSpill && hasProbeData;
+      HashJoinMemoryCalculator calc = getCalculatorImpl();
+
+      calc.initialize(doMemoryCalculation);
+      buildCalc = calc.next();
+
+      // We've sniffed the first non-empty build and probe batches, so we have enough information to create a calculator
+      buildCalc.initialize(firstCycle, true, // TODO Fix after growing hash values bug fixed
+        buildBatch,
+        probeBatch,
+        buildJoinColumns,
+        allocator.getLimit(),
+        numPartitions,
+        RECORDS_PER_BATCH,
+        RECORDS_PER_BATCH,
+        maxBatchSize,
+        maxBatchSize,
+        TARGET_RECORDS_PER_BATCH,
+        HashTable.DEFAULT_LOAD_FACTOR);
+
+      if (firstCycle && doMemoryCalculation) {
+        // Do auto tuning
+        buildCalc = partitionNumTuning(maxBatchSize, buildCalc);
       }
-      rightUpstream = next(buildBatch);
     }
 
-    //Setup the underlying hash table
-    if ( cycleNum == 0 ) { delayedSetup(); } // first time only
+    if (firstCycle) {
+      // Do initial setup only on the first cycle
+      delayedSetup();
+    }
 
     initializeBuild();
 
+    // Make the calculator aware of our partitions
+    final HashJoinMemoryCalculator.PartitionStatSet partitionStatSet = new HashJoinMemoryCalculator.PartitionStatSet(partitions);
+    buildCalc.setPartitionStatSet(partitionStatSet);
+
     boolean moreData = true;
     while (moreData) {
       switch (rightUpstream) {
@@ -440,32 +613,15 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
         if ( cycleNum > 0 ) {
           read_HV_vector = (IntVector) buildBatch.getContainer().getLast();
         }
+
         // For every record in the build batch, hash the key columns and keep the result
         for (int ind = 0; ind < currentRecordCount; ind++) {
           int hashCode = ( cycleNum == 0 ) ? partitions[0].getBuildHashCode(ind)
             : read_HV_vector.getAccessor().get(ind); // get the hash value from the HV column
           int currPart = hashCode & partitionMask ;
           hashCode >>>= bitsInMask;
-/*
-          int pos = currentBatches[currPart].appendRow(buildBatch.getContainer(),ind);
-          currHVVectors[currPart].getMutator().set(pos, hashCode);   // store the hash value in the new column
-          if ( pos + 1 == RECORDS_PER_BATCH ) {
-            // The current decision on when-to-spill is crude
-            completeAnInnerBatch(currPart,true,
-              isSpilled(currPart) ||  // once spilled - then spill every new full batch
-                canSpill &&
-                  ( inMemBatches > MAX_BATCHES_IN_MEMORY ||
-                    tmpBatchesList[currPart].size() > MAX_BATCHES_PER_PARTITION ));
-          }
-*/
-
           // Append the new inner row to the appropriate partition; spill (that partition) if needed
-          partitions[currPart].appendInnerRow(buildBatch.getContainer(), ind, hashCode,
-            // The current decision on when-to-spill is crude ...
-            partitions[currPart].isSpilled() || // once spilled - then spill every new full batch
-            canSpill &&
-              ( inMemBatches.value() > MAX_BATCHES_IN_MEMORY ||
-                partitions[currPart].getPartitionBatchesCount() > MAX_BATCHES_PER_PARTITION ) ); // may spill if needed
+          partitions[currPart].appendInnerRow(buildBatch.getContainer(), ind, hashCode, buildCalc); // may spill if needed
         }
 
         if ( read_HV_vector != null ) {
@@ -483,66 +639,58 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
     // the spilled partitions list
     for (HashPartition partn : partitions) {
       partn.completeAnInnerBatch(false, partn.isSpilled() );
-      if ( partn.isSpilled() ) {
-        HJSpilledPartition sp = new HJSpilledPartition();
-        sp.innerSpillFile = partn.getSpillFile();
-        sp.innerSpilledBatches = partn.getPartitionBatchesCount();
-        sp.cycleNum = cycleNum; // remember the current cycle
-        sp.origPartn = partn.getPartitionNum(); // for debugging / filename
-        sp.prevOrigPartn = originalPartition; // for debugging / filename
-        spilledPartitionsList.add(sp);
-
-        spilledInners[partn.getPartitionNum()] = sp; // for the outer to find the SP later
-        partn.closeWriter();
-      }
     }
 
+    HashJoinMemoryCalculator.PostBuildCalculations postBuildCalc = buildCalc.next();
+    postBuildCalc.initialize();
+
     //
     //  Traverse all the in-memory partitions' incoming batches, and build their hash tables
     //
-/*
-    for (int currPart = 0; currPart < numPartitions; currPart++) {
 
-      // each partition is a regular array of batches
-      ArrayList<VectorContainer> thisPart = new ArrayList<>();
+    for (int index = 0; index < partitions.length; index++) {
+      final HashPartition partn = partitions[index];
 
-      for (int curr = 0; curr < partitionBatchesCount[currPart]; curr++) {
-        VectorContainer nextBatch = tmpBatchesList[currPart].get(curr);
-        final int currentRecordCount = nextBatch.getRecordCount();
-
-        // For every incoming build batch, we create a matching helper batch
-        hjHelpers[currPart].addNewBatch(currentRecordCount);
-
-        // Holder contains the global index where the key is hashed into using the hash table
-        final IndexPointer htIndex = new IndexPointer();
-
-        hashTables[currPart].updateIncoming(nextBatch, probeBatch );
-
-        IntVector HV_vector = (IntVector) nextBatch.getValueVector(rightHVColPosition).getValueVector();
+      if (partn.isSpilled()) {
+        // Don't build hash tables for spilled partitions
+        continue;
+      }
 
-        for (int recInd = 0; recInd < currentRecordCount; recInd++) {
-          int hashCode = HV_vector.getAccessor().get(recInd);
-          try {
-            hashTables[currPart].put(recInd, htIndex, hashCode);
-          } catch (RetryAfterSpillException RE) {
-            throw new OutOfMemoryException("HT put");
-          } // Hash Join can not retry yet
-                        // Use the global index returned by the hash table, to store
-                        //the current record index and batch index. This will be used
-                        // later when we probe and find a match.
-                        //
-          hjHelpers[currPart].setCurrentIndex(htIndex.value, curr , recInd);
+      try {
+        if (postBuildCalc.shouldSpill()) {
+          // Spill this partition if we need to make room
+          partn.spillThisPartition();
+        } else {
+          // Only build hash tables for partitions that are not spilled
+          partn.buildContainersHashTableAndHelper();
         }
-
-        thisPart.add(nextBatch);
+      } catch (OutOfMemoryException e) {
+        final String message = "Failed building hash table on partition " + index + ":\n"
+          + makeDebugString() + "\n"
+          + postBuildCalc.makeDebugString();
+        // Include debug info
+        throw new OutOfMemoryException(message, e);
       }
+    }
 
-      partitionContainers.add(thisPart);
-*/
-    for (HashPartition partn : partitions) {
-      partn.buildContainersHashTableAndHelper();
+    if (logger.isDebugEnabled()) {
+      logger.debug(postBuildCalc.makeDebugString());
     }
 
+    for (HashPartition partn : partitions) {
+      if ( partn.isSpilled() ) {
+        HJSpilledPartition sp = new HJSpilledPartition();
+        sp.innerSpillFile = partn.getSpillFile();
+        sp.innerSpilledBatches = partn.getPartitionBatchesCount();
+        sp.cycleNum = cycleNum; // remember the current cycle
+        sp.origPartn = partn.getPartitionNum(); // for debugging / filename
+        sp.prevOrigPartn = originalPartition; // for debugging / filename
+        spilledPartitionsList.add(sp);
+
+        spilledInners[partn.getPartitionNum()] = sp; // for the outer to find the SP later
+        partn.closeWriter();
+      }
+    }
   }
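A worked example of the per-record partitioning arithmetic used in the loop above (numbers are illustrative): with numPartitions = 32, partitionMask is 0x1F and bitsInMask is 5. A build row whose hash value is 0x000A2D77 lands in partition 0x17 (23), and the shifted remainder 0x000A2D77 >>> 5 = 0x516B is what that partition later feeds to its hash table, or what is written to the HV column if the partition spills. That stored HV column is why, on spill cycles (cycleNum > 0), the loop reads the hash from read_HV_vector instead of recomputing it.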
 
   private void setupOutputContainerSchema() {
@@ -592,9 +740,18 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
   }
 
   private void allocateVectors() {
-    for (final VectorWrapper<?> v : container) {
-      v.getValueVector().allocateNew();
+    for (final VectorWrapper<?> vectorWrapper : container) {
+      ValueVector valueVector = vectorWrapper.getValueVector();
+
+      if (valueVector instanceof FixedWidthVector) {
+        ((FixedWidthVector) valueVector).allocateNew(TARGET_RECORDS_PER_BATCH);
+      } else if (valueVector instanceof VariableWidthVector) {
+        ((VariableWidthVector) valueVector).allocateNew(8 * TARGET_RECORDS_PER_BATCH, TARGET_RECORDS_PER_BATCH);
+      } else {
+        valueVector.allocateNew();
+      }
     }
+
     container.setRecordCount(0); // reset container's counter back to zero records
   }
 
@@ -615,13 +772,25 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
     conditions = popConfig.getConditions();
     this.popConfig = popConfig;
 
-    comparators = Lists.newArrayListWithExpectedSize(conditions.size());
-    // When DRILL supports Java 8, use the following instead of the for() loop
-    // conditions.forEach(cond->comparators.add(JoinUtils.checkAndReturnSupportedJoinComparator(cond)));
-    for (int i=0; i<conditions.size(); i++) {
-      JoinCondition cond = conditions.get(i);
-      comparators.add(JoinUtils.checkAndReturnSupportedJoinComparator(cond));
+    rightExpr = new ArrayList<>(conditions.size());
+    buildJoinColumns = Sets.newHashSet();
+
+    for (int i = 0; i < conditions.size(); i++) {
+      final SchemaPath rightPath = (SchemaPath) conditions.get(i).getRight();
+      final PathSegment.NameSegment nameSegment = (PathSegment.NameSegment)rightPath.getLastSegment();
+      buildJoinColumns.add(nameSegment.getPath());
+      final String refName = "build_side_" + i;
+      rightExpr.add(new NamedExpression(conditions.get(i).getRight(), new FieldReference(refName)));
     }
+
+    numPartitions = (int)context.getOptions().getOption(ExecConstants.HASHJOIN_NUM_PARTITIONS_VALIDATOR);
+    if ( numPartitions == 1 ) { //
+      canSpill = false;
+      logger.warn("Spilling is disabled due to configuration setting of num_partitions to 1");
+    }
+
+    numPartitions = BaseAllocator.nextPowerOfTwo(numPartitions); // in case not a power of 2
+
     this.allocator = oContext.getAllocator();
 
     final long memLimit = context.getOptions().getOption(ExecConstants.HASHJOIN_MAX_MEMORY_VALIDATOR);
@@ -639,7 +808,11 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
     partitions = new HashPartition[0];
   }
 
-  public void cleanup() {
+  /**
+   * This method is called when {@link HashJoinBatch} closes. It cleans up leftover spilled files that are in the spill queue, and closes the
+   * spillSet.
+   */
+  private void cleanup() {
     if ( buildSideIsEmpty ) { return; } // not set up; nothing to clean
     if ( spillSet.getWriteBytes() > 0 ) {
       stats.setLongStat(Metric.SPILL_MB, // update stats - total MB spilled
@@ -669,9 +842,33 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
     spillSet.close(); // delete the spill directory(ies)
   }
 
+  /**
+   * This creates a string that summarizes the memory usage of the operator.
+   * @return A memory dump string.
+   */
+  public String makeDebugString() {
+    final StringBuilder sb = new StringBuilder();
+
+    for (int partitionIndex = 0; partitionIndex < partitions.length; partitionIndex++) {
+      final String partitionPrefix = "Partition " + partitionIndex + ": ";
+      final HashPartition hashPartition = partitions[partitionIndex];
+      sb.append(partitionPrefix).append(hashPartition.makeDebugString()).append("\n");
+    }
+
+    return sb.toString();
+  }
+
+  /**
+   * Updates the {@link HashTable} and spilling stats after the original build side is processed.
+   *
+   * Note: this does not update all the stats. The cycleNum is updated dynamically in {@link #innerNext()} and the total bytes
+   * written are updated at close time in {@link #cleanup()}.
+   */
   private void updateStats() {
     if ( buildSideIsEmpty ) { return; } // no stats when the right side is empty
     if ( cycleNum > 0 ) { return; } // These stats are only for before processing spilled files
+
+    final HashTableStats htStats = new HashTableStats();
     long numSpilled = 0;
     HashTableStats newStats = new HashTableStats();
     // sum the stats from all the partitions
@@ -983,5 +1180,4 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
       (joinType == JoinRelType.RIGHT || joinType == JoinRelType.FULL) ? ProbeState.PROJECT_RIGHT :
       ProbeState.DONE; // else we're done
   }
-
-}  // public class HashJoinBatch
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelper.java
index 55146f4..b5b7183 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelper.java
@@ -128,6 +128,12 @@ public class HashJoinHelper {
     buildInfoList.add(info);
   }
 
+  /**
+   * Takes a composite index for a key produced by {@link HashTable#probeForKey(int, int)}, and uses it to look up the
+   * index of the first original key in the original data.
+   * @param keyIndex A composite index for a key produced by {@link HashTable#probeForKey(int, int)}
+   * @return The composite index for the first added key record in the original data.
+   */
   public int getStartIndex(int keyIndex) {
     int batchIdx  = keyIndex / HashTable.BATCH_SIZE;
     int offsetIdx = keyIndex % HashTable.BATCH_SIZE;
@@ -139,6 +145,12 @@ public class HashJoinHelper {
     return sv4.get(offsetIdx);
   }
 
+  /**
+   * Takes a composite index for a key produced by {@link HashJoinHelper#getStartIndex(int)}, and returns the composite index for the
+   * next record in the list of records that match a key. The result is a composite index for a record within the original data set.
+   * @param currentIdx A composite index for a key produced by {@link HashJoinHelper#getStartIndex(int)}.
+   * @return The composite index for the next record in the list of records that match a key. The result is a composite index for a record within the original data set.
+   */
   public int getNextIndex(int currentIdx) {
     // Get to the links field of the current index to get the next index
     int batchIdx = currentIdx >>> SHIFT_SIZE;
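A quick worked example of these composite indexes, assuming HashTable.BATCH_SIZE is 65536 (2^16) and SHIFT_SIZE is therefore 16 (both values are stated here as assumptions, not quoted from this patch): a keyIndex of 131074 decomposes into batchIdx = 131074 / 65536 = 2 and offsetIdx = 131074 % 65536 = 2, so getStartIndex() returns entry 2 of the third batch's start-index SV4. getNextIndex() applies the same decomposition with shift and mask operations and follows the link stored at that position, which either yields the next matching record's composite index or a sentinel marking the end of the chain.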
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/FixedWidthVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelperSizeCalculator.java
similarity index 66%
copy from exec/vector/src/main/java/org/apache/drill/exec/vector/FixedWidthVector.java
copy to exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelperSizeCalculator.java
index 09bcdd8..f5c826a 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/FixedWidthVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelperSizeCalculator.java
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -15,19 +15,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.drill.exec.vector;
+package org.apache.drill.exec.physical.impl.join;
 
-public interface FixedWidthVector extends ValueVector {
-
-  /**
-   * Allocate a new memory space for this vector.  Must be called prior to using the ValueVector.
-   *
-   * @param valueCount   Number of values in the vector.
-   */
-  void allocateNew(int valueCount);
-
-  /**
-   * Zero out the underlying buffer backing this vector.
-   */
-  void zeroVector();
+public interface HashJoinHelperSizeCalculator {
+  long calculateSize(HashJoinMemoryCalculator.PartitionStat partitionStat, double fragmentationFactor);
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelperSizeCalculatorImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelperSizeCalculatorImpl.java
new file mode 100644
index 0000000..728fde5
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelperSizeCalculatorImpl.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.join;
+
+import com.google.common.base.Preconditions;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.RecordBatchSizer;
+import org.apache.drill.exec.vector.IntVector;
+
+public class HashJoinHelperSizeCalculatorImpl implements HashJoinHelperSizeCalculator {
+  public static final HashJoinHelperSizeCalculatorImpl INSTANCE = new HashJoinHelperSizeCalculatorImpl();
+
+  private HashJoinHelperSizeCalculatorImpl() {
+    // Do nothing
+  }
+
+  @Override
+  public long calculateSize(HashJoinMemoryCalculator.PartitionStat partitionStat, double fragmentationFactor) {
+    Preconditions.checkArgument(!partitionStat.isSpilled());
+
+    // Account for the size of the SV4 in a hash join helper
+    long joinHelperSize = IntVector.VALUE_WIDTH * RecordBatch.MAX_BATCH_SIZE;
+
+    // Account for the SV4 for each batch that holds links for each batch
+    for (HashJoinMemoryCalculator.BatchStat batchStat: partitionStat.getInMemoryBatches()) {
+      // Note we don't have to round up to nearest power of 2, since we allocate the exact
+      // space needed for the value vector.
+      joinHelperSize += batchStat.getNumRecords() * IntVector.VALUE_WIDTH;
+    }
+
+    // Note the BitSets of the HashJoin helper are stored on heap, so we don't account for them here.
+    // TODO move BitSets to direct memory
+
+    return RecordBatchSizer.multiplyByFactor(joinHelperSize, fragmentationFactor);
+  }
+}
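To illustrate calculateSize() with round numbers, assuming IntVector.VALUE_WIDTH is 4 bytes and RecordBatch.MAX_BATCH_SIZE is 65536 (both assumptions here, not quoted from the patch): the fixed start-index SV4 costs 4 * 65536 = 262,144 bytes; a partition holding two in-memory batches of 4,096 records each adds 2 * 4,096 * 4 = 32,768 bytes for the per-batch link vectors; and with a fragmentationFactor of 1.33 the estimate becomes roughly (262,144 + 32,768) * 1.33 ≈ 392,233 bytes.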
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculator.java
new file mode 100644
index 0000000..f2de0fe
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculator.java
@@ -0,0 +1,320 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.join;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.commons.io.FileUtils;
+import org.apache.drill.exec.record.RecordBatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Set;
+
+/**
+ * <p>
+ * This class is responsible for managing the memory calculations for the HashJoin operator.
+ * Since the HashJoin operator has different phases of execution, this class needs to perform
+ * different memory calculations at each phase. The phases of execution have been broken down
+ * into an explicit state machine diagram below. What occurs in each state is described in
+ * the documentation of the {@link HashJoinState} class. <b>Note:</b> the transition from Probing
+ * and Partitioning back to Build Side Partitioning. This happens when we had to spill probe side
+ * partitions and need to recursively process the spilled partitions. This recursion is
+ * described in more detail in the example below.
+ * </p>
+ * <p>
+ *
+ *                                  +--------------+ <-------+
+ *                                  |  Build Side  |         |
+ *                                  |  Partitioning|         |
+ *                                  |              |         |
+ *                                  +------+-------+         |
+ *                                         |                 |
+ *                                         |                 |
+ *                                         v                 |
+ *                                  +--------------+         |
+ *                                  |Probing and   |         |
+ *                                  |Partitioning  |         |
+ *                                  |              |         |
+ *                                  +--------------+         |
+ *                                          |                |
+ *                                          +----------------+
+ *                                          |
+ *                                          v
+ *                                        Done
+ * </p>
+ * <p>
+ * An overview of how these states interact can be summarized with the following example.<br/><br/>
+ *
+ * Consider the case where we have 4 partitions configured initially.<br/><br/>
+ *
+ * <ol>
+ *   <li>We first start consuming build side batches and putting their records into one of 4 build side partitions.</li>
+ *   <li>Once we run out of memory we start spilling build side partitions one by one.</li>
+ *   <li>We keep partitioning build side batches until all the build side batches are consumed.</li>
+ *   <li>After we have consumed the build side we prepare to probe by building hashtables for the partitions
+ *   we have in memory. If we don't have enough room for all the hashtables in memory we spill build side
+ *   partitions until we do have enough room.</li>
+ *   <li>We now start processing the probe side. For each probe record we determine its build partition. If
+ *   the build partition is in memory we do the join for the record and emit it. If the build partition is
+ *   not in memory we spill the probe record. We continue this process until all the probe side records are consumed.</li>
+ *   <li>If we didn't spill any probe side partitions because all the build side partitions were in memory, our join
+ *   operation is done. If we did spill probe side partitions we have to recursively repeat this whole process for each
+ *   spilled probe and build side partition pair.</li>
+ * </ol>
+ * </p>
+*/
+public interface HashJoinMemoryCalculator extends HashJoinStateCalculator<HashJoinMemoryCalculator.BuildSidePartitioning> {
+  void initialize(boolean doMemoryCalc);
+
+  /**
+   * The interface representing the {@link HashJoinStateCalculator} corresponding to the
+   * {@link HashJoinState#BUILD_SIDE_PARTITIONING} state.
+   */
+  interface BuildSidePartitioning extends HashJoinStateCalculator<PostBuildCalculations> {
+    void initialize(boolean autoTune,
+                    boolean reserveHash,
+                    RecordBatch buildSideBatch,
+                    RecordBatch probeSideBatch,
+                    Set<String> joinColumns,
+                    long memoryAvailable,
+                    int initialPartitions,
+                    int recordsPerPartitionBatchBuild,
+                    int recordsPerPartitionBatchProbe,
+                    int maxBatchNumRecordsBuild,
+                    int maxBatchNumRecordsProbe,
+                    int outputBatchNumRecords,
+                    double loadFactor);
+
+    void setPartitionStatSet(PartitionStatSet partitionStatSet);
+
+    int getNumPartitions();
+
+    long getBuildReservedMemory();
+
+    long getMaxReservedMemory();
+
+    boolean shouldSpill();
+
+    String makeDebugString();
+  }
+
+  /**
+   * The interface representing the {@link HashJoinStateCalculator} corresponding to the
+   * {@link HashJoinState#POST_BUILD_CALCULATIONS} state.
+   */
+  interface PostBuildCalculations extends HashJoinStateCalculator<HashJoinMemoryCalculator> {
+    void initialize();
+
+    boolean shouldSpill();
+
+    String makeDebugString();
+  }
+
+  interface PartitionStat {
+    List<BatchStat> getInMemoryBatches();
+
+    int getNumInMemoryBatches();
+
+    boolean isSpilled();
+
+    long getNumInMemoryRecords();
+
+    long getInMemorySize();
+  }
+
+  /**
+   * This class represents the memory size statistics for an entire set of partitions.
+   */
+  class PartitionStatSet {
+    private static final Logger log = LoggerFactory.getLogger(PartitionStatSet.class);
+    private final PartitionStat[] partitionStats;
+
+    public PartitionStatSet(final PartitionStat... partitionStats) {
+      this.partitionStats = Preconditions.checkNotNull(partitionStats);
+
+      for (PartitionStat partitionStat: partitionStats) {
+        Preconditions.checkNotNull(partitionStat);
+      }
+    }
+
+    public PartitionStat get(int partitionIndex) {
+      return partitionStats[partitionIndex];
+    }
+
+    public int getSize() {
+      return partitionStats.length;
+    }
+
+    // Somewhat inefficient but not a big deal since we don't deal with that many partitions
+    public long getNumInMemoryRecords() {
+      long numRecords = 0L;
+
+      for (final PartitionStat partitionStat: partitionStats) {
+        numRecords += partitionStat.getNumInMemoryRecords();
+      }
+
+      return numRecords;
+    }
+
+    public int getNumInMemoryBatches() {
+      int numBatches = 0;
+
+      for (final PartitionStat partitionStat: partitionStats) {
+        numBatches += partitionStat.getNumInMemoryBatches();
+      }
+
+      return numBatches;
+    }
+
+    // Somewhat inefficient but not a big deal since we don't deal with that many partitions
+    public long getConsumedMemory() {
+      long consumedMemory = 0L;
+
+      for (final PartitionStat partitionStat: partitionStats) {
+        consumedMemory += partitionStat.getInMemorySize();
+      }
+
+      return consumedMemory;
+    }
+
+    public List<Integer> getSpilledPartitions() {
+      return getPartitions(true);
+    }
+
+    public List<Integer> getInMemoryPartitions() {
+      return getPartitions(false);
+    }
+
+    public List<Integer> getPartitions(boolean spilled) {
+      List<Integer> partitionIndices = Lists.newArrayList();
+
+      for (int partitionIndex = 0; partitionIndex < partitionStats.length; partitionIndex++) {
+        final PartitionStat partitionStat = partitionStats[partitionIndex];
+
+        if (partitionStat.isSpilled() == spilled) {
+          partitionIndices.add(partitionIndex);
+        }
+      }
+
+      return partitionIndices;
+    }
+
+    public int getNumInMemoryPartitions() {
+      return getInMemoryPartitions().size();
+    }
+
+    public int getNumSpilledPartitions() {
+      return getSpilledPartitions().size();
+    }
+
+    public boolean allSpilled() {
+      return getSize() == getNumSpilledPartitions();
+    }
+
+    public boolean noneSpilled() {
+      return getSize() == getNumInMemoryPartitions();
+    }
+
+    public String makeDebugString() {
+      final StringBuilder sizeSb = new StringBuilder("Partition Sizes:\n");
+      final StringBuilder batchCountSb = new StringBuilder("Partition Batch Counts:\n");
+      final StringBuilder recordCountSb = new StringBuilder("Partition Record Counts:\n");
+
+      for (int partitionIndex = 0; partitionIndex < partitionStats.length; partitionIndex++) {
+        final PartitionStat partitionStat = partitionStats[partitionIndex];
+        final String partitionPrefix = partitionIndex + ": ";
+
+        sizeSb.append(partitionPrefix);
+        batchCountSb.append(partitionPrefix);
+        recordCountSb.append(partitionPrefix);
+
+        if (partitionStat.isSpilled()) {
+          sizeSb.append("Spilled");
+          batchCountSb.append("Spilled");
+          recordCountSb.append("Spilled");
+        } else if (partitionStat.getNumInMemoryRecords() == 0) {
+          sizeSb.append("Empty");
+          batchCountSb.append("Empty");
+          recordCountSb.append("Empty");
+        } else {
+          sizeSb.append(prettyPrintBytes(partitionStat.getInMemorySize()));
+          batchCountSb.append(partitionStat.getNumInMemoryBatches());
+          recordCountSb.append(partitionStat.getNumInMemoryRecords());
+        }
+
+        sizeSb.append("\n");
+        batchCountSb.append("\n");
+        recordCountSb.append("\n");
+      }
+
+      return sizeSb.toString() + "\n" + batchCountSb.toString() + "\n" + recordCountSb.toString();
+    }
+
+    public static String prettyPrintBytes(long byteCount) {
+      return String.format("%d (%s)", byteCount, FileUtils.byteCountToDisplaySize(byteCount));
+    }
+  }
+
+  class BatchStat {
+    private int numRecords;
+    private long batchSize;
+
+    public BatchStat(int numRecords, long batchSize) {
+      this.numRecords = numRecords;
+      this.batchSize = batchSize;
+    }
+
+    public long getNumRecords() {
+      return numRecords;
+    }
+
+    public long getBatchSize() {
+      return batchSize;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+
+      BatchStat batchStat = (BatchStat) o;
+
+      if (numRecords != batchStat.numRecords) {
+        return false;
+      }
+
+      return batchSize == batchStat.batchSize;
+    }
+
+    @Override
+    public int hashCode() {
+      int result = numRecords;
+      result = 31 * result + (int) (batchSize ^ (batchSize >>> 32));
+      return result;
+    }
+  }
+}
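As a reading aid, the following condensed sketch (not part of the commit) shows how HashJoinBatch drives these calculators through the states described above; the argument values are placeholders, and the real call sites are in executeBuildPhase() earlier in this patch:

    // Illustrative driver for the calculator state machine (argument values are placeholders).
    HashJoinMemoryCalculator calc = getCalculatorImpl();
    calc.initialize(true); // doMemoryCalculation

    HashJoinMemoryCalculator.BuildSidePartitioning buildCalc = calc.next();
    buildCalc.initialize(true, true, buildBatch, probeBatch, buildJoinColumns,
        allocator.getLimit(), numPartitions,
        RECORDS_PER_BATCH, RECORDS_PER_BATCH,
        maxBatchSize, maxBatchSize,
        TARGET_RECORDS_PER_BATCH, HashTable.DEFAULT_LOAD_FACTOR);
    buildCalc.setPartitionStatSet(new HashJoinMemoryCalculator.PartitionStatSet(partitions));

    // While the build side is consumed, appendInnerRow(...) receives buildCalc so the
    // spill decision can be made per partition (see executeBuildPhase above).

    HashJoinMemoryCalculator.PostBuildCalculations postBuildCalc = buildCalc.next();
    postBuildCalc.initialize();
    for (HashPartition partn : partitions) {
      if (partn.isSpilled()) {
        continue; // no hash table for spilled partitions
      }
      if (postBuildCalc.shouldSpill()) {
        partn.spillThisPartition();              // make room for the remaining hash tables
      } else {
        partn.buildContainersHashTableAndHelper();
      }
    }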
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculatorImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculatorImpl.java
new file mode 100644
index 0000000..5890a42
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculatorImpl.java
@@ -0,0 +1,833 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.join;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.commons.io.FileUtils;
+import org.apache.drill.common.map.CaseInsensitiveMap;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.RecordBatchSizer;
+import org.apache.drill.exec.vector.IntVector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+
+import static org.apache.drill.exec.physical.impl.join.HashJoinState.INITIALIZING;
+
+public class HashJoinMemoryCalculatorImpl implements HashJoinMemoryCalculator {
+  private static final Logger log = LoggerFactory.getLogger(HashJoinMemoryCalculatorImpl.class);
+
+  private final double safetyFactor;
+  private final double fragmentationFactor;
+  private final double hashTableDoublingFactor;
+  private final String hashTableCalculatorType;
+
+  private boolean initialized = false;
+  private boolean doMemoryCalculation;
+
+  public HashJoinMemoryCalculatorImpl(final double safetyFactor,
+                                      final double fragmentationFactor,
+                                      final double hashTableDoublingFactor,
+                                      final String hashTableCalculatorType) {
+    this.safetyFactor = safetyFactor;
+    this.fragmentationFactor = fragmentationFactor;
+    this.hashTableDoublingFactor = hashTableDoublingFactor;
+    this.hashTableCalculatorType = hashTableCalculatorType;
+  }
+
+  public void initialize(boolean doMemoryCalculation) {
+    Preconditions.checkState(!initialized);
+    initialized = true;
+    this.doMemoryCalculation = doMemoryCalculation;
+  }
+
+  public BuildSidePartitioning next() {
+    Preconditions.checkState(initialized);
+
+    if (doMemoryCalculation) {
+      final HashTableSizeCalculator hashTableSizeCalculator;
+
+      if (hashTableCalculatorType.equals(HashTableSizeCalculatorLeanImpl.TYPE)) {
+        hashTableSizeCalculator = new HashTableSizeCalculatorLeanImpl(RecordBatch.MAX_BATCH_SIZE, hashTableDoublingFactor);
+      } else if (hashTableCalculatorType.equals(HashTableSizeCalculatorConservativeImpl.TYPE)) {
+        hashTableSizeCalculator = new HashTableSizeCalculatorConservativeImpl(RecordBatch.MAX_BATCH_SIZE, hashTableDoublingFactor);
+      } else {
+        throw new IllegalArgumentException("Invalid calc type: " + hashTableCalculatorType);
+      }
+
+      return new BuildSidePartitioningImpl(hashTableSizeCalculator,
+        HashJoinHelperSizeCalculatorImpl.INSTANCE,
+        fragmentationFactor, safetyFactor);
+    } else {
+      return new NoopBuildSidePartitioningImpl();
+    }
+  }
+
+  @Override
+  public HashJoinState getState() {
+    return INITIALIZING;
+  }
+
+  public static long computeMaxBatchSizeNoHash(final long incomingBatchSize,
+                                         final int incomingNumRecords,
+                                         final int desiredNumRecords,
+                                         final double fragmentationFactor,
+                                         final double safetyFactor) {
+    long maxBatchSize = HashJoinMemoryCalculatorImpl
+      .computePartitionBatchSize(incomingBatchSize, incomingNumRecords, desiredNumRecords);
+    // Multiply by the fragmentation and safety factors
+    return RecordBatchSizer.multiplyByFactors(maxBatchSize, fragmentationFactor, safetyFactor);
+  }
+
+  public static long computeMaxBatchSize(final long incomingBatchSize,
+                                         final int incomingNumRecords,
+                                         final int desiredNumRecords,
+                                         final double fragmentationFactor,
+                                         final double safetyFactor,
+                                         final boolean reserveHash) {
+    long size = computeMaxBatchSizeNoHash(incomingBatchSize,
+      incomingNumRecords,
+      desiredNumRecords,
+      fragmentationFactor,
+      safetyFactor);
+
+    if (!reserveHash) {
+      return size;
+    }
+
+    long hashSize = desiredNumRecords * ((long) IntVector.VALUE_WIDTH);
+    hashSize = RecordBatchSizer.multiplyByFactors(hashSize, fragmentationFactor);
+
+    return size + hashSize;
+  }
+
+  public static long computePartitionBatchSize(final long incomingBatchSize,
+                                               final int incomingNumRecords,
+                                               final int desiredNumRecords) {
+    return (long) Math.ceil((((double) incomingBatchSize) /
+      ((double) incomingNumRecords)) *
+      ((double) desiredNumRecords));
+  }
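A worked example of these helpers with illustrative inputs: for an incoming build batch of 4,194,304 bytes holding 16,384 records and desiredNumRecords = 1,024, computePartitionBatchSize returns ceil(4,194,304 / 16,384 * 1,024) = 262,144 bytes. With fragmentationFactor = 1.33 and safetyFactor = 1.0, computeMaxBatchSizeNoHash scales that to roughly 348,652 bytes, and if reserveHash is true computeMaxBatchSize adds about 1,024 * 4 bytes for the hash column (times the fragmentation factor, about 5,448 bytes more), assuming IntVector.VALUE_WIDTH is 4 bytes. All of the input numbers here are made up for the example.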
+
+  public static class NoopBuildSidePartitioningImpl implements BuildSidePartitioning {
+    private int initialPartitions;
+
+    @Override
+    public void initialize(boolean autoTune,
+                           boolean reserveHash,
+                           RecordBatch buildSideBatch,
+                           RecordBatch probeSideBatch, Set<String> joinColumns,
+                           long memoryAvailable,
+                           int initialPartitions,
+                           int recordsPerPartitionBatchBuild,
+                           int recordsPerPartitionBatchProbe,
+                           int maxBatchNumRecordsBuild,
+                           int maxBatchNumRecordsProbe,
+                           int outputBatchNumRecords,
+                           double loadFactor) {
+      this.initialPartitions = initialPartitions;
+    }
+
+    @Override
+    public void setPartitionStatSet(PartitionStatSet partitionStatSet) {
+      // Do nothing
+    }
+
+    @Override
+    public int getNumPartitions() {
+      return initialPartitions;
+    }
+
+    @Override
+    public long getBuildReservedMemory() {
+      return 0;
+    }
+
+    @Override
+    public long getMaxReservedMemory() {
+      return 0;
+    }
+
+    @Override
+    public boolean shouldSpill() {
+      return false;
+    }
+
+    @Override
+    public String makeDebugString() {
+      return "No debugging for " + NoopBuildSidePartitioningImpl.class.getCanonicalName();
+    }
+
+    @Nullable
+    @Override
+    public PostBuildCalculations next() {
+      return new NoopPostBuildCalculationsImpl();
+    }
+
+    @Override
+    public HashJoinState getState() {
+      return HashJoinState.BUILD_SIDE_PARTITIONING;
+    }
+  }
+
+  /**
+   * <h1>Basic Functionality</h1>
+   * <p>
+   * At this point we need to reserve memory for the following:
+   * <ol>
+   *   <li>An incoming batch</li>
+   *   <li>An incomplete batch for each partition</li>
+   * </ol>
+   * If there is available memory we keep the batches for each partition in memory.
+   * If we run out of room and need to start spilling, we need to specify which partitions
+   * need to be spilled.
+   * </p>
+   * <h1>Life Cycle</h1>
+   * <p>
+   *   <ul>
+   *     <li><b>Step 0:</b> Call {@link #initialize(boolean, boolean, RecordBatch, RecordBatch, Set, long, int, int, int, int, int, int, double)}.
+   *     This will initialize the StateCalculator with the additional information it needs.</li>
+   *     <li><b>Step 1:</b> Call {@link #getNumPartitions()} to see the number of partitions that fit in memory.</li>
+   *     <li><b>Step 2:</b> Call {@link #shouldSpill()} to determine if spilling needs to occur.</li>
+   *     <li><b>Step 3:</b> Call {@link #next()} and get the next memory calculator associated with your next state.</li>
+   *   </ul>
+   * </p>
+   */
+  public static class BuildSidePartitioningImpl implements BuildSidePartitioning {
+    private static final Logger log = LoggerFactory.getLogger(BuildSidePartitioningImpl.class);
+
+    private final HashTableSizeCalculator hashTableSizeCalculator;
+    private final HashJoinHelperSizeCalculator hashJoinHelperSizeCalculator;
+    private final double fragmentationFactor;
+    private final double safetyFactor;
+
+    private int maxBatchNumRecordsBuild;
+    private int maxBatchNumRecordsProbe;
+    private long memoryAvailable;
+    private long buildBatchSize;
+    private long probeBatchSize;
+    private int buildNumRecords;
+    private int probeNumRecords;
+    private long maxBuildBatchSize;
+    private long maxProbeBatchSize;
+    private long maxOutputBatchSize;
+    private int initialPartitions;
+    private int partitions;
+    private int recordsPerPartitionBatchBuild;
+    private int recordsPerPartitionBatchProbe;
+    private int outputBatchNumRecords;
+    private Map<String, Long> buildValueSizes;
+    private Map<String, Long> probeValueSizes;
+    private Map<String, Long> keySizes;
+    private boolean autoTune;
+    private boolean reserveHash;
+    private double loadFactor;
+
+    private PartitionStatSet partitionStatsSet;
+    private long partitionBuildBatchSize;
+    private long partitionProbeBatchSize;
+    private long reservedMemory;
+    private long maxReservedMemory;
+
+    private boolean firstInitialized;
+    private boolean initialized;
+
+    public BuildSidePartitioningImpl(final HashTableSizeCalculator hashTableSizeCalculator,
+                                     final HashJoinHelperSizeCalculator hashJoinHelperSizeCalculator,
+                                     final double fragmentationFactor,
+                                     final double safetyFactor) {
+      this.hashTableSizeCalculator = Preconditions.checkNotNull(hashTableSizeCalculator);
+      this.hashJoinHelperSizeCalculator = Preconditions.checkNotNull(hashJoinHelperSizeCalculator);
+      this.fragmentationFactor = fragmentationFactor;
+      this.safetyFactor = safetyFactor;
+    }
+
+    @Override
+    public void initialize(boolean autoTune,
+                           boolean reserveHash,
+                           RecordBatch buildSideBatch,
+                           RecordBatch probeSideBatch,
+                           Set<String> joinColumns,
+                           long memoryAvailable,
+                           int initialPartitions,
+                           int recordsPerPartitionBatchBuild,
+                           int recordsPerPartitionBatchProbe,
+                           int maxBatchNumRecordsBuild,
+                           int maxBatchNumRecordsProbe,
+                           int outputBatchNumRecords,
+                           double loadFactor) {
+      Preconditions.checkNotNull(buildSideBatch);
+      Preconditions.checkNotNull(probeSideBatch);
+      Preconditions.checkNotNull(joinColumns);
+
+      final RecordBatchSizer buildSizer = new RecordBatchSizer(buildSideBatch);
+      final RecordBatchSizer probeSizer = new RecordBatchSizer(probeSideBatch);
+
+      long buildBatchSize = getBatchSizeEstimate(buildSideBatch);
+      long probeBatchSize = getBatchSizeEstimate(probeSideBatch);
+
+      int buildNumRecords = buildSizer.rowCount();
+      int probeNumRecords = probeSizer.rowCount();
+
+      final CaseInsensitiveMap<Long> buildValueSizes = getNotExcludedColumnSizes(
+        joinColumns, buildSizer);
+      final CaseInsensitiveMap<Long> probeValueSizes = getNotExcludedColumnSizes(
+        joinColumns, probeSizer);
+      final CaseInsensitiveMap<Long> keySizes = CaseInsensitiveMap.newHashMap();
+
+      for (String joinColumn: joinColumns) {
+        final RecordBatchSizer.ColumnSize columnSize = buildSizer.columns().get(joinColumn);
+        keySizes.put(joinColumn, (long)columnSize.getStdNetOrNetSizePerEntry());
+      }
+
+      initialize(autoTune,
+        reserveHash,
+        buildValueSizes,
+        probeValueSizes,
+        keySizes,
+        memoryAvailable,
+        initialPartitions,
+        buildBatchSize,
+        probeBatchSize,
+        buildNumRecords,
+        probeNumRecords,
+        recordsPerPartitionBatchBuild,
+        recordsPerPartitionBatchProbe,
+        maxBatchNumRecordsBuild,
+        maxBatchNumRecordsProbe,
+        outputBatchNumRecords,
+        loadFactor);
+    }
+
+    @VisibleForTesting
+    protected static CaseInsensitiveMap<Long> getNotExcludedColumnSizes(
+        final Set<String> excludedColumns,
+        final RecordBatchSizer batchSizer) {
+      final CaseInsensitiveMap<Long> columnSizes = CaseInsensitiveMap.newHashMap();
+      final CaseInsensitiveMap<Boolean> excludedSet = CaseInsensitiveMap.newHashMap();
+
+      for (final String excludedColumn: excludedColumns) {
+        excludedSet.put(excludedColumn, true);
+      }
+
+      for (final Map.Entry<String, RecordBatchSizer.ColumnSize> entry: batchSizer.columns().entrySet()) {
+        final String columnName = entry.getKey();
+        final RecordBatchSizer.ColumnSize columnSize = entry.getValue();
+
+        columnSizes.put(columnName, (long) columnSize.getStdNetOrNetSizePerEntry());
+      }
+
+      return columnSizes;
+    }
+
+    public static long getBatchSizeEstimate(final RecordBatch recordBatch) {
+      final RecordBatchSizer sizer = new RecordBatchSizer(recordBatch);
+      long size = 0L;
+
+      for (Map.Entry<String, RecordBatchSizer.ColumnSize> column: sizer.columns().entrySet()) {
+        size += PostBuildCalculationsImpl.computeValueVectorSize(recordBatch.getRecordCount(), column.getValue().getStdNetOrNetSizePerEntry());
+      }
+
+      return size;
+    }
+
+    @VisibleForTesting
+    protected void initialize(boolean autoTune,
+                              boolean reserveHash,
+                              CaseInsensitiveMap<Long> buildValueSizes,
+                              CaseInsensitiveMap<Long> probeValueSizes,
+                              CaseInsensitiveMap<Long> keySizes,
+                              long memoryAvailable,
+                              int initialPartitions,
+                              long buildBatchSize,
+                              long probeBatchSize,
+                              int buildNumRecords,
+                              int probeNumRecords,
+                              int recordsPerPartitionBatchBuild,
+                              int recordsPerPartitionBatchProbe,
+                              int maxBatchNumRecordsBuild,
+                              int maxBatchNumRecordsProbe,
+                              int outputBatchNumRecords,
+                              double loadFactor) {
+      Preconditions.checkState(!firstInitialized);
+      Preconditions.checkArgument(initialPartitions >= 1);
+      firstInitialized = true;
+
+      this.loadFactor = loadFactor;
+      this.autoTune = autoTune;
+      this.reserveHash = reserveHash;
+      this.buildValueSizes = Preconditions.checkNotNull(buildValueSizes);
+      this.probeValueSizes = Preconditions.checkNotNull(probeValueSizes);
+      this.keySizes = Preconditions.checkNotNull(keySizes);
+      this.memoryAvailable = memoryAvailable;
+      this.buildBatchSize = buildBatchSize;
+      this.probeBatchSize = probeBatchSize;
+      this.buildNumRecords = buildNumRecords;
+      this.probeNumRecords = probeNumRecords;
+      this.initialPartitions = initialPartitions;
+      this.recordsPerPartitionBatchBuild = recordsPerPartitionBatchBuild;
+      this.recordsPerPartitionBatchProbe = recordsPerPartitionBatchProbe;
+      this.maxBatchNumRecordsBuild = maxBatchNumRecordsBuild;
+      this.maxBatchNumRecordsProbe = maxBatchNumRecordsProbe;
+      this.outputBatchNumRecords = outputBatchNumRecords;
+
+      calculateMemoryUsage();
+
+      log.debug("Creating {} partitions when {} initial partitions configured.", partitions, initialPartitions);
+    }
+
+    @Override
+    public void setPartitionStatSet(final PartitionStatSet partitionStatSet) {
+      Preconditions.checkState(!initialized);
+      initialized = true;
+
+      partitionStatsSet = Preconditions.checkNotNull(partitionStatSet);
+    }
+
+    @Override
+    public int getNumPartitions() {
+      return partitions;
+    }
+
+    @Override
+    public long getBuildReservedMemory() {
+      Preconditions.checkState(firstInitialized);
+      return reservedMemory;
+    }
+
+    @Override
+    public long getMaxReservedMemory() {
+      Preconditions.checkState(firstInitialized);
+      return maxReservedMemory;
+    }
+
+    /**
+     * This method calculates the amount of memory we need to reserve while partitioning. It also
+     * calculates the size of a partition batch.
+     */
+    private void calculateMemoryUsage() {
+      // Adjust based on number of records
+      maxBuildBatchSize = computeMaxBatchSizeNoHash(buildBatchSize, buildNumRecords,
+        maxBatchNumRecordsBuild, fragmentationFactor, safetyFactor);
+      maxProbeBatchSize = computeMaxBatchSizeNoHash(probeBatchSize, probeNumRecords,
+        maxBatchNumRecordsProbe, fragmentationFactor, safetyFactor);
+
+      // Safety factor can be multiplied at the end since these batches are coming from exchange operators, so no excess value vector doubling
+      partitionBuildBatchSize = computeMaxBatchSize(buildBatchSize,
+        buildNumRecords,
+        recordsPerPartitionBatchBuild,
+        fragmentationFactor,
+        safetyFactor,
+        reserveHash);
+
+      // Safety factor can be multiplied at the end since these batches are coming from exchange operators, so no excess value vector doubling
+      partitionProbeBatchSize = computeMaxBatchSize(
+        probeBatchSize,
+        probeNumRecords,
+        recordsPerPartitionBatchProbe,
+        fragmentationFactor,
+        safetyFactor,
+        reserveHash);
+
+      maxOutputBatchSize = computeMaxOutputBatchSize(buildValueSizes, probeValueSizes, keySizes,
+        outputBatchNumRecords, safetyFactor, fragmentationFactor);
+
+      long probeReservedMemory;
+
+      for (partitions = initialPartitions;; partitions /= 2) {
+        // The total amount of memory to reserve for incomplete batches across all partitions
+        long incompletePartitionsBatchSizes = ((long) partitions) * partitionBuildBatchSize;
+        // We need to reserve space for the incomplete partition batches, the incoming build batch,
+        // and the probe batch we sniffed.
+        // TODO when batch sizing project is complete we won't have to sniff probe batches since
+        // they will have a well defined size.
+        reservedMemory = incompletePartitionsBatchSizes + maxBuildBatchSize + maxProbeBatchSize;
+
+        probeReservedMemory = PostBuildCalculationsImpl.calculateReservedMemory(
+          partitions,
+          maxProbeBatchSize,
+          maxOutputBatchSize,
+          partitionProbeBatchSize);
+
+        maxReservedMemory = Math.max(reservedMemory, probeReservedMemory);
+
+        if (!autoTune || maxReservedMemory <= memoryAvailable) {
+          // Stop the tuning loop if we are not doing auto tuning, or if we are living within our memory limit
+          break;
+        }
+
+        if (partitions <= 2) {
+          // Can't reduce the partition count any further; hash join needs at least 2 partitions
+          break;
+        }
+      }
+
+      if (maxReservedMemory > memoryAvailable) {
+        // We don't have enough memory; we need to fail or warn
+
+        String message = String.format("HashJoin needs to reserve %d bytes of memory but there are " +
+          "only %d bytes available. Using %d partitions when %d initial partitions were configured. Additional info:\n" +
+          "buildBatchSize = %d\n" +
+          "buildNumRecords = %d\n" +
+          "partitionBuildBatchSize = %d\n" +
+          "recordsPerPartitionBatchBuild = %d\n" +
+          "probeBatchSize = %d\n" +
+          "probeNumRecords = %d\n" +
+          "partitionProbeBatchSize = %d\n" +
+          "recordsPerPartitionBatchProbe = %d\n",
+          reservedMemory, memoryAvailable, partitions, initialPartitions,
+          buildBatchSize,
+          buildNumRecords,
+          partitionBuildBatchSize,
+          recordsPerPartitionBatchBuild,
+          probeBatchSize,
+          probeNumRecords,
+          partitionProbeBatchSize,
+          recordsPerPartitionBatchProbe);
+
+        String phase = "Probe phase: ";
+
+        if (reservedMemory > memoryAvailable) {
+          if (probeReservedMemory > memoryAvailable) {
+            phase = "Build and Probe phases: ";
+          } else {
+            phase = "Build phase: ";
+          }
+        }
+
+        message = phase + message;
+        log.warn(message);
+      }
+    }
+
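+    /**
+     * Estimates the size of an output batch as the sum of the key, build side, and probe side value
+     * vector sizes for {@code outputNumRecords} records, scaled by the fragmentation factor.
+     */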
+    public static long computeMaxOutputBatchSize(Map<String, Long> buildValueSizes,
+                                                 Map<String, Long> probeValueSizes,
+                                                 Map<String, Long> keySizes,
+                                                 int outputNumRecords,
+                                                 double safetyFactor,
+                                                 double fragmentationFactor) {
+      long outputSize = HashTableSizeCalculatorConservativeImpl.computeVectorSizes(keySizes, outputNumRecords, safetyFactor)
+        + HashTableSizeCalculatorConservativeImpl.computeVectorSizes(buildValueSizes, outputNumRecords, safetyFactor)
+        + HashTableSizeCalculatorConservativeImpl.computeVectorSizes(probeValueSizes, outputNumRecords, safetyFactor);
+      return RecordBatchSizer.multiplyByFactor(outputSize, fragmentationFactor);
+    }
+
+    @Override
+    public boolean shouldSpill() {
+      Preconditions.checkState(initialized);
+
+      long consumedMemory = reservedMemory;
+
+      if (reserveHash) {
+        // Include the hash sizes for the batch
+        consumedMemory += ((long) IntVector.VALUE_WIDTH) * partitionStatsSet.getNumInMemoryRecords();
+      }
+
+      consumedMemory += RecordBatchSizer.multiplyByFactor(partitionStatsSet.getConsumedMemory(), fragmentationFactor);
+      return consumedMemory > memoryAvailable;
+    }
+
+    @Override
+    public PostBuildCalculations next() {
+      Preconditions.checkState(initialized);
+
+      return new PostBuildCalculationsImpl(memoryAvailable,
+        partitionProbeBatchSize,
+        maxProbeBatchSize,
+        maxOutputBatchSize,
+        partitionStatsSet,
+        keySizes,
+        hashTableSizeCalculator,
+        hashJoinHelperSizeCalculator,
+        fragmentationFactor,
+        safetyFactor,
+        loadFactor,
+        reserveHash);
+    }
+
+    @Override
+    public HashJoinState getState() {
+      return HashJoinState.BUILD_SIDE_PARTITIONING;
+    }
+
+    @Override
+    public String makeDebugString() {
+      final String calcVars = String.format(
+        "Build side calculator vars:\n" +
+        "memoryAvailable = %s\n" +
+        "maxBuildBatchSize = %s\n" +
+        "maxOutputBatchSize = %s\n",
+        PartitionStatSet.prettyPrintBytes(memoryAvailable),
+        PartitionStatSet.prettyPrintBytes(maxBuildBatchSize),
+        PartitionStatSet.prettyPrintBytes(maxOutputBatchSize));
+
+      String partitionStatDebugString = "";
+
+      if (partitionStatsSet != null) {
+        partitionStatDebugString = partitionStatsSet.makeDebugString();
+      }
+
+      return calcVars + "\n" + partitionStatDebugString;
+    }
+  }
+
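+  /**
+   * A {@link PostBuildCalculations} implementation that performs no calculations and never signals a spill.
+   */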
+  public static class NoopPostBuildCalculationsImpl implements PostBuildCalculations {
+    @Override
+    public void initialize() {
+
+    }
+
+    @Override
+    public boolean shouldSpill() {
+      return false;
+    }
+
+    @Nullable
+    @Override
+    public HashJoinMemoryCalculator next() {
+      return null;
+    }
+
+    @Override
+    public HashJoinState getState() {
+      return HashJoinState.POST_BUILD_CALCULATIONS;
+    }
+
+    @Override
+    public String makeDebugString() {
+      return "Noop " + NoopPostBuildCalculationsImpl.class.getCanonicalName() + " calculator.";
+    }
+  }
+
+  /**
+   * <h1>Basic Functionality</h1>
+   * <p>
+   *   In this state, we need to make sure there is enough room to spill probe side batches, if
+   *   spilling is necessary. If there is not enough room, we have to evict build side partitions.
+   *   If we don't have to evict build side partitions in this state, then we are done. If we do have
+   *   to evict build side partitions then we have to recursively repeat the process.
+   * </p>
+   * <h1>Lifecycle</h1>
+   * <p>
+   *   <ul>
+   *     <li><b>Step 1:</b> Call {@link #initialize()}. This
+   *     gives the {@link HashJoinStateCalculator} additional information it needs to compute memory requirements.</li>
+   *     <li><b>Step 2:</b> Call {@link #shouldSpill()}. This tells
+   *     you which build side partitions need to be spilled in order to make room for probing.</li>
+   *     <li><b>Step 3:</b> Call {@link #next()}. After you are done probing
+   *     and partitioning the probe side, get the next calculator.</li>
+   *   </ul>
+   * </p>
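+   * <p>
+   *   A minimal usage sketch (the surrounding driver calls, such as {@code spillAPartition()}, are
+   *   illustrative and not part of this class):
+   * </p>
+   * <pre>{@code
+   * PostBuildCalculations calc = buildSidePartitioning.next();
+   * calc.initialize();
+   * while (calc.shouldSpill()) {
+   *   spillAPartition(); // free build side memory
+   * }
+   * // ... probe and partition the probe side ...
+   * HashJoinMemoryCalculator nextCycle = calc.next(); // null if no partitions were spilled
+   * }</pre>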
+   */
+  public static class PostBuildCalculationsImpl implements PostBuildCalculations {
+    private final long memoryAvailable;
+    private final long partitionProbeBatchSize;
+    private final long maxProbeBatchSize;
+    private final long maxOutputBatchSize;
+    private final PartitionStatSet buildPartitionStatSet;
+    private final Map<String, Long> keySizes;
+    private final HashTableSizeCalculator hashTableSizeCalculator;
+    private final HashJoinHelperSizeCalculator hashJoinHelperSizeCalculator;
+    private final double fragmentationFactor;
+    private final double safetyFactor;
+    private final double loadFactor;
+    private final boolean reserveHash;
+
+    private boolean initialized;
+    private long consumedMemory;
+
+    public PostBuildCalculationsImpl(final long memoryAvailable,
+                                     final long partitionProbeBatchSize,
+                                     final long maxProbeBatchSize,
+                                     final long maxOutputBatchSize,
+                                     final PartitionStatSet buildPartitionStatSet,
+                                     final Map<String, Long> keySizes,
+                                     final HashTableSizeCalculator hashTableSizeCalculator,
+                                     final HashJoinHelperSizeCalculator hashJoinHelperSizeCalculator,
+                                     final double fragmentationFactor,
+                                     final double safetyFactor,
+                                     final double loadFactor,
+                                     final boolean reserveHash) {
+      this.memoryAvailable = memoryAvailable;
+      this.partitionProbeBatchSize = partitionProbeBatchSize;
+      this.maxProbeBatchSize = maxProbeBatchSize;
+      this.maxOutputBatchSize = maxOutputBatchSize;
+      this.buildPartitionStatSet = Preconditions.checkNotNull(buildPartitionStatSet);
+      this.keySizes = Preconditions.checkNotNull(keySizes);
+      this.hashTableSizeCalculator = Preconditions.checkNotNull(hashTableSizeCalculator);
+      this.hashJoinHelperSizeCalculator = Preconditions.checkNotNull(hashJoinHelperSizeCalculator);
+      this.fragmentationFactor = fragmentationFactor;
+      this.safetyFactor = safetyFactor;
+      this.loadFactor = loadFactor;
+      this.reserveHash = reserveHash;
+    }
+
+    // TODO take an incoming Probe RecordBatch
+    @Override
+    public void initialize() {
+      Preconditions.checkState(!initialized);
+      initialized = true;
+    }
+
+    public long getConsumedMemory() {
+      Preconditions.checkState(initialized);
+      return consumedMemory;
+    }
+
+    // TODO move this somewhere else that makes sense
+    public static long computeValueVectorSize(long numRecords, long byteSize) {
+      long naiveSize = numRecords * byteSize;
+      return roundUpToPowerOf2(naiveSize);
+    }
+
+    public static long computeValueVectorSize(long numRecords, long byteSize, double safetyFactor) {
+      long naiveSize = RecordBatchSizer.multiplyByFactor(numRecords * byteSize, safetyFactor);
+      return roundUpToPowerOf2(naiveSize);
+    }
+
+    // TODO move to drill common
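+    /**
+     * Rounds {@code num} up to the next power of two, mirroring how value vector buffers are allocated,
+     * e.g. 1000 records * 4 bytes = 4000 bytes rounds up to 4096 bytes.
+     */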
+    public static long roundUpToPowerOf2(long num) {
+      Preconditions.checkArgument(num >= 1);
+      return num == 1 ? 1 : Long.highestOneBit(num - 1) << 1;
+    }
+
+    public static long calculateReservedMemory(final int numSpilledPartitions,
+                                               final long maxProbeBatchSize,
+                                               final long maxOutputBatchSize,
+                                               final long partitionProbeBatchSize) {
+      // We need to have enough space for the incoming batch, as well as a batch for each probe side
+      // partition that is being spilled. And enough space for the output batch
+      return maxProbeBatchSize
+        + maxOutputBatchSize
+        + partitionProbeBatchSize * numSpilledPartitions;
+    }
+
+    @Override
+    public boolean shouldSpill() {
+      Preconditions.checkState(initialized);
+
+      long reservedMemory = calculateReservedMemory(
+        buildPartitionStatSet.getNumSpilledPartitions(),
+        maxProbeBatchSize,
+        maxOutputBatchSize,
+        partitionProbeBatchSize);
+
+      // We consume our reserved memory plus the memory used by the in-memory build side batches,
+      // plus (added below) the sizes of the hash tables and hash join helpers
+      consumedMemory = reservedMemory + RecordBatchSizer.multiplyByFactor(buildPartitionStatSet.getConsumedMemory(), fragmentationFactor);
+
+      // Handle early completion conditions
+      if (buildPartitionStatSet.allSpilled()) {
+        // All build side partitions are spilled so our memory calculation is complete
+        return false;
+      }
+
+      for (int partitionIndex: buildPartitionStatSet.getInMemoryPartitions()) {
+        final PartitionStat partitionStat = buildPartitionStatSet.get(partitionIndex);
+
+        if (partitionStat.getNumInMemoryRecords() == 0) {
+          // TODO Hash join still allocates empty hash tables and hash join helpers. We should fix hash join
+          // not to allocate empty tables and helpers.
+          continue;
+        }
+
+        long hashTableSize = hashTableSizeCalculator.calculateSize(partitionStat, keySizes, loadFactor, safetyFactor, fragmentationFactor);
+        long hashJoinHelperSize = hashJoinHelperSizeCalculator.calculateSize(partitionStat, fragmentationFactor);
+
+        consumedMemory += hashTableSize + hashJoinHelperSize;
+      }
+
+      return consumedMemory > memoryAvailable;
+    }
+
+    @Nullable
+    @Override
+    public HashJoinMemoryCalculator next() {
+      Preconditions.checkState(initialized);
+
+      if (buildPartitionStatSet.noneSpilled()) {
+        // If none of our partitions were spilled then we were able to probe everything and we are done
+        return null;
+      }
+
+      // Some of our build side partitions were spilled, so we have to recursively process the spilled partitions.
+      return new HashJoinMemoryCalculatorImpl(
+        safetyFactor, fragmentationFactor, hashTableSizeCalculator.getDoublingFactor(), hashTableSizeCalculator.getType());
+    }
+
+    @Override
+    public HashJoinState getState() {
+      return HashJoinState.POST_BUILD_CALCULATIONS;
+    }
+
+    @Override
+    public String makeDebugString() {
+      Preconditions.checkState(initialized);
+
+      String calcVars = String.format(
+        "Mem calc stats:\n" +
+        "memoryLimit = %s\n" +
+        "consumedMemory = %s\n" +
+        "maxProbeBatchSize = %s\n" +
+        "maxOutputBatchSize = %s\n",
+        PartitionStatSet.prettyPrintBytes(memoryAvailable),
+        PartitionStatSet.prettyPrintBytes(consumedMemory),
+        PartitionStatSet.prettyPrintBytes(maxProbeBatchSize),
+        PartitionStatSet.prettyPrintBytes(maxOutputBatchSize));
+
+      StringBuilder hashJoinHelperSb = new StringBuilder("Partition Hash Join Helpers\n");
+      StringBuilder hashTableSb = new StringBuilder("Partition Hash Tables\n");
+
+      for (int partitionIndex: buildPartitionStatSet.getInMemoryPartitions()) {
+        final PartitionStat partitionStat = buildPartitionStatSet.get(partitionIndex);
+        final String partitionPrefix = partitionIndex + ": ";
+
+        hashJoinHelperSb.append(partitionPrefix);
+        hashTableSb.append(partitionPrefix);
+
+        if (partitionStat.getNumInMemoryBatches() == 0) {
+          hashJoinHelperSb.append("Empty");
+          hashTableSb.append("Empty");
+        } else {
+          long hashJoinHelperSize = hashJoinHelperSizeCalculator.calculateSize(partitionStat, fragmentationFactor);
+          long hashTableSize = hashTableSizeCalculator.calculateSize(partitionStat, keySizes, loadFactor, safetyFactor, fragmentationFactor);
+
+          hashJoinHelperSb.append(PartitionStatSet.prettyPrintBytes(hashJoinHelperSize));
+          hashTableSb.append(PartitionStatSet.prettyPrintBytes(hashTableSize));
+        }
+
+        hashJoinHelperSb.append("\n");
+        hashTableSb.append("\n");
+      }
+
+      return calcVars
+        + "\n" + buildPartitionStatSet.makeDebugString()
+        + "\n" + hashJoinHelperSb.toString()
+        + "\n" + hashTableSb.toString();
+    }
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinState.java
new file mode 100644
index 0000000..33f22be
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinState.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.join;
+
+public enum HashJoinState {
+  INITIALIZING,
+  /**
+   * In this state, the build side of the join operation is partitioned. Each partition is
+   * kept in memory. If we are able to fit all the partitions in memory and we have completely
+   * consumed the build side then we move to the {@link HashJoinState#POST_BUILD_CALCULATIONS} state. If we
+   * run out of memory before we have consumed all of the build side, we start evicting (spilling)
+   * partitions from memory to free space, and then resume processing the build side. We repeat
+   * this process until the entire build side is consumed. After the build side is consumed we
+   * proceed to the {@link HashJoinState#POST_BUILD_CALCULATIONS} state.
+   */
+  BUILD_SIDE_PARTITIONING,
+  /**
+   * In this state, the probe side is consumed. If data in the probe side matches a build side partition
+   * kept in memory, it is joined and sent out. If data in the probe side corresponds to a build side
+   * partition that was spilled, it is spilled to disk as well. After all the probe side data is
+   * consumed, the join is complete if no build side partitions were spilled. If some partitions were
+   * spilled, each spilled partition is processed recursively, starting again from the
+   * {@link HashJoinState#BUILD_SIDE_PARTITIONING} state.
+   */
+  POST_BUILD_CALCULATIONS
+}
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/FixedWidthVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinStateCalculator.java
similarity index 53%
copy from exec/vector/src/main/java/org/apache/drill/exec/vector/FixedWidthVector.java
copy to exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinStateCalculator.java
index 09bcdd8..695a68c 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/FixedWidthVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinStateCalculator.java
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -15,19 +15,26 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.drill.exec.vector;
+package org.apache.drill.exec.physical.impl.join;
 
-public interface FixedWidthVector extends ValueVector {
+import javax.annotation.Nullable;
 
+/**
+ * A {@link HashJoinStateCalculator} is a piece of code that computes the memory requirements for one of the states
+ * in the {@link HashJoinState} enum.
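+ * <p>
+ *   Calculators form a simple chain: each one handles a single state and hands off to the calculator for
+ *   the next state via {@link #next()}. A conceptual traversal sketch (the driver code is illustrative):
+ * </p>
+ * <pre>{@code
+ * HashJoinStateCalculator<?> calc = firstCalculator;
+ * while (calc != null) {
+ *   // ... do the work required for calc.getState() ...
+ *   calc = calc.next();
+ * }
+ * }</pre>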
+ */
+public interface HashJoinStateCalculator<T extends HashJoinStateCalculator> {
   /**
-   * Allocate a new memory space for this vector.  Must be called prior to using the ValueVector.
-   *
-   * @param valueCount   Number of values in the vector.
+   * Signifies that the current state is complete and returns the next {@link HashJoinStateCalculator}.
+   * Returns null in the case where there is no next state.
+   * @return The next {@link HashJoinStateCalculator} or null if this was the last state.
    */
-  void allocateNew(int valueCount);
+  @Nullable
+  T next();
 
   /**
-   * Zero out the underlying buffer backing this vector.
+   * The current {@link HashJoinState} corresponding to this calculator.
+   * @return the current {@link HashJoinState} corresponding to this calculator
    */
-  void zeroVector();
+  HashJoinState getState();
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/SubOperatorTest.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashTableSizeCalculator.java
similarity index 65%
copy from exec/java-exec/src/test/java/org/apache/drill/test/SubOperatorTest.java
copy to exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashTableSizeCalculator.java
index 6bc2afc..04d15ff 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/SubOperatorTest.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashTableSizeCalculator.java
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -15,22 +15,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.drill.test;
+package org.apache.drill.exec.physical.impl.join;
 
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
+import java.util.Map;
 
-public class SubOperatorTest extends DrillTest {
+public interface HashTableSizeCalculator {
+  long calculateSize(HashJoinMemoryCalculator.PartitionStat partitionStat,
+                     Map<String, Long> keySizes,
+                     double loadFactor,
+                     double safetyFactor,
+                     double fragmentationFactor);
 
-  protected static OperatorFixture fixture;
+  double getDoublingFactor();
 
-  @BeforeClass
-  public static void classSetup() throws Exception {
-    fixture = OperatorFixture.standardFixture();
-  }
-
-  @AfterClass
-  public static void classTeardown() throws Exception {
-    fixture.close();
-  }
+  String getType();
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashTableSizeCalculatorConservativeImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashTableSizeCalculatorConservativeImpl.java
new file mode 100644
index 0000000..05354d5
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashTableSizeCalculatorConservativeImpl.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.join;
+
+import com.google.common.base.Preconditions;
+import org.apache.drill.exec.record.RecordBatchSizer;
+import org.apache.drill.exec.vector.IntVector;
+
+import java.util.Map;
+
+import static org.apache.drill.exec.physical.impl.join.HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl.computeValueVectorSize;
+
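+/**
+ * Estimates the size of a partition's hash table by accounting for the bucket (start index) array, the
+ * per batch holder links and hash value vectors, and the key vectors copied into each batch holder. This
+ * implementation is conservative: the entire key space of the partially filled batch holder is scaled by
+ * the hash table doubling factor.
+ * <p>
+ *   A rough worked example (all inputs hypothetical): 100,000 in-memory records, a single 4 byte int key,
+ *   maxNumRecords = 1024, loadFactor = 0.75, safetyFactor = 1.0, hashTableDoublingFactor = 2.0 and
+ *   fragmentationFactor = 1.0 yields about 133,333 buckets rounded up to a 1 MiB int vector (2 MiB after
+ *   doubling), 98 batch holders contributing 98 * 2 * 4 * 1024 = 802,816 bytes of links and hash values,
+ *   97 * 4096 = 397,312 bytes of keys in the full batch holders, and 8,192 bytes for the doubled partial
+ *   batch holder, roughly 3.3 MB in total.
+ * </p>
+ */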
+public class HashTableSizeCalculatorConservativeImpl implements HashTableSizeCalculator {
+  public static final String TYPE = "CONSERVATIVE";
+  public static final double HASHTABLE_DOUBLING_FACTOR = 2.0;
+  private final int maxNumRecords;
+  private final double hashTableDoublingFactor;
+
+  public HashTableSizeCalculatorConservativeImpl(int maxNumRecords, double hashTableDoublingFactor) {
+    this.maxNumRecords = maxNumRecords;
+    this.hashTableDoublingFactor = hashTableDoublingFactor;
+  }
+
+  @Override
+  public long calculateSize(final HashJoinMemoryCalculator.PartitionStat partitionStat,
+                            final Map<String, Long> keySizes,
+                            final double loadFactor,
+                            final double safetyFactor,
+                            final double fragmentationFactor) {
+    Preconditions.checkArgument(!keySizes.isEmpty());
+    Preconditions.checkArgument(!partitionStat.isSpilled());
+    Preconditions.checkArgument(partitionStat.getNumInMemoryRecords() > 0);
+
+    // The number of entries in the global index array. Note this array may not be completely populated; the degree of population depends on the load factor.
+    long numBuckets = (long) (((double) partitionStat.getNumInMemoryRecords()) * (1.0 / loadFactor));
+    // The number of pairs in the hash table
+    long numEntries = partitionStat.getNumInMemoryRecords();
+    // The number of Batch Holders in the hash table. Note that entries are tightly packed in the Batch Holders (no gaps).
+    long numBatchHolders = (numEntries + maxNumRecords - 1) / maxNumRecords;
+
+    // Include the size of the buckets array
+    long hashTableSize = RecordBatchSizer.multiplyByFactors(computeValueVectorSize(numBuckets, IntVector.VALUE_WIDTH), hashTableDoublingFactor);
+    // Each Batch Holder has an int vector of max size for holding links and hash values
+    hashTableSize += numBatchHolders * 2L * IntVector.VALUE_WIDTH * ((long) maxNumRecords);
+
+    long numFullBatchHolders = numEntries % maxNumRecords == 0 ? numBatchHolders : numBatchHolders - 1;
+    // Compute the size of the value vectors holding keys in each full batch holder
+    hashTableSize += numFullBatchHolders * computeVectorSizes(keySizes, maxNumRecords, safetyFactor);
+
+    if (numFullBatchHolders != numBatchHolders) {
+      // The last batch holder is only partially filled
+      long partialNumEntries = numEntries % maxNumRecords;
+      final long partialSize = computeVectorSizes(keySizes, partialNumEntries, safetyFactor);
+      hashTableSize += RecordBatchSizer.multiplyByFactors(partialSize, hashTableDoublingFactor);
+    }
+
+    return RecordBatchSizer.multiplyByFactors(hashTableSize, fragmentationFactor);
+  }
+
+  @Override
+  public double getDoublingFactor() {
+    return hashTableDoublingFactor;
+  }
+
+  @Override
+  public String getType() {
+    return TYPE;
+  }
+
+  public static long computeVectorSizes(final Map<String, Long> vectorSizes,
+                                        final long numRecords,
+                                        final double safetyFactor) {
+    long totalKeySize = 0L;
+
+    for (Map.Entry<String, Long> entry: vectorSizes.entrySet()) {
+      totalKeySize += computeValueVectorSize(numRecords, entry.getValue(), safetyFactor);
+    }
+
+    return totalKeySize;
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashTableSizeCalculatorLeanImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashTableSizeCalculatorLeanImpl.java
new file mode 100644
index 0000000..87d8d1b
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashTableSizeCalculatorLeanImpl.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.join;
+
+import com.google.common.base.Preconditions;
+import org.apache.drill.exec.record.RecordBatchSizer;
+import org.apache.drill.exec.vector.IntVector;
+
+import java.util.Map;
+
+import static org.apache.drill.exec.physical.impl.join.HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl.computeValueVectorSize;
+
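+/**
+ * A leaner variant of {@link HashTableSizeCalculatorConservativeImpl}: the bucket array, links, hash
+ * values and full batch holders are accounted for in the same way, but the hash table doubling factor is
+ * applied only to the largest key vector of the partially filled batch holder rather than to all of its
+ * key vectors, which yields a smaller estimate.
+ */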
+public class HashTableSizeCalculatorLeanImpl implements HashTableSizeCalculator {
+  public static final String TYPE = "LEAN";
+  private final int maxNumRecords;
+  private final double hashTableDoublingFactor;
+
+  public HashTableSizeCalculatorLeanImpl(int maxNumRecords, double hashTableDoublingFactor) {
+    this.maxNumRecords = maxNumRecords;
+    this.hashTableDoublingFactor = hashTableDoublingFactor;
+  }
+
+  @Override
+  public long calculateSize(final HashJoinMemoryCalculator.PartitionStat partitionStat,
+                            final Map<String, Long> keySizes,
+                            final double loadFactor,
+                            final double safetyFactor,
+                            final double fragmentationFactor) {
+    Preconditions.checkArgument(!keySizes.isEmpty());
+    Preconditions.checkArgument(!partitionStat.isSpilled());
+    Preconditions.checkArgument(partitionStat.getNumInMemoryRecords() > 0);
+
+    // The number of entries in the global index array. Note this array may not be completely populated; the degree of population depends on the load factor.
+    long numBuckets = (long) (((double) partitionStat.getNumInMemoryRecords()) * (1.0 / loadFactor));
+    // The number of pairs in the hash table
+    long numEntries = partitionStat.getNumInMemoryRecords();
+    // The number of Batch Holders in the hash table. Note that entries are tightly packed in the Batch Holders (no gaps).
+    long numBatchHolders = (numEntries + maxNumRecords - 1) / maxNumRecords;
+
+    // Include the size of the buckets array
+    long hashTableSize = RecordBatchSizer.multiplyByFactors(computeValueVectorSize(numBuckets, IntVector.VALUE_WIDTH), hashTableDoublingFactor);
+    // Each Batch Holder has an int vector of max size for holding links and hash values
+    hashTableSize += numBatchHolders * 2L * IntVector.VALUE_WIDTH * ((long) maxNumRecords);
+
+    long numFullBatchHolders = numEntries % maxNumRecords == 0 ? numBatchHolders : numBatchHolders - 1;
+    // Compute the size of the value vectors holding keys in each full batch holder
+    hashTableSize += numFullBatchHolders * computeVectorSizes(keySizes, maxNumRecords, safetyFactor);
+
+    if (numFullBatchHolders != numBatchHolders) {
+      // The last batch holder is only partially filled
+      long partialNumEntries = numEntries % maxNumRecords;
+      hashTableSize += computeVectorSizes(keySizes, partialNumEntries, safetyFactor);
+      long maxVectorSize = computeMaxVectorSize(keySizes, partialNumEntries, safetyFactor);
+      hashTableSize -= maxVectorSize;
+      hashTableSize += RecordBatchSizer.multiplyByFactors(maxVectorSize, hashTableDoublingFactor);
+    }
+
+    return RecordBatchSizer.multiplyByFactors(hashTableSize, fragmentationFactor);
+  }
+
+  @Override
+  public double getDoublingFactor() {
+    return hashTableDoublingFactor;
+  }
+
+  @Override
+  public String getType() {
+    return TYPE;
+  }
+
+  public static long computeVectorSizes(final Map<String, Long> vectorSizes,
+                                        final long numRecords,
+                                        final double safetyFactor) {
+    long totalKeySize = 0L;
+
+    for (Map.Entry<String, Long> entry: vectorSizes.entrySet()) {
+      totalKeySize += computeValueVectorSize(numRecords, entry.getValue(), safetyFactor);
+    }
+
+    return totalKeySize;
+  }
+
+  public static long computeMaxVectorSize(final Map<String, Long> vectorSizes,
+                                          final long numRecords,
+                                          final double safetyFactor) {
+    long maxVectorSize = 0L;
+
+    for (Map.Entry<String, Long> entry: vectorSizes.entrySet()) {
+      maxVectorSize = Math.max(maxVectorSize, computeValueVectorSize(numRecords, entry.getValue(), safetyFactor));
+    }
+
+    return maxVectorSize;
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MechanicalHashJoinMemoryCalculator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MechanicalHashJoinMemoryCalculator.java
new file mode 100644
index 0000000..8b367dd
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MechanicalHashJoinMemoryCalculator.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.physical.impl.join;
+
+import com.google.common.base.Preconditions;
+import org.apache.drill.exec.record.RecordBatch;
+
+import javax.annotation.Nullable;
+import java.util.Set;
+
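+/**
+ * A purely mechanical calculator that ignores batch sizes: it signals a spill whenever the number of
+ * in-memory batches exceeds a fixed limit, and reserves no memory for the build or probe phases.
+ */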
+public class MechanicalHashJoinMemoryCalculator implements HashJoinMemoryCalculator {
+  private final int maxNumInMemBatches;
+
+  private boolean doMemoryCalc;
+
+  public MechanicalHashJoinMemoryCalculator(int maxNumInMemBatches) {
+    this.maxNumInMemBatches = maxNumInMemBatches;
+  }
+
+  @Override
+  public void initialize(boolean doMemoryCalc) {
+    this.doMemoryCalc = doMemoryCalc;
+  }
+
+  @Nullable
+  @Override
+  public BuildSidePartitioning next() {
+    if (doMemoryCalc) {
+      // return the mechanical implementation
+      return new MechanicalBuildSidePartitioning(maxNumInMemBatches);
+    } else {
+      // return Noop implementation
+      return new HashJoinMemoryCalculatorImpl.NoopBuildSidePartitioningImpl();
+    }
+  }
+
+  @Override
+  public HashJoinState getState() {
+    return HashJoinState.INITIALIZING;
+  }
+
+  public static class MechanicalBuildSidePartitioning implements BuildSidePartitioning {
+    private final int maxNumInMemBatches;
+
+    private int initialPartitions;
+    private PartitionStatSet partitionStatSet;
+
+    public MechanicalBuildSidePartitioning(int maxNumInMemBatches) {
+      this.maxNumInMemBatches = maxNumInMemBatches;
+    }
+
+    @Override
+    public void initialize(boolean autoTune,
+                           boolean reserveHash,
+                           RecordBatch buildSideBatch,
+                           RecordBatch probeSideBatch,
+                           Set<String> joinColumns,
+                           long memoryAvailable,
+                           int initialPartitions,
+                           int recordsPerPartitionBatchBuild,
+                           int recordsPerPartitionBatchProbe,
+                           int maxBatchNumRecordsBuild,
+                           int maxBatchNumRecordsProbe,
+                           int outputBatchNumRecords,
+                           double loadFactor) {
+      this.initialPartitions = initialPartitions;
+    }
+
+    @Override
+    public void setPartitionStatSet(PartitionStatSet partitionStatSet) {
+      this.partitionStatSet = Preconditions.checkNotNull(partitionStatSet);
+    }
+
+    @Override
+    public int getNumPartitions() {
+      return initialPartitions;
+    }
+
+    @Override
+    public long getBuildReservedMemory() {
+      return 0;
+    }
+
+    @Override
+    public long getMaxReservedMemory() {
+      return 0;
+    }
+
+    @Override
+    public boolean shouldSpill() {
+      return partitionStatSet.getNumInMemoryBatches() > maxNumInMemBatches;
+    }
+
+    @Override
+    public String makeDebugString() {
+      return "Mechanical build side calculations";
+    }
+
+    @Nullable
+    @Override
+    public PostBuildCalculations next() {
+      return new MechanicalPostBuildCalculations(maxNumInMemBatches, partitionStatSet);
+    }
+
+    @Override
+    public HashJoinState getState() {
+      return HashJoinState.BUILD_SIDE_PARTITIONING;
+    }
+  }
+
+  public static class MechanicalPostBuildCalculations implements PostBuildCalculations {
+    private final int maxNumInMemBatches;
+    private final PartitionStatSet partitionStatSet;
+
+    public MechanicalPostBuildCalculations(int maxNumInMemBatches,
+                                           PartitionStatSet partitionStatSet) {
+      this.maxNumInMemBatches = maxNumInMemBatches;
+      this.partitionStatSet = Preconditions.checkNotNull(partitionStatSet);
+    }
+
+    @Override
+    public void initialize() {
+      // Do nothing
+    }
+
+    @Override
+    public boolean shouldSpill() {
+      return partitionStatSet.getNumInMemoryBatches() > maxNumInMemBatches;
+    }
+
+    @Override
+    public String makeDebugString() {
+      return "Mechanical post build calculations";
+    }
+
+    @Nullable
+    @Override
+    public HashJoinMemoryCalculator next() {
+      return null;
+    }
+
+    @Override
+    public HashJoinState getState() {
+      return HashJoinState.POST_BUILD_CALCULATIONS;
+    }
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoinRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoinRel.java
index b77fa61..42f7e72 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoinRel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoinRel.java
@@ -43,6 +43,7 @@ import org.apache.drill.exec.planner.torel.ConversionContext;
  * Logical Join implemented in Drill.
  */
 public class DrillJoinRel extends DrillJoinRelBase implements DrillRel {
+  public static final String EQUALITY_CONDITION = "==";
 
   /** Creates a DrillJoinRel.
    * We do not throw InvalidRelException in Logical planning phase. It's up to the post-logical planning check or physical planning
@@ -88,7 +89,7 @@ public class DrillJoinRel extends DrillJoinRelBase implements DrillRel {
     builder.right(rightOp);
 
     for (Pair<Integer, Integer> pair : Pair.zip(leftKeys, rightKeys)) {
-      builder.addCondition("==", new FieldReference(leftFields.get(pair.left)), new FieldReference(rightFields.get(pair.right)));
+      builder.addCondition(EQUALITY_CONDITION, new FieldReference(leftFields.get(pair.left)), new FieldReference(rightFields.get(pair.right)));
     }
 
     return builder.build();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
index 054ceec..dfa1424 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
@@ -29,6 +29,7 @@ import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.ops.OperatorStats;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.impl.aggregate.SpilledRecordbatch;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
 import org.apache.drill.exec.server.options.OptionValue;
@@ -120,6 +121,13 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
       stats.startProcessing();
     }
 
+    if (b instanceof SpilledRecordbatch) {
+      // Don't double count records which were already read and spilled.
+      // TODO evaluate whether swapping out upstream record batch with a SpilledRecordBatch
+      // is the right thing to do.
+      return next;
+    }
+
     switch(next) {
     case OK_NEW_SCHEMA:
       stats.batchReceived(inputIndex, b.getRecordCount(), true);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchSizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchSizer.java
index d30f565..7e531f8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchSizer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchSizer.java
@@ -26,6 +26,7 @@ import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.memory.AllocationManager.BufferLedger;
 import org.apache.drill.exec.memory.BaseAllocator;
+import org.apache.drill.exec.physical.impl.xsort.managed.SortMemoryManager;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.vector.AllocationHelper;
 import org.apache.drill.exec.vector.NullableVector;
@@ -49,9 +50,30 @@ import static org.apache.drill.exec.vector.AllocationHelper.STD_REPETITION_FACTO
  */
 
 public class RecordBatchSizer {
+  // TODO consolidate common memory estimation helpers
+  public static final double PAYLOAD_FROM_BUFFER = SortMemoryManager.PAYLOAD_FROM_BUFFER;
+  public static final double FRAGMENTATION_FACTOR = 1.0 / PAYLOAD_FROM_BUFFER;
+  public static final double BUFFER_FROM_PAYLOAD = SortMemoryManager.BUFFER_FROM_PAYLOAD;
+
   private static final int OFFSET_VECTOR_WIDTH = UInt4Vector.VALUE_WIDTH;
   private static final int BIT_VECTOR_WIDTH = UInt1Vector.VALUE_WIDTH;
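+  /**
+   * Scales a size estimate by each of the given factors in turn,
+   * e.g. {@code multiplyByFactors(1000, 1.5, 2.0)} returns 3000.
+   */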
 
+  public static long multiplyByFactors(long size, double... factors) {
+    double doubleSize = (double) size;
+
+    for (double factor: factors) {
+      doubleSize *= factor;
+    }
+
+    return (long) doubleSize;
+  }
+
+  public static long multiplyByFactor(long size, double factor) {
+    return (long) (((double) size) * factor);
+  }
+
   /**
    * Column size information.
    */
@@ -127,6 +149,14 @@ public class RecordBatchSizer {
     private Map<String, ColumnSize> children = CaseInsensitiveMap.newHashMap();
 
     /**
+     * Whether an accurate std size is available for this column, i.e. it is fixed width and not repeated.
+     * @return true if there is an accurate std size, false otherwise.
+     */
+    public boolean hasStdDataSize() {
+      return !isVariableWidth && !isRepeated;
+    }
+
+    /**
      * std pure data size per entry from Drill metadata, based on type.
      * Does not include metadata vector overhead we add for cardinality,
      * variable length etc.
@@ -230,6 +260,18 @@ public class RecordBatchSizer {
     }
 
     /**
+     * Returns the std net size per entry when an accurate one is available, otherwise the measured net size per entry.
+     * @return the std net size per entry if accurate, otherwise the measured net size per entry.
+     */
+    public int getStdNetOrNetSizePerEntry() {
+      if (hasStdDataSize()) {
+        return getStdNetSizePerEntry();
+      } else {
+        return getNetSizePerEntry();
+      }
+    }
+
+    /**
      * This is the total data size for the column, including children for map
      * columns. Does not include any overhead of metadata vectors.
      */
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
index 06a89b0..d966f50 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
@@ -560,4 +560,28 @@ public class VectorContainer implements VectorAccessible {
     schemaChanged = other.schemaChanged;
     other.schemaChanged = temp2;
   }
+
+  /**
+   * This method creates a human-readable string for a record in the {@link VectorContainer}.
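+   * For example, a two column record might render as {@code ["id" = 11, "name" = Bob]} (illustrative column names and values).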
+   * @param index The index of the record of interest.
+   * @return The string representation of a record.
+   */
+  public String prettyPrintRecord(int index) {
+    final StringBuilder sb = new StringBuilder();
+    String separator = "";
+    sb.append("[");
+
+    for (VectorWrapper vectorWrapper: wrappers) {
+      sb.append(separator);
+      separator = ", ";
+      final String columnName = vectorWrapper.getField().getName();
+      final Object value = vectorWrapper.getValueVector().getAccessor().getObject(index);
+
+      // "columnName" = 11
+      sb.append("\"").append(columnName).append("\" = ").append(value);
+    }
+
+    sb.append("]");
+    return sb.toString();
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 810fd37..183c1f1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -118,10 +118,9 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea
       new OptionDefinition(PlannerSettings.ENABLE_UNNEST_LATERAL),
       new OptionDefinition(PlannerSettings.FORCE_2PHASE_AGGR), // for testing
       new OptionDefinition(ExecConstants.HASHJOIN_NUM_PARTITIONS_VALIDATOR),
-      new OptionDefinition(ExecConstants.HASHJOIN_MAX_MEMORY_VALIDATOR),
+      new OptionDefinition(ExecConstants.HASHJOIN_MAX_MEMORY_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM, true, true)),
       new OptionDefinition(ExecConstants.HASHJOIN_NUM_ROWS_IN_BATCH_VALIDATOR),
-      new OptionDefinition(ExecConstants.HASHJOIN_MAX_BATCHES_IN_MEMORY_VALIDATOR),
-      new OptionDefinition(ExecConstants.HASHJOIN_MAX_BATCHES_PER_PARTITION_VALIDATOR),
+      new OptionDefinition(ExecConstants.HASHJOIN_MAX_BATCHES_IN_MEMORY_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM, true, true)),
       new OptionDefinition(ExecConstants.HASHAGG_NUM_PARTITIONS_VALIDATOR),
       new OptionDefinition(ExecConstants.HASHAGG_MAX_MEMORY_VALIDATOR),
       new OptionDefinition(ExecConstants.HASHAGG_MIN_BATCHES_PER_PARTITION_VALIDATOR), // for tuning
@@ -188,6 +187,10 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea
       new OptionDefinition(ExecConstants.MIN_MEMORY_PER_BUFFERED_OP),
       new OptionDefinition(ExecConstants.NON_BLOCKING_OPERATORS_MEMORY),
       new OptionDefinition(ExecConstants.HASH_JOIN_TABLE_FACTOR),
+      new OptionDefinition(ExecConstants.HASHJOIN_HASHTABLE_CALC_TYPE, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM_AND_SESSION, true, true)),
+      new OptionDefinition(ExecConstants.HASHJOIN_SAFETY_FACTOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM_AND_SESSION, true, true)),
+      new OptionDefinition(ExecConstants.HASHJOIN_HASH_DOUBLE_FACTOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM_AND_SESSION, true, true)),
+      new OptionDefinition(ExecConstants.HASHJOIN_FRAGMENTATION_FACTOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM_AND_SESSION, true, true)),
       new OptionDefinition(ExecConstants.HASH_AGG_TABLE_FACTOR),
       new OptionDefinition(ExecConstants.AVERAGE_FIELD_WIDTH),
       new OptionDefinition(ExecConstants.NEW_VIEW_DEFAULT_PERMS_VALIDATOR),
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index f321691..b4969c0 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -420,6 +420,7 @@ drill.exec.options: {
     debug.validate_vectors: false,
     drill.exec.functions.cast_empty_string_to_null: false,
     drill.exec.rpc.fragrunner.timeout: 10000,
+    drill.exec.hashjoin.mem_limit: 0,
     # Setting to control if HashAgg should fallback to older behavior of consuming
     # unbounded memory. In case of 2 phase Agg when available memory is not enough
     # to start at least 2 partitions then HashAgg fallbacks to this case. It can be
@@ -439,10 +440,13 @@ drill.exec.options: {
     exec.enable_union_type: false,
     exec.errors.verbose: false,
     exec.hashjoin.mem_limit: 0,
+    exec.hashjoin.hash_table_calc_type: "LEAN",
+    exec.hashjoin.safety_factor: 1.0,
+    exec.hashjoin.fragmentation_factor: 1.33,
+    exec.hashjoin.hash_double_factor: 2.0,
     exec.hashjoin.num_partitions: 32,
     exec.hashjoin.num_rows_in_batch: 1024,
-    exec.hashjoin.max_batches_in_memory: 128,
-    exec.hashjoin.max_batches_per_partition: 512,
+    exec.hashjoin.max_batches_in_memory: 0,
     exec.hashagg.mem_limit: 0,
     exec.hashagg.min_batches_per_partition: 2,
     exec.hashagg.num_partitions: 32,
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestBatchSerialization.java b/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestBatchSerialization.java
index 6a4ec52..302fb59 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestBatchSerialization.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestBatchSerialization.java
@@ -34,6 +34,7 @@ import org.apache.drill.exec.cache.VectorSerializer.Reader;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.test.BaseDirTestWatcher;
 import org.apache.drill.test.DirTestWatcher;
 import org.apache.drill.test.DrillTest;
 import org.apache.drill.test.OperatorFixture;
@@ -52,12 +53,12 @@ import org.junit.Test;
 public class TestBatchSerialization extends DrillTest {
 
   @ClassRule
-  public static final DirTestWatcher dirTestWatcher = new DirTestWatcher();
+  public static final BaseDirTestWatcher dirTestWatcher = new BaseDirTestWatcher();
   public static OperatorFixture fixture;
 
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
-    fixture = OperatorFixture.builder().build();
+    fixture = OperatorFixture.builder(dirTestWatcher).build();
   }
 
   @AfterClass
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestLargeFileCompilation.java b/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestLargeFileCompilation.java
index 9fcb6a3..dd90edd 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestLargeFileCompilation.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestLargeFileCompilation.java
@@ -174,6 +174,7 @@ public class TestLargeFileCompilation extends BaseTestQuery {
   public void testHashJoin() throws Exception {
     String tableName = "wide_table_hash_join";
     try {
+      setSessionOption("drill.exec.hashjoin.fallback.enabled", true);
       testNoResult("alter session set `%s`='JDK'", ClassCompilerSelector.JAVA_COMPILER_OPTION);
       testNoResult("alter session set `planner.enable_mergejoin` = false");
       testNoResult("alter session set `planner.enable_nestedloopjoin` = false");
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/common/HashPartitionTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/common/HashPartitionTest.java
new file mode 100644
index 0000000..b57329c
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/common/HashPartitionTest.java
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.common;
+
+import com.google.common.collect.Lists;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.data.JoinCondition;
+import org.apache.drill.common.logical.data.NamedExpression;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.config.HashJoinPOP;
+import org.apache.drill.exec.physical.impl.aggregate.SpilledRecordbatch;
+import org.apache.drill.exec.physical.impl.join.HashJoinMemoryCalculator;
+import org.apache.drill.exec.physical.impl.join.HashJoinMemoryCalculatorImpl;
+import org.apache.drill.exec.physical.impl.join.JoinUtils;
+import org.apache.drill.exec.physical.impl.spill.SpillSet;
+import org.apache.drill.exec.planner.logical.DrillJoinRel;
+import org.apache.drill.exec.proto.ExecProtos;
+import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.test.BaseDirTestWatcher;
+import org.apache.drill.test.OperatorFixture;
+import org.apache.drill.test.rowSet.DirectRowSet;
+import org.apache.drill.test.rowSet.RowSet;
+import org.apache.drill.test.rowSet.RowSetBatch;
+import org.apache.drill.test.rowSet.RowSetBuilder;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.util.List;
+
+public class HashPartitionTest {
+  @Rule
+  public final BaseDirTestWatcher dirTestWatcher = new BaseDirTestWatcher();
+
+  @Test
+  public void noSpillBuildSideTest() throws Exception {
+    new HashPartitionFixture().run(new HashPartitionTestCase() {
+      private RowSet buildRowSet;
+      private RowSet probeRowSet;
+
+      @Override
+      public RecordBatch createBuildBatch(BatchSchema schema, BufferAllocator allocator) {
+        buildRowSet = new RowSetBuilder(allocator, schema)
+          .addRow(1, "green")
+          .addRow(3, "red")
+          .addRow(2, "blue")
+          .build();
+        return new RowSetBatch(buildRowSet);
+      }
+
+      @Override
+      public void createResultBuildBatch(BatchSchema schema, BufferAllocator allocator) {
+      }
+
+      @Override
+      public RecordBatch createProbeBatch(BatchSchema schema, BufferAllocator allocator) {
+        probeRowSet = new RowSetBuilder(allocator, schema)
+          .addRow(.5, "yellow")
+          .addRow(1.5, "blue")
+          .addRow(2.5, "black")
+          .build();
+        return new RowSetBatch(probeRowSet);
+      }
+
+      @Override
+      public void run(SpillSet spillSet,
+                      BatchSchema buildSchema,
+                      BatchSchema probeSchema,
+                      RecordBatch buildBatch,
+                      RecordBatch probeBatch,
+                      ChainedHashTable baseHashTable,
+                      FragmentContext context,
+                      OperatorContext operatorContext) throws Exception {
+
+        final HashPartition hashPartition = new HashPartition(context,
+          context.getAllocator(),
+          baseHashTable,
+          buildBatch,
+          probeBatch,
+          10,
+          spillSet,
+          0,
+          0);
+
+        final HashJoinMemoryCalculator.BuildSidePartitioning noopCalc = new HashJoinMemoryCalculatorImpl.NoopBuildSidePartitioningImpl();
+
+        hashPartition.appendInnerRow(buildRowSet.container(), 0, 10, noopCalc);
+        hashPartition.appendInnerRow(buildRowSet.container(), 1, 11, noopCalc);
+        hashPartition.appendInnerRow(buildRowSet.container(), 2, 12, noopCalc);
+        hashPartition.completeAnInnerBatch(false, false);
+        hashPartition.buildContainersHashTableAndHelper();
+
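+        // The join key is the VARCHAR column, so only probe row 1 ("blue") should find a
+        // match: build row 2 ("blue"), which was appended above with hash value 12.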
+        {
+          int compositeIndex = hashPartition.probeForKey(0, 16);
+          Assert.assertEquals(-1, compositeIndex);
+        }
+
+        {
+          int compositeIndex = hashPartition.probeForKey(1, 12);
+          int startIndex = hashPartition.getStartIndex(compositeIndex);
+          int nextIndex = hashPartition.getNextIndex(startIndex);
+
+          Assert.assertEquals(2, startIndex);
+          Assert.assertEquals(-1, nextIndex);
+        }
+
+        {
+          int compositeIndex = hashPartition.probeForKey(2, 15);
+          Assert.assertEquals(-1, compositeIndex);
+        }
+
+        buildRowSet.clear();
+        probeRowSet.clear();
+        hashPartition.close();
+      }
+    });
+  }
+
+  @Test
+  public void spillSingleIncompleteBatchBuildSideTest() throws Exception
+  {
+    new HashPartitionFixture().run(new HashPartitionTestCase() {
+      private RowSet buildRowSet;
+      private RowSet probeRowSet;
+      private RowSet actualBuildRowSet;
+
+      @Override
+      public RecordBatch createBuildBatch(BatchSchema schema, BufferAllocator allocator) {
+        buildRowSet = new RowSetBuilder(allocator, schema)
+          .addRow(1, "green")
+          .addRow(3, "red")
+          .addRow(2, "blue")
+          .build();
+        return new RowSetBatch(buildRowSet);
+      }
+
+      @Override
+      public void createResultBuildBatch(BatchSchema schema, BufferAllocator allocator) {
+        final BatchSchema newSchema = BatchSchema.newBuilder()
+          .addFields(schema)
+          .addField(MaterializedField.create(HashPartition.HASH_VALUE_COLUMN_NAME, HashPartition.HVtype))
+          .build();
+        actualBuildRowSet = new RowSetBuilder(allocator, newSchema)
+          .addRow(1, "green", 10)
+          .addRow(3, "red", 11)
+          .addRow(2, "blue", 12)
+          .build();
+      }
+
+      @Override
+      public RecordBatch createProbeBatch(BatchSchema schema, BufferAllocator allocator) {
+        probeRowSet = new RowSetBuilder(allocator, schema)
+          .addRow(.5, "yellow")
+          .addRow(1.5, "blue")
+          .addRow(2.5, "black")
+          .build();
+        return new RowSetBatch(probeRowSet);
+      }
+
+      @Override
+      public void run(SpillSet spillSet,
+                      BatchSchema buildSchema,
+                      BatchSchema probeSchema,
+                      RecordBatch buildBatch,
+                      RecordBatch probeBatch,
+                      ChainedHashTable baseHashTable,
+                      FragmentContext context,
+                      OperatorContext operatorContext) {
+
+        final HashPartition hashPartition = new HashPartition(context,
+          context.getAllocator(),
+          baseHashTable,
+          buildBatch,
+          probeBatch,
+          10,
+          spillSet,
+          0,
+          0);
+
+        final HashJoinMemoryCalculator.BuildSidePartitioning noopCalc = new HashJoinMemoryCalculatorImpl.NoopBuildSidePartitioningImpl();
+
+        hashPartition.appendInnerRow(buildRowSet.container(), 0, 10, noopCalc);
+        hashPartition.appendInnerRow(buildRowSet.container(), 1, 11, noopCalc);
+        hashPartition.appendInnerRow(buildRowSet.container(), 2, 12, noopCalc);
+        hashPartition.completeAnInnerBatch(false, false);
+        hashPartition.spillThisPartition();
+        final String spillFile = hashPartition.getSpillFile();
+        final int batchesCount = hashPartition.getPartitionBatchesCount();
+        hashPartition.closeWriter();
+
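+        // Read the spilled batch back and verify it contains the original build rows plus
+        // the hash-value column (10, 11, 12) written out during partitioning.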
+        SpilledRecordbatch spilledBuildBatch = new SpilledRecordbatch(spillFile, batchesCount, context, buildSchema, operatorContext, spillSet);
+        final RowSet actual = DirectRowSet.fromContainer(spilledBuildBatch.getContainer());
+
+        new RowSetComparison(actualBuildRowSet).verify(actual);
+
+        spilledBuildBatch.close();
+        buildRowSet.clear();
+        actualBuildRowSet.clear();
+        probeRowSet.clear();
+        hashPartition.close();
+      }
+    });
+  }
+
+  public class HashPartitionFixture {
+    public void run(HashPartitionTestCase testCase) throws Exception {
+      try (OperatorFixture operatorFixture = new OperatorFixture.Builder(HashPartitionTest.this.dirTestWatcher).build()) {
+
+        final FragmentContext context = operatorFixture.getFragmentContext();
+        final HashJoinPOP pop = new HashJoinPOP(null, null, null, JoinRelType.FULL);
+        final OperatorContext operatorContext = operatorFixture.operatorContext(pop);
+        final DrillConfig config = context.getConfig();
+        final BufferAllocator allocator = operatorFixture.allocator();
+
+        final UserBitShared.QueryId queryId = UserBitShared.QueryId.newBuilder()
+          .setPart1(1L)
+          .setPart2(2L)
+          .build();
+        final ExecProtos.FragmentHandle fragmentHandle = ExecProtos.FragmentHandle.newBuilder()
+          .setQueryId(queryId)
+          .setMinorFragmentId(1)
+          .setMajorFragmentId(2)
+          .build();
+
+        final SpillSet spillSet = new SpillSet(config, fragmentHandle, pop);
+
+        // Create build batch
+        MaterializedField buildColA = MaterializedField.create("buildColA", Types.required(TypeProtos.MinorType.INT));
+        MaterializedField buildColB = MaterializedField.create("buildColB", Types.required(TypeProtos.MinorType.VARCHAR));
+        List<MaterializedField> buildCols = Lists.newArrayList(buildColA, buildColB);
+        final BatchSchema buildSchema = new BatchSchema(BatchSchema.SelectionVectorMode.NONE, buildCols);
+        final RecordBatch buildBatch = testCase.createBuildBatch(buildSchema, allocator);
+        testCase.createResultBuildBatch(buildSchema, allocator);
+
+        // Create probe batch
+        MaterializedField probeColA = MaterializedField.create("probeColA", Types.required(TypeProtos.MinorType.FLOAT4));
+        MaterializedField probeColB = MaterializedField.create("probeColB", Types.required(TypeProtos.MinorType.VARCHAR));
+        List<MaterializedField> probeCols = Lists.newArrayList(probeColA, probeColB);
+        final BatchSchema probeSchema = new BatchSchema(BatchSchema.SelectionVectorMode.NONE, probeCols);
+        final RecordBatch probeBatch = testCase.createProbeBatch(probeSchema, allocator);
+
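+        // Build the join condition: the VARCHAR columns (buildColB, probeColB) are the keys.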
+        final LogicalExpression buildColExpression = SchemaPath.getSimplePath(buildColB.getName());
+        final LogicalExpression probeColExpression = SchemaPath.getSimplePath(probeColB.getName());
+
+        final JoinCondition condition = new JoinCondition(DrillJoinRel.EQUALITY_CONDITION, probeColExpression, buildColExpression);
+        final List<Comparator> comparators = Lists.newArrayList(JoinUtils.checkAndReturnSupportedJoinComparator(condition));
+
+        final List<NamedExpression> buildExpressions = Lists.newArrayList(new NamedExpression(buildColExpression, new FieldReference("build_side_0")));
+        final List<NamedExpression> probeExpressions = Lists.newArrayList(new NamedExpression(probeColExpression, new FieldReference("probe_side_0")));
+
+        final int hashTableSize = (int) context.getOptions().getOption(ExecConstants.MIN_HASH_TABLE_SIZE);
+        final HashTableConfig htConfig = new HashTableConfig(hashTableSize, HashTable.DEFAULT_LOAD_FACTOR, buildExpressions, probeExpressions, comparators);
+        final ChainedHashTable baseHashTable = new ChainedHashTable(htConfig, context, allocator, buildBatch, probeBatch, null);
+        baseHashTable.updateIncoming(buildBatch, probeBatch);
+
+        testCase.run(spillSet, buildSchema, probeSchema, buildBatch, probeBatch, baseHashTable, context, operatorContext);
+      }
+    }
+  }
+
+  interface HashPartitionTestCase {
+    RecordBatch createBuildBatch(BatchSchema schema, BufferAllocator allocator);
+    void createResultBuildBatch(BatchSchema schema, BufferAllocator allocator);
+    RecordBatch createProbeBatch(BatchSchema schema, BufferAllocator allocator);
+
+    void run(SpillSet spillSet,
+             BatchSchema buildSchema,
+             BatchSchema probeSchema,
+             RecordBatch buildBatch,
+             RecordBatch probeBatch,
+             ChainedHashTable baseHashTable,
+             FragmentContext context,
+             OperatorContext operatorContext) throws Exception;
+  }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/common/HashTableAllocationTrackerTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/common/HashTableAllocationTrackerTest.java
new file mode 100644
index 0000000..131d82f
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/common/HashTableAllocationTrackerTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.common;
+
+import com.google.common.collect.Lists;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class HashTableAllocationTrackerTest {
+  @Test
+  public void testDoubleGetNextCall() {
+    final HashTableConfig config = new HashTableConfig(100, true, .5f, Lists.newArrayList(), Lists.newArrayList(), Lists.newArrayList());
+    final HashTableAllocationTracker tracker = new HashTableAllocationTracker(config, 30);
+
+    for (int counter = 0; counter < 100; counter++) {
+      Assert.assertEquals(30, tracker.getNextBatchHolderSize());
+    }
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testPrematureCommit() {
+    final HashTableConfig config = new HashTableConfig(100, .5f, Lists.newArrayList(), Lists.newArrayList(), Lists.newArrayList());
+    final HashTableAllocationTracker tracker = new HashTableAllocationTracker(config, 30);
+
+    tracker.commit();
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testDoubleCommit() {
+    final HashTableConfig config = new HashTableConfig(100, .5f, Lists.newArrayList(), Lists.newArrayList(), Lists.newArrayList());
+    final HashTableAllocationTracker tracker = new HashTableAllocationTracker(config, 30);
+
+    tracker.commit();
+    tracker.commit();
+  }
+
+  @Test
+  public void testOverAsking() {
+    final HashTableConfig config = new HashTableConfig(100, .5f, Lists.newArrayList(), Lists.newArrayList(), Lists.newArrayList());
+    final HashTableAllocationTracker tracker = new HashTableAllocationTracker(config, 30);
+
+    tracker.getNextBatchHolderSize();
+  }
+
+  /**
+   * Test for when we do not know the final size of the hash table.
+   */
+  @Test
+  public void testLifecycle1() {
+    final HashTableConfig config = new HashTableConfig(100, .5f, Lists.newArrayList(), Lists.newArrayList(), Lists.newArrayList());
+    final HashTableAllocationTracker tracker = new HashTableAllocationTracker(config, 30);
+
+    for (int counter = 0; counter < 100; counter++) {
+      Assert.assertEquals(30, tracker.getNextBatchHolderSize());
+      tracker.commit();
+    }
+  }
+
+  /**
+   * Test for when we know the final size of the hash table.
+   */
+  @Test
+  public void testLifecycle() {
+    final HashTableConfig config = new HashTableConfig(100, true, .5f, Lists.newArrayList(), Lists.newArrayList(), Lists.newArrayList());
+    final HashTableAllocationTracker tracker = new HashTableAllocationTracker(config, 30);
+
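+    // The final size (100) is known and each batch holder holds 30 entries, so we expect
+    // three full holders followed by a final holder of 10; any further request must fail.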
+    Assert.assertEquals(30, tracker.getNextBatchHolderSize());
+    tracker.commit();
+    Assert.assertEquals(30, tracker.getNextBatchHolderSize());
+    tracker.commit();
+    Assert.assertEquals(30, tracker.getNextBatchHolderSize());
+    tracker.commit();
+    Assert.assertEquals(10, tracker.getNextBatchHolderSize());
+    tracker.commit();
+
+    boolean caughtException = false;
+
+    try {
+      tracker.getNextBatchHolderSize();
+    } catch (IllegalStateException ex) {
+      caughtException = true;
+    }
+
+    Assert.assertTrue(caughtException);
+  }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/PartitionStatImpl.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/PartitionStatImpl.java
new file mode 100644
index 0000000..43e02b6
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/PartitionStatImpl.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.join;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * This class represents the memory size statistics for an entire partition.
+ */
+public class PartitionStatImpl implements HashJoinMemoryCalculator.PartitionStat {
+  private boolean spilled;
+  private long numRecords;
+  private long partitionSize;
+  private LinkedList<HashJoinMemoryCalculator.BatchStat> batchStats = Lists.newLinkedList();
+
+  public PartitionStatImpl() {
+  }
+
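+  /**
+   * Records an in-memory batch and updates the partition's aggregate size and record count.
+   */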
+  public void add(HashJoinMemoryCalculator.BatchStat batchStat) {
+    Preconditions.checkState(!spilled);
+    Preconditions.checkNotNull(batchStat);
+    partitionSize += batchStat.getBatchSize();
+    numRecords += batchStat.getNumRecords();
+    batchStats.addLast(batchStat);
+  }
+
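+  /**
+   * Marks this partition as spilled and clears its in-memory statistics.
+   */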
+  public void spill() {
+    Preconditions.checkState(!spilled);
+    spilled = true;
+    partitionSize = 0;
+    numRecords = 0;
+    batchStats.clear();
+  }
+
+  public List<HashJoinMemoryCalculator.BatchStat> getInMemoryBatches()
+  {
+    return Collections.unmodifiableList(batchStats);
+  }
+
+  public int getNumInMemoryBatches()
+  {
+    return batchStats.size();
+  }
+
+  public boolean isSpilled()
+  {
+    return spilled;
+  }
+
+  public long getNumInMemoryRecords()
+  {
+    return numRecords;
+  }
+
+  public long getInMemorySize()
+  {
+    return partitionSize;
+  }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestBuildSidePartitioningImpl.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestBuildSidePartitioningImpl.java
new file mode 100644
index 0000000..dc05a70
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestBuildSidePartitioningImpl.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.join;
+
+import org.apache.drill.common.map.CaseInsensitiveMap;
+import org.apache.drill.exec.record.RecordBatch;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestBuildSidePartitioningImpl {
+  @Test
+  public void testSimpleReserveMemoryCalculationNoHash() {
+    final int maxBatchNumRecords = 20;
+    final HashJoinMemoryCalculatorImpl.BuildSidePartitioningImpl calc =
+      new HashJoinMemoryCalculatorImpl.BuildSidePartitioningImpl(
+        new HashTableSizeCalculatorConservativeImpl(RecordBatch.MAX_BATCH_SIZE, HashTableSizeCalculatorConservativeImpl.HASHTABLE_DOUBLING_FACTOR),
+        HashJoinHelperSizeCalculatorImpl.INSTANCE,
+        2.0,
+        1.5);
+
+    final CaseInsensitiveMap<Long> buildValueSizes = CaseInsensitiveMap.newHashMap();
+    final CaseInsensitiveMap<Long> probeValueSizes = CaseInsensitiveMap.newHashMap();
+    final CaseInsensitiveMap<Long> keySizes = CaseInsensitiveMap.newHashMap();
+
+    calc.initialize(true,
+      false,
+      buildValueSizes,
+      probeValueSizes,
+      keySizes,
+      200,
+      2,
+      20,
+      10,
+      20,
+      10,
+      10,
+      5,
+      maxBatchNumRecords,
+      maxBatchNumRecords,
+      10,
+      .75);
+
+    final HashJoinMemoryCalculator.PartitionStatSet partitionStatSet =
+      new HashJoinMemoryCalculator.PartitionStatSet(new PartitionStatImpl(), new PartitionStatImpl());
+    calc.setPartitionStatSet(partitionStatSet);
+
+    long expectedReservedMemory = 60 // Max incoming batch size
+      + 2 * 30 // build side batch for each spilled partition
+      + 60; // Max incoming probe batch size
+    long actualReservedMemory = calc.getBuildReservedMemory();
+
+    Assert.assertEquals(expectedReservedMemory, actualReservedMemory);
+    Assert.assertEquals(2, calc.getNumPartitions());
+  }
+
+  @Test
+  public void testSimpleReserveMemoryCalculationHash() {
+    final int maxBatchNumRecords = 20;
+    final HashJoinMemoryCalculatorImpl.BuildSidePartitioningImpl calc =
+      new HashJoinMemoryCalculatorImpl.BuildSidePartitioningImpl(
+        new HashTableSizeCalculatorConservativeImpl(RecordBatch.MAX_BATCH_SIZE, HashTableSizeCalculatorConservativeImpl.HASHTABLE_DOUBLING_FACTOR),
+        HashJoinHelperSizeCalculatorImpl.INSTANCE,
+        2.0,
+        1.5);
+
+    final CaseInsensitiveMap<Long> buildValueSizes = CaseInsensitiveMap.newHashMap();
+    final CaseInsensitiveMap<Long> probeValueSizes = CaseInsensitiveMap.newHashMap();
+    final CaseInsensitiveMap<Long> keySizes = CaseInsensitiveMap.newHashMap();
+
+    calc.initialize(false,
+      true,
+      buildValueSizes,
+      probeValueSizes,
+      keySizes,
+      350,
+      2,
+      20,
+      10,
+      20,
+      10,
+      10,
+      5,
+      maxBatchNumRecords,
+      maxBatchNumRecords,
+      10,
+      .75);
+
+    final HashJoinMemoryCalculator.PartitionStatSet partitionStatSet =
+      new HashJoinMemoryCalculator.PartitionStatSet(new PartitionStatImpl(), new PartitionStatImpl());
+    calc.setPartitionStatSet(partitionStatSet);
+
+    long expectedReservedMemory = 60 // Max incoming batch size
+      + 2 * (/* data size for batch */ 30 + /* Space reserved for hash value vector */ 10 * 4 * 2) // build side batch for each spilled partition
+      + 60; // Max incoming probe batch size
+    long actualReservedMemory = calc.getBuildReservedMemory();
+
+    Assert.assertEquals(expectedReservedMemory, actualReservedMemory);
+    Assert.assertEquals(2, calc.getNumPartitions());
+  }
+
+  @Test
+  public void testAdjustInitialPartitions() {
+    final int maxBatchNumRecords = 20;
+    final HashJoinMemoryCalculatorImpl.BuildSidePartitioningImpl calc =
+      new HashJoinMemoryCalculatorImpl.BuildSidePartitioningImpl(
+        new HashTableSizeCalculatorConservativeImpl(RecordBatch.MAX_BATCH_SIZE, HashTableSizeCalculatorConservativeImpl.HASHTABLE_DOUBLING_FACTOR),
+        HashJoinHelperSizeCalculatorImpl.INSTANCE,
+        2.0,
+        1.5);
+
+    final CaseInsensitiveMap<Long> buildValueSizes = CaseInsensitiveMap.newHashMap();
+    final CaseInsensitiveMap<Long> probeValueSizes = CaseInsensitiveMap.newHashMap();
+    final CaseInsensitiveMap<Long> keySizes = CaseInsensitiveMap.newHashMap();
+
+    calc.initialize(
+      true,
+      false,
+      buildValueSizes,
+      probeValueSizes,
+      keySizes,
+      200,
+      4,
+      20,
+      10,
+      20,
+      10,
+      10,
+      5,
+      maxBatchNumRecords,
+      maxBatchNumRecords,
+      10,
+      .75);
+
+    final HashJoinMemoryCalculator.PartitionStatSet partitionStatSet =
+      new HashJoinMemoryCalculator.PartitionStatSet(new PartitionStatImpl(), new PartitionStatImpl(),
+        new PartitionStatImpl(), new PartitionStatImpl());
+    calc.setPartitionStatSet(partitionStatSet);
+
+    long expectedReservedMemory = 60 // Max incoming batch size
+      + 2 * 30 // build side batch for each spilled partition
+      + 60; // Max incoming probe batch size
+    long actualReservedMemory = calc.getBuildReservedMemory();
+
+    Assert.assertEquals(expectedReservedMemory, actualReservedMemory);
+    Assert.assertEquals(2, calc.getNumPartitions());
+  }
+
+  @Test
+  public void testNoRoomInMemoryForBatch1() {
+    final int maxBatchNumRecords = 20;
+
+    final HashJoinMemoryCalculatorImpl.BuildSidePartitioningImpl calc =
+      new HashJoinMemoryCalculatorImpl.BuildSidePartitioningImpl(
+        new HashTableSizeCalculatorConservativeImpl(RecordBatch.MAX_BATCH_SIZE, HashTableSizeCalculatorConservativeImpl.HASHTABLE_DOUBLING_FACTOR),
+        HashJoinHelperSizeCalculatorImpl.INSTANCE,
+        2.0,
+        1.5);
+
+    final CaseInsensitiveMap<Long> buildValueSizes = CaseInsensitiveMap.newHashMap();
+    final CaseInsensitiveMap<Long> probeValueSizes = CaseInsensitiveMap.newHashMap();
+    final CaseInsensitiveMap<Long> keySizes = CaseInsensitiveMap.newHashMap();
+
+    calc.initialize(
+      true,
+      false,
+      buildValueSizes,
+      probeValueSizes,
+      keySizes,
+      180,
+      2,
+      20,
+      10,
+      20,
+      10,
+      10,
+      5,
+      maxBatchNumRecords,
+      maxBatchNumRecords,
+      10,
+      .75);
+
+    final PartitionStatImpl partition1 = new PartitionStatImpl();
+    final PartitionStatImpl partition2 = new PartitionStatImpl();
+    final HashJoinMemoryCalculator.PartitionStatSet partitionStatSet =
+      new HashJoinMemoryCalculator.PartitionStatSet(partition1, partition2);
+    calc.setPartitionStatSet(partitionStatSet);
+
+    long expectedReservedMemory = 60 // Max incoming batch size
+      + 2 * 30 // build side batch for each spilled partition
+      + 60; // Max incoming probe batch size
+    long actualReservedMemory = calc.getBuildReservedMemory();
+
+    Assert.assertEquals(expectedReservedMemory, actualReservedMemory);
+    Assert.assertEquals(2, calc.getNumPartitions());
+
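+    // With only 180 bytes of available memory (versus 210 in testCompleteLifeCycle below),
+    // adding even the first batch pushes the calculator over its limit and forces a spill.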
+    partition1.add(new HashJoinMemoryCalculator.BatchStat(10, 8));
+    Assert.assertTrue(calc.shouldSpill());
+  }
+
+  @Test
+  public void testCompleteLifeCycle() {
+    final int maxBatchNumRecords = 20;
+    final HashJoinMemoryCalculatorImpl.BuildSidePartitioningImpl calc =
+      new HashJoinMemoryCalculatorImpl.BuildSidePartitioningImpl(
+        new HashTableSizeCalculatorConservativeImpl(RecordBatch.MAX_BATCH_SIZE, HashTableSizeCalculatorConservativeImpl.HASHTABLE_DOUBLING_FACTOR),
+        HashJoinHelperSizeCalculatorImpl.INSTANCE,
+        2.0,
+        1.5);
+
+    final CaseInsensitiveMap<Long> buildValueSizes = CaseInsensitiveMap.newHashMap();
+    final CaseInsensitiveMap<Long> probeValueSizes = CaseInsensitiveMap.newHashMap();
+    final CaseInsensitiveMap<Long> keySizes = CaseInsensitiveMap.newHashMap();
+
+    calc.initialize(
+      true,
+      false,
+      buildValueSizes,
+      probeValueSizes,
+      keySizes,
+      210,
+      2,
+      20,
+      10,
+      20,
+      10,
+      10,
+      5,
+      maxBatchNumRecords,
+      maxBatchNumRecords,
+      10,
+      .75);
+
+    final PartitionStatImpl partition1 = new PartitionStatImpl();
+    final PartitionStatImpl partition2 = new PartitionStatImpl();
+    final HashJoinMemoryCalculator.PartitionStatSet partitionStatSet =
+      new HashJoinMemoryCalculator.PartitionStatSet(partition1, partition2);
+    calc.setPartitionStatSet(partitionStatSet);
+
+    // Add to partition 1, no spill needed
+    {
+      partition1.add(new HashJoinMemoryCalculator.BatchStat(10, 7));
+      Assert.assertFalse(calc.shouldSpill());
+    }
+
+    // Add to partition 2, no spill needed
+    {
+      partition2.add(new HashJoinMemoryCalculator.BatchStat(10, 8));
+      Assert.assertFalse(calc.shouldSpill());
+    }
+
+    // Add to partition 1, and partition 1 spilled
+    {
+      partition1.add(new HashJoinMemoryCalculator.BatchStat(10, 8));
+      Assert.assertTrue(calc.shouldSpill());
+      partition1.spill();
+    }
+
+    // Add to partition 2, no spill needed
+    {
+      partition2.add(new HashJoinMemoryCalculator.BatchStat(10, 7));
+      Assert.assertFalse(calc.shouldSpill());
+    }
+
+    // Add to partition 2, and partition 2 spilled
+    {
+      partition2.add(new HashJoinMemoryCalculator.BatchStat(10, 8));
+      Assert.assertTrue(calc.shouldSpill());
+      partition2.spill();
+    }
+
+    Assert.assertNotNull(calc.next());
+  }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinHelperSizeCalculatorImpl.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinHelperSizeCalculatorImpl.java
new file mode 100644
index 0000000..bd34df4
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinHelperSizeCalculatorImpl.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.join;
+
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.record.RecordBatch;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestHashJoinHelperSizeCalculatorImpl {
+  @Test
+  public void simpleCalculateSize() {
+    final long intSize =
+      ((long) TypeHelper.getSize(TypeProtos.MajorType.newBuilder().setMinorType(TypeProtos.MinorType.INT).build()));
+
+    // Account for the overhead of a selection vector
+    long expected = intSize * RecordBatch.MAX_BATCH_SIZE;
+    // Account for the sv4 vectors covering every build-side record (1000 + 2500 = 3500)
+    expected += intSize * 3500;
+
+    PartitionStatImpl partitionStat = new PartitionStatImpl();
+    partitionStat.add(new HashJoinMemoryCalculator.BatchStat(1000, 2000));
+    partitionStat.add(new HashJoinMemoryCalculator.BatchStat(2500, 5000));
+
+    long actual = HashJoinHelperSizeCalculatorImpl.INSTANCE.calculateSize(partitionStat, 1.0);
+    Assert.assertEquals(expected, actual);
+  }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinMemoryCalculatorImpl.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinMemoryCalculatorImpl.java
new file mode 100644
index 0000000..4944c87
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinMemoryCalculatorImpl.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.join;
+
+import org.apache.drill.exec.vector.IntVector;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestHashJoinMemoryCalculatorImpl {
+  @Test
+  public void testComputeMaxBatchSizeNoHash() {
+    final long expected = 1200;
+    final long actual = HashJoinMemoryCalculatorImpl.computeMaxBatchSize(
+      100,
+      25,
+      100,
+      2.0,
+      1.5,
+      false);
+    final long actualNoHash = HashJoinMemoryCalculatorImpl.computeMaxBatchSizeNoHash(
+      100,
+      25,
+      100,
+      2.0,
+      1.5);
+
+    Assert.assertEquals(expected, actual);
+    Assert.assertEquals(expected, actualNoHash);
+  }
+
+  @Test
+  public void testComputeMaxBatchSizeHash()
+  {
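+    // With hashing enabled the batch should grow by one 4-byte hash value per record
+    // (100 * IntVector.VALUE_WIDTH); the trailing factor of 2 presumably mirrors the
+    // fragmentation factor of 2.0 passed in below.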
+    long expected = HashJoinMemoryCalculatorImpl.computeMaxBatchSizeNoHash(
+      100,
+      25,
+      100,
+      2.0,
+      4.0) +
+      100 * IntVector.VALUE_WIDTH * 2;
+
+    final long actual = HashJoinMemoryCalculatorImpl.computeMaxBatchSize(
+      100,
+      25,
+      100,
+      2.0,
+      4.0,
+      true);
+
+    Assert.assertEquals(expected, actual);
+  }
+
+  @Test // Make sure no exception is thrown
+  public void testMakeDebugString()
+  {
+    final PartitionStatImpl partitionStat1 = new PartitionStatImpl();
+    final PartitionStatImpl partitionStat2 = new PartitionStatImpl();
+    final PartitionStatImpl partitionStat3 = new PartitionStatImpl();
+    final PartitionStatImpl partitionStat4 = new PartitionStatImpl();
+
+    final HashJoinMemoryCalculator.PartitionStatSet partitionStatSet =
+      new HashJoinMemoryCalculator.PartitionStatSet(partitionStat1, partitionStat2, partitionStat3, partitionStat4);
+    partitionStat1.add(new HashJoinMemoryCalculator.BatchStat(10, 7));
+    partitionStat2.add(new HashJoinMemoryCalculator.BatchStat(11, 20));
+    partitionStat3.spill();
+  }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinSpill.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinSpill.java
index 4cfe6b4..37a6a33 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinSpill.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinSpill.java
@@ -33,8 +33,6 @@ import java.util.List;
 
 @Category({SlowTest.class, OperatorTest.class})
 public class TestHashJoinSpill extends PhysicalOpUnitTestBase {
-
-
   @SuppressWarnings("unchecked")
   @Test
   // Should spill, including recursive spill
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashTableSizeCalculatorConservativeImpl.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashTableSizeCalculatorConservativeImpl.java
new file mode 100644
index 0000000..4e9f1c7
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashTableSizeCalculatorConservativeImpl.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.join;
+
+import com.google.common.collect.Maps;
+import org.apache.drill.exec.record.RecordBatchSizer;
+import org.apache.drill.exec.vector.UInt4Vector;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Map;
+
+/**
+ * This is a test for the more conservative hash table memory calculator {@link HashTableSizeCalculatorConservativeImpl}.
+ */
+public class TestHashTableSizeCalculatorConservativeImpl {
+  @Test
+  public void testCalculateHashTableSize() {
+    final int maxNumRecords = 40;
+    double loadFactor = .75;
+
+    final Map<String, Long> keySizes = Maps.newHashMap();
+    keySizes.put("a", 3L);
+    keySizes.put("b", 8L);
+
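+    // The single partition below holds 60 records, so the key vectors are sized for one
+    // allocation of 40 (maxNumRecords) records plus a second allocation of the remaining 20.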
+    // 60 records / 0.75 load factor = 80, rounded up to the next power of 2 gives 128 buckets
+    long expected = RecordBatchSizer.multiplyByFactor(
+      UInt4Vector.VALUE_WIDTH * 128, HashTableSizeCalculatorConservativeImpl.HASHTABLE_DOUBLING_FACTOR);
+    // First bucket key value vector sizes
+    expected += HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl.computeValueVectorSize(maxNumRecords, 3L);
+    expected += HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl.computeValueVectorSize(maxNumRecords, 8L);
+
+    // Second bucket key value vector sizes
+    expected += RecordBatchSizer.multiplyByFactor(
+      HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl.computeValueVectorSize(20, 3L), HashTableSizeCalculatorConservativeImpl.HASHTABLE_DOUBLING_FACTOR);
+    expected += RecordBatchSizer.multiplyByFactor(
+      HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl.computeValueVectorSize(20, 8L), HashTableSizeCalculatorConservativeImpl.HASHTABLE_DOUBLING_FACTOR);
+
+    // Overhead vectors for links and hash values for each batchHolder
+    expected += 2 * UInt4Vector.VALUE_WIDTH // links and hash values
+       * 2 * maxNumRecords; // num batch holders
+
+    PartitionStatImpl partitionStat = new PartitionStatImpl();
+    partitionStat.add(
+      new HashJoinMemoryCalculator.BatchStat(maxNumRecords + 20, 1));
+
+    final HashTableSizeCalculatorConservativeImpl calc = new HashTableSizeCalculatorConservativeImpl(maxNumRecords, HashTableSizeCalculatorConservativeImpl.HASHTABLE_DOUBLING_FACTOR);
+    long actual = calc.calculateSize(partitionStat, keySizes, loadFactor, 1.0, 1.0);
+
+    Assert.assertEquals(expected, actual);
+  }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashTableSizeCalculatorLeanImpl.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashTableSizeCalculatorLeanImpl.java
new file mode 100644
index 0000000..5cdf524
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashTableSizeCalculatorLeanImpl.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.join;
+
+import com.google.common.collect.Maps;
+import org.apache.drill.exec.record.RecordBatchSizer;
+import org.apache.drill.exec.vector.UInt4Vector;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Map;
+
+/**
+ * This is a test for the more accurate hash table memory calculator {@link HashTableSizeCalculatorLeanImpl}.
+ */
+public class TestHashTableSizeCalculatorLeanImpl {
+  @Test
+  public void testCalculateHashTableSize() {
+    final int maxNumRecords = 40;
+    double loadFactor = .75;
+
+    final Map<String, Long> keySizes = Maps.newHashMap();
+    keySizes.put("a", 3L);
+    keySizes.put("b", 8L);
+
+    // 60 records / 0.75 load factor = 80, rounded up to the next power of 2 gives 128 buckets
+    long expected = RecordBatchSizer.multiplyByFactor(
+      UInt4Vector.VALUE_WIDTH * 128, HashTableSizeCalculatorConservativeImpl.HASHTABLE_DOUBLING_FACTOR);
+    // First bucket key value vector sizes
+    expected += HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl.computeValueVectorSize(maxNumRecords, 3L);
+    expected += HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl.computeValueVectorSize(maxNumRecords, 8L);
+
+    // Second bucket key value vector sizes
+    expected += HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl.computeValueVectorSize(20, 3L);
+    expected += RecordBatchSizer.multiplyByFactor(
+      HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl.computeValueVectorSize(20, 8L), HashTableSizeCalculatorConservativeImpl.HASHTABLE_DOUBLING_FACTOR);
+
+    // Overhead vectors for links and hash values for each batchHolder
+    expected += 2 * UInt4Vector.VALUE_WIDTH // links and hash values
+       * 2 * maxNumRecords; // num batch holders
+
+    PartitionStatImpl partitionStat = new PartitionStatImpl();
+    partitionStat.add(
+      new HashJoinMemoryCalculator.BatchStat(maxNumRecords + 20, 1));
+
+    final HashTableSizeCalculatorLeanImpl calc = new HashTableSizeCalculatorLeanImpl(maxNumRecords, HashTableSizeCalculatorConservativeImpl.HASHTABLE_DOUBLING_FACTOR);
+    long actual = calc.calculateSize(partitionStat, keySizes, loadFactor, 1.0, 1.0);
+
+    Assert.assertEquals(expected, actual);
+  }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestPartitionStat.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestPartitionStat.java
new file mode 100644
index 0000000..40eec6a
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestPartitionStat.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.join;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestPartitionStat
+{
+  @Test
+  public void simpleAddBatchTest() {
+    final PartitionStatImpl partitionStat = new PartitionStatImpl();
+
+    comparePartitionStat(partitionStat, true, 0L, 0, 0L);
+    partitionStat.add(new HashJoinMemoryCalculator.BatchStat(1, 2));
+    comparePartitionStat(partitionStat, false, 2, 1, 1);
+    partitionStat.add(new HashJoinMemoryCalculator.BatchStat(2, 3));
+    comparePartitionStat(partitionStat, false, 5, 2, 3);
+  }
+
+  @Test
+  public void simpleSpillTest() {
+    final PartitionStatImpl partitionStat = new PartitionStatImpl();
+
+    Assert.assertFalse(partitionStat.isSpilled());
+    partitionStat.add(new HashJoinMemoryCalculator.BatchStat(1, 2));
+    Assert.assertFalse(partitionStat.isSpilled());
+    partitionStat.spill();
+    comparePartitionStat(partitionStat, true, 0, 0, 0);
+  }
+
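+  /**
+   * Asserts that the partition reports the expected emptiness, in-memory size, batch count,
+   * and record count.
+   */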
+  public static void comparePartitionStat(final HashJoinMemoryCalculator.PartitionStat partitionStat,
+                                          boolean isEmpty,
+                                          long inMemorySize,
+                                          int numInMemoryBatches,
+                                          long numInMemoryRecords) {
+    Assert.assertEquals(isEmpty, partitionStat.getInMemoryBatches().isEmpty());
+    Assert.assertEquals(inMemorySize, partitionStat.getInMemorySize());
+    Assert.assertEquals(numInMemoryBatches, partitionStat.getNumInMemoryBatches());
+    Assert.assertEquals(numInMemoryRecords, partitionStat.getNumInMemoryRecords());
+  }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestPostBuildCalculationsImpl.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestPostBuildCalculationsImpl.java
new file mode 100644
index 0000000..1e9bb8a
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestPostBuildCalculationsImpl.java
@@ -0,0 +1,434 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.join;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Map;
+
+public class TestPostBuildCalculationsImpl {
+  @Test
+  public void testRoundUpPowerOf2() {
+    long expected = 32;
+    long actual = HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl.roundUpToPowerOf2(expected);
+
+    Assert.assertEquals(expected, actual);
+  }
+
+  @Test
+  public void testRoundUpNonPowerOf2ToPowerOf2() {
+    long expected = 32;
+    long actual = HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl.roundUpToPowerOf2(31);
+
+    Assert.assertEquals(expected, actual);
+  }
+
+  @Test
+  public void testComputeValueVectorSizePowerOf2() {
+    long expected = 4;
+    long actual =
+      HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl.computeValueVectorSize(2, 2);
+
+    Assert.assertEquals(expected, actual);
+  }
+
+  @Test
+  public void testComputeValueVectorSizeNonPowerOf2() {
+    long expected = 16;
+    long actual =
+      HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl.computeValueVectorSize(3, 3);
+
+    Assert.assertEquals(expected, actual);
+  }
+
+  @Test
+  public void testProbingAndPartitioningBuildAllInMemoryNoSpill() {
+    final Map<String, Long> keySizes = org.apache.drill.common.map.CaseInsensitiveMap.newHashMap();
+
+    final PartitionStatImpl partition1 = new PartitionStatImpl();
+    final PartitionStatImpl partition2 = new PartitionStatImpl();
+    final HashJoinMemoryCalculator.PartitionStatSet buildPartitionStatSet =
+      new HashJoinMemoryCalculator.PartitionStatSet(partition1, partition2);
+
+    final int recordsPerPartitionBatchBuild = 10;
+
+    addBatches(partition1, recordsPerPartitionBatchBuild,
+      10, 4);
+    addBatches(partition2, recordsPerPartitionBatchBuild,
+      10, 4);
+
+    final double fragmentationFactor = 2.0;
+    final double safetyFactor = 1.5;
+
+    HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl calc =
+      new HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl(
+        290,
+        15,
+        60,
+        20,
+        buildPartitionStatSet,
+        keySizes,
+        new MockHashTableSizeCalculator(10),
+        new MockHashJoinHelperSizeCalculator(10),
+        fragmentationFactor,
+        safetyFactor,
+        .75,
+        false);
+
+    calc.initialize();
+
+    long expected = 60 // maxProbeBatchSize
+      + 160 // in memory partitions
+      + 20 // max output batch size
+      + 2 * 10 // Hash Table
+      + 2 * 10; // Hash join helper
+    Assert.assertFalse(calc.shouldSpill());
+    Assert.assertEquals(expected, calc.getConsumedMemory());
+    Assert.assertNull(calc.next());
+  }
+
+  @Test
+  public void testProbingAndPartitioningBuildAllInMemorySpill() {
+    final Map<String, Long> keySizes = org.apache.drill.common.map.CaseInsensitiveMap.newHashMap();
+
+    final PartitionStatImpl partition1 = new PartitionStatImpl();
+    final PartitionStatImpl partition2 = new PartitionStatImpl();
+    final HashJoinMemoryCalculator.PartitionStatSet buildPartitionStatSet =
+      new HashJoinMemoryCalculator.PartitionStatSet(partition1, partition2);
+
+    final int recordsPerPartitionBatchBuild = 10;
+
+    addBatches(partition1, recordsPerPartitionBatchBuild,
+      10, 4);
+    addBatches(partition2, recordsPerPartitionBatchBuild,
+      10, 4);
+
+    final double fragmentationFactor = 2.0;
+    final double safetyFactor = 1.5;
+
+    HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl calc =
+      new HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl(
+        270,
+        15,
+        60,
+        20,
+        buildPartitionStatSet,
+        keySizes,
+        new MockHashTableSizeCalculator(10),
+        new MockHashJoinHelperSizeCalculator(10),
+        fragmentationFactor,
+        safetyFactor,
+        .75,
+        false);
+
+    calc.initialize();
+
+    long expected = 60 // maxProbeBatchSize
+      + 160 // in memory partitions
+      + 20 // max output batch size
+      + 2 * 10 // Hash Table
+      + 2 * 10; // Hash join helper
+    Assert.assertTrue(calc.shouldSpill());
+    Assert.assertEquals(expected, calc.getConsumedMemory());
+    partition1.spill();
+
+    expected = 60 // maxProbeBatchSize
+      + 80 // in memory partitions
+      + 20 // max output batch size
+      + 10 // Hash Table
+      + 10 // Hash join helper
+      + 15; // partition batch size
+    Assert.assertFalse(calc.shouldSpill());
+    Assert.assertEquals(expected, calc.getConsumedMemory());
+    Assert.assertNotNull(calc.next());
+  }
+
+  @Test
+  public void testProbingAndPartitioningBuildAllInMemoryNoSpillWithHash() {
+    final Map<String, Long> keySizes = org.apache.drill.common.map.CaseInsensitiveMap.newHashMap();
+
+    final PartitionStatImpl partition1 = new PartitionStatImpl();
+    final PartitionStatImpl partition2 = new PartitionStatImpl();
+    final HashJoinMemoryCalculator.PartitionStatSet buildPartitionStatSet =
+      new HashJoinMemoryCalculator.PartitionStatSet(partition1, partition2);
+
+    partition1.spill();
+    partition2.spill();
+
+    final double fragmentationFactor = 2.0;
+    final double safetyFactor = 1.5;
+
+    HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl calc =
+      new HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl(
+        180,
+        15,
+        60,
+        20,
+        buildPartitionStatSet,
+        keySizes,
+        new MockHashTableSizeCalculator(10),
+        new MockHashJoinHelperSizeCalculator(10),
+        fragmentationFactor,
+        safetyFactor,
+        .75,
+        true);
+
+    calc.initialize();
+
+    long expected = 60 // maxProbeBatchSize
+      + 2 * 5 * 3 // partition batches
+      + 20; // max output batch size
+    Assert.assertFalse(calc.shouldSpill());
+    Assert.assertEquals(expected, calc.getConsumedMemory());
+    Assert.assertNotNull(calc.next());
+  }
+
+  @Test
+  public void testProbingAndPartitioningBuildAllInMemoryWithSpill() {
+    final Map<String, Long> keySizes = org.apache.drill.common.map.CaseInsensitiveMap.newHashMap();
+
+    final PartitionStatImpl partition1 = new PartitionStatImpl();
+    final PartitionStatImpl partition2 = new PartitionStatImpl();
+    final HashJoinMemoryCalculator.PartitionStatSet buildPartitionStatSet =
+      new HashJoinMemoryCalculator.PartitionStatSet(partition1, partition2);
+
+    final int recordsPerPartitionBatchBuild = 10;
+
+    addBatches(partition1, recordsPerPartitionBatchBuild, 10, 4);
+    addBatches(partition2, recordsPerPartitionBatchBuild, 10, 4);
+
+    final double fragmentationFactor = 2.0;
+    final double safetyFactor = 1.5;
+    final long hashTableSize = 10;
+    final long hashJoinHelperSize = 10;
+
+    HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl calc =
+      new HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl(
+        200,
+        15,
+        60,
+        20,
+        buildPartitionStatSet,
+        keySizes,
+        new MockHashTableSizeCalculator(hashTableSize),
+        new MockHashJoinHelperSizeCalculator(hashJoinHelperSize),
+        fragmentationFactor,
+        safetyFactor,
+        .75,
+        false);
+
+    calc.initialize();
+
+    long expected = 60 // maxProbeBatchSize
+      + 80 // in memory partition
+      + 10 // hash table size
+      + 10 // hash join helper size
+      + 15 // max partition probe batch size
+      + 20; // outgoing batch size
+
+    Assert.assertTrue(calc.shouldSpill());
+    partition1.spill();
+    Assert.assertFalse(calc.shouldSpill());
+    Assert.assertEquals(expected, calc.getConsumedMemory());
+    Assert.assertNotNull(calc.next());
+  }
+
+  @Test
+  public void testProbingAndPartitioningBuildSomeInMemory() {
+    final Map<String, Long> keySizes = org.apache.drill.common.map.CaseInsensitiveMap.newHashMap();
+
+    final PartitionStatImpl partition1 = new PartitionStatImpl();
+    final PartitionStatImpl partition2 = new PartitionStatImpl();
+    final PartitionStatImpl partition3 = new PartitionStatImpl();
+    final PartitionStatImpl partition4 = new PartitionStatImpl();
+    final HashJoinMemoryCalculator.PartitionStatSet buildPartitionStatSet =
+      new HashJoinMemoryCalculator.PartitionStatSet(partition1, partition2, partition3, partition4);
+
+    final int recordsPerPartitionBatchBuild = 10;
+
+    partition1.spill();
+    partition2.spill();
+    addBatches(partition3, recordsPerPartitionBatchBuild, 10, 4);
+    addBatches(partition4, recordsPerPartitionBatchBuild, 10, 4);
+
+    final double fragmentationFactor = 2.0;
+    final double safetyFactor = 1.5;
+    final long hashTableSize = 10;
+    final long hashJoinHelperSize = 10;
+
+    HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl calc =
+      new HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl(
+        230,
+        15,
+        60,
+        20,
+        buildPartitionStatSet,
+        keySizes,
+        new MockHashTableSizeCalculator(hashTableSize),
+        new MockHashJoinHelperSizeCalculator(hashJoinHelperSize),
+        fragmentationFactor,
+        safetyFactor,
+        .75,
+        false);
+
+    calc.initialize();
+
+    long expected = 60 // maxProbeBatchSize
+      + 80 // in memory partition
+      + 10 // hash table size
+      + 10 // hash join helper size
+      + 15 * 3 // max batch size for each spill probe partition
+      + 20;
+    Assert.assertTrue(calc.shouldSpill());
+    partition3.spill();
+    Assert.assertFalse(calc.shouldSpill());
+    Assert.assertEquals(expected, calc.getConsumedMemory());
+    Assert.assertNotNull(calc.next());
+  }
+
+  @Test
+  public void testProbingAndPartitioningBuildNoneInMemory() {
+
+    final Map<String, Long> keySizes = org.apache.drill.common.map.CaseInsensitiveMap.newHashMap();
+
+    final PartitionStatImpl partition1 = new PartitionStatImpl();
+    final PartitionStatImpl partition2 = new PartitionStatImpl();
+    final HashJoinMemoryCalculator.PartitionStatSet buildPartitionStatSet =
+      new HashJoinMemoryCalculator.PartitionStatSet(partition1, partition2);
+
+    partition1.spill();
+    partition2.spill();
+
+    final double fragmentationFactor = 2.0;
+    final double safetyFactor = 1.5;
+    final long hashTableSize = 10;
+    final long hashJoinHelperSize = 10;
+
+    HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl calc =
+      new HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl(
+        100,
+        15,
+        60,
+        20,
+        buildPartitionStatSet,
+        keySizes,
+        new MockHashTableSizeCalculator(hashTableSize),
+        new MockHashJoinHelperSizeCalculator(hashJoinHelperSize),
+        fragmentationFactor,
+        safetyFactor,
+        .75,
+        false);
+
+    calc.initialize();
+    Assert.assertFalse(calc.shouldSpill());
+    Assert.assertEquals(110, calc.getConsumedMemory());
+    Assert.assertNotNull(calc.next());
+  }
+
+  @Test // Make sure no exception is thrown
+  public void testMakeDebugString()
+  {
+    final Map<String, Long> keySizes = org.apache.drill.common.map.CaseInsensitiveMap.newHashMap();
+
+    final PartitionStatImpl partition1 = new PartitionStatImpl();
+    final PartitionStatImpl partition2 = new PartitionStatImpl();
+    final PartitionStatImpl partition3 = new PartitionStatImpl();
+    final PartitionStatImpl partition4 = new PartitionStatImpl();
+    final HashJoinMemoryCalculator.PartitionStatSet buildPartitionStatSet =
+      new HashJoinMemoryCalculator.PartitionStatSet(partition1, partition2, partition3, partition4);
+
+    final int recordsPerPartitionBatchBuild = 10;
+
+    partition1.spill();
+    partition2.spill();
+    addBatches(partition3, recordsPerPartitionBatchBuild, 10, 4);
+    addBatches(partition4, recordsPerPartitionBatchBuild, 10, 4);
+
+    final double fragmentationFactor = 2.0;
+    final double safetyFactor = 1.5;
+    final long hashTableSize = 10;
+    final long hashJoinHelperSize = 10;
+
+    HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl calc =
+      new HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl(
+        230,
+        15,
+        60,
+        20,
+        buildPartitionStatSet,
+        keySizes,
+        new MockHashTableSizeCalculator(hashTableSize),
+        new MockHashJoinHelperSizeCalculator(hashJoinHelperSize),
+        fragmentationFactor,
+        safetyFactor,
+        .75,
+        false);
+
+    calc.initialize();
+  }
+
+  private void addBatches(PartitionStatImpl partitionStat,
+                          int recordsPerPartitionBatchBuild,
+                          long batchSize,
+                          int numBatches) {
+    for (int counter = 0; counter < numBatches; counter++) {
+      partitionStat.add(new HashJoinMemoryCalculator.BatchStat(
+        recordsPerPartitionBatchBuild, batchSize));
+    }
+  }
+
+  public static class MockHashTableSizeCalculator implements HashTableSizeCalculator {
+    private final long size;
+
+    public MockHashTableSizeCalculator(final long size) {
+      this.size = size;
+    }
+
+    @Override
+    public long calculateSize(HashJoinMemoryCalculator.PartitionStat partitionStat,
+                              Map<String, Long> keySizes,
+                              double loadFactor, double safetyFactor, double fragmentationFactor) {
+      return size;
+    }
+
+    @Override
+    public double getDoublingFactor() {
+      return HashTableSizeCalculatorConservativeImpl.HASHTABLE_DOUBLING_FACTOR;
+    }
+
+    @Override
+    public String getType() {
+      return null;
+    }
+  }
+
+  public static class MockHashJoinHelperSizeCalculator implements HashJoinHelperSizeCalculator {
+    private final long size;
+
+    public MockHashJoinHelperSizeCalculator(final long size)
+    {
+      this.size = size;
+    }
+
+    @Override
+    public long calculateSize(HashJoinMemoryCalculator.PartitionStat partitionStat, double fragmentationFactor) {
+      return size;
+    }
+  }
+}
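
A note on the spill test above: the expected-consumption value is plain arithmetic over the mocked sizes. The sketch below simply restates that arithmetic in standalone form; the variable names are illustrative and the meaning of the trailing 20 is assumed from its position in the constructor call, not confirmed by the patch.

    public class ExpectedConsumptionRecap {
      public static void main(String[] args) {
        long maxProbeBatchSize      = 60;     // per the test comment
        long inMemoryBuildPartition = 80;     // per the test comment
        long mockHashTableSize      = 10;     // fixed value returned by MockHashTableSizeCalculator
        long mockJoinHelperSize     = 10;     // fixed value returned by MockHashJoinHelperSizeCalculator
        long spilledProbeReserve    = 15 * 3; // one max batch per spilled probe partition
        long trailingReserve        = 20;     // unlabeled constant in the test
        long expected = maxProbeBatchSize + inMemoryBuildPartition + mockHashTableSize
            + mockJoinHelperSize + spilledProbeReserve + trailingReserve;
        System.out.println(expected);         // prints 225, within the 230 passed to the calculator
      }
    }
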
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/validate/TestBatchValidator.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/validate/TestBatchValidator.java
index ff33c94..6982176 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/validate/TestBatchValidator.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/validate/TestBatchValidator.java
@@ -32,12 +32,14 @@ import org.apache.drill.exec.vector.RepeatedVarCharVector;
 import org.apache.drill.exec.vector.UInt4Vector;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.VarCharVector;
+import org.apache.drill.test.BaseDirTestWatcher;
 import org.apache.drill.test.LogFixture;
 import org.apache.drill.test.OperatorFixture;
 import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
 import org.apache.drill.test.rowSet.schema.SchemaBuilder;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
+import org.junit.ClassRule;
 import org.junit.Test;
 
 import ch.qos.logback.classic.Level;
@@ -47,13 +49,16 @@ public class TestBatchValidator /* TODO: extends SubOperatorTest */ {
   protected static OperatorFixture fixture;
   protected static LogFixture logFixture;
 
+  @ClassRule
+  public static final BaseDirTestWatcher dirTestWatcher = new BaseDirTestWatcher();
+
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
     logFixture = LogFixture.builder()
         .toConsole()
         .logger(BatchValidator.class, Level.TRACE)
         .build();
-    fixture = OperatorFixture.standardFixture();
+    fixture = OperatorFixture.standardFixture(dirTestWatcher);
   }
 
   @AfterClass
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSortImpl.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSortImpl.java
index 4fe650c..6924810 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSortImpl.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSortImpl.java
@@ -39,6 +39,7 @@ import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.test.BaseDirTestWatcher;
 import org.apache.drill.test.DrillTest;
 import org.apache.drill.test.OperatorFixture;
 import org.apache.drill.test.rowSet.DirectRowSet;
@@ -51,6 +52,7 @@ import org.apache.drill.test.rowSet.RowSetBuilder;
 import org.apache.drill.test.rowSet.RowSetComparison;
 import org.apache.drill.test.rowSet.RowSetReader;
 import org.apache.drill.test.rowSet.RowSetWriter;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -67,6 +69,9 @@ import io.netty.buffer.DrillBuf;
 @Category(OperatorTest.class)
 public class TestSortImpl extends DrillTest {
 
+  @Rule
+  public final BaseDirTestWatcher dirTestWatcher = new BaseDirTestWatcher();
+
   /**
    * Create the sort implementation to be used by test.
    *
@@ -209,7 +214,7 @@ public class TestSortImpl extends DrillTest {
 
   @Test
   public void testNullInput() throws Exception {
-    try (OperatorFixture fixture = OperatorFixture.standardFixture()) {
+    try (OperatorFixture fixture = OperatorFixture.standardFixture(dirTestWatcher)) {
       SortTestFixture sortTest = new SortTestFixture(fixture);
       sortTest.run();
     }
@@ -222,7 +227,7 @@ public class TestSortImpl extends DrillTest {
 
   @Test
   public void testEmptyInput() throws Exception {
-    try (OperatorFixture fixture = OperatorFixture.standardFixture()) {
+    try (OperatorFixture fixture = OperatorFixture.standardFixture(dirTestWatcher)) {
       BatchSchema schema = SortTestUtilities.nonNullSchema();
       SortTestFixture sortTest = new SortTestFixture(fixture);
       sortTest.addInput(fixture.rowSetBuilder(schema)
@@ -238,7 +243,7 @@ public class TestSortImpl extends DrillTest {
 
   @Test
   public void testSingleRow() throws Exception {
-    try (OperatorFixture fixture = OperatorFixture.standardFixture()) {
+    try (OperatorFixture fixture = OperatorFixture.standardFixture(dirTestWatcher)) {
       BatchSchema schema = SortTestUtilities.nonNullSchema();
       SortTestFixture sortTest = new SortTestFixture(fixture);
       sortTest.addInput(fixture.rowSetBuilder(schema)
@@ -258,7 +263,7 @@ public class TestSortImpl extends DrillTest {
 
   @Test
   public void testSingleBatch() throws Exception {
-    try (OperatorFixture fixture = OperatorFixture.standardFixture()) {
+    try (OperatorFixture fixture = OperatorFixture.standardFixture(dirTestWatcher)) {
       BatchSchema schema = SortTestUtilities.nonNullSchema();
       SortTestFixture sortTest = new SortTestFixture(fixture);
       sortTest.addInput(fixture.rowSetBuilder(schema)
@@ -281,7 +286,7 @@ public class TestSortImpl extends DrillTest {
 
   @Test
   public void testTwoBatches() throws Exception {
-    try (OperatorFixture fixture = OperatorFixture.standardFixture()) {
+    try (OperatorFixture fixture = OperatorFixture.standardFixture(dirTestWatcher)) {
       BatchSchema schema = SortTestUtilities.nonNullSchema();
       SortTestFixture sortTest = new SortTestFixture(fixture);
       sortTest.addInput(fixture.rowSetBuilder(schema)
@@ -471,7 +476,7 @@ public class TestSortImpl extends DrillTest {
    */
   @Test
   public void testModerateBatch() throws Exception {
-    try (OperatorFixture fixture = OperatorFixture.standardFixture()) {
+    try (OperatorFixture fixture = OperatorFixture.standardFixture(dirTestWatcher)) {
       runJumboBatchTest(fixture, 1000);
     }
   }
@@ -485,7 +490,7 @@ public class TestSortImpl extends DrillTest {
 
   @Test
   public void testLargeBatch() throws Exception {
-    try (OperatorFixture fixture = OperatorFixture.standardFixture()) {
+    try (OperatorFixture fixture = OperatorFixture.standardFixture(dirTestWatcher)) {
 //      partyOnMemory(fixture.allocator());
       runJumboBatchTest(fixture, ValueVector.MAX_ROW_COUNT);
     }
@@ -566,7 +571,7 @@ public class TestSortImpl extends DrillTest {
 
   @Test
   public void testWideRows() throws Exception {
-    try (OperatorFixture fixture = OperatorFixture.standardFixture()) {
+    try (OperatorFixture fixture = OperatorFixture.standardFixture(dirTestWatcher)) {
       runWideRowsTest(fixture, 1000, ValueVector.MAX_ROW_COUNT);
     }
   }
@@ -586,7 +591,7 @@ public class TestSortImpl extends DrillTest {
 
   @Test
   public void testSpill() throws Exception {
-    OperatorFixture.Builder builder = OperatorFixture.builder();
+    OperatorFixture.Builder builder = OperatorFixture.builder(dirTestWatcher);
     builder.configBuilder()
       .put(ExecConstants.EXTERNAL_SORT_BATCH_LIMIT, 2);
     try (OperatorFixture fixture = builder.build()) {
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSorter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSorter.java
index 88a7b99..b09b865 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSorter.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSorter.java
@@ -32,6 +32,7 @@ import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.config.Sort;
 import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.test.BaseDirTestWatcher;
 import org.apache.drill.test.DrillTest;
 import org.apache.drill.test.OperatorFixture;
 import org.apache.drill.test.rowSet.RowSet;
@@ -46,7 +47,9 @@ import org.apache.drill.test.rowSet.RowSetUtilities;
 import org.joda.time.Period;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
+import org.junit.ClassRule;
 import org.junit.Ignore;
+import org.junit.Rule;
 import org.junit.Test;
 
 import com.google.common.collect.Lists;
@@ -61,9 +64,12 @@ public class TestSorter extends DrillTest {
 
   public static OperatorFixture fixture;
 
+  @ClassRule
+  public static final BaseDirTestWatcher dirTestWatcher = new BaseDirTestWatcher();
+
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
-    fixture = OperatorFixture.builder().build();
+    fixture = OperatorFixture.builder(dirTestWatcher).build();
   }
 
   @AfterClass
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/PhysicalOpUnitTestBase.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/PhysicalOpUnitTestBase.java
index 16081be..66354ff 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/PhysicalOpUnitTestBase.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/PhysicalOpUnitTestBase.java
@@ -42,6 +42,7 @@ import org.apache.drill.exec.server.QueryProfileStoreContext;
 import org.apache.drill.exec.store.dfs.DrillFileSystem;
 import org.apache.drill.exec.store.easy.json.JSONRecordReader;
 import org.apache.drill.exec.work.batch.IncomingBuffers;
+import org.apache.drill.test.BaseDirTestWatcher;
 import org.apache.drill.test.DrillTestWrapper;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
@@ -70,6 +71,7 @@ import org.apache.drill.exec.rpc.NamedThreadFactory;
 import org.apache.drill.test.OperatorFixture;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Rule;
 import org.mockito.Mockito;
 
 import java.io.IOException;
@@ -96,6 +98,9 @@ public class PhysicalOpUnitTestBase extends ExecTest {
   protected ExecutorService scanExecutor;
   protected ExecutorService scanDecodeExecutor;
 
+  @Rule
+  public final BaseDirTestWatcher dirTestWatcher = new BaseDirTestWatcher();
+
   private final DrillConfig drillConf = DrillConfig.create();
   private final ScanResult classpathScan = ClassPathScanner.fromPrescan(drillConf);
   private final OperatorCreatorRegistry opCreatorReg = new OperatorCreatorRegistry(classpathScan);
@@ -109,7 +114,7 @@ public class PhysicalOpUnitTestBase extends ExecTest {
     Mockito.when(drillbitContext.getScanExecutor()).thenReturn(scanExecutor);
     Mockito.when(drillbitContext.getScanDecodeExecutor()).thenReturn(scanDecodeExecutor);
 
-    final OperatorFixture.Builder builder = new OperatorFixture.Builder();
+    final OperatorFixture.Builder builder = new OperatorFixture.Builder(dirTestWatcher);
     builder.configBuilder().configProps(drillConf);
     operatorFixture = builder
       .setScanExecutor(scanExecutor)
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/record/TestVectorContainer.java b/exec/java-exec/src/test/java/org/apache/drill/exec/record/TestVectorContainer.java
index aaeb358..c538ecd 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/record/TestVectorContainer.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/record/TestVectorContainer.java
@@ -19,20 +19,30 @@ package org.apache.drill.exec.record;
 
 import static org.junit.Assert.fail;
 
+import com.google.common.collect.Lists;
 import org.apache.drill.categories.VectorTest;
+import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.memory.RootAllocator;
+import org.apache.drill.test.BaseDirTestWatcher;
 import org.apache.drill.test.DrillTest;
 import org.apache.drill.test.OperatorFixture;
 import org.apache.drill.test.rowSet.DirectRowSet;
 import org.apache.drill.test.rowSet.RowSet;
 import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
 import org.apache.drill.test.rowSet.schema.SchemaBuilder;
+import org.apache.drill.test.rowSet.RowSetBuilder;
 import org.apache.drill.test.rowSet.RowSetComparison;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.BeforeClass;
+import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import java.util.List;
+
 @Category(VectorTest.class)
 public class TestVectorContainer extends DrillTest {
 
@@ -40,9 +50,12 @@ public class TestVectorContainer extends DrillTest {
   // once that is available.
   protected static OperatorFixture fixture;
 
+  @ClassRule
+  public static final BaseDirTestWatcher dirTestWatcher = new BaseDirTestWatcher();
+
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
-    fixture = OperatorFixture.standardFixture();
+    fixture = OperatorFixture.standardFixture(dirTestWatcher);
   }
 
   @AfterClass
@@ -124,4 +137,30 @@ public class TestVectorContainer extends DrillTest {
     leftIndirect.clear();
     right.clear();
   }
+
+  @Test
+  public void testPrettyPrintRecord() {
+    final MaterializedField colA = MaterializedField.create("colA", Types.required(TypeProtos.MinorType.INT));
+    final MaterializedField colB = MaterializedField.create("colB", Types.required(TypeProtos.MinorType.VARCHAR));
+    final MaterializedField colC = MaterializedField.create("colC", Types.repeated(TypeProtos.MinorType.FLOAT4));
+    final MaterializedField colD = MaterializedField.create("colD", Types.repeated(TypeProtos.MinorType.VARCHAR));
+    final List<MaterializedField> cols = Lists.newArrayList(colA, colB, colC, colD);
+    final BatchSchema batchSchema = new BatchSchema(BatchSchema.SelectionVectorMode.NONE, cols);
+
+    try (RootAllocator allocator = new RootAllocator(10_000_000)) {
+      final RowSet rowSet = new RowSetBuilder(allocator, batchSchema)
+        .addRow(110, "green", new float[]{5.5f, 2.3f}, new String[]{"1a", "1b"})
+        .addRow(1440, "yellow", new float[]{1.0f}, new String[]{"dog"})
+        .build();
+
+      final String expected = "[\"colA\" = 110, \"colB\" = green, \"colC\" = [5.5,2.3], \"colD\" = [\"1a\",\"1b\"]]";
+      final String actual = rowSet.container().prettyPrintRecord(0);
+
+      try {
+        Assert.assertEquals(expected, actual);
+      } finally {
+        rowSet.clear();
+      }
+    }
+  }
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/util/TestQueryMemoryAlloc.java b/exec/java-exec/src/test/java/org/apache/drill/exec/util/TestQueryMemoryAlloc.java
index a62cc41..14a9447 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/util/TestQueryMemoryAlloc.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/util/TestQueryMemoryAlloc.java
@@ -21,8 +21,10 @@ import static org.junit.Assert.assertEquals;
 
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.test.BaseDirTestWatcher;
 import org.apache.drill.test.DrillTest;
 import org.apache.drill.test.OperatorFixture;
+import org.junit.Rule;
 import org.junit.Test;
 
 public class TestQueryMemoryAlloc extends DrillTest {
@@ -30,9 +32,12 @@ public class TestQueryMemoryAlloc extends DrillTest {
   public static final long ONE_MB = 1024 * 1024;
   public static final long ONE_GB = 1024L * ONE_MB;
 
+  @Rule
+  public final BaseDirTestWatcher dirTestWatcher = new BaseDirTestWatcher();
+
   @Test
   public void testDefaultOptions() throws Exception {
-    OperatorFixture.Builder builder = OperatorFixture.builder();
+    OperatorFixture.Builder builder = OperatorFixture.builder(dirTestWatcher);
     builder.systemOption(ExecConstants.PERCENT_MEMORY_PER_QUERY_KEY, 0.05);
     builder.systemOption(ExecConstants.MAX_QUERY_MEMORY_PER_NODE_KEY, 2 * ONE_GB);
 
@@ -60,7 +65,7 @@ public class TestQueryMemoryAlloc extends DrillTest {
 
   @Test
   public void testCustomFloor() throws Exception {
-    OperatorFixture.Builder builder = OperatorFixture.builder();
+    OperatorFixture.Builder builder = OperatorFixture.builder(dirTestWatcher);
     builder.systemOption(ExecConstants.PERCENT_MEMORY_PER_QUERY_KEY, 0.05);
     builder.systemOption(ExecConstants.MAX_QUERY_MEMORY_PER_NODE_KEY, 3 * ONE_GB);
 
@@ -88,7 +93,7 @@ public class TestQueryMemoryAlloc extends DrillTest {
 
   @Test
   public void testCustomPercent() throws Exception {
-    OperatorFixture.Builder builder = OperatorFixture.builder();
+    OperatorFixture.Builder builder = OperatorFixture.builder(dirTestWatcher);
     builder.systemOption(ExecConstants.PERCENT_MEMORY_PER_QUERY_KEY, 0.10);
     builder.systemOption(ExecConstants.MAX_QUERY_MEMORY_PER_NODE_KEY, 2 * ONE_GB);
 
@@ -126,7 +131,7 @@ public class TestQueryMemoryAlloc extends DrillTest {
 
   @Test
   public void testOpMemory() throws Exception {
-    OperatorFixture.Builder builder = OperatorFixture.builder();
+    OperatorFixture.Builder builder = OperatorFixture.builder(dirTestWatcher);
     builder.systemOption(ExecConstants.CPU_LOAD_AVERAGE_KEY, 0.7);
     builder.systemOption(ExecConstants.MAX_WIDTH_PER_NODE_KEY, 10);
     builder.systemOption(ExecConstants.MIN_MEMORY_PER_BUFFERED_OP_KEY, 40 * ONE_MB);
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/BaseDirTestWatcher.java b/exec/java-exec/src/test/java/org/apache/drill/test/BaseDirTestWatcher.java
index 21b4a64..ee2244a 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/BaseDirTestWatcher.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/BaseDirTestWatcher.java
@@ -66,6 +66,8 @@ public class BaseDirTestWatcher extends DirTestWatcher {
     TEST_TMP // Corresponds to the directory that should be mapped to dfs.tmp
   }
 
+  private File codegenDir;
+  private File spillDir;
   private File tmpDir;
   private File storeDir;
   private File dfsTestTmpParentDir;
@@ -91,6 +93,8 @@ public class BaseDirTestWatcher extends DirTestWatcher {
   protected void starting(Description description) {
     super.starting(description);
 
+    codegenDir = makeSubDir(Paths.get("codegen"));
+    spillDir = makeSubDir(Paths.get("spill"));
     rootDir = makeSubDir(Paths.get("root"));
     tmpDir = makeSubDir(Paths.get("tmp"));
     storeDir = makeSubDir(Paths.get("store"));
@@ -104,6 +108,8 @@ public class BaseDirTestWatcher extends DirTestWatcher {
    */
   public void clear() {
     try {
+      FileUtils.cleanDirectory(codegenDir);
+      FileUtils.cleanDirectory(spillDir);
       FileUtils.cleanDirectory(rootDir);
       FileUtils.cleanDirectory(tmpDir);
       FileUtils.cleanDirectory(storeDir);
@@ -146,6 +152,18 @@ public class BaseDirTestWatcher extends DirTestWatcher {
   }
 
   /**
+   * Gets the temp directory that should be used to save generated code files.
+   * @return The temp directory that should be used to save generated code files.
+   */
+  public File getCodegenDir() {
+    return codegenDir;
+  }
+
+  public File getSpillDir() {
+    return spillDir;
+  }
+
+  /**
    * This method creates a new directory which can be mapped to <b>dfs.tmp</b>.
    */
   public void newDfsTestTmpDir() {
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/ConfigBuilder.java b/exec/java-exec/src/test/java/org/apache/drill/test/ConfigBuilder.java
index 02fac0a..4df7af5 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/ConfigBuilder.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/ConfigBuilder.java
@@ -130,7 +130,12 @@ public class ConfigBuilder {
       configProps = createDefaultProperties();
     }
 
-    configProps.put(key, value.toString());
+    if (value instanceof Collection) {
+      configProps.put(key, value);
+    } else {
+      configProps.put(key, value.toString());
+    }
+
     return this;
   }
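
The new Collection branch in ConfigBuilder.put() keeps list-valued options, such as the spill directory lists that OperatorFixture wires up below, as real lists instead of flattening them through toString(). A usage sketch, assuming a JUnit test that declares a BaseDirTestWatcher rule named dirTestWatcher and a test method declared with throws Exception (ExecConstants, OperatorFixture, and Guava's Lists imported as in the tests above):

    OperatorFixture.Builder builder = OperatorFixture.builder(dirTestWatcher);
    builder.configBuilder()
        .put(ExecConstants.EXTERNAL_SORT_BATCH_LIMIT, 2)                          // scalar: stored via toString()
        .put(ExecConstants.SPILL_DIRS, Lists.newArrayList("/custom/spill/dir"));  // Collection: stored as-is
    try (OperatorFixture fixture = builder.build()) {
      // drive the operator under test against the customized config
    }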
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java b/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java
index bb63277..8e4e4ff 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java
@@ -21,6 +21,11 @@ import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.ListenableFuture;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+
 import io.netty.buffer.DrillBuf;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.drill.common.config.DrillConfig;
@@ -28,6 +36,7 @@ import org.apache.drill.common.scanner.ClassPathScanner;
 import org.apache.drill.common.scanner.persistence.ScanResult;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.compile.ClassBuilder;
 import org.apache.drill.exec.compile.CodeCompiler;
 import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
@@ -113,8 +122,17 @@ public class OperatorFixture extends BaseFixture implements AutoCloseable {
     protected ExecutorService scanExecutor;
     protected ExecutorService scanDecoderExecutor;
 
-    public Builder()
+    public Builder(BaseDirTestWatcher dirTestWatcher)
     {
+      // Set defaults for tmp dirs correctly
+
+      if (dirTestWatcher != null) {
+        configBuilder.put(ClassBuilder.CODE_DIR_OPTION, dirTestWatcher.getCodegenDir().getAbsolutePath());
+        configBuilder.put(ExecConstants.DRILL_TMP_DIR, dirTestWatcher.getTmpDir().getAbsolutePath());
+        configBuilder.put(ExecConstants.SYS_STORE_PROVIDER_LOCAL_PATH, dirTestWatcher.getStoreDir().getAbsolutePath());
+        configBuilder.put(ExecConstants.SPILL_DIRS, Lists.newArrayList(dirTestWatcher.getSpillDir().getAbsolutePath()));
+        configBuilder.put(ExecConstants.HASHJOIN_SPILL_DIRS, Lists.newArrayList(dirTestWatcher.getSpillDir().getAbsolutePath()));
+      }
     }
 
     public ConfigBuilder configBuilder() {
@@ -364,8 +382,8 @@ public class OperatorFixture extends BaseFixture implements AutoCloseable {
     options.close();
   }
 
-  public static Builder builder() {
-    Builder builder = new Builder();
+  public static Builder builder(BaseDirTestWatcher dirTestWatcher) {
+    Builder builder = new Builder(dirTestWatcher);
     builder.configBuilder()
       // Required to avoid Dynamic UDF calls for missing or
       // ambiguous functions.
@@ -373,8 +391,8 @@ public class OperatorFixture extends BaseFixture implements AutoCloseable {
     return builder;
   }
 
-  public static OperatorFixture standardFixture() {
-    return builder().build();
+  public static OperatorFixture standardFixture(BaseDirTestWatcher dirTestWatcher) {
+    return builder(dirTestWatcher).build();
   }
 
   public RowSetBuilder rowSetBuilder(BatchSchema schema) {
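
With the Builder now taking a BaseDirTestWatcher, code generation, tmp, store, and spill locations all default to watcher-managed directories, so most tests need only the watcher rule plus standardFixture(); passing null (as PerformanceTool does below) skips those defaults. A minimal per-class sketch of the pattern, mirroring the SubOperatorTest change that follows; MyOperatorTest is a placeholder name and the JUnit/Drill test imports are the same as in that class.

    public class MyOperatorTest extends DrillTest {
      @ClassRule
      public static final BaseDirTestWatcher dirTestWatcher = new BaseDirTestWatcher();

      protected static OperatorFixture fixture;

      @BeforeClass
      public static void classSetup() throws Exception {
        fixture = OperatorFixture.standardFixture(dirTestWatcher);
      }

      @AfterClass
      public static void classTeardown() throws Exception {
        fixture.close();
      }
    }
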
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/SubOperatorTest.java b/exec/java-exec/src/test/java/org/apache/drill/test/SubOperatorTest.java
index 6bc2afc..c93853f 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/SubOperatorTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/SubOperatorTest.java
@@ -19,14 +19,18 @@ package org.apache.drill.test;
 
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
+import org.junit.ClassRule;
 
 public class SubOperatorTest extends DrillTest {
 
   protected static OperatorFixture fixture;
 
+  @ClassRule
+  public static final BaseDirTestWatcher dirTestWatcher = new BaseDirTestWatcher();
+
   @BeforeClass
   public static void classSetup() throws Exception {
-    fixture = OperatorFixture.standardFixture();
+    fixture = OperatorFixture.standardFixture(dirTestWatcher);
   }
 
   @AfterClass
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/PerformanceTool.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/PerformanceTool.java
index 92ebdd5..7a027ad 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/PerformanceTool.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/PerformanceTool.java
@@ -271,7 +271,7 @@ public class PerformanceTool {
   }
 
   public static void main(String args[]) {
-    try (OperatorFixture fixture = OperatorFixture.standardFixture();) {
+    try (OperatorFixture fixture = OperatorFixture.standardFixture(null);) {
       for (int i = 0; i < 2; i++) {
         System.out.println((i==0) ? "Warmup" : "Test run");
         new RequiredVectorTester(fixture).runTest();
diff --git a/exec/vector/src/main/codegen/templates/FixedValueVectors.java b/exec/vector/src/main/codegen/templates/FixedValueVectors.java
index 68ee33b..4a98c26 100644
--- a/exec/vector/src/main/codegen/templates/FixedValueVectors.java
+++ b/exec/vector/src/main/codegen/templates/FixedValueVectors.java
@@ -298,6 +298,11 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
     return valueCount * ${type.width};
   }
 
+  @Override
+  public int getValueWidth() {
+    return ${type.width};
+  }
+
   private class TransferImpl implements TransferPair {
     private ${minor.class}Vector to;
 
diff --git a/exec/vector/src/main/codegen/templates/NullableValueVectors.java b/exec/vector/src/main/codegen/templates/NullableValueVectors.java
index c9c0987..6d386a6 100644
--- a/exec/vector/src/main/codegen/templates/NullableValueVectors.java
+++ b/exec/vector/src/main/codegen/templates/NullableValueVectors.java
@@ -213,6 +213,13 @@ public final class ${className} extends BaseDataValueVector implements <#if type
     return bits.getPayloadByteCount(valueCount) + values.getPayloadByteCount(valueCount);
   }
 
+  <#if type.major != "VarLen">
+  @Override
+  public int getValueWidth() {
+    return bits.getValueWidth() + ${type.width};
+  }
+  </#if>
+
   <#if type.major == "VarLen">
   @Override
   public void allocateNew(int totalBytes, int valueCount) {
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/BitVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/BitVector.java
index ca2be3a..2473556 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/BitVector.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/BitVector.java
@@ -211,6 +211,11 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe
     data.setZero(0, data.capacity());
   }
 
+  @Override
+  public int getValueWidth() {
+    return VALUE_WIDTH;
+  }
+
   public void copyFrom(int inIndex, int outIndex, BitVector from) {
     this.mutator.set(outIndex, from.accessor.get(inIndex));
   }
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/FixedWidthVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/FixedWidthVector.java
index 09bcdd8..e151e30 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/FixedWidthVector.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/FixedWidthVector.java
@@ -30,4 +30,10 @@ public interface FixedWidthVector extends ValueVector {
    * Zero out the underlying buffer backing this vector.
    */
   void zeroVector();
+
+  /**
+   * The width of a record in bytes.
+   * @return The width of a record in bytes.
+   */
+  int getValueWidth();
 }
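
The new getValueWidth() accessor gives the memory calculators a fixed per-value byte width; for nullable types it also counts the bits vector, per the NullableValueVectors change above. A hedged sketch of the kind of estimate this enables; estimateFixedWidthBytes is an illustrative helper, not part of the patch:

    import org.apache.drill.exec.vector.FixedWidthVector;

    public class FixedWidthSizeEstimate {
      // Rough direct-memory need for one fixed-width column holding recordCount values,
      // consistent with getPayloadByteCount(valueCount) = valueCount * width shown above.
      public static long estimateFixedWidthBytes(FixedWidthVector vector, int recordCount) {
        return (long) recordCount * vector.getValueWidth();
      }
    }
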
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/UntypedNullVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/UntypedNullVector.java
index e2f9f77..8e3781e 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/UntypedNullVector.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/UntypedNullVector.java
@@ -91,6 +91,11 @@ public final class UntypedNullVector extends BaseDataValueVector implements Fixe
   }
 
   @Override
+  public int getValueWidth() {
+    return VALUE_WIDTH;
+  }
+
+  @Override
   public void load(SerializedField metadata, DrillBuf buffer) {
     Preconditions.checkArgument(this.field.getName().equals(metadata.getNamePart().getName()),
         "The field %s doesn't match the provided metadata %s.", this.field, metadata);
