Posted to commits@drill.apache.org by bo...@apache.org on 2018/05/17 21:57:57 UTC
[drill] 02/03: DRILL-6027: - Added memory calculator - Added unit tests and docs. - Fixed IOB caused by output vector allocation. - Don't double count records that were spilled in HashJoin
This is an automated email from the ASF dual-hosted git repository.
boaz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
commit 349ac5ad431a6814f0a065bf16dee635cc6d3274
Author: Timothy Farkas <ti...@apache.org>
AuthorDate: Mon Feb 12 16:49:29 2018 -0800
DRILL-6027:
- Added memory calculator
- Added unit tests and docs.
- Fixed IOB caused by output vector allocation.
- Don't double count records that were spilled in HashJoin
---
.../java/org/apache/drill/exec/ExecConstants.java | 16 +-
.../drill/exec/physical/config/HashJoinPOP.java | 14 +-
.../physical/impl/aggregate/HashAggTemplate.java | 1 -
.../physical/impl/common/ChainedHashTable.java | 8 +-
.../exec/physical/impl/common/HashPartition.java | 111 ++-
.../drill/exec/physical/impl/common/HashTable.java | 80 +-
.../impl/common/HashTableAllocationTracker.java | 73 ++
.../exec/physical/impl/common/HashTableConfig.java | 33 +-
.../physical/impl/common/HashTableTemplate.java | 116 +--
.../exec/physical/impl/join/HashJoinBatch.java | 468 ++++++++----
.../exec/physical/impl/join/HashJoinHelper.java | 12 +
.../impl/join/HashJoinHelperSizeCalculator.java} | 19 +-
.../join/HashJoinHelperSizeCalculatorImpl.java | 51 ++
.../impl/join/HashJoinMemoryCalculator.java | 320 ++++++++
.../impl/join/HashJoinMemoryCalculatorImpl.java | 833 +++++++++++++++++++++
.../exec/physical/impl/join/HashJoinState.java | 42 ++
.../impl/join/HashJoinStateCalculator.java} | 25 +-
.../impl/join/HashTableSizeCalculator.java} | 26 +-
.../HashTableSizeCalculatorConservativeImpl.java | 96 +++
.../impl/join/HashTableSizeCalculatorLeanImpl.java | 109 +++
.../join/MechanicalHashJoinMemoryCalculator.java | 163 ++++
.../drill/exec/planner/logical/DrillJoinRel.java | 3 +-
.../drill/exec/record/AbstractRecordBatch.java | 8 +
.../apache/drill/exec/record/RecordBatchSizer.java | 42 ++
.../apache/drill/exec/record/VectorContainer.java | 24 +
.../exec/server/options/SystemOptionManager.java | 9 +-
.../java-exec/src/main/resources/drill-module.conf | 8 +-
.../drill/exec/cache/TestBatchSerialization.java | 5 +-
.../exec/compile/TestLargeFileCompilation.java | 1 +
.../physical/impl/common/HashPartitionTest.java | 306 ++++++++
.../common/HashTableAllocationTrackerTest.java | 101 +++
.../exec/physical/impl/join/PartitionStatImpl.java | 79 ++
.../impl/join/TestBuildSidePartitioningImpl.java | 289 +++++++
.../join/TestHashJoinHelperSizeCalculatorImpl.java | 44 ++
.../join/TestHashJoinMemoryCalculatorImpl.java | 82 ++
.../exec/physical/impl/join/TestHashJoinSpill.java | 2 -
...estHashTableSizeCalculatorConservativeImpl.java | 67 ++
.../join/TestHashTableSizeCalculatorLeanImpl.java | 66 ++
.../exec/physical/impl/join/TestPartitionStat.java | 57 ++
.../impl/join/TestPostBuildCalculationsImpl.java | 434 +++++++++++
.../physical/impl/validate/TestBatchValidator.java | 7 +-
.../physical/impl/xsort/managed/TestSortImpl.java | 23 +-
.../physical/impl/xsort/managed/TestSorter.java | 8 +-
.../exec/physical/unit/PhysicalOpUnitTestBase.java | 7 +-
.../drill/exec/record/TestVectorContainer.java | 41 +-
.../drill/exec/util/TestQueryMemoryAlloc.java | 13 +-
.../org/apache/drill/test/BaseDirTestWatcher.java | 18 +
.../java/org/apache/drill/test/ConfigBuilder.java | 7 +-
.../org/apache/drill/test/OperatorFixture.java | 28 +-
.../org/apache/drill/test/SubOperatorTest.java | 6 +-
.../drill/test/rowSet/test/PerformanceTool.java | 2 +-
.../main/codegen/templates/FixedValueVectors.java | 5 +
.../codegen/templates/NullableValueVectors.java | 7 +
.../org/apache/drill/exec/vector/BitVector.java | 5 +
.../apache/drill/exec/vector/FixedWidthVector.java | 6 +
.../drill/exec/vector/UntypedNullVector.java | 5 +
56 files changed, 4121 insertions(+), 310 deletions(-)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 57ecdf2..c48f414 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -104,16 +104,22 @@ public final class ExecConstants {
public static final BooleanValidator EXTERNAL_SORT_DISABLE_MANAGED_OPTION = new BooleanValidator("exec.sort.disable_managed");
// Hash Join Options
+ public static String HASHJOIN_HASHTABLE_CALC_TYPE_KEY = "exec.hashjoin.hash_table_calc_type";
+ public static StringValidator HASHJOIN_HASHTABLE_CALC_TYPE = new StringValidator(HASHJOIN_HASHTABLE_CALC_TYPE_KEY);
+ public static String HASHJOIN_SAFETY_FACTOR_KEY = "exec.hashjoin.safety_factor";
+ public static DoubleValidator HASHJOIN_SAFETY_FACTOR = new RangeDoubleValidator(HASHJOIN_SAFETY_FACTOR_KEY, 1.0, Double.MAX_VALUE);
+ public static String HASHJOIN_HASH_DOUBLE_FACTOR_KEY = "exec.hashjoin.hash_double_factor";
+ public static DoubleValidator HASHJOIN_HASH_DOUBLE_FACTOR = new RangeDoubleValidator(HASHJOIN_HASH_DOUBLE_FACTOR_KEY, 1.0, Double.MAX_VALUE);
+ public static String HASHJOIN_FRAGMENTATION_FACTOR_KEY = "exec.hashjoin.fragmentation_factor";
+ public static DoubleValidator HASHJOIN_FRAGMENTATION_FACTOR = new RangeDoubleValidator(HASHJOIN_FRAGMENTATION_FACTOR_KEY, 1.0, Double.MAX_VALUE);
public static final String HASHJOIN_NUM_ROWS_IN_BATCH_KEY = "exec.hashjoin.num_rows_in_batch";
public static final LongValidator HASHJOIN_NUM_ROWS_IN_BATCH_VALIDATOR = new RangeLongValidator(HASHJOIN_NUM_ROWS_IN_BATCH_KEY, 1, 65536);
public static final String HASHJOIN_MAX_BATCHES_IN_MEMORY_KEY = "exec.hashjoin.max_batches_in_memory";
- public static final LongValidator HASHJOIN_MAX_BATCHES_IN_MEMORY_VALIDATOR = new RangeLongValidator(HASHJOIN_MAX_BATCHES_IN_MEMORY_KEY, 1, 65536);
- public static final String HASHJOIN_MAX_BATCHES_PER_PARTITION_KEY = "exec.hashjoin.max_batches_per_partition";
- public static final LongValidator HASHJOIN_MAX_BATCHES_PER_PARTITION_VALIDATOR = new RangeLongValidator(HASHJOIN_MAX_BATCHES_PER_PARTITION_KEY, 1, 65536);
+ public static final LongValidator HASHJOIN_MAX_BATCHES_IN_MEMORY_VALIDATOR = new RangeLongValidator(HASHJOIN_MAX_BATCHES_IN_MEMORY_KEY, 0, 65536);
public static final String HASHJOIN_NUM_PARTITIONS_KEY = "exec.hashjoin.num_partitions";
public static final LongValidator HASHJOIN_NUM_PARTITIONS_VALIDATOR = new RangeLongValidator(HASHJOIN_NUM_PARTITIONS_KEY, 1, 128); // 1 means - no spilling
- public static final String HASHJOIN_MAX_MEMORY_KEY = "exec.hashjoin.mem_limit";
- public static final LongValidator HASHJOIN_MAX_MEMORY_VALIDATOR = new RangeLongValidator(HASHJOIN_MAX_MEMORY_KEY, 0, Integer.MAX_VALUE);
+ public static final String HASHJOIN_MAX_MEMORY_KEY = "drill.exec.hashjoin.mem_limit";
+ public static final LongValidator HASHJOIN_MAX_MEMORY_VALIDATOR = new RangeLongValidator(HASHJOIN_MAX_MEMORY_KEY, 0L, Long.MAX_VALUE);
public static final String HASHJOIN_SPILL_DIRS = "drill.exec.hashjoin.spill.directories";
public static final String HASHJOIN_SPILL_FILESYSTEM = "drill.exec.hashjoin.spill.fs";
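The hunk above adds four hash join tuning options (hash table calculator type, safety factor, hash doubling factor, fragmentation factor), each validated to be at least 1.0, and widens the hash join memory limit to a long-valued range. As a rough illustration of how multiplicative factors like these are typically applied, here is a minimal, self-contained Java sketch; the method names and numeric values are hypothetical and not part of this patch (the real logic lives in HashJoinMemoryCalculatorImpl, added later in this diff).

public class HashJoinFactorSketch {
  // Pads a measured batch size by the fragmentation and safety factors before
  // comparing it against an operator memory limit. Purely illustrative.
  static long paddedBatchSize(long measuredBytes, double fragmentationFactor, double safetyFactor) {
    return (long) Math.ceil(measuredBytes * fragmentationFactor * safetyFactor);
  }

  public static void main(String[] args) {
    long measured = 8L * 1024 * 1024;      // hypothetical 8 MiB build-side batch
    long memLimit = 64L * 1024 * 1024;     // hypothetical operator memory limit
    long padded = paddedBatchSize(measured, 1.33, 1.0);
    System.out.println(padded <= memLimit ? "fits in memory" : "must spill");
  }
}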
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashJoinPOP.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashJoinPOP.java
index 67ef7f5..b56950a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashJoinPOP.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashJoinPOP.java
@@ -45,7 +45,9 @@ public class HashJoinPOP extends AbstractJoinPop {
@Override
public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
Preconditions.checkArgument(children.size() == 2);
- return new HashJoinPOP(children.get(0), children.get(1), conditions, joinType);
+ HashJoinPOP newHashJoin = new HashJoinPOP(children.get(0), children.get(1), conditions, joinType);
+ newHashJoin.setMaxAllocation(getMaxAllocation());
+ return newHashJoin;
}
public HashJoinPOP flipIfRight() {
@@ -64,4 +66,14 @@ public class HashJoinPOP extends AbstractJoinPop {
public int getOperatorType() {
return CoreOperatorType.HASH_JOIN_VALUE;
}
+
+ @Override
+ public void setMaxAllocation(long maxAllocation) {
+ this.maxAllocation = maxAllocation;
+ }
+
+ @Override
+ public boolean isBufferedOperator() {
+ return true;
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
index 368fd2c..4f6a117 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
@@ -459,7 +459,6 @@ public abstract class HashAggTemplate implements HashAggregator {
for (int i = 0; i < numPartitions; i++ ) {
try {
this.htables[i] = baseHashTable.createAndSetupHashTable(groupByOutFieldIds);
- this.htables[i].setMaxVarcharSize(maxColumnWidth);
} catch (ClassTransformationException e) {
throw UserException.unsupportedError(e)
.message("Code generation error - likely an error in the code.")
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
index 89e32a8..a14bf8c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
@@ -53,7 +53,9 @@ import org.apache.drill.exec.vector.ValueVector;
import com.sun.codemodel.JConditional;
import com.sun.codemodel.JExpr;
-
+/**
+ * This is a master class used to generate code for {@link HashTable}s.
+ */
public class ChainedHashTable {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ChainedHashTable.class);
@@ -282,9 +284,7 @@ public class ChainedHashTable {
int i = 0;
for (LogicalExpression expr : keyExprs) {
- boolean useSetSafe = !Types.isFixedWidthType(expr.getMajorType()) || Types.isRepeated(expr.getMajorType());
- ValueVectorWriteExpression vvwExpr = new ValueVectorWriteExpression(htKeyFieldIds[i++], expr, useSetSafe);
-
+ ValueVectorWriteExpression vvwExpr = new ValueVectorWriteExpression(htKeyFieldIds[i++], expr, true);
cg.addExpr(vvwExpr, ClassGenerator.BlkCreateMode.TRUE_IF_BOUND);
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java
index 5b4adf1..5d0197a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java
@@ -17,6 +17,7 @@
*/
package org.apache.drill.exec.physical.impl.common;
+import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.drill.common.exceptions.RetryAfterSpillException;
import org.apache.drill.common.exceptions.UserException;
@@ -27,12 +28,13 @@ import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.impl.join.HashJoinBatch;
import org.apache.drill.exec.physical.impl.join.HashJoinHelper;
+import org.apache.drill.exec.physical.impl.join.HashJoinMemoryCalculator;
import org.apache.drill.exec.physical.impl.spill.SpillSet;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.RecordBatchSizer;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.record.WritableBatch;
@@ -51,8 +53,8 @@ import java.util.List;
import java.util.concurrent.TimeUnit;
/**
- * The class HashPartition
- *
+ * <h2>Overview</h2>
+ * <p>
* Created to represent an active partition for the Hash-Join operator
* (active means: currently receiving data, or its data is being probed; as opposed to fully
* spilled partitions).
@@ -61,16 +63,19 @@ import java.util.concurrent.TimeUnit;
* If all this partition's build/inner data was spilled, then it begins to work as an outer
* partition (see the flag "processingOuter") -- reusing some of the fields (e.g., currentBatch,
* currHVVector, writer, spillFile, partitionBatchesCount) for the outer.
+ * </p>
*/
-public class HashPartition {
+public class HashPartition implements HashJoinMemoryCalculator.PartitionStat {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashPartition.class);
+ public static final String HASH_VALUE_COLUMN_NAME = "Hash_Values";
+
private int partitionNum = -1; // the current number of this partition, as used by the operator
private static final int VARIABLE_MIN_WIDTH_VALUE_SIZE = 8;
private int maxColumnWidth = VARIABLE_MIN_WIDTH_VALUE_SIZE; // to control memory allocation for varchars
- private final MajorType HVtype = MajorType.newBuilder()
+ public static final MajorType HVtype = MajorType.newBuilder()
.setMinorType(MinorType.INT /* dataType */ )
.setMode(DataMode.REQUIRED /* mode */ )
.build();
@@ -102,7 +107,6 @@ public class HashPartition {
private String spillFile;
private BufferAllocator allocator;
- private FragmentContext context;
private int RECORDS_PER_BATCH;
ChainedHashTable baseHashTable;
private SpillSet spillSet;
@@ -111,14 +115,14 @@ public class HashPartition {
private boolean outerBatchNotNeeded; // when the inner is whole in memory
private RecordBatch buildBatch;
private RecordBatch probeBatch;
- private HashJoinBatch.inMemBatchCounter inMemBatches; // shared among all partitions
private int cycleNum;
+ private List<HashJoinMemoryCalculator.BatchStat> inMemoryBatchStats = Lists.newArrayList();
+ private long partitionInMemorySize;
+ private long numInMemoryRecords;
public HashPartition(FragmentContext context, BufferAllocator allocator, ChainedHashTable baseHashTable,
RecordBatch buildBatch, RecordBatch probeBatch,
- int recordsPerBatch, SpillSet spillSet, int partNum,
- HashJoinBatch.inMemBatchCounter inMemBatches, int cycleNum) {
- this.context = context;
+ int recordsPerBatch, SpillSet spillSet, int partNum, int cycleNum) {
this.allocator = allocator;
this.baseHashTable = baseHashTable;
this.buildBatch = buildBatch;
@@ -126,12 +130,10 @@ public class HashPartition {
this.RECORDS_PER_BATCH = recordsPerBatch;
this.spillSet = spillSet;
this.partitionNum = partNum;
- this.inMemBatches = inMemBatches;
this.cycleNum = cycleNum;
try {
this.hashTable = baseHashTable.createAndSetupHashTable(null);
- this.hashTable.setMaxVarcharSize(maxColumnWidth);
} catch (ClassTransformationException e) {
throw UserException.unsupportedError(e)
.message("Code generation error - likely an error in the code.")
@@ -173,7 +175,6 @@ public class HashPartition {
if (newVV instanceof FixedWidthVector) {
((FixedWidthVector) newVV).allocateNew(RECORDS_PER_BATCH);
} else if (newVV instanceof VariableWidthVector) {
- // Need to check - (is this case ever used?) if a varchar falls under ObjectVector which is allocated on the heap !
((VariableWidthVector) newVV).allocateNew(maxColumnWidth * RECORDS_PER_BATCH, RECORDS_PER_BATCH);
} else if (newVV instanceof ObjectVector) {
((ObjectVector) newVV).allocateNew(RECORDS_PER_BATCH);
@@ -183,7 +184,6 @@ public class HashPartition {
}
newVC.setRecordCount(0);
- inMemBatches.inc(); ; // one more batch in memory
success = true;
} finally {
if ( !success ) {
@@ -199,18 +199,19 @@ public class HashPartition {
public void allocateNewCurrentBatchAndHV() {
if ( outerBatchNotNeeded ) { return; } // skip when the inner is whole in memory
currentBatch = allocateNewVectorContainer(processingOuter ? probeBatch : buildBatch);
- currHVVector = new IntVector(MaterializedField.create("Hash_Values", HVtype), allocator);
+ currHVVector = new IntVector(MaterializedField.create(HASH_VALUE_COLUMN_NAME, HVtype), allocator);
currHVVector.allocateNew(RECORDS_PER_BATCH);
}
/**
* Spills if needed
*/
- public void appendInnerRow(VectorContainer buildContainer, int ind, int hashCode, boolean needsSpill) {
+ public void appendInnerRow(VectorContainer buildContainer, int ind, int hashCode, HashJoinMemoryCalculator.BuildSidePartitioning calc) {
int pos = currentBatch.appendRow(buildContainer,ind);
currHVVector.getMutator().set(pos, hashCode); // store the hash value in the new column
if ( pos + 1 == RECORDS_PER_BATCH ) {
+ boolean needsSpill = isSpilled || calc.shouldSpill();
completeAnInnerBatch(true, needsSpill);
}
}
@@ -244,11 +245,17 @@ public class HashPartition {
currentBatch.buildSchema(BatchSchema.SelectionVectorMode.NONE);
tmpBatchesList.add(currentBatch);
partitionBatchesCount++;
+
+ long batchSize = new RecordBatchSizer(currentBatch).actualSize();
+ inMemoryBatchStats.add(new HashJoinMemoryCalculator.BatchStat(currentBatch.getRecordCount(), batchSize));
+
+ partitionInMemorySize += batchSize;
+ numInMemoryRecords += currentBatch.getRecordCount();
} else {
freeCurrentBatchAndHVVector();
}
if ( needsSpill ) { // spill this batch/partition and free its memory
- spillThisPartition(tmpBatchesList, processingOuter ? "outer" : "inner");
+ spillThisPartition();
}
if ( toInitialize ) { // allocate a new batch and HV vector
allocateNewCurrentBatchAndHV();
@@ -258,20 +265,14 @@ public class HashPartition {
}
}
- private void spillThisPartition(List<VectorContainer> vcList, String side) {
- if ( vcList.size() == 0 ) { return; } // in case empty - nothing to spill
- logger.debug("HashJoin: Spilling partition {}, current cycle {}, part size {} batches", partitionNum, cycleNum, vcList.size());
+ public void spillThisPartition() {
+ if ( tmpBatchesList.size() == 0 ) { return; } // in case empty - nothing to spill
+ logger.debug("HashJoin: Spilling partition {}, current cycle {}, part size {} batches", partitionNum, cycleNum, tmpBatchesList.size());
// If this is the first spill for this partition, create an output stream
if ( writer == null ) {
- // A special case - when (outer is) empty
- if ( vcList.get(0).getRecordCount() == 0 ) {
- VectorContainer vc = vcList.remove(0);
- inMemBatches.dec();
- vc.zeroVectors();
- return;
- }
- String suffix = cycleNum > 0 ? side + "_" + Integer.toString(cycleNum) : side;
+ final String side = processingOuter ? "outer" : "inner";
+ final String suffix = cycleNum > 0 ? side + "_" + Integer.toString(cycleNum) : side;
spillFile = spillSet.getNextSpillFile(suffix);
try {
@@ -285,15 +286,14 @@ public class HashPartition {
isSpilled = true;
}
- while ( vcList.size() > 0 ) {
- VectorContainer vc = vcList.remove(0);
- inMemBatches.dec();
+ partitionInMemorySize = 0L;
+ numInMemoryRecords = 0L;
+ inMemoryBatchStats.clear();
+
+ while ( tmpBatchesList.size() > 0 ) {
+ VectorContainer vc = tmpBatchesList.remove(0);
int numRecords = vc.getRecordCount();
- if (numRecords == 0) { // Spilling should to skip an empty batch
- vc.zeroVectors();
- continue;
- }
// set the value count for outgoing batch value vectors
for (VectorWrapper<?> v : vc) {
@@ -312,7 +312,6 @@ public class HashPartition {
}
vc.zeroVectors();
logger.trace("HASH JOIN: Took {} us to spill {} records", writer.time(TimeUnit.MICROSECONDS), numRecords);
-
}
}
@@ -362,9 +361,32 @@ public class HashPartition {
public void updateBatches() throws SchemaChangeException {
hashTable.updateBatches();
}
+
+ @Override
+ public List<HashJoinMemoryCalculator.BatchStat> getInMemoryBatches() {
+ return inMemoryBatchStats;
+ }
+
+ @Override
+ public int getNumInMemoryBatches() {
+ return inMemoryBatchStats.size();
+ }
+
+ @Override
public boolean isSpilled() {
return isSpilled;
}
+
+ @Override
+ public long getNumInMemoryRecords() {
+ return numInMemoryRecords;
+ }
+
+ @Override
+ public long getInMemorySize() {
+ return partitionInMemorySize;
+ }
+
public String getSpillFile() {
return spillFile;
}
@@ -378,7 +400,6 @@ public class HashPartition {
private void freeCurrentBatchAndHVVector() {
if ( currentBatch != null ) {
- inMemBatches.dec();
currentBatch.clear();
currentBatch = null;
}
@@ -421,11 +442,13 @@ public class HashPartition {
}
/**
- *
+ * Creates the hash table and join helper for this partition. This method should only be called after all the build side records
+ * have been consumed.
*/
public void buildContainersHashTableAndHelper() throws SchemaChangeException {
if ( isSpilled ) { return; } // no building for spilled partitions
containers = new ArrayList<>();
+ hashTable.updateInitialCapacity((int) getNumInMemoryRecords());
for (int curr = 0; curr < partitionBatchesCount; curr++) {
VectorContainer nextBatch = tmpBatchesList.get(curr);
final int currentRecordCount = nextBatch.getRecordCount();
@@ -467,6 +490,9 @@ public class HashPartition {
hashTable.getStats(newStats);
}
+ /**
+ * Frees memory allocated to the {@link HashTable} and {@link HashJoinHelper}.
+ */
public void clearHashTableAndHelper() {
if (hashTable != null) {
hashTable.clear();
@@ -487,7 +513,6 @@ public class HashPartition {
}
while ( tmpBatchesList.size() > 0 ) {
VectorContainer vc = tmpBatchesList.remove(0);
- inMemBatches.dec();
vc.clear();
}
closeWriter();
@@ -497,4 +522,12 @@ public class HashPartition {
if ( containers != null ) { containers.clear(); }
}
+ /**
+ * Creates a debugging string containing information about memory usage.
+ * @return A debugging string.
+ */
+ public String makeDebugString() {
+ return String.format("[hashTable = %s]",
+ hashTable == null ? "None": hashTable.makeDebugString());
+ }
} // class HashPartition
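With this change HashPartition implements HashJoinMemoryCalculator.PartitionStat, so the operator can see each partition's in-memory batch count, record count, and byte size, and the spill decision is delegated to the calculator's shouldSpill(). Below is a minimal sketch, using a stand-in interface rather than Drill's actual PartitionStat, of how a calculator could aggregate these statistics to choose a spill victim (the largest partition still in memory).

import java.util.List;

// Stand-in for the partition statistics view exposed by HashPartition above;
// not Drill's actual HashJoinMemoryCalculator.PartitionStat interface.
interface PartitionStatView {
  long getInMemorySize();   // bytes currently held in memory by this partition
  boolean isSpilled();      // true once the partition's batches have gone to disk
}

class SpillVictimChooser {
  // Returns the index of the largest partition that is still fully in memory,
  // or -1 if every partition has already been spilled.
  static int chooseVictim(List<? extends PartitionStatView> partitions) {
    int victim = -1;
    long largest = -1L;
    for (int i = 0; i < partitions.size(); i++) {
      PartitionStatView p = partitions.get(i);
      if (!p.isSpilled() && p.getInMemorySize() > largest) {
        largest = p.getInMemorySize();
        victim = i;
      }
    }
    return victim;
  }
}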
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
index ed8f388..194c865 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
@@ -46,17 +46,51 @@ public interface HashTable {
int BATCH_SIZE = Character.MAX_VALUE + 1;
int BATCH_MASK = 0x0000FFFF;
+ /**
+ * {@link HashTable#setup(HashTableConfig, BufferAllocator, VectorContainer, RecordBatch, RecordBatch, VectorContainer)} must be called before anything can be done to the
+ * {@link HashTable}.
+ * @param htConfig
+ * @param allocator
+ * @param incomingBuild
+ * @param incomingProbe
+ * @param outgoing
+ * @param htContainerOrig
+ */
void setup(HashTableConfig htConfig, BufferAllocator allocator, VectorContainer incomingBuild, RecordBatch incomingProbe, RecordBatch outgoing,
VectorContainer htContainerOrig);
+ /**
+ * Updates the incoming (build and probe side) value vectors references in the {@link HashTableTemplate.BatchHolder}s.
+ * This is useful on OK_NEW_SCHEMA (need to verify).
+ * @throws SchemaChangeException
+ */
void updateBatches() throws SchemaChangeException;
+ /**
+ * Computes the hash code for the record at the given index in the build side batch.
+ * @param incomingRowIdx The index of the build side record of interest.
+ * @return The hash code for the record at the given index in the build side batch.
+ * @throws SchemaChangeException
+ */
int getBuildHashCode(int incomingRowIdx) throws SchemaChangeException;
+ /**
+ * Computes the hash code for the record at the given index in the probe side batch.
+ * @param incomingRowIdx The index of the probe side record of interest.
+ * @return The hash code for the record at the given index in the probe side batch.
+ * @throws SchemaChangeException
+ */
int getProbeHashCode(int incomingRowIdx) throws SchemaChangeException;
PutStatus put(int incomingRowIdx, IndexPointer htIdxHolder, int hashCode) throws SchemaChangeException, RetryAfterSpillException;
+ /**
+ * @param incomingRowIdx The index of the key in the probe batch.
+ * @param hashCode The hashCode of the key.
+ * @return Returns -1 if the data in the probe batch at the given incomingRowIdx is not in the hash table. Otherwise returns
+ * the composite index of the key in the hash table (index of BatchHolder and record in Batch Holder).
+ * @throws SchemaChangeException
+ */
int probeForKey(int incomingRowIdx, int hashCode) throws SchemaChangeException;
void getStats(HashTableStats stats);
@@ -65,15 +99,55 @@ public interface HashTable {
boolean isEmpty();
+ /**
+ * Frees all the direct memory consumed by the {@link HashTable}.
+ */
void clear();
- public void updateIncoming(VectorContainer newIncoming, RecordBatch newIncomingProbe);
+ /**
+ * Update the initial capacity for the hash table. This method will be removed after the key vectors are removed from the hash table. It is used
+ * to allocate {@link HashTableTemplate.BatchHolder}s of appropriate size when the final size of the HashTable is known.
+ *
+ * <b>Warning!</b> Only call this method before you have inserted elements into the HashTable.
+ *
+ * @param initialCapacity The new initial capacity to use.
+ */
+ void updateInitialCapacity(int initialCapacity);
+
+ /**
+ * Changes the incoming probe and build side batches, and then updates all the value vector references in the {@link HashTableTemplate.BatchHolder}s.
+ * @param newIncoming The new build side batch.
+ * @param newIncomingProbe The new probe side batch.
+ */
+ void updateIncoming(VectorContainer newIncoming, RecordBatch newIncomingProbe);
+ /**
+ * Clears all the memory used by the {@link HashTable} and re-initializes it.
+ */
void reset();
- void setMaxVarcharSize(int size);
-
+ /**
+ * Retrieves the key columns and transfers them to the output container. Note this operation removes the key columns from the {@link HashTable}.
+ * @param batchIdx The index of a {@link HashTableTemplate.BatchHolder} in the HashTable.
+ * @param outContainer The destination container for the key columns.
+ * @param outStartIndex The start index of the key records to transfer.
+ * @param numRecords The number of key records to transfer.
+ * @param numExpectedRecords
+ * @return
+ */
boolean outputKeys(int batchIdx, VectorContainer outContainer, int outStartIndex, int numRecords, int numExpectedRecords);
+
+ /**
+ * Returns a message containing memory usage statistics. Intended to be used for printing debugging or error messages.
+ * @return A debug string.
+ */
+ String makeDebugString();
+
+ /**
+ * The amount of direct memory consumed by the hash table.
+ * @return
+ */
+ long getActualSize();
}
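The new javadoc for probeForKey() describes its return value as a composite index: which BatchHolder plus which record within that holder. Given BATCH_SIZE = Character.MAX_VALUE + 1 (65536) and BATCH_MASK = 0x0000FFFF declared at the top of this interface, the sketch below assumes the holder index occupies the upper bits and the row index the lower 16 bits; treat that layout as an assumption and verify it against HashTableTemplate before relying on it.

public class CompositeIndexSketch {
  static final int BATCH_MASK = 0x0000FFFF;   // mirrors HashTable.BATCH_MASK

  // Which BatchHolder the composite index points at (assumed layout).
  static int batchHolderIndex(int compositeIdx) {
    return compositeIdx >>> 16;
  }

  // Which row inside that BatchHolder (assumed layout).
  static int rowWithinHolder(int compositeIdx) {
    return compositeIdx & BATCH_MASK;
  }

  public static void main(String[] args) {
    int composite = (3 << 16) | 42;            // hypothetical: holder 3, row 42
    System.out.println(batchHolderIndex(composite) + " / " + rowWithinHolder(composite));
  }
}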
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableAllocationTracker.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableAllocationTracker.java
new file mode 100644
index 0000000..d72278d
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableAllocationTracker.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.common;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * A helper class used by {@link HashTableTemplate} in order to pre-allocate {@link HashTableTemplate.BatchHolder}s of the appropriate size.
+ *
+ * <b>Note:</b> This class will be obsolete once the key vectors are removed from the hash table.
+ */
+class HashTableAllocationTracker
+{
+ private enum State {
+ NO_ALLOCATION_IN_PROGRESS,
+ ALLOCATION_IN_PROGRESS
+ }
+
+ private final HashTableConfig config;
+ private final int maxBatchHolderSize;
+
+ private State state = State.NO_ALLOCATION_IN_PROGRESS;
+ private int remainingCapacity;
+
+ protected HashTableAllocationTracker(final HashTableConfig config,
+ final int maxBatchHolderSize)
+ {
+ this.config = Preconditions.checkNotNull(config);
+ this.maxBatchHolderSize = maxBatchHolderSize;
+
+ remainingCapacity = config.getInitialCapacity();
+ }
+
+ public int getNextBatchHolderSize() {
+ state = State.ALLOCATION_IN_PROGRESS;
+
+ if (!config.getInitialSizeIsFinal()) {
+ // We don't know the final size of the hash table, so return the default max batch holder size
+ return maxBatchHolderSize;
+ } else {
+ // We know the final size of the hash table so we need to compute the next batch holder size.
+
+ Preconditions.checkState(remainingCapacity > 0);
+ return computeNextBatchHolderSize();
+ }
+ }
+
+ private int computeNextBatchHolderSize() {
+ return Math.min(remainingCapacity, maxBatchHolderSize);
+ }
+
+ public void commit() {
+ Preconditions.checkState(state.equals(State.ALLOCATION_IN_PROGRESS));
+
+ remainingCapacity -= computeNextBatchHolderSize();
+ state = State.NO_ALLOCATION_IN_PROGRESS;
+ }
+}
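When the configured initial capacity is final, the tracker above hands out BatchHolder sizes of min(remainingCapacity, maxBatchHolderSize), and commit() subtracts the size just allocated. Since constructing the real class requires a HashTableConfig, here is a standalone mirror of just that arithmetic: with a known final capacity of 100,000 records and a maximum holder size of 65,536, the tracker yields holders of 65,536 and then 34,464 records.

public class AllocationTrackerSketch {
  public static void main(String[] args) {
    final int maxBatchHolderSize = 65_536;   // HashTable.BATCH_SIZE
    int remainingCapacity = 100_000;         // known final hash table capacity

    while (remainingCapacity > 0) {
      // getNextBatchHolderSize(): never over-allocate past the remaining capacity.
      int next = Math.min(remainingCapacity, maxBatchHolderSize);
      System.out.println("allocate BatchHolder of size " + next);
      // commit(): the allocation succeeded, reduce the remaining capacity.
      remainingCapacity -= next;
    }
  }
}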
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableConfig.java
index aa2497c..b689390 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableConfig.java
@@ -29,27 +29,58 @@ import java.util.List;
public class HashTableConfig {
private final int initialCapacity;
+ private final boolean initialSizeIsFinal;
private final float loadFactor;
private final List<NamedExpression> keyExprsBuild;
private final List<NamedExpression> keyExprsProbe;
private final List<Comparator> comparators;
@JsonCreator
- public HashTableConfig(@JsonProperty("initialCapacity") int initialCapacity, @JsonProperty("loadFactor") float loadFactor,
+ public HashTableConfig(@JsonProperty("initialCapacity") int initialCapacity,
+ @JsonProperty("loadFactor") float loadFactor,
@JsonProperty("keyExprsBuild") List<NamedExpression> keyExprsBuild,
@JsonProperty("keyExprsProbe") List<NamedExpression> keyExprsProbe,
@JsonProperty("comparators") List<Comparator> comparators) {
this.initialCapacity = initialCapacity;
+ this.initialSizeIsFinal = false;
this.loadFactor = loadFactor;
this.keyExprsBuild = keyExprsBuild;
this.keyExprsProbe = keyExprsProbe;
this.comparators = comparators;
}
+ @JsonCreator
+ public HashTableConfig(@JsonProperty("initialCapacity") int initialCapacity,
+ @JsonProperty("initialCapacity") boolean initialSizeIsFinal,
+ @JsonProperty("loadFactor") float loadFactor,
+ @JsonProperty("keyExprsBuild") List<NamedExpression> keyExprsBuild,
+ @JsonProperty("keyExprsProbe") List<NamedExpression> keyExprsProbe,
+ @JsonProperty("comparators") List<Comparator> comparators) {
+ this.initialCapacity = initialCapacity;
+ this.initialSizeIsFinal = initialSizeIsFinal;
+ this.loadFactor = loadFactor;
+ this.keyExprsBuild = keyExprsBuild;
+ this.keyExprsProbe = keyExprsProbe;
+ this.comparators = comparators;
+ }
+
+ public HashTableConfig withInitialCapacity(int initialCapacity) {
+ return new HashTableConfig(initialCapacity,
+ initialSizeIsFinal,
+ loadFactor,
+ keyExprsBuild,
+ keyExprsProbe,
+ comparators);
+ }
+
public int getInitialCapacity() {
return initialCapacity;
}
+ public boolean getInitialSizeIsFinal() {
+ return initialSizeIsFinal;
+ }
+
public float getLoadFactor() {
return loadFactor;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
index b7a7f7b..06e3bcd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
@@ -19,18 +19,23 @@ package org.apache.drill.exec.physical.impl.common;
import java.util.ArrayList;
import java.util.Iterator;
+import java.util.Set;
import javax.inject.Named;
+import com.google.common.collect.Sets;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.compile.sig.RuntimeOverridden;
import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.memory.AllocationManager;
import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.physical.impl.join.HashJoinMemoryCalculator;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.RecordBatchSizer;
import org.apache.drill.exec.record.TransferPair;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.VectorWrapper;
@@ -38,11 +43,13 @@ import org.apache.drill.exec.vector.BigIntVector;
import org.apache.drill.exec.vector.FixedWidthVector;
import org.apache.drill.exec.vector.IntVector;
import org.apache.drill.exec.vector.ValueVector;
-import org.apache.drill.exec.vector.VariableWidthVector;
import org.apache.drill.common.exceptions.RetryAfterSpillException;
+import org.apache.drill.exec.vector.VariableWidthVector;
public abstract class HashTableTemplate implements HashTable {
+ public static final int MAX_VARCHAR_SIZE = 8; // This is a bad heuristic which will be eliminated when the keys are removed from the HashTable.
+
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashTable.class);
private static final boolean EXTRA_DEBUG = false;
@@ -90,6 +97,9 @@ public abstract class HashTableTemplate implements HashTable {
// Hash table configuration parameters
private HashTableConfig htConfig;
+ // Allocation tracker
+ private HashTableAllocationTracker allocationTracker;
+
// The original container from which others may be cloned
private VectorContainer htContainerOrig;
@@ -99,8 +109,6 @@ public abstract class HashTableTemplate implements HashTable {
private int resizingTime = 0;
- private int maxVarcharSize = 8; // for varchar allocation
-
// This class encapsulates the links, keys and values for up to BATCH_SIZE
// *unique* records. Thus, suppose there are N incoming record batches, each
// of size BATCH_SIZE..but they have M unique keys altogether, the number of
@@ -121,7 +129,7 @@ public abstract class HashTableTemplate implements HashTable {
private int batchIndex = 0;
- public BatchHolder(int idx) {
+ public BatchHolder(int idx, int newBatchHolderSize) {
this.batchIndex = idx;
@@ -132,24 +140,24 @@ public abstract class HashTableTemplate implements HashTable {
ValueVector vv = TypeHelper.getNewVector(w.getField(), allocator);
htContainer.add(vv); // add to container before actual allocation (to allow clearing in case of an OOM)
- // Capacity for "hashValues" and "links" vectors is BATCH_SIZE records. It is better to allocate space for
- // "key" vectors to store as close to as BATCH_SIZE records. A new BatchHolder is created when either BATCH_SIZE
+ // Capacity for "hashValues" and "links" vectors is newBatchHolderSize records. It is better to allocate space for
+ // "key" vectors to store as close to as newBatchHolderSize records. A new BatchHolder is created when either newBatchHolderSize
// records are inserted or "key" vectors ran out of space. Allocating too less space for "key" vectors will
// result in unused space in "hashValues" and "links" vectors in the BatchHolder. Also for each new
- // BatchHolder we create a SV4 vector of BATCH_SIZE in HashJoinHelper.
+ // BatchHolder we create a SV4 vector of newBatchHolderSize in HashJoinHelper.
if (vv instanceof FixedWidthVector) {
- ((FixedWidthVector) vv).allocateNew(BATCH_SIZE);
+ ((FixedWidthVector) vv).allocateNew(newBatchHolderSize);
} else if (vv instanceof VariableWidthVector) {
long beforeMem = allocator.getAllocatedMemory();
- ((VariableWidthVector) vv).allocateNew(maxVarcharSize * BATCH_SIZE, BATCH_SIZE);
- logger.trace("HT allocated {} for varchar of max width {}",allocator.getAllocatedMemory() - beforeMem, maxVarcharSize);
+ ((VariableWidthVector) vv).allocateNew(MAX_VARCHAR_SIZE * newBatchHolderSize, newBatchHolderSize);
+ logger.trace("HT allocated {} for varchar of max width {}",allocator.getAllocatedMemory() - beforeMem, MAX_VARCHAR_SIZE);
} else {
vv.allocateNew();
}
}
- links = allocMetadataVector(HashTable.BATCH_SIZE, EMPTY_SLOT);
- hashValues = allocMetadataVector(HashTable.BATCH_SIZE, 0);
+ links = allocMetadataVector(newBatchHolderSize, EMPTY_SLOT);
+ hashValues = allocMetadataVector(newBatchHolderSize, 0);
success = true;
} finally {
if (!success) {
@@ -182,7 +190,7 @@ public abstract class HashTableTemplate implements HashTable {
boolean isProbe) throws SchemaChangeException {
int currentIdxWithinBatch = currentIdxHolder.value & BATCH_MASK;
- boolean match = false;
+ boolean match;
if (currentIdxWithinBatch >= HashTable.BATCH_SIZE) {
logger.debug("Batch size = {}, incomingRowIdx = {}, currentIdxWithinBatch = {}.", HashTable.BATCH_SIZE,
@@ -323,12 +331,6 @@ public abstract class HashTableTemplate implements HashTable {
}
private boolean outputKeys(VectorContainer outContainer, int outStartIndex, int numRecords, int numExpectedRecords) {
-
- /** for debugging
- BigIntVector vv0 = getValueVector(0);
- BigIntHolder holder = new BigIntHolder();
- */
-
// set the value count for htContainer's value vectors before the transfer ..
setValueCount();
@@ -352,26 +354,6 @@ public abstract class HashTableTemplate implements HashTable {
}
}
-/*
- logger.debug("Attempting to output keys for batch index: {} from index {} to maxOccupiedIndex {}.",
- this.batchIndex, 0, maxOccupiedIdx);
- for (int i = batchOutputCount; i <= maxOccupiedIdx; i++) {
- if (outputRecordKeys(i, batchOutputCount) ) {
- if (EXTRA_DEBUG) logger.debug("Outputting keys to output index: {}", batchOutputCount) ;
-
- // debugging
- // holder.value = vv0.getAccessor().get(i);
- // if (holder.value == 100018 || holder.value == 100021) {
- // logger.debug("Outputting key = {} at index - {} to outgoing index = {}.", holder.value, i,
- // batchOutputCount);
- // }
-
- batchOutputCount++;
- } else {
- return false;
- }
- }
- */
return true;
}
@@ -443,8 +425,21 @@ public abstract class HashTableTemplate implements HashTable {
protected void outputRecordKeys(@Named("htRowIdx") int htRowIdx, @Named("outRowIdx") int outRowIdx) throws SchemaChangeException {
}
- } // class BatchHolder
+ public long getActualSize() {
+ Set<AllocationManager.BufferLedger> ledgers = Sets.newHashSet();
+ links.collectLedgers(ledgers);
+ hashValues.collectLedgers(ledgers);
+
+ long size = 0L;
+ for (AllocationManager.BufferLedger ledger: ledgers) {
+ size += ledger.getAccountedSize();
+ }
+
+ size += new RecordBatchSizer(htContainer).actualSize();
+ return size;
+ }
+ }
@Override
public void setup(HashTableConfig htConfig, BufferAllocator allocator, VectorContainer incomingBuild, RecordBatch incomingProbe, RecordBatch outgoing, VectorContainer htContainerOrig) {
@@ -471,6 +466,7 @@ public abstract class HashTableTemplate implements HashTable {
this.incomingProbe = incomingProbe;
this.outgoing = outgoing;
this.htContainerOrig = htContainerOrig;
+ this.allocationTracker = new HashTableAllocationTracker(htConfig, BATCH_SIZE);
// round up the initial capacity to nearest highest power of 2
tableSize = roundUpToPowerOf2(initialCap);
@@ -499,6 +495,12 @@ public abstract class HashTableTemplate implements HashTable {
}
@Override
+ public void updateInitialCapacity(int initialCapacity) {
+ htConfig = htConfig.withInitialCapacity(initialCapacity);
+ allocationTracker = new HashTableAllocationTracker(htConfig, BATCH_SIZE);
+ }
+
+ @Override
public void updateBatches() throws SchemaChangeException {
doSetup(incomingBuild, incomingProbe);
for (BatchHolder batchHolder : batchHolders) {
@@ -711,19 +713,21 @@ public abstract class HashTableTemplate implements HashTable {
int totalBatchSize = batchHolders.size() * BATCH_SIZE;
if (currentIdx >= totalBatchSize) {
- BatchHolder bh = newBatchHolder(batchHolders.size());
+ BatchHolder bh = newBatchHolder(batchHolders.size(), allocationTracker.getNextBatchHolderSize());
batchHolders.add(bh);
bh.setup();
if (EXTRA_DEBUG) {
logger.debug("HashTable: Added new batch. Num batches = {}.", batchHolders.size());
}
+
+ allocationTracker.commit();
return true;
}
return false;
}
- protected BatchHolder newBatchHolder(int index) { // special method to allow debugging of gen code
- return new BatchHolder(index);
+ protected BatchHolder newBatchHolder(int index, int newBatchHolderSize) { // special method to allow debugging of gen code
+ return new BatchHolder(index, newBatchHolderSize);
}
// Resize the hash table if needed by creating a new one with double the number of buckets.
@@ -831,9 +835,6 @@ public abstract class HashTableTemplate implements HashTable {
return vector;
}
- @Override
- public void setMaxVarcharSize(int size) { maxVarcharSize = size; }
-
// These methods will be code-generated in the context of the outer class
protected abstract void doSetup(@Named("incomingBuild") VectorContainer incomingBuild, @Named("incomingProbe") RecordBatch incomingProbe) throws SchemaChangeException;
@@ -841,4 +842,27 @@ public abstract class HashTableTemplate implements HashTable {
protected abstract int getHashProbe(@Named("incomingRowIdx") int incomingRowIdx, @Named("seedValue") int seedValue) throws SchemaChangeException;
+ @Override
+ public long getActualSize() {
+ Set<AllocationManager.BufferLedger> ledgers = Sets.newHashSet();
+ startIndices.collectLedgers(ledgers);
+
+ long size = 0L;
+
+ for (AllocationManager.BufferLedger ledger: ledgers) {
+ size += ledger.getAccountedSize();
+ }
+
+ for (BatchHolder batchHolder: batchHolders) {
+ size += batchHolder.getActualSize();
+ }
+
+ return size;
+ }
+
+ @Override
+ public String makeDebugString() {
+ return String.format("[numBuckets = %d, numEntries = %d, numBatchHolders = %d, actualSize = %s]",
+ numBuckets(), numEntries, batchHolders.size(), HashJoinMemoryCalculator.PartitionStatSet.prettyPrintBytes(getActualSize()));
+ }
}
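The new getActualSize() collects BufferLedgers into a Set before summing getAccountedSize(), so a buffer shared by several vectors is counted only once. The toy model below (not Drill's AllocationManager or BufferLedger classes) shows why deduplicating by ledger matters when two vectors share one underlying buffer.

import java.util.HashSet;
import java.util.Set;

class LedgerModel {
  // Toy stand-in for a buffer ledger: one ledger per underlying buffer.
  static final class Ledger {
    final long accountedSize;
    Ledger(long accountedSize) { this.accountedSize = accountedSize; }
  }

  // Toy stand-in for a value vector that references a ledger.
  static final class Vector {
    final Ledger ledger;
    Vector(Ledger ledger) { this.ledger = ledger; }
    void collectLedgers(Set<Ledger> into) { into.add(ledger); }
  }

  public static void main(String[] args) {
    Ledger shared = new Ledger(4096);
    Vector a = new Vector(shared);
    Vector b = new Vector(shared);       // shares the same buffer as 'a'

    Set<Ledger> ledgers = new HashSet<>();
    a.collectLedgers(ledgers);
    b.collectLedgers(ledgers);

    long size = 0;
    for (Ledger l : ledgers) {
      size += l.accountedSize;           // the shared buffer is counted once
    }
    System.out.println(size);            // prints 4096, not 8192
  }
}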
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 998e0b1..0c46e36 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -20,13 +20,16 @@ package org.apache.drill.exec.physical.impl.join;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.TimeUnit;
+import java.util.Set;
import com.google.common.collect.Lists;
-import org.apache.drill.common.exceptions.RetryAfterSpillException;
+import com.google.common.collect.Sets;
+import org.apache.commons.io.FileUtils;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.expression.PathSegment;
+import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.data.JoinCondition;
import org.apache.drill.common.logical.data.NamedExpression;
import org.apache.drill.common.types.TypeProtos;
@@ -34,13 +37,13 @@ import org.apache.drill.common.types.TypeProtos.DataMode;
import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.exception.ClassTransformationException;
import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.memory.BaseAllocator;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.MetricDef;
+import org.apache.drill.exec.physical.base.AbstractBase;
import org.apache.drill.exec.physical.config.HashJoinPOP;
import org.apache.drill.exec.physical.impl.aggregate.SpilledRecordbatch;
import org.apache.drill.exec.physical.impl.common.ChainedHashTable;
@@ -55,15 +58,27 @@ import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.vector.FixedWidthVector;
import org.apache.drill.exec.vector.IntVector;
import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.VariableWidthVector;
import org.apache.drill.exec.vector.complex.AbstractContainerVector;
import org.apache.calcite.rel.core.JoinRelType;
+/**
+ * Implements the Hash-Join operator: builds hash tables from the build (right) side, probes them with the probe (left) side, and spills partitions to disk when memory runs low.
+ */
public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
protected static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashJoinBatch.class);
- private int RECORDS_PER_BATCH = 128; // 1024; // internal batches
+ /**
+ * The maximum number of records within each internal batch.
+ */
+ private int RECORDS_PER_BATCH; // internal batches
+
+ /**
+ * The maximum number of records in each outgoing batch.
+ */
private static final int TARGET_RECORDS_PER_BATCH = 4000;
// Join type, INNER, LEFT, RIGHT or OUTER
@@ -71,32 +86,40 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
// Join conditions
private final List<JoinCondition> conditions;
+ private final List<NamedExpression> rightExpr;
- private final List<Comparator> comparators;
+ /**
+ * Names of the join columns. These names are used to help estimate the size of the {@link HashTable}s.
+ */
+ private final Set<String> buildJoinColumns;
// Fields used for partitioning
- private int numPartitions = 1; // must be 2 to the power of bitsInMask (set in setup())
+
+ /**
+ * The number of {@link HashPartition}s. This is configured via a system option and set in {@link #partitionNumTuning(int, HashJoinMemoryCalculator.BuildSidePartitioning)}.
+ */
+ private int numPartitions = 1; // must be 2 to the power of bitsInMask
private int partitionMask = 0; // numPartitions - 1
private int bitsInMask = 0; // number of bits in the MASK
+
+ /**
+ * The master class used to generate {@link HashTable}s.
+ */
private ChainedHashTable baseHashTable;
private boolean buildSideIsEmpty = true;
private boolean canSpill = true;
private boolean wasKilled; // a kill was received, may need to clean spilled partns
+ /**
+ * This array holds the currently active {@link HashPartition}s.
+ */
HashPartition partitions[];
// Number of records in the output container
private int outputRecords;
// Schema of the build side
- private BatchSchema rightSchema = null;
-
- private final HashTableStats htStats = new HashTableStats();
-
- private final MajorType HVtype = MajorType.newBuilder()
- .setMinorType(org.apache.drill.common.types.TypeProtos.MinorType.INT /* dataType */ )
- .setMode(DataMode.REQUIRED /* mode */ )
- .build();
+ private BatchSchema rightSchema;
private int rightHVColPosition;
private BufferAllocator allocator;
@@ -111,17 +134,11 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
private int cycleNum = 0; // primary, secondary, tertiary, etc.
private int originalPartition = -1; // the partition a secondary reads from
IntVector read_HV_vector; // HV vector that was read from the spilled batch
- private int MAX_BATCHES_IN_MEMORY;
- private int MAX_BATCHES_PER_PARTITION;
-
- public class inMemBatchCounter {
- private int inMemBatches;
- public void inc() { inMemBatches++; }
- public void dec() { inMemBatches--; }
- public int value() { return inMemBatches; }
- }
- public inMemBatchCounter inMemBatches = new inMemBatchCounter();
+ private int maxBatchesInMemory;
+ /**
+ * This holds information about the spilled partitions for the build and probe side.
+ */
private static class HJSpilledPartition {
public int innerSpilledBatches;
public String innerSpillFile;
@@ -131,10 +148,12 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
int origPartn;
int prevOrigPartn; }
+ /**
+ * Queue of spilled partitions to process.
+ */
private ArrayList<HJSpilledPartition> spilledPartitionsList;
private HJSpilledPartition spilledInners[]; // for the outer to find the partition
- private int operatorId; // for the spill file name
public enum Metric implements MetricDef {
NUM_BUCKETS,
@@ -179,6 +198,78 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
}
@Override
+ protected boolean prefetchFirstBatchFromBothSides() {
+ leftUpstream = sniffNonEmptyBatch(0, left);
+ rightUpstream = sniffNonEmptyBatch(1, right);
+
+ if (leftUpstream == IterOutcome.STOP || rightUpstream == IterOutcome.STOP) {
+ state = BatchState.STOP;
+ return false;
+ }
+
+ if (leftUpstream == IterOutcome.OUT_OF_MEMORY || rightUpstream == IterOutcome.OUT_OF_MEMORY) {
+ state = BatchState.OUT_OF_MEMORY;
+ return false;
+ }
+
+ if (checkForEarlyFinish()) {
+ state = BatchState.DONE;
+ return false;
+ }
+
+ state = BatchState.FIRST; // Got our first batches on both sides
+ return true;
+ }
+
+ /**
+ * Currently in order to accurately predict memory usage for spilling, the first non-empty build side and probe side batches are needed. This method
+ * fetches the first non-empty batch from the left or right side.
+ * @param inputIndex Index specifying whether to work with the left or right input.
+ * @param recordBatch The left or right record batch.
+ * @return The {@link org.apache.drill.exec.record.RecordBatch.IterOutcome} for the left or right record batch.
+ */
+ private IterOutcome sniffNonEmptyBatch(int inputIndex, RecordBatch recordBatch) {
+ while (true) {
+ IterOutcome outcome = next(inputIndex, recordBatch);
+
+ switch (outcome) {
+ case OK_NEW_SCHEMA:
+ // We need to have the schema of the build side even when the build side is empty
+ rightSchema = buildBatch.getSchema();
+ // position of the new "column" for keeping the hash values (after the real columns)
+ rightHVColPosition = buildBatch.getContainer().getNumberOfColumns();
+ // new schema can also contain records
+ case OK:
+ if (recordBatch.getRecordCount() == 0) {
+ continue;
+ }
+ // We got a non empty batch
+ default:
+ // Other outcomes are termination conditions
+ return outcome;
+ }
+ }
+ }
+
+ /**
+ * Determines the memory calculator to use. If maxBatchesInMemory is configured, simple batch counting is used to decide when to spill. Otherwise
+ * memory calculations are used to determine when to spill.
+ * @return The memory calculator to use.
+ */
+ public HashJoinMemoryCalculator getCalculatorImpl() {
+ if (maxBatchesInMemory == 0) {
+ final double safetyFactor = context.getOptions().getDouble(ExecConstants.HASHJOIN_SAFETY_FACTOR_KEY);
+ final double fragmentationFactor = context.getOptions().getDouble(ExecConstants.HASHJOIN_FRAGMENTATION_FACTOR_KEY);
+ final double hashTableDoublingFactor = context.getOptions().getDouble(ExecConstants.HASHJOIN_HASH_DOUBLE_FACTOR_KEY);
+ final String hashTableCalculatorType = context.getOptions().getString(ExecConstants.HASHJOIN_HASHTABLE_CALC_TYPE_KEY);
+
+ return new HashJoinMemoryCalculatorImpl(safetyFactor, fragmentationFactor, hashTableDoublingFactor, hashTableCalculatorType);
+ } else {
+ return new MechanicalHashJoinMemoryCalculator(maxBatchesInMemory);
+ }
+ }
+
+ @Override
public IterOutcome innerNext() {
try {
/* If we are here for the first time, execute the build phase of the
@@ -265,13 +356,14 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
this.cleanup();
throw UserException
.unsupportedError()
- .message("Hash-Join can not partition inner data any further (too many join-key duplicates? - try merge-join)")
+ .message("Hash-Join can not partition inner data any further (too many join-key duplicates? - try merge-join)\n"
+ + "On cycle num %d mem available %d num partitions %d", cycleNum, allocator.getLimit(), numPartitions)
.build(logger);
}
}
logger.debug("Start reading spilled partition {} (prev {}) from cycle {} (with {}-{} batches). More {} spilled partitions left.", currSp.origPartn, currSp.prevOrigPartn, currSp.cycleNum, currSp.outerSpilledBatches, currSp.innerSpilledBatches, spilledPartitionsList.size());
- state = BatchState.FIRST; // build again, initialize probe, etc
+ state = BatchState.FIRST; // TODO need to determine if this is still necessary since prefetchFirstBatchFromBothSides sets this
return innerNext(); // start processing the next spilled partition "recursively"
}
@@ -307,14 +399,19 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
}
private void setupHashTable() throws SchemaChangeException {
+ final List<Comparator> comparators = Lists.newArrayListWithExpectedSize(conditions.size());
+ // When DRILL supports Java 8, use the following instead of the for() loop
+ // conditions.forEach(cond->comparators.add(JoinUtils.checkAndReturnSupportedJoinComparator(cond)));
+ for (int i=0; i<conditions.size(); i++) {
+ JoinCondition cond = conditions.get(i);
+ comparators.add(JoinUtils.checkAndReturnSupportedJoinComparator(cond));
+ }
+
// Setup the hash table configuration object
- int conditionsSize = conditions.size();
- final List<NamedExpression> rightExpr = new ArrayList<>(conditionsSize);
- List<NamedExpression> leftExpr = new ArrayList<>(conditionsSize);
+ List<NamedExpression> leftExpr = new ArrayList<>(conditions.size());
// Create named expressions from the conditions
- for (int i = 0; i < conditionsSize; i++) {
- rightExpr.add(new NamedExpression(conditions.get(i).getRight(), new FieldReference("build_side_" + i)));
+ for (int i = 0; i < conditions.size(); i++) {
leftExpr.add(new NamedExpression(conditions.get(i).getLeft(), new FieldReference("probe_side_" + i)));
}
@@ -328,8 +425,9 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
throw new SchemaChangeException(errorMsg);
}
}
- final HashTableConfig htConfig = new HashTableConfig((int) context.getOptions().getOption(ExecConstants.MIN_HASH_TABLE_SIZE), HashTable.DEFAULT_LOAD_FACTOR, rightExpr, leftExpr, comparators);
+ final HashTableConfig htConfig = new HashTableConfig((int) context.getOptions().getOption(ExecConstants.MIN_HASH_TABLE_SIZE),
+ true, HashTable.DEFAULT_LOAD_FACTOR, rightExpr, leftExpr, comparators);
// Create the chained hash table
baseHashTable =
@@ -352,23 +450,10 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
// ================================
// Set the number of partitions from the configuration (raise to a power of two, if needed)
- numPartitions = (int)context.getOptions().getOption(ExecConstants.HASHJOIN_NUM_PARTITIONS_VALIDATOR);
- if ( numPartitions == 1 ) { //
- canSpill = false;
- logger.warn("Spilling is disabled due to configuration setting of num_partitions to 1");
- }
- numPartitions = BaseAllocator.nextPowerOfTwo(numPartitions); // in case not a power of 2
// Based on the number of partitions: Set the mask and bit count
partitionMask = numPartitions - 1; // e.g. 32 --> 0x1F
bitsInMask = Integer.bitCount(partitionMask); // e.g. 0x1F -> 5
- RECORDS_PER_BATCH = (int)context.getOptions().getOption(ExecConstants.HASHJOIN_NUM_ROWS_IN_BATCH_VALIDATOR);
-
- MAX_BATCHES_IN_MEMORY = (int)context.getOptions().getOption(ExecConstants.HASHJOIN_MAX_BATCHES_IN_MEMORY_VALIDATOR);
- MAX_BATCHES_PER_PARTITION = (int)context.getOptions().getOption(ExecConstants.HASHJOIN_MAX_BATCHES_PER_PARTITION_VALIDATOR);
-
- // =================================
-
// Create the FIFO list of spilled partitions (pairs - inner/outer)
spilledPartitionsList = new ArrayList<>();
@@ -382,17 +467,73 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
* Initialize fields (that may be reused when reading spilled partitions)
*/
private void initializeBuild() {
- assert inMemBatches.value() == 0; // check that no in-memory batches left
baseHashTable.updateIncoming(buildBatch, probeBatch); // in case we process the spilled files
// Recreate the partitions every time build is initialized
for (int part = 0; part < numPartitions; part++ ) {
partitions[part] = new HashPartition(context, allocator, baseHashTable, buildBatch, probeBatch,
- RECORDS_PER_BATCH, spillSet, part, inMemBatches, cycleNum);
+ RECORDS_PER_BATCH, spillSet, part, cycleNum);
}
spilledInners = new HJSpilledPartition[numPartitions];
}
+
+ /**
+ * Tunes the number of partitions used by {@link HashJoinBatch}. If spilling is not possible, it gives up and reverts
+ * to unbounded in-memory operation.
+ * @param maxBatchSize The maximum number of records in an incoming batch.
+ * @param buildCalc The build side memory calculator created from the sniffed build and probe batches.
+ * @return The build side calculator to use for the rest of the build phase; a no-op calculator if we fall back to unbounded memory.
+ */
+ private HashJoinMemoryCalculator.BuildSidePartitioning partitionNumTuning(
+ int maxBatchSize,
+ HashJoinMemoryCalculator.BuildSidePartitioning buildCalc) {
+ // Get auto tuning result
+ numPartitions = buildCalc.getNumPartitions();
+
+ if (logger.isDebugEnabled()) {
+ logger.debug(buildCalc.makeDebugString());
+ }
+
+ if (buildCalc.getMaxReservedMemory() > allocator.getLimit()) {
+ // We don't have enough memory to spill, so give up on spilling and run with no memory limit
+
+ // TODO dirty hack to prevent regressions. Remove this once batch sizing is implemented.
+ // We don't have enough memory to do partitioning; we have to do everything in memory
+ final String message = String.format("When using the minimum number of partitions %d we require %s memory but only have %s available. " +
+ "Forcing legacy behavoir of using unbounded memory in order to prevent regressions.",
+ numPartitions,
+ FileUtils.byteCountToDisplaySize(buildCalc.getMaxReservedMemory()),
+ FileUtils.byteCountToDisplaySize(allocator.getLimit()));
+ logger.warn(message);
+
+ // create a Noop memory calculator
+ final HashJoinMemoryCalculator calc = getCalculatorImpl();
+ calc.initialize(false);
+ buildCalc = calc.next();
+
+ buildCalc.initialize(true,
+ true, // TODO Fix after growing hash values bug fixed
+ buildBatch,
+ probeBatch,
+ buildJoinColumns,
+ allocator.getLimit(),
+ numPartitions,
+ RECORDS_PER_BATCH,
+ RECORDS_PER_BATCH,
+ maxBatchSize,
+ maxBatchSize,
+ TARGET_RECORDS_PER_BATCH,
+ HashTable.DEFAULT_LOAD_FACTOR);
+
+ numPartitions = 1; // We are only using one partition
+ canSpill = false; // We cannot spill
+ allocator.setLimit(AbstractBase.MAX_ALLOCATION); // Violate framework and force unbounded memory
+ }
+
+ return buildCalc;
+ }
+
/**
* Execute the BUILD phase; first read incoming and split rows into partitions;
* may decide to spill some of the partitions
@@ -400,24 +541,56 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
* @throws SchemaChangeException
*/
public void executeBuildPhase() throws SchemaChangeException {
- final HashJoinMemoryCalculator.BuildSidePartitioning buildCalc = new HashJoinMemoryCalculatorImpl().next();
- boolean hasProbeData = leftUpstream != IterOutcome.NONE;
-
- if ( rightUpstream == IterOutcome.NONE ) { return; } // empty right
+ if (rightUpstream == IterOutcome.NONE) {
+ // empty right
+ return;
+ }
- // skip first batch if count is zero, as it may be an empty schema batch
- if (false && buildBatch.getRecordCount() == 0) {
- for (final VectorWrapper<?> w : buildBatch) {
- w.clear();
+ HashJoinMemoryCalculator.BuildSidePartitioning buildCalc;
+ boolean firstCycle = cycleNum == 0;
+
+ {
+ // Initializing build calculator
+ // Limit scope of these variables to this block
+ int maxBatchSize = firstCycle? RecordBatch.MAX_BATCH_SIZE: RECORDS_PER_BATCH;
+ boolean hasProbeData = leftUpstream != IterOutcome.NONE;
+ boolean doMemoryCalculation = canSpill && hasProbeData;
+ HashJoinMemoryCalculator calc = getCalculatorImpl();
+
+ calc.initialize(doMemoryCalculation);
+ buildCalc = calc.next();
+
+ // We've sniffed the first non-empty build and probe batches, so we have enough information to create a calculator
+ buildCalc.initialize(firstCycle, true, // TODO Fix after growing hash values bug fixed
+ buildBatch,
+ probeBatch,
+ buildJoinColumns,
+ allocator.getLimit(),
+ numPartitions,
+ RECORDS_PER_BATCH,
+ RECORDS_PER_BATCH,
+ maxBatchSize,
+ maxBatchSize,
+ TARGET_RECORDS_PER_BATCH,
+ HashTable.DEFAULT_LOAD_FACTOR);
+
+ if (firstCycle && doMemoryCalculation) {
+ // Do auto tuning
+ buildCalc = partitionNumTuning(maxBatchSize, buildCalc);
}
- rightUpstream = next(buildBatch);
}
- //Setup the underlying hash table
- if ( cycleNum == 0 ) { delayedSetup(); } // first time only
+ if (firstCycle) {
+ // Do initial setup only on the first cycle
+ delayedSetup();
+ }
initializeBuild();
+ // Make the calculator aware of our partitions
+ final HashJoinMemoryCalculator.PartitionStatSet partitionStatSet = new HashJoinMemoryCalculator.PartitionStatSet(partitions);
+ buildCalc.setPartitionStatSet(partitionStatSet);
+
boolean moreData = true;
while (moreData) {
switch (rightUpstream) {
@@ -440,32 +613,15 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
if ( cycleNum > 0 ) {
read_HV_vector = (IntVector) buildBatch.getContainer().getLast();
}
+
// For every record in the build batch, hash the key columns and keep the result
for (int ind = 0; ind < currentRecordCount; ind++) {
int hashCode = ( cycleNum == 0 ) ? partitions[0].getBuildHashCode(ind)
: read_HV_vector.getAccessor().get(ind); // get the hash value from the HV column
int currPart = hashCode & partitionMask ;
hashCode >>>= bitsInMask;
-/*
- int pos = currentBatches[currPart].appendRow(buildBatch.getContainer(),ind);
- currHVVectors[currPart].getMutator().set(pos, hashCode); // store the hash value in the new column
- if ( pos + 1 == RECORDS_PER_BATCH ) {
- // The current decision on when-to-spill is crude
- completeAnInnerBatch(currPart,true,
- isSpilled(currPart) || // once spilled - then spill every new full batch
- canSpill &&
- ( inMemBatches > MAX_BATCHES_IN_MEMORY ||
- tmpBatchesList[currPart].size() > MAX_BATCHES_PER_PARTITION ));
- }
-*/
-
// Append the new inner row to the appropriate partition; spill (that partition) if needed
- partitions[currPart].appendInnerRow(buildBatch.getContainer(), ind, hashCode,
- // The current decision on when-to-spill is crude ...
- partitions[currPart].isSpilled() || // once spilled - then spill every new full batch
- canSpill &&
- ( inMemBatches.value() > MAX_BATCHES_IN_MEMORY ||
- partitions[currPart].getPartitionBatchesCount() > MAX_BATCHES_PER_PARTITION ) ); // may spill if needed
+ partitions[currPart].appendInnerRow(buildBatch.getContainer(), ind, hashCode, buildCalc); // may spill if needed
}
if ( read_HV_vector != null ) {
@@ -483,66 +639,58 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
// the spilled partitions list
for (HashPartition partn : partitions) {
partn.completeAnInnerBatch(false, partn.isSpilled() );
- if ( partn.isSpilled() ) {
- HJSpilledPartition sp = new HJSpilledPartition();
- sp.innerSpillFile = partn.getSpillFile();
- sp.innerSpilledBatches = partn.getPartitionBatchesCount();
- sp.cycleNum = cycleNum; // remember the current cycle
- sp.origPartn = partn.getPartitionNum(); // for debugging / filename
- sp.prevOrigPartn = originalPartition; // for debugging / filename
- spilledPartitionsList.add(sp);
-
- spilledInners[partn.getPartitionNum()] = sp; // for the outer to find the SP later
- partn.closeWriter();
- }
}
+ HashJoinMemoryCalculator.PostBuildCalculations postBuildCalc = buildCalc.next();
+ postBuildCalc.initialize();
+
//
// Traverse all the in-memory partitions' incoming batches, and build their hash tables
//
-/*
- for (int currPart = 0; currPart < numPartitions; currPart++) {
- // each partition is a regular array of batches
- ArrayList<VectorContainer> thisPart = new ArrayList<>();
+ for (int index = 0; index < partitions.length; index++) {
+ final HashPartition partn = partitions[index];
- for (int curr = 0; curr < partitionBatchesCount[currPart]; curr++) {
- VectorContainer nextBatch = tmpBatchesList[currPart].get(curr);
- final int currentRecordCount = nextBatch.getRecordCount();
-
- // For every incoming build batch, we create a matching helper batch
- hjHelpers[currPart].addNewBatch(currentRecordCount);
-
- // Holder contains the global index where the key is hashed into using the hash table
- final IndexPointer htIndex = new IndexPointer();
-
- hashTables[currPart].updateIncoming(nextBatch, probeBatch );
-
- IntVector HV_vector = (IntVector) nextBatch.getValueVector(rightHVColPosition).getValueVector();
+ if (partn.isSpilled()) {
+ // Don't build hash tables for spilled partitions
+ continue;
+ }
- for (int recInd = 0; recInd < currentRecordCount; recInd++) {
- int hashCode = HV_vector.getAccessor().get(recInd);
- try {
- hashTables[currPart].put(recInd, htIndex, hashCode);
- } catch (RetryAfterSpillException RE) {
- throw new OutOfMemoryException("HT put");
- } // Hash Join can not retry yet
- // Use the global index returned by the hash table, to store
- //the current record index and batch index. This will be used
- // later when we probe and find a match.
- //
- hjHelpers[currPart].setCurrentIndex(htIndex.value, curr , recInd);
+ try {
+ if (postBuildCalc.shouldSpill()) {
+ // Spill this partition if we need to make room
+ partn.spillThisPartition();
+ } else {
+ // Only build hash tables for partitions that are not spilled
+ partn.buildContainersHashTableAndHelper();
}
-
- thisPart.add(nextBatch);
+ } catch (OutOfMemoryException e) {
+ final String message = "Failed building hash table on partition " + index + ":\n"
+ + makeDebugString() + "\n"
+ + postBuildCalc.makeDebugString();
+ // Include debug info
+ throw new OutOfMemoryException(message, e);
}
+ }
- partitionContainers.add(thisPart);
-*/
- for (HashPartition partn : partitions) {
- partn.buildContainersHashTableAndHelper();
+ if (logger.isDebugEnabled()) {
+ logger.debug(postBuildCalc.makeDebugString());
}
+ for (HashPartition partn : partitions) {
+ if ( partn.isSpilled() ) {
+ HJSpilledPartition sp = new HJSpilledPartition();
+ sp.innerSpillFile = partn.getSpillFile();
+ sp.innerSpilledBatches = partn.getPartitionBatchesCount();
+ sp.cycleNum = cycleNum; // remember the current cycle
+ sp.origPartn = partn.getPartitionNum(); // for debugging / filename
+ sp.prevOrigPartn = originalPartition; // for debugging / filename
+ spilledPartitionsList.add(sp);
+
+ spilledInners[partn.getPartitionNum()] = sp; // for the outer to find the SP later
+ partn.closeWriter();
+ }
+ }
}
private void setupOutputContainerSchema() {
@@ -592,9 +740,18 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
}
private void allocateVectors() {
- for (final VectorWrapper<?> v : container) {
- v.getValueVector().allocateNew();
+ for (final VectorWrapper<?> vectorWrapper : container) {
+ ValueVector valueVector = vectorWrapper.getValueVector();
+
+ if (valueVector instanceof FixedWidthVector) {
+ ((FixedWidthVector) valueVector).allocateNew(TARGET_RECORDS_PER_BATCH);
+ } else if (valueVector instanceof VariableWidthVector) {
+ ((VariableWidthVector) valueVector).allocateNew(8 * TARGET_RECORDS_PER_BATCH, TARGET_RECORDS_PER_BATCH);
+ } else {
+ valueVector.allocateNew();
+ }
}
+
container.setRecordCount(0); // reset container's counter back to zero records
}
@@ -615,13 +772,25 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
conditions = popConfig.getConditions();
this.popConfig = popConfig;
- comparators = Lists.newArrayListWithExpectedSize(conditions.size());
- // When DRILL supports Java 8, use the following instead of the for() loop
- // conditions.forEach(cond->comparators.add(JoinUtils.checkAndReturnSupportedJoinComparator(cond)));
- for (int i=0; i<conditions.size(); i++) {
- JoinCondition cond = conditions.get(i);
- comparators.add(JoinUtils.checkAndReturnSupportedJoinComparator(cond));
+ rightExpr = new ArrayList<>(conditions.size());
+ buildJoinColumns = Sets.newHashSet();
+
+ for (int i = 0; i < conditions.size(); i++) {
+ final SchemaPath rightPath = (SchemaPath) conditions.get(i).getRight();
+ final PathSegment.NameSegment nameSegment = (PathSegment.NameSegment)rightPath.getLastSegment();
+ buildJoinColumns.add(nameSegment.getPath());
+ final String refName = "build_side_" + i;
+ rightExpr.add(new NamedExpression(conditions.get(i).getRight(), new FieldReference(refName)));
}
+
+ numPartitions = (int)context.getOptions().getOption(ExecConstants.HASHJOIN_NUM_PARTITIONS_VALIDATOR);
+ if ( numPartitions == 1 ) { //
+ canSpill = false;
+ logger.warn("Spilling is disabled due to configuration setting of num_partitions to 1");
+ }
+
+ numPartitions = BaseAllocator.nextPowerOfTwo(numPartitions); // in case not a power of 2
+
this.allocator = oContext.getAllocator();
final long memLimit = context.getOptions().getOption(ExecConstants.HASHJOIN_MAX_MEMORY_VALIDATOR);
@@ -639,7 +808,11 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
partitions = new HashPartition[0];
}
- public void cleanup() {
+ /**
+ * This method is called when {@link HashJoinBatch} closes. It cleans up any leftover spilled files in the spill queue and closes the
+ * spillSet.
+ */
+ private void cleanup() {
if ( buildSideIsEmpty ) { return; } // not set up; nothing to clean
if ( spillSet.getWriteBytes() > 0 ) {
stats.setLongStat(Metric.SPILL_MB, // update stats - total MB spilled
@@ -669,9 +842,33 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
spillSet.close(); // delete the spill directory(ies)
}
+ /**
+ * This creates a string that summarizes the memory usage of the operator.
+ * @return A memory dump string.
+ */
+ public String makeDebugString() {
+ final StringBuilder sb = new StringBuilder();
+
+ for (int partitionIndex = 0; partitionIndex < partitions.length; partitionIndex++) {
+ final String partitionPrefix = "Partition " + partitionIndex + ": ";
+ final HashPartition hashPartition = partitions[partitionIndex];
+ sb.append(partitionPrefix).append(hashPartition.makeDebugString()).append("\n");
+ }
+
+ return sb.toString();
+ }
+
+ /**
+ * Updates the {@link HashTable} and spilling stats after the original build side is processed.
+ *
+ * Note: this does not update all the stats. The cycleNum is updated dynamically in {@link #innerNext()}, and the total bytes
+ * written are updated at close time in {@link #cleanup()}.
+ */
private void updateStats() {
if ( buildSideIsEmpty ) { return; } // no stats when the right side is empty
if ( cycleNum > 0 ) { return; } // These stats are only for before processing spilled files
+
+ final HashTableStats htStats = new HashTableStats();
long numSpilled = 0;
HashTableStats newStats = new HashTableStats();
// sum the stats from all the partitions
@@ -983,5 +1180,4 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
(joinType == JoinRelType.RIGHT || joinType == JoinRelType.FULL) ? ProbeState.PROJECT_RIGHT :
ProbeState.DONE; // else we're done
}
-
-} // public class HashJoinBatch
+}
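As context for the HashJoinBatch hunks above: the number of partitions is rounded up to a power of two, the partition mask and bit count are derived from it, and each build-side hash value is split into a partition index (the low bits) and a remaining hash that is carried in the HV column for later spill cycles. The following standalone sketch, which is not part of the commit and uses made-up values, illustrates that arithmetic.

    // Illustrative sketch of the partition-selection arithmetic shown in the hunks above.
    public class PartitionMaskSketch {
      public static void main(String[] args) {
        int numPartitions = 32;                           // already rounded up to a power of two
        int partitionMask = numPartitions - 1;            // 32 --> 0x1F
        int bitsInMask = Integer.bitCount(partitionMask); // 0x1F --> 5

        int hashCode = 0x5F3759DF;                        // hash of the key columns for one build row
        int currPart = hashCode & partitionMask;          // low bits pick the partition
        int remainingHash = hashCode >>> bitsInMask;      // remaining bits are kept (the HV column) for later cycles

        System.out.printf("partition = %d, remaining hash = 0x%X%n", currPart, remainingHash);
      }
    }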
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelper.java
index 55146f4..b5b7183 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelper.java
@@ -128,6 +128,12 @@ public class HashJoinHelper {
buildInfoList.add(info);
}
+ /**
+ * Takes a composite index for a key produced by {@link HashTable#probeForKey(int, int)}, and uses it to look up the
+ * index of the first original key in the original data.
+ * @param keyIndex A composite index for a key produced by {@link HashTable#probeForKey(int, int)}
+ * @return The composite index for the first added key record in the original data.
+ */
public int getStartIndex(int keyIndex) {
int batchIdx = keyIndex / HashTable.BATCH_SIZE;
int offsetIdx = keyIndex % HashTable.BATCH_SIZE;
@@ -139,6 +145,12 @@ public class HashJoinHelper {
return sv4.get(offsetIdx);
}
+ /**
+ * Takes a composite index for a key produced by {@link HashJoinHelper#getStartIndex(int)}, and returns the composite index for the
+ * next record in the list of records that match a key. The result is a composite index for a record within the original data set.
+ * @param currentIdx A composite index for a key produced by {@link HashJoinHelper#getStartIndex(int)}.
+ * @return The composite index for the next record in the list of records that match a key. The result is a composite index for a record within the original data set.
+ */
public int getNextIndex(int currentIdx) {
// Get to the links field of the current index to get the next index
int batchIdx = currentIdx >>> SHIFT_SIZE;
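The new javadoc above describes composite key indices. Assuming HashTable.BATCH_SIZE is a power of two (65536 is used in this sketch as an assumption, not a value quoted from the patch), the division and modulo in getStartIndex and the unsigned shift in getNextIndex decompose an index the same way. A hedged standalone sketch, not part of the commit:

    // Minimal sketch of decomposing a composite key index into (batch, offset).
    public class CompositeIndexSketch {
      private static final int BATCH_SIZE = 65536;   // assumed value of HashTable.BATCH_SIZE
      private static final int SHIFT_SIZE = 16;      // log2(BATCH_SIZE)

      public static void main(String[] args) {
        int keyIndex = 3 * BATCH_SIZE + 42;          // hypothetical composite index

        int batchIdx  = keyIndex / BATCH_SIZE;       // as in getStartIndex()
        int offsetIdx = keyIndex % BATCH_SIZE;
        int batchIdxByShift = keyIndex >>> SHIFT_SIZE; // as in getNextIndex(); same result for powers of two

        System.out.printf("batch=%d offset=%d (shift form: %d)%n", batchIdx, offsetIdx, batchIdxByShift);
      }
    }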
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/FixedWidthVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelperSizeCalculator.java
similarity index 66%
copy from exec/vector/src/main/java/org/apache/drill/exec/vector/FixedWidthVector.java
copy to exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelperSizeCalculator.java
index 09bcdd8..f5c826a 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/FixedWidthVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelperSizeCalculator.java
@@ -1,4 +1,4 @@
-/*
+/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -15,19 +15,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.drill.exec.vector;
+package org.apache.drill.exec.physical.impl.join;
-public interface FixedWidthVector extends ValueVector {
-
- /**
- * Allocate a new memory space for this vector. Must be called prior to using the ValueVector.
- *
- * @param valueCount Number of values in the vector.
- */
- void allocateNew(int valueCount);
-
- /**
- * Zero out the underlying buffer backing this vector.
- */
- void zeroVector();
+public interface HashJoinHelperSizeCalculator {
+ long calculateSize(HashJoinMemoryCalculator.PartitionStat partitionStat, double fragmentationFactor);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelperSizeCalculatorImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelperSizeCalculatorImpl.java
new file mode 100644
index 0000000..728fde5
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelperSizeCalculatorImpl.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.join;
+
+import com.google.common.base.Preconditions;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.RecordBatchSizer;
+import org.apache.drill.exec.vector.IntVector;
+
+public class HashJoinHelperSizeCalculatorImpl implements HashJoinHelperSizeCalculator {
+ public static final HashJoinHelperSizeCalculatorImpl INSTANCE = new HashJoinHelperSizeCalculatorImpl();
+
+ private HashJoinHelperSizeCalculatorImpl() {
+ // Do nothing
+ }
+
+ @Override
+ public long calculateSize(HashJoinMemoryCalculator.PartitionStat partitionStat, double fragmentationFactor) {
+ Preconditions.checkArgument(!partitionStat.isSpilled());
+
+ // Account for the size of the SV4 in a hash join helper
+ long joinHelperSize = IntVector.VALUE_WIDTH * RecordBatch.MAX_BATCH_SIZE;
+
+ // Account for the SV4 for each batch that holds links for each batch
+ for (HashJoinMemoryCalculator.BatchStat batchStat: partitionStat.getInMemoryBatches()) {
+ // Note we don't have to round up to nearest power of 2, since we allocate the exact
+ // space needed for the value vector.
+ joinHelperSize += batchStat.getNumRecords() * IntVector.VALUE_WIDTH;
+ }
+
+ // Note the BitSets of the HashJoin helper are stored on heap, so we don't account for them here.
+ // TODO move BitSets to direct memory
+
+ return RecordBatchSizer.multiplyByFactor(joinHelperSize, fragmentationFactor);
+ }
+}
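To make the sizing formula above concrete: the helper is costed as one SV4 of MAX_BATCH_SIZE ints plus one int per in-memory record, scaled by the fragmentation factor. The following standalone sketch, not part of the commit, repeats that arithmetic with made-up inputs; the constants are assumptions rather than values quoted from the code.

    // Hedged sketch of the HashJoinHelper sizing arithmetic shown above.
    public class HelperSizeSketch {
      public static void main(String[] args) {
        final int valueWidth = 4;              // width of an int, as in IntVector.VALUE_WIDTH
        final int maxBatchSize = 65536;        // assumed value of RecordBatch.MAX_BATCH_SIZE
        final int[] inMemoryBatchRecordCounts = {4096, 4096, 1024}; // hypothetical partition batches
        final double fragmentationFactor = 1.33;                     // hypothetical factor

        long joinHelperSize = (long) valueWidth * maxBatchSize;      // the SV4 of start indices
        for (int numRecords : inMemoryBatchRecordCounts) {
          joinHelperSize += (long) numRecords * valueWidth;          // one link entry per record in each batch
        }

        // Approximates RecordBatchSizer.multiplyByFactor(joinHelperSize, fragmentationFactor)
        long estimate = (long) Math.ceil(joinHelperSize * fragmentationFactor);
        System.out.println("estimated HashJoinHelper size (bytes): " + estimate);
      }
    }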
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculator.java
new file mode 100644
index 0000000..f2de0fe
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculator.java
@@ -0,0 +1,320 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.join;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.commons.io.FileUtils;
+import org.apache.drill.exec.record.RecordBatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Set;
+
+/**
+ * <p>
+ * This class is responsible for managing the memory calculations for the HashJoin operator.
+ * Since the HashJoin operator has different phases of execution, this class needs to perform
+ * different memory calculations at each phase. The phases of execution have been broken down
+ * into an explicit state machine diagram below. What occurs in each state is described in
+ * the documentation of the {@link HashJoinState} class. <b>Note</b> the transition from Probing
+ * and Partitioning back to Build Side Partitioning. This happens when we had to spill probe side
+ * partitions and need to recursively process the spilled partitions. This recursion is
+ * described in more detail in the example below.
+ * </p>
+ * <p>
+ *
+ * +--------------+ <-------+
+ * | Build Side | |
+ * | Partitioning| |
+ * | | |
+ * +------+-------+ |
+ * | |
+ * | |
+ * v |
+ * +--------------+ |
+ * |Probing and | |
+ * |Partitioning | |
+ * | | |
+ * +--------------+ |
+ * | |
+ * +----------------+
+ * |
+ * v
+ * Done
+ * </p>
+ * <p>
+ * An overview of how these states interact can be summarized with the following example.<br/><br/>
+ *
+ * Consider the case where we have 4 partitions configured initially.<br/><br/>
+ *
+ * <ol>
+ * <li>We first start consuming build side batches and putting their records into one of 4 build side partitions.</li>
+ * <li>Once we run out of memory we start spilling build side partitions one by one.</li>
+ * <li>We keep partitioning build side batches until all the build side batches are consumed.</li>
+ * <li>After we have consumed the build side we prepare to probe by building hashtables for the partitions
+ * we have in memory. If we don't have enough room for all the hashtables in memory we spill build side
+ * partitions until we do have enough room.</li>
+ * <li>We now start processing the probe side. For each probe record we determine its build partition. If
+ * the build partition is in memory we do the join for the record and emit it. If the build partition is
+ * not in memory we spill the probe record. We continue this process until all the probe side records are consumed.</li>
+ * <li>If we didn't spill any probe side partitions because all the build side partitions were in memory, our join
+ * operation is done. If we did spill probe side partitions we have to recursively repeat this whole process for each
+ * spilled probe and build side partition pair.</li>
+ * </ol>
+ * </p>
+*/
+public interface HashJoinMemoryCalculator extends HashJoinStateCalculator<HashJoinMemoryCalculator.BuildSidePartitioning> {
+ void initialize(boolean doMemoryCalc);
+
+ /**
+ * The interface representing the {@link HashJoinStateCalculator} corresponding to the
+ * {@link HashJoinState#BUILD_SIDE_PARTITIONING} state.
+ */
+ interface BuildSidePartitioning extends HashJoinStateCalculator<PostBuildCalculations> {
+ void initialize(boolean autoTune,
+ boolean reserveHash,
+ RecordBatch buildSideBatch,
+ RecordBatch probeSideBatch,
+ Set<String> joinColumns,
+ long memoryAvailable,
+ int initialPartitions,
+ int recordsPerPartitionBatchBuild,
+ int recordsPerPartitionBatchProbe,
+ int maxBatchNumRecordsBuild,
+ int maxBatchNumRecordsProbe,
+ int outputBatchNumRecords,
+ double loadFactor);
+
+ void setPartitionStatSet(PartitionStatSet partitionStatSet);
+
+ int getNumPartitions();
+
+ long getBuildReservedMemory();
+
+ long getMaxReservedMemory();
+
+ boolean shouldSpill();
+
+ String makeDebugString();
+ }
+
+ /**
+ * The interface representing the {@link HashJoinStateCalculator} corresponding to the
+ * {@link HashJoinState#POST_BUILD_CALCULATIONS} state.
+ */
+ interface PostBuildCalculations extends HashJoinStateCalculator<HashJoinMemoryCalculator> {
+ void initialize();
+
+ boolean shouldSpill();
+
+ String makeDebugString();
+ }
+
+ interface PartitionStat {
+ List<BatchStat> getInMemoryBatches();
+
+ int getNumInMemoryBatches();
+
+ boolean isSpilled();
+
+ long getNumInMemoryRecords();
+
+ long getInMemorySize();
+ }
+
+ /**
+ * This class represents the memory size statistics for an entire set of partitions.
+ */
+ class PartitionStatSet {
+ private static final Logger log = LoggerFactory.getLogger(PartitionStatSet.class);
+ private final PartitionStat[] partitionStats;
+
+ public PartitionStatSet(final PartitionStat... partitionStats) {
+ this.partitionStats = Preconditions.checkNotNull(partitionStats);
+
+ for (PartitionStat partitionStat: partitionStats) {
+ Preconditions.checkNotNull(partitionStat);
+ }
+ }
+
+ public PartitionStat get(int partitionIndex) {
+ return partitionStats[partitionIndex];
+ }
+
+ public int getSize() {
+ return partitionStats.length;
+ }
+
+ // Somewhat inefficient but not a big deal since we don't deal with that many partitions
+ public long getNumInMemoryRecords() {
+ long numRecords = 0L;
+
+ for (final PartitionStat partitionStat: partitionStats) {
+ numRecords += partitionStat.getNumInMemoryRecords();
+ }
+
+ return numRecords;
+ }
+
+ public int getNumInMemoryBatches() {
+ int numBatches = 0;
+
+ for (final PartitionStat partitionStat: partitionStats) {
+ numBatches += partitionStat.getNumInMemoryBatches();
+ }
+
+ return numBatches;
+ }
+
+ // Somewhat inefficient but not a big deal since we don't deal with that many partitions
+ public long getConsumedMemory() {
+ long consumedMemory = 0L;
+
+ for (final PartitionStat partitionStat: partitionStats) {
+ consumedMemory += partitionStat.getInMemorySize();
+ }
+
+ return consumedMemory;
+ }
+
+ public List<Integer> getSpilledPartitions() {
+ return getPartitions(true);
+ }
+
+ public List<Integer> getInMemoryPartitions() {
+ return getPartitions(false);
+ }
+
+ public List<Integer> getPartitions(boolean spilled) {
+ List<Integer> partitionIndices = Lists.newArrayList();
+
+ for (int partitionIndex = 0; partitionIndex < partitionStats.length; partitionIndex++) {
+ final PartitionStat partitionStat = partitionStats[partitionIndex];
+
+ if (partitionStat.isSpilled() == spilled) {
+ partitionIndices.add(partitionIndex);
+ }
+ }
+
+ return partitionIndices;
+ }
+
+ public int getNumInMemoryPartitions() {
+ return getInMemoryPartitions().size();
+ }
+
+ public int getNumSpilledPartitions() {
+ return getSpilledPartitions().size();
+ }
+
+ public boolean allSpilled() {
+ return getSize() == getNumSpilledPartitions();
+ }
+
+ public boolean noneSpilled() {
+ return getSize() == getNumInMemoryPartitions();
+ }
+
+ public String makeDebugString() {
+ final StringBuilder sizeSb = new StringBuilder("Partition Sizes:\n");
+ final StringBuilder batchCountSb = new StringBuilder("Partition Batch Counts:\n");
+ final StringBuilder recordCountSb = new StringBuilder("Partition Record Counts:\n");
+
+ for (int partitionIndex = 0; partitionIndex < partitionStats.length; partitionIndex++) {
+ final PartitionStat partitionStat = partitionStats[partitionIndex];
+ final String partitionPrefix = partitionIndex + ": ";
+
+ sizeSb.append(partitionPrefix);
+ batchCountSb.append(partitionPrefix);
+ recordCountSb.append(partitionPrefix);
+
+ if (partitionStat.isSpilled()) {
+ sizeSb.append("Spilled");
+ batchCountSb.append("Spilled");
+ recordCountSb.append("Spilled");
+ } else if (partitionStat.getNumInMemoryRecords() == 0) {
+ sizeSb.append("Empty");
+ batchCountSb.append("Empty");
+ recordCountSb.append("Empty");
+ } else {
+ sizeSb.append(prettyPrintBytes(partitionStat.getInMemorySize()));
+ batchCountSb.append(partitionStat.getNumInMemoryBatches());
+ recordCountSb.append(partitionStat.getNumInMemoryRecords());
+ }
+
+ sizeSb.append("\n");
+ batchCountSb.append("\n");
+ recordCountSb.append("\n");
+ }
+
+ return sizeSb.toString() + "\n" + batchCountSb.toString() + "\n" + recordCountSb.toString();
+ }
+
+ public static String prettyPrintBytes(long byteCount) {
+ return String.format("%d (%s)", byteCount, FileUtils.byteCountToDisplaySize(byteCount));
+ }
+ }
+
+ class BatchStat {
+ private int numRecords;
+ private long batchSize;
+
+ public BatchStat(int numRecords, long batchSize) {
+ this.numRecords = numRecords;
+ this.batchSize = batchSize;
+ }
+
+ public long getNumRecords()
+ {
+ return numRecords;
+ }
+
+ public long getBatchSize()
+ {
+ return batchSize;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ BatchStat batchStat = (BatchStat) o;
+
+ if (numRecords != batchStat.numRecords) {
+ return false;
+ }
+
+ return batchSize == batchStat.batchSize;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = numRecords;
+ result = 31 * result + (int) (batchSize ^ (batchSize >>> 32));
+ return result;
+ }
+ }
+}
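As a reading aid for the interface above, here is a hedged sketch of how an operator might walk the calculator through its states (INITIALIZING, BUILD_SIDE_PARTITIONING, POST_BUILD_CALCULATIONS). Only the method names come from the interface in this patch; the driver class, its method, and every argument value are placeholders, and the class is assumed to live in the same package as the interface.

    import java.util.Set;
    import org.apache.drill.exec.record.RecordBatch;

    class CalculatorWalkthrough {
      // Hypothetical driver, not part of the commit; argument values are placeholders.
      void drive(HashJoinMemoryCalculator calc,
                 RecordBatch buildBatch,
                 RecordBatch probeBatch,
                 Set<String> joinColumns,
                 HashJoinMemoryCalculator.PartitionStatSet partitionStatSet) {
        calc.initialize(true); // do the real memory calculation

        // BUILD_SIDE_PARTITIONING
        HashJoinMemoryCalculator.BuildSidePartitioning buildCalc = calc.next();
        buildCalc.initialize(true /* autoTune */, false /* reserveHash */,
            buildBatch, probeBatch, joinColumns,
            1000000000L /* memoryAvailable */, 16 /* initialPartitions */,
            1024 /* recordsPerPartitionBatchBuild */, 1024 /* recordsPerPartitionBatchProbe */,
            65536 /* maxBatchNumRecordsBuild */, 65536 /* maxBatchNumRecordsProbe */,
            1024 /* outputBatchNumRecords */, 0.75 /* loadFactor */);
        buildCalc.setPartitionStatSet(partitionStatSet);
        int numPartitions = buildCalc.getNumPartitions();
        // ... while consuming build batches, consult buildCalc.shouldSpill() after appending rows ...

        // POST_BUILD_CALCULATIONS
        HashJoinMemoryCalculator.PostBuildCalculations postBuildCalc = buildCalc.next();
        postBuildCalc.initialize();
        // ... spill build partitions while postBuildCalc.shouldSpill() returns true, then probe ...

        // A non-null next() means some partitions spilled, so the whole cycle repeats recursively.
        HashJoinMemoryCalculator nextCycle = postBuildCalc.next();
      }
    }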
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculatorImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculatorImpl.java
new file mode 100644
index 0000000..5890a42
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculatorImpl.java
@@ -0,0 +1,833 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.join;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.commons.io.FileUtils;
+import org.apache.drill.common.map.CaseInsensitiveMap;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.RecordBatchSizer;
+import org.apache.drill.exec.vector.IntVector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+
+import static org.apache.drill.exec.physical.impl.join.HashJoinState.INITIALIZING;
+
+public class HashJoinMemoryCalculatorImpl implements HashJoinMemoryCalculator {
+ private static final Logger log = LoggerFactory.getLogger(HashJoinMemoryCalculatorImpl.class);
+
+ private final double safetyFactor;
+ private final double fragmentationFactor;
+ private final double hashTableDoublingFactor;
+ private final String hashTableCalculatorType;
+
+ private boolean initialized = false;
+ private boolean doMemoryCalculation;
+
+ public HashJoinMemoryCalculatorImpl(final double safetyFactor,
+ final double fragmentationFactor,
+ final double hashTableDoublingFactor,
+ final String hashTableCalculatorType) {
+ this.safetyFactor = safetyFactor;
+ this.fragmentationFactor = fragmentationFactor;
+ this.hashTableDoublingFactor = hashTableDoublingFactor;
+ this.hashTableCalculatorType = hashTableCalculatorType;
+ }
+
+ public void initialize(boolean doMemoryCalculation) {
+ Preconditions.checkState(!initialized);
+ initialized = true;
+ this.doMemoryCalculation = doMemoryCalculation;
+ }
+
+ public BuildSidePartitioning next() {
+ Preconditions.checkState(initialized);
+
+ if (doMemoryCalculation) {
+ final HashTableSizeCalculator hashTableSizeCalculator;
+
+ if (hashTableCalculatorType.equals(HashTableSizeCalculatorLeanImpl.TYPE)) {
+ hashTableSizeCalculator = new HashTableSizeCalculatorLeanImpl(RecordBatch.MAX_BATCH_SIZE, hashTableDoublingFactor);
+ } else if (hashTableCalculatorType.equals(HashTableSizeCalculatorConservativeImpl.TYPE)) {
+ hashTableSizeCalculator = new HashTableSizeCalculatorConservativeImpl(RecordBatch.MAX_BATCH_SIZE, hashTableDoublingFactor);
+ } else {
+ throw new IllegalArgumentException("Invalid calc type: " + hashTableCalculatorType);
+ }
+
+ return new BuildSidePartitioningImpl(hashTableSizeCalculator,
+ HashJoinHelperSizeCalculatorImpl.INSTANCE,
+ fragmentationFactor, safetyFactor);
+ } else {
+ return new NoopBuildSidePartitioningImpl();
+ }
+ }
+
+ @Override
+ public HashJoinState getState() {
+ return INITIALIZING;
+ }
+
+ public static long computeMaxBatchSizeNoHash(final long incomingBatchSize,
+ final int incomingNumRecords,
+ final int desiredNumRecords,
+ final double fragmentationFactor,
+ final double safetyFactor) {
+ long maxBatchSize = HashJoinMemoryCalculatorImpl
+ .computePartitionBatchSize(incomingBatchSize, incomingNumRecords, desiredNumRecords);
+ // Multiply by the fragmentation factor
+ return RecordBatchSizer.multiplyByFactors(maxBatchSize, fragmentationFactor, safetyFactor);
+ }
+
+ public static long computeMaxBatchSize(final long incomingBatchSize,
+ final int incomingNumRecords,
+ final int desiredNumRecords,
+ final double fragmentationFactor,
+ final double safetyFactor,
+ final boolean reserveHash) {
+ long size = computeMaxBatchSizeNoHash(incomingBatchSize,
+ incomingNumRecords,
+ desiredNumRecords,
+ fragmentationFactor,
+ safetyFactor);
+
+ if (!reserveHash) {
+ return size;
+ }
+
+ long hashSize = desiredNumRecords * ((long) IntVector.VALUE_WIDTH);
+ hashSize = RecordBatchSizer.multiplyByFactors(hashSize, fragmentationFactor);
+
+ return size + hashSize;
+ }
+
+ public static long computePartitionBatchSize(final long incomingBatchSize,
+ final int incomingNumRecords,
+ final int desiredNumRecords) {
+ return (long) Math.ceil((((double) incomingBatchSize) /
+ ((double) incomingNumRecords)) *
+ ((double) desiredNumRecords));
+ }
+
+ public static class NoopBuildSidePartitioningImpl implements BuildSidePartitioning {
+ private int initialPartitions;
+
+ @Override
+ public void initialize(boolean autoTune,
+ boolean reserveHash,
+ RecordBatch buildSideBatch,
+ RecordBatch probeSideBatch, Set<String> joinColumns,
+ long memoryAvailable,
+ int initialPartitions,
+ int recordsPerPartitionBatchBuild,
+ int recordsPerPartitionBatchProbe,
+ int maxBatchNumRecordsBuild,
+ int maxBatchNumRecordsProbe,
+ int outputBatchNumRecords,
+ double loadFactor) {
+ this.initialPartitions = initialPartitions;
+ }
+
+ @Override
+ public void setPartitionStatSet(PartitionStatSet partitionStatSet) {
+ // Do nothing
+ }
+
+ @Override
+ public int getNumPartitions() {
+ return initialPartitions;
+ }
+
+ @Override
+ public long getBuildReservedMemory() {
+ return 0;
+ }
+
+ @Override
+ public long getMaxReservedMemory() {
+ return 0;
+ }
+
+ @Override
+ public boolean shouldSpill() {
+ return false;
+ }
+
+ @Override
+ public String makeDebugString() {
+ return "No debugging for " + NoopBuildSidePartitioningImpl.class.getCanonicalName();
+ }
+
+ @Nullable
+ @Override
+ public PostBuildCalculations next() {
+ return new NoopPostBuildCalculationsImpl();
+ }
+
+ @Override
+ public HashJoinState getState() {
+ return HashJoinState.BUILD_SIDE_PARTITIONING;
+ }
+ }
+
+ /**
+ * <h1>Basic Functionality</h1>
+ * <p>
+ * At this point we need to reserve memory for the following:
+ * <ol>
+ * <li>An incoming batch</li>
+ * <li>An incomplete batch for each partition</li>
+ * </ol>
+ * If there is available memory we keep the batches for each partition in memory.
+ * If we run out of room and need to start spilling, we need to specify which partitions
+ * need to be spilled.
+ * </p>
+ * <h1>Life Cycle</h1>
+ * <p>
+ * <ul>
+ * <li><b>Step 0:</b> Call {@link #initialize(boolean, boolean, RecordBatch, RecordBatch, Set, long, int, int, int, int, int, int, double)}.
+ * This will initialize the calculator with the additional information it needs.</li>
+ * <li><b>Step 1:</b> Call {@link #getNumPartitions()} to see the number of partitions that fit in memory.</li>
+ * <li><b>Step 2:</b> Call {@link #shouldSpill()} to determine if spilling needs to occur.</li>
+ * <li><b>Step 3:</b> Call {@link #next()} and get the next memory calculator associated with your next state.</li>
+ * </ul>
+ * </p>
+ */
+ public static class BuildSidePartitioningImpl implements BuildSidePartitioning {
+ public static final Logger log = LoggerFactory.getLogger(BuildSidePartitioning.class);
+
+ private final HashTableSizeCalculator hashTableSizeCalculator;
+ private final HashJoinHelperSizeCalculator hashJoinHelperSizeCalculator;
+ private final double fragmentationFactor;
+ private final double safetyFactor;
+
+ private int maxBatchNumRecordsBuild;
+ private int maxBatchNumRecordsProbe;
+ private long memoryAvailable;
+ private long buildBatchSize;
+ private long probeBatchSize;
+ private int buildNumRecords;
+ private int probeNumRecords;
+ private long maxBuildBatchSize;
+ private long maxProbeBatchSize;
+ private long maxOutputBatchSize;
+ private int initialPartitions;
+ private int partitions;
+ private int recordsPerPartitionBatchBuild;
+ private int recordsPerPartitionBatchProbe;
+ private int outputBatchNumRecords;
+ private Map<String, Long> buildValueSizes;
+ private Map<String, Long> probeValueSizes;
+ private Map<String, Long> keySizes;
+ private boolean autoTune;
+ private boolean reserveHash;
+ private double loadFactor;
+
+ private PartitionStatSet partitionStatsSet;
+ private long partitionBuildBatchSize;
+ private long partitionProbeBatchSize;
+ private long reservedMemory;
+ private long maxReservedMemory;
+
+ private boolean firstInitialized;
+ private boolean initialized;
+
+ public BuildSidePartitioningImpl(final HashTableSizeCalculator hashTableSizeCalculator,
+ final HashJoinHelperSizeCalculator hashJoinHelperSizeCalculator,
+ final double fragmentationFactor,
+ final double safetyFactor) {
+ this.hashTableSizeCalculator = Preconditions.checkNotNull(hashTableSizeCalculator);
+ this.hashJoinHelperSizeCalculator = Preconditions.checkNotNull(hashJoinHelperSizeCalculator);
+ this.fragmentationFactor = fragmentationFactor;
+ this.safetyFactor = safetyFactor;
+ }
+
+ @Override
+ public void initialize(boolean autoTune,
+ boolean reserveHash,
+ RecordBatch buildSideBatch,
+ RecordBatch probeSideBatch,
+ Set<String> joinColumns,
+ long memoryAvailable,
+ int initialPartitions,
+ int recordsPerPartitionBatchBuild,
+ int recordsPerPartitionBatchProbe,
+ int maxBatchNumRecordsBuild,
+ int maxBatchNumRecordsProbe,
+ int outputBatchNumRecords,
+ double loadFactor) {
+ Preconditions.checkNotNull(buildSideBatch);
+ Preconditions.checkNotNull(probeSideBatch);
+ Preconditions.checkNotNull(joinColumns);
+
+ final RecordBatchSizer buildSizer = new RecordBatchSizer(buildSideBatch);
+ final RecordBatchSizer probeSizer = new RecordBatchSizer(probeSideBatch);
+
+ long buildBatchSize = getBatchSizeEstimate(buildSideBatch);
+ long probeBatchSize = getBatchSizeEstimate(probeSideBatch);
+
+ int buildNumRecords = buildSizer.rowCount();
+ int probeNumRecords = probeSizer.rowCount();
+
+ final CaseInsensitiveMap<Long> buildValueSizes = getNotExcludedColumnSizes(
+ joinColumns, buildSizer);
+ final CaseInsensitiveMap<Long> probeValueSizes = getNotExcludedColumnSizes(
+ joinColumns, probeSizer);
+ final CaseInsensitiveMap<Long> keySizes = CaseInsensitiveMap.newHashMap();
+
+ for (String joinColumn: joinColumns) {
+ final RecordBatchSizer.ColumnSize columnSize = buildSizer.columns().get(joinColumn);
+ keySizes.put(joinColumn, (long)columnSize.getStdNetOrNetSizePerEntry());
+ }
+
+ initialize(autoTune,
+ reserveHash,
+ buildValueSizes,
+ probeValueSizes,
+ keySizes,
+ memoryAvailable,
+ initialPartitions,
+ buildBatchSize,
+ probeBatchSize,
+ buildNumRecords,
+ probeNumRecords,
+ recordsPerPartitionBatchBuild,
+ recordsPerPartitionBatchProbe,
+ maxBatchNumRecordsBuild,
+ maxBatchNumRecordsProbe,
+ outputBatchNumRecords,
+ loadFactor);
+ }
+
+ @VisibleForTesting
+ protected static CaseInsensitiveMap<Long> getNotExcludedColumnSizes(
+ final Set<String> excludedColumns,
+ final RecordBatchSizer batchSizer) {
+ final CaseInsensitiveMap<Long> columnSizes = CaseInsensitiveMap.newHashMap();
+ final CaseInsensitiveMap<Boolean> excludedSet = CaseInsensitiveMap.newHashMap();
+
+ for (final String excludedColumn: excludedColumns) {
+ excludedSet.put(excludedColumn, true);
+ }
+
+ for (final Map.Entry<String, RecordBatchSizer.ColumnSize> entry: batchSizer.columns().entrySet()) {
+ final String columnName = entry.getKey();
+ final RecordBatchSizer.ColumnSize columnSize = entry.getValue();
+
+ columnSizes.put(columnName, (long) columnSize.getStdNetOrNetSizePerEntry());
+ }
+
+ return columnSizes;
+ }
+
+ public static long getBatchSizeEstimate(final RecordBatch recordBatch) {
+ final RecordBatchSizer sizer = new RecordBatchSizer(recordBatch);
+ long size = 0L;
+
+ for (Map.Entry<String, RecordBatchSizer.ColumnSize> column: sizer.columns().entrySet()) {
+ size += PostBuildCalculationsImpl.computeValueVectorSize(recordBatch.getRecordCount(), column.getValue().getStdNetOrNetSizePerEntry());
+ }
+
+ return size;
+ }
+
+ @VisibleForTesting
+ protected void initialize(boolean autoTune,
+ boolean reserveHash,
+ CaseInsensitiveMap<Long> buildValueSizes,
+ CaseInsensitiveMap<Long> probeValueSizes,
+ CaseInsensitiveMap<Long> keySizes,
+ long memoryAvailable,
+ int initialPartitions,
+ long buildBatchSize,
+ long probeBatchSize,
+ int buildNumRecords,
+ int probeNumRecords,
+ int recordsPerPartitionBatchBuild,
+ int recordsPerPartitionBatchProbe,
+ int maxBatchNumRecordsBuild,
+ int maxBatchNumRecordsProbe,
+ int outputBatchNumRecords,
+ double loadFactor) {
+ Preconditions.checkState(!firstInitialized);
+ Preconditions.checkArgument(initialPartitions >= 1);
+ firstInitialized = true;
+
+ this.loadFactor = loadFactor;
+ this.autoTune = autoTune;
+ this.reserveHash = reserveHash;
+ this.buildValueSizes = Preconditions.checkNotNull(buildValueSizes);
+ this.probeValueSizes = Preconditions.checkNotNull(probeValueSizes);
+ this.keySizes = Preconditions.checkNotNull(keySizes);
+ this.memoryAvailable = memoryAvailable;
+ this.buildBatchSize = buildBatchSize;
+ this.probeBatchSize = probeBatchSize;
+ this.buildNumRecords = buildNumRecords;
+ this.probeNumRecords = probeNumRecords;
+ this.initialPartitions = initialPartitions;
+ this.recordsPerPartitionBatchBuild = recordsPerPartitionBatchBuild;
+ this.recordsPerPartitionBatchProbe = recordsPerPartitionBatchProbe;
+ this.maxBatchNumRecordsBuild = maxBatchNumRecordsBuild;
+ this.maxBatchNumRecordsProbe = maxBatchNumRecordsProbe;
+ this.outputBatchNumRecords = outputBatchNumRecords;
+
+ calculateMemoryUsage();
+
+ log.debug("Creating {} partitions when {} initial partitions configured.", partitions, initialPartitions);
+ }
+
+ @Override
+ public void setPartitionStatSet(final PartitionStatSet partitionStatSet) {
+ Preconditions.checkState(!initialized);
+ initialized = true;
+
+ partitionStatsSet = Preconditions.checkNotNull(partitionStatSet);
+ }
+
+ @Override
+ public int getNumPartitions() {
+ return partitions;
+ }
+
+ @Override
+ public long getBuildReservedMemory() {
+ Preconditions.checkState(firstInitialized);
+ return reservedMemory;
+ }
+
+ @Override
+ public long getMaxReservedMemory() {
+ Preconditions.checkState(firstInitialized);
+ return maxReservedMemory;
+ }
+
+ /**
+ * This method calculates the amount of memory we need to reserve while partitioning. It also
+ * calculates the size of a partition batch.
+ */
+ private void calculateMemoryUsage()
+ {
+ // Adjust based on number of records
+ maxBuildBatchSize = computeMaxBatchSizeNoHash(buildBatchSize, buildNumRecords,
+ maxBatchNumRecordsBuild, fragmentationFactor, safetyFactor);
+ maxProbeBatchSize = computeMaxBatchSizeNoHash(probeBatchSize, probeNumRecords,
+ maxBatchNumRecordsProbe, fragmentationFactor, safetyFactor);
+
+ // Safety factor can be multiplied at the end since these batches are coming from exchange operators, so no excess value vector doubling
+ partitionBuildBatchSize = computeMaxBatchSize(buildBatchSize,
+ buildNumRecords,
+ recordsPerPartitionBatchBuild,
+ fragmentationFactor,
+ safetyFactor,
+ reserveHash);
+
+ // Safety factor can be multiplied at the end since these batches are coming from exchange operators, so no excess value vector doubling
+ partitionProbeBatchSize = computeMaxBatchSize(
+ probeBatchSize,
+ probeNumRecords,
+ recordsPerPartitionBatchProbe,
+ fragmentationFactor,
+ safetyFactor,
+ reserveHash);
+
+ maxOutputBatchSize = computeMaxOutputBatchSize(buildValueSizes, probeValueSizes, keySizes,
+ outputBatchNumRecords, safetyFactor, fragmentationFactor);
+
+ long probeReservedMemory;
+
+ for (partitions = initialPartitions;; partitions /= 2) {
+ // The total amount of memory to reserve for incomplete batches across all partitions
+ long incompletePartitionsBatchSizes = ((long) partitions) * partitionBuildBatchSize;
+ // We need to reserve space for all the incomplete partition batches, for the incoming batch, and for the
+ // probe batch we sniffed.
+ // TODO when batch sizing project is complete we won't have to sniff probe batches since
+ // they will have a well defined size.
+ reservedMemory = incompletePartitionsBatchSizes + maxBuildBatchSize + maxProbeBatchSize;
+
+ probeReservedMemory = PostBuildCalculationsImpl.calculateReservedMemory(
+ partitions,
+ maxProbeBatchSize,
+ maxOutputBatchSize,
+ partitionProbeBatchSize);
+
+ maxReservedMemory = Math.max(reservedMemory, probeReservedMemory);
+
+ if (!autoTune || maxReservedMemory <= memoryAvailable) {
+ // Stop the tuning loop if we are not doing auto tuning, or if we are living within our memory limit
+ break;
+ }
+
+ if (partitions == 2) {
+ // Can't have fewer than 2 partitions
+ break;
+ }
+ }
+
+ if (maxReservedMemory > memoryAvailable) {
+ // We don't have enough memory; we need to fail or warn
+
+ String message = String.format("HashJoin needs to reserve %d bytes of memory but there are " +
+ "only %d bytes available. Using %d num partitions with %d initial partitions. Additional info:\n" +
+ "buildBatchSize = %d\n" +
+ "buildNumRecords = %d\n" +
+ "partitionBuildBatchSize = %d\n" +
+ "recordsPerPartitionBatchBuild = %d\n" +
+ "probeBatchSize = %d\n" +
+ "probeNumRecords = %d\n" +
+ "partitionProbeBatchSize = %d\n" +
+ "recordsPerPartitionBatchProbe = %d\n",
+ reservedMemory, memoryAvailable, partitions, initialPartitions,
+ buildBatchSize,
+ buildNumRecords,
+ partitionBuildBatchSize,
+ recordsPerPartitionBatchBuild,
+ probeBatchSize,
+ probeNumRecords,
+ partitionProbeBatchSize,
+ recordsPerPartitionBatchProbe);
+
+ String phase = "Probe phase: ";
+
+ if (reservedMemory > memoryAvailable) {
+ if (probeReservedMemory > memoryAvailable) {
+ phase = "Build and Probe phases: ";
+ } else {
+ phase = "Build phase: ";
+ }
+ }
+
+ message = phase + message;
+ log.warn(message);
+ }
+ }
+
+ public static long computeMaxOutputBatchSize(Map<String, Long> buildValueSizes,
+ Map<String, Long> probeValueSizes,
+ Map<String, Long> keySizes,
+ int outputNumRecords,
+ double safetyFactor,
+ double fragmentationFactor) {
+ long outputSize = HashTableSizeCalculatorConservativeImpl.computeVectorSizes(keySizes, outputNumRecords, safetyFactor)
+ + HashTableSizeCalculatorConservativeImpl.computeVectorSizes(buildValueSizes, outputNumRecords, safetyFactor)
+ + HashTableSizeCalculatorConservativeImpl.computeVectorSizes(probeValueSizes, outputNumRecords, safetyFactor);
+ return RecordBatchSizer.multiplyByFactor(outputSize, fragmentationFactor);
+ }
+
+ @Override
+ public boolean shouldSpill() {
+ Preconditions.checkState(initialized);
+
+ long consumedMemory = reservedMemory;
+
+ if (reserveHash) {
+ // Include the hash sizes for the batch
+ consumedMemory += ((long) IntVector.VALUE_WIDTH) * partitionStatsSet.getNumInMemoryRecords();
+ }
+
+ consumedMemory += RecordBatchSizer.multiplyByFactor(partitionStatsSet.getConsumedMemory(), fragmentationFactor);
+ return consumedMemory > memoryAvailable;
+ }
+
+ @Override
+ public PostBuildCalculations next() {
+ Preconditions.checkState(initialized);
+
+ return new PostBuildCalculationsImpl(memoryAvailable,
+ partitionProbeBatchSize,
+ maxProbeBatchSize,
+ maxOutputBatchSize,
+ partitionStatsSet,
+ keySizes,
+ hashTableSizeCalculator,
+ hashJoinHelperSizeCalculator,
+ fragmentationFactor,
+ safetyFactor,
+ loadFactor,
+ reserveHash);
+ }
+
+ @Override
+ public HashJoinState getState() {
+ return HashJoinState.BUILD_SIDE_PARTITIONING;
+ }
+
+ @Override
+ public String makeDebugString() {
+ final String calcVars = String.format(
+ "Build side calculator vars:\n" +
+ "memoryAvailable = %s\n" +
+ "maxBuildBatchSize = %s\n" +
+ "maxOutputBatchSize = %s\n",
+ PartitionStatSet.prettyPrintBytes(memoryAvailable),
+ PartitionStatSet.prettyPrintBytes(maxBuildBatchSize),
+ PartitionStatSet.prettyPrintBytes(maxOutputBatchSize));
+
+ String partitionStatDebugString = "";
+
+ if (partitionStatsSet != null) {
+ partitionStatDebugString = partitionStatsSet.makeDebugString();
+ }
+
+ return calcVars + "\n" + partitionStatDebugString;
+ }
+ }
+
+ public static class NoopPostBuildCalculationsImpl implements PostBuildCalculations {
+ @Override
+ public void initialize() {
+
+ }
+
+ @Override
+ public boolean shouldSpill() {
+ return false;
+ }
+
+ @Nullable
+ @Override
+ public HashJoinMemoryCalculator next() {
+ return null;
+ }
+
+ @Override
+ public HashJoinState getState() {
+ return HashJoinState.POST_BUILD_CALCULATIONS;
+ }
+
+ @Override
+ public String makeDebugString() {
+ return "Noop " + NoopPostBuildCalculationsImpl.class.getCanonicalName() + " calculator.";
+ }
+ }
+
+ /**
+ * <h1>Basic Functionality</h1>
+ * <p>
+ * In this state, we need to make sure there is enough room to spill probe side batches, if
+ * spilling is necessary. If there is not enough room, we have to evict build side partitions.
+ * If we don't have to evict build side partitions in this state, then we are done. If we do have
+ * to evict build side partitions then we have to recursively repeat the process.
+ * </p>
+ * <h1>Lifecycle</h1>
+ * <p>
+ * <ul>
+ * <li><b>Step 1:</b> Call {@link #initialize()}. This
+ * gives the {@link HashJoinStateCalculator} additional information it needs to compute memory requirements.</li>
+ * <li><b>Step 2:</b> Call {@link #shouldSpill()}. This tells
+ * you whether build side partitions need to be spilled in order to make room for probing.</li>
+ * <li><b>Step 3:</b> Call {@link #next()}. After you are done probing
+ * and partitioning the probe side, get the next calculator.</li>
+ * </ul>
+ * </p>
+ */
+ public static class PostBuildCalculationsImpl implements PostBuildCalculations {
+ private final long memoryAvailable;
+ private final long partitionProbeBatchSize;
+ private final long maxProbeBatchSize;
+ private final long maxOutputBatchSize;
+ private final PartitionStatSet buildPartitionStatSet;
+ private final Map<String, Long> keySizes;
+ private final HashTableSizeCalculator hashTableSizeCalculator;
+ private final HashJoinHelperSizeCalculator hashJoinHelperSizeCalculator;
+ private final double fragmentationFactor;
+ private final double safetyFactor;
+ private final double loadFactor;
+ private final boolean reserveHash;
+ // private final long maxOutputBatchSize;
+
+ private boolean initialized;
+ private long consumedMemory;
+
+ public PostBuildCalculationsImpl(final long memoryAvailable,
+ final long partitionProbeBatchSize,
+ final long maxProbeBatchSize,
+ final long maxOutputBatchSize,
+ final PartitionStatSet buildPartitionStatSet,
+ final Map<String, Long> keySizes,
+ final HashTableSizeCalculator hashTableSizeCalculator,
+ final HashJoinHelperSizeCalculator hashJoinHelperSizeCalculator,
+ final double fragmentationFactor,
+ final double safetyFactor,
+ final double loadFactor,
+ final boolean reserveHash) {
+ this.memoryAvailable = memoryAvailable;
+ this.partitionProbeBatchSize = partitionProbeBatchSize;
+ this.maxProbeBatchSize = maxProbeBatchSize;
+ this.maxOutputBatchSize = maxOutputBatchSize;
+ this.buildPartitionStatSet = Preconditions.checkNotNull(buildPartitionStatSet);
+ this.keySizes = Preconditions.checkNotNull(keySizes);
+ this.hashTableSizeCalculator = Preconditions.checkNotNull(hashTableSizeCalculator);
+ this.hashJoinHelperSizeCalculator = Preconditions.checkNotNull(hashJoinHelperSizeCalculator);
+ this.fragmentationFactor = fragmentationFactor;
+ this.safetyFactor = safetyFactor;
+ this.loadFactor = loadFactor;
+ this.reserveHash = reserveHash;
+ }
+
+ // TODO take an incoming Probe RecordBatch
+ @Override
+ public void initialize() {
+ Preconditions.checkState(!initialized);
+ initialized = true;
+ }
+
+ public long getConsumedMemory() {
+ Preconditions.checkState(initialized);
+ return consumedMemory;
+ }
+
+ // TODO move this somewhere else that makes sense
+ public static long computeValueVectorSize(long numRecords, long byteSize)
+ {
+ long naiveSize = numRecords * byteSize;
+ return roundUpToPowerOf2(naiveSize);
+ }
+
+ public static long computeValueVectorSize(long numRecords, long byteSize, double safetyFactor)
+ {
+ long naiveSize = RecordBatchSizer.multiplyByFactor(numRecords * byteSize, safetyFactor);
+ return roundUpToPowerOf2(naiveSize);
+ }
+
+ // TODO move to drill common
+ public static long roundUpToPowerOf2(long num)
+ {
+ Preconditions.checkArgument(num >= 1);
+ return num == 1 ? 1 : Long.highestOneBit(num - 1) << 1;
+ }
+
+ public static long calculateReservedMemory(final int numSpilledPartitions,
+ final long maxProbeBatchSize,
+ final long maxOutputBatchSize,
+ final long partitionProbeBatchSize) {
+ // We need to have enough space for the incoming batch, as well as a batch for each probe side
+ // partition that is being spilled, plus enough space for the output batch.
+ return maxProbeBatchSize
+ + maxOutputBatchSize
+ + partitionProbeBatchSize * numSpilledPartitions;
+ }
+
+ @Override
+ public boolean shouldSpill() {
+ Preconditions.checkState(initialized);
+
+ long reservedMemory = calculateReservedMemory(
+ buildPartitionStatSet.getNumSpilledPartitions(),
+ maxProbeBatchSize,
+ maxOutputBatchSize,
+ partitionProbeBatchSize);
+
+ // We are consuming our reserved memory plus the amount of memory for each build side
+ // batch, plus the size of the hash tables and the join helpers
+ consumedMemory = reservedMemory + RecordBatchSizer.multiplyByFactor(buildPartitionStatSet.getConsumedMemory(), fragmentationFactor);
+
+ // Handle early completion conditions
+ if (buildPartitionStatSet.allSpilled()) {
+ // All build side partitions are spilled so our memory calculation is complete
+ return false;
+ }
+
+ for (int partitionIndex: buildPartitionStatSet.getInMemoryPartitions()) {
+ final PartitionStat partitionStat = buildPartitionStatSet.get(partitionIndex);
+
+ if (partitionStat.getNumInMemoryRecords() == 0) {
+ // TODO Hash join still allocates empty hash tables and hash join helpers. We should fix hash join
+ // not to allocate empty tables and helpers.
+ continue;
+ }
+
+ long hashTableSize = hashTableSizeCalculator.calculateSize(partitionStat, keySizes, loadFactor, safetyFactor, fragmentationFactor);
+ long hashJoinHelperSize = hashJoinHelperSizeCalculator.calculateSize(partitionStat, fragmentationFactor);
+
+ consumedMemory += hashTableSize + hashJoinHelperSize;
+ }
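+ // At this point consumedMemory = reservedMemory
+ // + fragmentationFactor * (in memory build side data)
+ // + the hash table and hash join helper sizes of the in memory partitions.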
+
+ return consumedMemory > memoryAvailable;
+ }
+
+ @Nullable
+ @Override
+ public HashJoinMemoryCalculator next() {
+ Preconditions.checkState(initialized);
+
+ if (buildPartitionStatSet.noneSpilled()) {
+ // If none of our partitions were spilled then we were able to probe everything and we are done
+ return null;
+ }
+
+ // Some of our probe side batches were spilled so we have to recursively process the partitions.
+ return new HashJoinMemoryCalculatorImpl(
+ safetyFactor, fragmentationFactor, hashTableSizeCalculator.getDoublingFactor(), hashTableSizeCalculator.getType());
+ }
+
+ @Override
+ public HashJoinState getState() {
+ return HashJoinState.POST_BUILD_CALCULATIONS;
+ }
+
+ @Override
+ public String makeDebugString() {
+ Preconditions.checkState(initialized);
+
+ String calcVars = String.format(
+ "Mem calc stats:\n" +
+ "memoryLimit = %s\n" +
+ "consumedMemory = %s\n" +
+ "maxProbeBatchSize = %s\n" +
+ "maxOutputBatchSize = %s\n",
+ PartitionStatSet.prettyPrintBytes(memoryAvailable),
+ PartitionStatSet.prettyPrintBytes(consumedMemory),
+ PartitionStatSet.prettyPrintBytes(maxProbeBatchSize),
+ PartitionStatSet.prettyPrintBytes(maxOutputBatchSize));
+
+ StringBuilder hashJoinHelperSb = new StringBuilder("Partition Hash Join Helpers\n");
+ StringBuilder hashTableSb = new StringBuilder("Partition Hash Tables\n");
+
+ for (int partitionIndex: buildPartitionStatSet.getInMemoryPartitions()) {
+ final PartitionStat partitionStat = buildPartitionStatSet.get(partitionIndex);
+ final String partitionPrefix = partitionIndex + ": ";
+
+ hashJoinHelperSb.append(partitionPrefix);
+ hashTableSb.append(partitionPrefix);
+
+ if (partitionStat.getNumInMemoryBatches() == 0) {
+ hashJoinHelperSb.append("Empty");
+ hashTableSb.append("Empty");
+ } else {
+ long hashJoinHelperSize = hashJoinHelperSizeCalculator.calculateSize(partitionStat, fragmentationFactor);
+ long hashTableSize = hashTableSizeCalculator.calculateSize(partitionStat, keySizes, loadFactor, safetyFactor, fragmentationFactor);
+
+ hashJoinHelperSb.append(PartitionStatSet.prettyPrintBytes(hashJoinHelperSize));
+ hashTableSb.append(PartitionStatSet.prettyPrintBytes(hashTableSize));
+ }
+
+ hashJoinHelperSb.append("\n");
+ hashTableSb.append("\n");
+ }
+
+ return calcVars
+ + "\n" + buildPartitionStatSet.makeDebugString()
+ + "\n" + hashJoinHelperSb.toString()
+ + "\n" + hashTableSb.toString();
+ }
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinState.java
new file mode 100644
index 0000000..33f22be
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinState.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.join;
+
+public enum HashJoinState {
+ INITIALIZING,
+ /**
+ * In this state, the build side of the join operation is partitioned. Each partition is
+ * kept in memory. If we are able to fit all the partitions in memory and we have completely
+ * consumed the build side, then we move to the {@link HashJoinState#POST_BUILD_CALCULATIONS} state. If we
+ * run out of memory before the build side is fully consumed, we spill partitions to disk to free
+ * memory and then resume processing the build side. We repeat
+ * this process until the entire build side is consumed. After the build side is consumed we
+ * proceed to the {@link HashJoinState#POST_BUILD_CALCULATIONS} state.
+ */
+ BUILD_SIDE_PARTITIONING,
+ /**
+ * In this state, the probe side is consumed. Probe side data that matches a build side partition
+ * kept in memory is joined and sent downstream. Probe side data that belongs to a spilled build side
+ * partition is itself spilled to disk. After all the probe side data is consumed, the join is complete
+ * if no build side partitions were spilled. Otherwise each spilled build and probe side partition pair
+ * is processed recursively, going through the {@link HashJoinState#BUILD_SIDE_PARTITIONING} and
+ * {@link HashJoinState#POST_BUILD_CALCULATIONS} states again.
+ */
+ POST_BUILD_CALCULATIONS
+}
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/FixedWidthVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinStateCalculator.java
similarity index 53%
copy from exec/vector/src/main/java/org/apache/drill/exec/vector/FixedWidthVector.java
copy to exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinStateCalculator.java
index 09bcdd8..695a68c 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/FixedWidthVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinStateCalculator.java
@@ -1,4 +1,4 @@
-/*
+/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -15,19 +15,26 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.drill.exec.vector;
+package org.apache.drill.exec.physical.impl.join;
-public interface FixedWidthVector extends ValueVector {
+import javax.annotation.Nullable;
+/**
+ * A {@link HashJoinStateCalculator} computes the memory requirements for one of the states
+ * in the {@link HashJoinState} enum.
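+ * <p>
+ * A rough usage sketch (illustrative; the actual wiring lives in the hash join operator code): the
+ * calculators form a chain that mirrors the state machine:
+ * <pre>
+ *   HashJoinMemoryCalculator calc = ...;                                  // INITIALIZING
+ *   HashJoinMemoryCalculator.BuildSidePartitioning build = calc.next();   // BUILD_SIDE_PARTITIONING
+ *   HashJoinMemoryCalculator.PostBuildCalculations post = build.next();   // POST_BUILD_CALCULATIONS
+ *   HashJoinMemoryCalculator nextCycle = post.next(); // null when done, or a new calculator if partitions spilled
+ * </pre>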
+ */
+public interface HashJoinStateCalculator<T extends HashJoinStateCalculator> {
/**
- * Allocate a new memory space for this vector. Must be called prior to using the ValueVector.
- *
- * @param valueCount Number of values in the vector.
+ * Signifies that the current state is complete and returns the next {@link HashJoinStateCalculator}.
+ * Returns null in the case where there is no next state.
+ * @return The next {@link HashJoinStateCalculator} or null if this was the last state.
*/
- void allocateNew(int valueCount);
+ @Nullable
+ T next();
/**
- * Zero out the underlying buffer backing this vector.
+ * The current {@link HashJoinState} corresponding to this calculator.
+ * @return The {@link HashJoinState} corresponding to this calculator.
*/
- void zeroVector();
+ HashJoinState getState();
}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/SubOperatorTest.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashTableSizeCalculator.java
similarity index 65%
copy from exec/java-exec/src/test/java/org/apache/drill/test/SubOperatorTest.java
copy to exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashTableSizeCalculator.java
index 6bc2afc..04d15ff 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/SubOperatorTest.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashTableSizeCalculator.java
@@ -1,4 +1,4 @@
-/*
+/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -15,22 +15,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.drill.test;
+package org.apache.drill.exec.physical.impl.join;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
+import java.util.Map;
-public class SubOperatorTest extends DrillTest {
+public interface HashTableSizeCalculator {
+ long calculateSize(HashJoinMemoryCalculator.PartitionStat partitionStat,
+ Map<String, Long> keySizes,
+ double loadFactor,
+ double safetyFactor,
+ double fragmentationFactor);
- protected static OperatorFixture fixture;
+ double getDoublingFactor();
- @BeforeClass
- public static void classSetup() throws Exception {
- fixture = OperatorFixture.standardFixture();
- }
-
- @AfterClass
- public static void classTeardown() throws Exception {
- fixture.close();
- }
+ String getType();
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashTableSizeCalculatorConservativeImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashTableSizeCalculatorConservativeImpl.java
new file mode 100644
index 0000000..05354d5
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashTableSizeCalculatorConservativeImpl.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.join;
+
+import com.google.common.base.Preconditions;
+import org.apache.drill.exec.record.RecordBatchSizer;
+import org.apache.drill.exec.vector.IntVector;
+
+import java.util.Map;
+
+import static org.apache.drill.exec.physical.impl.join.HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl.computeValueVectorSize;
+
+public class HashTableSizeCalculatorConservativeImpl implements HashTableSizeCalculator {
+ public static final String TYPE = "CONSERVATIVE";
+ public static final double HASHTABLE_DOUBLING_FACTOR = 2.0;
+ private final int maxNumRecords;
+ private final double hashTableDoublingFactor;
+
+ public HashTableSizeCalculatorConservativeImpl(int maxNumRecords, double hashTableDoublingFactor) {
+ this.maxNumRecords = maxNumRecords;
+ this.hashTableDoublingFactor = hashTableDoublingFactor;
+ }
+
+ @Override
+ public long calculateSize(final HashJoinMemoryCalculator.PartitionStat partitionStat,
+ final Map<String, Long> keySizes,
+ final double loadFactor,
+ final double safetyFactor,
+ final double fragmentationFactor) {
+ Preconditions.checkArgument(!keySizes.isEmpty());
+ Preconditions.checkArgument(!partitionStat.isSpilled());
+ Preconditions.checkArgument(partitionStat.getNumInMemoryRecords() > 0);
+
+ // The number of entries in the global index array. Note this array may not be completely populated; the degree of population depends on the load factor.
+ long numBuckets = (long) (((double) partitionStat.getNumInMemoryRecords()) * (1.0 / loadFactor));
+ // The number of pairs in the hash table
+ long numEntries = partitionStat.getNumInMemoryRecords();
+ // The number of Batch Holders in the hash table. Note that entries are tightly packed in the Batch Holders (no gaps).
+ long numBatchHolders = (numEntries + maxNumRecords - 1) / maxNumRecords;
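+ // Illustrative example: 1000 in memory records with a load factor of 0.75 and maxNumRecords = 65536
+ // give numBuckets = 1333, numEntries = 1000 and a single, partially filled, Batch Holder.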
+
+ // Include the size of the buckets array
+ long hashTableSize = RecordBatchSizer.multiplyByFactors(computeValueVectorSize(numBuckets, IntVector.VALUE_WIDTH), hashTableDoublingFactor);
+ // Each Batch Holder has two full-sized int vectors, one holding links and one holding hash values
+ hashTableSize += numBatchHolders * 2L * IntVector.VALUE_WIDTH * ((long) maxNumRecords);
+
+ long numFullBatchHolders = numEntries % maxNumRecords == 0? numBatchHolders: numBatchHolders - 1;
+ // Compute the size of the value vectors holding keys in each full Batch Holder
+ hashTableSize += numFullBatchHolders * computeVectorSizes(keySizes, maxNumRecords, safetyFactor);
+
+ if (numFullBatchHolders != numBatchHolders) {
+ // The last Batch Holder is only partially filled
+ long partialNumEntries = numEntries % maxNumRecords;
+ final long partialSize = computeVectorSizes(keySizes, partialNumEntries, safetyFactor);
+ hashTableSize += RecordBatchSizer.multiplyByFactors(partialSize, hashTableDoublingFactor);
+ }
+
+ return RecordBatchSizer.multiplyByFactors(hashTableSize, fragmentationFactor);
+ }
+
+ @Override
+ public double getDoublingFactor() {
+ return hashTableDoublingFactor;
+ }
+
+ @Override
+ public String getType() {
+ return TYPE;
+ }
+
+ public static long computeVectorSizes(final Map<String, Long> vectorSizes,
+ final long numRecords,
+ final double safetyFactor) {
+ long totalKeySize = 0L;
+
+ for (Map.Entry<String, Long> entry: vectorSizes.entrySet()) {
+ totalKeySize += computeValueVectorSize(numRecords, entry.getValue(), safetyFactor);
+ }
+
+ return totalKeySize;
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashTableSizeCalculatorLeanImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashTableSizeCalculatorLeanImpl.java
new file mode 100644
index 0000000..87d8d1b
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashTableSizeCalculatorLeanImpl.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.join;
+
+import com.google.common.base.Preconditions;
+import org.apache.drill.exec.record.RecordBatchSizer;
+import org.apache.drill.exec.vector.IntVector;
+
+import java.util.Map;
+
+import static org.apache.drill.exec.physical.impl.join.HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl.computeValueVectorSize;
+
+public class HashTableSizeCalculatorLeanImpl implements HashTableSizeCalculator {
+ public static final String TYPE = "LEAN";
+ private final int maxNumRecords;
+ private final double hashTableDoublingFactor;
+
+ public HashTableSizeCalculatorLeanImpl(int maxNumRecords, double hashTableDoublingFactor) {
+ this.maxNumRecords = maxNumRecords;
+ this.hashTableDoublingFactor = hashTableDoublingFactor;
+ }
+
+ @Override
+ public long calculateSize(final HashJoinMemoryCalculator.PartitionStat partitionStat,
+ final Map<String, Long> keySizes,
+ final double loadFactor,
+ final double safetyFactor,
+ final double fragmentationFactor) {
+ Preconditions.checkArgument(!keySizes.isEmpty());
+ Preconditions.checkArgument(!partitionStat.isSpilled());
+ Preconditions.checkArgument(partitionStat.getNumInMemoryRecords() > 0);
+
+ // The number of entries in the global index array. Note this array may not be completely populated; the degree of population depends on the load factor.
+ long numBuckets = (long) (((double) partitionStat.getNumInMemoryRecords()) * (1.0 / loadFactor));
+ // The number of pairs in the hash table
+ long numEntries = partitionStat.getNumInMemoryRecords();
+ // The number of Batch Holders in the hash table. Note that entries are tightly packed in the Batch Holders (no gaps).
+ long numBatchHolders = (numEntries + maxNumRecords - 1) / maxNumRecords;
+
+ // Include the size of the buckets array
+ long hashTableSize = RecordBatchSizer.multiplyByFactors(computeValueVectorSize(numBuckets, IntVector.VALUE_WIDTH), hashTableDoublingFactor);
+ // Each Batch Holder has two full-sized int vectors, one holding links and one holding hash values
+ hashTableSize += numBatchHolders * 2L * IntVector.VALUE_WIDTH * ((long) maxNumRecords);
+
+ long numFullBatchHolders = numEntries % maxNumRecords == 0? numBatchHolders: numBatchHolders - 1;
+ // Compute the size of the value vectors holding keys in each full Batch Holder
+ hashTableSize += numFullBatchHolders * computeVectorSizes(keySizes, maxNumRecords, safetyFactor);
+
+ if (numFullBatchHolders != numBatchHolders) {
+ // The last bucket is a partial bucket
+ long partialNumEntries = numEntries % maxNumRecords;
+ hashTableSize += computeVectorSizes(keySizes, partialNumEntries, safetyFactor);
+ long maxVectorSize = computeMaxVectorSize(keySizes, partialNumEntries, safetyFactor);
+ hashTableSize -= maxVectorSize;
+ hashTableSize += RecordBatchSizer.multiplyByFactors(maxVectorSize, hashTableDoublingFactor);
+ }
+
+ return RecordBatchSizer.multiplyByFactors(hashTableSize, fragmentationFactor);
+ }
+
+ @Override
+ public double getDoublingFactor() {
+ return hashTableDoublingFactor;
+ }
+
+ @Override
+ public String getType() {
+ return TYPE;
+ }
+
+ public static long computeVectorSizes(final Map<String, Long> vectorSizes,
+ final long numRecords,
+ final double safetyFactor) {
+ long totalKeySize = 0L;
+
+ for (Map.Entry<String, Long> entry: vectorSizes.entrySet()) {
+ totalKeySize += computeValueVectorSize(numRecords, entry.getValue(), safetyFactor);
+ }
+
+ return totalKeySize;
+ }
+
+ public static long computeMaxVectorSize(final Map<String, Long> vectorSizes,
+ final long numRecords,
+ final double safetyFactor) {
+ long maxVectorSize = 0L;
+
+ for (Map.Entry<String, Long> entry: vectorSizes.entrySet()) {
+ maxVectorSize = Math.max(maxVectorSize, computeValueVectorSize(numRecords, entry.getValue(), safetyFactor));
+ }
+
+ return maxVectorSize;
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MechanicalHashJoinMemoryCalculator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MechanicalHashJoinMemoryCalculator.java
new file mode 100644
index 0000000..8b367dd
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MechanicalHashJoinMemoryCalculator.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.physical.impl.join;
+
+import com.google.common.base.Preconditions;
+import org.apache.drill.exec.record.RecordBatch;
+
+import javax.annotation.Nullable;
+import java.util.Set;
+
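+/**
+ * A simple alternative to {@link HashJoinMemoryCalculatorImpl} that does not estimate memory sizes.
+ * Instead it triggers spilling whenever the total number of in memory batches across all partitions
+ * exceeds the configured maxNumInMemBatches limit.
+ */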
+public class MechanicalHashJoinMemoryCalculator implements HashJoinMemoryCalculator {
+ private final int maxNumInMemBatches;
+
+ private boolean doMemoryCalc;
+
+ public MechanicalHashJoinMemoryCalculator(int maxNumInMemBatches) {
+ this.maxNumInMemBatches = maxNumInMemBatches;
+ }
+
+ @Override
+ public void initialize(boolean doMemoryCalc) {
+ this.doMemoryCalc = doMemoryCalc;
+ }
+
+ @Nullable
+ @Override
+ public BuildSidePartitioning next() {
+ if (doMemoryCalc) {
+ // return the mechanical implementation
+ return new MechanicalBuildSidePartitioning(maxNumInMemBatches);
+ } else {
+ // return Noop implementation
+ return new HashJoinMemoryCalculatorImpl.NoopBuildSidePartitioningImpl();
+ }
+ }
+
+ @Override
+ public HashJoinState getState() {
+ return HashJoinState.INITIALIZING;
+ }
+
+ public static class MechanicalBuildSidePartitioning implements BuildSidePartitioning {
+ private final int maxNumInMemBatches;
+
+ private int initialPartitions;
+ private PartitionStatSet partitionStatSet;
+
+ public MechanicalBuildSidePartitioning(int maxNumInMemBatches) {
+ this.maxNumInMemBatches = maxNumInMemBatches;
+ }
+
+ @Override
+ public void initialize(boolean autoTune,
+ boolean reserveHash,
+ RecordBatch buildSideBatch,
+ RecordBatch probeSideBatch,
+ Set<String> joinColumns,
+ long memoryAvailable,
+ int initialPartitions,
+ int recordsPerPartitionBatchBuild,
+ int recordsPerPartitionBatchProbe,
+ int maxBatchNumRecordsBuild,
+ int maxBatchNumRecordsProbe,
+ int outputBatchNumRecords,
+ double loadFactor) {
+ this.initialPartitions = initialPartitions;
+ }
+
+ @Override
+ public void setPartitionStatSet(PartitionStatSet partitionStatSet) {
+ this.partitionStatSet = Preconditions.checkNotNull(partitionStatSet);
+ }
+
+ @Override
+ public int getNumPartitions() {
+ return initialPartitions;
+ }
+
+ @Override
+ public long getBuildReservedMemory() {
+ return 0;
+ }
+
+ @Override
+ public long getMaxReservedMemory() {
+ return 0;
+ }
+
+ @Override
+ public boolean shouldSpill() {
+ return partitionStatSet.getNumInMemoryBatches() > maxNumInMemBatches;
+ }
+
+ @Override
+ public String makeDebugString() {
+ return "Mechanical build side calculations";
+ }
+
+ @Nullable
+ @Override
+ public PostBuildCalculations next() {
+ return new MechanicalPostBuildCalculations(maxNumInMemBatches, partitionStatSet);
+ }
+
+ @Override
+ public HashJoinState getState() {
+ return HashJoinState.BUILD_SIDE_PARTITIONING;
+ }
+ }
+
+ public static class MechanicalPostBuildCalculations implements PostBuildCalculations {
+ private final int maxNumInMemBatches;
+ private final PartitionStatSet partitionStatSet;
+
+ public MechanicalPostBuildCalculations(int maxNumInMemBatches,
+ PartitionStatSet partitionStatSet) {
+ this.maxNumInMemBatches = maxNumInMemBatches;
+ this.partitionStatSet = Preconditions.checkNotNull(partitionStatSet);
+ }
+
+ @Override
+ public void initialize() {
+ // Do nothing
+ }
+
+ @Override
+ public boolean shouldSpill() {
+ return partitionStatSet.getNumInMemoryBatches() > maxNumInMemBatches;
+ }
+
+ @Override
+ public String makeDebugString() {
+ return "Mechanical post build calculations";
+ }
+
+ @Nullable
+ @Override
+ public HashJoinMemoryCalculator next() {
+ return null;
+ }
+
+ @Override
+ public HashJoinState getState() {
+ return HashJoinState.POST_BUILD_CALCULATIONS;
+ }
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoinRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoinRel.java
index b77fa61..42f7e72 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoinRel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoinRel.java
@@ -43,6 +43,7 @@ import org.apache.drill.exec.planner.torel.ConversionContext;
* Logical Join implemented in Drill.
*/
public class DrillJoinRel extends DrillJoinRelBase implements DrillRel {
+ public static final String EQUALITY_CONDITION = "==";
/** Creates a DrillJoinRel.
* We do not throw InvalidRelException in Logical planning phase. It's up to the post-logical planning check or physical planning
@@ -88,7 +89,7 @@ public class DrillJoinRel extends DrillJoinRelBase implements DrillRel {
builder.right(rightOp);
for (Pair<Integer, Integer> pair : Pair.zip(leftKeys, rightKeys)) {
- builder.addCondition("==", new FieldReference(leftFields.get(pair.left)), new FieldReference(rightFields.get(pair.right)));
+ builder.addCondition(EQUALITY_CONDITION, new FieldReference(leftFields.get(pair.left)), new FieldReference(rightFields.get(pair.right)));
}
return builder.build();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
index 054ceec..dfa1424 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
@@ -29,6 +29,7 @@ import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.ops.OperatorStats;
import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.impl.aggregate.SpilledRecordbatch;
import org.apache.drill.exec.record.selection.SelectionVector2;
import org.apache.drill.exec.record.selection.SelectionVector4;
import org.apache.drill.exec.server.options.OptionValue;
@@ -120,6 +121,13 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
stats.startProcessing();
}
+ if (b instanceof SpilledRecordbatch) {
+ // Don't double count records which were already read and spilled.
+ // TODO evaluate whether swapping out upstream record batch with a SpilledRecordBatch
+ // is the right thing to do.
+ return next;
+ }
+
switch(next) {
case OK_NEW_SCHEMA:
stats.batchReceived(inputIndex, b.getRecordCount(), true);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchSizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchSizer.java
index d30f565..7e531f8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchSizer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchSizer.java
@@ -26,6 +26,7 @@ import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.memory.AllocationManager.BufferLedger;
import org.apache.drill.exec.memory.BaseAllocator;
+import org.apache.drill.exec.physical.impl.xsort.managed.SortMemoryManager;
import org.apache.drill.exec.record.selection.SelectionVector2;
import org.apache.drill.exec.vector.AllocationHelper;
import org.apache.drill.exec.vector.NullableVector;
@@ -49,9 +50,30 @@ import static org.apache.drill.exec.vector.AllocationHelper.STD_REPETITION_FACTO
*/
public class RecordBatchSizer {
+ // TODO consolidate common memory estimation helpers
+ public static final double PAYLOAD_FROM_BUFFER = SortMemoryManager.PAYLOAD_FROM_BUFFER;
+ public static final double FRAGMENTATION_FACTOR = 1.0 / PAYLOAD_FROM_BUFFER;
+ public static final double BUFFER_FROM_PAYLOAD = SortMemoryManager.BUFFER_FROM_PAYLOAD;
+
private static final int OFFSET_VECTOR_WIDTH = UInt4Vector.VALUE_WIDTH;
private static final int BIT_VECTOR_WIDTH = UInt1Vector.VALUE_WIDTH;
+ public static long multiplyByFactors(long size, double... factors)
+ {
+ double doubleSize = (double) size;
+
+ for (double factor: factors) {
+ doubleSize *= factor;
+ }
+
+ return (long) doubleSize;
+ }
+
+ public static long multiplyByFactor(long size, double factor)
+ {
+ return (long) (((double) size) * factor);
+ }
+
/**
* Column size information.
*/
@@ -127,6 +149,14 @@ public class RecordBatchSizer {
private Map<String, ColumnSize> children = CaseInsensitiveMap.newHashMap();
/**
+ * Returns true if there is an accurate std size. Otherwise it returns false.
+ * @return True if there is an accurate std size. Otherwise it returns false.
+ */
+ public boolean hasStdDataSize() {
+ return !isVariableWidth && !isRepeated;
+ }
+
+ /**
* std pure data size per entry from Drill metadata, based on type.
* Does not include metadata vector overhead we add for cardinality,
* variable length etc.
@@ -230,6 +260,18 @@ public class RecordBatchSizer {
}
/**
+ * If there is an accurate std net size, that is returned. Otherwise the net size is returned.
+ * @return If there is an accurate std net size, that is returned. Otherwise the net size is returned.
+ */
+ public int getStdNetOrNetSizePerEntry() {
+ if (hasStdDataSize()) {
+ return getStdNetSizePerEntry();
+ } else {
+ return getNetSizePerEntry();
+ }
+ }
+
+ /**
* This is the total data size for the column, including children for map
* columns. Does not include any overhead of metadata vectors.
*/
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
index 06a89b0..d966f50 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
@@ -560,4 +560,28 @@ public class VectorContainer implements VectorAccessible {
schemaChanged = other.schemaChanged;
other.schemaChanged = temp2;
}
+
+ /**
+ * This method creates a pretty string for a record in the {@link VectorContainer}.
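+ * For example, a record might print as: ["colA" = 1, "colB" = green] (column names and values here are illustrative).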
+ * @param index The index of the record of interest.
+ * @return The string representation of a record.
+ */
+ public String prettyPrintRecord(int index) {
+ final StringBuilder sb = new StringBuilder();
+ String separator = "";
+ sb.append("[");
+
+ for (VectorWrapper vectorWrapper: wrappers) {
+ sb.append(separator);
+ separator = ", ";
+ final String columnName = vectorWrapper.getField().getName();
+ final Object value = vectorWrapper.getValueVector().getAccessor().getObject(index);
+
+ // "columnName" = 11
+ sb.append("\"").append(columnName).append("\" = ").append(value);
+ }
+
+ sb.append("]");
+ return sb.toString();
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 810fd37..183c1f1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -118,10 +118,9 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea
new OptionDefinition(PlannerSettings.ENABLE_UNNEST_LATERAL),
new OptionDefinition(PlannerSettings.FORCE_2PHASE_AGGR), // for testing
new OptionDefinition(ExecConstants.HASHJOIN_NUM_PARTITIONS_VALIDATOR),
- new OptionDefinition(ExecConstants.HASHJOIN_MAX_MEMORY_VALIDATOR),
+ new OptionDefinition(ExecConstants.HASHJOIN_MAX_MEMORY_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM, true, true)),
new OptionDefinition(ExecConstants.HASHJOIN_NUM_ROWS_IN_BATCH_VALIDATOR),
- new OptionDefinition(ExecConstants.HASHJOIN_MAX_BATCHES_IN_MEMORY_VALIDATOR),
- new OptionDefinition(ExecConstants.HASHJOIN_MAX_BATCHES_PER_PARTITION_VALIDATOR),
+ new OptionDefinition(ExecConstants.HASHJOIN_MAX_BATCHES_IN_MEMORY_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM, true, true)),
new OptionDefinition(ExecConstants.HASHAGG_NUM_PARTITIONS_VALIDATOR),
new OptionDefinition(ExecConstants.HASHAGG_MAX_MEMORY_VALIDATOR),
new OptionDefinition(ExecConstants.HASHAGG_MIN_BATCHES_PER_PARTITION_VALIDATOR), // for tuning
@@ -188,6 +187,10 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea
new OptionDefinition(ExecConstants.MIN_MEMORY_PER_BUFFERED_OP),
new OptionDefinition(ExecConstants.NON_BLOCKING_OPERATORS_MEMORY),
new OptionDefinition(ExecConstants.HASH_JOIN_TABLE_FACTOR),
+ new OptionDefinition(ExecConstants.HASHJOIN_HASHTABLE_CALC_TYPE, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM_AND_SESSION, true, true)),
+ new OptionDefinition(ExecConstants.HASHJOIN_SAFETY_FACTOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM_AND_SESSION, true, true)),
+ new OptionDefinition(ExecConstants.HASHJOIN_HASH_DOUBLE_FACTOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM_AND_SESSION, true, true)),
+ new OptionDefinition(ExecConstants.HASHJOIN_FRAGMENTATION_FACTOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM_AND_SESSION, true, true)),
new OptionDefinition(ExecConstants.HASH_AGG_TABLE_FACTOR),
new OptionDefinition(ExecConstants.AVERAGE_FIELD_WIDTH),
new OptionDefinition(ExecConstants.NEW_VIEW_DEFAULT_PERMS_VALIDATOR),
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index f321691..b4969c0 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -420,6 +420,7 @@ drill.exec.options: {
debug.validate_vectors: false,
drill.exec.functions.cast_empty_string_to_null: false,
drill.exec.rpc.fragrunner.timeout: 10000,
+ drill.exec.hashjoin.mem_limit: 0,
# Setting to control if HashAgg should fallback to older behavior of consuming
# unbounded memory. In case of 2 phase Agg when available memory is not enough
# to start at least 2 partitions then HashAgg fallbacks to this case. It can be
@@ -439,10 +440,13 @@ drill.exec.options: {
exec.enable_union_type: false,
exec.errors.verbose: false,
exec.hashjoin.mem_limit: 0,
+ exec.hashjoin.hash_table_calc_type: "LEAN",
+ exec.hashjoin.safety_factor: 1.0,
+ exec.hashjoin.fragmentation_factor: 1.33,
+ exec.hashjoin.hash_double_factor: 2.0,
exec.hashjoin.num_partitions: 32,
exec.hashjoin.num_rows_in_batch: 1024,
- exec.hashjoin.max_batches_in_memory: 128,
- exec.hashjoin.max_batches_per_partition: 512,
+ exec.hashjoin.max_batches_in_memory: 0,
exec.hashagg.mem_limit: 0,
exec.hashagg.min_batches_per_partition: 2,
exec.hashagg.num_partitions: 32,
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestBatchSerialization.java b/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestBatchSerialization.java
index 6a4ec52..302fb59 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestBatchSerialization.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestBatchSerialization.java
@@ -34,6 +34,7 @@ import org.apache.drill.exec.cache.VectorSerializer.Reader;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.test.BaseDirTestWatcher;
import org.apache.drill.test.DirTestWatcher;
import org.apache.drill.test.DrillTest;
import org.apache.drill.test.OperatorFixture;
@@ -52,12 +53,12 @@ import org.junit.Test;
public class TestBatchSerialization extends DrillTest {
@ClassRule
- public static final DirTestWatcher dirTestWatcher = new DirTestWatcher();
+ public static final BaseDirTestWatcher dirTestWatcher = new BaseDirTestWatcher();
public static OperatorFixture fixture;
@BeforeClass
public static void setUpBeforeClass() throws Exception {
- fixture = OperatorFixture.builder().build();
+ fixture = OperatorFixture.builder(dirTestWatcher).build();
}
@AfterClass
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestLargeFileCompilation.java b/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestLargeFileCompilation.java
index 9fcb6a3..dd90edd 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestLargeFileCompilation.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestLargeFileCompilation.java
@@ -174,6 +174,7 @@ public class TestLargeFileCompilation extends BaseTestQuery {
public void testHashJoin() throws Exception {
String tableName = "wide_table_hash_join";
try {
+ setSessionOption("drill.exec.hashjoin.fallback.enabled", true);
testNoResult("alter session set `%s`='JDK'", ClassCompilerSelector.JAVA_COMPILER_OPTION);
testNoResult("alter session set `planner.enable_mergejoin` = false");
testNoResult("alter session set `planner.enable_nestedloopjoin` = false");
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/common/HashPartitionTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/common/HashPartitionTest.java
new file mode 100644
index 0000000..b57329c
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/common/HashPartitionTest.java
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.common;
+
+import com.google.common.collect.Lists;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.data.JoinCondition;
+import org.apache.drill.common.logical.data.NamedExpression;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.config.HashJoinPOP;
+import org.apache.drill.exec.physical.impl.aggregate.SpilledRecordbatch;
+import org.apache.drill.exec.physical.impl.join.HashJoinMemoryCalculator;
+import org.apache.drill.exec.physical.impl.join.HashJoinMemoryCalculatorImpl;
+import org.apache.drill.exec.physical.impl.join.JoinUtils;
+import org.apache.drill.exec.physical.impl.spill.SpillSet;
+import org.apache.drill.exec.planner.logical.DrillJoinRel;
+import org.apache.drill.exec.proto.ExecProtos;
+import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.test.BaseDirTestWatcher;
+import org.apache.drill.test.OperatorFixture;
+import org.apache.drill.test.rowSet.DirectRowSet;
+import org.apache.drill.test.rowSet.RowSet;
+import org.apache.drill.test.rowSet.RowSetBatch;
+import org.apache.drill.test.rowSet.RowSetBuilder;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.util.List;
+
+public class HashPartitionTest {
+ @Rule
+ public final BaseDirTestWatcher dirTestWatcher = new BaseDirTestWatcher();
+
+ @Test
+ public void noSpillBuildSideTest() throws Exception
+ {
+ new HashPartitionFixture().run(new HashPartitionTestCase() {
+ private RowSet buildRowSet;
+ private RowSet probeRowSet;
+
+ @Override
+ public RecordBatch createBuildBatch(BatchSchema schema, BufferAllocator allocator) {
+ buildRowSet = new RowSetBuilder(allocator, schema)
+ .addRow(1, "green")
+ .addRow(3, "red")
+ .addRow(2, "blue")
+ .build();
+ return new RowSetBatch(buildRowSet);
+ }
+
+ @Override
+ public void createResultBuildBatch(BatchSchema schema, BufferAllocator allocator) {
+ }
+
+ @Override
+ public RecordBatch createProbeBatch(BatchSchema schema, BufferAllocator allocator) {
+ probeRowSet = new RowSetBuilder(allocator, schema)
+ .addRow(.5, "yellow")
+ .addRow(1.5, "blue")
+ .addRow(2.5, "black")
+ .build();
+ return new RowSetBatch(probeRowSet);
+ }
+
+ @Override
+ public void run(SpillSet spillSet,
+ BatchSchema buildSchema,
+ BatchSchema probeSchema,
+ RecordBatch buildBatch,
+ RecordBatch probeBatch,
+ ChainedHashTable baseHashTable,
+ FragmentContext context,
+ OperatorContext operatorContext) throws Exception {
+
+ final HashPartition hashPartition = new HashPartition(context,
+ context.getAllocator(),
+ baseHashTable,
+ buildBatch,
+ probeBatch,
+ 10,
+ spillSet,
+ 0,
+ 0);
+
+ final HashJoinMemoryCalculator.BuildSidePartitioning noopCalc = new HashJoinMemoryCalculatorImpl.NoopBuildSidePartitioningImpl();
+
+ hashPartition.appendInnerRow(buildRowSet.container(), 0, 10, noopCalc);
+ hashPartition.appendInnerRow(buildRowSet.container(), 1, 11, noopCalc);
+ hashPartition.appendInnerRow(buildRowSet.container(), 2, 12, noopCalc);
+ hashPartition.completeAnInnerBatch(false, false);
+ hashPartition.buildContainersHashTableAndHelper();
+
+ {
+ int compositeIndex = hashPartition.probeForKey(0, 16);
+ Assert.assertEquals(-1, compositeIndex);
+ }
+
+ {
+ int compositeIndex = hashPartition.probeForKey(1, 12);
+ int startIndex = hashPartition.getStartIndex(compositeIndex);
+ int nextIndex = hashPartition.getNextIndex(startIndex);
+
+ Assert.assertEquals(2, startIndex);
+ Assert.assertEquals(-1, nextIndex);
+ }
+
+ {
+ int compositeIndex = hashPartition.probeForKey(2, 15);
+ Assert.assertEquals(-1, compositeIndex);
+ }
+
+ buildRowSet.clear();
+ probeRowSet.clear();
+ hashPartition.close();
+ }
+ });
+ }
+
+ @Test
+ public void spillSingleIncompleteBatchBuildSideTest() throws Exception
+ {
+ new HashPartitionFixture().run(new HashPartitionTestCase() {
+ private RowSet buildRowSet;
+ private RowSet probeRowSet;
+ private RowSet actualBuildRowSet;
+
+ @Override
+ public RecordBatch createBuildBatch(BatchSchema schema, BufferAllocator allocator) {
+ buildRowSet = new RowSetBuilder(allocator, schema)
+ .addRow(1, "green")
+ .addRow(3, "red")
+ .addRow(2, "blue")
+ .build();
+ return new RowSetBatch(buildRowSet);
+ }
+
+ @Override
+ public void createResultBuildBatch(BatchSchema schema, BufferAllocator allocator) {
+ final BatchSchema newSchema = BatchSchema.newBuilder()
+ .addFields(schema)
+ .addField(MaterializedField.create(HashPartition.HASH_VALUE_COLUMN_NAME, HashPartition.HVtype))
+ .build();
+ actualBuildRowSet = new RowSetBuilder(allocator, newSchema)
+ .addRow(1, "green", 10)
+ .addRow(3, "red", 11)
+ .addRow(2, "blue", 12)
+ .build();
+ }
+
+ @Override
+ public RecordBatch createProbeBatch(BatchSchema schema, BufferAllocator allocator) {
+ probeRowSet = new RowSetBuilder(allocator, schema)
+ .addRow(.5, "yellow")
+ .addRow(1.5, "blue")
+ .addRow(2.5, "black")
+ .build();
+ return new RowSetBatch(probeRowSet);
+ }
+
+ @Override
+ public void run(SpillSet spillSet,
+ BatchSchema buildSchema,
+ BatchSchema probeSchema,
+ RecordBatch buildBatch,
+ RecordBatch probeBatch,
+ ChainedHashTable baseHashTable,
+ FragmentContext context,
+ OperatorContext operatorContext) {
+
+ final HashPartition hashPartition = new HashPartition(context,
+ context.getAllocator(),
+ baseHashTable,
+ buildBatch,
+ probeBatch,
+ 10,
+ spillSet,
+ 0,
+ 0);
+
+ final HashJoinMemoryCalculator.BuildSidePartitioning noopCalc = new HashJoinMemoryCalculatorImpl.NoopBuildSidePartitioningImpl();
+
+ hashPartition.appendInnerRow(buildRowSet.container(), 0, 10, noopCalc);
+ hashPartition.appendInnerRow(buildRowSet.container(), 1, 11, noopCalc);
+ hashPartition.appendInnerRow(buildRowSet.container(), 2, 12, noopCalc);
+ hashPartition.completeAnInnerBatch(false, false);
+ hashPartition.spillThisPartition();
+ final String spillFile = hashPartition.getSpillFile();
+ final int batchesCount = hashPartition.getPartitionBatchesCount();
+ hashPartition.closeWriter();
+
+ SpilledRecordbatch spilledBuildBatch = new SpilledRecordbatch(spillFile, batchesCount, context, buildSchema, operatorContext, spillSet);
+ final RowSet actual = DirectRowSet.fromContainer(spilledBuildBatch.getContainer());
+
+ new RowSetComparison(actualBuildRowSet).verify(actual);
+
+ spilledBuildBatch.close();
+ buildRowSet.clear();
+ actualBuildRowSet.clear();
+ probeRowSet.clear();
+ hashPartition.close();
+ }
+ });
+ }
+
+ public class HashPartitionFixture {
+ public void run(HashPartitionTestCase testCase) throws Exception {
+ try (OperatorFixture operatorFixture = new OperatorFixture.Builder(HashPartitionTest.this.dirTestWatcher).build()) {
+
+ final FragmentContext context = operatorFixture.getFragmentContext();
+ final HashJoinPOP pop = new HashJoinPOP(null, null, null, JoinRelType.FULL);
+ final OperatorContext operatorContext = operatorFixture.operatorContext(pop);
+ final DrillConfig config = context.getConfig();
+ final BufferAllocator allocator = operatorFixture.allocator();
+
+ final UserBitShared.QueryId queryId = UserBitShared.QueryId.newBuilder()
+ .setPart1(1L)
+ .setPart2(2L)
+ .build();
+ final ExecProtos.FragmentHandle fragmentHandle = ExecProtos.FragmentHandle.newBuilder()
+ .setQueryId(queryId)
+ .setMinorFragmentId(1)
+ .setMajorFragmentId(2)
+ .build();
+
+ final SpillSet spillSet = new SpillSet(config, fragmentHandle, pop);
+
+ // Create build batch
+ MaterializedField buildColA = MaterializedField.create("buildColA", Types.required(TypeProtos.MinorType.INT));
+ MaterializedField buildColB = MaterializedField.create("buildColB", Types.required(TypeProtos.MinorType.VARCHAR));
+ List<MaterializedField> buildCols = Lists.newArrayList(buildColA, buildColB);
+ final BatchSchema buildSchema = new BatchSchema(BatchSchema.SelectionVectorMode.NONE, buildCols);
+ final RecordBatch buildBatch = testCase.createBuildBatch(buildSchema, allocator);
+ testCase.createResultBuildBatch(buildSchema, allocator);
+
+ // Create probe batch
+ MaterializedField probeColA = MaterializedField.create("probeColA", Types.required(TypeProtos.MinorType.FLOAT4));
+ MaterializedField probeColB = MaterializedField.create("probeColB", Types.required(TypeProtos.MinorType.VARCHAR));
+ List<MaterializedField> probeCols = Lists.newArrayList(probeColA, probeColB);
+ final BatchSchema probeSchema = new BatchSchema(BatchSchema.SelectionVectorMode.NONE, probeCols);
+ final RecordBatch probeBatch = testCase.createProbeBatch(probeSchema, allocator);
+
+ final LogicalExpression buildColExpression = SchemaPath.getSimplePath(buildColB.getName());
+ final LogicalExpression probeColExpression = SchemaPath.getSimplePath(probeColB.getName());
+
+ final JoinCondition condition = new JoinCondition(DrillJoinRel.EQUALITY_CONDITION, probeColExpression, buildColExpression);
+ final List<Comparator> comparators = Lists.newArrayList(JoinUtils.checkAndReturnSupportedJoinComparator(condition));
+
+ final List<NamedExpression> buildExpressions = Lists.newArrayList(new NamedExpression(buildColExpression, new FieldReference("build_side_0")));
+ final List<NamedExpression> probeExpressions = Lists.newArrayList(new NamedExpression(probeColExpression, new FieldReference("probe_side_0")));
+
+ final int hashTableSize = (int) context.getOptions().getOption(ExecConstants.MIN_HASH_TABLE_SIZE);
+ final HashTableConfig htConfig = new HashTableConfig(hashTableSize, HashTable.DEFAULT_LOAD_FACTOR, buildExpressions, probeExpressions, comparators);
+ final ChainedHashTable baseHashTable = new ChainedHashTable(htConfig, context, allocator, buildBatch, probeBatch, null);
+ baseHashTable.updateIncoming(buildBatch, probeBatch);
+
+ testCase.run(spillSet, buildSchema, probeSchema, buildBatch, probeBatch, baseHashTable, context, operatorContext);
+ }
+ }
+ }
+
+ interface HashPartitionTestCase {
+ RecordBatch createBuildBatch(BatchSchema schema, BufferAllocator allocator);
+ void createResultBuildBatch(BatchSchema schema, BufferAllocator allocator);
+ RecordBatch createProbeBatch(BatchSchema schema, BufferAllocator allocator);
+
+ void run(SpillSet spillSet,
+ BatchSchema buildSchema,
+ BatchSchema probeSchema,
+ RecordBatch buildBatch,
+ RecordBatch probeBatch,
+ ChainedHashTable baseHashTable,
+ FragmentContext context,
+ OperatorContext operatorContext) throws Exception;
+ }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/common/HashTableAllocationTrackerTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/common/HashTableAllocationTrackerTest.java
new file mode 100644
index 0000000..131d82f
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/common/HashTableAllocationTrackerTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.common;
+
+import com.google.common.collect.Lists;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class HashTableAllocationTrackerTest {
+ @Test
+ public void testDoubleGetNextCall() {
+ final HashTableConfig config = new HashTableConfig(100, true, .5f, Lists.newArrayList(), Lists.newArrayList(), Lists.newArrayList());
+ final HashTableAllocationTracker tracker = new HashTableAllocationTracker(config, 30);
+
+ for (int counter = 0; counter < 100; counter++) {
+ Assert.assertEquals(30, tracker.getNextBatchHolderSize());
+ }
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testPrematureCommit() {
+ final HashTableConfig config = new HashTableConfig(100, .5f, Lists.newArrayList(), Lists.newArrayList(), Lists.newArrayList());
+ final HashTableAllocationTracker tracker = new HashTableAllocationTracker(config, 30);
+
+ tracker.commit();
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testDoubleCommit() {
+ final HashTableConfig config = new HashTableConfig(100, .5f, Lists.newArrayList(), Lists.newArrayList(), Lists.newArrayList());
+ final HashTableAllocationTracker tracker = new HashTableAllocationTracker(config, 30);
+
+ tracker.commit();
+ tracker.commit();
+ }
+
+ @Test
+ public void testOverAsking() {
+ final HashTableConfig config = new HashTableConfig(100, .5f, Lists.newArrayList(), Lists.newArrayList(), Lists.newArrayList());
+ final HashTableAllocationTracker tracker = new HashTableAllocationTracker(config, 30);
+
+ tracker.getNextBatchHolderSize();
+ }
+
+ /**
+ * Test for when we do not know the final size of the hash table.
+ */
+ @Test
+ public void testLifecycle1() {
+ final HashTableConfig config = new HashTableConfig(100, .5f, Lists.newArrayList(), Lists.newArrayList(), Lists.newArrayList());
+ final HashTableAllocationTracker tracker = new HashTableAllocationTracker(config, 30);
+
+ for (int counter = 0; counter < 100; counter++) {
+ Assert.assertEquals(30, tracker.getNextBatchHolderSize());
+ tracker.commit();
+ }
+ }
+
+ /**
+ * Test for when we know the final size of the hash table.
+ */
+ @Test
+ public void testLifecycle() {
+ final HashTableConfig config = new HashTableConfig(100, true, .5f, Lists.newArrayList(), Lists.newArrayList(), Lists.newArrayList());
+ final HashTableAllocationTracker tracker = new HashTableAllocationTracker(config, 30);
+
+ Assert.assertEquals(30, tracker.getNextBatchHolderSize());
+ tracker.commit();
+ Assert.assertEquals(30, tracker.getNextBatchHolderSize());
+ tracker.commit();
+ Assert.assertEquals(30, tracker.getNextBatchHolderSize());
+ tracker.commit();
+ Assert.assertEquals(10, tracker.getNextBatchHolderSize());
+ tracker.commit();
+
+ boolean caughtException = false;
+
+ try {
+ tracker.getNextBatchHolderSize();
+ } catch (IllegalStateException ex) {
+ caughtException = true;
+ }
+
+ Assert.assertTrue(caughtException);
+ }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/PartitionStatImpl.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/PartitionStatImpl.java
new file mode 100644
index 0000000..43e02b6
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/PartitionStatImpl.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.join;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * This class represents the memory size statistics for an entire partition.
+ */
+public class PartitionStatImpl implements HashJoinMemoryCalculator.PartitionStat {
+ private boolean spilled;
+ private long numRecords;
+ private long partitionSize;
+ private LinkedList<HashJoinMemoryCalculator.BatchStat> batchStats = Lists.newLinkedList();
+
+ public PartitionStatImpl() {
+ }
+
+ public void add(HashJoinMemoryCalculator.BatchStat batchStat) {
+ Preconditions.checkState(!spilled);
+ Preconditions.checkNotNull(batchStat);
+ partitionSize += batchStat.getBatchSize();
+ numRecords += batchStat.getNumRecords();
+ batchStats.addLast(batchStat);
+ }
+
+ public void spill() {
+ Preconditions.checkState(!spilled);
+ spilled = true;
+ partitionSize = 0;
+ numRecords = 0;
+ batchStats.clear();
+ }
+
+ public List<HashJoinMemoryCalculator.BatchStat> getInMemoryBatches()
+ {
+ return Collections.unmodifiableList(batchStats);
+ }
+
+ public int getNumInMemoryBatches()
+ {
+ return batchStats.size();
+ }
+
+ public boolean isSpilled()
+ {
+ return spilled;
+ }
+
+ public long getNumInMemoryRecords()
+ {
+ return numRecords;
+ }
+
+ public long getInMemorySize()
+ {
+ return partitionSize;
+ }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestBuildSidePartitioningImpl.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestBuildSidePartitioningImpl.java
new file mode 100644
index 0000000..dc05a70
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestBuildSidePartitioningImpl.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.join;
+
+import org.apache.drill.common.map.CaseInsensitiveMap;
+import org.apache.drill.exec.record.RecordBatch;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestBuildSidePartitioningImpl {
+ @Test
+ public void testSimpleReserveMemoryCalculationNoHash() {
+ final int maxBatchNumRecords = 20;
+ final HashJoinMemoryCalculatorImpl.BuildSidePartitioningImpl calc =
+ new HashJoinMemoryCalculatorImpl.BuildSidePartitioningImpl(
+ new HashTableSizeCalculatorConservativeImpl(RecordBatch.MAX_BATCH_SIZE, HashTableSizeCalculatorConservativeImpl.HASHTABLE_DOUBLING_FACTOR),
+ HashJoinHelperSizeCalculatorImpl.INSTANCE,
+ 2.0,
+ 1.5);
+
+ final CaseInsensitiveMap<Long> buildValueSizes = CaseInsensitiveMap.newHashMap();
+ final CaseInsensitiveMap<Long> probeValueSizes = CaseInsensitiveMap.newHashMap();
+ final CaseInsensitiveMap<Long> keySizes = CaseInsensitiveMap.newHashMap();
+
+ calc.initialize(true,
+ false,
+ buildValueSizes,
+ probeValueSizes,
+ keySizes,
+ 200,
+ 2,
+ 20,
+ 10,
+ 20,
+ 10,
+ 10,
+ 5,
+ maxBatchNumRecords,
+ maxBatchNumRecords,
+ 10,
+ .75);
+
+ final HashJoinMemoryCalculator.PartitionStatSet partitionStatSet =
+ new HashJoinMemoryCalculator.PartitionStatSet(new PartitionStatImpl(), new PartitionStatImpl());
+ calc.setPartitionStatSet(partitionStatSet);
+
+ long expectedReservedMemory = 60 // Max incoming batch size
+ + 2 * 30 // build side batch for each spilled partition
+ + 60; // Max incoming probe batch size
+ long actualReservedMemory = calc.getBuildReservedMemory();
+
+ Assert.assertEquals(expectedReservedMemory, actualReservedMemory);
+ Assert.assertEquals(2, calc.getNumPartitions());
+ }
+
+ @Test
+ public void testSimpleReserveMemoryCalculationHash() {
+ final int maxBatchNumRecords = 20;
+ final HashJoinMemoryCalculatorImpl.BuildSidePartitioningImpl calc =
+ new HashJoinMemoryCalculatorImpl.BuildSidePartitioningImpl(
+ new HashTableSizeCalculatorConservativeImpl(RecordBatch.MAX_BATCH_SIZE, HashTableSizeCalculatorConservativeImpl.HASHTABLE_DOUBLING_FACTOR),
+ HashJoinHelperSizeCalculatorImpl.INSTANCE,
+ 2.0,
+ 1.5);
+
+ final CaseInsensitiveMap<Long> buildValueSizes = CaseInsensitiveMap.newHashMap();
+ final CaseInsensitiveMap<Long> probeValueSizes = CaseInsensitiveMap.newHashMap();
+ final CaseInsensitiveMap<Long> keySizes = CaseInsensitiveMap.newHashMap();
+
+ calc.initialize(false,
+ true,
+ buildValueSizes,
+ probeValueSizes,
+ keySizes,
+ 350,
+ 2,
+ 20,
+ 10,
+ 20,
+ 10,
+ 10,
+ 5,
+ maxBatchNumRecords,
+ maxBatchNumRecords,
+ 10,
+ .75);
+
+ final HashJoinMemoryCalculator.PartitionStatSet partitionStatSet =
+ new HashJoinMemoryCalculator.PartitionStatSet(new PartitionStatImpl(), new PartitionStatImpl());
+ calc.setPartitionStatSet(partitionStatSet);
+
+ long expectedReservedMemory = 60 // Max incoming batch size
+ + 2 * (/* data size for batch */ 30 + /* Space reserved for hash value vector */ 10 * 4 * 2) // build side batch for each spilled partition
+ + 60; // Max incoming probe batch size
+ long actualReservedMemory = calc.getBuildReservedMemory();
+
+ Assert.assertEquals(expectedReservedMemory, actualReservedMemory);
+ Assert.assertEquals(2, calc.getNumPartitions());
+ }
+
+ @Test
+ public void testAdjustInitialPartitions() {
+ final int maxBatchNumRecords = 20;
+ final HashJoinMemoryCalculatorImpl.BuildSidePartitioningImpl calc =
+ new HashJoinMemoryCalculatorImpl.BuildSidePartitioningImpl(
+ new HashTableSizeCalculatorConservativeImpl(RecordBatch.MAX_BATCH_SIZE, HashTableSizeCalculatorConservativeImpl.HASHTABLE_DOUBLING_FACTOR),
+ HashJoinHelperSizeCalculatorImpl.INSTANCE,
+ 2.0,
+ 1.5);
+
+ final CaseInsensitiveMap<Long> buildValueSizes = CaseInsensitiveMap.newHashMap();
+ final CaseInsensitiveMap<Long> probeValueSizes = CaseInsensitiveMap.newHashMap();
+ final CaseInsensitiveMap<Long> keySizes = CaseInsensitiveMap.newHashMap();
+
+ calc.initialize(
+ true,
+ false,
+ buildValueSizes,
+ probeValueSizes,
+ keySizes,
+ 200,
+ 4,
+ 20,
+ 10,
+ 20,
+ 10,
+ 10,
+ 5,
+ maxBatchNumRecords,
+ maxBatchNumRecords,
+ 10,
+ .75);
+
+ final HashJoinMemoryCalculator.PartitionStatSet partitionStatSet =
+ new HashJoinMemoryCalculator.PartitionStatSet(new PartitionStatImpl(), new PartitionStatImpl(),
+ new PartitionStatImpl(), new PartitionStatImpl());
+ calc.setPartitionStatSet(partitionStatSet);
+
+ long expectedReservedMemory = 60 // Max incoming batch size
+ + 2 * 30 // build side batch for each spilled partition
+ + 60; // Max incoming probe batch size
+ long actualReservedMemory = calc.getBuildReservedMemory();
+
+ Assert.assertEquals(expectedReservedMemory, actualReservedMemory);
+ Assert.assertEquals(2, calc.getNumPartitions());
+ }
+
+ @Test
+ public void testNoRoomInMemoryForBatch1() {
+ final int maxBatchNumRecords = 20;
+
+ final HashJoinMemoryCalculatorImpl.BuildSidePartitioningImpl calc =
+ new HashJoinMemoryCalculatorImpl.BuildSidePartitioningImpl(
+ new HashTableSizeCalculatorConservativeImpl(RecordBatch.MAX_BATCH_SIZE, HashTableSizeCalculatorConservativeImpl.HASHTABLE_DOUBLING_FACTOR),
+ HashJoinHelperSizeCalculatorImpl.INSTANCE,
+ 2.0,
+ 1.5);
+
+ final CaseInsensitiveMap<Long> buildValueSizes = CaseInsensitiveMap.newHashMap();
+ final CaseInsensitiveMap<Long> probeValueSizes = CaseInsensitiveMap.newHashMap();
+ final CaseInsensitiveMap<Long> keySizes = CaseInsensitiveMap.newHashMap();
+
+ calc.initialize(
+ true,
+ false,
+ buildValueSizes,
+ probeValueSizes,
+ keySizes,
+ 180,
+ 2,
+ 20,
+ 10,
+ 20,
+ 10,
+ 10,
+ 5,
+ maxBatchNumRecords,
+ maxBatchNumRecords,
+ 10,
+ .75);
+
+ final PartitionStatImpl partition1 = new PartitionStatImpl();
+ final PartitionStatImpl partition2 = new PartitionStatImpl();
+ final HashJoinMemoryCalculator.PartitionStatSet partitionStatSet =
+ new HashJoinMemoryCalculator.PartitionStatSet(partition1, partition2);
+ calc.setPartitionStatSet(partitionStatSet);
+
+ long expectedReservedMemory = 60 // Max incoming batch size
+ + 2 * 30 // build side batch for each spilled partition
+ + 60; // Max incoming probe batch size
+ long actualReservedMemory = calc.getBuildReservedMemory();
+
+ Assert.assertEquals(expectedReservedMemory, actualReservedMemory);
+ Assert.assertEquals(2, calc.getNumPartitions());
+
+ partition1.add(new HashJoinMemoryCalculator.BatchStat(10, 8));
+ Assert.assertTrue(calc.shouldSpill());
+ }
+
+ @Test
+ public void testCompleteLifeCycle() {
+ final int maxBatchNumRecords = 20;
+ final HashJoinMemoryCalculatorImpl.BuildSidePartitioningImpl calc =
+ new HashJoinMemoryCalculatorImpl.BuildSidePartitioningImpl(
+ new HashTableSizeCalculatorConservativeImpl(RecordBatch.MAX_BATCH_SIZE, HashTableSizeCalculatorConservativeImpl.HASHTABLE_DOUBLING_FACTOR),
+ HashJoinHelperSizeCalculatorImpl.INSTANCE,
+ 2.0,
+ 1.5);
+
+ final CaseInsensitiveMap<Long> buildValueSizes = CaseInsensitiveMap.newHashMap();
+ final CaseInsensitiveMap<Long> probeValueSizes = CaseInsensitiveMap.newHashMap();
+ final CaseInsensitiveMap<Long> keySizes = CaseInsensitiveMap.newHashMap();
+
+ calc.initialize(
+ true,
+ false,
+ buildValueSizes,
+ probeValueSizes,
+ keySizes,
+ 210,
+ 2,
+ 20,
+ 10,
+ 20,
+ 10,
+ 10,
+ 5,
+ maxBatchNumRecords,
+ maxBatchNumRecords,
+ 10,
+ .75);
+
+ final PartitionStatImpl partition1 = new PartitionStatImpl();
+ final PartitionStatImpl partition2 = new PartitionStatImpl();
+ final HashJoinMemoryCalculator.PartitionStatSet partitionStatSet =
+ new HashJoinMemoryCalculator.PartitionStatSet(partition1, partition2);
+ calc.setPartitionStatSet(partitionStatSet);
+
+
+ // Add to partition 1, no spill needed
+ {
+ partition1.add(new HashJoinMemoryCalculator.BatchStat(10, 7));
+ Assert.assertFalse(calc.shouldSpill());
+ }
+
+ // Add to partition 2, no spill needed
+ {
+ partition2.add(new HashJoinMemoryCalculator.BatchStat(10, 8));
+ Assert.assertFalse(calc.shouldSpill());
+ }
+
+ // Add to partition 1, and partition 1 spilled
+ {
+ partition1.add(new HashJoinMemoryCalculator.BatchStat(10, 8));
+ Assert.assertTrue(calc.shouldSpill());
+ partition1.spill();
+ }
+
+ // Add to partition 2, no spill needed
+ {
+ partition2.add(new HashJoinMemoryCalculator.BatchStat(10, 7));
+ Assert.assertFalse(calc.shouldSpill());
+ }
+
+ // Add to partition 2, and partition 2 spilled
+ {
+ partition2.add(new HashJoinMemoryCalculator.BatchStat(10, 8));
+ Assert.assertTrue(calc.shouldSpill());
+ partition2.spill();
+ }
+
+ Assert.assertNotNull(calc.next());
+ }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinHelperSizeCalculatorImpl.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinHelperSizeCalculatorImpl.java
new file mode 100644
index 0000000..bd34df4
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinHelperSizeCalculatorImpl.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.join;
+
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.record.RecordBatch;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestHashJoinHelperSizeCalculatorImpl {
+ @Test
+ public void simpleCalculateSize() {
+ final long intSize =
+ ((long) TypeHelper.getSize(TypeProtos.MajorType.newBuilder().setMinorType(TypeProtos.MinorType.INT).build()));
+
+ // Account for the overhead of a selection vector
+ long expected = intSize * RecordBatch.MAX_BATCH_SIZE;
+ // Account for the sv4 entries for the records in the in-memory batches (1000 + 2500 = 3500)
+ expected += intSize * 3500;
+
+ PartitionStatImpl partitionStat = new PartitionStatImpl();
+ partitionStat.add(new HashJoinMemoryCalculator.BatchStat(1000, 2000));
+ partitionStat.add(new HashJoinMemoryCalculator.BatchStat(2500, 5000));
+
+ long actual = HashJoinHelperSizeCalculatorImpl.INSTANCE.calculateSize(partitionStat, 1.0);
+ Assert.assertEquals(expected, actual);
+ }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinMemoryCalculatorImpl.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinMemoryCalculatorImpl.java
new file mode 100644
index 0000000..4944c87
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinMemoryCalculatorImpl.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.join;
+
+import org.apache.drill.exec.vector.IntVector;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestHashJoinMemoryCalculatorImpl {
+ @Test
+ public void testComputeMaxBatchSizeNoHash() {
+ final long expected = 1200;
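+ // Presumably 1200 = (100 bytes / 25 records) * 100 records, scaled by the 2.0 and 1.5
+ // (fragmentation and safety) factors passed below.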
+ final long actual = HashJoinMemoryCalculatorImpl.computeMaxBatchSize(
+ 100,
+ 25,
+ 100,
+ 2.0,
+ 1.5,
+ false);
+ final long actualNoHash = HashJoinMemoryCalculatorImpl.computeMaxBatchSizeNoHash(
+ 100,
+ 25,
+ 100,
+ 2.0,
+ 1.5);
+
+ Assert.assertEquals(expected, actual);
+ Assert.assertEquals(expected, actualNoHash);
+ }
+
+ @Test
+ public void testComputeMaxBatchSizeHash() {
+ long expected = HashJoinMemoryCalculatorImpl.computeMaxBatchSizeNoHash(
+ 100,
+ 25,
+ 100,
+ 2.0,
+ 4.0) +
+ 100 * IntVector.VALUE_WIDTH * 2;
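+ // The hash variant additionally reserves an int sized hash value per record
+ // (100 records * 4 bytes), presumably doubled by the 2.0 factor passed above.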
+
+ final long actual = HashJoinMemoryCalculatorImpl.computeMaxBatchSize(
+ 100,
+ 25,
+ 100,
+ 2.0,
+ 4.0,
+ true);
+
+ Assert.assertEquals(expected, actual);
+ }
+
+ @Test // Make sure no exception is thrown
+ public void testMakeDebugString() {
+ final PartitionStatImpl partitionStat1 = new PartitionStatImpl();
+ final PartitionStatImpl partitionStat2 = new PartitionStatImpl();
+ final PartitionStatImpl partitionStat3 = new PartitionStatImpl();
+ final PartitionStatImpl partitionStat4 = new PartitionStatImpl();
+
+ final HashJoinMemoryCalculator.PartitionStatSet partitionStatSet =
+ new HashJoinMemoryCalculator.PartitionStatSet(partitionStat1, partitionStat2, partitionStat3, partitionStat4);
+ partitionStat1.add(new HashJoinMemoryCalculator.BatchStat(10, 7));
+ partitionStat2.add(new HashJoinMemoryCalculator.BatchStat(11, 20));
+ partitionStat3.spill();
+ }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinSpill.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinSpill.java
index 4cfe6b4..37a6a33 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinSpill.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinSpill.java
@@ -33,8 +33,6 @@ import java.util.List;
@Category({SlowTest.class, OperatorTest.class})
public class TestHashJoinSpill extends PhysicalOpUnitTestBase {
-
-
@SuppressWarnings("unchecked")
@Test
// Should spill, including recursive spill
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashTableSizeCalculatorConservativeImpl.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashTableSizeCalculatorConservativeImpl.java
new file mode 100644
index 0000000..4e9f1c7
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashTableSizeCalculatorConservativeImpl.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.join;
+
+import com.google.common.collect.Maps;
+import org.apache.drill.exec.record.RecordBatchSizer;
+import org.apache.drill.exec.vector.UInt4Vector;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Map;
+
+/**
+ * This is a test for the more conservative hash table memory calculator {@link HashTableSizeCalculatorConservativeImpl}.
+ */
+public class TestHashTableSizeCalculatorConservativeImpl {
+ @Test
+ public void testCalculateHashTableSize() {
+ final int maxNumRecords = 40;
+ double loadFactor = .75;
+
+ final Map<String, Long> keySizes = Maps.newHashMap();
+ keySizes.put("a", 3L);
+ keySizes.put("b", 8L);
+
+ // 60 records / .75 load factor = 80, rounded up to the next power of 2 gives 128 buckets
+ long expected = RecordBatchSizer.multiplyByFactor(
+ UInt4Vector.VALUE_WIDTH * 128, HashTableSizeCalculatorConservativeImpl.HASHTABLE_DOUBLING_FACTOR);
+ // Key value vector sizes for the first batch holder (maxNumRecords records)
+ expected += HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl.computeValueVectorSize(maxNumRecords, 3L);
+ expected += HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl.computeValueVectorSize(maxNumRecords, 8L);
+
+ // Key value vector sizes for the second batch holder (remaining 20 records)
+ expected += RecordBatchSizer.multiplyByFactor(
+ HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl.computeValueVectorSize(20, 3L), HashTableSizeCalculatorConservativeImpl.HASHTABLE_DOUBLING_FACTOR);
+ expected += RecordBatchSizer.multiplyByFactor(
+ HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl.computeValueVectorSize(20, 8L), HashTableSizeCalculatorConservativeImpl.HASHTABLE_DOUBLING_FACTOR);
+
+ // Overhead vectors (links and hash values) for each batch holder
+ expected += 2 * UInt4Vector.VALUE_WIDTH // links and hash values
+ * 2 * maxNumRecords; // num batch holders
+
+ PartitionStatImpl partitionStat = new PartitionStatImpl();
+ partitionStat.add(
+ new HashJoinMemoryCalculator.BatchStat(maxNumRecords + 20, 1));
+
+ final HashTableSizeCalculatorConservativeImpl calc = new HashTableSizeCalculatorConservativeImpl(maxNumRecords, HashTableSizeCalculatorConservativeImpl.HASHTABLE_DOUBLING_FACTOR);
+ long actual = calc.calculateSize(partitionStat, keySizes, loadFactor, 1.0, 1.0);
+
+ Assert.assertEquals(expected, actual);
+ }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashTableSizeCalculatorLeanImpl.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashTableSizeCalculatorLeanImpl.java
new file mode 100644
index 0000000..5cdf524
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashTableSizeCalculatorLeanImpl.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.join;
+
+import com.google.common.collect.Maps;
+import org.apache.drill.exec.record.RecordBatchSizer;
+import org.apache.drill.exec.vector.UInt4Vector;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Map;
+
+/**
+ * This is a test for the more accurate hash table memory calculator {@link HashTableSizeCalculatorLeanImpl}.
+ */
+public class TestHashTableSizeCalculatorLeanImpl {
+ @Test
+ public void testCalculateHashTableSize() {
+ final int maxNumRecords = 40;
+ double loadFactor = .75;
+
+ final Map<String, Long> keySizes = Maps.newHashMap();
+ keySizes.put("a", 3L);
+ keySizes.put("b", 8L);
+
+ // 60 records / .75 load factor = 80, rounded up to the next power of 2 gives 128 buckets
+ long expected = RecordBatchSizer.multiplyByFactor(
+ UInt4Vector.VALUE_WIDTH * 128, HashTableSizeCalculatorConservativeImpl.HASHTABLE_DOUBLING_FACTOR);
+ // Key value vector sizes for the first batch holder (maxNumRecords records)
+ expected += HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl.computeValueVectorSize(maxNumRecords, 3L);
+ expected += HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl.computeValueVectorSize(maxNumRecords, 8L);
+
+ // Key value vector sizes for the second batch holder (remaining 20 records)
+ expected += HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl.computeValueVectorSize(20, 3L);
+ expected += RecordBatchSizer.multiplyByFactor(
+ HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl.computeValueVectorSize(20, 8L), HashTableSizeCalculatorConservativeImpl.HASHTABLE_DOUBLING_FACTOR);
+
+ // Overhead vectors (links and hash values) for each batch holder
+ expected += 2 * UInt4Vector.VALUE_WIDTH // links and hash values
+ * 2 * maxNumRecords; // num batch holders
+
+ PartitionStatImpl partitionStat = new PartitionStatImpl();
+ partitionStat.add(
+ new HashJoinMemoryCalculator.BatchStat(maxNumRecords + 20, 1));
+
+ final HashTableSizeCalculatorLeanImpl calc = new HashTableSizeCalculatorLeanImpl(maxNumRecords, HashTableSizeCalculatorConservativeImpl.HASHTABLE_DOUBLING_FACTOR);
+ long actual = calc.calculateSize(partitionStat, keySizes, loadFactor, 1.0, 1.0);
+
+ Assert.assertEquals(expected, actual);
+ }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestPartitionStat.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestPartitionStat.java
new file mode 100644
index 0000000..40eec6a
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestPartitionStat.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.join;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestPartitionStat {
+ @Test
+ public void simpleAddBatchTest() {
+ final PartitionStatImpl partitionStat = new PartitionStatImpl();
+
+ comparePartitionStat(partitionStat, true, 0L, 0, 0L);
+ partitionStat.add(new HashJoinMemoryCalculator.BatchStat(1, 2));
+ comparePartitionStat(partitionStat, false, 2, 1, 1);
+ partitionStat.add(new HashJoinMemoryCalculator.BatchStat(2, 3));
+ comparePartitionStat(partitionStat, false, 5, 2, 3);
+ }
+
+ @Test
+ public void simpleSpillTest() {
+ final PartitionStatImpl partitionStat = new PartitionStatImpl();
+
+ Assert.assertFalse(partitionStat.isSpilled());
+ partitionStat.add(new HashJoinMemoryCalculator.BatchStat(1, 2));
+ Assert.assertFalse(partitionStat.isSpilled());
+ partitionStat.spill();
+ comparePartitionStat(partitionStat, true, 0, 0, 0);
+ }
+
+ public static void comparePartitionStat(final HashJoinMemoryCalculator.PartitionStat partitionStat,
+ boolean isEmpty,
+ long inMemorySize,
+ int numInMemoryBatches,
+ long numInMemoryRecords) {
+ Assert.assertEquals(isEmpty, partitionStat.getInMemoryBatches().isEmpty());
+ Assert.assertEquals(inMemorySize, partitionStat.getInMemorySize());
+ Assert.assertEquals(numInMemoryBatches, partitionStat.getNumInMemoryBatches());
+ Assert.assertEquals(numInMemoryRecords, partitionStat.getNumInMemoryRecords());
+ }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestPostBuildCalculationsImpl.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestPostBuildCalculationsImpl.java
new file mode 100644
index 0000000..1e9bb8a
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestPostBuildCalculationsImpl.java
@@ -0,0 +1,434 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.join;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Map;
+
+public class TestPostBuildCalculationsImpl {
+ @Test
+ public void testRoundUpPowerOf2() {
+ long expected = 32;
+ long actual = HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl.roundUpToPowerOf2(expected);
+
+ Assert.assertEquals(expected, actual);
+ }
+
+ @Test
+ public void testRoundUpNonPowerOf2ToPowerOf2() {
+ long expected = 32;
+ long actual = HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl.roundUpToPowerOf2(31);
+
+ Assert.assertEquals(expected, actual);
+ }
+
+ @Test
+ public void testComputeValueVectorSizePowerOf2() {
+ long expected = 4;
+ long actual =
+ HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl.computeValueVectorSize(2, 2);
+
+ Assert.assertEquals(expected, actual);
+ }
+
+ @Test
+ public void testComputeValueVectorSizeNonPowerOf2() {
+ long expected = 16;
+ long actual =
+ HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl.computeValueVectorSize(3, 3);
+
+ Assert.assertEquals(expected, actual);
+ }
+
+ @Test
+ public void testProbingAndPartitioningBuildAllInMemoryNoSpill() {
+ final Map<String, Long> keySizes = org.apache.drill.common.map.CaseInsensitiveMap.newHashMap();
+
+ final PartitionStatImpl partition1 = new PartitionStatImpl();
+ final PartitionStatImpl partition2 = new PartitionStatImpl();
+ final HashJoinMemoryCalculator.PartitionStatSet buildPartitionStatSet =
+ new HashJoinMemoryCalculator.PartitionStatSet(partition1, partition2);
+
+ final int recordsPerPartitionBatchBuild = 10;
+
+ addBatches(partition1, recordsPerPartitionBatchBuild,
+ 10, 4);
+ addBatches(partition2, recordsPerPartitionBatchBuild,
+ 10, 4);
+
+ final double fragmentationFactor = 2.0;
+ final double safetyFactor = 1.5;
+
+ HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl calc =
+ new HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl(
+ 290,
+ 15,
+ 60,
+ 20,
+ buildPartitionStatSet,
+ keySizes,
+ new MockHashTableSizeCalculator(10),
+ new MockHashJoinHelperSizeCalculator(10),
+ fragmentationFactor,
+ safetyFactor,
+ .75,
+ false);
+
+ calc.initialize();
+
+ long expected = 60 // maxProbeBatchSize
+ + 160 // in memory partitions
+ + 20 // max output batch size
+ + 2 * 10 // Hash Table
+ + 2 * 10; // Hash join helper
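+ // The 280 byte total fits within the 290 bytes given to the calculator, so no spill is expected.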
+ Assert.assertFalse(calc.shouldSpill());
+ Assert.assertEquals(expected, calc.getConsumedMemory());
+ Assert.assertNull(calc.next());
+ }
+
+ @Test
+ public void testProbingAndPartitioningBuildAllInMemorySpill() {
+ final Map<String, Long> keySizes = org.apache.drill.common.map.CaseInsensitiveMap.newHashMap();
+
+ final PartitionStatImpl partition1 = new PartitionStatImpl();
+ final PartitionStatImpl partition2 = new PartitionStatImpl();
+ final HashJoinMemoryCalculator.PartitionStatSet buildPartitionStatSet =
+ new HashJoinMemoryCalculator.PartitionStatSet(partition1, partition2);
+
+ final int recordsPerPartitionBatchBuild = 10;
+
+ addBatches(partition1, recordsPerPartitionBatchBuild,
+ 10, 4);
+ addBatches(partition2, recordsPerPartitionBatchBuild,
+ 10, 4);
+
+ final double fragmentationFactor = 2.0;
+ final double safetyFactor = 1.5;
+
+ HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl calc =
+ new HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl(
+ 270,
+ 15,
+ 60,
+ 20,
+ buildPartitionStatSet,
+ keySizes,
+ new MockHashTableSizeCalculator(10),
+ new MockHashJoinHelperSizeCalculator(10),
+ fragmentationFactor,
+ safetyFactor,
+ .75,
+ false);
+
+ calc.initialize();
+
+ long expected = 60 // maxProbeBatchSize
+ + 160 // in memory partitions
+ + 20 // max output batch size
+ + 2 * 10 // Hash Table
+ + 2 * 10; // Hash join helper
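+ // The 280 byte total exceeds the 270 bytes given to the calculator, so a spill is required.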
+ Assert.assertTrue(calc.shouldSpill());
+ Assert.assertEquals(expected, calc.getConsumedMemory());
+ partition1.spill();
+
+ expected = 60 // maxProbeBatchSize
+ + 80 // in memory partitions
+ + 20 // max output batch size
+ + 10 // Hash Table
+ + 10 // Hash join helper
+ + 15; // partition batch size
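+ // After partition1 spills the total drops to 195 bytes, which fits within the 270 bytes available.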
+ Assert.assertFalse(calc.shouldSpill());
+ Assert.assertEquals(expected, calc.getConsumedMemory());
+ Assert.assertNotNull(calc.next());
+ }
+
+ @Test
+ public void testProbingAndPartitioningBuildAllInMemoryNoSpillWithHash() {
+ final Map<String, Long> keySizes = org.apache.drill.common.map.CaseInsensitiveMap.newHashMap();
+
+ final PartitionStatImpl partition1 = new PartitionStatImpl();
+ final PartitionStatImpl partition2 = new PartitionStatImpl();
+ final HashJoinMemoryCalculator.PartitionStatSet buildPartitionStatSet =
+ new HashJoinMemoryCalculator.PartitionStatSet(partition1, partition2);
+
+ partition1.spill();
+ partition2.spill();
+
+ final double fragmentationFactor = 2.0;
+ final double safetyFactor = 1.5;
+
+ HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl calc =
+ new HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl(
+ 180,
+ 15,
+ 60,
+ 20,
+ buildPartitionStatSet,
+ keySizes,
+ new MockHashTableSizeCalculator(10),
+ new MockHashJoinHelperSizeCalculator(10),
+ fragmentationFactor,
+ safetyFactor,
+ .75,
+ true);
+
+ calc.initialize();
+
+ long expected = 60 // maxProbeBatchSize
+ + 2 * 5 * 3 // partition batches
+ + 20; // max output batch size
+ Assert.assertFalse(calc.shouldSpill());
+ Assert.assertEquals(expected, calc.getConsumedMemory());
+ Assert.assertNotNull(calc.next());
+ }
+
+ @Test
+ public void testProbingAndPartitioningBuildAllInMemoryWithSpill() {
+ final Map<String, Long> keySizes = org.apache.drill.common.map.CaseInsensitiveMap.newHashMap();
+
+ final PartitionStatImpl partition1 = new PartitionStatImpl();
+ final PartitionStatImpl partition2 = new PartitionStatImpl();
+ final HashJoinMemoryCalculator.PartitionStatSet buildPartitionStatSet =
+ new HashJoinMemoryCalculator.PartitionStatSet(partition1, partition2);
+
+ final int recordsPerPartitionBatchBuild = 10;
+
+ addBatches(partition1, recordsPerPartitionBatchBuild, 10, 4);
+ addBatches(partition2, recordsPerPartitionBatchBuild, 10, 4);
+
+ final double fragmentationFactor = 2.0;
+ final double safetyFactor = 1.5;
+ final long hashTableSize = 10;
+ final long hashJoinHelperSize = 10;
+
+ HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl calc =
+ new HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl(
+ 200,
+ 15,
+ 60,
+ 20,
+ buildPartitionStatSet,
+ keySizes,
+ new MockHashTableSizeCalculator(hashTableSize),
+ new MockHashJoinHelperSizeCalculator(hashJoinHelperSize),
+ fragmentationFactor,
+ safetyFactor,
+ .75,
+ false);
+
+ calc.initialize();
+
+ long expected = 60 // maxProbeBatchSize
+ + 80 // in memory partition
+ + 10 // hash table size
+ + 10 // hash join helper size
+ + 15 // max partition probe batch size
+ + 20; // outgoing batch size
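+ // With both partitions in memory the 200 bytes given to the calculator are exceeded, so one
+ // partition must spill; the 195 byte total above reflects the state after partition1 spills.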
+
+ Assert.assertTrue(calc.shouldSpill());
+ partition1.spill();
+ Assert.assertFalse(calc.shouldSpill());
+ Assert.assertEquals(expected, calc.getConsumedMemory());
+ Assert.assertNotNull(calc.next());
+ }
+
+ @Test
+ public void testProbingAndPartitioningBuildSomeInMemory() {
+ final Map<String, Long> keySizes = org.apache.drill.common.map.CaseInsensitiveMap.newHashMap();
+
+ final PartitionStatImpl partition1 = new PartitionStatImpl();
+ final PartitionStatImpl partition2 = new PartitionStatImpl();
+ final PartitionStatImpl partition3 = new PartitionStatImpl();
+ final PartitionStatImpl partition4 = new PartitionStatImpl();
+ final HashJoinMemoryCalculator.PartitionStatSet buildPartitionStatSet =
+ new HashJoinMemoryCalculator.PartitionStatSet(partition1, partition2, partition3, partition4);
+
+ final int recordsPerPartitionBatchBuild = 10;
+
+ partition1.spill();
+ partition2.spill();
+ addBatches(partition3, recordsPerPartitionBatchBuild, 10, 4);
+ addBatches(partition4, recordsPerPartitionBatchBuild, 10, 4);
+
+ final double fragmentationFactor = 2.0;
+ final double safetyFactor = 1.5;
+ final long hashTableSize = 10;
+ final long hashJoinHelperSize = 10;
+
+ HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl calc =
+ new HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl(
+ 230,
+ 15,
+ 60,
+ 20,
+ buildPartitionStatSet,
+ keySizes,
+ new MockHashTableSizeCalculator(hashTableSize),
+ new MockHashJoinHelperSizeCalculator(hashJoinHelperSize),
+ fragmentationFactor,
+ safetyFactor,
+ .75,
+ false);
+
+ calc.initialize();
+
+ long expected = 60 // maxProbeBatchSize
+ + 80 // in memory partition
+ + 10 // hash table size
+ + 10 // hash join helper size
+ + 15 * 3 // max batch size for each spill probe partition
+ + 20;
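+ // The 225 byte total above reflects the state after partition3 also spills; with partition3 and
+ // partition4 both in memory the 230 bytes given to the calculator are exceeded.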
+ Assert.assertTrue(calc.shouldSpill());
+ partition3.spill();
+ Assert.assertFalse(calc.shouldSpill());
+ Assert.assertEquals(expected, calc.getConsumedMemory());
+ Assert.assertNotNull(calc.next());
+ }
+
+ @Test
+ public void testProbingAndPartitioningBuildNoneInMemory() {
+
+ final Map<String, Long> keySizes = org.apache.drill.common.map.CaseInsensitiveMap.newHashMap();
+
+ final PartitionStatImpl partition1 = new PartitionStatImpl();
+ final PartitionStatImpl partition2 = new PartitionStatImpl();
+ final HashJoinMemoryCalculator.PartitionStatSet buildPartitionStatSet =
+ new HashJoinMemoryCalculator.PartitionStatSet(partition1, partition2);
+
+ partition1.spill();
+ partition2.spill();
+
+ final double fragmentationFactor = 2.0;
+ final double safetyFactor = 1.5;
+ final long hashTableSize = 10;
+ final long hashJoinHelperSize = 10;
+
+ HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl calc =
+ new HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl(
+ 100,
+ 15,
+ 60,
+ 20,
+ buildPartitionStatSet,
+ keySizes,
+ new MockHashTableSizeCalculator(hashTableSize),
+ new MockHashJoinHelperSizeCalculator(hashJoinHelperSize),
+ fragmentationFactor,
+ safetyFactor,
+ .75,
+ false);
+
+ calc.initialize();
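+ // Consumed memory is 60 (probe batch) + 20 (output batch) + 2 * 15 (probe partition batches) = 110.
+ // Both build partitions are already spilled, so there is nothing left to spill.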
+ Assert.assertFalse(calc.shouldSpill());
+ Assert.assertEquals(110, calc.getConsumedMemory());
+ Assert.assertNotNull(calc.next());
+ }
+
+ @Test // Make sure no exception is thrown
+ public void testMakeDebugString() {
+ final Map<String, Long> keySizes = org.apache.drill.common.map.CaseInsensitiveMap.newHashMap();
+
+ final PartitionStatImpl partition1 = new PartitionStatImpl();
+ final PartitionStatImpl partition2 = new PartitionStatImpl();
+ final PartitionStatImpl partition3 = new PartitionStatImpl();
+ final PartitionStatImpl partition4 = new PartitionStatImpl();
+ final HashJoinMemoryCalculator.PartitionStatSet buildPartitionStatSet =
+ new HashJoinMemoryCalculator.PartitionStatSet(partition1, partition2, partition3, partition4);
+
+ final int recordsPerPartitionBatchBuild = 10;
+
+ partition1.spill();
+ partition2.spill();
+ addBatches(partition3, recordsPerPartitionBatchBuild, 10, 4);
+ addBatches(partition4, recordsPerPartitionBatchBuild, 10, 4);
+
+ final double fragmentationFactor = 2.0;
+ final double safetyFactor = 1.5;
+ final long hashTableSize = 10;
+ final long hashJoinHelperSize = 10;
+
+ HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl calc =
+ new HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl(
+ 230,
+ 15,
+ 60,
+ 20,
+ buildPartitionStatSet,
+ keySizes,
+ new MockHashTableSizeCalculator(hashTableSize),
+ new MockHashJoinHelperSizeCalculator(hashJoinHelperSize),
+ fragmentationFactor,
+ safetyFactor,
+ .75,
+ false);
+
+ calc.initialize();
+ }
+
+ private void addBatches(PartitionStatImpl partitionStat,
+ int recordsPerPartitionBatchBuild,
+ long batchSize,
+ int numBatches) {
+ for (int counter = 0; counter < numBatches; counter++) {
+ partitionStat.add(new HashJoinMemoryCalculator.BatchStat(
+ recordsPerPartitionBatchBuild, batchSize));
+ }
+ }
+
+ public static class MockHashTableSizeCalculator implements HashTableSizeCalculator {
+ private final long size;
+
+ public MockHashTableSizeCalculator(final long size) {
+ this.size = size;
+ }
+
+ @Override
+ public long calculateSize(HashJoinMemoryCalculator.PartitionStat partitionStat,
+ Map<String, Long> keySizes,
+ double loadFactor, double safetyFactor, double fragmentationFactor) {
+ return size;
+ }
+
+ @Override
+ public double getDoublingFactor() {
+ return HashTableSizeCalculatorConservativeImpl.HASHTABLE_DOUBLING_FACTOR;
+ }
+
+ @Override
+ public String getType() {
+ return null;
+ }
+ }
+
+ public static class MockHashJoinHelperSizeCalculator implements HashJoinHelperSizeCalculator {
+ private final long size;
+
+ public MockHashJoinHelperSizeCalculator(final long size) {
+ this.size = size;
+ }
+
+ @Override
+ public long calculateSize(HashJoinMemoryCalculator.PartitionStat partitionStat, double fragmentationFactor) {
+ return size;
+ }
+ }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/validate/TestBatchValidator.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/validate/TestBatchValidator.java
index ff33c94..6982176 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/validate/TestBatchValidator.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/validate/TestBatchValidator.java
@@ -32,12 +32,14 @@ import org.apache.drill.exec.vector.RepeatedVarCharVector;
import org.apache.drill.exec.vector.UInt4Vector;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.VarCharVector;
+import org.apache.drill.test.BaseDirTestWatcher;
import org.apache.drill.test.LogFixture;
import org.apache.drill.test.OperatorFixture;
import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
import org.apache.drill.test.rowSet.schema.SchemaBuilder;
import org.junit.AfterClass;
import org.junit.BeforeClass;
+import org.junit.ClassRule;
import org.junit.Test;
import ch.qos.logback.classic.Level;
@@ -47,13 +49,16 @@ public class TestBatchValidator /* TODO: extends SubOperatorTest */ {
protected static OperatorFixture fixture;
protected static LogFixture logFixture;
+ @ClassRule
+ public static final BaseDirTestWatcher dirTestWatcher = new BaseDirTestWatcher();
+
@BeforeClass
public static void setUpBeforeClass() throws Exception {
logFixture = LogFixture.builder()
.toConsole()
.logger(BatchValidator.class, Level.TRACE)
.build();
- fixture = OperatorFixture.standardFixture();
+ fixture = OperatorFixture.standardFixture(dirTestWatcher);
}
@AfterClass
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSortImpl.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSortImpl.java
index 4fe650c..6924810 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSortImpl.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSortImpl.java
@@ -39,6 +39,7 @@ import org.apache.drill.exec.proto.UserBitShared.QueryId;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.test.BaseDirTestWatcher;
import org.apache.drill.test.DrillTest;
import org.apache.drill.test.OperatorFixture;
import org.apache.drill.test.rowSet.DirectRowSet;
@@ -51,6 +52,7 @@ import org.apache.drill.test.rowSet.RowSetBuilder;
import org.apache.drill.test.rowSet.RowSetComparison;
import org.apache.drill.test.rowSet.RowSetReader;
import org.apache.drill.test.rowSet.RowSetWriter;
+import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -67,6 +69,9 @@ import io.netty.buffer.DrillBuf;
@Category(OperatorTest.class)
public class TestSortImpl extends DrillTest {
+ @Rule
+ public final BaseDirTestWatcher dirTestWatcher = new BaseDirTestWatcher();
+
/**
* Create the sort implementation to be used by test.
*
@@ -209,7 +214,7 @@ public class TestSortImpl extends DrillTest {
@Test
public void testNullInput() throws Exception {
- try (OperatorFixture fixture = OperatorFixture.standardFixture()) {
+ try (OperatorFixture fixture = OperatorFixture.standardFixture(dirTestWatcher)) {
SortTestFixture sortTest = new SortTestFixture(fixture);
sortTest.run();
}
@@ -222,7 +227,7 @@ public class TestSortImpl extends DrillTest {
@Test
public void testEmptyInput() throws Exception {
- try (OperatorFixture fixture = OperatorFixture.standardFixture()) {
+ try (OperatorFixture fixture = OperatorFixture.standardFixture(dirTestWatcher)) {
BatchSchema schema = SortTestUtilities.nonNullSchema();
SortTestFixture sortTest = new SortTestFixture(fixture);
sortTest.addInput(fixture.rowSetBuilder(schema)
@@ -238,7 +243,7 @@ public class TestSortImpl extends DrillTest {
@Test
public void testSingleRow() throws Exception {
- try (OperatorFixture fixture = OperatorFixture.standardFixture()) {
+ try (OperatorFixture fixture = OperatorFixture.standardFixture(dirTestWatcher)) {
BatchSchema schema = SortTestUtilities.nonNullSchema();
SortTestFixture sortTest = new SortTestFixture(fixture);
sortTest.addInput(fixture.rowSetBuilder(schema)
@@ -258,7 +263,7 @@ public class TestSortImpl extends DrillTest {
@Test
public void testSingleBatch() throws Exception {
- try (OperatorFixture fixture = OperatorFixture.standardFixture()) {
+ try (OperatorFixture fixture = OperatorFixture.standardFixture(dirTestWatcher)) {
BatchSchema schema = SortTestUtilities.nonNullSchema();
SortTestFixture sortTest = new SortTestFixture(fixture);
sortTest.addInput(fixture.rowSetBuilder(schema)
@@ -281,7 +286,7 @@ public class TestSortImpl extends DrillTest {
@Test
public void testTwoBatches() throws Exception {
- try (OperatorFixture fixture = OperatorFixture.standardFixture()) {
+ try (OperatorFixture fixture = OperatorFixture.standardFixture(dirTestWatcher)) {
BatchSchema schema = SortTestUtilities.nonNullSchema();
SortTestFixture sortTest = new SortTestFixture(fixture);
sortTest.addInput(fixture.rowSetBuilder(schema)
@@ -471,7 +476,7 @@ public class TestSortImpl extends DrillTest {
*/
@Test
public void testModerateBatch() throws Exception {
- try (OperatorFixture fixture = OperatorFixture.standardFixture()) {
+ try (OperatorFixture fixture = OperatorFixture.standardFixture(dirTestWatcher)) {
runJumboBatchTest(fixture, 1000);
}
}
@@ -485,7 +490,7 @@ public class TestSortImpl extends DrillTest {
@Test
public void testLargeBatch() throws Exception {
- try (OperatorFixture fixture = OperatorFixture.standardFixture()) {
+ try (OperatorFixture fixture = OperatorFixture.standardFixture(dirTestWatcher)) {
// partyOnMemory(fixture.allocator());
runJumboBatchTest(fixture, ValueVector.MAX_ROW_COUNT);
}
@@ -566,7 +571,7 @@ public class TestSortImpl extends DrillTest {
@Test
public void testWideRows() throws Exception {
- try (OperatorFixture fixture = OperatorFixture.standardFixture()) {
+ try (OperatorFixture fixture = OperatorFixture.standardFixture(dirTestWatcher)) {
runWideRowsTest(fixture, 1000, ValueVector.MAX_ROW_COUNT);
}
}
@@ -586,7 +591,7 @@ public class TestSortImpl extends DrillTest {
@Test
public void testSpill() throws Exception {
- OperatorFixture.Builder builder = OperatorFixture.builder();
+ OperatorFixture.Builder builder = OperatorFixture.builder(dirTestWatcher);
builder.configBuilder()
.put(ExecConstants.EXTERNAL_SORT_BATCH_LIMIT, 2);
try (OperatorFixture fixture = builder.build()) {
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSorter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSorter.java
index 88a7b99..b09b865 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSorter.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSorter.java
@@ -32,6 +32,7 @@ import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.physical.config.Sort;
import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.test.BaseDirTestWatcher;
import org.apache.drill.test.DrillTest;
import org.apache.drill.test.OperatorFixture;
import org.apache.drill.test.rowSet.RowSet;
@@ -46,7 +47,9 @@ import org.apache.drill.test.rowSet.RowSetUtilities;
import org.joda.time.Period;
import org.junit.AfterClass;
import org.junit.BeforeClass;
+import org.junit.ClassRule;
import org.junit.Ignore;
+import org.junit.Rule;
import org.junit.Test;
import com.google.common.collect.Lists;
@@ -61,9 +64,12 @@ public class TestSorter extends DrillTest {
public static OperatorFixture fixture;
+ @ClassRule
+ public static final BaseDirTestWatcher dirTestWatcher = new BaseDirTestWatcher();
+
@BeforeClass
public static void setUpBeforeClass() throws Exception {
- fixture = OperatorFixture.builder().build();
+ fixture = OperatorFixture.builder(dirTestWatcher).build();
}
@AfterClass
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/PhysicalOpUnitTestBase.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/PhysicalOpUnitTestBase.java
index 16081be..66354ff 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/PhysicalOpUnitTestBase.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/PhysicalOpUnitTestBase.java
@@ -42,6 +42,7 @@ import org.apache.drill.exec.server.QueryProfileStoreContext;
import org.apache.drill.exec.store.dfs.DrillFileSystem;
import org.apache.drill.exec.store.easy.json.JSONRecordReader;
import org.apache.drill.exec.work.batch.IncomingBuffers;
+import org.apache.drill.test.BaseDirTestWatcher;
import org.apache.drill.test.DrillTestWrapper;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.exceptions.ExecutionSetupException;
@@ -70,6 +71,7 @@ import org.apache.drill.exec.rpc.NamedThreadFactory;
import org.apache.drill.test.OperatorFixture;
import org.junit.After;
import org.junit.Before;
+import org.junit.Rule;
import org.mockito.Mockito;
import java.io.IOException;
@@ -96,6 +98,9 @@ public class PhysicalOpUnitTestBase extends ExecTest {
protected ExecutorService scanExecutor;
protected ExecutorService scanDecodeExecutor;
+ @Rule
+ public final BaseDirTestWatcher dirTestWatcher = new BaseDirTestWatcher();
+
private final DrillConfig drillConf = DrillConfig.create();
private final ScanResult classpathScan = ClassPathScanner.fromPrescan(drillConf);
private final OperatorCreatorRegistry opCreatorReg = new OperatorCreatorRegistry(classpathScan);
@@ -109,7 +114,7 @@ public class PhysicalOpUnitTestBase extends ExecTest {
Mockito.when(drillbitContext.getScanExecutor()).thenReturn(scanExecutor);
Mockito.when(drillbitContext.getScanDecodeExecutor()).thenReturn(scanDecodeExecutor);
- final OperatorFixture.Builder builder = new OperatorFixture.Builder();
+ final OperatorFixture.Builder builder = new OperatorFixture.Builder(dirTestWatcher);
builder.configBuilder().configProps(drillConf);
operatorFixture = builder
.setScanExecutor(scanExecutor)
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/record/TestVectorContainer.java b/exec/java-exec/src/test/java/org/apache/drill/exec/record/TestVectorContainer.java
index aaeb358..c538ecd 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/record/TestVectorContainer.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/record/TestVectorContainer.java
@@ -19,20 +19,30 @@ package org.apache.drill.exec.record;
import static org.junit.Assert.fail;
+import com.google.common.collect.Lists;
import org.apache.drill.categories.VectorTest;
+import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.memory.RootAllocator;
+import org.apache.drill.test.BaseDirTestWatcher;
import org.apache.drill.test.DrillTest;
import org.apache.drill.test.OperatorFixture;
import org.apache.drill.test.rowSet.DirectRowSet;
import org.apache.drill.test.rowSet.RowSet;
import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
import org.apache.drill.test.rowSet.schema.SchemaBuilder;
+import org.apache.drill.test.rowSet.RowSetBuilder;
import org.apache.drill.test.rowSet.RowSetComparison;
import org.junit.AfterClass;
+import org.junit.Assert;
import org.junit.BeforeClass;
+import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import java.util.List;
+
@Category(VectorTest.class)
public class TestVectorContainer extends DrillTest {
@@ -40,9 +50,12 @@ public class TestVectorContainer extends DrillTest {
// once that is available.
protected static OperatorFixture fixture;
+ @ClassRule
+ public static final BaseDirTestWatcher dirTestWatcher = new BaseDirTestWatcher();
+
@BeforeClass
public static void setUpBeforeClass() throws Exception {
- fixture = OperatorFixture.standardFixture();
+ fixture = OperatorFixture.standardFixture(dirTestWatcher);
}
@AfterClass
@@ -124,4 +137,30 @@ public class TestVectorContainer extends DrillTest {
leftIndirect.clear();
right.clear();
}
+
+ @Test
+ public void testPrettyPrintRecord() {
+ final MaterializedField colA = MaterializedField.create("colA", Types.required(TypeProtos.MinorType.INT));
+ final MaterializedField colB = MaterializedField.create("colB", Types.required(TypeProtos.MinorType.VARCHAR));
+ final MaterializedField colC = MaterializedField.create("colC", Types.repeated(TypeProtos.MinorType.FLOAT4));
+ final MaterializedField colD = MaterializedField.create("colD", Types.repeated(TypeProtos.MinorType.VARCHAR));
+ final List<MaterializedField> cols = Lists.newArrayList(colA, colB, colC, colD);
+ final BatchSchema batchSchema = new BatchSchema(BatchSchema.SelectionVectorMode.NONE, cols);
+
+ try (RootAllocator allocator = new RootAllocator(10_000_000)) {
+ final RowSet rowSet = new RowSetBuilder(allocator, batchSchema)
+ .addRow(110, "green", new float[]{5.5f, 2.3f}, new String[]{"1a", "1b"})
+ .addRow(1440, "yellow", new float[]{1.0f}, new String[]{"dog"})
+ .build();
+
+ final String expected = "[\"colA\" = 110, \"colB\" = green, \"colC\" = [5.5,2.3], \"colD\" = [\"1a\",\"1b\"]]";
+ final String actual = rowSet.container().prettyPrintRecord(0);
+
+ try {
+ Assert.assertEquals(expected, actual);
+ } finally {
+ rowSet.clear();
+ }
+ }
+ }
}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/util/TestQueryMemoryAlloc.java b/exec/java-exec/src/test/java/org/apache/drill/exec/util/TestQueryMemoryAlloc.java
index a62cc41..14a9447 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/util/TestQueryMemoryAlloc.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/util/TestQueryMemoryAlloc.java
@@ -21,8 +21,10 @@ import static org.junit.Assert.assertEquals;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.test.BaseDirTestWatcher;
import org.apache.drill.test.DrillTest;
import org.apache.drill.test.OperatorFixture;
+import org.junit.Rule;
import org.junit.Test;
public class TestQueryMemoryAlloc extends DrillTest {
@@ -30,9 +32,12 @@ public class TestQueryMemoryAlloc extends DrillTest {
public static final long ONE_MB = 1024 * 1024;
public static final long ONE_GB = 1024L * ONE_MB;
+ @Rule
+ public final BaseDirTestWatcher dirTestWatcher = new BaseDirTestWatcher();
+
@Test
public void testDefaultOptions() throws Exception {
- OperatorFixture.Builder builder = OperatorFixture.builder();
+ OperatorFixture.Builder builder = OperatorFixture.builder(dirTestWatcher);
builder.systemOption(ExecConstants.PERCENT_MEMORY_PER_QUERY_KEY, 0.05);
builder.systemOption(ExecConstants.MAX_QUERY_MEMORY_PER_NODE_KEY, 2 * ONE_GB);
@@ -60,7 +65,7 @@ public class TestQueryMemoryAlloc extends DrillTest {
@Test
public void testCustomFloor() throws Exception {
- OperatorFixture.Builder builder = OperatorFixture.builder();
+ OperatorFixture.Builder builder = OperatorFixture.builder(dirTestWatcher);
builder.systemOption(ExecConstants.PERCENT_MEMORY_PER_QUERY_KEY, 0.05);
builder.systemOption(ExecConstants.MAX_QUERY_MEMORY_PER_NODE_KEY, 3 * ONE_GB);
@@ -88,7 +93,7 @@ public class TestQueryMemoryAlloc extends DrillTest {
@Test
public void testCustomPercent() throws Exception {
- OperatorFixture.Builder builder = OperatorFixture.builder();
+ OperatorFixture.Builder builder = OperatorFixture.builder(dirTestWatcher);
builder.systemOption(ExecConstants.PERCENT_MEMORY_PER_QUERY_KEY, 0.10);
builder.systemOption(ExecConstants.MAX_QUERY_MEMORY_PER_NODE_KEY, 2 * ONE_GB);
@@ -126,7 +131,7 @@ public class TestQueryMemoryAlloc extends DrillTest {
@Test
public void testOpMemory() throws Exception {
- OperatorFixture.Builder builder = OperatorFixture.builder();
+ OperatorFixture.Builder builder = OperatorFixture.builder(dirTestWatcher);
builder.systemOption(ExecConstants.CPU_LOAD_AVERAGE_KEY, 0.7);
builder.systemOption(ExecConstants.MAX_WIDTH_PER_NODE_KEY, 10);
builder.systemOption(ExecConstants.MIN_MEMORY_PER_BUFFERED_OP_KEY, 40 * ONE_MB);
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/BaseDirTestWatcher.java b/exec/java-exec/src/test/java/org/apache/drill/test/BaseDirTestWatcher.java
index 21b4a64..ee2244a 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/BaseDirTestWatcher.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/BaseDirTestWatcher.java
@@ -66,6 +66,8 @@ public class BaseDirTestWatcher extends DirTestWatcher {
TEST_TMP // Corresponds to the directory that should be mapped to dfs.tmp
}
+ private File codegenDir;
+ private File spillDir;
private File tmpDir;
private File storeDir;
private File dfsTestTmpParentDir;
@@ -91,6 +93,8 @@ public class BaseDirTestWatcher extends DirTestWatcher {
protected void starting(Description description) {
super.starting(description);
+ codegenDir = makeSubDir(Paths.get("codegen"));
+ spillDir = makeSubDir(Paths.get("spill"));
rootDir = makeSubDir(Paths.get("root"));
tmpDir = makeSubDir(Paths.get("tmp"));
storeDir = makeSubDir(Paths.get("store"));
@@ -104,6 +108,8 @@ public class BaseDirTestWatcher extends DirTestWatcher {
*/
public void clear() {
try {
+ FileUtils.cleanDirectory(codegenDir);
+ FileUtils.cleanDirectory(spillDir);
FileUtils.cleanDirectory(rootDir);
FileUtils.cleanDirectory(tmpDir);
FileUtils.cleanDirectory(storeDir);
@@ -146,6 +152,18 @@ public class BaseDirTestWatcher extends DirTestWatcher {
}
/**
+ * Gets the temp directory that should be used to save generated code files.
+ * @return The temp directory that should be used to save generated code files.
+ */
+ public File getCodegenDir() {
+ return codegenDir;
+ }
+
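+ /**
+ * Gets the temp directory that should be used as a spill directory by tests.
+ * @return The temp directory that should be used as a spill directory by tests.
+ */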
+ public File getSpillDir() {
+ return spillDir;
+ }
+
+ /**
* This methods creates a new directory which can be mapped to <b>dfs.tmp</b>.
*/
public void newDfsTestTmpDir() {
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/ConfigBuilder.java b/exec/java-exec/src/test/java/org/apache/drill/test/ConfigBuilder.java
index 02fac0a..4df7af5 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/ConfigBuilder.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/ConfigBuilder.java
@@ -130,7 +130,12 @@ public class ConfigBuilder {
configProps = createDefaultProperties();
}
- configProps.put(key, value.toString());
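+ // Collection values (such as the lists of spill directories) are stored as-is so that they are
+ // treated as config lists rather than being flattened into a single string.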
+ if (value instanceof Collection) {
+ configProps.put(key, value);
+ } else {
+ configProps.put(key, value.toString());
+ }
+
return this;
}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java b/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java
index bb63277..8e4e4ff 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java
@@ -21,6 +21,14 @@ import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
import io.netty.buffer.DrillBuf;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.drill.common.config.DrillConfig;
@@ -28,6 +36,7 @@ import org.apache.drill.common.scanner.ClassPathScanner;
import org.apache.drill.common.scanner.persistence.ScanResult;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.compile.ClassBuilder;
import org.apache.drill.exec.compile.CodeCompiler;
import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
@@ -113,8 +122,17 @@ public class OperatorFixture extends BaseFixture implements AutoCloseable {
protected ExecutorService scanExecutor;
protected ExecutorService scanDecoderExecutor;
- public Builder()
+ public Builder(BaseDirTestWatcher dirTestWatcher)
{
+ // Set defaults for tmp dirs correctly
+
+ if (dirTestWatcher != null) {
+ configBuilder.put(ClassBuilder.CODE_DIR_OPTION, dirTestWatcher.getCodegenDir().getAbsolutePath());
+ configBuilder.put(ExecConstants.DRILL_TMP_DIR, dirTestWatcher.getTmpDir().getAbsolutePath());
+ configBuilder.put(ExecConstants.SYS_STORE_PROVIDER_LOCAL_PATH, dirTestWatcher.getStoreDir().getAbsolutePath());
+ configBuilder.put(ExecConstants.SPILL_DIRS, Lists.newArrayList(dirTestWatcher.getSpillDir().getAbsolutePath()));
+ configBuilder.put(ExecConstants.HASHJOIN_SPILL_DIRS, Lists.newArrayList(dirTestWatcher.getSpillDir().getAbsolutePath()));
+ }
}
public ConfigBuilder configBuilder() {
@@ -364,8 +382,8 @@ public class OperatorFixture extends BaseFixture implements AutoCloseable {
options.close();
}
- public static Builder builder() {
- Builder builder = new Builder();
+ public static Builder builder(BaseDirTestWatcher dirTestWatcher) {
+ Builder builder = new Builder(dirTestWatcher);
builder.configBuilder()
// Required to avoid Dynamic UDF calls for missing or
// ambiguous functions.
@@ -373,8 +391,8 @@ public class OperatorFixture extends BaseFixture implements AutoCloseable {
return builder;
}
- public static OperatorFixture standardFixture() {
- return builder().build();
+ public static OperatorFixture standardFixture(BaseDirTestWatcher dirTestWatcher) {
+ return builder(dirTestWatcher).build();
}
public RowSetBuilder rowSetBuilder(BatchSchema schema) {
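
With the builder now taking a BaseDirTestWatcher, an operator-level test gets code-generation, tmp, store, and spill directories that are scoped to the test and managed by the watcher. A minimal sketch (the test class is hypothetical; the SubOperatorTest change below is the real in-tree example):

import org.apache.drill.test.BaseDirTestWatcher;
import org.apache.drill.test.OperatorFixture;
import org.junit.ClassRule;
import org.junit.Test;

public class ExampleOperatorFixtureTest {
  @ClassRule
  public static final BaseDirTestWatcher dirTestWatcher = new BaseDirTestWatcher();

  @Test
  public void buildsFixtureWithManagedDirs() throws Exception {
    // The watcher's directories are wired into the Drill config by the new Builder constructor.
    try (OperatorFixture fixture = OperatorFixture.builder(dirTestWatcher).build()) {
      // ... exercise an operator against the fixture here.
    }
  }
}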
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/SubOperatorTest.java b/exec/java-exec/src/test/java/org/apache/drill/test/SubOperatorTest.java
index 6bc2afc..c93853f 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/SubOperatorTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/SubOperatorTest.java
@@ -19,14 +19,18 @@ package org.apache.drill.test;
import org.junit.AfterClass;
import org.junit.BeforeClass;
+import org.junit.ClassRule;
public class SubOperatorTest extends DrillTest {
protected static OperatorFixture fixture;
+ @ClassRule
+ public static final BaseDirTestWatcher dirTestWatcher = new BaseDirTestWatcher();
+
@BeforeClass
public static void classSetup() throws Exception {
- fixture = OperatorFixture.standardFixture();
+ fixture = OperatorFixture.standardFixture(dirTestWatcher);
}
@AfterClass
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/PerformanceTool.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/PerformanceTool.java
index 92ebdd5..7a027ad 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/PerformanceTool.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/PerformanceTool.java
@@ -271,7 +271,7 @@ public class PerformanceTool {
}
public static void main(String args[]) {
- try (OperatorFixture fixture = OperatorFixture.standardFixture();) {
+ try (OperatorFixture fixture = OperatorFixture.standardFixture(null);) {
for (int i = 0; i < 2; i++) {
System.out.println((i==0) ? "Warmup" : "Test run");
new RequiredVectorTester(fixture).runTest();
diff --git a/exec/vector/src/main/codegen/templates/FixedValueVectors.java b/exec/vector/src/main/codegen/templates/FixedValueVectors.java
index 68ee33b..4a98c26 100644
--- a/exec/vector/src/main/codegen/templates/FixedValueVectors.java
+++ b/exec/vector/src/main/codegen/templates/FixedValueVectors.java
@@ -298,6 +298,11 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
return valueCount * ${type.width};
}
+ @Override
+ public int getValueWidth() {
+ return ${type.width};
+ }
+
private class TransferImpl implements TransferPair {
private ${minor.class}Vector to;
diff --git a/exec/vector/src/main/codegen/templates/NullableValueVectors.java b/exec/vector/src/main/codegen/templates/NullableValueVectors.java
index c9c0987..6d386a6 100644
--- a/exec/vector/src/main/codegen/templates/NullableValueVectors.java
+++ b/exec/vector/src/main/codegen/templates/NullableValueVectors.java
@@ -213,6 +213,13 @@ public final class ${className} extends BaseDataValueVector implements <#if type
return bits.getPayloadByteCount(valueCount) + values.getPayloadByteCount(valueCount);
}
+ <#if type.major != "VarLen">
+ @Override
+ public int getValueWidth(){
+ return bits.getValueWidth() + ${type.width};
+ }
+ </#if>
+
<#if type.major == "VarLen">
@Override
public void allocateNew(int totalBytes, int valueCount) {
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/BitVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/BitVector.java
index ca2be3a..2473556 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/BitVector.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/BitVector.java
@@ -211,6 +211,11 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe
data.setZero(0, data.capacity());
}
+ @Override
+ public int getValueWidth() {
+ return VALUE_WIDTH;
+ }
+
public void copyFrom(int inIndex, int outIndex, BitVector from) {
this.mutator.set(outIndex, from.accessor.get(inIndex));
}
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/FixedWidthVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/FixedWidthVector.java
index 09bcdd8..e151e30 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/FixedWidthVector.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/FixedWidthVector.java
@@ -30,4 +30,10 @@ public interface FixedWidthVector extends ValueVector {
* Zero out the underlying buffer backing this vector.
*/
void zeroVector();
+
+ /**
+ * The width of a record in bytes.
+ * @return The width of a record in bytes.
+ */
+ int getValueWidth();
}
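
getValueWidth() gives the memory calculators introduced in this commit a per-value byte count for fixed-width columns. As an illustration only (the helper below is not part of the patch and ignores Drill's power-of-two buffer rounding), the data bytes needed for a batch of a fixed-width vector can be estimated as the value width times the value count:

import org.apache.drill.exec.vector.FixedWidthVector;

public class FixedWidthSizeSketch {
  // Rough data-buffer footprint for valueCount entries of a fixed-width vector.
  public static long dataBytes(FixedWidthVector vector, int valueCount) {
    return (long) vector.getValueWidth() * valueCount;
  }
}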
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/UntypedNullVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/UntypedNullVector.java
index e2f9f77..8e3781e 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/UntypedNullVector.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/UntypedNullVector.java
@@ -91,6 +91,11 @@ public final class UntypedNullVector extends BaseDataValueVector implements Fixe
}
@Override
+ public int getValueWidth() {
+ return VALUE_WIDTH;
+ }
+
+ @Override
public void load(SerializedField metadata, DrillBuf buffer) {
Preconditions.checkArgument(this.field.getName().equals(metadata.getNamePart().getName()),
"The field %s doesn't match the provided metadata %s.", this.field, metadata);
--
To stop receiving notification emails like this one, please contact
boaz@apache.org.